B站這個分布式KV存儲設計,我一鍵三連了
一、背景
在B站的業務場景中,存在很多種不同模型的數據,有些數據關系比較復雜像:賬號、稿件信息。有些數據關系比較簡單,只需要簡單的kv模型即可滿足。此外,又存在某些讀寫吞吐比較高的業務場景,該場景早期的解決方案是通過MySQL來進行數據的持久化存儲,同時通過redis來提升訪問的速度與吞吐。但是這種模式帶來了兩個問題, 其一是存儲與緩存一致性的問題, 該問題在B站通過canal異步更新緩存的方式得以解決, 其二則是開發的復雜度, 對于這樣一套存儲系統,每個業務都需要額外維護一個任務腳本來消費canal數據進行緩存數據的更新。基于這種場景,業務需要的其實是一個介于Redis與MySQL之間的提供持久化高性能的kv存儲。此外對象存儲的元數據,對數據的一致性、可靠性與擴展性有著很高的要求。
基于此背景,我們對自研KV的定位從一開始就是構建一個高可靠、高可用、高性能、高拓展的系統。對于存儲系統,核心是保證數據的可靠性,當數據不可靠時提供再高的可用性也是沒用的。可靠性的一個核心因素就是數據的多副本容災,通過raft一致性協議保證多副本數據的一致性。
分布式系統,如何對數據進行分片放置,業界通常有兩種做法,一是基于hash進行分區,二是基于range進行分區,兩種方式各有優缺點。hash分區,可以有效防止熱點問題,但是由于key是hash以后放置的,無法保證key的全局有序。range分區,由于相鄰的數據都放在一起,因此可以保證數據的有序,但是同時也可能帶來寫入熱點的問題。基于B站的業務場景,我們同時支持了range分區和hash分區,業務接入的時候可以根據業務特性進行選擇。大部分場景,并不需要全局有序,所以默認推薦hash分區的接入方式,比如觀看記錄、用戶動態這些場景,只需要保證同一個用戶維度的數據有序即可,同一個用戶維度的數據可以通過hashtag的方式保證局部有序。
二、架構設計
1、總體架構

整個系統核心分為三個組件:
Metaserver用戶集群元信息的管理,包括對kv節點的健康監測、故障轉移以及負載均衡。
Node為kv數據存儲節點,用于實際存儲kv數據,每個Node上保存數據的一個副本,不同Node之間的分片副本通過raft保證數據的一致性,并選出主節點對外提供讀寫,業務也可以根據對數據一致性的需求指定是否允許讀從節點,在對數據一致性要求不高的場景時,通過設置允許讀從節點可以提高可用性以及降低長尾。
Client模塊為用戶訪問入口,對外提供了兩種接入方式,一種是通過proxy模式的方式進行接入,另一種是通過原生的SDK直接訪問,proxy本身也是封裝自c++的原生SDK。SDK從Metaserver獲取表的元數據分布信息,根據元數據信息決定將用戶請求具體發送到哪個對應的Node節點。同時為了保證高可用,SDK還實現了重試機制以及backoff請求。
2、集群拓撲

集群的拓撲結構包含了幾個概念,分別是Pool、Zone、Node、Table、Shard 與Replica。
- Pool 為資源池連通域,包含多個可用區。也可用于業務資源隔離域。
- Zone 為可用區,同一個pool內部的zone是網路聯通并且故障隔離的。通常為一個機房或者一個交換機
- Node 為實際的物理主機節點,負責具體的數據存儲邏輯與數據持久化。
- Table 對應到具體的業務表,類似MySQL里的表。
- Shard 為邏輯分片,通過將table分為多個shard將數據打散分布。
- Replica 為shard的副本,同一個shard的不同副本不能分布在同一個zone,必須保證故障隔離。每一個replica包含一個engine,engine存儲全量的業務數據。engine的實現包含rocksdb和sparrowdb。其中sparrowdb是針對大value寫放大的優化實現。
三、核心特征
1、分區分裂
基于不同的業務場景,我們同時支持了range分區和hash分區。對于range場景,隨著用戶數據的增長,需要對分區數據進行分裂遷移。對于hash分區的場景,使用上通常會根據業務的數據量做幾倍的冗余預估,然后創建合適的分片數。但是即便是幾倍的冗余預估,由于業務發展速度的不可預測,也很容易出現實際使用遠超預估的場景,從而導致單個數據分片過大。
之所以不在一開始就創建足夠的分片數有兩個原因:其一,由于每一個replica都包含一個獨立的engine,過多的分片會導致數據文件過多,同時對于批量寫入場景存在一定的寫扇出放大。其二,每一個shard都是一組raftgroup,過多的raft心跳會對服務造成額外的開銷,這一點后續我們會考慮基于節點做心跳合并優化減少集群心跳數。
為了滿足業務的需求場景,我們同時支持了range和hash兩種模式下的分裂。兩種模式分裂流程類似,下面以hash為例進行說明。

hash模式下的分裂為直接根據當前分片數進行倍增。 分裂的流程主要涉及三個模塊的交互。
1)metaserver
分裂時,metaserver會根據當前分片數計算出目標分片數,并且下發創建replica指令到對應的Node節點,同時更新shard分布信息,唯一不同的是,處于分裂中的shard狀態為splitting。該狀態用于client流量請求路由識別。當Node完成數據分裂以后上報metaserver,metaserver更新shard狀態為normal從而完成分裂。
2)Node
node收到分裂請求以后,會根據需要分裂的分片id在原地拉起創建一個新的分片。然后對舊分片的數據進行checkpoint,同時記錄舊分片checkpoint對應的logid。新分片創建完成后,會直接從舊分片的checkpoint進行open,然后在異步復制logid之后的數據保證數據的一致性。新分片加載完checkpoint后,原來的舊分片會向raftgroup提交一條分裂完成日志,該日志處理流程與普通raft日志一致。分裂完成后上報分裂狀態到metaserver,同時舊分片開始拒絕不再屬于自己分片的數據寫入,client收到分片錯誤以后會請求metaserver更新shard分布。
完成分裂以后的兩個分片擁有的兩倍冗余數據,這些數據會在engine compaction的時候根據compaction_filter過濾進行刪除。
3)Client
用戶請求時,根據hash(key) % shard_cnt 獲取目標分片。表分裂期間,該shard_cnt表示分裂完成后的最終分片數。以上圖3分片的分裂為例:
hash(key) = 4, 分裂前shard_cnt為3,因此該請求會被發送到shard1. 分裂期間,由于shard_cnt變為6,因此目標分片應該是shard4, 但是由于shard4為splitting,因此client會重新計算分片從而將請求繼續發送給shard1. 等到最終分裂完成后,shard4狀態變更為Normal,請求才會被發送到shard4.
分裂期間,如果Node返回分片信息錯誤,那么client會請求metaserver更新分片分布信息。
2、binlog支持
類似于MySQL的binlog,我們基于raftlog日志實現了kv的binlog. 業務可以根據binlog進行實時的事件流訂閱,同時為了滿足事件流回溯的需求,我們還對binlog數據進行冷備。通過將binlog冷備到對象存儲,滿足了部分場景需要回溯較長事件記錄的需求。

直接復用raftlog作為用戶行為的binlog,可以減少binlog產生的額外寫放大,唯一需要處理的是過濾raft本身的配置變更信息。 learner通過實時監聽不斷拉取分片產生的binlog到本地并解析。 根據learner配置信息決定將數據同步到對應的下游。 同時binlog數據還會被異步備份到對象存儲,當業務需要回溯較長時間的事件流的時候,可以直接指定位置從S3拉取歷史binlog進行解析。
3、多活
基于上述提到的binlog能力,我們還基于此實現了kv的多活。learner模塊會實時將用戶寫入的數據同步到跨數據中心的其他kv集群。對于跨數據中心部署的業務,業務可以選擇就近的kv集群進行讀取訪問,降低訪問延時。
kv的多活分為讀多活和寫多活。對于讀多活,機房A的寫入會被異步復制到機房B,機房B的服務可以直接讀取本機房的數據,該模式下只有機房A的kv可以寫入。對于寫多活,kv在機房A B 都能同時提供寫入并且進行雙向同步,但是為了保證數據的一致性,需要業務上做數據的單元化寫入,保證兩個機房不會同時修改同一條記錄。通過將用戶劃分單元,提供了寫多活的能力。通過對binlog數據打標,解決了雙向同步時候的數據回環問題。
4、bulk load
對于用戶畫像和特征引擎等場景,需要將離線生成的大量數據快速導入KV存儲系統提供用戶讀取訪問。傳統的寫入方式是根據生成的數據記錄一條條寫入kv存儲,這樣帶來兩個問題。其一,大批量寫入會對kv造成額外的負載與寫入帶寬放大造成浪費。其次,由于寫入量巨大,每次導入需要花費較長的時間。為了減少寫入放大以及導入提速,我們支持了bulk load的能力。離線平臺只需要根據kv的存儲格式離線生成對應的SST文件,然后上傳到對象存儲服務。kv直接從對象存儲拉取SST文件到本地,然后直接加載SST文件即可對外提供讀服務。bulk load的另外一個好處是可以直接在生成SST后離線進行compaction,將compaction的負載offload到離線的同時也降低了空間的放大。
5、kv存儲分離
由于LSM tree的寫入特性,數據需要被不斷的compaction到更底層的level。在compaction時,如果該key還有效,那么會被寫入到更底層的level里,如果該key已經被刪除,那么會判斷當前level是否是最底層的,一條被刪除的key,會被標記為刪除,直到被compaction到最底層level的時候才會被真正刪除。compaction的時候會帶來額外的寫放大,尤其當value比較大的時候,會造成巨大的帶寬浪費。為了降低寫放大,我們參考了Bitcask實現了kv分離的存儲引擎sparrowdb.
1)sparrowdb 介紹

用 戶寫入的時候,value通過append only的方式寫入data文件,然后更新索引信息,索引的value包含實際數據所在的data文件id,value大小以及position信息,同時data文件也會包含索引信息。 與原始的bitcask實現不一樣的是,我們將索引信息保存在 rocksdb。
更新寫入的時候,只需要更新對應的索引即可。compaction的時候,只需將索引寫入底層的level,而無需進行data的拷貝寫入。對于已經失效的data,通過后臺線程進行檢查,當發現data文件里的索引與rocksdb保存的索引不一致的時候,說明該data已經被刪除或更新,數據可以被回收淘汰。
使用kv存儲分離降低了寫放大的問題,但是由于kv分離存儲,會導致讀的時候多了一次io,讀請求需要先根據key讀到索引信息,再根據索引信息去對應的文件讀取data數據。為了降低讀訪問的開銷,我們針對value比較小的數據進行了inline,只有當value超過一定閾值的時候才會被分離存儲到data文件。通過inline以及kv分離獲取讀性能與寫放大之間的平衡。
6、負載均衡
在分布式系統中,負載均衡是繞不過去的問題。一個好的負載均衡策略可以防止機器資源的空閑浪費。同時通過負載均衡,可以防止流量傾斜導致部分節點負載過高從而影響請求質量。對于存儲系統,負載均衡不僅涉及到磁盤的空間,也涉及到機器的內存、cpu、磁盤io等。同時由于使用raft進行主從選主,保證主節點盡可能的打散也是均衡需要考慮的問題。
1)副本均衡
由于設計上我們會盡量保證每個副本的大小盡量相等,因此對于空間的負載其實可以等價為每塊磁盤的副本數。創建副本時,會從可用的zone中尋找包含副本數最少的節點進行創建。同時考慮到不同業務類型的副本讀寫吞吐可能不一樣導致CPU負載不一致,在挑選副本的時候會進一步檢查當前節點的負載情況,如果當前節點負載超過閾值,則跳過該節點繼續選擇其他合適的節點。目前基于最少副本數以及負載校驗基本可以做到集群內部的節點負載均衡。
當出現負載傾斜時,則從負載較高的節點選擇副本進行遷出,從集群中尋找負載最低的節點作為待遷入節點。當出現節點故障下線以及新機器資源加入的時候,也是基于均值計算待遷出以及遷入節點進行均衡。
2)主從均衡
雖然通過最少副本數策略保證了節點副本數的均衡,但是由于raft選主的性質,可能出現主節點都集中在部分少數節點的情況。由于只有主節點對外提供寫入,主節點的傾斜也會導致負載的不均衡。為了保證主節點的均衡,Node節點會定期向metaserver上報當前節點上副本的主從信息。
主從均衡基于表維度進行操作。metaserver會根據表在Node的分布信息進行副本數的計算。主副本的數量基于最樸素簡單的數學期望進行計算: 主副本期望值 = 節點副本數 / 分片副本數。下面為一個簡單的例子:
假設表a包含10個shard,每個shard 3個replica。在節點A、B、C、D的分布為 10、5、6、9. 那么A、B、C、D的主副本數期望值應該為 3、1、2、3. 如果節點數實際的主副本數少于期望值,那么被放入待遷入區,如果大于期望值,那么被放入待遷出區。同時通過添加誤差值來避免頻繁的遷入遷出。只要節點的實際主副本數處于 [x-δx,x+δx] 則表示主副本數處于穩定期間,x、δx 分別表示期望值和誤差值。
需要注意的是,當對raft進行主從切換的時候,從節點需要追上所有已提交的日志以后才能成功選為主,如果有節點落后的時候進行主從切換,那么可能導致由于追數據產生的一段時間無主的情況。因此在做主從切換的時候必須要檢查主從的日志復制狀態,當存在慢節點的時候禁止進行切換。
7、故障檢測&修復
一個小概率的事件,隨著規模的變大,也會變成大概率的事件。分布式系統下,隨著集群規模的變大,機器的故障將變得愈發頻繁。因此如何對故障進行自動檢測容災修復也是分布式系統的核心問題。故障的容災主要通過多副本raft來保證,那么如何進行故障的自動發現與修復呢。

1)健康監測
metaserver會定期向node節點發送心跳檢查node的健康狀態,如果node出現故障不可達,那么metaserver會將node標記為故障狀態并剔除,同時將node上原來的replica遷移到其他健康的節點。
為了防止部分node和metaserver之間部分網絡隔離的情況下node節點被誤剔除,我們添加了心跳轉發的功能。上圖中三個node節點對于客戶端都是正常的,但是node3由于網絡隔離與metaserver不可達了,如果metaserver此時直接剔除node3會造成節點無必要的剔除操作。通過node2轉發心跳探測node3的狀態避免了誤剔除操作。
除了對節點的狀態進行檢測外,node節點本身還會檢查磁盤信息并進行上報,當出現磁盤異常時上報異常磁盤信息并進行踢盤。磁盤的異常主要通過dmesg日志進行采集分析。
2)故障修復
當出現磁盤節點故障時,需要將原有故障設備的replica遷移到其他健康節點,metaserver根據負載均衡策略選擇合適的node并創建新replica, 新創建的replica會被加入原有shard的raft group并從leader復制快照數據,復制完快照以后成功加入raft group完成故障replica的修復。
故障的修復主要涉及快照的復制。每一個replica會定期創建快照刪除舊的raftlog,快照信息為完整的rocksdb checkpoint。通過快照進行修復時,只需要拷貝checkpoint下的所有文件即可。通過直接拷貝文件可以大幅減少快照修復的時間。需要注意的是快照拷貝也需要進行io限速,防止文件拷貝影響在線io.
四、實踐經驗
1、rocksdb
1)過期數據淘汰
在很多業務場景中,業務的數據只需要存儲一段時間,過期后數據即可以自動刪除清理,為了支持這個功能,我們通過在value上添加額外的ttl信息,并在compaction的時候通過compaction_filter進行過期數據的淘汰。level之間的容量呈指數增長,因此rocksdb越底層能容納越多的數據,隨著時間的推移,很多數據都會被移動到底層,但是由于底層的容量比較大,很難觸發compaction,這就導致很多已經過期的數據沒法被及時淘汰從而導致了空間放大。與此同時,大量的過期數據也會對scan的性能造成影響。這個問題可以通過設置periodic_compaction_seconds 來解決,通過設置周期性的compaction來觸發過期數據的回收。
2)scan慢查詢
除了上面提到的存在大批過期數據的時候可能導致的scan慢查詢,如果業務存在大批量的刪除,也可能導致scan的時候出現慢查詢。因為delete對于rocksdb本質也是一條append操作,delete寫入會被添加刪除標記,只有等到該記錄被compaction移動到最底層后該標記才會被真正刪除。帶來的一個問題是如果用戶scan的數據區間剛好存在大量的delete標記,那么iterator需要迭代過濾這些標記直到找到有效數據從而導致慢查詢。該問題可以通過添加CompactOnDeletionCollector 來解決。當memtable flush或者sst compaction的時候,collector會統計當前key被刪除的比例,通過設置合理的 deletion_trigger ,當發現被delete的key數量超過閾值的時候主動觸發compaction。
3)delay compaction
通過設置 CompactOnDeletionCollector 解決了delete導致的慢查詢問題。但是對于某些業務場景,卻會到來嚴重的寫放大。當L0被compaction到L1時候,由于閾值超過deletion_trigger ,會導致L1被添加到compaction隊列,由于業務的數據特性,L1和L2存在大量重疊的數據區間,導致每次L1的compaction會同時帶上大量的L2文件造成巨大的寫放大。為了解決這個問題,我們對這種特性的業務數據禁用了CompactOnDeletionCollector 。通過設置表級別參數來控制表級別的compaction策略。后續會考慮優化delete trigger的時機,通過只在指定層級觸發來避免大量的io放大。
4)compaction限速
由于rocksdb的compaction會造成大量的io讀寫,如果不對compaction的io進行限速,那么很可能影響到在線的寫入。但是限速具體配置多少比較合適其實很難確定,配置大了影響在線業務,配置小了又會導致低峰期帶寬浪費。基于此rocksdb 在5.9以后為 NewGenericRateLimiter 添加了 auto_tuned 參數,可以根據當前負載自適應調整限速。需要注意的是,該函數還有一個參數 RateLimiter::Mode 用來限制操作類型,默認值為 kWritesOnly,通常情況該模式不會有問題,但是如果業務存在大量被刪除的數據,只限制寫可能會導致compaction的時候造成大量的讀io。
5)關閉WAL
由于raft log本身已經可以保證數據的可靠性,因此寫入rocksdb的時候可以關閉wal減少磁盤io,節點重啟的時候根據rocksdb里保存的last_apply_id從raft log進行狀態機回放即可。
2、Raft
1)降副本容災
對于三副本的raft group,單副本故障并不會影響服務的可用性,即使是主節點故障了剩余的兩個節點也會快速選出主并對外提供讀寫服務。但是考慮到極端情況,假設同時出現兩個副本故障呢?這時只剩一個副本無法完成選主服務將完全不可用。根據墨菲定律,可能發生的一定會發生。服務的可用性一方面是穩定提供服務的能力,另一方面是故障時快速恢復的能力。那么假設出現這種故障的時候我們應該如何快速恢復服務的可用呢。
如果通過創建新的副本進行修復,新副本需要等到完成快照拷貝以后才能加入raft group進行選舉,期間服務還是不可用的。那么我們可以通過強制將分片降為單副本模式,此時剩余的單個健康副本可以獨自完成選主,后續再通過變更副本數的方式進行修復。
2)RaftLog 聚合提交
對于寫入吞吐非常高的場景,可以通過犧牲一定的延時來提升寫入吞吐,通過log聚合來減少請求放大。對于SSD盤,每一次寫入都是4k刷盤,value比較小的時候會造成磁盤帶寬的浪費。我們設置了每5ms或者每聚合4k進行批量提交。該參數可以根據業務場景進行動態配置修改。
3)異步刷盤
有些對于數據一致性要求不是非常高的場景,服務故障的時候允許部分數據丟失。對于該場景,可以關閉fsync通過操作系統進行異步刷盤。但是如果寫入吞吐非常高導致page cache的大小超過了 vm.diry_ratio ,那么即便不是fsync也會導致io等待,該場景往往會導致io抖動。為了避免內核pdflush大量刷盤造成的io抖動,我們支持對raftlog進行異步刷盤。
五、未來探討
- 透明多級存儲,和緩存結合,自動冷熱分離,通過將冷數據自動搬遷到kv降低內存使用成本。
- 新硬件場景接入,使用SPDK 進行IO提速,使用PMEM進行訪問加速。

























