精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

分布式場景下的事務(wù)機(jī)制

開發(fā) 前端
首先客戶端Producer通過sendMessageInTransaction方法發(fā)送事務(wù)消息,Broker判斷是事務(wù)消息就將消息topic存入到RMQ_SYS_TRANS_HALF_TOPIC返回給客戶端,客戶端繼續(xù)執(zhí)行邏輯。

事務(wù)消息是RocketMQ的一個(gè)非常特色的高級(jí)特性,它的基礎(chǔ)訴求是通過RocketMQ的事務(wù)機(jī)制,來保證上下游的數(shù)據(jù)?致性。

我們在單機(jī)版本下面只需要在業(yè)務(wù)方法上加上對(duì)應(yīng)的事務(wù)就可以達(dá)到效果,但是分布式的場景下,多個(gè)系統(tǒng)之間的協(xié)調(diào)配合,你無法知道到底是那個(gè)先執(zhí)行那個(gè)后執(zhí)行,當(dāng)然在微服務(wù)里面存在Seate框架來保證事務(wù),但是這事務(wù)的保證始終是心頭大患,只能用一句話形容魚和熊掌不可兼得。

而RocketMq的事務(wù)消息能夠在提升性能的情況下滿足要求,其主要實(shí)現(xiàn)是支持分布式情況下保障消息生產(chǎn)和本地事務(wù)的最終一致性,消息生產(chǎn)我們可以使用順序消息去執(zhí)行,這樣我們只需要滿足這兩個(gè)的事務(wù)即可。

 實(shí)現(xiàn)過程

圖片圖片

準(zhǔn)備階段:生產(chǎn)者將消息發(fā)送到Broker,Broker向生產(chǎn)者發(fā)送ack表示消息發(fā)送成功,但是此時(shí)的消息為一個(gè)等待狀態(tài),不會(huì)被消費(fèi)者去消費(fèi)。(生產(chǎn)者繼續(xù)執(zhí)行接下來的代碼)

確認(rèn)階段:當(dāng)我們執(zhí)行完所有的代碼后,本地事務(wù)要么回滾要么提交,此時(shí)當(dāng)我們了解本地事務(wù)的狀態(tài)后,將結(jié)果推送給Broker做二次確認(rèn)結(jié)果,如果為Commit則將修改激活準(zhǔn)備推送給消費(fèi)者,如果為Rollback則將消息進(jìn)行回滾。

補(bǔ)償機(jī)制:當(dāng)出現(xiàn)異常情況沒有發(fā)生二次確認(rèn),此時(shí)我們在固定時(shí)間后將會(huì)進(jìn)行回查,檢查回查消息對(duì)應(yīng)的本地事務(wù)的狀態(tài),重寫Commit或者Rollback。

 涉及狀態(tài)以及注意點(diǎn)

事務(wù)消息存在三種狀態(tài):

CommitTransaction:提交事務(wù)狀態(tài),此狀態(tài)下允許消費(fèi)者消費(fèi)。

RollbackTransaction:回滾事務(wù)狀態(tài),此狀態(tài)下消息會(huì)被刪除。

Unknown:中間狀態(tài),此狀態(tài)下會(huì)等待本地事務(wù)處理結(jié)果進(jìn)行對(duì)應(yīng)操作。

注意點(diǎn):

本消息狀態(tài)是一種對(duì)消費(fèi)者不可見的狀態(tài),將消息的內(nèi)容放到系統(tǒng)Topic的RMQ_SYS_TRANS_HALF_TOPIC隊(duì)列里面去。

事務(wù)消息中的相關(guān)參數(shù)可以進(jìn)行設(shè)置,比如:本地事務(wù)回查次數(shù)transactionCheckMax默認(rèn)15次,本地事務(wù)回查的間隙transactionCheckInterval默認(rèn)60s,超出后會(huì)直接將消息丟棄。

RocketMQ的事務(wù)消息是指應(yīng)用本地事務(wù)和發(fā)送消息操作可以定義到全局事務(wù)中,要么同時(shí)成功,要么同時(shí)失敗,通過RocketMQ的事務(wù)信息可以實(shí)現(xiàn)可靠消息的最終一致性方案。

 源碼解析

Producer端通過構(gòu)建TransactionMQProducer對(duì)象綁定事務(wù)監(jiān)聽。

TransactionListener transactionListener = new TransactionListener() {    @Override    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {        return LocalTransactionState.COMMIT_MESSAGE;    }
    @Override    public LocalTransactionState checkLocalTransaction(MessageExt msg) {        return LocalTransactionState.COMMIT_MESSAGE;    }};TransactionMQProducer producer = new TransactionMQProducer(producerGroupTemp);producer.setTransactionListener(transactionListener);producer.setNamesrvAddr("127.0.0.1:9876");product.start();SendResult result = producer.sendMessageInTransaction(message, arg);

執(zhí)行sendMessageInTransaction方法來發(fā)送消息。

public TransactionSendResult sendMessageInTransaction(final Message msg,
    final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
  // 檢查TransactionListener是否存在,如果不存在就直接拋異常
    TransactionListener transactionListener = getCheckListener();
    if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }

    // 事務(wù)消息不支持延遲等特性
    if (msg.getDelayTimeLevel() != 0) {
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    }

    Validators.checkMessage(msg, this.defaultMQProducer);

    SendResult sendResult = null;
    // 設(shè)置half屬性,表明是事務(wù)屬性
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    // 設(shè)置所屬生成者組
    // broker向生產(chǎn)者發(fā)送回查事務(wù)請(qǐng)求根據(jù)這個(gè)producergroup找到指定的channel
    // 生產(chǎn)者能找到所有在同一個(gè)組的機(jī)器實(shí)例從而檢查事務(wù)狀態(tài)
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    try {
        // 同步發(fā)送
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }

    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    // 消息返回信息
    switch (sendResult.getSendStatus()) {
            // 第一階段消息發(fā)送成功
        case SEND_OK: {
            try {
                if (sendResult.getTransactionId() != null) {
                    // 設(shè)置事務(wù)ID屬性
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                
                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                if (null != transactionId && !"".equals(transactionId)) {
                    msg.setTransactionId(transactionId);
                }
                if (null != localTransactionExecuter) {
                    // 執(zhí)行本地事務(wù)
                    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                } else if (transactionListener != null) {
                    log.debug("Used new transaction API");
                    // 發(fā)送消息成功后,執(zhí)行本地操作
                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                }
                if (null == localTransactionState) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }

                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                    log.info("executeLocalTransactionBranch return {}", localTransactionState);
                    log.info(msg.toString());
                }
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
        break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }

    try {
        // 本地事務(wù)執(zhí)行完畢向broker提交事務(wù)或回滾事務(wù)
        this.endTransaction(msg, sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }

    TransactionSendResult transactionSendResult = new TransactionSendResult();
    transactionSendResult.setSendStatus(sendResult.getSendStatus());
    transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
    transactionSendResult.setMsgId(sendResult.getMsgId());
    transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
    transactionSendResult.setTransactionId(sendResult.getTransactionId());
    transactionSendResult.setLocalTransactionState(localTransactionState);
    return transactionSendResult;
}

首先發(fā)送第一階段信息直接返回半提交狀態(tài),然后執(zhí)行本地事務(wù)返回事務(wù)的三種狀態(tài),未知,回滾,提交,最后執(zhí)行endTransaction方法,把事務(wù)執(zhí)行的狀態(tài)告訴broker。

endTransaction方法

根據(jù)本地事務(wù)執(zhí)行狀態(tài)構(gòu)建requestHeader對(duì)象執(zhí)行二階段提交。

public void endTransaction(
    final Message msg,
    final SendResult sendResult,
    final LocalTransactionState localTransactionState,
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    // 獲取消息中的MessageId
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    String transactionId = sendResult.getTransactionId();
    // 找到broker地址
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
    // 構(gòu)建EndTransactionRequestHeader對(duì)象
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    // offset是prepare消息中offsetMsgId中獲取的
    requestHeader.setCommitLogOffset(id.getOffset());
    requestHeader.setBname(sendResult.getMessageQueue().getBrokerName());
    // 社會(huì)提交/回滾狀態(tài)
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            // 提交
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            // 回滾
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            // 未知
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }

    doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
    // 發(fā)送給broker端
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
        this.defaultMQProducer.getSendMsgTimeout());
}

將本地方法執(zhí)行事務(wù)的結(jié)果發(fā)送給Broker,通過endTransactionOneway方法創(chuàng)建Code為END_TRANSACTION的消息,然后在Broker就會(huì)找出對(duì)應(yīng)的Processor來處理。

    Broker端處理     

Broker總共存在兩個(gè)處理,首先針對(duì)第一個(gè)階段發(fā)送的Half消息,broker要進(jìn)行相關(guān)的操作,后面endTransaction提交進(jìn)來的事務(wù)狀態(tài),針對(duì)三種狀態(tài)進(jìn)行相關(guān)操作。

接收第一階段發(fā)送的Half消息

SendMessageProcessor的sendMessage方法中去執(zhí)行處理事務(wù)消息。

// 發(fā)送Half消息時(shí),在屬性中設(shè)置了PROPERTY_TRANSACTION_PREPARED為true,這里根據(jù)這個(gè)屬性判斷是否是事務(wù)消息
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (Boolean.parseBoolean(traFlag)
    && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
    if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(
            "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                + "] sending transaction message is forbidden");
        return response;
    }
    // 事務(wù)消息進(jìn)入這里,把消息的topic改成RMQ_SYS_TRANS_HALF_TOPIC,以同步刷盤的方式存入store
    putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
}

如果消息攜帶事務(wù)標(biāo)記就去執(zhí)行TransactionMessageService類的prepareMessage方法進(jìn)行相關(guān)的處理。

// 解析Half消息
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
      // 把真實(shí)的topic和真實(shí)的queueId放在消息的屬性中
     MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
     MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
         String.valueOf(msgInner.getQueueId()));
     // 設(shè)置默認(rèn)的事務(wù)狀態(tài)為TRANSACTION_NOT_TYPE=>unknow
     msgInner.setSysFlag(
         MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
     // 將消息的topic設(shè)置為RMQ_SYS_TRANS_HALF_TOPIC,這個(gè)是對(duì)消費(fèi)者不可見的
     msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
     // 設(shè)置queueId=0
     msgInner.setQueueId(0);
     msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
     return msgInner;
}

進(jìn)行topic的切換,將原來的topic存入到消息的屬性里面,將消息的topic設(shè)置為RMQ_SYS_TRANS_HALF_TOPIC。

處理endTransaction方法

在endTransaction方法中將消息同步給Broker處理的Code對(duì)應(yīng)為END_TRANSACTION,Broker就會(huì)找出對(duì)應(yīng)的Processor來處理該類即調(diào)用EndTransactionProcessor類的processRequest方法處理。

if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
    // 根據(jù)commitLogOffset獲取文件中的message,獲取到了返回success
    result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        // 檢查消息是否一致
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            // 生成要保存的消息
            MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
            msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
            msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
            msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
            msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
            MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
            // 把真實(shí)的topic消息存儲(chǔ)到CommitLog中
            RemotingCommand sendResult = sendFinalMessage(msgInner);
            if (sendResult.getCode() == ResponseCode.SUCCESS) {
                // 移除prepare消息,存入opQueueMap中
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return sendResult;
        }
        return res;
    }
    // 回滾
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
    // 查詢到half消息則返回成功
    result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        // 檢查消息是否一致
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            // 移除prepare消息,存入opQueueMap中
            this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
        }
        return res;
    }
}

僅僅展示相關(guān)核心代碼,其主要邏輯:首先去判斷請(qǐng)求的方式是commit還是rollback,如果是commit查詢到消息還原消息原來的topic,然后刪除half topic上的消息轉(zhuǎn)存到opQueueMap中,如果是rollback直接進(jìn)行刪除half topic上的消息并轉(zhuǎn)存到opQueueMap中去。

注意:opQueueMap的引入為了解決有可能出現(xiàn)網(wǎng)絡(luò)、進(jìn)程、線程等各種因素導(dǎo)致消費(fèi)端未能成功處理消息的情況,該機(jī)制的作用是在消費(fèi)者端將未成功處理的消息重新發(fā)送到服務(wù)端進(jìn)行重試,直到確認(rèn)消息已經(jīng)被成功處理或者達(dá)到最大重試次數(shù)后進(jìn)行回滾操作。而 Op 消息本身則是通過修改消息狀態(tài)來實(shí)現(xiàn)的。

消息回查

當(dāng)網(wǎng)絡(luò)中斷或者響應(yīng)超時(shí)等各種異常信息導(dǎo)致消息并沒有傳送到broker端去,為了解決這一問題在Broker就開啟一個(gè)回查線程每隔一分鐘執(zhí)行一次處理超過6s未回查的消息,當(dāng)超過15次回查后直接將消息丟棄。

在啟動(dòng)BrokerController類時(shí),會(huì)去調(diào)用startProcessorByHa方法如果是Master節(jié)點(diǎn)就會(huì)去啟動(dòng)一個(gè)線程每隔6s處理未回查的消息,檢查最大次數(shù)為15次。

public void run() {
    log.info("Start transaction check service thread!");
    long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
    while (!this.isStopped()) {
        this.waitForRunning(checkInterval);
    }
    log.info("End transaction check service thread!");
}
protected void onWaitEnd() {
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    long begin = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", begin);
    // 檢查回查消息 timeout = 6s checkMax=15
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
    log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}

在check方法里面去調(diào)用listener.resolveHalfMsg(msgExt)方法去處理事務(wù)消息。

public void resolveHalfMsg(final MessageExt msgExt) {
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            try {
                sendCheckMessage(msgExt);
            } catch (Exception e) {
                LOGGER.error("Send check message error!", e);
            }
        }
    });
}

執(zhí)行sendCheckMessage方法發(fā)送一個(gè)檢查事務(wù)狀態(tài)的Code為CHECK_TRANSACTION_STATE的消息,在客戶端MQClientAPIImpl初始化的時(shí)候就會(huì)去注冊一個(gè)Code對(duì)應(yīng)的Processor,最終就會(huì)去執(zhí)行checkTransactionState方法,判斷本地事務(wù)的狀態(tài),然后再去執(zhí)行endTransactionOneway發(fā)起END_TRANSACTION處理。

public void checkTransactionState(final String addr, final MessageExt msg,
    final CheckTransactionStateRequestHeader header) {
    Runnable request = new Runnable() {
        private final String brokerAddr = addr;
        private final MessageExt message = msg;
        private final CheckTransactionStateRequestHeader checkRequestHeader = header;
        private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();


        // 執(zhí)行線程方法
        @Override
        public void run() {
            TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
            TransactionListener transactionListener = getCheckListener();
            if (transactionCheckListener != null || transactionListener != null) {
                LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
                Throwable exception = null;
                try {
                    if (transactionCheckListener != null) {
                        localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
                    } else if (transactionListener != null) {
                        log.debug("Used new check API in transaction message");
                        // 檢查本地事務(wù)
                        localTransactionState = transactionListener.checkLocalTransaction(message);
                    } else {
                        log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
                    }
                } catch (Throwable e) {
                    log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
                    exception = e;
                }
                // 處理事務(wù)狀態(tài)
                this.processTransactionState(
                    localTransactionState,
                    group,
                    exception);
            } else {
                log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
            }
        }
      // 
        private void processTransactionState(
            final LocalTransactionState localTransactionState,
            final String producerGroup,
            final Throwable exception) {
            final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
            thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
            thisHeader.setProducerGroup(producerGroup);
            thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
            thisHeader.setFromTransactionCheck(true);
            thisHeader.setBname(checkRequestHeader.getBname());


            String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
            if (uniqueKey == null) {
                uniqueKey = message.getMsgId();
            }
            thisHeader.setMsgId(uniqueKey);
            thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
            switch (localTransactionState) {
                // 提交狀態(tài)
                case COMMIT_MESSAGE:
                    thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                    break;
                // 回滾狀態(tài)
                case ROLLBACK_MESSAGE:
                    thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                    log.warn("when broker check, client rollback this transaction, {}", thisHeader);
                    break;
                // 未知狀態(tài)
                case UNKNOW:
                    thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                    log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
                    break;
                default:
                    break;
            }


            String remark = null;
            if (exception != null) {
                remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
            }
            doExecuteEndTransactionHook(msg, uniqueKey, brokerAddr, localTransactionState, true);


            try {
                // 再次執(zhí)行endTransactionOneway發(fā)起END_TRANSACTION
                DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
                    3000);
            } catch (Exception e) {
                log.error("endTransactionOneway exception", e);
            }
        }
    };


    this.checkExecutor.submit(request);
}

總結(jié)

首先客戶端Producer通過sendMessageInTransaction方法發(fā)送事務(wù)消息,Broker判斷是事務(wù)消息就將消息topic存入到RMQ_SYS_TRANS_HALF_TOPIC返回給客戶端,客戶端繼續(xù)執(zhí)行邏輯。

然后調(diào)用endTransaction方法去提交本地事務(wù)通過endTransactionOneway將消息提交給Broker端,Broker端通過Code為END_TRANSACTION的處理器去處理消息調(diào)用processRequest方法來處理對(duì)應(yīng)的消息,

如果由于各種原因?qū)е孪⒌氖鬏?,為了防止這些現(xiàn)象的出現(xiàn)所以在BrokerController啟動(dòng)時(shí)就啟動(dòng)一個(gè)線程每隔6s處理未回查的消息(檢查最大次數(shù)為15次)的任務(wù)來進(jìn)行消息的回查,簡單來說就是通過sendCheckMessage方法去注冊一個(gè)Code為CHECK_TRANSACTION_STATE的消息將內(nèi)容發(fā)送給客戶端,然后客戶端在啟動(dòng)時(shí)也注冊對(duì)應(yīng)Code的處理邏輯,通過processTransactionState方法去處理事務(wù)的狀態(tài),如果正常最后還是會(huì)去執(zhí)行endTransactionOneway方法,完成事務(wù)消息。

責(zé)任編輯:武曉燕 來源: java從零到壹
相關(guān)推薦

2022-06-27 08:21:05

Seata分布式事務(wù)微服務(wù)

2022-06-21 08:27:22

Seata分布式事務(wù)

2017-07-26 15:08:05

大數(shù)據(jù)分布式事務(wù)

2019-10-10 09:16:34

Zookeeper架構(gòu)分布式

2024-06-13 08:04:23

2009-06-19 15:28:31

JDBC分布式事務(wù)

2009-09-18 15:10:13

分布式事務(wù)LINQ TO SQL

2021-09-29 09:07:37

分布式架構(gòu)系統(tǒng)

2022-12-08 08:13:11

分布式數(shù)據(jù)庫CAP

2023-09-11 15:40:43

鍵值存儲(chǔ)云服務(wù)

2021-09-28 09:43:11

微服務(wù)架構(gòu)技術(shù)

2021-02-01 09:35:53

關(guān)系型數(shù)據(jù)庫模型

2021-12-01 10:13:48

場景分布式并發(fā)

2019-06-26 09:41:44

分布式事務(wù)微服務(wù)

2025-04-29 04:00:00

分布式事務(wù)事務(wù)消息

2022-03-24 07:51:27

seata分布式事務(wù)Java

2025-05-15 08:05:00

2020-02-25 15:00:42

數(shù)據(jù)分布式架構(gòu)

2014-01-22 13:37:53

2023-02-21 16:41:41

分布式相機(jī)鴻蒙
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

欧美精品一区在线| 久久91超碰青草是什么| 999精品视频在线| 成人高清免费观看mv| 人人爽香蕉精品| 中文字幕无线精品亚洲乱码一区 | 久久久久久自在自线| 在线成人中文字幕| 中文字幕avav| 欧美黑人粗大| 亚洲日本电影在线| 国产一区二区在线观看免费播放| 神马久久久久久久| 亚洲一区欧美| 亚洲欧美综合精品久久成人| 久久久久久久高清| 涩涩涩视频在线观看| 国产精品九色蝌蚪自拍| 国产伦精品一区二区三区视频免费| 国产在线观看黄色| 天天综合精品| 亚洲日本中文字幕| 国产国语老龄妇女a片| 羞羞影院欧美| 亚洲国产成人av网| 亚洲一区二区三区精品在线观看 | 日韩高清av一区二区三区| 蜜臀av免费观看| 1区2区3区在线| 国产精品伦一区| 久久精品国产精品国产精品污 | 日韩中文一区| 黄色av网址在线| 久久99九九99精品| 日本高清久久天堂| 精品一区二区三区人妻| 欧美国产小视频| 亚洲欧美三级伦理| 在线免费看黄色片| 国产一区二区在线观| 欧美羞羞免费网站| 日本三级免费观看| 黑人玩欧美人三根一起进| 国产精品久久久久久久久动漫| 久久久久久a亚洲欧洲aⅴ| 性色av蜜臀av| 国产一区欧美二区| 国产美女久久精品| 艳妇乳肉豪妇荡乳av无码福利| 国产亚洲一区在线| 午夜精品福利在线观看| 久久免费黄色网址| 欧美日韩国产一区精品一区| 欧美精品免费播放| 国产高潮国产高潮久久久91| 亚洲一区二区三区| 久久99久久99精品中文字幕 | 亚洲网色网站| 久久精品视频在线观看| 国产精品久久久免费看| 999久久久91| 久久深夜福利免费观看| √天堂中文官网8在线| 一区二区三区中文| 久久99精品久久久久久青青91| 欧美成人一区二区三区高清| 黄色日韩在线| 91精品国产一区| 欧美日韩一二三四区| 日韩中文欧美在线| 国产欧美日韩丝袜精品一区| 一级片在线观看视频| 韩国欧美一区二区| 51午夜精品| 日韩一卡二卡在线| 极品少妇xxxx精品少妇| 国产欧美一区二区| 亚洲成人77777| 国产精品一区二区在线播放 | 豆国产96在线|亚洲| 亚洲japanese制服美女| 国产精品无码天天爽视频| 经典三级在线一区| 91九色国产在线| 国产精品亚洲lv粉色| 国产在线麻豆精品观看| 91久久偷偷做嫩草影院| 亚洲成人中文字幕在线| 国产精品亚洲综合一区在线观看| 69174成人网| 亚洲大尺度网站| 丁香一区二区三区| 精品久久蜜桃| 九九热视频在线观看| 久久精品亚洲一区二区三区浴池| 亚洲三区在线| 国产福利视频在线| 亚洲线精品一区二区三区八戒| 精品视频在线观看一区| 欧美日韩电影免费看| 欧美三级中文字幕在线观看| 亚洲国产日韩欧美在线观看| 99精品视频在线免费播放| 欧美电影免费观看完整版| 精品影片一区二区入口| 亚洲素人在线| 日韩性生活视频| 加勒比av在线播放| 99香蕉国产精品偷在线观看 | 99久久久免费精品| 欧美久久影院| 国产成人精品电影| av中文字幕免费在线观看| 成人免费视频caoporn| 国产一区二区三区四区五区在线| 自拍视频在线播放| 亚洲在线视频免费观看| 热久久精品国产| 国产精品igao视频网网址不卡日韩 | 亚洲欧洲在线观看| 国产精品久久久精品四季影院| 夜夜嗨av一区二区三区网站四季av| 欧美中文在线字幕| 99国产在线播放| 国产亚洲精品7777| 99热这里只有精品免费| 丝袜美腿诱惑一区二区三区| 日韩一区二区精品| 人人妻人人澡人人爽人人精品| 久久精品亚洲人成影院| 欧美一区二区三区……| 91亚洲视频在线观看| 国产亚洲一区字幕| 日韩小视频网站| 欧洲亚洲精品久久久久| 亚洲国产成人91精品| 国产精品www爽爽爽| 99精品国产99久久久久久福利| 国产精品中文字幕久久久| 少妇人妻偷人精品一区二区| 综合自拍亚洲综合图不卡区| 苍井空浴缸大战猛男120分钟| 国产精一区二区| 色婷婷成人综合| 4438国产精品一区二区| 粉嫩aⅴ一区二区三区四区| 在线国产精品网| 亚洲四虎影院| 国产婷婷97碰碰久久人人蜜臀| 午夜剧场免费在线观看| 日韩电影一二三区| 欧美成ee人免费视频| 亚洲羞羞网站| 欧美男同性恋视频网站| 女~淫辱の触手3d动漫| 日韩午夜免费| 国产精品久久久久免费| 97超碰资源站在线观看| 91精品免费在线观看| 337人体粉嫩噜噜噜| 视频一区国产视频| 久久99精品久久久久子伦| 俄罗斯一级**毛片在线播放| 欧美一区二区三区在线电影| 成年人在线免费看片| 日韩av成人高清| 欧洲国产精品| 日韩电影免费观| 日韩精品中文字幕久久臀| 男女啊啊啊视频| 91亚洲永久精品| 91九色在线观看视频| 另类春色校园亚洲| 欧美激情xxxxx| 人人妻人人澡人人爽人人欧美一区| 一区二区三区**美女毛片| 久久发布国产伦子伦精品| 欧美激情日韩| 成人欧美一区二区三区黑人免费| 男女在线视频| 亚洲成色www8888| 日本中文字幕在线免费观看| 成人一区二区三区在线观看| 黄色免费视频大全| 国产精品羞羞答答在线观看| 国产97在线|日韩| 伊人免费在线| 欧美一级二级在线观看| 国产小视频在线观看免费| 大陆成人av片| 一本大道熟女人妻中文字幕在线 | 天天操天天舔天天射| 精品一区中文字幕| 成人国产在线看| 免费日韩一区二区三区| 国产成人精品在线| 在线免费看黄| 宅男噜噜噜66一区二区66| 免费在线一区二区三区| av福利精品导航| 日韩免费高清在线| 五月婷婷六月综合| 91手机在线播放| 性爽视频在线| 色综久久综合桃花网| www.天堂av.com| 欧美性猛交xxxxx水多| 欧美另类69xxxx| 9i在线看片成人免费| 538在线视频观看| 亚洲欧美综合久久久| 国产精品一区而去| 91精品美女| 久热在线中文字幕色999舞| 日本免费一区视频| 欧美日韩一卡二卡三卡| wwwxxx亚洲| 亚洲欧美日韩中文字幕一区二区三区| 在线精品视频播放| 青青草97国产精品免费观看| 欧美极品少妇无套实战| 女厕嘘嘘一区二区在线播放 | 色爱av美腿丝袜综合粉嫩av | 另类欧美日韩国产在线| 欧美性猛交内射兽交老熟妇| 国产探花一区在线观看| 亚洲自拍小视频免费观看| 欧美日韩精品一区二区三区视频| 欧美国产中文字幕| 国产精品视频二区三区| 精品国产百合女同互慰| 一级黄色片免费看| 婷婷丁香久久五月婷婷| 国产午夜手机精彩视频| 国产日韩av一区| 久久久久99人妻一区二区三区 | 国偷自产av一区二区三区麻豆| 日韩精品91亚洲二区在线观看| 成人午夜视频免费观看| 日本午夜一区| 久久av免费观看| 中文字幕一区二区三区四区久久| 国产精品com| 女人高潮被爽到呻吟在线观看| 欧美精品免费在线| 日韩在线免费电影| 亚洲天堂影视av| 婷婷色在线视频| 日韩欧美国产三级电影视频| 黄色在线视频网址| 欧美日韩视频免费播放| 国产真实乱人偷精品视频| 亚洲美女少妇撒尿| 韩国一级黄色录像| 国产欧美1区2区3区| 欧美高清性xxxx| 成人av免费在线| www激情五月| 国产成人综合在线观看| 欧美国产日韩在线视频| 久久精品国产精品亚洲红杏| 亚洲精品一二三四五区| 日韩精品久久理论片| 久久久久久久久久久免费视频| 亚洲精品美女| av日韩在线看| 国产精品久久久久久模特| 欧美国产日韩激情| 一区在线观看| 黄色成人在线看| 国产精品美女久久久| 欧美日韩性生活片| 亚洲国产日本| 久久精品网站视频| 日日夜夜精品免费视频| 国产v亚洲v天堂无码久久久| 首页欧美精品中文字幕| 四季av一区二区| 国内成人精品2018免费看| 久久这里只精品| 青青国产91久久久久久| 久久人人爽人人片| 成人深夜福利app| 日本一区二区在线免费观看| 91啪亚洲精品| 夜夜春很很躁夜夜躁| 国产精品久久久久影院亚瑟| 亚洲二区在线播放| 一区二区在线免费观看| 精品无码一区二区三区电影桃花| 午夜影院久久久| 中文在线第一页| 欧美日韩精品是欧美日韩精品| 国产深喉视频一区二区| 日韩精品电影网| 黄色视屏网站在线免费观看| 精品国产一区二区三区久久久狼| 亚洲图区一区| 日本a级片电影一区二区| 91欧美精品| 91视频九色网站| 在线一级成人| 黄频视频在线观看| 黄色工厂这里只有精品| 99免费视频观看| 激情综合一区二区三区| 免费不卡av网站| 国产色91在线| 欧美精品一区二区蜜桃| 精品人伦一区二区三区蜜桃免费| 中国女人真人一级毛片| 日韩欧美一区二区免费| 日本大臀精品| 欧美成人精品一区| 在线免费日韩片| 91av免费看| 精品久久91| 欧美激情视频免费看| 亚洲综合国产| 黄色激情在线观看| 国产精品久久久久久久裸模| 国产精选第一页| 91精品国产综合久久精品| 午夜视频www| 日韩午夜在线视频| 国产精品蜜月aⅴ在线| 国产一级二级三级精品| 99久久精品费精品国产| 日本成年人网址| 国产麻豆精品一区二区| 亚洲AV无码国产成人久久| 亚洲精品国产高清久久伦理二区| 香蕉污视频在线观看| 精品国免费一区二区三区| 在线国产91| 日本中文字幕久久看| 北条麻妃一区二区三区在线| 在线观看国产一区| 日韩中文字幕一区二区三区| 男男做爰猛烈叫床爽爽小说 | 日本一区精品| 亚洲麻豆视频| 潘金莲一级淫片aaaaaaa| 国产精品人成在线观看免费| 亚洲 小说区 图片区| 亚洲欧美日韩精品| 高清在线视频不卡| 99国产超薄丝袜足j在线观看 | 色吧亚洲视频| 久久婷婷亚洲| 久久人人妻人人人人妻性色av| 一区二区三区波多野结衣在线观看| 91丨九色丨蝌蚪丨对白| 中文字幕精品www乱入免费视频| 最近在线中文字幕| 久久久久久国产精品一区| 亚洲天堂黄色| 亚洲午夜久久久久久久久| 亚洲第一成年网| 天天操天天干天天爱| 91精品91久久久久久| 粉嫩精品导航导航| 欧美一级爱爱视频| 国产高清无密码一区二区三区| 破处女黄色一级片| 日韩一区二区三区免费看 | 亚洲性av网站| 亚洲天堂一区二区| 日韩免费电影一区二区| 青青国产91久久久久久| 无码人妻精品一区二区中文| 欧美性感一区二区三区| 国产私拍精品| 国产精品一二区| 欧美黄色录像片| 在线视频观看一区二区| 中文字幕亚洲不卡| 精品黑人一区二区三区| 国产一区二区日韩| 福利精品一区| 中文字幕中文字幕一区三区| 狠狠狠色丁香婷婷综合久久五月| 艳妇荡乳欲伦69影片| 日韩一区二区在线免费观看| 伊人222成人综合网| 成人a在线视频| 亚洲精品一二| 白丝女仆被免费网站| 欧美视频第二页| 成码无人av片在线观看网站| 成人久久一区二区| 国产精品毛片| 91麻豆制片厂| 日韩欧美国产wwwww| 91桃色在线观看| 欧美日韩精品免费在线观看视频| 精品亚洲porn| 国产性xxxx高清| 亚洲午夜久久久久久久| 2019中文亚洲字幕|