Java 線程池詳解,圖文并茂,我看還有誰不會?!
兄弟們,今天咱們來聊聊 Java 并發編程里的 “硬核裝備”—— 線程池。想象一下,你開了一家工廠,訂單像雪花一樣飛來。要是每次來訂單都現招工人,招完人又解雇,這成本得多高?線程池就是干這個的:提前招一批工人(線程),訂單(任務)來了直接分配,工人空閑了也不趕走,隨時待命。這就是線程池的核心思想 ——復用線程資源,降低開銷。
線程池的好處可不止省錢。比如你是個外賣平臺,用戶下單就是任務,線程池里的線程就是騎手。訂單一來,馬上有騎手接單,不用現招人,這響應速度颼颼的。而且你還能統一管理這些騎手,比如高峰期多派點人,低谷期讓他們休息,這就是線程池的可管理性。
一、ThreadPoolExecutor:線程池的 “心臟”
Java 里線程池的核心實現類是ThreadPoolExecutor,它就像工廠的 “大管家”,負責調度工人、管理訂單。咱們先看看它的構造方法,這可是理解線程池的鑰匙:
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler
)1. 核心參數:工廠的 “招工規則”
- corePoolSize(核心線程數):工廠里的 “正式工”,就算沒訂單也不會被解雇。比如設置為 5,就算現在沒任務,這 5 個工人也會留在廠里喝茶聊天(保持存活)。
- maximumPoolSize(最大線程數):工廠最多能招多少工人,包括正式工和臨時工。比如設置為 10,當訂單太多,正式工和臨時工加起來最多 10 個。
- keepAliveTime(空閑存活時間):臨時工空閑多久會被解雇。比如設置為 30 秒,臨時工要是 30 秒沒活干,就會被趕走。
- workQueue(任務隊列):訂單堆積的倉庫。常用的有ArrayBlockingQueue(有界隊列)和LinkedBlockingQueue(無界隊列)。
- threadFactory(線程工廠):生產工人的 “流水線”,可以自定義線程的名字、優先級等。
- handler(拒絕策略):當訂單太多,倉庫滿了,工人也招滿了,新來的訂單怎么辦?這就是拒絕策略要解決的問題。
2. 工作流程:訂單是如何被處理的?
當一個任務提交到線程池,流程大概是這樣的:
- 判斷當前線程數是否小于 corePoolSize:如果是,不管有沒有空閑線程,直接創建新線程執行任務。這就像工廠剛開張,就算有正式工閑著,老板也會再招一個臨時工來干活(可能是因為老板有錢任性?)。
- 如果線程數 >= corePoolSize,任務進入隊列:這時候訂單會被放進倉庫,等待工人來取。
- 如果隊列滿了,判斷線程數是否小于 maximumPoolSize:如果是,創建臨時工處理任務。
- 如果線程數 >= maximumPoolSize,執行拒絕策略:這時候訂單太多,工廠實在忙不過來,只能拒絕。
舉個栗子:假設 corePoolSize=5,maximumPoolSize=10,workQueue 容量 = 100。當有 1000 個任務進來時,前 5 個任務會直接創建正式工執行;接下來的 95 個任務進入隊列;當隊列滿了,再創建 5 個臨時工(總線程數到 10);如果還有任務進來,就會觸發拒絕策略。
3. 拒絕策略:訂單太多怎么辦?
JDK 自帶了四種拒絕策略:
- AbortPolicy(直接拒絕):拋出異常,就像工廠直接對客戶說:“沒工人了,訂單不接了!”
- CallerRunsPolicy(調用者處理):讓提交任務的線程自己執行,相當于老板親自上陣干活。
- DiscardPolicy(默默丟棄):直接扔掉任務,就當沒這回事。
- DiscardOldestPolicy(丟棄最老的任務):把倉庫里最老的訂單扔掉,然后處理新訂單。
4. 線程池狀態:工廠的 “營業狀態”
線程池有五種狀態:
- RUNNING:正常營業,可以接收新訂單,處理隊列里的訂單。
- SHUTDOWN:不再接收新訂單,但會處理完隊列里的訂單。
- STOP:不再接收新訂單,也不處理隊列里的訂單,直接中斷正在執行的任務。
- TIDYING:所有任務都處理完了,線程數為 0,準備進入 TERMINATED 狀態。
- TERMINATED:線程池徹底關閉。
二、線程池的 “七十二變”:常見類型
Java 通過Executors工具類提供了幾種常見的線程池類型,不過要注意,生產環境不建議直接使用 Executors,因為它可能會導致內存溢出或線程數無限增長。但咱們可以了解一下它們的原理。
1. FixedThreadPool:固定大小的 “正規軍”
ExecutorService fixedPool = Executors.newFixedThreadPool(5);- 核心線程數和最大線程數相同,都是 5,沒有臨時工。
- 使用LinkedBlockingQueue(無界隊列),訂單可能會無限堆積,導致內存溢出。
- 適用場景:任務量穩定,需要控制線程數的場景。
2. CachedThreadPool:靈活的 “臨時工大隊”
ExecutorService cachedPool = Executors.newCachedThreadPool();- 核心線程數為 0,最大線程數為Integer.MAX_VALUE,相當于無限招臨時工。
- 使用SynchronousQueue(直接提交隊列),每個任務都需要等待工人來接。
- 臨時工空閑 60 秒會被解雇。
- 適用場景:任務短、數量多的場景,比如處理 HTTP 請求。
3. SingleThreadExecutor:孤獨的 “獨行俠”
ExecutorService singlePool = Executors.newSingleThreadExecutor();- 只有一個線程,任務按順序執行。
- 使用LinkedBlockingQueue,同樣有內存溢出風險。
- 適用場景:需要保證任務順序執行的場景,比如數據庫寫入。
4. ScheduledThreadPool:準時的 “鬧鐘”
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(5);- 可以定時或周期性執行任務。
- 使用DelayedWorkQueue實現定時功能。
- 適用場景:定時任務,比如每天凌晨備份數據。
三、源碼探秘:線程池是如何 “運轉” 的?
1. execute () 方法:任務提交的入口
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 步驟1:如果線程數 < corePoolSize,創建核心線程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 步驟2:任務進入隊列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 步驟3:創建非核心線程
else if (!addWorker(command, false))
reject(command);
}2. addWorker () 方法:創建工人
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 檢查線程池狀態,如果不是RUNNING,并且不允許創建線程,返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // 重新獲取ctl
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 檢查線程是否已經啟動
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}3. Worker 類:工人的 “化身”
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // 禁止中斷,直到runWorker被調用
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
// 省略其他方法
}4. runWorker () 方法:工人的 “干活流程”
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 允許中斷
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// 檢查線程池狀態,如果需要中斷,并且線程未中斷,就中斷
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
thrownew Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}5. getTask () 方法:從隊列中取任務
private Runnable getTask() {
boolean timedOut = false; // 上次poll是否超時
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果線程池狀態是SHUTDOWN或以上,并且隊列為空,返回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
returnnull;
}
int wc = workerCountOf(c);
// 是否需要超時控制(當線程數超過corePoolSize時)
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
returnnull;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}四、最佳實踐:如何優雅地使用線程池?
1. 自定義線程池:別再用 Executors 了!
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // corePoolSize
10, // maximumPoolSize
30, TimeUnit.SECONDS, // keepAliveTime
new ArrayBlockingQueue<>(100), // workQueue
Executors.defaultThreadFactory(), // threadFactory
new ThreadPoolExecutor.CallerRunsPolicy() // handler
);2. 參數調優:找到工廠的 “最佳配置”
- CPU 密集型任務:核心線程數 = CPU 核心數 + 1。比如 4 核 CPU,設置為 5。
- IO 密集型任務:核心線程數 = CPU 核心數 × 2。比如 4 核 CPU,設置為 8。
- 隊列選擇:
快速響應:SynchronousQueue + 較大的 maximumPoolSize。
流量削峰:LinkedBlockingQueue + 合理的隊列容量。
優先級調度:PriorityBlockingQueue。
3. 異常處理:別讓工人 “罷工”
- 任務內部捕獲異常:
executor.submit(() -> {
try {
// 執行任務
} catch (Exception e) {
// 處理異常
}
});- 自定義線程工廠:
ThreadFactory factory = r -> {
Thread t = new Thread(r);
t.setUncaughtExceptionHandler((thread, e) -> {
System.err.println("線程異常:" + e.getMessage());
});
return t;
};4. 監控與關閉:工廠的 “健康檢查”
- 監控指標:
活躍線程數:executor.getActiveCount()
隊列任務數:executor.getQueue().size()
已完成任務數:executor.getCompletedTaskCount()
- 優雅關閉:
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
}五、常見問題:這些坑你踩過嗎?
1. 內存溢出:無界隊列的 “陷阱”
// 錯誤示范:使用無界隊列LinkedBlockingQueue
ExecutorService pool = Executors.newFixedThreadPool(5);
// 正確示范:使用有界隊列
ExecutorService pool = new ThreadPoolExecutor(
5, 10, 30, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100)
);2. 線程泄漏:忘記關閉線程池
// 錯誤示范:用完不關閉
ExecutorService pool = Executors.newFixedThreadPool(5);
pool.submit(() -> System.out.println("Hello"));
// 正確示范:優雅關閉
ExecutorService pool = Executors.newFixedThreadPool(5);
pool.submit(() -> System.out.println("Hello"));
pool.shutdown();3. 拒絕策略不當:訂單被默默丟棄
// 錯誤示范:使用DiscardPolicy
ExecutorService pool = new ThreadPoolExecutor(
1, 1, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
new ThreadPoolExecutor.DiscardPolicy()
);
// 正確示范:使用CallerRunsPolicy
ExecutorService pool = new ThreadPoolExecutor(
1, 1, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
new ThreadPoolExecutor.CallerRunsPolicy()
);4. 阻塞任務:工人被 “卡住”
// 錯誤示范:任務中包含阻塞操作
ExecutorService pool = Executors.newFixedThreadPool(5);
pool.submit(() -> {
Thread.sleep(1000); // 模擬阻塞
});
// 正確示范:優化任務邏輯或增加線程數
ExecutorService pool = new ThreadPoolExecutor(
5, 10, 30, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100)
);六、總結:線程池的 “終極秘籍”
- 核心參數要理解:corePoolSize、maximumPoolSize、workQueue 是關鍵。
- 拒絕策略要選對:根據業務需求選擇合適的策略。
- 避免使用 Executors:生產環境用自定義線程池。
- 監控與關閉不能少:確保線程池健康運行。
- 異常處理要完善:避免線程泄漏和程序崩潰。























