精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

徹底搞懂Java線程池的工作原理

開發 后端
本篇文章將深入分析Java中線程池的工作原理。這也是為什么把線程池放到最后來寫的原因。本篇文章權當是一個并發系列的綜合練習,剛好鞏固實踐一下前面知識點的運用。

[[411692]]

 前言

多線程并發是Java語言中非常重要的一塊內容,同時,也是Java基礎的一個難點。說它重要是因為多線程是日常開發中頻繁用到的知識,說它難是因為多線程并發涉及到的知識點非常之多,想要完全掌握Java的并發相關知識并非易事。也正因此,Java并發成了Java面試中最高頻的知識點之一。

本篇文章將深入分析Java中線程池的工作原理。個人認為線程池是Java并發中比較難已理解的一塊知識,因為線程池內部實現使用到了大量的像ReentrantLock、AQS、AtomicInteger、CAS以及“生產者-消費者”模型等并發相關的知識,基本上涵蓋了并發系列前幾篇文章的大部分知識點。這也是為什么把線程池放到最后來寫的原因。本篇文章權當是一個并發系列的綜合練習,剛好鞏固實踐一下前面知識點的運用。

線程池基礎知識

在Java語言中,雖然創建并啟動一個線程非常方便,但是由于創建線程需要占用一定的操作系統資源,在高并發的情況下,頻繁的創建和銷毀線程會大量消耗CPU和內存資源,對程序性能造成很大的影響。為了避免這一問題,Java給我們提供了線程池。

線程池是一種基于池化技術思想來管理線程的工具。在線程池中維護了多個線程,由線程池統一的管理調配線程來執行任務。通過線程復用,減少了頻繁創建和銷毀線程的開銷。

本章內容我們先來了解一下線程池的一些基礎知識,學習如何使用線程池以及了解線程池的生命周期。

線程池的使用

線程池的使用和創建可以說非常的簡單,這得益于JDK提供給我們良好封裝的API。線程池的實現被封裝到了ThreadPoolExecutor中,我們可以通過ThreadPoolExecutor的構造方法來實例化出一個線程池,代碼如下: 

  1. // 實例化一個線程池  
  2. ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 10, 60,  
  3.         TimeUnit.SECONDS, new ArrayBlockingQueue<>(20));  
  4. // 使用線程池執行一個任務          
  5. executor.execute(() -> {  
  6.     // Do something  
  7. });  
  8. // 關閉線程池,會阻止新任務提交,但不影響已提交的任務  
  9. executor.shutdown();  
  10. // 關閉線程池,阻止新任務提交,并且中斷當前正在運行的線程  
  11. executor.showdownNow(); 

創建好線程池后直接調用execute方法并傳入一個Runnable參數即可將任務交給線程池執行,通過shutdown/shutdownNow方法可以關閉線程池。

ThreadPoolExecutor的構造方法中參數眾多,對于初學者而言在沒有了解各個參數的作用的情況下很難去配置合適的線程池。因此Java還為我們提供了一個線程池工具類Executors來快捷的創建線程池。Executors提供了很多簡便的創建線程池的方法,舉兩個例子,代碼如下: 

  1. // 實例化一個單線程的線程池  
  2. ExecutorService singleExecutor = Executors.newSingleThreadExecutor();  
  3. // 創建固定線程個數的線程池 
  4. ExecutorService fixedExecutor = Executors.newFixedThreadPool(10);  
  5. // 創建一個可重用固定線程數的線程池  
  6. ExecutorService executorService2 = Executors.newCachedThreadPool(); 

但是,通常來說在實際開發中并不推薦直接使用Executors來創建線程池,而是需要根據項目實際情況配置適合自己項目的線程池,關于如何配置合適的線程池這是后話,需要我們理解線程池的各個參數以及線程池的工作原理之后才能有答案。

線程池的生命周期

線程池從誕生到死亡,中間會經歷RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED五個生命周期狀態。

  •  RUNNING 表示線程池處于運行狀態,能夠接受新提交的任務且能對已添加的任務進行處理。RUNNING狀態是線程池的初始化狀態,線程池一旦被創建就處于RUNNING狀態。
  •  SHUTDOWN 線程處于關閉狀態,不接受新任務,但可以處理已添加的任務。RUNNING狀態的線程池調用shutdown后會進入SHUTDOWN狀態。
  •  STOP 線程池處于停止狀態,不接收任務,不處理已添加的任務,且會中斷正在執行任務的線程。RUNNING狀態的線程池調用了shutdownNow后會進入STOP狀態。
  •  TIDYING 當所有任務已終止,且任務數量為0時,線程池會進入TIDYING。當線程池處于SHUTDOWN狀態時,阻塞隊列中的任務被執行完了,且線程池中沒有正在執行的任務了,狀態會由SHUTDOWN變為TIDYING。當線程處于STOP狀態時,線程池中沒有正在執行的任務時則會由STOP變為TIDYING。
  •  TERMINATED 線程終止狀態。處于TIDYING狀態的線程執行terminated()后進入TERMINATED狀態。

根據上述線程池生命周期狀態的描述,可以畫出如下所示的線程池生命周期狀態流程示意圖。

線程池的工作機制

ThreadPoolExecutor中的參數

上一小節中,我們使用ThreadPoolExecutor的構造方法來創建了一個線程池。其實在ThreadPoolExecutor中有多個構造方法,但是最終都調用到了下邊代碼中的這一個構造方法: 

  1. public class ThreadPoolExecutor extends AbstractExecutorService {  
  2.     public ThreadPoolExecutor(int corePoolSize,  
  3.                               int maximumPoolSize,  
  4.                               long keepAliveTime, 
  5.                                TimeUnit unit,  
  6.                               BlockingQueue<Runnable> workQueue,  
  7.                               ThreadFactory threadFactory,  
  8.                               RejectedExecutionHandler handler) {  
  9.         // ...省略校驗相關代碼  
  10.         this.corePoolSize = corePoolSize;  
  11.         this.maximumPoolSize = maximumPoolSize;  
  12.         this.workQueue = workQueue;  
  13.         this.keepAliveTime = unit.toNanos(keepAliveTime);  
  14.         this.threadFactory = threadFactory;  
  15.         this.handler = handler;  
  16.     }  
  17.     // ...     

這個構造方法中有7個參數之多,我們逐個來看每個參數所代表的含義:

  •  corePoolSize 表示線程池的核心線程數。當有任務提交到線程池時,如果線程池中的線程數小于corePoolSize,那么則直接創建新的線程來執行任務。
  •  workQueue 任務隊列,它是一個阻塞隊列,用于存儲來不及執行的任務的隊列。當有任務提交到線程池的時候,如果線程池中的線程數大于等于corePoolSize,那么這個任務則會先被放到這個隊列中,等待執行。
  •  maximumPoolSize 表示線程池支持的最大線程數量。當一個任務提交到線程池時,線程池中的線程數大于corePoolSize,并且workQueue已滿,那么則會創建新的線程執行任務,但是線程數要小于等于maximumPoolSize。
  •  keepAliveTime 非核心線程空閑時保持存活的時間。非核心線程即workQueue滿了之后,再提交任務時創建的線程,因為這些線程不是核心線程,所以它空閑時間超過keepAliveTime后則會被回收。
  •  unit 非核心線程空閑時保持存活的時間的單位
  •  threadFactory 創建線程的工廠,可以在這里統一處理創建線程的屬性
  •  handler 拒絕策略,當線程池中的線程達到maximumPoolSize線程數后且workQueue已滿的情況下,再向線程池提交任務則執行對應的拒絕策略

線程池工作流程

線程池提交任務是從execute方法開始的,我們可以從execute方法來分析線程池的工作流程。

(1)當execute方法提交一個任務時,如果線程池中線程數小于corePoolSize,那么不管線程池中是否有空閑的線程,都會創建一個新的線程來執行任務。

(2)當execute方法提交一個任務時,線程池中的線程數已經達到了corePoolSize,且此時沒有空閑的線程,那么則會將任務存儲到workQueue中。

(3)如果execute提交任務時線程池中的線程數已經到達了corePoolSize,并且workQueue已滿,那么則會創建新的線程來執行任務,但總線程數應該小于maximumPoolSize。

(4)如果線程池中的線程執行完了當前的任務,則會嘗試從workQueue中取出第一個任務來執行。如果workQueue為空則會阻塞線程。

(5)如果execute提交任務時,線程池中的線程數達到了maximumPoolSize,且workQueue已滿,此時會執行拒絕策略來拒絕接受任務。

(6)如果線程池中的線程數超過了corePoolSize,那么空閑時間超過keepAliveTime的線程會被銷毀,但程池中線程個數會保持為corePoolSize。

(7)如果線程池存在空閑的線程,并且設置了allowCoreThreadTimeOut為true。那么空閑時間超過keepAliveTime的線程都會被銷毀。

線程池的拒絕策略

如果線程池中的線程數達到了maximumPoolSize,并且workQueue隊列存儲滿的情況下,線程池會執行對應的拒絕策略。在JDK中提供了RejectedExecutionHandler接口來執行拒絕操作。實現RejectedExecutionHandler的類有四個,對應了四種拒絕策略。分別如下:DiscardPolicy 當提交任務到線程池中被拒絕時,線程池會丟棄這個被拒絕的任務

  •  DiscardOldestPolicy 當提交任務到線程池中被拒絕時,線程池會丟棄等待隊列中最老的任務。
  •  CallerRunsPolicy 當提交任務到線程池中被拒絕時,會在線程池當前正在運行的Thread線程中處理被拒絕額任務。即哪個線程提交的任務哪個線程去執行。
  •  AbortPolicy 當提交任務到線程池中被拒絕時,直接拋出RejectedExecutionException異常。

線程池源碼分析

從上一章對線程池的工作流程解讀來看,線程池的原理似乎并沒有很難。但是開篇時我說過想要讀懂線程池的源碼并不容,主要原因是線程池內部運用到了大量并發相關知識,另外還與線程池中用到的位運算有關。

線程池中的位運算(了解內容)

在向線程池提交任務時有兩個比較中要的參數會決定任務的去向,這兩個參數分別是線程池的狀態和線程池中的線程數。在ThreadPoolExecutor內部使用了一個AtomicInteger類型的整數ctl來表示這兩個參數,代碼如下: 

  1. public class ThreadPoolExecutor extends AbstractExecutorService {  
  2.     // Integer.SIZE = 32.所以 COUNT_BITS29  
  3.     private static final int COUNT_BITS = Integer.SIZE - 3;  
  4.     // 00001111 11111111 11111111 11111111 這個值可以表示線程池的最大線程容量  
  5.     private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;  
  6.     // 將-1左移29位得到RUNNING狀態的值  
  7.     private static final int RUNNING    = -1 << COUNT_BITS;     
  8.     // 線程池運行狀態和線程數  
  9.     private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));  
  10.     private static int ctlOf(int rs, int wc) { return rs | wc; }  
  11.     // ...  
  12. }     

因為涉及多線程的操作,這里為了保證原子性,ctl參數使用了AtomicInteger類型,并且通過ctlOf方法來計算出了ctl的初始值。如果你不了解位運算大概很難理解上述代碼的用意。

我們知道,int類型在Java中占用4byte的內存,一個byte占用8bit,所以Java中的int類型共占用32bit。對于這個32bit,我們可以進行高低位的拆分。做Android開發的同學應該都了解View測量流程中的MeasureSpec參數,這個參數將32bit的int拆分成了高2位和低30位,分別表示View的測量模式和測量值。而這里的ctl與MeasureSpec類似,ctl將32位的int拆分成了高3位和低29位,分別表示線程池的運行狀態和線程池中的線程個數。

下面我們通過位運算來驗證一下ctl是如何工作的,當然,如果你不理解這個位運算的過程對理解線程池的源碼影響并不大,所以對以下驗證內容不感興趣的同學可以直接略過。

可以看到上述代碼中RUNNING的值為-1左移29位,我們知道在計算機中**負數是以其絕對值的補碼來表示的,而補碼是由反碼加1得到。**因此-1在計算機中存儲形式為1的反碼+1。 

  1. 1的原碼:00000000 00000000 00000000 00000001  
  2.                                             +  
  3. 1的反碼:11111111 11111111 11111111 11111110  
  4.        ---------------------------------------  
  5. -1存儲: 11111111 11111111 11111111 11111111 

接下來對-1左移29位可以得到RUNNING的值為: 

  1. // 高三位表示線程狀態,即高三位為111表示RUNNING  
  2. 11100000 00000000 00000000 00000000 

而AtomicInteger初始線程數量是0,因此ctlOf方法中的“|”運算如下: 

  1. RUNNING:  11100000 00000000 00000000 00000000  
  2.                                                |  
  3. 線程數為0:  00000000 00000000 00000000 00000000  
  4.           ---------------------------------------  
  5. 得到ctl:   11100000 00000000 00000000 00000000 

通過RUNNING|0(線程數)即可得到ctl的初始值。同時還可以通過以下方法將ctl拆解成運行狀態和線程數: 

  1. // 00001111 11111111 11111111 11111111  
  2. private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;  
  3. // 獲取線程池運行狀態 
  4. private static int runStateOf(int c)     { return c & ~COUNT_MASK; }  
  5. // 獲取線程池中的線程數  
  6. private static int workerCountOf(int c)  { return c & COUNT_MASK; } 

假設此時線程池為RUNNING狀態,且線程數為0,驗證一下runStateOf是如何得到線程池的運行狀態的: 

  1. COUNT_MASK:  00001111 11111111 11111111 11111111  
  2. ~COUNT_MASK: 11110000 00000000 00000000 00000000  
  3.                                                   &  
  4. ctl:         11100000 00000000 00000000 00000000  
  5.             ---------------------------------------- 
  6.  RUNNING:     11100000 00000000 00000000 00000000      

如果不理解上邊的驗證流程沒有關系,只要知道通過runStateOf方法可以得到線程池的運行狀態,通過workerCountOf可以得到線程池中的線程數即可。

接下來我們進入線程池的源碼的源碼分析環節。

ThreadPoolExecutor的execute

向線程池提交任務的方法是execute方法,execute方法是ThreadPoolExecutor的核心方法,以此方法為入口來進行剖析,execute方法的代碼如下: 

  1. public void execute(Runnable command) {  
  2.      if (command == null)  
  3.          throw new NullPointerException();  
  4.      // 獲取ctl的值  
  5.      int c = ctl.get();  
  6.      // 1.線程數小于corePoolSize  
  7.      if (workerCountOf(c) < corePoolSize) {  
  8.          // 線程池中線程數小于核心線程數,則嘗試創建核心線程執行任務  
  9.          if (addWorker(command, true))  
  10.              return;  
  11.          c = ctl.get(); 
  12.      }  
  13.      // 2.到此處說明線程池中線程數大于核心線程數或者創建線程失敗  
  14.      if (isRunning(c) && workQueue.offer(command)) {  
  15.          // 如果線程是運行狀態并且可以使用offer將任務加入阻塞隊列未滿,offer是非阻塞操作。  
  16.          int recheck = ctl.get();  
  17.          // 重新檢查線程池狀態,因為上次檢測后線程池狀態可能發生改變,如果非運行狀態就移除任務并執行拒絕策略  
  18.          if (! isRunning(recheck) && remove(command))  
  19.              reject(command);  
  20.          // 如果是運行狀態,并且線程數是0,則創建線程  
  21.          else if (workerCountOf(recheck) == 0)  
  22.              // 線程數是0,則創建非核心線程,且不指定首次執行任務,這里的第二個參數其實沒有實際意義  
  23.              addWorker(null, false);  
  24.      }  
  25.      // 3.阻塞隊列已滿,創建非核心線程執行任務  
  26.      else if (!addWorker(command, false))  
  27.          // 如果失敗,則執行拒絕策略  
  28.          reject(command);  
  29.  } 

execute方法中的邏輯可以分為三部分:

  •  1.如果線程池中的線程數小于核心線程,則直接調用addWorker方法創建新線程來執行任務。
  •  2.如果線程池中的線程數大于核心線程數,則將任務添加到阻塞隊列中,接著再次檢驗線程池的運行狀態,因為上次檢測過之后線程池狀態有可能發生了變化,如果線程池關閉了,那么移除任務,執行拒絕策略。如果線程依然是運行狀態,但是線程池中沒有線程,那么就調用addWorker方法創建線程,注意此時傳入任務參數是null,即不指定執行任務,因為任務已經加入了阻塞隊列。創建完線程后從阻塞隊列中取出任務執行。
  •  3.如果第2步將任務添加到阻塞隊列失敗了,說明阻塞隊列任務已滿,那么則會執行第三步,即創建非核心線程來執行任務,如果非核心線程創建失敗那么就執行拒絕策略。

可以看到,代碼的執行邏輯和我們在第二章中分析的線程池的工作流程是一樣的。

接下來看下execute方法中創建線程的方法addWoker,addWoker方法承擔了核心線程和非核心線程的創建,通過一個boolean參數core來區分是創建核心線程還是非核心線程。先來看addWorker方法前半部分的代碼: 

  1. // 返回值表示是否成功創建了線程  
  2.   private boolean addWorker(Runnable firstTask, boolean core) {  
  3.        // 這里做了一個retry標記,相當于goto.  
  4.        retry:  
  5.        for (int c = ctl.get();;) {  
  6.            // Check if queue empty only if necessary.  
  7.            if (runStateAtLeast(c, SHUTDOWN)  
  8.                && (runStateAtLeast(c, STOP)  
  9.                    || firstTask != null  
  10.                    || workQueue.isEmpty()))  
  11.                return false;  
  12.            for (;;) {  
  13.                // 根據core來確定創建最大線程數,超過最大值則創建線程失敗,注意這里的最大值可能有s三個corePoolSize、maximumPoolSize和線程池線程的最大容量  
  14.                if (workerCountOf(c)  
  15.                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))  
  16.                    return false;  
  17.                // 通過CAS來將線程數+1,如果成功則跳出循環,執行下邊邏輯     
  18.                if (compareAndIncrementWorkerCount(c))  
  19.                    break retry;  
  20.                c = ctl.get();  // Re-read ctl  
  21.                // 線程池的狀態發生了改變,退回retry重新執行  
  22.                if (runStateAtLeast(c, SHUTDOWN))  
  23.                    continue retry;  
  24.            }  
  25.        }  
  26.        // ...省略后半部分  
  27.        return workerStarted;  
  28.    } 

這部分代碼會通過是否創建核心線程來確定線程池中線程數的值,如果是創建核心線程,那么最大值不能超過corePoolSize,如果是創建非核心線程那么線程數不能超過maximumPoolSize,另外無論是創建核心線程還是非核心線程,最大線程數都不能超過線程池允許的最大線程數COUNT_MASK(有可能設置的maximumPoolSize大于COUNT_MASK)。如果線程數大于最大值就返回false,創建線程失敗。

接下來通過CAS將線程數加1,如果成功那么就break retry結束無限循環,如果CAS失敗了則就continue retry從新開始for循環,注意這里的retry不是Java的關鍵字,是一個可以任意命名的字符。

接下來,如果能繼續向下執行則開始執行創建線程并執行任務的工作了,看下addWorker方法的后半部分代碼: 

  1. private boolean addWorker(Runnable firstTask, boolean core) {  
  2.        // ...省略前半部分  
  3.        boolean workerStarted = false 
  4.        boolean workerAdded = false 
  5.        Worker w = null 
  6.        try { 
  7.             // 實例化一個Worker,內部封裝了線程  
  8.            w = new Worker(firstTask);  
  9.            // 取出新建的線程  
  10.            final Thread t = w.thread;  
  11.            if (t != null) {  
  12.                // 這里使用ReentrantLock加鎖保證線程安全  
  13.                final ReentrantLock mainLock = this.mainLock;  
  14.                mainLock.lock();  
  15.                try {  
  16.                    int c = ctl.get();  
  17.                    // 拿到鎖湖重新檢查線程池狀態,只有處于RUNNING狀態或者處于SHUTDOWN并且firstTask==null時候才會創建線程  
  18.                    if (isRunning(c) ||  
  19.                        (runStateLessThan(c, STOP) && firstTask == null)) {  
  20.                        // 線程不是處于NEW狀態,說明線程已經啟動,拋出異常  
  21.                        if (t.getState() != Thread.State.NEW)  
  22.                            throw new IllegalThreadStateException();  
  23.                        // 將線程加入線程隊列,這里的worker是一個HashSet    
  24.                        workers.add(w);  
  25.                        workerAdded = true 
  26.                        int s = workers.size();  
  27.                        if (s > largestPoolSize)  
  28.                            largestPoolSize = s;  
  29.                    }  
  30.                } finally {  
  31.                    mainLock.unlock();  
  32.                }  
  33.                if (workerAdded) {  
  34.                    // 開啟線程執行任務  
  35.                    t.start();  
  36.                    workerStarted = true 
  37.                }  
  38.            }  
  39.        } finally {  
  40.            if (! workerStarted)  
  41.                addWorkerFailed(w);  
  42.        }  
  43.        return workerStarted;  
  44.    } 

這部分邏輯其實比較容易理解,就是創建Worker并開啟線程執行任務的過程,Worker是對線程的封裝,創建的worker會被添加到ThreadPoolExecutor中的HashSet中。也就是線程池中的線程都維護在這個名為workers的HashSet中并被ThreadPoolExecutor所管理,HashSet中的線程可能處于正在工作的狀態,也可能處于空閑狀態,一旦達到指定的空閑時間,則會根據條件進行回收線程。

我們知道,線程調用start后就會開始執行線程的邏輯代碼,執行完后線程的生命周期就結束了,那么線程池是如何保證Worker執行完任務后仍然不結束的呢?當線程空閑超時或者關閉線程池又是怎樣進行線程回收的呢?這個實現邏輯其實就在Worker中??聪耊orker的代碼: 

  1. private final class Worker  
  2.        extends AbstractQueuedSynchronizer  
  3.        implements Runnable  
  4.    {  
  5.        // 執行任務的線程  
  6.        final Thread thread;  
  7.        // 初始化Worker時傳進來的任務,可能為null,如果不空,則創建和立即執行這個task,對應核心線程創建的情況  
  8.        Runnable firstTask;  
  9.        Worker(Runnable firstTask) {  
  10.            // 初始化時設置setate為-1  
  11.            setState(-1); // inhibit interrupts until runWorker  
  12.            this.firstTask = firstTask;  
  13.            // 通過線程工程創建線程  
  14.            this.thread = getThreadFactory().newThread(this);  
  15.        }  
  16.        // 線程的真正執行邏輯  
  17.        public void run() {  
  18.            runWorker(this);  
  19.        }  
  20.        // 判斷線程是否是獨占狀態,如果不是意味著線程處于空閑狀態  
  21.        protected boolean isHeldExclusively() {  
  22.            return getState() != 0; 
  23.        }  
  24.        // 獲取鎖  
  25.        protected boolean tryAcquire(int unused) {  
  26.            if (compareAndSetState(0, 1)) {  
  27.                setExclusiveOwnerThread(Thread.currentThread());  
  28.                return true; 
  29.             }  
  30.            return false;  
  31.        }  
  32.        // 釋放鎖 
  33.         protected boolean tryRelease(int unused) {  
  34.            setExclusiveOwnerThread(null);  
  35.            setState(0);  
  36.            return true;  
  37.        }  
  38.        // ...  
  39.    } 

Worker是位于ThreadPoolExecutor中的一個內部類,它繼承了AQS,使用AQS來實現了獨占鎖的功能,但是并沒支持可重入。這里使用不可重入的特性來表示線程的執行狀態,即可以通過isHeldExclusively方法來判斷,如果是獨占狀態,說明線程正在執行任務,如果非獨占狀態,說明線程處于空閑狀態。關于AQS我們前邊文章中已經詳細分析過了,不了解AQS的可以翻看前邊ReentrantLock的文章。

另外,Worker還實現了Runnable接口,因此它的執行邏輯就是在run方法中,run方法調用的是線程池中的runWorker(this)方法。任務的執行邏輯就在runWorker方法中,它的代碼如下: 

  1. final void runWorker(Worker w) {  
  2.      Thread wt = Thread.currentThread();  
  3.      // 取出Worker中的任務,可能為空  
  4.      Runnable task = w.firstTask;  
  5.      w.firstTask = null 
  6.      w.unlock(); // allow interrupts  
  7.      boolean completedAbruptly = true 
  8.      try {  
  9.          // task不為null或者阻塞隊列中有任務,通過循環不斷的從阻塞隊列中取出任務執行 
  10.          while (task != null || (task = getTask()) != null) {  
  11.              w.lock();  
  12.              // ...  
  13.              try {  
  14.                  // 任務執行前的hook點  
  15.                  beforeExecute(wt, task);  
  16.                  try {  
  17.                      // 執行任務  
  18.                      task.run();  
  19.                      // 任務執行后的hook點  
  20.                      afterExecute(task, null);  
  21.                  } catch (Throwable ex) {  
  22.                      afterExecute(task, ex);  
  23.                      throw ex;  
  24.                  }  
  25.              } finally {  
  26.                  task = null 
  27.                  w.completedTasks++;  
  28.                  w.unlock();  
  29.              }  
  30.          }  
  31.          completedAbruptly = false 
  32.      } finally {  
  33.          // 超時沒有取到任務,則回收空閑超時的線程  
  34.          processWorkerExit(w, completedAbruptly);  
  35.      }  
  36.  } 

可以看到,runWorker的核心邏輯就是不斷通過getTask方法從阻塞隊列中獲取任務并執行.通過這樣的方式實現了線程的復用,避免了創建線程。這里要注意的是這里是一個“生產者-消費者”模式,getTask是從阻塞隊列中取任務,所以如果阻塞隊列中沒有任務的時候就會處于阻塞狀態。

getTask中通過判斷是否要回收線程而設置了等待超時時間,如果阻塞隊列中一直沒有任務,那么在等待keepAliveTime時間后會拋出異常。最終會走到上述代碼的finally方法中,意味著有線程空閑時間超過了keepAliveTime時間,那么調用processWorkerExit方法移除Worker。processWorkerExit方法中沒有復雜難以理解的邏輯,這里就不再貼代碼了。我們重點看下getTask中是如何處理的,代碼如下: 

  1.  private Runnable getTask() {  
  2.     boolean timedOut = false; // Did the last poll() time out?  
  3.     for (;;) {  
  4.         int c = ctl.get();  
  5.         // ...   
  6.         // Flag1. 如果配置了allowCoreThreadTimeOut==true或者線程池中的線程數大于核心線程數,則timed為true,表示開啟指定線程超時后被回收  
  7.         boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; 
  8.         // ...  
  9.         try {  
  10.             // Flag2. 取出阻塞隊列中的任務,注意如果timed為true,則會調用阻塞隊列的poll方法,并設置超時時間為keepAliveTime,如果超時沒有取到任務則會拋出異常。  
  11.             Runnable r = timed ?  
  12.                 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :  
  13.                 workQueue.take();  
  14.             if (r != null)  
  15.                 return r;  
  16.             timedOut = true 
  17.         } catch (InterruptedException retry) {  
  18.             timedOut = false 
  19.         }  
  20.     }  

重點看getTask是如何處理空閑超時的邏輯的。我們知道,回收線程的條件是線程大于核心線程數或者配置了allowCoreThreadTimeOut為true,當線程空閑超時的情況下就會回收線程。上述代碼在Flag1處先判斷了如果線程池中的線程數大于核心線程數,或者開啟了allowCoreThreadTimeOut,那么就需要開啟線程空閑超時回收。

所有在Flag2處,timed為true的情況下調用了阻塞隊列的poll方法,并傳入了超時時間為keepAliveTime,如果在keepAliveTime時間內,阻塞隊列一直為null那么久會拋出異常,結束runWorker的循環。進而執行runWorker方法中回收線程的操作。

這里需要我們理解阻塞隊列poll方法的使用,poll方法接受一個時間參數,是一個阻塞操作,在給定的時間內沒有獲取到數據就會拋出異常。其實說白了,阻塞隊列就是一個使用ReentrantLock實現的“生產者-消費者”模式,我們在深入理解Java線程的等待與喚醒機制(二)這篇文章中使用ReentrantLock實現“生產者-消費者”模型其實就是一個簡單的阻塞隊列,與JDK中的BlockingQueue實現機制類似。感興趣的同學可以自己查看ArrayBlockingQueue等阻塞隊列的實現,限于文章篇幅,這里就不再贅述了。

ThreadPoolExecutor的拒絕策略

上一小節中我們多次提到線程池的拒絕策略,它是在reject方法中實現的。實現代碼也非常簡單,代碼如下: 

  1. final void reject(Runnable command) {  
  2.     handler.rejectedExecution(command, this);  

通過調用handler的rejectedExecution方法實現。這里其實就是運用了策略模式,handler是一個RejectedExecutionHandler類型的成員變量,RejectedExecutionHandler是一個接口,只有一個rejectedExecution方法。在實例化線程池時構造方法中傳入對應的拒絕策略實例即可。前文已經提到了Java提供的幾種默認實現分別為DiscardPolicy、DiscardOldestPolicy、CallerRunsPolicy以及AbortPolicy。

以AbortPolicy直接拋出異常為例,來看下代碼實現: 

  1. public static class AbortPolicy implements RejectedExecutionHandler {  
  2.    public AbortPolicy() { }  
  3.    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  
  4.        throw new RejectedExecutionException("Task " + r.toString() +  
  5.                                             " rejected from " +  
  6.                                             e.toString());  
  7.    } 

可以看到直接在rejectedExecution方法中拋出了RejectedExecutionException來拒絕任務。其他的幾個策略實現也都比較簡單,有興趣可以自己查閱代碼。

ThreadPoolExecutor的shutdown

調用shutdown方法后,會將線程池標記為SHUTDOWN狀態,上邊execute的源碼可以看出,只有線程池是RUNNING狀態才接受任務,因此被標記位SHUTDOWN后,再提交任務會被線程池拒絕。shutdown的代碼如下: 

  1. public void shutdown() {  
  2.     final ReentrantLock mainLock = this.mainLock;  
  3.     mainLock.lock();  
  4.     try {  
  5.         //檢查是否可以關閉線程  
  6.         checkShutdownAccess();  
  7.         // 將線程池狀態置為SHUTDOWN狀態  
  8.         advanceRunState(SHUTDOWN);  
  9.         // 嘗試中斷空閑線程  
  10.         interruptIdleWorkers();  
  11.         // 空方法,線程池關閉的hook點 
  12.          onShutdown();   
  13.     } finally {  
  14.         mainLock.unlock();  
  15.     }  
  16.     tryTerminate();  
  17.  
  18. private void interruptIdleWorkers() {  
  19.     interruptIdleWorkers(false);  
  20. }     

修改線程池為SHUTDOWN狀態后,會調用interruptIdleWorkers去中斷空閑線程線程,具體實現邏輯是在interruptIdleWorkers(boolean onlyOne)方法中,如下:   

  1. private void interruptIdleWorkers(boolean onlyOne) {  
  2.        final ReentrantLock mainLock = this.mainLock;  
  3.        mainLock.lock();  
  4.        try {  
  5.            for (Worker w : workers) {  
  6.                Thread t = w.thread;  
  7.                // 嘗試tryLock獲取鎖,如果拿鎖成功說明線程是空閑狀態  
  8.                if (!t.isInterrupted() && w.tryLock()) {  
  9.                    try {  
  10.                        // 中斷線程  
  11.                        t.interrupt();  
  12.                    } catch (SecurityException ignore) {  
  13.                    } finally {  
  14.                        w.unlock();  
  15.                    } 
  16.                 }  
  17.                if (onlyOne)  
  18.                    break;  
  19.            }  
  20.        } finally {  
  21.            mainLock.unlock();  
  22.        }  
  23.    } 

shutdown的邏輯比較簡單,里邊做了兩件比較重要的事情,即先將線程池狀態修改為SHUTDOWN,接著遍歷所有Worker,將空閑的Worker進行中斷。

總結

本文深入的探究了線程池的工作流程和實現原理。就線程池的工作流程而言其實并不難以理解。但是在分析線程池的源碼時,如果沒有很好的并發基礎的話,大概率是難以讀懂線程池的源碼的。因為線程池內部使用了大量并發知識,對任何一點用到的并發知識認識不到位都會造成理解偏差。寫這篇文章參看了很多的其他線程池的相關文章,幾乎沒有找到一篇能夠剖析清楚線程池源碼的文章。歸根結底還是沒能系統的理解Atomic、Lock與AQS、CAS、阻塞隊列等并發相關知識。 

 

責任編輯:龐桂玉 來源: Android技術之家
相關推薦

2023-05-29 08:12:38

2023-10-18 10:55:55

HashMap

2020-10-14 08:50:38

搞懂 Netty 線程

2022-08-26 13:24:03

version源碼sources

2021-10-11 11:58:41

Channel原理recvq

2021-10-09 19:05:06

channelGo原理

2022-09-26 00:48:14

線程池阻塞數據

2024-03-11 18:18:58

項目Spring線程池

2024-11-27 08:15:50

2012-05-15 02:18:31

Java線程池

2022-04-24 11:06:54

SpringBootjar代碼

2021-07-08 10:08:03

DvaJS前端Dva

2025-07-17 07:26:47

2024-07-15 08:20:24

2025-11-13 08:08:15

2025-07-15 02:00:00

2022-04-11 10:56:43

線程安全

2018-10-31 15:54:47

Java線程池源碼

2020-10-22 09:35:11

線程池核心線程阻塞隊列

2025-04-21 04:00:00

點贊
收藏

51CTO技術棧公眾號

国产九九九九九| 精品久久国产字幕高潮| 欧美精品亚洲| 一级一片免费看| 91偷拍一区二区三区精品| 91精品国模一区二区三区| 久操手机在线视频| 亚洲av成人精品一区二区三区在线播放| 午夜在线a亚洲v天堂网2018| 最近2019年手机中文字幕 | 久久人人爽人人爽人人片 | 国产免费黄色大片| 日韩黄色影院| 国产成人免费视频网站| 456亚洲影院| 精品无码一区二区三区蜜臀| 黄色美女久久久| 欧美日韩卡一卡二| 久草热视频在线观看| 天堂资源在线中文| 99精品欧美一区二区三区小说| 国产欧美一区二区三区视频 | 嫩草研究院在线观看| 久久国产精品第一页| 97精品国产97久久久久久免费| jizz中文字幕| 欧美毛片免费观看| 欧美一级片在线看| 91热这里只有精品| cao在线视频| 亚洲柠檬福利资源导航| 欧美日韩精品久久| 欧美熟妇交换久久久久久分类| 美女在线观看视频一区二区| 57pao国产成人免费| 青青草原国产视频| 卡一精品卡二卡三网站乱码| 欧美日韩一区二区在线观看| 男人的天堂99| 1区2区3区在线| 亚洲人精品午夜| 亚洲精品在线视频观看| 日本一区高清| 93久久精品日日躁夜夜躁欧美| 69堂成人精品视频免费| 91麻豆成人精品国产| 日韩精品三区四区| 热久久这里只有| 亚洲精品国产精品乱码| 一区在线播放| 国产69精品久久久久99| 精品视频久久久久| 欧美精品国产一区| 久久国产天堂福利天堂| www.av免费| 亚洲综合色站| 欧美精品免费看| 三级黄色录像视频| 99热国内精品| 精品激情国产视频| 五月综合色婷婷| 欧美影院一区| 欧美激情网站在线观看| 久青草免费视频| 亚洲三级免费| 欧美在线激情视频| 日韩精品一区不卡| 日韩精品亚洲一区| 国产精品视频自拍| 91成人一区二区三区| 久久99精品国产91久久来源| 国产美女久久精品香蕉69| 中文字幕一区二区在线视频 | 超碰91在线播放| 日韩在线精品强乱中文字幕| 精品日韩在线观看| 97人妻精品一区二区三区免 | 国产四区在线观看| 国产视频一区二区| 夜夜嗨av一区二区三区网页| 中国丰满熟妇xxxx性| 国产美女高潮在线| 色久优优欧美色久优优| 色婷婷.com| 91成人入口| 亚洲第一页在线| 麻豆精品免费视频| 欧美国产美女| 久久久久久久97| 91video| 日韩福利电影在线观看| 亚洲v日韩v综合v精品v| 免费av网站观看| 国产午夜精品福利| 精品国产三级a∨在线| 韩国精品一区| 欧美丰满少妇xxxbbb| 荫蒂被男人添免费视频| 欧美日韩国产传媒| 精品中文字幕视频| 亚洲成熟少妇视频在线观看| 国产一区不卡视频| 欧美精品一区在线| 性直播体位视频在线观看| 欧美性猛交99久久久久99按摩| 小明看看成人免费视频| 久草在线综合| 久久av资源网站| 欧美a视频在线观看| 国产美女一区二区三区| 欧美久久久久久久| 欧美野外wwwxxx| 在线中文字幕一区二区| 欧美熟妇精品一区二区 | 午夜探花在线观看| 中文一区一区三区高中清不卡免费| 欧美日韩另类国产亚洲欧美一级| 欧类av怡春院| 亚洲精品网址| 国产精品成人一区二区三区吃奶| 色婷婷中文字幕| 亚洲色图欧美偷拍| 国产一级做a爰片久久| 免费福利视频一区| 久久成人亚洲精品| 中文字幕av在线免费观看| 不卡的av电影在线观看| 在线观看污视频| 国产精品99| 亚洲精品一区二区三区在线| 欧美电影影音先锋| 欧洲美一区二区三区亚洲| 亚洲一级电影| 666精品在线| 浪潮av一区| 欧美偷拍一区二区| 在线观看av中文字幕| 激情久久综合| 国产精品免费观看高清| gogogogo高清视频在线| 欧美乱熟臀69xxxxxx| 日本xxxx裸体xxxx| 在线一区欧美| 国产女人水真多18毛片18精品 | 天天综合网在线| 一区二区三区日韩| 日本成人xxx| 亚洲人成免费网站| 亚洲自拍小视频| 成人在线网址| 日韩欧美一级在线播放| 精品无码久久久久成人漫画| 国内外成人在线视频| 亚洲一区二区自拍偷拍| 成人看片毛片免费播放器| 在线播放日韩专区| 涩涩视频在线观看| 中文字幕高清一区| 亚洲图色中文字幕| 香蕉久久网站| 国产高清自拍一区| а√在线中文在线新版| 亚洲精品720p| 黄色网址中文字幕| 亚洲国产高清aⅴ视频| 9久久婷婷国产综合精品性色| 人人狠狠综合久久亚洲婷婷| 国产啪精品视频网站| 超碰超碰在线| 亚洲成色777777女色窝| 欧美一区二区三区四| 久久久美女艺术照精彩视频福利播放| av免费在线播放网站| 日韩欧美视频专区| 97se在线视频| 一区二区乱码| 中文字幕亚洲国产| 国产特黄一级片| 天天摸夜夜添狠狠添婷婷| 国产精品污www在线观看| 成人日韩在线视频| 欧美日韩理论| 久久婷婷开心| av在线播放一区| 欧美裸体xxxx极品少妇| 天堂av2024| 欧美亚洲国产一区在线观看网站 | 一区二区三区在线视频观看| 免费无码一区二区三区| 欧美日韩高清在线一区| 二区三区不卡| 日韩漫画puputoon| 久久网福利资源网站| 国产综合在线播放| 在线观看一区二区视频| 无码黑人精品一区二区| aaa亚洲精品| 中文字幕成人在线视频| 激情偷拍久久| 亚洲国产欧美日韩| 亚洲天堂中文字幕在线观看| 日本韩国欧美精品大片卡二| av大全在线| 亚洲精品一区二区三区婷婷月 | heyzo高清中文字幕在线| 亚洲人成啪啪网站| 午夜精品久久久久久久99| 色婷婷综合久色| 青青草手机在线观看| 国产欧美日韩三级| 亚洲国产精品第一页| 青椒成人免费视频| 性欧美大战久久久久久久| 日韩欧美精品| 欧美日韩最好看的视频| 综合成人在线| 国产综合香蕉五月婷在线| 日韩av影片| 久久久久久久久91| 最新97超碰在线| 日韩精品在线视频| 亚洲精品无码专区| 欧美人xxxx| 91视频久久久| 高跟丝袜一区二区三区| 精品无码人妻一区二区三区| 中文字幕中文字幕中文字幕亚洲无线| 在线精品一区二区三区| 国产91精品露脸国语对白| 亚洲欧美偷拍另类| 丝袜国产日韩另类美女| 久久久久久久久久久99| 国产精品国码视频| 成人短视频在线看| 国产在视频线精品视频| 精品动漫3d一区二区三区免费版| 亚洲一区三区| 久久日文中文字幕乱码| 日本视频精品一区| 欧美变态网站| 国产精品区二区三区日本| 日韩免费一级| 亚洲free嫩bbb| 国产高清日韩| 91在线无精精品一区二区| 日本久久一区| 成人黄色在线播放| 日韩欧美专区| 成人性生交大片免费看视频直播 | 成人性生交大片免费看在线播放| 久久久91精品| 粗大黑人巨茎大战欧美成人| 久久在线精品视频| a黄色片在线观看| 久久综合伊人77777蜜臀| 国产在线1区| 九九久久久久99精品| 色yeye免费人成网站在线观看| 欧美国产中文字幕| 91jq激情在线观看| 57pao成人永久免费视频| 久久sese| 国产精品欧美一区二区| 成人在线中文| 91在线观看免费高清| 欧美激情三级| 国产精品免费观看高清| 亚洲人成网站77777在线观看| 欧美日韩电影一区二区三区| 日韩久久电影| 少妇一晚三次一区二区三区| 亚洲精品三级| www.日本xxxx| 国产一区福利在线| 丝袜熟女一区二区三区| 久久美女高清视频| 日本黄区免费视频观看 | 日韩精品久久久久久久酒店| 欧美日韩亚洲系列| 中文字幕一区2区3区| 欧美电影精品一区二区| 色资源在线观看| 最新日韩中文字幕| 青春草在线视频| 日本久久久久久| 免费欧美网站| 欧美日韩国产精品一卡| 久久久久久久久久久久久久| 日韩中字在线观看| 日本中文字幕一区| 国产裸体视频网站| 久久婷婷久久一区二区三区| 网站永久看片免费| 亚洲国产人成综合网站| 99久久久无码国产精品免费蜜柚 | 欧美不卡在线一区二区三区| 欧美成人高清视频在线观看| 91成人免费在线观看| 美女一区二区在线观看| 亚洲欧洲一区二区| 1024日韩| 亚洲欧美偷拍另类| av午夜精品一区二区三区| 一本在线免费视频| 偷拍日韩校园综合在线| 91中文字幕在线播放| 亚洲欧美中文日韩v在线观看| 国产三区视频在线观看| 国产成人一区二区三区| 91成人福利| 中文字幕中文字幕在线中心一区| 夜夜精品视频| 性生活一级大片| 国产情人综合久久777777| 国产真实的和子乱拍在线观看| 欧美日韩一区小说| 久久久久久青草| 久久久免费av| 日韩一区二区三区精品| 五月天男人天堂| 日韩高清在线观看| 自拍视频一区二区| 一区二区三区在线观看国产| 亚洲熟女乱色一区二区三区久久久| 精品视频www| 爱看av在线| 亚洲一区二区三区久久| 久久免费精品视频在这里| av免费网站观看| 91亚洲国产成人精品一区二三| 久热这里有精品| 91精品在线一区二区| 97视频在线观看网站| 国产成人精品久久| 亚洲va久久| 欧美s码亚洲码精品m码| eeuss鲁片一区二区三区在线观看| 欧美日韩在线观看成人| 这里是久久伊人| 黄色网址在线免费播放| 国产精品女人网站| 成人综合一区| 亚洲 欧美 另类人妖| 国产蜜臀av在线一区二区三区 | 在线播放91灌醉迷j高跟美女 | 国产精品少妇在线视频| 91女厕偷拍女厕偷拍高清| 国产手机在线视频| 亚洲国产精品系列| av最新在线| 欧美精品一区二区三区久久| 爽好多水快深点欧美视频| 精品久久久久久中文字幕人妻最新| 黄网站色欧美视频| 牛牛澡牛牛爽一区二区| 国产成人精品久久久| 欧美偷拍综合| 在线免费看污网站| 亚洲视频狠狠干| 超碰在线观看99| 国内成人精品一区| 无码久久精品国产亚洲av影片| 成人午夜电影网站| 久久网免费视频| 日韩成人免费视频| 免费成人直播| 亚洲国产一区二区精品视频| 久久精品国产一区二区| 青青草原在线免费观看| 欧美tickling网站挠脚心| 秋霞伦理一区| 色综合影院在线观看| 国内精品伊人久久久久av影院| 欧美成人精品欧美一级| 亚洲激情视频网| 日韩一级二级| 无码毛片aaa在线| 99视频超级精品| 中文字幕观看视频| 欧美成年人视频网站欧美| 韩国女主播一区二区三区| 成熟老妇女视频| 亚洲视频资源在线| 日本xxxx人| 国产精品久久久久久久久借妻 | 欧美国产亚洲另类动漫| 国产欧美综合视频| 亚洲97在线观看| 成人影院在线| 男人女人拔萝卜视频| 一本久久a久久免费精品不卡| 麻豆影院在线| 美女视频久久| 国产精品一区二区你懂的| 日本韩国欧美中文字幕| 久久精品久久久久久国产 免费| 都市激情久久| 亚洲精品免费一区亚洲精品免费精品一区| 亚洲一级电影视频| 97电影在线|