自己動手實現線程池 jdk線程池ThreadPoolExecutor工作原理解析(一)( 五 )

可以看到,execute方法源碼中對于任務處理的邏輯很清晰 , 也能與ThreadPoolExecutor運行時工作流程中所介紹的流程所匹配 。
addWorker方法(創建新的工作線程)在execute方法中當需要創建核心線程或普通線程時,便需要通過addWorker方法嘗試創建一個新的工作線程 。
/*** 向線程池中加入worker* */private boolean addWorker(Runnable firstTask, boolean core) {// retry標識外層循環retry:for (;;) {int currentCtl = ctl.get();// 用于cas更新workerCount的內層循環(注意這里面與jdk的寫法不同,改寫成了邏輯一致但更可讀的形式)for (;;) {// 判斷當前worker數量是否超過了限制int workerCount = workerCountOf(currentCtl);if (workerCount >= CAPACITY) {// 當前worker數量超過了設計上允許的最大限制return false;}if (core) {// 創建的是核心線程,判斷當前線程數是否已經超過了指定的核心線程數if (workerCount >= this.corePoolSize) {// 超過了核心線程數,創建核心worker線程失敗return false;}} else {// 創建的是非核心線程,判斷當前線程數是否已經超過了指定的最大線程數if (workerCount >= this.maximumPoolSize) {// 超過了最大線程數,創建非核心worker線程失敗return false;}}// cas更新workerCount的值boolean casSuccess = compareAndIncrementWorkerCount(currentCtl);if (casSuccess) {// cas成功,跳出外層循環break retry;}// compareAndIncrementWorkerCount方法cas爭搶失敗 , 重新執行內層循環}}boolean workerStarted = false;MyWorker newWorker = null;try {// 創建一個新的workernewWorker = new MyWorker(firstTask);final Thread myWorkerThread = newWorker.thread;if (myWorkerThread != null) {// MyWorker初始化時內部線程創建成功// 加鎖,防止并發更新final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (myWorkerThread.isAlive()) {// 預檢查線程的狀態,剛初始化的worker線程必須是未喚醒的狀態throw new IllegalThreadStateException();}// 加入worker集合this.workers.add(newWorker);int workerSize = workers.size();if (workerSize > largestPoolSize) {// 如果當前worker個數超過了之前記錄的最大存活線程數 , 將其更新largestPoolSize = workerSize;}// 創建成功} finally {// 無論是否發生異常,都先將主控鎖解鎖mainLock.unlock();}// 加入成功,啟動worker線程myWorkerThread.start();// 標識為worker線程啟動成功 , 并作為返回值返回workerStarted = true;}}finally {if (!workerStarted) {addWorkerFailed(newWorker);}}return workerStarted;}addWorker可以分為兩部分:判斷當前是否滿足創建新工作線程的條件、創建并啟動新的Worker工作線程 。
判斷當前是否滿足創建新工作線程的條件入口處開始的retry標識的for循環部分,便是用于判斷是否滿足創建新工作線程的條件 。

  • 首先判斷當前工作線程數量是否超過了理論的最大值CAPACITY(即2^29-1),超過了則不能創建,返回false , 不創建新工作線程
  • 根據boolean類型參數core判斷是否創建核心工作線程,core=true則判斷是否超過了corePoolSize的限制 , core=false則判斷是否超過了maximumPoolSize的限制 。不滿足則返回false,不創建新工作線程
  • 滿足上述限制條件后,則說明可以創建新線程了,compareAndIncrementWorkerCount方法進行cas的增加當前工作線程數 。如果cas失敗,則說明存在并發的更新了,則再一次的循環重試,并再次的進行上述檢查 。
需要注意的是:這里面有兩個for循環的原因在于v1版本省略了優雅停止的邏輯(所以實際上v1版本能去掉內層循環的) 。如果線程池處于停止狀態則不能再創建新工作線程了,因此也需要判斷線程池當前的狀態,不滿足條件則也需要返回false,不創建工作線程 。而且compareAndIncrementWorkerCount中cas更新ctl時,如果并發的線程池被停止而導致線程池狀態發生了變化,也會導致cas失敗重新檢查 。這也是jdk的實現中為什么把線程池狀態和工作線程數量綁定在一起的原因之一,這樣在cas更新時可以原子性的同時檢查兩個字段的并發爭搶 。(更具體的細節會在下一篇博客的v2版本中介紹)
創建并啟動新的Worker工作線程在通過retry那部分的層層條件檢查后 , 緊接著便是實際創建新工作線程的邏輯 。