二 TCC-transaction分布式事务关键组件--1Participant参与者、TransactionManager事务管理器
二 TCC-transaction关键组件类及原理
2.1 框架原理
2.1.1 基本使用
在一个模块配置Tcc-transaction并使用,需要执行如下操作:
1.1 生效tcc-transaction
在spring项目中可以使用xml配置或者注解来生效tcc-transaction。
a.XML配置
<tcc:annotation-driven transaction-repository="transactionRepository"/>
b.注解配置
@EnableTccTransaction
1.2 配置transaction repository
transaction repository用来保存事务日志,框架提供多种Transaction Repository:
在tcc-transaction-dubbo-sample中使用MemoryStoreTransactionRepository(没有持久化能力,仅可用于测试):
<bean id="transactionRepository" class="org.mengyun.tcctransaction.repository.MemoryStoreTransactionRepository">
<property name="domain" value="TCC:DUBBO:CAPITAL:"/>
</bean>
1.3 声明tcc接口
在接口方法上加上@EnableTcc(1.7.x新增注解,1.6.x是@Compensable)将提供的接口标记为tcc接口:
public interface CapitalTradeOrderService {
@EnableTcc
public String record(CapitalTradeOrderDto tradeOrderDto);
}
1.4 配置TCC事务的Try、Confirm、Cancel方法:
在tcc接口实现上方法上添加@Compensable注解,设置confirmMethod和cancelMethod方法,分别为tcc的confirm和cancel方法。
@Override
@Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord", transactionContextEditor = DubboTransactionContextEditor.class)
@Transactional
public String record(CapitalTradeOrderDto tradeOrderDto) {
...
}
在TCC 里,一个业务活动可以有多个事务,每个业务操作归属于不同的事务,即一个事务可以包含多个业务操作。TCC-Transaction 将每个业务操作抽象成事务参与者,每个事务可以包含多个参与者。
参与者需要声明 try / confirm / cancel 三个类型的方法,和 TCC 的操作一一对应。在程序里,通过 @Compensable 注解标记在 try 方法上,并填写对应的 confirm / cancel 方法,示例代码如下:
// try
@Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord", transactionContextEditor = MethodTransactionContextEditor.class)
public String record(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {}
// confirm
public void confirmRecord(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {}
// cancel
public void cancelRecord(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {}
try阶段是由业务方发起调用,而confirm和cancel由事务管理器自动调用。在这里,事务管理器就是CompensableTransactionInterceptor(通过组合的方式,里面含有的TransactionManager才是真正的事务管理器,TransactionManager的方法会被调用),confirm和cancel方法由它自动调用。
主要通过spring-aop拦截器CompensableTransactionInterceptor、ResourceCoordinatorInterceptor
CompensableTransactionInterceptor : 用于tcc事务的流程执行begin(try)、commit(confirm)、rollback(cancel)
ResourceCoordinatorInterceptor : 用于记录tcc事务的Participant(参与方)
2.1.2 框架基本流程
2.2 核心组件类
针对TCC-Transaction框架中涉及的关键组件,按照如图所示的模块,首先对各个关键组件进行分析:
2.2.1 事务参与者Participant与事务Transaction
关系如图:
TCC将事务和事务参与者,抽象为两个类Transaction和Participant。
一个transaction可以有多个participant参与者
Transaction---{participant1;participant2;participant3......}
Transaction{
List<Participant> participants //事务对象中,用list保存当前事务的参与者
commit()/rollback() ;//事务提供的方法,通过for循环,遍历list中每个participant.commit/rollback方法实现方法提交、回滚
}
Participant{
transactionXid ;//当前事务的全局id
confirm/cancelInvocationContext;//confirm或者cancel方法调用上下文(包含调用的所有信息)
Terminator ;// commit/rollback通过Terminator对象,完成方法的调用
TransactionContextEditorClass;//transactionContext事务上下文的编辑类,可以通过该类修改和获取事务上下文
//事务的提交与回滚,通过Terminator调用
commit/rollback{
Terminator.invoke(new TransactionContext,new Invocation,transactionContextEditorClass);
}
}
1 Transaction事务
关注核心的属性和方法
//事务对象需要持久存储,所以需要其实现序列化Serializable
public class Transaction implements Serializable {
private List<Participant> participants = new ArrayList<>(); //事务的参与者participants列表
private Map<String, Object> attachments = new ConcurrentHashMap<>(); //附加属性
private Date createTime = new Date();
private Xid xid; //事务编号,唯一地标识一个事务,UUID算法生成
private Xid rootXid;
private String rootDomain;
private TransactionType transactionType; //事务类型(包含ROOT根事务、BRANCH分支事务),表示当前事务时发起者还是参与者
private TransactionStatus status;//事务状态(包含trying、confirming、cancelling),表示当前事务在tcc执行的阶段
private Date lastUpdateTime = new Date();
private volatile int retriedCount = 0;//重试次数
private long version = 0L; //版本号,用于乐观锁更新事务
public Transaction() {
}
//该构造函数,创建trying阶段的分支branch事务
public Transaction(TransactionContext transactionContext) {
this.xid = transactionContext.getXid();
this.rootXid = transactionContext.getRootXid();
this.rootDomain = transactionContext.getRootDomain();
this.status = TransactionStatus.TRYING;
this.transactionType = TransactionType.BRANCH;
}
//该构造函数,创建trying阶段的root事务
public Transaction(Object uniqueIdentity, String rootDomain) {
this.xid = TransactionXid.withUniqueIdentity(uniqueIdentity);
this.status = TransactionStatus.TRYING;
this.transactionType = TransactionType.ROOT;
this.rootXid = xid;
this.rootDomain = rootDomain;
}
//commit使用fori循环,调用每个参与者participant.commit
public void commit() {
for (Participant participant : participants) {
if (!participant.getStatus().equals(ParticipantStatus.CONFIRM_SUCCESS)) {
participant.commit();
//commit提交成功后,修改参与者的状态
participant.setStatus(ParticipantStatus.CONFIRM_SUCCESS);
}
}
}
public void rollback() {
for (Participant participant : participants) {
if (!participant.getStatus().equals(ParticipantStatus.CANCEL_SUCCESS)) {
participant.rollback();
participant.setStatus(ParticipantStatus.CANCEL_SUCCESS);
}
}
}
//添加事务参与者
public void enlistParticipant(Participant participant) {
participants.add(participant);
}
(1)TransactionXid
其中,TransactionXid使用uuid算法生成全局唯一的事务id:
public static TransactionXid withUniqueIdentity(Object uniqueIdentity) {
int formatId = Xid.AUTO;
String xid = null;
if (uniqueIdentity == null) {
xid = FactoryBuilder.factoryOf(UUIDGenerator.class).getInstance().generate();
} else {
xid = uniqueIdentity.toString();
formatId = Xid.CUSTOMIZED;
}
return new TransactionXid(formatId, xid);
}
(2)TransactionStatus
public enum TransactionStatus {
TRYING(1), CONFIRMING(2), CANCELLING(3), TRY_SUCCESS(11), TRY_FAILED(12);
(3)TransactionType
public enum TransactionType {
ROOT(1),
BRANCH(2);
2 Participant事务参与者
public class Participant implements Serializable {
Class<? extends TransactionContextEditor> transactionContextEditorClass; //事务上下文编辑器类
private Xid rootXid;
private String rootDomain;
private Xid xid; //事务id
private InvocationContext invocationContext;
private ParticipantStatus status = ParticipantStatus.TRYING; //参与者状态
public Participant() {
}
public void rollback() {
//利用执行器调用对应的rollback方法
Terminator.invoke(new TransactionContext(rootDomain, rootXid, xid, TransactionStatus.CANCELLING, status), new Invocation(invocationContext.getCancelMethodName(), invocationContext), transactionContextEditorClass);
}
public void commit() {
Terminator.invoke(new TransactionContext(rootDomain, rootXid, xid, TransactionStatus.CONFIRMING, status), new Invocation(invocationContext.getConfirmMethodName(), invocationContext), transactionContextEditorClass);
}
participant的xid事务id,通过该id关联上其所属的事务。同一个事务下的事务id相同,通过同id执行tcc的参与者的操作。
(1)ParticipantStatus
参与者的状态,标识当前事务参与者,在tcc执行过程中,所处于TCC阶段的状态
public enum ParticipantStatus {
TRYING(1), CONFIRMING(2), CANCELLING(3), TRY_SUCCESS(11), TRY_FAILED(12), CONFIRM_SUCCESS(21), CANCEL_SUCCESS(31);
private int id;
(2) TransactionContext事务上下文
public class TransactionContext implements Serializable {
private static final long serialVersionUID = -8199390103169700387L;
private Xid xid;
private Xid rootXid;
private String rootDomain;
private TransactionStatus status = TransactionStatus.TRYING;
private ParticipantStatus participantStatus = ParticipantStatus.TRYING;
private Map<String, String> attachments = new ConcurrentHashMap<>();
(3) InvocationContext调用上下文
调用上下文,包含了当前调用的所有信息,包括目标类、参数类型、confirm和cancel方法名称
public class InvocationContext implements Serializable {
private static final long serialVersionUID = -7969140711432461165L;
private Class targetClass;
private String confirmMethodName;
private String cancelMethodName;
private Class[] parameterTypes;
private Object[] args;
(4)Invocation
调用,包括调用的方法名和InvocationContext
public class Invocation {
private String methodName;
private InvocationContext invocationContext;
public Invocation(String methodName, InvocationContext invocationContext) {
this.methodName = methodName;
this.invocationContext = invocationContext;
}
(5)Terminator执行器
public final class Terminator {
public static Object invoke(TransactionContext transactionContext, Invocation invocation, Class<? extends TransactionContextEditor> transactionContextEditorClass) {
//当invocation中方法名不为空,即有实际的调用方法时
if (StringUtils.isNotEmpty(invocation.getMethodName())) {
//参与者对象类
Object target = FactoryBuilder.factoryOf(invocation.getTargetClass()).getInstance();
Method method = null;
try {
//参与者需要执行的方法(try/confirm/cancel)
method = target.getClass().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
} catch (NoSuchMethodException e) {
throw new SystemException(e);
}
//设置事务上下文transactionContext到transactionContextEditorClass上
FactoryBuilder.factoryOf(transactionContextEditorClass).getInstance().set(transactionContext, target, method, invocation.getArgs());
try {
//反射调用
return method.invoke(target, invocation.getArgs());
} catch (IllegalAccessException | InvocationTargetException e) {
throw new SystemException(e);
} finally {
FactoryBuilder.factoryOf(transactionContextEditorClass).getInstance().clear(transactionContext, target, method, invocation.getArgs());
}
}
return null;
}
(6)TransactionContextEditor事务上下文编辑器
在TCC2.0版本以后,只有这个默认实现类,用threadLocal保存事务上下文对象。
public class ThreadLocalTransactionContextEditor implements TransactionContextEditor {
@Override
public TransactionContext get(Object target, Method method, Object[] args) {
return TransactionContextHolder.getCurrentTransactionContext();
}
@Override
public void set(TransactionContext transactionContext, Object target, Method method, Object[] args) {
TransactionContextHolder.setCurrentTransactionContext(transactionContext);
}
public void clear(TransactionContext transactionContext, Object target, Method method, Object[] args) {
TransactionContextHolder.clear();
}
}
public class TransactionContextHolder {
//创建当前线程变量ThreadLocal,保存事务上下文
private static ThreadLocal<TransactionContext> transactionContextThreadLocal = new ThreadLocal<>();
private TransactionContextHolder() {
}
public static TransactionContext getCurrentTransactionContext() {
return transactionContextThreadLocal.get();
}
public static void setCurrentTransactionContext(TransactionContext transactionContext) {
transactionContextThreadLocal.set(transactionContext);
}
public static void clear() {
transactionContextThreadLocal.remove();
}
}
可知,TransactionContextEditor是将事务上下文保存在本地线程对象ThreadLocal中。
2.2.2 事务管理器TransactionManager
事务管理器类,主要提供事务的begin、commit、rollback和获取事务、participant的新增管理
public class TransactionManager {
private static final ThreadLocal<Deque<Transaction>> CURRENT = new ThreadLocal<>();
private int threadPoolSize = Runtime.getRuntime().availableProcessors() * 2 + 1;
private int threadQueueSize = 1024;
//TCC提供的异步执行Terminator线程池
private ExecutorService asyncTerminatorExecutorService = new ThreadPoolExecutor(threadPoolSize,
threadPoolSize,
0L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(threadQueueSize), new ThreadPoolExecutor.AbortPolicy());
private ExecutorService asyncSaveExecutorService = new ThreadPoolExecutor(threadPoolSize,
threadPoolSize,
0L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(threadQueueSize * 2), new ThreadPoolExecutor.CallerRunsPolicy());
private TransactionRepository transactionRepository;
//事务的开启、commit、rollback
public Transaction begin(Object uniqueIdentify) {
public void commit(boolean asyncCommit) {
public void rollback(boolean asyncRollback) {
//
public Transaction propagationNewBegin(TransactionContext transactionContext)
public Transaction propagationExistBegin(TransactionContext transactionContext)
//添加事务参与者Participant
public void enlistParticipant(Participant participant)
//注册事务Transaction
private void registerTransaction(Transaction transaction)
然后,对于事务管理器提供的核心方法,进行分析:
(a)TransactionManager.begin开启事务
发起根事务TransactionType.ROOT---此时创建事务类型为ROOT的根事务,并存储、注册。
public Transaction begin(Object uniqueIdentify) {
//TAG1 创建根事务
Transaction transaction = new Transaction(uniqueIdentify, this.transactionRepository.getDomain());
//为了性能调优,在创建transaction阶段,不着急持久化存储
if (transaction.getXid().getFormatId() == Xid.CUSTOMIZED) {
//TAG2 对于自定义的xid,确保在TCC阶段之前只创建一次事务(所以对于自定义的xid,需要在tcc阶段开始前,先持久化存储)
transactionRepository.create(transaction);
}
//TAG3 注册事务transaction
registerTransaction(transaction);
return transaction;
}
TAG1:这里创建事务Transaction,使用如下构造函数:
public Transaction(Object uniqueIdentity, String rootDomain) {
this.xid = TransactionXid.withUniqueIdentity(uniqueIdentity);
this.status = TransactionStatus.TRYING;
this.transactionType = TransactionType.ROOT;
this.rootXid = xid;
this.rootDomain = rootDomain;
}
Transaction有两个构造函数,当前构造函数只被begin()调用,只创建TransactionType.ROOT根事务、且 TransactionStatus.TRYING创建的是TCC阶段的try阶段。
TAG2:transactionRepository.create(transaction) 存储事务。详细见后续
TAG3 注册事务transaction
(b)TransactionManager.registerTransaction注册事务
//事务管理器的本地线程变量,其内存储事务Transaction的队列
private static final ThreadLocal<Deque<Transaction>> CURRENT = new ThreadLocal<>();
private void registerTransaction(Transaction transaction) {
if (CURRENT.get() == null) {
CURRENT.set(new LinkedList<Transaction>());
}
//将事务添加入本地线程事务队列的头部
CURRENT.get().push(transaction);
}
注意:
在事务管理器TransactionManager.register注册事务,是将事务对象添加入本地线程事务队列的头部,后面获取事务时,也是从头部获取。(因为在TCC事务中,支持一个事务管理器管理多个事务transaction,而后创建的事务,要先提交,所以,入队和出队都要从头部)
©TransactionManager.propagationNewBegin传播并发起分支事务
注意调用时机:创建分支事务,在try阶段调用。
这里,首先创建分支事务,然后注册。
public Transaction propagationNewBegin(TransactionContext transactionContext) {
//注意:这里创建分支BRANCH事务
Transaction transaction = new Transaction(transactionContext);
//for performance tuning, at create stage do not persistent为了性能调优角度,在创建阶段,不持久化存储事务transaction
// transactionRepository.create(transaction);
//注册事务
registerTransaction(transaction);
return transaction;
}
此处区分于beginTransaction,这里创建事务对象为TransactionType.BRANCH分支事务、TransactionStatus.TRYING事务状态为try阶段。分支事务和ROOT根事务有同一个事务id。
public Transaction(TransactionContext transactionContext) {
this.xid = transactionContext.getXid();
this.rootXid = transactionContext.getRootXid();
this.rootDomain = transactionContext.getRootDomain();
this.status = TransactionStatus.TRYING;
this.transactionType = TransactionType.BRANCH;
}
(d)TransactionManager.propagationExistBegin传播并获取分支事务
注意区别:
TransactionManager.propagationExistBegin:这里是侧重获取分支事务(需要从事务存储器中查找并获取事务);在confirm/cancle阶段调用
TransactionManager.propagationNewBegin是创建BRANCH事务,并存储,侧重于发起;在try阶段调用
public Transaction propagationExistBegin(TransactionContext transactionContext) throws NoExistedTransactionException {
//根据事务上下文transactionContext.getXid(),从事务存储器transactionRepository查询事务
Transaction transaction = transactionRepository.findByXid(transactionContext.getXid());
if (transaction != null) {
//将获取的事务,重新注册入TransactionManager中
registerTransaction(transaction);
return transaction;
} else {
throw new NoExistedTransactionException();
}
}
(e)TransactionManager.commit提交事务
public void commit(boolean asyncCommit) {
//TAG1 获取当前事务队列的首个transaction
final Transaction transaction = getCurrentTransaction();
//修改当前事务状态为CONFIRMING
transaction.setStatus(TransactionStatus.CONFIRMING);
//更新
transactionRepository.update(transaction);
if (asyncCommit) {
try {
long statTime = System.currentTimeMillis();
asyncTerminatorExecutorService.submit(() -> commitTransaction(transaction));
logger.debug("async submit cost time:{}", System.currentTimeMillis() - statTime);
} catch (Throwable commitException) {
logger.warn("compensable transaction async submit confirm failed, recovery job will try to confirm later.", commitException.getCause());
//throw new ConfirmingException(commitException);
}
} else {
commitTransaction(transaction);
}
}
private void commitTransaction(Transaction transaction) {
try {
//提交事务
transaction.commit();
//完成commit后,从持久化存储中删除事务
transactionRepository.delete(transaction);
} catch (Throwable commitException) {
//如果commit提交失败,更新CONFIRMING状态的事务
try {
transactionRepository.update(transaction);
} catch (Exception e) {
//ignore any exception here
}
logger.warn("compensable transaction confirm failed, recovery job will try to confirm later.", commitException);
throw new ConfirmingException(commitException);
}
}
综上,transactionManager.commit逻辑如下:
1 先从当前事务管理器transactionManager获取线程变量的事务队列的首个事务;
2 修改状态为confirming;
3 存储器中更新事务;
4 transaction.commit();----如果commit失败,更新transaction(在提交的过程中,会更改transactionContext、参与者状态等信息,因此需要更新)
5 提交成功后,删除当前事务
TAG1 getCurrentTransaction 获取当前线程变量的事务队列的头部事务
public Transaction getCurrentTransaction() {
if (isTransactionActive()) {
return CURRENT.get().peek();
}
return null;
}
//判断是否是活跃事务:只要当前线程变量的事务队列中有事务即可
public boolean isTransactionActive() {
Deque<Transaction> transactions = CURRENT.get();
return transactions != null && !transactions.isEmpty();
}
(e)TransactionManager.rollback
public void rollback(boolean asyncRollback) {
final Transaction transaction = getCurrentTransaction();
transaction.setStatus(TransactionStatus.CANCELLING);
transactionRepository.update(transaction);
if (asyncRollback) {
try {
asyncTerminatorExecutorService.submit(() -> rollbackTransaction(transaction));
} catch (Throwable rollbackException) {
logger.warn("compensable transaction async rollback failed, recovery job will try to rollback later.", rollbackException);
throw new CancellingException(rollbackException);
}
} else {
rollbackTransaction(transaction);
}
}
private void rollbackTransaction(Transaction transaction) {
try {
transaction.rollback();
transactionRepository.delete(transaction);
} catch (Throwable rollbackException) {
//try save updated transaction
try {
transactionRepository.update(transaction);
} catch (Exception e) {
//ignore any exception here
}
logger.warn("compensable transaction rollback failed, recovery job will try to rollback later.", rollbackException);
throw new CancellingException(rollbackException);
}
}
逻辑和commit相同
(e)TransactionManager.enlistParticipant添加参与者到transaction
该方法在try阶段被调用。
在添加参与者Participant时,先加入参与者队列;然后根据transaction.getVersion()是否为0,判断是否需要执行存储、或者进行更新
public void enlistParticipant(Participant participant) {
Transaction transaction = this.getCurrentTransaction();
transaction.enlistParticipant(participant);
if (transaction.getVersion() == 0L) {
// transaction.getVersion()为0,表示之前从未持久化存储过,需要调用进行存储
transactionRepository.create(transaction);
} else {
transactionRepository.update(transaction);
}
}