深入解析AQS:同步器的基石與鎖的實現原理
1. AQS概述與核心地位
AbstractQueuedSynchronizer(AQS)是Java并發編程中的核心框架,自JDK 1.5引入以來,一直是Java并發包中大多數同步器的基礎。AQS提供了一個通用的框架,用于構建依賴于先進先出(FIFO)等待隊列的阻塞鎖和相關的同步器。理解AQS的設計思想和實現機制,對于深入掌握Java并發編程至關重要。
AQS的核心價值在于它通過模板方法模式,將同步狀態的管理、線程的排隊與等待等通用邏輯封裝起來,同時將特定同步器的核心操作(如獲取鎖、釋放鎖)留給子類實現。這種設計使得開發者能夠基于AQS輕松構建各種復雜的同步器,而無需關心底層的線程排隊、阻塞和喚醒機制。
2. AQS的核心架構與設計思想
2.1 同步狀態管理
AQS內部維護了一個volatile的int類型狀態變量state,這個變量是AQS實現各種同步功能的基礎。不同的同步器可以以不同的方式解釋這個狀態:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer {
// 同步狀態
private volatile int state;
// 獲取當前同步狀態
protected final int getState() {
return state;
}
// 設置同步狀態
protected final void setState(int newState) {
state = newState;
}
// 原子地更新狀態
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
}2.2 CLH隊列變體
AQS使用一個虛擬的CLH(Craig, Landin, and Hagersten)隊列的變體來實現線程的排隊管理。這個隊列實際上是一個FIFO的雙向鏈表,每個節點代表一個等待線程:
static final class Node {
// 節點模式:共享模式
static final Node SHARED = new Node();
// 節點模式:獨占模式
static final Node EXCLUSIVE = null;
// 等待狀態:線程已取消
static final int CANCELLED = 1;
// 等待狀態:后繼線程需要取消掛起
static final int SIGNAL = -1;
// 等待狀態:線程在條件隊列中等待
static final int CONDITION = -2;
// 等待狀態:下一次acquireShared應該無條件傳播
static final int PROPAGATE = -3;
// 等待狀態
volatile int waitStatus;
// 前驅節點
volatile Node prev;
// 后繼節點
volatile Node next;
// 節點關聯的線程
volatile Thread thread;
// 條件隊列的下一個節點,或者共享模式的標記
Node nextWaiter;
}3. AQS的核心實現機制
3.1 獨占模式實現原理
獨占模式是AQS支持的一種基本同步模式,一次只允許一個線程獲取同步狀態。ReentrantLock就是基于獨占模式實現的。
3.1.1 獲取鎖的實現
// 獨占模式獲取鎖
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 嘗試獲取鎖,由子類實現
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 獲取失敗,加入隊列等待
selfInterrupt(); // 在等待過程中被中斷,重新設置中斷標志
}
// 將當前線程包裝為節點加入等待隊列
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 快速嘗試:直接CAS設置尾節點
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 快速嘗試失敗,使用完整入隊方法
enq(node);
return node;
}
// 完整入隊方法
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 隊列為空,需要初始化
if (compareAndSetHead(new Node())) // 設置虛擬頭節點
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// 在隊列中等待獲取鎖
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); // 獲取前驅節點
if (p == head && tryAcquire(arg)) { // 前驅是頭節點且嘗試獲取鎖成功
setHead(node); // 設置自己為頭節點
p.next = null; // 幫助GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && // 檢查是否應該阻塞
parkAndCheckInterrupt()) // 阻塞并檢查中斷狀態
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node); // 取消獲取
}
}3.1.2 釋放鎖的實現
// 獨占模式釋放鎖
public final boolean release(int arg) {
if (tryRelease(arg)) { // 嘗試釋放鎖,由子類實現
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 喚醒后繼節點
return true;
}
return false;
}
// 喚醒后繼節點
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); // 清除狀態
Node s = node.next; // 后繼節點
if (s == null || s.waitStatus > 0) { // 后繼節點不存在或已取消
s = null;
// 從尾向前查找有效的后繼節點
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); // 喚醒線程
}3.2 共享模式實現原理
共享模式允許多個線程同時獲取同步狀態,適用于信號量、讀寫鎖等場景。
3.2.1 共享獲取實現
// 共享模式獲取鎖
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0) // 嘗試共享獲取
doAcquireShared(arg); // 獲取失敗,加入隊列等待
}
// 在隊列中共享獲取
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED); // 添加共享節點
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg); // 嘗試獲取
if (r >= 0) { // 獲取成功
setHeadAndPropagate(node, r); // 設置頭節點并傳播
p.next = null; // 幫助GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 設置頭節點并傳播喚醒
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 記錄舊的頭節點
setHead(node);
// 檢查是否需要傳播喚醒
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared(); // 執行共享釋放
}
}4. 基于AQS實現自定義鎖
下面我們通過實現一個簡單的不可重入鎖來展示如何基于AQS構建同步器:
public class SimpleLock implements Lock {
private final Sync sync = new Sync();
// 自定義同步器
private static class Sync extends AbstractQueuedSynchronizer {
// 嘗試獲取鎖
@Override
protected boolean tryAcquire(int acquires) {
// 使用CAS操作嘗試獲取鎖
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread()); // 設置獨占線程
return true;
}
return false;
}
// 嘗試釋放鎖
@Override
protected boolean tryRelease(int releases) {
if (getState() == 0) // 檢查狀態
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null); // 清除獨占線程
setState(0); // 重置狀態
return true;
}
// 判斷是否被獨占
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 創建條件變量
Condition newCondition() {
return new ConditionObject();
}
}
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}5. AQS在JDK中的應用實例
5.1 ReentrantLock的實現
ReentrantLock是基于AQS實現的經典獨占鎖,支持重入特性:
public class ReentrantLock implements Lock {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
// 非公平鎖嘗試獲取
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) { // 重入檢查
int nextc = c + acquires;
if (nextc < 0) // 溢出
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 嘗試釋放鎖
@Override
protected boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
// 非公平鎖同步器
static final class NonfairSync extends Sync {
@Override
protected boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
// 公平鎖同步器
static final class FairSync extends Sync {
@Override
protected boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 公平鎖:檢查隊列中是否有前驅節點
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
}5.2 Semaphore的實現
Semaphore基于AQS的共享模式實現,用于控制同時訪問特定資源的線程數量:
public class Semaphore {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
setState(permits);
}
// 共享獲取
@Override
protected int tryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
// 共享釋放
@Override
protected boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // 溢出
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
}
}6. AQS的條件變量機制
AQS還提供了條件變量(ConditionObject)的實現,用于實現線程間的精確等待和通知:
public class ConditionObject implements Condition {
// 條件隊列的頭節點
private transient Node firstWaiter;
// 條件隊列的尾節點
private transient Node lastWaiter;
// 等待方法
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); // 添加到條件隊列
int savedState = fullyRelease(node); // 完全釋放鎖
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // 不在同步隊列中
LockSupport.park(this); // 阻塞
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 重新獲取鎖
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // 清理已取消的節點
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
// 通知方法
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first); // 喚醒第一個等待節點
}
}7. AQS的性能優化與最佳實踐
7.1 性能優化策略
1. 減少CAS操作:在設計同步器時,應盡量減少CAS操作的使用,因為CAS在高度競爭的情況下性能會下降。
2. 避免虛假喚醒:正確使用條件變量,確保在等待條件時使用while循環檢查條件,而不是if語句。
3. 合理選擇同步模式:根據具體場景選擇獨占模式或共享模式,避免不必要的性能開銷。
7.2 最佳實踐
// 正確的鎖使用模式
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
lock.lock();
try {
while (!conditionMet) { // 使用while循環避免虛假喚醒
condition.await();
}
// 執行臨界區代碼
} finally {
lock.unlock(); // 確保在finally塊中釋放鎖
}8. 總結
AQS作為Java并發框架的基石,通過精心設計的同步狀態管理、線程排隊機制和條件變量支持,為構建各種復雜的同步器提供了強大的基礎。其核心價值在于:
- 模板方法模式:將通用邏輯封裝在基類中,特定邏輯交由子類實現
- 靈活的同步狀態:通過int類型的state變量支持各種同步語義
- 高效的線程管理:基于CLH隊列變體實現高效的線程排隊和喚醒機制
- 完備的條件支持:內置條件變量實現,支持精確的線程等待和通知
深入理解AQS的實現原理,不僅有助于我們更好地使用Java并發包中的各種同步工具,還能夠指導我們設計和實現高性能的自定義同步器,是Java并發編程進階的必備知識。






























