一、分布式事务 实战:TCC-transaction分布式事务(TCC事务原理)

一TCC-transaction分布式事务(TCC事务原理)

一 TCC事务原理

1.1 TCC事务原理

tcc分布式事务是一个两阶段的事务解决方案,但是是业务层级的事务定义(2PC/3PC依赖于XA规范的数据库db来实现)。解决了事务运行过程中,对资源锁定(独占数据库)的大颗粒度的问题,使得锁的粒度可以依据业务,自己控制精细程度。

本质是补偿的思路,把事务运行过程分成 Try、Confirm / Cancel 两个阶段。在每个阶段的逻辑由业务代码控制。

Try 阶段:
完成所有业务检查( 一致性 )
预留必须业务资源( 准隔离性 )
Try :尝试执行业务
Confirm / Cancel 阶段:
释放 Try 阶段预留的业务资源

真正执行业务,不做任务业务检查

Cancel 操作满足幂等性
Confirm 操作满足幂等性

Confirm :确认执行业务
Cancel :取消执行业务

Confirm 与 Cancel 互斥

 

 

在tcc-transaction框架中,红框部分,由tcc-transaction-core模块实现:

1 启动业务活动;
2 登记业务操作;
3 commit、rollback业务活动

黄框部分由tcc-transaction-http-sample案例实现:

try操作;
confirm操作;
cancel操作;

TCC-transaction与2PC协议比较:

1 tcc事务位于业务服务层,而不是2PC的位于资源层的事务;
2 没有prepare阶段,try兼具资源的操作与prepare;
3 try可以自由定制事务(资源)的锁定粒度;

1.2 TCC-Transaction2.0框架

1.2.1 TCC-Transaction工作原理

随着传统的单体架的构微服务化,原本单体架构中不同模块,被拆分为若干个功能简单、松耦合的服务。系统微服务化后,内部可能需要调用多个服务并操作多个数据库实现,服务调用的分布式事务问题变的非常突出。

TCC-Transaction的工作流程如下:

 

  • 第一阶段:主业务服务A,经过事务拦截器,分别调用所有从业务的 try 操作,并在中登记所有从业务服务。当所有从业务服务的 try 操作都调用成功或者某个从业务服务的 try 操作失败,进入第二阶段。
  • 第二阶段:事务拦截器据第一阶段的执行结果来执行 confirm 或 cancel 操作。如果第一阶段所有 try 操作都成功,则事务拦截器调用所有从业务活动的 confirm操作。否则调用所有从业务服务的 cancel 操作。

需要注意的是第二阶段 confirm 或 cancel 操作本身也是满足最终一致性的过程,在调用 confirm 或 cancel 的时候也可能因为某种原因(比如网络)导致调用失败,所以需要事务协调者支持重试的能力,同时这也就要求 confirm 和 cancel 操作具有幂等性。

1.2.2 TCC-Transaction提供的核心功能以及2.0版本新功能

本文研究的TCC分布式事务的框架,是基于TCC-Transaction2.X版本的,相比于1.X版本,为适应微服务化,新添加了部分核心功能。

1 embedded和server模式(2.X)

在2.X版本中,新增了Server模式架构,其相比于1.X版本(embedded模式),有如下的不同:

 

从上图可以看出,tcc-transaction为了降低对于业务服务的侵入,同时也为了减少在1.X版本(相当于embedded模式)中,每个分布式服务需要部署和配置自己TCC-Client的本地事务存储等复杂且繁琐的配置,2.x将1.x中的事务存储事务补偿,挪到了tcc-server里面,原有的本地事务存储,改成了远程事务存储的方式。

  • 远程事务存储,对应实现类RemotingTransactionStorage
  • tcc-server,对应服务源码tcc-transaction-server
(1)embedded模式

在1.X版本中,模式采用2.X版本的embedded集成模式,每个在TCC分布式事务中涉及到的分布式服务(如订单、资金、红包服务),都需要单独配置TCC-Client,其中只包括事件存储模块内容(2.X的server模式,将本地事务存储和事务补偿服务,移到Tcc-server模块,单独部署server,此时就减少对每个微服务配置Transaction的持久化数据库的内容)。

 

在系统配置过程中,需要对每个模块配置tcc-client的持久化存储,以embedded存储模式为redis为例:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:tcc="http://www.tcctransaction.org/schema/tcc" xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.tcctransaction.org/schema/tcc http://www.tcctransaction.org/schema/tcc.xsd">

    <!--驱动TccClient-->
    <tcc:annotation-driven client-config="clientConfig"/>

    <bean class="org.mengyun.tcctransaction.ClientConfig" id="clientConfig">
        <property name="recoveryConfig">
            <bean class="org.mengyun.tcctransaction.properties.RecoveryProperties">
                <!--开启补偿任务-->
                <property name="recoveryEnabled" value="true"/>
            </bean>
        </property>
        <property name="storeConfig">
            <bean class="org.mengyun.tcctransaction.properties.store.StoreProperties">
                <property name="domain" value="TCC:DUBBO:CAPITAL"/>
                <!--embedded存储模式(REDIS)-->
                <property name="storageType" value="REDIS"/>
                <property name="redis">
                    <bean class="org.mengyun.tcctransaction.properties.store.RedisStoreProperties">
                        <property name="host" value="127.0.0.1"/>
                        <property name="port" value="6379"/>
                        <property name="database" value="0"/>
                        <property name="poolConfig">
                            <bean class="redis.clients.jedis.JedisPoolConfig">
                                <property name="maxTotal" value="100"/>
                                <property name="maxIdle" value="100"/>
                                <property name="maxWaitMillis" value="300"/>
                                <property name="minIdle" value="10"/>
                            </bean>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>

</beans>

(2) server模式

 

为了减少tcc-transaction对微服务系统的侵入性和减少共同配置,将1.X版本中每个微服务的Tcc-Client下面的持久化存储模块,移到Tcc-server内(并移入事务恢复模块),这样减少了事务持久化的配置:

这里以注册中心[direct]+存储类型[memory]+任务部署类型[quartz集群模式]的组合,application.yaml可如下配置:(由于tcc-server内包含事物存储和事务恢复,因此需要配置存储类型及其信息、定期恢复的quartz的配置信息)

server:
  port: 12332
logging:
  level:
    root: info
spring:
  application:
    name: tcc-transaction-server
  tcc:
    storage:
      storage-type: memory
      max-attempts: 1
    recovery:
      quartz-clustered: true
      quartz-data-source-url: jdbc:mysql://localhost:3306/TCC_SERVER?useSSL=false&allowPublicKeyRetrieval=true
      quartz-data-source-driver: com.mysql.jdbc.Driver
      quartz-data-source-user: root
      quartz-data-source-password: welcome1
    registry:
      registry-type: direct
      cluster-name: default
    remoting:
      listen-port: 2332

2 事务持久化类型

为了保证事务在执行过程的各种异常情况下不丢失,就考虑对transaction持久化存储。目前提供了memory、jdbc、redis、remoting的模式。

在配置server模式的情况下,由于使用TCC-server服务器部署事务持久化、事务恢复,因此此时事务配置的类型为remoting远程持久化模式。

spring:
  tcc:
    storage:
      storage-type: remoting
    registry:
      registry-type: direct
      cluster-name: default
      direct:
        server-addresses: 127.0.0.1:2332
      zookeeper:
        connect-string: 127.0.0.1:2181
        max-retries: 4
      nacos:
        server-addr: 127.0.0.1:8848

3 隐式或显示传递TransactionContext参数

transactionContext在Tcc-transaction的执行过程中,是root根事务传递给branch的参数,提供了当前事务的相关信息:

public class TransactionContext implements Serializable {
    private static final long serialVersionUID = -8199390103169700387L;
    // 取自上一级事件参与方的事务ID,作为当前分支事件ID
    private Xid xid;
    // 主事件ID
    private Xid rootXid;
    // 主事件Domain
    private String rootDomain;
    // 事件状态
    private TransactionStatus status = TransactionStatus.TRYING;
    // 参与方状态
    private ParticipantStatus participantStatus = ParticipantStatus.TRYING;
    // 扩展字段
    private Map<String, String> attachments = new ConcurrentHashMap<String, String>();

}

Tcc-transaction提供了两种方式来传递该参数:

(1)显式传递:

此时对于api模块的接口方法不再用@EnableTcc注解标注:

 public interface CapitalTradeOrderService {
   //显示传递TransactionContext
        public String record(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto);
    }

实现类中:

  @Override
    @Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord")
    @Transactional
    public String record(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {
        ...
    }

最后在调用两个服务的order中:

     @Compensable(confirmMethod = "confirmMakePayment", cancelMethod = "cancelMakePayment", asyncConfirm = true)
       @Transactional
       public void makePayment(String orderNo) {

           System.out.println("order try make payment called.time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));

           Order order = orderRepository.findByMerchantOrderNo(orderNo);

           String result = tradeOrderServiceProxy.record(buildCapitalTradeOrderDto(order));
           String result2 = tradeOrderServiceProxy.record(buildRedPacketTradeOrderDto(order));
       }

(2)隐式传递:

tcc-transaction-dubbo-capital-api开发 #

在api模块中定义的接口方法中,使用@EnableTCC隐式传递上下文:

public interface CapitalTradeOrderService {
    @EnableTcc
    public String record(CapitalTradeOrderDto tradeOrderDto);
}

然后再tcc-transaction-dubbo-capital模块的开发中:实现方法用@Compensable注解

public class CapitalTradeOrderServiceImpl implements CapitalTradeOrderService {
    @Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord")
  @Transactional
    public String record(CapitalTradeOrderDto tradeOrderDto) {
        // do someting ...
        return "success";
    }

    public void confirmRecord(CapitalTradeOrderDto tradeOrderDto) {
        // do something ...
    }

    public void cancelRecord(CapitalTradeOrderDto tradeOrderDto) {
        // do something ...
    }
}

最后在order调用capital、redPacket服务中:

@Service
public class PaymentServiceImpl {
    @Compensable(confirmMethod = "confirmMakePayment", cancelMethod = "cancelMakePayment", asyncConfirm = false)
    public void makePayment(@UniqueIdentity String orderNo) {
        String result = capitalTradeOrderService.record(buildCapitalTradeOrderDto(order));
        String result2 = redPacketTradeOrderService.record(buildRedPacketTradeOrderDto(order));
    }
    public void confirmMakePayment(String orderNo) {
        // do something ...
    }
    public void cancelMakePayment(String orderNo) {
        // do something ...
    }
}

4 支持RPC框架

目前支持dubbo、grpc、openfeign等框架。

1.2.3 TCC-Transaction核心类和注解及概述

1 ConfigurableTransactionAspect

分布式事务切面,用于对方法注解@Compensable的拦截,具体拦截逻辑见CompensableTransactionInterceptor

2 ConfigurableCoordinatorAspect

事务协调切面,用于对方法注解@Compensable和@EnableTcc的拦截,具体拦截逻辑见ResourceCoordinatorInterceptor
注意:当遇到@Compensable时,切面执行顺序为ConfigurableTransactionAspect>ConfigurableCoordinatorAspect

3 CompensableTransactionInterceptor

对注解@Compensable进行拦截,实现对主事务、分支事务的try-confirm-cancel流程。

4 ResourceCoordinatorInterceptor

对注解@Compensable或者@EnableTcc进行拦截,实现事件及参与方存储。

5 TransactionContext

事件上下文,分支事务场景用到。

public class TransactionContext implements Serializable {
    private static final long serialVersionUID = -8199390103169700387L;
    // 取自上一级事件参与方的事务ID,作为当前分支事件ID
    private Xid xid;
    // 主事件ID
    private Xid rootXid;
    // 主事件Domain
    private String rootDomain;
    // 事件状态
    private TransactionStatus status = TransactionStatus.TRYING;
    // 参与方状态
    private ParticipantStatus participantStatus = ParticipantStatus.TRYING;
    // 扩展字段
    private Map<String, String> attachments = new ConcurrentHashMap<String, String>();

}

6 Transaction

事件,每个事务都会产生一个事件。

public class Transaction implements Serializable {
    // 参与方列表
    private List<Participant> participants = new ArrayList<Participant>();
    // 预留字段
    private Map<String, Object> attachments = new ConcurrentHashMap<String, Object>();
    // 事件ID
    private Xid xid;
    // 主事件ID,当前事件为主事件时,xid = rootId
    private Xid rootXid;
    // 主domain
    private String rootDomain;
    // 事件类型,主事件-ROOT(1),分支事件-BRANCH(2)
    private TransactionType transactionType;
    // 事件状态,TRYING(1), CONFIRMING(2), CANCELLING(3), TRY_SUCCESS(11), TRY_FAILED(12);
    private TransactionStatus status;
    // 事件创建时间
    private Date createTime = new Date();
    // 事件更新时间
    private Date lastUpdateTime = new Date();
    // 事件重试次数,记录事件补偿的次数
    private volatile int retriedCount = 0;
    // 版本号
    private long version = 0L;
    
    // 添加参与方
    public void enlistParticipant(Participant participant) {
        participants.add(participant);
    }
    // 执行事件commit操作
    public void commit() {
        for (Participant participant : participants) {
            if (!participant.getStatus().equals(ParticipantStatus.CONFIRM_SUCCESS)) {
                participant.commit();
                participant.setStatus(ParticipantStatus.CONFIRM_SUCCESS);
            }
        }
    }

    // 执行事件rollback操作
    public void rollback() {
        for (Participant participant : participants) {
            if (!participant.getStatus().equals(ParticipantStatus.CANCEL_SUCCESS)) {
                participant.rollback();
                participant.setStatus(ParticipantStatus.CANCEL_SUCCESS);
            }
        }
    }

}

7 Participant

参与方,一个事件有多个参与方。

public class Participant implements Serializable {
    private static final long serialVersionUID = 4127729421281425247L;
    // 主事件ID
    private Xid rootXid; 
    // 主事件Domain
    private String rootDomain; 
    // 参与方事件ID,分支事务时,作为目标分支事务的事件ID
    private Xid xid; 
    // 方法调用上下文,执行confirm或cancel时用到
    private InvocationContext invocationContext; 
    // TRYING(1), CONFIRMING(2), CANCELLING(3), TRY_SUCCESS(11), TRY_FAILED(12), CONFIRM_SUCCESS(21), CANCEL_SUCCESS(31);
    private ParticipantStatus status = ParticipantStatus.TRYING; 

    // rollback操作,执行参与方cancel流程
    public void rollback() {
        Terminator.invoke(new TransactionContext(rootDomain, rootXid, xid, TransactionStatus.CANCELLING, status), new Invocation(invocationContext.getCancelMethodName(), invocationContext), transactionContextEditorClass);
    }
    // commit操作,执行参与方confirm流程
    public void commit() {
        Terminator.invoke(new TransactionContext(rootDomain, rootXid, xid, TransactionStatus.CONFIRMING, status), new Invocation(invocationContext.getConfirmMethodName(), invocationContext), transactionContextEditorClass);
    }

}
// 执行confirm或cancel时反射调用时用到
public class InvocationContext implements Serializable {
    private static final long serialVersionUID = -7969140711432461165L;
    private Class targetClass;          // 事务调用的目标类
    private String confirmMethodName;   // confirm方法名
    private String cancelMethodName;    // cancel方法名
    private Class[] parameterTypes;     // 参数类型列表
    private Object[] args;              // 参数列表
}

8 @Compensable 注解Try方法

此注解放在try方法上,用于定义tcc的执行逻辑,即try-confirm-cancel三阶段的实现,特别注意:confirm和cancel方法参数要与try保持一致 属性说明

参数名 含义 类型 可选值 默认值
propagation 传播方式,预留字段,暂时没用到 String
confirmMethod confirm方法名 String
cancelMethod cancel方法名 String
asyncConfirm 异步confirm boolean 默认为false
asyncCancel 异步cancel boolean 默认为false

9 @EnableTcc 声明service接口并隐式传递context

此注解用于声明tcc接口,用于远程调用时传递事件上下文(TransactionContext)

10 @UniqueIdentity

此注解作用于方法参数上,参数一般为订单号,可保证同一个订单并发请求是只产生一个事件,第二个请求时会报异常。
使用可参考如下:

    @Compensable(confirmMethod = "confirmMakePayment", cancelMethod = "cancelMakePayment", asyncConfirm = false)
    public void makePayment(@UniqueIdentity String orderNo) {
        System.out.println("order try make payment called.time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));

        Order order = orderRepository.findByMerchantOrderNo(orderNo);

        String result = capitalTradeOrderService.record(buildCapitalTradeOrderDto(order));
        String result2 = redPacketTradeOrderService.record(buildRedPacketTradeOrderDto(order));
    }