Flink 的 RocksDB 狀態(tài)后端在 vivo 的實(shí)踐

本文簡要介紹了特征拼接在實(shí)時(shí)推薦中的重要作用,并講述了vivo實(shí)時(shí)推薦系統(tǒng)中特征拼接模塊的架構(gòu)演進(jìn)過程以及采用現(xiàn)有的“基于RocksDB的大狀態(tài)解決方案”的原因,重點(diǎn)敘述了該方案所遇到的一系列問題,包括TM Lost、RocksDB性能調(diào)優(yōu)門檻高、TM初始化慢、狀態(tài)遠(yuǎn)程存儲(chǔ)HDFS RPC飆高等,并給出了這些問題的現(xiàn)象以及解決方案。
1分鐘看圖掌握核心觀點(diǎn)??

01、背景
在推薦系統(tǒng)中,樣本拼接是銜接在線服務(wù)與算法模型的重要一個(gè)環(huán)節(jié),主要職責(zé)是樣本拼接和業(yè)務(wù)相關(guān)的ETL處理等,模塊位置如下圖紅框所示。

推薦系統(tǒng)通過學(xué)習(xí)埋點(diǎn)數(shù)據(jù)來達(dá)到個(gè)性化精準(zhǔn)推薦的目的,因此需要知道服務(wù)端推薦下發(fā)的內(nèi)容,是否有一系列的行為(曝光,點(diǎn)擊,播放,點(diǎn)贊,收藏,加購等等),把被推薦內(nèi)容的埋點(diǎn)數(shù)據(jù)與當(dāng)下的特征拼接起來的過程,一般稱為樣本拼接,一個(gè)簡化的流程如下:

推薦的過程可以檢驗(yàn)概括為以下幾點(diǎn):
- 后臺(tái)服務(wù)rank 推薦內(nèi)容給app客戶端,同時(shí)把內(nèi)容對(duì)應(yīng)的特征快照保存起來;
- app接收到內(nèi)容后,埋點(diǎn)日志被上報(bào)到消息中間件;
- 樣本拼接負(fù)責(zé)將特征與埋點(diǎn)日志拼接起來,定義正負(fù)樣本,格式轉(zhuǎn)換;
- 模型接收樣本訓(xùn)練,將使用最新的模型做推薦。
為了保證較高的拼接率和穩(wěn)定性,我們的拼接架構(gòu)也經(jīng)過了長時(shí)間的迭代,這篇文章我將給大家介紹vivo特征拼接架構(gòu)的發(fā)展歷程、當(dāng)前方案、當(dāng)前方案遇到的問題和解決方案,以及未來的規(guī)劃和展望,希望能幫助到業(yè)內(nèi)的同學(xué)。
02、拼接方案選型
2.1小時(shí)粒度拼接
小時(shí)拼接是將埋點(diǎn)日志和特征快照都保存到Hive并以小時(shí)分區(qū),每小時(shí)調(diào)度一個(gè)Spark任務(wù)來處理兩個(gè)表相應(yīng)分區(qū)的數(shù)據(jù)做拼接,由于是小時(shí)拼接,實(shí)時(shí)性較低,Spark作業(yè)本身也依賴于上游Hive表小時(shí)分區(qū)生成,每個(gè)小時(shí)末尾的請(qǐng)求埋點(diǎn)有可能是落在當(dāng)前小時(shí),也有可能落在下個(gè)小時(shí)。舉個(gè)例子:19點(diǎn)50分下發(fā)了一個(gè)視頻,客戶端在19:59分點(diǎn)擊了,但是視頻播放卻是在20點(diǎn)03分完成的,這個(gè)時(shí)候就會(huì)存在拼接不上的問題。

2.2基于 Redis 的流式拼接
為了提升拼接率,且達(dá)到實(shí)時(shí)拼接,節(jié)點(diǎn)故障容災(zāi),完備監(jiān)控等特性,F(xiàn)link是一個(gè)很好的替代方案,也是最近幾年比較主流的實(shí)現(xiàn)。最初在實(shí)時(shí)推薦場景中,Kafka中的特征快照通過Flink任務(wù)寫入到Redis,另一個(gè)Flink任務(wù)消費(fèi)曝光埋點(diǎn)數(shù)據(jù)和點(diǎn)擊埋點(diǎn)數(shù)據(jù)并讀取存在Redis中的特征快照數(shù)據(jù)做拼接,拼接后的數(shù)據(jù)作為拼接特征被寫入到下游的Kafka中,提供給后續(xù)的算法做模型的訓(xùn)練,架構(gòu)圖如下:

經(jīng)過一段時(shí)間實(shí)踐,以上的方案出現(xiàn)了兩個(gè)痛點(diǎn):
- Redis中存儲(chǔ)了幾十T的數(shù)據(jù),Redis的成本高;
- 業(yè)務(wù)數(shù)據(jù)流量會(huì)波動(dòng),經(jīng)常需要DBA對(duì)Redis集群進(jìn)行擴(kuò)容,涉及大量數(shù)據(jù)的遷移,運(yùn)維成本高。
2.3基于 RocksDB 大狀態(tài)流式拼接
為了解決基于Redis的作為中間數(shù)據(jù)的存儲(chǔ)存在的問題,我們采用Flink狀態(tài)來存儲(chǔ)特征快照,整個(gè)架構(gòu)中不再需要外部的Redis,由于我們需要存儲(chǔ)的數(shù)據(jù)量達(dá)幾十T,這里我們選用適合大數(shù)據(jù)量存儲(chǔ)的RocksDB類型的狀態(tài)后端,調(diào)整后架構(gòu)更加簡潔,如下圖所示:

流程如下:
- 首先將曝光流點(diǎn)點(diǎn)擊流以及特征在Flink 任務(wù)中做union并做keyby;
- 在processElement方法中如果接收到曝光流就將數(shù)據(jù)保存到state中,如果接收到曝光流就將數(shù)據(jù)保存到state中,如果接收到特征就去state中查詢相應(yīng)的曝光和點(diǎn)擊數(shù)據(jù);
- 如果能找到就發(fā)送到下游并將狀態(tài)數(shù)據(jù)清理掉,沒找到就將特征保存到state中,并注冊(cè)一個(gè)定時(shí)器;
- 定時(shí)器觸發(fā)時(shí)去state中查詢相應(yīng)的曝光和點(diǎn)擊數(shù)據(jù),如果找到就發(fā)到下游,并將狀態(tài)數(shù)據(jù)清理掉。
由于RocksDB可以同時(shí)利用內(nèi)存和磁盤來存儲(chǔ)數(shù)據(jù),所以對(duì)于內(nèi)存的使用量大幅下降,由于RocksDB是嵌入式的數(shù)據(jù)庫,每個(gè)TM上的RocksDB數(shù)據(jù)庫只存儲(chǔ)shuffe到該TM上的數(shù)據(jù),無需再關(guān)注擴(kuò)縮容的問題。當(dāng)然隨著數(shù)據(jù)上漲,F(xiàn)link流式拼接在實(shí)際的生產(chǎn)過程中也遇到了一系列的問題,為了保證業(yè)務(wù)的可用性,我們花了較長的時(shí)間對(duì)這些問題進(jìn)行攻克,目前任務(wù)穩(wěn)定性達(dá)到99.99% ,拼接率長期穩(wěn)定在99%以上,對(duì)拼接效果提升較大。下面我將列舉我們遇到的問題和解決方案,希望能夠幫助到業(yè)內(nèi)的其他團(tuán)隊(duì)。
03、問題及解決方案
3.1TM Lost問題
3.1.1 現(xiàn)象
在方案實(shí)施之初,我們發(fā)現(xiàn)這些特征拼接的任務(wù)頻繁出現(xiàn)TM was Lost異常導(dǎo)致任務(wù)重啟,我們看了日志,發(fā)現(xiàn)都是TM內(nèi)存超出了YARN的內(nèi)存限制被kill。
3.1.2 問題分析
那么我們的疑問就來了,為啥這部分任務(wù)的內(nèi)存很容易超出,超出的那部分內(nèi)存又是誰在用呢?下面這張圖是來自Flink的官網(wǎng),因?yàn)槲覀冊(cè)谄脚_(tái)使用Flink的時(shí),我們只設(shè)置了總的內(nèi)存,并沒有關(guān)注其他各個(gè)局部的內(nèi)存,那么這些部位的內(nèi)存是如何分配的?為了搞清楚這個(gè)問題,有必要梳理一下每個(gè)模塊內(nèi)存計(jì)算的邏輯。

Flink內(nèi)存分配邏輯
一般在YARN上提交的任務(wù)是含有taskmanager.memory.process.size 參數(shù)的配置的,所以Flink在分配內(nèi)存時(shí),會(huì)以調(diào)用deriveProcessSpecWithTotalProcessMemory 方法分配。
通過配置參數(shù)獲得meatspace 的大小,通過jobmanager.memory.jvm-overhead.fraction 的比例計(jì)算overhead的內(nèi)存,totalFlinkMemory通過總的進(jìn)程的內(nèi)存減去meatspace + overhead的內(nèi)存得到。
通過配置中的參數(shù)獲取 frameworkHeapMemory-Size、frameworkOffHeapMemorySize 、task-OffHeapMemorySize 的大小。
通過managedmemory的配置獲取托管內(nèi)存的值, 通過networkbuffer的配置獲取networkbuffer的值 。totalFlinkMemory 減去所有需要排除的內(nèi)存,剩下的內(nèi)存分配給堆。內(nèi)存分配邏輯,以及每塊內(nèi)存的設(shè)置方法如下圖:

到此TM的各個(gè)內(nèi)存模塊的內(nèi)存已經(jīng)劃分完成。有上面的分析我們可以得出以下的結(jié)論:
totalProcessMemorySize = totalFlinkMemorySize + JvmMetaspaceSize + JvmOverheadSize
totalFlinkMemorySize = frameworkOffHeapMemorySize + taskOffHeapMemorySize + managedMemorySize + networkMemorySize + frameworkHeapMemorySize + taskHeapMemorySize這里重點(diǎn)將一下JVMOverhead,JVMOverhead并沒有具體的作用,是一個(gè)預(yù)留值,它是一個(gè)緩沖區(qū),可以避免在Flink運(yùn)行在容器中是因?yàn)槎虝r(shí)時(shí)間的內(nèi)存超出了容器的限制而被kill。
frameworkOffHeapMemorySize和taskOff-HeapMemorySize 也是預(yù)留值,offheap在概念上的主要是指native內(nèi)存。frameworkHeap-MemorySize 也是預(yù)留值。由此可以看出雖然Flink官方將TM的內(nèi)存劃分的較細(xì)致,但是像JvmOverheadSize frameworkOffHeap-MemorySize,taskOffHeapMemorySize,frameworkHeapMemorySize 都只是邏輯上的預(yù)留,并沒有從操作系統(tǒng)層面實(shí)現(xiàn)隔離。
RocksDB內(nèi)存分配邏輯
因?yàn)槎褍?nèi)存不足時(shí)一般會(huì)報(bào)out of memory的異常,所以到這一步我們推測(cè)應(yīng)該是堆外內(nèi)存溢出了,而堆外內(nèi)存最大的一塊就是RocksDB使用的,而從Flink的官網(wǎng)的介紹可以知道托管內(nèi)存就是給RocksDB使用的,下面我們?cè)倏匆幌峦泄軆?nèi)存是如何分配給RocksDB的。
cacheMemory = (1-(1/3)*(writeBufferRatio))* managedMemory
bufferMemory = (2/3)*(writeBufferRatio)* managedMemory
讀寫緩存總內(nèi)存 = bufferMemory + cacheMemory = (1 +(1/3)*(writeBufferRatio))* managedMemory由上面的代碼可以看出,managed memory 是通過一定的比例給RocksDB的各個(gè)部分來分配內(nèi)存的,writeBufferRatio會(huì)影響讀緩存和寫緩存的大小,理論上讀寫緩存總內(nèi)存有可能會(huì)超過managedMemory的大小。通過上面的公式可以看出讀寫緩存總內(nèi)存最多超出managedMemory的1/3,這里很容易想到,那么我們?cè)谂挪閛verhead的時(shí)候配置大于managedMemory的1/3不就能你面內(nèi)存溢出了,但是在實(shí)踐中,我們這樣配置并并沒有完全的解決物理內(nèi)存溢出的問題,下面關(guān)于RocksDB內(nèi)存的資料,終于找到了是還有哪部分內(nèi)存容易溢出了,是因?yàn)椴糠謪^(qū)域的內(nèi)存難以限制導(dǎo)致的。
RocksDB 的內(nèi)存占用有 4 個(gè)部分:
- Block Cache: OS PageCache 之上的一層緩存,緩存未壓縮的數(shù)據(jù) Block;
- Indexes and filter blocks: 索引及布隆過濾器,用于優(yōu)化讀性能;
- MemTable: 類似寫緩存;
- Blocks pinned by Iterator: 觸發(fā) RocksDB 遍歷操作(比如遍歷 RocksDBMapState 的所有 key)時(shí),Iterator 在其生命周期內(nèi)會(huì)阻止其引用到的 Block 和 MemTable 被釋放,導(dǎo)致額外的內(nèi)存占用。
前三個(gè)區(qū)域的內(nèi)存都是可配置的,但 Iterator 鎖定的資源則要取決于應(yīng)用業(yè)務(wù)使用模式,且沒有提供一個(gè)硬限制,因此 Flink 在計(jì)算 RocksDB StateBackend 內(nèi)存時(shí)沒有將這部分納入考慮,其次是 RocksDB Block Cache 的一個(gè) bug,它會(huì)導(dǎo)致 Cache 大小無法嚴(yán)格控制,有可能短時(shí)間內(nèi)超出設(shè)置的內(nèi)存容量,相當(dāng)于軟限制,原來是迭代器的內(nèi)存限制的不好,導(dǎo)致的內(nèi)存溢出。
3.1.3 解決方案
我們?cè)谑褂肍link 的RocksDB狀態(tài)后端時(shí),是通過managed memory來控制RocksDB各個(gè)部分的內(nèi)存的,所以managed memory內(nèi)存越小分配給各個(gè)部分的內(nèi)存也就越小,迭代器內(nèi)存越不容易溢出。到此我們對(duì)Flink的RocksDB狀態(tài)后端的內(nèi)存有了一定的認(rèn)知:當(dāng)性能可以滿足的情況下,F(xiàn)link的Manaed memory應(yīng)該越小越好。但是上滿形成的經(jīng)驗(yàn)很難高效的在業(yè)務(wù)上落地,原因是“Flink的Manaed memory應(yīng)該越小越好”很難去確定。
于是我們聯(lián)想到了之前的JVMoverhead,在我們的實(shí)際實(shí)踐中過程中,我們是通過調(diào)大JVMoverhead,和jemalloc內(nèi)存分配器來解決內(nèi)存溢出問題的。在Flink1.12之后Flink on k8s的內(nèi)存分配器已經(jīng)默認(rèn)改成了jemalloc,可以避免內(nèi)存的分配過程中出現(xiàn)64M問題。
但是要注意:由于我們的Java版本是JAVA8小版本是192,在最新版本的jemalloc5.3上出現(xiàn)了死鎖的問題,后來我們采用jemalloc4.5 就沒有問題了。據(jù)了解業(yè)界有些公司使用的JAVA8小版本是256采用jemalloc5.3沒有遇到死鎖問題。
3.2RocksDB 的性能監(jiān)控問題
3.2.1 現(xiàn)象
Flink RocksDB大狀態(tài)的任務(wù)經(jīng)常出現(xiàn)延遲,但是我們很難知道性能的瓶頸在哪塊,從而優(yōu)化響應(yīng)的環(huán)節(jié)。
3.2.2 解決方案
其實(shí)Flink提供了一系列對(duì)于RocksDB的性能的監(jiān)控指標(biāo),我們只需要加上參數(shù)開啟即可,這里我只結(jié)局我覺得最有參考意義的指標(biāo)開啟的參數(shù):

下面是相關(guān)指標(biāo)的監(jiān)控頁面:


3.3任務(wù)出現(xiàn)延遲
3.3.1 現(xiàn)象
Flink RocksDB大狀態(tài)的任務(wù)經(jīng)常出現(xiàn)延遲,調(diào)優(yōu)參數(shù)高達(dá)近百個(gè),如何系統(tǒng)性的調(diào)優(yōu),難度較大。
3.3.2 解決方案
要想對(duì)RocksDB的性能做優(yōu)化,我們有必要先了解一下RocksDB的讀寫流程。
RocksDB的讀流程

- 獲取當(dāng)前時(shí)刻的SuperVersion,SuperVersion是RocksDB內(nèi)針對(duì)于所有SST文件列表以及內(nèi)存中的MemTable和Immutable MemTable的一個(gè)版本;
- 獲取當(dāng)前的序號(hào)來決定當(dāng)前讀操作依賴的數(shù)據(jù)快照;
- 嘗試從第一步SuperVersion中引用的MemTable以及Immutable MemTable中獲取對(duì)應(yīng)的值。首先會(huì)經(jīng)過布隆過濾器,假如不存在則一定不存在,反之假如返回存在則不一定存在;
- 嘗試從Block Cache中讀??;
- 嘗試從SST文件中獲取。
RocksDB的寫流程

- 將寫入操作順序?qū)懭隬AL日志中,接下來把數(shù)據(jù)寫到 MemTable中(采用SkipList結(jié)構(gòu)實(shí)現(xiàn))
MemTable達(dá)到一定大小后,將這個(gè) MemTable 切換為不可更改的 immutable MemTable,并新開一個(gè) MemTable 接收新的寫入請(qǐng)求; - 這個(gè) immutable MemTable進(jìn)行持久化到磁盤,成為L0 層的 SSTable 文件;
- 每一層的所有文件總大小是有限制的,每下一層大十倍。一旦某一層的總大小超過閾值了,就選擇一個(gè)文件和下一層的文件合并。
注意: 所有下一層被影響到的文件都會(huì)參與 Compaction。合并之后,保證 L1 到 L6 層的每一層的數(shù)據(jù)都是在 key 上全局有序的,而 L0 層是可以有重疊的,寫流程的約束; - 日志文件用于崩潰恢復(fù);
- 每個(gè)MemTable及SST文件中的Key都是有序的(字符順序的升序);
- 日志文件中的Key是無序的;
- 刪除操作是標(biāo)記刪除,是插入操作的一種,真正的刪除要在Compaction的時(shí)候?qū)崿F(xiàn);
- 無更新實(shí)現(xiàn),記錄更新通過插入一條新記錄實(shí)現(xiàn);
當(dāng)任務(wù)出現(xiàn)延遲時(shí),由于我們已經(jīng)有了RocksDB性能指標(biāo)的監(jiān)控也了解RocksDB的原理,我們?cè)谧鲂阅軆?yōu)化時(shí)就可以對(duì)癥下藥了。
讀性能優(yōu)化
當(dāng)任務(wù)出現(xiàn)延遲且塊緩存命中率下降時(shí),說明是讀的性能下降導(dǎo)致延遲,我們可以通過提升緩存命中率的方式來提升讀性能,RocksDB任務(wù)緩存命中率的優(yōu)化思路如下:
- 托管內(nèi)存小于TM內(nèi)存20%,可以調(diào)大托管內(nèi)存:state.backend.rocksdb.memory.managed 到 20%;
- Flink內(nèi)部對(duì)RocksDB的優(yōu)化已經(jīng)沉淀了多組參數(shù),建議使用配置:
state.backend.rocksdb.predefined-options = SPINNING_DISK_OPTIMIZED_HIGH_MEM; - Flink中使用state.backend.rocksdb.memory.write-buffer-ratio參數(shù)來管理寫緩存,調(diào)小該參數(shù),能夠提升讀緩存,該參數(shù)默認(rèn)0.5;
- RocksDB 會(huì)有一寫索引和過濾器放在內(nèi)存中,用這個(gè)參數(shù)開啟:state.backend.rocksdb.memory.partitioned-index-filters 默認(rèn) false,并且可以調(diào)節(jié)索引和過濾器占用的內(nèi)存比例,參數(shù)是:state.backend.rocksdb.memory.high-prio-pool-ratio默認(rèn)為0.1。
寫性能優(yōu)化
當(dāng)任務(wù)延遲,如果出現(xiàn)等待flush的內(nèi)存表的大小增加,或者等待合并的個(gè)數(shù)增加,因?yàn)榈葞lush個(gè)數(shù)達(dá)到一定的個(gè)數(shù)時(shí)寫將會(huì)被阻塞,可以先關(guān)注一下磁盤io是否打滿,如果已經(jīng)處于高位,建議提升任務(wù)的并發(fā)。如果此磁盤io處于低位,我們可以調(diào)整flush和compation的線程數(shù)來使寫的數(shù)據(jù)不再積壓。提升寫寫性能。Flink會(huì)將flush和compation的線程數(shù)通過一個(gè)參數(shù)統(tǒng)一管理,參數(shù)是:state.backend.rocksdb.thread.num,默認(rèn)值是1。
3.4任務(wù)啟動(dòng)慢的問題
3.4.1 現(xiàn)象
由于Flink任務(wù)在從狀態(tài)啟動(dòng)時(shí)需要將存儲(chǔ)在遠(yuǎn)程HDFS的狀態(tài)文件讀到本地,當(dāng)TM較集中時(shí)單臺(tái)機(jī)器的磁盤io很容易被打滿,導(dǎo)致某些sub task 長時(shí)間處于INITIALIZING的狀態(tài)。
3.4.2 解決方案
YARN參數(shù)的優(yōu)化
YARN默認(rèn)的yarn.scheduler.fair.assignmultiple參數(shù)為flase,即一次只分配一個(gè)container,但是CDH將這個(gè)參數(shù)設(shè)置成了true,yarn.scheduler.fair.max.assigr默認(rèn)為-1,表示不限制,所以導(dǎo)致一次調(diào)度到單個(gè)節(jié)點(diǎn)上的container較多。我們的解決方案是將YARN配置中的yarn.scheduler.fair.assignmultiple參數(shù)設(shè)為false,一次只調(diào)度一個(gè)container,解決了TM分配較集中的問題。
Flink調(diào)度策略的優(yōu)化
由于只是限制了每次分配TM的個(gè)數(shù),還不能完全避免分配集中的問題,于是我們對(duì)Flink引擎內(nèi)部做了優(yōu)化,可以硬限制在某臺(tái)機(jī)器上調(diào)度TM的個(gè)數(shù),具體做法是,是當(dāng)YARN返回給Flink ResourceManager container信息時(shí),判斷container是否符合要求,如果不符合可以部分拒收,再次申請(qǐng)資源,該功能由參數(shù)開啟。

3.5磁盤打滿的問題
3.5.1 現(xiàn)象
由于我們實(shí)時(shí)集群的磁盤較小,大狀態(tài)任務(wù)的狀態(tài)達(dá)幾十上百T,頻繁出現(xiàn)磁盤使用率達(dá)到90%的告警。
3.5.2 解決方案
我們將大狀態(tài)的任務(wù)的Checkpoint數(shù)據(jù)存儲(chǔ)到磁盤資源較寬裕的離線的集群,非大狀態(tài)的任務(wù)的Checkpoint數(shù)據(jù)存儲(chǔ)在實(shí)時(shí)集群。
3.6HDFS RPC 飆高問題
3.6.1 現(xiàn)象
在業(yè)務(wù)新上一批任務(wù)后,我們發(fā)現(xiàn)離線集群HDFS的RPC有明顯的增加。

3.6.2 解決方案
由于我們默認(rèn)只會(huì)保存最近的3個(gè)Checkpoint,所以對(duì)于增量Checkpoint而言,肯定會(huì)有文件的修改和刪除,據(jù)了解修改和刪除是對(duì)HDFS性能影響較大的操作。我們對(duì)比這一批任務(wù)任務(wù)在HDFS上的Checkpoint文件和之前的任務(wù)對(duì)比發(fā)現(xiàn),文件數(shù)量大很多,但是每個(gè)文件小很多,于是我們調(diào)整了參數(shù):state.backend.rocksdb.compaction.level.target-file-size-base參數(shù)為256MB,這個(gè)參數(shù)默認(rèn)是64MB,參數(shù)的作用控制壓縮后的文件的大小。配置改參數(shù)后RPC回歸正常。
效果如圖:

04、總結(jié)
4.1遺留問題
4.1.1.RocksDB的調(diào)優(yōu)的門檻較高
雖然我們?cè)谌蝿?wù)上使用了積累通用經(jīng)驗(yàn)進(jìn)行優(yōu)化,但是有些數(shù)據(jù)量較大的任務(wù)在流量高峰期依然容易出現(xiàn)延遲,RocksDB的參數(shù)有幾十個(gè),要想把性能調(diào)優(yōu)做到比較極致需要深入了解其原理,還有對(duì)業(yè)務(wù)特點(diǎn)有深入的了解,對(duì)于應(yīng)用開發(fā)而言,門檻較高。
4.1.2.任務(wù)恢復(fù)慢
由于有些任務(wù)的狀態(tài)高達(dá)幾十T,在重啟任務(wù)或者異常重啟時(shí)要從Checkpoint恢復(fù),需要從遠(yuǎn)程的HDFS下載狀態(tài)到本地磁盤,單機(jī)的io很容易被打滿,雖然我們做了TM打散,但是有些單個(gè)TM恢復(fù)狀態(tài)就需要幾十分鐘,這對(duì)于特征拼接任務(wù)來講是不可接受的。
4.1.3.SSD壽命消耗加速
我們的實(shí)時(shí)集群磁盤使用的是單塊的SSD,SSD壽命是有限的,然而RcoksDB的寫放大的特點(diǎn)加速了SSD的壽命的消耗。
4.2規(guī)劃
經(jīng)過較長時(shí)間的實(shí)踐我們理解了樣本拼接的本質(zhì)是將不同來源、不同更新頻率、不同規(guī)模的特征(如基礎(chǔ)特征、實(shí)時(shí)埋點(diǎn)特征、歷史特征)組合成完整樣本,而單一組件往往在 “延遲、存儲(chǔ)規(guī)模、更新頻率” 等維度存在短板,必須通過混合架構(gòu)實(shí)現(xiàn) “優(yōu)勢(shì)互補(bǔ)”。
業(yè)界混合架構(gòu)的案例
組件分工

拼接流程
① 實(shí)時(shí)日志采集:用戶點(diǎn)擊商品的日志通過Kafka接入Flink實(shí)時(shí)作業(yè);
② 實(shí)時(shí)數(shù)據(jù)存儲(chǔ):將曝光流的數(shù)據(jù)存到RocksDB和HBase中,RocksDB的TTL設(shè)置成1小時(shí);
③ 算子內(nèi)實(shí)時(shí)拼接:Flink算子從RocksDB讀取用戶最近1小時(shí)埋點(diǎn)特征,從HBase讀取基礎(chǔ)特征,初步拼接成“實(shí)時(shí)+基礎(chǔ)”特征;
④ 歷史特征融合:Flink作業(yè)將初步拼接結(jié)果寫入Paimon,與Paimon中存儲(chǔ)的“7天歷史特征”融合,生成完整樣本;
⑤ 樣本分發(fā):
- 實(shí)時(shí)推薦:完整樣本通過Flink寫入到HDFS提供給在線訓(xùn)練服務(wù)使用;
- 離線訓(xùn)練:Spark作業(yè)從Paimon讀取全量完整樣本,用于推薦模型的離線迭代。
下面是一個(gè)調(diào)用時(shí)序圖:

核心價(jià)值
- 低延遲:RocksDB 支撐算子內(nèi)毫秒級(jí)拼接,滿足實(shí)時(shí)推薦的 “秒級(jí)響應(yīng)” 需求;
- 大規(guī)模:HBase+Paimon 可支撐億級(jí)用戶的PB級(jí)特征存儲(chǔ);
- 流批協(xié)同:同一套樣本既供實(shí)時(shí)推薦,又供離線訓(xùn)練,實(shí)現(xiàn)流批架構(gòu)統(tǒng)一;
- 易于擴(kuò)展:Paimon動(dòng)態(tài)列支持特征迭代。
4.3展望
近幾年大數(shù)據(jù)架構(gòu)已經(jīng)從計(jì)算-存儲(chǔ)緊密耦合的Map-Reduce時(shí)代,進(jìn)入到了以Kubernetes容器化部署為標(biāo)準(zhǔn)的云原生世界。未來Flink將引入基于遠(yuǎn)程存儲(chǔ)的存算分離狀態(tài)管理架構(gòu),新架構(gòu)主要為了解決以下問題:
- 容器化環(huán)境下計(jì)算節(jié)點(diǎn)受本地磁盤大小限制的問題;
- 由于RocksDB中LSM結(jié)構(gòu)的周期性 Compaction 導(dǎo)致計(jì)算資源尖峰的問題;
- 大規(guī)模狀態(tài)快速擴(kuò)縮容的挑戰(zhàn)。
我們也將持續(xù)關(guān)注Flink社區(qū)的發(fā)展,嘗試采用遠(yuǎn)程存儲(chǔ)狀態(tài)后端來做為特征拼接的解決方案。



































