介绍
一个全局事务是由多个分支事务组成的,而ResourceManager就是用来管理分支事务的,主要有如下3个功能
- 向TC注册资源
- 接收TC的commit请求,提交分支事务
- 接收TC的rollback请求,回滚分支事务
因为Seata支持多种模式,每种模式都有对应的资源管理器,如TCCResourceManager,DataSourceManager,ResourceManagerXA等
DefaultResourceManager
DefaultResourceManager包含了所有ResourceManager,它根据分支事务的类型来调用对应的ResourceManager执行具体的操作
public class DefaultResourceManager implements ResourceManager {
/**
* all resource managers
*/
protected static Map<BranchType, ResourceManager> resourceManagers
= new ConcurrentHashMap<>();
private DefaultResourceManager() {
initResourceManagers();
}
protected void initResourceManagers() {
//init all resource managers
List<ResourceManager> allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class);
if (CollectionUtils.isNotEmpty(allResourceManagers)) {
for (ResourceManager rm : allResourceManagers) {
resourceManagers.put(rm.getBranchType(), rm);
}
}
}
}
DataSourceManager
注册分支事务的逻辑都差不多,通过RmNettyRemotingClient向TC发送注册消息,并在本地保存resourceId和Resource的映射关系,当提交或者回滚的时候就能根据resourceId拿到对应的Resource
二阶段提交
当TM向TC发起全局提交请求时,此时分支事务实际上以及提交了,TC立即释放该全局事务的锁,然后异步调用RM清理回滚日志
二阶段回滚
当RM收到TC发来的回滚请求时,根据xid和branchid找到对应的回滚记录,通过回滚记录生成反向的SQL并执行,完成分支的回滚。当分支回滚结束时,通过TC回滚完成,当所有分支都回滚完成时,才会释放全局事务的锁
RM在回滚的时候会有如下的校验
- beforeImmage等于afterImage则不用回滚(数据没有发生变化),否则执行下一步
- 如果当前的记录等于afterImage,则回滚,否则执行下一步
- 如果当前的记录等于beforeImage,则不用会滚了,否则抛出异常(当前记录和beforeImage和afterImage都不相等),说明发生了脏写
public class DataSourceManager extends AbstractResourceManager {
@Override
public void registerResource(Resource resource) {
DataSourceProxy dataSourceProxy = (DataSourceProxy) resource;
dataSourceCache.put(dataSourceProxy.getResourceId(), dataSourceProxy);
super.registerResource(dataSourceProxy);
}
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
// 将分支提交信息包装成 Phase2Context 插入内存中的异步提交队列,异步删除undoLog
return asyncWorker.branchCommit(xid, branchId, resourceId);
}
@Override
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
DataSourceProxy dataSourceProxy = get(resourceId);
if (dataSourceProxy == null) {
throw new ShouldNeverHappenException();
}
try {
// 根据 undoLog 构造回滚 sql 并执行
UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
} catch (TransactionException te) {
StackTraceLogger.info(LOGGER, te,
"branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",
new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});
if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
} else {
return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
}
}
return BranchStatus.PhaseTwo_Rollbacked;
}
}
TCCResourceManager
TCC事务解决方案主要分为2个阶段
- 第一阶段为prepare阶段,锁定相关的资源
- 第二阶段为commit/rollback阶段,根据全局事务的执行状态来对分支事务进行commit/rollback阶段
理解了思路我们再来看代码就非常简单了,commit和rollback代码非常类似,因此就只分析一个分支了
提交阶段:反射调用@TwoPhaseBusinessAction指定的commitMethod方法
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
if (tccResource == null) {
throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId));
}
Object targetTCCBean = tccResource.getTargetBean();
Method commitMethod = tccResource.getCommitMethod();
if (targetTCCBean == null || commitMethod == null) {
throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId));
}
try {
//BusinessActionContext
// 根据applicationData(分支事务注册的时候把相关信息放到了这里,比如commitMethod,rollbackMethod),xid等重新构建BusinessActionContext
BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
applicationData);
// 反射调用 commitMethod 方法
Object ret = commitMethod.invoke(targetTCCBean, businessActionContext);
LOGGER.info("TCC resource commit result : {}, xid: {}, branchId: {}, resourceId: {}", ret, xid, branchId, resourceId);
boolean result;
if (ret != null) {
if (ret instanceof TwoPhaseResult) {
result = ((TwoPhaseResult)ret).isSuccess();
} else {
result = (boolean)ret;
}
} else {
result = true;
}
return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
} catch (Throwable t) {
String msg = String.format("commit TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
LOGGER.error(msg, t);
return BranchStatus.PhaseTwo_CommitFailed_Retryable;
}
}
从反射的逻辑我们可以看到对commitMethod和rollbackMethod的要求
入参只能是BusinessActionContext,返回值只能是boolean类型或者TwoPhaseResult
ResourceManagerXA
因为很多数据库都支持XA协议,因此XA模式的实现根简单,只需要通过XAResource来进行commit或者rollback即可
参考博客