面試官再問 synchronized 底層原理,這樣回答讓他眼前一亮!
因為之前關(guān)于synchronized關(guān)鍵字底層工作機制理解有所偏差,簡單查閱了一下其底層的實現(xiàn),遂以此文簡單記錄一下jdk8版本synchronized關(guān)鍵字的設(shè)計理念和實現(xiàn),希望對你有所幫助。

一、基于objectMonitor深入剖析synchronized關(guān)鍵字
1. 宏觀了解synchronized同步機制
在正式分析synchronized關(guān)鍵字之前,我先從全局的維度了解一下synchronized關(guān)鍵字的實現(xiàn)。假設(shè)我們現(xiàn)在有三個線程嘗試爭搶對象鎖,此時線程0最先通過CAS機制將鎖的持有者標識為自己并成功獲取鎖,對應的線程1和線程2則CAS失敗,此階段線程0就可以操作鎖內(nèi)部對應的臨界資源。同理,這個持有者標識就是_owner字段:

線程1和線程2在競爭失敗后,會先嘗試自旋獲取鎖,如果自旋失敗則會進入阻塞隊列中等待,直到鎖釋放,而這個隊列可以是_cxq或者_EntryList。這里筆者為了簡單直觀地展現(xiàn)這一過程,將競爭失敗的線程直接放入_EntryList隊列中:

線程0調(diào)用同一個鎖對應的示例的同一個方法,例如我們之前上鎖調(diào)用的是function1現(xiàn)在調(diào)用的是function2,這也就是我們常說的同一個對象嘗試上鎖兩次也就是鎖重入的情況,對應底層的objectMonitor會針對這種情況進行維護,也就是通過一個_recursions字段進行累加,這也就意味著線程0進行鎖釋放時必須釋放兩次:
public class Example1 {
public synchronized void function1(){
Console.log("function1");
}
public synchronized void function2(){
Console.log("function2");
}
}線程0期間因為特定原因,主動掛起調(diào)用wait方法,此時就需要完成鎖釋放操作,對應底層操作則是將其存入等待隊列_WaitSet中,并將_EntryList中等待的線程喚醒嘗試競爭鎖(注意,不一定完全是_EntryList,這里更多是為了舉例做一個普適的說明)。

后續(xù)線程0被notify或者notifyall喚醒之后,還會再次回到_EntryList競爭鎖。
自此我們從宏觀的角度了解了synchronized底層的工作機制,對應的我們也通過源碼的角度給出了上述概念中的幾個成員字段,整體和上述表述類似
- _count:等待獲取該鎖的線程數(shù),通常為_cxq和_EntryList隊列中的線程數(shù)總和
- _waiters:調(diào)用wait方法進入等待狀態(tài)的線程數(shù)
- _owner:指向當前持有鎖的線程
- _recursions:線程重入次數(shù),用于記錄當前線程對同一把鎖的重入次數(shù)
ObjectMonitor() {
_header = NULL;
_count = 0;//搶占該鎖的線程數(shù)即 _cxq.size + EntryList.size
_waiters = 0,//處于等待狀態(tài)的線程數(shù)
_recursions = 0;//線程重入次數(shù)
_object = NULL;
_owner = NULL;//當前鎖即ObjectMonitor的擁有者
_WaitSet = NULL;//處于wait狀態(tài)的線程會存入到該隊列中
_WaitSetLock = 0 ;
_Responsible = NULL ;
_succ = NULL ;
_cxq = NULL ;//多線程競爭時會進入該隊列中
FreeNext = NULL ;
_EntryList = NULL ;//處于block等待鎖狀態(tài)的線程會進入該隊列
_SpinFreq = 0 ;
_SpinClock = 0 ;
OwnerIsThread = 0 ;
_previous_owner_tid = 0;
}上述說明我們也可以在源碼objectMonitor.cpp中的注釋中查閱到,感興趣的讀者可以翻閱這段注釋了解具體的實現(xiàn)細節(jié),其大體意思與上述描述基本一致。對應的,筆者簡單概括如下:
- 線程必須通過cas將owner從null變?yōu)樽约翰潘愠晒Λ@得鎖
- 任意時刻線程因阻塞等待必須處于cxq或EntryList隊列中,或者因wait調(diào)用進入WaitSet隊列
- 持有鎖線程釋放后,其必須喚醒EntryList一個候選線程爭搶鎖
- ......

2. 并發(fā)競爭監(jiān)視鎖
線程基于synchronized關(guān)鍵字爭搶鎖資源的本質(zhì)實現(xiàn)就是objectMonitor.cpp的enter方法,其上鎖步驟嚴格:
(1) 嘗試cas將null設(shè)置為當前線程的指針,如果cas方法返回null則說明自己獲取鎖成功直接返回,反之進入步驟2
(2) 如果非空但返回指針地址是自己,說明當前線程之前已經(jīng)獲取過一個這把鎖,本次為鎖重入,直接累加重入次數(shù)_recursions后返回,反之進入步驟3
(3) 判斷當前線程是否是這把鎖的輕量級鎖持有者,如果是則執(zhí)行一次鎖升級將其升級為重量級鎖,并將owner對象中棧上BasicLockObject地址轉(zhuǎn)為完整的thread指針,反之進入步驟4
(4) 上述步驟都不成功,則說明當前鎖被其他線程持有,嘗試自旋幾次獲取這把鎖如果成功則直接返回,反之進入步驟5
(5) 步驟4還是沒有拿到鎖,為了避免非必要的CPU自旋開銷,累加count說明等待獲取鎖的線程數(shù)加一后,再次自旋嘗試幾次,
(6) 幾次無果將當前線程封裝為等待節(jié)點存入等待隊列,直到被喚醒拿到鎖后退出
(7) 通過步驟6的方式拿到鎖的線程,會通過cas的方式完成等待線程數(shù)count值扣減并退出
對應的我們給出了這段代碼的流程圖,讀者可參考上述說明理解:

有了上述的基礎(chǔ)之后,我們就可以查閱objectMonitor.cpp上鎖邏輯enter的實現(xiàn),邏輯和上述說明基本一致,我們也不難看出對應jdk8版本的synchronized關(guān)鍵字內(nèi)置的優(yōu)化,即核心理念就是盡可能利用cas競爭鎖資源,明確感知到競爭激烈之后主動將其掛起,再利用等待通知模型喚醒線程競爭資源:
void ATTR ObjectMonitor::enter(TRAPS) {
//獲取到當前線程的指針
Thread * const Self = THREAD ;
void * cur ;
//嘗試CAS的方式將owner從null設(shè)置為當前線程
cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;
//如果返回null,說明當前線程是第一個獲取到這把鎖的,直接返回
if (cur == NULL) {
//......
return ;
}
//非null且指針就是自己說明是重入,直接累加重入次數(shù)_recursions后返回
if (cur == Self) {
//......
_recursions ++ ;
return ;
}
//如果當前線程是輕量級鎖的持有則,則進入該邏輯進行鎖升級
if (Self->is_lock_owned ((address)cur)) {
//......
//將owner從線程棧上的BasicLockObject地址轉(zhuǎn)換為完整的Thread指針
_owner = Self ;
OwnerIsThread = 1 ;
return ;
}
// We've encountered genuine contention.
//當前鎖被其他線程占有,需要搶占
assert (Self->_Stalled == 0, "invariant") ;
//記錄需要搶占的線程monitor指針
Self->_Stalled = intptr_t(this) ;
//嘗試自旋獲取鎖,成功后返回
if (Knob_SpinEarly && TrySpin (Self) > 0) {
//......
return ;
}
//......
//經(jīng)過上述步驟還是沒有搶到鎖,原子累加count值,即增加一個等待獲取鎖的線程
Atomic::inc_ptr(&_count);
EventJavaMonitorEnter event;
//......
for (;;) {
jt->set_suspend_equivalent();
// cleared by handle_special_suspend_equivalent_condition()
// or java_suspend_self()
//嘗試CAS或者自旋等方式獲取鎖,若失敗則通過頭插法將線程存入cxq隊列中,等待后續(xù)通知喚醒
EnterI (THREAD) ;
if (!ExitSuspendEquivalent(jt)) break ;
//......
//完成操作后,鎖釋放通知其他線程上鎖
exit (false, Self) ;
jt->java_suspend_self();
}
Self->set_current_pending_monitor(NULL);
// acquire it.
}
//原子扣減count告知等待鎖的線程少一個
Atomic::dec_ptr(&_count);
//......
}3. 鎖釋放流程解析
對于鎖釋放流程,整體邏輯比較簡單,對應的執(zhí)行步驟為:
(1) 查看當前線程是否是owner指針指向的線程,如果明確當前監(jiān)視鎖非輕量級鎖,則說明當前線程并非監(jiān)視鎖持有者(進入monitor但沒有正確退出),則直接返回,反之進入步驟2
(2) 查看_recursions是否不為0,若不為0則說明是重入,扣減_recursions后返回
(3) 上述檢查無誤,將鎖直接釋放,然后進入步驟4
(4) 當前線程進行cas嘗試將owner從null設(shè)置為自己以查看是否存在其他線程競爭,如果失敗則說明鎖被其他線程持有,不需要執(zhí)行后續(xù)喚醒線程的任務直接返回,反之進入步驟5
(5) 這也就意味著當前監(jiān)視鎖沒有被其它線程獲取,當前線程需要執(zhí)行喚醒等待線程的任務
(6) 到這一步就意味著需要喚醒等待線程的步驟了,筆者上文說過,喚醒線程不一定是從EntryList也可能從cxq隊列,對應的喚醒策略由QMode決定,具體規(guī)則如下:
1. QMode為2直接喚醒cxq隊列首元素線程,讓其競爭監(jiān)視鎖
2. QMode為3,則查看cxq隊列是否非空,若明確非空則將其追加到EntryList中,若EntryList為空則直接將cxq隊列設(shè)置為EntryList,然后將首元素喚醒
3. QMode為4,則將EntryList追加到cxq隊列后,然后讓cxq隊列作為EntryList首元素,將首個線程喚醒競爭監(jiān)視鎖
/************************* 執(zhí)行到步驟4和5,說明代碼中的邏輯判斷明確EntryList為空,對應不同的QMode規(guī)則為**************/
4. QMode為1,將cxq隊列倒敘排列后作為EntryList,并將首元素喚醒
5. QMode為0或2,將cxq直接封裝為ObjectWaiter作為EntryList,并將首元素喚醒(7) 特殊步驟:如果發(fā)現(xiàn)w為空則說明cxq和EntryList隊列都為空,則重新從循環(huán)頭開始執(zhí)行正確的退出協(xié)議
對應如下是筆者整理的流程圖,讀者可以基于上述說明進行梳理歸納:

對應位于objectMonitor.cpp的exit函數(shù)就是我們釋放鎖的邏輯,這里筆者整理出核心的代碼段,讀者可結(jié)合注釋更進一步理解:
void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) {
Thread * Self = THREAD ;
//如果當前線程不等于_owner則進入當前邏輯
if (THREAD != _owner) {
if (THREAD->is_lock_owned((address) _owner)) {//如果當前線程持有這把鎖的輕量級鎖,則將其設(shè)置為重量級鎖并讓_owner指向完整的線程地址,然后執(zhí)行后續(xù)的鎖釋放邏輯
//......
assert (_recursions == 0, "invariant") ;
_owner = THREAD ;
_recursions = 0 ;
OwnerIsThread = 1 ;
} else {
//......
/*
JNI(Java Native Interface)本地代碼中可能出現(xiàn)的監(jiān)視器(即 synchronized 鎖)使用不平衡的情況,
比如進入 monitor 但沒有正確退出,或者退出沒有對應的進入。當檢測到這種異常情況時,應該拋出 Java 的
IllegalMonitorStateException 異常來通知開發(fā)者。
*/
TEVENT (Exit - Throw IMSX) ;
assert(false, "Non-balanced monitor enter/exit!");
if (false) {
THROW(vmSymbols::java_lang_IllegalMonitorStateException());
}
return;
}
}
//_recursions不為0,說明此前線程重入過,完成_recursions扣減后返回,不重置owner指針
if (_recursions != 0) {
_recursions--; // this is simple recursive enter
TEVENT (Inflated exit - recursive) ;
return ;
}
//......
for (;;) {
assert (THREAD == _owner, "invariant") ;
if (Knob_ExitPolicy == 0) {
//......
//將ownder設(shè)置為null,即釋放鎖
OrderAccess::release_store_ptr (&_owner, NULL) ; // drop the lock
OrderAccess::storeload() ; // See if we need to wake a successor
//......
//cas嘗試一下線程從null設(shè)置為自己,如果非空則說明鎖被其他線程持有,不執(zhí)行后續(xù)喚醒其他線程的邏輯
if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
return ;
}
TEVENT (Exit - Reacquired) ;
} else {
if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
//釋放鎖
OrderAccess::release_store_ptr (&_owner, NULL) ; // drop the lock
//......
//嘗試cas上鎖,如果不成功則說明當前鎖被其他線程持有,則當前線程不負責后續(xù)喚醒等待線程的職責,直接返回
if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
TEVENT (Inflated exit - reacquired succeeded) ;
return ;
}
TEVENT (Inflated exit - reacquired failed) ;
} else {
TEVENT (Inflated exit - complex egress) ;
}
}
//......
ObjectWaiter * w = NULL ;
int QMode = Knob_QMode ;
if (QMode == 2 && _cxq != NULL) {//QMode為2且cxq隊列非空,直接喚醒cxq隊列首元素直接返回
w = _cxq ;
assert (w != NULL, "invariant") ;
assert (w->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
//傳入_cxq隊列首元素地址,喚醒對應線程去競爭鎖資源
ExitEpilog (Self, w) ;
return ;
}
if (QMode == 3 && _cxq != NULL) {
//
// 通過CAS將_cxq設(shè)置為空
// 將w(即原有的cxq隊列)轉(zhuǎn)換為雙向鏈表的waiter
// 將最近到達的線程(RATs)追加到EntryList
// TODO:將EntryList組織為循環(huán)雙向鏈表(CDLL),這樣我們就能在常量時間內(nèi)定位到尾部。
// 把cxq掛到_EntryList隊列上,后續(xù)從_EntryList中喚醒
// 查找EntryList的尾部
// 如果EntryList為空,則將w設(shè)為EntryList
// 否則將w鏈接到EntryList尾部
w = _cxq ;
for (;;) {
assert (w != NULL, "Invariant") ;
//通過cas把cxq設(shè)置為空
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
//將w即原有cxq隊列轉(zhuǎn)為一個雙向鏈表的waiter
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
//如果_EntryList非空的話直接將其追加到cxq隊列,反之cxq隊列直接作為_EntryList
ObjectWaiter * Tail ;
for (Tail = _EntryList ; Tail != NULL && Tail->_next != NULL ; Tail = Tail->_next) ;
if (Tail == NULL) {
_EntryList = w ;
} else {
Tail->_next = w ;
w->_prev = Tail ;
}
// Fall thru into code that tries to wake a successor from EntryList
}
if (QMode == 4 && _cxq != NULL) {//如果QMode且_cxq非空,則把_EntryList掛到cxq上
// Aggressively drain cxq into EntryList at the first opportunity.
//w指針指向cxq隊列后,通過cas將cxq指針置空
w = _cxq ;
for (;;) {
assert (w != NULL, "Invariant") ;
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
assert (w != NULL , "invariant") ;
//將w(原cxq隊列封裝為等待隊列)
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
// Prepend the RATs to the EntryList
//將_EntryList追加到w(原cxq隊列)
if (_EntryList != NULL) {
q->_next = _EntryList ;
_EntryList->_prev = q ;
}
_EntryList = w ;
// Fall thru into code that tries to wake a successor from EntryList
}
w = _EntryList ;
if (w != NULL) {//從_EntryList找到節(jié)點喚醒指定線程返回
assert (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
ExitEpilog (Self, w) ;
return ;
}
w = _cxq ;
//如果發(fā)現(xiàn)w為空則說明cxq和entrylist隊列都為空,則重新從循環(huán)頭開始執(zhí)行正確的退出協(xié)議
if (w == NULL) continue ;
for (;;) {
assert (w != NULL, "Invariant") ;
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
if (QMode == 1) {
// QMode == 1 : drain cxq to EntryList, reversing order
// We also reverse the order of the list.
//當 QMode 設(shè)置為 1 時,
//ObjectMonitor 會將 cxq(競爭隊列)中的所有等待線程移動到 EntryList(入口列表)中,
// 但在這個過程中會將線程的順序顛倒。這種設(shè)計可能是為了實現(xiàn)某種調(diào)度策略,比如讓最后到達的線程優(yōu)先獲得鎖(LIFO - 后進先出)
ObjectWaiter * s = NULL ;
ObjectWaiter * t = w ;
ObjectWaiter * u = NULL ;
while (t != NULL) { //將cxq隊列元素倒敘排列一下
guarantee (t->TState == ObjectWaiter::TS_CXQ, "invariant") ;
t->TState = ObjectWaiter::TS_ENTER ;
u = t->_next ;
t->_prev = u ;
t->_next = s ;
s = t;
t = u ;
}
_EntryList = s ;
assert (s != NULL, "invariant") ;
} else {
// QMode == 0 or QMode == 2 直接順序生成等待節(jié)點
_EntryList = w ;
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
}
//拿到首元素喚醒,讓其嘗試競爭監(jiān)視鎖資源
w = _EntryList ;
if (w != NULL) {
guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
ExitEpilog (Self, w) ;
return ;
}
}3. 掛起等待的wait調(diào)用
線程調(diào)用wait方法時就會釋放鎖進入等待隊列,等待超時或者被喚醒后重新競爭監(jiān)視鎖資源,對應的處理步驟為:
(1) 進行必要的中斷檢查,如果發(fā)現(xiàn)調(diào)用wait時線程正在處理中斷,則直接拋出中斷異常,反之進入步驟2
(2) 將節(jié)點封裝為等待節(jié)點
(3) 自旋獲取等待隊列的鎖,將線程存入等待隊列
(4) 保存好_recursions重入次數(shù)后,調(diào)用exit退出監(jiān)視鎖
(5) 按照wait傳參進行有限或者無限時間的等待,直到被喚醒
(6) 喚醒后,查看自己當前狀態(tài)如果處于TS_RUN則直接嘗試競爭鎖,如果是TS_ENTER或者TS_CXQ則進行必要的自旋嘗試競爭鎖后掛起等待
(7) 上鎖成功后,恢復鎖重入次數(shù),扣減_waiters告知等待線程減少一個
(8) 操作臨界資源
這段邏輯比較簡單,讀者可直接基于上述說明查看wait代碼的底層實現(xiàn):
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
//Self指向當前線程的指針
Thread * const Self = THREAD ;
//......
// check for a pending interrupt
// 檢查是否存在待處理的中斷,若明確處理中斷則直接拋出中斷異常
if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
//......
TEVENT (Wait - Throw IEX) ;
THROW(vmSymbols::java_lang_InterruptedException());
return ;
}
//......
//將當前節(jié)點封裝為wait
ObjectWaiter node(Self);
node.TState = ObjectWaiter::TS_WAIT ;
Self->_ParkEvent->reset() ;
//......
//自旋獲取waitSet鎖將節(jié)點存入waiterSet然后釋放鎖
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
AddWaiter (&node) ;
Thread::SpinRelease (&_WaitSetLock) ;
intptr_t save = _recursions; // record the old recursion count 保存舊的遞歸計數(shù),便于后續(xù)喚醒恢復
_waiters++; // increment the number of waiters 增加等待者數(shù)量
_recursions = 0; // set the recursion level to be 1 將遞歸級別設(shè)置為1
exit (true, Self) ; // exit the monitor 退出監(jiān)視器
//......
int ret = OS_OK ;
int WasNotified = 0 ;
//獲取當前線程的park掛起
{ // State transition wrappers
OSThread* osthread = Self->osthread();
OSThreadWaitState osts(osthread, true);
{
//......
if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
// Intentionally empty
// 故意為空
} else
//按照指定的傳入等待時間參數(shù)等待,若沒設(shè)置時間則無限期掛起等待直到喚醒
if (node._notified == 0) {
if (millis <= 0) {
Self->_ParkEvent->park () ;
} else {
ret = Self->_ParkEvent->park (millis) ;
}
}
//......
} // Exit thread safepoint: transition _thread_blocked -> _thread_in_vm
//......
//自旋獲取鎖將線程中等待節(jié)點移除,并將其狀態(tài)設(shè)置為TS_RUN,嘗試爭搶鎖
if (node.TState == ObjectWaiter::TS_WAIT) {
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - unlink") ;
if (node.TState == ObjectWaiter::TS_WAIT) {
DequeueSpecificWaiter (&node) ; // unlink from WaitSet
// 從WaitSet中解除鏈接
assert(node._notified == 0, "invariant");
node.TState = ObjectWaiter::TS_RUN ;
}
Thread::SpinRelease (&_WaitSetLock) ;
}
//......
ObjectWaiter::TStates v = node.TState ;
//查看被喚醒的線程當前處于什么狀態(tài),如果狀態(tài)為TS_RUN則調(diào)用enter直接去競爭鎖,反之說明節(jié)點在等待隊列中,進行必要的自旋競爭后進入cxq或者entrylist中等待喚醒
if (v == ObjectWaiter::TS_RUN) {
enter (Self) ;
} else {
guarantee (v == ObjectWaiter::TS_ENTER || v == ObjectWaiter::TS_CXQ, "invariant") ;
ReenterI (Self, &node) ;
node.wait_reenter_end(this);
}
//......
}
//......
_recursions = save; // restore the old recursion count 恢復舊的遞歸計數(shù)
_waiters--; // decrement the number of waiters 減少等待者數(shù)量
//......
}4. 通知喚醒的notify操作
在分析wait源碼的時候,筆者提及wait喚醒的隊列可能在cxq或者在entryList隊列中,本質(zhì)上是notify操作的等待隊列處理策略,當線程通過notify操作嘗試通知處于waitset中的線程時,其底層的notify調(diào)用整體執(zhí)行步驟為:
(1) 判斷_WaitSet是否非空,若非空執(zhí)行步驟2
(2) 通過自旋獲取_WaitSet的鎖,著手處理_WaitSet中的線程
(3) 上鎖成功后,按照配置的策略處理_WaitSet中的元素,具體策略為:
策略0:如果EntryList非空的話,則將等待者添加到EntryList的頭部
策略1:將等待者添加到EntryList的尾部
策略2:將等待者添加到cxq的頭部
策略3:將等待者添加到cxq的尾部
其他: 直接喚醒WaitSet中的首個線程,讓其競爭監(jiān)視鎖(4) 釋放WaitSet鎖
notify源碼邏輯比較清晰簡單,讀者可結(jié)合筆者的注釋自行閱讀了解一下細節(jié),并串聯(lián)上述的說明完成邏輯閉環(huán):
void ObjectMonitor::notify(TRAPS) {
CHECK_OWNER();
//_WaitSet為空直接返回
if (_WaitSet == NULL) {
TEVENT (Empty-Notify) ;
// 記錄空通知事件
return ;
}
DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);
int Policy = Knob_MoveNotifyee ;
//通過自旋獲取等待隊列的鎖
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
//拿到waitSet
ObjectWaiter * iterator = DequeueWaiter() ;
// 從等待隊列中取出一個等待者
if (iterator != NULL) {
TEVENT (Notify1 - Transfer) ;
guarantee (iterator->TState == ObjectWaiter::TS_WAIT, "invariant") ;
guarantee (iterator->_notified == 0, "invariant") ;
if (Policy != 4) {
iterator->TState = ObjectWaiter::TS_ENTER ;
// 將等待者狀態(tài)設(shè)置為進入狀態(tài)
}
iterator->_notified = 1 ;
Thread * Self = THREAD;
iterator->_notifier_tid = Self->osthread()->thread_id();
//定位到等待隊列
ObjectWaiter * List = _EntryList ;
// 獲取EntryList
if (List != NULL) {
assert (List->_prev == NULL, "invariant") ;
assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
assert (List != iterator, "invariant") ;
}
// 策略0:如果EntryList非空的話,則將等待者添加到EntryList的頭部
if (Policy == 0) { // prepend to EntryList
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
List->_prev = iterator ;
iterator->_next = List ;
iterator->_prev = NULL ;
_EntryList = iterator ;
}
} else
if (Policy == 1) { // append to EntryList
// 策略1:將等待者添加到EntryList的尾部
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
// 如果EntryList為空,則直接將等待者設(shè)為EntryList
} else {
ObjectWaiter * Tail ;
for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ;
// 查找EntryList的尾部
assert (Tail != NULL && Tail->_next == NULL, "invariant") ;
// 斷言:尾部不為NULL且下一個元素為NULL
Tail->_next = iterator ;
iterator->_prev = Tail ;
iterator->_next = NULL ;
// 將等待者添加到EntryList的尾部
}
} else
//策略2:將等待者添加到cxq的頭部
if (Policy == 2) { // prepend to cxq
// prepend to cxq
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
// 如果EntryList為空,則直接將等待者設(shè)為EntryList
} else {
iterator->TState = ObjectWaiter::TS_CXQ ;
// 將等待者狀態(tài)設(shè)置為CXQ狀態(tài)
for (;;) {
ObjectWaiter * Front = _cxq ;
iterator->_next = Front ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
break ;
// 原子性地將等待者添加到cxq的頭部
}
}
}
} else
//策略3:將等待者添加到cxq的尾部
if (Policy == 3) { // append to cxq
iterator->TState = ObjectWaiter::TS_CXQ ;
// 將等待者狀態(tài)設(shè)置為CXQ狀態(tài)
for (;;) {
ObjectWaiter * Tail ;
Tail = _cxq ;
if (Tail == NULL) {
iterator->_next = NULL ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) {
break ;
// 如果cxq為空,則原子性地將等待者設(shè)為cxq
}
} else {
while (Tail->_next != NULL) Tail = Tail->_next ;
// 查找cxq的尾部
Tail->_next = iterator ;
iterator->_prev = Tail ;
iterator->_next = NULL ;
break ;
// 將等待者添加到cxq的尾部
}
}
} else {
ParkEvent * ev = iterator->_event ;
// 獲取等待者的事件
iterator->TState = ObjectWaiter::TS_RUN ;
// 將等待者狀態(tài)設(shè)置為運行狀態(tài)
OrderAccess::fence() ;
// 內(nèi)存屏障
ev->unpark() ;
// 直接喚醒等待者
}
}
// 釋放WaitSet鎖
Thread::SpinRelease (&_WaitSetLock) ;
//......
}二、關(guān)于synchronized更進一步的理解
1. 為什么鎖釋放之后還要進行一次cas上鎖
上文提到執(zhí)行exit方法時,線程通過release_store_ptr完成鎖釋放之后,會嘗試cas再次嘗試將鎖的owner指向自己,這樣做的目的是什么呢?
//釋放鎖
OrderAccess::release_store_ptr (&_owner, NULL) ; // drop the lock
//......
//cas上鎖
if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
return ;
}
//執(zhí)行后續(xù)喚醒邏輯這本質(zhì)上是針對cpu時間片的利用和調(diào)度效率上的一種優(yōu)化,即當前線程完成鎖釋放后,盡可能利用當前線程還在使用時間片的時刻,嘗試cas上鎖,如果上鎖成功則說明當前鎖沒有人獲取,則執(zhí)行后續(xù)喚醒等待線程的邏輯,由此盡可能利用cpu時間片避免擱淺問題(鎖釋放了所有線程還是處于park狀態(tài)):

2. 自旋重試的性能壓榨
競爭鎖資源的enter方法在明確幾次cas和自旋失敗后,會將其添加到cxq隊列中,但設(shè)計者并不會直接添加,而是盡可能利用每一次機會,利用我們將線程封裝為等待節(jié)點會通過cas存入等待隊列,為了盡可能利用每一次cpu時間片,再進行cas自旋入隊時,設(shè)計者會利用每次cas失敗的時機再次嘗試自旋上鎖,這本質(zhì)就是對于入隊激烈競爭的一種臨時規(guī)避,盡可能利用這個間隙再次獲取鎖資源:
for (;;) {
//當前節(jié)點指向cxq隊列的首元素
node._next = nxt = _cxq ;
//通過cas將當前元素設(shè)置為cxq隊列的首元素,若成功則退出循環(huán)
if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;
// Interference - the CAS failed because _cxq changed. Just retry.
// As an optional optimization we retry the lock.
//若cas失敗則再次嘗試自旋上鎖,盡可能利用cpu執(zhí)行開銷
if (TryLock (Self) > 0) {
assert (_succ != Self , "invariant") ;
assert (_owner == Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}
}3. 不同喚醒策略的設(shè)計理念
鎖釋放時,源碼中對于線程的喚醒策略給出了大體5種策略,實際上這也是對系統(tǒng)資源規(guī)律的把握和不同場景效率的優(yōu)化,對應的筆者結(jié)合之前整理的幾種策略進行說明:
1. QMode為2直接喚醒cxq隊列首元素線程,讓其競爭監(jiān)視鎖,這種方式避免cxq轉(zhuǎn)移到EntryList的開銷,直接讓cxq隊列元素喚醒,適用于追求高并發(fā)和極致的場景,不需要考慮公平性
2. QMode為3,則查看cxq隊列是否非空,若明確非空則將其追加到EntryList尾部,若EntryList為空則直接將cxq隊列設(shè)置為EntryList,然后將首元素喚醒,因為線程競爭不到鎖資源之后是采用頭插法存入cxq隊列,所以為了避免cxq反序追加到EntryList這個開銷,該策略是直接將cxq隊列追加到EntryList上,這本質(zhì)上就是一種公平性和性能的折中
3. QMode為4,則將EntryList追加到cxq隊列后,然后讓cxq隊列作為EntryList首元素,將首個線程喚醒競爭監(jiān)視鎖,這種方案是考慮到最新的線程會位于cxq隊列隊首,所以cpu緩存中可能存有這個線程的緩存數(shù)據(jù),因此優(yōu)先喚醒該線程保證高效完成計算
/************************* 執(zhí)行到步驟4和5,說明代碼中的邏輯判斷明確EntryList為空,對應不同的QMode規(guī)則為**************/
4. QMode為1,將cxq隊列倒序排列后作為EntryList,并將首元素喚醒,這種就是將頭插法的cxq隊列順序排列追加到EntryList,嚴格保證調(diào)度的公平性,避免線程饑餓
5. QMode為0(默認策略),將cxq直接封裝為ObjectWaiter作為EntryList,繞過兩個列表關(guān)聯(lián)直接將首元素喚醒,適用于高并發(fā)和高性能的場景4. 為什么wait獲取鎖采用自旋而非重量級鎖
_WaitSetLock保護的等待隊列本質(zhì)上基本只有監(jiān)視器所有者進行訪問,個別情況由于超時中斷返回操作隊列的情況,所以競爭不算激烈,所以在操作等待隊列時上鎖的方式是采用自旋而非重量級鎖:
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
AddWaiter (&node) ;
Thread::SpinRelease (&_WaitSetLock) ;





























