一TCC-transaction分布式事务(TCC事务原理)
一 TCC事务原理
1.1 TCC事务原理
tcc分布式事务是一个两阶段的事务解决方案,但是是业务层级的事务定义(2PC/3PC依赖于XA规范的数据库db来实现)。解决了事务运行过程中,对资源锁定(独占数据库)的大颗粒度的问题,使得锁的粒度可以依据业务,自己控制精细程度。
本质是补偿的思路,把事务运行过程分成 Try、Confirm / Cancel 两个阶段。在每个阶段的逻辑由业务代码控制。
Try 阶段:
完成所有业务检查( 一致性 )
预留必须业务资源( 准隔离性 )
Try :尝试执行业务
Confirm / Cancel 阶段:
释放 Try 阶段预留的业务资源
真正执行业务,不做任务业务检查
Cancel 操作满足幂等性
Confirm 操作满足幂等性
Confirm :确认执行业务
Cancel :取消执行业务
Confirm 与 Cancel 互斥
在tcc-transaction框架中,红框部分,由tcc-transaction-core模块实现:
1 启动业务活动;
2 登记业务操作;
3 commit、rollback业务活动
黄框部分由tcc-transaction-http-sample案例实现:
try操作;
confirm操作;
cancel操作;
TCC-transaction与2PC协议比较:
1 tcc事务位于业务服务层,而不是2PC的位于资源层的事务;
2 没有prepare阶段,try兼具资源的操作与prepare;
3 try可以自由定制事务(资源)的锁定粒度;
1.2 TCC-Transaction2.0框架
1.2.1 TCC-Transaction工作原理
随着传统的单体架的构微服务化,原本单体架构中不同模块,被拆分为若干个功能简单、松耦合的服务。系统微服务化后,内部可能需要调用多个服务并操作多个数据库实现,服务调用的分布式事务问题变的非常突出。
TCC-Transaction的工作流程如下:
- 第一阶段:主业务服务A,经过事务拦截器,分别调用所有从业务的 try 操作,并在中登记所有从业务服务。当所有从业务服务的 try 操作都调用成功或者某个从业务服务的 try 操作失败,进入第二阶段。
- 第二阶段:事务拦截器据第一阶段的执行结果来执行 confirm 或 cancel 操作。如果第一阶段所有 try 操作都成功,则事务拦截器调用所有从业务活动的 confirm操作。否则调用所有从业务服务的 cancel 操作。
需要注意的是第二阶段 confirm 或 cancel 操作本身也是满足最终一致性的过程,在调用 confirm 或 cancel 的时候也可能因为某种原因(比如网络)导致调用失败,所以需要事务协调者支持重试的能力,同时这也就要求 confirm 和 cancel 操作具有幂等性。
1.2.2 TCC-Transaction提供的核心功能以及2.0版本新功能
本文研究的TCC分布式事务的框架,是基于TCC-Transaction2.X版本的,相比于1.X版本,为适应微服务化,新添加了部分核心功能。
1 embedded和server模式(2.X)
在2.X版本中,新增了Server模式架构,其相比于1.X版本(embedded模式),有如下的不同:
从上图可以看出,tcc-transaction为了降低对于业务服务的侵入,同时也为了减少在1.X版本(相当于embedded模式)中,每个分布式服务需要部署和配置自己TCC-Client的本地事务存储等复杂且繁琐的配置,2.x将1.x中的事务存储和事务补偿,挪到了tcc-server里面,原有的本地事务存储,改成了远程事务存储的方式。
- 远程事务存储,对应实现类RemotingTransactionStorage
- tcc-server,对应服务源码tcc-transaction-server
(1)embedded模式
在1.X版本中,模式采用2.X版本的embedded集成模式,每个在TCC分布式事务中涉及到的分布式服务(如订单、资金、红包服务),都需要单独配置TCC-Client,其中只包括事件存储模块内容(2.X的server模式,将本地事务存储和事务补偿服务,移到Tcc-server模块,单独部署server,此时就减少对每个微服务配置Transaction的持久化数据库的内容)。
在系统配置过程中,需要对每个模块配置tcc-client的持久化存储,以embedded存储模式为redis为例:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:tcc="http://www.tcctransaction.org/schema/tcc" xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.tcctransaction.org/schema/tcc http://www.tcctransaction.org/schema/tcc.xsd">
<!--驱动TccClient-->
<tcc:annotation-driven client-config="clientConfig"/>
<bean class="org.mengyun.tcctransaction.ClientConfig" id="clientConfig">
<property name="recoveryConfig">
<bean class="org.mengyun.tcctransaction.properties.RecoveryProperties">
<!--开启补偿任务-->
<property name="recoveryEnabled" value="true"/>
</bean>
</property>
<property name="storeConfig">
<bean class="org.mengyun.tcctransaction.properties.store.StoreProperties">
<property name="domain" value="TCC:DUBBO:CAPITAL"/>
<!--embedded存储模式(REDIS)-->
<property name="storageType" value="REDIS"/>
<property name="redis">
<bean class="org.mengyun.tcctransaction.properties.store.RedisStoreProperties">
<property name="host" value="127.0.0.1"/>
<property name="port" value="6379"/>
<property name="database" value="0"/>
<property name="poolConfig">
<bean class="redis.clients.jedis.JedisPoolConfig">
<property name="maxTotal" value="100"/>
<property name="maxIdle" value="100"/>
<property name="maxWaitMillis" value="300"/>
<property name="minIdle" value="10"/>
</bean>
</property>
</bean>
</property>
</bean>
</property>
</bean>
</beans>
(2) server模式
为了减少tcc-transaction对微服务系统的侵入性和减少共同配置,将1.X版本中每个微服务的Tcc-Client下面的持久化存储模块,移到Tcc-server内(并移入事务恢复模块),这样减少了事务持久化的配置:
这里以注册中心[direct]+存储类型[memory]+任务部署类型[quartz集群模式]的组合,application.yaml可如下配置:(由于tcc-server内包含事物存储和事务恢复,因此需要配置存储类型及其信息、定期恢复的quartz的配置信息)
server:
port: 12332
logging:
level:
root: info
spring:
application:
name: tcc-transaction-server
tcc:
storage:
storage-type: memory
max-attempts: 1
recovery:
quartz-clustered: true
quartz-data-source-url: jdbc:mysql://localhost:3306/TCC_SERVER?useSSL=false&allowPublicKeyRetrieval=true
quartz-data-source-driver: com.mysql.jdbc.Driver
quartz-data-source-user: root
quartz-data-source-password: welcome1
registry:
registry-type: direct
cluster-name: default
remoting:
listen-port: 2332
2 事务持久化类型
为了保证事务在执行过程的各种异常情况下不丢失,就考虑对transaction持久化存储。目前提供了memory、jdbc、redis、remoting的模式。
在配置server模式的情况下,由于使用TCC-server服务器部署事务持久化、事务恢复,因此此时事务配置的类型为remoting远程持久化模式。
spring:
tcc:
storage:
storage-type: remoting
registry:
registry-type: direct
cluster-name: default
direct:
server-addresses: 127.0.0.1:2332
zookeeper:
connect-string: 127.0.0.1:2181
max-retries: 4
nacos:
server-addr: 127.0.0.1:8848
3 隐式或显示传递TransactionContext参数
transactionContext在Tcc-transaction的执行过程中,是root根事务传递给branch的参数,提供了当前事务的相关信息:
public class TransactionContext implements Serializable {
private static final long serialVersionUID = -8199390103169700387L;
// 取自上一级事件参与方的事务ID,作为当前分支事件ID
private Xid xid;
// 主事件ID
private Xid rootXid;
// 主事件Domain
private String rootDomain;
// 事件状态
private TransactionStatus status = TransactionStatus.TRYING;
// 参与方状态
private ParticipantStatus participantStatus = ParticipantStatus.TRYING;
// 扩展字段
private Map<String, String> attachments = new ConcurrentHashMap<String, String>();
}
Tcc-transaction提供了两种方式来传递该参数:
(1)显式传递:
此时对于api模块的接口方法不再用@EnableTcc注解标注:
public interface CapitalTradeOrderService {
//显示传递TransactionContext
public String record(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto);
}
实现类中:
@Override
@Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord")
@Transactional
public String record(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {
...
}
最后在调用两个服务的order中:
@Compensable(confirmMethod = "confirmMakePayment", cancelMethod = "cancelMakePayment", asyncConfirm = true)
@Transactional
public void makePayment(String orderNo) {
System.out.println("order try make payment called.time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));
Order order = orderRepository.findByMerchantOrderNo(orderNo);
String result = tradeOrderServiceProxy.record(buildCapitalTradeOrderDto(order));
String result2 = tradeOrderServiceProxy.record(buildRedPacketTradeOrderDto(order));
}
(2)隐式传递:
tcc-transaction-dubbo-capital-api开发 #
在api模块中定义的接口方法中,使用@EnableTCC隐式传递上下文:
public interface CapitalTradeOrderService {
@EnableTcc
public String record(CapitalTradeOrderDto tradeOrderDto);
}
然后再tcc-transaction-dubbo-capital模块的开发中:实现方法用@Compensable注解
public class CapitalTradeOrderServiceImpl implements CapitalTradeOrderService {
@Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord")
@Transactional
public String record(CapitalTradeOrderDto tradeOrderDto) {
// do someting ...
return "success";
}
public void confirmRecord(CapitalTradeOrderDto tradeOrderDto) {
// do something ...
}
public void cancelRecord(CapitalTradeOrderDto tradeOrderDto) {
// do something ...
}
}
最后在order调用capital、redPacket服务中:
@Service
public class PaymentServiceImpl {
@Compensable(confirmMethod = "confirmMakePayment", cancelMethod = "cancelMakePayment", asyncConfirm = false)
public void makePayment(@UniqueIdentity String orderNo) {
String result = capitalTradeOrderService.record(buildCapitalTradeOrderDto(order));
String result2 = redPacketTradeOrderService.record(buildRedPacketTradeOrderDto(order));
}
public void confirmMakePayment(String orderNo) {
// do something ...
}
public void cancelMakePayment(String orderNo) {
// do something ...
}
}
4 支持RPC框架
目前支持dubbo、grpc、openfeign等框架。
1.2.3 TCC-Transaction核心类和注解及概述
1 ConfigurableTransactionAspect
分布式事务切面,用于对方法注解@Compensable的拦截,具体拦截逻辑见CompensableTransactionInterceptor
2 ConfigurableCoordinatorAspect
事务协调切面,用于对方法注解@Compensable和@EnableTcc的拦截,具体拦截逻辑见ResourceCoordinatorInterceptor
注意:当遇到@Compensable时,切面执行顺序为ConfigurableTransactionAspect>ConfigurableCoordinatorAspect
3 CompensableTransactionInterceptor
对注解@Compensable进行拦截,实现对主事务、分支事务的try-confirm-cancel流程。
4 ResourceCoordinatorInterceptor
对注解@Compensable或者@EnableTcc进行拦截,实现事件及参与方存储。
5 TransactionContext
事件上下文,分支事务场景用到。
public class TransactionContext implements Serializable {
private static final long serialVersionUID = -8199390103169700387L;
// 取自上一级事件参与方的事务ID,作为当前分支事件ID
private Xid xid;
// 主事件ID
private Xid rootXid;
// 主事件Domain
private String rootDomain;
// 事件状态
private TransactionStatus status = TransactionStatus.TRYING;
// 参与方状态
private ParticipantStatus participantStatus = ParticipantStatus.TRYING;
// 扩展字段
private Map<String, String> attachments = new ConcurrentHashMap<String, String>();
}
6 Transaction
事件,每个事务都会产生一个事件。
public class Transaction implements Serializable {
// 参与方列表
private List<Participant> participants = new ArrayList<Participant>();
// 预留字段
private Map<String, Object> attachments = new ConcurrentHashMap<String, Object>();
// 事件ID
private Xid xid;
// 主事件ID,当前事件为主事件时,xid = rootId
private Xid rootXid;
// 主domain
private String rootDomain;
// 事件类型,主事件-ROOT(1),分支事件-BRANCH(2)
private TransactionType transactionType;
// 事件状态,TRYING(1), CONFIRMING(2), CANCELLING(3), TRY_SUCCESS(11), TRY_FAILED(12);
private TransactionStatus status;
// 事件创建时间
private Date createTime = new Date();
// 事件更新时间
private Date lastUpdateTime = new Date();
// 事件重试次数,记录事件补偿的次数
private volatile int retriedCount = 0;
// 版本号
private long version = 0L;
// 添加参与方
public void enlistParticipant(Participant participant) {
participants.add(participant);
}
// 执行事件commit操作
public void commit() {
for (Participant participant : participants) {
if (!participant.getStatus().equals(ParticipantStatus.CONFIRM_SUCCESS)) {
participant.commit();
participant.setStatus(ParticipantStatus.CONFIRM_SUCCESS);
}
}
}
// 执行事件rollback操作
public void rollback() {
for (Participant participant : participants) {
if (!participant.getStatus().equals(ParticipantStatus.CANCEL_SUCCESS)) {
participant.rollback();
participant.setStatus(ParticipantStatus.CANCEL_SUCCESS);
}
}
}
}
7 Participant
参与方,一个事件有多个参与方。
public class Participant implements Serializable {
private static final long serialVersionUID = 4127729421281425247L;
// 主事件ID
private Xid rootXid;
// 主事件Domain
private String rootDomain;
// 参与方事件ID,分支事务时,作为目标分支事务的事件ID
private Xid xid;
// 方法调用上下文,执行confirm或cancel时用到
private InvocationContext invocationContext;
// TRYING(1), CONFIRMING(2), CANCELLING(3), TRY_SUCCESS(11), TRY_FAILED(12), CONFIRM_SUCCESS(21), CANCEL_SUCCESS(31);
private ParticipantStatus status = ParticipantStatus.TRYING;
// rollback操作,执行参与方cancel流程
public void rollback() {
Terminator.invoke(new TransactionContext(rootDomain, rootXid, xid, TransactionStatus.CANCELLING, status), new Invocation(invocationContext.getCancelMethodName(), invocationContext), transactionContextEditorClass);
}
// commit操作,执行参与方confirm流程
public void commit() {
Terminator.invoke(new TransactionContext(rootDomain, rootXid, xid, TransactionStatus.CONFIRMING, status), new Invocation(invocationContext.getConfirmMethodName(), invocationContext), transactionContextEditorClass);
}
}
// 执行confirm或cancel时反射调用时用到
public class InvocationContext implements Serializable {
private static final long serialVersionUID = -7969140711432461165L;
private Class targetClass; // 事务调用的目标类
private String confirmMethodName; // confirm方法名
private String cancelMethodName; // cancel方法名
private Class[] parameterTypes; // 参数类型列表
private Object[] args; // 参数列表
}
8 @Compensable 注解Try方法
此注解放在try方法上,用于定义tcc的执行逻辑,即try-confirm-cancel三阶段的实现,特别注意:confirm和cancel方法参数要与try保持一致 属性说明
参数名 | 含义 | 类型 | 可选值 | 默认值 |
---|---|---|---|---|
propagation | 传播方式,预留字段,暂时没用到 | String | ||
confirmMethod | confirm方法名 | String | ||
cancelMethod | cancel方法名 | String | ||
asyncConfirm | 异步confirm | boolean | 默认为false | |
asyncCancel | 异步cancel | boolean | 默认为false |
9 @EnableTcc 声明service接口并隐式传递context
此注解用于声明tcc接口,用于远程调用时传递事件上下文(TransactionContext)
10 @UniqueIdentity
此注解作用于方法参数上,参数一般为订单号,可保证同一个订单并发请求是只产生一个事件,第二个请求时会报异常。
使用可参考如下:
@Compensable(confirmMethod = "confirmMakePayment", cancelMethod = "cancelMakePayment", asyncConfirm = false)
public void makePayment(@UniqueIdentity String orderNo) {
System.out.println("order try make payment called.time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));
Order order = orderRepository.findByMerchantOrderNo(orderNo);
String result = capitalTradeOrderService.record(buildCapitalTradeOrderDto(order));
String result2 = redPacketTradeOrderService.record(buildRedPacketTradeOrderDto(order));
}