精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

并發編程之ForkJoin框架原理分析

開發 前端
Java7 又提供了的一個用于并行執行的任務的框架 Fork/Join ,是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架。在介紹Fork/Join 框架之前我們先了解幾個概念:CPU密集型、IO密集型,再逐步深入去認識Fork/Join 框架。

[[358064]]

 前言

前面我們介紹了線程池框架(ExecutorService)的兩個具體實現:

  • ThreadPoolExecutor 默認線程池
  • ScheduledThreadPoolExecutor定時線程池

線程池為線程生命周期的開銷和資源不足問題提供了解決方案。通過對多個任務重用線程,線程創建的開銷被分攤到多個任務上。Java7 又提供了的一個用于并行執行的任務的框架 Fork/Join ,是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架。在介紹Fork/Join 框架之前我們先了解幾個概念:CPU密集型、IO密集型,再逐步深入去認識Fork/Join 框架。

任務性質類型

CPU密集型(CPU bound)

CPU密集型也叫計算密集型,指的是系統的硬盤、內存性能相對于CPU要好很好多,此時,系統運作大部分的狀況是 CPU Loading 100%,CPU要讀/寫 I/O(硬盤/內存),I/O在很短的時間就可以完成,而CPU還有許多運算要處理,CPU Loading很高。

在多重程序系統中,大部分時間用來做計算、邏輯判斷等CPU動作的程序稱之 CPU bound。例如一個計算圓周率至小數點一千位以下的程序,在執行的過程當中絕大部分時間在用三角函數和開根號的計算,便是屬于CPU bound的程序。

CPU bound的程序一般而言CPU占用率相當高。這可能是因為任務本身不太需要訪問I/O設備,也可能是因為程序是多線程實現因此屏蔽了等待I/O的時間。

  • 線程數一般設置為:線程數 = CPU核數 + 1(現代CPU支持超線程)

IO密集型(I/O bound)

I/O密集型指的是系統的CPU性能相對硬盤、內存要好很多,此時,系統運作,大部分的狀況是 CPU 在等 I/O(硬盤/內存)的讀/寫操作,此時 CPU Loading 并不高。

I/O bound的程序一般在達到性能極限時,CPU占用率仍然較低。這可能是因為任務本身需要大量I/O操作,而 pipeline 做的不是很好,沒有充分利用處理器能力。

  • 線程數一般設置為:線程數 = ((線程等待時間 + 線程CPU時間) / 線程CPU時間) * CPU數目

CPU密集型 VS I/O密集型

我們可以把任務分為計算密集型和I/O密集型。

計算密集型任務的特點是要進行大量的計算,消耗CPU資源,比如計算圓周率、對視頻進行高清解碼等等,全靠CPU的運算能力。這種計算密集型任務雖然也可以用多任務完成,但是任務越多,花在任務切換的時間就越多,CPU執行任務的效率就越低,所以,要最高效地利用CPU,計算密集型任務同時進行的數量應當等于CPU的核心數。

計算密集型任務由于主要消耗CPU資源,因此,代碼運行效率至關重要。Python這樣的腳本語言運行效率很低,完全不適合計算密集型任務。對于計算密集型任務,最好用C語言編寫。

第二種任務的類型是I/O密集型,涉及到網絡、磁盤I/O的任務都是I/O密集型任務,這類任務的特點是CPU消耗很少,任務的大部分時間都在等待I/O操作完成(因為I/O的速度遠遠低于CPU和內存的速度)。對于I/O密集型任務,任務越多,CPU效率越高,但也有一個限度。常見的大部分任務都是I/O密集型任務,比如Web應用。

I/O密集型任務執行期間,99%的時間都花在I/O上,花在CPU上的時間很少,因此,用運行速度極快的C語言替換用Python這樣運行速度極低的腳本語言,完全無法提升運行效率。對于I/O密集型任務,最合適的語言就是開發效率最高(代碼量最少)的語言,腳本語言是首選,C語言最差。

什么是 Fork/Join 框架?

Fork/Join 框架是 Java7 提供了的一個用于并行執行的任務的框架,是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架。

Fork 就是把一個大任務切分為若干個子任務并行的執行,Join 就是合并這些子任務的執行結果,最后得到這個大任務的結果。比如計算 1+2+......+10000,可以分割成10個子任務,每個子任務對1000個數進行求和,最終匯總這10個子任務的結果。如下圖所示:


Fork/Join的特性:

  1. ForkJoinPool 不是為了替代 ExecutorService,而是它的補充,在某些應用場景下性能比 ExecutorService 更好。(見 Java Tip: When to use ForkJoinPool vs ExecutorService )
  2. ForkJoinPool 主要用于實現“分而治之”的算法,特別是分治之后遞歸調用的函數,例如 quick sort 等;
  3. ForkJoinPool 最適合的是計算密集型的任務,如果存在 I/O、線程間同步、sleep() 等會造成線程長時間阻塞的情況時,最好配合 MangedBlocker。

關于“分而治之”的算法,可以查看《分治、回溯的實現和特性》

工作竊取算法

工作竊取(work-stealing)算法 是指某個線程從其他隊列里竊取任務來執行。

我們需要做一個比較大的任務,我們可以把這個任務分割為若干互不依賴的子任務,為了減少線程間的競爭,于是把這些子任務分別放到不同的隊列里,并為每個隊列創建一個單獨的線程來執行隊列里的任務,線程和隊列一一對應,比如A線程負責處理A隊列里的任務。

但是有的線程會先把自己隊列里的任務干完,而其他線程對應的隊列里還有任務等待處理。干完活的線程與其等著,不如去幫其他線程干活,于是它就去其他線程的隊列里竊取一個任務來執行。而在這時它們會訪問同一個隊列,所以為了減少竊取任務線程和被竊取任務線程之間的競爭,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。


工作竊取算法的優點是充分利用線程進行并行計算,并減少了線程間的競爭,其缺點是在某些情況下還是存在競爭,比如雙端隊列里只有一個任務時。并且消耗了更多的系統資源,比如創建多個線程和多個雙端隊列。


  1. ForkJoinPool 的每個工作線程都維護著一個工作隊列(WorkQueue),這是一個雙端隊列(Deque),里面存放的對象是任務(ForkJoinTask)。
  2. 每個工作線程在運行中產生新的任務(通常是因為調用了 fork())時,會放入工作隊列的隊尾,并且工作線程在處理自己的工作隊列時,使用的是 LIFO 方式,也就是說每次從隊尾取出任務來執行。
  3. 每個工作線程在處理自己的工作隊列同時,會嘗試竊取一個任務(或是來自于剛剛提交到 pool 的任務,或是來自于其他工作線程的工作隊列),竊取的任務位于其他線程的工作隊列的隊首,也就是說工作線程在竊取其他工作線程的任務時,使用的是 FIFO 方式。
  4. 在遇到 join() 時,如果需要 join 的任務尚未完成,則會先處理其他任務,并等待其完成。
  5. 在既沒有自己的任務,也沒有可以竊取的任務時,進入休眠。

Fork/Join的使用

使用場景示例

定義fork/join任務,如下示例,隨機生成2000w條數據在數組當中,然后求和_

  1. package com.niuh.forkjoin.recursivetask; 
  2.  
  3. import java.util.concurrent.RecursiveTask; 
  4.  
  5. /** 
  6.  * RecursiveTask 并行計算,同步有返回值 
  7.  * ForkJoin框架處理的任務基本都能使用遞歸處理,比如求斐波那契數列等,但遞歸算法的缺陷是: 
  8.  * 一只會只用單線程處理, 
  9.  * 二是遞歸次數過多時會導致堆棧溢出; 
  10.  * ForkJoin解決了這兩個問題,使用多線程并發處理,充分利用計算資源來提高效率,同時避免堆棧溢出發生。 
  11.  * 當然像求斐波那契數列這種小問題直接使用線性算法搞定可能更簡單,實際應用中完全沒必要使用ForkJoin框架, 
  12.  * 所以ForkJoin是核彈,是用來對付大家伙的,比如超大數組排序。 
  13.  * 最佳應用場景:多核、多內存、可以分割計算再合并的計算密集型任務 
  14.  */ 
  15. class LongSum extends RecursiveTask<Long> { 
  16.     //任務拆分的最小閥值 
  17.     static final int SEQUENTIAL_THRESHOLD = 1000; 
  18.     static final long NPS = (1000L * 1000 * 1000); 
  19.     static final boolean extraWork = true; // change to add more than just a sum 
  20.  
  21.  
  22.     int low; 
  23.     int high; 
  24.     int[] array; 
  25.  
  26.     LongSum(int[] arr, int lo, int hi) { 
  27.         array = arr; 
  28.         low = lo; 
  29.         high = hi; 
  30.     } 
  31.  
  32.     /** 
  33.      * fork()方法:將任務放入隊列并安排異步執行,一個任務應該只調用一次fork()函數,除非已經執行完畢并重新初始化。 
  34.      * tryUnfork()方法:嘗試把任務從隊列中拿出單獨處理,但不一定成功。 
  35.      * join()方法:等待計算完成并返回計算結果。 
  36.      * isCompletedAbnormally()方法:用于判斷任務計算是否發生異常。 
  37.      */ 
  38.     protected Long compute() { 
  39.  
  40.         if (high - low <= SEQUENTIAL_THRESHOLD) { 
  41.             long sum = 0; 
  42.             for (int i = low; i < high; ++i) { 
  43.                 sum += array[i]; 
  44.             } 
  45.             return sum
  46.  
  47.         } else { 
  48.             int mid = low + (high - low) / 2; 
  49.             LongSum left = new LongSum(array, low, mid); 
  50.             LongSum right = new LongSum(array, mid, high); 
  51.             left.fork(); 
  52.             right.fork(); 
  53.             long rightAns = right.join(); 
  54.             long leftAns = left.join(); 
  55.             return leftAns + rightAns; 
  56.         } 
  57.     } 

 執行fork/join任務

  1. package com.niuh.forkjoin.recursivetask; 
  2.  
  3. import com.niuh.forkjoin.utils.Utils; 
  4.  
  5. import java.util.concurrent.ForkJoinPool; 
  6. import java.util.concurrent.ForkJoinTask; 
  7.  
  8. public class LongSumMain { 
  9.     //獲取邏輯處理器數量 
  10.     static final int NCPU = Runtime.getRuntime().availableProcessors(); 
  11.     /** 
  12.      * for time conversion 
  13.      */ 
  14.     static final long NPS = (1000L * 1000 * 1000); 
  15.  
  16.     static long calcSum; 
  17.  
  18.     static final boolean reportSteals = true
  19.  
  20.     public static void main(String[] args) throws Exception { 
  21.         int[] array = Utils.buildRandomIntArray(2000000); 
  22.         System.out.println("cpu-num:" + NCPU); 
  23.         //單線程下計算數組數據總和 
  24.         long start = System.currentTimeMillis(); 
  25.         calcSum = seqSum(array); 
  26.         System.out.println("seq sum=" + calcSum); 
  27.         System.out.println("singgle thread sort:->" + (System.currentTimeMillis() - start)); 
  28.  
  29.         start = System.currentTimeMillis(); 
  30.         //采用fork/join方式將數組求和任務進行拆分執行,最后合并結果 
  31.         LongSum ls = new LongSum(array, 0, array.length); 
  32.         ForkJoinPool fjp = new ForkJoinPool(NCPU); //使用的線程數 
  33.         ForkJoinTask<Long> task = fjp.submit(ls); 
  34.  
  35.         System.out.println("forkjoin sum=" + task.get()); 
  36.         System.out.println("singgle thread sort:->" + (System.currentTimeMillis() - start)); 
  37.         if (task.isCompletedAbnormally()) { 
  38.             System.out.println(task.getException()); 
  39.         } 
  40.  
  41.         fjp.shutdown(); 
  42.  
  43.     } 
  44.  
  45.  
  46.     static long seqSum(int[] array) { 
  47.         long sum = 0; 
  48.         for (int i = 0; i < array.length; ++i) { 
  49.             sum += array[i]; 
  50.         } 
  51.         return sum
  52.     } 

 Fork/Join框架原理

Fork/Join 其實就是指由ForkJoinPool作為線程池、ForkJoinTask(通常實現其三個抽象子類)為任務、ForkJoinWorkerThread作為執行任務的具體線程實體這三者構成的任務調度機制。


ForkJoinWorkerThread

ForkJoinWorkerThread 直接繼承了Thread,但是僅僅是為了增加一些額外的功能,并沒有對線程的調度執行做任何更改。


ForkJoinWorkerThread 是被ForkJoinPool管理的工作線程,在創建出來之后都被設置成為了守護線程,由它來執行ForkJoinTasks。該類主要為了維護創建線程實例時通過ForkJoinPool為其創建的任務隊列,與其他兩個線程池整個線程池只有一個任務隊列不同,ForkJoinPool管理的所有工作線程都擁有自己的工作隊列,為了實現任務竊取機制,該隊列被設計成一個雙端隊列,而ForkJoinWorkerThread的首要任務就是執行自己的這個雙端任務隊列中的任務,其次是竊取其他線程的工作隊列,以下是其代碼片段:

  1. public class ForkJoinWorkerThread extends Thread { 
  2.  // 這個線程工作的ForkJoinPool池 
  3.     final ForkJoinPool pool;     
  4.     // 這個線程擁有的工作竊取機制的工作隊列 
  5.     final ForkJoinPool.WorkQueue workQueue;  
  6.  
  7.     //創建在給定ForkJoinPool池中執行的ForkJoinWorkerThread。 
  8.     protected ForkJoinWorkerThread(ForkJoinPool pool) { 
  9.         // Use a placeholder until a useful name can be set in registerWorker 
  10.         super("aForkJoinWorkerThread"); 
  11.         this.pool = pool; 
  12.         //向ForkJoinPool執行池注冊當前工作線程,ForkJoinPool為其分配一個工作隊列 
  13.         this.workQueue = pool.registerWorker(this);  
  14.     } 
  15.  
  16.     //該工作線程的執行內容就是執行工作隊列中的任務 
  17.     public void run() { 
  18.         if (workQueue.array == null) { // only run once 
  19.             Throwable exception = null
  20.             try { 
  21.                 onStart(); 
  22.                 pool.runWorker(workQueue); //執行工作隊列中的任務 
  23.             } catch (Throwable ex) { 
  24.                 exception = ex; //記錄異常 
  25.             } finally { 
  26.                 try { 
  27.                     onTermination(exception); 
  28.                 } catch (Throwable ex) { 
  29.                     if (exception == null
  30.                         exception = ex; 
  31.                 } finally { 
  32.                     pool.deregisterWorker(this, exception); //撤銷工作 
  33.                 } 
  34.             } 
  35.         } 
  36.     } 
  37.  
  38.     ..... 

 ForkJoinTask

ForkJoinTask :與FutureTask一樣, ForkJoinTask也是Future的子類,不過它是一個抽象類。


ForkJoinTask :我們要使用 ForkJoin 框架,必須首先創建一個 ForkJoin 任務。它提供在任務中執行 fork() 和 join() 操作的機制,通常情況下我們不需要直接繼承 ForkJoinTask 類,而只需要繼承它的子類,Fork/Join框架提供類以下幾個子類:

  • RecursiveAction:用于沒有返回結果的任務。(比如寫數據到磁盤,然后就退出。一個 RecursiveAvtion 可以把直接的工作分割成更小的幾塊,這樣它們可以由獨立的線程或者 CPU 執行。我們可以通過繼承來實現一個 RecusiveAction)
  • RescursiveTask:用于有返回結果的任務。(可以將自己的工作分割為若干更小任務,并將這些子任務的執行合并到一個集體結果??梢杂袔讉€水平的分割和合并)
  • CountedCompleter :在任務完成執行后會觸發執行一個自定義的鉤子函數。

常量介紹

ForkJoinTask 有一個int類型的status字段:

  • 其高16位存儲任務執行狀態例如NORMAL、CANCELLED或EXCEPTIONAL
  • 低16位預留用于用戶自定義的標記。

任務未完成之前status大于等于0,完成之后就是NORMAL、CANCELLED或EXCEPTIONAL這幾個小于0的值,這幾個值也是按大小順序的:0(初始狀態) > NORMAL > CANCELLED > EXCEPTIONAL.

  1. public abstract class ForkJoinTask<V> implements Future<V>, Serializable { 
  2.  
  3.     /** 該任務的執行狀態 */ 
  4.     volatile int status; // accessed directly by pool and workers 
  5.     static final int DONE_MASK   = 0xf0000000;  // mask out non-completion bits 
  6.     static final int NORMAL      = 0xf0000000;  // must be negative 
  7.     static final int CANCELLED   = 0xc0000000;  // must be < NORMAL 
  8.     static final int EXCEPTIONAL = 0x80000000;  // must be < CANCELLED 
  9.     static final int SIGNAL      = 0x00010000;  // must be >= 1 << 16 
  10.     static final int SMASK       = 0x0000ffff;  // short bits for tags 
  11.  
  12.     // 異常哈希表 
  13.  
  14.     //被任務拋出的異常數組,為了報告給調用者。因為異常很少見,所以我們不直接將它們保存在task對象中,而是使用弱引用數組。注意,取消異常不會出現在數組,而是記錄在statue字段中 
  15.     //注意這些都是 static 類屬性,所有的ForkJoinTask共用的。 
  16.     private static final ExceptionNode[] exceptionTable;        //異常哈希鏈表數組 
  17.     private static final ReentrantLock exceptionTableLock; 
  18.     private static final ReferenceQueue<Object> exceptionTableRefQueue; //在ForkJoinTask被GC回收之后,相應的異常節點對象的引用隊列 
  19.  
  20.     /** 
  21.     * 固定容量的exceptionTable. 
  22.     */ 
  23.     private static final int EXCEPTION_MAP_CAPACITY = 32; 
  24.  
  25.  
  26.     //異常數組的鍵值對節點。 
  27.     //該哈希鏈表數組使用線程id進行比較,該數組具有固定的容量,因為它只維護任務異常足夠長,以便參與者訪問它們,所以在持續的時間內不應該變得非常大。但是,由于我們不知道最后一個joiner何時完成,我們必須使用弱引用并刪除它們。我們對每個操作都這樣做(因此完全鎖定)。此外,任何ForkJoinPool池中的一些線程在其池變為isQuiescent時都會調用helpExpungeStaleExceptions 
  28.     static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> { 
  29.         final Throwable ex; 
  30.         ExceptionNode next
  31.         final long thrower;  // 拋出異常的線程id 
  32.         final int hashCode;  // 在弱引用消失之前存儲hashCode 
  33.         ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) { 
  34.             super(task, exceptionTableRefQueue); //在ForkJoinTask被GC回收之后,會將該節點加入隊列exceptionTableRefQueue 
  35.             this.ex = ex; 
  36.             this.next = next
  37.             this.thrower = Thread.currentThread().getId(); 
  38.             this.hashCode = System.identityHashCode(task); 
  39.         } 
  40.     } 
  41.  
  42.     ................. 

 除了status記錄任務的執行狀態之外,其他字段主要是為了對任務執行的異常的處理,ForkJoinTask采用了哈希數組 + 鏈表的數據結構(JDK8以前的HashMap實現方法)存放所有(因為這些字段是static)的ForkJoinTask任務的執行異常。

fork 方法(安排任務異步執行)

fork() 做的工作只有一件事,既是把任務推入當前工作線程的工作隊列里(安排任務異步執行)??梢詤⒖匆韵碌脑创a:

  1. public final ForkJoinTask<V> fork() { 
  2.     Thread t; 
  3.     if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) 
  4.         ((ForkJoinWorkerThread)t).workQueue.push(this); 
  5.     else 
  6.         ForkJoinPool.common.externalPush(this); 
  7.     return this; 

 該方法其實就是將任務通過push方法加入到當前工作線程的工作隊列或者提交隊列(外部非ForkJoinWorkerThread線程通過submit、execute方法提交的任務),等待被線程池調度執行,這是一個非阻塞的立即返回方法。

  • 這里需要知道,ForkJoinPool線程池通過哈希數組+雙端隊列的方式將所有的工作線程擁有的任務隊列和從外部提交的任務分別映射到哈希數組的不同槽位上。

join 方法(等待執行結果)

join() 的工作則復雜得多,也是 join() 可以使得線程免于被阻塞的原因——不像同名的 Thread.join()。

  1. 檢查調用 join() 的線程是否是 ForkJoinThread 線程。如果不是(例如 main 線程),則阻塞當前線程,等待任務完成。如果是,則不阻塞。
  2. 查看任務的完成狀態,如果已經完成,直接返回結果。
  3. 如果任務尚未完成,但處于自己的工作隊列內,則完成它。
  4. 如果任務已經被其他的工作線程偷走,則竊取這個小偷的工作隊列內的任務(以 FIFO 方式),執行,以期幫助它早日完成 join 的任務。
  5. 如果偷走任務的小偷也已經把自己的任務全部做完,正在等待需要 join 的任務時,則找到小偷的小偷,幫助它完成它的任務。
  6. 遞歸地執行第5步。

將上述流程畫成序列圖的話就是這個樣子:

 由于文章篇幅有限,源碼分析請查看文章末尾的“了解更多”

小結

通常ForkJoinTask只適用于非循環依賴的純函數的計算或孤立對象的操作,否則,執行可能會遇到某種形式的死鎖,因為任務循環地等待彼此。但是,這個框架支持其他方法和技術(例如使用Phaser、helpQuiesce和complete),這些方法和技術可用于構造解決這種依賴任務的ForkJoinTask子類,為了支持這些用法,可以使用setForkJoinTaskTag或compareAndSetForkJoinTaskTag原子性地標記一個short類型的值,并使用getForkJoinTaskTag進行檢查。ForkJoinTask實現沒有將這些受保護的方法或標記用于任何目的,但是它們可以用于構造專門的子類,由此可以使用提供的方法來避免重新訪問已經處理過的節點/任務。

ForkJoinTask應該執行相對較少的計算,并且應該避免不確定的循環。大任務應該被分解成更小的子任務,通常通過遞歸分解。如果任務太大,那么并行性就不能提高吞吐量。如果太小,那么內存和內部任務維護開銷可能會超過處理開銷。

ForkJoinTask是可序列化的,這使它們能夠在諸如遠程執行框架之類的擴展中使用。只在執行之前或之后序列化任務才是明智的,而不是在執行期間。

ForkJoinPool

ForkJoinPool:ForkJoinTask 需要通過 ForkJoinPool 來執行,任務分割出的子任務會添加到當前工作線程所維護的雙端隊列中,進入隊列的頭部。當一個工作線程的隊列里暫時沒有任務時,它會隨機從其他工作線程的隊列的尾部獲取一個任務。


常量介紹

ForkJoinPool 與 內部類 WorkQueue 共享的一些常量

  1. // Constants shared across ForkJoinPool and WorkQueue 
  2.  
  3. // 限定參數 
  4. static final int SMASK = 0xffff;        //  低位掩碼,也是最大索引位 
  5. static final int MAX_CAP = 0x7fff;      //  工作線程最大容量 
  6. static final int EVENMASK = 0xfffe;     //  偶數低位掩碼 
  7. static final int SQMASK = 0x007e;       //  workQueues 數組最多64個槽位 
  8.  
  9. // ctl 子域和 WorkQueue.scanState 的掩碼和標志位 
  10. static final int SCANNING = 1;          // 標記是否正在運行任務 
  11. static final int INACTIVE = 1 << 31;    // 失活狀態  負數 
  12. static final int SS_SEQ = 1 << 16;      // 版本戳,防止ABA問題 
  13.  
  14. // ForkJoinPool.config 和 WorkQueue.config 的配置信息標記 
  15. static final int MODE_MASK = 0xffff << 16;  // 模式掩碼 
  16. static final int LIFO_QUEUE = 0;    // LIFO隊列 
  17. static final int FIFO_QUEUE = 1 << 16;  // FIFO隊列 
  18. static final int SHARED_QUEUE = 1 << 31;    // 共享模式隊列,負數 ForkJoinPool 中的相關常量和實例字段: 

 ForkJoinPool 中的相關常量和實例字段

  1. // 低位和高位掩碼 
  2. private static final long SP_MASK = 0xffffffffL; 
  3. private static final long UC_MASK = ~SP_MASK; 
  4.  
  5. // 活躍線程數 
  6. private static final int AC_SHIFT = 48; 
  7. private static final long AC_UNIT = 0x0001L << AC_SHIFT; //活躍線程數增量 
  8. private static final long AC_MASK = 0xffffL << AC_SHIFT; //活躍線程數掩碼 
  9.  
  10. // 工作線程數 
  11. private static final int TC_SHIFT = 32; 
  12. private static final long TC_UNIT = 0x0001L << TC_SHIFT; //工作線程數增量 
  13. private static final long TC_MASK = 0xffffL << TC_SHIFT; //掩碼 
  14. private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15);  // 創建工作線程標志 
  15.  
  16. // 池狀態 
  17. private static final int RSLOCK = 1; 
  18. private static final int RSIGNAL = 1 << 1; 
  19. private static final int STARTED = 1 << 2; 
  20. private static final int STOP = 1 << 29; 
  21. private static final int TERMINATED = 1 << 30; 
  22. private static final int SHUTDOWN = 1 << 31; 
  23.  
  24. // 實例字段 
  25. volatile long ctl;                   // 主控制參數 
  26. volatile int runState;               // 運行狀態鎖 
  27. final int config;                    // 并行度|模式 
  28. int indexSeed;                       // 用于生成工作線程索引 
  29. volatile WorkQueue[] workQueues;     // 主對象注冊信息,workQueue 
  30. final ForkJoinWorkerThreadFactory factory;// 線程工廠 
  31. final UncaughtExceptionHandler ueh;  // 每個工作線程的異常信息 
  32. final String workerNamePrefix;       // 用于創建工作線程的名稱 
  33. volatile AtomicLong stealCounter;    // 偷取任務總數,也可作為同步監視器 
  34.  
  35. /** 靜態初始化字段 */ 
  36. //線程工廠 
  37. public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory; 
  38. //啟動或殺死線程的方法調用者的權限 
  39. private static final RuntimePermission modifyThreadPermission; 
  40. // 公共靜態pool 
  41. static final ForkJoinPool common; 
  42. //并行度,對應內部common池 
  43. static final int commonParallelism; 
  44. //備用線程數,在tryCompensate中使用 
  45. private static int commonMaxSpares; 
  46. //創建workerNamePrefix(工作線程名稱前綴)時的序號 
  47. private static int poolNumberSequence; 
  48. //線程阻塞等待新的任務的超時值(以納秒為單位),默認2秒 
  49. private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec 
  50. //空閑超時時間,防止timer未命中 
  51. private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L;  // 20ms 
  52. //默認備用線程數 
  53. private static final int DEFAULT_COMMON_MAX_SPARES = 256; 
  54. //阻塞前自旋的次數,用在在awaitRunStateLock和awaitWork中 
  55. private static final int SPINS  = 0; 
  56. //indexSeed的增量 
  57. private static final int SEED_INCREMENT = 0x9e3779b9; 

 ForkJoinPool 的內部狀態都是通過一個64位的 long 型 變量ctl來存儲,它由四個16位的子域組成:

  • AC: 正在運行工作線程數減去目標并行度,高16位
  • TC: 總工作線程數減去目標并行度,中高16位
  • SS: 棧頂等待線程的版本計數和狀態,中低16位
  • ID: 棧頂 WorkQueue 在池中的索引(poolIndex),低16位

ForkJoinPool.WorkQueue 中的相關屬性:

  1. //初始隊列容量,2的冪 
  2. static final int INITIAL_QUEUE_CAPACITY = 1 << 13; 
  3. //最大隊列容量 
  4. static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M 
  5.  
  6. // 實例字段 
  7. volatile int scanState;    // Woker狀態, <0: inactive; odd:scanning 
  8. int stackPred;             // 記錄前一個棧頂的ctl 
  9. int nsteals;               // 偷取任務數 
  10. int hint;                  // 記錄偷取者索引,初始為隨機索引 
  11. int config;                // 池索引和模式 
  12. volatile int qlock;        // 1: locked, < 0: terminate; else 0 
  13. volatile int base;         // 下一個poll操作的索引(棧底/隊列頭) 
  14. int top;                   // 一個push操作的索引(棧頂/隊列尾) 
  15. ForkJoinTask<?>[] array;   // 任務數組 
  16. final ForkJoinPool pool;   // the containing pool (may be null
  17. final ForkJoinWorkerThread owner; // 當前工作隊列的工作線程,共享模式下為null 
  18. volatile Thread parker;    // 調用park阻塞期間為owner,其他情況為null 
  19. volatile ForkJoinTask<?> currentJoin;  // 記錄被join過來的任務 
  20. volatile ForkJoinTask<?> currentSteal; // 記錄從其他工作隊列偷取過來的任務 

 內部數據結構

ForkJoinPool采用了哈希數組 + 雙端隊列的方式存放任務,但這里的任務分為兩類:

  • 一類是通過execute、submit 提交的外部任務
  • 另一類是ForkJoinWorkerThread工作線程通過fork/join分解出來的工作任務

ForkJoinPool并沒有把這兩種任務混在一個任務隊列中,對于外部任務,會利用Thread內部的隨機probe值映射到哈希數組的偶數槽位中的提交隊列中,這種提交隊列是一種數組實現的雙端隊列稱之為Submission Queue,專門存放外部提交的任務。

對于ForkJoinWorkerThread工作線程,每一個工作線程都分配了一個工作隊列,這也是一個雙端隊列,稱之為Work Queue,這種隊列都會被映射到哈希數組的奇數槽位,每一個工作線程fork/join分解的任務都會被添加到自己擁有的那個工作隊列中。

在ForkJoinPool中的屬性 WorkQueue[] workQueues 就是我們所說的哈希數組,其元素就是內部類WorkQueue實現的基于數組的雙端隊列。該哈希數組的長度為2的冪,并且支持擴容。如下就是該哈希數組的示意結構圖:

如圖,提交隊列位于哈希數組workQueue的奇數索引槽位,工作線程的工作隊列位于偶數槽位。

  • 默認情況下,asyncMode為false時:因此工作線程把工作隊列當著棧一樣使用(后進先出),將分解的子任務推入工作隊列的top端,取任務的時候也從top端取(凡是雙端隊列都會有兩個分別指向隊列兩端的指針,這里就是圖上畫出的base和top);而當某些工作線程的任務為空的時候,就會從其他隊列(不限于workQueue,也會是提交隊列)竊取(steal)任務,如圖示擁有workQueue2的工作線程從workQueue1中竊取了一個任務,竊取任務的時候采用的是先進先出FIFO的策略(即從base端竊取任務),這樣不但可以避免在取任務的時候與擁有其隊列的工作線程發生沖突,從而減小競爭,還可以輔助其完成比較大的任務。
  • asyncMode為true的話,擁有該工作隊列的工作線程將按照先進先出的策略從base端取任務,這一般只用于不需要返回結果的任務,或者事件消息傳遞框架。

ForkJoinPool構造函數

其完整構造方法如下

  1. private ForkJoinPool(int parallelism, 
  2.                      ForkJoinWorkerThreadFactory factory, 
  3.                      UncaughtExceptionHandler handler, 
  4.                      int mode, 
  5.                      String workerNamePrefix) { 
  6.     this.workerNamePrefix = workerNamePrefix; 
  7.     this.factory = factory; 
  8.     this.ueh = handler; 
  9.     this.config = (parallelism & SMASK) | mode; 
  10.     long np = (long)(-parallelism); // offset ctl counts 
  11.     this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); 

 重要參數解釋

  1. parallelism:并行度( the parallelism level),默認情況下跟我們機器的cpu個數保持一致,使用 Runtime.getRuntime().availableProcessors()可以得到我們機器運行時可用的CPU個數。
  2. factory:創建新線程的工廠( the factory for creating new threads)。默認情況下使用ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory。
  3. handler:線程異常情況下的處理器(Thread.UncaughtExceptionHandler handler),該處理器在線程執行任務時由于某些無法預料到的錯誤而導致任務線程中斷時進行一些處理,默認情況為null。
  4. asyncMode:這個參數要注意,在ForkJoinPool中,每一個工作線程都有一個獨立的任務隊列
  • asyncMode表示工作線程內的任務隊列是采用何種方式進行調度,可以是先進先出FIFO,也可以是后進先出LIFO。如果為true,則線程池中的工作線程則使用先進先出方式進行任務調度,默認情況下是false。

ForkJoinPool.submit 方法

  1. public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { 
  2.     if (task == null
  3.         throw new NullPointerException(); 
  4.     //提交到工作隊列 
  5.     externalPush(task); 
  6.     return task; 

ForkJoinPool 自身擁有工作隊列,這些工作隊列的作用是用來接收由外部線程(非 ForkJoinThread 線程)提交過來的任務,而這些工作隊列被稱為 submitting queue 。 submit() 和 fork() 其實沒有本質區別,只是提交對象變成了 submitting queue 而已(還有一些同步,初始化的操作)。submitting queue 和其他 work queue 一樣,是工作線程”竊取“的對象,因此當其中的任務被一個工作線程成功竊取時,就意味著提交的任務真正開始進入執行階段。

PS:以上代碼提交在 Github :

https://github.com/Niuh-Study/niuh-juc-final.git

 

責任編輯:姜華 來源: 今日頭條
相關推薦

2020-11-30 16:01:03

Semaphore

2020-12-09 08:21:47

編程Exchanger工具

2020-12-04 19:28:53

CountDownLaPhaserCyclicBarri

2020-12-03 11:15:21

CyclicBarri

2020-12-08 08:53:53

編程ThreadPoolE線程池

2017-09-19 14:53:37

Java并發編程并發代碼設計

2022-11-09 09:01:08

并發編程線程池

2020-12-10 07:00:38

編程線程池定時任務

2012-03-09 10:44:11

Java

2020-12-11 07:32:45

編程ThreadLocalJava

2020-11-13 08:42:24

Synchronize

2022-04-13 08:23:31

Golang并發

2017-01-10 13:39:57

Python線程池進程池

2020-12-07 09:40:19

Future&Futu編程Java

2019-11-07 09:20:29

Java線程操作系統

2021-03-10 15:59:39

JavaSynchronize并發編程

2016-10-21 11:04:07

JavaScript異步編程原理解析

2020-07-06 08:03:32

Java悲觀鎖樂觀鎖

2025-03-20 06:48:55

性能優化JDK

2020-11-16 08:11:32

ReentrantLo
點贊
收藏

51CTO技術棧公眾號

黑鬼狂亚洲人videos| 日韩精品你懂的| 蜜桃视频污在线观看| 国产精品一页| 在线播放精品一区二区三区 | 伊人国产精品视频| 久久免费电影| 国产欧美精品日韩区二区麻豆天美| 国产精品稀缺呦系列在线| 可以直接看的黄色网址| 日韩动漫一区| 91精品欧美一区二区三区综合在| 警花观音坐莲激情销魂小说| 天天操天天干天天舔| 美洲天堂一区二卡三卡四卡视频| 欧美巨乳美女视频| 日韩丰满少妇无码内射| 国产亚洲精aa在线看| 欧美日韩在线第一页| 一区二区三区我不卡| 成人爽a毛片一区二区| 日韩精品免费视频人成| 欧美丰满片xxx777| 成人黄色短视频| 无码少妇一区二区三区| 日韩情涩欧美日韩视频| 亚洲欧美激情网| 国产精品蜜臀| 亚洲欧美色图小说| 精品一区二区三区自拍图片区 | av鲁丝一区鲁丝二区鲁丝三区| 日韩精品一区二区三区av| 亚洲高清免费观看| 法国空姐在线观看免费| av色图一区| 91亚洲精品久久久蜜桃网站| 99r国产精品视频| 一区精品在线观看| 狂野欧美一区| 国产69精品久久久久久| 精品处破女学生| 欧美oldwomenvideos| 亚洲一二在线观看| 久久久久久久久免费看无码| 懂色av一区二区| 欧美一级生活片| 红桃视频 国产| 精品美女一区| 欧美无砖砖区免费| 亚洲欧美激情网| 美女网站视频一区| 色婷婷av一区| aaa毛片在线观看| 午夜激情在线播放| 欧美性猛交xxxx黑人猛交| www.av中文字幕| а√在线天堂官网| 午夜精品视频一区| 亚洲熟妇无码一区二区三区导航| 日韩av毛片| 亚洲综合在线第一页| 日本a级片在线观看| 毛片在线看片| 亚洲精品成a人| 精品无码av无码免费专区| 日本韩国在线视频爽| 国产精品初高中害羞小美女文| 亚洲精品成人a8198a| 91美女视频在线| 《视频一区视频二区| 四虎精品欧美一区二区免费| 羞羞网站在线免费观看| 亚洲午夜一区二区| 成人一对一视频| 日韩a**中文字幕| 欧洲国产伦久久久久久久| 性刺激的欧美三级视频| 日本午夜精品久久久久| 日韩写真欧美这视频| 国产人成视频在线观看| 窝窝社区一区二区| 日韩在线视频一区| 免费人成在线观看| 国产视频一区三区| 国产精品久久久久久久久久尿| 最近中文字幕免费在线观看| 激情综合五月天| 亚洲综合最新在线| 性高潮久久久久久久久久| 久久久精品2019中文字幕之3| 日韩av电影免费播放| 麻豆电影在线播放| 亚洲国产精品综合小说图片区| 国产女大学生av| 成人在线免费av| 欧美v亚洲v综合ⅴ国产v| avtt香蕉久久| 国产精品99久久| 国内精品视频久久| 影音先锋国产资源| www.亚洲免费av| 先锋影音欧美| 免费电影网站在线视频观看福利| 欧美午夜性色大片在线观看| 亚洲午夜激情影院| 色天下一区二区三区| 久久亚洲精品视频| 国产剧情在线视频| 国产高清在线精品| 日韩精彩视频| 蜜乳av一区| 欧美亚洲丝袜传媒另类| 无码国产69精品久久久久网站| 少妇av一区二区| 国产黄色大片在线观看| 亚洲国产精品久久久久婷婷884| 99草草国产熟女视频在线| 视频一区国产| 在线日韩中文字幕| 欧美一二三区视频| 国产精品一区二区在线播放| 青青草久久网络| av影视在线看| 欧美一级午夜免费电影| 男女男精品视频网站| 国产欧美日韩一级| 成人动漫视频在线观看免费| 成人免费在线电影| 精品美女国产在线| 好吊操视频这里只有精品| 色综合咪咪久久网| 国产精品高潮呻吟久久av黑人| 狠狠躁夜夜躁av无码中文幕| 亚洲天堂久久久久久久| 9久久婷婷国产综合精品性色| 国产一级成人av| 久久99精品久久久久久青青91 | 欧美日韩中文一区| 欧美熟妇激情一区二区三区| 99香蕉国产精品偷在线观看 | 亚洲天堂网在线观看视频| 91麻豆精品秘密| 国产九九九九九| 亚洲精品18| 欧美成人免费全部观看天天性色| 老熟妇一区二区三区啪啪| 91色乱码一区二区三区| 久草视频国产在线| 97成人在线| 欧美黑人一级爽快片淫片高清| 国产乱码精品一区二三区蜜臂| 国产欧美精品一区二区色综合| 国产精品亚洲αv天堂无码| 午夜欧洲一区| 777国产偷窥盗摄精品视频| 国产91绿帽单男绿奴| 亚洲高清免费一级二级三级| 无码人妻精品一区二区三区99不卡| 在线免费观看日本欧美爱情大片| 91久久久久久| 黄色免费在线看| 欧美另类一区二区三区| 久久久久久久久久97| 黑人巨大精品欧美黑白配亚洲| 亚洲欧洲日本国产| 国产95亚洲| 欧美精品免费看| 丰满人妻一区二区| 精品欧美aⅴ在线网站| 欧美在线一级片| 亚洲欧美日韩国产一区二区| 欧美日韩精品久久| 日韩久久一区二区三区| 中日韩午夜理伦电影免费 | a级在线观看| 日韩欧美中文一区二区| 国产一级二级毛片| 久久婷婷国产综合国色天香| 国产三级日本三级在线播放| 久久婷婷蜜乳一本欲蜜臀| 亚洲va久久久噜噜噜久久天堂| 怡红院在线观看| 日韩成人黄色av| 中文有码在线播放| 亚洲激情自拍视频| 爱爱免费小视频| 极品少妇一区二区| 欧美大片在线播放| 欧美亚洲国产激情| 99久久综合狠狠综合久久止 | 中文字幕 91| 午夜久久久久| 欧美一区国产一区| 国产一区二区| 欧美在线视频观看| 黄色免费网站在线观看| 亚洲精品久久久久中文字幕欢迎你 | 99国产揄拍国产精品| 亚洲成人资源网| 精品伦精品一区二区三区视频密桃| 国产成人亚洲综合a∨婷婷| 哪个网站能看毛片| 欧美激情第10页| 欧美综合77777色婷婷| 精品一区二区三区亚洲| 日本精品一区二区三区在线播放视频| 麻豆传媒在线完整视频| 日韩精品在线观看一区二区| 在线免费a视频| 红桃视频成人在线观看| 午夜爽爽爽男女免费观看| 91蜜桃婷婷狠狠久久综合9色| 中文字幕亚洲影院| 久久激情一区| 2018中文字幕第一页| 色综合狠狠操| 日本一区美女| 果冻天美麻豆一区二区国产| 国产美女高潮久久白浆| 久久久久久久| 久久欧美在线电影| 综合图区亚洲| 中文字幕日韩av电影| 亚洲AV成人无码一二三区在线| 91精品黄色片免费大全| 成人黄色三级视频| 欧美日韩免费在线观看| 精品小视频在线观看| 亚洲久本草在线中文字幕| 男人舔女人下部高潮全视频| 97超碰欧美中文字幕| 日本xxxx免费| 国产一区二区三区四区五区入口| 国产福利一区视频| 亚洲尤物在线| 日韩免费视频播放| 亚洲午夜电影| 欧美国产视频一区| 欧美国产91| 国内外成人激情免费视频| 欧美大片aaaa| 亚洲欧洲日夜超级视频| 欧美一级本道电影免费专区| 欧美精品一区三区在线观看| 欧美调教在线| 久久国产精品久久| 亚洲精品中文字幕99999| 久久精品成人一区二区三区蜜臀| 黄色欧美在线| 国产在线一区二| 精品一区二区男人吃奶| 精品国产乱码久久久久久88av| 日韩成人久久| julia一区二区中文久久94| 欧美午夜网站| av资源站久久亚洲| 久久午夜影院| 久久日韩精品| 欧美日韩123| 日产精品久久久一区二区| 日韩一区电影| 中文字幕一区二区三区四区五区| 国产精品久久久久久久| 亚洲区成人777777精品| 午夜精品视频| 亚洲色成人www永久在线观看| 欧美三级免费| 逼特逼视频在线| 日韩一区精品视频| 国产又大又黄又猛| 国产伦精品一区二区三区免费 | 国产欧美一区二区三区在线看蜜臂| 精品日本美女福利在线观看| 日本熟女毛茸茸| 欧美性猛片xxxx免费看久爱| 国产又黄又大又粗的视频| 日韩视频一区二区三区在线播放| 亚洲伦理在线观看| 精品一区二区三区四区在线| www.亚洲免费| 久久99精品国产99久久6尤物| 欧美亚洲日本精品| 国产精品一区二区三区毛片淫片 | 国产日韩欧美在线播放不卡| 虎白女粉嫩尤物福利视频| 日本视频一区二区三区| 91香蕉视频免费看| xnxx国产精品| 91n在线视频| 亚洲成人黄色影院| 欧美成人一区二区视频| 日韩一区二区三区av| 天堂在线观看视频| 日韩在线视频免费观看| 9999精品成人免费毛片在线看| 国产成一区二区| 国产精一区二区| 噜噜噜噜噜久久久久久91| 97视频热人人精品免费| 毛片在线视频播放| 久久国产婷婷国产香蕉| 老熟妇精品一区二区三区| 国产精品久久毛片av大全日韩| 玖玖爱免费视频| 欧美在线不卡视频| 粉嫩av一区二区夜夜嗨| 在线视频精品一| 老牛影视精品| 92福利视频午夜1000合集在线观看 | 亚洲精品美女| 亚洲第一区第二区第三区| 久久久国产精华| 国产精品 欧美 日韩| 欧美日本在线播放| 酒色婷婷桃色成人免费av网| 欧美国产日韩一区二区在线观看| 成人国产在线| 欧美日韩国产精品一卡| 亚洲国产网站| 97人人模人人爽人人澡| 国产精品丝袜一区| 国产精品久久久久久久久久精爆| 日韩精品一区二区三区视频| 日本三级视频在线观看| 国产suv精品一区二区| 久久porn| 日韩成人三级视频| 国产一区二区三区在线看麻豆| 日韩欧美黄色网址| 色诱亚洲精品久久久久久| 天天干免费视频| 色中色综合影院手机版在线观看 | 在线观看91视频| 日韩午夜影院| 97国产真实伦对白精彩视频8| 成人豆花视频| 宅男av一区二区三区| 蜜臀久久99精品久久久久久9| 国产精品三级在线观看无码| 午夜精品久久久久久久| 亚洲产国偷v产偷v自拍涩爱| 不卡av电影院| 看亚洲a级一级毛片| 女同性恋一区二区| 国产尤物一区二区| 国产日产精品一区二区三区的介绍| 欧美在线你懂的| 国产高清在线| 国产精品人成电影在线观看| 欧洲三级视频| 乌克兰美女av| 国产精品久久久久久久久免费樱桃| 日本视频www色| 综合网中文字幕| 97成人超碰| 在线综合视频网站| 国产精品资源网| 久久精品这里有| 亚洲激情免费观看| 成人av三级| 亚洲国产一区二区三区在线| 老司机免费视频一区二区| 182在线观看视频| 欧美mv日韩mv亚洲| 欧美aa免费在线| 日本在线高清视频一区| 蜜桃免费网站一区二区三区| 一级免费黄色录像| 日韩欧美成人激情| 国产伦子伦对白在线播放观看| 开心色怡人综合网站| 男女视频一区二区| 丝袜 亚洲 另类 欧美 重口| 日韩你懂的在线观看| 激情国产在线| 亚欧精品在线| 国产盗摄精品一区二区三区在线| 国产无套内射又大又猛又粗又爽| 日韩电影免费观看在线观看| 性欧美1819sex性高清| 伊人久久大香线蕉午夜av| 国产成人在线网站| 亚洲欧美自拍视频| www.日韩系列| 超碰在线亚洲| 国产精品天天av精麻传媒| 亚洲三级在线观看| 天堂av在线7| 成人国内精品久久久久一区| 韩国久久久久| 国产探花视频在线播放| 日韩免费一区二区| 肉色欧美久久久久久久免费看| 中文字幕日韩一区二区三区不卡| 成人黄页在线观看| 中文字幕人妻一区二区三区视频| 久久91精品国产91久久久| 亚洲视频分类| 中国男女全黄大片| 欧美午夜精品电影|