search
尋找貓咪~QQ 地點 桃園市桃園區 Taoyuan , Taoyuan

分析Java延遲與周期任務的實現原理

延遲或周期執行任務可以使用Timer或者ScheduledThreadPoolExecutor,前者可以拋棄,後者是今天的主角。

ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,對應執行任務變成ScheduledFutureTask。本文會在前三篇分析線程池原理的基礎上,分析ScheduledThreadPoolExecutor的實現原理,最後介紹下為什麼不用Timer了。

ScheduledThreadPoolExecutor的創建ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(10); ScheduledExecutorService singleScheduledThreadPool = Executors.newSingleThreadScheduledExecutor;public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue, threadFactory, handler); }

ScheduledThreadPoolExecutor的創建可以使用Executors,也可以自己傳參構建。上面的構造函數是參數最全的版本,可以設置線程目標數量、線程工廠和飽和策略。至於等待隊列,使用內部類DelayedWorkQueue,看後文分析。

ScheduledFutureTask

ScheduledFutureTask的構造函數沒什麼特別,保存了三個參數。

ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement; } ScheduledFutureTask(Callable<V> callable, long ns) { super(callable); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement; }

  • time:任務執行時間;
  • period:任務周期執行間隔;
  • sequenceNumber:自增的任務序號。

Callable默認period=0,表示任務不是周期執行,因為只有Runnable可以周期執行。想想也是,Callable目的是獲得執行結果,沒有必要重複調用。

圖1

ScheduledFutureTask繼承了我們熟悉的FutureTask,這個不用多說。圖1是它實現的介面,比較陌生的是Delayed,而Delayed又繼承了Comparable。

public long getDelay(TimeUnit unit) { return unit.convert(time - now, NANOSECONDS); }public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; }

這兩個介面的存在很容易理解,ScheduledFutureTask在等待隊列里調度不再按照FIFO,而是按照執行時間,誰即將執行,誰就排在前面。在這裡也可以看到sequenceNumber的作用,當執行時間相同時,按照序號排序。

添加延遲任務

對ScheduledThreadPoolExecutor使用通用的execute或者submit提交任務,最終調用schedule方法,默認馬上執行。如果需要延遲執行,需要直接使用schedule,傳遞時間參數。

public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { if (callable == null || unit == null) throw new NullPointerException; RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit))); delayedExecute(t); return t; }

Runnable和Callable包裝成ScheduledFutureTask實例,保存了延遲信息,然後執行delayedExecute。

private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown) reject(task); else { super.getQueue.add(task); if (isShutdown && !canRunInCurrentRunState(task.isPeriodic) && remove(task)) task.cancel(false); else ensurePrestart; } }boolean canRunInCurrentRunState(boolean periodic) { return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown); }

如果線程池已經關閉,直接調用飽和策略,否則將任務加入等待隊列。加入之後,需要再判斷線程池的狀態,和當前任務是否能運行。如果不能繼續執行,將任務移出隊列並取消任務。

canRunInCurrentRunState處理任務加入等待隊列后,又未執行就發生線程池關閉的情況,它通過預設的兩個變數判斷任務到底能不能執行。

  • 延遲任務用executeExistingDelayedTasksAfterShutdown
  • 周期任務用continueExistingPeriodicTasksAfterShutdown

void ensurePrestart { int wc = workerCountOf(ctl.get); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); }

最後調用到ensurePrestart,使用addWorkder增加工作線程,這在ThreadPoolExecutor解釋過了

添加周期任務public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,long period,TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException; if (period <= 0) throw new IllegalArgumentException; ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay, long delay,TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException; if (delay <= 0) throw new IllegalArgumentException; ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }

執行周期任務有上面兩個方法,具體作用方法名寫得很清楚:

  • scheduleAtFixedRate:按固定的頻率執行,不受執行時長影響,到點就執行;
  • scheduleWithFixedDelay:任務執行完后,按固定的延後時間再執行。

兩個方法幾乎一樣,不同的是構建ScheduledFutureTask時,period一個傳正數,另一個傳負數。不用懷疑,區分兩種情況就是用正負。

等待隊列

線程池的等待隊列使用了內部類DelayedWorkQueue,和普通線程池等待隊列最大的不同是它的任務是按照目標執行時間進行排序。

入隊的offer被重寫了,add和put方法也是調用offer,具體BlockingQueue的實現邏輯不在這裡討論,重點是看offer里的siftUp方法。

private void siftUp(int k, RunnableScheduledFuture<?> key) { while (k > 0) { int parent = (k - 1) >>> 1; RunnableScheduledFuture<?> e = queue[parent]; if (key.compareTo(e) >= 0) break; queue[k] = e; setIndex(e, k); k = parent; } queue[k] = key; setIndex(key, k); }

siftUp根據任務的compareTo,將任務移動到隊列中指定的位置,就是這樣。

對應地,出隊take方法,根據任務的delay時間,小於等於0時將任務出隊,否則等待。

任務執行

當線程池從等待隊列取出一個任務時,會執行它的run方法。

public void run { boolean periodic = isPeriodic; if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) ScheduledFutureTask.super.run; else if (ScheduledFutureTask.super.runAndReset) { setNextRunTime; reExecutePeriodic(outerTask); } }

方法有三個分支,第一個if判斷任務在當前線程池狀態下是否能執行,canRunInCurrentRunState已經講解過。第二個if是判斷是否周期任務,不是的話直接執行,不需要多餘的操作。重點來看第三個if,也就是周期執行任務。

  • runAndReset:任務執行完重置為初始狀態,等待下一次執行;
  • setNextRunTime:計算下次執行時間;
  • reExecutePeriodic:再調度任務。

private void setNextRunTime { long p = period; if (p > 0) time += p; else time = triggerTime(-p); }

計算下次執行時間,period根據正負有不同的計算邏輯,負的時間也會先改正,很明顯對應上文的scheduleAtFixedRate和scheduleWithFixedDelay兩個方法。

void reExecutePeriodic(RunnableScheduledFuture<?> task) { if (canRunInCurrentRunState(true)) { super.getQueue.add(task); if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else ensurePrestart; } }

將任務重新加入等待隊列,中間幾個方法都解釋過了。

Timer的缺陷

自從知道ScheduledThreadPoolExecutor,再沒有使用Timer,因為它有幾個缺陷:

  • 多任務在單線程里執行,一個任務結束,另一個任務才能開始,時間間隔不準;
  • 出現異常會導致全部任務停止;
  • 絕對時間,受系統時間影響。

private final TaskQueue queue = new TaskQueue;private final TimerThread thread = new TimerThread(queue);

Timer的代碼很簡單,主要數據結構是一個任務隊列和一個執行線程。新增的任務會加入任務隊列,到達時間后,由執行線程執行。只有一個線程,很容易理解上面講的缺陷。

ScheduledThreadPoolExecutor每個任務都有對應的執行線程,時間使用相對時間計算,也就沒有上面的缺陷,所以沒有理由使用Timer了。

    學習Java的同學注意了!!!

    學習過程中遇到什麼問題或者想獲取學習資源的話,歡迎加入Java學習交流群,群號碼:392216227我們一起學Java!



熱門推薦

本文由 yidianzixun 提供 原文連結

寵物協尋 相信 終究能找到回家的路
寫了7763篇文章,獲得2次喜歡
留言回覆
回覆
精彩推薦