Kafka生產環境實戰經驗深度總結,讓你少走彎路
1.背景
在實際項目中接入 Kafka 已經成為高并發系統的標配。然而,從簡單的“能跑”到“穩定高效地跑”,中間有太多坑值得記錄和總結。本文結合本人在多個生產項目中使用 Kafka 的經驗,圍繞以下幾個方面展開:消息丟失防范、重復消費控制、性能瓶頸優化、集群運維策略,以及 Topic、分區、副本機制的設計要點。先來看看kafka的基礎架構圖,有個整體認識:
圖片
接下我們我們就從生產者、服務端broker、服務端去講述下實戰經驗心得。
2.生產者如何提高吞吐量?
下面來看看生產者發送一條消息到kafka服務端的流程:
圖片
在消息發送的過程中,涉及到了兩個線程——main 線程和 Sender 線程。在 main 線程中創建了一個雙端隊列RecordAccumulator。main 線程將消息發送給 RecordAccumulator,Sender 線程不斷從 RecordAccumulator 中拉取消息發送到 Kafka服務端 Broker
可以適當調整以下四個生產者的參數來提高吞吐量:
參數 | 說明 |
batch.size | 提交一批數據到緩沖區的最大值,默認 16k。適當增加該值,可以提高吞吐量,但是如果該值設置太大,會導致數據傳輸延遲增加。 |
linger.ms | 如果數據遲遲未達到 batch.size,sender 等待 linger.time之后就會發送數據。單位 ms,默認值是 0ms,表示沒有延遲。生產環境建議該值大小為 5-100ms 之間。 |
buffer.memory | RecordAccumulator 緩沖區總大小,默認 32m。可以適當增加該值提高緩沖區的存儲能力 |
compression.type | 生產者發送的所有數據的壓縮方式。默認是 none,也就是不壓縮。支持壓縮類型:none、gzip、snappy、lz4 和 zstd。 |
這些參數的應用思想都很好理解,就好比我們現實生活中集散中心大巴車拉人,一次拉一個,有人就走。這種方式效率低下,浪費資源。所以一般都是車到了多等一下,等人數差不多才走,這就是參數batch.size和linger.ms的提醒,buffer.memory其實也好理解,就好比車送到目的地只能容納100個人,你使勁送過去,收不下,只能目的把這個100個人安頓好,才能接著送,所以適當調大,可以增加吞吐量,至于壓縮compression.type就是讓一次可以拉更多的人,就好比讓小孩子和大人用一個座位。
3.如何保證消息不丟失?
消息丟失可能發生在生產者發送消息、broker保存消息、消費者消費消息等環節。
3.1 生產者丟失消息
生產者丟失消息是比較常見的場景,生產者發送消息到kafka,因為網絡抖動最后發現kakfa沒保存,這鍋該誰背?答案是生產者,因為 Kafka Producer 是異步發送消息的,也就是說如果你調用的是 producer.send(msg) 這個 API,那么它通常會立即返回,但此時你不能認為消息發送已成功完成。解決辦法也很簡單:**Producer 永遠要使用帶有回調通知的發送 API,也就是說不要使用 producer.send(msg),而要使用 producer.send(msg, callback)**,通過回調callback才能真正知道消息是否成功發送
設置重試 retries 。這里的 retries 同樣是 Producer 的參數,對應前面提到的 Producer 自動重試。當出現網絡的瞬時抖動時,消息發送可能會失敗,此時配置了 retries > 0 的 Producer 能夠自動重試消息發送,避免消息丟失。
3.2 broker丟失消息
設置 acks = all。acks 是 Producer 的一個參數,代表了你對“已提交”消息的定義。如果設置成 all,表示生產者發送過來的數據,Leader和ISR隊列里面的所有節點收到數據后才應答。
參數 | 說明 |
acks | 0:生產者發送過來的數據,不需要等數據落盤應答。 |
設置 unclean.leader.election.enable = false。這是 Broker 端的參數,它控制的是哪些 Broker 有資格競選分區的 Leader。如果一個 Broker 落后原先的 Leader 太多,那么它一旦成為新的 Leader,必然會造成消息的丟失。故一般都要將該參數設置成 false,即不允許這種情況的發生。
設置 replication.factor >= 3。這也是 Broker 端的參數。其實這里想表述的是,最好將消息多保存幾份,畢竟目前防止消息丟失的主要機制就是冗余。
設置 min.insync.replicas > 1。這依然是 Broker 端參數,控制的是消息至少要被寫入到多少個副本才算是“已提交”。設置成大于 1 可以提升消息持久性。在實際環境中千萬不要使用默認值 1。
確保 replication.factor > min.insync.replicas。如果兩者相等,那么只要有一個副本掛機,整個分區就無法正常工作了。我們不僅要改善消息的持久性,防止數據丟失,還要在不降低可用性的基礎上完成。推薦設置成 replication.factor = min.insync.replicas + 1
3.3 消費者丟失消息
Consumer 程序有個“位移”的概念,表示的是這個 Consumer 當前消費到的 Topic 分區的位置。如果我們一次消費offset為0-9的10條消息,拉取到消息之后就自動提交了位移,但是消費到位移5的時候報錯了,那么位移5-9的消息就被丟失了。
解決辦法也很簡單就是確保消息消費完成再提交。Consumer 端有個參數 enable.auto.commit,最好把它設置成 false關閉自動提交,并采用手動提交位移的方式。如果啟用了自動提交,Consumer 端還有個參數就派上用場了:auto.commit.interval.ms。它的默認值是 5 秒,表明 Kafka 每 5 秒會為你自動提交一次位移。
手動提交位移是保證消費者消息消息過程中不丟失消息的核心所在,手動提交分為同步和異步,同步提交會使消費者處于阻塞狀態,直到遠端的 Broker 返回提交結果。而異步提交它會立即返回,不會阻塞,因此不會影響 Consumer 應用的 TPS。但是異步條件有一個缺點就是發生了異常我們無法立刻感知到并相應邏輯處理,所以代碼里面的提交位移邏輯一般是:同步+異步
try {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
process(records); // 處理消息
kafkaConsumer.commitAsync(); // 使用異步提交規避阻塞
}
} catch (Exception e) {
handle(e); // 處理異常
} finally {
try {
kafkaConsumer.commitSync(); // 最后一次提交使用同步阻塞式提交
} finally {
kafkaConsumer.close();
}
}這段代碼同時使用了 commitSync() 和 commitAsync()。對于常規性、階段性的手動提交,我們調用 commitAsync() 避免程序阻塞,而在 Consumer 要關閉前,我們調用 commitSync() 方法執行同步阻塞式的位移提交,以確保 Consumer 關閉前能夠保存正確的位移數據。將兩者結合后,我們既實現了異步無阻塞式的位移管理,也確保了 Consumer 位移的正確性,所以,如果你需要自行編寫代碼開發一套 Kafka Consumer 應用,那么我推薦你使用上面的代碼范例來實現手動的位移提交。
關于提交位移有一個可能發生的異常:CommitFailedException,顧名思義就是 Consumer 客戶端在提交位移時出現了錯誤或異常,而且還是那種不可恢復的嚴重異常。這是因為在拉取到消息消費完之后提交位移這期間,消費者組發生了重平衡。關于什么是重平衡可以看后續總結講述。
4.如何保證消息不會重復消費?
在生產者端可能由于開啟了重試機制導致同一條消息被發送了兩次,這時候可以讓生產者開啟冪等性配置參數:enable.idempotence 默認為true, 即開啟的。
消費者端就是要保證實際消費消息的位移和提交的位移一致,使用手工同步位移。當然我們也可以在消費消息的代碼邏輯保證消費的冪等性:使用唯一索引或者分布式鎖都行
5.如何解決消息積壓問題
消息積壓會導致很多問題,?如磁盤被打滿、?產端發消息導致kafka性能過慢,最后導致出現服務雪崩不可用,解決方案如下:
- 如果是Kafka消費能力不足,則可以考慮增加Topic的分區數,并且同時提升消費組的消費者數量,消費者數 = 分區數。因為主題的一個分區只能被消費者組中一個消費者消費,假如我們消費者組里有3個消費者,但是主題就一個分區,這就白白空著兩個消費者無所事事。如果已經是多個消費者對應多個分區了,還是消費比較慢,就說明是消息消息的代碼邏輯過重處理過慢,可以引入多線程異步操作,但這時候需要自己控制代碼邏輯來保證消費的順序性,因為一個分區內的消息是有序的,被一個消費者順序消費,但是當消費者開啟多線程處理之后就不能保證順序消費了。
- 如果是下游的數據處理不及時:提高每批次拉取的數量。批次拉取數據過少(拉取數據/處理時間 < 生產速度),使處理的數據小于生產的數據,也會造成數據積壓。比如說可以從一次最多拉取500條,調整為一次最多拉取1000條。簡單來說就是在消費能力跟得上的同時,盡量保證消費速度>生產速度,這樣就不會堆積了。
6.如何保證消息的有序性。
生產者:在發送時將ack不能設置0,關閉重試,使?同步發送,等到發送成功再發送下?條。確保消息是順序發送的。
消費者:消息是發送到?個分區中,只能有?個消費組的消費者來接收消息。
因此,kafka的順序消費會犧牲掉性能。
7.什么是重平衡?
Rebalance 就是讓一個 Consumer Group 下所有的 Consumer 實例就如何消費訂閱主題的所有分區達成共識的過程。在 Rebalance 過程中,所有 Consumer 實例共同參與,在協調者組件的幫助下,完成訂閱主題分區的分配。但是,在整個過程中,所有實例都不能消費任何消息,因此它對 Consumer 的 TPS 影響很大。重平衡觸發的場景:
- 消費者組訂閱的主題的分區數增加了,注意主題分區數只能增加,不能減少
- 消費者組訂閱的主題數有變化,可能變多了也可能變少了。
- 消費者組成員有變化,可能變多了也可能變少了。
前兩個訂閱的分區數增加還是主題數變化,都是一個主動發起Rebalance,我們是能提前感知到的。Consumer 實例增加的情況很好理解,當我們啟動一個配置有相同 group.id 值的 Consumer 程序時,實際上就向這個 Group 添加了一個新的 Consumer 實例。此時,Coordinator 會接納這個新實例,將其加入到組中,并重新分配分區。通常來說,增加 Consumer 實例的操作都是計劃內的,可能是出于增加 TPS 或提高伸縮性的需要。但是對于Consumer 實例減少,大部分不是人為操作下線的,更多情況是Consumer 實例會被 Coordinator 錯誤地認為“已停止”從而被“踢出”Group。如果是這個原因導致的 Rebalance,這種情況就得引起我們重視了。
Coordinator 會在什么情況下認為某個 Consumer 實例已掛從而要退組呢?
當 Consumer Group 完成 Rebalance 之后,每個 Consumer 實例都會定期地向 Coordinator 發送心跳請求,表明它還存活著。如果某個 Consumer 實例不能及時地發送這些心跳請求,Coordinator 就會認為該 Consumer 已經“死”了,從而將其從 Group 中移除,然后開啟新一輪 Rebalance。Consumer 端有個參數,叫 session.timeout.ms,就是被用來表征此事的。該參數的默認值是 145秒,即如果 Coordinator 在 45 秒之內沒有收到 Group 下某 Consumer 實例的心跳,它就會認為這個 Consumer 實例已經掛了。可以這么說,session.timout.ms 決定了 Consumer 存活性的時間間隔。
Consumer 端還有一個參數,用于控制 Consumer 實際消費能力對 Rebalance 的影響,即 max.poll.interval.ms 參數。它限定了 Consumer 端應用程序兩次調用 poll 方法的最大時間間隔。它的默認值是 5 分鐘,表示你的 Consumer 程序如果在 5 分鐘之內無法消費完 poll 方法返回的消息,那么 Consumer 會主動發起“離開組”的請求,Coordinator 也會開啟新一輪 Rebalance。
8.kafka作為消息隊列為什么發送和消費消息這么快?
- 消息分區:不受單臺服務器的限制,可以不受限的處理更多的數據
- 順序讀寫:磁盤順序讀寫,提升讀寫效率
- 頁緩存:把磁盤中的數據緩存到內存中,把對磁盤的訪問變為對內存的訪問
- 零拷貝:減少上下文切換及數據拷貝
- 消息壓縮:減少磁盤IO和網絡IO
- 分批發送:將消息打包批量發送,減少網絡開銷

























