精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

完美解決,RocketMQ如何支持多事務消息?

開發 前端
本文解決了在 RocketMQ 2.1.0 版本以后,無法簡單使用多個 @RocketMQTransactionListener? 的問題。通過引入事務消息處理接口 TransactionMessageHandler,我們將原有的事務處理器改造成了一個分發器,使得在 DailyMart 項目中可以輕松處理多事務消息的場景。

今天我們將解決使用RocketMQ事務消息時可能遇到的一個常見問題:如何讓其支持多事務消息?

1. 問題背景

在實際開發中,我們常常會面臨多事務消息的場景,例如在DailyMart的訂單模塊中,用戶支付后需要調用庫存服務進行庫存扣減,而在訂單確認收貨后需要調用用戶服務實現積分贈送。這兩個業務邏輯都需要通過事務消息來保證分布式事務。

為了處理這種情況,我們可能會考慮在訂單模塊中創建兩個事務消息監聽器,分別用于處理庫存扣減和積分贈送的事務處理和事務回查。

@Component
@Slf4j
//處理訂單支付的事務監聽器
public class OrderPaidTransactionListener implements RocketMQLocalTransactionListener {
  @Override
  public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
    ......
    //處理訂單支付邏輯
   }

  @Override
  public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
      ......
      //檢查訂單處理邏輯
   }
}

@Component
@Slf4j
//處理訂單收貨的事務監聽器
public class OrderReceivedTransactionListener implements RocketMQLocalTransactionListener {
  @Override
  public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
    ......
   }

  @Override
  public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
      ......
   }
}

然而,當我們信心滿滿地完成業務邏輯編寫并啟動服務時,可能會遇到如下錯誤:rocketMQTemplate already exists RocketMQLocalTransactionListener

圖片圖片

在rocketmq-spring-boot-starter版本低于2.1.0的項目中,可以使用多個 @RocketMQTransactionListener 監聽不同的 txProducerGroup 來發送不同類型的事務消息到topic。然而,從 RocketMQ-Spring 2.1.0 版本開始,注解 @RocketMQTransactionListener 不能設置 txProducerGroup、ak、sk,這些值均需與對應的 RocketMQTemplate 保持一致。通過閱讀源碼 RocketMQTransactionConfiguration#registerTransactionListener() 方法,也可得知在RocketMQ如果已經存在了 RocketMQTransactionListener 則會出現上述錯誤。

圖片圖片

2. 如何解決

為了在保證系統只有一個 RocketMQTransactionListener 的前提下實現多事務消息,我們可以將 RocketMQLocalTransactionListener 不處理具體業務邏輯,而是將其作為一個分發器使用。

在生產者發送事務消息時指定對應的事務處理器 ,并將事務處理器放置在消息頭上發送出去,在 RocketMQTransactionListener 中根據消息頭選擇具體的事務處理器來實現業務邏輯。

具體實現如下:

2.1 定義事務消息處理接口

首先,定義公共的事務消息處理接口,所有事務消息都實現此接口而非 RocketMQ 默認的 RocketMQLocalTransactionListener。

public interface TransactionMessageHandler {
    
    /**
    * 執行本地事務
    * @param payload 消息體
    * @param arg 參數
    */
    RocketMQLocalTransactionState executeLocalTransaction(Object payload, Object arg);
    
    /**
     * 檢查本地執行狀態
     * @param payload 消息體
     * @return 執行結果
     */
    RocketMQLocalTransactionState checkLocalTransaction(Object payload);
    
}

2.2 修改事務消息發送工具類,指定消息處理器

public <T extends RemoteDomainEvent> TransactionSendResult sendTransaction(String topic, String tag, T message, Class<? extends TransactionMessageHandler> transactionMessageListener) {  
  if(transactionMessageListener == null){
    throw new IllegalArgumentException("transactionMessageListener must not null");
  }
  
  String destination = buildDestination(topic, tag);

  Message<T> sendMessage = MessageBuilder.withPayload(message)
    .setHeader(RocketMQHeaders.KEYS, message.getKey())
    .setHeader(SOURCE_HEADER, message.getSource())
    .setHeader(TRANSACTION_MESSAGE_HEADER, transactionMessageListener.getSimpleName())
    .build();
  TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, sendMessage, null);

  log.info("[{}]事務消息[{}]發送結果[{}]", destination, JSONObject.toJSON(message),JSONObject.toJSON(sendResult));

  return sendResult;
}

2.3 修改RocketMQ事務消息監聽器

@Slf4j
@RocketMQTransactionListener
public class DefaultRocketMQTransactionListener implements RocketMQLocalTransactionListener {
    
    private final Map<String, TransactionMessageHandler> transactionMessageHandlerMap;
    
    public DefaultRocketMQTransactionListener(Map<String, TransactionMessageHandler> transactionMessageHandlerMap) {
        this.transactionMessageHandlerMap = transactionMessageHandlerMap;
    }
    
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        log.info("消費者收到事務消息[{}]", JSONObject.toJSON(message));
        String listenerName = (String) message.getHeaders().get(MessageHeaderConstant.TRANSACTION_MESSAGE_HEADER);
        
        if (null == listenerName) {
            throw new RuntimeException("not params transactionMessageListener");
        }
        
        RocketMQLocalTransactionState state;
        Object payload = message.getPayload();
        try {
            TransactionMessageHandler messageHandler = transactionMessageHandlerMap.get(listenerName);
            if (null == messageHandler) {
                throw new RuntimeException("not match condition TransactionMessageHandler");
            }
            state = messageHandler.executeLocalTransaction(payload, arg);
        } catch (Exception e) {
            log.error("rocket transaction message executeLocal error:{}", e.getMessage());
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        
        return state;
    }
    
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        log.info("消費者收到事務回查消息[{}]", JsonUtils.obj2String(message.getHeaders()));
        String listenerName = (String) message.getHeaders().get(MessageHeaderConstant.TRANSACTION_MESSAGE_HEADER);
        if (null == listenerName) {
            throw new RuntimeException("not params transactionMessageListener");
        }
        RocketMQLocalTransactionState state;
        try {
            TransactionMessageHandler messageHandler = transactionMessageHandlerMap.get(listenerName);
            if (null == messageHandler) {
                throw new RuntimeException("not match condition TransactionMessageHandler");
            }
            state = messageHandler.checkLocalTransaction(message.getPayload());
        } catch (Exception e) {
            log.error("rocket transaction message executeLocal error:{}", e.getMessage());
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        
        return state;
    }
    
}

在上述代碼中,根據消息頭中的TRANSACTION_MESSAGE_HEADER參數選擇對應的事務處理器來處理事務消息。

在 DailyMart 中有一個公共組件 dailymart-rocketmq-spring-boot-starter 專門用于 RocketMQ 消息發送監聽的封裝,因此我們也將事務消息的處理邏輯封裝到了此組件中。

圖片圖片

2.4 修改事務消息處理邏輯

所有的事務消息處理邏輯都實現 TransactionMessageHandler 接口,以訂單支付的處理邏輯為例:

@Component
@Slf4j
public class OrderPaidTransactionConsumer implements TransactionMessageHandler {
    
    @Resource
    private TransactionTemplate transactionTemplate;
    
    
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Object payload, Object arg) {
        final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) payload, OrderPaidEvent.class);
        ...
    }
    
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Object payload) {
        final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) payload, OrderPaidEvent.class);
        ...
    }
    
}

2.5 修改事務消息發送邏輯,指定事務處理器

TransactionSendResult sendResult = enhanceTemplate.sendTransaction("TRADE-ORDER", "ORDER-PAID", orderPaidEvent, OrderPaidTransactionConsumer.class);

小結

本文解決了在 RocketMQ 2.1.0 版本以后,無法簡單使用多個 @RocketMQTransactionListener 的問題。通過引入事務消息處理接口 TransactionMessageHandler,我們將原有的事務處理器改造成了一個分發器,使得在 DailyMart 項目中可以輕松處理多事務消息的場景。

責任編輯:武曉燕 來源: JAVA日知錄
相關推薦

2024-10-29 08:34:27

RocketMQ消息類型事務消息

2023-07-17 08:34:03

RocketMQ消息初體驗

2021-10-03 21:41:13

RocketMQKafkaPulsar

2021-04-15 09:17:01

SpringBootRocketMQ

2022-03-31 08:26:44

RocketMQ消息排查

2023-12-21 08:01:41

RocketMQ消息堆積

2021-03-04 06:49:53

RocketMQ事務

2023-09-04 08:00:53

提交事務消息

2025-04-29 04:00:00

分布式事務事務消息

2022-07-04 11:06:02

RocketMQ事務消息實現

2024-06-13 09:25:14

2024-08-06 09:55:25

2014-03-25 10:57:42

Android消息推送方案

2024-10-22 08:01:15

2024-11-11 13:28:11

RocketMQ消息類型FIFO

2024-12-04 15:38:43

2023-12-15 13:08:00

RocketMQ中間件消費順序

2021-02-02 11:01:31

RocketMQ消息分布式

2022-12-22 10:03:18

消息集成

2023-07-18 09:03:01

RocketMQ場景消息
點贊
收藏

51CTO技術棧公眾號

91禁在线观看| 国产精品九九九九九| 欧美寡妇性猛交xxx免费| av动漫一区二区| 国产精品成人一区| 一级性生活免费视频| 成人18夜夜网深夜福利网| 色菇凉天天综合网| 红桃一区二区三区| 欧美男男同志| 国产98色在线|日韩| 日本一区二区在线播放| 日韩在线观看视频一区二区| 鲁大师精品99久久久| 欧美日韩一区国产| 精品这里只有精品| 黄色片网站在线| 26uuu久久综合| 97夜夜澡人人双人人人喊| 亚洲 欧美 中文字幕| 欧美福利视频| 亚洲天堂网站在线观看视频| 绯色av蜜臀vs少妇| 日本免费在线一区| 日本韩国视频一区二区| 亚洲不卡1区| 日韩中文字幕国产| 国产激情片在线观看| 天堂а√在线8种子蜜桃视频| 蜜桃av噜噜一区二区三区小说| 久久久久久久久久国产精品| 亚洲AV成人无码精电影在线| 日韩精品免费一区二区三区竹菊| 538在线一区二区精品国产| 成年人免费在线播放| 搞黄网站在线看| 亚洲精品免费视频| 亚洲免费久久| 福利片在线看| 国产亚洲综合av| 精品免费视频123区| www.污视频| 国产精品一区一区三区| 成人看片人aa| 91亚洲视频在线观看| 秋霞电影一区二区| 国产精品高清在线| 国产精品久久久久久久久夜色| 亚洲每日更新| 国内精品400部情侣激情| 精国产品一区二区三区a片| 亚洲mv大片欧洲mv大片| 色多多国产成人永久免费网站| 偷拍夫妻性生活| 伊人久久大香线蕉| 国产一区二区三区三区在线观看| 国产伦精品一区二区三区妓女 | 欧美 日韩 国产 成人 在线 91| 激情综合网天天干| 92国产精品视频| 99久久久国产精品无码免费| 国产成人在线视频免费播放| 亚洲中国色老太| 国产黄色片网站| 岛国精品在线播放| 精品一区二区三区日本| 亚洲日本在线播放| 久久久久久久久免费| 美女一区视频| 国产天堂在线| 国产精品对白交换视频| 一本一道久久a久久综合精品| 调教视频免费在线观看| 亚洲免费观看高清完整版在线观看熊| 2025韩国大尺度电影| 久久五月精品中文字幕| 精品久久香蕉国产线看观看gif| 欧美aⅴ在线观看| 91成人抖音| 欧美一区二区三区视频免费| 欧美69精品久久久久久不卡| 日韩极品少妇| 日韩中文综合网| 久久久久久蜜桃| 免费在线日韩av| 成人黄色大片在线免费观看| 国产美女三级无套内谢| 不卡视频免费播放| 欧洲国产精品| 18视频在线观看网站| 精品久久久精品| 日韩一级理论片| 日韩视频在线直播| 亚洲人成在线免费观看| 爱爱视频免费在线观看| 99热在线精品观看| 国产日韩精品在线观看| 欧美自拍偷拍第一页| 国产精品嫩草99a| 无码人妻少妇伦在线电影| 中韩乱幕日产无线码一区| 日韩欧美国产麻豆| 蜜桃传媒一区二区亚洲| 一区免费在线| 国产日韩欧美在线看| 五月天婷婷社区| 亚洲天天做日日做天天谢日日欢 | 精品一区二区在线视频| 久久久久一区二区三区| a级网站在线播放| 色综合天天综合给合国产| 色婷婷狠狠18禁久久| 加勒比久久综合| 欧美激情欧美狂野欧美精品| 综合久久中文字幕| 99国产精品久久久久久久久久久 | 免费在线观看黄色| 色综合中文综合网| 好吊操视频这里只有精品| 日韩片欧美片| 青青草国产精品一区二区| 成 人 免费 黄 色| 国产精品久久久久久妇女6080| 欧美乱大交xxxxx潮喷l头像| 亚洲一区二区三区久久久| 亚洲裸体xxxx| 国产成人一区二区三区影院在线| 国产剧情一区在线| 一区二区成人国产精品| 日韩欧美精品电影| 亚洲精品日韩欧美| 国产精品theporn动漫| 国产精品一区二区果冻传媒| 一区二区视频在线免费| 亚洲电影有码| 国产一区二区三区网站| 好吊色在线视频| 91美女在线视频| 少妇人妻无码专区视频| 国产精品17p| 欧美黑人国产人伦爽爽爽| 一本色道久久综合精品婷婷| 国产欧美日韩精品在线| 免费在线观看毛片网站| 日韩aaa久久蜜桃av| 午夜精品一区二区三区在线播放| 丰满少妇在线观看bd| 亚洲图片欧美综合| 美女搡bbb又爽又猛又黄www| 欧美日韩18| 国产精品视频免费一区二区三区| 少女频道在线观看高清| 精品蜜桃在线看| 国产一级一片免费播放放a| 国产不卡视频在线播放| 免费在线黄网站| 蜜桃久久久久| 91黄色8090| 国产免费av高清在线| 在线欧美一区二区| 开心激情五月网| 国产成人在线视频播放| 成人在线观看你懂的| 久久黄色影视| 国产精品va在线播放我和闺蜜| jizz日韩| 日韩三级高清在线| 日韩精品成人一区| 国产午夜精品一区二区| 成人不卡免费视频| 欧美黄污视频| 久久另类ts人妖一区二区| 欧美成人h版| 精品国产欧美一区二区五十路 | 91成人在线| 欧美剧在线观看| 日本黄色一区二区三区| 91久久国产综合久久| 日本黄色录像视频| 成人免费视频网站在线观看| 国产午夜福利视频在线观看| 不卡一区2区| 91福利视频导航| 中文字幕在线直播| 久久好看免费视频| 无码国产精品一区二区免费16| 欧美特级限制片免费在线观看| 久草视频手机在线| 99久久免费精品高清特色大片| 欧美黄色性生活| 精品动漫av| 亚洲福利av在线| 国产精品中文字幕制服诱惑| 国产精品第10页| 1234区中文字幕在线观看| 国产一区二区三区在线观看网站| 精品国产无码AV| 在线免费观看日本欧美| 久久精品国产亚洲av香蕉| 国产精品亲子乱子伦xxxx裸| 黑丝av在线播放| 精品在线观看视频| 国产一级片黄色| 黄色精品一区| 亚洲日本无吗高清不卡| 秋霞影院一区二区三区| 91老司机在线| 日韩av超清在线观看| 97精品国产97久久久久久春色| 色开心亚洲综合| 亚洲天堂av图片| 日韩在线观看视频一区| 91精品国产91综合久久蜜臀| 夜夜爽妓女8888视频免费观看| 亚洲一区电影777| 疯狂试爱三2浴室激情视频| 久久久久国产成人精品亚洲午夜| 高清中文字幕mv的电影| 激情综合网激情| 日本久久精品一区二区| 一区二区三区精品视频在线观看| 男插女免费视频| 日韩av免费大片| 日本视频精品一区| 精品影片在线观看的网站| 国产日产精品一区二区三区四区| 久久视频免费| 国产在线视频一区| 久久久久久久性潮| 国产精品欧美风情| 巨胸喷奶水www久久久免费动漫| 91国产在线精品| 超碰资源在线| 久久免费福利视频| 操喷在线视频| 97视频国产在线| 麻豆免费在线| 久久久之久亚州精品露出| 日本伦理一区二区| 欧美日韩国产123| 日韩三级免费| 欧美激情视频给我| 国产偷倩在线播放| 久久久久久久久爱| 国产粉嫩在线观看| 欧美性受xxxx黑人猛交| 天堂中文在线播放| 欧美主播福利视频| 欧美亚洲韩国| 国产精品草莓在线免费观看 | 91精品国产一区二区三区动漫| 欧美男女视频| 91久久综合亚洲鲁鲁五月天| 日本高清精品| 国产亚洲欧美另类一区二区三区| 米奇精品关键词| 日本一区不卡| 色喇叭免费久久综合网| 精品少妇人妻av一区二区| 一区二区免费不卡在线| 欧美性猛交内射兽交老熟妇| 国产一区二区三区自拍| 18禁男女爽爽爽午夜网站免费| 天使萌一区二区三区免费观看| 日本熟妇人妻中出| 久久99精品久久久久久国产越南| 日本中文字幕在线不卡| 成人免费毛片片v| 中文字幕一区二区三区人妻| 中文av一区二区| 爱爱视频免费在线观看| 亚洲成av人片观看| 不卡av电影在线| 日韩丝袜美女视频| 亚洲av成人无码网天堂| 国产一区二区三区直播精品电影| 免费高清在线观看| 性日韩欧美在线视频| 成人在线网站| 不卡一区二区三区视频| 综合综合综合综合综合网| 中文字幕一区二区三区5566| 亚洲视屏一区| 嫩草av久久伊人妇女超级a| 国产老肥熟一区二区三区| 亚洲综合自拍网| 亚洲三级久久久| 免费在线不卡视频| 欧美日韩免费视频| 少妇精品视频一区二区| 中文字幕日韩电影| h片精品在线观看| 国产日韩专区在线| 亚洲ab电影| 妞干网这里只有精品| 乱人伦精品视频在线观看| 亚洲成人激情小说| 国产日韩欧美精品电影三级在线| 玖玖爱免费视频| 欧美日韩亚洲不卡| 亚洲人成色777777老人头| y97精品国产97久久久久久| 成人小电影网站| 高清视频一区二区三区| 全球成人免费直播| 欧洲黄色一级视频| 丁香啪啪综合成人亚洲小说 | 日韩欧美亚洲成人| 性生活免费网站| 色婷婷综合久久久久| 在线观看爽视频| 国产精品xxxx| 亚洲一区二区日韩| 熟女少妇精品一区二区| www.色精品| 久久国产精品波多野结衣| 欧美日韩午夜影院| 成人资源www网在线最新版| 97免费中文视频在线观看| 日韩中文字幕| 欧美性视频在线播放| 秋霞午夜av一区二区三区 | 亚洲图片激情小说| 国产精品无码一区| 亚洲日韩中文字幕在线播放| 国产精品vvv| 国产精品久久久久免费| 欧美777四色影| 一级日本黄色片| 国产精品乱码一区二三区小蝌蚪| 中文字幕免费观看| 亚洲人成电影网站色| 松下纱荣子在线观看| 国产乱码精品一区二区三区卡| 欧美理论在线| 亚洲精品乱码久久久久久9色| 亚洲欧洲三级电影| 136福利视频导航| 中文字幕在线日韩 | 精品久久久久av影院| 欧美日韩经典丝袜| 999在线免费观看视频| 欧美一区二区三区免费看| 手机在线视频一区| 亚洲免费电影在线| 国产ts人妖调教重口男| 久久91精品国产91久久跳| 国产精品xxx| 欧美视频第二页| 亚洲老妇色熟女老太| 久久久免费精品| 黄色成人美女网站| 黄色一级视频片| 久久久精品国产免费观看同学| 中文字幕在线播| 色妞一区二区三区| 精品一区91| 久艹在线免费观看| 久久综合色一综合色88| 性色av免费观看| www.欧美精品一二三区| 国产精品亚洲欧美一级在线| 免费的av在线| 91亚洲午夜精品久久久久久| 老熟妇仑乱一区二区av| 中文国产成人精品| 日韩精品视频一区二区三区| www.成年人视频| 久久青草国产手机看片福利盒子| 在线观看国产成人| 九九热这里只有精品6| 国产精品网站在线看| 北条麻妃av高潮尖叫在线观看| 中文字幕一区二区三区四区 | 国产成人综合亚洲网站| 日韩精品久久久久久久| 曰本色欧美视频在线| 国产精品日韩精品在线播放 | 精品少妇一二三区| 日韩理论片久久| 日韩午夜电影免费看| 精品丰满人妻无套内射| 国产欧美一区二区精品性| 国产黄色小视频在线观看| 日本老师69xxx| 午夜日韩电影| 亚洲人成人无码网www国产| 69堂成人精品免费视频| 色偷偷偷在线视频播放| 日本丰满少妇黄大片在线观看| 91美女蜜桃在线| 国产色视频在线| 国产精品久久久久国产a级| 亚洲视频一区| 中日韩一级黄色片| 亚洲欧美中文在线视频| av综合网址| 五月激情五月婷婷| 欧美小视频在线| 天使と恶魔の榨精在线播放|