聊聊 Kafka 那點(diǎn)破事!
本文轉(zhuǎn)載自微信公眾號(hào)「微觀技術(shù)」,作者微觀技術(shù)。轉(zhuǎn)載本文請(qǐng)聯(lián)系微觀技術(shù)公眾號(hào)。
大家好,我是Tom哥~
Kafka作為一款開(kāi)源的消息引擎,很多人并不陌生,但深入其源碼的同學(xué)估計(jì)不多,除非你是中間件團(tuán)隊(duì)消息系統(tǒng)維護(hù)者。但術(shù)業(yè)有專攻,市面上那么多開(kāi)源框架且每個(gè)框架又經(jīng)常迭代升級(jí),花精力深入了解每一個(gè)框架源碼不太現(xiàn)實(shí),本文會(huì)以業(yè)務(wù)視角羅列工作中大家需要熟知的一些知識(shí)
本篇文章的目錄:
首先,為什么使用kafka?
- 削峰填谷。緩沖上下游瞬時(shí)突發(fā)流量,保護(hù)“脆弱”的下游系統(tǒng)不被壓垮,避免引發(fā)全鏈路服務(wù)“雪崩”。
- 系統(tǒng)解耦。發(fā)送方和接收方的松耦合,一定程度簡(jiǎn)化了開(kāi)發(fā)成本,減少了系統(tǒng)間不必要的直接依賴。
Kafka 名詞術(shù)語(yǔ),一網(wǎng)打盡
- Broker:接收客戶端發(fā)送過(guò)來(lái)的消息,對(duì)消息進(jìn)行持久化
- 主題:Topic。主題是承載消息的邏輯容器,在實(shí)際使用中多用來(lái)區(qū)分具體的業(yè)務(wù)。
- 分區(qū):Partition。一個(gè)有序不變的消息序列。每個(gè)主題下可以有多個(gè)分區(qū)。
- 消息:這里的消息就是指 Kafka 處理的主要對(duì)象。
- 消息位移:Offset。表示分區(qū)中每條消息的位置信息,是一個(gè)單調(diào)遞增且不變的值。
- 副本:Replica。Kafka 中同一條消息能夠被拷貝到多個(gè)地方以提供數(shù)據(jù)冗余,這些地方就是所謂的副本。副本還分為領(lǐng)導(dǎo)者副本和追隨者副本,各自有不同的角色劃分。每個(gè)分區(qū)可配置多個(gè)副本實(shí)現(xiàn)高可用。一個(gè)分區(qū)的N個(gè)副本一定在N個(gè)不同的Broker上。
- 生產(chǎn)者:Producer。向主題發(fā)布新消息的應(yīng)用程序。
- 消費(fèi)者:Consumer。從主題訂閱新消息的應(yīng)用程序。
- 消費(fèi)者位移:Consumer Offset。表示消費(fèi)者消費(fèi)進(jìn)度,每個(gè)消費(fèi)者都有自己的消費(fèi)者位移。offset保存在broker端的內(nèi)部topic中,不是在clients中保存
- 消費(fèi)者組:Consumer Group。多個(gè)消費(fèi)者實(shí)例共同組成的一個(gè)組,同時(shí)消費(fèi)多個(gè)分區(qū)以實(shí)現(xiàn)高吞吐。
- 重平衡:Rebalance。消費(fèi)者組內(nèi)某個(gè)消費(fèi)者實(shí)例掛掉后,其他消費(fèi)者實(shí)例自動(dòng)重新分配訂閱主題分區(qū)
ZooKeeper 在里面的職責(zé)是什么?
它是一個(gè)分布式協(xié)調(diào)框架,負(fù)責(zé)協(xié)調(diào)管理并保存 Kafka 集群的所有元數(shù)據(jù)信息,比如集群都有哪些 Broker 在運(yùn)行、創(chuàng)建了哪些 Topic,每個(gè) Topic 都有多少分區(qū)以及這些分區(qū)的 Leader 副本都在哪些機(jī)器上等信息。
消息傳輸?shù)母袷?/h3>
純二進(jìn)制的字節(jié)序列。當(dāng)然消息還是結(jié)構(gòu)化的,只是在使用之前都要將其轉(zhuǎn)換成二進(jìn)制的字節(jié)序列。
消息傳輸協(xié)議
- 點(diǎn)對(duì)點(diǎn)模型。系統(tǒng) A 發(fā)送的消息只能被系統(tǒng) B 接收,其他任何系統(tǒng)都不能讀取 A 發(fā)送的消息
- 發(fā)布/訂閱模型。該模型也有發(fā)送方和接收方,只不過(guò)提法不同。發(fā)送方也稱為發(fā)布者(Publisher),接收方稱為訂閱者(Subscriber)。和點(diǎn)對(duì)點(diǎn)模型不同的是,這個(gè)模型可能存在多個(gè)發(fā)布者向相同的主題發(fā)送消息,而訂閱者也可能存在多個(gè),它們都能接收到相同主題的消息。
消息壓縮
生產(chǎn)者程序中配置compression.type 參數(shù)即表示啟用指定類型的壓縮算法。
props.put(“compression.type”, “gzip”),它表明該 Producer 的壓縮算法使用的是GZIP。這樣 Producer 啟動(dòng)后生產(chǎn)的每個(gè)消息集合都是經(jīng) GZIP 壓縮過(guò)的,故而能很好地節(jié)省網(wǎng)絡(luò)傳輸帶寬以及 Kafka Broker 端的磁盤占用。
但如果Broker又指定了不同的壓縮算法,如:Snappy,會(huì)將生產(chǎn)端的消息解壓然后按自己的算法重新壓縮。
各壓縮算法比較:吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;而在壓縮比方面,zstd > LZ4 > GZIP > Snappy。
kafka默認(rèn)不指定壓縮算法。
消息解壓縮
當(dāng) Consumer pull消息時(shí),Broker 會(huì)原樣發(fā)送出去,當(dāng)消息到達(dá) Consumer 端后,由 Consumer 自行解壓縮還原成之前的消息。
分區(qū)策略
編寫(xiě)一個(gè)類實(shí)現(xiàn)org.apache.kafka.clients.Partitioner接口。實(shí)現(xiàn)內(nèi)部?jī)蓚€(gè)方法:partition()和close()。然后顯式地配置生產(chǎn)者端的參數(shù)partitioner.class
常見(jiàn)的策略:
- 輪詢策略(默認(rèn))。保證消息最大限度地被平均分配到所有分區(qū)上。
- 隨機(jī)策略。隨機(jī)策略是老版本生產(chǎn)者使用的分區(qū)策略,在新版本中已經(jīng)改為輪詢了。
- 按key分區(qū)策略。key可能是uid或者訂單id,將同一標(biāo)志位的所有消息都發(fā)送到同一分區(qū),這樣可以保證一個(gè)分區(qū)內(nèi)的消息有序
- 其他分區(qū)策略。如:基于地理位置的分區(qū)策略
生產(chǎn)者管理TCP連接
在new KafkaProducer 實(shí)例時(shí),生產(chǎn)者應(yīng)用會(huì)在后臺(tái)創(chuàng)建并啟動(dòng)一個(gè)名為 Sender 的線程,該 Sender 線程開(kāi)始運(yùn)行時(shí)首先會(huì)創(chuàng)建與 Broker 的連接。此時(shí)還不知道給哪個(gè)topic發(fā)消息,所以Producer 啟動(dòng)時(shí)會(huì)發(fā)起與所有的 Broker 的連接。
Producer 通過(guò)metadata.max.age.ms 參數(shù)定期地去更新元數(shù)據(jù)信息,默認(rèn)值是 300000,即 5 分鐘,不管集群那邊是否有變化,Producer 每 5 分鐘都會(huì)強(qiáng)制刷新一次元數(shù)據(jù)以保證它是最新的數(shù)據(jù)。
Producer 發(fā)送消息:
Producer 使用帶回調(diào)通知的發(fā)送 API, producer.send(msg, callback)。
設(shè)置 acks = all。Producer 的一個(gè)參數(shù),表示所有副本都成功接收到消息,該消息才算是“已提交”,最高等級(jí),acks的其它值說(shuō)明。min.insync.replicas > 1,表示消息至少要被寫(xiě)入到多少個(gè)副本才算是“已提交”
retries 是 Producer 的參數(shù)。當(dāng)出現(xiàn)網(wǎng)絡(luò)的瞬時(shí)抖動(dòng)時(shí),消息發(fā)送可能會(huì)失敗,此時(shí)配置了 retries > 0 的 Producer 能夠自動(dòng)重試消息發(fā)送,避免消息丟失。
冪等性 Producer
設(shè)置參數(shù)props.put(“enable.idempotence”, ture),Producer 自動(dòng)升級(jí)成冪等性 Producer,其他所有的代碼邏輯都不需要改變。Kafka 自動(dòng)幫你做消息的重復(fù)去重。
原理很簡(jiǎn)單,就是經(jīng)典的空間換時(shí)間,即在 Broker 端多保存一些字段。當(dāng) Producer 發(fā)送了具有相同字段值的消息后,Broker 能夠自動(dòng)知曉這些消息已經(jīng)重復(fù)了,可以在后臺(tái)默默地把它們“丟棄”掉。
只能保證單分區(qū)、單會(huì)話上的消息冪等性。一個(gè)冪等性 Producer 能夠保證某個(gè)topic的一個(gè)分區(qū)上不出現(xiàn)重復(fù)消息,但無(wú)法實(shí)現(xiàn)多個(gè)分區(qū)的冪等性。比如采用輪詢,下一次提交換了一個(gè)分區(qū)就無(wú)法解決
事務(wù)型 Producer
能夠保證將消息原子性地寫(xiě)入到多個(gè)分區(qū)中。這批消息要么全部寫(xiě)入成功,要么全部失敗。能夠保證跨分區(qū)、跨會(huì)話間的冪等性。
- producer.initTransactions();
- try {
- producer.beginTransaction();
- producer.send(record1);
- producer.send(record2);
- //提交事務(wù)
- producer.commitTransaction();
- } catch (KafkaException e) {
- //事務(wù)終止
- producer.abortTransaction();
- }
實(shí)際上即使寫(xiě)入失敗,Kafka 也會(huì)把它們寫(xiě)入到底層的日志中,也就是說(shuō) Consumer 還是會(huì)看到這些消息。要不要處理在 Consumer 端設(shè)置 isolation.level ,這個(gè)參數(shù)有兩個(gè)值:
- read_uncommitted:這是默認(rèn)值,表明 Consumer 能夠讀取到 Kafka 寫(xiě)入的任何消息
- read_committed:表明 Consumer 只會(huì)讀取事務(wù)型 Producer 成功提交事務(wù)寫(xiě)入的消息
Kafka Broker 是如何存儲(chǔ)數(shù)據(jù)?
Kafka 使用消息日志(Log)來(lái)保存數(shù)據(jù),一個(gè)日志就是磁盤上一個(gè)只能追加寫(xiě)(Append-only)消息的物理文件。因?yàn)橹荒茏芳訉?xiě)入,故避免了緩慢的隨機(jī) I/O 操作,改為性能較好的順序 I/O 寫(xiě)操作,這也是實(shí)現(xiàn) Kafka 高吞吐量特性的一個(gè)重要手段。
不過(guò)如果你不停地向一個(gè)日志寫(xiě)入消息,最終也會(huì)耗盡所有的磁盤空間,因此 Kafka 必然要定期地刪除消息以回收磁盤。怎么刪除呢?
簡(jiǎn)單來(lái)說(shuō)就是通過(guò)日志段(Log Segment)機(jī)制。在 Kafka 底層,一個(gè)日志又近一步細(xì)分成多個(gè)日志段,消息被追加寫(xiě)到當(dāng)前最新的日志段中,當(dāng)寫(xiě)滿了一個(gè)日志段后,Kafka 會(huì)自動(dòng)切分出一個(gè)新的日志段,并將老的日志段封存起來(lái)。Kafka 在后臺(tái)還有定時(shí)任務(wù)會(huì)定期地檢查老的日志段是否能夠被刪除,從而實(shí)現(xiàn)回收磁盤空間的目的。
Kafka 的備份機(jī)制
相同的數(shù)據(jù)拷貝到多臺(tái)機(jī)器上。副本的數(shù)量是可以配置的。Kafka 中follow副本不會(huì)對(duì)外提供服務(wù)。
副本的工作機(jī)制也很簡(jiǎn)單:生產(chǎn)者總是向leader副本寫(xiě)消息;而消費(fèi)者總是從leader副本讀消息。至于follow副本,它只做一件事:向leader副本以異步方式發(fā)送pull請(qǐng)求,請(qǐng)求leader把最新的消息同步給它,必然有一個(gè)時(shí)間窗口導(dǎo)致它和leader中的數(shù)據(jù)是不一致的,或者說(shuō)它是落后于leader。
為什么要引入消費(fèi)者組?
主要是為了提升消費(fèi)者端的吞吐量。多個(gè)消費(fèi)者實(shí)例同時(shí)消費(fèi),加速整個(gè)消費(fèi)端的吞吐量(TPS)。
在一個(gè)消費(fèi)者組下,一個(gè)分區(qū)只能被一個(gè)消費(fèi)者消費(fèi),但一個(gè)消費(fèi)者可能被分配多個(gè)分區(qū),因而在提交位移時(shí)也就能提交多個(gè)分區(qū)的位移。如果1個(gè)topic有2個(gè)分區(qū),消費(fèi)者組有3個(gè)消費(fèi)者,有一個(gè)消費(fèi)者將無(wú)法分配到任何分區(qū),處于idle狀態(tài)。
理想情況下,Consumer 實(shí)例的數(shù)量應(yīng)該等于該 Group 訂閱topic(可能多個(gè))的分區(qū)總數(shù)。
消費(fèi)端拉取(批量)、ACK
消費(fèi)端先拉取并消費(fèi)消息,然后再ack更新offset。
1)消費(fèi)者程序啟動(dòng)多個(gè)線程,每個(gè)線程維護(hù)專屬的 KafkaConsumer 實(shí)例,負(fù)責(zé)完整的消息拉取、消息處理流程。一個(gè)KafkaConsumer負(fù)責(zé)一個(gè)分區(qū),能保證分區(qū)內(nèi)的消息消費(fèi)順序。
缺點(diǎn):線程數(shù)受限于 Consumer 訂閱topic的總分區(qū)數(shù)。
2)任務(wù)切分成了消息獲取和消息處理兩個(gè)部分。消費(fèi)者程序使用單或多線程拉取消息,同時(shí)創(chuàng)建專門線程池執(zhí)行業(yè)務(wù)邏輯。優(yōu)點(diǎn):可以靈活調(diào)節(jié)消息獲取的線程數(shù),以及消息處理的線程數(shù)。
缺點(diǎn):無(wú)法保證分區(qū)內(nèi)的消息消費(fèi)順序。另外引入了多組線程,使得整個(gè)消息消費(fèi)鏈路被拉長(zhǎng),最終導(dǎo)致正確位移提交會(huì)變得異常困難,可能會(huì)出現(xiàn)消息的重復(fù)消費(fèi)或丟失。
消費(fèi)端offset管理
1)老版本的 Consumer組把位移保存在 ZooKeeper 中,但很快發(fā)現(xiàn)zk并不適合頻繁的寫(xiě)更新。
2)在新版本的 Consumer Group 中,Kafka 社區(qū)重新設(shè)計(jì)了 Consumer組的位移管理方式,采用了將位移保存在 Broker端的內(nèi)部topic中,也稱為“位移主題”,由kafka自己來(lái)管理。原理很簡(jiǎn)單, Consumer的位移數(shù)據(jù)作為一條條普通的 Kafka 消息,提交到__consumer_offsets 中。它的消息格式由 Kafka 自己定義,用戶不能修改。位移主題的 Key 主要包括 3 部分內(nèi)容:
Kafka Consumer 提交位移的方式有兩種:自動(dòng)提交位移和手動(dòng)提交位移。
Kafka 使用Compact策略來(lái)刪除位移主題中的過(guò)期消息,避免該topic無(wú)限期膨脹。提供了專門的后臺(tái)線程定期地巡檢待 Compact 的主題,看看是否存在滿足條件的可刪除數(shù)據(jù)。
Rebalance 觸發(fā)條件
1)組成員數(shù)發(fā)生變更。比如有新的 Consumer 實(shí)例加入組或者離開(kāi)組,又或是有 Consumer 實(shí)例崩潰被“踢出”組。(99%原因是由它導(dǎo)致)
2) 訂閱topic數(shù)發(fā)生變更。Consumer Group 可以使用正則表達(dá)式的方式訂閱topic,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明該 Group 訂閱所有以字母 t 開(kāi)頭、字母 c 結(jié)尾的topic。在 Consumer Group 的運(yùn)行過(guò)程中,你新創(chuàng)建了一個(gè)滿足這樣條件的topic,那么該 Group 就會(huì)發(fā)生 Rebalance。
3) 訂閱topic的分區(qū)數(shù)發(fā)生變化。Kafka 目前只允許增加topic的分區(qū)數(shù)。當(dāng)分區(qū)數(shù)增加時(shí),也會(huì)觸發(fā)訂閱該topic的所有 Group 開(kāi)啟 Rebalance。
消息的順序性
Kafka的設(shè)計(jì)中多個(gè)分區(qū)的話無(wú)法保證全局的消息順序。如果一定要實(shí)現(xiàn)全局的消息順序,只能單分區(qū)。
方法二:通過(guò)有key分組,同一個(gè)key的消息放入同一個(gè)分區(qū),保證局部有序
歷史數(shù)據(jù)清理策略
基于保存時(shí)間,log.retention.hours
基于日志大小的清理策略。通過(guò)log.retention.bytes控制
組合方式
























