尷尬,在Kafka生產實踐中又出問題了
1、背景
最近在折騰Kafka日志集群,由于公司部署的應用不斷增加,日志采集程序將采集到的日志發送到Kafka集群時出現了較大延遲,總的TPS始終上不去,為了不影響業務團隊通過日志排查問題,采取了先解決問題,再排查的做法,對Kafka集群進行擴容,但擴容后尷尬的是新增加的5臺機器中,有兩臺機器的消費發送響應時間比其他機器明顯高出不少,為了確保消息服務的穩定性,又臨時對集群進行縮容,將這臺機器從集群中剔除,具體的操作就是簡單粗暴的使用 kill pid命令,但意外發生了。
發現Java客戶端報如下錯誤:

而Go客戶端報的錯誤如下所示:

基本可以認為是部分分區沒有在線Leader,無法成功發送消息。
2、問題分析
那為什么會出現這個問題嗎?Kafka一個節點下線,不是會自動觸發故障轉移,分區leader不是會被重新選舉嗎?請帶著這個疑問,開始我們今天的探究之旅。
首先我們可以先看看當前存在問題的分區的路由信息,從第一張圖中看出主題dw_test_kafka_0816000的101分區消息發送失敗,我們在Zookeeper中看一下其狀態,具體命令如下:
./zkCli.sh -server 127.0.0.1:2181
get -s /kafka_cluster_01/brokers/topics/dw_test_kafka_0816000/partitions/101/state
該命令可以看到對應分區的相信信息,如下圖所示:

這里顯示出leader的狀態為-1,而isr列表中只有一副本,在broker-1上,但此時broker id為1的機器已經下線了,那為什么不會觸發分區Leader重新選舉呢?
其實看到這里,我相信你只要稍微細想一下,就能發現端倪,isr字段的值為1,說明該分區的副本數為1,說明該分區只在一個Broker上存儲數據,一旦Broker下線,由于集群內其他Broker上并沒有該分區的數據,此時是無法進行故障轉移的,因為一旦要進行故障轉移,分區的數據就會丟失,這樣帶來的影響將是非常嚴重的。
那為什么該主題的副本數會設置為1呢?那是因為當時集群的壓力太大,節點之間復制數據量巨大,網卡基本滿負荷在運轉,而又是日志集群,對數據的丟失的接受程度較大,故當時為了避免數據在集群之間的大量復制,將該主題的副本數設置為了1。
但集群節點的停機維護是少不了的,總不能每一次停機維護,都會出現一段時間數據寫入失敗吧。要解決這個問題,我們在停機之前,需要先對主題進行分區移動,將該主題的分區從需要停機的集群中移除。
主題分區移動的具體做法,請參考我之前的一篇文章Kafka主題遷移實踐 的第三部分。
3、Kafka節點下線分區的故障轉移機制
Kafka單副本的主題在集群內一臺節點下線后,將無法完成分區的故障轉移機制,為了深入掌握底層的一些實現細節,我想再深入探究一下kafka節點下線的一些故障轉移機制。
溫馨提示:接下來主要是從源碼角度深入探究實現原理,加深對這個過程的理解,如果大家不感興趣,可以直接進入到本文的第4個部分:總結。
在Kafka中依賴的Zookeeper服務器上存儲了當前集群內存活的broker信息,具體的路徑為/{namespace}/brokers/brokers/ids,具體圖示如下:

并且ids下的每一個節點記錄了Broker的一些信息,例如對外提供服務的協議、端口等,值得注意的是這些節點為臨時節點,如下圖所示:

這樣一旦對應的Broker宕機下線,對應的節點會刪除,Kafka集群內的Controller角色在啟動時會監聽該節點下節點的變化,并作出響應,最終將會調用KafkaController的onBrokerFailure方法,具體代碼如下所示:

這個方法實現比較復雜,我們在這里不做過多分散,重點查找分區的故障轉移機制,也就是接下來我們將具體分析KafkaController的onReplicasBecomeOffline方法,主要探究分區的故障轉移機制。
3.1 onReplicasBecomeOffline故障轉移
由于該方法實現復雜,接下來將分布對其進行詳解。
Step1:從需要設置為下線狀態分區進行分組,分組依據為是否需要刪除,沒有觸發刪除的集合用newofflineReplicasNotForDeletion表示,需要被刪除的集合用newofflineReplicasForDeletion表示。

Step2:挑選沒有Leader的分區,用partitionsWithoutLeader,代碼如下圖所示:

分區沒有Leader的標準是:分區的Leader副本所在的Broker沒有下線,并且沒有被刪除。
Step3:將沒有Leader的分區狀態變更為OfflinePartition(離線狀態),這里的狀態更新是放在kafka Controller中的內存中,具體的內存結構:Map[TopicPartition, PartitionState]。
Step4:Kafka分區狀態機驅動(觸發)分區狀態為OfflinePartition、NewPartition向OnlinePartition轉化,狀態的轉化主要包括兩個重要的步驟:
調用PartitionStateMachine的doHandleStateChanges的方法,驅動分區狀態機的轉換。
然后調用ControllerBrokerRequestBatch的sendRequestsToBrokers方法,實現元信息在其他Broker上的同步。
由于篇幅的問題,我們這篇文章不會體系化的介紹Kafka分區狀態機的實現細節,先重點關注OfflinePartition離線狀態向OnlinePartition轉化過程。

我們首先說明一下OfflinePartition離線狀態向OnlinePartition轉化過程時各個參數的含義:
Seq[TopicPartition] partitions 當前處于OfflinePartition、NewPartition狀態、并且沒有刪除的分區。
PartitionState targetState 狀態驅動的目標狀態:OnlinePartition。
PartitionLeaderElectionStrategy 分區Leader選舉策略,這里傳入的是OfflinePartitionLeaderElectionStrategy,分區離線狀態的Leader選舉策略
這里判斷一下分區是否有效的依據主要是要根據狀態機設置的驅動條件,例如只有分區狀態為OnlinePartition、NewPartition、OfflinePartition三個狀態才能轉換為OnlinePartition。
接下來重點看變更為OnlinePartition的具體實現邏輯,具體代碼如下所示:

具體實現分為3個步驟:
首先先分別帥選出當前狀態為NewPartition的集合與(OfflinePartition或者OnlinePartition)分區。
狀態為NewPartition的分區,執行分區的初始化,通常為分區擴容或主題新創建
狀態為OfflinePartition或者OnlinePartition的執行分區重新選舉,因為這些集合中的分區是當前沒有Leader的分區,這些分區暫時無法接受讀寫請求。
接下來我們重點看一下離線狀態變更為OnlinePartition的分區leader選舉實現,具體方法為:PartitionStateMachine的electLeaderForPartitions方法,其代碼如下所示:

這個方法的實現結構比較簡單,返回值為兩個集合,一個選舉成功的集合,一個選舉失敗的集合,同時選舉過程中如果出現可恢復異常,則會進行重試。
具體的重試邏輯由doElectLeaderForPartitions方法實現,該方法非常復雜。
3.2 分區選舉機制
分區選舉由PartitionStateMachine的doElectLeaderForPartitions方法實現,接下來分步進行講解。
Step1:首先從Zookeeper中獲取需要選舉分區的元信息,代碼如下所示:

Kafka中主題的路由信息存儲在Zookeeper中,具體路徑為:/{namespace}/brokers/topics/{topicName}}/partitions/{partition}/state,具體存儲的內容如下所示:

Step2:將查詢出來的主題分區元信息,組裝成Map< TopicPartition, LeaderIsrAndControllerEpoch>的Map結構,代碼如下所示:

Step3:將分區中的controllerEpoch與當前Kafka Controller的epoch對比,刷選出無效與有效集合,具體代碼如下所示:

如果當前控制器的controllerEpoch小于分區狀態中的controllerEpoch,說明已有新的Broker已取代當前Controller成為集群新的Controller,本次無法進行Leader選取,并且打印日志。
Step4:根據Leader選舉策略進行Leader選舉,代碼如下所示:

由于我們這次是由OfflinePartition狀態向OnlinePartition狀態轉換,進入的分支為leaderForOffline,稍后我們再詳細介紹該方法,經過選舉后的返回值為兩個集合,其中partitionsWithoutLeaders表示未成功選舉出Leader的分區,而partitionsWithLeaders表示成功選舉出Leader的分區。
Step5:沒有成功選舉出Leader的分區打印對應日志,并加入到失敗隊列集合中,如下圖所示:

Step5:將選舉結果更新到zookeeper中,如下圖所示:

Step6:將最新的分區選舉結果同步到其他Broker節點上。

更新分區狀態的請求LEADER_AND_ISR被其他Broker接受后,會根據分區的leader與副本信息,成為該分區的Leader節點或從節點,關于這塊的實現細節在專欄的后續文章中會專門提及。
那OfflinePartitionLeaderElectionStrategy選舉策略具體是如何進行選舉的呢?接下來我們探究其實現細節。
3.3 OfflinePartitionLeaderElectionStrategy選舉策略
OfflinePartitionLeaderElectionStrategy的選舉策略實現代碼見PartitionStateMachine的leaderForOffline,我們還是采取分步探討的方式。
Step1:主要初始化幾個集合,代碼如下

對上面的變量做一個簡單介紹:
partitionsWithNoLiveInSyncReplicas 分區的副本所在的Broker全部不存活
partitionsWithLiveInSyncReplicas 分區副本集合所在的broker部分或全部存活
partitionsWithUncleanLeaderElectionState 主題是否開啟了副本不在isr集合中也可以參與Leader競選,可在主題級別設置unclean.leader.election.enable,默認為false。
Step2:執行分區Leader選舉,具體實現代碼如下所示:

首先解釋如下幾個變量的含義:
assignment 分區設置的副本集(所在brokerId)。
liveReplicas 當前在線的副本集。
具體的選舉算法如下所示:

離線轉在線的選舉算法比較簡單:如果unclean.leader.election.enable=false,則從存活的ISR集合中選擇第一個成為分區的Leader,如果沒有存活的ISR副本,并且unclean.leader.election.enable=true,則選擇一個在線的副本,否則返回NONE,表示沒有成功選擇一個合適的Leader。
然后返回本次選舉的結果,完成本次選舉。
4、總結
本文從一個生產實際故障開始進行分析,經過分析得出單副本主題在集群中單臺節點下線會引起部分隊列無法寫入,解決辦法是要先執行主題分區移動,也就是將需要停止的broker上所在的分區移動到其他broker上,這個過程并不會對消息發送,消息消費造成影響。





























