構(gòu)建下一代萬(wàn)億級(jí)云原生消息架構(gòu):Apache Pulsar 在 vivo 的探索與實(shí)踐
作者 | vivo互聯(lián)網(wǎng)大數(shù)據(jù)團(tuán)隊(duì) - Chen Jianbo、Quan Limin
本文整理自 vivo 互聯(lián)網(wǎng)大數(shù)據(jù)團(tuán)隊(duì)在 Apache Pulsar Meetup 上的演講《Apache Pulsar 在 vivo 的探索與實(shí)踐》,介紹 vivo 在集群管理與監(jiān)控上應(yīng)用 Pulsar 的實(shí)踐。
vivo 移動(dòng)互聯(lián)網(wǎng)為全球 4 億 + 智能手機(jī)用戶提供互聯(lián)網(wǎng)產(chǎn)品與服務(wù)。其中,vivo 分布式消息中間件團(tuán)隊(duì)主要為 vivo 所有內(nèi)外銷實(shí)時(shí)計(jì)算業(yè)務(wù)提供高吞吐、低延時(shí)的數(shù)據(jù)接入、消息隊(duì)列等服務(wù),覆蓋應(yīng)用商店、短視頻、廣告等業(yè)務(wù)。業(yè)務(wù)集群已達(dá)每天十萬(wàn)億級(jí)的數(shù)據(jù)規(guī)模。

圖 1. vivo 分布式消息中間件系統(tǒng)架構(gòu)
上圖為系統(tǒng)的整體架構(gòu),其中數(shù)據(jù)接入層包括數(shù)據(jù)接入、數(shù)據(jù)采集服務(wù),支持 SDK 直連;消息中間件由 Kafka 和 Pulsar 共同承擔(dān),其中 Pulsar 的承載量達(dá)到千億級(jí)別;數(shù)據(jù)處理部分使用 Flink、Spark 等組件。
目前,Kafka 采用多集群方式,根據(jù)不同的業(yè)務(wù)量級(jí)、重要性分別使用不同的集群提供服務(wù),比如計(jì)費(fèi)集群、搜索集群、日志集群。在 Kafka 集群的內(nèi)部,則采用物理隔離的方式,根據(jù)不同業(yè)務(wù)的重要性,將不同業(yè)務(wù)的 Topic 控制在不同的資源組內(nèi),避免業(yè)務(wù)之間相互影響。

圖 2. Kafka 集群資源隔離

圖 3. Kafka 集群流量均衡
資源組內(nèi)部則會(huì)針對(duì) Topic 流量、分區(qū)分布、磁盤(pán)容量、機(jī)器機(jī)架等指標(biāo)生成遷移計(jì)劃進(jìn)行流量均衡,以此增強(qiáng) Kafka 可靠性。目前 Kafka 已在多集群部署、資源隔離、流量均衡三個(gè)方面保障了基本的穩(wěn)定性和資源利用率,但是在此之外,系統(tǒng)仍存在一些問(wèn)題。
一、應(yīng)對(duì)業(yè)務(wù)流量數(shù)十倍增長(zhǎng),引入 Apache Pulsar
過(guò)去幾年來(lái),Kafka 集群承載的業(yè)務(wù)量迅速增長(zhǎng),流量上漲數(shù)十倍,帶來(lái)諸多問(wèn)題:
- Topic 及 Topic 分區(qū)總量不斷增加,集群性能受到影響:Kafka 高性能依賴于磁盤(pán)的順序讀寫(xiě),磁盤(pán)上大量分區(qū)導(dǎo)致隨機(jī)讀寫(xiě)加重;
- 業(yè)務(wù)流量增加迅速,存量集群變大,需要將老的業(yè)務(wù)進(jìn)行資源組隔離遷移或者集群拆分。無(wú)論是資源組隔離還是集群的隔離的方式,由于集群不可以進(jìn)行動(dòng)態(tài)擴(kuò)縮容,機(jī)器不能夠進(jìn)行靈活調(diào)配,都存在利用率不高、運(yùn)維成本增加的問(wèn)題;
- 機(jī)器擴(kuò)容慢,需要做長(zhǎng)時(shí)間流量均衡,難以應(yīng)對(duì)突發(fā)流量。集群規(guī)模越大,問(wèn)題越突出;
- 消費(fèi)端性能擴(kuò)展太依賴分區(qū)擴(kuò)容,導(dǎo)致集群元數(shù)據(jù)瘋狂增長(zhǎng);
- 集群數(shù)量對(duì)應(yīng)的機(jī)器基數(shù)大,硬件故障概率高,出現(xiàn)硬件故障時(shí)影響會(huì)直接傳導(dǎo)到客戶端,缺少中間層容錯(cuò)。
面對(duì)龐大的集群、流量和多樣化的業(yè)務(wù)場(chǎng)景,綜合考慮集群的穩(wěn)定性和維護(hù)成本等因素,vivo 需要一個(gè)功能更豐富、適用更多場(chǎng)景、擴(kuò)展能力更強(qiáng)的消息組件。
Pulsar 如何解決 vivo 存在的問(wèn)題,可以首先看一下 Pulsar 的架構(gòu)設(shè)計(jì)。Pulsar 采用計(jì)算存儲(chǔ)層分離架構(gòu)。計(jì)算層的 Broker 節(jié)點(diǎn)是對(duì)等且無(wú)狀態(tài)的,可以快速擴(kuò)展;存儲(chǔ)層使用 BookKeeper 作為節(jié)點(diǎn),同樣節(jié)點(diǎn)對(duì)等。這種分離架構(gòu)支持計(jì)算和存儲(chǔ)層各自獨(dú)立擴(kuò)展。

圖 4. Pulsar 存儲(chǔ)計(jì)算分離
其次,Pulsar 的各個(gè)節(jié)點(diǎn)都是輕量化的,在出現(xiàn)故障和宕機(jī)時(shí)可以快速恢復(fù)。一般情況下可以通過(guò)快速上下線來(lái)解決某個(gè)節(jié)點(diǎn)機(jī)器的問(wèn)題。同時(shí) Broker 層可以作為 BookKeeper 層的容錯(cuò)層,可以防止故障直接傳導(dǎo)至用戶端。
Pulsar 擴(kuò)容時(shí)無(wú)需長(zhǎng)時(shí)間的數(shù)據(jù)遷移,且支持實(shí)時(shí)均衡。Broker 層抽象了 Bundle 概念,可以用有限的 Bundle 映射海量 Topic,Topic 可以隨著 Bundle 遷移,通過(guò)動(dòng)態(tài)遷移 Bundle 可以更好地應(yīng)對(duì)流量突發(fā)場(chǎng)景。BookKeeper 分層分片的架構(gòu)讓數(shù)據(jù)分布均勻,在 Broker 層有一個(gè)選擇機(jī)制,在擴(kuò)容時(shí)可以將數(shù)據(jù)寫(xiě)入存儲(chǔ)量小的節(jié)點(diǎn),擴(kuò)容時(shí)無(wú)需數(shù)據(jù)遷移,提供更好的流量高峰應(yīng)對(duì)能力。Bookie 進(jìn)行數(shù)據(jù)刷盤(pán)時(shí)會(huì)對(duì)批量數(shù)據(jù)自動(dòng)進(jìn)行數(shù)據(jù)排序,可以避免 Kafka 中的隨機(jī)讀寫(xiě)。
Pulsar 提供了四種消息模型:Exclusive、Failover、Shared 和 Key_Shared,其中 Shared 模型允許一個(gè)分區(qū)同時(shí)被多個(gè)消費(fèi)實(shí)例訂閱消費(fèi),并采用 Round Robin(輪詢)方式將數(shù)據(jù)推送到各個(gè)消費(fèi)實(shí)例。因此消費(fèi)能力的擴(kuò)展不會(huì)過(guò)于依賴分區(qū)擴(kuò)容,慢消費(fèi)的消費(fèi)實(shí)例也可以在 Shared 模型中得到解決。Key_Shared 模型則是在 Shared 的基礎(chǔ)上對(duì)應(yīng)對(duì)順序性有要求的場(chǎng)景,可以按照 Key 來(lái)消費(fèi)。

圖 5. Pulsar 訂閱模型
Pulsar 的設(shè)計(jì)架構(gòu)帶來(lái)了海量分區(qū)支撐、消費(fèi)擴(kuò)展、精準(zhǔn)限流、流量均衡、快速擴(kuò)縮容、故障恢復(fù)、分層存儲(chǔ)、云原生容器部署、異地多活等特性和優(yōu)勢(shì),可以幫助集群更好地實(shí)現(xiàn)高可用、高擴(kuò)展,提高了更高的彈性。
二、Apache Pulsar 集群管理實(shí)踐
下面我們從流量控制和數(shù)據(jù)管理方面,分享 vivo 在使用 Pulsar 過(guò)程中的集群管理經(jīng)驗(yàn)。
2.1 Bundle 的管理
在集群流量控制層面,比較關(guān)鍵的一點(diǎn)就是 Bundle 的管理。Bundle 負(fù)責(zé)控制用戶流量到 Broker 的具體分布。Broker 與 Topic 之間沒(méi)有直接聯(lián)系,而是在 Broker 之上抽象出 Bundle 概念,通過(guò) Bundle 與 Topic 建立關(guān)系;Topic 通過(guò)名稱計(jì)算哈希值,并散列分布到一致性哈希環(huán)中,而哈希環(huán)的每一段都是一個(gè) Bundle。另外 Load Manager 根據(jù) Bundle 的負(fù)載情況將后者分配到對(duì)應(yīng)的 Broker 上,將 Bundle 數(shù)據(jù)存儲(chǔ)在 ZooKeeper 中。由此以來(lái)就間接實(shí)現(xiàn)了 Topic 與 Broker 之間的聯(lián)系(可參考近期 StreamNative 發(fā)布的 Broker 負(fù)載均衡技術(shù)文章)。?


圖 6. Bundle 與 Topic 建立關(guān)系
這里需要注意:
- Bundle 的個(gè)數(shù)影響均衡效果,因?yàn)橥ㄟ^(guò)一致性哈希來(lái)確認(rèn) Topic 應(yīng)該落在哪個(gè) Bundle 上, Topic 與 Bundle 會(huì)存在不均衡分配,某些 Bundle 分配的 Topic 可能較多或較少。Bundle 越多,每個(gè) Bundle 承載的 Topic 越少,粒度越細(xì)。依賴于 Pulsar 的負(fù)載均衡算法,均衡效果更好;否則若 Bundle 太大,無(wú)論如何卸載都很難平衡負(fù)載;
- Bundle 數(shù)據(jù)和 Broker 映射元數(shù)據(jù)都維護(hù)在 ZooKeeper 中,需要做好 Bundle 數(shù)量的規(guī)劃。
針對(duì)以上兩點(diǎn),我們根據(jù) Broker 來(lái)設(shè)置 Bundle 數(shù)量設(shè)置最小最大值來(lái)控制,還可以對(duì)流量較大的 Topic 針對(duì)性擴(kuò)大分區(qū),讓分區(qū)均勻分配到 Broker Bundle 上。
Pulsar 雖然提供了海量分區(qū)能力,但是過(guò)多的 Topic 或者分區(qū)產(chǎn)生的 lookup 也會(huì)對(duì)集群產(chǎn)生較大的壓力。集群管理者需要提前規(guī)劃 Bundle 和分區(qū)設(shè)置,杜絕濫用。
另外對(duì) Bundle 的操作需要注意:
- Pulsar 本身提供了卸載操作,可以解除 Bundle 和 Broker 的關(guān)聯(lián)關(guān)系,將 Bundle 重新分配。線上流量較大時(shí)應(yīng)卸載 Bundle 而不是整個(gè)命名空間,因?yàn)樾遁d后者會(huì)導(dǎo)致其上的全部 Bundle 與對(duì)應(yīng)的生產(chǎn)者、消費(fèi)者斷開(kāi),重新進(jìn)行 lookup。
- 利用 Bundle split 對(duì)流量較大的 Bundle 進(jìn)行拆分,增加命名空間的 Bundle 數(shù)量,降低影響。
總體而言,用戶需要注意流量的均衡與集群的穩(wěn)定性,在集群管理之初就做好 Bundle 的數(shù)量管理和相關(guān)測(cè)試,謹(jǐn)慎對(duì)待大批量 Bundle 卸載等運(yùn)維操作。
2.2 數(shù)據(jù)的管理
接下來(lái)我們從數(shù)據(jù)的存儲(chǔ)、過(guò)期、刪除三個(gè)方面來(lái)分析。
(1) Ledger 翻轉(zhuǎn)
首先介紹數(shù)據(jù)寫(xiě)入 ledger 的過(guò)程。每一個(gè) Topic 分區(qū)在一段時(shí)間內(nèi)只創(chuàng)建一個(gè) Ledger 維護(hù)分區(qū)寫(xiě)入的 Entry 的數(shù)據(jù)歸屬。Topic 分區(qū)寫(xiě)入的數(shù)據(jù)以 Entry 的形式,經(jīng)過(guò) Broker 寫(xiě)入 Netty 線程處理隊(duì)列,線程依次根據(jù) Entry 的 Ledger Id,對(duì) Ledger 目錄數(shù)取模,寫(xiě)入到目標(biāo)磁盤(pán) Ledger 目錄,最終以 Entry Log 和 RocksDB 的索引方式存儲(chǔ)。需要注意,Ledger 是一個(gè)分區(qū)在一段時(shí)間內(nèi)寫(xiě)入數(shù)據(jù)的邏輯管理單位,維護(hù)了這段數(shù)據(jù)存儲(chǔ)的 Bookie 位置。一個(gè) Topic 分區(qū)在一段時(shí)間內(nèi)寫(xiě)入的數(shù)據(jù)只被一個(gè)活躍 Ledger 管理,待該 Ledger 達(dá)到翻轉(zhuǎn)條件后才會(huì)關(guān)閉 Ledger 并重新計(jì)算,創(chuàng)建新 Ledger 繼續(xù)寫(xiě)入。?

圖 7. Ledger 翻轉(zhuǎn)示意
Ledger 翻轉(zhuǎn)后,數(shù)據(jù)才會(huì)寫(xiě)入新的數(shù)據(jù)目錄。在 Pulsar 中,在滿足 Ledger 最小翻轉(zhuǎn)時(shí)間以及以下條件之一后觸發(fā) Ledger 翻轉(zhuǎn):
- 已達(dá)到 Ledger 最大翻轉(zhuǎn)時(shí)間;
- 已達(dá)到 Ledger 的最大 Entry 數(shù)量;
- 已達(dá)到 Ledger 的最大大小。
默認(rèn)值:
觸發(fā)ledger翻轉(zhuǎn)的最小時(shí)間:
managedLedgerMinLedgerRolloverTimeMinutes=10
觸發(fā)ledger翻轉(zhuǎn)的最長(zhǎng)時(shí)間:
managedLedgerMaxLedgerRolloverTimeMinutes=240
觸發(fā)ledger翻轉(zhuǎn)的最大entry數(shù):
managedLedgerMaxEntriesPerLedger=50000
觸發(fā)ledger翻轉(zhuǎn)的最大大小:
managedLedgerMaxSizePerLedgerMbytes=2048
注意兩個(gè)問(wèn)題:
- Ledger 過(guò)大:最小翻轉(zhuǎn)時(shí)間是防止 Ledger 元數(shù)據(jù)過(guò)快增長(zhǎng)的手段,但實(shí)踐發(fā)現(xiàn)如果 Topic 分區(qū)流量較大,Ledger 的實(shí)際值可能遠(yuǎn)超上述設(shè)置的上限閾值。Ledger 只有在翻轉(zhuǎn)后才會(huì)創(chuàng)建新的 Ledger,Ledger 過(guò)大會(huì)導(dǎo)致某段時(shí)間內(nèi)寫(xiě)入某個(gè)磁盤(pán)的數(shù)據(jù)過(guò)多,產(chǎn)生磁盤(pán)存儲(chǔ)不均衡的問(wèn)題;針對(duì) Ledger 為對(duì)象的一些操作也會(huì)受到影響,產(chǎn)生無(wú)法及時(shí)卸載數(shù)據(jù)到二級(jí)存儲(chǔ)、數(shù)據(jù)卸載時(shí)間較長(zhǎng)、還未卸載成功但 Ledger 已經(jīng)過(guò)期等問(wèn)題。
- Ledger 間不均衡:Ledger ID 以集群維度進(jìn)行遞增。在分區(qū)的維度,按照 Ledger ID 對(duì) Ledger 存儲(chǔ)目錄數(shù)進(jìn)行取模的方式無(wú)法對(duì)多磁盤(pán)進(jìn)行均衡寫(xiě)入。但保持 Ledger 間的大小一致,在一定程度上會(huì)對(duì)多磁盤(pán)目錄的寫(xiě)入均衡有比較大的改善。
總而言之,建議根據(jù)業(yè)務(wù)消息情況適當(dāng)調(diào)整 Ledger 翻轉(zhuǎn)參數(shù)和有針對(duì)性地增加大流量 Topic 分區(qū)數(shù)量,可以防止 Ledger 過(guò)大、大小不均衡的問(wèn)題。
(2)數(shù)據(jù)過(guò)期
數(shù)據(jù)過(guò)期主要分為四個(gè)階段:
第一階段:未被 Ack 的消息
Backlog 消息:該段數(shù)據(jù)不會(huì)被刪除
第二階段:已經(jīng) Ack 的消息
- 訂閱主動(dòng) Ack 后,標(biāo)記為非 backlog 消息,有多個(gè)訂閱時(shí)以最慢的為準(zhǔn)
- TTL:若某 Topic 沒(méi)有活躍訂閱,超過(guò) TTL 存活時(shí)間的消息會(huì)被主動(dòng) Ack ,本質(zhì)上是移動(dòng) cursor
第三階段:消息保留時(shí)間檢查
Retention:對(duì)已經(jīng) Ack 的消息的保留策略,按保留周期和保留大小設(shè)置來(lái)保留消息。
第四階段:消息刪除
Deleted:超過(guò) Retenion 范圍的消息則被刪除。超過(guò) rentention 保留周期和保留大小的消息,系統(tǒng)會(huì)從當(dāng)前已經(jīng) ack 消息的最新位置往前檢查并獲取已經(jīng)過(guò)期的 ledger,將其標(biāo)記刪除。

圖 8. 消息保留時(shí)間檢查與消息刪除
從上述的消息階段演化來(lái)看,Pulsar 提供了較大的消息管理空間,但也略顯復(fù)雜。建議集群維護(hù)者建立簡(jiǎn)單統(tǒng)一的規(guī)則處理數(shù)據(jù)保留策略,如可以設(shè)置 TTL = Retention 保留周期值。
(3) 數(shù)據(jù)刪除
此處介紹數(shù)據(jù)的物理刪除。Bookie 在處理數(shù)據(jù)寫(xiě)入過(guò)程時(shí),會(huì)將同一段時(shí)間內(nèi)的數(shù)據(jù)經(jīng)過(guò)排序 flush 到同一個(gè) Entry Log 文件中,將索引存放在 RocksDB 中。由于多個(gè) Ledger 的數(shù)據(jù)可能會(huì)同時(shí)寫(xiě)入同一個(gè) Entry Log,因此 Entry Log 便不能被簡(jiǎn)單直接的刪除。對(duì)此 BookKeeper 會(huì)啟動(dòng)一個(gè) GC(GarbageCollector) 線程進(jìn)行檢查和物理刪除操作。?

圖 9. 數(shù)據(jù)物理刪除流程
Entry Log 維護(hù)元數(shù)據(jù)信息( EntryLogMetadata),該元數(shù)據(jù)記錄了 Ledger 列表、大小與剩余有效數(shù)據(jù)比例。
GC 清理線程在每個(gè) gcWaitTime 時(shí)間間隔:
- 掃描 Entry Log 的元數(shù)據(jù)信息,對(duì)于已經(jīng)沒(méi)有有效數(shù)據(jù)的 entry log 直接進(jìn)行刪除。
- 判斷是否滿足 compaction 條件,滿足 compaction 條件后 GC 線程會(huì)讀取每一個(gè) Entry 判斷其是否過(guò)期,一旦過(guò)期就會(huì)丟棄,否則會(huì)將數(shù)據(jù)寫(xiě)入新的 Entry Log。
Compaction 分為 minorCompaction 和 majorCompaction,二者區(qū)別在于閾值。默認(rèn)情況下,minorCompaction 清理間隔 1 小時(shí),閾值 0.2;majorCompaction 清理間隔 24 小時(shí),閾值 0.8。閾值是 Entry Log File 中的剩余有效數(shù)據(jù)占比。?
minorCompactionInterval=3600
minorCompactionThreshold=0.2
majorCompactionThreshold=0.8
majorCompactionInterval=86400
在實(shí)際使用中,如果機(jī)器節(jié)點(diǎn)的磁盤(pán)較小且數(shù)據(jù)遲遲得不到刪除,為了及時(shí)清除數(shù)據(jù),應(yīng)該按照業(yè)務(wù)流量和磁盤(pán)空間適當(dāng)調(diào)整數(shù)據(jù)清理間隔時(shí)間、有效數(shù)據(jù)閾值,并配合 compaction 限速策略減小對(duì)集群的影響。
三、Pulsar 監(jiān)控實(shí)踐
vivo 的 Pulsar 指標(biāo)監(jiān)控鏈路架構(gòu)如下:?

圖 10. vivo 針對(duì) Pulsar 監(jiān)控指標(biāo)搭建的監(jiān)控架構(gòu)
該架構(gòu)中:
采用 Prometheus 采集 Pulsar 指標(biāo);
- 應(yīng)用 Prometheus 遠(yuǎn)程存儲(chǔ)特性將格式化后的指標(biāo)發(fā)送到 Kafka;
- Druid 消費(fèi) Kafka 數(shù)據(jù)后可以作為 Grafana 的數(shù)據(jù)源,配置 Grafana 面板查詢指標(biāo)。
為什么不使用 Prometheus 存儲(chǔ)數(shù)據(jù)?因?yàn)橛行?shù)據(jù)較久遠(yuǎn),一旦集群規(guī)模增加,監(jiān)控指標(biāo)數(shù)量級(jí)會(huì)很大。Prometheus 對(duì)資源依賴重,我們只采用了它的采集能力。
下圖是常用的關(guān)鍵指標(biāo):

圖 11. 關(guān)鍵監(jiān)控指標(biāo)
指標(biāo)類型分為:
- 【客戶端指標(biāo)】:用來(lái)排查客戶端出現(xiàn)的異常
- 【Broker 端指標(biāo)】:監(jiān)控 topic 流量、調(diào)整 broker 間流量差距
- 【Bookie 端指標(biāo)】:排查讀寫(xiě)延遲等問(wèn)題
除了官方指標(biāo)外,團(tuán)隊(duì)還開(kāi)發(fā)了 Bundle 相關(guān)的一些指標(biāo):
- 分區(qū)數(shù)、流量等在 Bundle 的分布
- Broker 端記錄讀寫(xiě)延遲的 P95/P99 值
- 基于請(qǐng)求對(duì)列實(shí)現(xiàn) Broker 端網(wǎng)絡(luò)負(fù)載指標(biāo)等。
四、問(wèn)題優(yōu)化與最佳實(shí)踐
4.1 負(fù)載均衡參數(shù)
負(fù)載均衡的目的是對(duì)資源平均分配,差異大會(huì)影響穩(wěn)定性。對(duì)負(fù)載均衡設(shè)置的目標(biāo)是節(jié)點(diǎn)流量偏差 20% 以內(nèi),每天均衡頻次在 10 次以內(nèi),否則客戶端會(huì)頻繁斷連、重連。優(yōu)化后的參數(shù)如下:?
# load shedding strategy, support OverloadShedder and ThresholdShedder, default is OverloadShedder
loadBalancerLoadSheddingStrategy=org.apache.pulsar.Broker.loadbalance.impl.ThresholdShedder
# enable/disable namespace Bundle auto split
loadBalancerAutoBundleSplitEnabled=false
# enable/disable automatic unloading of split Bundles
loadBalancerAutoUnloadSplitBundlesEnabled=false
#計(jì)算新資源使用量時(shí)的CPU使用權(quán)重(默認(rèn)1.0)
loadBalancerCPUResourceWeight=0.0
#計(jì)算新的資源使用量時(shí)的堆內(nèi)存使用權(quán)重(默認(rèn)1.0)
loadBalancerMemoryResourceWeight=0.0
#計(jì)算新資源使用量時(shí)的直接內(nèi)存使用權(quán)重(默認(rèn)1.0)
loadBalancerDirectMemoryResourceWeight=0.0
下面三個(gè)參數(shù)改為零,是因?yàn)榧菏褂昧讼嗤臋C(jī)型,團(tuán)隊(duì)更關(guān)注流量均衡,對(duì)內(nèi)存和 CPU 不是特別關(guān)注。
以一個(gè)具體產(chǎn)品案例來(lái)看,其中有 1 個(gè) Topic、30 個(gè)分區(qū)、180 個(gè) Bundle:

圖 12. 1 個(gè) Topic、30 個(gè)分區(qū)、180 個(gè) Bundle 的每秒入流量
上圖節(jié)點(diǎn)間流量差異較大,由 Bundle unload 導(dǎo)致。

圖 13. 1 個(gè) Topic、30 個(gè)分區(qū)、180 個(gè) Bundle 下,Bundle 上 Topic 分區(qū)情況
上圖可看出,有兩個(gè) Bundle 分配了四個(gè)分區(qū),遠(yuǎn)超其他 Bundle。實(shí)踐中出現(xiàn)以下問(wèn)題:
- 均衡頻次高,一天大概有 200 多次
- 客戶端連接頻繁切換,流量波動(dòng)大
- 每個(gè) Bundle 的分區(qū)數(shù)量分布差異大?

圖 14. 1 個(gè) Topic、30 個(gè)分區(qū)、180 個(gè) Bundle 的入流量分布
優(yōu)化過(guò)程中,關(guān)鍵在于將分區(qū)打散到不同 Bundle 上,但分區(qū)數(shù)量太少很難做到。Topic 通過(guò)哈希算法分配到 Bundle 上在前文已經(jīng)介紹。此案例中,問(wèn)題在于分區(qū)數(shù)量少。
于是團(tuán)隊(duì)將分區(qū)增加到 120 個(gè),效果如下:
- 節(jié)點(diǎn)間流量差異小
- 均衡頻次降低,一天大概有 10 次左右
- 客戶端連接切換減少,流量波動(dòng)較小
- 每個(gè) bundle 的分區(qū)數(shù)量分布差異降低?

圖 15. 1 個(gè) Topic、120 個(gè)分區(qū)、180 個(gè) Bundle 的每秒入流量

圖 16. 1 個(gè) Topic、120 個(gè)分區(qū)、180 個(gè) Bundle 下,Bundle 上 Topic 分區(qū)情況

圖 17. 1 個(gè) Topic、120 個(gè)分區(qū)、180 個(gè) Bundle 的入流量分布
4.2 客戶端發(fā)送性能
在和上述業(yè)務(wù)相同的場(chǎng)景中,分區(qū)數(shù)量增加后,系統(tǒng)滾動(dòng)重啟后出現(xiàn)了流量下降情況:

圖 18. 單個(gè) Topic,30 個(gè)分區(qū)增加到 120 個(gè),系統(tǒng)滾動(dòng)重啟后流量下降
客戶端配置參數(shù):
- memoryLimitBytes=209715200 (默認(rèn)為 0)
- maxPendingMessages=2000 (默認(rèn) 1000)
- maxPendingMessagesAcrossPartitions=40000 (默認(rèn) 50000)
- batchingMaxPublishDelayMicros=50 (默認(rèn) 1 毫秒)
- batchingMaxMessages=2000 (默認(rèn) 1000)
- batchingMaxBytes=5242880 (默認(rèn) 128KB)
滿足三個(gè) batch 數(shù)據(jù)中的任何一個(gè)的情況下就會(huì)觸發(fā)打包、發(fā)送。

圖 19. 重啟后 maxPendingMessages(隊(duì)列長(zhǎng)度)出現(xiàn)下降
這里 maxPendingMessages(隊(duì)列長(zhǎng)度)
=min(maxPendingMessages,maxPendingMessagesAcrossPartitions/partitionNum) 。
而分區(qū)數(shù)添加(30 -> 120)后,需要重啟客戶端才對(duì)隊(duì)列長(zhǎng)度生效。重啟后 maxPendingMessages 隊(duì)列長(zhǎng)度 從 40000/30 = 1333 變?yōu)?40000/120 = 333,出現(xiàn)了明顯下降。
另外,測(cè)試發(fā)現(xiàn) batchingMaxMessages 調(diào)小后性能提升 10 倍之多:

圖 20. 單個(gè) Topic,30 個(gè)分區(qū)增加到 120 個(gè),調(diào)整后性能提升
建議 batchingMaxPublishDelayMicros 不要過(guò)大,確保 batchingMaxMessages 比 maxPendingMessages 要大,否則等待 batchingMaxPublishDelayMicros 才會(huì)發(fā)送。
4.3 宕機(jī)導(dǎo)致集群流量驟降
某個(gè)分區(qū)隊(duì)列滿后會(huì)導(dǎo)致發(fā)送線程阻塞,影響所有分區(qū)的整體發(fā)送和集群穩(wěn)定性:?

圖 21. 執(zhí)行 Kill-9 一臺(tái) Broker 后,其他 Broker 流量下降

圖 22. 第四個(gè)分區(qū)已滿,發(fā)送線程阻塞在 canEnqueRequest 上,等待時(shí)間長(zhǎng),其他未滿分區(qū)的發(fā)送也被影響。

圖 23. 極端情況下,第四個(gè)分區(qū)已滿,其他分區(qū)等待中。發(fā)送線程會(huì)在第四個(gè)分區(qū)阻塞等待,其他線程無(wú)法發(fā)送。
針對(duì)這一問(wèn)題的優(yōu)化思路,首先是能者多勞,讓發(fā)送快的分區(qū)盡可能多發(fā)送;然后是將阻塞點(diǎn)從 ProducerImpl 移到 PartitionedProducerImpl;如果分區(qū) ProducerImpl 出現(xiàn)隊(duì)列已滿阻塞較長(zhǎng)時(shí)間,就將該分區(qū)排除。

圖 24. 宕機(jī)導(dǎo)致集群流量驟降優(yōu)化思路
實(shí)踐中可分為可用 Producer 和不可用 Producer 兩個(gè)列表。在 ① 中,兩個(gè)列表都處于初始化狀態(tài)并可用;在 ② 中,某個(gè)可用分區(qū)阻塞一段時(shí)間后可以等待一段時(shí)間;若不可用就移動(dòng)到不可用列表中,如 ③ 所示;當(dāng)分區(qū)可用比例達(dá)到閾值再挪回可用列表,如 ④ 所示。
經(jīng)過(guò)優(yōu)化后,宕機(jī) Broker 流量可以快速轉(zhuǎn)移到其他 Broker:

圖 25. 優(yōu)化后 Broker 流量分流并上漲
注:優(yōu)化只支持 RoundRobinPartitionMessageRouterImpl 路由策略。
在單個(gè) ProducerImpl 對(duì)應(yīng)的 Broker 出現(xiàn)處理慢、網(wǎng)絡(luò)慢等導(dǎo)致發(fā)送響應(yīng)慢的情況,都可能會(huì)導(dǎo)致發(fā)送線程阻塞,業(yè)務(wù)發(fā)送消息的速度受限于最慢的 ProducerImpl 的速度。
五、未來(lái)展望
本文分享了 vivo 在 Pulsar 集群管理與監(jiān)控的經(jīng)驗(yàn),并介紹 vivo 在負(fù)載均衡等方面的最佳實(shí)踐。
由于服務(wù)端的問(wèn)題很難通過(guò)監(jiān)控指標(biāo)進(jìn)行分析,vivo 在未來(lái)計(jì)劃實(shí)現(xiàn)生產(chǎn)端到消費(fèi)端的全鏈路監(jiān)控能力。大數(shù)據(jù)團(tuán)隊(duì)希望整合大數(shù)據(jù)組件,支撐 Flink、Spark、Druid 等核心下游組件打通落地。
同時(shí),vivo 內(nèi)部目前 Pulsar 與 Kafka 同時(shí)運(yùn)行,團(tuán)隊(duì)將嘗試基于 KoP 對(duì)存量 Kafka 萬(wàn)億流量嘗試遷移,降低 Kafka 遷移成本;并探索容器化落地,充分發(fā)揮 Pulsar 云原生優(yōu)勢(shì)。

























