Kafka設(shè)計原理以及在達觀產(chǎn)品中的應(yīng)用
作者:蹇智華 達觀數(shù)據(jù)
前言
達觀數(shù)據(jù)作為一家提供大數(shù)據(jù)服務(wù)的公司,經(jīng)常會遇到客戶上報數(shù)據(jù)的需求。這樣的請求不需要馬上返回處理結(jié)果, 而是需要后臺將一系列的上報數(shù)據(jù)進行統(tǒng)一歸檔整理挖掘, 然后將結(jié)果數(shù)據(jù)呈現(xiàn)給客戶。這樣的業(yè)務(wù)需求需要達觀提供數(shù)據(jù)暫存服務(wù),也就是說我們需要一個系統(tǒng)在生產(chǎn)者(客戶上報數(shù)據(jù))和消費者(后臺數(shù)據(jù)處理)之間進行溝通,簡而言之叫系統(tǒng)間通信消息系統(tǒng),這種模型就是經(jīng)典的生產(chǎn)者(producer)、消費者(consumer)模型。
然而有一個消息系統(tǒng)正好是為了應(yīng)對這種業(yè)務(wù)場景而生,它就是kafka。那么kafka到底是一個什么樣的系統(tǒng)?有什么特點?實際吞吐表現(xiàn)又如何?帶著這些問題,我們一起來了解一下。
一, Kafka簡介
首先根據(jù)官網(wǎng)介紹,知道kafka是一個分布式流處理平臺,一個可處理企業(yè)級發(fā)布/訂閱的消息系統(tǒng),并且具有高容錯性和消費及時性等特點,那么它是怎么做到這一點的呢?接著往下看。
1,主題和日志:
主題(topic)和日志(log)設(shè)置是kafka一大特色,一個kafka集群可以創(chuàng)建多個topic, 每個topic都相當于一個消息隊列,這就意味著可以將不同格式的數(shù)據(jù)發(fā)布到不同的topic中,減小消費這些數(shù)據(jù)時的邏輯難度。那么每個topic中處理的數(shù)據(jù)結(jié)構(gòu)是怎樣呢?我們先來看一張topic的解剖圖:

圖1:topic原理解析圖
從圖1中可以看到, 消息傳送過來時kafka會通過負載均衡將消息最終寫入到磁盤上一個特定分區(qū)(partition)。由于在同一個partition上這些消息都是順序存儲的, 所以對一個特定分區(qū)每條消息都會有一個基于起始位置的偏移量(offset), 因此我們在后續(xù)消費時只需要指明從哪個partition中哪個offset開始消費,就能達到重復(fù)消費目的。
1)雖然kafka可以通過增加partition方式來增加負載,但是它的數(shù)據(jù)最終是被寫入到磁盤中。比如機械磁盤寫入效率是很低的, 難道我們需要增大一個topic的負載給它設(shè)置更多的partition嗎?
機械磁盤驅(qū)動器吞吐量跟尋道延時是強關(guān)聯(lián),也就是說,線性讀寫速度遠大于隨機讀寫。例如,在67200rpm SATA RAID-5磁盤陣列中, 隨機寫速度大約是100k/s, 然而線性寫速度可以達到600M/s,后者大約是前者的6000倍。通過圖1可知, kafka采用的即是后者, 利用操作系統(tǒng)read-ahead和write-behind技術(shù),極大提升磁盤訪問性能;設(shè)置partition數(shù)量固然可以從磁盤讀寫角度增大topic負載,但是partition數(shù)量過多會導(dǎo)致cpu計算量增大,所以***辦法是根據(jù)不同配置的機器, 不同的業(yè)務(wù)場景設(shè)置不同的partition數(shù)量。
2)偏移量offset存儲類型是什么, 如果消息足夠大,offset的值是否會重新置0, 如果置0,后續(xù)消費是否會紊亂?
kafka offset 是一個日志序列號( log sequence number),不必擔心offset 長度問題。那么這個日志序列號到底有多大,舉個例子:如果一個partition一天接收1T日志, 這個offset至少可以使用1百萬年。由于offset足夠用,而且不會被置0,所以從這個角度講消費紊亂情況是不會出現(xiàn)的。
3)寫入磁盤的日志會被***保留嗎?如果想刪除過期消息, 需要怎么操作?
可以通過配置文件中l(wèi)og.retention參數(shù)設(shè)置消息過期時間,超過過期時間的消息會被系統(tǒng)刪除,刪除的消息不可再被重新消費。
2,分布式集群
通過前文介紹我們已經(jīng)了解到kafka通過partition和順序讀寫磁盤的方式達到很高吞吐量,可是單臺機器吞吐量再高一旦該機發(fā)生故障宕掉就會對業(yè)務(wù)產(chǎn)生災(zāi)難性影響,怎么處理這個問題呢?想必你已經(jīng)知道了,那就是采用集群的方式,一旦一臺機器發(fā)生故障客戶端可以選擇鏈接其它機器, 保證業(yè)務(wù)穩(wěn)定性。每一個partition 都會有一個服務(wù)器來作為***(leader), 另外一個或者多個服務(wù)器(server)來作為跟隨者(follower),leader會處理所有的讀寫請求,而follower則會從leader那里備份數(shù)據(jù), 如果一個leader失敗了, 其它的follower會自動選舉一個成為一個新的leader, 所以對于一個server來說,他可能是某些partition下的leader, 而對于另外一些partition來說則是follower,這樣設(shè)計可以將負載更好均衡。
1)搭建kafka集群時有沒有什么小細節(jié)需要值得注意的?
kafka官網(wǎng)已經(jīng)有詳細的搭建過程,在此不贅述。建議正式項目中不要采用偽集群(多個broker運行在同一臺物理機上)的搭建方式,而且zookeeper集群和kafka集群***不要出現(xiàn)在同一臺實體機上,這樣會影響kafka順序讀寫效率。
2)在kafka集群中如果一個server失敗, 怎樣保證數(shù)據(jù)完整性?
在kafka配置文件中有一個復(fù)制因子控制參數(shù),如果將該參數(shù)設(shè)置為N,則表示一份數(shù)據(jù)會被保存N次,而這些數(shù)據(jù)被備份到不同server中,所以當設(shè)置復(fù)制因子為N時即使有N-1臺server失敗,也會保證數(shù)據(jù)完整性。
3,生產(chǎn)者消費者和消息的順序性:
上面講了那么多,無非是要實現(xiàn)一個隊列的數(shù)據(jù)結(jié)構(gòu)。對于隊列這種數(shù)據(jù)結(jié)構(gòu)我們一點也不陌生,由此可以想到對于kafka的一個topic 隊列來說,生產(chǎn)消費邏輯應(yīng)該是這樣:有很多生產(chǎn)者向topic中寫入數(shù)據(jù),另外一端則有許多消費者消費數(shù)據(jù)。(見圖2)

圖2:生產(chǎn)者消費者原理解析圖
然而實際上kafka生產(chǎn)者消費者模式有它的特殊性,那么kafka這個隊列是怎樣實現(xiàn)入隊和出隊的?接下來我們一起來看看kafka生產(chǎn)者消費者模式。
生產(chǎn)者:生產(chǎn)者(producer)顧名思義,就是向kafka隊列中發(fā)布消息的,即入隊操作者。生產(chǎn)者功能是在topic中選擇一個partion 然后向這個partition中發(fā)送數(shù)據(jù)。選擇partition的過程就是一個負載均衡的方式, 比如可以采用輪詢或者自己設(shè)定partition選擇函數(shù)來實現(xiàn)負載均衡。當然如果使用封裝的api比如(https://github.com/dpkp/kafka-python)就大可不必關(guān)心負載均衡問題。會有默認的負載均衡函數(shù)來實現(xiàn)這一功能。
消費者: 消費者(consumer)功能是從隊列中讀取數(shù)據(jù)并進行相應(yīng)邏輯處理,但是kafka消費者有特殊之處。kafka增加了一個組(group)的概念,一個topic可以有多個group, 當多個consumer從屬于一個組時,一條消息將被發(fā)往所有組,但是在組內(nèi),這條消息只會被一個consumer消費。由此說來一個group才是一個真正“邏輯消費者(logic consumer)”。相關(guān)邏輯如圖3所示。
消息順序性:通過圖3我們知道消息的消費情況,那么一個消息流消費情況會是怎樣的?其實在高等級api中由于指定了負載均衡規(guī)則,同一個生產(chǎn)者發(fā)布兩條不同消息數(shù)據(jù)時會根據(jù)相應(yīng)規(guī)則發(fā)送到一個特定partition中,在消費時會按照同樣規(guī)則從partition中取出數(shù)據(jù),這樣就能保證兩條數(shù)據(jù)消費的先后順序,從而保證了消息順序性。
1)對于一個具有多個consumer的topic,我要實現(xiàn)一條消息被多個consumer消費和一條消息只被一個consumer消費,那我需要怎么設(shè)置group?
將多個consumer設(shè)置為同一個組可以實現(xiàn)一條消息只被多個consumer消費, 將所有的consumer都設(shè)置為不同組,一條消息將會被所有consumer消費。
2)如果有一批數(shù)據(jù)消費時必須嚴格按照入隊先后順序來消費,需要怎樣設(shè)置生產(chǎn)者和消費者。
如果數(shù)據(jù)量小,可以將topic設(shè)置為一個partition;如果數(shù)據(jù)量較大,可以將一個生產(chǎn)者寫死負載均衡函數(shù),將數(shù)據(jù)發(fā)送到一個特定partition上,消費數(shù)據(jù)時指定消費者消費的partition,和offset來順序消費數(shù)據(jù)。

圖3:多個消費者組時消息流向原理圖
二, Kafka性能測試:
kafka是跨語言消息隊列系統(tǒng),github上提供了Java, Python等多種語言客戶端,為了簡單起見,我們這里采用kafka-python(https://github.com/dpkp/kafka-python)作為客戶端來鏈接kafka集群做測試。
測試環(huán)境:
1, broker 數(shù)量:3
2, 備份因子數(shù):2
3, 磁盤信息:200G普通機械硬盤
4, cpu參數(shù):8核8線程
5, 語言: Python2.7
6, 客戶端: kafka-python
7, partition 數(shù)量: 5
單進程producer 發(fā)送10條消息測試(如圖4):

圖4:一個生產(chǎn)者發(fā)送消息延時結(jié)果圖
統(tǒng)計上圖數(shù)據(jù)可知平均延時:0.004488707,也就是說qps可以達到2000,這樣的成績無疑是驚人的。那么在多進程情況下kafka表現(xiàn)還會好嗎?我們設(shè)置10個進程,看看kafka在10個進程下的延時會有較大的變化嗎?如圖5(打印消息過多,截取部分結(jié)果圖):

圖5:多個生產(chǎn)者發(fā)送消息延時結(jié)果圖(部分)
由圖5可知10 個進程每個進程發(fā)送10條消息,平均延時為0.00050380466秒, qps接近200000,由于kafka支持數(shù)千個客戶端同時讀寫,所以kafka吞吐能力是驚人的,更多測試歡迎大家去完成。
三,kafka在達觀數(shù)據(jù)的應(yīng)用介紹
1,在垂直搜索中的應(yīng)用:
我們知道搜索引擎需要定時對文檔進行更新, 如果我們把需要更新內(nèi)容暫存到 kafka,這樣索引更新時,只需要從對應(yīng) partition 中從上一次取過的 offset 處繼續(xù)取數(shù)據(jù),就能達到增量更新目的,而過期數(shù)據(jù)會被自動清理, 減少了操作冗余性和復(fù)雜性。
2,在用戶畫像以及相關(guān)推薦中的應(yīng)用:
和用戶畫像之前上報的用戶點擊行為數(shù)據(jù)不同,相關(guān)推薦之前的海量 item 數(shù)據(jù)上報對數(shù)據(jù)準確性要求更高,試想如果一條 item 數(shù)據(jù)因為處理失敗而沒有正確入庫,那么相關(guān)推薦時就永遠不會出現(xiàn)這條 item, 所以這就對“可回滾”提出了更加嚴格要求。然而在 kafka 中,也只需要將消費的 offset 重新置為消費失敗時的 offset,修復(fù)入庫問題重新消費即可。
當然 kafka 還有更加廣泛的應(yīng)用,這里就不一一討論,根據(jù)官網(wǎng)的介紹,kafka 在網(wǎng)站行為追蹤(Website Activity Tracking)、數(shù)據(jù)監(jiān)控, 流處理等眾多方面有特長,如果你對 kafka 原理有研究或者有實際應(yīng)用方面有心得,歡迎來討論,謝謝!
關(guān)于達觀數(shù)據(jù)
達觀數(shù)據(jù)專注于企業(yè)大數(shù)據(jù)技術(shù)服務(wù),以***的多層智能挖掘算法,實現(xiàn)對海量用戶行為和文本數(shù)據(jù)的深入分析和挖掘,為企業(yè)提供智能文本分析、精準用戶行為建模、個性化推薦、智能搜索等***數(shù)據(jù)挖掘功能。




























