三、分布式事务 实战:TCC-transaction分布式事务关键组件--2事务拦截器

三 TCC-transaction分布式事务关键组件--2事务拦截器

2.2.3 事务拦截器

TCC有两个拦截器:

CompensableTransactionInterceptor:可补偿事务拦截器
  用于tcc事务的流程执行begin(try)、commit(confirm)、rollback(cancel)

ResourceCoordinatorInterceptor:资源协调拦截器
    用于记录tcc事务的Participant(参与方)

 

这两个拦截器,例如ResourceCoordinatorInterceptor,其基于@Compensable注解在try方法上、@Aspect注解在aspect类上作为切面类,能够拦截事务的try方法。在参与者的try方法前后是around切面逻辑,拦截器在try目标方法执行前后,分别调用两个拦截器内的方法,实现TCC事务的透明化执行过程。

@Aspect
public abstract class ResourceCoordinatorAspect {

    private ResourceCoordinatorInterceptor resourceCoordinatorInterceptor = new ResourceCoordinatorInterceptor();

    @Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable) || @annotation(org.mengyun.tcctransaction.api.EnableTcc)")
    public void transactionResourcePointcut() {

    }

    @Around("transactionResourcePointcut()")
    public Object interceptTransactionResourceMethodWithCompensableAnnotation(ProceedingJoinPoint pjp) throws Throwable {

        Method method = ((MethodSignature) pjp.getSignature()).getMethod();

        Compensable compensable = method.getAnnotation(Compensable.class);

        return interceptTransactionContextMethod(new AspectJTransactionMethodJoinPoint(pjp, compensable, ThreadLocalTransactionContextEditor.class));
    }

0 @Compensable注解

该注解标注在参与者的try方法上,并指定cancel/confirm方法名、事务的传播级别

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Compensable {
        //指定事务传播级别  
    public Propagation propagation() default Propagation.REQUIRED;
        //指定cc方法
    public String confirmMethod() default "";
    public String cancelMethod() default "";
        //是否异步执行
    public boolean asyncConfirm() default false;
    public boolean asyncCancel() default false;
}

传播级别定义:

public enum Propagation {
    /**
 * 支持当前事务,如果当前没有事务,就新建一个事务。
 */
REQUIRED(0),
/**
 * 支持当前事务,如果当前没有事务,就以非事务方式执行。
 */
SUPPORTS(1),
/**
 * 支持当前事务,如果当前没有事务,就抛出异常。
 */
MANDATORY(2),
/**
 * 新建事务,如果当前存在事务,把当前事务挂起。
 */
REQUIRES_NEW(3);

1 CompensableTransactionInterceptor可补偿事务拦截器

 

用于tcc事务的流程执行begin(try)、commit(confirm)、rollback(cancel)

 

(1)CompensableTransactionAspect切面类
//配置当前类为aspect切面类(springAOP)
@Aspect
public abstract class CompensableTransactionAspect {

    //切面类内对目标方法拦截,需要调用拦截器执行具体的TCC框架逻辑
    private CompensableTransactionInterceptor compensableTransactionInterceptor = new CompensableTransactionInterceptor();

  //设置拦截器内的TransactionManager
    public void setTransactionManager(TransactionManager transactionManager) {
        this.compensableTransactionInterceptor.setTransactionManager(transactionManager);
    }
//配置@Compensable标注的方法为切点
    @Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
    public void compensableTransactionPointcut() {

    }

  //在切点处,配置around环绕执行的逻辑
    @Around("compensableTransactionPointcut()")
    public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {
                //获取@Compensable注解的方法对象
        Method method = ((MethodSignature) pjp.getSignature()).getMethod();

        Compensable compensable = method.getAnnotation(Compensable.class);

        return compensableTransactionInterceptor.interceptCompensableMethod(new AspectJTransactionMethodJoinPoint(pjp, compensable, ThreadLocalTransactionContextEditor.class));
    }

  //获取当前切面执行的优先级
    public abstract int getOrder();
}

(2)ConfigurableTransactionAspect 切面配置类(执行优先级)
@Aspect
public class ConfigurableTransactionAspect extends CompensableTransactionAspect implements Ordered {

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE; //指定当前切面的优先级
    }

    @Autowired
    public void setTransactionManager(TransactionManagerFactory transactionManagerFactory) {
        super.setTransactionManager(transactionManagerFactory.getTransactionManager());
    }
}

当前切面配置类,主要是实现orderde接口,定义切面的优先级Ordered.HIGHEST_PRECEDENCE(ResourceCoordinatorInterceptor配置的优先级为Ordered.HIGHEST_PRECEDENCE+1,所以执行时,位于后面,先执行CompensableTransactionAspect)

(3)CompensableTransactionInterceptor拦截器
public class CompensableTransactionInterceptor {
        //拦截器中持有transactionManager
    private TransactionManager transactionManager;

    public void setTransactionManager(TransactionManager transactionManager) {
        this.transactionManager = transactionManager;
    }

    public Object interceptCompensableMethod(TransactionMethodJoinPoint pjp) throws Throwable {
                //获取当前transactionManager的本地线程变量上事务队列的首部transaction
        Transaction transaction = transactionManager.getCurrentTransaction();
      //创建可补偿方法执行上下文对象
        CompensableMethodContext compensableMethodContext = new CompensableMethodContext(pjp, transaction);

    //如果方法被@Compensable注解&transaction context为null&transaction对象为null,那么ParticipantRole为ROOT根角色
        //如果方法被@Compensable注解&transaction context存在&transaction对象为null,那么表示已经创建transaction context事务上下文,此时ParticipantRole的为privider提供者角色
      //TAG1 compensableMethodContext.getParticipantRole()
        switch (compensableMethodContext.getParticipantRole()) {
            case ROOT: //执行root根事务方法
            //TAG2 rootMethodProceed
                return rootMethodProceed(compensableMethodContext);
            case PROVIDER: //执行provider提供者事务方法
            //TAG3 providerMethodProceed
                return providerMethodProceed(compensableMethodContext);
            default: //默认不处理,继续目标方法try的执行
                return compensableMethodContext.proceed();
        }
    }

这里,CompensableTransactionInterceptor拦截器,在被@Compensable标注的try方法执行时,通过切面类拦截,然后调用此处拦截器方法:

1 获取当前transactionManager的本地线程变量上事务队列的首部transaction;
2 创建CompensableMethodContext;
3 根据TCC方法上下文,判断当前参与者的角色,是ROOT(事务发起者)还是PROVIDER(分支事务的参与者)------根据不同参与者类型,执行不同的事务处理

TAG1 compensableMethodContext.getParticipantRole()

该方法,是通过transactionContext、compensable、transaction三个对象的存在情况,判断当前参与者的角色--是ROOT发起根事务的角色、还是PRIVIDER分支事务参与者的角色。

判断参与者角色是ROOT还是PROVIDER原则:

     //如果方法被@Compensable注解&transaction context为null&transaction对象为null,那么ParticipantRole为ROOT根角色
        //如果方法被@Compensable注解&transaction context存在&transaction对象为null,那么表示已经创建transaction context事务上下文,此时ParticipantRole的为privider提供者角色

public class CompensableMethodContext {

  //事务方法连结点(具体详看spring部分)
    TransactionMethodJoinPoint pjp = null;
    TransactionContext transactionContext = null;
    Compensable compensable = null;
    private Transaction transaction = null;

        public ParticipantRole getParticipantRole() {

  // 1 如果方法是@Compensable注释的,则表示需要tcc事务,如果没有活动事务,则需要require new。
    // 2 If方法没有@Compensable注解,但TransactionContext存在:
        //2.1如果有活动事务,则表示需要参与者tcc事务。如果transactionContext为空,那么它将事务登记为CONSUMER角色;
        // 2.2 如果没有活动事务activeTransaction,表示有另一个方法被调用,因为Consumer已经登记为了事务,这个方法不需要登记为参与者。
        //如果方法被@Compensable注解&transaction context为null&transaction对象为null,那么ParticipantRole为ROOT根角色
        if (compensable != null && transaction == null && transactionContext == null) {
            return ParticipantRole.ROOT;
        }

       //如果方法被@Compensable注解&transaction context存在&transaction对象为null,那么表示已经创建transaction context事务上下文,此时ParticipantRole的为privider提供者角色
        if (compensable != null && transaction == null && transactionContext != null) {
            return ParticipantRole.PROVIDER;
        }

          //不进行事务处理
        return ParticipantRole.NORMAL;
    }

    public Object proceed() throws Throwable {
        return this.pjp.proceed();
    }

TAG2 rootMethodProceed发起TCC流程

当ParticipantRole为ROOT时,发起根事务,开启TCC流程:

   private Object rootMethodProceed(CompensableMethodContext compensableMethodContext) throws Throwable {

        Object returnValue = null;
                //事务对象
        Transaction transaction = null;
                //是否异步执行confirm/cancel
        boolean asyncConfirm = compensableMethodContext.getAnnotation().asyncConfirm();
        boolean asyncCancel = compensableMethodContext.getAnnotation().asyncCancel();

        try {
            //发起根root事务
            transaction = transactionManager.begin(compensableMethodContext.getUniqueIdentity());

            try {
                //执行方法原逻辑,即try逻辑
                returnValue = compensableMethodContext.proceed();
            } catch (Throwable tryingException) {

                try {
                    //当原逻辑执行失败时,即try阶段失败---回滚
                    transactionManager.rollback(asyncCancel);
                } catch (Exception rollbackException) {
                    logger.warn("compensable transaction rollback failed, recovery job will try to rollback later", rollbackException);
                }

                throw tryingException;
            }
            //当原逻辑执行成功时,即try阶段成功,提交事务
            transactionManager.commit(asyncConfirm);

        } finally {
            //将事物从当前线程事务队列移除
            transactionManager.cleanAfterCompletion(transaction);
        }

        return returnValue;
    }

主要逻辑:

1 transactionManager.begin发起根事务;
2 compensableMethodContext.proceed()执行try;
    2.1 如果try失败抛异常,transactionManager.rollback(asyncCancel)回滚
3  transactionManager.commit(asyncConfirm) try执行成功,commit提交事务;
4 transactionManager.cleanAfterCompletion(transaction)将transaction从事务管理器transactionManager的本地线程变量的事务队列中移除;
5 返回结果;

TAG3 providerMethodProceed服务提供者参与TCC流程

当参与者角色为PROVIDER时,执行该方法:(ROOT是发起事务,在TCC三个阶段开始前,所以不需要通过compensableMethodContext.getTransactionContext().getStatus()判断当前所处的阶段;但是PROVIDER角色时,参与者参与事务,因此需要知道自己当前处于当前事务的哪个阶段执行过程TRYING、CONFIRMING、CANCELLING。

   private Object providerMethodProceed(CompensableMethodContext compensableMethodContext) throws Throwable {

        Transaction transaction = null;
                //是否异步执行cc
        boolean asyncConfirm = compensableMethodContext.getAnnotation().asyncConfirm();
        boolean asyncCancel = compensableMethodContext.getAnnotation().asyncCancel();

        try {
        //获取TransactionContext().getStatus()中,当前TCC执行的阶段
            switch (compensableMethodContext.getTransactionContext().getStatus()) {
/** ……………………………………………………………………………………………………TRYING………………………………………………………………………………………………………………*/
                case TRYING:
           //如果trying阶段,需要propagationNewBegin传播并发起分支事务(包括创建transaction、存储、注册)
                    transaction = transactionManager.propagationNewBegin(compensableMethodContext.getTransactionContext());

                    Object result = null;

                    try {
                        //执行原try逻辑
                        result = compensableMethodContext.proceed();
                      //设置transactionManager上事务队列首个事务状态TRY_SUCCESS,并在数据库中更新
                        transactionManager.changeStatus(TransactionStatus.TRY_SUCCESS);
                    } catch (Throwable e) {
                        try {
                        //如果try执行失败,设置transactionManager上事务队列首个事务状态为TRY_FAILED,并更新数据库中数据
                            transactionManager.changeStatus(TransactionStatus.TRY_FAILED);
                        } catch (Exception ignore) {
                            //ignore
                        }
                        throw e;
                    }

                    return result;

/** ……………………………………………………………………………………………………CONFIRMING………………………………………………………………………………………………………………*/
                case CONFIRMING:
                    try {
                        //获取传播分支事务
                        transaction = transactionManager.propagationExistBegin(compensableMethodContext.getTransactionContext());
                      //执行提交
                        transactionManager.commit(asyncConfirm);
                    } catch (NoExistedTransactionException excepton) {
                        //the transaction has been commit,ignore it.
                        logger.warn("no existed transaction found at CONFIRMING stage, will ignore and confirm automatically. transaction xid:{}", compensableMethodContext.getTransactionContext().getXid());
                    }
                    break;

/** ……………………………………………………………………………………………………CANCELLING………………………………………………………………………………………………………………*/
                case CANCELLING:
                    try {
                        //从消费者端传递过来的,分支事务branch的状态status
                        ParticipantStatus transactionStatusFromConsumer = compensableMethodContext.getTransactionContext().getParticipantStatus();

                        //获取传播分支事务
                        transaction = transactionManager.propagationExistBegin(compensableMethodContext.getTransactionContext());

  // 1 只有transaction's status 处于TRY_SUCCESS、TRY_FAILED、CANCELLING stage,才能rollback(表示第一阶段的try执行结束,所以,单纯的trying状态,是不能够rollback的)
  // 2 如果 transactionStatusFromConsumer 是TRY_SUCCESS,也表示第一阶段try执行结束, 无论当前transaction的状态是否是trying,都可以rollback
    // (原因:transaction's status is TRYING while transactionStatusFromConsumer is TRY_SUCCESS 可能会发生的,就是当 transaction's changeStatus 是异步执行时.)
                        if (transaction.getStatus().equals(TransactionStatus.TRY_SUCCESS)
                                || transaction.getStatus().equals(TransactionStatus.TRY_FAILED)
                                || transaction.getStatus().equals(TransactionStatus.CANCELLING)
                                || ParticipantStatus.TRY_SUCCESS.equals(transactionStatusFromConsumer)) {
                            transactionManager.rollback(asyncCancel);
                        } else {
                            //in this case, transaction's Status is TRYING and transactionStatusFromConsumer is TRY_FAILED
                            // this may happen if timeout exception throws during rpc call.
                            throw new IllegalTransactionStatusException("Branch transaction status is TRYING, cannot rollback directly, waiting for recovery job to rollback.");
                        }

                    } catch (NoExistedTransactionException exception) {
                        //the transaction has been rollback,ignore it.
                        logger.info("no existed transaction found at CANCELLING stage, will ignore and cancel automatically. transaction xid:{}", compensableMethodContext.getTransactionContext().getXid());
                    }
                    break;
            }

        } finally {
          //清理当前事务对象
            transactionManager.cleanAfterCompletion(transaction);
        }

        Method method = compensableMethodContext.getMethod();

        return ReflectionUtils.getNullValue(method.getReturnType());
    }

对于服务提供者provider,在事务执行过程的逻辑如下:

 switch (compensableMethodContext.getTransactionContext().getStatus()) {

 TRYING:
    1 propagationNewBegin传播并发起分支事务------该方法调用前提:TransactionStatus.TRYING&ParticipantRole.PROVIDER;
  2 compensableMethodContext.proceed()执行try方法逻辑;
  3 transactionManager.changeStatus(TransactionStatus.TRY_SUCCESS) try成功,修改transaction状态,并update持久存储内;
    2.1 如果try失败,transactionManager.changeStatus(TransactionStatus.TRY_FAILED),修改transaction状态,并update;
  4 return;

 CONFIRMING:
 1 propagationExistBegin传播并获取分支事务------该方法调用前提:TransactionStatus.CONFIRMING&ParticipantRole.PROVIDER;
 2 transactionManager.commit(asyncConfirm)提交;
   (confirm失败的逻辑,在transactionManager中,会update更新transaction)

 CANCELLING:
  1 propagationExistBegin传播并获取分支事务------该方法调用前提:TransactionStatus.CANCELLING&ParticipantRole.PROVIDER;
  2 可以执行rollback的条件和情况如下: 
    // 1 只有transaction's status 处于TRY_SUCCESS、TRY_FAILED、CANCELLING stage,才能rollback(表示第一阶段的try执行结束,所以,单纯的trying状态,是不能够rollback的)
  // 2 如果 transactionStatusFromConsumer 是TRY_SUCCESS,也表示第一阶段try执行结束, 无论当前transaction的状态是否是trying,都可以rollback
    // (原因:transaction's status is TRYING while transactionStatusFromConsumer is TRY_SUCCESS 可能会发生的,就是当 transaction's changeStatus 是异步执行时.)

    finally{
      transactionManager.cleanAfterCompletion(transaction);清理事务
    }

2 ResourceCoordinatorInterceptor资源协调拦截器

 

用于记录tcc事务的Participant(参与方),ResourceCoordinatorInterceptor只在try阶段使用,负责participant的创建、添加入transaction、pjp.proceed执行try方法、根据try执行是否成功设置participant的执行状态。负责参与者的管理。

 

(1)ResourceCoordinatorAspect 切面类
@Aspect
public abstract class ResourceCoordinatorAspect {

    private ResourceCoordinatorInterceptor resourceCoordinatorInterceptor = new ResourceCoordinatorInterceptor();

    @Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable) || @annotation(org.mengyun.tcctransaction.api.EnableTcc)")
    public void transactionResourcePointcut() {

    }

    @Around("transactionResourcePointcut()")
    public Object interceptTransactionResourceMethodWithCompensableAnnotation(ProceedingJoinPoint pjp) throws Throwable {

        Method method = ((MethodSignature) pjp.getSignature()).getMethod();

        Compensable compensable = method.getAnnotation(Compensable.class);

        return interceptTransactionContextMethod(new AspectJTransactionMethodJoinPoint(pjp, compensable, ThreadLocalTransactionContextEditor.class));
    }

    public Object interceptTransactionContextMethod(TransactionMethodJoinPoint pjp) throws Throwable {
        return resourceCoordinatorInterceptor.interceptTransactionContextMethod(pjp);
    }

    public void setTransactionManager(TransactionManager transactionManager) {
        this.resourceCoordinatorInterceptor.setTransactionManager(transactionManager);
    }

    public abstract int getOrder();
}

具体分析和可补偿事务拦截器相同,省略。

(2)ConfigurableCoordinatorAspect切面配置类(执行优先级)
@Aspect
public class ConfigurableCoordinatorAspect extends ResourceCoordinatorAspect implements Ordered {

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE + 1;
    }

    @Autowired
    public void setTransactionManager(TransactionManagerFactory transactionManagerFactory) {
        super.setTransactionManager(transactionManagerFactory.getTransactionManager());
    }
}

切面优先级为Ordered.HIGHEST_PRECEDENCE + 1,其在可补偿事务拦截器切面后执行。

(3)ResourceCoordinatorInterceptor资源协调拦截器

这个拦截器两个作用:
1负责participant参与者对象的创建(条件:事务transaction对象不为null&&transaction.getStatus()事务状态为TRYING),然后将participant加入transaction的list中,并update入持久化存储;

2并在try执行过程中,如果执行成功,设置ParticipantStatus.TRY_SUCCESS;如果失败,ParticipantStatus.TRY_FAILED。

总结:

ResourceCoordinatorInterceptor只在try阶段使用,负责participant的创建、添加入transaction、pjp.proceed执行try方法、根据try执行是否成功设置participant的执行状态。负责参与者的管理。

public class ResourceCoordinatorInterceptor {

    private TransactionManager transactionManager;

    public void setTransactionManager(TransactionManager transactionManager) {
        this.transactionManager = transactionManager;
    }

    public Object interceptTransactionContextMethod(TransactionMethodJoinPoint pjp) throws Throwable {

        Transaction transaction = transactionManager.getCurrentTransaction();

        if (transaction != null && transaction.getStatus().equals(TransactionStatus.TRYING)) { //1 

            //当事务状态为trying时,添加事务参与者
            Participant participant = enlistParticipant(pjp);

            Object result = null;
          //创建TransactionStatus.TRYING, ParticipantStatus.TRYING两个状态的事务上下文TransactionContext
            TransactionContext transactionContext = new TransactionContext(transaction.getRootDomain(), transaction.getRootXid(), participant.getXid(), TransactionStatus.TRYING, ParticipantStatus.TRYING);
            FactoryBuilder.factoryOf(participant.getTransactionContextEditorClass()).getInstance().set(transactionContext, pjp.getTarget(), pjp.getMethod(), pjp.getArgs());
            try {
                //执行try方法原逻辑
                result = pjp.proceed(pjp.getArgs());
              //如果执行成功,设置参与者participantStatus为TRY_SUCCESS
                participant.setStatus(ParticipantStatus.TRY_SUCCESS);
            } catch (Throwable e) {
                participant.setStatus(ParticipantStatus.TRY_FAILED);

                //if root transaction, here no need persistent transaction
                // because following stage is rollback, transaction's status is changed to CANCELING and save
//                    transactionManager.update(participant);
                throw e;
            } finally {
                FactoryBuilder.factoryOf(participant.getTransactionContextEditorClass()).getInstance().clear(transactionContext, pjp.getTarget(), pjp.getMethod(), pjp.getArgs());
            }
            return result;
        } //1 

      //如果不需要加入participant对象到transaction,那么执行方法原逻辑(即拦截器链,继续向下执行,try方法)
        return pjp.proceed(pjp.getArgs());
    }

    private Participant enlistParticipant(TransactionMethodJoinPoint pjp) {

        Transaction transaction = transactionManager.getCurrentTransaction();
        CompensableMethodContext compensableMethodContext = new CompensableMethodContext(pjp, transaction);

        String confirmMethodName = compensableMethodContext.getConfirmMethodName();
        String cancelMethodName = compensableMethodContext.getCancelMethodName();
        Class<? extends TransactionContextEditor> transactionContextEditorClass = compensableMethodContext.getTransactionContextEditorClass();
        TransactionXid xid = TransactionXid.withUniqueIdentity(null);

        Class targetClass = compensableMethodContext.getDeclaredClass();

        InvocationContext invocationContext = new InvocationContext(targetClass,
                confirmMethodName,
                cancelMethodName,
                compensableMethodContext.getMethod().getParameterTypes(), compensableMethodContext.getArgs());

      //构造参与者Participant
        Participant participant =
                new Participant(
                        transaction.getRootDomain(),
                        transaction.getRootXid(),
                        xid,
                        invocationContext,
                        transactionContextEditorClass);

      //将participant加入transactionManager的当前transaction的list<Participant>中,然后update
        transactionManager.enlistParticipant(participant);
        return participant;
    }
}