0
点赞
收藏
分享

微信扫一扫

分布式事务之 Spring 事务抽象和源码实现


现在 Spring 已经成为了 Java 开发事实上的标准,各种 Java 框架几乎都需要提供与 Spring/Spring Cloud 集成的包。所以在研究 Java 分布式事务的一些框架之前,需要对 Spring 自身的事务抽象有一定的了解。

事务抽象

Spring 在 Java EE 规范的基础下提供了事务抽象,无论我们使用的是 Hibernate 还是 MyBatis,或者 JDBC 操作数据库,Spring 都能很好的统一起来。在​​《分布式事务之 Atomikos》​​​中可以看到配置了一个 ​​JtaTransactionManager​​​ ,平时我们使用的最多的就是 ​​DataSourceTransactionManager​​​ ,它们都是属于 ​​PlatformTransactionManager​​ 的实现。

public interface PlatformTransactionManager {

//获取事务状态
TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException;

//事务提交
void commit(TransactionStatus status) throws TransactionException;

//事务回滚
void rollback(TransactionStatus status) throws TransactionException;

}

在 ​​PlatformTransactionManager​​​ 中还看到了 ​​TransactionDefinition​​​ 抽象,看名称就能知道,它是 Spring 对事务的定义,比如事务的传播、隔离级别、超时和是否只读等等。而在 ​​PlatformTransactionManager​​​ 中能通过 ​​TransactionDefinition​​​ 获得 ​​TransactionStatus​​​,看名称就知道这是事务的状态抽象,​​TransactionStatus​​​ 也是 ​​SavepointManager​​ 的实现,也就是 Spring 事务中其实是可以限制事务的影响范围的。

从 ​​commit​​​ 和 ​​rollback​​​ 方法来看,需要传入一个 ​​TransactionStatus​​​。从传统的 JDBC 编程中来看,其实事务是与 ​​Connection​​​ 绑定的,但是我们实际编码的过程中,可能会有更复杂的场景,比如多个事务方法之间的调用,比如事务的传播,所以 Spring 基于 ​​TransactionStatus​​ 进行了一个逻辑事务的封装(比如实现 MySQL InnoDB 事务的时候,Spring 默认会先将事务的自动提交进行关闭,这样 Spring 才能更方便对事务进行更细致的管理)。

还有一个要注意的是实际上 Spring 的 ​​PlatformTransactionManager​​ 不仅仅只针对关系型数据库的事务,像其它的如 JMX,反正就是只要与数据一致性相关的,都可以基于事务管理器去实现。

Spring AOP 是 Spring 实现事务的一个非常重要的基础,Spring AOP 需要对要进行事务操作的方法前后织入一些信息,那么有一个关键点就是,哪些方法需要被代理呢,​​ProxyTransactionManagementConfiguration​​ 就是来干这个事的:

{@code @Configuration} class that registers the Spring infrastructure beans necessary
* to enable proxy-based annotation-driven transaction management.

在中可以看到 ​​BeanFactoryTransactionAttributeSourceAdvisor​​​ 、​​TransactionAttributeSource​​​ 和 ​​TransactionInterceptor​​ 。

​BeanFactoryTransactionAttributeSourceAdvisor​​​ 是 ​​PointcutAdvisor​​​ 的实现,这很明显就是基于 ​​BeanFactory​​​ 去识别事务切面的;​​TransactionAttributeSource​​​ 从名称就能看出是事务的属性源;​​TransactionInterceptor​​ 就是对事务方法执行的一个拦截。

同时 Spring 也对事务异常进行了抽象,即 ​​DataAccessException​​​,像平时经常遇到的比如 ​​DuplicateKeyException​​​ 就是它的实现。​​DataAccessException​​​ 有很多的实现,那么 Spring 是怎么知道该使用哪一种异常呢,还有就是 Spring 不是仅仅只支持 MySQL,还有其他的如 Oracle、DB2,每一种数据库的异常码也都是不一样的。所以 Spring 就把它们都放在了 ​​sql-error-codes.xml​​ 中,对各种异常进行了分类,以 MySQL 为例:

<bean id="MySQL" class="org.springframework.jdbc.support.SQLErrorCodes">
<property name="badSqlGrammarCodes">
<value>1054,1064,1146</value>
</property>
<property name="duplicateKeyCodes">
<value>1062</value>
</property>
<property name="dataIntegrityViolationCodes">
<value>630,839,840,893,1169,1215,1216,1217,1451,1452,1557</value>
</property>
<property name="dataAccessResourceFailureCodes">
<value>1</value>
</property>
<property name="cannotAcquireLockCodes">
<value>1205</value>
</property>
<property name="deadlockLoserCodes">
<value>1213</value>
</property>
</bean>

当然 Spring 也是支持扩展的,咱们也可以自己在 classpath 下放一个 ​​sql-error-codes.xml​​ ,这样就能替换 Spring 默认的配置。

源码分析

还是之前的观点,Spring 的源码非常复杂,没必要过于纠结。

那么以哪个地方为入口呢,在上文中提到了​​TransactionInterceptor​​​ 。明显就是对事务方法的拦截,但是程序怎么知道哪个方法是事务方法呢,很明显肯定是被 ​​@Transactional​​ 标准的方法。

先看 ​​org.springframework.transaction.interceptor.TransactionInterceptor#invoke​​ 方法:

@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

// Adapt to TransactionAspectSupport's invokeWithinTransaction...
return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
@Override
@Nullable
public Object proceedWithInvocation() throws Throwable {
return invocation.proceed();
}
@Override
public Object getTarget() {
return invocation.getThis();
}
@Override
public Object[] getArguments() {
return invocation.getArguments();
}
});
}

它其实就是调用的 ​​org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction​​ 方法:

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {

// If the transaction attribute is null, the method is non-transactional.
TransactionAttributeSource tas = getTransactionAttributeSource();
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
final TransactionManager tm = determineTransactionManager(txAttr);
//Spring 把与 Reactive 相关的单独拎了出来,Reactive 与本文议题无关,先忽略
if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
...
return result;
}

PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
//常使用的DataSourceTransactionManager符合这个条件
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
//可以理解为为被@Transactional标注的方法创建一个事务
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

Object retVal;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
//执行业务方法
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
//事务的异常处理
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
//清理资源
cleanupTransactionInfo(txInfo);
}

if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
TransactionStatus status = txInfo.getTransactionStatus();
if (status != null && txAttr != null) {
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
}
//事务正常执行完成就提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
}
...
}

可以看到在排除其他干扰(如 Reactive 的处理等)后,整体流程非常清晰,就是:

开启事务(createTransactionIfNecessary)->执行事务方法(proceedWithInvocation)->异常处理(completeTransactionAfterThrowing)/提交事务(commitTransactionAfterReturning)

先看 ​​org.springframework.transaction.interceptor.TransactionAspectSupport#createTransactionIfNecessary​​​ -> ​​org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction​​​ -> ​​org.springframework.jdbc.datasource.DataSourceTransactionManager#doGetTransaction​​ 方法:

@Override
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}

可以看到所谓的“事务”最终就是临时被封装成了 ​​DataSourceTransactionObject​​​,但是 ​​Connection​​​ 是从 ​​ThreadLocal​​​ 中获取的,第一次进入这个方法,很明显 ​​conHolder​​​ 是空的。其实从这里也验证的一个常识,单机事务单线程的情况下,被 ​​@Transactional​​​ 标准的方法中的数据库操作都是同一个 ​​Connection​​​,所以也有将多个数据库操作放在一个被 ​​@Transactional​​​ 标准的方法中,从而提高效率的说法(具体可参看​​《一次群聊“事件”引发的对 @Transactional 和 MyBatis 的思考》​​)。

接下来在 ​​org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction​​​ 中会调用 ​​org.springframework.transaction.support.AbstractPlatformTransactionManager#startTransaction​​ 方法:

private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {

boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//JDBC开启事务
doBegin(transaction, definition);
//为一堆ThreadLocal初始化数据
prepareSynchronization(status, definition);
return status;
}

到这里,其实已经开启了事务,本质还是基于 JDBC 来的。接下来就开始执行目标方法了,我这里的目标方法就是

@Transactional
public String test2(String id) {
spv1T1Mapper.insertId2(Integer.valueOf(id));
return "OK";
}

因为我这里使用的是 MyBatis,所以明显后面又要到了 MyBatis 动态代理的那一块了,这里就不过多赘述了。直接到 ​​org.springframework.transaction.interceptor.TransactionAspectSupport#commitTransactionAfterReturning​​ 正常提交事务方法:

protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}

最终又会到 ​​org.springframework.transaction.support.AbstractPlatformTransactionManager#commit​​​ -> ​​org.springframework.jdbc.datasource.DataSourceTransactionManager#doCommit​​ 方法:

@Override
protected void doCommit(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Committing JDBC transaction on Connection [" + con + "]");
}
try {
con.commit();
}
catch (SQLException ex) {
throw translateException("JDBC commit", ex);
}
}

这是 ​​DataSourceTransactionManager​​​ 重写的方法,当然 ​​DataSourceTransactionManager​​​ 这里的处理很简单,就是直接基于 ​​Connection​​​ 去 ​​commit​​ 了。

到这里事务的执行就完成了,有一种索然无味的感觉,因为整体流程其实是很清晰的。事务的异常回滚处理这里也就不赘述了。

但我们可以基于 ​​DataSourceTransactionManager​​​,学习它对 Spring 异常抽象的实现操作,再结合 ​​JtaTransactionManager​​ 的扩展,为后面研究其他分布式事务框架的实现打下基础。

分布式事务之 Spring 事务抽象和源码实现_ide


举报

相关推荐

0 条评论