精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

公眾號矩陣
移動端

真香,聊聊 RocketMQ 5.0 的 POP 消費模式!

開發(fā) 前端
可能還存在限制 Reef 實現(xiàn)更高性能的因素,我們后續(xù)將研究 Reef 凍結(jié)期間的潛在回歸,并繼續(xù)努力使 Reef 成為迄今為止最好的 Ceph 版本!

大家好,我是君哥。

大家都知道,RocketMQ 消費模式有 PULL 模式和 PUSH 模式,不過本質(zhì)上都是 PULL 模式,而在實際使用時,一般使用 PUSH 模式。

不過,RocketMQ 的 PUSH 模式有明顯的不足,主要體現(xiàn)在以下幾個方面:

  1. 消息積壓了,增加消費者不一定能解決。PUSH 模式如下圖:

圖片

上面的圖中,消費組中的消費者每個消費者消費兩個 MessageQueue,這種情況下,增加消費者是可以提高消費能力的。

但是下面這張圖,每個消費者消費一個 MessageQueue,因為同一個 MessageQueue 只能被同一個消費組中的一個消費者消費,所以增加消費者并不能提高消費能力。

圖片

  1. 客戶端的處理邏輯比較多,比如負載均衡、offset 管理、消費失敗后的處理(比如失敗消息發(fā)送回 Broker),這些邏輯都在客戶端。
  2. 如果再支持其他語言,客戶端會變得越來越重。
  3. 消費者機器 hang 住,可能會導(dǎo)致消息積壓,如下圖:

圖片

通過客戶端負責均衡,MessageQueue0 這個隊列分配給了 Consumer0 進行獨占消費,如果 Consumer0 這個消費者 hang 住了,但是服務(wù)沒有掛,不能從 Name Server 中下線,因為 Consumer0 拉取到的消息不能消費,也就不能給 Broker 發(fā)送更新 Offset 的請求,最終導(dǎo)致消息積壓。這種情況只能手動讓 Consumer0 下線或者讓 Consumer0 重啟。

RocketMQ 5.0 為了解決 PUSH Consumer 上面的問題,引入了 POP Consumer。

1 POP 客戶端

POP 模式的客戶端引入的背景是 RocketMQ 5.0 為了更好地擁抱云原生,客戶端要改造成無狀態(tài)的輕量級客戶端,RocketMQ 4.x 中客戶端具有的負載均衡、權(quán)限管理、消費管理等功能都從客戶端移動到了 Proxy。

POP 消費模式如下圖:

圖片

四個消費者都可以消費 Broker1 和 Broker2 上面的所有隊列,這樣即使某一個消費者 hang 住了,其他消費者也可以消費,并不會造成消息積壓。

同時,從上圖中可以看到,POP 客戶端還有一個優(yōu)勢,增加消費者數(shù)量是可以提高消費能力的,不受 MessageQueue 數(shù)量和消費者數(shù)量的限制。

跟 PUSH 模式相比,POP 模式拉取到消息后,會設(shè)置一個 POP_CK 屬性,代碼如下:

//MQClientAPIImpl.java
if (requestHeader instanceof PopMessageRequestHeader) {
 if (startOffsetInfo == null) {
  // we should set the check point info to extraInfo field , if the command is popMsg
  // find pop ck offset
  String key = messageExt.getTopic() + messageExt.getQueueId();
  if (!map.containsKey(messageExt.getTopic() + messageExt.getQueueId())) {
   map.put(key, ExtraInfoUtil.buildExtraInfo(messageExt.getQueueOffset(), responseHeader.getPopTime(), responseHeader.getInvisibleTime(), responseHeader.getReviveQid(),
    messageExt.getTopic(), brokerName, messageExt.getQueueId()));

  }
  messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK, map.get(key) + MessageConst.KEY_SEPARATOR + messageExt.getQueueOffset());
 } else {
  String queueIdKey = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), messageExt.getQueueId());
  String queueOffsetKey = ExtraInfoUtil.getQueueOffsetMapKey(messageExt.getTopic(), messageExt.getQueueId(), messageExt.getQueueOffset());
  int index = sortMap.get(queueIdKey).indexOf(messageExt.getQueueOffset());
  Long msgQueueOffset = msgOffsetInfo.get(queueIdKey).get(index);

  messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK,
   ExtraInfoUtil.buildExtraInfo(startOffsetInfo.get(queueIdKey), responseHeader.getPopTime(), responseHeader.getInvisibleTime(),
    responseHeader.getReviveQid(), messageExt.getTopic(), brokerName, messageExt.getQueueId(), msgQueueOffset)
  );
  //...
 }
}

可以看到,POP_CK 屬性包含了 brokerName、Topic、QueueId、offset 等參數(shù),通過這個屬性可以唯一標識一條消息了。

從上面的代碼還可以看到,responseHeader 中有一個 invisibleTime 屬性,這個屬性的作用是消費者通過 POP 模式拉取到一條消息后,這段時間(invisibleTime)內(nèi)這條消息在 Broker 端是不可見的,消費者再次拉取就不會重復(fù)拉取到。但是如果過了這段時間,消費者還沒有給 Broker 返回 ACK,這條消息會變?yōu)榭梢姡俅伪幌M者拉取到。

消費完成后,向 Broker 發(fā)送 ACK 消息,見下面代碼:

public void ackMessageAsync(
 final String addr,
 final long timeOut,
 final AckCallback ackCallback,
 final AckMessageRequestHeader requestHeader //
) throws RemotingException, MQBrokerException, InterruptedException {
 final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader);
 this.remotingClient.invokeAsync(addr, request, timeOut, new BaseInvokeCallback(MQClientAPIImpl.this) {

  @Override
  public void onComplete(ResponseFuture responseFuture) {
   RemotingCommand response = responseFuture.getResponseCommand();
   if (response != null) {
    try {
     AckResult ackResult = new AckResult();
     if (ResponseCode.SUCCESS == response.getCode()) {
      ackResult.setStatus(AckStatus.OK);
     } //...
     assert ackResult != null;
     ackCallback.onSuccess(ackResult);
    } //...
   } else {
    //...
   }

  }
 });
}

2. Broker

從上面的介紹可以看到,每個消費者都可以從 Broker 的所有 MessageQueue 上拉取消息,那如果多個消費者都從一個 MessageQueue 上面拉取,有沒有可能會重復(fù)消費呢?

Broker 收到消息拉取請求,從 MessageStore 拉取消息時,首先會給 MessageQueue 進行加鎖,加鎖成功后,才會拉取消息,這是其他客戶端來拉取時就會加鎖失敗。

//PopMessageProcessor.java
String lockKey = topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() + PopAckConstants.SPLIT + queueId;
long offset = getPopOffset(topic, requestHeader, queueId, false, lockKey);
if (!queueLockManager.tryLock(lockKey)) {
 restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
 return restNum;
}

Broker 從 MessageStore 拉取到消息后,會定義一個 CheckPoint 放入緩存,代碼如下:

//PopMessageProcessor.java
private long popMsgFromQueue(boolean isRetry, GetMessageResult getMessageResult,
 PopMessageRequestHeader requestHeader, int queueId, long restNum, int reviveQid,
 Channel channel, long popTime,
 ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo,
 StringBuilder msgOffsetInfo, StringBuilder orderCountInfo) {
 String topic = isRetry ? KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),
  requestHeader.getConsumerGroup()) : requestHeader.getTopic();
 String lockKey =
  topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() + PopAckConstants.SPLIT + queueId;
 //...
 offset = getPopOffset(topic, requestHeader, queueId, true, lockKey);
 GetMessageResult getMessageTmpResult = null;
 try {
  //...

  restNum = getMessageTmpResult.getMaxOffset() - getMessageTmpResult.getNextBeginOffset() + restNum;
  if (!getMessageTmpResult.getMessageMapedList().isEmpty()) {

   if (isOrder) {
    //...
   } else {
    appendCheckPoint(requestHeader, topic, reviveQid, queueId, offset, getMessageTmpResult, popTime, this.brokerController.getBrokerConfig().getBrokerName());
   }
  } //...
 } //...
 return restNum;
}

Broker 收到消費者發(fā)來的 ACK 后,會把 CheckPoint 從緩存中移除。

如果 Broker 一直沒有收到 ACK,則會把 CheckPoint 從緩存中移除,同時把 CheckPoint 發(fā)送給 MessageStore,由 MessageStore 發(fā)送到重試隊列。代碼如下:

boolean removeCk = !this.serving;
 // ck will be timeout
 if (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut()) {
  removeCk = true;
 }

 // the time stayed is too long
 if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime()) {
  removeCk = true;
 }

 // double check
 if (removeCk) {
  // put buffer ak to store
  if (pointWrapper.getReviveQueueOffset() < 0) {
   putCkToStore(pointWrapper, false);
  }
 }
}

3 總結(jié)

POP 客戶端有很多的優(yōu)勢,總結(jié)如下:

  1. 無狀態(tài),更好地擁抱云原生;
  2. 計算相關(guān)的功能下移到 Proxy,更加輕量級;
  3. 消費能力擴展不受 MessageQueue 數(shù)量的限制;
  4. 消費者 hang 住,并不會導(dǎo)致消息積壓。
責任編輯:武曉燕 來源: 君哥聊技術(shù)
相關(guān)推薦

2023-08-07 08:32:05

RocketMQ名字服務(wù)

2023-07-03 08:57:45

Master服務(wù)TCP

2022-05-23 09:18:55

RocketMQ存儲中間件

2022-07-07 09:00:49

RocketMQ消費者消息消費

2023-12-25 19:28:59

RocketMQ大數(shù)據(jù)

2021-12-27 08:22:18

Kafka消費模型

2024-08-19 04:00:00

2023-09-26 08:01:46

消費者TopicRocketMQ

2025-07-08 08:51:45

2022-08-09 08:18:19

RocketMQpush消費

2023-04-11 08:35:22

RocketMQ云原生

2025-05-09 09:05:00

Spring框架設(shè)計模式

2024-10-06 12:56:36

Golang策略設(shè)計模式

2024-01-24 09:00:31

SSD訂閱關(guān)系內(nèi)存

2024-04-22 00:00:00

RocketMQ優(yōu)化位點

2021-05-17 14:57:23

策略模式代碼

2021-08-09 10:31:33

自定義授權(quán)響應(yīng)

2024-12-13 08:28:45

設(shè)計模式依賴

2023-06-12 08:49:12

RocketMQ消費邏輯

2022-11-08 07:36:17

RocketMQ消費者消息堆積
點贊
收藏

51CTO技術(shù)棧公眾號

天天综合天天综合| 久草资源在线视频| 成人亚洲精品| 亚洲高清免费视频| 欧洲亚洲一区二区| 一区二区精品视频在线观看| 欧美午夜a级限制福利片| 亚洲国产成人在线播放| caopor在线视频| 午夜成年人在线免费视频| 久久美女高清视频 | 美女91在线看| 中文字幕国产一区二区| 成人三级在线| 亚洲高清在线看| 欧美日韩国产欧| 亚洲最新av在线| 95视频在线观看| 宅男噜噜噜66国产精品免费| 欧美色视频日本高清在线观看| 在线视频一区观看| 青青久在线视频| 国产成人aaa| 国产色视频一区| 特级西西444www大精品视频免费看| 国产精品久久久久蜜臀| 亚洲欧洲国产伦综合| 无码人妻丰满熟妇啪啪网站| 欧美videos粗暴| 欧美日韩亚洲国产一区| 日本免费a视频| 成人午夜在线影视| 国产精品你懂的在线欣赏| 精品乱子伦一区二区三区| 精品国产999久久久免费| 蜜桃一区二区三区四区| 国产成人中文字幕| 国语对白永久免费| 精品二区视频| 久久久久中文字幕| 青娱乐在线视频免费观看| 欧美gay男男猛男无套| 国产一区二区三区视频| 日本xxxxxxxxx18| 亚欧洲精品视频在线观看| 亚洲国产成人久久综合一区| 久草免费资源站| 亚洲第一二区| 欧美一区二区三区精品| 中文字幕一区二区在线观看视频| 欧美日韩破处视频| 欧美日韩在线电影| 色天使在线观看| 在线成人免费| 91精品国产一区二区| 超碰中文字幕在线观看| 亚洲一区二区小说| 欧美一级高清大全免费观看| 欧美老女人bb| 精品福利一区| 日韩av网址在线| 亚洲av无码一区二区二三区| 亚洲第一二三区| 亚洲欧美福利视频| 国产123在线| 五月综合激情| 九九精品在线视频| 国产污片在线观看| 亚洲精品社区| 国产精品扒开腿做爽爽爽视频| 日本熟妇一区二区三区| 久久精品72免费观看| 91亚洲精品久久久| 手机看片1024日韩| 久久丝袜美腿综合| 性欧美精品一区二区三区在线播放| av影片在线看| 一区二区三区在线免费| 丁香六月激情网| 欧美一级大黄| 欧美日韩国产一级片| 永久av免费在线观看| 精品三级在线观看视频| 亚洲色图第三页| 中日韩一级黄色片| 亚洲人成久久| 国产精品久久久久久久久影视| 一本到在线视频| 国产91在线观看| 日本不卡一区二区三区在线观看| 色影视在线观看| 亚洲国产美国国产综合一区二区| 日韩精品一区二区三区久久| 九九热这里有精品| 欧美成人一区二区三区片免费| 国产精品久久久免费观看| 欧美中文字幕一区二区| 欧美激情免费视频| japanese国产在线观看| 丁香啪啪综合成人亚洲小说| 欧美三级网色| 不卡av免费观看| 欧美日韩一卡二卡三卡 | 精品少妇人妻av免费久久洗澡| 欧美一级大黄| 亚洲精品一线二线三线| 麻豆视频免费在线播放| 在线欧美视频| 成人免费网站在线观看| 精品亚洲综合| 亚洲国产综合视频在线观看| 污视频免费在线观看网站| theporn国产在线精品| 一色桃子一区二区| 免费日韩一级片| 国产成人午夜精品5599 | 国内精品久久久久久久影视蜜臀| 国产精品99久久久久久人| 亚洲国产精品视频在线| 国产精品女同互慰在线看| 国产精品第12页| a看欧美黄色女同性恋| 日韩视频在线免费| 黄色在线免费观看| 成人一级视频在线观看| 中国一级黄色录像| 国产亚洲人成a在线v网站| 亚洲老头老太hd| www..com国产| 成人动漫一区二区在线| 中文字幕在线中文| 国产精品久久久久久av公交车| 中文字幕亚洲精品| 神马久久久久久久| 久久久久一区二区三区四区| 日韩欧美一区二| 女同久久另类99精品国产| 欧美激情在线观看视频| aaa国产视频| 亚洲日本中文字幕区| 中文字幕第38页| 欧美日韩一二三四| 国产精品18久久久久久首页狼| 四虎影视精品成人| 黑人巨大精品欧美一区免费视频 | 欧美日韩成人精品| 精品国产av鲁一鲁一区 | 成人黄色片免费| 精品国产一区二区三区2021| 久久久精品视频在线观看| 亚洲资源在线播放| 中文字幕一区二区日韩精品绯色| www.亚洲高清| 91麻豆国产自产在线观看亚洲| 国产日韩av在线播放| 欧美videos极品另类| 制服.丝袜.亚洲.中文.综合| 午夜三级在线观看| 国产剧情av麻豆香蕉精品| 肉大捧一出免费观看网站在线播放 | 免费在线高清av| 在线观看www91| 精品国产国产综合精品| 国产原创一区二区| 国产精品69久久久| 亚洲va久久久噜噜噜久久| 日本成熟性欧美| lutube成人福利在线观看| 欧美日韩免费观看一区三区| 97成人资源站| 不卡在线视频中文字幕| 亚洲色成人一区二区三区小说| 国产精品亚洲人成在99www| 国产日韩中文字幕| 好久没做在线观看| 亚洲人成欧美中文字幕| 亚洲专区在线播放| 亚洲成人av电影| www.av天天| 国产一区二区三区在线观看免费| 国产一级黄色录像片| 国产毛片精品| 国产精品91久久| 在线你懂的视频| 日韩精品欧美国产精品忘忧草| 做爰无遮挡三级| 亚洲一区二区三区四区不卡 | 国产精品国产馆在线真实露脸| 欧美性受xxxx黒人xyx性爽| 亚洲先锋成人| 欧美一区二区三区在线播放| 欧美日韩黄色| 国产精品高清免费在线观看| 色www永久免费视频首页在线| 亚洲欧美一区二区三区在线| 国产一区二区三区三州| 精品国产福利视频| 国产wwwwxxxx| 久久综合成人精品亚洲另类欧美 | a在线视频观看| 91亚洲人成网污www| 久久久久欧美| 国产精品美女久久久久人| 欧美亚洲在线播放| 国产探花视频在线观看| 中文字幕av一区二区| 日韩中文字幕免费观看| 欧美精品免费视频| 无码人妻av一区二区三区波多野| 亚洲精品欧美综合四区| 最新中文字幕av| 99在线精品免费| 波多野结衣中文字幕在线播放| 久久婷婷激情| 成人性免费视频| 欧美91大片| 亚洲欧洲另类精品久久综合| 思热99re视热频这里只精品| 99在线高清视频在线播放| 羞羞视频在线观看一区二区| 热久久这里只有| www成人免费观看| 欧美黑人狂野猛交老妇| 久久亚洲天堂| 丝袜一区二区三区| 国产在线资源| 亚洲人永久免费| 天堂а√在线8种子蜜桃视频| 日韩欧美综合一区| av男人天堂av| 在线综合视频播放| 亚洲中文字幕一区二区| 欧美视频三区在线播放| 亚洲欧美一二三区| 欧美性猛交xxxx| youjizz在线视频| 好吊成人免视频| 天堂中文在线网| 粉嫩av一区二区三区免费野| 日韩女同强女同hd| 亚洲成av人片在线观看无码| 国语对白一区二区| 亚洲国产综合色| 国产无套粉嫩白浆内谢| 亚洲大片在线观看| 日本少妇毛茸茸高潮| 午夜精品视频在线观看| 精品无码久久久久久久| 亚洲一区二区视频在线观看| 久久精品视频6| 亚洲国产精品尤物yw在线观看| 国产在线视频你懂的| 亚洲一区在线视频| 国产精品6666| 日韩欧美极品在线观看| 伊人中文字幕在线观看| 欧洲激情一区二区| 一区不卡在线观看| 日韩欧美在线不卡| 亚洲黄色a级片| 亚洲国产日韩精品在线| 免费观看成年在线视频网站| 在线观看视频99| 国内精品久久久久国产| 另类色图亚洲色图| 波多野结衣在线观看| 欧美亚洲免费电影| 国产成人精选| av噜噜色噜噜久久| 自拍偷拍精品| 伊人情人网综合| 国产一区二区中文| 国产91在线视频观看| 蜜臀久久99精品久久久久宅男| 最新国产黄色网址| 波多野洁衣一区| 亚洲一区视频在线播放| 亚洲婷婷综合久久一本伊一区 | 一本色道久久综合亚洲aⅴ蜜桃| 高潮毛片又色又爽免费| 欧美疯狂做受xxxx富婆| 黄色小视频免费在线观看| 日韩精品一二三四区| av大片在线看| 欧美国产日韩xxxxx| 亚洲一二三四| 91精品在线看| 亚洲成人一品| 亚洲色图都市激情| 午夜在线精品| 亚洲综合123| 久久久不卡网国产精品一区| 欧美卡一卡二卡三| 色偷偷久久人人79超碰人人澡| 亚洲一线在线观看| 亚洲国产精品va在线观看黑人| 成人网视频在线观看| 欧美精品video| 老司机精品视频网| 久久精品国产综合精品| 亚洲精品a级片| 麻豆av免费在线| 粉嫩一区二区三区性色av| 337人体粉嫩噜噜噜| 香蕉加勒比综合久久| 一区二区日韩视频| 日韩电影大片中文字幕| 91高清在线观看视频| 国产精品一区二区性色av| 欧美日韩直播| 青青青在线观看视频| 久久国产欧美日韩精品| 自拍偷拍中文字幕| 亚洲国产另类av| 99久久久久成人国产免费| 国产亚洲综合久久| 澳门成人av网| 国产美女精品在线观看| 欧美日韩1区| 亚洲一区二区中文字幕在线观看| 欧美韩国日本一区| youjizz在线视频| 亚洲国产日韩精品在线| 2024最新电影免费在线观看| 国产精品亚洲片夜色在线| 神马电影久久| 各处沟厕大尺度偷拍女厕嘘嘘| 国产91精品一区二区麻豆亚洲| 杨钰莹一级淫片aaaaaa播放| 欧美日高清视频| 视频三区在线| 国产久一一精品| 日韩精品免费一区二区三区| 午夜免费高清视频| 国产日韩欧美精品在线| 免费看污视频的网站| 亚洲精品之草原avav久久| 亚洲人成在线网站| 久久国产精品-国产精品| 亚洲视频精品| 亚洲精品无码一区二区| 亚洲国产美女搞黄色| 少妇人妻偷人精品一区二区| 久久久久亚洲精品国产| 国产成人一二片| 日本xxxxxxxxxx75| 99久久精品99国产精品| 日韩精品一区二区不卡| 亚洲国产精品大全| 一级毛片久久久| 欧美日韩一区二区三区在线观看免| 国产精品毛片在线| 国产精品毛片一区二区| 色噜噜狠狠成人中文综合| 久蕉在线视频| 国产精品免费在线免费| 999久久久国产精品| 国产性生活一级片| 亚洲一卡二卡三卡四卡| 色中色在线视频| 国产精品久久激情| 999视频精品| 国偷自产av一区二区三区麻豆| 亚洲v日本v欧美v久久精品| 亚洲日本国产精品| 国产精品激情av电影在线观看 | 欧美猛交免费看| 国内自拍欧美| 亚洲成熟丰满熟妇高潮xxxxx| 国产欧美日韩另类一区| 国产视频aaa| 欧美在线激情网| 日韩美女一区二区三区在线观看| 亚欧精品在线视频| 精品久久久久久电影| 97视频在线观看网站| 亚洲一区二区中文字幕| aⅴ色国产欧美| 91成人精品一区二区| 日韩精品一区二区三区在线| xx欧美xxx| 男女激烈动态图| 久久亚洲二区三区| 国产又大又粗又硬| 91av网站在线播放| 久久高清免费| 男女性杂交内射妇女bbwxz| 日本久久电影网| 宅男网站在线免费观看| 欧美欧美一区二区| 国产精品一区在线观看你懂的| 国产精品一区二区6| 俺去了亚洲欧美日韩| 久久精品国产亚洲5555| 色啦啦av综合| 色综合久久88色综合天天免费| 成a人片在线观看| 欧美亚洲精品日韩| 成人免费视频视频在线观看免费| 日韩乱码一区二区三区|