一:如何保证本地操作数据库和发送消息的一致性
思路一:先发消息后操作数据库
begin transaction
// 1. 发送MQ
// 2. 数据库操作
commit transaction
- MQ发送失败,当然第二步也得不到执行,保证了事务一致性。
- 可是如果是发送MQ成功了,数据库操作失败了,已经发送的MQ就收不回来了,这种情况就不能保证事务的一致性。此时奇迹出现了,RocketMQ能够实现发出去的消息再收回来。
RocketMQ支持发送一种事务消息
也称为半消息
,当生产者发出半消息时,此时RocketMQ会将当前消息标记为等待消费的状态,也就是说消费者是消费不到”等待消费“状态的消息的,这就是半消息的由来,生产者发了消息但是消费者收不到消息,整个过程只完成了一半。
当BrokerServer收到一个半消息时会异步通知生产者,告诉生产者MQ已经成功收到消息了(100%收到了),当生产者接受到成功发送的消息时就可以执行本地数据库的相关操作,当所有业务逻辑都成功处理完了就给MQ回一个提交状态,当MQ收到这个提交状态,就会将半消息的状态改为允许消费的状态,这样消费者就能够消费到消息。如果在执行本地数据库相关操作时出现异常就给MQ一个回滚的状态,MQ收到回滚状态就会将半消息给删除掉,这样消费者也消费不到消息,这样就保证了数据库操作和成功发送消除的一致性。
当生产者向MQ响应提交状态或者回滚状态时假如网络超时,MQ没接收到,MQ会定时调用生产者,让生产者重新响应该消息是什么状态。
二:RocketMQ发送事务消息
事务执行结果表。
CREATE TABLE `tbl_mq_tx_log` (
`id` bigint(20) NOT NULL,
`tx_no` varchar(255) DEFAULT NULL COMMENT '半事务全局唯一值',
`status` tinyint(1) DEFAULT NULL COMMENT '0: 未发送, 1: 处理成功, 2: 处理失败',
`create_time` datetime DEFAULT CURRENT_TIMESTAMP,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
发送消息。发送消息时需要指定事务生产者组名(txProducerGroup),后面事务监听器会使用到。
@RequestMapping("/sendTx")
public void sendTx() {
String uuid = UUID.randomUUID().toString();
mqTxLogMapper.insert(new MQTxLog(uuid, 0));
Map<String, Object> headers = new HashMap<>();
headers.put("txNo", uuid);
GenericMessage message = new GenericMessage("tx msg body", headers);
TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction("testTxProducerGroup", "tx-topic", message, uuid);
log.info("send transcation message body={},result= {}",message.getPayload(),transactionSendResult.getSendStatus());
}
本地业务逻辑处理。
@Service
public class TestService {
@Transactional(rollbackFor = Exception.class)
public void createAccount() {
System.out.println("处理业务逻辑, 插入用户");
}
}
本地事务监听器。注意rocketmq-spring-boot-starter不同版本中的@RocketMQTransactionListener参数不太一样,高版本的可能没有txProducerGroup参数,这里使用的是2.0.2。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
本地事务监听器。指定需要监听的事务消息组。
@Component
@RocketMQTransactionListener(txProducerGroup = "testTxProducerGroup")
public class LocalTransactionListener implements RocketMQLocalTransactionListener {
@Autowired
private MQTxLogMapper mqTxLogMapper;
@Autowired
private TestService userService;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
System.out.println("RocketMQLocalTransactionListener#executeLocalTransaction arg=" + arg);
userService.createAccount();
// 更新事务状态为成功
MQTxLog mqTxLog = new MQTxLog();
mqTxLog.setStatus(1);
mqTxLogMapper.update(mqTxLog, Wrappers.<MQTxLog>lambdaUpdate().eq(MQTxLog::getTxNo, arg));
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
// 更新事务状态为失败
MQTxLog mqTxLog = new MQTxLog();
mqTxLog.setStatus(2);
mqTxLogMapper.update(mqTxLog, Wrappers.<MQTxLog>lambdaUpdate().eq(MQTxLog::getTxNo, arg));
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
System.out.println("消息回查");
// 在数据库中查询事务的处理结果
String txNo = msg.getHeaders().get("txNo").toString();
MQTxLog mqTxLog = mqTxLogMapper.selectOne(Wrappers.<MQTxLog>lambdaQuery().eq(MQTxLog::getTxNo, txNo));
if (mqTxLog == null) {
return RocketMQLocalTransactionState.UNKNOWN;
} else {
Integer status = mqTxLog.getStatus();
if (status == 1) {
return RocketMQLocalTransactionState.COMMIT;
} else if (status == 2) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
return RocketMQLocalTransactionState.UNKNOWN;
}
}
消费消息。
@Component
@RocketMQMessageListener(
consumerGroup = "txConsumerGroup",
topic = "tx-topic"
)
public class TxTopicConsumerListener implements RocketMQListener<String> {
@Override
public void onMessage(String messageExt) {
System.out.println(messageExt);
}
}