Part 4: Key Components of TCC-transaction Distributed Transactions -- 3 Transaction Storage, TCC-Client/Server
2.2.4 Transaction Storage
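During a TCC flow, the whole transaction is driven from the transaction information held in the application's memory. In real business scenarios, however, keeping that information only in memory is far from reliable. The following problems can occur:
1. The application process crashes unexpectedly, and the information about unfinished transactions is lost.
2. The application runs as a cluster; when it serves remote calls, the transaction information has to be shared across the cluster.
3. The application that initiated the transaction must be restarted to deploy a new version while, for whatever reason, some transactions are still unfinished.
Therefore, in addition to keeping transaction information in memory, TCC-Transaction also provides external storage to persist transaction objects.
TCC-Transaction provides TransactionRepository to persist the Transaction object.
However, as the earlier analysis of Transaction showed, the transaction object is itself complex: it holds a List and other objects with a good deal of nested content. Persisting it therefore requires serializing the complex Java object first; only then can it be stored.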
1 Serialization
public interface ObjectSerializer<T> {
    /**
     * Serialize the given object to binary data.
     *
     * @param t object to serialize
     * @return the equivalent binary data
     */
    byte[] serialize(T t);

    /**
     * Deserialize an object from the given binary data.
     *
     * @param bytes object binary representation
     * @return the equivalent object instance
     */
    T deserialize(byte[] bytes);

    T clone(T object);
}
JDK and Kryo serialization are the main implementations provided. They are not the focus here, so their analysis is omitted.
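As a rough illustration of what a JDK-based variant could look like, the sketch below implements an interface of the same shape with java.io object streams. The names SimpleSerializer and JdkSerializer are hypothetical and are not the classes shipped with TCC-Transaction; implementing clone as a serialize/deserialize round trip is also an assumption.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;

// Hypothetical interface with the same shape as ObjectSerializer above.
interface SimpleSerializer<T> {
    byte[] serialize(T t);
    T deserialize(byte[] bytes);
    T clone(T object);
}

// Sketch of a JDK-based implementation; not the library's own class.
class JdkSerializer<T extends Serializable> implements SimpleSerializer<T> {

    @Override
    public byte[] serialize(T t) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // Closing the ObjectOutputStream flushes everything into the buffer.
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(t);
        } catch (IOException e) {
            throw new IllegalStateException("serialization failed", e);
        }
        return buffer.toByteArray();
    }

    @Override
    @SuppressWarnings("unchecked")
    public T deserialize(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (T) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("deserialization failed", e);
        }
    }

    // Assumed behaviour: a deep copy obtained through a round trip.
    @Override
    public T clone(T object) {
        return deserialize(serialize(object));
    }
}

class SerializerDemo {
    public static void main(String[] args) {
        ArrayList<String> phases = new ArrayList<>();
        phases.add("try");
        phases.add("confirm");
        JdkSerializer<ArrayList<String>> serializer = new JdkSerializer<>();
        ArrayList<String> copy = serializer.clone(phases);
        System.out.println(copy.equals(phases) && copy != phases);
    }
}
```

Every object reachable from the serialized value has to implement Serializable for this approach to work, which is one reason a complex object with nested collections needs a dedicated serialization step before it can be stored.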
2 TransactionRepository
public interface TransactionRepository extends Closeable {
    String getDomain();

    int create(Transaction transaction); // store

    int update(Transaction transaction); // update

    int delete(Transaction transaction); // delete

    Transaction findByXid(Xid xid); // look up by xid

    boolean supportRecovery();

    // find transactions not modified since the given date (used during transaction recovery)
    Page<Transaction> findAllUnmodifiedSince(Date date, String offset, int pageSize);
}
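Next comes the implementation class of the transaction repository.
Only the create method, which stores a transaction, is analyzed here.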
public class DefaultTransactionRepository implements TransactionRepository {
    private String domain;
    // underlying transaction storage
    private TransactionStorage transactionStorage;
    // transaction serializer
    private TransactionSerializer serializer;

    @Override
    public int create(Transaction transaction) {
        // optimistic lock: version 0 means not yet persisted; version 1 means the transaction has been persisted
        transaction.setVersion(1L);
        // TAG1 getTransactionStore(transaction): build the TransactionStore
        TransactionStore transactionStore = getTransactionStore(transaction);
        // TAG2 this.transactionStorage.create(transactionStore): persist it
        int result = this.transactionStorage.create(transactionStore);
        return result;
    }
    // other methods omitted
}
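Note that when a transaction is persisted, its version is set to 1 to indicate that it has been persisted; a version of 0 means it has not been persisted yet.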
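To make the role of the version field more concrete, here is a minimal in-memory sketch of a version-based optimistic lock. VersionedRecord and VersionedStore are hypothetical names, and the rule that an update must present the stored version and then increments it is the usual optimistic-locking pattern, assumed here rather than taken from the TCC-Transaction source.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical record standing in for a persisted transaction row.
class VersionedRecord {
    final String xid;
    final long version;
    final String status;

    VersionedRecord(String xid, long version, String status) {
        this.xid = xid;
        this.version = version;
        this.status = status;
    }
}

// In-memory stand-in for an external store, for illustration only.
class VersionedStore {
    private final Map<String, VersionedRecord> rows = new HashMap<>();

    // Mirrors create(): the first persisted copy carries version 1.
    synchronized int create(String xid, String status) {
        if (rows.containsKey(xid)) {
            return 0;
        }
        rows.put(xid, new VersionedRecord(xid, 1L, status));
        return 1;
    }

    // Assumed rule: the update succeeds only when the caller's version
    // matches the stored one, and the stored version is then incremented.
    synchronized int update(String xid, long expectedVersion, String newStatus) {
        VersionedRecord current = rows.get(xid);
        if (current == null || current.version != expectedVersion) {
            return 0; // missing row or stale version: nothing is written
        }
        rows.put(xid, new VersionedRecord(xid, expectedVersion + 1, newStatus));
        return 1;
    }

    synchronized VersionedRecord find(String xid) {
        return rows.get(xid);
    }
}

class VersionedStoreDemo {
    public static void main(String[] args) {
        VersionedStore store = new VersionedStore();
        store.create("xid-1", "TRYING");
        System.out.println(store.update("xid-1", 1L, "CONFIRMING")); // first writer wins
        System.out.println(store.update("xid-1", 1L, "CANCELLING")); // stale version is rejected
    }
}
```

In this sketch a second writer holding an outdated copy gets 0 back and has to reload the record before trying again, which is the point of carrying a version alongside the data.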
TAG1 getTransactionStore(transaction)
public TransactionStore getTransactionStore(Transaction transaction) {
    return TransactionConvertor.getTransactionStore(serializer, this.domain, transaction);
}

public final class TransactionConvertor {
    public static TransactionStore getTransactionStore(TransactionSerializer serializer, String domain, Transaction transaction) {
        TransactionStore transactionStore = new TransactionStore();
        transactionStore.setXid(transaction.getXid());
        transactionStore.setRootXid(transaction.getRootXid());
        transactionStore.setRootDomain(transaction.getRootDomain());
        // serialize the transaction object and use the bytes as the content of transactionStore
        transactionStore.setContent(serializer.serialize(transaction));
        transactionStore.setStatusId(transaction.getStatus().getId());
        transactionStore.setVersion(transaction.getVersion());
        transactionStore.setLastUpdateTime(transaction.getLastUpdateTime());
        transactionStore.setRetriedCount(transaction.getRetriedCount());
        transactionStore.setCreateTime(transaction.getCreateTime());
        transactionStore.setDomain(domain);
        transactionStore.setTransactionTypeId(transaction.getTransactionType().getId());
        return transactionStore;
    }
}
The transaction is serialized with TransactionSerializer and a TransactionStore object is initialized from it. This object wraps the transaction and is handed to TransactionStorage to be persisted.
TAG2 this.transactionStorage.create(transactionStore): persistence
3 TransactionStorage
public interface TransactionStorage extends Closeable {
    // basic create, update, delete and find operations
    int create(TransactionStore transactionStore);

    int update(TransactionStore transactionStore);

    int delete(TransactionStore transactionStore);

    TransactionStore findByXid(String domain, Xid xid);

    TransactionStore findMarkDeletedByXid(String domain, Xid xid);

    // mark as pending deletion
    int markDeleted(TransactionStore transactionStore);

    int restore(TransactionStore transactionStore);

    // permanently delete a transaction that was marked as deleted
    int completelyDelete(TransactionStore transactionStore);

    boolean supportStorageRecoverable();
}
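The markDeleted, restore and completelyDelete methods suggest a soft-delete lifecycle. The sketch below shows one plausible reading of it with an in-memory map. SoftDeleteStore is a hypothetical class, and the exact semantics (marked rows are hidden from normal lookups, and only marked rows can be restored or removed for good) are assumptions inferred from the method names and comments, not confirmed behaviour of the library.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory store: xid -> "marked as deleted" flag.
class SoftDeleteStore {
    private final Map<String, Boolean> rows = new HashMap<>();

    // Returns 1 when the row was inserted, 0 when the xid already exists.
    int create(String xid) {
        return rows.putIfAbsent(xid, false) == null ? 1 : 0;
    }

    // A row is visible to normal lookups only while it is not marked.
    boolean isVisible(String xid) {
        return Boolean.FALSE.equals(rows.get(xid));
    }

    boolean isMarkDeleted(String xid) {
        return Boolean.TRUE.equals(rows.get(xid));
    }

    // Soft delete: keep the row but flag it.
    int markDeleted(String xid) {
        if (!isVisible(xid)) {
            return 0;
        }
        rows.put(xid, true);
        return 1;
    }

    // Undo a soft delete.
    int restore(String xid) {
        if (!isMarkDeleted(xid)) {
            return 0;
        }
        rows.put(xid, false);
        return 1;
    }

    // Physically remove a row, allowed only after it was marked.
    int completelyDelete(String xid) {
        if (!isMarkDeleted(xid)) {
            return 0;
        }
        rows.remove(xid);
        return 1;
    }
}

class SoftDeleteDemo {
    public static void main(String[] args) {
        SoftDeleteStore store = new SoftDeleteStore();
        store.create("xid-1");
        store.markDeleted("xid-1");
        System.out.println(store.isVisible("xid-1"));
        store.restore("xid-1");
        System.out.println(store.isVisible("xid-1"));
    }
}
```

Under these assumptions, a transaction removed by mistake can still be brought back with restore until completelyDelete is called.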
Only JdbcTransactionStorage is analyzed here.
3.1 JdbcTransactionStorage.create
The create method is defined in the parent class AbstractTransactionStorage:
@Override
public int create(TransactionStore transactionStore) {
    if (transactionStore.getContent().length > this.storeConfig.getMaxTransactionSize()) {
        throw new TransactionIOException(String.format("cur transaction size(%dB) is bigger than maxTransactionSize(%dB), consider to reduce parameter size or adjust maxTransactionSize", transactionStore.getContent().length, this.storeConfig.getMaxTransactionSize()));
    }
    // delegate to the subclass implementation to persist the record
    int result = doCreate(transactionStore);
    if (result > 0) {
        return result;
    } else {
        // nothing was inserted: treat it as success only if the stored record has the same request id and version
        TransactionStore foundTransactionStore = findByXid(transactionStore.getDomain(), transactionStore.getXid());
        if (foundTransactionStore != null && transactionStore.getRequestId() != null
                && transactionStore.getRequestId().equals(foundTransactionStore.getRequestId())
                && transactionStore.getVersion() == foundTransactionStore.getVersion()) {
            return 1;
        }
        throw new TransactionIOException(transactionStore.simpleDetail());
    }
}
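The fallback branch of create makes the operation idempotent for a repeated request: if nothing was inserted but the stored record has the same request id and version, the call still reports success. A minimal sketch of that control flow with an in-memory map follows. StoredTx and IdempotentStore are hypothetical names, and IllegalStateException stands in for TransactionIOException.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for TransactionStore.
class StoredTx {
    final String xid;
    final Integer requestId;
    final long version;

    StoredTx(String xid, Integer requestId, long version) {
        this.xid = xid;
        this.requestId = requestId;
        this.version = version;
    }
}

class IdempotentStore {
    private final Map<String, StoredTx> rows = new HashMap<>();

    // Stand-in for doCreate: returns 0 when a row with the same xid exists.
    private int doCreate(StoredTx tx) {
        return rows.putIfAbsent(tx.xid, tx) == null ? 1 : 0;
    }

    int create(StoredTx tx) {
        int result = doCreate(tx);
        if (result > 0) {
            return result;
        }
        // Nothing inserted: accept only a replay of the very same request.
        StoredTx found = rows.get(tx.xid);
        if (found != null && tx.requestId != null
                && tx.requestId.equals(found.requestId)
                && tx.version == found.version) {
            return 1;
        }
        throw new IllegalStateException("conflicting record for xid " + tx.xid);
    }
}

class IdempotentCreateDemo {
    public static void main(String[] args) {
        IdempotentStore store = new IdempotentStore();
        System.out.println(store.create(new StoredTx("xid-1", 7, 1L)));
        // Replaying the same request is reported as a success.
        System.out.println(store.create(new StoredTx("xid-1", 7, 1L)));
    }
}
```

A different request id, or a missing one, for an existing xid is treated as a genuine conflict and raises an exception, as in the original method.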
public class JdbcTransactionStorage extends AbstractTransactionStorage implements StorageRecoverable {
    private static final int MARK_DELETED_YES = 1;
    private static final int MARK_DELETED_NO = 0;
    private static final String SQL_SELECT_PREFIX_FOR_TCC_TRANSACTION = "SELECT DOMAIN,ROOT_XID,XID,CONTENT,STATUS,TRANSACTION_TYPE,CREATE_TIME,LAST_UPDATE_TIME,RETRIED_COUNT,VERSION,IS_DELETE,ROOT_DOMAIN,REQUEST_ID FROM ";
    private static final String SQL_SELECT_PREFIX_FOR_TCC_DOMAIN = "SELECT DOMAIN, PHONE_NUMBERS, ALERT_TYPE, THRESHOLD, INTERVAL_MINUTES,LAST_ALERT_TIME,DING_ROBOT_URL,CREATE_TIME,LAST_UPDATE_TIME,VERSION FROM ";
    private String tbSuffix;
    private DataSource dataSource;

    public JdbcTransactionStorage(TransactionStoreSerializer serializer, StoreConfig storeConfig) {
        super(serializer, storeConfig);
        this.tbSuffix = storeConfig.getTbSuffix();
        this.dataSource = storeConfig.getDataSource();
    }

    @Override
    protected int doCreate(TransactionStore transactionStore) {
        // return 0 without inserting if a record for this domain and xid is already found
        if (doFindOne(transactionStore.getDomain(), transactionStore.getXid(), false) != null) {
            return 0;
        }
        Connection connection = null;
        PreparedStatement stmt = null;
        try {
            connection = this.getConnection(); // obtain a database connection
            // build the INSERT statement
            StringBuilder builder = new StringBuilder();
            builder
                    .append("INSERT INTO ")
                    .append(getTableName())
                    .append("(ROOT_XID,XID,TRANSACTION_TYPE,CONTENT,STATUS,RETRIED_COUNT,CREATE_TIME,LAST_UPDATE_TIME,VERSION,ROOT_DOMAIN,DOMAIN,REQUEST_ID) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)");
            // prepare the statement and bind its parameters
            stmt = connection.prepareStatement(builder.toString());
            stmt.setString(1, transactionStore.getRootXid().toString());
            stmt.setString(2, transactionStore.getXid().toString());
            stmt.setInt(3, transactionStore.getTransactionTypeId());
            stmt.setBytes(4, transactionStore.getContent());
            stmt.setInt(5, transactionStore.getStatusId());
            stmt.setInt(6, transactionStore.getRetriedCount());
            stmt.setTimestamp(7, new Timestamp(transactionStore.getCreateTime().getTime()));
            stmt.setTimestamp(8, new Timestamp(transactionStore.getLastUpdateTime().getTime()));
            stmt.setLong(9, transactionStore.getVersion());
            stmt.setString(10, transactionStore.getRootDomain());
            stmt.setString(11, transactionStore.getDomain());
            if (transactionStore.getRequestId() != null) {
                stmt.setInt(12, transactionStore.getRequestId());
            } else {
                stmt.setNull(12, Types.INTEGER);
            }
            return stmt.executeUpdate(); // execute the INSERT
        } catch (SQLException e) {
            throw new TransactionIOException(e);
        } finally {
            closeStatement(stmt);
            this.releaseConnection(connection);
        }
    }
    // other methods omitted
}
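The structure of the TCC_TRANSACTION table is shown below. This DDL does not contain the ROOT_XID, XID, ROOT_DOMAIN, REQUEST_ID, or IS_DELETE columns used by the SQL in JdbcTransactionStorage above, so it presumably comes from a different version of the schema than the Java code: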
CREATE TABLE TCC_TRANSACTION (
    TRANSACTION_ID int(11) NOT NULL AUTO_INCREMENT,
    DOMAIN varchar(100) DEFAULT NULL,
    GLOBAL_TX_ID varbinary(32) NOT NULL,
    BRANCH_QUALIFIER varbinary(32) NOT NULL,
    CONTENT varbinary(8000) DEFAULT NULL,
    STATUS int(11) DEFAULT NULL,
    TRANSACTION_TYPE int(11) DEFAULT NULL,
    RETRIED_COUNT int(11) DEFAULT NULL,
    CREATE_TIME datetime DEFAULT NULL,
    LAST_UPDATE_TIME datetime DEFAULT NULL,
    VERSION int(11) DEFAULT NULL,
    PRIMARY KEY (TRANSACTION_ID),
    UNIQUE KEY UX_TX_BQ (GLOBAL_TX_ID,BRANCH_QUALIFIER)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
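2.2.5 Transaction Recovery
Transaction information is persisted to external storage, and this storage is the foundation of transaction recovery. A scheduled task reads the abnormal transactions from the external storage and retries them at a fixed interval until each transaction completes or exceeds the maximum retry count.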
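The retry rule described above can be sketched as a single pass of such a job. PendingTx and RecoverySketch are hypothetical names, the retry action is passed in as a Predicate, and scheduling (for example with a ScheduledExecutorService) is left out. This is an illustration of the rule only, not the recovery code of TCC-Transaction.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical view of an unfinished transaction loaded from storage.
class PendingTx {
    final String xid;
    int retriedCount = 0;
    boolean completed = false;

    PendingTx(String xid) {
        this.xid = xid;
    }
}

class RecoverySketch {
    private final int maxRetryCount;

    RecoverySketch(int maxRetryCount) {
        this.maxRetryCount = maxRetryCount;
    }

    // One pass of the periodic job. Returns the transactions that have
    // used up their retries and would need manual attention.
    List<PendingTx> recoverOnce(List<PendingTx> pending, Predicate<PendingTx> retryAction) {
        List<PendingTx> exhausted = new ArrayList<>();
        for (PendingTx tx : pending) {
            if (tx.completed) {
                continue; // nothing left to do
            }
            if (tx.retriedCount >= maxRetryCount) {
                exhausted.add(tx); // stop retrying this one
                continue;
            }
            tx.retriedCount++;
            tx.completed = retryAction.test(tx); // true means confirm/cancel succeeded
        }
        return exhausted;
    }
}

class RecoveryDemo {
    public static void main(String[] args) {
        RecoverySketch job = new RecoverySketch(3);
        List<PendingTx> pending = Arrays.asList(new PendingTx("a"), new PendingTx("b"));
        // Simulated outcome: "a" succeeds on its second retry, "b" never does.
        Predicate<PendingTx> action = tx -> tx.xid.equals("a") && tx.retriedCount >= 2;
        for (int pass = 1; pass <= 4; pass++) {
            int exhausted = job.recoverOnce(pending, action).size();
            System.out.println("pass " + pass + ": exhausted=" + exhausted);
        }
    }
}
```

In a real deployment, the pending list would be loaded from the external storage on each pass, for instance through findAllUnmodifiedSince, rather than kept in memory.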
For details, see Part 5: Transaction Recovery in TCC-transaction.
2.2.6 TCC-Client/TCC-Server
SpringTccClient acts as the tcc-client and is configured on the microservice side.
public class SpringTccClient extends TccClient implements TransactionManagerFactory {
    public SpringTccClient() {
        super(new ClientConfig());
    }

    public SpringTccClient(ClientConfig clientConfig) {
        super(clientConfig);
    }
}
This class is loaded as the tcc-client when Spring starts:
@Configuration
@EnableAspectJAutoProxy(proxyTargetClass = true)
@ComponentScan(value = "org.mengyun.tcctransaction", excludeFilters = {@ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {XmlTccTransactionConfiguration.class})})
public class AnnotationTccTransactionConfiguration {
    @Bean
    @DependsOn({"springBeanFactory"})
    public SpringTccClient getTccClient() {
        return new SpringTccClient(clientConfig);
    }
}
The tcc-server class, by contrast, is deployed separately as the server side, so it is not loaded at Spring startup.