精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

4 張圖,9 個維度告訴你怎么做能確保 RocketMQ 不丟失消息

開發 架構
引入消息隊列可以方便地實現系統解耦、削峰填谷等作用。但是消息隊列使用不當,可能會引起消息丟失,在一些消息敏感的業務場景下,這是不允許的。今天我們來聊一聊 RocketMQ 怎么做能確保消息不丟失。

大家好,我是君哥。

引入消息隊列可以方便地實現系統解耦、削峰填谷等作用。但是消息隊列使用不當,可能會引起消息丟失,在一些消息敏感的業務場景下,這是不允許的。今天我們來聊一聊 RocketMQ 怎么做能確保消息不丟失。

1 RocketMQ 簡介

RocketMQ 是阿里巴巴開源的分布式消息中間件,整體架構如下圖:

RocketMQ 主要包括 Producer、Consumer 和 Broker,同時 Name Server 進行集群注冊管理和保存元數據。

2 消息不丟失

要想保證消息不丟失,需要從以下幾個方面考慮:

  • Producer 發送消息
  • Broker 保存消息
  • Consumer 消費消息
  • Broker 主從切換

維度 1:同步發送,代碼如下:

public void send() throws Exception {
String message = "test producer";
Message sendMessage = new Message("topic1", "tag1", message.getBytes());
sendMessage.putUserProperty("name1","value1");
SendResult sendResult = null;

DefaultMQProducer producer = new DefaultMQProducer("testGroup");
producer.setNamesrvAddr("localhost:9876");
producer.setRetryTimesWhenSendFailed(3);
try {
sendResult = producer.send(sendMessage);
} catch (Exception e) {
e.printStackTrace();
}
if (sendResult != null) {
System.out.println(sendResult.getSendStatus());
}
}

同步發送會返回 4 個狀態碼:

  • SEND_OK:消息發送成功。需要注意的是,消息發送到 broker 后,還有兩個操作:消息刷盤和消息同步到 slave 節點,默認這兩個操作都是異步的,只有把這兩個操作都改為同步,SEND_OK 這個狀態才能真正表示發送成功。
  • FLUSH_DISK_TIMEOUT:消息發送成功但是消息刷盤超時。
  • FLUSH_SLAVE_TIMEOUT:消息發送成功但是消息同步到 slave 節點時超時。
  • SLAVE_NOT_AVAILABLE:消息發送成功但是 broker 的 slave 節點不可用。

根據返回的狀態碼,可以做消息重試,這里設置的重試次數是 3。

消息重試時,消費端一定要做好冪等處理。

維度 2:異步發送,代碼如下:

public void sendAsync() throws Exception {
String message = "test producer";
Message sendMessage = new Message("topic1", "tag1", message.getBytes());
sendMessage.putUserProperty("name1","value1");

DefaultMQProducer producer = new DefaultMQProducer("testGroup");
producer.setNamesrvAddr("localhost:9876");
producer.setRetryTimesWhenSendFailed(3);
producer.send(sendMessage, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {

}

@Override
public void onException(Throwable e) {
// TODO 可以在這里加入重試邏輯
}
});
}

異步發送,可以重寫回調函數,回調函數捕獲到 Exception 時表示發送失敗,這時可以進行重試,這里設置的重試次數是 3。

維度 3:刷盤策略

  • 異步刷盤:默認。消息寫入 CommitLog 時,并不會直接寫入磁盤,而是先寫入 PageCache 緩存后返回成功,然后用后臺線程異步把消息刷入磁盤。異步刷盤提高了消息吞吐量,但是可能會有消息丟失的情況,比如斷點導致機器停機,PageCache 中沒來得及刷盤的消息就會丟失。
  • 同步刷盤:消息寫入內存后,立刻請求刷盤線程進行刷盤,如果消息未在約定的時間內(默認 5 s)刷盤成功,就返回 FLUSH_DISK_TIMEOUT,Producer 收到這個響應后,可以進行重試。同步刷盤策略保證了消息的可靠性,同時降低了吞吐量,增加了延遲。要開啟同步刷盤,需要增加下面配置:
flushDiskType=SYNC_FLUSH

維度 4:Broker 多副本和高可用

Broker 為了保證高可用,采用一主多從的方式部署。如下圖:

消息發送到 master 節點后,slave 節點會從 master 拉取消息保持跟 master 的一致。這個過程默認是異步的,即 master 收到消息后,不等 slave 節點復制消息就直接給 Producer 返回成功。

這樣會有一個問題,如果 slave 節點還沒有完成消息復制,這時 master 宕機了,進行主備切換后就會有消息丟失。為了避免這個問題,可以采用 slave 節點同步復制消息,即等 slave 節點復制消息成功后再給 Producer 返回發送成功。只需要增加下面的配置:

brokerRole=SYNC_MASTER

改為同步復制后,消息復制流程如下:

  • slave 初始化后,跟 master 建立連接并向 master 發送自己的 offset;
  • master 收到 slave 發送的 offset 后,將 offset 后面的消息批量發送給 slave;
  • slave 把收到的消息寫入 commitLog 文件,并給 master 發送新的 offset;
  • master 收到新的 offset 后,如果 offset >= producer 發送消息后的 offset,給 Producer 返回 SEND_OK。

維度 5:消息確認

Consumer 消費消息的代碼如下:

public void consume() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.subscribe("topic1", "tag1");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try{
System.out.printf("Receive New Messages: %s", msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}catch (Exception e){
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
}

如果 Consumer 消費成功,返回 CONSUME_SUCCESS,提交 offset 并從 Broker 拉取下一批消息。

維度 6:Consumer 重試

Consumer 消費失敗,這里有 3 種情況:

  • 返回 RECONSUME_LATER
  • 返回 null
  • 拋出異常

Broker 收到這個響應后,會把這條消息放入重試隊列,重新發送給 Consumer。

注意:

  • Broker 默認最多重試 16 次,如果重試 16 次都失敗,就把這條消息放入死信隊列,Consumer 可以訂閱死信隊列進行消費。
  • 重試只有在集群模式(MessageModel.CLUSTERING)下生效,在廣播模式(MessageModel.BROADCASTING)下是不生效的。
  • Consumer 端一定要做好冪等處理。

其實重試 3 次都失敗就可以說明代碼有問題,這時 Consumer 可以把消息存入本地,給 Broker 返回CONSUME_SUCCESS 來結束重試。代碼如下:

int count = ((MessageExt) msgs).getReconsumeTimes();
if (count > 2) {
//TODO 把消息寫入本地存儲
System.out.println("重試次數超過3次");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

維度7:事務消息

RocketMQ支持事務消息,整體流程如下圖:

  • Producer 發送 half 消息;
  • Broker 先把消息寫入 topic 是 RMQ_SYS_TRANS_HALF_TOPIC 的隊列,之后給 Producer 返回成功;
  • Producer 執行本地事務,成功后給 Broker 發送 commit 命令(本地事務執行失敗則發送 rollback);
  • Broker 收到 commit 請求后把消息狀態更改為成功并把消息推到真正的 topic;
  • Consumer 拉取消息進行消費。

代碼如下:

public class ProducerTransactionListenerImpl implements TransactionListener {

@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
/**
* 這里執行本地事務,執行成功返回LocalTransactionState.COMMIT_MESSAGE,執行失敗返回
* LocalTransactionState.ROLLBACK_MESSAGE,如果返回LocalTransactionState.UNKNOW,
* Broker會回來查詢,所以需要記錄事務執行狀態
*/
return LocalTransactionState.COMMIT_MESSAGE;
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
/**
* 這里查詢事務執行狀態,根據事務狀態返回LocalTransactionState.COMMIT_MESSAGE或
* LocalTransactionState.ROLLBACK_MESSAGE,如果沒有查詢到返回LocalTransactionState.UNKNOW,
* Broker會再次查詢,可以記錄查詢次數,超過次數后返回ROLLBACK_MESSAGE
*/
return LocalTransactionState.UNKNOW;
}
}

維度 8:消息索引

我們知道,RocketMQ 核心的數據文件有 3 個:CommitLog、ConsumeQueue 和 Index。其中Index 文件就是一個索引文件,結構如下圖:

查找消息時,首先根據消息 key 的 hashcode 計算出 Hash 槽的位置,然后讀取 Hash 槽的值計算 Index 條目的位置,從Index 條目位置讀取到消息在 CommitLog 文件中的 offset,從而查找到消息。

在 Producer 發送消息時,可以指定一個 key,代碼如下:

Message sendMessage = new Message("topic1", "tag1", message.getBytes());
sendMessage.setKeys("weiyiid");

這樣可以通過 RocketMQ 提供的命令或者管理控制臺來查詢消息是否發送成功。

維度 9:極端情況

如果對消息丟失零容忍,我們必須要考慮極端情況,比如整個 RocketMQ 集群掛了,這時 Producer 端發送消息一定會失敗,可以考慮在 Producer 端做降級,把要發送的消息保存到本地數據庫或磁盤,等 RocketMQ 恢復以后再把本地消息推送出去。

3 總結

在一些特殊的業務場景,比如支付、銀行核算等,需要確保消息不丟失,但是同時也要看到,消息不丟失的方案會大大降低 RocketMQ 的吞吐量,需要綜合考慮。


責任編輯:武曉燕 來源: 君哥聊技術
相關推薦

2022-09-26 10:43:13

RocketMQ保存消息

2022-08-01 10:43:11

RocketMQZookeeper注冊中心

2022-08-15 10:45:34

RocketMQ消息隊列

2024-08-06 09:55:25

2020-10-09 06:55:23

監控告警日志

2022-09-26 11:32:14

用戶分層服務業務

2021-03-18 12:16:44

用戶分層業務

2022-03-31 08:26:44

RocketMQ消息排查

2022-07-11 11:06:11

RocketMQ函數.消費端

2022-06-13 11:05:35

RocketMQ消費者線程

2022-12-16 17:15:33

MQRabbitMQ

2022-12-19 17:44:25

MQ技術RabbitMQ

2022-04-25 15:01:07

系統程序員調度

2022-07-04 11:06:02

RocketMQ事務消息實現

2022-06-27 11:04:24

RocketMQ順序消息

2022-09-16 15:42:00

數據Kafka

2020-04-06 14:53:05

MySQL數據庫字符串

2021-04-13 15:51:46

服務治理流量

2021-04-13 18:16:07

多線程安全代碼

2012-07-20 17:24:51

HTML5
點贊
收藏

51CTO技術棧公眾號

日本亚洲欧洲色| 99亚洲视频| 黑人与娇小精品av专区| 国产精品大全| 欧美亚洲另类小说| 精品视频久久| 日韩精品一区二区三区四区 | 这里只有精品66| www.av网站| 一本一本久久| 久久视频在线直播| 深爱五月激情网| 成人午夜888| 欧美日韩亚洲视频一区| 日本亚洲欧洲精品| 亚洲专区第一页| 99在线观看免费视频精品观看| 日韩在线欧美在线| 人妻av一区二区三区| 国模一区二区| 亚洲高清免费观看| 日本欧美色综合网站免费| 在线观看毛片av| 国产欧美亚洲一区| 中文字幕一区电影| 岛国av免费观看| 成人影院网站| 亚洲国产成人av网| 亚洲天堂av免费在线观看| 免费人成在线观看网站| 日本视频免费一区| 欧美一级大片在线免费观看| 99久久精品久久亚洲精品| 免费一级欧美片在线观看网站| 91福利资源站| 动漫av网站免费观看| 午夜成年人在线免费视频| 国产欧美综合在线| 久久久久国产精品视频| 国产wwwxxx| 韩国v欧美v亚洲v日本v| 国产精品欧美日韩久久| 国产午夜无码视频在线观看| 牛牛影视一区二区三区免费看| 欧美一级一区二区| 欧美精品 - 色网| 日韩黄色在线| 欧美男人的天堂一二区| 高清av免费看| 综合在线影院| 91国在线观看| 国产av人人夜夜澡人人爽| 黄毛片在线观看| 亚洲妇女屁股眼交7| 特级黄色录像片| 黄色网址视频在线观看| 久久久久国产成人精品亚洲午夜| 亚洲精品女av网站| 一级黄色a视频| 狠狠色狠狠色综合系列| 国产精品日韩在线| 中文字幕欧美色图| 免费高清不卡av| 国产一区红桃视频| 一级片在线免费观看视频| 狠狠色狠狠色综合| av资源站久久亚洲| 国产国语亲子伦亲子| 国产老肥熟一区二区三区| 99国产视频| 亚洲av成人无码网天堂| 99久久99久久精品国产片果冻 | 精品综合久久久久久97| 少妇久久久久久被弄高潮| 午夜精品影院| 91a在线视频| 日本黄色中文字幕| 精品一区二区三区欧美| 秋霞av国产精品一区| 中文字幕+乱码+中文| 欧美a级一区二区| 国产精品69av| 在线观看视频二区| 国产jizzjizz一区二区| 91久久精品国产91久久性色tv| 亚洲精品一区二区三区不卡| 久久免费视频色| 日韩区国产区| 最新黄网在线观看| 粉嫩老牛aⅴ一区二区三区| 国产中文字幕视频在线观看| 日韩成人亚洲| 精品久久一区二区三区| 给我看免费高清在线观看| 欧美理伦片在线播放| 一区二区三区国产视频| 182在线观看视频| 欧美日韩视频| 国产精品夫妻激情| 成人黄色免费视频| 国产视频一区二区在线观看| 国产日韩第一页| 黄色网页在线播放| 狠狠躁夜夜躁人人爽超碰91| 午夜啪啪小视频| 中国av一区| 久久99久久99精品免观看粉嫩 | 亚洲最大的黄色网址| 国产一区二区精品| 91精品美女在线| 欧美日本网站| 亚洲激情一二三区| 国产aaaaa毛片| 亚洲狼人在线| 日韩经典中文字幕| 欧美精品入口蜜桃| 久久国产精品99久久久久久老狼| 九九99久久| 日本色护士高潮视频在线观看| 色婷婷香蕉在线一区二区| 极品人妻一区二区| 91视频久久| 欧美最猛性xxxx| 亚洲精品.www| 一区二区三区资源| 国模私拍视频在线观看| 精品国产精品| 日本久久久久久久久| 亚洲欧美国产高清va在线播放| 自拍偷拍亚洲综合| 羞羞的视频在线| 理论片一区二区在线| 欧美激情免费观看| 久久久久久少妇| 国产精品亚洲综合一区在线观看| 一区二区三区四区视频在线观看 | 97精品在线视频| 亚洲第一精品网站| 一区二区三区精品在线| 日韩高清在线一区二区| 天天做天天爱天天综合网| 欧美一级片在线播放| www.黄色片| 国产精品久久久久久久午夜片| 久久久999视频| 牛牛影视久久网| 欧美一级高清免费| 黄色在线视频观看网站| 欧美日韩在线影院| 国产性生活毛片| 日韩欧美午夜| 国产精品91久久久| 91视频在线观看| 精品视频一区二区不卡| www.4hu95.com四虎| 日韩电影免费在线| 一区在线电影| 久久av影院| 亚洲色图国产精品| 乱子伦一区二区三区| 93久久精品日日躁夜夜躁欧美| 日本黄xxxxxxxxx100| 9l视频自拍蝌蚪9l视频成人| 久久久久久国产精品三级玉女聊斋| 超碰免费在线97| 亚洲国产精品自拍| 91中文字幕永久在线| 日韩综合小视频| 亚洲天堂av免费在线观看| 视频一区视频二区欧美| 911国产网站尤物在线观看| 久色视频在线| 欧美乱妇23p| 久草视频在线资源站| 91色乱码一区二区三区| 三年中国国语在线播放免费| 999久久久精品国产| 亚洲影院污污.| 免费网站在线观看人| 欧美日韩精品欧美日韩精品| 国产ts在线播放| 国产原创一区二区| www.爱色av.com| 国产精品成人a在线观看| 99re视频在线| 婷婷综合六月| 欧美黑人巨大精品一区二区| 你懂的在线视频| 91精品国产一区二区三区香蕉| 久久9999久久免费精品国产| 99麻豆久久久国产精品免费优播| 男人天堂成人在线| 国产精品啊啊啊| 日本在线免费观看一区| 一区二区三区视频播放| 5566日本婷婷色中文字幕97| wwwxxx在线观看| 制服丝袜成人动漫| 久操视频免费在线观看| 欧美国产乱子伦| 中国免费黄色片| 麻豆视频观看网址久久| 国产一区二区三区乱码| 久操成人av| 国产成人成网站在线播放青青| 欧美18—19sex性hd| 欧美激情一区二区三区久久久| 天天干天天爽天天操| 欧美日韩一区二区在线观看 | 亚洲欧美色视频| 色悠久久久久综合欧美99| 久久久久久久中文字幕| 国产精品三级av在线播放| 欧美精品欧美极品欧美激情| 视频在线在亚洲| 久久久久久www| 日韩中文首页| 日韩精品久久久| 亚洲理论电影| 韩国一区二区三区美女美女秀| 四虎地址8848精品| 琪琪第一精品导航| caoporn视频在线观看| 欧美高清电影在线看| 老司机免费在线视频| 国产小视频91| 黄色av一区二区三区| 欧美一区二区三区日韩视频| 糖心vlog精品一区二区| 色综合一个色综合亚洲| 日本视频免费在线| 香蕉久久一区二区不卡无毒影院 | 日韩美女网站| 中文字幕一区电影| av播放在线| 中文字幕精品国产| av在线首页| 最近2019好看的中文字幕免费 | 伊人久久大香伊蕉在人线观看热v| 国产91网红主播在线观看| 筱崎爱全乳无删减在线观看| 欧美成aaa人片免费看| 九九热视频在线观看| 亚洲电影在线看| 少妇高潮久久久| 欧美视频在线观看一区| 天干夜夜爽爽日日日日| 精品久久久中文| 韩国av免费观看| 亚洲福利视频一区二区| 成人在线观看小视频| 亚洲色图.com| 黄色一级片中国| 一区二区激情小说| 国产亚洲欧美久久久久| 中文字幕欧美国产| 欧美性生给视频| 久久人人爽人人爽| 一二三四国产精品| 国产精品看片你懂得 | 欧美日韩国产中文字幕 | 欧美人与性动交α欧美精品图片| 久久亚洲国产精品| 国产精品69xx| 欧美中文字幕视频| 成人国产在线| 97人人香蕉| 亚洲制服一区| 亚洲一卡二卡三卡| 欧美日韩91| 老子影院午夜伦不卡大全| 一区视频在线| 日本精品www| 免费人成精品欧美精品| www.超碰com| 国产成人自拍在线| 一区二区三区少妇| 欧美激情在线免费观看| av网页在线观看| 国产精品久久久久毛片软件| 精品无码久久久久久久| 欧美视频精品一区| 中文字幕一区2区3区| 91麻豆精品国产91久久久久| 性欧美18一19性猛交| 亚洲欧美日韩久久久久久 | 久久婷婷国产麻豆91天堂| 国产后进白嫩翘臀在线观看视频| 日本sm极度另类视频| 国产精品视频一区二区三区综合| 国内一区二区三区在线视频| 精品视频久久| 无码专区aaaaaa免费视频| 免费观看在线综合色| xfplay5566色资源网站| 亚洲国产高清在线| 欧美人与禽zozzo禽性配| 亚洲高清在线精品| 国产又粗又猛又黄又爽无遮挡| 亚洲精品白浆高清久久久久久| 免费在线国产| 午夜精品视频网站| 精精国产xxxx视频在线野外| 成人午夜一级二级三级| 亚洲成在人线免费观看| 狠狠干视频网站| 美女性感视频久久| 国产十八熟妇av成人一区| 欧美国产视频在线| 国产午夜性春猛交ⅹxxx| 欧美日韩精品欧美日韩精品一| 天天操天天操天天操| 欧美精品在线视频观看| 欧美伊人亚洲伊人色综合动图| 成人欧美一区二区三区视频| 婷婷精品进入| 最近中文字幕一区二区| 国产一区日韩二区欧美三区| 制服丝袜av在线| 欧美性www| 国内精品久久久久影院色| 午夜啪啪小视频| 亚洲国产精品成人久久综合一区| 日韩av电影网| 欧美成人vr18sexvr| 欧美一级做性受免费大片免费| 亚洲精品一区二区三区不| 懂色av一区| 2022国产精品| 91成人精品| 91精品999| 欧美极品xxx| 青青操视频在线播放| 宅男噜噜噜66一区二区66| 国产在线91| 欧美中文字幕第一页| 亚洲深夜福利在线观看| 日韩极品视频在线观看 | www.99av.com| 久久精品男人天堂av| 日韩欧美亚洲国产| 精品成人一区二区三区四区| 黄色网页在线免费观看| 亚洲xxxxx电影| 亚洲激情五月| 韩国一区二区在线播放| 亚洲另类春色国产| 国内精品久久久久久久久久久 | 欧美激情黑人| 国产精品丝袜久久久久久高清| 精品香蕉视频| 91淫黄看大片| 国产精品嫩草影院av蜜臀| 91精品国产综合久| 日韩中文有码在线视频| 日韩第二十一页| 日本xxx免费| 国产精品夜夜爽| 日本少妇性生活| 亚洲欧美www| 99欧美精品| 2021国产视频| 高清shemale亚洲人妖| 日本一区二区欧美| 欧美成人国产一区二区| 欧美aaaaa性bbbbb小妇| 欧洲视频一区二区三区| 麻豆91小视频| 91视频免费在线看| 亚洲国模精品一区| 在线天堂资源| 欧美资源一区| 美女国产一区二区三区| 国产精品suv一区二区88| 日韩一二三四区| 99riav视频在线观看| 日本一区二区在线| 黄网站免费久久| 制服.丝袜.亚洲.中文.综合懂色| 国产午夜精品视频免费不卡69堂| 四虎在线精品| 97碰在线视频| 久久久另类综合| www五月天com| 欧美成人一二三| 久久91精品| 亚洲一区二区福利视频| 亚洲sss视频在线视频| 第九色区av在线| 国产精品流白浆视频| 欧美精品福利| 少妇精品一区二区| 91久久久免费一区二区| 怡红院av在线| 日本高清不卡一区二区三| 国产精品激情| 中文字幕被公侵犯的漂亮人妻| 欧美一区二区二区| 色偷偷色偷偷色偷偷在线视频| dy888午夜|