现在 Spring 已经成为了 Java 开发事实上的标准,各种 Java 框架几乎都需要提供与 Spring/Spring Cloud 集成的包。所以在研究 Java 分布式事务的一些框架之前,需要对 Spring 自身的事务抽象有一定的了解。
事务抽象
Spring 在 Java EE 规范的基础下提供了事务抽象,无论我们使用的是 Hibernate 还是 MyBatis,或者 JDBC 操作数据库,Spring 都能很好的统一起来。在《分布式事务之 Atomikos》中可以看到配置了一个 JtaTransactionManager
,平时我们使用的最多的就是 DataSourceTransactionManager
,它们都是属于 PlatformTransactionManager
的实现。
public interface PlatformTransactionManager {
//获取事务状态
TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException;
//事务提交
void commit(TransactionStatus status) throws TransactionException;
//事务回滚
void rollback(TransactionStatus status) throws TransactionException;
}
在 PlatformTransactionManager
中还看到了 TransactionDefinition
抽象,看名称就能知道,它是 Spring 对事务的定义,比如事务的传播、隔离级别、超时和是否只读等等。而在 PlatformTransactionManager
中能通过 TransactionDefinition
获得 TransactionStatus
,看名称就知道这是事务的状态抽象,TransactionStatus
也是 SavepointManager
的实现,也就是 Spring 事务中其实是可以限制事务的影响范围的。
从 commit
和 rollback
方法来看,需要传入一个 TransactionStatus
。从传统的 JDBC 编程中来看,其实事务是与 Connection
绑定的,但是我们实际编码的过程中,可能会有更复杂的场景,比如多个事务方法之间的调用,比如事务的传播,所以 Spring 基于 TransactionStatus
进行了一个逻辑事务的封装(比如实现 MySQL InnoDB 事务的时候,Spring 默认会先将事务的自动提交进行关闭,这样 Spring 才能更方便对事务进行更细致的管理)。
还有一个要注意的是实际上 Spring 的 PlatformTransactionManager
不仅仅只针对关系型数据库的事务,像其它的如 JMX,反正就是只要与数据一致性相关的,都可以基于事务管理器去实现。
Spring AOP 是 Spring 实现事务的一个非常重要的基础,Spring AOP 需要对要进行事务操作的方法前后织入一些信息,那么有一个关键点就是,哪些方法需要被代理呢,ProxyTransactionManagementConfiguration
就是来干这个事的:
{@code @Configuration} class that registers the Spring infrastructure beans necessary
* to enable proxy-based annotation-driven transaction management.
在中可以看到 BeanFactoryTransactionAttributeSourceAdvisor
、TransactionAttributeSource
和 TransactionInterceptor
。
BeanFactoryTransactionAttributeSourceAdvisor
是 PointcutAdvisor
的实现,这很明显就是基于 BeanFactory
去识别事务切面的;TransactionAttributeSource
从名称就能看出是事务的属性源;TransactionInterceptor
就是对事务方法执行的一个拦截。
同时 Spring 也对事务异常进行了抽象,即 DataAccessException
,像平时经常遇到的比如 DuplicateKeyException
就是它的实现。DataAccessException
有很多的实现,那么 Spring 是怎么知道该使用哪一种异常呢,还有就是 Spring 不是仅仅只支持 MySQL,还有其他的如 Oracle、DB2,每一种数据库的异常码也都是不一样的。所以 Spring 就把它们都放在了 sql-error-codes.xml
中,对各种异常进行了分类,以 MySQL 为例:
<bean id="MySQL" class="org.springframework.jdbc.support.SQLErrorCodes">
<property name="badSqlGrammarCodes">
<value>1054,1064,1146</value>
</property>
<property name="duplicateKeyCodes">
<value>1062</value>
</property>
<property name="dataIntegrityViolationCodes">
<value>630,839,840,893,1169,1215,1216,1217,1451,1452,1557</value>
</property>
<property name="dataAccessResourceFailureCodes">
<value>1</value>
</property>
<property name="cannotAcquireLockCodes">
<value>1205</value>
</property>
<property name="deadlockLoserCodes">
<value>1213</value>
</property>
</bean>
当然 Spring 也是支持扩展的,咱们也可以自己在 classpath 下放一个 sql-error-codes.xml
,这样就能替换 Spring 默认的配置。
源码分析
还是之前的观点,Spring 的源码非常复杂,没必要过于纠结。
那么以哪个地方为入口呢,在上文中提到了TransactionInterceptor
。明显就是对事务方法的拦截,但是程序怎么知道哪个方法是事务方法呢,很明显肯定是被 @Transactional
标准的方法。
先看 org.springframework.transaction.interceptor.TransactionInterceptor#invoke
方法:
@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
@Override
@Nullable
public Object proceedWithInvocation() throws Throwable {
return invocation.proceed();
}
@Override
public Object getTarget() {
return invocation.getThis();
}
@Override
public Object[] getArguments() {
return invocation.getArguments();
}
});
}
它其实就是调用的 org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction
方法:
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// If the transaction attribute is null, the method is non-transactional.
TransactionAttributeSource tas = getTransactionAttributeSource();
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
final TransactionManager tm = determineTransactionManager(txAttr);
//Spring 把与 Reactive 相关的单独拎了出来,Reactive 与本文议题无关,先忽略
if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
...
return result;
}
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
//常使用的DataSourceTransactionManager符合这个条件
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
//可以理解为为被@Transactional标注的方法创建一个事务
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
//执行业务方法
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
//事务的异常处理
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
//清理资源
cleanupTransactionInfo(txInfo);
}
if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
TransactionStatus status = txInfo.getTransactionStatus();
if (status != null && txAttr != null) {
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
}
//事务正常执行完成就提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
}
...
}
可以看到在排除其他干扰(如 Reactive 的处理等)后,整体流程非常清晰,就是:
开启事务(createTransactionIfNecessary)->执行事务方法(proceedWithInvocation)->异常处理(completeTransactionAfterThrowing)/提交事务(commitTransactionAfterReturning)
先看 org.springframework.transaction.interceptor.TransactionAspectSupport#createTransactionIfNecessary
-> org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction
-> org.springframework.jdbc.datasource.DataSourceTransactionManager#doGetTransaction
方法:
@Override
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
可以看到所谓的“事务”最终就是临时被封装成了 DataSourceTransactionObject
,但是 Connection
是从 ThreadLocal
中获取的,第一次进入这个方法,很明显 conHolder
是空的。其实从这里也验证的一个常识,单机事务单线程的情况下,被 @Transactional
标准的方法中的数据库操作都是同一个 Connection
,所以也有将多个数据库操作放在一个被 @Transactional
标准的方法中,从而提高效率的说法(具体可参看《一次群聊“事件”引发的对 @Transactional 和 MyBatis 的思考》)。
接下来在 org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction
中会调用 org.springframework.transaction.support.AbstractPlatformTransactionManager#startTransaction
方法:
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//JDBC开启事务
doBegin(transaction, definition);
//为一堆ThreadLocal初始化数据
prepareSynchronization(status, definition);
return status;
}
到这里,其实已经开启了事务,本质还是基于 JDBC 来的。接下来就开始执行目标方法了,我这里的目标方法就是
@Transactional
public String test2(String id) {
spv1T1Mapper.insertId2(Integer.valueOf(id));
return "OK";
}
因为我这里使用的是 MyBatis,所以明显后面又要到了 MyBatis 动态代理的那一块了,这里就不过多赘述了。直接到 org.springframework.transaction.interceptor.TransactionAspectSupport#commitTransactionAfterReturning
正常提交事务方法:
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
最终又会到 org.springframework.transaction.support.AbstractPlatformTransactionManager#commit
-> org.springframework.jdbc.datasource.DataSourceTransactionManager#doCommit
方法:
@Override
protected void doCommit(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Committing JDBC transaction on Connection [" + con + "]");
}
try {
con.commit();
}
catch (SQLException ex) {
throw translateException("JDBC commit", ex);
}
}
这是 DataSourceTransactionManager
重写的方法,当然 DataSourceTransactionManager
这里的处理很简单,就是直接基于 Connection
去 commit
了。
到这里事务的执行就完成了,有一种索然无味的感觉,因为整体流程其实是很清晰的。事务的异常回滚处理这里也就不赘述了。
但我们可以基于 DataSourceTransactionManager
,学习它对 Spring 异常抽象的实现操作,再结合 JtaTransactionManager
的扩展,为后面研究其他分布式事务框架的实现打下基础。