二、分布式事务 实战:TCC-transaction分布式事务关键组件--1Participant参与者、TransactionManager事务管理器

二 TCC-transaction分布式事务关键组件--1Participant参与者、TransactionManager事务管理器

二 TCC-transaction关键组件类及原理

2.1 框架原理

2.1.1 基本使用

在一个模块配置Tcc-transaction并使用,需要执行如下操作:

1.1 生效tcc-transaction

在spring项目中可以使用xml配置或者注解来生效tcc-transaction。

a.XML配置

    <tcc:annotation-driven transaction-repository="transactionRepository"/>

b.注解配置

    @EnableTccTransaction

1.2 配置transaction repository

transaction repository用来保存事务日志,框架提供多种Transaction Repository:

 

 在tcc-transaction-dubbo-sample中使用MemoryStoreTransactionRepository(没有持久化能力,仅可用于测试):

    <bean id="transactionRepository" class="org.mengyun.tcctransaction.repository.MemoryStoreTransactionRepository">
        <property name="domain" value="TCC:DUBBO:CAPITAL:"/>
    </bean>

1.3 声明tcc接口

在接口方法上加上@EnableTcc(1.7.x新增注解,1.6.x是@Compensable)将提供的接口标记为tcc接口:

     public interface CapitalTradeOrderService {
        @EnableTcc
        public String record(CapitalTradeOrderDto tradeOrderDto);

    }

1.4 配置TCC事务的Try、Confirm、Cancel方法:

在tcc接口实现上方法上添加@Compensable注解,设置confirmMethod和cancelMethod方法,分别为tcc的confirm和cancel方法。

    @Override
    @Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord", transactionContextEditor = DubboTransactionContextEditor.class)
    @Transactional
    public String record(CapitalTradeOrderDto tradeOrderDto) {
        ...
    }

在TCC 里,一个业务活动可以有多个事务,每个业务操作归属于不同的事务,即一个事务可以包含多个业务操作。TCC-Transaction 将每个业务操作抽象成事务参与者,每个事务可以包含多个参与者。

参与者需要声明 try / confirm / cancel 三个类型的方法,和 TCC 的操作一一对应。在程序里,通过 @Compensable 注解标记在 try 方法上,并填写对应的 confirm / cancel 方法,示例代码如下:

// try
@Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord", transactionContextEditor = MethodTransactionContextEditor.class)
public String record(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {}

// confirm
public void confirmRecord(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {}

// cancel
public void cancelRecord(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {}

try阶段是由业务方发起调用,而confirm和cancel由事务管理器自动调用。在这里,事务管理器就是CompensableTransactionInterceptor(通过组合的方式,里面含有的TransactionManager才是真正的事务管理器,TransactionManager的方法会被调用),confirm和cancel方法由它自动调用。

主要通过spring-aop拦截器CompensableTransactionInterceptor、ResourceCoordinatorInterceptor
CompensableTransactionInterceptor : 用于tcc事务的流程执行begin(try)、commit(confirm)、rollback(cancel)
ResourceCoordinatorInterceptor : 用于记录tcc事务的Participant(参与方)

2.1.2 框架基本流程

 

 

2.2 核心组件类

 

针对TCC-Transaction框架中涉及的关键组件,按照如图所示的模块,首先对各个关键组件进行分析:

2.2.1 事务参与者Participant与事务Transaction

关系如图:

 

TCC将事务和事务参与者,抽象为两个类Transaction和Participant。

一个transaction可以有多个participant参与者
Transaction---{participant1;participant2;participant3......}

Transaction{
  List<Participant> participants //事务对象中,用list保存当前事务的参与者
    commit()/rollback() ;//事务提供的方法,通过for循环,遍历list中每个participant.commit/rollback方法实现方法提交、回滚
}

Participant{
  transactionXid ;//当前事务的全局id
  confirm/cancelInvocationContext;//confirm或者cancel方法调用上下文(包含调用的所有信息)
  Terminator ;// commit/rollback通过Terminator对象,完成方法的调用
  TransactionContextEditorClass;//transactionContext事务上下文的编辑类,可以通过该类修改和获取事务上下文

  //事务的提交与回滚,通过Terminator调用
  commit/rollback{
    Terminator.invoke(new TransactionContext,new Invocation,transactionContextEditorClass);
  }

}

1 Transaction事务

关注核心的属性和方法

//事务对象需要持久存储,所以需要其实现序列化Serializable
public class Transaction implements Serializable {

    private List<Participant> participants = new ArrayList<>(); //事务的参与者participants列表
    private Map<String, Object> attachments = new ConcurrentHashMap<>(); //附加属性
    private Date createTime = new Date();
    private Xid xid; //事务编号,唯一地标识一个事务,UUID算法生成
    private Xid rootXid;
    private String rootDomain;
    private TransactionType transactionType; //事务类型(包含ROOT根事务、BRANCH分支事务),表示当前事务时发起者还是参与者
    private TransactionStatus status;//事务状态(包含trying、confirming、cancelling),表示当前事务在tcc执行的阶段
    private Date lastUpdateTime = new Date(); 
    private volatile int retriedCount = 0;//重试次数
    private long version = 0L; //版本号,用于乐观锁更新事务

    public Transaction() {
    }

  //该构造函数,创建trying阶段的分支branch事务
    public Transaction(TransactionContext transactionContext) {
        this.xid = transactionContext.getXid();
        this.rootXid = transactionContext.getRootXid();
        this.rootDomain = transactionContext.getRootDomain();
        this.status = TransactionStatus.TRYING;
        this.transactionType = TransactionType.BRANCH;
    }

  //该构造函数,创建trying阶段的root事务
    public Transaction(Object uniqueIdentity, String rootDomain) {

        this.xid = TransactionXid.withUniqueIdentity(uniqueIdentity);
        this.status = TransactionStatus.TRYING;
        this.transactionType = TransactionType.ROOT;
        this.rootXid = xid;
        this.rootDomain = rootDomain;
    }

  //commit使用fori循环,调用每个参与者participant.commit
      public void commit() {
        for (Participant participant : participants) {
            if (!participant.getStatus().equals(ParticipantStatus.CONFIRM_SUCCESS)) {

                participant.commit();
              //commit提交成功后,修改参与者的状态
                participant.setStatus(ParticipantStatus.CONFIRM_SUCCESS);
            }
        }
    }

    public void rollback() {
        for (Participant participant : participants) {
            if (!participant.getStatus().equals(ParticipantStatus.CANCEL_SUCCESS)) {
                participant.rollback();
                participant.setStatus(ParticipantStatus.CANCEL_SUCCESS);
            }
        }
    }

  //添加事务参与者
    public void enlistParticipant(Participant participant) {
        participants.add(participant);
    }

(1)TransactionXid

其中,TransactionXid使用uuid算法生成全局唯一的事务id:

   public static TransactionXid withUniqueIdentity(Object uniqueIdentity) {
        int formatId = Xid.AUTO;
        String xid = null;
        if (uniqueIdentity == null) {
            xid = FactoryBuilder.factoryOf(UUIDGenerator.class).getInstance().generate();
        } else {
            xid = uniqueIdentity.toString();
            formatId = Xid.CUSTOMIZED;
        }
        return new TransactionXid(formatId, xid);
    }

(2)TransactionStatus
public enum TransactionStatus {

    TRYING(1), CONFIRMING(2), CANCELLING(3), TRY_SUCCESS(11), TRY_FAILED(12);
(3)TransactionType
public enum TransactionType {

    ROOT(1),
    BRANCH(2);

2 Participant事务参与者

public class Participant implements Serializable {

    Class<? extends TransactionContextEditor> transactionContextEditorClass; //事务上下文编辑器类

    private Xid rootXid;
    private String rootDomain;

    private Xid xid; //事务id
    private InvocationContext invocationContext;
    private ParticipantStatus status = ParticipantStatus.TRYING; //参与者状态

    public Participant() {

    }

    public void rollback() {
      //利用执行器调用对应的rollback方法
        Terminator.invoke(new TransactionContext(rootDomain, rootXid, xid, TransactionStatus.CANCELLING, status), new Invocation(invocationContext.getCancelMethodName(), invocationContext), transactionContextEditorClass);
    }

    public void commit() {
        Terminator.invoke(new TransactionContext(rootDomain, rootXid, xid, TransactionStatus.CONFIRMING, status), new Invocation(invocationContext.getConfirmMethodName(), invocationContext), transactionContextEditorClass);
    }

participant的xid事务id,通过该id关联上其所属的事务。同一个事务下的事务id相同,通过同id执行tcc的参与者的操作。

(1)ParticipantStatus

参与者的状态,标识当前事务参与者,在tcc执行过程中,所处于TCC阶段的状态

public enum ParticipantStatus {
    TRYING(1), CONFIRMING(2), CANCELLING(3), TRY_SUCCESS(11), TRY_FAILED(12), CONFIRM_SUCCESS(21), CANCEL_SUCCESS(31);
    private int id;

(2) TransactionContext事务上下文
public class TransactionContext implements Serializable {

    private static final long serialVersionUID = -8199390103169700387L;
    private Xid xid;
    private Xid rootXid;
    private String rootDomain;

    private TransactionStatus status = TransactionStatus.TRYING;
    private ParticipantStatus participantStatus = ParticipantStatus.TRYING;
    private Map<String, String> attachments = new ConcurrentHashMap<>();

(3) InvocationContext调用上下文

调用上下文,包含了当前调用的所有信息,包括目标类、参数类型、confirm和cancel方法名称

public class InvocationContext implements Serializable {

    private static final long serialVersionUID = -7969140711432461165L;

    private Class targetClass;
    private String confirmMethodName;
    private String cancelMethodName;
    private Class[] parameterTypes;
    private Object[] args;

(4)Invocation

调用,包括调用的方法名和InvocationContext

public class Invocation {
    private String methodName;

    private InvocationContext invocationContext;

    public Invocation(String methodName, InvocationContext invocationContext) {
        this.methodName = methodName;
        this.invocationContext = invocationContext;
    }

(5)Terminator执行器
public final class Terminator {

    public static Object invoke(TransactionContext transactionContext, Invocation invocation, Class<? extends TransactionContextEditor> transactionContextEditorClass) {

      //当invocation中方法名不为空,即有实际的调用方法时
        if (StringUtils.isNotEmpty(invocation.getMethodName())) {

          //参与者对象类
            Object target = FactoryBuilder.factoryOf(invocation.getTargetClass()).getInstance();

            Method method = null;

            try {
              //参与者需要执行的方法(try/confirm/cancel)
                method = target.getClass().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
            } catch (NoSuchMethodException e) {
                throw new SystemException(e);
            }

          //设置事务上下文transactionContext到transactionContextEditorClass上
            FactoryBuilder.factoryOf(transactionContextEditorClass).getInstance().set(transactionContext, target, method, invocation.getArgs());
            try {
              //反射调用
                return method.invoke(target, invocation.getArgs());
            } catch (IllegalAccessException | InvocationTargetException e) {
                throw new SystemException(e);
            } finally {
                FactoryBuilder.factoryOf(transactionContextEditorClass).getInstance().clear(transactionContext, target, method, invocation.getArgs());
            }
        }
        return null;
    }

(6)TransactionContextEditor事务上下文编辑器

在TCC2.0版本以后,只有这个默认实现类,用threadLocal保存事务上下文对象。

public class ThreadLocalTransactionContextEditor implements TransactionContextEditor {

    @Override
    public TransactionContext get(Object target, Method method, Object[] args) {
        return TransactionContextHolder.getCurrentTransactionContext();
    }

    @Override
    public void set(TransactionContext transactionContext, Object target, Method method, Object[] args) {
        TransactionContextHolder.setCurrentTransactionContext(transactionContext);
    }

    public void clear(TransactionContext transactionContext, Object target, Method method, Object[] args) {
        TransactionContextHolder.clear();
    }
}

public class TransactionContextHolder {
        //创建当前线程变量ThreadLocal,保存事务上下文
    private static ThreadLocal<TransactionContext> transactionContextThreadLocal = new ThreadLocal<>();

    private TransactionContextHolder() {
    }

    public static TransactionContext getCurrentTransactionContext() {
        return transactionContextThreadLocal.get();
    }

    public static void setCurrentTransactionContext(TransactionContext transactionContext) {
        transactionContextThreadLocal.set(transactionContext);
    }

    public static void clear() {
        transactionContextThreadLocal.remove();
    }
}

可知,TransactionContextEditor是将事务上下文保存在本地线程对象ThreadLocal中。

2.2.2 事务管理器TransactionManager

事务管理器类,主要提供事务的begin、commit、rollback和获取事务、participant的新增管理

public class TransactionManager {

  private static final ThreadLocal<Deque<Transaction>> CURRENT = new ThreadLocal<>();

    private int threadPoolSize = Runtime.getRuntime().availableProcessors() * 2 + 1;
    private int threadQueueSize = 1024;

  //TCC提供的异步执行Terminator线程池
    private ExecutorService asyncTerminatorExecutorService = new ThreadPoolExecutor(threadPoolSize,
            threadPoolSize,
            0L,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(threadQueueSize), new ThreadPoolExecutor.AbortPolicy());
    private ExecutorService asyncSaveExecutorService = new ThreadPoolExecutor(threadPoolSize,
            threadPoolSize,
            0L,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(threadQueueSize * 2), new ThreadPoolExecutor.CallerRunsPolicy());

    private TransactionRepository transactionRepository;

        //事务的开启、commit、rollback
    public Transaction begin(Object uniqueIdentify) {
    public void commit(boolean asyncCommit) {
    public void rollback(boolean asyncRollback) {

      //
    public Transaction propagationNewBegin(TransactionContext transactionContext) 
    public Transaction propagationExistBegin(TransactionContext transactionContext) 

      //添加事务参与者Participant
    public void enlistParticipant(Participant participant) 

      //注册事务Transaction
    private void registerTransaction(Transaction transaction) 

然后,对于事务管理器提供的核心方法,进行分析:

(a)TransactionManager.begin开启事务

发起根事务TransactionType.ROOT---此时创建事务类型为ROOT的根事务,并存储、注册。

    public Transaction begin(Object uniqueIdentify) {
      //TAG1 创建根事务
        Transaction transaction = new Transaction(uniqueIdentify, this.transactionRepository.getDomain());
        //为了性能调优,在创建transaction阶段,不着急持久化存储
        if (transaction.getXid().getFormatId() == Xid.CUSTOMIZED) {
          //TAG2 对于自定义的xid,确保在TCC阶段之前只创建一次事务(所以对于自定义的xid,需要在tcc阶段开始前,先持久化存储)
            transactionRepository.create(transaction);
        }
      //TAG3 注册事务transaction
        registerTransaction(transaction);
        return transaction;
    }

TAG1:这里创建事务Transaction,使用如下构造函数:

   public Transaction(Object uniqueIdentity, String rootDomain) {

        this.xid = TransactionXid.withUniqueIdentity(uniqueIdentity);
        this.status = TransactionStatus.TRYING;
        this.transactionType = TransactionType.ROOT;
        this.rootXid = xid;
        this.rootDomain = rootDomain;
    }

Transaction有两个构造函数,当前构造函数只被begin()调用,只创建TransactionType.ROOT根事务、且 TransactionStatus.TRYING创建的是TCC阶段的try阶段。

TAG2:transactionRepository.create(transaction) 存储事务。详细见后续

TAG3 注册事务transaction

(b)TransactionManager.registerTransaction注册事务
   //事务管理器的本地线程变量,其内存储事务Transaction的队列
        private static final ThreadLocal<Deque<Transaction>> CURRENT = new ThreadLocal<>();

    private void registerTransaction(Transaction transaction) {

        if (CURRENT.get() == null) {
            CURRENT.set(new LinkedList<Transaction>());
        }

      //将事务添加入本地线程事务队列的头部
        CURRENT.get().push(transaction);
    }

注意:

在事务管理器TransactionManager.register注册事务,是将事务对象添加入本地线程事务队列的头部,后面获取事务时,也是从头部获取。(因为在TCC事务中,支持一个事务管理器管理多个事务transaction,而后创建的事务,要先提交,所以,入队和出队都要从头部)

©TransactionManager.propagationNewBegin传播并发起分支事务

注意调用时机:创建分支事务,在try阶段调用。

这里,首先创建分支事务,然后注册。

   public Transaction propagationNewBegin(TransactionContext transactionContext) {
            //注意:这里创建分支BRANCH事务
        Transaction transaction = new Transaction(transactionContext);

        //for performance tuning, at create stage do not persistent为了性能调优角度,在创建阶段,不持久化存储事务transaction
//        transactionRepository.create(transaction);
     //注册事务
        registerTransaction(transaction);
        return transaction;
    }

此处区分于beginTransaction,这里创建事务对象为TransactionType.BRANCH分支事务、TransactionStatus.TRYING事务状态为try阶段。分支事务和ROOT根事务有同一个事务id。

    public Transaction(TransactionContext transactionContext) {
        this.xid = transactionContext.getXid();
        this.rootXid = transactionContext.getRootXid();
        this.rootDomain = transactionContext.getRootDomain();
        this.status = TransactionStatus.TRYING;
        this.transactionType = TransactionType.BRANCH;
    }

(d)TransactionManager.propagationExistBegin传播并获取分支事务

注意区别:

TransactionManager.propagationExistBegin:这里是侧重获取分支事务(需要从事务存储器中查找并获取事务);在confirm/cancle阶段调用

TransactionManager.propagationNewBegin是创建BRANCH事务,并存储,侧重于发起;在try阶段调用

    public Transaction propagationExistBegin(TransactionContext transactionContext) throws NoExistedTransactionException {
      //根据事务上下文transactionContext.getXid(),从事务存储器transactionRepository查询事务
        Transaction transaction = transactionRepository.findByXid(transactionContext.getXid());

        if (transaction != null) {
          //将获取的事务,重新注册入TransactionManager中
            registerTransaction(transaction);
            return transaction;
        } else {
            throw new NoExistedTransactionException();
        }
    }

(e)TransactionManager.commit提交事务
    public void commit(boolean asyncCommit) {
            //TAG1 获取当前事务队列的首个transaction
        final Transaction transaction = getCurrentTransaction();
            //修改当前事务状态为CONFIRMING
        transaction.setStatus(TransactionStatus.CONFIRMING);
            //更新
        transactionRepository.update(transaction);

        if (asyncCommit) {
            try {
                long statTime = System.currentTimeMillis();

                asyncTerminatorExecutorService.submit(() -> commitTransaction(transaction));
                logger.debug("async submit cost time:{}", System.currentTimeMillis() - statTime);
            } catch (Throwable commitException) {
                logger.warn("compensable transaction async submit confirm failed, recovery job will try to confirm later.", commitException.getCause());
                //throw new ConfirmingException(commitException);
            }
        } else {
            commitTransaction(transaction);
        }
    }

    private void commitTransaction(Transaction transaction) {
        try {
          //提交事务
            transaction.commit();
          //完成commit后,从持久化存储中删除事务
            transactionRepository.delete(transaction);
        } catch (Throwable commitException) {

            //如果commit提交失败,更新CONFIRMING状态的事务
            try {
                transactionRepository.update(transaction);
            } catch (Exception e) {
                //ignore any exception here
            }

            logger.warn("compensable transaction confirm failed, recovery job will try to confirm later.", commitException);
            throw new ConfirmingException(commitException);
        }
    }

综上,transactionManager.commit逻辑如下:

1 先从当前事务管理器transactionManager获取线程变量的事务队列的首个事务;
2 修改状态为confirming;
3 存储器中更新事务;
4 transaction.commit();----如果commit失败,更新transaction(在提交的过程中,会更改transactionContext、参与者状态等信息,因此需要更新)
5 提交成功后,删除当前事务

TAG1 getCurrentTransaction 获取当前线程变量的事务队列的头部事务

   public Transaction getCurrentTransaction() {
        if (isTransactionActive()) {
            return CURRENT.get().peek();
        }
        return null;
    }

        //判断是否是活跃事务:只要当前线程变量的事务队列中有事务即可
       public boolean isTransactionActive() {
        Deque<Transaction> transactions = CURRENT.get();
        return transactions != null && !transactions.isEmpty();
    }

(e)TransactionManager.rollback
    public void rollback(boolean asyncRollback) {

        final Transaction transaction = getCurrentTransaction();
        transaction.setStatus(TransactionStatus.CANCELLING);

        transactionRepository.update(transaction);

        if (asyncRollback) {

            try {
                asyncTerminatorExecutorService.submit(() -> rollbackTransaction(transaction));
            } catch (Throwable rollbackException) {
                logger.warn("compensable transaction async rollback failed, recovery job will try to rollback later.", rollbackException);
                throw new CancellingException(rollbackException);
            }
        } else {

            rollbackTransaction(transaction);
        }
    }

        private void rollbackTransaction(Transaction transaction) {
        try {
            transaction.rollback();
            transactionRepository.delete(transaction);
        } catch (Throwable rollbackException) {

            //try save updated transaction
            try {
                transactionRepository.update(transaction);
            } catch (Exception e) {
                //ignore any exception here
            }

            logger.warn("compensable transaction rollback failed, recovery job will try to rollback later.", rollbackException);
            throw new CancellingException(rollbackException);
        }
    }

逻辑和commit相同

(e)TransactionManager.enlistParticipant添加参与者到transaction

该方法在try阶段被调用。

在添加参与者Participant时,先加入参与者队列;然后根据transaction.getVersion()是否为0,判断是否需要执行存储、或者进行更新

    public void enlistParticipant(Participant participant) {
        Transaction transaction = this.getCurrentTransaction();
        transaction.enlistParticipant(participant);

        if (transaction.getVersion() == 0L) {
            // transaction.getVersion()为0,表示之前从未持久化存储过,需要调用进行存储 
            transactionRepository.create(transaction);
        } else {
            transactionRepository.update(transaction);
        }
    }