精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

深度剖析 Seata 源碼

開發
本文將針對 seata 分布式事務注冊到提交回滾的全流程進行深入分析和講解,希望對你有幫助。

一、如何使用源碼

需要了解的是,這篇文章是基于筆者相對早期的項目作為樣例進行講解,所以對應的seata版本為1.4.2(核心部分實現大體是一樣的),建議讀者閱讀本文在調試源碼時可以選擇和筆者相同的版本進行理解學習,對應的下載地址為:https://github.com/apache/incubator-seata/tree/v1.4.2

完成下載后,為保證編譯可以通過我們還需要將seata-serializer-protobuf模塊移除掉,該模塊的位置如下圖所示:

同時seata的啟動類位于seata-server模塊,所以我們需要將該模塊的registry.conf的配置改為自己的配置:

以筆者為例,seata配置都是通過nacos進行統一管理的,所以對應的配置類型也都是針對nacos維度去協調適配,大體配置如下所示:

registry {
  # 將seata注冊到nacos上
  type = "nacos"
  nacos {
  # nacos地址
    serverAddr = "ip:8848"
    # 命名空間id
    namespace = "7c1cfd88-15e4-437d-8e82-2d22d034f447"
    # 組名
    group = "DEFAULT_GROUP"
    # 集群節點名稱
    cluster = "default"
  }
}
config {
  # 通過nacos獲取配置
  type = "nacos"
  nacos {
    serverAddr = "ip:8848"
    namespace = "7c1cfd88-15e4-437d-8e82-2d22d034f447"
    group = "DEFAULT_GROUP"
  }
}

經過這幾個步驟后seata就可以像我們日常一樣的方式進行使用了。

二、基于AT模式詳解Seata全鏈路流程

1. seata服務端啟動

我們先從seata的服務端啟動開始,seata服務端啟動時會進行如下幾個核心步驟:

  • 創建工作線程池workingThreads。
  • 基于工作線程池創建一個Netty服務端對外提供服務。
  • 基于該服務端創建的一個默認的協調者DefaultCoordinator管理全局事務。
  • 默認協調者初始化幾個定時任務處理一些異步的全局事務提交、回滾、超時監測的任務。

對應的我們給出這塊邏輯的核心入口代碼,即位于Server的主函數入口的main方法,可以看到seata服務端的創建是基于netty完成的,完成創建和初始化之后就與協調者coordinator進行綁定:

public static void main(String[] args) throws IOException {
        //......
        //創建工作線程池處理業務請求
        ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
                NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
                new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());
        //基于該線程池初始化 seata 服務端
        NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
 //......
        //log store mode : file, db, redis
        SessionHolder.init(parameterParser.getStoreMode());
        //初始化協調者,處理seata服務端收到的各種事務讀寫請求
        DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
        //初始化各種異步定時任務:全局事務提交、全局事務回滾、超時監測等
        coordinator.init();
        //將協調者作為seata服務端的處理器
        nettyRemotingServer.setHandler(coordinator);
       //......
    }

對應的我們也給出默認協調者的初始化源碼,即DefaultCoordinator的init方法,可以看到這段代碼本質上就是提交一些定時任務處理全局事務提交、回滾、超時監測、undo log刪除等:

public void init() {
        //每秒執行,處理需要回滾的分布式事務
        retryRollbacking.scheduleAtFixedRate(() -> {
            boolean lock = SessionHolder.retryRollbackingLock();
            if (lock) {
                try {
                    handleRetryRollbacking();
                } catch (Exception e) {
                    LOGGER.info("Exception retry rollbacking ... ", e);
                } finally {
                    SessionHolder.unRetryRollbackingLock();
                }
            }
        }, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

       //......
        //異步定時提交全局事務的定時任務,每秒執行一次
        asyncCommitting.scheduleAtFixedRate(() -> {
            boolean lock = SessionHolder.asyncCommittingLock();
            if (lock) {
                try {
                    //掃描獲取各種異步提交的全局事務
                    handleAsyncCommitting();
                } catch (Exception e) {
                    LOGGER.info("Exception async committing ... ", e);
                } finally {
                    SessionHolder.unAsyncCommittingLock();
                }
            }
        }, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

       //......
    }

2. 本地服務如何基于GlobalTransaction注解開啟事務

我們都知道seata也是基于spring boot實現的,所以我們可以大膽的認為應用端使用GlobalTransaction開啟分布式事務本質上也是和spring boot自動裝配有著一定的聯系。

所以我們從seata-spring-boot-starter這個腳手架的源碼包的spring.factories文件入手,可以看到一個SeataAutoConfiguration的注入:

于是我們就可以看到一個GlobalTransactionScanner即一個關于GlobalTransaction注解掃描的類:

@Bean
    @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
    @ConditionalOnMissingBean(GlobalTransactionScanner.class)
    public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
      //......
        //掃描我們的配置文件中配置的applicationId、txServiceGroup對應的事務
        return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
    }

查看GlobalTransactionScanner源碼我們可以看到該類型繼承了spring的初始化bean并設置屬性后的拓展點InitializingBean的afterPropertiesSet方法,該方法內部會初始化當前seata客戶端,分別初始化TM客戶端(使用GlobalTransaction注解的方法的服務即做為TM)和RM客戶端處理其他TM或者RM服務端發送的消息,它們初始化的工作分別是:

  • TM客戶端會注冊各種TC消息響應的處理器,處理各種seata server對應的TC響應的消息,例如:全局事務開啟結果處理器、全局事務提交處理器、全局事務回滾處理器等。
  • RM客戶端則是注冊一些各種seata server對應TC請求消息的處理器,例如:分支事務提交、分支事務回滾、分支事務undo.log刪除等。

對應我們給出GlobalTransactionScanner的afterPropertiesSet源碼可以看到客戶端初始化這段調用的入口,可以看到啟動時某個線程完成CAS上鎖初始化標識之后,即通過initClient初始化客戶端:

@Override
    public void afterPropertiesSet() {
        //......
        //基于擴展點進行客戶端初始化
        if (initialized.compareAndSet(false, true)) {
            initClient();
        }
    }

步入后即可看到對于TM和RM客戶端的初始化調用:

private void initClient() {
        //......
        // 初始化TM客戶端
        TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
       //......
        // 初始化RM客戶端
        RMClient.init(applicationId, txServiceGroup);
      //......
    }

此時我們先看看TM客戶端內部的處理函數即位于TmNettyRemotingClient的registerProcessor即可看到上述所說的TC響應消息處理器的綁定步驟,即:

  • 注冊TC響應消息處理器
  • 注冊全局事務開啟響應處理器
  • 注冊全局事務提交響應處理器
  • 注冊心跳消息處理器
private void registerProcessor() {
        // 1.registry TC response processor 注冊一些TC響應消息的處理器
        ClientOnResponseProcessor onResponseProcessor =
                new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
        super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
        //全局事務開啟結果響應處理器
        super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
        //全局事務提交響應處理器
        super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);
        // 2. 注冊心跳消息
        ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
        super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
    }

同理我們也給出RM客戶端內部初始化的調用RmNettyRemotingClient的registerProcessor方法:

  • 注冊分支事務提交消息處理器
  • 注冊rm客戶端對應的分支事務提及和回滾處理器
  • 注冊undo Log刪除處理器
  • 注冊TC響應消息處理器
  • 注冊心跳處理器
private void registerProcessor() {
        // 1. 注冊分支事務提交消息處理器
        RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
        super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);
        // 2.注冊rm客戶端對應的分支事務回滾處理器
        RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
        super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);
        // 3. 注冊undo log刪除處理器
        RmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());
        super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);
        // 4. 注冊TC響應消息處理器
        ClientOnResponseProcessor onResponseProcessor =
            new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
        super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);
        // 5.注冊心跳消息處理器
        ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
        super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
    }

同時GlobalTransactionScanner繼承了AbstractAutoProxyCreator的wrapIfNecessary,該代理類會在spring容器中的bean進行檢查并決定是否進行動態代理。以我們的GlobalTransactionScanner邏輯它本質上就是:

  • 檢查當前bean是否有GlobalTransactional這個注解
  • 如果有則基于全局事務攔截器對其進行增強

對應核心邏輯如下所示,可以看到這段代碼會通過existsAnnotation檢查當前bean是否存在GlobalTransactional注解,如果有則基于globalTransactionalInterceptor 對其進行增強:

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        try {
            synchronized (PROXYED_SET) {
              //......
                //check TCC proxy
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                   //......
                } else {
                    Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
                    //判斷是否有GlobalTransaction注解,如果有則為其生成分布式事務的動態代理
                    if (!existsAnnotation(new Class[]{serviceInterface})
                        && !existsAnnotation(interfacesIfJdk)) {
                        return bean;
                    }
                    //如果攔截器為空則初始化攔截器
                    if (globalTransactionalInterceptor == null) {
                        globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                        ConfigurationCache.addConfigListener(
                            ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                            (ConfigurationChangeListener)globalTransactionalInterceptor);
                    }
                    interceptor = globalTransactionalInterceptor;
                }

              //......
                if (!AopUtils.isAopProxy(bean)) {
                    bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                } else {
                    //基于上一步的interceptor為其生成動態代理
                    AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                    Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                    for (Advisor avr : advisor) {
                        advised.addAdvisor(0, avr);
                    }
                }
             //......
                return bean;
            }
        } catch (Exception exx) {
            throw new RuntimeException(exx);
        }
    }

這也就意味著我們調用帶有GlobalTransactional注解方法時,就會走到GlobalTransactionalInterceptor的增強邏輯上,它會走到GlobalTransactionalInterceptor的invoke方法上,最終會走到事務模板類transactionalTemplate的execute方法,該方法會執行如下三個核心步驟:

  • 開啟全局事務。
  • 執行原始業務邏輯。
  • 根據各個分支事務結果提交或者回滾事務。

對應的我們給出GlobalTransactionalInterceptor的invoke方法,可以看到當該方法認為注解存在的情況下會直接調用handleGlobalTransaction開啟并處理全局事務:

@Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
      //......
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {

            final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
            //獲取GlobalTransactional注解信息
            final GlobalTransactional globalTransactionalAnnotation =
                getAnnotation(method, targetClass, GlobalTransactional.class);
            final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
            //......
            if (!localDisable) {
                //若全局事務注解不為空則調用handleGlobalTransaction處理全局事務
                if (globalTransactionalAnnotation != null) {
                    return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
                } else if (globalLockAnnotation != null) {
                     //......
                }
            }
        }
         //......
    }

步入其內部就會走到transactionalTemplate的execute方法,即可看到對于:

  • 分支事務的創建
  • 告知TC請求開啟全局事務
  • 執行本地事務
  • 全局提交或者回滾

對應邏輯的源碼如下所示,讀者可結合說明了解:

public Object execute(TransactionalExecutor business) throws Throwable {
       //......

            // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
            //如果tx為空則以全局事務啟動者的身份創建一個全新的事務
            if (tx == null) {
                tx = GlobalTransactionContext.createNew();
            }

            // set current tx config to holder
            GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

            try {
                //向TC發送請求開啟全局事務
                beginTransaction(txInfo, tx);

                Object rs;
                try {
                    // Do Your Business
                    //執行業務邏輯(被代理的原始方法)
                    rs = business.execute();
                } catch (Throwable ex) {
                    // 3. The needed business exception to rollback.
                    //全局事務回滾
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }

                // 4. everything is fine, commit.
                //分支事務執行成功,提交全局事務
                commitTransaction(tx);

                return rs;
            } finally {
             //......
            }
        } finally {
         //......
        }
    }

3. 客戶端如何開啟分布式事務

上文調用分布式事務的方法時內部會走到的代理的transactionalTemplate的execute方法,其內部有個beginTransaction就是開啟分布式事務的關鍵,由上文可知作為GlobalTransactional注解的方法對對應的服務就是作為TM即transaction manager,所以在調用beginTransaction時,這個方法的代理就會以TM的身份發送一個請求告知TC自己要開啟一個全局事務,TC經過自己的協調處理后(后文會介紹流程)返回一份xid告知TM開啟成功:

對應的我們查看seata客戶端對應TransactionalTemplate的beginTransaction方法即可看到begin方法的調用,該方法回告知seata服務端自己要開啟一個全局事務:

private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            //......
            //開始分布式事務
            tx.begin(txInfo.getTimeOut(), txInfo.getName());
         //......
        } catch (TransactionException txe) {
            //......

        }
    }

查看begin內部就是通過TM發起請求,得到xid并緩存到當前線程內部,開始后續的執行流程分布式事務處理流程:

@Override
    public void begin(int timeout, String name) throws TransactionException {
        //......
        //通過TM告知TC開啟全局事務,從而得到xid
        xid = transactionManager.begin(null, null, name, timeout);
        status = GlobalStatus.Begin;
        //將xid緩存到當前線程的緩存中
        RootContext.bind(xid);
        //......
    }

4. seata服務端如何注冊全局事務

基于上述請求,對應seata server端的TC收到請求后會基于傳參中的消息標信息,定位到對應的執行器即TM消息處理器,然后驅動TM處理器將這個請求生成一份全局session信息從而構成本次請求的全局事務信息,再將請求寫入數據表中:

我們給出TC處理消息的代碼入口AbstractNettyRemotingServer的channelRead方法,從名字不難看出TC服務端也是基于netty實現,其內部通過processMessage處理各種消息:

@Override
        public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
  
            //基于netty編寫的服務端,channelRead通過processMessage處理客戶端各種請求
            processMessage(ctx, (RpcMessage) msg);
        }

步入processMessage即可看到基于處理表定位消息并交由處理器處理消息邏輯pair.getFirst().process(ctx, rpcMessage);:

protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
         //......
         //獲取網絡消息
        Object body = rpcMessage.getBody();
        if (body instanceof MessageTypeAware) {
            MessageTypeAware messageTypeAware = (MessageTypeAware) body;
            //通過處理表定位到對應的處理器
            final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
            if (pair != null) {
                if (pair.getSecond() != null) {
                    try {
                        pair.getSecond().execute(() -> {
                            try {
                                //基于第一個處理器處理當前消息
                                pair.getFirst().process(ctx, rpcMessage);
                            } catch (Throwable th) {
                                //......
                            } finally {
                                //......
                            }
                        });
                    } catch (RejectedExecutionException e) {
                        //......
                        
                    }
                } else {
                //......
               }
        }
    }

因為我們的消息是TM發來的,所以上一步的處理器是ServerOnRequestProcessor的:

@Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (ChannelManager.isRegistered(ctx.channel())) {
         //處理TM客戶端發送來的消息
            onRequestMessage(ctx, rpcMessage);
        } else {
           //......
        }
    }

最終走到GlobalBeginRequest這個工具的handle基于協調者將事務信息寫入global_table從而得到xid返回給TM客戶端:

@Override
    protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
        throws TransactionException {
  //生成全局事務信息并得到xid將數據寫入響應返回給TM
        response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
            request.getTransactionName(), request.getTimeout()));
       //.......
    }

5. RM和TC如何協調處理分支事務

完成全局事務的注冊管理之后,我們再來聊聊各個分支事務的執行和提交回滾,上文提及,seata原生我們本地的jdbc數據庫連接通過代理加以封裝,所以在我們seata客戶端執行本地事務完成后提交的commit方法是經過了seata的代理這一層,該連接代理在調用commit方法時,其內部就會通過RM向TC注冊一個分支事務的請求,TC收到請求后會執行如下工作:

  • 基于lock_table嘗試為事務生成全局鎖。
  • 分支事務信息寫入到branch_table表上并返回branch_id給RM:

我們給出ConnectionProxy的commit方法入口,其內部調用了一個doCommit方法,它就是事務提交的核心邏輯:

@Override
    public void commit() throws SQLException {
        try {
         //excute會調用doCommit生成undoLog緩存和執行分支事務
            LOCK_RETRY_POLICY.execute(() -> {
                //excuete執行成功后這一步會注冊分支事務并提交本地事務和undoLog鏡像以保證原子性
                doCommit();
                return null;
            });
        } catch (SQLException e) {
           //......
        } catch (Exception e) {
            throw new SQLException(e);
        }
    }

其內部調用ConnectionProxy的doCommit會調用processGlobalTransactionCommit執行分支事務:

private void doCommit() throws SQLException {
        //如果處于全局事務中則調用processGlobalTransactionCommit
        if (context.inGlobalTransaction()) {
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {
         //......
        } else {
           //......
        }
    }

最終就可以在processGlobalTransactionCommit看到如下邏輯:

  • register這個注冊分支事務的邏輯,TC基于RM給定的resourceId信息,生成操作數據的全局鎖,并插入分支事務信息到brach_table中。
  • undo日志刷盤到本地undo日志中。
  • 本地業務的事務提交。
private void processGlobalTransactionCommit() throws SQLException {
        try {
            //向TC發起請求注冊分支事務,TC基于RM給定的resourceId生成全局鎖并插入分支事務信息到brach_table后就不會拋異常
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e, context.buildLockKeys());
        }
        try {
            //undo日志刷盤
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            //本地事務提交
            targetConnection.commit();
        } catch (Throwable ex) {
          //......
        }
          //......
    }

這里我們著重看一下register函數,其內部本質上就是通過RM客戶端告知TC自己準備執行分支事務提交,幫我上一把全局鎖并注冊分支事務:

private void register() throws TransactionException {
        if (!context.hasUndoLog() || !context.hasLockKey()) {
            return;
        }
        //向tc發起請求并獲得register
        Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
            null, context.getXid(), null, context.buildLockKeys());
        //緩存到當前線程中
        context.setBranchId(branchId);
    }

最后這個注冊的邏輯就會來到AbstractResourceManager的branchRegister上,可以看到它會攜帶著全局事務id和主鍵等數據發送請求給TC:

@Override
    public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
        try {
            BranchRegisterRequest request = new BranchRegisterRequest();
            //傳入全局事務id即xid
            request.setXid(xid);
            //基于當前數據主鍵生成lockkeys
            request.setLockKey(lockKeys);
            request.setResourceId(resourceId);
            request.setBranchType(branchType);
            request.setApplicationData(applicationData);
            //基于RM的netty客戶端將其異步發送
            BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
         //......
            return response.getBranchId();
        } catch (TimeoutException toe) {
           //......
        }
    }

6. seata服務端處理分支事務請求

TC處理流程與上述文章同理,收到消息后基于request中的消息表定位到對應的處理器,我們這里最終會走到BranchRegisterRequest的處理器上,通過AbstractTCInboundHandler注冊分支事務:

@Override
    public BranchRegisterResponse handle(BranchRegisterRequest request, final RpcContext rpcContext) {
        BranchRegisterResponse response = new BranchRegisterResponse();
        exceptionHandleTemplate(new AbstractCallback<BranchRegisterRequest, BranchRegisterResponse>() {
            @Override
            public void execute(BranchRegisterRequest request, BranchRegisterResponse response)
                throws TransactionException {
                try {
                    //tc注冊分支事務入口
                    doBranchRegister(request, response, rpcContext);
                } catch (StoreException e) {
                 //......
                }
            }
        }, request, response);
        return response;
    }

最終這段邏輯就會走到AbstractCore的branchRegister,大體執行的步驟是:

  • 生成分支事務session
  • 嘗試獲得數據全局鎖lock_table
  • 取鎖成功將分支事務信息寫入branch_table
  • 返回branch_id給RM

對應源碼邏輯如下,大體邏輯就說基于分支事務session生成全局鎖存到lock_table后,將分支事務信息存到branch_table中:

@Override
    public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
                               String applicationData, String lockKeys) throws TransactionException {
      //......
        return SessionHolder.lockAndExecute(globalSession, () -> {
             //......
            //獲取分支事務的表信息并將其寫入到lock_table中意味獲得全局鎖,上鎖失敗會拋異常
            branchSessionLock(globalSession, branchSession);

            try {
                //添加分支事務信息到branch_table中
                globalSession.addBranch(branchSession);
            } catch (RuntimeException ex) {
                 //......
            }
             //......
             //返回分支事務id
            return branchSession.getBranchId();
        });
    }

TC返回成功后,RM就會執行undo日志刷盤和本地事務提交,詳情參考我們本節代碼processGlobalTransactionCommit方法,這里不貼出了。

7. RM生成回滾日志

對于java程序而言大部分SQL操作底層都是基于Executor執行器操作的,在上述代理執行commit方法前,seata底層將代理的連接即上文的connectionProxy通過AbstractDMLBaseExecutor執行SQL操作,該執會針對我們的連接代理進行如下邏輯處理:

  • 判斷連接代理connectionProxy是否是自動提交,若非自動提交則調用executeAutoCommitFalse方法,該方法會生成undoLog數據寫入緩存,然后將RM當執行分支事務SQL,基于該執行結果生成后置鏡像,最后將undo日志寫入undo_log表中。
  • 若開啟自動提交則關閉自動提交后,復用executeAutoCommitFalse方法執行系統的undoLog和分支事務SQL的執行操作。

對應源碼的整體工作鏈路圖如下所示:

這里我們直接給出AbstractDMLBaseExecutor的doExecute方法作為入口,可以看到若開啟自動提交則調用executeAutoCommitTrue,反之調用executeAutoCommitFalse:

@Override
    public T doExecute(Object... args) throws Throwable {
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        //若自動提交則關閉自動提交,并生成undo信息存入緩沖區
        if (connectionProxy.getAutoCommit()) {
            return executeAutoCommitTrue(args);
        } else {
            //直接生成undo log鏡像寫入緩存
            return executeAutoCommitFalse(args);
        }
    }

因為都會復用executeAutoCommitFalse這段邏輯,所以我們直接查看這個方法的邏輯,可以看到該邏輯內部會基于分支事務前后的數據生成前置和后置鏡像:

protected T executeAutoCommitFalse(Object[] args) throws Exception {
        if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
            throw new NotSupportYetException("multi pk only support mysql!");
        }
        //基于分支事務的SQL定位操作前的SQL生成前置鏡像
        TableRecords beforeImage = beforeImage();
        //執行分支事務的SQL 
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        //生成分支事務操作后置鏡像
        TableRecords afterImage = afterImage(beforeImage);
        //將undoLog寫入緩沖區
        prepareUndoLog(beforeImage, afterImage);
        return result;
    }

8. 事務全局提交與回滾

TransactionalTemplate(即TM)驅動各種分支事務準備成功后,就會執行commitTransaction提交全局事務,對應的代碼位于TransactionalTemplate的execute方法,該方法會通知TC驅動全局事務提交,而TC收到該請求之后,就會驅動各個分支事務提交事務,每個分支事務收到該請求后就會刪除undoLog并提交各自未提交的事務:

public Object execute(TransactionalExecutor business) throws Throwable {
          //......

            try {
             
                //向TC發送請求開啟全局事務
                beginTransaction(txInfo, tx);

                Object rs;
                try {
                    
                    //執行業務邏輯(被代理的原始方法)
                    rs = business.execute();
                } catch (Throwable ex) {            
                    //全局事務回滾
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }

              
                //分支事務執行成功,提交全局事務
                commitTransaction(tx);

                return rs;
            } finally {
          //......
            }
        } finally {
          //......
        }
    }

步入其內部可以看到DefaultGlobalTransaction調用transactionManager即TM提交全局事務:

@Override
    public void commit() throws TransactionException {
       //......
        try {
            while (retry > 0) {
                try {
                    //執行全局事務提交
                    status = transactionManager.commit(xid);
                    break;
                } catch (Throwable ex) {
                   //......
            }
        } finally {
          //......
        }
        //......
    }

這個commit的邏輯也很簡單,即告知TC要提交全局事務了:

@Override
    public GlobalStatus commit(String xid) throws TransactionException {
        GlobalCommitRequest globalCommit = new GlobalCommitRequest();
        globalCommit.setXid(xid);
        //通知TC提交全局事務
        GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
        return response.getGlobalStatus();
    }

對應的TC收到該請求后,對應的AbstractTCInboundHandler就會調用doGlobalCommit通知各個RM提交全局事務:

@Override
    public GlobalCommitResponse handle(GlobalCommitRequest request, final RpcContext rpcContext) {
        GlobalCommitResponse response = new GlobalCommitResponse();
        response.setGlobalStatus(GlobalStatus.Committing);
        exceptionHandleTemplate(new AbstractCallback<GlobalCommitRequest, GlobalCommitResponse>() {
            @Override
            public void execute(GlobalCommitRequest request, GlobalCommitResponse response)
                throws TransactionException {
                try {
                //遍歷RM提交各個分支事務
                    doGlobalCommit(request, response, rpcContext);
                } catch (StoreException e) {
                  //......
                }
            }
     //......

          //......


        }, request, response);
        return response;
    }

對應的我們可以來道該源碼內部的DefaultCore的doGlobalCommit方法印證這一點,可以看到該方法會遍歷各個分支事務調用branchCommit通知其提交或者回滾事務:

@Override
    public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
      //......
        if (globalSession.isSaga()) {
            success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
        } else {
            //遍歷全局事務中的分支事務
            Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
                //......
                }
                try {
                    //告知RM提交事務
                    BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);

                    //......
                } catch (Exception ex) {
                    //......
                }
                return CONTINUE;
            });
             //......
        }
        //......
        return success;
    }

最后請求達到RM上的DefaultRMHandler按照TC要求提交或者回滾事務:

//RM提交分支事務
    @Override
    public BranchCommitResponse handle(BranchCommitRequest request) {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(request.getBranchId()));
        return getRMHandler(request.getBranchType()).handle(request);
    }
    //RM回滾分支事務
    @Override
    public BranchRollbackResponse handle(BranchRollbackRequest request) {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(request.getBranchId()));
        return getRMHandler(request.getBranchType()).handle(request);
    }

提交事務本質上就是提交后刪除undoLog即可,這里我們以分支事務回滾為例,可以看到上述代碼BranchRollbackResponse 會調用handle方法執行分支事務回滾,該方法最終會走到AbstractRMHandler的doBranchRollback,該方法會調動RM管理器將分支事務回滾:

protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
        throws TransactionException {
        //......
        //回滾分支事務
        BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,
            applicationData);
        //將xid和處理結果狀態響應給TC
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
       //......
    }

最終該方法內部就會調用AbstractUndoLogManager的undo解析當前分支事務的前置鏡像數據,該方法內部執行邏輯為:

  • 定位分支事務的undo日志數據
  • 反序列化為undo對象
  • 基于該undo對象信息解析出表名、列以及數據等信息。
  • 通過undoExecutor 執行器將分支事務還原。

對應源碼如下:

@Override
    public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
      //......

        for (; ; ) {
            try {
               //......

                // Find UNDO LOG
                //獲取當前分支事務的undo鏡像
                selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
                selectPST.setLong(1, branchId);
                selectPST.setString(2, xid);
                rs = selectPST.executeQuery();

                boolean exists = false;
                while (rs.next()) {
                   //......
                    //獲取undo數據
                    byte[] rollbackInfo = getRollbackInfo(rs);

                    //反序列化生成undo對象 branchUndoLog
                    String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
                    UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
                        : UndoLogParserFactory.getInstance(serializer);
                    BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);

                    try {
                        // put serializer name to local
                        setCurrentSerializer(parser.getName());
                        List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                        if (sqlUndoLogs.size() > 1) {
                            Collections.reverse(sqlUndoLogs);
                        }
                        //遍歷undo對象生成SQL還原分支事務值
                        for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                            //獲取表的表名、列的元信息
                            TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
                                conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
                            sqlUndoLog.setTableMeta(tableMeta);
                            //獲取對應的執行執行器 將對應分支事務的表數據回滾
                            AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                                dataSourceProxy.getDbType(), sqlUndoLog);
                            undoExecutor.executeOn(conn);
                        }
                    } finally {
                        // remove serializer name
                        removeCurrentSerializer();
                    }
                }

                //......
            } catch (SQLIntegrityConstraintViolationException e) {
                //......
            } catch (Throwable e) {
                //......

            } finally {
               //......
            }
        }
    }

三、小結

讓我們來做個小結,總的來說seata實現數據庫的AT模式分布式事務的流程為:

(1) 調用帶有globalTransactional注解的方法執行業務邏輯。

(2) 該方法以TM的身份通知TC開啟全局事務。

(3) TC收到通知后到global_table創建該方法的全局事務信息,這里以筆者某個下單業務的分布式事務場景為例,對應的數據如下所示:

(4) RM開始工作,各自找TC注冊分支事務,基于當前數據生成全局鎖存入lock_table,保證當前數據操作時沒有其他事務干擾:

全局鎖成功后TC將數據存入branch_table表,對應數據如下所示:

(5) RM完成分支事務注冊后,持有本地鎖的事務執行本地分支事務,成功后將生成分支數據的前后鏡像undo表,如下所示:

這里我們以后置鏡像為例子查看賬戶表修改后的字段值為例,可以看到該鏡像將每一個字段的類型、值等信息都序列化為JSON生成undo鏡像:

(6) TM感知到所有分支事務準備成功,通知TC將這些RM(分支事務)提交,即將undoLog刪除,反之基于undoLog將數據回滾。

對應我們給出下面這段圖,讀者可以結合上面源碼梳理一下全流程:

我是 SharkChili ,Java 開發者,Java Guide 開源項目維護者。歡迎關注我的公眾號:寫代碼的SharkChili,也歡迎您了解我的開源項目 mini-redis:https://github.com/shark-ctrl/mini-redis。

責任編輯:趙寧寧 來源: 寫代碼的SharkChili
相關推薦

2022-09-27 18:56:28

ArrayList數組源代碼

2024-02-05 19:06:04

DartVMGC流程

2009-09-15 14:52:15

linq級聯刪除

2010-03-01 18:33:30

2011-05-23 14:20:59

WordPress

2023-01-10 13:48:50

ContainerdCRI源碼

2010-03-01 14:50:06

Python 工具

2010-02-02 15:25:35

Python語法

2010-02-03 16:56:24

Python包

2010-02-04 15:38:39

Android 手機

2014-10-17 09:30:38

2010-02-01 13:34:59

Python 腳本

2020-04-01 10:28:12

Apache HBas數據結構算法

2010-03-05 16:38:30

2022-04-29 14:56:40

通話應用源碼剖析

2022-03-24 14:40:31

開發Harmony鴻蒙

2010-02-23 10:05:52

Python歷史

2010-02-22 13:53:22

Python 中文亂碼

2009-09-17 15:22:38

LINQ to SQL

2009-12-07 18:43:29

WCF框架
點贊
收藏

51CTO技術棧公眾號

av免费看网址| 国产xxx69麻豆国语对白| 国产精品自在自线| 成年人网站在线| 成人美女在线视频| 欧洲s码亚洲m码精品一区| 国产成人福利在线| 先锋影音网一区二区| 亚洲制服丝袜在线| 日韩理论片在线观看| 国产精品色综合| 一本色道久久综合一区| 国产一区二区三区在线免费观看| 一起操在线视频| 午夜激情电影在线播放| 国产精品免费久久久久| 国产精品国产精品国产专区蜜臀ah| 99热只有这里有精品| 成人影视亚洲图片在线| 欧美精品一区二区高清在线观看| 国产成人精品无码播放| 3d玉蒲团在线观看| 久久精品人人做| 春色成人在线视频| 在线视频欧美亚洲| 一本一道久久综合狠狠老精东影业| 日韩在线视频网站| 狠狠人妻久久久久久综合蜜桃| 久久青草视频| 激情成人在线视频| 青少年xxxxx性开放hg| 九色在线播放| 波多野结衣中文字幕一区二区三区 | 六月激情综合网| 91在线中字| 国产精品久久久久影院老司| 九九九九精品| 精品人妻一区二区三区四区不卡 | av无码精品一区二区三区宅噜噜| 久久激情久久| 88xx成人精品| 日本天堂网在线观看| 88国产精品视频一区二区三区| 亚洲视频免费一区| 四虎永久免费影院| 成人香蕉社区| 欧美蜜桃一区二区三区| 午夜免费高清视频| 日韩免费福利视频| 91国偷自产一区二区三区成为亚洲经典| 免费在线看黄色片| 羞羞视频在线免费国产| 亚洲欧美日韩一区二区| 中文字幕欧美日韩一区二区三区| 成人高清网站| 国产精品日产欧美久久久久| 亚洲高清在线播放| 在线观看免费版| 亚洲特黄一级片| 超碰在线免费观看97| 国产调教视频在线观看| 亚洲人成在线播放网站岛国| a级黄色片网站| 2024最新电影免费在线观看| 一区二区三区色| 黄色激情在线视频| av漫画网站在线观看| 婷婷综合另类小说色区| www.com毛片| 性欧美18一19sex性欧美| 色婷婷久久久久swag精品| 毛片一区二区三区四区| 欧美色网在线| 在线不卡中文字幕播放| 成人免费播放视频| 成人搞黄视频| 精品视频久久久久久| 久久久久亚洲av无码a片| 成人婷婷网色偷偷亚洲男人的天堂| 中文字幕精品视频| 中文字幕在线观看成人| 亚洲第一网站| 国产xxx69麻豆国语对白| 一二三区在线播放| 成人永久免费视频| 欧美重口乱码一区二区| 淫片在线观看| 亚洲一线二线三线久久久| 欧美女人性生活视频| 成人在线免费av| 精品少妇一区二区三区免费观看| 中文字幕在线播放一区| 成人精品视频| 欧美激情视频给我| 波多野结衣mp4| 国产精品一级片在线观看| 精品麻豆av| av影片在线看| 偷窥少妇高潮呻吟av久久免费| 九九热在线免费| youjizzjizz亚洲| 亚洲人成网站在线播| 国产黄在线免费观看| 国产九九精品| 亚洲综合自拍一区| 国产专区在线| 亚洲午夜日本在线观看| www.com操| 欧美网色网址| 美日韩精品免费视频| 无码无套少妇毛多18pxxxx| 国产精品影视网| 欧美日韩一区综合| 欧美24videosex性欧美| 欧美三电影在线| 国产高清成人久久| 影音先锋日韩在线| 国产精品高潮呻吟视频| 黄片毛片在线看| 亚洲视频综合在线| 国产超碰在线播放| 日本妇女一区| 97人人做人人爱| 国产伦精品一区二区三区四区| 久久品道一品道久久精品| 丰满人妻一区二区三区53号| 日韩亚洲国产免费| 正在播放欧美视频| 亚洲图片在线视频| 99久久综合色| 欧美视频在线观看视频| 国产精品久久久久久久久久辛辛| 尤物99国产成人精品视频| 91av在线免费视频| 成人精品免费看| 国产黄色激情视频| 欧美中文高清| 久久人人爽人人爽人人片亚洲 | 久久国产精品久久久久久电车| 成人资源视频网站免费| 日韩av激情| 日韩一级片网站| 日本妇女毛茸茸| 国内精品久久久久影院色| 亚洲一区二区三区乱码 | 色噜噜狠狠成人中文综合 | 亚洲国产电影在线观看| 漂亮人妻被中出中文字幕| 国内露脸中年夫妇交换精品| 欧美激情按摩在线| 国产综合视频在线| 亚洲国产欧美一区二区三区丁香婷| 搡的我好爽在线观看免费视频| 四虎国产精品免费观看| 成人在线小视频| 黄色的网站在线观看| 91精品在线免费观看| 永久久久久久久| 国产经典欧美精品| 17c丨国产丨精品视频| 综合伊人久久| 久久久伊人日本| 神马精品久久| 在线国产亚洲欧美| 永久免费观看片现看| 久久97超碰国产精品超碰| 97精品国产97久久久久久粉红| 国产欧美88| 欧美极品少妇xxxxⅹ免费视频| 日韩一级在线播放| 精品久久久久久久久国产字幕| 欧美色图亚洲激情| 青草国产精品久久久久久| 欧美日韩视频免费在线观看| 狂野欧美xxxx韩国少妇| 久久全球大尺度高清视频| 你懂的视频在线观看| 91久久人澡人人添人人爽欧美| 超碰人人人人人人人| 国产一区二区网址| www.中文字幕在线| 久久一区二区三区喷水| 97超级碰碰| 人狥杂交一区欧美二区| 中文字幕综合一区| www.色视频| 日韩欧美国产网站| 伊人在线视频观看| 99精品国产视频| 色播五月激情五月| 在线不卡亚洲| 亚洲欧洲一区二区| 精品三级av在线导航| 国产精品久久二区| 狂野欧美性猛交xxxxx视频| 亚洲欧洲自拍偷拍| 性生活三级视频| 91成人免费在线| 午夜69成人做爰视频| 91视频精品在这里| 精产国品一二三区| 久久午夜视频| 欧美狂野激情性xxxx在线观| 九九热精品视频在线观看| 亚洲在线一区二区| 日韩经典一区| 国语对白做受69| 蜜桃视频在线观看www社区 | 91精品国产91久久久久青草| 成人欧美大片| 久久久久久国产免费| 最新av网站在线观看| 国产丝袜精品视频| 丰满肉嫩西川结衣av| 欧美日韩免费视频| 午夜精品久久久久久久久久久久久蜜桃| 国产精品不卡视频| 亚洲精品国产一区黑色丝袜 | 国产全是老熟女太爽了| 国产mv日韩mv欧美| 欧美日韩一区二区三区69堂| 中文亚洲欧美| 无码av天堂一区二区三区| 婷婷中文字幕一区| 日韩片电影在线免费观看| 日本午夜精品| 精品日本一区二区三区| 日韩精品一区二区三区中文 | silk一区二区三区精品视频| 国产精品欧美日韩| 精品3atv在线视频| 国产91精品久久久| www.youjizz.com在线| 欧美精品在线免费播放| 久草免费在线观看| 中文字幕欧美日韩va免费视频| 日韩黄色影片| 国产婷婷成人久久av免费高清| 欧美熟妇另类久久久久久不卡 | 国产精品国产一区| 亚洲精品一区二区三区蜜桃久 | 国产999精品久久久久久绿帽| www.夜夜爽| 日本不卡一区二区三区| 免费在线观看日韩视频| 性欧美videos另类喷潮| 无码人妻精品一区二区三区在线| 999亚洲国产精| 欧美日韩一道本| 99xxxx成人网| 黄色动漫网站入口| 视频在线观看国产精品| 成人午夜激情av| 蜜桃视频一区二区| 亚洲一区二区福利视频| 激情深爱一区二区| 中文字幕一二三区| 成人h动漫精品| 美国黄色a级片| 久久久午夜精品| av片在线免费看| 亚洲女女做受ⅹxx高潮| 欧美成人精品欧美一| 亚洲永久免费av| 国内精品福利视频| 欧美日韩一二三| aaa一区二区| 亚洲成av人乱码色午夜| 亚洲色图欧美视频| 亚洲天堂免费在线| 激情在线小视频| 久久男人av资源网站| 亚洲性色av| 国产美女搞久久| 日韩视频一区二区三区四区| 久久99精品久久久久子伦| 精品国产乱码久久久| 视频一区二区视频| 亚洲久久视频| 不卡av免费在线| 国产精品88888| 新91视频在线观看| 亚洲人成在线播放网站岛国 | 久cao在线| 91国产中文字幕| 久久亚洲人体| 国产一区二区在线网站| 日本精品三区| 成人在线视频一区二区三区| 久久高清免费观看| 日韩欧美中文在线视频| 久久婷婷国产综合国色天香| 国产精品白丝喷水在线观看| 黄网站色欧美视频| 99精品视频在线播放免费| 亚洲欧美激情视频| 日本在线观看大片免费视频| 日本精品一区二区三区在线| 不卡精品视频| 欧美精品一区二区三区在线看午夜 | 忘忧草在线影院两性视频| 91视频8mav| 国产一区二区三区四区二区| 97久久国产亚洲精品超碰热| 久久看片网站| 国产污在线观看| 综合自拍亚洲综合图不卡区| 欧美h在线观看| 精品欧美久久久| 又爽又大又黄a级毛片在线视频| 久久久综合av| 美女久久精品| 性欧美大战久久久久久久免费观看 | av福利精品导航| 国产成人久久久久| 欧美伊人久久大香线蕉综合69| 韩国中文字幕hd久久精品| 色青青草原桃花久久综合| 625成人欧美午夜电影| 国产精品二区在线| 7777久久香蕉成人影院| 亚欧美在线观看| 国产婷婷色一区二区三区四区 | 91精品人妻一区二区三区| 亚洲综合激情另类小说区| 国产一区二区三区三州| 亚洲天堂成人在线视频| 人狥杂交一区欧美二区| 国产精品一区二区三区不卡 | 国产精品久久看| 欧产日产国产69| 亚洲精品美女视频| av资源在线看片| 成人av蜜桃| 欧美日韩国产在线一区| 亚洲精品在线网址| 亚洲视频一区二区在线| 一区二区美女视频| 色av中文字幕一区| 日韩成人一区| 中文字幕一区二区三区在线乱码| 麻豆成人综合网| 大胸美女被爆操| 欧美怡红院视频| 午夜视频在线免费观看| 国产欧美一区二区三区在线看| 精品九九在线| 丝袜制服一区二区三区| 国产欧美精品一区二区色综合 | 阿v视频在线观看| 国产乱码精品一区二区三区日韩精品 | 视频一区二区视频| 国产成人啪免费观看软件| 欧美成人综合色| 亚洲а∨天堂久久精品9966| 91jq激情在线观看| 精品视频一区二区三区四区| 亚洲免费影视| 国产sm调教视频| 欧美日韩国产影片| 91网在线看| 国内精品视频在线播放| 性色av一区二区怡红| 中文字幕网站在线观看| 欧美日本乱大交xxxxx| www视频在线免费观看| 成人资源av| 午夜在线一区二区| 日本猛少妇色xxxxx免费网站| 欧美精品丝袜中出| 国产蜜臀一区二区打屁股调教| 国产精品日本一区二区| 免费欧美日韩| 刘亦菲国产毛片bd| 日韩一本二本av| 爱草tv视频在线观看992| 欧美午夜欧美| 激情欧美一区二区| 亚洲欧美在线视频免费| 一本一本久久a久久精品综合小说| 婷婷激情成人| 国产91xxx| 欧美国产精品久久| 亚洲精品97久久中文字幕| 欧美亚洲在线视频| 我不卡影院28| 性色av蜜臀av色欲av| 欧美日韩高清不卡| 国产伦子伦对白在线播放观看| 亚洲成人a**址| 成人免费不卡视频| 一区二区三区在线免费观看视频| 久久99视频免费| 国产精品亚洲片在线播放| 91大神免费观看| 色综合色狠狠天天综合色| gogogogo高清视频在线| 麻豆久久久9性大片| 国产精品一区二区男女羞羞无遮挡| 综合激情网五月|