最近在回滚分布式事务的一些内容,也与一些同学有了不少讨论。在文章的开头,先表达一个个人的观点:分布式事务是分布式系统中一个常见的问题。我们讨论它主要是为了解决这个问题,而不应该拘泥于某一种“市面上常见的解决方案”或者“某一个框架”。
个人觉得 3PC 中的一些思想在 TCC 中也有所体现,所以在步入正题探讨 TCC 之前先看一下 3PC 这一种分布式事务的解决方案。
3PC
3PC,英文就是 three-phase-commitment,即三阶段提交协议,属于 2PC 的改进版,将 2PC 的第一阶段又拆分成了两个阶段。形成了 CanCommit、PreCommit、DoCommit 三个阶段。
其中 CanCommit 阶段就是做一个检查询问工作,还没开始干“正事”。如果在 CanCommit 阶段有一个 RM 失败了,那么整个分布式事务就直接失败了。
如果所有的 RM CanCommit 阶段都返回成功了,那么就会进入 PreCommit 阶段,这个阶段其实跟 2PC 的第一个阶段差不多,各个 RM 会执行本地事务,但是不提交,等着 TM 发 commit 或者 rollback 消息。要注意的就是如果在超时时间范围内 RM 还没有收到 TM 在第三阶段要发的消息,那么会直接自己执行 commit 操作。因为能够到 PreCommit 阶段开始执行,证明所有的 RM 在 CanCommit 阶段都是成功了的,那么超时后 RM 就会自己 Commit。
但是可能出现的一个问题是,如果 TM 本来是想给 RM 发 rollback 消息的,结果因为一些原因在超时时间内 RM 没有收到,此时 RM 仍然会 commit,这就出现了数据不一致。
那么问题就来了,为什么会出现“TM 本来是想给 RM 发 rollback 消息”呢,有这么几种情况:
- 网络问题 TM 无法在指定时间收到 RM 在 PreCommit 阶段的响应结果
- 有 RM 在 PreCommit 执行失败了
网络原因先不说,那为啥会“有 RM 在 PreCommit 执行失败了”呢,关键原因还是 CanCommit 阶段没有“Can”到位,换句话说,如果 CanCommit 阶段可以确保 PreCommit 执行成功的话(这是一个非常重要的点,对后面分析 TCC 非常关键),就不会出现后面的麻烦事了。
个人觉得 3PC 其实整体思想挺好的,但是有点“过于理想化”,不是很完善,还是无法应对一些场景,于是又有了别的解决思路来处理分布式事务的问题。
TCC
接下来步入正题,到 TCC 了。
TCC 就是 try-confirm-cancel 的首字母:
- Try: 尝试执行业务
- Confirm: 确认执行业务
- Cancel: 取消执行业务
是一个基于两阶段提交的柔性事务解决方案。那么为什么称之为两阶段提交呢,因为 TCC 中 Try 是一个阶段,Confirm/Cancel 是一个阶段。
概念很简单,但是细节很多,接下来一一分析。
Try 阶段究竟能干啥
Try 阶段我们通常说的是“尝试执行事务”,那么什么算“尝”试执行事务呢,它与 3PC 中的 CanCommit 阶段又有什么区别呢?
所谓的“尝试”执行事务,称之为是“资源占用”或者“资源标识”更为贴切。以用户下单的一个简要流程为例:
1.用户下单(inst1)
1.1.生成订单(inst1 本地事务)
1.2.用户京豆+线上支付+增加积分(inst1 RPC inst2)
1.2.1.扣款(inst2 RPC inst3)
1.2.2.扣减京豆(inst2 RPC inst4)
1.2.2.增加积分(inst2 RPC inst5)
1.3.扣减库存(inst1 RPC inst6)
1.4.发送下单成功短信(inst1 RPC inst7)
首先在 1.1 系统已经生成订单的时候,这个订单既不能是“无效”也不能是“有效”状态,可以给这个订单加一个“下单中”的中间状态,一直到 1.3 执行成功后才能将状态变更为“有效”状态。这个地方就能交给“Try”去做。
接着看 1.2 步骤,用户京豆+线上支付+增加积分,也就是要做三个事情:扣减京东、扣款和增加积分,比如用户有 100 个京豆,要扣 50 个;有 200 块钱,要扣 30 块钱;先有积分 1000 分,要加 10 分。那么就可以把用户的 100 个京豆中的 50 个,200 块钱中的 30 块钱设置为“冻结状态”,要增加的 10 积分也设置为冻结状态。待整个分布式事务执行完成后,执行 Confirm,冻结的数据该加的加,该减的减。
Cancel 阶段是取消 Try 的还是取消 Confirm 的
这是一个很有意思的点,TCC 的三个流程字面上都好理解。那么到底 Cancel 阶段是取消 Try 的还是取消 Confirm 的呢。这个时候就很容易产生飞一般的思绪:
- Try 失败了怎么办?
- Confirm 失败了怎么办?
- Cancel 失败了怎么办?
首先 Try 失败了肯定是执行 Cancel,这个没啥争议。那么 Confirm 失败了怎么办呢,也交给 Cancel 嘛?
- 如果 Confirm 失败了也如果交给 Cancel,上面已经说过了,Try 失败了肯定要执行 Cancel 的,那如果 Confirm 失败了也交给 Cancel,要注意的是,Try 和 Confirm 执行逻辑不一样,那它们的 Cancel 自然也是不一样的。那是不是说明咱们实现 TCC 的时候就得有两个 Cancel,一个给 Try 失败了用,一个给 Confirm 失败了用;
- 如果 Confirm 失败了不交给 Cancel,那一个 RM Confirm 失败了就会造成数据不一致,还是产生了分布式事务的问题;
- Cancel 失败了怎么办?总不能再搞一个 Cancel 吧。
其实这里就有点仁者见仁智者见智了,还是在文章开头我的观点,我们不要拘泥于某一种“市面上常见的解决方案”或者“某一个框架“上面,而是应该根据业务场景去分析。
通过查阅 ByteTCC 和 TCC-Transaction 这两个 TCC 实现框架的源码,发现它们都是”Try 失败了就走 Cancel,Cancel/Confirm 失败了就不停重试“。
ByteTCC
这里直奔主题,看下 org.bytesoft.bytetcc.work.CompensableWork#run
-> org.bytesoft.bytetcc.TransactionRecoveryImpl#timingRecover
-> org.bytesoft.bytetcc.TransactionRecoveryImpl#recoverTransaction
-> org.bytesoft.bytetcc.TransactionRecoveryImpl#recoverCoordinator
方法:
private void recoverCoordinator(Transaction transaction)
throws CommitRequiredException, RollbackRequiredException, SystemException {
CompensableManager compensableManager = this.beanFactory.getCompensableManager();
TransactionLock compensableLock = this.beanFactory.getCompensableLock();
org.bytesoft.transaction.TransactionContext transactionContext = transaction.getTransactionContext();
TransactionXid xid = transactionContext.getXid();
boolean forgetRequired = false;
boolean locked = false;
try {
compensableManager.associateThread(transaction);
switch (transaction.getTransactionStatus()) {
case Status.STATUS_ACTIVE:
case Status.STATUS_MARKED_ROLLBACK:
case Status.STATUS_PREPARING:
case Status.STATUS_UNKNOWN: /* TODO */ {
if (transactionContext.isPropagated() == false) {
if ((locked = compensableLock.lockTransaction(xid, this.endpoint)) == false) {
throw new SystemException();
}
transaction.recoveryRollback();
forgetRequired = true;
}
break;
}
case Status.STATUS_ROLLING_BACK: {
if ((locked = compensableLock.lockTransaction(xid, this.endpoint)) == false) {
throw new SystemException();
}
transaction.recoveryRollback();
forgetRequired = true;
break;
}
case Status.STATUS_PREPARED:
case Status.STATUS_COMMITTING: {
if ((locked = compensableLock.lockTransaction(xid, this.endpoint)) == false) {
throw new SystemException();
}
transaction.recoveryCommit();
forgetRequired = true;
break;
}
case Status.STATUS_COMMITTED:
case Status.STATUS_ROLLEDBACK:
forgetRequired = true;
break;
default: // ignore
}
} finally {
compensableManager.desociateThread();
if (locked) {
compensableLock.unlockTransaction(xid, this.endpoint);
} // end-if (locked)
if (forgetRequired) {
transaction.forgetQuietly(); // forget transaction
} // end-if (forgetRequired)
}
}
可以看到 ByteTCC 会每隔 60s 去不断重试失败的 Confirm/Cancel 操作。
TCC-Transaction
看下 org.mengyun.tcctransaction.recovery.TransactionRecovery#startRecover()
-> org.mengyun.tcctransaction.recovery.TransactionRecovery#concurrentRecoveryErrorTransactions
-> org.mengyun.tcctransaction.recovery.TransactionRecovery.RecoverTask#call
-> org.mengyun.tcctransaction.recovery.TransactionRecovery#recoverErrorTransaction
方法:
private void recoverErrorTransaction(TransactionRepository transactionRepository, Transaction transaction) {
if (transaction.getRetriedCount() > transactionConfigurator.getRecoverFrequency().getMaxRetryCount()) {
...
try {
if (transaction.getTransactionType().equals(TransactionType.ROOT)) {
switch (transaction.getStatus()) {
case CONFIRMING:
commitTransaction(transactionRepository, transaction);
break;
case CANCELLING:
rollbackTransaction(transactionRepository, transaction);
break;
default:
//the transaction status is TRYING, ignore it.
break;
}
} else {
//transaction type is BRANCH
switch (transaction.getStatus()) {
case CONFIRMING:
commitTransaction(transactionRepository, transaction);
break;
case CANCELLING:
case TRY_FAILED:
rollbackTransaction(transactionRepository, transaction);
break;
case TRY_SUCCESS:
if(transactionRepository.getRootDomain() == null) {
break;
}
//check the root transaction
Transaction rootTransaction = transactionRepository.findByRootXid(transaction.getRootXid());
if (rootTransaction == null) {
// In this case means the root transaction is already rollback.
// Need cancel this branch transaction.
rollbackTransaction(transactionRepository, transaction);
} else {
switch (rootTransaction.getStatus()) {
case CONFIRMING:
commitTransaction(transactionRepository, transaction);
break;
case CANCELLING:
rollbackTransaction(transactionRepository, transaction);
break;
default:
break;
}
}
break;
default:
// the transaction status is TRYING, ignore it.
break;
}
}
} catch (Throwable throwable) {
...
}
可以看到 TCC-Transaction 也是基于 Quartz 只针对 Confirm 和 Cancel 失败的操作才进行重试。
TCC 本质是同步调用还是异步调用
在《分布式事务之 Spring 编程式事务》中我举了一个例子说明了将 RPC 调用也包裹在一个事务方法中会占用数据库连接,占用时间过长就会触发无法获取连接的异常。那么以上面用户下单为例,生成订单、支付、扣减库存、发短信这些操作,如果使用 TCC,那么他们也都需要包裹在一个事务里面吗。按照通用 TCC 的解决思路,是的。但是要注意的是,这里包裹在一个事务中的 RPC 并不是执行操作的 RPC,而是执行 Try 操作的 RPC。以 ByteTCC 的使用 Demo 为例(Demo 地址:https://gitee.com/dongguabai/blog/tree/master/bytetcc-sample):
数据库连接数设置为 2:
public DataSource invokeGetDataSource() {
BasicDataSource bds = new BasicDataSource();
bds.setDriverClassName("com.mysql.jdbc.Driver");
bds.setUrl("jdbc:mysql://192.168.18.176:3306/spvi");
bds.setUsername("root");
bds.setPassword("root");
bds.setMaxTotal(2);
bds.setInitialSize(2);
bds.setMaxWaitMillis(60000);
bds.setMinIdle(6);
bds.setLogAbandoned(true);
bds.setRemoveAbandonedOnBorrow(true);
bds.setRemoveAbandonedOnMaintenance(true);
bds.setRemoveAbandonedTimeout(1800);
bds.setTestWhileIdle(true);
bds.setTestOnBorrow(false);
bds.setTestOnReturn(false);
bds.setValidationQuery("select 'x' ");
bds.setValidationQueryTimeout(1);
bds.setTimeBetweenEvictionRunsMillis(30000);
bds.setNumTestsPerEvictionRun(20);
return bds;
}
被调用方增加耗时操作:
@ResponseBody
@RequestMapping(value = "/decrease", method = RequestMethod.POST)
@Transactional
public void decreaseAmount(@RequestParam("acctId") String acctId, @RequestParam("amount") double amount) {
System.out.println(new Date()+":"+Thread.currentThread().getId()+"->进来了......");
try {
Thread.sleep(6000000); //耗时操作
} catch (InterruptedException e) {
e.printStackTrace();
}
int value = this.jdbcTemplate.update(
"update tb_account_one set amount = amount - ?, frozen = frozen + ? where acct_id = ?", amount, amount, acctId);
if (value != 1) {
throw new IllegalStateException("ERROR!");
}
System.out.printf("exec decrease: acct= %s, amount= %7.2f%n", acctId, amount);
//throw new IllegalStateException("error");
}
调用方本地事务操作和 RPC 都包裹在一个事务中:
@ResponseBody
@RequestMapping(value = "/simplified/transfer", method = RequestMethod.POST)
@Transactional
public void transfer(@RequestParam String sourceAcctId, @RequestParam String targetAcctId, @RequestParam double amount) {
this.increaseAmount(targetAcctId, amount); // 本地事务
this.acctService.decreaseAmount(sourceAcctId, amount); // rpc try
// throw new IllegalStateException("rollback!");
}
当连续三个请求打到调用方的时候,就出现了异常:
{
"nested exception is org.apache.ibatis.exceptions.PersistenceException: \n### Error updating database. Cause: org.springframework.jdbc.CannotGetJdbcConnectionException: Could not get JDBC Connection; nested exception is java.sql.SQLException: Cannot get a connection, pool error Timeout waiting for idle object\n### Cause: org.springframework.jdbc.CannotGetJdbcConnectionException: Could not get JDBC Connection; nested exception is java.sql.SQLException: Cannot get a connection, pool error Timeout waiting for idle object",
"path": "/simplified/transfer"
}
不过一般 Try 操作会很快,所以也可以接收,但也说明了常规 TCC 更适合各个操作耗时都比较短的场景。当然这里也可以变通,如果没有数据依赖的话,可以先 RPC 再执行本地事务。
如果项目要使用 TCC 该做哪些改造
一般来说首先涉及到的接口肯定得改造,至少要有一个 Try 操作,Confirm 和 Cancel 不一定是必须的,以 ByteTCC 官方文档为例:
3.1、TCC型服务Try业务定义
通过@Compensable注解定义的service为可补偿型service。@Compensable注解需要定义三个参数:
1)interfaceClass,必需。该值用于指定confirm/cancel针对的业务接口,该接口同时被用于校验confirm/cancel实现类。confirm/cancel实现类如果没有实现该业务接口则会被认为无效;
2)confirmableKey,可选。该值用于指定confirm实现类在容器中的beanId,若没有confirm逻辑则不必指定;
3)cancellableKey,可选。该值用于指定cancel实现类在容器中的beanId,若没有cancel逻辑则不必指定;注意:若try阶段执行了写操作则必须有相应的取消逻辑;
其他的改造,这个还是得看准备怎么实现 TCC,有的封装程度较高的框架会提供特定 RPC 的支持包,比如你用的 Dubbo,他会有 Dubbo 的支持,用的 Feign,他会有对 Feign 相关的支持。数据源肯定也会有相关的改造,但这个改动一般不大。
链式调用如何处理
接下来说一种比较复杂的场景可能出现的问题。
上面已经说了涉及到 TCC 的方法会需要改造成三个接口,Try、Confirm 和 Cancel:
inst1_methodA(T、C、C)->inst2_methodB(T、C、C)->inst3_methodC(T、C、C)->inst4_methodD(T、C、C)
这里涉及到了四个微服务的调用链路,每个。一般来说,其实我们编写业务代码的时候,都是写“Try”的,也就是这样:
inst1_methodA(T)->inst2_methodB(T)->inst3_methodC(T)->inst4_methodD(T)
至于 Confirm 和 Cancel,那就是框架会去帮我们自行处理。当然这里指的是一般场景,具体还是要看框架的实现。
总结
本文主要是介绍了 TCC 几个常见的探讨点,后面将会选一个或者多个 TCC 的实现框架进行源码分析和手写实现一个自己的 TCC 框架。
References
- https://github.com/slimina/ByteTCC
- https://github.com/changmingxie/tcc-transaction