精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

通過這三個文件徹底搞懂Rocketmq的存儲原理

開發 前端
我們知道RocketMQ的架構是producer、NameServer、broker、Consumer,producer是生產消息的,NameServer是路由中心,負責服務的注冊發現以及路由管理這些。

[[429030]]

RocketMQ是阿里開發的一個高性能的消息隊列,支持各種消息類型,而且支持事務消息,可以說是現在的很多系統中的香餑餑了,所以呢,怎么使用大家肯定是要學習的。

我們作為一個有夢想的程序員,在學習一門技術的時候,肯定是不能光知其然,這是遠遠不夠的,我們必須要知其所以然,這樣才能在面試的時候侃侃而談,啊呸,不對,這樣我們才能在工作中遇到問題的時候,理性的去思考如何解決問題。

我們知道RocketMQ的架構是producer、NameServer、broker、Consumer,producer是生產消息的,NameServer是路由中心,負責服務的注冊發現以及路由管理這些。

Consumer是屬于消費消息的,broker則屬于真正的存儲消息,以及進行消息的持久化,也就是存儲消息的文件和索引消息的文件都在broker上。

消息隊列的主要作用是解耦異步削峰,也就意味著消息隊列中的存儲功能是必不可少的,而隨著時代的發展,業務量的增加也對消息隊列的存儲功能的強度的要求越來越高了。

也就是說你不能光性能好,你得存儲的消息也得足夠支撐我的業務量,你只能存儲100MB的消息,我這系統每分鐘的消息業務量可能500MB了,那肯定不夠使啊,那還削個啥的峰啊,峰來了你自己都頂不住。

RocketMQ憑借其強大的存儲能力和強大的消息索引能力,以及各種類型消息和消息的特性脫穎而出,于是乎,我們這些有夢想的程序員學習RocketMQ的存儲原理也變得尤為重要。

而要說起這個存儲原理,則不得不說的就是RocketMQ的消息存儲文件commitLog文件,消費方則是憑借著巧妙的設計Consumerqueue文件來進行高性能并且不混亂的消費,還有RocketMQ的強大的支持消息索引的特性,靠的就是indexfile索引文件。

我們這篇文章就從這commitLog、Consumerqueue、indexfile這三個神秘的文件說起,搞懂這三個文件,RocketMQ的核心就被你掏空了。

先上個圖,寫入commitLog文件時commitLog和Consumerqueue、indexfile文件三者的關系:

01 Commitlog文件

  • 大小和命名規則

RocketMQ中的消息存儲文件放在${ROCKET_HOME}/store 目錄下,當生產者發送消息時,broker會將消息存儲到Commit文件夾下,文件夾下面會有一個commitLog文件,但是并不是意味著這個文件叫這個,文件命名是根據消息的偏移量來決定的。

文件有自己的生成規則,每個commitLog文件的大小是1G,一般情況下第一個 CommitLog 的起始偏移量為 0,第二個 CommitLog 的起始偏移量為 1073741824 (1G = 1073741824byte)。

也正是因為該文件的文件名字規則,所以也可以更好的知道消息處于哪個文件中,假設物理偏移量是1073741830,則相對的偏移量是6(6 = 1073741830 - 1073741824),于是判斷出該消息位于第二個commitLog文件上,下面要說的Consumerqueue文件和indexfile文件都是通過偏移量來計算出消息位于哪個文件,進行更為精準的定位,減少了IO次數。

  • 文件存儲規則和特點

commitLog文件的最大的一個特點就是消息的順序寫入,隨機讀寫,關于commitLog的文件的落盤有兩種,一種是同步刷盤,一種是異步刷盤,可通過 flushDiskType 進行配置。

在寫入commitLog的時候內部會有一個mappedFile內存映射文件,消息是先寫入到這個內存映射文件中,然后根據刷盤策略寫到硬盤中,對于producer的角度來說就是,同步就是當消息真正的寫到硬盤的時候才會給producer返回成功,而異步就是當消息到達內存的時候就返回成功了,然后異步的去刷盤。

跑題了,最大的特點順序寫入,所有的topic的消息都存儲到commitLog文件中,順序寫入可以充分的利用磁盤順序減少了IO爭用數據存儲的性能,kafka也是通過硬盤順序存盤的。

大家都常說硬盤的速度比內存慢,其實這句話也是有歧義的,當硬盤順序寫入和讀取的時候,速度不比內存慢,甚至比內存速度快,這種存儲方式就好比數組,我們如果知道數組的下標,則可以直接通過下標計算出位置,找到內存地址,眾所周知,數組的讀取是很快的,但是數組的缺點在于插入數據比較慢,因為如果在中間插入數據需要將后面的數據往后移動。

而對于數組來說,如果我們只會順序的往后添加,數組的速度也是很快的,因為數組沒有后續的數據的移動,這一操作很耗時。

回到RocketMQ中的commitLog文件,也是同樣的道理,順序的寫入文件也就不需要太多的去考慮寫入的位置,直接找到文件往后放就可以了,而取數據的時候,也是和數組一樣,我們可以通過文件的大小去精準的定位到哪一個文件,然后再精準的定位到文件的位置。

當然,至于這個索引位置就是靠下面的Consumerqueue文件和indexfile文件來找到消息的位置的,也就是索引地址。

哦對了,數組的元素大小是一樣的,并不意味這commitLog文件的各個消息存儲空間一樣。

  • 簡單看下源碼

這部分源碼在DefaultMessageStore.putMessage。

  1. @Override 
  2.   public PutMessageResult putMessage(MessageExtBrokerInner msg) { 
  3.       if (this.shutdown) { 
  4.           log.warn("message store has shutdown, so putMessage is forbidden"); 
  5.           return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); 
  6.       } 
  7.  
  8.       // 從節點不允許寫入 
  9.       if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { 
  10.           long value = this.printTimes.getAndIncrement(); 
  11.           if ((value % 50000) == 0) { 
  12.               log.warn("message store is slave mode, so putMessage is forbidden "); 
  13.           } 
  14.  
  15.           return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); 
  16.       } 
  17.  
  18.       // store是否允許寫入 
  19.       if (!this.runningFlags.isWriteable()) { 
  20.           long value = this.printTimes.getAndIncrement(); 
  21.           if ((value % 50000) == 0) { 
  22.               log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits()); 
  23.           } 
  24.  
  25.           return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); 
  26.       } else { 
  27.           this.printTimes.set(0); 
  28.       } 
  29.  
  30.       // topic過長 
  31.       if (msg.getTopic().length() > Byte.MAX_VALUE) { 
  32.           log.warn("putMessage message topic length too long " + msg.getTopic().length()); 
  33.           return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); 
  34.       } 
  35.  
  36.       // 消息附加屬性過長 
  37.       if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) { 
  38.           log.warn("putMessage message properties length too long " + msg.getPropertiesString().length()); 
  39.           return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null); 
  40.       } 
  41.  
  42.       if (this.isOSPageCacheBusy()) { 
  43.           return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null); 
  44.       } 
  45.  
  46.       long beginTime = this.getSystemClock().now(); 
  47.       // 添加消息到commitLog 
  48.       PutMessageResult result = this.commitLog.putMessage(msg); 
  49.  
  50.       long eclipseTime = this.getSystemClock().now() - beginTime; 
  51.       if (eclipseTime > 500) { 
  52.           log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length); 
  53.       } 
  54.       this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime); 
  55.  
  56.       if (null == result || !result.isOk()) { 
  57.           this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); 
  58.       } 
  59.  
  60.       return result; 
  61.   } 

中間的commitLog.putMessage就是負責實現消息寫入commitLog文件,這個太長了,我就不給大家截了。

大致流程就是組裝消息,放入屬性,然后通過MappedFile對象寫入文件,緊接著根據刷盤策略刷盤,最后進行主從同步。

02 consumerQueue文件

RocketMQ是分為多個topic,消息所屬主題,屬于消息類型,每一個topic有多個queue,每個queue放著不同的消息,在同一個消費者組下的消費者,可以同時消費同一個topic下的不同queue隊列的消息。不同消費者下的消費者,可以同時消費同一個topic下的相同的隊列的消息。而同一個消費者組下的消費者,不可以同時消費不同topic下的消息。

而每個topic下的queue隊列都會對應一個Consumerqueue文件,例如Topic中有三個隊列,每個隊列中的消息索引都會有一個編號,編號從0開始,往上遞增。并由此一個位點offset的概念,有了這個概念,就可以對Consumer端的消費情況進行隊列定義。

消息消費完成后,需要將消費進度存儲起來,即前面提到的offset。廣播模式下,同消費組的消費者相互獨立,消費進度要單獨存儲;集群模式下,同一條消息只會被同一個消費組消費一次,消費進度會參與到負載均衡中,故消費進度是需要共享的。

消費進度,也就是由Broker管理每一個消費者消費Topic的進度,包含正常提交消費進度和重置消費進度,消費進度管理的目的是保證消費者在正常運行狀態、重啟、異常關閉等狀態下都能準確續接“上一次”未處理的消息。

在RocketMQ中,實現的消費語義叫“至少投遞一次”,也就是所有的消息至少有一次機會消費不用擔心會丟消息。用戶需要實現消費冪等來避免重復投遞對業務實際數據的影響。

冪等是啥應該不用我多說了吧,親愛的你們肯定知道了。

如上圖所示,消費者一般在兩種情況下“上報”消費進度,消費成功后(包含正常消費成功、重試消費成功)和重置消費進度。

而消費進度的標準就是Consumerqueue文件,這個文件中存儲的是投遞到某一個messagequeue中的位置信息。

比如我們知道消息存儲到commitLog文件中,一個消費者A對應著消費messagequeueA這個隊列,但是無法確定在commitLog文件中該隊列中的消息的位置,于是就有了ConsumerqueueA這個文件,這個文件對應一個messagequeueA,消費者A便可以通過ConsumerqueueA來確定自己的消費進度,獲取消息在commitLog文件中的具體的offset和大小。

  • 存放位置和結構

consumequeue存放在store文件里面,里面的consumequeue文件里面按照topic排放,然后每個topic默認4個隊列,里面存放的consumequeue文件。

ConsumeQueue中并不需要存儲消息的內容,而存儲的是消息在CommitLog中的offset。也就是說ConsumeQueue其實是CommitLog的一個索引文件。

consumequeue是定長結構,每個記錄固定大小20個字節,單個consumequeue文件默認包含30w個條目,所以單個文件大小大概6M左右。

很顯然,Consumer消費消息的時候,要讀2次:先讀ConsumeQueue得到offset,再通過offset找到CommitLog對應的消息內容。

  • ConsumeQueue的作用

消費者通過broker保存的offset(offsetTable.offset json文件中保存的ConsumerQueue的下標)可以在ConsumeQueue中獲取消息,從而快速的定位到commitLog的消息位置,由于每個消息的大小是不一樣的,也可以通過size獲取到消息的大小,從而讀取完整的消息。

過濾tag是也是通過遍歷ConsumeQueue來實現的(先比較hash(tag)符合條件的再到具體消息比較tag)。

  • offsetTable.offset

和commitLog的offset不是一回事,這個offset是ConsumeQueue文件的(已經消費的)下標/行數,可以直接定位到ConsumeQueue并找到commitlogOffset從而找到消息體原文。這個offset是消息消費進度的核心,不同的消費模式,保存地址不同。

廣播模式:DefaultMQPushConsumer的BROADCASTING模式,各個Consumer沒有互相干擾,使用LoclaFileOffsetStore,把Offset存儲在Consumer本地。

集群模式:DefaultMQPushConsumer的CLUSTERING模式,由Broker端存儲和控制Offset的值,使用RemoteBrokerOffsetStore。

  • 簡單看下構建過程

在Broker中,構建ComsummerQueue不是存儲完CommitLog就馬上同步構建的,而是通過一個線程任務異步的去做這個事情。在DefaultMessageStore中有一個ReputMessageService成員,它就是負責構建ComsumerQueue的任務線程。

ReputMessageService繼承自ServiceThread,表明其是一個服務線程,它的run方法很簡單,如下所示:

  1. public void run() { 
  2.             while (!this.isStopped()) { 
  3.                 try { 
  4.                     Thread.sleep(1); 
  5.                     this.doReput(); // 構建ComsumerQueue 
  6.                 } catch (Exception e) { 
  7.                     DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); 
  8.                 } 
  9.             } 
  10.         } 

在run方法里,每休息1毫秒就進行一次構建ComsumerQueue的動作。因為必須先寫入CommitLog,然后才能進行ComsumerQueue的構建。那么不排除構建ComsumerQueue的速度太快了,而CommitLog還沒寫入新的消息。這時就需要sleep下,讓出cpu時間片,避免浪費CPU資源。

我們點進去這個doReput()看核心處理邏輯:

  1. private void doReput() { 
  2.             for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) { 
  3.                 SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);// 拿到所有的最新寫入CommitLog的數據 
  4.                 if (result != null) { 
  5.                     try { 
  6.                         this.reputFromOffset = result.getStartOffset(); 
  7.  
  8.                         for (int readSize = 0; readSize < result.getSize() && doNext; ) { 
  9.                             DispatchRequest dispatchRequest = 
  10.                             DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), falsefalse); // 一條一條的讀消息 
  11.                             int size = dispatchRequest.getMsgSize(); 
  12.  
  13.                             if (dispatchRequest.isSuccess()) { 
  14.                                 if (size > 0) { 
  15.                                     DefaultMessageStore.this.doDispatch(dispatchRequest); // 派發消息,進行處理,其中就包括構建ComsumerQueue 
  16.                                     this.reputFromOffset += size
  17.                                     readSize += size
  18.                                 } else if (size == 0) { //  
  19.                                     this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset); 
  20.                                     readSize = result.getSize(); 
  21.                                 } 
  22.                             } else if (!dispatchRequest.isSuccess()) { // 獲取消息異常 
  23.  
  24.                                 if (size > 0) { 
  25.                                     log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset); 
  26.                                     this.reputFromOffset += size
  27.                                 } else { 
  28.                                     doNext = false
  29.                                     if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) { 
  30.                                         this.reputFromOffset += result.getSize() - readSize; 
  31.                                     } 
  32.                                 } 
  33.                             } 
  34.                         } 
  35.                     } finally { 
  36.                         result.release(); 
  37.                     } 
  38.                 } else { 
  39.                     doNext = false
  40.                 } 
  41.             } 
  42.         } 

我在這里省略了一些和構建ComsumerQueue不相干的代碼。

其實在doReput里面就做了三件事:

1、獲取最新寫入到CommitLog中的數據byteBuffer。

2、從byteBuffer中一條條的讀取消息,并派發出去處理。

3、更新reputFromOffset位移。

感興趣的可以打斷點走一遍。

03 indexFile文件

RocketMQ還支持通過MessageID或者MessageKey來查詢消息,使用ID查詢時,因為ID就是用broker+offset生成的(這里msgId指的是服務端的),所以很容易就找到對應的commitLog文件來讀取消息。

對于用MessageKey來查詢消息,MessageStore通過構建一個index來提高讀取速度。

  • 文件結構 

indexfile文件存儲在store目錄下的index文件里面,里面存放的是消息的hashcode和index內容,文件由一個文件頭組成:長40字節。500w個hashslot,每個4字節。2000w個index條目,每個20字節。

所以這里我們可以估算每個indexfile的大小為:40+500w4+2000w20個字節,大約400M左右。

  • 文件詳細信息

IndexHeader:索引文件頭信息由40個字節組成。

  1. //8位 該索引文件的第一個消息(Message)的存儲時間(落盤時間) 
  2. this.byteBuffer.putLong(beginTimestampIndex, this.beginTimestamp.get()); 
  3. //8位 該索引文件的最后一個消息(Message)的存儲時間(落盤時間) 
  4. this.byteBuffer.putLong(endTimestampIndex, this.endTimestamp.get()); 
  5. //8位 該索引文件第一個消息(Message)的在CommitLog(消息存儲文件)的物理位置偏移量(可以通過該物理偏移直接獲取到該消息) 
  6. this.byteBuffer.putLong(beginPhyoffsetIndex, this.beginPhyOffset.get()); 
  7. //8位 該索引文件最后一個消息(Message)的在CommitLog(消息存儲文件)的物理位置偏移量 
  8. this.byteBuffer.putLong(endPhyoffsetIndex, this.endPhyOffset.get()); 
  9. //4位 該索引文件目前的hash slot的個數 
  10. this.byteBuffer.putInt(hashSlotcountIndex, this.hashSlotCount.get()); 
  11. //4位 索引文件目前的索引個數 
  12. this.byteBuffer.putInt(indexCountIndex, this.indexCount.get()); 

Slot槽位,默認每個文件配置的slot是500萬個,每個slot是4位的整型數據,Slot每個節點保存當前已經擁有多少個index數據了。

  1. //slot的數據存放位置 40 + keyHash %(500W)* 4 
  2. int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; 
  3.  
  4. //Slot Table 
  5. //4字節 
  6. //記錄該slot當前index,如果hash沖突(即absSlotPos一致)作為下一次該slot新增的前置index 
  7. this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount()); 

索引消息內容,消息長度固定為20位。

  1. //Index Linked list 
  2. //topic+message key的hash值 
  3. this.mappedByteBuffer.putInt(absIndexPos, keyHash); 
  4. //消息在CommitLog的物理文件地址, 可以直接查詢到該消息(索引的核心機制) 
  5. this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset); 
  6. //消息的落盤時間與header里的beginTimestamp的差值(為了節省存儲空間,如果直接存message的落盤時間就得8bytes) 
  7. this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff); 
  8. //9、記錄該slot上一個index 
  9. //hash沖突處理的關鍵之處, 相同hash值上一個消息索引的index(如果當前消息索引是該hash值的第一個索引,則prevIndex=0, 也是消息索引查找時的停止條件),每個slot位置的第一個消息的prevIndex就是0的 
  10. this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue); 
  • 再論結構

文件結構slot和indexLinkedList可以理解成java中的HashMap。

哎,你說HashMap我可不困了啊,你可別蒙我,這個我熟,什么負載因子、默認大小、擴容機制、紅黑樹,還有多線程下不安全這些。

乖,我知道你熟悉,你跟著我一起學習,這些當然了如指掌,只需要你了解HashMap的結構和沖突即可。

每放入一個新消息的index進來,首先會取MessageKey的HashCode,然后用Hashcode對slot的總數進行取模,決定該消息key的位置,slot的總數默認是500W個。

只要取hash就必然面臨著hash沖突的問題,indexfile也是采用鏈表結構來解決hash沖突,這一點和HashMap一樣的,不過這個不存在紅黑樹轉換這一說,個人猜測這個的沖突數量也達不到很高的級別,所以進行這方面的設計也沒啥必要,甚至變成了強行增加indexfile的文件結構難度。

還有,在indexfile中的slot中放的是最新的index的指針,因為一般查詢的時候大概率是優先查詢最近的消息。

每個slot中放的指針值是索引在indexfile中的偏移量,也就是后面index的位置,而index中存放的就是該消息在commitlog文件中的offset,每個index的大小是20字節,所以根據當前索引是這個文件中的第幾個偏移量,也就很容易定位到索引的位置,根據前面的固定大小可以很快把真實坐標算出來,以此類推,形成一個鏈表的結構。

  • 查詢流程

由于indexHeader,slot,index都是固定大小,所以:

  • 公式1:第n個slot在indexFile中的起始位置是這樣:40+(n-1)*4
  • 公式2:第s個index在indexFile中的起始位置是這樣:40+5000000*4+(s-1)*20

查詢的傳入值除了key外,還包含一個時間起始值以及截止值。

為啥還要傳時間范圍呢?

一個indexFile寫完一個會繼續寫下一個,僅僅一個key無法定位到具體的indexFile,時間范圍就為了更精確的定位到具體的indexFile,縮小查找的范圍,indexFile文件名是一個時間戳,根據這個日期就可以定位到傳入的日期范圍對應在哪個或者哪些indexFile中,是不是很棒。

好了,我們接著說查詢流程。

key-->計算hash值-->hash值對500萬取余算出對應的slot序號-->根據40+(n-1)*4(公式1)算出該slot在文件中的位置-->讀取slot值,也就是index序號-->根據40+5000000*4+(s-1)*20(公式2)算出該index在文件中的位置-->讀取該index-->將key的hash值以及傳入的時間范圍與index的keyHash值以及timeDiff值進行比對。

不滿足則根據index中的preIndexNo找到上一個index,繼續上一步;滿足則根據index中的phyOffset拿到commitLog中的消息。

為啥比對時還要帶上時間范圍呢?

只比key不行嗎?答案是不行,因為key可能會重復,producer在消息生產時可以指定消息的key,這個key顯然無法保證唯一性,那自動生成的msgId呢?也不能保證唯一,你可以去看看msgId的生成規則。

包括當前機器IP+進程號+MessageClientIDSetter.class.getClassLoader()的hashCode值+消息生產時間與broker啟動時間的差值+broker啟動后從0開始單調自增的int值,前面三項很明顯可能重復,后面兩項一個是時間差,一個是重啟歸零,也可能重復。

  • 簡單看下源碼,感興趣的下載源碼去研究。

indexfile的添加消息索引的過程

  1. public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) { 
  2.         //1. 判斷該索引文件的索引數小于最大的索引數,如果>=最大索引數,IndexService就會嘗試新建一個索引文件 
  3.         if (this.indexHeader.getIndexCount() < this.indexNum) { 
  4.             //2. 計算該message key的hash值 
  5.             int keyHash = indexKeyHashMethod(key); 
  6.             //3. 根據message key的hash值散列到某個hash slot里 
  7.             int slotPos = keyHash % this.hashSlotNum; 
  8.             //4. 計算得到該hash slot的實際文件位置Position 
  9.             int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; 
  10.  
  11.             try { 
  12.                 //5. 根據該hash slot的實際文件位置absSlotPos得到slot里的值 
  13.                 //這里有兩種情況: 
  14.                 //1). slot=0, 當前message的key是該hash值第一個消息索引 
  15.                 //2). slot>0, 該key hash值上一個消息索引的位置 
  16.                 int slotValue = this.mappedByteBuffer.getInt(absSlotPos); 
  17.  
  18.                 //6. 數據校驗及修正 
  19.                 if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) { 
  20.                     slotValue = invalidIndex; 
  21.                 } 
  22.  
  23.                 long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp(); 
  24.  
  25.                 timeDiff = timeDiff / 1000; 
  26.  
  27.                 if (this.indexHeader.getBeginTimestamp() <= 0) { 
  28.                     timeDiff = 0; 
  29.                 } else if (timeDiff > Integer.MAX_VALUE) { 
  30.                     timeDiff = Integer.MAX_VALUE; 
  31.                 } else if (timeDiff < 0) { 
  32.                     timeDiff = 0; 
  33.                 } 
  34.  
  35.                 //7. 計算當前消息索引具體的存儲位置(Append模式) 
  36.                 int absIndexPos = 
  37.                     IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize 
  38.                         + this.indexHeader.getIndexCount() * indexSize; 
  39.                 //8. 存入該消息索引 
  40.                 this.mappedByteBuffer.putInt(absIndexPos, keyHash); 
  41.                 this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset); 
  42.                 this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff); 
  43.                 this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue); 
  44.  
  45.                 //9. 關鍵之處:在該key hash slot處存入當前消息索引的位置,下次通過該key進行搜索時 
  46.                 //會找到該key hash slot -> slot value -> curIndex ->  
  47.                 //if(curIndex.prevIndex>0) pre index (一直循環 直至該curIndex.prevIndex==0就停止) 
  48.                 this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount()); 
  49.  
  50.                 if (this.indexHeader.getIndexCount() <= 1) { 
  51.                     this.indexHeader.setBeginPhyOffset(phyOffset); 
  52.                     this.indexHeader.setBeginTimestamp(storeTimestamp); 
  53.                 } 
  54.  
  55.                 this.indexHeader.incHashSlotCount(); 
  56.                 this.indexHeader.incIndexCount(); 
  57.                 this.indexHeader.setEndPhyOffset(phyOffset); 
  58.                 this.indexHeader.setEndTimestamp(storeTimestamp); 
  59.  
  60.                 return true
  61.             } catch (Exception e) { 
  62.                 log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e); 
  63.             }  
  64.         } else { 
  65.             log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount() 
  66.                 + "; index max num = " + this.indexNum); 
  67.         } 
  68.  
  69.         return false
  70.  } 

indexfile的索引搜索源碼

  1. public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum, 
  2.         final long begin, final long end, boolean lock) { 
  3.         if (this.mappedFile.hold()) { 
  4.             //1. 計算該key的hash 
  5.             int keyHash = indexKeyHashMethod(key); 
  6.             //2. 計算該hash value 對應的hash slot位置 
  7.             int slotPos = keyHash % this.hashSlotNum; 
  8.             //3. 計算該hash value 對應的hash slot物理文件位置 
  9.             int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; 
  10.  
  11.             FileLock fileLock = null
  12.             try { 
  13.                 //4. 取出該hash slot 的值 
  14.                 int slotValue = this.mappedByteBuffer.getInt(absSlotPos); 
  15.  
  16.                 //5. 該slot value <= 0 就代表沒有該key對應的消息索引,直接結束搜索 
  17.                 //   該slot value > maxIndexCount 就代表該key對應的消息索引超過最大限制,數據有誤,直接結束搜索 
  18.                 if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount() 
  19.                     || this.indexHeader.getIndexCount() <= 1) { 
  20.                 } else { 
  21.                     //6. 從當前slot value 開始搜索 
  22.                     for (int nextIndexToRead = slotValue; ; ) { 
  23.                         if (phyOffsets.size() >= maxNum) { 
  24.                             break; 
  25.                         } 
  26.  
  27.                         //7. 找到當前slot value(也就是index count)物理文件位置 
  28.                         int absIndexPos = 
  29.                             IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize 
  30.                                 + nextIndexToRead * indexSize; 
  31.  
  32.                         //8. 讀取消息索引數據 
  33.                         int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos); 
  34.                         long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4); 
  35.  
  36.                         long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8); 
  37.                         //9. 獲取該消息索引的上一個消息索引index(可以看成鏈表的prev 指向上一個鏈節點的引用) 
  38.                         int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4); 
  39.                         //10. 數據校驗 
  40.                         if (timeDiff < 0) { 
  41.                             break; 
  42.                         } 
  43.  
  44.                         timeDiff *= 1000L; 
  45.  
  46.                         long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff; 
  47.                         boolean timeMatched = (timeRead >= begin) && (timeRead <= end); 
  48.                         //10. 數據校驗比對 hash值和落盤時間 
  49.                         if (keyHash == keyHashRead && timeMatched) { 
  50.                             phyOffsets.add(phyOffsetRead); 
  51.                         } 
  52.  
  53.                         //當prevIndex <= 0 或prevIndex > maxIndexCount 或prevIndexRead == nextIndexToRead 或 timeRead < begin 停止搜索 
  54.                         if (prevIndexRead <= invalidIndex 
  55.                             || prevIndexRead > this.indexHeader.getIndexCount() 
  56.                             || prevIndexRead == nextIndexToRead || timeRead < begin) { 
  57.                             break; 
  58.                         } 
  59.  
  60.                         nextIndexToRead = prevIndexRead; 
  61.                     } 
  62.                 } 
  63.             } catch (Exception e) { 
  64.                 log.error("selectPhyOffset exception ", e); 
  65.             } finally { 
  66.  
  67.                 this.mappedFile.release(); 
  68.             } 
  69.         } 
  70.     } 

本文轉載自微信公眾號「Java賊船」

責任編輯:姜華 來源: Java賊船
相關推薦

2025-06-30 07:35:00

遠程訪問NAT內網穿透

2017-01-06 10:07:39

Linuxwindowsatime

2021-03-10 09:42:30

數字人民幣數字貨幣區塊鏈

2021-03-10 14:50:44

數字人民幣加密貨幣貨幣

2023-05-29 08:12:38

2023-10-18 10:55:55

HashMap

2022-08-26 13:24:03

version源碼sources

2021-10-11 11:58:41

Channel原理recvq

2021-10-09 19:05:06

channelGo原理

2021-09-07 07:55:22

Linux CPULinux 系統

2018-04-28 11:03:58

2017-11-02 13:15:18

Linux

2021-04-16 09:17:39

機器學習人工智能AI

2021-07-16 11:35:20

Java線程池代碼

2021-11-30 14:50:29

編譯源代碼開源

2021-03-30 15:10:50

Java序列化

2021-12-23 10:05:43

機器學習人工智能黑盒模型

2022-04-24 11:06:54

SpringBootjar代碼

2021-07-08 10:08:03

DvaJS前端Dva

2023-10-26 07:29:06

mongodb十六進制ID
點贊
收藏

51CTO技術棧公眾號

99精品福利视频| 黄色漫画在线免费看| 美女网站色91| 欧美精品在线播放| 国产成人久久777777| av在线电影观看| 国产成人啪午夜精品网站男同| 国色天香2019中文字幕在线观看| 天堂久久精品忘忧草| 国产精品伊人| 亚洲国产成人va在线观看天堂| 国产有色视频色综合| 五月婷婷激情五月| 欧美激情亚洲| 亚洲欧美日韩一区二区在线| 国产欧美激情视频| 亚洲优女在线| 一区二区三区在线播| 免费一区二区三区在在线视频| 91在线公开视频| 亚洲欧美高清| 欧美伦理91i| 欧美丰满美乳xxⅹ高潮www| 欧美成人一级| 欧美视频一区二区在线观看| 日韩av高清在线看片| 男人天堂久久久| 久久久九九九九| 成人a级免费视频| 五月天婷婷导航| 在线精品亚洲| 久久成人精品视频| 日本女人性生活视频| 久久99青青| 亚洲精品av在线| wwwxxx色| 日韩精品一区二区三区免费视频| 欧美性色欧美a在线播放| 国产二级片在线观看| 中文字幕在线三区| 成人免费一区二区三区视频| 日本日本精品二区免费| 深夜福利视频网站| 国产成人精品亚洲777人妖 | 亚洲国产精品99| 一起草最新网址| 国产免费区一区二区三视频免费| 欧美日韩三级一区| 久草福利视频在线| 欧美va视频| 精品国产31久久久久久| aa视频在线播放| ririsao久久精品一区| 亚洲男人的天堂在线aⅴ视频| 亚洲欧美影院| 午夜毛片在线| 国产精品美女久久久久久久久久久| 欧美日韩最好看的视频| 激情视频在线观看免费| 99re热视频精品| 久久av一区二区三区漫画| 日韩在线观看视频一区| 91色视频在线| 欧美精品欧美精品| 国产午夜视频在线观看| 国产欧美日韩在线视频| 亚洲制服中文| www.久久ai| 亚洲福利视频一区| 91国视频在线| 电影亚洲精品噜噜在线观看| 在线观看日韩一区| 亚洲第一天堂久久| 国产精品22p| 日韩毛片中文字幕| 亚洲天堂最新地址| 欧美韩国一区| 2019亚洲日韩新视频| 亚洲 日本 欧美 中文幕| 日本视频在线一区| 亚洲精品欧美一区二区三区| 亚洲av无码乱码在线观看性色| 成人免费va视频| 欧洲精品亚洲精品| 黄在线免费观看| 午夜精品一区二区三区电影天堂 | 成人黄色大片网站| 自由日本语热亚洲人| 欧美午夜寂寞影院| 91精品国产高清91久久久久久| 噜噜噜狠狠夜夜躁精品仙踪林| 亚洲九九九在线观看| 农村老熟妇乱子伦视频| 亚洲字幕久久| 日本精品免费观看| 国产视频在线一区| 久久欧美一区二区| 成年人三级视频| 天堂√中文最新版在线| 在线电影欧美成精品| 最新在线黄色网址| 91日韩免费| 97成人精品区在线播放| 91成年人视频| 久久综合精品国产一区二区三区| 亚洲欧美国产精品桃花| 国产精品25p| 日韩一区二区不卡| 欧美精品videos| 成人免费在线视频| 日韩精品一区二区三区色偷偷| 巨大荫蒂视频欧美大片| 欧美天堂在线观看| 日本成人在线免费观看| 久久93精品国产91久久综合| 精品少妇v888av| 久久久精品视频网站| 国产69精品久久777的优势| 亚洲国产成人不卡| 小h片在线观看| 精品欧美国产一区二区三区| 三级黄色片免费观看| 欧美日韩伦理| 日本91av在线播放| 蜜桃av鲁一鲁一鲁一鲁俄罗斯的 | 亚洲欧美精品一区| 久久久无码一区二区三区| 日本欧美韩国一区三区| 国产精品一区在线观看| 国模精品一区二区| 亚洲一区二区在线观看视频| 亚洲精品手机在线观看| 综合国产视频| 久久久久久久电影一区| 中文字幕在线观看国产| 国产日韩欧美麻豆| 成人一级片网站| 日本久久成人网| 97精品久久久中文字幕免费| 亚洲国产成人在线观看| 亚洲精品欧美在线| 亚洲男人天堂2021| 91精品国产自产在线观看永久∴| 国产精品视频免费观看www| 国产区av在线| 在线视频你懂得一区| 37p粉嫩大胆色噜噜噜| 国产精品入口| 精品久久久久亚洲| 筱崎爱全乳无删减在线观看| 亚洲福利视频久久| 国产做受高潮漫动| 99re这里都是精品| aa免费在线观看| 在线日本制服中文欧美| 日本久久久a级免费| 黄视频在线播放| 在线观看亚洲专区| 久久爱一区二区| 国产一区二区导航在线播放| 国产树林野战在线播放| 一本色道69色精品综合久久| 欧美黄色片视频| 天天操天天插天天射| 欧美性猛交xxxx久久久| 一区二区三区伦理片| 男人的j进女人的j一区| 在线视频一区观看| 青草伊人久久| 97人洗澡人人免费公开视频碰碰碰| 无码精品人妻一区二区| 色偷偷成人一区二区三区91 | 在线免费看v片| 欧美91视频| 九色综合日本| jizz久久久久久| 久久久极品av| 日本黄视频在线观看| 色欧美88888久久久久久影院| x88av在线| 国产美女在线精品| 国产午夜伦鲁鲁| 国产精品羞羞答答在线观看| 日本三级久久久| 国产日产一区二区三区| 日韩精品中文字幕在线一区| 日本系列第一页| 亚洲国产精品黑人久久久| 国内精品国产三级国产aⅴ久| 久久国产精品亚洲人一区二区三区| 日韩av片永久免费网站| 国产福利免费在线观看| 日韩人体视频一二区| 加勒比婷婷色综合久久| 91啦中文在线观看| 国产视频在线视频| 亚洲午夜久久| 国产欧美精品久久久| 538在线精品| 日韩一区二区精品视频| 在线播放成人av| 亚洲一区二区综合| 日韩精品卡通动漫网站| 蓝色福利精品导航| av免费观看大全| 日韩中文字幕高清在线观看| 国产一区免费在线观看| 精品中文字幕一区二区三区四区| 久久91精品国产| 97最新国自产拍视频在线完整在线看| 精品国产亚洲在线| 中文字幕+乱码+中文字幕明步 | 国产一区二区波多野结衣| 欧美日韩另类视频| 伊人av在线播放| 精品在线你懂的| 欧美成人精品欧美一级乱| 韩国一区二区三区在线观看| 伊人久久av导航| 激情婷婷综合| 久久久久久一区| 超碰在线一区| 成人av播放| 精品一区二区三区中文字幕视频| 国产精品三级久久久久久电影| 欧美裸体视频| 欧美—级a级欧美特级ar全黄| 理论片午午伦夜理片在线播放| 一本一本久久a久久精品综合小说 一本一本久久a久久精品牛牛影视 | 欧美国产中文字幕| 日本天堂在线观看| 一区二区三区回区在观看免费视频| 国产黄色片网站| 91麻豆精品国产91久久久资源速度| 91丨九色丨海角社区| 日韩欧美精品网站| 天天综合天天干| 午夜精品在线看| 国产精品第一页在线观看| 一个色综合网站| 久久人妻无码aⅴ毛片a片app| 欧美激情自拍偷拍| 欧美另类z0zx974| 国产高清久久久| 日本黄色大片在线观看| 国产91丝袜在线播放九色| 免费欧美一级片| 国产凹凸在线观看一区二区| 初高中福利视频网站| 九九九久久久久| 国内一区二区视频| 日韩一级精品视频在线观看| 日日噜噜噜噜人人爽亚洲精品| av一二三不卡影片| 亚洲国产综合视频| 国产91在线看| 亚洲永久无码7777kkk| 久久aⅴ国产欧美74aaa| 国产福利在线免费| 奇米影视一区二区三区小说| www.夜夜爽| 久久av中文字幕片| 日本中文字幕有码| 国产麻豆一精品一av一免费 | 国模套图日韩精品一区二区| 97视频在线观看网址| a一区二区三区| 欧美在线激情网| 日本电影在线观看| 欧美理论电影在线观看| 成人性生交大片免费看午夜| 久久精品在线播放| 波多野结衣爱爱| 色综合久久综合| www.av视频| 亚洲二区视频在线| 国产女同在线观看| 亚洲人精品午夜| 人人澡人人澡人人看| 亚洲高清三级视频| 无码无套少妇毛多18pxxxx| 色94色欧美sute亚洲线路一久| 亚洲一区二区激情| 欧美无乱码久久久免费午夜一区 | 日韩黄色免费电影| 国产精品久久久久野外| 久久久久久亚洲综合影院红桃| 国产制服丝袜在线| 久久婷婷国产综合国色天香 | 亚洲激情一二三区| 中文字幕影音先锋| 亚洲色图一区二区三区| 性欧美丰满熟妇xxxx性仙踪林| 欧美国产激情二区三区| 五月天国产视频| 99免费精品视频| 在线观看你懂的视频| 国产午夜精品久久久久久免费视| 男人天堂av电影| 亚洲色图欧洲色图婷婷| 超碰超碰超碰超碰| 欧美人与禽zozo性伦| 蜜桃久久一区二区三区| 色久欧美在线视频观看| 美女精品导航| 国产精品6699| **欧美日韩在线| 538国产精品视频一区二区| 中文在线8资源库| 国产裸体写真av一区二区| 亚洲精品一级二级| av资源一区二区| 97精品中文字幕| 男人的天堂视频在线| 日韩有码一区二区三区| 伊人网在线综合| 久久久青草婷婷精品综合日韩| 一女被多男玩喷潮视频| 欧美亚洲专区| 日本泡妞xxxx免费视频软件| 久久这里都是精品| 亚洲第一成人网站| 天天做夜夜做人人爱精品 | 日韩欧美福利视频| 中文字幕日本人妻久久久免费 | 欧美图片一区二区| 伊人婷婷欧美激情| 国产精品777777| 精品国产三级a在线观看| 日批视频免费播放| 久热99视频在线观看| 日日夜夜天天综合入口| 成人激情综合网| 亚洲人成伊人成综合图片| 成人一级生活片| 国产成人综合视频| 亚洲av熟女国产一区二区性色| 亚洲成人自拍网| 亚洲欧美自拍视频| 亚洲风情亚aⅴ在线发布| av有码在线观看| 国产欧美精品日韩| 欧美激情777| 霍思燕三级露全乳照| 韩国一区二区视频| 中文字幕乱码在线| 亚洲成a人v欧美综合天堂下载| 特级西西444www高清大视频| 国产亚洲人成a一在线v站| 成人性教育av免费网址| 欧美另类变人与禽xxxxx| 中文字幕在线视频第一页| 精品av久久707| 麻豆av在线导航| 亚洲一区中文字幕在线观看| 精品免费视频| 婷婷六月天在线| 1024精品合集| 国产av一区二区三区精品| 欧美激情久久久| 青草久久视频| 国产成人无码av在线播放dvd| 国产日产欧美精品一区二区三区| 国产精品免费无遮挡无码永久视频| 亚洲精品一区二区三区不| 另类激情视频| 国产一区喷水| 日韩在线一区二区三区| 四虎永久免费地址| 日韩精品一区二区在线观看| 69av成人| 亚洲国产精品123| 国产盗摄精品一区二区三区在线| 久久久久久久久毛片| 亚洲精品乱码久久久久久金桔影视| 亚洲欧美电影| 永久久久久久| www.久久久久久久久| 四虎影院在线免费播放| 日韩中文字幕国产精品| 最新国产精品精品视频| avav在线看| 亚洲少妇30p| 亚洲av成人无码久久精品老人| 亚洲欧美久久| youjizz.com国产| 欧美伊人精品成人久久综合97 | 国产91免费看片| 欧美激情偷拍自拍| 一级黄色电影片| 91高清视频免费看| 午夜影院免费在线| 欧美亚洲另类在线一区二区三区| 精品系列免费在线观看| 国产成人啪精品午夜在线观看| 亚洲天堂开心观看| 日韩欧美高清一区二区三区| 无码aⅴ精品一区二区三区浪潮 | 人妻无码一区二区三区四区| av资源网一区|