Kafka中改進的二分查找算法
最近有學習些Kafak的源碼,想給大家分享下Kafak中改進的二分查找算法。二分查找,是每個程序員都應掌握的基礎算法,而Kafka是如何改進二分查找來應用于自己的場景中,這很值得我們了解學習。
由于Kafak把二分查找應用于索引查找的場景中,所以本文會先對Kafka的日志結構和索引進行簡單的介紹。在Kafak中,消息以日志的形式保存,每個日志其實就是一個文件夾,且存有多個日志段,一個日志段指的是文件名(起始偏移)相同的消息日志文件和4個索引文件,如下圖所示。
在消息日志文件中以追加的方式存儲著消息,每條消息都有著唯一的偏移量。在查找消息時,會借助索引文件進行查找。如果根據偏移量來查詢,則會借助位移索引文件來定位消息的位置。為了便于討論索引查詢,下文都將基于位移索引這一背景。位移索引的本質是一個字節數組,其中存儲著偏移量和相應的磁盤物理位置,這里偏移量和磁盤物理位置都固定用4個字節,可以看做是每8個字節一個key-value對,如下圖:
索引的結構已經清楚了,下面就能正式進入本文的主題“二分查找”。給定索引項的數組和target偏移量,可寫出如下代碼:
- private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = {
- // _entries表示索引項的數量
- // 1. 如果當前索引為空,直接返回(-1,-1)表示沒找到
- if (_entries == 0)
- return (-1, -1)
- // 2. 確保查找的偏移量不小于當前最小偏移量
- if (compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
- return (-1, 0)
- // 3. 執行二分查找算法,找出target
- var lo = 0
- var hi = _entries - 1
- while (lo < hi) {
- val mid = ceil(hi / 2.0 + lo / 2.0).toInt
- val found = parseEntry(idx, mid)
- val compareResult = compareIndexEntry(found, target, searchEntity)
- if (compareResult > 0)
- hi = mid - 1
- else if (compareResult < 0)
- lo = mid
- else
- return (mid, mid)
- }
- (lo, if (lo == _entries - 1) -1 else lo + 1)
- }
上述代碼使用了普通的二分查找,下面我們看下這樣會存在什么問題。雖然每個索引項的大小是4B,但操作系統訪問內存時的最小單元是頁,一般是4KB,即4096B,會包含了512個索引項。而找出在索引中的指定偏移量,對于操作系統訪問內存時則變成了找出指定偏移量所在的頁。假設索引的大小有13個頁,如下圖所示:
由于Kafka讀取消息,一般都是讀取最新的偏移量,所以要查詢的頁就集中在尾部,即第12號頁上。下面我們結合上述的代碼,看下查詢最新偏移量,會訪問哪些頁。根據二分查找,將依次訪問6、9、11、12號頁。
當隨著Kafka接收消息的增加,索引文件也會增加至第13號頁,這時根據二分查找,將依次訪問7、10、12、13號頁。
可以看出訪問的頁和上一次的頁完全不同。之前在只有12號頁的時候,Kafak讀取索引時會頻繁訪問6、9、11、12號頁,而由于Kafka使用了mmap來提高速度,即讀寫操作都將通過操作系統的page cache,所以6、9、11、12號頁會被緩存到page cache中,避免磁盤加載。但是當增至13號頁時,則需要訪問7、10、12、13號頁,而由于7、10號頁長時間沒有被訪問(現代操作系統都是使用LRU或其變體來管理page cache),很可能已經不在page cache中了,那么就會造成缺頁中斷(線程被阻塞等待從磁盤加載沒有被緩存到page cache的數據)。在Kafka的官方測試中,這種情況會造成幾毫秒至1秒的延遲。
鑒于以上情況,Kafka對二分查找進行了改進。既然一般讀取數據集中在索引的尾部。那么將索引中最后的8192B(8KB)劃分為“熱區”,其余部分劃分為“冷區”,分別進行二分查找。代碼實現如下:
- private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchType): (Int, Int) = {
- // 1. 如果當前索引為空,直接返回(-1,-1)表示沒找到
- if(_entries == 0)
- return (-1, -1)
- // 二分查找封裝成方法
- def binarySearch(begin: Int, end: Int) : (Int, Int) = {
- var lo = begin
- var hi = end
- while(lo < hi) {
- val mid = (lo + hi + 1) >>> 1
- val found = parseEntry(idx, mid)
- val compareResult = compareIndexEntry(found, target, searchEntity)
- if(compareResult > 0)
- hi = mid - 1
- else if(compareResult < 0)
- lo = mid
- else
- return (mid, mid)
- }
- (lo, if (lo == _entries - 1) -1 else lo + 1)
- }
- /**
- * 2. 確認熱區首個索引項位。_warmEntries就是所謂的分割線,目前固定為8192字節處
- * 對于OffsetIndex,_warmEntries = 8192 / 8 = 1024,即第1024個索引項
- * 大部分查詢集中在索引項的尾部,所以把尾部的8192字節設置為熱區
- * 如果查詢target在熱區索引項范圍,直接查熱區,避免頁中斷
- */
- val firstHotEntry = Math.max(0, _entries - 1 - _warmEntries)
- // 3. 判斷target偏移值在熱區還是冷區
- if(compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0) {
- // 如果在熱區,搜索熱區
- return binarySearch(firstHotEntry, _entries - 1)
- }
- // 4. 確保要查找的位移值不能小于當前最小位移值
- if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
- return (-1, 0)
- // 5. 如果在冷區,搜索冷區
- binarySearch(0, firstHotEntry)
- }
這樣做的好處是,在頻繁查詢尾部的情況下,尾部的頁基本都能在page cahce中,從而避免缺頁中斷。
下面我們還是用之前的例子來看下。由于每個頁最多包含512個索引項,而最后的1024個索引項所在頁會被認為是熱區。那么當12號頁未滿時,則10、11、12會被判定是熱區;而當12號頁剛好滿了的時候,則11、12被判定為熱區;當增至13號頁且未滿時,11、12、13被判定為熱區。假設我們讀取的是最新的消息,則在熱區中進行二分查找的情況如下:
當12號頁未滿時,依次訪問11、12號頁,當12號頁滿時,訪問頁的情況相同。當13號頁出現的時候,依次訪問12、13號頁,不會出現訪問長時間未訪問的頁,則能有效避免缺頁中斷。
關于為什么設置熱區大小為8192字節,官方給出的解釋,這是一個合適的值:
足夠小,能保證熱區的頁數小于等于3,那么當二分查找時的頁面都很大可能在page cache中。也就是說如果設置的太大了,那么可能出現熱區中的頁不在page cache中的情況。
足夠大,8192個字節,對于位移索引,則為1024個索引項,可以覆蓋4MB的消息數據,足夠讓大部分在in-sync內的節點在熱區查詢。
最后一句話總結下:在Kafka索引中使用普通二分搜索會出現缺頁中斷的現象,造成延遲,且結合查詢大多集中在尾部的情況,通過將索引區域劃分為熱區和冷區,分別搜索,將盡可能保證熱區中的頁在page cache中,從而避免缺頁中斷。





































