精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

ForkJoinPool:大任務拆分,讓并行嗨起來

開發 前端
在使用ForkJoinPool時,需要特別注意任務的類型是否為純函數計算類型,也就是這些任務不應該關心狀態或者外界的變化,這樣才是最安全的做法。如果是阻塞類型任務,那么你需要謹慎評估技術方案。

線程池ThreadPoolExecutor,它通過對任務隊列和線程的有效管理實現了對并發任務的處理。然而,ThreadPoolExecutor有兩個明顯的缺點:一是無法對大任務進行拆分,對于某個任務只能由單線程執行;二是工作線程從隊列中獲取任務時存在競爭情況。這兩個缺點都會影響任務的執行效率,要知道高并發場景中的每一毫秒都彌足珍貴。

針對這兩個問題,本文即將介紹的ForkJoinPool給出了可選的答案。在本文中,我們將首先從分治算法開始介紹,接著體驗ForkJoinPool中自定義任務的實現,最后再深入到Java中去理解ForkJoinPool的原理和用法。

一、分治算法與Fork/Join模式

在并發計算中,Fork/Join模式往往用于對大任務的并行計算,它通過遞歸的方式對任務不斷地拆解,再將結果進行合并。如果從其思想上看,Fork/Join并不復雜,其本質是分治算法(Divide-and-Conquer) 的應用。

分治算法的基本思想是將一個規模為N的問題分解為K個規模較小的子問題,這些子問題相互獨立且與原問題性質相同。求出子問題的解,就可得到原問題的解。分治算法的步驟如下:

(1)分解:將要解決的問題劃分成若干規模較小的同類問題;

(2)求解:當子問題劃分得足夠小時,用較簡單的方法解決;

(3)合并:按原問題的要求,將子問題的解逐層合并構成原問題的解。

在并發計算中,Fork/Join模式往往用于對大任務的并行計算,它通過遞歸的方式對任務不斷地拆解,再將結果進行合并。如果從其思想上看,Fork/Join并不復雜,其本質是分治算法(Divide-and-Conquer) 的應用。

分治算法的基本思想是將一個規模為N的問題分解為K個規模較小的子問題,這些子問題相互獨立且與原問題性質相同。求出子問題的解,就可得到原問題的解。分治算法的步驟如下:

(1)分解:將要解決的問題劃分成若干規模較小的同類問題;

(2)求解:當子問題劃分得足夠小時,用較簡單的方法解決;

(3)合并:按原問題的要求,將子問題的解逐層合并構成原問題的解。

圖片圖片

Fork/Join對任務的拆分和對結果合并過程也是如此,可以用下面偽代碼來表示:

solve(problem):
    if problem is small enough:
        // 如果任務足夠小,執行任務
        solve problem directly (sequential algorithm)
    else:
        // 拆分任務
        for part in subdivide(problem)
            fork subtask to solve(part)
        // 合并結果
        join all subtasks spawned in previous loop
        return combined results

所以,理解Fork/Join模型和ForkJoinPool線程池,首先要理解其背后的算法的目的和思想,因為后文所要詳述的ForkJoinPool不過只是這種算法的一種的實現和應用。

二、Fork/Join應用場景與體驗

按照思想->實現->源碼的思路,在了解了Fork/Join思想之后,我們先通過一個場景手工實現一個RecursiveTask,這樣可以更好地體驗Fork/Join的用法。

場景:給定兩個自然數,計算兩個數之間的總和。比如1~n之間的和:1+2+3+…+n

為了解決這個問題,我們創建了TheKingRecursiveSumTask這個核心類,它繼承于RecursiveTask。RecursiveTask是ForkJoinPool中的一種任務類型,你暫且不必深入了解它,后文會有詳細描述。TheKingRecursiveSumTask中定義了任務計算的起止范圍(sumBegin和sumEnd)和拆分閾值(threshold),以及核心計算邏輯compute()。

public class TheKingRecursiveSumTask extends RecursiveTask<Long> {
    private static final AtomicInteger taskCount = new AtomicInteger();
    private final int sumBegin;
    private final int sumEnd;
    /**
     * 任務拆分閾值,當任務尺寸大于該值時,進行拆分
     */
    private final int threshold;

    public TheKingRecursiveSumTask(int sumBegin, int sumEnd, int threshold) {
        this.sumBegin = sumBegin;
        this.sumEnd = sumEnd;
        this.threshold = threshold;
    }

    @Override
    protected Long compute() {
        if ((sumEnd - sumBegin) > threshold) {
            // 兩個數之間的差值大于閾值,拆分任務
            TheKingRecursiveSumTask subTask1 = new TheKingRecursiveSumTask(sumBegin, (sumBegin + sumEnd) / 2, threshold);
            TheKingRecursiveSumTask subTask2 = new TheKingRecursiveSumTask((sumBegin + sumEnd) / 2, sumEnd, threshold);
            subTask1.fork();
            subTask2.fork();
            taskCount.incrementAndGet();
            return subTask1.join() + subTask2.join();
        }
        // 直接執行結果
        long result = 0L;
        for (int i = sumBegin; i < sumEnd; i++) {
            result += i;
        }
        return result;
    }

    public static AtomicInteger getTaskCount() {
        return taskCount;
    }
}

在下面的代碼中,我們設置的計算區間值0~10000000,當計算的個數超過100時,將對任務進行拆分,最大并發數設置為16.

public static void main(String[] args) {
     int sumBegin = 0, sumEnd = 10000000;
     computeByForkJoin(sumBegin, sumEnd);
     computeBySingleThread(sumBegin, sumEnd);
 }

 private static void computeByForkJoin(int sumBegin, int sumEnd) {
     ForkJoinPool forkJoinPool = new ForkJoinPool(16);
     long forkJoinStartTime = System.nanoTime();
     TheKingRecursiveSumTask theKingRecursiveSumTask = new TheKingRecursiveSumTask(sumBegin, sumEnd, 100);
     long forkJoinResult = forkJoinPool.invoke(theKingRecursiveSumTask);
     System.out.println("======");
     System.out.println("ForkJoin任務拆分:" + TheKingRecursiveSumTask.getTaskCount());
     System.out.println("ForkJoin計算結果:" + forkJoinResult);
     System.out.println("ForkJoin計算耗時:" + (System.nanoTime() - forkJoinStartTime) / 1000000);
 }

 private static void computeBySingleThread(int sumBegin, int sumEnd) {
     long computeResult = 0 L;
     long startTime = System.nanoTime();
     for (int i = sumBegin; i < sumEnd; i++) {
         computeResult += i;
     }
     System.out.println("======");
     System.out.println("單線程計算結果:" + computeResult);
     System.out.println("單線程計算耗時:" + (System.nanoTime() - startTime) / 1000000);
 }

運行結果如下:

======
ForkJoin任務拆分:131071
ForkJoin計算結果:49999995000000
ForkJoin計算耗時:207
======
單線程計算結果:49999995000000
單線程計算耗時:40

Process finished with exit code 0

從計算結果中可以看到,ForkJoinPool總共進行了131071次的任務拆分,最終的計算結果是49999995000000,耗時207毫秒。不過,細心的你可能已經發現了,ForkJoin的并行計算的耗時竟然比單程程還慢?并且足足慢了近5倍!先別慌,關于ForkJoin的性能問題,我們會在后文有講解。

三、ForkJoinPool設計與源碼分析

在Java中,ForkJoinPool是Fork/Join模型的實現,于Java7引入并在Java8中廣泛應用。ForkJoinPool允許其他線程向它提交任務,并根據設定將這些任務拆分為粒度更細的子任務,這些子任務將由ForkJoinPool內部的工作線程來并行執行,并且工作線程之間可以竊取彼此之間的任務。

在接口實現和繼承關系上,ForkJoinPool和ThreadPoolExecutor類似,都實現了Executor和ExecutorService接口,并繼承了AbstractExecutorService抽類。而在任務類型上,ForkJoinPool主要有兩種任務類型:RecursiveAction和RecursiveTask,它們繼承于ForkJoinTask. 相關關系如下圖所示:

圖片圖片

解讀ForkJoinPool的源碼并不容易,雖然它的思想較為簡單,但在實現上要考慮的顯然更多,加上部分代碼可讀性一般,所以講解它的全部源碼是不現實的,當然也是沒必要的。在下文中,我們將主要介紹其核心的任務提交和執行相關的部分源碼,其他源碼有興趣的可以自行閱讀。

1. 構造ForkJoinPool的幾種不同方式

ForkJoinPool中有四個核心參數,用于控制線程池的并行數、工作線程的創建、異常處理和模式指定等。各參數解釋如下:

  • int parallelism:指定并行級別(parallelism level)。ForkJoinPool將根據這個設定,決定工作線程的數量。如果未設置的話,將使用Runtime.getRuntime().availableProcessors()來設置并行級別;
  • ForkJoinWorkerThreadFactory factory:ForkJoinPool在創建線程時,會通過factory來創建。注意,這里需要實現的是ForkJoinWorkerThreadFactory,而不是ThreadFactory. 如果你不指定factory,那么將由默認的DefaultForkJoinWorkerThreadFactory負責線程的創建工作;
  • UncaughtExceptionHandler handler:指定異常處理器,當任務在運行中出錯時,將由設定的處理器處理;
  • boolean asyncMode:從名字上看,你可能會覺得它是異步模式設置,但其實是設置隊列的工作模式:asyncMode ? FIFO_QUEUE : LIFO_QUEUE. 當asyncMode為true時,將使用先進先出隊列,而為false時則使用后進先出的模式。

圍繞上面的四個核心參數,ForkJoinPool提供了三種構造方式,使用時你可以根據需要選擇其中的一種。

方式一:默認無參構造

在該構造方式中,你無需設定任何參數。ForkJoinPool將根據當前處理器數量來設置并行數量,并使用默認的線程構造工廠。不推薦。

public ForkJoinPool() {
        this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, null, false);
 }

方式二:通過并行數構造

在該構造方式中,你可以指定并行數量,以更有效地平衡處理器數量和負載。建議在設置時,并行級別應低于當前處理器的數量。

public ForkJoinPool(int parallelism) {
        this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
 }

方式三:自定義全部參數構造

以上兩種構造方式都是基于這種構造,它允許你配置所有的核心參數。為了更有效地管理ForkJoinPool,建議你使用這種構造方式。

public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
 }

2. 按類型提交不同任務

任務提交是ForkJoinPool的核心能力之一,在提交任務時你有三種選擇,如下面表格所示:


從非fork/join線程調用

從fork/join調用

提交異步執行

execute(ForkJoinTask)

ForkJoinTask.fork()

等待并獲取結果

invoke(ForkJoinTask)

ForkJoinTask.invoke()

提交執行獲取Future結果

submit(ForkJoinTask)

ForkJoinTask.fork() (ForkJoinTasks are Futures)

第一類核心方法:invoke

invoke類型的方法接受ForkJoinTask類型的任務,并在任務執行結束后,返回泛型結果。如果提交的任務是null,將拋出空指針異常。

public <T> T invoke(ForkJoinTask<T> task) {
        if (task == null)
            throw new NullPointerException();
        externalPush(task);
        return task.join();
 }

第二類核心方法:execute

execute類型的方法在提交任務后,不會返回結果。另外要注意的是,ForkJoinPool不僅允許提交ForkJoinTask類型任務,還允許提交Callable或Runnable任務,因此你可以像使用現有Executors一樣使用ForkJoinPool。

當然,Callable或Runnable類型任務時,將會轉換為ForkJoinTask類型,具體可以查看任務提交的相關源碼。那么,這類任務和直接提交ForkJoinTask任務有什么區別呢?還是有的。區別在于,由于任務是不可切分的,所以這類任務無法獲得任務拆分這方面的效益,不過仍然可以獲得任務竊取帶來的好處和性能提升。

public void execute(ForkJoinTask<?> task) {
        if (task == null)
            throw new NullPointerException();
        externalPush(task);
 }

public void execute(Runnable task) {
        if (task == null)
            throw new NullPointerException();
        ForkJoinTask<?> job;
        if (task instanceof ForkJoinTask<?>) // avoid re-wrap
            job = (ForkJoinTask<?>) task;
        else
            job = new ForkJoinTask.RunnableExecuteAction(task);
        externalPush(job);
 }

第三類核心方法:submit

submit類型的方法支持三種類型的任務提交:ForkJoinTask類型、Callable類型和Runnable類型。在提交任務后,將返回ForkJoinTask類型的結果。如果提交的任務是null,將拋出空指針異常,并且當任務不能按計劃執行的話,將拋出任務拒絕異常。

public < T > ForkJoinTask < T > submit(ForkJoinTask < T > task) {
       if (task == null)
           throw new NullPointerException();
       externalPush(task);
       return task;
   }

   public < T > ForkJoinTask < T > submit(Callable < T > task) {
       ForkJoinTask < T > job = new ForkJoinTask.AdaptedCallable < T > (task);
       externalPush(job);
       return job;
   }

   public < T > ForkJoinTask < T > submit(Runnable task, T result) {
       ForkJoinTask < T > job = new ForkJoinTask.AdaptedRunnable < T > (task, result);
       externalPush(job);
       return job;
   }

   public ForkJoinTask < ? > submit(Runnable task) {
       if (task == null)
           throw new NullPointerException();
       ForkJoinTask < ? > job;
       if (task instanceof ForkJoinTask < ? > ) // avoid re-wrap
           job = (ForkJoinTask < ? > ) task;
       else
           job = new ForkJoinTask.AdaptedRunnableAction(task);
       externalPush(job);
       return job;
   }

3. ForkJoinTask

ForkJoinTask是ForkJoinPool的核心之一,它是任務的實際載體,定義了任務執行時的具體邏輯和拆分邏輯,本文前面的示例代碼就是通過繼承它實現。作為一個抽象類,ForkJoinTask的行為有點類似于線程,但它更為輕量,因為它不維護自己的運行時堆棧或程序計數器等。

在類的設計上,ForkJoinTask繼承了Future接口,所以也可以將其看作是輕量級的Future,它們之間的關系如下圖所示。

(1)fork與join

fork()/join()是ForkJoinTask甚至是ForkJoinPool的核心方法,承載著主要的任務協調作用,一個用于任務提交,一個用于結果獲取。

fork-提交任務

fork()方法用于向當前任務所運行的線程池中提交任務,比如上文示例代碼中的subTask1.fork(). 注意,不同于其他線程池的寫法,任務提交由任務自己通過調用fork()完成,對此不要感覺詫異,fork()內部會將任務與當前線程進行關聯。

從源碼中看,如果當前線程是ForkJoinWorkerThread類型,將會放入該線程的任務隊列,否則放入common線程池的任務隊列中。關于common線程池,后續會有介紹。

public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}

join-獲取任務執行結果

前面,你已經知道可以通過fork()提交任務。那么現在,你則可以通過join()方法獲取任務的執行結果。調用join()時,將阻塞當前線程直到對應的子任務完成運行并返回結果。從源碼看,join()的核心邏輯由doJoin()負責。doJoin()雖然很短,但可讀性較差,閱讀時稍微忍一下。

public final V join() {
    int s;
    // 如果調用doJoin返回的非NORMAL狀態,將報告異常
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        reportException(s);
    // 正常執行結束,返回原始結果
    return getRawResult();
}

private int doJoin() {
    int s;
    Thread t;
    ForkJoinWorkerThread wt;
    ForkJoinPool.WorkQueue w;
    //如果已完成,返回狀態
    return (s = status) < 0 ? s :
     //如果未完成且當前線程是ForkJoinWorkerThread,則從該線程中取出workQueue,并嘗試將當前task取出執行。如果執行的結果是完成,則返回狀態;否則,使用當前線程池awaitJoin方法進行等待
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (w = (wt = (ForkJoinWorkerThread) t).workQueue).
    tryUnpush(this) && (s = doExec()) < 0 ? s :
        wt.pool.awaitJoin(w, this, 0 L):
     //當前線程非ForkJoinWorkerThread,調用externalAwaitDone方法等待
        externalAwaitDone();
}

final int doExec() {
    int s;
    boolean completed;
    if ((s = status) >= 0) {
        try {
            completed = exec();
        } catch (Throwable rex) {
            return setExceptionalCompletion(rex);
        }
        // 執行完成后,將狀態設置為NORMAL
        if (completed)
            s = setCompletion(NORMAL);
    }
    return s;
}

(2)RecursiveAction與RecursiveTask

圖片圖片

在ForkJoinPool中,常用的有兩種任務類型:返回結果的和不返回結果的,這方面和ThreadPoolExecutor等線程池是一致的,對應的兩個類分別是:RecursiveAction和RecursiveTask. 從類圖中可以看到,它們均繼承于ForkJoinTask.

RecursiveAction:無結果返回

RecursiveAction用于遞歸執行但不需要返回結果的任務,比如下面的排序就是它的典型應用場景。在使用RecursiveAction時,你需要繼承并實現它的核心方法compute().

static class SortTask extends RecursiveAction {
    final long[] array;
    final int lo, hi;
    SortTask(long[] array, int lo, int hi) {
        this.array = array;
        this.lo = lo;
        this.hi = hi;
    }
    SortTask(long[] array) {
        this(array, 0, array.length);
    }
    // 核心計算方法
    protected void compute() {
        if (hi - lo < THRESHOLD)
            // 直接執行
            sortSequentially(lo, hi);
        else {
            // 拆分任務
            int mid = (lo + hi) >>> 1;
            invokeAll(new SortTask(array, lo, mid),
                new SortTask(array, mid, hi));
            merge(lo, mid, hi);
        }
    }
    // implementation details follow:
    static final int THRESHOLD = 1000;
    void sortSequentially(int lo, int hi) {
        Arrays.sort(array, lo, hi);
    }
    void merge(int lo, int mid, int hi) {
        long[] buf = Arrays.copyOfRange(array, lo, mid);
        for (int i = 0, j = lo, k = mid; i < buf.length; j++)
            array[j] = (k == hi || buf[i] < array[k]) ?
            buf[i++] : array[k++];
    }
}

RecursiveTask:返回結果

RecursiveTask用于遞歸執行需要返回結果的任務,比如前面示例代碼中的求和或下面這段求斐波拉契數列求和都是它的典型應用場景。在使用RecursiveTask時,你也需要繼承并實現它的核心方法compute().

class Fibonacci extends RecursiveTask<Integer> {
   final int n;
   Fibonacci(int n) { this.n = n; }
   Integer compute() {
     if (n <= 1)
       return n;
     Fibonacci f1 = new Fibonacci(n - 1);
     f1.fork();
     Fibonacci f2 = new Fibonacci(n - 2);
     return f2.compute() + f1.join();
   }
 }

(3)ForkJoinTask使用限制

雖然在某些場景下,ForkJoinTask可以通過任務拆解的方式提高執行效率,但是需要注意的是它并非適合所有的場景。ForkJoinTask在使用時需要謹記一些限制,違背這些限制可能會適得其反甚至引來災難。

為什么這么說呢?

這是因為,ForkJoinTask最適合用于純粹的計算任務,也就是純函數計算,計算過程中的對象都是獨立的,對外部沒有依賴。你可以想象,如果大量的任務或被拆分的子任務之間彼此依賴或對外部存在嚴重阻塞依賴,那將是怎樣的畫面...用千絲萬縷來形容也不為過,外部依賴會帶來任務執行和問題排查方面的雙重不確定性。

所以,在理想情況下,提交到ForkJoinPool中的任務應避免執行阻塞I/O,以免出現不可控的意外情況。當然,這也并非是絕對的,在必要時你也可以定義和使用可阻塞的ForkJoinTask,只不過你需要付出更多的代價和考慮,使用時應當慎之又慎,本文對此不作敘述。

4. 工作隊列與任務竊取

前面已經說到,ForkJoinPool與ThreadPoolExecutor有個很大的不同之處在于,ForkJoinPool存在引入了任務竊取設計,它是其性能保證的關鍵之一。

關于任務竊取,簡單來說,就是允許空閑線程從繁忙線程的雙端隊列中竊取任務。默認情況下,工作線程從它自己的雙端隊列的頭部獲取任務。但是,當自己的任務為空時,線程會從其他繁忙線程雙端隊列的尾部中獲取任務。這種方法,最大限度地減少了線程競爭任務的可能性。

ForkJoinPool的大部分操作都發生在工作竊取隊列(work-stealing queues ) 中,該隊列由內部類WorkQueue實現。其實,這個隊列也不是什么神奇之物,它是Deques的特殊形式,但僅支持三種操作方式:push、pop和poll(也稱為竊取)。當然,在ForkJoinPool中,隊列的讀取有著嚴格的約束,push和pop僅能從其所屬線程調用,而poll則可以從其他線程調用。換句話說,前兩個方法是留給自己用的,而第三種方法則是為了方便別人來竊取任務用的。任務竊取的相關過程,可以用下面這幅圖來表示,這幅圖建議你收藏:

圖片圖片

看到這里,不知你是否會有疑問:為什么工作線程總是從自己的頭部獲取任務?為什么要這樣設計?首先處理隊列中等待時間較長的任務難道不是更有意義嗎?

答案當然不會是“更有意義”。這樣做的主要原因是為了提高性能,通過始終選擇最近提交的任務,可以增加資源仍分配在CPU緩存中的機會,這樣CPU處理起來要快一些。而竊取者之所以從尾部獲取任務,則是為了降低線程之間的競爭可能,畢竟大家都從一個部分拿任務,競爭的可能要大很多。

此外,這樣的設計還有一種考慮。由于任務是可分割的,那隊列中較舊的任務最有可能粒度較大,因為它們可能還沒有被分割,而空閑的線程則相對更有“精力”來完成這些粒度較大的任務。

5. ForkJoinPool監控

對于一個復雜框架來說,實時地了解ForkJoinPool的內部狀態是十分必要的。因此,ForkJoinPool提供了一些常用方法。通過這些方法,你可以了解當前的工作線程、任務處理等情況。

(1)獲取運行狀態的線程總數

public int getRunningThreadCount() {
    int rc = 0;
    WorkQueue[] ws;
    WorkQueue w;
    if ((ws = workQueues) != null) {
        for (int i = 1; i < ws.length; i += 2) {
            if ((w = ws[i]) != null && w.isApparentlyUnblocked())
                ++rc;
        }
    }
    return rc;
}

(2)獲取活躍線程數量

public int getActiveThreadCount() {
    int r = (config & SMASK) + (int)(ctl >> AC_SHIFT);
    return (r <= 0) ? 0 : r; // suppress momentarily negative values
}

(3)判斷ForkJoinPool是否空閑

public boolean isQuiescent() {
    return (config & SMASK) + (int)(ctl >> AC_SHIFT) <= 0;
}

(4)獲取任務竊取數量

public long getStealCount() {
    AtomicLong sc = stealCounter;
    long count = (sc == null) ? 0 L : sc.get();
    WorkQueue[] ws;
    WorkQueue w;
    if ((ws = workQueues) != null) {
        for (int i = 1; i < ws.length; i += 2) {
            if ((w = ws[i]) != null)
                count += w.nsteals;
        }
    }
    return count;
}

(5)獲取隊列中的任務數量

public long getQueuedTaskCount() {
    long count = 0;
    WorkQueue[] ws;
    WorkQueue w;
    if ((ws = workQueues) != null) {
        for (int i = 1; i < ws.length; i += 2) {
            if ((w = ws[i]) != null)
                count += w.queueSize();
        }
    }
    return count;
}

(6)獲取已提交的任務數量

public int getQueuedSubmissionCount() {
    int count = 0;
    WorkQueue[] ws;
    WorkQueue w;
    if ((ws = workQueues) != null) {
        for (int i = 0; i < ws.length; i += 2) {
            if ((w = ws[i]) != null)
                count += w.queueSize();
        }
    }
    return count;
}

四、警惕ForkJoinPool#commonPool

在上文中所示的源碼中,你可能已經在多處注意到commonPool的存在。在ForkJoinPool中,commonPool是一個共享的、靜態的線程池,并且在實際使用時才會進行懶加載,Java8中的CompletableFuture和并行流(Parallel Streams)用的就是它。不過,使用CompletableFuture時你可以指定自己的線程池,但是并行流在使用時卻不可以,這也是我們要警惕的地方。

為什么這么說呢?ForkJoinPool中的commonPool設計初衷是為了降低線程池的重復創建,讓一些任務共用同一個線程池,畢竟創建線程池和創建線程都是昂貴的。然而,凡事都有兩面性,commonPool在某些場景下確實可以達到線程池復用的目的,但是,如果你決定與別人分享自己空間,那么當你想使用它的時候,它可能不再完全屬于你。也就是說,當你想用commonPool時,它可能已經其他任務填滿了。

提交到ForkJoinPool中的任務一般有兩類:計算類型和阻塞類型。考慮一個場景,應用中多處都在使用這個共享線程池,有人在某處做了個不當操作,比如往池子里丟入了阻塞型任務,那么結果會怎樣?結果當然是,整個線程池都有可能被阻塞!如此,整個應用都面臨著被拖垮的風險。看到這里,對于Java8中的并行流的使用,你就應該高度警惕了。

那怎么避免這種情況發生呢?答案是盡量避免使用commonPool,并且在需要運行阻塞任務時,應當創建獨立的線程池,和系統的其他部分保持隔離,以免風險擴散。

五、ForkJoinPool性能評估

為了測試ForkJoinPool的性能,我做了一組簡單的、非正式實驗。實驗分三組進行,為了盡可能讓每組的數據客觀,每組實驗均運行5次,取最后的平均數。

  • 實驗代碼:本文第一部分的示例代碼;
  • 實驗環境:Mac;
  • JDK版本:8;
  • 任務分隔閾值:100

實驗結果如下方表格所示:

從實驗結果(0表示不到1毫秒)來看,ForkJoinPool的性能竟然不如單線程的效率高!這樣的結果,似乎很驚喜、很意外...然而,為什么會這樣?

不要驚訝,之所以會出現這個令你匪夷所思的結果,其原因在于任務拆分的粒度過小!在上面的測試中,任務拆分閾值僅為100,導致Fork/Join在計算時出現大量的任務拆分動作,也就是任務分的太細,大量的任務拆分和管理也是需要額外成本的。

以0~1000000求和為例,當把閾值從100調整為100000時,其結果結果如下。可以看到,Fork/Join的優勢就體現出來了。

======
ForkJoin任務拆分:16383
ForkJoin計算結果:499999999500000000
ForkJoin計算耗時:143
======
單線程計算結果:499999999500000000
單線程計算耗時:410

那么,問題又來了,哪些因素會影響Fork/Join的性能呢?

根據經驗和實驗,任務總數、單任務執行耗時以及并行數都會影響到性能。所以,當你使用Fork/Join框架時,你需要謹慎評估這三個指標,最好能通過模擬對比評估,不要憑感覺冒然在生產環境使用。

小結

以上就是關于ForkJoinPool的全部內容。Fork/Join是一種基于分治算法的模型,在并發處理計算型任務時有著顯著的優勢。其效率的提升主要得益于兩個方面:

  • 任務切分:將大的任務分割成更小粒度的小任務,讓更多的線程參與執行;
  • 任務竊取:通過任務竊取,充分地利用空閑線程,并減少競爭。

在使用ForkJoinPool時,需要特別注意任務的類型是否為純函數計算類型,也就是這些任務不應該關心狀態或者外界的變化,這樣才是最安全的做法。如果是阻塞類型任務,那么你需要謹慎評估技術方案。雖然ForkJoinPool也能處理阻塞類型任務,但可能會帶來復雜的管理成本。

而在性能方面,要認識到Fork/Join的性能并不是開箱即來,而是需要你去評估和驗證一些重要指標,通過數據對比得出最佳結論。

此外,ForkJoinPool雖然提供了commonPool,但出于潛在的風險考慮,不推薦使用或謹慎使用。

責任編輯:武曉燕 來源: 一安未來
相關推薦

2015-04-21 15:05:32

海霖

2024-01-25 08:40:12

線程雙異步Java8

2019-11-25 10:12:59

Python技巧工具

2014-11-28 09:42:23

程序員

2019-12-19 14:07:33

IT運營CIO安全

2013-06-18 10:21:43

云計算云服務公共云服務

2013-12-09 10:38:08

程序員任務

2016-06-27 15:55:15

移動

2012-04-06 10:31:44

Java

2009-07-22 14:56:50

ERPVPNVPN加速

2009-10-20 11:12:26

綜合布線系統

2011-05-24 15:29:05

程序CC++

2012-07-01 03:23:31

JBuilder

2024-10-31 16:42:41

2014-02-12 13:43:50

代碼并行任務

2009-03-10 19:03:11

Linux圖形環境桌面

2018-05-11 11:00:11

LinuxWindows系統調用

2009-03-13 10:54:18

SQL Server并行查詢數據庫管理

2024-11-21 07:00:00

線程池Java開發

2023-12-05 07:54:18

Java 7ThreadPool
點贊
收藏

51CTO技術棧公眾號

国产91富婆露脸刺激对白| 亚洲一区二区电影| 国产亚洲欧美色| 国产啪精品视频| 欧美黑人猛猛猛| 亚洲黄色a级片| 91久久夜色精品国产九色| 亚洲精选一区二区| 欧美两根一起进3p做受视频| 国产精品一卡二卡三卡| 岛国精品在线播放| 国产精品国内视频| 黄网站免费在线| 精品国产午夜| 亚洲国产美女久久久久| 亚洲综合色在线观看| 亚洲免费一级片| 爽爽淫人综合网网站| 日韩电视剧在线观看免费网站 | 日本精品久久久久中文| 欧美影院在线| 欧美三级欧美一级| jizzjizz国产精品喷水| 国产原厂视频在线观看| 久久久久久综合| 91黄色国产视频| 中文在线字幕免费观| 一本色道久久精品| 欧美另类极品videosbest最新版本| 法国伦理少妇愉情| 激情视频网站在线播放色| 高清不卡在线观看av| 国产精品久久久久久久久久久不卡| 精品99久久久久成人网站免费| 精品久久久久久久久久久下田| 亚洲精品在线观看视频| 亚洲一二三不卡| 久久爱.com| 一本色道久久综合亚洲91| 日韩精品综合在线| 操你啦视频在线| 国产精品乱码一区二三区小蝌蚪| 蜜桃导航-精品导航| 亚洲免费一级片| 国产成人av电影在线观看| 成人福利网站在线观看| 亚洲一级av毛片| 美女一区二区三区在线观看| 在线播放国产一区二区三区| 波多野结衣av在线免费观看| 久久精品亚洲成在人线av网址| 欧美大片在线观看一区| 日韩小视频网站| 粗大黑人巨茎大战欧美成人| 中文字幕亚洲一区二区av在线| 91久久久久久| 一卡二卡在线视频| 欧美二区视频| 不卡毛片在线看| 小早川怜子一区二区的演员表| 日韩高清欧美| 日韩在线视频导航| 一区二区三区在线播放视频| 色呦哟—国产精品| 日韩中文字幕网站| 亚洲少妇xxx| 久久久久亚洲| 亚洲激情国产精品| 国产二级一片内射视频播放| 奇米影视777在线欧美电影观看| 日韩电视剧免费观看网站| 中国黄色a级片| 国产一区网站| 久久九九亚洲综合| 免费又黄又爽又色的视频| 欧美午夜精品| 欧美性受xxxx白人性爽| 成人黄色三级视频| 久久99国产精品尤物| 2019国产精品视频| 亚洲精品一区二区三区区别| 99麻豆久久久国产精品免费| 91美女福利视频高清| 99精品视频在线播放免费| 国产成人在线免费观看| 久久99久久99精品蜜柚传媒| 国产黄色在线| 亚洲人午夜精品天堂一二香蕉| 丁香婷婷综合激情| 国产精品一区二区av影院萌芽| 欧美日韩亚洲高清一区二区| 亚洲少妇一区二区| 嫩草一区二区三区| 美女黄色丝袜一区| 日本一区二区三区精品| 精品一区二区三区在线播放视频| 国产精品免费观看高清| 成人资源www网在线最新版| 自拍偷拍欧美精品| 国产男女无遮挡| 99国内精品久久久久| 欧美成人在线直播| 精品国产aaa| 国产精品激情| 国产精品一区久久| 少妇一区二区三区四区| 国产精品久久久久三级| 妞干网在线观看视频| 青青在线精品| 亚洲欧美成人精品| 日韩a级片在线观看| 久久裸体视频| 国产精品免费一区二区三区观看 | 久久资源免费视频| 中文字幕亚洲乱码熟女1区2区| 国产精品亚洲人在线观看| 国产美女久久精品香蕉69| 成人1区2区3区| 国产精品美女久久久久久久久久久 | 国产精品国产三级国产在线观看| 538国产精品一区二区在线 | 久久露脸国语精品国产91| 欧美日韩调教| 国产欧美精品在线播放| 五月婷婷开心中文字幕| 一区二区三区 在线观看视频| 亚洲天堂av线| 亚洲美女久久| 97视频免费观看| 国产极品999| 一区免费观看视频| 免费看a级黄色片| 欧美变态网站| 国精产品一区一区三区有限在线| 99在线小视频| 中文字幕一区二区三| 日韩欧美在线免费观看视频| 啪啪激情综合网| 久久久久久久国产| 亚洲成人中文字幕在线| 亚洲日本在线观看| 日韩成人精品视频在线观看| 精品美女视频| 国产精品成人一区二区| 噜噜噜噜噜在线视频| 欧美日韩加勒比精品一区| 在线观看免费视频国产| 欧美日韩网址| 国产一区二区三区黄| 18网站在线观看| 日韩三区在线观看| 乱h高h女3p含苞待放| 精品一区二区三区的国产在线播放| 视频一区二区在线| 高清av一区二区三区| 91精品国产欧美一区二区| 亚洲激情图片网| 极品少妇一区二区| 日本丰满大乳奶| 视频二区欧美| 91国产精品电影| 激情福利在线| 一区二区三区不卡视频| 成年人看片网站| 亚洲精选91| 欧美一级二级三级| 日本美女久久| 久久成人av网站| 好吊色视频一区二区| 激情成人中文字幕| xxxx日本免费| 国产真实乱偷精品视频免| 中国黄色录像片| 任我爽精品视频在线播放| 456亚洲影院| 亚洲成人影院麻豆| 欧美成人性福生活免费看| 日韩毛片在线视频| 久久精品一区二区三区不卡牛牛 | 亚欧洲精品在线视频| 97久久精品人人做人人爽50路| 黄www在线观看| 日韩.com| 国产伦精品一区二区三区高清版| 东京一区二区| 超薄丝袜一区二区| 无码精品黑人一区二区三区| 欧美中文一区二区三区| 国产精品老熟女一区二区| 91亚洲男人天堂| 激情 小说 亚洲 图片: 伦| 中文字幕一区二区精品区| 黑人另类av| 免费视频成人| 91av国产在线| 国产成人l区| 亚洲欧美日韩国产中文专区| 国产农村老头老太视频| 色综合天天性综合| 国产精品久久久精品四季影院| 91在线观看下载| 一区二区三区四区毛片| 亚洲影院免费| 国产精品一二三在线观看| 久久av影院| 136fldh精品导航福利| 免费黄色电影在线观看| 亚洲精品aⅴ中文字幕乱码 | 成人av三级| 欧美成人一二三| 国产免费av在线| 亚洲成人精品av| 97精品久久人人爽人人爽| 五月天精品一区二区三区| 免费成人美女女在线观看| 97精品视频在线观看自产线路二| 中文av字幕在线观看| 亚洲欧美清纯在线制服| 亚洲熟妇无码av在线播放| 99热国内精品| 手机看片福利永久国产日韩| 欧美偷窥清纯综合图区| 99re视频在线观看| 欧美videosex性欧美黑吊| 最新的欧美黄色| 欧美一区二区三区少妇| 亚洲第一页在线| 精品免费久久久| 欧美精品免费视频| 日本成人一级片| 色综合色狠狠天天综合色| 91香蕉在线视频| 夜夜精品浪潮av一区二区三区| 成人18视频免费69| 国产日韩精品一区二区三区| 无遮挡aaaaa大片免费看| 不卡在线观看av| 久久久久亚洲av成人网人人软件| 亚洲性图久久| 国产精品久久久对白| 成人国产精品久久| 国产日韩欧美黄色| 激情久久99| 国产精品视频自在线| 亚洲天堂1区| 国产精品999999| 91精品韩国| 国产精品电影观看| 123成人网| 国产精品亚洲一区二区三区| av成人在线看| 国产免费成人av| 伊人亚洲精品| 99电影在线观看| xxxx日韩| 国产国语videosex另类| 伊人久久av| 国产经典一区二区| 精品三级在线| 亚洲精品女av网站| 欧美三级网站| 欧美一级大片在线观看| 成人勉费视频| 国产精品自产拍在线观| 亚洲伊人精品酒店| 91久久精品www人人做人人爽| 亚洲一区二区三区中文字幕在线观看 | 欧美在线一区视频| 国产欧美精品久久| 黄色一级一级片| 毛片一区二区三区| 小日子的在线观看免费第8集| 国产a区久久久| 国产肉体xxxx裸体784大胆| 久久久91精品国产一区二区三区| 少妇精品无码一区二区免费视频| 国产精品你懂的在线欣赏| 久热这里有精品| 五月激情综合网| 五月婷婷六月婷婷| 91精品国产综合久久久久| www精品国产| 亚洲精品自在久久| 成人在线影视| 国产91成人video| 欧美黄页免费| 国产精品日韩一区二区| 国产免费久久| 国产对白在线播放| 亚洲精品影院在线观看| 国内外成人免费在线视频| 国产99久久精品| 无码人妻丰满熟妇啪啪欧美| 尤物在线观看一区| 国产成人无码av| 日韩免费视频一区| 国产大学生校花援交在线播放| 美女福利精品视频| 欧美成人黑人| 国产福利久久精品| 日韩国产一区二区| 国产免费黄视频| 国产在线精品国自产拍免费| 波多野结衣av在线免费观看| 亚洲精品国产a| 波多野结衣午夜| 制服丝袜亚洲精品中文字幕| 免费在线黄色影片| 欧美日韩福利电影| 男人亚洲天堂| 欧美精品一区二区三区在线四季 | 亚洲成人免费在线观看| 中文字幕+乱码+中文| 日韩av有码在线| 宅男网站在线免费观看| 国产欧美日韩视频| 国产伦一区二区三区| 久在线观看视频| 国产99久久久国产精品潘金 | 黄色国产在线| 97精品视频在线| 97se亚洲| 国产欧美综合一区| 国产在线乱码一区二区三区| 午夜时刻免费入口| 欧美日韩国产精品一区二区三区四区| www男人的天堂| 不卡av在线网站| 成人综合日日夜夜| 午夜欧美性电影| 天堂久久久久va久久久久| 亚州av综合色区无码一区| 亚洲精品少妇30p| 九九热视频在线免费观看| 91久久精品一区二区| 中国a一片一级一片| 日韩精品在线免费观看| 九九在线视频| 91精品国产91久久久久久久久| 91国内精品白嫩初高生| mm131午夜| 国产一区二区视频在线播放| 欧美大片xxxx| 日韩欧美国产一二三区| 在线中文字幕电影| 91亚洲国产精品| 亚洲精品2区| 四虎1515hh.com| 一区二区视频免费在线观看| 国产婷婷在线视频| 久久国产精品久久精品| 美女国产精品久久久| 国产日产欧美一区二区| 韩国v欧美v亚洲v日本v| 少妇aaaaa| 欧美v国产在线一区二区三区| 国产啊啊啊视频在线观看| 国产精品10p综合二区| 亚洲高清成人| 人妻精品久久久久中文字幕 | 四虎永久在线精品无码视频| 91美女视频网站| www.久久视频| xxx欧美精品| 欧美第一在线视频| 国产曰肥老太婆无遮挡| 91女厕偷拍女厕偷拍高清| 天天射天天干天天| 日韩在线视频免费观看| 欧美高清hd| 9久久9毛片又大又硬又粗| 久久久精品国产免大香伊| 艳妇乳肉豪妇荡乳av无码福利| 日韩在线视频网站| 日韩区一区二| 黄色动漫在线免费看| 国产欧美一区二区三区沐欲| 一级黄色免费看| 欧美激情在线有限公司| 在线日韩网站| 日本精品一区在线| 欧美日韩加勒比精品一区| 男人天堂久久久| 国产91一区二区三区| 三级在线观看一区二区| 亚洲欧美精品aaaaaa片| 亚洲国产女人aaa毛片在线| 日韩一区精品| 国产精品国三级国产av| 久久久国产精品麻豆| 国产欧美第一页| 欧美亚洲日本黄色| 久久久久久久久国产一区| 黄色a一级视频| 欧美一区二区女人| 性高爱久久久久久久久| 久久久久福利视频| 国产女人aaa级久久久级| 动漫av一区二区三区| 国产精品一区=区|