C++原子操作實戰:無鎖數據結構的實現技巧
在高并發場景下,你是否遇到過這樣的困境:用互斥鎖保護共享數據,卻因線程頻繁阻塞等待導致性能瓶頸?比如高頻交易系統的訂單處理、服務器的請求隊列,鎖競爭帶來的上下文切換開銷,往往成為壓垮性能的 “最后一根稻草”。而無鎖數據結構,正是解決這一問題的核心方案 —— 它基于 C++ 原子操作,無需依賴傳統鎖機制,就能實現線程安全的并發訪問,讓高并發代碼真正 “飛起來”。
你會學到如何用std::atomic封裝原子指針、如何通過循環 CAS 操作處理并發沖突、如何利用內存序保證數據一致性,還會避開 ABA 問題、內存泄漏等常見坑。無論你是想優化現有項目的并發性能,還是想深入理解原子操作的實戰應用,這篇文章都能給你清晰的指引。跟著實戰步驟一步步推進,你會發現,無鎖編程不僅能大幅提升高并發場景下的性能,還能讓你對 C++ 并發編程的理解更上一層樓~
一、原子操作是什么?
原子操作,從字面意義上理解,就如同化學中的原子一樣,是不可再分的基本單元。在計算機科學里,特別是在多線程編程的語境下,原子操作是指那些在執行過程中不會被線程調度機制打斷的操作。這意味著,一個原子操作要么完整地執行完畢,要么壓根就不執行,不存在執行到一半被其他線程干擾的情況 。
打個比方,你去銀行辦理轉賬業務,從你的賬戶 A 轉一筆錢到賬戶 B。這個轉賬操作包含了從賬戶 A 扣除金額和向賬戶 B 增加金額兩個步驟。如果這是一個原子操作,那么這兩個步驟要么一起成功完成,要么因為某種原因(比如賬戶余額不足)一起不執行。絕對不會出現從賬戶 A 扣了錢,但錢卻沒轉到賬戶 B 的尷尬情況。
在 C++ 中,原子操作通常由硬件指令直接支持,這使得它們在多線程環境下能夠高效地保證數據的一致性。例如,對于簡單的變量賦值操作,如果使用原子操作,就可以確保在多個線程同時嘗試賦值時,不會出現數據混亂的情況。像std::atomic<int> a = 5;這樣的聲明,就創建了一個原子整數變量a,對它的操作都是原子的。
在多線程或多核環境中,若操作不具備原子性,可能引發以下問題:
- 競態條件(Race Condition):多個線程同時修改共享數據,導致結果不可預測。
- 數據不一致:如銀行轉賬操作中,扣款和入賬若被中斷,可能導致金額錯誤。
- 邏輯錯誤:如計數器遞增操作被中斷,導致計數丟失。
二、C++ 中的原子操作
2.1 std::atomic 類模板
C++11 的到來,為多線程編程帶來了一個強大的武器 ——std::atomic類模板。它就像是一個堅固的堡壘,為基本數據類型的原子操作提供了堅實的支持。這個類模板的設計理念非常簡潔直觀,讓開發者能夠輕松地將原子操作融入到代碼中。
使用std::atomic類模板非常簡單,就像創建一個普通變量一樣,只是需要在類型前面加上std::atomic。比如,我們要創建一個原子整數變量,可以這樣寫:std::atomic<int> atomicInt; 。這就聲明了一個名為atomicInt的原子整數,它的所有操作都是原子的,不會被其他線程打斷。
下面通過一個簡單的代碼示例來展示std::atomic的基本用法:
#include <iostream>
#include <atomic>
#include <thread>
std::atomic<int> sharedAtomicVariable(0);
void atomicIncrement() {
for (int i = 0; i < 10000; ++i) {
sharedAtomicVariable++;
}
}
int main() {
std::thread thread1(atomicIncrement);
std::thread thread2(atomicIncrement);
thread1.join();
thread2.join();
std::cout << "Final value of sharedAtomicVariable: " << sharedAtomicVariable << std::endl;
return 0;
}在這個示例中,sharedAtomicVariable是一個原子整數變量。atomicIncrement函數對它進行 10000 次遞增操作。由于sharedAtomicVariable是原子的,所以在多線程環境下,它的遞增操作不會出現數據競爭的問題。最終運行結果,sharedAtomicVariable的值一定會是 20000,這與前面非原子操作的例子形成了鮮明對比。
std::atomic類模板還提供了一系列豐富的成員函數,用于實現各種原子操作。比如fetch_add函數,它可以原子地增加變量的值,并返回增加前的值;store函數用于原子地存儲一個新值;load函數則用于原子地加載當前值。這些函數為開發者提供了更多的控制靈活性,以滿足不同場景下的需求。例如:
std::atomic<int> num(5);
int oldValue = num.fetch_add(3, std::memory_order_relaxed);
std::cout << "Old value: " << oldValue << ", New value: " << num << std::endl;這段代碼中,fetch_add函數將num的值增加 3,并返回原來的值5,最終num的值變為 8 。通過這些成員函數,我們可以根據具體的業務邏輯,選擇最合適的原子操作方式。
2.2 原子操作的內存模型
在 C++ 的原子操作領域,內存模型是一個至關重要且復雜的概念,它就像一個無形的指揮家,控制著原子操作在內存中的可見性和順序,對程序的正確性和性能有著深遠的影響。C++ 定義了多種內存模型,每一種都有其獨特的作用和適用場景。
首先是memory_order_relaxed,它是最寬松的內存模型。在這種模型下,原子操作僅保證自身的原子性,不對操作的內存順序做任何保證。這意味著,編譯器和處理器可以根據自身的優化策略,自由地對指令進行重排序。雖然這種模型在某些特定場景下可以提高性能,但由于缺乏對內存順序的控制,可能會導致一些難以調試的問題。例如:
std::atomic<int> a(0);
std::atomic<bool> flag(false);
void threadFunction1() {
a.store(1, std::memory_order_relaxed);
flag.store(true, std::memory_order_relaxed);
}
void threadFunction2() {
while (!flag.load(std::memory_order_relaxed));
std::cout << "a: " << a.load(std::memory_order_relaxed) << std::endl;
}在這個例子中,由于memory_order_relaxed的寬松性,threadFunction1中a.store(1)和flag.store(true)的執行順序可能會被重排。如果重排發生,threadFunction2可能會在a還未被賦值為 1 時就讀取a的值,從而輸出錯誤的結果。
與memory_order_relaxed不同,memory_order_acquire和memory_order_release則提供了更強的內存可見性保證。memory_order_acquire確保在此操作之前的所有內存讀取都對其他線程可見,就像一個關卡,在它之后的操作不會被重排到它之前;而memory_order_release確保在此操作之后的所有內存寫入都對其他線程可見,類似一個屏障,在它之前的操作不會被重排到它之后。這兩種內存模型常常一起使用,來實現線程之間的同步。
例如經典的生產者 - 消費者模型:
std::atomic<int> data(0);
std::atomic<bool> ready(false);
void producer() {
data.store(42, std::memory_order_release);
ready.store(true, std::memory_order_release);
}
void consumer() {
while (!ready.load(std::memory_order_acquire));
std::cout << "Data: " << data.load(std::memory_order_acquire) << std::endl;
}在這個模型中,生產者線程在生產數據后,使用memory_order_release存儲ready為true,確保數據寫入對消費者線程可見;消費者線程在讀取數據前,使用memory_order_acquire等待ready為true,確保能讀取到最新的數據。
memory_order_acq_rel則是同時具有memory_order_acquire和memory_order_release的特性,適用于那些既需要讀取又需要寫入的復雜場景。還有memory_order_seq_cst,它是默認的內存模型,提供了最嚴格的順序一致性保證,所有線程都能看到一致的操作順序,但相應地,它的性能開銷也相對較大。
選擇合適的內存模型對于保證程序的正確性和性能至關重要。在實際編程中,我們需要根據具體的業務邏輯和性能需求,仔細權衡選擇。如果對性能要求極高,且數據一致性要求相對較低,可以考慮使用memory_order_relaxed;而對于需要嚴格保證數據一致性和線程同步的場景,則應選擇更嚴格的內存模型。
2.3 原子操作的性能考慮
原子操作雖然為多線程編程提供了強大的線程安全保障,但就像任何技術一樣,它并非完美無缺,在享受其帶來的便利時,我們也需要關注它所引入的性能開銷。原子操作的性能開銷是由多個因素共同作用產生的,深入了解這些因素,有助于我們在編寫高效多線程程序時做出更明智的決策。
從硬件層面來看,原子操作依賴于特定的硬件指令,如比較并交換(CAS,Compare-And-Swap)指令。這些指令的執行通常比普通的內存操作開銷更大,因為它們需要在多個處理器核心之間進行復雜的同步操作,以確保原子性。例如,在一個多核處理器系統中,當一個核心執行原子操作時,需要通過緩存一致性協議通知其他核心,使得它們能夠及時更新緩存中的數據,這一過程會帶來額外的通信和同步開銷。
緩存一致性也是影響原子操作性能的重要因素。為了保證數據的一致性,原子操作需要通過緩存一致性協議來同步不同核心之間的數據。當一個核心對原子變量進行修改時,不僅要更新自己的緩存,還需要通知其他核心的緩存,讓它們失效或更新。這就像是一場跨核心的信息傳遞接力賽,每一次傳遞都需要消耗時間和資源,尤其是在多核環境下,頻繁的緩存同步會顯著增加性能開銷。
編譯器優化在原子操作中也扮演著重要角色。編譯器在生成原子操作的機器代碼時,會進行一系列優化,試圖減少性能開銷。但這些優化并不總是能達到預期效果,特別是在復雜的場景下。例如,在一些情況下,編譯器可能無法準確判斷原子操作的依賴關系,導致優化過度或不足,從而影響程序性能。
為了減少原子操作的性能開銷,我們可以采取一些有效的策略。首先,應盡量減少原子操作的使用。在不需要原子操作的場景下,避免不必要的原子變量聲明和操作。比如,對于一些局部變量,它們不會被多個線程共享,就沒有必要使用原子類型,使用普通變量即可,這樣可以避免原子操作帶來的額外開銷。
選擇合適的內存模型也是關鍵。正如前面提到的,不同的內存模型具有不同的性能特點。根據實際需求選擇合適的內存模型,避免過度同步。如果程序對內存順序的要求不高,可以選擇像memory_order_relaxed這樣寬松的內存模型,以減少不必要的同步開銷;而對于需要嚴格保證內存順序的場景,再選擇更嚴格的內存模型。
使用無鎖數據結構是另一種有效的策略。在可能的情況下,使用無鎖數據結構來替代傳統的鎖機制。無鎖數據結構利用原子操作來實現線程安全,避免了鎖帶來的上下文切換和競爭開銷。例如,無鎖鏈表、無鎖隊列等,它們在高并發場景下能夠顯著提高性能。但需要注意的是,無鎖數據結構的實現通常較為復雜,需要開發者具備一定的技巧和經驗。
三、原子操作應用場景
3.1 計數器和標志位
在多線程編程的世界里,計數器和標志位就像是交通信號燈,起著至關重要的同步作用。它們能夠協調各個線程的執行順序,確保數據的準確性和程序的穩定性。而原子操作則為這些計數器和標志位提供了堅實的保障,使其在多線程環境下能夠高效、安全地運行。
以一個簡單的線程等待初始化操作為例,假設我們有一個復雜的系統,其中有多個工作線程需要依賴某個資源的初始化。在這種情況下,我們可以使用原子標志位來實現線程間的同步。代碼如下:
#include <iostream>
#include <thread>
#include <atomic>
std::atomic<bool> initialized(false);
void initThread() {
// 模擬初始化操作,可能包含復雜的計算或資源加載
std::this_thread::sleep_for(std::chrono::seconds(2));
initialized.store(true, std::memory_order_release);
}
void workerThread() {
while (!initialized.load(std::memory_order_acquire)) {
// 線程等待初始化完成,這里可以使用std::this_thread::yield()讓出CPU時間片
std::this_thread::yield();
}
std::cout << "Worker thread is starting to work." << std::endl;
// 執行實際的工作任務
}
int main() {
std::thread init(initThread);
std::thread worker(workerThread);
init.join();
worker.join();
return 0;
}在這段代碼中,initialized是一個原子布爾變量,它就像一個信號燈,初始狀態為false,表示資源尚未初始化。initThread函數負責執行初始化操作,完成后將initialized設置為true,并使用std::memory_order_release內存模型,確保所有之前的內存寫入操作對其他線程可見。workerThread函數則在一個循環中不斷檢查initialized的值,只要它為false,就調用std::this_thread::yield()讓出 CPU 時間片,避免不必要的 CPU 消耗。當initialized變為true時,工作線程就知道資源已經初始化完成,可以開始執行實際的工作任務了。
這種使用原子標志位的方式,相比傳統的鎖機制,具有更高的效率和更低的開銷。因為它避免了鎖的加鎖和解鎖操作,減少了線程上下文切換的次數,從而提高了程序的整體性能。而且,原子操作的原子性保證了標志位的讀寫操作不會被其他線程打斷,避免了數據競爭和不一致的問題。在實際應用中,計數器和標志位的原子操作被廣泛應用于各種多線程場景,如資源管理、任務調度等,是多線程編程中不可或缺的工具。
3.2 條件變量和事件
在多線程編程的復雜交響樂中,條件變量和事件就像是精心編排的樂章,用于協調線程之間的執行順序,確保各個線程能夠在合適的時機協同工作。而原子操作在實現類似條件變量的功能時,發揮著關鍵的作用,為線程間的高效同步提供了有力支持。
條件變量通常用于線程間的協調,允許一個或多個線程等待某個條件的發生,當條件滿足時,相應的線程會被喚醒。在某些情況下,我們可以巧妙地使用原子操作來實現類似條件變量的功能。下面通過一個簡單的示例代碼來深入理解:
#include <iostream>
#include <thread>
#include <atomic>
#include <chrono::system_clock>
std::atomic<bool> ready(false);
std::atomic<int> data(0);
void producer() {
// 模擬一些準備工作
std::this_thread::sleep_for(std::chrono::seconds(2));
data.store(42, std::memory_order_release);
ready.store(true, std::memory_order_release);
}
void consumer() {
while (!ready.load(std::memory_order_acquire)) {
std::this_thread::yield();
}
std::cout << "Consumer got data: " << data.load(std::memory_order_acquire) << std::endl;
}
int main() {
std::thread producerThread(producer);
std::thread consumerThread(consumer);
producerThread.join();
consumerThread.join();
return 0;
}在這個示例中,ready是一個原子變量,充當了類似條件變量的角色,用于指示資源是否已經準備好。producer線程負責準備數據,在完成準備工作后,它將數據存儲到data變量中,并將ready設置為true,使用std::memory_order_release內存模型確保數據寫入對其他線程可見。consumer線程則在一個循環中不斷檢查ready的值,只要它為false,就調用std::this_thread::yield()讓出 CPU 時間片,避免不必要的 CPU 消耗。當ready變為true時,消費者線程就知道數據已經準備好,可以安全地讀取data變量的值了。
通過這種方式,原子操作實現了與條件變量類似的功能,即線程間的同步和等待。雖然這種方式相對簡單,但在一些特定場景下,它可以提供比傳統條件變量更高效的解決方案。因為原子操作避免了條件變量通常需要的鎖機制,減少了線程上下文切換和鎖競爭的開銷,從而提高了程序的性能。當然,原子操作實現的條件變量功能相對有限,對于復雜的條件判斷和多線程協作場景,傳統的條件變量仍然是更好的選擇。但在一些簡單的同步需求下,原子操作的簡潔性和高效性使其成為一種非常有價值的工具。
3.3 無鎖數據結構
在傳統的并發編程中,當多個線程需要訪問共享數據時,我們通常會使用鎖機制來確保數據的一致性和線程安全 。就像多個線程在爭奪一個共享的資源,鎖就像是一把鑰匙,只有拿到鑰匙的線程才能進入 “房間”(訪問共享資源),其他線程只能在外面等待。這種方式雖然簡單直接,但在高并發場景下,卻會帶來諸多問題。
無鎖數據結構則打破了這種傳統模式,它利用原子操作來實現并發控制 。原子操作就像是一個個堅固的 “堡壘”,在多線程環境下能夠保證自身的操作是不可分割的,不會被其他線程干擾。無鎖數據結構允許多個線程同時訪問和修改數據結構,而不需要像傳統鎖機制那樣進行等待。這就好比在一個沒有交通信號燈(鎖)的十字路口,車輛(線程)通過一套精心設計的規則(原子操作和算法)來協調通行,從而避免了因等待信號燈而造成的擁堵(鎖競爭)。
在高競爭場景下,無鎖數據結構的性能優勢尤為明顯 。當大量線程同時競爭同一把鎖時,傳統鎖機制會導致許多線程處于阻塞狀態,等待鎖的釋放。這不僅增加了線程上下文切換的開銷,還使得 CPU 資源大量浪費在等待上。而無鎖數據結構由于沒有鎖的限制,線程可以持續執行,不會因為等待鎖而被阻塞,從而大大提高了系統的吞吐量和響應速度。以一個多核處理器上運行的多線程服務器程序為例,使用無鎖數據結構可以充分利用多核的并行計算能力,讓每個核心都能高效地處理線程任務,而不會因為鎖競爭而出現核心閑置的情況。
(1)無鎖棧
無鎖棧是一種常見且實用的無鎖數據結構,它的實現思路巧妙地運用了原子操作 。在無鎖棧中,最關鍵的操作是入棧(push)和出棧(pop),這兩個操作都依賴于原子比較 / 交換(CAS - Compare And Swap)操作來確保線程安全。
下面是一個簡化的無鎖棧的 C++ 實現示例:
template<typename T>
class lock_free_stack {
private:
struct node {
T data;
node* next;
node(const T& data) : data(data), next(nullptr) {}
};
std::atomic<node*> head; // 使用原子指針來指向棧頂
public:
lock_free_stack() : head(nullptr) {}
void push(const T& data) {
node* new_node = new node(data);
do {
new_node->next = head.load(); // 保存當前棧頂指針
} while (!head.compare_exchange_weak(new_node->next, new_node));
// 使用CAS操作嘗試將新節點設置為棧頂,如果失敗則重試
}
std::shared_ptr<T> pop() {
node* old_head;
do {
old_head = head.load();
if (!old_head) return nullptr; // 棧為空
} while (!head.compare_exchange_weak(old_head, old_head->next));
// 使用CAS操作嘗試將棧頂指針指向下一個節點,如果失敗則重試
std::shared_ptr<T> res = std::make_shared<T>(old_head->data);
delete old_head;
return res;
}
};在上述代碼中,push操作首先創建一個新節點,然后通過compare_exchange_weak方法不斷嘗試將新節點設置為棧頂 。compare_exchange_weak方法會比較當前的head值(即預期值)是否等于new_node->next(保存的當前棧頂指針),如果相等,則將head更新為new_node(新值),表示入棧成功;如果不相等,則說明棧頂已經被其他線程修改,compare_exchange_weak會將new_node->next更新為最新的head值,并返回false,然后再次進入循環重試。
pop操作同樣使用compare_exchange_weak方法來嘗試將棧頂指針指向下一個節點 。如果成功,說明出棧操作成功,返回棧頂節點的數據;如果失敗,說明棧頂已經被其他線程修改,再次進入循環重試。
(2)無鎖隊列
無鎖隊列也是一種在多線程環境中廣泛應用的無鎖數據結構,常用于實現生產者 - 消費者模型等場景 。它的實現原理同樣基于原子操作,通過巧妙地維護隊列的頭指針和尾指針,來實現高效的并發入隊和出隊操作。
以下是一個簡單的無鎖隊列的 C++ 實現框架:
template<typename T>
class lock_free_queue {
private:
struct node {
std::shared_ptr<T> data;
std::atomic<node*> next;
node() : next(nullptr) {}
};
std::atomic<node*> head;
std::atomic<node*> tail;
public:
lock_free_queue() {
node* dummy = new node();
head.store(dummy);
tail.store(dummy);
}
void push(const T& new_value) {
std::shared_ptr<T> new_data = std::make_shared<T>(new_value);
node* new_node = new node();
new_node->data = new_data;
node* old_tail;
do {
old_tail = tail.load();
} while (!old_tail->next.compare_exchange_weak(nullptr, new_node));
// 嘗試將新節點鏈入隊列尾部
tail.compare_exchange_strong(old_tail, new_node);
// 嘗試更新tail指針
}
bool pop(T& result) {
node* old_head = head.load();
for (;;) {
node* next = old_head->next.load();
if (!next) return false; // 隊列為空
if (head.compare_exchange_weak(old_head, next)) {
// 嘗試推進head指針
result = std::move(*next->data);
delete old_head;
return true;
}
}
}
};在這個實現中,push操作首先創建一個新節點,并將新數據存入其中 。然后,通過compare_exchange_weak方法嘗試將新節點鏈入隊列的尾部(old_tail->next)。如果鏈入成功,再嘗試使用compare_exchange_strong方法更新tail指針,將其指向新節點。如果更新tail指針失敗,也不會影響隊列的正確性,因為后續的push操作會幫助推進tail指針。
pop操作則從隊列頭部開始,首先獲取當前的head指針,然后通過compare_exchange_weak方法嘗試將head指針推進到下一個節點 。如果推進成功,說明出隊操作成功,將下一個節點的數據取出并返回;如果推進失敗,說明head指針已經被其他線程修改,再次進入循環重試。
四、原子操作與非原子操作對比
為了更直觀地感受原子操作的強大之處,我們通過具體案例和匯編指令來深入分析原子變量和普通變量在多線程環境下的表現,看看原子操作是如何在保證原子性和解決內存序問題上展現出獨特優勢的。
首先,我們來看一個簡單的計數器示例,對比普通變量和原子變量在多線程環境下的行為:
#include <iostream>
#include <thread>
#include <atomic>
// 使用普通變量
int normalCounter = 0;
void normalIncrement() {
for (int i = 0; i < 10000; ++i) {
normalCounter++;
}
}
// 使用原子變量
std::atomic<int> atomicCounter(0);
void atomicIncrement() {
for (int i = 0; i < 10000; ++i) {
atomicCounter++;
}
}
int main() {
std::thread t1Normal(normalIncrement);
std::thread t2Normal(normalIncrement);
t1Normal.join();
t2Normal.join();
std::cout << "Normal counter final value: " << normalCounter << std::endl;
std::thread t1Atomic(atomicIncrement);
std::thread t2Atomic(atomicIncrement);
t1Atomic.join();
t2Atomic.join();
std::cout << "Atomic counter final value: " << atomicCounter << std::endl;
return 0;
}在這個示例中,normalIncrement函數使用普通變量normalCounter進行遞增操作,atomicIncrement函數則使用原子變量atomicCounter。當我們運行這段代碼時,會發現normalCounter的最終值往往小于 20000,這是因為normalCounter++這個操作不是原子的,它包含讀取、增加和寫入三個步驟,在多線程環境下,這些步驟可能會被其他線程打斷,從而導致數據不一致。而atomicCounter由于是原子變量,其遞增操作是原子的,不會被其他線程干擾,所以最終atomicCounter的值一定是 20000。
接下來,我們通過查看匯編指令來深入理解原子變量和普通變量的底層差異。以atomicCounter.fetch_add(1)和normalCounter++為例,在 x86 架構下,使用 GCC 編譯器編譯后,它們的匯編指令大致如下:
; atomicCounter.fetch_add(1)的匯編指令示例
mov eax, 1
lock xadd dword ptr [atomicCounter], eax
; normalCounter++的匯編指令示例
mov eax, dword ptr [normalCounter]
add eax, 1
mov dword ptr [normalCounter], eax從匯編指令可以看出,atomicCounter.fetch_add(1)使用了lock xadd指令,lock前綴會對總線加鎖,在總線加鎖期間,其它 CPU 核心無法通過總線訪問內存,直到該指令結束,從而保證了操作的原子性。而normalCounter++則是普通的讀取、加法和寫入指令,沒有任何原子性保證。如果在mov eax, dword ptr [normalCounter]和add eax, 1之間發生線程切換,另一個線程也執行相同的操作,就會導致數據競爭和不一致。
再看一個關于內存序的例子,假設我們有兩個線程,一個線程寫入數據和標志位,另一個線程讀取數據和標志位:
#include <iostream>
#include <thread>
#include <atomic>
std::atomic<int> data(0);
std::atomic<bool> flag(false);
// 寫入線程
void writer() {
data.store(42, std::memory_order_relaxed);
flag.store(true, std::memory_order_relaxed);
}
// 讀取線程
void reader() {
while (!flag.load(std::memory_order_relaxed));
std::cout << "Read data: " << data.load(std::memory_order_relaxed) << std::endl;
}
int main() {
std::thread t1(writer);
std::thread t2(reader);
t1.join();
t2.join();
return 0;
}在這個例子中,由于使用了memory_order_relaxed內存模型,編譯器和處理器可以自由地對指令進行重排序。這就可能導致writer線程中data.store(42)和flag.store(true)的執行順序被重排,reader線程在讀取數據時,可能會先讀到flag為true,但此時data還未被賦值為 42,從而讀取到錯誤的數據。
如果我們將內存模型改為std::memory_order_release和std::memory_order_acquire:
// 寫入線程
void writer() {
data.store(42, std::memory_order_release);
flag.store(true, std::memory_order_release);
}
// 讀取線程
void reader() {
while (!flag.load(std::memory_order_acquire));
std::cout << "Read data: " << data.load(std::memory_order_acquire) << std::endl;
}這樣,writer線程在存儲flag為true時,會確保之前對data的寫入操作對其他線程可見;reader線程在讀取flag為true后,會確保能讀取到最新的data值,從而保證了內存序的正確性。
通過以上案例和匯編指令分析,我們可以清楚地看到,原子操作在多線程環境下能夠有效地保證原子性和解決內存序問題,避免數據競爭和不一致的發生,為多線程編程提供了更可靠的保障 。
五、實戰演練:打造無鎖數據結構
5.1 選擇合適的數據結構
在實現無鎖數據結構時,選擇合適的底層數據結構是關鍵的第一步,它就像是為高樓大廈打下堅實的地基 。以無鎖隊列的實現為例,我們常常選擇單鏈表作為底層數據結構,這背后有著諸多考量。
單鏈表具有靈活的內存分配特性,它不需要像數組那樣在初始化時就分配連續的大塊內存 。在無鎖隊列中,隨著元素的不斷入隊和出隊,內存的動態分配和釋放是頻繁發生的。單鏈表可以根據實際需求,每次在插入新元素時動態地分配一個節點的內存空間,在刪除元素時釋放對應的節點內存,這種按需分配的方式大大提高了內存的使用效率,避免了數組可能出現的內存浪費或不足的問題。
從鏈表的結構來看,單鏈表的節點通過指針依次連接,這種鏈式結構使得在進行插入和刪除操作時非常高效 。對于無鎖隊列的入隊操作,只需要在鏈表的尾部添加一個新節點;而出隊操作則只需刪除鏈表頭部的節點。這兩個操作的時間復雜度均為 O (1),相比于其他一些數據結構,如數組在進行插入和刪除操作時可能需要移動大量元素,單鏈表在性能上具有明顯的優勢。
在多線程環境下,單鏈表的結構更便于利用原子操作來實現無鎖算法 。我們可以將節點中的指針定義為原子指針,這樣在多個線程同時對鏈表進行操作時,通過原子操作可以保證指針的修改是原子性的,不會出現數據競爭和不一致的情況。例如,在入隊操作中,通過原子比較 / 交換(CAS)操作來更新尾指針,確保新節點能夠正確地插入到鏈表尾部;在出隊操作中,使用 CAS 操作更新頭指針,保證從鏈表頭部刪除節點的操作是線程安全的。
5.2 實現無鎖隊列的關鍵操作
無鎖隊列的實現中,入隊(enqueue)和出隊(dequeue)操作是其核心部分,它們的實現巧妙地運用了原子操作一些特殊的算法技巧 。
(1)入隊操作的實現步驟如下:
①準備新節點:首先創建一個新節點,用于存儲要入隊的數據,并將其next指針初始化為nullptr 。例如:
node* new_node = new node();
new_node->data = std::make_shared<T>(new_value);
new_node->next.store(nullptr);②循環 CAS 鏈接新節點:通過一個循環,不斷嘗試使用 CAS 操作將新節點鏈接到當前尾節點的next指針上 。在循環中,首先獲取當前的尾節點old_tail,然后使用compare_exchange_weak方法嘗試將old_tail->next從nullptr更新為new_node。如果 CAS 操作成功,說明新節點已成功鏈入鏈表尾部,跳出循環;如果失敗,說明old_tail->next已經被其他線程修改,此時需要更新old_tail為新的尾節點,再次嘗試。代碼如下:
node* old_tail;
do {
old_tail = tail.load();
} while (!old_tail->next.compare_exchange_weak(nullptr, new_node));③更新尾指針:在新節點成功鏈入后,嘗試使用 CAS 操作更新尾指針,將其指向新節點 。這里使用compare_exchange_strong方法,同樣是為了確保操作的原子性。即使更新尾指針失敗,也不會影響隊列的正確性,因為后續的入隊操作會幫助推進尾指針。代碼如下:
tail.compare_exchange_strong(old_tail, new_node);(2)出隊操作的實現步驟如下:
④獲取當前頭節點:首先獲取當前的頭節點old_head 。由于頭節點是一個啞元節點(dummy node),實際的數據存儲在old_head->next指向的節點中。代碼如下:
node* old_head = head.load();⑤檢查隊列是否為空:如果old_head->next為nullptr,說明隊列為空,直接返回false或nullptr 。代碼如下:
if (!old_head->next.load()) return false;⑥循環 CAS 推進頭指針:通過一個循環,不斷嘗試使用 CAS 操作將頭指針推進到下一個節點 。在循環中,首先獲取old_head->next作為old_next,然后使用compare_exchange_weak方法嘗試將head從old_head更新為old_next。如果 CAS 操作成功,說明出隊操作成功,取出old_next中的數據并返回,同時刪除old_head;如果失敗,說明head已經被其他線程修改,更新old_head為新的頭節點,再次嘗試。代碼如下:
for (;;) {
node* old_next = old_head->next.load();
if (!old_next) return false;
if (head.compare_exchange_weak(old_head, old_next)) {
result = std::move(*old_next->data);
delete old_head;
return true;
}
}在無鎖隊列的實現中,還引入了 “幫助” 機制 。當一個線程在進行入隊或出隊操作時,如果發現其他線程的操作導致數據結構處于不一致的狀態(例如,入隊時發現尾指針沒有及時更新),它會幫助其他線程完成操作,從而保證整個隊列的正常運行。這種 “幫助” 機制是無鎖數據結構能夠在多線程環境下正確工作的重要保障之一。
5.3 性能測試與分析
為了評估無鎖隊列的性能優勢,我們使用 Google Benchmark 這個強大的性能測試工具進行性能測試 。Google Benchmark 提供了簡潔易用的接口,可以方便地對各種函數和數據結構進行性能測試,并且能夠生成詳細的測試報告,幫助我們準確地了解不同實現的性能表現。
在測試中,我們將無鎖隊列與傳統的鎖保護隊列(使用std::mutex保護的std::queue)進行對比 。測試場景設置為多線程環境,分別測試在不同線程數量(如 2 個線程、4 個線程、8 個線程等)下,兩種隊列的入隊和出隊操作的性能表現。測試代碼如下:
#include <benchmark/benchmark.h>
#include "lock_free_queue.h" // 假設無鎖隊列的頭文件
#include <queue>
#include <mutex>
// 定義鎖保護隊列
class mutex_protected_queue {
private:
std::queue<int> q;
std::mutex mtx;
public:
void push(int value) {
std::lock_guard<std::mutex> guard(mtx);
q.push(value);
}
bool pop(int& result) {
std::lock_guard<std::mutex> guard(mtx);
if (q.empty()) return false;
result = q.front();
q.pop();
return true;
}
};
// 無鎖隊列的性能測試函數
static void BM_LockFreeQueue(benchmark::State& state) {
lock_free_queue<int> q;
for (auto _ : state) {
q.push(42);
int result;
q.pop(result);
}
}
BENCHMARK(BM_LockFreeQueue)->Threads(2)->Threads(4)->Threads(8);
// 鎖保護隊列的性能測試函數
static void BM_MutexProtectedQueue(benchmark::State& state) {
mutex_protected_queue q;
for (auto _ : state) {
q.push(42);
int result;
q.pop(result);
}
}
BENCHMARK(BM_MutexProtectedQueue)->Threads(2)->Threads(4)->Threads(8);
BENCHMARK_MAIN();測試結果表明,在低并發場景下,由于無鎖隊列存在一定的 “忙等” 開銷,即線程在 CAS 操作失敗時會不斷重試,而鎖保護隊列在獲取鎖成功后可以直接進行操作,所以兩者的性能差距并不明顯,有時鎖保護隊列甚至表現得更快一些 。但隨著線程數量的增加,進入高并發場景后,鎖保護隊列的性能急劇下降 。
這是因為大量線程同時競爭同一把鎖,導致許多線程處于阻塞狀態,等待鎖的釋放,從而增加了線程上下文切換的開銷,使得 CPU 資源大量浪費在等待上。而無鎖隊列由于沒有鎖的限制,線程可以持續執行,通過原子操作和 “幫助” 機制來協調對共享資源的訪問,避免了線程阻塞,其性能優勢得以充分體現,吞吐量明顯高于鎖保護隊列,能夠更高效地處理多線程并發訪問的情況。























