精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

SpringBoot分布式事務之可靠消息最終一致性

開發 前端
如果broker未收到消息(如果執行本地事務突然宕機了,相當執行本地事務(executeLocalTransaction)執行結果返回unknow,則和broker未收到確認消息的情況一樣處理。

環境:springboot2.3.9 + RocketMQ4.8.0

可靠消息最終一致性原理

圖片

  • 執行流程
  1. Producer發送Prepare message到broker。
  2. Prepare Message發送成功后開始執行本地事務。
  3. 如果本地事務執行成功的話則返回commit,如果執行失敗則返回rollback。(這個是在事務消息的回調方法里由開發者自己決定commit or rollback)
  4. Producer發送上一步的commit還是rollback到broker,這里有以下兩種情況:

1、如果broker收到了commit/rollback消息 :

如果收到了commit,則broker認為整個事務是沒問題的,執行成功的。那么會下發消息給Consumer端消費。

如果收到了rollback,則broker認為本地事務執行失敗了,broker將會刪除Half Message,不下發給Consumer端。

2、如果broker未收到消息(如果執行本地事務突然宕機了,相當執行本地事務(executeLocalTransaction)執行結果返回unknow,則和broker未收到確認消息的情況一樣處理。):

broker會定時回查本地事務的執行結果:如果回查結果是本地事務已經執行則返回commit,若未執行,則返回unknow。

Producer端回查的結果發送給Broker。Broker接收到的如果是commit,則broker視為整個事務執行成功,如果是rollback,則broker視為本地事務執行失敗,broker刪除Half Message,不下發給consumer。如果broker未接收到回查的結果(或者查到的是unknow),則broker會定時進行重復回查,以確保查到最終的事務結果。重復回查的時間間隔和次數都可配。

工程結構

圖片圖片

建立父子工程,兩個子項目account-manager,integral-manager。

依賴

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.2.0</version>
</dependency>

Account子模塊

  • 配置文件
server:
  port: 8081
---
rocketmq:
  nameServer: localhost:9876
  producer:
    group: pack-mq
---
spring:
  jpa:
    generateDdl: false
    hibernate:
      ddlAuto: update
    openInView: true
    show-sql: true
---
spring:
  datasource:
    driverClassName: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/account?serverTimeznotallow=GMT%2B8
    username: root
    password: ******
    type: com.zaxxer.hikari.HikariDataSource
    hikari:
      minimumIdle: 10
      maximumPoolSize: 200
      autoCommit: true
      idleTimeout: 30000
      poolName: MasterDatabookHikariCP
      maxLifetime: 1800000
      connectionTimeout: 30000
      connectionTestQuery: SELECT 1
  • 業務實體類
// 用戶表
@Entity
@Table(name = "t_account")
public class Account {
  @Id
  private Long id;
  private String name ;
}
// 業務記錄表(用來查詢去重)
@Entity
@Table(name = "t_account_log")
public class AccountLog {
  @Id
  private Long txid;
  private Date createTime ;
}
  • DAO相關類
public interface AccountRepository extends JpaRepository<Account, Long> {
}
public interface AccountLogRepository extends JpaRepository<AccountLog, Long> {
}
  • Service相關類
@Resource
private AccountRepository accountRepository ;
@Resource
private AccountLogRepository accountLogRepository ;
  
// 該方法保存業務數據,同時保存操作記錄;操作記錄用來回查。
@Transactional
public boolean register(Account account) {
  accountRepository.save(account) ;
  AccountLog accountLog = new AccountLog(account.getId(), new Date()) ;
  accountLogRepository.save(accountLog) ;
  return true ;
}
  
public AccountLog existsTxId(Long txid) {
  return accountLogRepository.findById(txid).orElse(null) ;
}
  • 發送消息方法
@Resource
private RocketMQTemplate rocketMQTemplate ;
  
public String sendTx(String topic, String tags, Account account) {
  String uuid = UUID.randomUUID().toString().replaceAll("-", "") ;
  TransactionSendResult result =rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, MessageBuilder.withPayload(account).
      setHeader("tx_id", uuid).build(), uuid) ;
  return result.getSendStatus().name() ;
}
  • 消息監聽(生產者監聽)
@RocketMQTransactionListener
public class ProducerMessageListener implements RocketMQLocalTransactionListener {
  
  @Resource
  private AccountService accountService ;


  @Override
  public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    try {
      Account account = new JsonMapper().readValue((byte[])msg.getPayload(), Account.class) ;
      accountService.register(account) ;
    } catch (Exception e) {
      e.printStackTrace() ;
      return RocketMQLocalTransactionState.ROLLBACK ;
    }
    return RocketMQLocalTransactionState.COMMIT ;
  }


  @Override
  public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
  // 這里檢查本地事務是否執行成功
    try {
      Account account = new JsonMapper().readValue((byte[])msg.getPayload(), Account.class) ;
      System.out.println("執行查詢ID為:" + account.getId() + " 的數據是否存在") ;
      AccountLog accountLog = accountService.existsTxId(account.getId()) ;
      if (accountLog == null) {
        return RocketMQLocalTransactionState.UNKNOWN ;
      }
    } catch (Exception e) {
      e.printStackTrace() ;
      return RocketMQLocalTransactionState.UNKNOWN ;
    }
    return RocketMQLocalTransactionState.COMMIT ;
  }


}
  • Controller接口
@RestController
@RequestMapping("/accounts")
public class AccountController {
  @Resource
  private ProducerMessageService messageService ;
  @PostMapping("/send")
  public Object sendMessage(@RequestBody Account account) {
    return messageService.sendTx("tx-topic", "mks", account) ;
  }
}

Integral子模塊

  • 業務實體類
@Entity
@Table(name = "t_integral")
public class Integral {
  @Id
  private Long id;
  private Integer score ;
  private Long acccountId ;
}
  • DAO相關類
public interface IntegralRepository extends JpaRepository<Integral, Long> {
}
  • Service相關類
@Resource
private IntegralRepository integralRepository ;
  
@Transactional
public Integral saveIntegral(Integral integral) {
  return integralRepository.save(integral) ;
}
  • 消息監聽
@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "mks")
@Component
public class IntegralMessageListener implements RocketMQListener<String> {


  @Resource
  private IntegralService integralService ;
  
  @SuppressWarnings("unchecked")
  @Override
  public void onMessage(String message) {
    System.out.println("Integral接收到消息:" + message) ;
    try {
      Map<String, Object> jsonMap = new JsonMapper().readValue(message, Map.class) ;
      Integer id = (Integer) jsonMap.get("id") ;
      integralService.saveIntegral(new Integral(1L, 1000, id + 0L)) ;
    } catch (Exception e) {
      throw new RuntimeException(e) ;
    }
  }


}

測試

分別啟動兩個子模塊

  • 初始數據表

圖片圖片


  • Postman測試

圖片圖片

Account模塊

圖片圖片

Integral模塊

圖片圖片


當子模塊Account執行本地事務發生錯誤時,事務會回滾并且刪除消息。子模塊Integral并不會收到消息。

責任編輯:武曉燕 來源: 實戰案例錦集
相關推薦

2021-06-16 08:33:02

分布式事務ACID

2022-07-21 06:54:28

微服務系統RocketMQ

2025-08-05 04:22:00

2022-12-19 19:12:17

分布式事務

2019-10-11 23:27:19

分布式一致性算法開發

2019-09-05 08:43:34

微服務分布式一致性數據共享

2021-11-22 16:30:30

分布式一致性分布式系統

2015-10-19 10:42:37

分布式一致性應用系統

2024-01-31 09:54:51

Redis分布式

2024-06-04 10:58:30

2020-05-11 10:30:57

2PC分布式協議

2017-09-21 10:59:36

分布式系統線性一致性測試

2024-11-28 10:56:55

2021-07-28 08:39:25

分布式架構系統

2021-06-03 15:27:31

RaftSOFAJRaft

2022-06-07 12:08:10

Paxos算法

2025-06-09 08:00:37

分布式文件系統

2021-06-06 12:45:41

分布式CAPBASE

2017-09-22 12:08:01

數據庫分布式系統互聯網

2020-10-28 11:15:24

EPaxos分布式性算法
點贊
收藏

51CTO技術棧公眾號

日日碰狠狠丁香久燥| 国产精品国产三级欧美二区| 色www亚洲国产阿娇yao| 亚洲男人在线| 亚洲国产精品视频| 欧美日产一区二区三区在线观看| 欧美性受xxx黑人xyx性爽| 国产精品99在线观看| 亚洲第一精品福利| 亚洲77777| heyzo在线播放| 欧美韩国一区二区| 翡翠波斯猫1977年美国| 最新在线中文字幕| 尤物在线精品| xxx成人少妇69| 国产中文字幕一区二区| 国内精品视频| 在线看一区二区| av网站大全免费| 日本电影全部在线观看网站视频| eeuss国产一区二区三区| 国产精品视频一区二区三区四| 久久久一区二区三区四区| 久久精品国产www456c0m| 亚洲国产高清自拍| 国产精品999.| 欧美爱爱视频| 色婷婷av久久久久久久| 欧美成人高潮一二区在线看| 岛国成人毛片| 中文字幕在线一区二区三区| 欧美一区二视频在线免费观看| 亚洲免费成人在线| 国产精品一区二区果冻传媒| 国产精品久久久精品| 国内自拍视频在线播放| 亚洲激情黄色| 欧美激情一区二区三区久久久 | 狠狠久久伊人| 日韩一区二区在线观看| 999在线观看| 欧美日韩五区| 欧美色精品在线视频| 99福利在线观看| 自拍一区在线观看| 五月婷婷色综合| 成人性生活视频免费看| 俺来俺也去www色在线观看| 亚洲美女屁股眼交3| 日本老太婆做爰视频| 97caopor国产在线视频| 亚洲三级小视频| 黄色a级在线观看| 日韩大片在线永久免费观看网站| 国产精品色噜噜| 一区二区三区四区视频在线| 午夜免费播放观看在线视频| 国产精品久久午夜夜伦鲁鲁| 亚洲一区二区三区免费观看| 最近高清中文在线字幕在线观看| 中文文精品字幕一区二区| 五月婷婷一区| 蜜桃视频网站在线观看| 亚洲精品一二三| 日韩成人手机在线| 漫画在线观看av| 日本道色综合久久| www.xxx亚洲| 国产亚洲欧美日韩精品一区二区三区 | 日韩国产精品视频| 性高潮久久久久久久| 免费成人高清在线视频theav| 亚洲人成欧美中文字幕| 免费一级特黄3大片视频| 羞羞答答成人影院www| 欧美肥婆姓交大片| 亚洲 欧美 日韩 综合| 老牛嫩草一区二区三区日本| 成人免费福利在线| 亚洲AV无码乱码国产精品牛牛 | 午夜亚洲一区| 国产精品久久久久久久久粉嫩av| 国产精选久久久| 成人免费福利片| 欧洲国产精品| 永久免费网站在线| 欧美色视频日本版| 奇米视频7777| 欧美激情极品| 色多多国产成人永久免费网站 | 亚洲黄色三级| 国产精品男人的天堂| 国产丰满美女做爰| 国产亚洲欧美色| 少妇高潮大叫好爽喷水| 在线毛片观看| 日韩欧美中文字幕精品| 巨胸大乳www视频免费观看| 91青青国产在线观看精品| 久久久久久午夜| 中文字幕永久在线观看| av福利精品导航| 成人短视频在线看| 亚洲女同av| 日韩午夜激情视频| 妖精视频在线观看免费| 亚洲国产高清一区二区三区| 国产一区视频在线播放| 欧洲综合视频| 亚洲五码中文字幕| 亚洲人视频在线| 亚洲综合小说图片| 久久久久亚洲精品成人网小说| 中文字幕1区2区3区| av电影在线观看完整版一区二区| 亚洲国产精品久久久久婷婷老年| 国产又色又爽又黄刺激在线视频| 欧美日韩精品一区二区| 波多野结衣办公室33分钟| 欧美日韩蜜桃| 国产在线一区二区三区| 久久久久久久影视| 精品久久香蕉国产线看观看亚洲| 日本中文字幕在线不卡| 日本一区二区在线看| 91精品国产91久久久久久最新| 国产三区在线播放| 中文字幕一区二区三区在线不卡| 日本精品久久久久中文字幕| 好吊妞国产欧美日韩免费观看网站 | 欧美日韩色婷婷| 蜜臀av粉嫩av懂色av| 国产精品v日韩精品v欧美精品网站 | 国产韩日精品| 亚洲视频一区二区| 免费在线不卡视频| caoporn国产一区二区| 日韩精品一区二区免费| 日韩视频在线直播| 欧美黑人一区二区三区| www.色视频| 一区二区三区高清在线| 奇米777在线| 综合久久婷婷| 91免费看蜜桃| 欧美xxxx免费虐| 精品成人一区二区| 91精品国产乱码在线观看| 不卡的av中国片| 北条麻妃在线视频观看| 欧美亚洲国产日韩| 日韩av免费网站| 国产天堂在线| 欧美片在线播放| 在线观看美女av| 国产伦理精品不卡| 欧美一级爱爱视频| 国产精品毛片av| 69视频在线播放| 你懂的视频在线免费| 色综合久久综合网欧美综合网| 魔女鞋交玉足榨精调教| 免费观看在线综合| www.午夜色| 精品中文字幕一区二区三区| 久久久久久国产精品| 午夜视频福利在线观看| 91黄色免费看| 精品在线观看一区| 国产成人综合自拍| 免费国产a级片| 欧美精品一区二区三区中文字幕| 国产精品91在线| 黄色网页在线播放| 精品国产伦一区二区三区免费| 青青操免费在线视频| 国产农村妇女毛片精品久久麻豆| 国产福利精品一区二区三区| 欧美激情精品久久久六区热门| 国外成人在线视频网站| 88xx成人永久免费观看| 久久人人爽亚洲精品天堂| 人妻91麻豆一区二区三区| 日韩欧美中文第一页| 日韩一卡二卡在线观看| av毛片久久久久**hd| xx欧美撒尿嘘撒尿xx| 狠久久av成人天堂| 日本精品一区二区三区高清 久久| a一区二区三区亚洲| 午夜精品一区二区三区av| 成人精品一区二区三区免费| 精品国内二区三区| 怡春院在线视频| 亚洲va中文字幕| 亚欧精品视频一区二区三区| 成人免费高清在线| 污污网站在线观看视频| 亚洲一级在线| 日本天堂免费a| 成人a'v在线播放| 国产高清不卡av| 九七电影院97理论片久久tvb| 国内揄拍国内精品| 日本电影在线观看网站| 日韩成人网免费视频| 国产免费一区二区三区最新不卡| 欧美日韩国产一区二区三区| 任我爽在线视频| 国产亚洲一区二区三区四区| 免费观看污网站| 精品一区二区三区香蕉蜜桃| 欧在线一二三四区| a级毛片免费观看在线| 亚洲欧美高清| 六月婷婷久久| 亚洲一区二区三区四区电影| 国产精品久久久久99| 欧美a级在线观看| 欧美日本亚洲视频| 午夜伦理在线| 亚洲最新中文字幕| 无码国精品一区二区免费蜜桃| 欧美一级国产精品| 亚洲影视一区二区| 欧美亚洲动漫另类| 国内自拍视频在线播放| 亚洲国产一区二区a毛片| 97在线观看免费高| 亚洲私人黄色宅男| 午夜激情福利电影| 国产精品每日更新在线播放网址| 亚洲av无码国产精品麻豆天美| 91最新地址在线播放| 欧美日韩人妻精品一区在线| 成人性视频免费网站| xxxx国产视频| 国产suv精品一区二区6| 深夜做爰性大片蜜桃| 国产乱国产乱300精品| 特级黄色片视频| 国产乱色国产精品免费视频| 青娱乐国产精品视频| 国产一区不卡视频| 韩国一区二区三区四区| 国产成人自拍在线| 日本wwwwwww| 成人黄色av电影| 波多野结衣有码| 99久久国产综合精品麻豆 | 成人午夜激情在线| 日本一区二区免费视频| av一区二区久久| 37p粉嫩大胆色噜噜噜| 久久久久高清精品| 成人小视频免费看| 综合久久久久久| 麻豆国产尤物av尤物在线观看| 亚洲在线视频网站| 中文字幕第15页| 欧美午夜电影在线播放| 91九色蝌蚪91por成人| 欧美一区二区三区视频| 亚洲高清精品视频| 亚洲精品久久久久中文字幕二区 | 国产黄网在线观看| 欧美日韩在线播放三区| a级片免费观看| 日韩成人在线视频观看| 国产精品一区二区婷婷| 色偷偷av一区二区三区| a毛片在线播放| 97在线视频免费看| 日韩和的一区二在线| 91精品视频观看| 另类ts人妖一区二区三区| 欧美日韩中文国产一区发布| 天天超碰亚洲| 妞干网在线观看视频| 免费精品视频在线| 欧美成人精品一区二区综合免费| 久久综合久久久久88| 日韩精品久久久久久久的张开腿让| 一区二区三区在线观看视频| 在线能看的av| 欧美日韩dvd在线观看| 亚洲欧美激情在线观看| 亚洲人成网在线播放| www久久日com| 国产精彩精品视频| 秋霞影院一区| 亚洲国产午夜伦理片大全在线观看网站 | 久久久夜色精品| 在线观看中文字幕不卡| 精品国产伦一区二区三| 国产亚洲一区二区精品| 狂野欧美性猛交xxxxx视频| 秋霞午夜一区二区| 天堂va在线高清一区| 欧美日韩亚洲在线| 欧美日韩国产探花| 亚洲精品www.| 久久亚洲免费视频| 手机在线免费看片| 欧美视频在线观看免费网址| 最近中文字幕免费观看| 亚洲精品短视频| 天堂8中文在线| 国产日韩欧美日韩| 中文字幕亚洲影视| 久久这里只有精品23| 国产真实乱子伦精品视频| 国产人妻大战黑人20p| 亚洲成精国产精品女| 国产免费叼嘿网站免费| 在线不卡国产精品| 在线日韩影院| 好看的日韩精品| 欧美网站在线| 欧美体内she精高潮| 国产精品视频九色porn| 亚洲欧美另类在线视频| 日韩成人在线视频网站| 9999在线视频| 成人毛片网站| 欧美另类女人| 五月天六月丁香| 亚洲男人的天堂av| 在线观看免费黄色小视频| 国产亚洲欧美一区| 日韩在线短视频| 欧洲一区二区在线观看| 久久精品123| 中文字字幕码一二三区| 欧美视频专区一二在线观看| 日本成人动漫在线观看| 午夜精品久久久久久久久久久久久| 伊人久久影院| 日韩精品一区在线视频| 成人h精品动漫一区二区三区| 国产亚洲小视频| 精品美女被调教视频大全网站| 图片区小说区亚洲| 国产精品三区在线| 欧美日本三区| 中文字幕第3页| 午夜欧美大尺度福利影院在线看| 亚洲第一色网站| 久久久久久网址| 日本欧美韩国国产| 国产免费毛卡片| 久久精品视频网| 中文字幕理论片| 日韩在线视频免费观看| 91精品国产色综合久久不卡粉嫩| 法国空姐在线观看免费| 成人污污视频在线观看| 91在线看视频| 中文精品99久久国产香蕉| 日韩在线激情| 中文字幕欧美日韩一区二区三区| 狠狠色狠狠色综合系列| 免费无遮挡无码永久在线观看视频 | 欧美人与牛zoz0性行为| 午夜免费精品视频| 亚洲欧美激情视频在线观看一区二区三区| 精品黑人一区二区三区国语馆| 久久久久久97| 精品视频黄色| 曰本三级日本三级日本三级| 天天综合天天综合色| 国产九九在线| www.成人av| 久久xxxx精品视频| 搜索黄色一级片| 亚洲国产欧美一区二区丝袜黑人| 日韩精品影片| www.亚洲成人网| 国产色一区二区| 精品国自产在线观看| 全亚洲最色的网站在线观看| 欧美大片aaaa| 在线免费观看a级片| 欧美日韩aaaaa| 多野结衣av一区| 亚洲最新免费视频| 99精品国产99久久久久久白柏| www.亚洲激情| 欧美极品少妇xxxxⅹ裸体艺术| 九九热线有精品视频99| 夜夜爽久久精品91| 色婷婷激情综合| 欧美草逼视频| 深田咏美在线x99av| 国产白丝网站精品污在线入口| 日韩综合在线观看| 欧美激情亚洲一区| 第一社区sis001原创亚洲| 7788色淫网站小说|