精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

線程池的管理哲學(xué)

開發(fā)
針對(duì)線程池拒絕策略的設(shè)計(jì)和使用更多是考察讀者對(duì)于線程池源碼的理解和使用經(jīng)驗(yàn),這里筆者僅在思路上給出示例,當(dāng)然實(shí)現(xiàn)上也存在很多不完美的地方。

收到不少讀者反饋,其中有這幾道關(guān)于線程池的問題:

  • 在進(jìn)行線程池設(shè)計(jì)時(shí),如何選擇拒絕策略?
  • 如果不允許丟棄任務(wù)任務(wù),應(yīng)該選擇哪個(gè)拒絕策略?
  • 使用CallerRunsPolicy這個(gè)拒絕策略有什么風(fēng)險(xiǎn)?有沒有更好的處理方式呢?

一、詳解拒絕策略常見問題

1. 線程池是如何工作的

我們先來復(fù)習(xí)一下線程池的工作流程,每次任務(wù)提交時(shí),線程池都會(huì)嘗試將任務(wù)提交到核心線程上,如果線程數(shù)小于核心線程數(shù),線程池就會(huì)添加工作線程并執(zhí)行當(dāng)前任務(wù)。 若核心線程都處于工作狀態(tài),這就表明當(dāng)前線程池有些忙碌,那么這些無法及時(shí)處理的任務(wù)就會(huì)提交到阻塞任務(wù)隊(duì)列中。 隨著任務(wù)的遞增,任務(wù)隊(duì)列無法容納最新的任務(wù),線程池就會(huì)認(rèn)為現(xiàn)處于高峰期,便臨時(shí)增加應(yīng)急線程處理任務(wù)。隨著任務(wù)逐步處理完成,線程在指定時(shí)間內(nèi)沒有要處理的任務(wù),這些線程也就會(huì)依次退出。

對(duì)應(yīng)我們也給出ThreadPoolExecutor提交任務(wù)的execute方法的源碼:

public void execute(Runnable command) {
  //任務(wù)判空
        if (command == null)
            throw new NullPointerException();
        //查看當(dāng)前運(yùn)行的線程數(shù)量
        int c = ctl.get();
     //若小于核心線程則直接添加一個(gè)工作線程并執(zhí)行任務(wù)
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果線程數(shù)等于核心線程數(shù)則嘗試將任務(wù)入隊(duì)
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //入隊(duì)失敗,調(diào)用addWorker參數(shù)為false,嘗試創(chuàng)建應(yīng)急線程處理突發(fā)任務(wù)
        else if (!addWorker(command, false))
         //如果創(chuàng)建應(yīng)急線程失敗,說明當(dāng)前線程數(shù)已經(jīng)大于最大線程數(shù),這個(gè)任務(wù)只能拒絕了
            reject(command);
    }

上文提到了應(yīng)急線程長(zhǎng)時(shí)間沒有要處理的任務(wù)就會(huì)被銷毀的邏輯,這里我們也簡(jiǎn)單的介紹一下,首先在線程池中每一個(gè)線程是以Worker的形式封裝呈現(xiàn),其本質(zhì)就是對(duì)Thread的封裝,Worker啟動(dòng)后會(huì)調(diào)用run方法調(diào)用runWorker方法輪詢處理任務(wù):

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        
        final Thread thread;
       
  
        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
 
 //......
}

查看runWorker方法,一旦在規(guī)定時(shí)間內(nèi)getTask沒有拿到任務(wù)就會(huì)退出循環(huán),直接通過processWorkerExit結(jié)束這個(gè)工作線程:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
         //對(duì)應(yīng)時(shí)間內(nèi)沒有拿到task則退出循環(huán)
            while (task != null || (task = getTask()) != null) {
               //略
            }
            completedAbruptly = false;
        } finally {
         //結(jié)束這個(gè)工作線程
            processWorkerExit(w, completedAbruptly);
        }
    }

自此,我們將線程池整體工作流程簡(jiǎn)單的梳理完畢。

2. 拒絕策略的選擇

先來說說第一道題,關(guān)于拒絕策略的選擇,我們不妨直接查看RejectedExecutionHandler子類的源碼進(jìn)行說明。 先來看看CallerRunsPolicy,該拒絕策略會(huì)直接用當(dāng)前調(diào)用者執(zhí)行當(dāng)前任務(wù):

public static class CallerRunsPolicy implements RejectedExecutionHandler {
       
        public CallerRunsPolicy() { }

     
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
             //直接基于當(dāng)前線程調(diào)用run方法執(zhí)行任務(wù)
                r.run();
            }
        }
    }

然后就是AbortPolicy,也很簡(jiǎn)單,直接拋異常:

public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

      
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

DiscardPolicy則是什么也不做,這也就意味著這個(gè)任務(wù)沒有任務(wù)處理,等同于丟棄:

public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

       //不做任何事情任務(wù)直接丟棄
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

最后一個(gè)就是DiscardOldestPolicy,該策略會(huì)將隊(duì)首部任務(wù)丟棄,然后嘗試將再次execute這個(gè)任務(wù):

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
      
        public DiscardOldestPolicy() { }

       //丟掉隊(duì)首的任務(wù),然后往線程池提交當(dāng)前任務(wù)
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

不同拒絕策略都有著不同的使用場(chǎng)景:

  • 如果我們的任務(wù)不算耗時(shí)還要保證能夠被執(zhí)行,那么CallerRunsPolicy則是第一選擇。
  • 若突增大量任務(wù)導(dǎo)致無法及時(shí)處理從業(yè)務(wù)的角度認(rèn)為是異常的話,那么我們則建議拋出AbortPolicy讓開發(fā)介入及時(shí)調(diào)優(yōu)處理,前提是當(dāng)前業(yè)務(wù)正處于業(yè)務(wù)提測(cè)階段。
  • 對(duì)于那些需要提交實(shí)時(shí)性消息的監(jiān)控型任務(wù),那么新提交的任務(wù)勢(shì)必實(shí)時(shí)性會(huì)由于更早的任務(wù),這種場(chǎng)景使用DiscardOldestPolicy即可。
  • 如果這些任務(wù)相較于系統(tǒng)可靠性來說,如果不是很重要,那么直接采用rejectedExecution丟棄任務(wù)即可。

3. 主流框架對(duì)于拒絕策略的選擇

只要繼承RejectedExecutionHandler就可以實(shí)現(xiàn)相應(yīng)的拒絕策略,所以我們也不妨看看一些主流的框架是如何使用拒絕策略的吧。

tomcat線程池的拒絕策略也是拋出異常:

private static class RejectHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r,
                java.util.concurrent.ThreadPoolExecutor executor) {
            throw new RejectedExecutionException();
        }

    }

而Dubbo則相對(duì)友好一些,它會(huì)優(yōu)先打印一個(gè)日志,并告知異常堆棧信息,然后拋出異常:

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
   //......

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        String msg = String.format("Thread pool is EXHAUSTED!" +
                        " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
                        " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
                threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
                e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
                url.getProtocol(), url.getIp(), url.getPort());
        logger.warn(msg);
        dumpJStack();
        throw new RejectedExecutionException(msg);
    }
    private void dumpJStack() {
       //省略實(shí)現(xiàn)
    }
}

Netty就相對(duì)穩(wěn)健一些,它的拒絕策略則是直接創(chuàng)建一個(gè)線程池以外的線程處理這些任務(wù),為了保證任務(wù)的實(shí)時(shí)處理,這種做法可能需要良好的硬件設(shè)備且臨時(shí)創(chuàng)建的線程無法做到準(zhǔn)確的監(jiān)控:

private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
    NewThreadRunsPolicy() {
        super();
    }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            final Thread t = new Thread(r, "Temporary task executor");
            t.start();
        } catch (Throwable e) {
            throw new RejectedExecutionException(
                    "Failed to start a new thread", e);
        }
    }
}

ActiveMq則是嘗試在指定的時(shí)效內(nèi)盡可能的爭(zhēng)取將任務(wù)入隊(duì),以保證最大交付:

new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
                    try {
                        executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
                    }
                    throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
                }
            });

4. CallerRunsPolicy存在的問題及解決對(duì)策

默認(rèn)情況下,我們都會(huì)為了保證任務(wù)不被丟棄都優(yōu)先考慮CallerRunsPolicy,這也是相對(duì)維穩(wěn)的做法,這種做法的隱患是假設(shè)走到CallerRunsPolicy的任務(wù)是個(gè)非常耗時(shí)的任務(wù),就會(huì)導(dǎo)致主線程就很卡死。

下面就是筆者通過主線程使用線程池的方法,該線程池限定了最大線程數(shù)為2還有阻塞隊(duì)列大小為1,這意味著第4個(gè)任務(wù)就會(huì)走到拒絕策略:

//創(chuàng)建線程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,
                2,
                60,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());

        threadPoolExecutor.execute(() -> {
            log.info("核心線程執(zhí)行");
            ThreadUtil.sleep(1, TimeUnit.DAYS);
        });

        threadPoolExecutor.execute(() -> {
            log.info("任務(wù)入隊(duì)");
            ThreadUtil.sleep(1, TimeUnit.DAYS);
        });

        threadPoolExecutor.execute(() -> {
            log.info("應(yīng)急線程處理");
            ThreadUtil.sleep(1, TimeUnit.DAYS);
        });

        threadPoolExecutor.execute(() -> {
            log.info("CallerRunsPolicy task");
            ThreadUtil.sleep(1, TimeUnit.DAYS);
        });


        threadPoolExecutor.execute(() -> {
            log.info("因?yàn)橹骶€程卡住,無法被處理的任務(wù)");

        });

從輸出結(jié)果可以看出,因?yàn)镃allerRunsPolicy這個(gè)拒絕策略,導(dǎo)致耗時(shí)的任務(wù)用了主線程執(zhí)行,導(dǎo)致線程池阻塞,進(jìn)而導(dǎo)致后續(xù)任務(wù)無法及時(shí)執(zhí)行,嚴(yán)重的情況下很可能導(dǎo)致OOM:

2024-04-03 00:08:12.617  INFO 20804 --- [           main] com.sharkChili.ThreadPoolApplication     : 啟動(dòng)成功?。?2024-04-03 00:08:15.739  INFO 20804 --- [pool-1-thread-1] com.sharkChili.ThreadPoolApplication     : 核心線程執(zhí)行
2024-04-03 00:08:36.768  INFO 20804 --- [pool-1-thread-2] com.sharkChili.ThreadPoolApplication     : 應(yīng)急線程處理
2024-04-03 00:08:49.333  INFO 20804 --- [           main] com.sharkChili.ThreadPoolApplication     : CallerRunsPolicy task

我們從問題的本質(zhì)入手,調(diào)用者采用CallerRunsPolicy是希望所有的任務(wù)都能夠被執(zhí)行,按照筆者的經(jīng)驗(yàn),假如我們的場(chǎng)景是偶發(fā)這種突發(fā)場(chǎng)景,在內(nèi)存允許的情況下,我們建議增加阻塞隊(duì)列BlockingQueue的大小并調(diào)整堆內(nèi)存以容納更多的任務(wù),確保任務(wù)能夠被準(zhǔn)確執(zhí)行。

若當(dāng)前服務(wù)器內(nèi)存資源緊張,但我們配置線程池還為盡可能利用到CPU,我們建議調(diào)整線程中maximumPoolSize以保證盡可能壓榨CPU資源:

如果服務(wù)器資源達(dá)到可利用的極限,這就意味我們要在設(shè)計(jì)策略上改變線程池的調(diào)度了,我們都知道,導(dǎo)致主線程卡死的本質(zhì)就是因?yàn)槲覀儾幌M魏我粋€(gè)任務(wù)被丟棄。換個(gè)思路,有沒有辦法既能保證任務(wù)不被丟棄且在服務(wù)器有余力時(shí)及時(shí)處理呢?

這里筆者提供的一種思路,即任務(wù)持久化,注意這里筆者更多強(qiáng)調(diào)的是思路而不是實(shí)現(xiàn),這里所謂的任務(wù)持久化,包括但不限于:

  • 設(shè)計(jì)一張任務(wù)表間任務(wù)存儲(chǔ)到MySQL數(shù)據(jù)庫(kù)中。
  • Redis緩存任務(wù)。
  • 將任務(wù)提交到消息隊(duì)列中。

筆者以方案一為例,通過繼承BlockingQueue實(shí)現(xiàn)一個(gè)混合式阻塞隊(duì)列,該隊(duì)列包含JDK自帶的ArrayBlockingQueue和一個(gè)自定義的隊(duì)列(數(shù)據(jù)表),通過魔改隊(duì)列的添加邏輯達(dá)到任務(wù)可以存入ArrayBlockingQueue或者數(shù)據(jù)表的目的。

如此一來,一旦我們的線程池中線程達(dá)到滿載時(shí),我們就可以通過拒絕策略將最新任務(wù)持久化到MySQL數(shù)據(jù)庫(kù)中,等到線程池有了有余力處理所有任務(wù)時(shí),讓其優(yōu)先處理數(shù)據(jù)庫(kù)中的任務(wù)以避免"饑餓"問題。

這里筆者也給出混合隊(duì)列實(shí)現(xiàn)的核心源碼,即通過繼承BlockingQueue魔改了入隊(duì)和出隊(duì)的邏輯:

public class HybridBlockingQueue<E> implements BlockingQueue<E> {

  private Object mysqlLock = new Object();


    private ArrayBlockingQueue<E> arrayBlockingQueue;

    //構(gòu)造方法初始化阻塞隊(duì)列大小
    public HybridBlockingQueue(int maxSize) {
        arrayBlockingQueue = new ArrayBlockingQueue<>(maxSize);
    }


    

    /**
     * 線程池會(huì)調(diào)用的入隊(duì)方法
     * @param e
     * @return
     */
    @Override
    public boolean offer(E e) {
        return arrayBlockingQueue.offer(e);
    }


    /**
     * 取任務(wù)時(shí),優(yōu)先從數(shù)據(jù)庫(kù)中讀取最早的任務(wù)
     *
     * @return
     * @throws InterruptedException
     */
    @Override
    public E take() throws InterruptedException {

        synchronized (mysqlLock) {
            //從數(shù)據(jù)庫(kù)中讀取任務(wù),通過上鎖讀取避免重復(fù)消費(fèi)
            TaskInfoMapper taskMapper = SpringUtil.getBean(TaskInfoMapper.class);
            TaskInfo taskInfo = taskMapper.selectByExample(null).stream()
                    .findFirst()
                    .orElse(null);


            //若數(shù)據(jù)庫(kù)存在該任務(wù),則先刪后返回
            if (ObjUtil.isNotEmpty(taskInfo)) {
                taskMapper.deleteByPrimaryKey(taskInfo.getId());
                Task task = new Task(taskInfo.getData());
                return (E) task;
            }
        }

        //若數(shù)據(jù)庫(kù)沒有要處理的任務(wù)則從內(nèi)存中獲取
        return arrayBlockingQueue.poll();
    }

    /**
     * 帶有時(shí)間限制的任務(wù)獲取
     *
     * @param timeout
     * @param unit
     * @return
     * @throws InterruptedException
     */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        //從數(shù)據(jù)庫(kù)中讀取任務(wù),通過上鎖讀取避免重復(fù)消費(fèi)
        synchronized (mysqlLock) {
            //從數(shù)據(jù)庫(kù)中讀取任務(wù),
            TaskInfoMapper taskMapper = SpringUtil.getBean(TaskInfoMapper.class);
            TaskInfo taskInfo = taskMapper.selectByExample(null).stream()
                    .findFirst()
                    .orElse(null);


            //若數(shù)據(jù)庫(kù)存在該任務(wù),則先刪后返回
            if (ObjUtil.isNotEmpty(taskInfo)) {
                taskMapper.deleteByPrimaryKey(taskInfo.getId());
                Task task = new Task(taskInfo.getData());
                return (E) task;
            }
        }
        //若數(shù)據(jù)庫(kù)沒有要處理的任務(wù)則從內(nèi)存中獲取
        return arrayBlockingQueue.poll(timeout, unit);

    }
 
 //......
}

接下來就是自定義拒絕策略了,很明顯我們的拒絕策略就叫持久化策略:

public class PersistentTaskPolicy implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
  //任務(wù)入庫(kù)
        TaskInfoMapper taskMapper = SpringUtil.getBean(TaskInfoMapper.class);
        Task task = (Task) r;
        TaskInfo taskInfo = new TaskInfo();
        taskInfo.setData(JSONUtil.toJsonStr(task.getTaskInfo()));
        taskMapper.insertSelective(taskInfo);
    }
}

最終我們的使用示例如下:

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,
                2,
                60, TimeUnit.SECONDS,
                new HybridBlockingQueue<>(1),
                new PersistentTaskPolicy());

        threadPoolExecutor.execute(new Task("core thread"));

        threadPoolExecutor.execute(new Task("queueTask"));

        threadPoolExecutor.execute(new Task("max thread"));

        threadPoolExecutor.execute(new Task("insert into mysql database"));

因?yàn)榫€程池?zé)o法及時(shí)處理而走了我們自定義的拒絕策略而持久化入庫(kù),最終我們的insert into mysql database,等待線程池中其他任務(wù)完成后被取出執(zhí)行:

2024-04-14 11:30:16.865  INFO 1052 --- [           main] com.sharkChili.PersistentTaskPolicy      : 任務(wù)持久化,taskInfo:{"data":"insert into mysql database"}
2024-04-14 11:31:08.516  INFO 1052 --- [pool-1-thread-2] com.sharkChili.Task                      : task execution completed,task info:max thread
2024-04-14 11:31:08.516  INFO 1052 --- [pool-1-thread-1] com.sharkChili.Task                      : task execution completed,task info:core thread
2024-04-14 11:32:08.563  INFO 1052 --- [pool-1-thread-1] com.sharkChili.Task                      : task execution completed,task info:queueTask
2024-04-14 11:32:08.563  INFO 1052 --- [pool-1-thread-2] com.sharkChili.Task                      : task execution completed,task info:insert into mysql database

二、更高維度的思考——線程池限流的藝術(shù)

上文我們以大篇幅的維度探討拒絕策略上優(yōu)化,需要保證準(zhǔn)確、有效執(zhí)行的任務(wù)能夠被線程池處理,且不會(huì)破壞程序的穩(wěn)定性,即提交的任務(wù)能夠被正確處理且線程池不會(huì)被打死。

這一點(diǎn),結(jié)合《Java并發(fā)編程實(shí)戰(zhàn)》的說法,我們也可以利用信號(hào)量Semaphore作為令牌,確保只有拿到令牌的線程才能將任務(wù)提交到線程池,保證線程池可以在單位時(shí)間內(nèi)按照我們?cè)O(shè)定的并發(fā)數(shù)執(zhí)行任務(wù):

利用信號(hào)量完成線程池的限流,既保證任務(wù)可被執(zhí)行和工作線程池的穩(wěn)定性,又能將性能瓶頸和程序穩(wěn)定性問題拋給更高層級(jí)的調(diào)用者,尤上層根據(jù)需要決定當(dāng)前任務(wù)是等待被線程池處理,還是直接中斷結(jié)束。

對(duì)應(yīng)的我們給出流控性質(zhì)的線程池代碼示例,讀者可參考筆者所說的思路和注釋了解一下落地思路:

public class RateLimitedExecutor {

    private final ExecutorService threadPool;
    private final Semaphore semaphore;

    //基于bound創(chuàng)建對(duì)應(yīng)并發(fā)度的線程池和流控令牌
    public RateLimitedExecutor(int bound) {
        this.threadPool = Executors.newFixedThreadPool(bound);
        this.semaphore = new Semaphore(bound, true);

    }


    public void submitTask(final Runnable command) throws InterruptedException {
        semaphore.acquire();
        Console.log("{}獲取令牌成功,執(zhí)行時(shí)間:{}", Thread.currentThread().getName(), DateUtil.now());
        try {
            threadPool.execute(() -> {
                try {
                    //執(zhí)行任務(wù)
                    command.run();
                } finally {
                    //線程執(zhí)行完成后釋放令牌
                    semaphore.release();
                }
            });
        } catch (RejectedExecutionException e) {//異常兜底
            semaphore.release();
        }
    }

}

對(duì)應(yīng)的我們也給出使用示例,可以看到我們創(chuàng)建了流控為5的線程池,并創(chuàng)建10個(gè)并發(fā)線程執(zhí)行提交操作:

public static void main(String[] args) {
        RateLimitedExecutor executor = new RateLimitedExecutor(5);
        for (int i = 0; i < 10; i++) {
            new Thread(new Task("任務(wù)" + i, executor)).start();

        }
    }

    private static class Task implements Runnable {
        private final String threadName;
        private final RateLimitedExecutor executor;

        public Task(String threadName, RateLimitedExecutor executor) {
            this.threadName = threadName;
            this.executor = executor;
        }

        @SneakyThrows
        @Override
        public void run() {

            executor.submitTask(() -> {
                ThreadUtil.sleep(5000);
                Console.log("{}執(zhí)行任務(wù)完成", threadName);
            });
        }
    }

輸出結(jié)果如下,可以看到流控符合預(yù)期為5,同時(shí)我們也將程序穩(wěn)定性和性能瓶頸等各方面的壓力轉(zhuǎn)移給上層調(diào)用者,避免了非必要的拒絕策略處理,讓線程池專注于并發(fā)度的優(yōu)化:

Thread-1獲取令牌成功,執(zhí)行時(shí)間:2025-07-02 09:47:04
Thread-9獲取令牌成功,執(zhí)行時(shí)間:2025-07-02 09:47:04
Thread-5獲取令牌成功,執(zhí)行時(shí)間:2025-07-02 09:47:04
Thread-8獲取令牌成功,執(zhí)行時(shí)間:2025-07-02 09:47:04
Thread-3獲取令牌成功,執(zhí)行時(shí)間:2025-07-02 09:47:04
任務(wù)5執(zhí)行任務(wù)完成
任務(wù)9執(zhí)行任務(wù)完成
任務(wù)3執(zhí)行任務(wù)完成
任務(wù)1執(zhí)行任務(wù)完成
任務(wù)8執(zhí)行任務(wù)完成
Thread-6獲取令牌成功,執(zhí)行時(shí)間:2025-07-02 09:47:10
Thread-0獲取令牌成功,執(zhí)行時(shí)間:2025-07-02 09:47:10
Thread-7獲取令牌成功,執(zhí)行時(shí)間:2025-07-02 09:47:10
Thread-2獲取令牌成功,執(zhí)行時(shí)間:2025-07-02 09:47:10
Thread-4獲取令牌成功,執(zhí)行時(shí)間:2025-07-02 09:47:10
任務(wù)0執(zhí)行任務(wù)完成
任務(wù)4執(zhí)行任務(wù)完成
任務(wù)6執(zhí)行任務(wù)完成
任務(wù)7執(zhí)行任務(wù)完成
任務(wù)2執(zhí)行任務(wù)完成

三、流限線程池優(yōu)化思路

當(dāng)然關(guān)于流限,如果用戶明確知曉超時(shí)中斷的時(shí)機(jī)并具備靈活響應(yīng)中斷的能力,我們完全可以補(bǔ)充一個(gè)帶有超時(shí)限制的任務(wù)提交函數(shù)trySubmitTask:

public boolean trySubmitTask(final Runnable command,
                              long timeout,
                              TimeUnit unit) throws InterruptedException {

        if (!semaphore.tryAcquire(timeout, unit)) {
            Console.log("{}獲取令牌失敗,執(zhí)行時(shí)間:{}", Thread.currentThread().getName(), DateUtil.now());
            return false;
        }

        Console.log("{}獲取令牌成功,執(zhí)行時(shí)間:{}", Thread.currentThread().getName(), DateUtil.now());
        try {
            threadPool.execute(() -> {
                try {
                    //執(zhí)行任務(wù)
                    command.run();
                } finally {
                    //線程執(zhí)行完成后釋放令牌
                    semaphore.release();
                }
            });
        } catch (RejectedExecutionException e) {//異常兜底
            semaphore.release();
            return false;
        }
        return true;
    }

對(duì)應(yīng)我們也給出使用示例:

public static void main(String[] args) {
        RateLimitedExecutor executor = new RateLimitedExecutor(1);
        for (int i = 0; i < 2; i++) {
            new Thread(new Task("任務(wù)" + i, executor)).start();

        }
    }

    private static class Task implements Runnable {
        private final String threadName;
        private final RateLimitedExecutor executor;

        public Task(String threadName, RateLimitedExecutor executor) {
            this.threadName = threadName;
            this.executor = executor;
        }

        @SneakyThrows
        @Override
        public void run() {

            executor.trySubmitTask(() -> {
                ThreadUtil.sleep(5000);
                Console.log("{}執(zhí)行任務(wù)完成", threadName);
            }, 1, TimeUnit.SECONDS);
        }
    }

輸出結(jié)果如下,可以看到線程1按照正確的超時(shí)等待返回了:

Thread-0獲取令牌成功,執(zhí)行時(shí)間:2025-07-04 09:28:27
Thread-1獲取令牌失敗,執(zhí)行時(shí)間:2025-07-04 09:28:28
任務(wù)0執(zhí)行任務(wù)完成

四、小結(jié)

針對(duì)線程池拒絕策略的設(shè)計(jì)和使用更多是考察讀者對(duì)于線程池源碼的理解和使用經(jīng)驗(yàn),這里筆者僅在思路上給出示例,當(dāng)然實(shí)現(xiàn)上也存在很多不完美的地方,例如:

  • 如何保證持久化任務(wù)被可靠消費(fèi)。
  • 如何保證數(shù)據(jù)庫(kù)和內(nèi)存中任務(wù)的公平調(diào)度。
  • 持久化任務(wù)是先刪后返回還是先返回處理完成后刪除如何決定?
責(zé)任編輯:趙寧寧 來源: 寫代碼的SharkChili
相關(guān)推薦

2025-07-03 07:10:00

線程池并發(fā)編程代碼

2025-11-14 09:37:40

2012-05-15 02:18:31

Java線程池

2023-05-19 08:01:24

Key消費(fèi)場(chǎng)景

2021-09-11 15:26:23

Java多線程線程池

2024-07-15 08:20:24

2021-06-17 06:57:10

SpringBoot線程池設(shè)置

2023-11-29 16:38:12

線程池阻塞隊(duì)列開發(fā)

2015-08-20 09:17:36

Java線程池

2010-08-25 16:26:59

研發(fā)

2023-11-22 08:37:40

Java線程池

2020-12-10 08:24:40

線程池線程方法

2023-10-13 08:20:02

Spring線程池id

2010-03-11 11:14:38

施振榮

2024-11-06 09:39:52

2023-06-07 13:49:00

多線程編程C#

2021-06-24 08:02:35

線程池Java代碼

2019-12-27 09:09:42

Tomcat線程池JDK

2025-01-09 11:24:59

線程池美團(tuán)動(dòng)態(tài)配置中心

2020-10-19 10:01:12

Nodejs線程池設(shè)計(jì)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

91成人免费观看网站| 日韩久久免费视频| 日韩精品在线中文字幕| 日色在线视频| 久久精品99国产精品| 欧美久久精品午夜青青大伊人| 精品无码av一区二区三区| 小视频免费在线观看| 国产精品你懂的| 97超级碰碰| 日韩黄色片网站| 午夜久久久久| 国产亚洲综合久久| 中国xxxx性xxxx产国| 欧美日韩亚洲国产| 亚洲综合激情另类小说区| 日韩欧美亚洲日产国| 亚洲av综合色区无码一区爱av| 男人天堂欧美日韩| 九色精品免费永久在线| 91精品久久久久久久久久久久| 97久久综合精品久久久综合| 欧美午夜一区二区三区| 国产精品沙发午睡系列| 伊人手机在线| 国产视频一区二区在线| 国产精品一区视频网站| 国产精品伦理一区| 久久久久99| 国色天香2019中文字幕在线观看| 三级黄色录像视频| 国产va免费精品观看精品视频| 日韩欧美在线影院| 97超碰人人爽| 欧美日韩美女| 欧美日韩免费看| 亚洲精品无码国产| 黄色av网站在线播放| 国产欧美日韩另类视频免费观看| 黑人中文字幕一区二区三区| 99久久精品日本一区二区免费| 日日摸夜夜添夜夜添亚洲女人| 777777777亚洲妇女| 国产午夜小视频| 欧美日韩国产欧| 超碰日本道色综合久久综合| 任我爽在线视频| 日本a级不卡| 亚洲一区999| 日本少妇高潮喷水xxxxxxx| 色橹橹欧美在线观看视频高清| 精品国产自在久精品国产| 日韩av福利在线观看| 少妇高潮一区二区三区99| 欧美三级韩国三级日本三斤| 亚洲成色www.777999| 少妇一区视频| 在线视频国内自拍亚洲视频| 欧美日韩中文在线视频| 国产亚洲一区二区手机在线观看 | 精品噜噜噜噜久久久久久久久试看| 中文字幕第38页| 精品国产不卡一区二区| 日韩三级精品电影久久久| 日韩黄色一区二区| 久久国产精品色av免费看| 亚洲精品xxx| 色噜噜日韩精品欧美一区二区| 九九综合九九| 中文字幕亚洲欧美一区二区三区| 免费成人深夜天涯网站| 久久久9色精品国产一区二区三区| xxxxxxxxx欧美| 国产精品白嫩白嫩大学美女| 亚洲国产裸拍裸体视频在线观看乱了中文 | 色哟哟一区二区三区| 一区二区三区网址| 国产精区一区二区| 精品第一国产综合精品aⅴ| 私密视频在线观看| 蜜桃精品wwwmitaows| 深夜精品寂寞黄网站在线观看| 91狠狠综合久久久| 久久久久99人妻一区二区三区| 91精品免费| 美女av一区二区| 国产精品免费久久久久影院| 国产精品一区二区三| 日本a在线免费观看| 涩涩涩视频在线观看| 在线影视一区二区三区| 欧美激情第四页| 夜夜春成人影院| 久久精品91久久香蕉加勒比| 久久综合久久鬼| 久久亚洲美女| 999国产在线| 九九在线视频| 一区二区视频在线| 黄色国产小视频| 日韩精品视频一区二区三区| 亚洲伦理中文字幕| 亚洲欧美精品aaaaaa片| 亚洲主播在线| 不卡一区二区三区四区五区| 成年人视频网站在线| 亚洲综合久久久久| 男女视频在线看| **爰片久久毛片| 中文字幕视频在线免费欧美日韩综合在线看 | av中文字幕一区二区| 久久久久久久香蕉网| 夜夜狠狠擅视频| 91老师片黄在线观看| 国产又粗又长又爽视频| 日本欧美一区| 亚洲精品v欧美精品v日韩精品| 久久久久久久久久久久久女过产乱| 男人的天堂亚洲| 国产伦精品一区二区三区在线| 免费在线看黄网站| 91福利国产精品| 国产情侣久久久久aⅴ免费| 国产大片一区| 国产精品福利网站| 色综合久久网女同蕾丝边| 亚洲三级电影网站| 亚洲欧美视频二区| 九九久久婷婷| 欧美一级淫片videoshd| 日本韩国免费观看| 亚洲一区二区三区在线| 中文av字幕在线观看| 日韩精品永久网址| 国产精品福利在线观看| 欧美zozo| 色综合天天综合| 国产艳俗歌舞表演hd| 影音先锋一区| 国产精品一区二区三区不卡| 在线中文字幕视频观看| 91精品国产综合久久久久| 国产视频精品免费| 免费看日韩精品| 日韩欧美亚洲区| 97欧美成人| 在线精品播放av| 在线免费一区二区| 国产三级精品三级在线专区| www精品久久| 激情av综合| 国内精品小视频在线观看| 性欧美18一19性猛交| 综合久久一区二区三区| 91 视频免费观看| 91精品一区二区三区综合| 91免费综合在线| 女囚岛在线观看| 欧美精品一区二区三区视频| 在线免费观看毛片| 99re视频精品| 人人爽人人av| 我不卡神马影院| 亚洲影视中文字幕| 国产桃色电影在线播放| 亚洲成人黄色在线| 免费看日批视频| 国产精品久久久久一区二区三区| 99国产精品久久久久久| 欧美日一区二区三区在线观看国产免| 懂色av一区二区三区在线播放| 国产在线天堂www网在线观看| 亚洲国产精品视频在线观看| 无码人妻精品一区二区蜜桃色欲| 日本一区二区三区在线不卡| 免费成人黄色大片| 亚洲日本久久| 日韩精品欧美一区二区三区| 国产精久久一区二区| 久久久久久这里只有精品| 日漫免费在线观看网站| 欧美日韩成人综合在线一区二区| 永久看片925tv| 91在线国产福利| 九九热免费精品视频| 中文字幕免费一区二区| 精品日韩电影| 精品176极品一区| 欧美激情一区二区三区成人| 久久久久久青草| 欧美一区二区三区四区久久 | 亚洲三级视频| 这里只有精品66| 久草精品视频| 成人免费看片视频| 欧美裸体视频| 久久影院在线观看| 欧美色综合一区二区三区| 欧美电影一区二区三区| 欧美亚洲精品天堂| 亚洲激情校园春色| 欧美自拍偷拍网| 99久久99久久久精品齐齐| 在线观看日本一区二区| 日韩午夜免费视频| 青青草免费在线视频观看| 日韩高清有码在线| 久久精品成人一区二区三区蜜臀 | 九九热99久久久国产盗摄| 视频二区在线播放| 美女露胸视频在线观看| 日韩小视频在线| 欧美一区二区视频| 日韩美女视频在线| 嫩草影院一区二区三区| 亚洲成人自拍偷拍| 希岛爱理中文字幕| 久久久www成人免费毛片麻豆| 手机在线免费毛片| 美女视频网站黄色亚洲| 黄页免费在线观看视频| 欧美a级一区| 一区二区精品在线| 国产午夜一区| 精品视频一区二区三区四区| 日韩一二三区| 91在线免费观看网站| 影视一区二区三区| 青草热久免费精品视频| 欧美私密网站| 97精品国产97久久久久久| 直接在线观看的三级网址| 色琪琪综合男人的天堂aⅴ视频| 五月婷婷丁香六月| 欧美精品一区男女天堂| www.五月天激情| 日韩一区二区视频| 国产深喉视频一区二区| 精品视频在线视频| 国产成人自拍偷拍| 91精品1区2区| 无码无套少妇毛多18pxxxx| 欧美日韩激情视频| 日韩xxx高潮hd| 午夜a成v人精品| 日韩免费黄色片| 午夜视频一区在线观看| 日本中文字幕网| 亚洲不卡一区二区三区| 国产无套内射又大又猛又粗又爽| 一区二区三区在线免费视频| 1024手机在线视频| 亚洲黄色片在线观看| 久久久久久久久久久久久久久久久| 亚洲黄色免费电影| 日韩精品无码一区二区| 精品国产乱码久久久久久婷婷| 亚洲第一在线播放| 欧美在线一区二区三区| 亚洲一线在线观看| 91精品国产91久久久久久一区二区| 国产精品欧美综合亚洲| 欧美大片日本大片免费观看| 欧美自拍第一页| 亚洲另类图片色| 亚洲1卡2卡3卡4卡乱码精品| 久久精品国产99国产精品澳门 | 色综合久久88色综合天天看泰| 在线观看电影av| 欧美性视频网站| 久久精品国产精品亚洲毛片| 亚洲一区二区三区久久| 国产精品15p| 日韩欧美视频第二区| 伊人成综合网| 午夜精品久久久久久久无码 | 国产精品va在线播放| 999精品嫩草久久久久久99| 国产精品v欧美精品∨日韩| 日韩精选在线| 一级做a爰片久久| 激情欧美一区| 国产aaaaa毛片| 国产白丝精品91爽爽久久| 国产亚洲无码精品| 中文字幕中文字幕一区| 久久露脸国语精品国产91| 精品视频一区二区不卡| 刘亦菲毛片一区二区三区| 亚洲图片在区色| 欧洲中文在线| 国产精品亚洲激情| 国产精品对白久久久久粗| 亚洲国产婷婷香蕉久久久久久99| 欧美日韩 国产精品| 人妻无码视频一区二区三区| 国产精品一区二区无线| 女人又爽又黄免费女仆| 亚洲免费观看视频| 好吊妞视频一区二区三区| 91精品国产品国语在线不卡| 亚洲 欧美 自拍偷拍| 欧美成人一区在线| 九色成人搞黄网站| 国产在线视频欧美一区二区三区| 久久精品高清| 欧美女人性生活视频| 国产精品1区2区3区| www亚洲色图| 欧美视频免费在线观看| 精品国产99久久久久久宅男i| 亚洲视频在线免费看| av中文字幕在线观看第一页| 成人在线观看视频网站| 欧美猛男男男激情videos| 精品人妻人人做人人爽| 捆绑变态av一区二区三区| 成人h动漫精品一区| 亚洲一区二区在线免费看| 亚洲在线精品视频| 亚洲网站视频福利| 欧美freesex黑人又粗又大| 99久久久精品免费观看国产| 日韩在线不卡| 男女无套免费视频网站动漫| 99re热视频精品| 国产精品二区一区二区aⅴ| 日韩一级片在线播放| 毛片在线播放a| 国产精品视频专区| 精品日韩一区| 国产精品99久久免费黑人人妻| av激情综合网| 亚洲国产精品成人无久久精品| 精品美女被调教视频大全网站| 91一区二区三区在线| 92看片淫黄大片看国产片| 久久久久久美女精品| www.色欧美| 综合网在线视频| 国产美女三级无套内谢| 久久九九国产精品怡红院 | 日本高清一二三区| 欧美日韩大陆一区二区| 日韩欧美小视频| 国产一区私人高清影院| 日韩亚洲一区在线| 成年网站免费在线观看| 中文字幕亚洲区| 国产又粗又猛又色又| 久久久av一区| 午夜久久av| 丁香六月激情网| 99视频有精品| 日本高清不卡码| 自拍偷拍免费精品| 在线观看欧美| 久久福利一区二区| 不卡的看片网站| 黄色av一级片| 中文字幕亚洲无线码a| 人人玩人人添人人澡欧美| 国产精品12p| 粉嫩av亚洲一区二区图片| 日本一级淫片免费放| 亚洲欧美精品中文字幕在线| 免费成人毛片| www.18av.com| k8久久久一区二区三区| 亚洲av无码不卡| 日韩视频中文字幕| 日韩精品一级| 久久无码高潮喷水| 欧美激情一区二区三区| 精品国产999久久久免费| 38少妇精品导航| 日韩欧美视频专区| 一级全黄裸体片| 色婷婷久久久久swag精品| 黄色片网站在线观看| 精品免费日产一区一区三区免费| 丝袜脚交一区二区| www.99re7| 亚洲精品视频中文字幕| 成人97精品毛片免费看| 成人在线播放网址| 国产亚洲一区二区在线观看| 国产欧美综合视频| 热久久这里只有精品| 婷婷综合视频| 亚洲一区二区三区无码久久| 欧美视频日韩视频在线观看| 男女羞羞视频在线观看| 日韩精品久久久毛片一区二区| 国产精品自拍一区| 五月婷婷六月婷婷| 欧美激情精品久久久久久久变态| 国内精品久久久久久久影视简单| 野花视频免费在线观看| 欧美亚洲动漫精品|