6. Distributed Transactions in Practice: Transaction Recovery in TCC-transaction

5 Transaction Recovery in TCC-transaction

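Version 2.x supports two deployment modes, embedded and server. In embedded mode, transaction recovery is still performed by tcc-client. In server mode, it is delegated to the tcc-server module. This section covers only the tcc-client side (embedded mode) and walks through how TCC-transaction recovers transactions.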

 

5.1 Starting TCC-Client

public class SpringTccClient extends TccClient implements TransactionManagerFactory {

    public SpringTccClient() {
        super(new ClientConfig());
    }

    public SpringTccClient(ClientConfig clientConfig) {
        super(clientConfig);
    }
}

SpringTccClient is instantiated in a Spring configuration class and registered as a bean in the Spring container.

@Configuration
@EnableAspectJAutoProxy(proxyTargetClass = true)
@ComponentScan(value = "org.mengyun.tcctransaction", excludeFilters = {@ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {XmlTccTransactionConfiguration.class})})
public class AnnotationTccTransactionConfiguration {

    @Bean("springBeanFactory")
    public SpringBeanFactory getSpringBeanFactory() {
        return new SpringBeanFactory();
    }

    @Bean
    @DependsOn({"springBeanFactory"})
    public SpringTccClient getTccClient() {
        // clientConfig: its declaration is not shown in this excerpt
        return new SpringTccClient(clientConfig);
    }
}

TccClient then initializes itself as follows:

    @Override
    @PostConstruct
    // Runs once the Spring container has created this bean (@PostConstruct)
    public void start() throws Exception {
        this.isStarting = true;

        // REMOTING storage type: the client runs in the server deployment mode
        if (this.clientConfig.getStorageType() == StorageType.REMOTING) {
            try {
                this.registryService.start();
                this.registryService.subscribe();
            } catch (Exception e) {
                logger.error("failed to initialize registryService, stop the application!", e);
                StopUtils.stop();
            }
            initializeRemotingClient(); // initialize the remoting client

        } else { // local storage: persistence and recovery of transactions both happen locally

            registerDomain(this.clientConfig.getDomain());
            if (transactionRepository.supportRecovery() && this.clientConfig.isRecoveryEnabled()) {
                // TAG1 scheduler
                // TAG2 scheduler.registerScheduleAndStartIfNotPresent
                scheduler.registerScheduleAndStartIfNotPresent(this.clientConfig.getDomain());
            }
        }

        this.isStarting = false;
    }

Because of @PostConstruct, TccClient.start() runs when the Spring container starts up and creates this bean.

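TccClient.start():
    1 StorageType.REMOTING: in remoting storage mode, the client starts and subscribes to the registry service and initializes the remoting client;
    2 otherwise, persistence and recovery are local. If the repository supports recovery and recovery is enabled, the client calls
      scheduler.registerScheduleAndStartIfNotPresent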
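The branching in start() can be modelled on its own, which makes the two deployment paths easier to compare. The following is a minimal sketch with hypothetical names (StartupPlan, plan, and a reduced StorageType enum chosen for illustration). It only records which steps start() would take and does not call any TCC-transaction API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the branch taken in TccClient.start(); the enum is a
// reduced, illustrative subset and nothing here calls TCC-transaction itself.
class StartupPlan {
    enum StorageType { REMOTING, MEMORY, JDBC }

    // Returns the startup steps, in order, for the given configuration.
    static List<String> plan(StorageType storageType, boolean supportRecovery,
                             boolean recoveryEnabled, String domain) {
        List<String> steps = new ArrayList<>();
        if (storageType == StorageType.REMOTING) {
            // Server deployment: talk to the registry; recovery runs in tcc-server
            steps.add("registryService.start");
            steps.add("registryService.subscribe");
            steps.add("initializeRemotingClient");
        } else {
            // Embedded deployment: storage and recovery stay in this process
            steps.add("registerDomain:" + domain);
            if (supportRecovery && recoveryEnabled) {
                steps.add("registerScheduleAndStartIfNotPresent:" + domain);
            }
        }
        return steps;
    }
}
```

With local storage and recovery disabled, only the domain is registered and no recovery job is scheduled.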

TAG1 scheduler

The scheduler field is created when TccClient is instantiated:

        this.scheduler = new RecoveryScheduler(this.clientConfig);

public class RecoveryScheduler {

    public static final String JOB_NAME = "TCC_JOB_%s"; // job name pattern
    public static final String TRIGGER_NAME = "TCC_TRIGGER_%s"; // trigger name pattern
    private static final Logger logger = LoggerFactory.getLogger(RecoveryScheduler.class.getSimpleName());
    private RecoveryConfig recoveryConfig;
    // Cache of domain -> Scheduler: one Scheduler per domain
    private Map<String, Scheduler> schedulers = new ConcurrentHashMap<>();

    public RecoveryScheduler(RecoveryConfig recoveryConfig) {
        this.recoveryConfig = recoveryConfig;
    }

TAG2 scheduler.registerScheduleAndStartIfNotPresent

This method creates and caches a Scheduler for the domain in RecoveryScheduler if one does not exist yet, then starts it:

    public void registerScheduleAndStartIfNotPresent(String domain) {
        Scheduler scheduler = registerScheduleIfNotPresent(domain); // create and cache the scheduler
        start(scheduler); // start the scheduler
    }

    public Scheduler registerScheduleIfNotPresent(String domain) {
        // Fast path: skip the lock if a scheduler for this domain is already cached
        if (!schedulers.containsKey(domain)) {
            synchronized (RecoveryScheduler.class) {
                // Double check under the lock: still not created and cached?
                if (!schedulers.containsKey(domain)) {
                    // TAG2.1 createScheduler
                    Scheduler scheduler = createScheduler(domain);
                    // TAG2.2 scheduleJob
                    scheduleJob(scheduler, domain);
                    schedulers.put(domain, scheduler);
                }
            }
        }
        return schedulers.get(domain);
    }
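The double-checked registration above ensures there is one Scheduler per domain. On a ConcurrentHashMap you can get the same guarantee from computeIfAbsent, which runs the mapping function at most once for an absent key. The sketch below uses a hypothetical FakeScheduler in place of Quartz's Scheduler. It shows an alternative technique, not what the library does.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: one scheduler per domain, created lazily and exactly once.
// FakeScheduler is a hypothetical stand-in for org.quartz.Scheduler.
class DomainSchedulerRegistry {
    static final class FakeScheduler {
        final String domain;
        FakeScheduler(String domain) { this.domain = domain; }
    }

    private final Map<String, FakeScheduler> schedulers = new ConcurrentHashMap<>();
    final AtomicInteger created = new AtomicInteger();

    FakeScheduler registerIfNotPresent(String domain) {
        // The mapping function runs at most once per absent key, so creation
        // happens once per domain even when several threads call this together.
        return schedulers.computeIfAbsent(domain, d -> {
            created.incrementAndGet();
            return new FakeScheduler(d);   // createScheduler + scheduleJob would go here
        });
    }
}
```

There are two differences from the original. First, the original locks on RecoveryScheduler.class, which is shared by every instance. Second, the computeIfAbsent documentation asks for a short mapping function. Slow Quartz calls inside it would therefore block other updates to the map.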

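TAG2.1 createScheduler / TAG2.2 scheduleJob: creating the JobDetail run by the scheduler

createScheduler (not reproduced here) builds the Quartz Scheduler for the domain. The scheduleJob method below creates the job that the scheduler runs and sets up the trigger for the periodic task: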

   private void scheduleJob(Scheduler scheduler, String domain) {

        String jobName = String.format(JOB_NAME, domain);
        String triggerName = String.format(TRIGGER_NAME, domain);

        // Build the Quartz JobDetail
        // TAG2.1.1 QuartzRecoveryTask
        JobDetail jobDetail = JobBuilder.newJob(QuartzRecoveryTask.class).withIdentity(jobName).build();
        jobDetail.getJobDataMap().put(MixAll.DOMAIN, domain);
        // Define the trigger
        CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(triggerName)
                // Cron expression from the recovery config, e.g. 0/30 * * * * ? (every 30 seconds)
                .withSchedule(CronScheduleBuilder.cronSchedule(recoveryConfig.getCronExpression())
                        // Misfire policy "do nothing": a fire time missed by more than misfireThreshold
                        // is skipped, and the trigger waits for its next scheduled fire time
                        .withMisfireHandlingInstructionDoNothing()).build();

        try {
            // Check whether this job is already registered in the scheduler
            if (!scheduler.checkExists(JobKey.jobKey(jobName))) {

                // TAG2.1.2 scheduler.scheduleJob
                // Not registered yet: schedule the JobDetail with the cron trigger
                scheduler.scheduleJob(jobDetail, cronTrigger);
            } else {
                if (recoveryConfig.isUpdateJobForcibly()) {
                    // Already registered and forcible update is on: delete the old job, then schedule the new one
                    scheduler.deleteJob(JobKey.jobKey(jobName));
                    scheduler.scheduleJob(jobDetail, cronTrigger);
                }
            }
        } catch (SchedulerException se) {
            try {
                scheduler.shutdown();
            } catch (Exception ignore) {
                //ignore
            }
            throw new SystemException(String.format("register recovery task for domain<%s> failed", domain), se);
        }
    }

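With a cron expression such as 0/30 * * * * ?, Quartz runs the QuartzRecoveryTask job every 30 seconds to retry the recovery of pending transactions.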
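The following sketch shows how this trigger behaves, using plain java.time instead of Quartz. It assumes the cron expression 0/30 * * * * ?, which fires at second 0 and second 30 of every minute. It also models the "do nothing" misfire instruction. A run delayed by no more than the misfire threshold still fires, just late. A run delayed by more is skipped, and the trigger waits for the next regular fire time. ThirtySecondTrigger and its methods are hypothetical names.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Sketch of the trigger configured in scheduleJob, assuming the cron expression
// "0/30 * * * * ?" and the "do nothing" misfire instruction. Plain java.time, not Quartz.
class ThirtySecondTrigger {

    // Next fire time strictly after t.
    static LocalDateTime nextFireAfter(LocalDateTime t) {
        LocalDateTime minuteStart = t.truncatedTo(ChronoUnit.MINUTES);
        int slot = t.getSecond() / 30;                 // 0 for [0,30), 1 for [30,60)
        return minuteStart.plusSeconds((slot + 1) * 30L);
    }

    // When the job actually runs if its scheduled fire time was delayed until now:
    // within the threshold it still runs (late); beyond it the fire is skipped and
    // the trigger waits for the next regular fire time after now.
    static LocalDateTime actualRun(LocalDateTime scheduled, LocalDateTime now, Duration misfireThreshold) {
        boolean misfired = Duration.between(scheduled, now).compareTo(misfireThreshold) > 0;
        return misfired ? nextFireAfter(now) : now;
    }
}
```

For example, a fire scheduled for 10:00:30 that is picked up at 10:00:45 with a 5-second threshold is skipped. The job next runs at 10:01:00.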

TAG2.1.1 QuartzRecoveryTask任务

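This is the Quartz job class registered by TCC-Transaction. @DisallowConcurrentExecution stops Quartz from running the same JobDetail twice at the same time: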

@DisallowConcurrentExecution
public class QuartzRecoveryTask implements Job {

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        String domain = context.getJobDetail().getJobDataMap().getString(MixAll.DOMAIN);
        logger.info("start recovery {}", domain);
      //RTask1 TransactionStoreRecovery.startRecover(domain)
        FactoryBuilder.factoryOf(TccService.class).getInstance().getTransactionStoreRecovery().startRecover(domain);
    }
}
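The effect of @DisallowConcurrentExecution can be approximated with a per-job flag. The sketch below uses hypothetical names (NonConcurrentJobRunner, runIfIdle). It simply skips an overlapping run. Quartz itself works differently: it blocks the next trigger of the same JobDetail until the running execution finishes. The sketch therefore illustrates only the "no overlap" guarantee.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical guard approximating @DisallowConcurrentExecution for one job key.
// Unlike Quartz (which delays the blocked trigger), this sketch skips the overlapping run.
class NonConcurrentJobRunner {
    private final Map<String, AtomicBoolean> running = new ConcurrentHashMap<>();

    // Returns true if the job body ran, false if it was skipped because
    // another execution for the same key was still in progress.
    boolean runIfIdle(String jobKey, Runnable body) {
        AtomicBoolean flag = running.computeIfAbsent(jobKey, k -> new AtomicBoolean(false));
        if (!flag.compareAndSet(false, true)) {
            return false;
        }
        try {
            body.run();
            return true;
        } finally {
            flag.set(false);   // release even if the body throws
        }
    }
}
```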

RTask1 TransactionStoreRecovery.startRecover(domain): running the recovery

public class TransactionStoreRecovery implements Closeable {

    public void startRecover(String domain) {

        try {
            String offset = null;

            int totalCount = 0;
            do {

                // RT1.1 loadErrorTransactionsByPage: load one page of error transactions
                Page<TransactionStore> page = loadErrorTransactionsByPage(domain, offset);

                if (!page.getData().isEmpty()) {
                    // RT1.2 recover the error transactions of this page concurrently
                    concurrentRecoveryErrorTransactions(page.getData());
                    offset = page.getNextOffset();
                    totalCount += page.getData().size();
                } else {
                    break;
                }
            } while (true);

            // Alerting
            AlertManager.tryAlert(domain, totalCount, transactionStorage);

            logger.debug("total recovery count {} from repository:{}", totalCount, transactionStorage.getClass().getName());
        } catch (Throwable e) {
            logger.error("recovery failed from repository:{}.", transactionStorage.getClass().getName(), e);
        }
    }

    // ... other members omitted
}
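The paging loop in startRecover works like this: fetch a page, recover it, advance the offset, and stop at the first empty page. The sketch below reproduces that loop over an in-memory list. Page, loadPage and recoverAll are hypothetical stand-ins for the storage API. As in pageList, the next offset is the current offset plus the number of rows returned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Minimal sketch of the loop in startRecover over an in-memory list.
// Page, loadPage and recoverAll are hypothetical stand-ins for the storage API.
class PagedRecoveryLoop {
    static final class Page {
        final String nextOffset;
        final List<String> data;
        Page(String nextOffset, List<String> data) { this.nextOffset = nextOffset; this.data = data; }
    }

    // Offset-based paging, like pageList: nextOffset = currentOffset + rows returned.
    static Page loadPage(List<String> store, String offset, int pageSize) {
        int from = (offset == null || offset.isEmpty()) ? 0 : Integer.parseInt(offset);
        int to = Math.min(from + pageSize, store.size());
        List<String> data = from >= to ? new ArrayList<>() : new ArrayList<>(store.subList(from, to));
        return new Page(String.valueOf(from + data.size()), data);
    }

    // Fetch a page, recover it, advance the offset; stop at the first empty page.
    static int recoverAll(List<String> store, int pageSize, Consumer<List<String>> recoverPage) {
        String offset = null;
        int total = 0;
        while (true) {
            Page page = loadPage(store, offset, pageSize);
            if (page.data.isEmpty()) {
                break;
            }
            recoverPage.accept(page.data);
            offset = page.nextOffset;
            total += page.data.size();
        }
        return total;
    }
}
```

With offset paging against a live table, rows deleted during recovery move later rows forward. Some rows may therefore be skipped in one pass and picked up only on the next scheduled run.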

RT1.1 loadErrorTransactionsByPage: loading error transactions page by page

    private Page<TransactionStore> loadErrorTransactionsByPage(String domain, String offset) {

        long currentTimeInMillis = Instant.now().toEpochMilli();
        // recoverDuration (seconds): the recovery interval, 30 s by default
        // RT1.1.1 transactionStorage.findAllUnmodifiedSince
        return ((StorageRecoverable) transactionStorage).findAllUnmodifiedSince(domain, new Date(currentTimeInMillis - recoveryConfig.getRecoverDuration() * 1000), offset, recoveryConfig.getFetchPageSize());
    }

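Note how an error (abnormal) transaction is defined here:

new Date(currentTimeInMillis - recoveryConfig.getRecoverDuration() * 1000)

A transaction qualifies when its last update time is earlier than (current time - recovery duration). A transaction that has gone unmodified for longer than the recovery duration is treated as abnormal, and must be recovered and retried.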
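The cutoff calculation is small enough to check by hand. This sketch uses a hypothetical helper class, RecoveryCutoff. It applies the same arithmetic as loadErrorTransactionsByPage (recoverDuration in seconds, multiplied by 1000) and the same strict comparison as the SQL predicate LAST_UPDATE_TIME < ?.

```java
import java.time.Instant;

// Sketch of the "error transaction" test used by loadErrorTransactionsByPage:
// a record qualifies when its last update is older than now - recoverDuration.
class RecoveryCutoff {
    static Instant cutoff(Instant now, long recoverDurationSeconds) {
        // Same arithmetic as currentTimeInMillis - recoverDuration * 1000
        return Instant.ofEpochMilli(now.toEpochMilli() - recoverDurationSeconds * 1000);
    }

    static boolean needsRecovery(Instant lastUpdateTime, Instant now, long recoverDurationSeconds) {
        // Mirrors the SQL predicate LAST_UPDATE_TIME < ? (strictly earlier)
        return lastUpdateTime.isBefore(cutoff(now, recoverDurationSeconds));
    }
}
```

With a 30-second duration at 10:00:00, a transaction last updated at 09:59:00 is picked up. One last updated at 09:59:45, or exactly at 09:59:30, is not.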

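RT1.1.1 transactionStorage.findAllUnmodifiedSince: querying transactions from the database

This method finds all transactions that have not been modified since the given time. The JDBC storage implementation is shown here: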

public class JdbcTransactionStorage extends AbstractTransactionStorage implements StorageRecoverable {

    @Override
    public Page<TransactionStore> findAllUnmodifiedSince(String domain, Date date, String offset, int pageSize) {
        return pageList(domain, date, offset, pageSize, false);
    }
  
    private Page<TransactionStore> pageList(String domain, Date date, String offset, int pageSize, boolean isMarkDeleted) {

        List<TransactionStore> transactions = new ArrayList<>();

        Connection connection = null;
        PreparedStatement stmt = null;

        int currentOffset = StringUtils.isEmpty(offset) ? 0 : Integer.parseInt(offset);

        try {
            // Get a database connection
            connection = this.getConnection();

            // Build the query
            StringBuilder builder = new StringBuilder();
            builder.append(SQL_SELECT_PREFIX_FOR_TCC_TRANSACTION + getTableName() + " WHERE LAST_UPDATE_TIME < ?");
            builder.append(" AND IS_DELETE = ?");
            builder.append(StringUtils.isNotEmpty(domain) ? " AND DOMAIN = ?" : "");
            builder.append(" ORDER BY TRANSACTION_ID ASC");
            builder.append(String.format(" LIMIT %s, %d", currentOffset, pageSize));

            stmt = connection.prepareStatement(builder.toString());
            // Bind the query parameters
            stmt.setTimestamp(1, new Timestamp(date.getTime()));
            stmt.setInt(2, isMarkDeleted ? MARK_DELETED_YES : MARK_DELETED_NO);

            if (StringUtils.isNotEmpty(domain)) {
                stmt.setString(3, domain);
            }
            // Execute the query
            ResultSet resultSet = stmt.executeQuery();

            this.constructTransactions(resultSet, transactions);
        } catch (Throwable e) {
            throw new TransactionIOException(e);
        } finally {
            closeStatement(stmt);
            this.releaseConnection(connection);
        }

        return new Page<>(String.valueOf(currentOffset + transactions.size()), transactions);
    }
  
}

The query sent to the persistent store is therefore:

SELECT DOMAIN, ROOT_XID, XID, CONTENT, STATUS, TRANSACTION_TYPE, CREATE_TIME, LAST_UPDATE_TIME, RETRIED_COUNT, VERSION, IS_DELETE, ROOT_DOMAIN, REQUEST_ID
FROM <tableName>
WHERE LAST_UPDATE_TIME < ?   -- now - recoverDuration
  AND IS_DELETE = 0          -- not marked as deleted
  AND DOMAIN = ?             -- only when a domain is given
ORDER BY TRANSACTION_ID ASC
LIMIT currentOffset, pageSize

This loads the error transactions from the database, one page at a time.
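The dynamic part of pageList can be pulled out into a pure function that builds the SQL and the positional parameters together. That keeps the optional DOMAIN placeholder and its parameter index in sync. This is a hypothetical helper, not part of TCC-transaction. SELECT * stands in for the real column list, and the table name in the usage is an illustrative assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring pageList: builds the recovery query and its
// positional parameters together so the optional DOMAIN filter and its
// parameter index cannot drift apart. "SELECT *" stands in for the real column list.
class RecoveryQueryBuilder {
    static final int MARK_DELETED_NO = 0;   // assumed to match IS_DELETE = 0

    static boolean hasDomain(String domain) {
        return domain != null && !domain.isEmpty();
    }

    static String sql(String table, String domain, int offset, int pageSize) {
        StringBuilder b = new StringBuilder("SELECT * FROM ").append(table)
                .append(" WHERE LAST_UPDATE_TIME < ? AND IS_DELETE = ?");
        if (hasDomain(domain)) {
            b.append(" AND DOMAIN = ?");
        }
        b.append(" ORDER BY TRANSACTION_ID ASC");
        // offset and pageSize are ints, so inlining them cannot inject SQL
        b.append(" LIMIT ").append(offset).append(", ").append(pageSize);
        return b.toString();
    }

    // Parameters in placeholder order: cutoff time, delete flag, then domain if present.
    static List<Object> params(Object cutoff, String domain) {
        List<Object> p = new ArrayList<>();
        p.add(cutoff);
        p.add(MARK_DELETED_NO);
        if (hasDomain(domain)) {
            p.add(domain);
        }
        return p;
    }
}
```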

RT1.2 concurrentRecoveryErrorTransactions: recovering error transactions concurrently

    private void concurrentRecoveryErrorTransactions(List<TransactionStore> transactions) throws InterruptedException, ExecutionException {

        initLogStatistics();
        // Create one recovery task per transaction
        List<RecoverTask> tasks = new ArrayList<>();
        for (TransactionStore transaction : transactions) {
            // RT1.2.1 RecoverTask: the task for one error transaction
            tasks.add(new RecoverTask(transaction));
        }
        // RT1.2.2 recoveryExecutorService.invokeAll: run the tasks as a batch
        List<Future<Void>> futures = recoveryExecutorService.invokeAll(tasks, CONCURRENT_RECOVERY_TIMEOUT, TimeUnit.SECONDS);

        for (Future<Void> future : futures) {
            future.get();
        }
    }
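The same fan-out can be written with JDK types only: one Callable per record, invokeAll with an overall timeout, then get() on every future. In the sketch below, recovery is stubbed as recording the xid. ConcurrentRecovery is a hypothetical name. Unlike the original method, it wraps the checked exceptions in IllegalStateException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Sketch of concurrentRecoveryErrorTransactions with JDK types only: one Callable
// per record, invokeAll with an overall timeout, then get() on every future.
class ConcurrentRecovery {
    // Returns the set of xids whose (stubbed) recovery ran.
    static Set<String> recover(List<String> xids, ExecutorService pool, long timeoutSeconds) {
        Set<String> recovered = ConcurrentHashMap.newKeySet();
        List<Callable<Void>> tasks = new ArrayList<>();
        for (String xid : xids) {
            tasks.add(() -> {
                recovered.add(xid);   // recoverErrorTransaction(xid) would run here
                return null;
            });
        }
        try {
            // Waits until all tasks finish or the timeout elapses; unfinished tasks are cancelled
            List<Future<Void>> futures = pool.invokeAll(tasks, timeoutSeconds, TimeUnit.SECONDS);
            for (Future<Void> f : futures) {
                f.get();   // CancellationException here means the task was cut off by the timeout
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("recovery interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("recovery task failed", e.getCause());
        }
        return recovered;
    }
}
```

recoverErrorTransaction catches Throwable itself, so in the original only cancellation or interruption is likely to surface through future.get().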

RT1.2.1 RecoverTask: the task that recovers one error transaction

An inner class of TransactionStoreRecovery:

   class RecoverTask implements Callable<Void> {

        TransactionStore transaction;

        public RecoverTask(TransactionStore transaction) {
            this.transaction = transaction;
        }

        @Override
        public Void call() throws Exception {
          
            recoverErrorTransaction(transaction);
            return null;
        }
    }

    private void recoverErrorTransaction(TransactionStore transactionStore) {

        // The retry count already exceeds recoveryConfig.getMaxRetryCount(): log (rate-limited) and give up
        if (transactionStore.getRetriedCount() > recoveryConfig.getMaxRetryCount()) {

            logSync.lock();
            try {
                // Keep printing details while fewer than logMaxPrintCount messages have been logged
                if (triggerMaxRetryPrintCount.get() < logMaxPrintCount) {
                    logger.error(
                            "recover failed with max retry count,will not try again. domain:{}, xid:{}, rootDomain:{}, rootXid:{}, status:{},retried count:{}",
                            transactionStore.getDomain(),
                            transactionStore.getXid(),
                            transactionStore.getRootDomain(),
                            transactionStore.getRootXid(),
                            transactionStore.getStatusId(),
                            transactionStore.getRetriedCount());
                    triggerMaxRetryPrintCount.incrementAndGet();
                } else if (triggerMaxRetryPrintCount.get() == logMaxPrintCount) {
                    logger.error("Too many transactionStore's retried count max then MaxRetryCount during one page transactions recover process , will not print errors again!");
                }

            } finally {
                logSync.unlock();
            }

            return;
        }

        // ---- retry count not exceeded ----
        try {
            // ---- recovery of a root transaction ----
            if (transactionStore.getTransactionTypeId() == TransactionType.ROOT.getId()) {

                switch (TransactionStatus.valueOf(transactionStore.getStatusId())) {
                    case CONFIRMING:
                        commitTransaction(transactionStore); 
                        break;
                    case CANCELLING:
                        rollbackTransaction(transactionStore);
                        break;
                    case TRYING:
                        tryTreatAsFailed(transactionStore, TransactionStatus.CANCELLING);
                        break;
                    default:
                        //ignore it.
                        break;

                }

            }
            // ---- recovery of a branch transaction ----
            else {

                // transactionStore type is BRANCH
                switch (TransactionStatus.valueOf(transactionStore.getStatusId())) {
                    case CONFIRMING:
                        // ERR1 commitTransaction
                        commitTransaction(transactionStore);
                        break;
                    case CANCELLING:
                    case TRY_FAILED:
                        // ERR2 rollbackTransaction
                        rollbackTransaction(transactionStore);
                        break;
                        break;
                    case TRY_SUCCESS:

                        if (storageMode == StorageMode.CENTRAL) {

                            //check the root transactionStore
                            TransactionStore rootTransaction = transactionStorage.findByXid(transactionStore.getRootDomain(), transactionStore.getRootXid());

                            if (rootTransaction == null) {
                                // In this case means the root transactionStore is already rollback.
                                // Need cancel this branch transactionStore.
                                rollbackTransaction(transactionStore);
                            } else {
                                switch (TransactionStatus.valueOf(rootTransaction.getStatusId())) {
                                    case CONFIRMING:
                                        commitTransaction(transactionStore);
                                        break;
                                    case CANCELLING:
                                        rollbackTransaction(transactionStore);
                                        break;
                                    default:
                                        break;
                                }
                            }
                        }
                        break;
                    case TRYING:
                        // ERR3 tryTreatAsFailed
                        tryTreatAsFailed(transactionStore, TransactionStatus.TRY_FAILED);
                        break;
                    default:
                        //ignore it.
                        break;
                }

            }

        } catch (Throwable throwable) {

            if (throwable instanceof TransactionOptimisticLockException
                    || ExceptionUtils.getRootCause(throwable) instanceof TransactionOptimisticLockException) {

                logger.warn(
                        "optimisticLockException happened while recover. txid:{}, status:{},retried count:{}",
                        transactionStore.getXid(),
                        transactionStore.getStatusId(),
                        transactionStore.getRetriedCount());
            } else {

                logSync.lock();
                try {
                    if (recoveryFailedPrintCount.get() < logMaxPrintCount) {
                        try {
                            logger.error("recover failed, txid:{}, status:{},retried count:{},transactionStore content:{}",
                                    transactionStore.getXid(),
                                    transactionStore.getStatusId(),
                                    transactionStore.getRetriedCount(),
                                    jackson.writeValueAsString(transactionStore), throwable);
                        } catch (JsonProcessingException e) {
                            logger.error("failed to serialize transactionStore {}", transactionStore.toString(), e);
                        }
                        recoveryFailedPrintCount.incrementAndGet();
                    } else if (recoveryFailedPrintCount.get() == logMaxPrintCount) {
                        logger.error("Too many transactionStore's recover error during one page transactions recover process , will not print errors again!");
                    }
                } finally {
                    logSync.unlock();
                }
            }
        }
    }

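Error transactions are recovered as follows.

The relevant transaction statuses are CONFIRMING, CANCELLING, TRYING, TRY_FAILED and TRY_SUCCESS.

A root transaction only needs handling in the CONFIRMING, CANCELLING and TRYING states.

A branch transaction handles all of them. For TRY_SUCCESS, the action depends on the status of the root transaction. This case is only processed when the storage mode is CENTRAL (central storage, a client-side setting). Only in that mode does branch compensation handle TRY_SUCCESS transactions:

If the root transaction no longer exists, it has already been rolled back, so the branch must be rolled back as well;

If the root exists and is CONFIRMING, the branch is confirmed. If it is CANCELLING, the branch is cancelled. In any other state, nothing is done.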

ROOT transaction (transactionStore.getTransactionTypeId() is ROOT), by current status:
    1 CONFIRMING: commitTransaction(transactionStore)   // commit again
    2 CANCELLING: rollbackTransaction(transactionStore) // roll back again
    3 TRYING: tryTreatAsFailed(transactionStore, TransactionStatus.CANCELLING) // treat as failed, set the status to CANCELLING
    Any other status: ignored

BRANCH transaction (transactionStore.getTransactionTypeId() is BRANCH), by current status:
    1 CONFIRMING: commitTransaction(transactionStore)
    2 CANCELLING or TRY_FAILED: rollbackTransaction(transactionStore)
    3 TRY_SUCCESS, only when storageMode == StorageMode.CENTRAL:
        root transaction not found: rollbackTransaction(transactionStore)
        root is CONFIRMING: commitTransaction(transactionStore)   // commit again
        root is CANCELLING: rollbackTransaction(transactionStore) // roll back again
        any other root status: nothing is done
    4 TRYING: tryTreatAsFailed(transactionStore, TransactionStatus.TRY_FAILED) // treat as failed, set the status to TRY_FAILED
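The decision table above can be written as a pure function, which makes each case easy to test on its own. The sketch below uses hypothetical enums and names (RecoveryDecision, decide, Action). A null rootStatus stands for "root transaction not found".

```java
// Hypothetical pure-function version of the recovery decision table.
class RecoveryDecision {
    enum Type { ROOT, BRANCH }
    enum Status { TRYING, CONFIRMING, CANCELLING, TRY_SUCCESS, TRY_FAILED }
    enum Action { COMMIT, ROLLBACK, TREAT_AS_CANCELLING, TREAT_AS_TRY_FAILED, NONE }

    // rootStatus is null when the root transaction record was not found.
    static Action decide(Type type, Status status, boolean centralStorage, Status rootStatus) {
        if (type == Type.ROOT) {
            switch (status) {
                case CONFIRMING: return Action.COMMIT;
                case CANCELLING: return Action.ROLLBACK;
                case TRYING: return Action.TREAT_AS_CANCELLING;
                default: return Action.NONE;
            }
        }
        switch (status) {
            case CONFIRMING: return Action.COMMIT;
            case CANCELLING:
            case TRY_FAILED: return Action.ROLLBACK;
            case TRYING: return Action.TREAT_AS_TRY_FAILED;
            case TRY_SUCCESS:
                if (!centralStorage) return Action.NONE;              // only handled in CENTRAL mode
                if (rootStatus == null) return Action.ROLLBACK;       // root already rolled back
                if (rootStatus == Status.CONFIRMING) return Action.COMMIT;
                if (rootStatus == Status.CANCELLING) return Action.ROLLBACK;
                return Action.NONE;
            default: return Action.NONE;
        }
    }
}
```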

ERR1 commitTransaction
   private void commitTransaction(TransactionStore transactionStore) {
        recoveryExecutor.commit(transactionStore);
    }

public class ClientRecoveryExecutor implements RecoveryExecutor {

    public void commit(TransactionStore transactionStore) {

        Transaction transaction = TransactionConvertor.getTransaction(transactionSerializer, transactionStore);

        // Increment the retry count
        transaction.setRetriedCount(transaction.getRetriedCount() + 1);
        transaction.setStatus(TransactionStatus.CONFIRMING); // set the status to CONFIRMING
        try {
            transactionRepository.update(transaction); // persist; fails if another instance updated it first
        } catch (TransactionOptimisticLockException e) {
            logger.debug("multiple instances try to recovery<commit> the same transaction<{}>, this instance ignore the recovery.", transactionStore.getXid());
            return;
        }
        transaction.commit(); // run the commit (confirm) phase again
        transactionRepository.delete(transaction); // remove the transaction record
    }
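The TransactionOptimisticLockException branch is what lets several instances recover the same transaction safely. The sketch below assumes that the repository update is a version-checked compare-and-set, which is what that exception handling suggests. Only the instance whose update wins goes on to commit and delete. All types are hypothetical and in-memory.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the optimistic-lock step in commit recovery, assuming an update only
// succeeds when the stored version equals the version that was read.
// All types here are hypothetical and in-memory.
class OptimisticRecoveryStore {
    static final class Row {
        final long version;
        final int retriedCount;
        final String status;
        Row(long version, int retriedCount, String status) {
            this.version = version; this.retriedCount = retriedCount; this.status = status;
        }
    }

    private final Map<String, Row> rows = new ConcurrentHashMap<>();

    void insert(String xid, Row row) { rows.put(xid, row); }
    Row read(String xid) { return rows.get(xid); }

    // Version-checked update: bumps version and retry count, sets the new status.
    boolean updateIfVersion(String xid, long expectedVersion, String newStatus) {
        boolean[] applied = {false};
        rows.computeIfPresent(xid, (k, row) -> {
            if (row.version != expectedVersion) {
                return row;                      // someone else updated first
            }
            applied[0] = true;
            return new Row(row.version + 1, row.retriedCount + 1, newStatus);
        });
        return applied[0];
    }

    // Mirrors the commit recovery flow: update first, commit and delete only if we won.
    boolean recoverCommit(String xid, Row snapshot) {
        if (!updateIfVersion(xid, snapshot.version, "CONFIRMING")) {
            return false;                        // another instance is recovering it
        }
        // transaction.commit() would run here
        rows.remove(xid);
        return true;
    }
}
```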

ERR2 rollbackTransaction

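Recovery by rollback mirrors commitTransaction, with the cancel-side counterpart of each step: the status is set to CANCELLING, and rollback is called instead of commit.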

ERR3 tryTreatAsFailed
    private void tryTreatAsFailed(TransactionStore transactionStore, TransactionStatus transactionStatus) {
        Date lastUpdateTime = transactionStore.getLastUpdateTime();
        Date currentTime = new Date();
        // A TRYING transaction is treated as failed only after it has gone unmodified
        // for longer than maxTimeTreatTryingAsFailed seconds (a value <= 0 disables this)
        int maxTimeTreatTryingAsFailed = recoveryConfig.getMaxTimeTreatTryingAsFailed();
        if (maxTimeTreatTryingAsFailed > 0
                && (currentTime.getTime() - lastUpdateTime.getTime()) > maxTimeTreatTryingAsFailed * 1000) {
            //update the status to cancel or try failed, waiting for the recovery task to recover
            // Increment the version
            transactionStore.setVersion(transactionStore.getVersion() + 1);
            // Set the target status: CANCELLING for a root, TRY_FAILED for a branch
            transactionStore.setStatusId(transactionStatus.getId());
            transactionStorage.update(transactionStore);
        }
    }

This sets the transaction to the given target status (TRY_FAILED for a branch, CANCELLING for a root) and leaves the actual rollback to a later recovery pass.

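Note:

For a root transaction, recovery only converts TRYING to CANCELLING. On the next recovery pass, a root in CANCELLING is rolled back directly;

For a branch transaction, TRYING is converted to TRY_FAILED. On the next recovery pass, a branch in TRY_FAILED is rolled back directly.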
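The two-pass behaviour can be traced with a small state machine. This sketch uses hypothetical types (TreatTryingAsFailed, Tx, and a ROLLED_BACK status standing in for rollbackTransaction). It ignores the recoverDuration filter that decides whether a record is fetched at all. A TRYING transaction that has been idle for longer than maxTimeTreatTryingAsFailed seconds is first moved to CANCELLING (root) or TRY_FAILED (branch), and only the following pass rolls it back.

```java
// Sketch of tryTreatAsFailed followed by the next recovery pass, with hypothetical types.
class TreatTryingAsFailed {
    enum Status { TRYING, CANCELLING, TRY_FAILED, ROLLED_BACK }

    static final class Tx {
        final boolean root;
        final long lastUpdateMillis;
        Status status = Status.TRYING;
        long version;
        Tx(boolean root, long lastUpdateMillis) { this.root = root; this.lastUpdateMillis = lastUpdateMillis; }
    }

    // One recovery pass over a single transaction.
    static void recoverOnce(Tx tx, long nowMillis, int maxTimeTreatTryingAsFailedSeconds) {
        switch (tx.status) {
            case TRYING:
                // Only convert after the transaction has been idle long enough
                if (maxTimeTreatTryingAsFailedSeconds > 0
                        && nowMillis - tx.lastUpdateMillis > maxTimeTreatTryingAsFailedSeconds * 1000L) {
                    tx.version++;
                    tx.status = tx.root ? Status.CANCELLING : Status.TRY_FAILED;
                }
                break;
            case CANCELLING:
            case TRY_FAILED:
                tx.status = Status.ROLLED_BACK;   // stands in for rollbackTransaction
                break;
            default:
                break;
        }
    }
}
```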

RT1.2.2 recoveryExecutorService.invokeAll: running the tasks as a batch

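invokeAll is called on the recovery thread pool to run the whole batch of tasks. It waits at most CONCURRENT_RECOVERY_TIMEOUT seconds, and any task still unfinished at that point is cancelled.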

TAG2.2 Scheduler.scheduleJob

This calls the Quartz scheduler to register the periodic recovery job described above.

This completes the walkthrough of transaction recovery on the TCC-client side.

5.2 Starting TCC-Server (todo)

Largely the same as the tcc-client flow; the source code walkthrough will be posted later.