精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

深度解析:基于 RocketMQ 實現(xiàn)分布式事務(wù)的技術(shù)實踐與原理探究

開發(fā)
本文我們嘗試基于RocketMQ實現(xiàn)下單的分布式的事務(wù)。可能會有讀者會有疑問,之前我們不是基于Seata完成了分布式事務(wù),為什么我們還要用到RocketMQ呢?

在上一篇文章Spring Boot自動裝配原理以及實踐我們完成了服務(wù)通用日志監(jiān)控組件的開發(fā),確保每個服務(wù)都可以基于一個注解實現(xiàn)業(yè)務(wù)功能的監(jiān)控。 而本文我們嘗試基于RocketMQ實現(xiàn)下單的分布式的事務(wù)。可能會有讀者會有疑問,之前我們不是基于Seata完成了分布式事務(wù),為什么我們還要用到RocketMQ呢?

我們的再來回顧一下我們下單功能大抵是做以下三件事情:

  • 創(chuàng)建訂單,將訂單記錄存到數(shù)據(jù)庫中。
  • 扣款,記錄用戶扣款后錢包所剩下的額度。
  • 扣除商品庫存,并發(fā)放商品。

我們將該場景放到高并發(fā)場景下,這個功能勢必要考慮性能和可靠性問題,所以我們在業(yè)務(wù)需求清楚明了的情況下,就希望能有一種方式確保下單功能在高并發(fā)場景保證性能、可靠性。 而Seata的AT模式確實可以保證最終一致性,但是seata的AT模式本質(zhì)上是依賴于global_table、branch_table等數(shù)據(jù)表維護應(yīng)用層分布式事務(wù),在操作期間會涉及大量的更新和刪除操作,隨著時間的推移還是會出現(xiàn)大量的索引碎片,導(dǎo)致索引性能下降。

所以我們就考慮采用RocketMQ實現(xiàn)分布式事務(wù),盡管RocketMQ對于分布式事務(wù)的實現(xiàn)業(yè)務(wù)侵入性相對強一些,但它可以保證業(yè)務(wù)層面的功能解耦從而提升并發(fā)性能,且RocketMQ還對消息消費可靠性做了許多不錯的優(yōu)化,例如:失敗重試、死信隊列等,所以我們還是嘗試使用RocketMQ來改良我們的下單分布式事務(wù)問題。

一、詳解RocketMQ落地分布式事務(wù)案例

1. 需求說明

用戶下單大抵需要在三個服務(wù)中完成:

  • 訂單服務(wù)完成訂單創(chuàng)建,基于用戶傳入的產(chǎn)品編碼、用戶編碼、產(chǎn)品購買數(shù)生成訂單信息,對應(yīng)的調(diào)用參數(shù)如下:
{
    "accountCode": "0932897",
    "productCode": "P003",
    "count": 1
}
  • 基于入?yún)⒌挠脩舸a定位到用戶錢包金額,完成賬戶扣款。
  • 基于產(chǎn)品和購買數(shù)完成庫存扣減。

這其中會跨域三個服務(wù),分別是訂單服務(wù)創(chuàng)建訂單、賬戶服務(wù)扣款、商品服務(wù)扣減庫存。

2. 落地思路

以我們業(yè)務(wù)為最終目標,RocketMQ實現(xiàn)分布式事務(wù)的原理是基于2PC的,流程大抵如下:

  • 訂單服務(wù)發(fā)送一個事務(wù)消息到消息隊列,消息內(nèi)容就是我們的訂單信息,這里面包含用戶賬號、購買的產(chǎn)品代碼、購買產(chǎn)品數(shù)量等數(shù)據(jù)。
  • MQ收到half消息,并回復(fù)ack確認。
  • 生產(chǎn)者(訂單服務(wù)order-service)得知我們發(fā)送的消息已被收到,訂單服務(wù)則執(zhí)行本地事務(wù)并提交事務(wù),即將訂單信息寫入數(shù)據(jù)庫中,同時在該事務(wù)內(nèi)將訂單插入結(jié)果寫入transaction_log表中。
  • 生產(chǎn)者(訂單服務(wù)order-service)完成本地事務(wù)的提交,告知MQ將事務(wù)消息commit,此時消費者就可以消費這條消息了,注意若生產(chǎn)者消費失敗,則將消息rollback,一切就當沒有發(fā)生過。
  • 如果上述的消息是commit則將消息持久化到commitLog中,以便后續(xù)MQ宕機或者服務(wù)宕機后依然可以繼續(xù)消費這條沒有被消費的消息。
  • (非必要步驟)若MQ長時間沒有收到生產(chǎn)者的commit或者rollback的信號,則攜帶事務(wù)id找生產(chǎn)者查詢transaction_log索要當前消息狀態(tài),如果看到對應(yīng)的消息則判定生產(chǎn)者事務(wù)成功將消息commit給消費者消費,若沒看到則說明生產(chǎn)者本地事務(wù)執(zhí)行失敗,回滾該消息。
  • 消費者即我們的用戶服務(wù)或者庫存服務(wù)收到消息則執(zhí)行本地事務(wù)并提交,若失敗則會不斷重試,直到達到上限則將消息存到死信隊列并告警。
  • 人工介入查看死信隊列查看失敗消息手工補償數(shù)據(jù)。

二、實踐-基于RocketMQ實現(xiàn)分布式事務(wù)

1. 部署RocketMQ(Linux環(huán)境)

在編寫業(yè)務(wù)代碼之前,我們必須完成一下RocketMQ的部署,首先我們自然要下載一下RocketMQ,下載地址如下,筆者下載的是rocketmq-all-4.8.0-bin-release這個版本:https://rocketmq.apache.org/download/。

完成完成后,我們將其解壓到自定義的路徑,鍵入sudo vim /etc/profile配置MQ環(huán)境變量,完成后鍵入source /etc/profile使之生效,對應(yīng)的配置內(nèi)容如下所示:

export ROCKETMQ_HOME=/home/sharkchili/rocketmq-all-4.8.0-bin-release
export PATH=$PATH:$ROCKETMQ_HOME/bin

需要注意的是筆者本次采用WSL的Ubuntu子系統(tǒng)時啟動時腳本會拋出runserver.sh: 70: [[: Exec format error錯誤,嘗試格式化和指令配置后都沒有很好的解決,于是循著報錯找到runserver.sh這行對應(yīng)的腳本內(nèi)容,該括弧本質(zhì)上就是基于JDK內(nèi)容配置對應(yīng)的GC算法:

以筆者為里系統(tǒng)是jdk8,所以直接去掉判斷用走JDK8的配置即可:

choose_gc_options()
{

      JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFractinotallow=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
      JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails"
      JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
 
}

完成后鍵入./mqnamesrv &將MQ啟動,如果彈窗輸出下面這條結(jié)果,則說明mq的NameServer啟動成功。

Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON

然后我們再鍵入./mqbroker -n 127.0.0.1:9876啟動broker,需要注意的是默認情況下broker占用堆內(nèi)存差不多是4g,所以讀者本地部署時建議修改一下runbroker.sh的堆內(nèi)存,如下圖所示:

若彈窗輸出下面所示的文字,則說明broker啟動成功,自此mq就在windows環(huán)境部署成功了。我們就可以開始編碼工作了。

The broker[DESKTOP-BI4ATFQ, 192.168.237.1:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876

2. 服務(wù)引入MQ完成下單功能開發(fā)

(1) 服務(wù)引入RocketMQ依賴

完成RocketMQ部署之后,我們就可以著手編碼工作了,首先我們要在在三個服務(wù)中引入RocketMQ的依賴,由于筆者的spring-boot版本比較老,所以這里筆者為了統(tǒng)一管理在父pom中指定了mq較新的版本號:

<!--rocketmq-->
        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>

然后我們分別對order、account、product三個服務(wù)中引入依賴:

<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
           
        </dependency>

(2) 注冊中心配置RocketMQ信息

由于我們的分布式事務(wù)涉及3個服務(wù),而且mq的消費模式采用的是發(fā)布訂閱模式,所以我們的生產(chǎn)者(order-service)和消費者(account-serivce)都配置為cloud-group

rocketmq.name-server=172.29.193.12:9876
# 指定消費者組
rocketmq.producer.group=cloud-group

之所以沒有沒將消費者2(product-service)也配置到cloud-group中的原因也很簡單,同一個消息只能被同一個消費者組中的一個成員消費,假如我們的將product-service配置到同一個消費者組中就會出現(xiàn)因一條消息只能被一個服務(wù)消費而導(dǎo)致product-service收不到消息。

對此我們實現(xiàn)思路有兩種:

  • 將服務(wù)都放到同一個消費者組,消費模式改為廣播模式。
  • 將product-service設(shè)置到別的消費者組中。

考慮后續(xù)擴展筆者選擇方案2,將產(chǎn)品服務(wù)的訂閱者放到消費者組2中:

rocketmq.name-server=172.29.193.12:9876
rocketmq.producer.group=cloud-group2

(3) 創(chuàng)建消息日志表

我們在上文進行需求梳理時有提到一個MQServer沒收到生產(chǎn)者本地事務(wù)執(zhí)行狀態(tài)進行回查的操作,所以我們在生產(chǎn)者在執(zhí)行本地事務(wù)時,需要創(chuàng)建一張表記錄生產(chǎn)者本地事務(wù)執(zhí)行狀態(tài),建表SQL如下:

DROP TABLE IF EXISTS `rocketmq_transaction_log`;
CREATE TABLE `rocketmq_transaction_log` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `transaction_id` varchar(50) DEFAULT NULL,
  `log` varchar(500) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

(4) 完成order服務(wù)half消息發(fā)送、監(jiān)聽、回查回調(diào)邏輯

我們的訂單服務(wù)需要做以下三件事:

  • 發(fā)送half消息給MQ。
  • half消息發(fā)送成功執(zhí)行本地事務(wù)并記錄日志。
  • 告知MQ可以提交事務(wù)消息。

所以我們需要定義一下消息格式,對象類中必須包含訂單號、產(chǎn)品編碼、用戶編碼、購買產(chǎn)品數(shù)量等信息。

@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
public class OrderDto {

    private static final long serialVersionUID = 1L;

 //設(shè)置主鍵自增,避免插入時沒必要的報錯
    @TableId(value = "ID", type = IdType.AUTO)
    private Integer id;

    /**
     * 訂單號
     */
    private String orderNo;

    /**
     * 用戶編碼
     */
    private String accountCode;

    /**
     * 產(chǎn)品編碼
     */
    private String productCode;

    /**
     * 產(chǎn)品扣減數(shù)量
     */
    private Integer count;

    /**
     * 余額
     */
    private BigDecimal amount;

    /**
     * 本次扣減金額
     */
    private BigDecimal price;
}

然后我們就可以編寫控制層的代碼了,通過獲取前端傳輸?shù)膮?shù)調(diào)用orderService完成half消息發(fā)送。

@PostMapping("/order/createOrderByMQ")
    public ResultData<String> createOrderByMQ(@RequestBody OrderDto orderDTO) {
        log.info("基于mq完成用戶下單流程,請求參數(shù): " + JSON.toJSONString(orderDTO));
        orderService.createOrderByMQ(orderDTO);
        return ResultData.success("基于mq完成用戶下單完成");

    }

orderService的實現(xiàn)邏輯很簡單,定義好消息設(shè)置消息頭內(nèi)容和消息載體的對象,通過sendMessageInTransaction方法完成半消息發(fā)送,需要了解一下消息的主題(topic)為ORDER_MSG_TOPIC,只有訂閱這個主題的消費者才能消費這條消息:

@Autowired
    private RocketMQTemplate rocketMQTemplate;

@Override
    public void createOrderByMQ(OrderDto orderDto) {


        //創(chuàng)建half消息對應(yīng)的事務(wù)日志的id
        String transactionId = UUID.randomUUID().toString();

        //調(diào)用產(chǎn)品服務(wù)獲取商品詳情
        ResultData<ProductDTO> productInfo = productFeign.getByCode(orderDto.getProductCode());
        //計算總售價
        BigDecimal amount = productInfo.getData().getPrice().multiply(new BigDecimal(orderDto.getCount()));
        orderDto.setAmount(amount);

        //將訂單信息作為載體
        Message<OrderDto> message = MessageBuilder.withPayload(orderDto)
                .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                //下單用戶編碼
                .setHeader("accountCode", orderDto.getAccountCode())
                //產(chǎn)品編碼
                .setHeader("productCode", orderDto.getProductCode())
                //產(chǎn)品購買數(shù)
                .setHeader("count", orderDto.getCount())
                //下單金額
                .setHeader("amount", amount)
                .build();

        //發(fā)送half消息
        rocketMQTemplate.sendMessageInTransaction("ORDER_MSG_TOPIC", message, orderDto);


    }

完成half消息發(fā)送之后,我們就必須知曉消息發(fā)送結(jié)果才能確定是否執(zhí)行本地事務(wù)并提交,所以我們的訂單服務(wù)必須創(chuàng)建一個監(jiān)聽器了解half消息的發(fā)送情況,executeLocalTransaction方法就是mq成功收到半消息后的回調(diào)函數(shù),一旦我們得知消息成功發(fā)送之后,MQ就會執(zhí)行這個方法,筆者通過這個方法獲取消息頭的參數(shù)創(chuàng)建訂單對象,調(diào)用createOrderWithRocketMqLog完成訂單的創(chuàng)建的本地事務(wù)成功的日志記錄。

@Slf4j
@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class OrderListener implements RocketMQLocalTransactionListener {
    private final IOrderService orderService;
    private final RocketmqTransactionLogMapper rocketMqTransactionLogMapper;

    /**
     * 監(jiān)聽到發(fā)送half消息,執(zhí)行本地事務(wù)
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        log.info("order執(zhí)行本地事務(wù)");


        try {
            //解析消息頭
            MessageHeaders headers = message.getHeaders();
            //獲取購買金額
            BigDecimal amount = new BigDecimal(String.valueOf(headers.get("amount")));
            //獲取訂單信息
            Order order = Order.builder()
                    .accountCode((String) headers.get("accountCode"))
                    .amount(amount)
                    .productCode((String) headers.get("productCode"))
                    .count(Integer.valueOf(String.valueOf(headers.get("count"))))
                    .build();
            //獲取事務(wù)id
            String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
            //執(zhí)行本地事務(wù)和記錄事務(wù)日志
            orderService.createOrderWithRocketMqLog(order, transactionId);

            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.error("創(chuàng)建訂單失敗,失敗原因: {}", e.getMessage(), e);
            return RocketMQLocalTransactionState.ROLLBACK;
        }

        
    }

    /**
     * 本地事務(wù)的檢查,檢查本地事務(wù)是否成功
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {

        MessageHeaders headers = message.getHeaders();
        //獲取事務(wù)ID
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        log.info("檢查本地事務(wù),事務(wù)ID:{}", transactionId);
        //根據(jù)事務(wù)id從日志表檢索
        QueryWrapper<RocketmqTransactionLog> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("transaction_id", transactionId);
        RocketmqTransactionLog rocketmqTransactionLog = rocketMqTransactionLogMapper.selectOne(queryWrapper);
        //如果消息表存在,則說明生產(chǎn)者事務(wù)執(zhí)行完成,回復(fù)commit
        if (null != rocketmqTransactionLog) {
            return RocketMQLocalTransactionState.COMMIT;
        }
        //回復(fù)rollback
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}

createOrderWithRocketMqLog做了兩件事,分別是插入訂單信息和創(chuàng)建消息日志,這里筆者用到了事務(wù)注解確保了兩個操作的原子性。 這樣一來,MQserver后續(xù)的回查邏輯完全可以基于RocketmqTransactionLog 進行判斷,如果消息的事務(wù)id在表中存在,則說明生產(chǎn)者本地事務(wù)成功,反之就是失敗。

@Transactional(rollbackFor = Exception.class)
    @Override
    public void createOrderWithRocketMqLog(Order order, String transactionId) {
        //創(chuàng)建訂單編號
        order.setOrderNo(UUID.randomUUID().toString());
        //插入訂單信息
        orderMapper.insert(order);
        //事務(wù)日志
        RocketmqTransactionLog log = RocketmqTransactionLog.builder()
                .transactionId(transactionId)
                .log("執(zhí)行創(chuàng)建訂單操作")
                .build();
        rocketmqTransactionLogMapper.insert(log);
    }

補充一下基于MP生成的RocketmqTransactionLog 類代碼:

@TableName("rocketmq_transaction_log")
@ApiModel(value = "RocketmqTransactionLog對象", description = "")
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class RocketmqTransactionLog implements Serializable {

    private static final long serialVersionUID = 1L;

    @TableId(value = "ID", type = IdType.AUTO)
    private Integer id;

    private String transactionId;

    private String log;


}

(5) 完成account、product監(jiān)聽事件

然后我們就可以實現(xiàn)用戶服務(wù)和商品服務(wù)的監(jiān)聽事件了,一旦生產(chǎn)者提交事務(wù)消息之后,這幾個消費者都會收到這個topic(主題)的消息,進而完成當前服務(wù)的業(yè)務(wù)邏輯。

先來看看實現(xiàn)扣款的用戶服務(wù),我們的監(jiān)聽器繼承了RocketMQListener,基于@RocketMQMessageListener注解設(shè)置它訂閱的主題為createByRocketMQ,一旦收到這個主題的消息時這個監(jiān)聽器就會執(zhí)行onMessage方法,我們的邏輯很簡單,就是獲取消息的內(nèi)容完成扣款,唯一需要注意的就是線程安全問題。我們的壓測的情況下,單用戶可能會頻繁創(chuàng)建訂單,在并發(fā)期間同一個用戶的扣款消息可能同時到達扣款服務(wù)中,這就導(dǎo)致單位時間內(nèi)扣款服務(wù)從數(shù)據(jù)庫中查詢到相同的余額,執(zhí)行相同的扣款邏輯,導(dǎo)致金額少扣了。

所以我們必須保證扣款操作互斥和原子化,考慮到筆者當前項目環(huán)境是單體,所以就用簡單的synchronized 關(guān)鍵字解決問題。

@Slf4j
@Service
@RocketMQMessageListener(topic = "ORDER_MSG_TOPIC", consumerGroup = "cloud-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class SubtracAmountListener implements RocketMQListener<OrderDto> {

    @Resource
    private AccountMapper accountMapper;

    //強制轉(zhuǎn)為runTimeException
    @SneakyThrows
    @Override
    public void onMessage(OrderDto orderDto) {
        log.info("賬戶服務(wù)收到消息,開始消費");
        QueryWrapper<Account> query = new QueryWrapper<>();
        query.eq("account_code", orderDto.getAccountCode());
        //解決單體服務(wù)下線程安全問題
        synchronized (this){
            Account account = accountMapper.selectOne(query);
            BigDecimal subtract = account.getAmount().subtract(orderDto.getAmount());
            if (subtract.compareTo(BigDecimal.ZERO)<0){
                throw new Exception("用戶余額不足");
            }
            account.setAmount(subtract);
            log.info("更新賬戶服務(wù),請求參數(shù):{}", JSON.toJSONString(account));
            accountMapper.updateById(account);
        }


    }
}

然后就說商品服務(wù),邏輯也很簡單,也同樣要注意一下線程安全問題:

@Slf4j
@Service
@RocketMQMessageListener(topic = "ORDER_MSG_TOPIC", consumerGroup = "cloud-group2")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ProductSubtractListener implements RocketMQListener<OrderDto> {
    @Resource
    private ProductMapper productMapper;

    @Override
    public void onMessage(OrderDto orderDto) {
        log.info(" 產(chǎn)品服務(wù)收到消息,開始消費");
        QueryWrapper<Product> queryWrapper=new QueryWrapper<>();
        queryWrapper.eq("product_code",orderDto.getProductCode());
        synchronized (this){
            Product product = productMapper.selectOne(queryWrapper);
            if (product.getCount()<orderDto.getCount()){
                throw new RuntimeException("庫存不足");
            }

            product.setCount(product.getCount()-orderDto.getCount());
            log.info("更新產(chǎn)品庫存信息,請求參數(shù):{}", JSON.toJSONString(product));
            productMapper.updateById(product);
        }



    }
}

三、基于幾個測試用例驗證MQ半消息事務(wù)

1. 前置準備與說明

完整編碼工作后,自測是非常有必要的,我們?nèi)粘M瓿砷_發(fā)任務(wù)后,都會結(jié)合需求場景以及功能編排一些自測用例查看最終結(jié)果是否與預(yù)期一致。 需要注意的是由于訂單業(yè)務(wù)邏輯較為復(fù)雜,很多業(yè)務(wù)場景一篇博客是不可能全部覆蓋,所以這里我們就測試一下基于RocketMQ實現(xiàn)分布式事務(wù)常見的幾個問題場景是否和預(yù)期一致。

在測試前我們必須做好前置準備工作,準備功能測試時涉及到的SQL語句,以本次用戶購買產(chǎn)品的業(yè)務(wù)為例,涉及到訂單表、用戶賬戶信息表、產(chǎn)品表、以及生產(chǎn)者本地事務(wù)日志表。

SELECT * FROM t_order to2 ;
SELECT * from account a ;
SELECT * from product p ;
SELECT * FROM rocketmq_transaction_log rtl ;

在每次測試完成之后,我們希望數(shù)據(jù)能夠還原,所以這里也需要準備一下每次測試結(jié)束后的更新語句,由于訂單表和消息日志表都是主鍵自增,考慮到這兩張表只涉及插入,所以筆者為了重置主鍵的值采取的是truncate語句。

truncate  table  t_order;
truncate rocketmq_transaction_log ;
UPDATE account set amount=10000 ;
UPDATE product set count=10000;

2. 測試正常消費

第一個用例是查看所有服務(wù)都正常的情況下,訂單表是否有數(shù)據(jù),用戶表的用戶是否會正常扣款,以及商品表庫存是否會扣減。

測試前,我們先查看訂單表,確認沒有數(shù)據(jù)

查看我們的測試用戶,錢包額度為10000

再查看庫存表,可以看到數(shù)量為1000

確認完數(shù)據(jù)之后,我們就可以測試服務(wù)是否按照預(yù)期的方式執(zhí)行,將所有服務(wù)啟動。

我們通過網(wǎng)關(guān)發(fā)起調(diào)用,請求地址如下:

http://localhost:8090/order/order/createOrderByMQ

請求參數(shù)如下,從參數(shù)可以看出這個請求意為用戶代碼(accountCode)為demoData這個用戶希望購買1個(count)產(chǎn)品代碼(productCode)為P001的產(chǎn)品,該產(chǎn)品當前售價(price)為1元。

{
    "accountCode": "0932897",
    "productCode": "P003",
    "count": 1
}

調(diào)用完成后,查看訂單表,訂單數(shù)據(jù)生成無誤:

圖片圖片

查看用戶服務(wù)是否完成用戶扣款,扣款無誤:

查看產(chǎn)品表,可以看到產(chǎn)品數(shù)量也準確扣減:

3. 測試生產(chǎn)者commit提交失敗

我們希望測試一下發(fā)送完half消息之后,執(zhí)行本地事務(wù)完成,但是未提交commit請求時,MQServer是否會調(diào)用回查邏輯。

為了完成這一點我們必須按照以下兩個步驟執(zhí)行:

  • 在訂單服務(wù)提交事務(wù)消息處打個斷點。

  • 發(fā)起請求,當代碼執(zhí)行到這里的時候通過jps定位到進程號,將其強制殺死。如下所示,我們的代碼執(zhí)行到了提交事務(wù)消息這一步:

我們通過jps定位并將其殺死::

完成這些步驟后,我們再次將服務(wù)啟動,等待片刻之后可以發(fā)現(xiàn),MQServer會調(diào)用checkLocalTransaction回查生產(chǎn)者本地事務(wù)的情況。我們放行這塊代碼讓程序執(zhí)行下去,最后再查看數(shù)據(jù)庫中的數(shù)據(jù)結(jié)果是否符合預(yù)期。

4. 測試消費者消費失敗

測試消費者執(zhí)行報錯后是否會進行重試,這一點就比較好測試了,我們在消費者監(jiān)聽器中插入隨便插入一個報錯查看其是否會不斷重試。這里筆者就不多做演示,實驗結(jié)果是會進行不斷重試,當重試次數(shù)達到閾值時會將結(jié)果存到死信隊列中。

四、壓測MQ和Seata的性能

由于MQ是采用異步消費的形式解耦了服務(wù)間的業(yè)務(wù),而我們的Seata采用默認的AT模式每次執(zhí)行分布式事務(wù)時都會需要借助undo-log、全局鎖等的方式保證最終一致性。所以理論上RocketMQ的性能肯定是高于Seata的,對此我們不妨使用Jmeter進行壓測來驗證一下。

本次壓測只用了1000個并發(fā),MQ和seata的壓測結(jié)果如下,可以看到MQ無論從執(zhí)行時間還是成功率都遠遠優(yōu)秀于Seata的。

MQ的壓測結(jié)果:

Seata的壓測結(jié)果,可以看到大量的數(shù)據(jù)因為lock_table鎖超時而導(dǎo)致失敗,所以整體性能表現(xiàn)非常差勁:

五、詳解RocketMQ落地分布式事務(wù)常見問題

1. RocketMQ 如何保證事務(wù)的最終一致性

最終一致性是一種允許軟狀態(tài)存在的分布式事務(wù)解決方案,RocketMQ 保證事務(wù)最終一致性的方式主要是依賴生產(chǎn)者本地事務(wù)和消息可靠發(fā)送的原子性來最大努力保證最終一致性,注意這里筆者所強調(diào)的盡最大努力交付。

之所以說是最大努力交付是說RocketMQ是通過保證生產(chǎn)者事務(wù)和消息發(fā)送可靠性的原子性和一致性,由此保證消費者一定能夠消費到消息,理想情況下,只要消費者能夠正確消費消息,事務(wù)結(jié)果最終是可以保證一致性的,但是復(fù)雜的系統(tǒng)因素消費者可能會存在消費失敗的情況,此時事務(wù)最終一致性就無法保證,業(yè)界的做法是通過手動操作或者腳本等方式完成數(shù)據(jù)補償。

2. 什么是half消息

half消息即半消息,和普通消息的區(qū)別是該消息不會立馬被消費者消費,原因是half消息的存在是為了保證生產(chǎn)者本地事務(wù)和消費者的原子性和一致性,其過程如上文所介紹,初始發(fā)送的half消息是存儲在MQ一個內(nèi)存隊列中(并未投遞到topic中),只有生產(chǎn)者本地事務(wù)成功并發(fā)送commit通知后,這個消息才會被持久化到commitLog同時提交到topic隊列中,此時消費者才能夠消費該消息并執(zhí)行本地事務(wù)。

3. 為什么要先發(fā)送half消息再執(zhí)行本地事務(wù)?先執(zhí)行本地事務(wù),成功后在發(fā)送不行嗎?

先發(fā)送half消息的原因是為了盡可能確保生產(chǎn)者和消息隊列通信正常,只有通信正常了才能確保生產(chǎn)者本地事務(wù)和消息發(fā)送的原子性和一致性,由此保證分布式事務(wù)的可靠性。

先執(zhí)行本地事務(wù),執(zhí)行成功后再發(fā)送存在一個問題,試想一下,假設(shè)我們本地事務(wù)執(zhí)行成功,但是發(fā)送的消息因為網(wǎng)絡(luò)波動等諸多原因?qū)е翸Q沒有收到消息,此時生產(chǎn)者和消費者的分布式事務(wù)就會出現(xiàn)數(shù)據(jù)不一致問題。

而half消息則不同,它會優(yōu)先發(fā)送一個消費者感知不到的half消息確認通信可達,然后執(zhí)行本地事務(wù)后降消息設(shè)置未commit讓消費者消費,即使說commit消息未收到,因為half消息的存在,MQ在指定超時先限制后也可以通過回查的方式到生產(chǎn)者事務(wù)表查詢執(zhí)行情況。

4. 如果mq收到half消息,準備發(fā)送success信號的消息給生產(chǎn)者,但因為網(wǎng)絡(luò)波動導(dǎo)致生產(chǎn)者沒有收到這個消息要怎么辦?

此時生產(chǎn)者就會認為half消息發(fā)送失敗,本地事務(wù)不執(zhí)行,隨著時間推移MQ長時間沒收到commit或者rollback消息就會回查生產(chǎn)者消息日志表,明確沒看到數(shù)據(jù)則知曉生產(chǎn)者本地事務(wù)執(zhí)行失敗,直接rollback掉half消息,而消費者全程無感知,業(yè)務(wù)上的一致性也是可以保證。

5. MQ沒有收到生產(chǎn)者(訂單服務(wù))的commit或者rollback信號怎么保證事務(wù)最終一致性?

常規(guī)的做法就是建立一張表記錄消息狀態(tài),只要我們訂單信息插入成功就需要日志一下這條數(shù)據(jù),所以我們必須保證訂單數(shù)據(jù)插入和日志插入表中的原子性,確保生產(chǎn)者的事務(wù)和消息日志的ACID:

6. 如果生產(chǎn)者執(zhí)行本地事務(wù)失敗了怎么辦?

這一點前面的部分也已經(jīng)說明,首先將本地會事務(wù)回滾,并向消息隊列提交一個rollback的請求不提交half消息,消息就不會被消費者消費,保證最終一致性。

7. 前面說的都是事務(wù)流程?這和事務(wù)消息如何保證數(shù)據(jù)最終一致性有什么關(guān)系?

生產(chǎn)者和消息隊列事務(wù)流程可以確保生產(chǎn)者和消息隊列發(fā)送的一致性,確保寫操作都是同時成功或者失敗。只有保證兩者正常通信,才能確保消費者可以消費MQ中的消息從而完成數(shù)據(jù)最終一致性。

8. 消費者提交本地事務(wù)失敗了怎么辦?

我們都知道消息隊列只能保證消息可靠性,而無法保證分布式事務(wù)的強一致性,出現(xiàn)這種情況,消費者 不向 MQ 提交本次消息的 offset 即可。如果不提交 offset,那么 MQ 會在一定時間后,繼續(xù)將這條消息推送給消費者,消費者就可以繼續(xù)執(zhí)行本地事務(wù)并提交了,直到成功消息隊列會進行N次重試,如果還是失敗,則可以到死信隊列中查看失敗消息,然后通過補償機制實現(xiàn)分布式事務(wù)最終一致性。

責(zé)任編輯:趙寧寧 來源: 寫代碼的SharkChili
相關(guān)推薦

2024-01-26 13:17:00

rollbackMQ訂單系統(tǒng)

2024-07-08 07:30:47

2022-06-21 08:27:22

Seata分布式事務(wù)

2025-03-25 10:29:52

2023-05-12 08:02:43

分布式事務(wù)應(yīng)用

2022-08-26 00:02:03

RocketMQ單體架構(gòu)MQ

2024-01-05 07:28:50

分布式事務(wù)框架

2024-01-08 08:05:08

分開部署數(shù)據(jù)體系系統(tǒng)拆分

2019-10-10 09:16:34

Zookeeper架構(gòu)分布式

2022-06-27 08:21:05

Seata分布式事務(wù)微服務(wù)

2024-01-09 08:00:58

2023-01-06 09:19:12

Seata分布式事務(wù)

2021-04-23 08:15:51

Seata XA AT

2024-08-15 08:03:52

2019-08-19 10:24:33

分布式事務(wù)數(shù)據(jù)庫

2024-06-13 09:25:14

2022-03-01 16:26:09

鏈路監(jiān)控日志監(jiān)控分布式系統(tǒng)

2024-11-28 15:11:28

2022-07-10 20:24:48

Seata分布式事務(wù)

2019-11-19 08:32:26

數(shù)據(jù)庫HLC事務(wù)
點贊
收藏

51CTO技術(shù)棧公眾號

6080日韩午夜伦伦午夜伦| 国产一区二区三区观看| 日韩av在线导航| 国产极品尤物在线| 青青免费在线视频| 久久婷婷激情| 国产一区二区三区在线播放免费观看| 欧美国产激情视频| 国产小视频在线观看| 日韩av中文字幕一区二区三区| 国产一区二区三区欧美| 天堂av8在线| av免费在线免费| 成人午夜电影小说| 清纯唯美亚洲激情| 欧美福利在线视频| 秋霞一区二区| 精品久久香蕉国产线看观看gif| 精品在线视频一区二区| 国产免费a视频| 狠狠综合久久av一区二区蜜桃| 欧美性色19p| 亚洲欧美国产精品桃花| a在线观看免费| 亚洲男人影院| 中文字幕亚洲欧美| 国产精品日日摸夜夜爽| 国产丝袜在线观看视频| 久久久久久久久久看片| 91精品视频免费观看| 国产中文字幕免费| 精品日韩欧美一区| 亚洲аv电影天堂网| www日韩视频| av中文字幕电影在线看| 91色在线porny| 国产在线视频一区| 国产精品美女毛片真酒店| 精品一区免费| 精品欧美乱码久久久久久| 国产黄色特级片| 欧美另类tv| 99久久婷婷国产综合精品电影| 国产精品久久久久久av下载红粉| 免费在线看黄网址| 日韩中字在线| 亚洲欧美999| 中文字幕制服丝袜| 亚洲综合伊人| 精品国产31久久久久久| 五月天丁香综合久久国产| 国产成人自拍一区| 国产制服丝袜一区| 国产精品久久久久久久av大片| 亚洲国产精一区二区三区性色| 久久国产成人精品| 亚洲欧美日韩中文在线制服| 国产欧美视频一区| 年轻的保姆91精品| 欧美日韩色综合| 最近免费中文字幕中文高清百度| 波多一区二区| 亚洲女同女同女同女同女同69| 日韩在线第一区| 欧美日韩激情视频一区二区三区| 成人亚洲精品久久久久软件| 91中文在线观看| 中文字幕视频一区二区| 老司机久久99久久精品播放免费| 国外成人在线播放| 久久精品一区二区三| 欧美国产激情| 色综合久久天天综线观看| 很污很黄的网站| 欧美独立站高清久久| 亚洲视频在线观看网站| 日本少妇色视频| 群体交乱之放荡娇妻一区二区 | 日本一区二区在线视频观看| 日本黄视频在线观看| 成人午夜视频在线| 久久精品国产精品国产精品污| 秋霞视频一区二区| 不卡的电视剧免费网站有什么| 成人av男人的天堂| 99热这里只有精| 国产一区二区在线视频| 91|九色|视频| 国产自产一区二区| 91小视频在线| 快播亚洲色图| 国产三级在线免费观看| 国产亚洲人成网站| 亚洲国产欧美日韩| 成人欧美在线| 一区二区免费在线播放| 日韩网站在线免费观看| 一个人www视频在线免费观看| 欧美日韩国产专区| 久久精品影视大全| 精品国产一级| 亚洲福利视频二区| 国产三级av在线播放| 欧美韩日一区| 欧美激情视频给我| 欧美h在线观看| 新67194成人永久网站| 国产精品一区电影| 99热精品在线播放| 91老师片黄在线观看| 一区高清视频| 超碰在线资源| 欧美日韩精品是欧美日韩精品| 超碰在线资源站| 久久中文资源| 中文字幕亚洲无线码a| a级黄色片免费看| 亚洲一区二区网站| 成人精品网站在线观看| 无码精品人妻一区二区| 久久久久久97三级| 国产91视频一区| 一区二区三区四区日本视频| 欧美日韩国产综合一区二区| 国产精品熟妇一区二区三区四区| 九九综合九九| 欧美国产日韩一区二区在线观看| 日韩av女优在线观看| 美女一区二区三区在线观看| 丁香五月网久久综合| 每日更新av在线播放| 亚洲一区二区三区三| 亚洲一级免费在线观看| 久久视频在线观看| 中文字幕一区电影| 日韩人妻无码一区二区三区99| 美女性感视频久久| 欧美美乳视频网站在线观看| 成码无人av片在线观看网站| 在线观看国产精品网站| 丝袜熟女一区二区三区| 久久久久久久久久久久久久| 日韩av免费在线| www视频在线| 日本一区二区久久| heyzo国产| 91精品国产一区二区在线观看| 欧美精品一区二区久久久| 自拍偷拍第9页| 老司机精品久久| 久久亚洲一区二区| 国产盗摄一区二区| 欧美日韩在线播| 熟女高潮一区二区三区| 国产欧美不卡| 国产精品久久久久久久免费大片 | 亚洲电影免费观看高清完整版在线| 手机在线中文字幕| 日韩二区三区四区| 久中文字幕一区| 黄色软件视频在线观看| 精品av综合导航| 久久亚洲精品大全| 国产成人久久精品77777最新版本| 日本午夜精品一区二区| 欧美freesex| 日韩大片免费观看视频播放| 国产一级一片免费播放放a| 国产成人在线视频网站| 7777在线视频| 亚洲一二三区视频| 久久久久久成人| 日韩一级片免费看| 亚洲一二三四在线| 国产极品一区二区| 99精品热6080yy久久| 国内一区在线| 日韩伦理福利| 亚洲国产视频一区二区三区| 亚洲国产综合色| 亚洲一区二区三区四区五区六区| 99成人免费视频| 麻豆av福利av久久av| 久久sese| 亚洲色图50p| 成人黄色片在线观看| 国产精品三级电影| 日本亚洲一区二区三区| 国内揄拍国内精品久久| 国产精品三区www17con| 中文字幕在线看片| 这里只有精品丝袜| 国产毛片毛片毛片毛片| 亚洲无线码一区二区三区| 久久人妻一区二区| 日韩国产精品久久久久久亚洲| 亚洲欧洲精品一区二区三区波多野1战4| 巨大黑人极品videos精品| 欧美成人精品影院| 日韩有码电影| 欧美精品 国产精品| 日韩欧美亚洲国产| 91热门视频在线观看| 日本黄大片一区二区三区| 你懂的一区二区| 久久久精品国产一区二区三区| 欧美日韩亚洲国产| 欧美人与性动交| 欧美日韩国产综合视频 | 国产欧美日韩综合精品一区二区三区| 一区二区成人在线视频 | 久久久久久日产精品| 午夜av中文字幕| 亚洲日本激情| 亚洲一区三区电影在线观看| 天堂av一区| 国产a级全部精品| a毛片在线观看| 亚洲毛茸茸少妇高潮呻吟| 国产又粗又猛又爽又黄视频| 亚洲国产成人av网| 国产白丝一区二区三区| av网站免费线看精品| 天天干天天综合| 亚洲一级黄色| 先锋影音亚洲资源| 国产精品久久久久av蜜臀| 国产精品久久久久久搜索| www在线看| 日韩中文在线不卡| 日本韩国精品一区二区| 日韩欧美中文字幕一区| 亚洲av无码乱码国产精品fc2| 亚洲自拍另类综合| 韩国一级黄色录像| 久久婷婷综合激情| 一边摸一边做爽的视频17国产 | 亚洲第一区在线观看| 一区二区视频网站| 色综合久久88色综合天天免费| 国产高潮流白浆| 国产日韩欧美综合一区| 中文视频在线观看| 国产成人免费在线| www.成人黄色| 美女久久久精品| 日韩黄色片视频| 亚洲福利专区| 亚洲欧美综合一区| 成人在线亚洲| 日本中文不卡| 精品深夜福利视频| www.久久草| 免费一级欧美在线观看视频| 国产不卡在线观看| 亚洲一区站长工具| 69精品小视频| 精品极品在线| 久久久爽爽爽美女图片| 日本性爱视频在线观看| 另类天堂视频在线观看| 97最新国自产拍视频在线完整在线看| 精品一区二区亚洲| 五月激情婷婷网| 欧美精品一区视频| 日本免费网站在线观看| 亚洲第一精品电影| 色呦呦视频在线| 日韩黄色在线免费观看| 婷婷在线观看视频| 亚洲国产精品中文| 无码精品一区二区三区在线 | 免费国产成人av| 欧美一级一区| 日本精品一区在线观看| 亚洲综合欧美| 992kp快乐看片永久免费网址| 日韩专区欧美专区| 一本色道久久88亚洲精品综合 | av一区二区久久| 亚洲国产av一区| 中文字幕一区二区三区av| 青青草手机在线视频| 婷婷综合五月天| 在线播放国产一区| 精品国产sm最大网站免费看| 美国成人毛片| 久久国产精品久久久| 色戒汤唯在线| 国产日韩在线一区| 欧美巨大xxxx| 中国成人在线视频| 国产日韩欧美三级| 中文字幕 欧美日韩| 成人h动漫精品| 美国精品一区二区| 亚洲成人免费在线观看| 中文在线观看av| 亚洲成人在线网| 四虎久久免费| 91国内精品久久| 成人免费观看49www在线观看| 久久精品二区| 中文字幕一区二区三区欧美日韩 | 99久久婷婷国产综合| 午夜久久久久久电影| 亚洲精品无码久久久久| 亚洲精品一区二区三区福利 | 99热这里只有精| 一区二区三区视频在线| 男人添女人下部高潮视频在线观看| 日本久久亚洲电影| 一区二区三区四区视频免费观看| 日本一区视频在线观看| 激情欧美日韩| 手机在线免费毛片| 亚洲国产精品av| 国产精品美女久久久久av爽| 精品久久久久久久久久久下田| aaa国产精品视频| 国产91在线播放精品91| 网站一区二区| 亚洲一区二区在线看| 免费在线成人| 中文字幕乱视频| 亚洲欧美一区二区三区极速播放 | 国产高清视频在线观看| 91免费版在线看| 青青草免费av| 欧美高清精品3d| 岛国视频免费在线观看| 2018日韩中文字幕| 久久久久久久久久久久久久久久久久久久| 综合国产精品久久久| 日韩电影在线观看一区| 国产偷人妻精品一区| 精品久久久久久久久久ntr影视| www.成人在线观看| 久久视频中文字幕| 日韩黄色三级| 亚洲精品在线观看免费| 秋霞国产午夜精品免费视频| 中文字幕在线看高清电影| 丁香五六月婷婷久久激情| 无码精品人妻一区二区| 欧美亚洲第一区| 欧美**字幕| 免费看a级黄色片| 久久精品亚洲麻豆av一区二区 | 国产 日韩 欧美 精品| 欧美激情精品久久久久久变态 | 亚洲综合日韩| 欧美精品黑人猛交高潮| 午夜在线成人av| 五月婷婷六月激情| 青青久久av北条麻妃黑人| 狠狠做深爱婷婷综合一区| 欧美婷婷精品激情| 亚洲视频在线一区二区| a级片在线播放| 97精品伊人久久久大香线蕉 | 日韩精品一区二区三区四区| 免费毛片在线看片免费丝瓜视频 | 久久久久在线观看| 老牛精品亚洲成av人片| 干日本少妇首页| 欧美国产视频在线| 国产农村老头老太视频| 久久久久久999| 亚洲免费观看高清完整版在线观| 国产在线观看福利| 中文字幕+乱码+中文字幕一区| 国产又粗又猛又黄| 欧美巨猛xxxx猛交黑人97人| 国产精品调教视频| 久久精品网站视频| 亚洲欧美欧美一区二区三区| 人妻无码中文字幕免费视频蜜桃| 热久久这里只有| 99九九热只有国产精品| 成人啪啪18免费游戏链接| 欧美性色视频在线| 国产剧情在线| 国语精品中文字幕| 全国精品久久少妇| 国产精彩视频在线| 夜夜嗨av色一区二区不卡| 国产精品igao视频网网址不卡日韩| 欧美久久在线观看| 欧美激情一区二区三区全黄| 亚洲AV无码乱码国产精品牛牛| 欧美亚洲视频一区二区| 亚洲深深色噜噜狠狠爱网站| 中文字幕乱码在线| 欧美日韩精品三区| 日韩伦理福利| 女女百合国产免费网站| 久久免费精品国产久精品久久久久 | 日韩a一级欧美一级| 欧美性高潮床叫视频|