淺談線程服務的停止技巧
我們大部分情況下并發任務都是交由線程池統一管理,所以對于線程池的關閉就涉及任務的終止和資源兜底,所以本文將針對線程池優雅關閉這一話題展開探討。

一、阻塞式中斷的哲學
1. 線程停止準則
對于線程的生命周期的管理,按照并發的哲學:
除非擁有某個線程,否則不能對該線程進行操控
所以只有線程所屬的線程池才具備對其生命周期的管理,所以在Java應用程序的維度,它是不具備直接管理線程池線程的權限,即非所屬線程池維度的線程關閉是需要通過服務于線程池生命周期方法間接關閉線程:

例如應用程序關閉時,對應服務ExecutorService這個服務的關閉可直接通過shutdown或者shutdownNow中斷所有的線程:
//等待所有任務結束后關閉
threadPool.shutdown();
//即刻關閉所有的任務,返回已提交但是還未開始的任務
threadPool.shutdownNow();這里也補充一句shutdown和shutdownNow的區別:
- shutdown會等待所有任務執行完成再關閉,所以關閉的響應可能會有些許延遲。
- shutdownNow會直接強行關閉執行任務,同時將未啟動的任務返回。
2. 強制關閉線程池的弊端
如果采用強制關閉的方式將線程池直接關閉,就可能導致一些資源未能及時處理而丟失,例如:我們的有一個日志線程,它會不斷輪詢外部線程投遞到阻塞隊列的信息并將其寫入磁盤。如下圖,可以看到如果日志線程在隊列未消費完過程中直接強制打斷,就會導致一些數據未能及時消費而丟失:

對應的我們給出這個日志工具的示例,可以看到筆者的所編寫的Logwritter,它可以通過構造方法日志隊列和工作線程寫入日志的文件路徑logPath,使得logThread可以通過阻塞輪詢隊列完成日志異步寫入:
public class Logwritter {
private final BlockingQueue<String> queue;
private final LogThread logThread;
//基于外部入參完成消費者線程初始化
public Logwritter(String logPath) {
this.queue = new ArrayBlockingQueue<>(100);
this.logThread = new LogThread(logPath, queue);
}
//啟動消費者線程
public void start() {
logThread.start();
}
//外部線程可通過log方法將消息存入隊列中,讓logThread寫入本地
public void log(String msg) throws InterruptedException {
queue.put(msg);
}
//......
}對應的我們也給出日志線程的代碼,可以看到在interrupted沒有被設置為true之前這段該線程就會不斷輪詢隊列數據:
private static class LogThread extends Thread {
private final BlockingQueue<String> queue;
private final BufferedOutputStream outputStream;
//基于外部入參初始化日志寫入路徑和消費日志消息的阻塞隊列
public LogThread(String logPath, BlockingQueue<String> queue) {
this.outputStream = FileUtil.getOutputStream(logPath);
this.queue = queue;
}
private boolean interrupted = false;
//中斷當前線程
public void interrupt() {
this.interrupted = true;
}
public void run() {
try {
//標識非中斷則繼續阻塞獲取日志數據
while (!interrupted) {
Console.log("阻塞獲取日志......");
outputStream.write(queue.take().getBytes(StandardCharsets.UTF_8));
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (outputStream != null) {
try {
outputStream.close();
} catch (IOException e) {
//......
}
}
}
}
}對應的我們給出基礎調測代碼:
Logwritter logwritter = new Logwritter("/tmp/logwritter.log");
logwritter.start();
logwritter.log("hello");
logwritter.log("hello");
new Thread(() -> {
logwritter.interrupt();
}).start();輸出結果如下,可以看到代碼理想情況下會因為標識的設置而中斷:
阻塞獲取日志......
線程中斷......所以這段代碼存在一個很嚴重的缺陷,試想一下如果阻塞隊列沒有元素的情況下,我們嘗試打斷日志線程,此時日志線程就會因為阻塞等待隊列元素而無法輪詢查看中斷標識,進而處于長時間阻塞等待的一種狀態:

如下代碼所示,我們不添加任何元素的情況下直接異步打斷線程:
Logwritter logwritter = new Logwritter("/tmp/logwritter.log");
logwritter.start();
new Thread(() -> {
logwritter.interrupt();
}).start();從輸出結果就可以看出,此時代碼就處于一個阻塞狀態,必須等到獲取完一個元素后才能中斷循環:
阻塞獲取日志......3. 基于原子化標識優雅關閉
所以,要想解決上述問題,我們必須做到以下幾點:
- 中斷日志線程時,其他線程再次調用log寫入日志時會告知日志線程已停止,當前線程日志寫入失敗
- 日志線程收到中斷信號會確保將需要寫入的日志寫入到文件后再中斷停止
- 中斷要盡可能的讓日志線程感知,避免阻塞等待下一個元素到來時才檢查標識完成中斷

基于這種思路,筆者給出優化后的代碼,我們先從頂層的Logwritter開始,可以看到筆者將中斷操作做了如下改動:
- 聲明一個中斷標識和記錄阻塞隊列容量的變量remaining
- 調用log寫入日志前,檢查是否中斷,若沒中斷則累加計數并寫入日志到隊列,反之直接返回
- 中斷時設置中斷標識,并調用logThread中斷方法讓其中斷
private final BlockingQueue<String> queue;
private final BlockingQueue<String> queue;
private final LogThread logThread;
private boolean interrupted = false;
private int remaining;
//基于外部入參完成消費者線程初始化
public Logwritter(String logPath, int size) {
this.queue = new ArrayBlockingQueue<>(size);
this.logThread = new LogThread(logPath, queue);
}
//啟動消費者線程
public void start() {
logThread.start();
}
//外部線程可通過log方法將消息存入隊列中,讓logThread寫入本地
public void log(String msg) throws InterruptedException {
synchronized (this) {//上實例鎖檢查中斷,若中斷則輸出日志直接返回,反之累加remaining
if (interrupted) {
Console.log("日志線程已中斷,消息:{}無法寫入", msg);
return;
}
remaining++;
}
//將阻塞操作放在鎖外部,避免因為隊列阻塞等待導致所有線程鎖住
queue.put(msg);
Console.log("寫入消息成功,消息:{}", msg);
}
//中斷當前線程
public void interrupt() {
synchronized (this) {
interrupted = true;
}
logThread.interrupt();
}
//......
}重點來了,LogThread邏輯調整為,收到中斷信號后,基于interrupted保留中斷狀態,繼續基于remaining數值完成剩余日志寫入:
private class LogThread extends Thread {
private final BlockingQueue<String> queue;
private final BufferedOutputStream outputStream;
//基于外部入參初始化日志寫入路徑和消費日志消息的阻塞隊列
public LogThread(String logPath, BlockingQueue<String> queue) {
this.outputStream = FileUtil.getOutputStream(logPath);
this.queue = queue;
}
public void run() {
try {
//線程未中斷或者remaining不為0則繼續執行循環,知道被中斷且remaining為0時退出
while (!interrupted || remaining != 0) {
try {
String msg = queue.take();
outputStream.write((msg + "\r\n").getBytes(StandardCharsets.UTF_8));
Console.log("寫入日志{}成功", msg);
remaining--;
} catch (InterruptedException e) {//收到中斷后,保存中斷狀態,繼續完成隊列中元素消費后退出
Console.log("線程中斷......");
interrupted = true;
}
}
Console.log("線程處理結束");
} catch (IOException e) {
//處理io異常
} finally {
if (outputStream != null) {
try {
outputStream.close();
} catch (IOException e) {
//......
}
}
}
}
}最后我們給出測試代碼,大體邏輯:
- 是啟動日志線程后阻塞等待
- 生產者投遞日志
- 在日志線程消費者將其打斷,查看日志線程是否會在收到中斷后完成日志消費再退出
Logwritter logwritter = new Logwritter("/tmp/logwritter.log", 100);
//啟動日志線程,阻塞等待小肥
logwritter.start();
//寫入日志
for (int i = 0; i < 10; i++) {
logwritter.log("msg" + i);
}
//中斷
new Thread(() -> logwritter.interrupt()).start();這里筆者基于IDEA的線程模式調試出這段邏輯,對應的輸出結果如下,可以看到即使收到中斷信號,線程也會將隊列中的消息消費完成再退出循環:
寫入消息成功,消息:msg0
寫入消息成功,消息:msg1
寫入消息成功,消息:msg2
寫入消息成功,消息:msg3
寫入消息成功,消息:msg4
寫入消息成功,消息:msg5
寫入消息成功,消息:msg6
寫入消息成功,消息:msg7
寫入消息成功,消息:msg8
寫入消息成功,消息:msg9
寫入日志msg0成功
線程中斷......
寫入日志msg1成功
寫入日志msg2成功
寫入日志msg3成功
寫入日志msg4成功
寫入日志msg5成功
寫入日志msg6成功
寫入日志msg7成功
寫入日志msg8成功
寫入日志msg9成功
線程處理結束4. 無界隊列與毒丸消費
對于傳統的生產者消費者模式,面對不會阻塞的無界隊列,我們完全可以使用毒丸(poison pill)即特定的元素作為中斷標識,確保生產者可以在適當的時機將其放在隊列上,當消費者消費到這個對象時立即退出:

對應的我們給出毒丸的定義,因為筆者演示的阻塞隊列是字符串類型所以協定好的毒丸就是字符串對象:
//協定好的結束標識
public static final String POISON_PILL = "POISON_PILL";消費者的代碼邏輯也很簡單,直接輪詢讀取隊列數據,如果碰到的元素是毒丸則直接退出循環:
public class Consumer implements Runnable {
private final BlockingQueue<String> queue;
private final Thread thread;
public Consumer(BlockingQueue<String> queue) {
this.queue = queue;
thread = new Thread(this);
}
public void start() {
Console.log("消費者啟動");
thread.start();
}
@Override
public void run() {
while (true) {
try {
//利用毒丸感知異常中斷退出
String element = queue.take();
if (element.equals(POISON_PILL)) {
Console.log("消費到毒丸,消費者立即停止");
break;
}
Console.log("消費元素{}成功", element);
} catch (InterruptedException e) {
//......
}
}
}
}測試代碼如下,可以看到筆者嘗試插入100w個元素,并利用另外一個線程隨機插入毒丸將生產者停止:
BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
Consumer consumer = new Consumer(queue);
consumer.start();
new Thread(() -> {
try {
//隨機插入毒丸
ThreadUtil.sleep(RandomUtil.randomInt(5000));
queue.put(POISON_PILL);
} catch (InterruptedException e) {
//......
}
}).start();
IntStream.range(0, 100_0000).forEach(i -> {//輪詢插入100w個元素
try {
queue.offer(String.valueOf(i), 5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
//.....
}
});可以看到,在消費了84w左右的元素時,消費者看到毒丸立即停止退出了:

毒丸在已知的生產者消費者模式下,可以注入有限的毒丸標識中斷線程,所以使用這種方式控制線程就必須保證消費者數目已知,可以通過聲明有限的毒丸停止線程。
5. 用后即焚的線程池
對于只需要使用一次,注意筆者所強調的只需要使用到一次的異步線程池,我們可以直接通過juc流程控制工具CountDownLatch確保線程池中所有任務完成后銷毀線程池,將線程池限制在當前函數的生命周期。
例如我們現在希望執行一批數據的乘2運算,我們希望并行執行這批數據的運算再累加起來,此時我們就可以遍歷這批數據并將其提交到線程池中完成計算并累加,然后使用shutdown銷毀線程池:

對應的我們給出一次性線程池示例和演示代碼:
public static void main(String[] args) throws InterruptedException {
Console.log("并發計算和:{}", calculateInParallel(4));//并發計算和:20
}
/**
* 從1開始遍歷入參閉集,并提交到線程池中執行*2運算,累加返回
*
* @param rangeClosed
* @return 并行運算的最終結果
*/
public static int calculateInParallel(int rangeClosed) {
LongAdder adder = new LongAdder();
CountDownLatch countDownLatch = new CountDownLatch(rangeClosed);
//聲明閉集數一致的線程池
ExecutorService executorService = Executors.newFixedThreadPool(rangeClosed);
for (int i = 1; i <= rangeClosed; i++) {//并行運算每個數值的雙數倍并利用原子類累加
int num = i;
executorService.execute(() -> {
adder.add(num << 1);
countDownLatch.countDown();
});
}
try {
//等待所有線程執行完成
countDownLatch.await();
} catch (InterruptedException e) {
//......
}
//用后即焚一次性線程池
executorService.shutdown();
return adder.intValue();
}最后筆者要特別說明一下,這個方案有一定的局限性,使用時必須保線程池對應的函數僅被使用少次,如果單位時間內并發調用這個函數盡可能導致獨立線程池飆升,進而打爆內存:

對于此類線程池管理的使用案例,感興趣的讀者可以關注筆者這篇文章:《Java 線程池知識點小結》
二、利用實現shutdownNow線程中斷與取消可監控
1. 設計與實現思路
從微觀的角度了解了關于線程池中的線程的優雅關閉幾種技巧之后,我們再來聊聊線程池維度對于取消和中斷任務的監控。
上文我們了解到shutdown是優雅關閉,確保所有的任務都執行完成之后銷毀線程池。而shutdownNow是一種能夠實時關閉正在執行任務,同時還能夠取消還未執行任務并返回的函數。所以,如果對于實時性要求較高的場景,我們更推薦使用shutdownNow。
但shutdownNow也存在一定的局限性,即它只能知曉那些數取消的任務,卻不知道那些是中斷的任務,所以shutdownNow對于需要監控異或者恢復中斷的任務的場景就有些力不從心了。
對此,我們可以自行繼承線程池框架,并對shutdownNow進行改造,大體思路為:
- 調用shutdown關閉線程池時,內部調用shutdownNow獲取已提交未執行的任務,保存到任務取消列表。
- shutdownNow會線程池會將正在執行的任務中斷,利用這個中斷判斷當前線程池狀態是否被設置為關閉且當前線程狀態是否中斷,如果則將其存入中斷列表。

對應的我們給出落地代碼,整體實現思路如下:
- 自定義線程池繼承AbstractExecutorService獲取線程池基本行為函數
- 聲明取消隊列cancelledTaskList和中斷隊列interruptedTaskList
- 實現stop方法,內部調用shutdownNow將已提交未執行的取消任務存入取消隊列cancelledTaskList
- execute函數重寫,將外部任務提交到我們內部聚合的線程池中,并保證線程池關閉且當前線程執行被中斷的情況下,將該任務存入中斷隊列
public class ResumableThreadPoolExecutor extends AbstractExecutorService {
/**
* 記錄已提交但未執行就被取消的任務
*/
private final List<Runnable> cancelledTaskList = new ArrayList<>();
/**
* 記錄正在執行然后被中斷的任務
*/
private final List<Runnable> interruptedTaskList = new ArrayList<>();
private ExecutorService executor;
public ResumableThreadPoolExecutor(int size) {
executor = Executors.newFixedThreadPool(size);
}
@Override
public void execute(Runnable command) {
executor.execute(() -> {
try {
Console.log("{}執行任務", Thread.currentThread().getName());
command.run();
} finally {
if (isShutdown() &&
Thread.currentThread().isInterrupted()) {//將線程池關閉后中斷的任務存入中斷隊列
interruptedTaskList.add(command);
}
}
});
}
public List<Runnable> getCancelledTaskList() {
if (!executor.isTerminated()) {
throw new RuntimeException("線程池未關閉");
}
//安全發布取消隊列,避免對內部取消列表的不安全修改
return new ArrayList<>(cancelledTaskList);
}
public List<Runnable> getInterruptedTaskList() {
//安全發布取消隊列,避免對內部中斷列表的不安全修改
return new ArrayList<>(interruptedTaskList);
}
@Override
public void shutdown() {
executor.shutdown();
}
@Override
public List<Runnable> shutdownNow() {
return executor.shutdownNow();
}
public void stop() {
cancelledTaskList.addAll(executor.shutdownNow());
//help gc
executor = null;
}
//......
}測試代碼如下,因為筆者聲明的線程池數為1,所以關閉線程池之后所得到的中斷任務和取消任務數都為1:
ResumableThreadPoolExecutor threadPool = new ResumableThreadPoolExecutor(1);
threadPool.execute(() -> {
try {
TimeUnit.DAYS.sleep(1);
} catch (InterruptedException e) {
Console.log("task-0被中斷,保留中斷狀態");
//保留中斷狀態,避免catch后中斷狀態被清除,進而導致中斷任務無法存入中斷隊列
Thread.currentThread().interrupt();
}
});
threadPool.execute(() -> {
ThreadUtil.sleep(1, TimeUnit.DAYS);
});
threadPool.stop();
threadPool.awaitTermination(5, TimeUnit.SECONDS);
Console.log("中斷的任務數:{}", threadPool.getInterruptedTaskList().size());
Console.log("取消的任務數:{}", threadPool.getCancelledTaskList().size());2. 功能落地注意事項
這段代碼邏輯比較簡單,唯一需要注意的是task-0對于終端狀態的保留,默認情況下shutdown或者shutdownNow關閉線程池時正在執行的線程就會被中斷,對應的我們可以查看ThreadPoolExecutor的shutdownNow方法印證:
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//......
//中斷正在運行的線程
interruptWorkers();
tasks = drainQueue();
} finally {
//......
}
//......
return tasks;
}重點來了,被中斷的線程一旦被catch塊捕獲,對應的中斷狀態就會被清除,如果我們不保留這個狀態的話,那么這個被中斷的任務就會b因為狀態被清除而導致無法被存入中斷隊列,這也就是為什么筆者的測試代碼在捕獲到中斷之后又手動處理執行一下中斷,就是為了保證execute的finally語句塊能夠感知到線程中斷狀態保證任務能夠正確的被存入中斷隊列:

3. 使用注意事項
該線程雖然保證線程中斷與取消狀態保留,但讀者在基于該線程池恢復啟動任務時還是需要注意一下任務處理的冪等性,因為線程池僅僅保留的中斷的狀態,對于任務的狀態并沒有做相應的處理。
三、小結
我們來簡單概括一下本文的內容:
- 線程池的幾種關閉方式
- 線程池關閉的幾個準則和實踐
- 一次性線程池的使用技巧
- 如何實現狀態可監控的線程池
- 線程池中斷與恢復的注意事項





















