精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

SpringBoot整合RocketMQ事務/廣播/順序消息

開發 前端
本篇帶給大家SpringBoot整合RocketMQ事務/廣播/順序消息相關知識,希望能夠幫助到你!

[[393265]]

環境:springboot2.3.9RELEASE + RocketMQ4.8.0

依賴

  1. <dependency> 
  2.   <groupId>org.springframework.boot</groupId> 
  3.     <artifactId>spring-boot-starter-web</artifactId> 
  4. </dependency> 
  5. <dependency> 
  6.     <groupId>org.apache.rocketmq</groupId> 
  7.     <artifactId>rocketmq-spring-boot-starter</artifactId> 
  8.     <version>2.2.0</version> 
  9. </dependency> 

配置文件

  1. server: 
  2.   port: 8080 
  3. --- 
  4. rocketmq: 
  5.   nameServer: localhost:9876 
  6.   producer: 
  7.     group: demo-mq 

普通消息

發送

  1. @Resource 
  2. private RocketMQTemplate rocketMQTemplate ; 
  3.      
  4. public void send(String message) { 
  5.   rocketMQTemplate.convertAndSend("test-topic:tag2", MessageBuilder.withPayload(message).build()); 

接受

  1. @RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1 || tag2"
  2. @Component 
  3. public class ConsumerListener implements RocketMQListener<String> { 
  4.  
  5.     @Override 
  6.     public void onMessage(String message) { 
  7.         System.out.println("接收到消息:" + message) ; 
  8.     } 
  9.  

順序消息

發送

  1. @Resource 
  2. private RocketMQTemplate rocketMQTemplate ; 
  3.  
  4. public void sendOrder(String topic, String message, String tags, int id) { 
  5.     rocketMQTemplate.asyncSendOrderly(topic + ":" + tags, MessageBuilder.withPayload(message).build(),  
  6.             "order-" + id, new SendCallback() { 
  7.                 @Override 
  8.                 public void onSuccess(SendResult sendResult) { 
  9.                     System.err.println("msg-id: " + sendResult.getMsgId() + ": " + message +"\tqueueId: " + sendResult.getMessageQueue().getQueueId()) ; 
  10.                 } 
  11.                 @Override 
  12.                 public void onException(Throwable e) { 
  13.                     e.printStackTrace() ; 
  14.                 } 
  15.             }); 

這里是根據hashkey將消息發送到不同的隊列中

  1. @RocketMQMessageListener(topic = "order-topic", consumerGroup = "consumer02-group",  
  2.     selectorExpression = "tag3 || tag4", consumeMode = ConsumeMode.ORDERLY) 
  3. @Component 
  4. public class ConsumerOrderListener implements RocketMQListener<String> { 
  5.  
  6.     @Override 
  7.     public void onMessage(String message) { 
  8.         System.out.println(Thread.currentThread().getName() + " 接收到Order消息:" + message) ; 
  9.     } 
  10.  

consumeMode = ConsumeMode.ORDERLY,指明了消息模式為順序模式,一個隊列,一個線程。

結果

 

當consumeMode = ConsumeMode.CONCURRENTLY執行結果如下:

集群/廣播消息模式

發送端

  1. @Resource 
  2. private RocketMQTemplate rocketMQTemplate ; 
  3.      
  4. public void send(String topic, String message, String tags) { 
  5.     rocketMQTemplate.send(topic + ":" + tags, MessageBuilder.withPayload(message).build()) ; 

集群消息模式

消費端

  1. @RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group",  
  2.     selectorExpression = "tag6 || tag7", messageModel = MessageModel.CLUSTERING) 
  3. @Component 
  4. public class ConsumerBroadListener implements RocketMQListener<String> { 
  5.  
  6.     @Override 
  7.     public void onMessage(String message) { 
  8.         System.out.println("ConsumerBroadListener1接收到消息:" + message) ; 
  9.     } 
  10.  

messageModel = MessageModel.CLUSTERING

測試

啟動兩個服務分別端口是8080,8081

8080服務

8081服務

集群消息模式下,每個服務分別接收一部分消息,實現了負載均衡

廣播消息模式

消費端

  1. @RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group",  
  2.     selectorExpression = "tag6 || tag7", messageModel = MessageModel.BROADCASTING) 
  3. @Component 
  4. public class ConsumerBroadListener implements RocketMQListener<String> { 
  5.  
  6.     @Override 
  7.     public void onMessage(String message) { 
  8.         System.out.println("ConsumerBroadListener1接收到消息:" + message) ; 
  9.     } 
  10.  

messageModel = MessageModel.BROADCASTING

測試

啟動兩個服務分別端口是8080,8081

8080服務

8081服務

集群消息模式下,每個服務分別都接受了同樣的消息。

事務消息

RocketMQ事務的3個狀態

TransactionStatus.CommitTransaction:提交事務消息,消費者可以消費此消息

TransactionStatus.RollbackTransaction:回滾事務,它代表該消息將被刪除,不允許被消費。

TransactionStatus.Unknown :中間狀態,它代表需要檢查消息隊列來確定狀態。

RocketMQ實現事務消息主要分為兩個階段:正常事務的發送及提交、事務信息的補償流程 整體流程為:

正常事務發送與提交階段

1、生產者發送一個半消息給MQServer(半消息是指消費者暫時不能消費的消息)

2、服務端響應消息寫入結果,半消息發送成功

3、開始執行本地事務

4、根據本地事務的執行狀態執行Commit或者Rollback操作

事務信息的補償流程

1、如果MQServer長時間沒收到本地事務的執行狀態會向生產者發起一個確認回查的操作請求

2、生產者收到確認回查請求后,檢查本地事務的執行狀態

3、根據檢查后的結果執行Commit或者Rollback操作

補償階段主要是用于解決生產者在發送Commit或者Rollback操作時發生超時或失敗的情況。

發送端

  1. @Resource 
  2. private RocketMQTemplate rocketMQTemplate ; 
  3.      
  4. public void sendTx(String topic, Long id, String tags) { 
  5.     rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, MessageBuilder.withPayload( 
  6.             new Users(id, UUID.randomUUID().toString().replaceAll("-"""))). 
  7.             setHeader("BID", UUID.randomUUID().toString().replaceAll("-""")).build(),  
  8.             UUID.randomUUID().toString().replaceAll("-""")) ; 

生產者對應的監聽器

  1. @RocketMQTransactionListener 
  2. public class ProducerTxListener implements RocketMQLocalTransactionListener { 
  3.      
  4.     @Resource 
  5.     private BusinessService bs ; 
  6.  
  7.     @Override 
  8.     public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { 
  9.         // 這里執行本地的事務操作,比如保存數據。 
  10.         try { 
  11.             // 創建一個日志記錄表,將這唯一的ID存入數據庫中,在下面的check方法中可以根據這個id查詢是否有數據 
  12.             String id = (String) msg.getHeaders().get("BID") ; 
  13.             Users users = new JsonMapper().readValue((byte[])msg.getPayload(), Users.class) ; 
  14.             System.out.println("消息內容:" + users + "\t參與數據:" + arg + "\t本次事務的唯一編號:" + id) ; 
  15.             bs.save(users, new UsersLog(users.getId(), id)) ; 
  16.         } catch (Exception e) { 
  17.             e.printStackTrace() ; 
  18.             return RocketMQLocalTransactionState.ROLLBACK ; 
  19.         } 
  20.         return RocketMQLocalTransactionState.COMMIT ; 
  21.     } 
  22.  
  23.     @Override 
  24.     public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { 
  25.         // 這里檢查本地事務是否執行成功 
  26.         String id = (String) msg.getHeaders().get("BID") ; 
  27.         System.out.println("執行查詢ID為:" + id + " 的數據是否存在") ; 
  28.         UsersLog usersLog = bs.queryUsersLog(id) ; 
  29.         if (usersLog == null) { 
  30.             return RocketMQLocalTransactionState.ROLLBACK ; 
  31.         } 
  32.         return RocketMQLocalTransactionState.COMMIT ; 
  33.     } 
  34.  

消費端

  1. @RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "tag10"
  2. @Component 
  3. public class ConsumerTxListener implements RocketMQListener<Users> { 
  4.  
  5.     @Override 
  6.     public void onMessage(Users users) { 
  7.         System.out.println("TX接收到消息:" + users) ; 
  8.     } 
  9.  

Service

  1. @Transactional 
  2. public boolean save(Users users, UsersLog usersLog) { 
  3.     usersRepository.save(users) ; 
  4.     usersLogRepository.save(usersLog) ; 
  5.     if (users.getId() == 1) { 
  6.         throw new RuntimeException("數據錯誤") ; 
  7.     } 
  8.     return true ; 
  9.      
  10. public UsersLog queryUsersLog(String bid) { 
  11.     return usersLogRepository.findByBid(bid) ; 

Controller

  1. @GetMapping("/tx/{id}"
  2. public Object sendTx(@PathVariable("id")Long id) { 
  3.     ps.sendTx("tx-topic", id, "tag10") ; 
  4.     return "send transaction success" ; 

測試

調用接口后,控制臺輸出:

從打印日志看出來都保存完畢了后 消費端才接受到消息。

刪除數據,再測試ID為1會報錯的。

數據庫中沒有數據。。。

是不是也不是很復雜,2個階段來處理。

完畢!!!

 

責任編輯:姜華 來源: 今日頭條
相關推薦

2023-09-04 08:00:53

提交事務消息

2024-11-11 13:28:11

RocketMQ消息類型FIFO

2024-10-29 08:34:27

RocketMQ消息類型事務消息

2022-06-02 08:21:07

RocketMQ消息中間件

2023-07-17 08:34:03

RocketMQ消息初體驗

2021-04-07 08:43:09

SpringBootRocketMQ開發技術

2021-10-03 21:41:13

RocketMQKafkaPulsar

2023-12-15 13:08:00

RocketMQ中間件消費順序

2024-02-04 09:02:29

RocketMQ項目處理器

2024-04-25 14:27:32

順序消息事務消息

2021-07-13 11:52:47

順序消息RocketMQkafka

2024-11-11 00:00:10

2022-06-27 11:04:24

RocketMQ順序消息

2022-07-04 11:06:02

RocketMQ事務消息實現

2024-06-13 09:25:14

2023-04-12 08:56:37

RocketMQSpring核心業務

2022-01-10 11:58:51

SpringBootPulsar分布式

2025-06-18 07:09:05

2024-10-22 08:01:15

2023-09-26 08:01:46

消費者TopicRocketMQ
點贊
收藏

51CTO技術棧公眾號

久久免费视频1| 欧美成人一二三| 手机看片福利日韩| 男人的天堂在线视频免费观看| 久久精品国产一区二区| 久久中文字幕国产| 亚洲色图14p| 91国产一区| 日韩欧美国产视频| 国产成年人在线观看| 四虎免费在线观看| 老司机精品视频一区二区三区| 欧美激情xxxx| 黄色三级生活片| 超碰在线亚洲| 欧美精品乱码久久久久久| 国产精品专区在线| 美女免费久久| 99精品视频在线观看| 国产日韩精品视频| 中文字字幕在线中文| 一区二区三区网站| 在线观看日韩av| 在线免费观看a级片| 国产激情精品一区二区三区| 色综合天天做天天爱| 亚洲色婷婷久久精品av蜜桃| 国产原创av在线| jiyouzz国产精品久久| 成人做爰www免费看视频网站| 亚洲精品男人的天堂| 91精品国产91久久综合| 中国日韩欧美久久久久久久久| 中国男女全黄大片| 国产一区一区| 欧美日韩的一区二区| 成人综合视频在线| 精品日韩av| 中文字幕一区二区三区不卡在线| 蜜桃av噜噜一区二区三区| 亚洲高清视频网站| 国产一区二区在线观看视频| 国产精品黄页免费高清在线观看| 日韩少妇高潮抽搐| 欧美天天在线| 久久久久久亚洲| 日本天堂中文字幕| 66视频精品| www.久久撸.com| 亚洲天堂av中文字幕| 要久久电视剧全集免费| 日韩精品视频在线| 完美搭档在线观看| 哺乳挤奶一区二区三区免费看| 欧美精品久久一区| 韩国一区二区在线播放| 国产一区二区视频在线看| 欧美丰满少妇xxxbbb| 日本 片 成人 在线| av成人在线播放| 欧美日韩综合色| 超碰超碰在线观看| 久久av影院| 欧美日韩另类一区| 亚洲免费999| 欧美成人一二区| 欧美精品久久一区二区三区| 中文字幕第一页在线视频| 亚州精品国产| 91精品国产91久久综合桃花| 国产在线观看中文字幕| 图片一区二区| 日韩精品一区二区三区在线播放 | xxxx视频在线| 偷窥少妇高潮呻吟av久久免费| 成人免费在线网| 色在线视频观看| 欧美亚洲丝袜传媒另类| 性生生活大片免费看视频| 成人污版视频| 欧美videos大乳护士334| 久久国产免费视频| 欧美美女在线观看| 中文字幕亚洲欧美日韩2019| 日韩成人短视频| 一区久久精品| 国产不卡一区二区在线播放| 中文字幕av免费观看| 精品一区二区三区在线观看 | 久久久久久久久久久久久久久久久久久久| 亚洲国产福利在线| 日本高清www| 久久综合电影| 久久99视频免费| 精品人妻一区二区三区免费看| 蜜臀va亚洲va欧美va天堂| 亚洲一区二区免费| 欧美成人免费| 亚洲欧美日韩人成在线播放| 国产一区二区四区| 97成人超碰| 精品久久久久一区二区国产| 新91视频在线观看| 国产精品videosex极品| 日本精品久久久久久久| 国产精品久久免费| 99久久国产综合精品女不卡| 亚洲国产欧美一区二区三区不卡| 欧美寡妇性猛交xxx免费| 日韩欧美高清视频| 国内精品国产三级国产aⅴ久| 日韩电影不卡一区| 日韩视频中文字幕| 欧美日韩乱国产| 精品一区二区三区影院在线午夜 | 男生操女生视频在线观看| 久久成人福利| 久久视频在线视频| 羞羞色院91蜜桃| 99re成人精品视频| 亚洲精品天堂成人片av在线播放 | 亚洲激情网址| 成人一区二区电影| 狠狠狠综合7777久夜色撩人| 一级精品视频在线观看宜春院| 亚洲不卡视频在线| 天堂网av成人| 午夜免费久久久久| 99精品国产99久久久久久97| 国产欧美va欧美不卡在线| 777777av| 粉嫩一区二区三区四区公司1| 日韩视频亚洲视频| 中文字幕av久久爽| 欧美国产成人精品| 国产性生交xxxxx免费| 天海翼亚洲一区二区三区| 久久久亚洲影院你懂的| 99精品视频免费看| 国产精品高潮久久久久无| 丁香婷婷激情网| 国产一区二区三区四区二区| 538国产精品视频一区二区| 亚洲欧美激情国产综合久久久| 最新国产精品久久精品| 性刺激的欧美三级视频| 国产va免费精品观看精品视频 | 亚洲欧美精品中文字幕在线| 日本一区二区三区免费视频| 懂色av中文字幕一区二区三区| 手机看片日韩国产| 免费观看性欧美大片无片| 久久天天躁日日躁| a级片免费观看| 樱花影视一区二区| 蜜桃色一区二区三区| 亚洲经典视频在线观看| 精品国产区在线| 竹内纱里奈兽皇系列在线观看 | 香蕉视频在线看| 欧美区一区二区三区| 亚洲AV成人无码精电影在线| 久久99在线观看| 国产树林野战在线播放| 国产免费av国片精品草莓男男| 久久亚洲精品中文字幕冲田杏梨 | 日韩av一级| 中文字幕一精品亚洲无线一区| 中文字幕免费播放| 中文字幕亚洲一区二区va在线| 五月激情婷婷在线| 欧美日本一区二区视频在线观看 | 日韩美女中文字幕| 9色在线观看| 欧美精品丝袜中出| 国产极品国产极品| av电影一区二区| 日本一区二区黄色| 日韩成人免费| 不卡的av一区| 涩涩视频在线播放| 最近中文字幕日韩精品 | 华人av在线| 国产亚洲一区二区在线| 国产露脸国语对白在线| 亚洲二区在线观看| www.av天天| 国产精一品亚洲二区在线视频| 菠萝蜜视频在线观看入口| 女同另类激情重口| 国产精品老牛影院在线观看| av网站网址在线观看| 亚洲国产日韩欧美综合久久| 中文天堂在线资源| 亚洲一区二区三区美女| 国产jk精品白丝av在线观看| 狠狠色狠狠色合久久伊人| 久久99久久久久久| 成人无号精品一区二区三区| 97人人澡人人爽| free欧美| 久久噜噜噜精品国产亚洲综合| 国产三级在线免费观看| 日韩免费观看高清完整版在线观看| 黄色在线免费观看| 一区二区三区四区在线免费观看| 美女脱光内衣内裤| 国产成a人亚洲精品| 黄色三级视频片| 99国产精品私拍| 欧美三级午夜理伦三级老人| 要久久电视剧全集免费 | 西野翔中文久久精品国产| 国产日韩在线免费| 视频在线日韩| 69久久夜色精品国产69| 国产欧美黑人| 在线播放日韩专区| 亚洲欧美丝袜中文综合| 日韩欧美国产小视频| 国产精品自拍第一页| 香蕉成人伊视频在线观看| 国产性生活大片| 国产亚洲一区二区三区在线观看| 国产原创剧情av| 国产不卡高清在线观看视频| 久热在线视频观看| 日本亚洲三级在线| 无码精品国产一区二区三区免费| 国产精品jizz在线观看美国| 一区二区免费在线视频| 精品国产一区二区三区小蝌蚪| 国新精品乱码一区二区三区18| 白嫩亚洲一区二区三区| 国产精品视频一区二区三区四| 久草在线资源福利站| 久久久人成影片一区二区三区| a视频在线观看免费| 日韩性生活视频| 99re在线视频| 最近的2019中文字幕免费一页| 国产免费av高清在线| 亚洲欧美日韩天堂| 可以在线观看的av| 亚洲香蕉伊综合在人在线视看| 深夜福利在线看| 精品视频久久久久久| 同心难改在线观看| 日韩电影在线观看永久视频免费网站| 天堂在线中文网| 日韩av最新在线观看| 手机亚洲第一页| 亚洲欧洲成视频免费观看| 欧美男男同志| 正在播放欧美一区| 免费在线看黄色| 欧美美女操人视频| 后进极品白嫩翘臀在线播放| 久久久久久国产精品久久| 欧洲成人综合网| 91国产视频在线| 成人影院av| 国产精品中文字幕在线| 成人噜噜噜噜| 国产亚洲情侣一区二区无| 欧美在线关看| 无遮挡亚洲一区| 亚洲欧美日韩高清在线| 国产精品无码免费专区午夜| 激情久久五月| 欧美 激情 在线| 麻豆免费看一区二区三区| 91精品视频国产| 成人av午夜电影| 国产高潮呻吟久久| 成人免费一区二区三区视频| avove在线播放| 欧美日韩亚洲高清| 一区二区www| 精品久久久久久久久久久院品网| 熟妇人妻av无码一区二区三区| 亚洲免费视频在线观看| 日本在线人成| 久久免费国产视频| 国产精品亚洲d| 99re在线观看| 国模精品一区| 粉嫩av一区二区三区天美传媒 | 欧美一区二区免费在线观看| 91麻豆成人久久精品二区三区| 国精产品视频一二二区| 亚洲在线中文字幕| 国产乱码77777777| 日韩欧美久久久| 国产二区在线播放| 欧美日韩高清区| 97精品国产99久久久久久免费| 91av一区二区三区| 最新亚洲精品| 精品人妻大屁股白浆无码| 视频精品一区二区| 不许穿内裤随时挨c调教h苏绵| 欧美国产精品中文字幕| 久久免费视频精品| 欧美日韩激情一区| 深夜福利视频在线观看| 久久资源免费视频| 国模视频一区| 精品免费视频123区| 亚洲经典一区| 欧美日韩在线免费播放| 成人性生交大合| 亚洲二区在线播放| 欧美在线看片a免费观看| 蜜桃av噜噜一区二区三区麻豆| 在线观看日韩www视频免费| 日韩精品av| 国产精品一区二区在线观看 | 久久精品色欧美aⅴ一区二区| 筱崎爱全乳无删减在线观看| 999日本视频| 99久久99视频只有精品| 91av俱乐部| 91丨九色porny丨蝌蚪| 69av视频在线| 欧美一区二区在线免费播放| 国产三级视频在线播放线观看| 久久人人爽人人爽人人片av高请| 国产激情精品一区二区三区| 日韩资源av在线| 欧美亚洲自偷自偷| 免费黄色三级网站| 亚洲香肠在线观看| 丰满少妇高潮在线观看| 久久精品国产综合| 日韩精品一页| 色大师av一区二区三区| 快she精品国产999| 在线免费看黄视频| 欧美性xxxx| 天堂中文资源在线| 91av视频在线播放| 日韩mv欧美mv国产网站| 欧美二区在线视频| 99久久精品久久久久久清纯| 国产亚洲精品久久久久久无几年桃 | 免费av观看网址| 99re6这里只有精品视频在线观看| 国产五月天婷婷| 日韩av网址在线观看| www在线观看黄色| 精品欧美国产| 日韩国产欧美三级| 成人信息集中地| 在线播放91灌醉迷j高跟美女| 欧美日韩xx| 91久久伊人青青碰碰婷婷| 午夜视频一区| 国产精品成人免费一区久久羞羞| 亚洲国产你懂的| 无套内谢的新婚少妇国语播放| 91精品国产高清| 国产一区不卡| 色免费在线视频| 亚洲精品高清在线| 午夜性色福利视频| 日本在线观看天堂男亚洲| 精品国产一区探花在线观看| 99热这里只有精品在线播放| 国产精品不卡视频| 国产高清免费av| 91av在线播放视频| 欧美日韩高清| 国产xxx在线观看| 色综合久久综合网欧美综合网| 91社区在线观看| 成人高清在线观看| 老色鬼久久亚洲一区二区| 色偷偷男人天堂| 精品少妇一区二区三区视频免付费 | 免费观看成人网| 亚洲欧洲国产日韩| 日韩在线观看视频网站| 青青青国产精品一区二区| 久久一区二区三区电影| 熟女人妻一区二区三区免费看| 欧美日韩国产一区二区三区| 成人h小游戏| 国产精品免费一区二区三区| 国产精品外国| 国语对白在线播放| 国产丝袜视频一区| 国产日韩一区二区三免费高清| 亚洲熟妇av日韩熟妇在线| 国产精品青草久久| 五月天久久久久久| 91精品中文在线| 丝袜亚洲精品中文字幕一区| 特级片在线观看| 一本色道久久综合狠狠躁篇怎么玩|