自己動手實現線程池 jdk線程池ThreadPoolExecutor工作原理解析(一)( 三 )


ThreadPoolExecutor共有五種狀態,但有四種都和優雅停止有關(除了RUNNING) 。但由于v1版本的MyThreadPoolExecutorV1不支持優雅停止 , 所以不在本篇博客中講解這些狀態具體的含義以及其是如何變化的(下一篇v2版本的博客中展開)
記錄線程池運行過程中的一些關鍵指標

  1. completedTaskCount(線程池自啟動后已完成的總任務數)
  2. largestPoolSize(線程池自啟動后工作線程個數的最大值)在運行過程中,ThreadPoolExecutor會在對應的地方進行埋點,統計一些指標并提供相應的api給用戶實時的查詢,以提高線程池工作時的可觀測性 。
public class MyThreadPoolExecutorV1 implements MyThreadPoolExecutor{/*** 指定的最大核心線程數量* */private volatile int corePoolSize;/*** 指定的最大線程數量* */private volatile int maximumPoolSize;/*** 線程保活時間(單位:納秒 nanos)* */private volatile long keepAliveTime;/*** 存放任務的工作隊列(阻塞隊列)* */private final BlockingQueue<Runnable> workQueue;/*** 線程工廠* */private volatile ThreadFactory threadFactory;/*** 拒絕策略* */private volatile MyRejectedExecutionHandler handler;/*** 是否允許核心線程在idle一定時間后被銷毀(和非核心線程一樣)* */private volatile boolean allowCoreThreadTimeOut;/*** 主控鎖* */private final ReentrantLock mainLock = new ReentrantLock();/*** 當前線程池已完成的任務數量* */private long completedTaskCount;/*** 維護當前存活的worker線程集合* */private final HashSet<MyWorker> workers = new HashSet<>();/*** 當前線程池中存在的worker線程數量 + 狀態的一個聚合(通過一個原子int進行cas,來避免對兩個業務屬性字段加鎖來保證一致性)* v1版本只關心前者,即worker線程數量*/private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;/*** 32位的有符號整數,有3位是用來存放線程池狀態的,所以用來維護當前工作線程個數的部分就只能用29位了* 被占去的3位中,有1位原來的符號位 , 2位是原來的數值位 。* */private static final int CAPACITY= (1 << COUNT_BITS) - 1;/*** 線程池狀態poolStatus常量(狀態值只會由小到大,單調遞增)* 線程池狀態遷移圖:*SHUTDOWN* RUNNING↓TIDYING → TERMINATED*STOP* 1 RUNNING狀態,代表著線程池處于正常運行的狀態 。能正常的接收并處理提交的任務* 線程池對象初始化時,狀態為RUNNING* 對應邏輯:private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));** 2 SHUTDOWN狀態,代表線程池處于停止對外服務的狀態 。不再接收新提交的任務 , 但依然會將workQueue工作隊列中積壓的任務處理完* 調用了shutdown方法時 , 狀態由RUNNING -> SHUTDOWN* 對應邏輯:shutdown方法中的advanceRunState(SHUTDOWN);** 3 STOP狀態,代表線程池處于停止狀態 。不再接受新提交的任務,同時也不再處理workQueue工作隊列中積壓的任務,當前還在處理任務的工作線程將收到interrupt中斷通知* 之前未調用shutdown方法,直接調用了shutdownNow方法,狀態由RUNNING -> STOP* 之前先調用了shutdown方法 , 后調用了shutdownNow方法 , 狀態由SHUTDOWN -> STOP* 對應邏輯:shutdownNow方法中的advanceRunState(STOP);** 4 TIDYING狀態,代表著線程池即將完全終止,正在做最后的收尾工作* 當前線程池狀態為SHUTDOWN,任務被消費完工作隊列workQueue為空,且工作線程全部退出完成工作線程集合workers為空時,tryTerminate方法中將狀態由SHUTDOWN->TIDYING* 當前線程池狀態為STOP,工作線程全部退出完成工作線程集合workers為空時,tryTerminate方法中將狀態由STOP->TIDYING* 對應邏輯:tryTerminate方法中的ctl.compareAndSet(c, ctlOf(TIDYING, 0)** 5 TERMINATED狀態,代表著線程池完全的關閉 。之前線程池已經處于TIDYING狀態,且調用的鉤子函數terminated已返回* 當前線程池狀態為TIDYING,調用的鉤子函數terminated已返回* 對應邏輯:tryTerminate方法中的ctl.set(ctlOf(TERMINATED, 0));* */private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;// Packing and unpacking ctlprivate static int workerCountOf(int c){ return c & CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }/*** 跟蹤線程池曾經有過的最大線程數量(只能在mainLock的并發保護下更新)*/private int largestPoolSize;private boolean compareAndIncrementWorkerCount(int expect) {return this.ctl.compareAndSet(expect, expect + 1);}private boolean compareAndDecrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect - 1);}private void decrementWorkerCount() {do {// cas更新 , workerCount自減1} while (!compareAndDecrementWorkerCount(ctl.get()));}public MyThreadPoolExecutorV1(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,MyRejectedExecutionHandler handler) {// 基本的參數校驗if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) {throw new IllegalArgumentException();}if (unit == null || workQueue == null || threadFactory == null || handler == null) {throw new NullPointerException();}// 設置成員變量this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}public ThreadFactory getThreadFactory() {return threadFactory;}}

推薦閱讀