四、分布式事务 实战:TCC-transaction分布式事务关键组件--3事务存储器、TCC-ClientServer

四 TCC-transaction分布式事务关键组件--3事务存储器、TCC-Client/Server

2.2.4 事务存储器

在TCC 的过程中,根据应用内存中的事务信息完成整个事务流程。But 实际业务场景中,将事务信息只放在应用内存中是远远不够可靠的。可能会出现如下的问题:

1应用进程异常崩溃,未完成的事务信息将丢失。

2应用进程集群,当提供远程服务调用时,事务信息需要集群内共享。

3发起事务的应用需要重启部署新版本,因为各种原因,有未完成的事务

因此,TCC-Transaction除了将事务信息添加到内存中的同时,还提供外部存储来持久化事务对象。

 

TCC提供了TransactionRepository来实现对事务对象Transaction的持久化。

但是,再前面分析Transaction时可以看到,事务对象本身是个复杂的对象,其内持有List 等对象,内嵌较多的内容,因此在持久化时,需要对复杂的java对象,进行序列化处理,然后才可以持久化存储。

1 序列化

public interface ObjectSerializer<T> {

    /**
     * Serialize the given object to binary data.
     *
     * @param t object to serialize
     * @return the equivalent binary data
     */
    byte[] serialize(T t);

    /**
     * Deserialize an object from the given binary data.
     *
     * @param bytes object binary representation
     * @return the equivalent object instance
     */
    T deserialize(byte[] bytes);
    T clone(T object);
}

 

主要提供了JDK和Kryo的序列化解决方案,这里不是重点,省略分析过程。

2 TransactionRepository

 

public interface TransactionRepository extends Closeable {

    String getDomain();
    int create(Transaction transaction); //存储
    int update(Transaction transaction);//更新
    int delete(Transaction transaction);//删除
    Transaction findByXid(Xid xid);//查找

    boolean supportRecovery();
	//找到未修改的(事务恢复时候使用)
    Page<Transaction> findAllUnmodifiedSince(Date date, String offset, int pageSize);
}

然后,事务存储的实现类:

这里只分析create储存事务的方法。

public class DefaultTransactionRepository implements TransactionRepository {

    private String domain;
		//事务store存储
    private TransactionStorage transactionStorage;
		//事务序列化器
    private TransactionSerializer serializer;
    
        @Override
    public int create(Transaction transaction) {
        transaction.setVersion(1L); //乐观锁,版本为0,表示未持久化;版本为1,表示已经对transaction进行了持久化
      //TAG1 getTransactionStore(transaction)获取TransactionStore
        TransactionStore transactionStore = getTransactionStore(transaction);
      //TAG2  this.transactionStorage.create(transactionStore)持久化
        int result = this.transactionStorage.create(transactionStore);
        return result;
    }

注意,在持久化transaction时,需要修改版本号为1,表示已经持久化过;为0,表示尚未持久化。

TAG1 getTransactionStore(transaction)
 public TransactionStore getTransactionStore(Transaction transaction) {
        return TransactionConvertor.getTransactionStore(serializer, this.domain, transaction);
    }

public final class TransactionConvertor {

    public static TransactionStore getTransactionStore(TransactionSerializer serializer, String domain, Transaction transaction) {

        TransactionStore transactionStore = new TransactionStore();
        transactionStore.setXid(transaction.getXid());
        transactionStore.setRootXid(transaction.getRootXid());
        transactionStore.setRootDomain(transaction.getRootDomain());
      //serializer.serialize(transaction)序列化事务对象,并将其设置为transactionStore的内容
        transactionStore.setContent(serializer.serialize(transaction));
        transactionStore.setStatusId(transaction.getStatus().getId());
        transactionStore.setVersion(transaction.getVersion());
        transactionStore.setLastUpdateTime(transaction.getLastUpdateTime());
        transactionStore.setRetriedCount(transaction.getRetriedCount());
        transactionStore.setCreateTime(transaction.getCreateTime());
        transactionStore.setDomain(domain);
        transactionStore.setTransactionTypeId(transaction.getTransactionType().getId());

        return transactionStore;
    }

}

通过TransactionSerializer序列化transaction,并初始化一个TransactionStore的事务存储对象。该对象作为一个封装的事务对象,投入TransactionStorage中进行事务的持久化。

TAG2 this.transactionStorage.create(transactionStore)持久化

3 TransactionStorage事务存储器

public interface TransactionStorage extends Closeable {
//增删改查
    int create(TransactionStore transactionStore);
    int update(TransactionStore transactionStore);
    int delete(TransactionStore transactionStore);
    TransactionStore findByXid(String domain, Xid xid);

    TransactionStore findMarkDeletedByXid(String domain, Xid xid);
  //标记待删除
    int markDeleted(TransactionStore transactionStore);

    int restore(TransactionStore transactionStore);

    // 彻底删除标记过的事务
    int completelyDelete(TransactionStore transactionStore);

    boolean supportStorageRecoverable();
}

 

这里仅仅以JdbcTransactionStorage进行分析

3.1 JdbcTransactionStorage.create
AbstractTransactionStorage
    @Override
    public int create(TransactionStore transactionStore) {
        if (transactionStore.getContent().length > this.storeConfig.getMaxTransactionSize()) {
            throw new TransactionIOException(String.format("cur transaction size(%dB) is bigger than maxTransactionSize(%dB), consider to reduce parameter size or adjust maxTransactionSize", transactionStore.getContent().length, this.storeConfig.getMaxTransactionSize()));
        }
				//调用子类的方法,持久化
        int result = doCreate(transactionStore);

        if (result > 0) {
            return result;
        } else {

            TransactionStore foundTransactionStore = findByXid(transactionStore.getDomain(), transactionStore.getXid());

            if (foundTransactionStore != null && transactionStore.getRequestId() != null
                    && transactionStore.getRequestId().equals(foundTransactionStore.getRequestId())
                    && transactionStore.getVersion() == foundTransactionStore.getVersion()) {
                return 1;
            }

            throw new TransactionIOException(transactionStore.simpleDetail());
        }
    }

public class JdbcTransactionStorage extends AbstractTransactionStorage implements StorageRecoverable {

    private static final int MARK_DELETED_YES = 1;
    private static final int MARK_DELETED_NO = 0;

    private static final String SQL_SELECT_PREFIX_FOR_TCC_TRANSACTION = "SELECT DOMAIN,ROOT_XID,XID,CONTENT,STATUS,TRANSACTION_TYPE,CREATE_TIME,LAST_UPDATE_TIME,RETRIED_COUNT,VERSION,IS_DELETE,ROOT_DOMAIN,REQUEST_ID FROM ";
    private static final String SQL_SELECT_PREFIX_FOR_TCC_DOMAIN = "SELECT DOMAIN, PHONE_NUMBERS, ALERT_TYPE, THRESHOLD, INTERVAL_MINUTES,LAST_ALERT_TIME,DING_ROBOT_URL,CREATE_TIME,LAST_UPDATE_TIME,VERSION FROM ";

    private String tbSuffix;
    private DataSource dataSource;

    public JdbcTransactionStorage(TransactionStoreSerializer serializer, StoreConfig storeConfig) {
        super(serializer, storeConfig);
        this.tbSuffix = storeConfig.getTbSuffix();
        this.dataSource = storeConfig.getDataSource();
    }
    
       @Override
    protected int doCreate(TransactionStore transactionStore) {

        if (doFindOne(transactionStore.getDomain(), transactionStore.getXid(), false) != null) {
            return 0;
        }

        Connection connection = null;
        PreparedStatement stmt = null;

        try {
            connection = this.getConnection(); //获取connection数据库连接
						//构建insert的sql语句
            StringBuilder builder = new StringBuilder();
            builder
                    .append("INSERT INTO ")
                    .append(getTableName())
                    .append("(ROOT_XID,XID,TRANSACTION_TYPE,CONTENT,STATUS,RETRIED_COUNT,CREATE_TIME,LAST_UPDATE_TIME,VERSION,ROOT_DOMAIN,DOMAIN,REQUEST_ID) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)");

          //构建insert的sql语句
            stmt = connection.prepareStatement(builder.toString());

            stmt.setString(1, transactionStore.getRootXid().toString());
            stmt.setString(2, transactionStore.getXid().toString());
            stmt.setInt(3, transactionStore.getTransactionTypeId());
            stmt.setBytes(4, transactionStore.getContent());
            stmt.setInt(5, transactionStore.getStatusId());
            stmt.setInt(6, transactionStore.getRetriedCount());
            stmt.setTimestamp(7, new Timestamp(transactionStore.getCreateTime().getTime()));
            stmt.setTimestamp(8, new Timestamp(transactionStore.getLastUpdateTime().getTime()));
            stmt.setLong(9, transactionStore.getVersion());
            stmt.setString(10, transactionStore.getRootDomain());
            stmt.setString(11, transactionStore.getDomain());
            if (transactionStore.getRequestId() != null) {
                stmt.setInt(12, transactionStore.getRequestId());
            } else {
                stmt.setNull(12, Types.INTEGER);
            }
            return stmt.executeUpdate(); //执行sql插入insert语句

        } catch (SQLException e) {
            throw new TransactionIOException(e);
        } finally {
            closeStatement(stmt);
            this.releaseConnection(connection);
        }
    }

TCC-Transaction表的数据结构如下:

CREATE TABLE TCC_TRANSACTION (
  TRANSACTION_ID int(11) NOT NULL AUTO_INCREMENT,
  DOMAIN varchar(100) DEFAULT NULL,
  GLOBAL_TX_ID varbinary(32) NOT NULL,
  BRANCH_QUALIFIER varbinary(32) NOT NULL,
  CONTENT varbinary(8000) DEFAULT NULL,
  STATUS int(11) DEFAULT NULL,
  TRANSACTION_TYPE int(11) DEFAULT NULL,
  RETRIED_COUNT int(11) DEFAULT NULL,
  CREATE_TIME datetime DEFAULT NULL,
  LAST_UPDATE_TIME datetime DEFAULT NULL,
  VERSION int(11) DEFAULT NULL,
  PRIMARY KEY (TRANSACTION_ID),
  UNIQUE KEY UX_TX_BQ (GLOBAL_TX_ID,BRANCH_QUALIFIER)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

2.2.5 事务恢复

事务信息被持久化到外部的存储器中。事务存储是事务恢复的基础。通过读取外部存储器中的异常事务,定时任务会按照一定频率对事务进行重试,直到事务完成或超过最大重试次数。

详细内容见 五 TCC-transaction的事务恢复

 

2.2.6 TCC-Client/Tcc-Server

 

对于SpringTccClient作为tcc-client,是在微服务端配置。

public class SpringTccClient extends TccClient implements TransactionManagerFactory {

    public SpringTccClient() {
        super(new ClientConfig());
    }

    public SpringTccClient(ClientConfig clientConfig) {
        super(clientConfig);
    }
}

当前类在spring启动时,作为tcc-client,被加载:

@Configuration
@EnableAspectJAutoProxy(proxyTargetClass = true)
@ComponentScan(value = "org.mengyun.tcctransaction", excludeFilters = {@ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {XmlTccTransactionConfiguration.class})})
public class AnnotationTccTransactionConfiguration {

    @Bean
    @DependsOn({"springBeanFactory"})
    public SpringTccClient getTccClient() {
        return new SpringTccClient(clientConfig);
    }
    
    }

而tcc-server类,由于作为服务端单独部署,因此不是spring启动时加载。