精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

萬字長文講透 RocketMQ 的消費邏輯

開發 項目管理
名字服務是是一個幾乎無狀態節點,可集群部署,節點之間無任何信息同步。它是一個非常簡單的 Topic 路由注冊中心,其角色類似 Dubbo 中的 zookeeper ,支持 Broker 的動態注冊與發現。

RocketMQ 是筆者非常喜歡的消息隊列,4.9.X 版本是目前使用最廣泛的版本,但它的消費邏輯相對較重,很多同學學習起來沒有頭緒。

這篇文章,筆者梳理了 RocketMQ 的消費邏輯,希望對大家有所啟發。

圖片

一、架構概覽

在展開集群消費邏輯細節前,我們先對 RocketMQ 4.9.X 架構做一個概覽。

圖片

整體架構中包含四種角色 :

1、NameServer

名字服務是是一個幾乎無狀態節點,可集群部署,節點之間無任何信息同步。它是一個非常簡單的 Topic 路由注冊中心,其角色類似 Dubbo 中的 zookeeper ,支持 Broker 的動態注冊與發現。

2、BrokerServer

Broker 主要負責消息的存儲、投遞和查詢以及服務高可用保證 。

3、Producer

消息發布的角色,Producer 通過 MQ 的負載均衡模塊選擇相應的 Broker 集群隊列進行消息投遞,投遞的過程支持快速失敗并且低延遲。

4、Consumer

消息消費的角色,支持以 push 推,pull 拉兩種模式對消息進行消費。

RocketMQ 集群工作流程:

1、啟動 NameServer,NameServer 起來后監聽端口,等待 Broker、Producer 、Consumer 連上來,相當于一個路由控制中心。

2、Broker 啟動,跟所有的 NameServer 保持長連接,定時發送心跳包。心跳包中包含當前 Broker信息( IP+端口等 )以及存儲所有 Topic 信息。注冊成功后,NameServer 集群中就有 Topic 跟 Broker 的映射關系。

3、收發消息前,先創建 Topic,創建 Topic 時需要指定該 Topic 要存儲在哪些 Broker 上,也可以在發送消息時自動創建 Topic。

4、Producer 發送消息,啟動時先跟 NameServer 集群中的其中一臺建立長連接,并從 NameServer 中獲取當前發送的 Topic 存在哪些 Broker 上,輪詢從隊列列表中選擇一個隊列,然后與隊列所在的 Broker 建立長連接從而向 Broker 發消息。

5、Consumer 跟 Producer 類似,跟其中一臺 NameServer 建立長連接,獲取當前訂閱 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立連接通道,開始消費消息。

二、發布訂閱

RocketMQ 的傳輸模型是:發布訂閱模型 。

發布訂閱模型具有如下特點:

  • 消費獨立相比隊列模型的匿名消費方式,發布訂閱模型中消費方都會具備的身份,一般叫做訂閱組(訂閱關系),不同訂閱組之間相互獨立不會相互影響。
  • 一對多通信基于獨立身份的設計,同一個主題內的消息可以被多個訂閱組處理,每個訂閱組都可以拿到全量消息。因此發布訂閱模型可以實現一對多通信。

RocketMQ 支持兩種消息模式:集群消費( Clustering )和廣播消費( Broadcasting )。

集群消費:同一 Topic 下的一條消息只會被同一消費組中的一個消費者消費。也就是說,消息被負載均衡到了同一個消費組的多個消費者實例上。

圖片

廣播消費:當使用廣播消費模式時,每條消息推送給集群內所有的消費者,保證消息至少被每個消費者消費一次。

圖片

為了實現這種發布訂閱模型 , RocketMQ 精心設計了它的存儲模型。先進入 Broker 的文件存儲目錄。

圖片

RocketMQ 采用的是混合型的存儲結構。

1、Broker 單個實例下所有的隊列共用一個數據文件(commitlog)來存儲

生產者發送消息至 Broker 端,然后 Broker 端使用同步或者異步的方式對消息刷盤持久化,保存至 commitlog 文件中。只要消息被刷盤持久化至磁盤文件 commitlog 中,那么生產者發送的消息就不會丟失。

單個文件大小默認 1G , 文件名長度為 20 位,左邊補零,剩余為起始偏移量,比如 00000000000000000000 代表了第一個文件,起始偏移量為 0 ,文件大小為1 G = 1073741824 。

圖片

commitlog 目錄

這種設計有兩個優點:

  • 充分利用順序寫,大大提升寫入數據的吞吐量;
  • 快讀定位消息。因為消息是一條一條寫入到 commitlog 文件 ,寫入完成后,我們可以得到這條消息的物理偏移量。每條消息的物理偏移量是唯一的, commitlog 文件名是遞增的,可以根據消息的物理偏移量通過二分查找,定位消息位于那個文件中,并獲取到消息實體數據。

2、Broker 端的后臺服務線程會不停地分發請求并異步構建 consumequeue(消費文件)和 indexfile(索引文件)

進入索引文件存儲目錄 :

圖片

1、消費文件按照主題存儲,每個主題下有不同的隊列,圖中主題 my-mac-topic 有 16 個隊列 (0 到 15) ;

2、每個隊列目錄下 ,存儲 consumequeue 文件,每個 consumequeue 文件也是順序寫入,數據格式見下圖。

圖片

每個 consumequeue 文件包含 30 萬個條目,每個條目大小是 20 個字節,每個文件的大小是 30 萬 * 20 = 60萬字節,每個文件大小約 5.72M 。

和 commitlog 文件類似,consumequeue 文件的名稱也是以偏移量來命名的,可以通過消息的邏輯偏移量定位消息位于哪一個文件里。

消費文件按照主題-隊列來保存 ,這種方式特別適配發布訂閱模型。

消費者從 Broker 獲取訂閱消息數據時,不用遍歷整個 commitlog 文件,只需要根據邏輯偏移量從 consumequeue 文件查詢消息偏移量 ,  最后通過定位到 commitlog 文件, 獲取真正的消息數據。

要實現發布訂閱模型,還需要一個重要文件:消費進度文件。原因有兩點:

  • 不同消費組之間相互獨立,不會相互影響 ;
  • 消費者下次拉取數據時,需要知道從哪個進度開始拉取 ,就像我們小時候玩單機游戲存盤一樣。

因此消費進度文件需要保存消費組所訂閱主題的消費進度。

我們瀏覽下集群消費場景下的 Broker 端的消費進度文件 consumerOffset.json 。

圖片

圖片

在進度文件 consumerOffset.json 里,數據以 key-value 的結構存儲,key 表示:主題@消費者組 , value 是 consumequeue 中每個隊列對應的邏輯偏移量 。

寫到這里,我們粗糙模擬下 RocketMQ 存儲模型如何滿足發布訂閱模型(集群模式) 。

圖片

1、發送消息:生產者發送消息到 Broker ;

2、保存消息:Broker 將消息存儲到 commitlog 文件 ,異步線程會構建消費文件 consumequeue ;

3、消費流程:消費者啟動后,會通過負載均衡分配對應的隊列,然后向 Broker 發送拉取消息請求。Broker 收到消費者拉取請求之后,根據訂閱組,消費者編號,主題,隊列名,邏輯偏移量等參數 ,從該主題下的 consumequeue 文件查詢消息消費條目,然后從 commitlog 文件中獲取消息實體。消費者在收到消息數據之后,執行消費監聽器,消費完消息;

4、保存進度:消費者將消費進度提交到 Broker ,Broker 會將該消費組的消費進度存儲在進度文件里。

三、消費流程

我們重點講解下集群消費的消費流程 ,因為集群消費是使用最普遍的消費模式,理解了集群消費,廣播消費也就能順理成章的掌握了。

圖片

集群消費示例代碼里,啟動消費者,我們需要配置三個核心屬性:消費組名、訂閱主題、消息監聽器,最后調用 start 方法啟動。

消費者啟動后,我們可以將整個流程簡化成:

圖片

四、負載均衡

消費端的負載均衡是指將 Broker 端中多個隊列按照某種算法分配給同一個消費組中的不同消費者,負載均衡是客戶端開始消費的起點。

RocketMQ 負載均衡的核心設計理念是

  • 消費隊列在同一時間只允許被同一消費組內的一個消費者消費
  • 一個消費者能同時消費多個消息隊列

負載均衡是每個客戶端獨立進行計算,那么何時觸發呢 ?

  • 消費端啟動時,立即進行負載均衡;
  • 消費端定時任務每隔 20 秒觸發負載均衡;
  • 消費者上下線,Broker 端通知消費者觸發負載均衡。

負載均衡流程如下:

1、發送心跳

消費者啟動后,它就會通過定時任務不斷地向 RocketMQ 集群中的所有 Broker 實例發送心跳包(消息消費分組名稱、訂閱關系集合、消息通信模式和客戶端實例編號等信息)。

Broker 端在收到消費者的心跳消息后,會將它維護在 ConsumerManager 的本地緩存變量 consumerTable,同時并將封裝后的客戶端網絡通道信息保存在本地緩存變量 channelInfoTable 中,為之后做消費端的負載均衡提供可以依據的元數據信息。

2、啟動負載均衡服務

負載均衡服務會根據消費模式為”廣播模式”還是“集群模式”做不同的邏輯處理,這里主要來看下集群模式下的主要處理流程:

(1) 獲取該主題下的消息消費隊列集合;

(2) 查詢 Broker 端獲取該消費組下消費者 Id 列表;

(3) 先對 Topic 下的消息消費隊列、消費者 Id 排序,然后用消息隊列分配策略算法(默認為:消息隊列的平均分配算法),計算出待拉取的消息隊列;

平均分配算法

這里的平均分配算法,類似于分頁的算法,將所有 MessageQueue 排好序類似于記錄,將所有消費端排好序類似頁數,并求出每一頁需要包含的平均 size 和每個頁面記錄的范圍 range ,最后遍歷整個 range 而計算出當前消費端應該分配到的記錄。

(4) 分配到的消息隊列集合與 processQueueTable 做一個過濾比對操作。

消費者實例內 ,processQueueTable 對象存儲著當前負載均衡的隊列 ,以及該隊列的處理隊列 processQueue (消費快照)。

  1. 標紅的 Entry 部分表示與分配到的消息隊列集合互不包含,則需要將這些紅色隊列 Dropped 屬性為 true , 然后從 processQueueTable 對象中移除。
  2. 綠色的 Entry 部分表示與分配到的消息隊列集合的交集,processQueueTable 對象中已經存在該隊列。
  3. 黃色的 Entry 部分表示這些隊列需要添加到 processQueueTable 對象中,為每個分配的新隊列創建一個消息拉取請求  pullRequest  ,  在消息拉取請求中保存一個處理隊列 processQueue (隊列消費快照),內部是紅黑樹(TreeMap),用來保存拉取到的消息。

最后創建拉取消息請求列表,并將請求分發到消息拉取服務,進入拉取消息環節。

五、長輪詢

在負載均衡這一小節,我們已經知道負載均衡觸發了拉取消息的流程。

消費者啟動的時候,會創建一個拉取消息服務 PullMessageService ,它是一個單線程的服務。

核心流程如下:

1、負載均衡服務將消息拉取請求放入到拉取請求隊列 pullRequestQueue , 拉取消息服務從隊列中獲取拉取消息請求 ;

2、拉取消息服務向 Brorker 服務發送拉取請求 ,拉取請求的通訊模式是異步回調模式 ;

消費者的拉取消息服務本身就是一個單線程,使用異步回調模式,發送拉取消息請求到 Broker 后,拉取消息線程并不會阻塞 ,可以繼續處理隊列 pullRequestQueue 中的其他拉取任務。

3、Broker 收到消費者拉取消息請求后,從存儲中查詢出消息數據,然后返回給消費者;

4、消費者的網絡通訊層會執行拉取回調函數相關邏輯,首先會將消息數據存儲在隊列消費快照 processQueue 里;

消費快照使用紅黑樹 msgTreeMap 存儲拉取服務拉取到的消息 。

5、回調函數將消費請求提交到消息消費服務 ,而消息消費服務會異步的消費這些消息;

6、回調函數會將處理中隊列的拉取請放入到定時任務中;

7、定時任務再次將消息拉取請求放入到隊列 pullRequestQueue 中,形成了閉環:負載均衡后的隊列總會有任務執行拉取消息請求,不會中斷。

細心的同學肯定有疑問:既然消費端是拉取消息,為什么是長輪詢呢 ?

雖然拉模式的主動權在消費者這一側,但是缺點很明顯。

因為消費者并不知曉 Broker 端什么時候有新的消息 ,所以會不停地去 Broker 端拉取消息,但拉取頻率過高, Broker 端壓力就會很大,頻率過低則會導致消息延遲。

所以要想消費消息的延遲低,服務端的推送必不可少。

下圖展示了 RocketMQ 如何通過長輪詢減小拉取消息的延遲。

核心流程如下:

1、Broker 端接收到消費者的拉取消息請求后,拉取消息處理器開始處理請求,根據拉取請求查詢消息存儲 ;

2、從消息存儲中獲取消息數據 ,若存在新消息 ,則將消息數據通過網絡返回給消費者。若無新消息,則將拉取請求放入到拉取請求表 pullRequestTable 。

3、長輪詢請求管理服務 pullRequestHoldService 每隔 5 秒從拉取請求表中判斷拉取消息請求的隊列是否有新的消息。

判定標準是:拉取消息請求的偏移量是否小于當前消費隊列最大偏移量,如果條件成立則說明有新消息了。

若存在新的消息 ,  長輪詢請求管理服務會觸發拉取消息處理器重新處理該拉取消息請求。

4、當 commitlog 中新增了新的消息,消息分發服務會構建消費文件和索引文件,并且會通知長輪詢請求管理服務,觸發拉取消息處理器重新處理該拉取消息請求。

六、消費消息

在拉取消息的流程里, Broker 端返回消息數據,消費者的通訊框架層會執行回調函數。

回調線程會將數據存儲在隊列消費快照 processQueue(內部使用紅黑樹 msgTreeMap)里,然后將消息提交到消費消息服務,消費消息服務會異步消費這些消息。

消息消費服務有兩種類型:并發消費服務和順序消費服務 。

6.1 并發消費

并發消費是指消費者將并發消費消息,消費的時候可能是無序的。

消費消息并發服務啟動后,會初始化三個組件:消費線程池、清理過期消息定時任務、處理失敗消息定時任務。

核心流程如下:

0、通訊框架回調線程會將數據存儲在消費快照里,然后將消息列表 msgList 提交到消費消息服務

1、 消息列表 msgList 組裝成消費對象

2、將消費對象提交到消費線程池

我們看到10 條消息被組裝成三個消費請求對象,不同的消費線程會執行不同的消費請求對象。

3、消費線程執行消息監聽器

執行完消費監聽器,會返回消費結果。

4、處理異常消息

當消費異常時,異常消息將重新發回 Broker 端的重試隊列( RocketMQ 會為每個 topic 創建一個重試隊列,以 %RETRY% 開頭),達到重試時間后將消息投遞到重試隊列中進行消費重試。

我們將在重試機制這一節重點講解 RocketMQ 如何實現延遲消費功能 。

假如異常的消息發送到 Broker 端失敗,則重新將這些失敗消息通過處理失敗消息定時任務重新提交到消息消費服務。

5、更新本地消費進度

消費者消費一批消息完成之后,需要保存消費進度到進度管理器的本地內存。

首先我們會從隊列消費快照 processQueue 中移除消息,返回消費快照 msgTreeMap 第一個偏移量 ,然后調用消費消息進度管理器 offsetStore 更新消費進度。

待更新的偏移量是如何計算的呢?

  • 場景1:快照中1001(消息1)到1010(消息10)消費了,快照中沒有了消息,返回已消費的消息最大偏移量 + 1 也就是1011。
  • 場景2:快照中1001(消息1)到1008(消息8)消費了,快照中只剩下兩條消息了,返回最小的偏移量 1009。
  • 場景3:1001(消息1)在消費對象中因為某種原因一直沒有被消費,即使后面的消息1005-1010都消費完成了,返回的最小偏移量是1001。

在場景3,RocketMQ 為了保證消息肯定被消費成功,消費進度只能維持在1001(消息1),直到1001也被消費完,本地的消費進度才會一下子更新到1011。

假設1001(消息1)還沒有消費完成,消費者實例突然退出(機器斷電,或者被 kill ),就存在重復消費的風險。

因為隊列的消費進度還是維持在1001,當隊列重新被分配給新的消費者實例的時候,新的實例從 Broker 上拿到的消費進度還是維持在1001,這時候就會又從1001開始消費,1001-1010這批消息實際上已經被消費過還是會投遞一次。

所以業務必須要保證消息消費的冪等性。

寫到這里,我們會有一個疑問:假設1001(消息1)因為加鎖或者消費監聽器邏輯非常耗時,導致極長時間沒有消費完成,那么消費進度就會一直卡住 ,怎么解決呢 ?

RocketMQ 提供兩種方式一起配合解決:

  • 拉取服務根據并發消費間隔配置限流拉取消息服務在拉取消息時候,會判斷當前隊列的 processQueue 消費快照里消息的最大偏移量 - 消息的最小偏移量大于消費并發間隔(2000)的時候 , 就會觸發流控 ,  這樣就可以避免消費者無限循環的拉取新的消息。
  • 清理過期消息消費消息并發服務啟動后,會定期掃描所有消費的消息,若當前時間減去開始消費的時間大于消費超時時間,首先會將過期消息發送 sendMessageBack 命令發送到 Broker ,然后從快照中刪除該消息。

6.2 順序消費

順序消息是指對于一個指定的 Topic ,消息嚴格按照先進先出(FIFO)的原則進行消息發布和消費,即先發布的消息先消費,后發布的消息后消費。

順序消息分為分區順序消息和全局順序消息。

1、分區順序消息

對于指定的一個 Topic ,所有消息根據 Sharding Key 進行區塊分區,同一個分區內的消息按照嚴格的先進先出(FIFO)原則進行發布和消費。同一分區內的消息保證順序,不同分區之間的消息順序不做要求。

  • 適用場景:適用于性能要求高,以 Sharding Key 作為分區字段,在同一個區塊中嚴格地按照先進先出(FIFO)原則進行消息發布和消費的場景。
  • 示例:電商的訂單創建,以訂單 ID 作為 Sharding Key ,那么同一個訂單相關的創建訂單消息、訂單支付消息、訂單退款消息、訂單物流消息都會按照發布的先后順序來消費。

2、全局順序消息

對于指定的一個 Topic ,所有消息按照嚴格的先入先出(FIFO)的順序來發布和消費。

  • 適用場景:適用于性能要求不高,所有的消息嚴格按照 FIFO 原則來發布和消費的場景。
  • 示例:在證券處理中,以人民幣兌換美元為 Topic,在價格相同的情況下,先出價者優先處理,則可以按照 FIFO 的方式發布和消費全局順序消息。

全局順序消息實際上是一種特殊的分區順序消息,即 Topic 中只有一個分區,因此全局順序和分區順序的實現原理相同。

因為分區順序消息有多個分區,所以分區順序消息比全局順序消息的并發度和性能更高。

消息的順序需要由兩個階段保證:

  • 消息發送如上圖所示,A1、B1、A2、A3、B2、B3 是訂單 A 和訂單 B 的消息產生的順序,業務上要求同一訂單的消息保持順序,例如訂單 A 的消息發送和消費都按照 A1、A2、A3 的順序。如果是普通消息,訂單A 的消息可能會被輪詢發送到不同的隊列中,不同隊列的消息將無法保持順序,而順序消息發送時 RocketMQ 支持將 Sharding Key 相同(例如同一訂單號)的消息序路由到同一個隊列中。下圖是生產者發送順序消息的封裝,原理是發送消息時,實現 MessageQueueSelector 接口, 根據 Sharding Key 使用 Hash 取模法來選擇待發送的隊列。生產者順序發送消息封裝
  • 消息消費消費者消費消息時,需要保證單線程消費每個隊列的消息數據,從而實現消費順序和發布順序的一致。

順序消費服務的類是 ConsumeMessageOrderlyService ,在負載均衡階段,并發消費和順序消費并沒有什么大的差別。

最大的差別在于:順序消費會向 Borker 申請鎖 。消費者根據分配的隊列 messageQueue ,向 Borker 申請鎖 ,如果申請成功,則會拉取消息,如果失敗,則定時任務每隔20秒會重新嘗試。

順序消費核心流程如下:

1、 組裝成消費對象

2、 將請求對象提交到消費線程池

和并發消費不同的是,這里的消費請求包含消費快照 processQueue ,消息隊列 messageQueue 兩個對象,并不對消息列表做任何處理。

3、 消費線程內,對消費隊列加鎖

順序消費也是通過線程池消費的,synchronized 鎖用來保證同一時刻對于同一個隊列只有一個線程去消費它

4、 從消費快照中取得待消費的消息列表

消費快照 processQueue 對象里,創建了一個紅黑樹對象 consumingMsgOrderlyTreeMap 用于臨時存儲的待消費的消息。

5、 執行消息監聽器

消費快照的消費鎖 consumeLock 的作用是:防止負載均衡線程把當前消費的 MessageQueue 對象移除掉。

6、 處理消費結果

消費成功時,首先計算需要提交的偏移量,然后更新本地消費進度。

消費失敗時,分兩種場景:

  • 假如已消費次數小于最大重試次數,則將對象 consumingMsgOrderlyTreeMap 中臨時存儲待消費的消息,重新加入到消費快照紅黑樹 msgTreeMap 中,然后使用定時任務嘗試重新消費。
  • 假如已消費次數大于等于最大重試次數,則將失敗消息發送到 Broker ,Broker 接收到消息后,會加入到死信隊列里 , 最后計算需要提交的偏移量,然后更新本地消費進度。

我們做一個關于順序消費的總結 :

  1. 順序消費需要由兩個階段消息發送和消息消費協同配合,底層支撐依靠的是 RocketMQ 的存儲模型;
  2. 順序消費服務啟動后,隊列的數據都會被消費者實例單線程的執行消費;
  3. 假如消費者擴容,消費者重啟,或者 Broker 宕機 ,順序消費也會有一定幾率較短時間內亂序,所以消費者的業務邏輯還是要保障冪等。

七、保存進度

RocketMQ 消費者消費完一批數據后, 會將隊列的進度保存在本地內存,但還需要將隊列的消費進度持久化。

1、 集群模式

圖片

集群模式下,分兩種場景:

  • 拉取消息服務會在拉取消息時,攜帶該隊列的消費進度,提交給 Broker 的拉取消息處理器。
  • 消費者定時任務,每隔5秒將本地緩存中的消費進度提交到 Broker 的消費者管理處理器。

Broker 的這兩個處理器都調用消費者進度管理器 consumerOffsetManager 的 commitOffset 方法,定時任務異步將消費進度持久化到消費進度文件 consumerOffset.json 中。

圖片

2、 廣播模式

廣播模式消費進度存儲在消費者本地,定時任務每隔 5 秒通過 LocalFileOffsetStore 持久化到本地文件offsets.json ,數據格式為 MessageQueue:Offset 。

圖片

廣播模式下,消費進度和消費組沒有關系,本地文件 offsets.json 存儲在配置的目錄,文件中包含訂閱主題中所有的隊列以及隊列的消費進度。

八、重試機制

集群消費下,重試機制的本質是 RocketMQ 的延遲消息功能。

消費消息失敗后,消費者實例會通過 CONSUMER_SEND_MSG_BACK 請求,將失敗消息發回到 Broker 端。

Broker 端會為每個 topic 創建一個重試隊列 ,隊列名稱是:%RETRY% + 消費者組名 ,達到重試時間后將消息投遞到重試隊列中進行消費重試(消費者組會自動訂閱重試 Topic)。最多重試消費 16 次,重試的時間間隔逐漸變長,若達到最大重試次數后消息還沒有成功被消費,則消息將被投遞至死信隊列。

第幾次重試

與上次重試的間隔時間

第幾次重試

與上次重試的間隔時間

1

10 秒

9

7 分鐘

2

30 秒

10

8 分鐘

3

1 分鐘

11

9 分鐘

4

2 分鐘

12

10 分鐘

5

3 分鐘

13

20 分鐘

6

4 分鐘

14

30 分鐘

7

5 分鐘

15

1 小時

8

6 分鐘

16

2 小時

圖片

開源 RocketMQ 4.X 支持延遲消息,默認支持18 個 level 的延遲消息,這是通過 broker 端的 messageDelayLevel 配置項確定的,如下:

圖片

Broker 在啟動時,內部會創建一個內部主題:SCHEDULE_TOPIC_XXXX,根據延遲 level 的個數,創建對應數量的隊列,也就是說18個 level 對應了18個隊列。

我們先梳理下延遲消息的實現機制。

1、生產者發送延遲消息

Message msg = new Message();
msg.setTopic("TopicA");
msg.setTags("Tag");
msg.setBody("this is a delay message".getBytes());
//設置延遲level為5,對應延遲1分鐘
msg.setDelayTimeLevel(5);
producer.send(msg);

2、Broker端存儲延遲消息

延遲消息在 RocketMQ Broker 端的流轉如下圖所示:

圖片

第一步:修改消息 Topic 名稱和隊列信息

Broker 端接收到生產者的寫入消息請求后,首先都會將消息寫到 commitlog 中。假如是正常非延遲消息,MessageStore 會根據消息中的 Topic 信息和隊列信息,將其轉發到目標 Topic 的指定隊列 consumequeue 中。

但由于消息一旦存儲到 consumequeue 中,消費者就能消費到,而延遲消息不能被立即消費,所以 RocketMQ 將 Topic 的名稱修改為SCHEDULE_TOPIC_XXXX,并根據延遲級別確定要投遞到哪個隊列下。

同時,還會將消息原來要發送到的目標 Topic 和隊列信息存儲到消息的屬性中。

圖片

第二步:構建 consumequeue 文件時,計算并存儲投遞時間

圖片

圖片

上圖是 consumequeue 文件一條消息的格式,最后 8 個字節存儲 Tag 的哈希值,此時存儲消息的投遞時間。

第三步:定時調度服務啟動

ScheduleMessageService 類是一個定時調度服務,讀取 SCHEDULE_TOPIC_XXXX 隊列的消息,并將消息投遞到目標 Topic 中。

定時調度服務啟動時,創建一個定時調度線程池 ,并根據延遲級別的個數,啟動對應數量的 HandlePutResultTask ,每個 HandlePutResultTask 負責一個延遲級別的消費與投遞。

圖片

第四步:投遞時間到了,將消息數據重新寫入到 commitlog

消息到期后,需要投遞到目標 Topic 。第一步已經記錄了原來的 Topic 和隊列信息,這里需要重新設置,再存儲到 commitlog 中。

第五步:將消息投遞到目標 Topic 中

Broker 端的后臺服務線程會不停地分發請求并異步構建 consumequeue(消費文件)和 indexfile(索引文件)。因此消息會直接投遞到目標 Topic 的 consumequeue 中,之后消費者就可以消費到這條消息。

回顧了延遲消息的機制,消費消息失敗后,消費者實例會通過 CONSUMER_SEND_MSG_BACK 請求,將失敗消息發回到 Broker 端。

Broker 端 SendMessageProcessor 處理器會調用 asyncConsumerSendMsgBack 方法。

圖片

首先判斷消息的當前重試次數是否大于等于最大重試次數,如果達到最大重試次數,或者配置的重試級別小于0,則重新創建 Topic ,規則是 %DLQ% + consumerGroup,后續處理消息發送到死信隊列。

正常的消息會進入 else 分支,對于首次重試的消息,默認的 delayLevel 是 0 ,RocketMQ 會將 delayLevel + 3,也就是加到 3 ,這就是說,如果沒有顯示的配置延時級別,消息消費重試首次,是延遲了第三個級別發起的重試,也就是距離首次發送 10s 后重試,其主題的默認規則是 %RETRY% + consumerGroup。

當延時級別設置完成,刷新消息的重試次數為當前次數加 1 ,Broker 端將該消息刷盤,邏輯如下:

圖片

延遲消息寫入到 commitlog 里 ,這里其實和延遲消息機制的第一步類似,后面按照延遲消息機制的流程執行即可(第二步到第六步)。

九、總結

下圖展示了集群模式下消費者并發消費流程 :

圖片

核心流程如下:

  1. 消費者啟動后,觸發負載均衡服務 ,負載均衡服務為消費者實例分配對應的隊列 ;
  2. 分配完隊列后,負載均衡服務會為每個分配的新隊列創建一個消息拉取請求  pullRequest  ,  拉取請求保存一個處理隊列 processQueue,內部是紅黑樹(TreeMap),用來保存拉取到的消息 ;
  3. 拉取消息服務單線程從拉取請求隊列  pullRequestQueue 中彈出拉取消息,執行拉取任務 ,拉取請求是異步回調模式,將拉取到的消息放入到處理隊列;
  4. 拉取請求在一次拉取消息完成之后會復用,重新被放入拉取請求隊列 pullRequestQueue 中 ;
  5. 拉取完成后,調用消費消息服務  consumeMessageService 的  submitConsumeRequest 方法 ,消費消息服務內部有一個消費線程池;
  6. 消費線程池的消費線程從消費任務隊列中獲取消費請求,執行消費監聽器  listener.consumeMessage ;
  7. 消費完成后,若消費成功,則更新偏移量 updateOffset,先更新到內存 offsetTable,定時上報到 Broker ;若消費失敗,則將失敗消費發送到 Broker 。
  8. Broker 端接收到請求后, 調用消費進度管理器的 commitOffset 方法修改內存的消費進度,定時刷盤到  consumerOffset.json。

RocketMQ 4.X 的消費邏輯有兩個非常明顯的特點:

  1. 客戶端代碼邏輯較重。假如要支持一種新的編程語言,那么客戶端就必須實現完整的負載均衡邏輯,此外還需要實現拉消息、位點管理、消費失敗后將消息發回 Broker 重試等邏輯。這給多語言客戶端的支持造成很大的阻礙。
  2. 保證冪等非常重要。當客戶端升級或者下線時,或者 Broker 宕機,都要進行負載均衡操作,可能造成消息堆積,同時有一定幾率造成重復消費。

參考資料:

1、RocketMQ 4.9.4 Github 文檔

https://github.com/apache/rocketmq/tree/rocketmq-all-4.9.4/docs

2、RocketMQ 技術內幕

3、消息隊列核心知識點

https://mp.weixin.qq.com/s/v7_ih9X5mG3X4E4ecfgYXA

4、消息ACK機制及消費進度管理

https://zhuanlan.zhihu.com/p/25265380

責任編輯:武曉燕 來源: 勇哥java實戰分享
相關推薦

2021-08-26 05:02:50

分布式設計

2021-10-18 11:58:56

負載均衡虛擬機

2022-09-06 08:02:40

死鎖順序鎖輪詢鎖

2021-01-19 05:49:44

DNS協議

2022-09-14 09:01:55

shell可視化

2024-03-07 18:11:39

Golang采集鏈接

2020-07-15 08:57:40

HTTPSTCP協議

2020-11-16 10:47:14

FreeRTOS應用嵌入式

2022-07-19 16:03:14

KubernetesLinux

2020-07-09 07:54:35

ThreadPoolE線程池

2022-10-10 08:35:17

kafka工作機制消息發送

2024-05-10 12:59:58

PyTorch人工智能

2024-01-11 09:53:31

面試C++

2022-09-08 10:14:29

人臉識別算法

2024-01-05 08:30:26

自動駕駛算法

2022-07-15 16:31:49

Postman測試

2021-06-04 07:27:24

sourcemap前端技術

2022-04-25 10:56:33

前端優化性能

2022-02-15 18:45:35

Linux進程調度器

2024-09-09 05:00:00

RedisString數據庫
點贊
收藏

51CTO技術棧公眾號

欧美日韩免费观看一区=区三区| xxxxx性欧美特大| 国产精品一卡二卡在线观看| 久久久久久久97| 国产精品久久久免费观看| 日韩在线影院| 一区二区三区中文在线观看| 色哟哟亚洲精品| 亚洲精品高潮| 国产又粗又猛又爽又黄的视频一| 中文字幕一区二区中文字幕| 亚洲一区999| 免费观看国产视频在线| 成人小说亚洲一区二区三区| 久久婷婷影院| 亚洲国产日韩av| 欧美激情精品久久久久久变态| 免费中文字幕av| 99久久久成人国产精品| 精品成人av一区| 综合久久国产| 国产乱视频在线观看| 国产精品白丝jk白祙喷水网站| 欧日韩在线观看| 青娱乐91视频| 日韩欧美中文| 亚洲欧美日韩中文在线制服| 国产51自产区| 2019中文亚洲字幕| 欧美在线视频不卡| 欧美老熟妇喷水| 青春草在线视频| 国产精品国产三级国产普通话蜜臀 | 欧美体内谢she精2性欧美| 国产 国语对白 露脸 | 国产偷v国产偷v亚洲高清| 成人欧美一区二区| 夜夜狠狠擅视频| 日韩av网站免费在线| 欧亚精品中文字幕| 成人毛片18女人毛片| 精品96久久久久久中文字幕无| 精品国产欧美一区二区三区成人| av电影网站在线观看| 日韩有码中文字幕在线| 精品精品国产高清一毛片一天堂| 日韩a一级欧美一级| 久久精品嫩草影院| 午夜电影网一区| 99久久国产综合精品五月天喷水| 中文字幕在线三区| 亚洲欧美国产高清| 中文字幕99| 亚洲s色大片| 国产精品污www在线观看| 视频一区不卡| 青青草视频在线观看| 97se亚洲国产综合自在线观| 国产日韩精品推荐| 好吊色在线观看| 96av麻豆蜜桃一区二区| 乱一区二区三区在线播放| 亚洲欧美日韩综合在线| 久久一日本道色综合| 欧美午夜免费| h视频在线免费| 国产精品久久久久影视| 致1999电视剧免费观看策驰影院| av网址在线| 亚洲图片欧美视频| 日韩中文字幕在线视频观看 | 国产精自产拍久久久久久蜜| 国产精品久久久久久久久毛片| 国产一区二区三区久久久| 成人av电影免费| 性xxxx视频播放免费| 91美女视频网站| 亚洲毛片aa| 日韩激情美女| 欧美天堂在线观看| 无尽裸体动漫2d在线观看| 清纯唯美激情亚洲| 亚洲国产日韩欧美在线99| av网站免费在线看| 小处雏高清一区二区三区| 欧美精品精品精品精品免费| 久久久久在线视频| 韩国视频一区二区| 狠狠色综合色区| av在线电影院| 亚洲国产人成综合网站| 黑森林福利视频导航| 涩涩涩久久久成人精品| 亚洲国产精彩中文乱码av在线播放| 亚洲国产欧美视频| 亚洲激情中文在线| 欧美影院久久久| 国产精品高潮呻吟AV无码| 99久久99久久精品国产片果冻 | 色综合久久综合中文综合网| 国产又黄又猛的视频| 卡通动漫精品一区二区三区| 在线看国产精品| 黄色小视频在线免费看| 日韩不卡免费视频| 成人欧美一区二区三区黑人免费| 高清中文字幕一区二区三区| 夜夜揉揉日日人人青青一国产精品 | 999视频在线免费观看| 免费观看成年在线视频网站| 亚洲黄色小视频| 91香蕉视频导航| 国产精品网址| 久久夜色精品国产亚洲aⅴ| 亚洲视频 欧美视频| 国产成人免费视频精品含羞草妖精 | 18一19gay欧美视频网站| 国产精品欧美激情在线| 久久久国产午夜精品| 蜜臀精品一区二区| 91精品国产色综合久久不卡粉嫩| 亚洲欧洲国产伦综合| 国产精品18p| 国产成人综合亚洲网站| 一本一道久久a久久综合精品| 在线高清av| 欧美刺激午夜性久久久久久久| 成人做爰69片免网站| 久久久久网站| 欧美18视频| 超碰在线97国产| 日韩视频免费观看高清完整版在线观看 | 黄页免费在线观看视频| 欧美片网站免费| 久久久精品久久久久| 中文字幕免费观看视频| 国产欧美一区二区精品性色| 大肉大捧一进一出好爽视频| 91精品入口| 久久久久成人网| 亚洲精品无码久久久| 亚洲免费观看视频| 国产在线视频三区| 亚洲无中文字幕| 91夜夜揉人人捏人人添红杏| 免费网站免费进入在线| 欧美精品三级在线观看| 国精产品一区一区二区三区mba| 蜜乳av一区二区三区| 亚洲一区二区三区在线观看视频| 欧美日韩在线精品一区二区三区激情综合 | 91精品国产91久久久久| 性欧美8khd高清极品| 亚洲人成精品久久久久久| 久久精品久久99| 91精品国产91久久久久久密臀 | 亚洲精品视频导航| 欧美精品一区二区三区精品| 国产精品欧美风情| 麻豆av在线免费看| 欧美一区二区三区精品| 欧美成人aaa片一区国产精品| 国产福利91精品一区二区三区| 一本色道久久88亚洲精品综合| 波多野结衣一区二区三区免费视频| 久久久久久久国产| 婷婷综合激情网| 一本大道久久a久久精二百| 亚洲а∨天堂久久精品2021| 久久精品国产精品亚洲精品| gogogo免费高清日本写真| 亚洲成人偷拍| 777国产偷窥盗摄精品视频| 久青草国产在线| 欧美日韩精品一区二区| 私库av在线播放| 99精品久久99久久久久| 日日噜噜夜夜狠狠| 在线精品小视频| 久久久www免费人成黑人精品| 快播电影网址老女人久久| 久久久国产精品一区| 亚洲第一页综合| 欧美天堂在线观看| 成人欧美一区二区三区黑人一 | 最新中文字幕视频| 蜜臀久久99精品久久久久久9| 福利网在线观看| 高清日韩中文字幕| 国产精品美女在线观看| 羞羞网站在线看| 日韩av综合网站| 中文字幕av久久爽| 亚洲愉拍自拍另类高清精品| 成人性生交大免费看| 久久99久久久久久久久久久| 国产主播自拍av| 日韩视频在线观看| 国产自产精品| 精品乱码一区二区三区四区| 国a精品视频大全| 18视频免费网址在线观看| 精品福利av导航| 一级aaaa毛片| 欧美性色19p| 亚洲av鲁丝一区二区三区| 国产三级精品三级在线专区| 欧美图片自拍偷拍| 老司机精品视频导航| 国产精品宾馆在线精品酒店| 亚洲视频在线免费| 亚洲国产一区二区在线| 精品嫩草影院| 亚洲综合中文字幕68页| 嫩草伊人久久精品少妇av杨幂| 欧美国产亚洲精品久久久8v| 91大神xh98hx在线播放| 亚洲另类欧美自拍| 三级网站免费观看| 91精品国产综合久久国产大片 | 天天综合av| 欧美尺度大的性做爰视频| av每日在线更新| 伊人春色精品| 欧美激情免费观看| 黄网站在线免费看| 伊人精品在线观看| 免费福利在线观看| 日韩电影中文字幕一区| 亚洲国产精品一| 欧美一级理论片| 国产又粗又黄又爽视频| 欧美日精品一区视频| 国产一区二区视频免费| 欧美视频一区二区三区…| 中文在线观看免费网站| 亚洲视频香蕉人妖| 美国黄色片视频| 国产精品久久久久国产精品日日 | 中文字幕五月欧美| 人妻熟人中文字幕一区二区| 国产欧美中文在线| av网站免费在线看| 中文字幕欧美三区| 懂色av蜜桃av| 国产精品私人影院| 最新日韩免费视频| 国产精品女主播av| 亚洲色图100p| 中文字幕一区二区三区精华液| 极品久久久久久久| 成人欧美一区二区三区白人| 91久久久久久久久久久久久久| 国产精品剧情在线亚洲| 99热这里只有精品4| 1024亚洲合集| 欧美激情国产精品免费| 亚洲综合无码一区二区| 国产乡下妇女做爰| 欧美日韩激情小视频| 在线观看日本网站| 欧美少妇性性性| 国产又粗又黄又爽| 日韩精品在线一区| 婷婷五月综合激情| 亚洲人成在线观看| 999在线视频| 欧美成人午夜剧场免费观看| 黄色在线看片| 国产福利视频一区| 日韩免费大片| 国产91社区| 免费观看久久av| 在线看无码的免费网站| 欧美激情在线| 国产午夜伦鲁鲁| 免费人成精品欧美精品| 无码人妻少妇色欲av一区二区| 粉嫩一区二区三区在线看| 国产三级国产精品| 国产精品国产三级国产普通话蜜臀 | 国内精品一区视频| 久久久99久久精品女同性| www.综合网.com| 国产精品专区一| www.成人网| 午夜精品电影在线观看| 亚洲欧美在线专区| 黄色免费观看视频网站| 精久久久久久久久久久| 天天插天天射天天干| 国产精品毛片a∨一区二区三区| 欧美高清视频一区二区三区| 日韩欧美中文在线| 国产jzjzjz丝袜老师水多| 亚洲精品久久久久久下一站 | 最新热久久免费视频| 日韩精品国产一区二区| 欧美日韩在线播放一区| 五月婷婷在线播放| 久久综合五月天| 3d欧美精品动漫xxxx无尽| av在线亚洲男人的天堂| 成人在线免费观看网站| 精品久久一二三| 国产一区二区h| 六月婷婷七月丁香| 亚洲夂夂婷婷色拍ww47| 在线亚洲欧美日韩| 国产视频自拍一区| 制服丝袜中文字幕在线| 国产精品视频网| 日日天天久久| 欧美狂野激情性xxxx在线观| 蜜臀av性久久久久av蜜臀妖精| 亚洲熟女一区二区| 亚洲蜜臀av乱码久久精品| 亚洲精品国产欧美在线观看| 亚洲高清久久网| 日皮视频在线观看| 91久热免费在线视频| 俺要去色综合狠狠| 动漫av网站免费观看| 成人久久久精品乱码一区二区三区| 亚洲色图27p| 欧美影院午夜播放| 可以在线观看的黄色| 琪琪第一精品导航| 色狠狠久久av综合| 国产伦精品一区二区三区四区视频_| 国产酒店精品激情| 日韩一区二区不卡视频| 欧美日韩亚洲综合在线| 高清美女视频一区| 国产成人精品一区二区三区| 午夜欧洲一区| 啊啊啊一区二区| 99综合电影在线视频| 精品无码人妻一区二区三区品| 欧美一区二区三区在线电影| 浪潮av一区| 91在线免费视频| 欧美1区视频| 视频区 图片区 小说区| 亚洲免费在线看| 超碰在线人人干| 欧美激情视频在线| 理论片一区二区在线| 99在线精品免费视频| 成人涩涩免费视频| 天天操天天射天天爽| 日韩激情片免费| 厕沟全景美女厕沟精品| 欧美日韩一区二区三区在线观看免 | 国产精品极品| 3d动漫一区二区三区| 91在线精品一区二区| 国产成人精品777777| 一区二区亚洲欧洲国产日韩| 精品123区| 制服丝袜综合日韩欧美| 国产精品1024久久| 久久久综合久久久| 亚洲精品大尺度| 成人性生活视频| 一区二区三区国产福利| 国产伦精一区二区三区| 久久久久久久中文字幕| 亚洲精品成人网| 岛国一区二区| 欧美日韩一级在线| 成人动漫精品一区二区| 国产91精品一区| 中文字幕欧美精品日韩中文字幕| 日韩综合av| 成人毛片一区二区| 国产亚洲精品aa| 99久久精品日本一区二区免费| 欧美大片在线影院| 少妇久久久久| 亚洲18在线看污www麻豆| 亚洲大尺度视频在线观看| 欧洲视频在线免费观看| 国产日韩欧美在线| 欧美日韩影院| www.黄色在线| 日韩视频一区二区三区在线播放 | 欧美日韩综合| 亚洲av无码一区二区二三区| 欧美日韩成人激情| av电影在线地址| 亚洲欧美日韩不卡一区二区三区| 夫妻av一区二区| 中文精品久久久久人妻不卡| 色综合视频网站| 欧美一区二区性| 91精品啪在线观看国产| 欧美日韩你懂的| 原纱央莉成人av片| av动漫在线免费观看|