5 Transaction recovery in TCC-transaction
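Version 2.x offers two deployment modes, embedded and server. In embedded mode, transaction recovery is still performed by tcc-client; in server mode, it is handed over to the tcc-server module. Here I only follow the tcc-client side (embedded mode) to study how TCC-transaction recovers transactions.
5.1 Starting the TCC client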
public class SpringTccClient extends TccClient implements TransactionManagerFactory {
public SpringTccClient() {
super(new ClientConfig());
}
public SpringTccClient(ClientConfig clientConfig) {
super(clientConfig);
}
}
SpringTccClient is instantiated and registered in the Spring container by a Spring configuration class.
@Configuration
@EnableAspectJAutoProxy(proxyTargetClass = true)
@ComponentScan(value = "org.mengyun.tcctransaction", excludeFilters = {@ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {XmlTccTransactionConfiguration.class})})
public class AnnotationTccTransactionConfiguration {
@Bean("springBeanFactory")
public SpringBeanFactory getSpringBeanFactory() {
return new SpringBeanFactory();
}
@Bean
@DependsOn({"springBeanFactory"})
public SpringTccClient getTccClient() {
return new SpringTccClient(clientConfig);
}
}
Then, during TccClient initialization:
@Override
@PostConstruct
//runs when the Spring container starts
public void start() throws Exception {
this.isStarting = true;
//if the client's storage type is REMOTING, the server deployment mode is in use
if (this.clientConfig.getStorageType() == StorageType.REMOTING) {
try {
this.registryService.start();
this.registryService.subscribe();
} catch (Exception e) {
logger.error("failed to initialize registryService, stop the application!", e);
StopUtils.stop();
}
initializeRemotingClient(); //initialize the remoting client
} else { //local storage: transactions are persisted and recovered locally
registerDomain(this.clientConfig.getDomain());
if (transactionRepository.supportRecovery() && this.clientConfig.isRecoveryEnabled()) {
//TAG1 scheduler
//TAG2 scheduler.registerScheduleAndStartIfNotPresent
scheduler.registerScheduleAndStartIfNotPresent(this.clientConfig.getDomain());
}
}
this.isStarting = false;
}
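TccClient.start() runs when the Spring container loads this bean at startup.
TccClient.start():
1 StorageType.REMOTING: in remoting storage mode, the client starts the registry service, subscribes to it, and initializes the remoting client;
2 otherwise: persistence and transaction recovery are both handled locally, through
scheduler.registerScheduleAndStartIfNotPresent
TAG1 scheduler
The scheduler object is created when the client is instantiated: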
this.scheduler = new RecoveryScheduler(this.clientConfig);
public class RecoveryScheduler {
public static final String JOB_NAME = "TCC_JOB_%s"; //job name pattern, filled in with the domain
public static final String TRIGGER_NAME = "TCC_TRIGGER_%s"; //trigger name pattern, filled in with the domain
private static final Logger logger = LoggerFactory.getLogger(RecoveryScheduler.class.getSimpleName());
private RecoveryConfig recoveryConfig;
//cache of domain -> Scheduler: one Scheduler per domain
private Map<String, Scheduler> schedulers = new ConcurrentHashMap<>();
public RecoveryScheduler(RecoveryConfig recoveryConfig) {
this.recoveryConfig = recoveryConfig;
}
TAG2 scheduler.registerScheduleAndStartIfNotPresent
Creates the Quartz scheduler for a domain inside RecoveryScheduler, caches it, and starts it.
public void registerScheduleAndStartIfNotPresent(String domain) {
Scheduler scheduler = registerScheduleIfNotPresent(domain); //create and cache the scheduler
start(scheduler); //start the scheduler
}
public Scheduler registerScheduleIfNotPresent(String domain) {
//if no scheduler has been created and cached for this domain yet
if (!schedulers.containsKey(domain)) {
synchronized (RecoveryScheduler.class) {
//re-check under the lock: still no scheduler created and cached for this domain
if (!schedulers.containsKey(domain)) {
//TAG2.1 createScheduler
Scheduler scheduler = createScheduler(domain);
//TAG2.2 scheduleJob
scheduleJob(scheduler, domain);
schedulers.put(domain, scheduler);
}
}
}
return schedulers.get(domain);
}
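The check, lock, check-again pattern above guarantees that each domain gets exactly one scheduler even when several threads register at once. As a minimal sketch of the same once-per-key guarantee, the snippet below uses `ConcurrentHashMap.computeIfAbsent` instead of an explicit lock. `DomainRegistry` and the `String` stand-in for a Quartz `Scheduler` are hypothetical names for illustration only; this is not how TCC-transaction itself is written.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for RecoveryScheduler's per-domain cache; not TCC-transaction code.
class DomainRegistry {
    private final Map<String, String> schedulers = new ConcurrentHashMap<>();
    // Counts how many times the creation step actually ran.
    final AtomicInteger created = new AtomicInteger();

    // computeIfAbsent runs the factory at most once per key, atomically.
    String registerIfNotPresent(String domain) {
        return schedulers.computeIfAbsent(domain, d -> {
            created.incrementAndGet();
            // A real implementation would build and configure a Quartz Scheduler here.
            return "scheduler-for-" + d;
        });
    }
}
```

Registering the same domain twice returns the cached value and runs the factory only once, which is the behaviour the synchronized block above provides.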
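TAG2.1 createScheduler: creating the JobDetail that runs inside the scheduler
The listing below is scheduleJob, which builds the job that the scheduler runs and sets the job's cron trigger.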
private void scheduleJob(Scheduler scheduler, String domain) {
String jobName = String.format(JOB_NAME, domain);
String triggerName = String.format(TRIGGER_NAME, domain);
//build the JobDetail that Quartz will execute
//TAG2.1.1 the QuartzRecoveryTask job
JobDetail jobDetail = JobBuilder.newJob(QuartzRecoveryTask.class).withIdentity(jobName).build();
jobDetail.getJobDataMap().put(MixAll.DOMAIN, domain);
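//define the trigger
CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(triggerName)
//interval from the TCC config, e.g. 0/30 * * * * ? fires every 30 seconds
.withSchedule(CronScheduleBuilder.cronSchedule(recoveryConfig.getCronExpression())
//misfire policy DoNothing: a firing that is late by more than Quartz's misfireThreshold is skipped and the trigger waits for its next fire time; a smaller delay still fires immediately
.withMisfireHandlingInstructionDoNothing()).build();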
try {
//check whether the job already exists in the scheduler
if (!scheduler.checkExists(JobKey.jobKey(jobName))) {
//TAG2.1.2 scheduler.scheduleJob
//if not, schedule the job with this JobDetail and cronTrigger
scheduler.scheduleJob(jobDetail, cronTrigger);
} else {
if (recoveryConfig.isUpdateJobForcibly()) {
//if it exists and a forced update is configured, delete the old job and schedule the new one
scheduler.deleteJob(JobKey.jobKey(jobName));
scheduler.scheduleJob(jobDetail, cronTrigger);
}
}
} catch (SchedulerException se) {
try {
scheduler.shutdown();
} catch (Exception ignore) {
//ignore
}
throw new SystemException(String.format("register recovery task for domain<%s> failed", domain), se);
}
}
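With the cron expression 0/30 * * * * ?, Quartz runs QuartzRecoveryTask every 30 seconds to retry recovery of transactions.
TAG2.1.1 The QuartzRecoveryTask job
This is the Quartz job class that TCC-Transaction defines for scheduled recovery.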
@DisallowConcurrentExecution
public class QuartzRecoveryTask implements Job {
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
String domain = context.getJobDetail().getJobDataMap().getString(MixAll.DOMAIN);
logger.info("start recovery {}", domain);
//RTask1 TransactionStoreRecovery.startRecover(domain)
FactoryBuilder.factoryOf(TccService.class).getInstance().getTransactionStoreRecovery().startRecover(domain);
}
}
RTask1 TransactionStoreRecovery.startRecover(domain): running the recovery
public class TransactionStoreRecovery implements Closeable {
public void startRecover(String domain) {
try {
String offset = null;
int totalCount = 0;
do {
//RT1.1 loadErrorTransactionsByPage: load the errored transactions one page at a time
Page<TransactionStore> page = loadErrorTransactionsByPage(domain, offset);
if (!page.getData().isEmpty()) {
//RT1.2 recover the errored transactions concurrently
concurrentRecoveryErrorTransactions(page.getData());
offset = page.getNextOffset();
totalCount += page.getData().size();
} else {
break;
}
} while (true);
// raise an alert if needed
AlertManager.tryAlert(domain, totalCount, transactionStorage);
logger.debug("total recovery count {} from repository:{}", totalCount, transactionStorage.getClass().getName());
} catch (Throwable e) {
logger.error("recovery failed from repository:{}.", transactionStorage.getClass().getName(), e);
}
}
}
RT1.1 loadErrorTransactionsByPage: loading errored transactions page by page
private Page<TransactionStore> loadErrorTransactionsByPage(String domain, String offset) {
long currentTimeInMillis = Instant.now().toEpochMilli();
//the recover duration is set to 30s by default
//RT1.1.1 transactionStorage).findAllUnmodifiedSince
return ((StorageRecoverable) transactionStorage).findAllUnmodifiedSince(domain, new Date(currentTimeInMillis - recoveryConfig.getRecoverDuration() * 1000), offset, recoveryConfig.getFetchPageSize());
}
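Note how an abnormal transaction is defined here:
new Date(currentTimeInMillis - recoveryConfig.getRecoverDuration() * 1000)
That is, the transaction's last update time is earlier than the current time minus the recover duration. In other words, a transaction that is still stored and has not been modified for longer than the recover duration is treated as abnormal, and such transactions need to be recovered and retried.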
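The cutoff rule can be written as a small predicate. This is only a sketch under the assumption that the comparison is strict, as in the `LAST_UPDATE_TIME < ?` query used by the JDBC storage; `RecoveryCutoff` and `isCandidate` are hypothetical names, not part of TCC-transaction.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper illustrating the cutoff rule; names are not from TCC-transaction.
class RecoveryCutoff {
    // A transaction is a recovery candidate when its last update is strictly
    // older than (now - recoverDuration).
    static boolean isCandidate(Instant lastUpdateTime, Instant now, int recoverDurationSeconds) {
        Instant cutoff = now.minus(Duration.ofSeconds(recoverDurationSeconds));
        return lastUpdateTime.isBefore(cutoff);
    }
}
```

With a 30 second recover duration, a transaction last updated 31 seconds ago qualifies, while one updated exactly 30 seconds ago or more recently does not.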
RT1.1.1 transactionStorage).findAllUnmodifiedSince: querying transactions from the database
Finds all transactions that have not been modified since the given time.
public class JdbcTransactionStorage extends AbstractTransactionStorage implements StorageRecoverable {
@Override
public Page<TransactionStore> findAllUnmodifiedSince(String domain, Date date, String offset, int pageSize) {
return pageList(domain, date, offset, pageSize, false);
}
private Page<TransactionStore> pageList(String domain, Date date, String offset, int pageSize, boolean isMarkDeleted) {
List<TransactionStore> transactions = new ArrayList<>();
Connection connection = null;
PreparedStatement stmt = null;
int currentOffset = StringUtils.isEmpty(offset) ? 0 : Integer.parseInt(offset);
try {
//get a database connection
connection = this.getConnection();
// ---------- build the query ----------
StringBuilder builder = new StringBuilder();
builder.append(SQL_SELECT_PREFIX_FOR_TCC_TRANSACTION + getTableName() + " WHERE LAST_UPDATE_TIME < ?");
builder.append(" AND IS_DELETE = ?");
builder.append(StringUtils.isNotEmpty(domain) ? " AND DOMAIN = ?" : "");
builder.append(" ORDER BY TRANSACTION_ID ASC");
builder.append(String.format(" LIMIT %s, %d", currentOffset, pageSize));
stmt = connection.prepareStatement(builder.toString());
//bind the SQL parameters
stmt.setTimestamp(1, new Timestamp(date.getTime()));
stmt.setInt(2, isMarkDeleted ? MARK_DELETED_YES : MARK_DELETED_NO);
if (StringUtils.isNotEmpty(domain)) {
stmt.setString(3, domain);
}
//execute the query
ResultSet resultSet = stmt.executeQuery();
this.constructTransactions(resultSet, transactions);
} catch (Throwable e) {
throw new TransactionIOException(e);
} finally {
closeStatement(stmt);
this.releaseConnection(connection);
}
return new Page<>(String.valueOf(currentOffset + transactions.size()), transactions);
}
}
The resulting query against the persistent store is:
SELECT DOMAIN,ROOT_XID,XID,CONTENT,STATUS,TRANSACTION_TYPE,CREATE_TIME,LAST_UPDATE_TIME,RETRIED_COUNT,VERSION,IS_DELETE,ROOT_DOMAIN,REQUEST_ID
FROM TableName
WHERE LAST_UPDATE_TIME < TIME
AND IS_DELETE = 0 -- not marked as deleted
AND DOMAIN = domain
ORDER BY TRANSACTION_ID ASC
LIMIT currentOffset, pageSize
This is how the errored transactions are fetched from the database.
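The offset contract between pageList and the startRecover loop (next offset = current offset + rows returned, stop on an empty page) can be sketched with an in-memory list standing in for the table. `OffsetPager`, `page` and `drain` are hypothetical names for illustration; the sketch assumes a positive page size and a data set that does not change while it is being paged.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for the paged query; not TCC-transaction code.
class OffsetPager {
    // Mirrors "LIMIT offset, pageSize": at most pageSize items starting at offset.
    static List<String> page(List<String> rows, int offset, int pageSize) {
        int from = Math.min(offset, rows.size());
        int to = Math.min(from + pageSize, rows.size());
        return new ArrayList<>(rows.subList(from, to));
    }

    // Mirrors the startRecover loop: keep fetching until a page comes back empty.
    static int drain(List<String> rows, int pageSize) {
        int offset = 0;
        int total = 0;
        while (true) {
            List<String> data = page(rows, offset, pageSize);
            if (data.isEmpty()) {
                break;
            }
            offset += data.size(); // next offset = current offset + rows returned
            total += data.size();
        }
        return total;
    }
}
```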
RT1.2 concurrentRecoveryErrorTransactions: recovering errored transactions concurrently
private void concurrentRecoveryErrorTransactions(List<TransactionStore> transactions) throws InterruptedException, ExecutionException {
initLogStatistics();
//create one recovery task per transaction
List<RecoverTask> tasks = new ArrayList<>();
for (TransactionStore transaction : transactions) {
//RT1.2.1 RecoverTask: the task that recovers one errored transaction
tasks.add(new RecoverTask(transaction));
}
//RT1.2.2 recoveryExecutorService.invokeAll: run the tasks as a batch
List<Future<Void>> futures = recoveryExecutorService.invokeAll(tasks, CONCURRENT_RECOVERY_TIMEOUT, TimeUnit.SECONDS);
for (Future<Void> future : futures) {
future.get();
}
}
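One detail of the timed `invokeAll` used above is worth seeing in isolation: in the JDK, tasks that have not finished when the timeout expires are cancelled, and calling `get()` on a cancelled future throws `CancellationException`. The sketch below demonstrates this with one fast and one slow task; `InvokeAllDemo`, the pool size and the 200 ms timeout are illustrative assumptions, not values from TCC-transaction.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of TCC-transaction.
class InvokeAllDemo {
    // Returns {completed, cancelled} for one fast and one slow task under a short timeout.
    static int[] run() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Callable<Void> fast = () -> null;
            Callable<Void> slow = () -> { Thread.sleep(5_000); return null; };
            List<Future<Void>> futures =
                    pool.invokeAll(Arrays.asList(fast, slow), 200, TimeUnit.MILLISECONDS);
            int completed = 0;
            int cancelled = 0;
            for (Future<Void> f : futures) {
                try {
                    f.get(); // returns at once: every future is done or cancelled by now
                    completed++;
                } catch (CancellationException e) {
                    cancelled++; // the task did not finish before the timeout
                }
            }
            return new int[] {completed, cancelled};
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

In the listing above, such an exception would propagate out of concurrentRecoveryErrorTransactions and be caught by the `catch (Throwable e)` in startRecover.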
RT1.2.1 RecoverTask: the task for one errored transaction
An inner class of TransactionStoreRecovery.
class RecoverTask implements Callable<Void> {
TransactionStore transaction;
public RecoverTask(TransactionStore transaction) {
this.transaction = transaction;
}
@Override
public Void call() throws Exception {
recoverErrorTransaction(transaction);
return null;
}
}
private void recoverErrorTransaction(TransactionStore transactionStore) {
//the retry count has exceeded recoveryConfig.getMaxRetryCount(): give up on this transaction
if (transactionStore.getRetriedCount() > recoveryConfig.getMaxRetryCount()) { //1
logSync.lock();
try {
//log the details only while fewer than logMaxPrintCount such errors have been printed
if (triggerMaxRetryPrintCount.get() < logMaxPrintCount) {
logger.error(
"recover failed with max retry count,will not try again. domain:{}, xid:{}, rootDomain:{}, rootXid:{}, status:{},retried count:{}",
transactionStore.getDomain(),
transactionStore.getXid(),
transactionStore.getRootDomain(),
transactionStore.getRootXid(),
transactionStore.getStatusId(),
transactionStore.getRetriedCount());
triggerMaxRetryPrintCount.incrementAndGet();
} else if (triggerMaxRetryPrintCount.get() == logMaxPrintCount) {
logger.error("Too many transactionStore's retried count max then MaxRetryCount during one page transactions recover process , will not print errors again!");
}
} finally {
logSync.unlock();
}
return;
} //1
// ---------- retry limit not exceeded ----------
try {
// ---------- recovery logic for a root transaction ----------
if (transactionStore.getTransactionTypeId() == TransactionType.ROOT.getId()) {
switch (TransactionStatus.valueOf(transactionStore.getStatusId())) {
case CONFIRMING:
commitTransaction(transactionStore);
break;
case CANCELLING:
rollbackTransaction(transactionStore);
break;
case TRYING:
tryTreatAsFailed(transactionStore, TransactionStatus.CANCELLING);
break;
default:
//ignore it.
break;
}
}
// ---------- recovery logic for a branch transaction ----------
else {
//transactionStore type is BRANCH
switch (TransactionStatus.valueOf(transactionStore.getStatusId())) {
case CONFIRMING:
//ERR1 commitTransaction
commitTransaction(transactionStore);
break;
case CANCELLING:
case TRY_FAILED:
//ERR2 rollbackTransaction
rollbackTransaction(transactionStore);
break;
case TRY_SUCCESS:
if (storageMode == StorageMode.CENTRAL) {
//check the root transactionStore
TransactionStore rootTransaction = transactionStorage.findByXid(transactionStore.getRootDomain(), transactionStore.getRootXid());
if (rootTransaction == null) {
// In this case means the root transactionStore is already rollback.
// Need cancel this branch transactionStore.
rollbackTransaction(transactionStore);
} else {
switch (TransactionStatus.valueOf(rootTransaction.getStatusId())) {
case CONFIRMING:
commitTransaction(transactionStore);
break;
case CANCELLING:
rollbackTransaction(transactionStore);
break;
default:
break;
}
}
}
break;
case TRYING:
//ERR3 tryTreatAsFailed
tryTreatAsFailed(transactionStore, TransactionStatus.TRY_FAILED);
break;
default:
//ignore it.
break;
}
}
} catch (Throwable throwable) {
if (throwable instanceof TransactionOptimisticLockException
|| ExceptionUtils.getRootCause(throwable) instanceof TransactionOptimisticLockException) {
logger.warn(
"optimisticLockException happened while recover. txid:{}, status:{},retried count:{}",
transactionStore.getXid(),
transactionStore.getStatusId(),
transactionStore.getRetriedCount());
} else {
logSync.lock();
try {
if (recoveryFailedPrintCount.get() < logMaxPrintCount) {
try {
logger.error("recover failed, txid:{}, status:{},retried count:{},transactionStore content:{}",
transactionStore.getXid(),
transactionStore.getStatusId(),
transactionStore.getRetriedCount(),
jackson.writeValueAsString(transactionStore), throwable);
} catch (JsonProcessingException e) {
logger.error("failed to serialize transactionStore {}", transactionStore.toString(), e);
}
recoveryFailedPrintCount.incrementAndGet();
} else if (recoveryFailedPrintCount.get() == logMaxPrintCount) {
logger.error("Too many transactionStore's recover error during one page transactions recover process , will not print errors again!");
}
} finally {
logSync.unlock();
}
}
}
}
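The recovery logic for an errored transaction is as follows.
The main transaction statuses are CONFIRMING, CANCELLING, TRYING, TRY_FAILED and TRY_SUCCESS.
A root transaction only needs handling in CONFIRMING, CANCELLING and TRYING.
A branch transaction needs handling in every status. In addition, a branch in TRY_SUCCESS is resolved from the status of its root transaction, but only when the storage mode is CENTRAL. This setting applies to the client only; when it is CENTRAL, branch compensation also processes transactions in TRY_SUCCESS.
If the root transaction no longer exists, the root has already been rolled back, so the branch must roll back too;
if the root transaction exists, the branch confirms when the root is CONFIRMING and cancels when it is CANCELLING; in any other status nothing is done.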
When transactionStore.getTransactionTypeId() is ROOT (root transaction)
Current transaction status:
1 case CONFIRMING:
commitTransaction(transactionStore); //commit again
2 case CANCELLING:
rollbackTransaction(transactionStore); //roll back again
3 case TRYING:
tryTreatAsFailed(transactionStore, TransactionStatus.CANCELLING); //treat it as failed and set the status to CANCELLING
Any other status:
ignored
When transactionStore.getTransactionTypeId() is BRANCH (branch transaction)
Current transaction status:
1 case CONFIRMING:
commitTransaction(transactionStore);
2 case CANCELLING:
case TRY_FAILED:
rollbackTransaction(transactionStore);
3 case TRY_SUCCESS:
if (storageMode == StorageMode.CENTRAL) {
if (rootTransaction == null)
rollbackTransaction(transactionStore)
else, depending on the root's status:
case CONFIRMING:
commitTransaction(transactionStore); //commit again
case CANCELLING:
rollbackTransaction(transactionStore); //roll back again
Any other status:
not handled
4 case TRYING:
tryTreatAsFailed(transactionStore, TransactionStatus.TRY_FAILED); //treat it as failed and set the status to TRY_FAILED
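The two switch statements can be condensed into one pure decision function, which makes the table above easy to check case by case. This is a sketch only: the enums `Type`, `Status` and `Action` and the method `decide` are hypothetical and merely mirror the names in the listing. The two MARK actions stand for the call to tryTreatAsFailed, which changes the status only after its own time limit has passed.

```java
// Hypothetical types mirroring the switch statements above; not TCC-transaction's own classes.
class RecoveryDecision {
    enum Type { ROOT, BRANCH }
    enum Status { TRYING, CONFIRMING, CANCELLING, TRY_SUCCESS, TRY_FAILED }
    enum Action { COMMIT, ROLLBACK, MARK_CANCELLING, MARK_TRY_FAILED, IGNORE }

    // rootStatus matters only for a BRANCH in TRY_SUCCESS under central storage;
    // null there means the root record no longer exists.
    static Action decide(Type type, Status status, boolean centralStorage, Status rootStatus) {
        switch (status) {
            case CONFIRMING:
                return Action.COMMIT;
            case CANCELLING:
                return Action.ROLLBACK;
            case TRYING:
                return type == Type.ROOT ? Action.MARK_CANCELLING : Action.MARK_TRY_FAILED;
            case TRY_FAILED:
                return type == Type.BRANCH ? Action.ROLLBACK : Action.IGNORE;
            case TRY_SUCCESS:
                if (type != Type.BRANCH || !centralStorage) return Action.IGNORE;
                if (rootStatus == null) return Action.ROLLBACK;
                if (rootStatus == Status.CONFIRMING) return Action.COMMIT;
                if (rootStatus == Status.CANCELLING) return Action.ROLLBACK;
                return Action.IGNORE;
            default:
                return Action.IGNORE;
        }
    }
}
```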
ERR1 commitTransaction
private void commitTransaction(TransactionStore transactionStore) {
recoveryExecutor.commit(transactionStore);
}
public class ClientRecoveryExecutor implements RecoveryExecutor {
public void commit(TransactionStore transactionStore) {
Transaction transaction = TransactionConvertor.getTransaction(transactionSerializer, transactionStore);
//increment the transaction's retry count
transaction.setRetriedCount(transaction.getRetriedCount() + 1);
transaction.setStatus(TransactionStatus.CONFIRMING); //set the status to CONFIRMING
try {
transactionRepository.update(transaction); //persist the update
} catch (TransactionOptimisticLockException e) {
logger.debug("multiple instances try to recovery<commit> the same transaction<{}>, this instance ignore the recovery.", transactionStore.getXid());
return;
}
transaction.commit(); //run the commit again
transactionRepository.delete(transaction); //delete the record afterwards
}
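The update-before-commit step is what keeps several client instances from recovering the same transaction at once: only the instance whose update succeeds goes on to commit, and the others see TransactionOptimisticLockException and skip. A minimal sketch of that idea follows, assuming the repository compares the VERSION value it read with the stored one (the update SQL is not shown in this article); `VersionedRow` and `tryUpdate` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical single-row store; the field plays the role of the VERSION column.
class VersionedRow {
    private final AtomicLong version = new AtomicLong(1);

    // Succeeds only if the caller read the current version, in the spirit of
    // "UPDATE ... SET VERSION = VERSION + 1 WHERE VERSION = ?".
    boolean tryUpdate(long expectedVersion) {
        return version.compareAndSet(expectedVersion, expectedVersion + 1);
    }

    long currentVersion() {
        return version.get();
    }
}
```

If two instances both read version 1, the first `tryUpdate(1)` wins and the second fails, which corresponds to the early `return` in the catch block above.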
ERR2 rollbackTransaction
Recovery of a rollback follows the same steps as commitTransaction.
ERR3 tryTreatAsFailed
private void tryTreatAsFailed(TransactionStore transactionStore, TransactionStatus transactionStatus) {
Date lastUpdateTime = transactionStore.getLastUpdateTime();
Date currentTime = new Date();
//a transaction stuck in TRYING is treated as failed only after a maximum time (in seconds) has elapsed
int maxTimeTreatTryingAsFailed = recoveryConfig.getMaxTimeTreatTryingAsFailed();
if (maxTimeTreatTryingAsFailed > 0
&& (currentTime.getTime() - lastUpdateTime.getTime()) > maxTimeTreatTryingAsFailed * 1000) {
//update the status to cancel or try failed, waiting for the recovery task to recover
//increment the version
transactionStore.setVersion(transactionStore.getVersion() + 1);
//set the status passed in: CANCELLING for a root transaction, TRY_FAILED for a branch
transactionStore.setStatusId(transactionStatus.getId());
transactionStorage.update(transactionStore);
}
}
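This step only changes the transaction's status to the one passed in (TransactionStatus.CANCELLING for a root, TransactionStatus.TRY_FAILED for a branch); the rollback itself happens on a later recovery run.
Note:
For a root transaction, recovery only marks a TRYING transaction as CANCELLING; on the next recovery run, a root in CANCELLING is rolled back directly.
For a branch transaction, a TRYING transaction is marked TRY_FAILED; on the next recovery run, a branch in TRY_FAILED is rolled back directly.
RT1.2.2 recoveryExecutorService.invokeAll: running the tasks as a batch
The recovery thread pool's invokeAll is called to run the batch of tasks.
TAG2.2 Scheduler.scheduleJob
This calls the Quartz scheduler, which runs the scheduled recovery job described above.
At this point, transaction recovery on the TCC client side is complete.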
5.2 Starting the TCC server (todo)
Largely the same as tcc-client; the source walkthrough will be added later.