并發編程中的等待通知模型
為避免輪詢條件為真的開銷,并發編程中常用等待通知模型來優化這一點,而本文將針對等待通知模型這一知識點進行深入剖析,希望對你有所啟發。

一、同步鎖下的等待通知模型
1. 狀態依賴性的管理
在經典的生產者和消費者模式中,我們經常用到ArrayBlockingQueue作為并發安全的有界緩存,而該有界緩解進行讀寫操作時都必須嚴格按照如下兩個條件謂語時機執行,即:
- 針對阻塞隊列進行元素存操作時,有界緩存必須有空閑空間,即可非滿
- 針對阻塞隊列進行取操作時,有界隊列必須有元素,即非空
基于上述的說法,我們基于同步鎖synchronized實現了一個數組形式的環形阻塞隊列的核心方法模板,大體思路為:
- 當進行元素存操作時,互斥調用doPut函數,判斷是否到達數組末端,若到達則直接將元素存到索引0,并累加count
- 進行元素取操作時,互斥上鎖執行doTake,同樣執行步驟1的邊界判斷,完成后扣減count
- 基于count判斷非空和非滿
我們的環形有界隊列是用數組實現的,所以筆者也用數組直觀的展現這個流程,當然讀者可以在邏輯上將數組首位相接,即可構成一個環形隊列:

對應的筆者也給出這個環形隊列的抽象模板,核心函數思路和上述基本一致,讀者可結合圖文注釋了解大體流程,后文將基于該模板落地一個支持阻塞等待空閑通知線程存取元素的緩存隊列:
public abstract class BaseBoundedBuffer<V> {
private final V[] items;
private int head;
private int tail;
private int count;
/**
* 初始化環形有界隊列
*
* @param capacity 容量
*/
protected BaseBoundedBuffer(int capacity) {
items = (V[]) new Object[capacity];
}
protected synchronized final void doPut(V v) throws InterruptedException {
//尾節點添加元素
items[tail] = v;
//如果到達數組末端,則重新從0開始
if (++tail == items.length) {
tail = 0;
}
//累加元素個數
count++;
}
protected synchronized final V doTake() throws InterruptedException {
//頭節點取元素
V v = items[head];
//頭節點置空實現刪除
items[head] = null;
if (++head == items.length) {//如果到達邊界,則循環從0開始
head = 0;
}
//減元素個數
count--;
return v;
}
public synchronized final boolean isFull() {
return count == items.length;
}
public synchronized final boolean isEmpty() {
return count == 0;
}
}2. 基于異常式的隊列模型
我們先來看看第一個有界緩存的基本實現,一旦觸發如下兩個條件時,該緩存就會拋出異常:
- 獲取元素時隊列空
- 插入元素時隊列滿
對應落地代碼如下,直接繼承有界隊列后落地落采用異常通知方式實現元素存取的緩存隊列:
public class GrumpyBoundedBuffer extends BaseBoundedBuffer<Integer> {
protected GrumpyBoundedBuffer(int capacity) {
super(capacity);
}
public synchronized void put(int value) throws Exception {
//隊列滿了,直接拋出異常
if (isFull()) {
throw new RuntimeException("queue is full");
}
//隊列沒滿,正常入隊
doPut(value);
}
public synchronized int take() throws Exception {
//隊列為空,直接拋出異常
if (isEmpty()) {
throw new RuntimeException("queue is empty");
}
//隊列不為空,正常出隊
return doTake();
}
}雖然這種方式使得緩存在實現非常的簡單,但是這種方案對于使用者來說非常的不友好,在業務正常的情況下,即使存取消費的緩存在單位時間滿即直接拋出異常告知線程不可存取,讓使用者手動捕獲異常進行重試:
public static void main(String[] args) {
GrumpyBoundedBuffer grumpyBoundedBuffer = new GrumpyBoundedBuffer(1);
ThreadUtil.execAsync(() -> {
while (true) {
try {
grumpyBoundedBuffer.put(1);
} catch (Exception e) {
Console.error("隊列已滿,1s后重試");
ThreadUtil.sleep(1000);
}
}
});
}輸出結果如下所示,非常的不方便:

3. 輪詢檢測式的等待喚醒
于是我們就考慮在隊列存儲上在一個重試的的機制,即當隊列存取失敗時,進行休眠重試,直到成功后返回。
但是對于程序的性能表現而言,也是一種災難,這種做法設計釋放鎖之后的休眠和循環重試,這就使得設計者需要在CPU使用率和響應性之間做好權衡:
- 如果設置休眠時間相對短,那么重試就會盡可能快,響應性就會越高,但是循環帶來的CPU資源的開銷卻急劇增加。
- 如果休眠時間設置過長,有概率完成任務處理,但是卻來響應的延遲。
public class SleepyBoundedBuffer extends BaseBoundedBuffer<Integer> {
protected SleepyBoundedBuffer(int capacity) {
super(capacity);
}
/**
* 輪詢重試,直到成功
*
* @param value
* @throws InterruptedException
*/
public synchronized void put(int value) throws InterruptedException {
while (true) {
synchronized (this) {
if (!isFull()) {
doPut(value);
}
}
Console.log("隊列已滿,500ms后重試");
ThreadUtil.sleep(500);
}
}
public synchronized int take() throws InterruptedException {
while (true) {
synchronized (this) {
if (!isEmpty()) {
return doTake();
}
}
Console.log("隊列已空,500ms后重試");
ThreadUtil.sleep(500);
}
}
}這種方案一定程度解決用戶手動捕獲異常重試的繁瑣,但也存在著如下缺點:
- 重試時休眠間隔500ms可能太長也可能太短,固定值等待非常不合理
- 頻繁循環重試使得線程大量時間得到CPU時間片做一些無用功
- 重試多次無果后無法中斷
4. 基于條件等待的有界緩存
所以我們需要進行進一步的優化即通過如下兩個列條件謂語避免線程無用的輪詢開銷:
- 當隊列滿的時候,當前存線程阻塞等待,直到隊列非空時被喚醒
- 當隊列空的時候,取線程阻塞等待,知道隊列有元素時將其喚醒
總結起來就是一句話,非滿的時候喚醒存線程嘗試存元素,非空的時候通知取線程取元素,由此得出如下兩個條件謂語isNotFull和isNotEmpty:

所以我們需要以object中對應的wait、notify和notifyAll構成內部條件隊列的交互通知,當然要調用這些通知方法的前提也就是需要獲取當前這個對象的鎖。
以我們有界緩存存元素操作為例,我們執行添加操作時執行步驟為:
- 獲得這個對象的鎖
- 當發現緩存空間已滿即不符合檢測條件時,則調用當前對象(有界緩存)的wait方法將當前線程掛起
- 與此同時,線程也會釋放這把鎖,等待隊列非滿時通過notify或者notifyAll嘗試將當前線程喚醒。
對應我們給出代碼示例,這種方式相比于休眠的方案,改進了響應的效率和CPU使用率的開銷,避免了非必要的檢測步驟:
public class BoundedBuffer extends BaseBoundedBuffer<Integer> {
protected BoundedBuffer(int capacity) {
super(capacity);
}
public synchronized void put(int value) throws InterruptedException {
if (isFull()) {
Console.log("隊列已滿,等待");
wait();
}
Console.log("隊列非滿,開始寫入");
doPut(value);
//通知阻塞線程消費
notifyAll();
}
public synchronized int take() throws InterruptedException {
if (isEmpty()) {
Console.log("隊列已空,等待");
wait();
}
int value = doTake();
//通知阻塞線程寫入
notifyAll();
return value;
}
}對應的筆者以線程調試模式給出下面這段代碼,在首先讓線程1執行兩次寫操作,查看是否在第二次阻塞是否會在消費者線程消費后存入,所以筆者也會在兩個線程執行完畢后,判斷隊列非空來查看是否實現這一點:
//創建一個容量為1的緩沖區
BoundedBuffer boundedBuffer = new BoundedBuffer(1);
CountDownLatch countDownLatch = new CountDownLatch(2);
//啟動寫入線程第一次寫入成功,第二次寫入阻塞,直到消費者線程完成消費
new Thread(() -> {
try {
boundedBuffer.put(1);
boundedBuffer.put(2);
countDownLatch.countDown();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
new Thread(() -> {
try {
ThreadUtil.sleep(1000);
Console.log("take:{}", boundedBuffer.take());
countDownLatch.countDown();
} catch (InterruptedException e) {
throw new RuntimeException();
}
}).start();
try {
countDownLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
//通過非空函數判斷線程1第二個元素是否存成功
Console.log("main線程結束:{}", boundedBuffer.isEmpty());對應輸出結果如下,可以看到第二次寫入因為隊列滿而阻塞,一旦消費者完成消費后,生產者就立刻被喚醒寫入:

二、關于條件謂詞的一些探討
1. 條件謂詞的使用方式
要想正確的使用條件隊列,就需要正確的抓住線程與條件謂語之間的關聯,保證合適的條件下當線程添加至條件隊列,并在合適的時機將其喚醒,以我們的本文一直在強調的有界隊列:
- 對于put方法來說:只有條件非滿的情況下,才能添加元素至隊列
- 對于take方法來說,只有條件非空的情況下,才能取出元素
同時,每一次wait的調用都會將調用者隱式的和條件隊列加以關聯,例如:
- 調用有界緩存的take方法時,若沒有元素,當前線程調用wait阻塞存入監視鎖底層的waitSet
- 調用有界緩存put方法時,若空間已滿,當前線程調用wait存入監視鎖底層的waitset
當然這一切的都有一個前提,即調用者已經獲取到的當前wait方法對應的對象的監視鎖,這是并發互斥中等待通知模型有序協調的一個必要條件:

2. 過早的喚醒或錯誤喚醒
對條件隊列有了基本的概念之后,我們再來更進一步的探討這套設計理念,實際上按照目前的設計來看,這套等待喚醒模型還是存在一定的缺陷,即多條件關聯單監視鎖導致的錯誤喚醒問題。
舉個例子,假設基于我們要上述的有界緩存隊列,我們打算增加一個關閉有界緩存的操作,即直接起一個線程查看shutdownFlag如果為false則掛起等待,當其他線程將shutdownFlag設置為true的時候將其喚醒,對應的我們也給出下面這樣一段代碼:
public synchronized void shutdown() {
isShuttingDown = true;
notifyAll();
}
private volatile boolean isShuttingDown = false;
public synchronized void shutdownIfInNeed() throws InterruptedException {
if (isShuttingDown == false) {
wait();
Console.log("關閉線程被喚醒");
}
//執行阻塞隊列中斷和關閉所有線程的操作
//......
}對此我們試想這樣一個情況,我們現在有一個上界為1的有界隊列,對應3個線程按如下順序執行:
- 消費者線程嘗試從有界緩存獲取元素,阻塞等待喚醒
- 停止線程發現停止標識為false,阻塞等待喚醒
- 生產者線程存入元素,隊列有新元素,調用notifyall通知消費者消費
重點來了,停止線程和消費者線程都處于當前監視鎖的等待隊列中,所以notifyall操作可能會誤喚醒停止線程將隊列消費和所有線程中斷造成系統崩潰。
除此之外處于wait的線程還可能會被錯誤的喚醒即沒有任何征兆的情況下蘇醒被CPU時間片執行,引用《java并發編程實戰中》的說法:
以 “早餐” 烤面包機烤面包完成后通知人們食用為例 , 這就好?烤?包機的線 路 連 接 有 問 題 , 有時候當?包還未烤 時 , 鈴聲 就 響起來了

對應的我們也給出這個案例的代碼:
public static void main(String[] args) {
//創建一個容量為1的緩沖區
BoundedBuffer boundedBuffer = new BoundedBuffer(1);
CountDownLatch countDownLatch = new CountDownLatch(2);
new Thread(() -> {
try {
//線程0取元素阻塞
Console.log("take:{}", boundedBuffer.take());
countDownLatch.countDown();
} catch (InterruptedException e) {
throw new RuntimeException();
}
}, "t0").start();
new Thread(() -> {
try {
//線程1查看停止信號為false阻塞
boundedBuffer.shutdownIfInNeed();
countDownLatch.countDown();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, "t1").start();
new Thread(() -> {
try {
//線程2put操作隊列非空執行通知操作,導致停止線程被錯誤的喚醒
boundedBuffer.put(1);
countDownLatch.countDown();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, "t2").start();
try {
countDownLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Console.log("main線程結束:{}", boundedBuffer.size());
}輸出結果如下,可以看到在生產者生產元素后的通知動作,把關閉線程給喚醒了,這就是經典的錯誤喚醒:
隊列已空,take線程:t0等待
隊列非滿,開始寫入
關閉線程被喚醒
take線程 t0被喚醒
take:-1
main線程結束:1本質原因就是一個監視鎖中的隊列關聯多個條件,使得在多條件的等待通知場景下存在錯誤通知的情況,考慮到這一點,無論是對于put、take還是shutdown方法,我們都需要進行改進,確保:
- 生產者被喚醒后,進行必要的非滿檢查,且只有將空隊列存入元素后通知消費者
- 消費者被喚醒后,進行必要的非空檢查,只有將非空隊列消費空之后,通知生產者
- shutdown線程被喚醒后,進行必要的狀態標識檢查,只有狀態標識為true才能停止線程
改進后的代碼如下所示,可以看到筆者將if條件判斷后wait的操作改為while+wait操作確保喚醒后的再確認:
public synchronized void put(int value) throws InterruptedException {
while (isFull()) {//條件觸發時循環檢測一下
wait();
}
//空變為非空
boolean wasEmpty = isEmpty();
doPut(value);
if (wasEmpty) {//僅當空變為非空時才通知
notifyAll();
}
}
public synchronized int take() throws InterruptedException {
while (isEmpty()) {
wait();
}
//滿變為非滿才通知
boolean wasFull = isFull();
int value = doTake();
if (wasFull) {
notifyAll();
}
return value;
}
public synchronized void shutdownIfInNeed() throws InterruptedException {
while (isShuttingDown == false) {
wait();
Console.log("關閉線程被喚醒");
}
//執行阻塞隊列中斷和關閉所有線程的操作
//......
}3. notify下的信號丟失問題
我們再來說說通知的哲學,剛接觸java這門語言的時候,都會了解到notify和notifyAll的區別,這一點我們也可以直接從源碼的注釋上了解這一點,即前者僅僅通知監視鎖下的單個線程而后者則是所有線程:
1. notify:Wakes up a single thread that is waiting on this object's monitor.
2. notifyAll:Wakes up all threads that are waiting on this object's monitor. A thread waits on an object's monitor by calling one of the wait methods.所以這也就是為什么筆者在實現上述通知這個動作的時候,使用的是notifyAll而非notify,即notify存在信號丟失問題,還是用我上述的生產者-消費者和異步關閉線程的例子,試想下述場景:
- 有界隊列元素空間為1
- 線程1取元素為空,阻塞
- 線程2查看停止標識為false,阻塞
- 線程0添加元素,元素非空,notify選中了線程2
- 本該處理元素的線程1因為沒收到通知,造成了一種信號丟失的情況

這本質就是同步鎖和wait以及條件謂語上一種設計缺陷,即一個同步鎖只能關聯一組條件隊列,而條件隊列無法做區分。
所以基于上述條件隊列的案例,我們通過條件通知的方式進行比對保證更高效的準確的通知,避免每次操作之后都非常激進的通知所有線程造成非必要的上下文切換開銷,當然讀者在進行這樣的優化時務必記得,只有保證程序可以使用的情況下,在進行優化的哲學:
4. 基于條件變量下的等待通知模型
內置隊列存在一個內置鎖關聯多個條件隊列的情況,這使得很多線程被錯誤的喚醒,導致非必要的CPU時鐘消耗和上下文切換和并發競爭鎖的開銷。針對上述的問題,我們必須到借由一種工具保證同一把鎖下的各個條件的線程都放置到不同的隊列中,從而保證正確的喚醒,即:
等待隊列非滿的生產者線程存到一個隊列,待消費者完成元素消費后通知這個隊列
- 等待隊列非空的消費者線程存到一個等待隊列,待生產者完成元素投遞后通知這個隊列

所以,通過juc包下的鎖即可實現將條件放到不同的條件隊列中,同時它還能可以實現隊列內部公平的喚醒,以保證等待喚醒的是需要的線程從而從而等到通知的高效,以及減小非必要的上下文切換的開銷:
public class ConditionBoundedBuffer<V> {
private final V[] items;
private int head;
private int tail;
private int count;
//下述兩個條件隊列關聯同一把鎖,線程按照各自條件與隊列關聯
private final ReentrantLock lock = new ReentrantLock();
//生產者等待隊列非滿的等待隊列
private final Condition notFull = lock.newCondition();
//消費者等待隊列非空的等待隊列
private final Condition notEmpty = lock.newCondition();
public ConditionBoundedBuffer(int capacity) {
this.items = (V[]) new Object[capacity];
}
public boolean isFull() {
return count == items.length;
}
public boolean isEmpty() {
return count == 0;
}
public void put(V v) throws InterruptedException {
lock.lock();
try {
while (isFull()) {//輪詢檢測非滿
notFull.await();
}
//添加元素
items[tail++] = v;
count++;
if (tail == items.length) {
tail = 0;
}
notEmpty.signal();
} finally {
lock.unlock();
}
}
public V take() throws InterruptedException {
lock.lock();
try {
while (isEmpty()) {//輪詢檢測非空
notEmpty.await();
}
//消費元素
V v = items[head];
items[head] = null;
head++;
count--;
if (head == items.length) {
head = 0;
}
notFull.signal();
return v;
} finally {
lock.unlock();
}
}
}對應的我們也給出壓測代碼,最終斷言也是正確的:
ConditionBoundedBuffer<Integer> conditionBoundedBuffer = new ConditionBoundedBuffer<>(1);
ExecutorService threadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
for (int i = 0; i < 100_0000; i++) {
//提交1一個元素
threadPool.execute(() -> {
try {
conditionBoundedBuffer.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
//消費一個元素
threadPool.execute(() -> {
try {
conditionBoundedBuffer.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
threadPool.shutdown();
while (!threadPool.isTerminated()) {
}
//判斷并發下線程是否正確的對等生產和消費
Assert.equals(conditionBoundedBuffer.count, 0);三、小結
自此我們針對并發編程中的等待通知模型中的狀態管理,等待通知原則和技巧進行了深入的分析和演示,希望對你有幫助。
































