0
点赞
收藏
分享

微信扫一扫

SpringBoot2.x Nacos RocketMQ 事务消息


需求背景:
现在有内容中心(content-center)和用户中心(user-center)2 个微服务:请求内容中心审核分享,审核通过后由内容中心通过 RocketMQ 事务消息通知用户中心,用户中心消费消息后为该分享的发布人添加积分。

SpringBoot2.x Nacos RocketMQ 事务消息_apache

文章目录

  • ​​一、准备工作​​
  • ​​1. 版本对照​​
  • ​​2. 下载启动RocketMQ​​
  • ​​3. 引入maven依赖​​
  • ​​二、内容中心(服务端)​​
  • ​​2.1. 表结构设计​​
  • ​​2.2. 配置MQ信息​​
  • ​​2.3. 控制层​​
  • ​​2.4. service层​​
  • ​​2.5. RocketMQ 事务消息监听​​
  • ​​三、用户中心(客户端)​​
  • ​​3.1. 依赖​​
  • ​​3.2.配置​​
  • ​​3.3. 消息监听​​
  • ​​开源项目:​​
一、准备工作
1. 版本对照

| RocketMQ 版本 | RocketMQ 控制台版本 | RocketMQ starter 版本 |
| --- | --- | --- |
| RocketMQ 4.8.0 | 支持 RocketMQ 4.8.0 | 2.2.0 |

2. 下载启动RocketMQ

linux 环境 RocketMQ 4.8.0 安装、部署控制台
​​​​​

windows下RocketMQ下载、安装、部署、控制台​

3. 引入maven依赖

<!-- RocketMQ integration (Spring Boot starter) -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>

二、内容中心(服务端)

消息发送端代码编写

2.1. 表结构设计

共 2 张表:share(分享表)和 rocketmq_transaction_log(RocketMQ 事务日志表)。
share

-- Share table of the content center.
CREATE TABLE IF NOT EXISTS `share` (
    `id`           INT          NOT NULL AUTO_INCREMENT COMMENT 'id',
    `user_id`      INT          NOT NULL DEFAULT 0 COMMENT '发布人id',
    `title`        VARCHAR(80)  NOT NULL DEFAULT '' COMMENT '标题',
    `create_time`  DATETIME     NOT NULL COMMENT '创建时间',
    `update_time`  DATETIME     NOT NULL COMMENT '修改时间',
    `is_original`  TINYINT(1)   NOT NULL DEFAULT 0 COMMENT '是否原创 0:否 1:是',
    `author`       VARCHAR(45)  NOT NULL DEFAULT '' COMMENT '作者',
    `cover`        VARCHAR(256) NOT NULL DEFAULT '' COMMENT '封面',
    `summary`      VARCHAR(256) NOT NULL DEFAULT '' COMMENT '概要信息',
    `price`        INT          NOT NULL DEFAULT 0 COMMENT '价格(需要的积分)',
    `download_url` VARCHAR(256) NOT NULL DEFAULT '' COMMENT '下载地址',
    `buy_count`    INT          NOT NULL DEFAULT 0 COMMENT '下载数 ',
    `show_flag`    TINYINT(1)   NOT NULL DEFAULT 0 COMMENT '是否显示 0:否 1:是',
    -- The default must be a valid status name: the audit logic only accepts rows whose
    -- status is 'NOT_YET', so the previous numeric default (stored as '0') made rows
    -- created with the default impossible to audit.
    `audit_status` VARCHAR(10)  NOT NULL DEFAULT 'NOT_YET' COMMENT '审核状态 NOT_YET: 待审核 PASSED:审核通过 REJECTED:审核不通过',
    `reason`       VARCHAR(200) NOT NULL DEFAULT '' COMMENT '审核不通过原因',
    PRIMARY KEY (`id`))
ENGINE = InnoDB
COMMENT = '分享表';

rocketmq_transaction_log

-- -----------------------------------------------------
-- Table `rocketmq_transaction_log`
-- -----------------------------------------------------
-- Local transaction log: one row is written in the same DB transaction as the business
-- update, and the RocketMQ transaction check-back looks it up by transaction id.
create table if not exists rocketmq_transaction_log
(
    id             int auto_increment comment 'id'
        primary key,
    transaction_Id varchar(45) not null comment '事务id',
    log            varchar(45) not null comment '日志',
    -- The check-back queries by transaction id and expects at most one row,
    -- so enforce uniqueness (which also provides the lookup index).
    constraint uk_rocketmq_transaction_log_transaction_id
        unique (transaction_Id)
)
    comment 'RocketMQ事务日志表';

具体详情:见项目源码

2.2. 配置MQ信息
  • 项目内部yml配置

server:
  port: 8003
spring:
  application:
    # application name
    name: ly-rockketmq
  profiles:
    # active environment
    active: dev
  cloud:
    nacos:
      discovery:
        # service registry address
        server-addr: nacos.server.com:8848
      config:
        # config center address
        server-addr: nacos.server.com:8848
        # config file format
        file-extension: yml
        # shared configuration files
        shared-configs:
          - application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}

  • nacos服务端配置

# MQ name-server address
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    # A producer group must be specified, otherwise the starter does not create the producer.
    # NOTE(review): "test-group" is an example value — use the group name of your project.
    group: test-group

2.3. 控制层

package com.gblfy.lyrocketmq.controller;

import com.gblfy.common.dto.ShareAuditDTO;
import com.gblfy.lyrocketmq.entity.Share;
import com.gblfy.lyrocketmq.service.ShareService;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
 * Admin REST endpoints for shares, mounted under {@code /admin/shares}.
 */
@RestController
@RequestMapping("/admin/shares")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ShareAdminController {

    private final ShareService shareService;

    /**
     * Audits a share by delegating to {@link ShareService#auditById}.
     *
     * @param id       id of the share to audit, taken from the request path
     * @param auditDTO audit decision, taken from the request body
     * @return the share returned by the service
     */
    @PutMapping("/audit/{id}")
    public Share auditById(@PathVariable Integer id, @RequestBody ShareAuditDTO auditDTO) {
        // TODO: authentication and authorization
        final Share auditedShare = shareService.auditById(id, auditDTO);
        return auditedShare;
    }
}

2.4. service层

package com.gblfy.lyrocketmq.service;

import com.gblfy.api.RemoteProductService;
import com.gblfy.common.dto.ShareAuditDTO;
import com.gblfy.common.dto.ShareDTO;
import com.gblfy.common.dto.UserAddBonusMsgDTO;
import com.gblfy.common.dto.UserDTO;
import com.gblfy.common.enums.AuditStatusEnum;
import com.gblfy.lyrocketmq.entity.RocketmqTransactionLog;
import com.gblfy.lyrocketmq.entity.Share;
import com.gblfy.lyrocketmq.mapper.RocketmqTransactionLogMapper;
import com.gblfy.lyrocketmq.mapper.ShareMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.Objects;
import java.util.UUID;

/**
 * Share-related business logic of the content center.
 *
 * <p>Besides plain lookups, this service audits shares. When an audit passes, the status update
 * is not executed directly: a RocketMQ transactional (half) message is sent first and the update
 * is performed by the transaction listener through {@link #auditByIdWithRoketMqlog}.
 */
@Slf4j
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ShareService {

    /** Status a share must have in order to be audited. */
    private static final String AUDIT_STATUS_NOT_YET = "NOT_YET";
    /** Destination the "add bonus" transactional message is sent to. */
    private static final String ADD_BONUS_DESTINATION = "tx-add-bonus-group";
    /** Message header carrying the id of the share being audited. */
    private static final String SHARE_ID_HEADER = "share_id";
    /** Bonus granted to the publisher when a share passes the audit. */
    private static final int AUDIT_PASS_BONUS = 50;

    private final ShareMapper shareMapper;
    private final RemoteProductService userCenterFeignClient;
    private final RocketMQTemplate rocketMQTemplate;
    private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

    /**
     * Loads a share and enriches it with the publisher's WeChat nickname fetched from the
     * user center.
     *
     * @param id share id
     * @return the share as a DTO; the nickname is left unset when the user center returns no user
     * @throws IllegalArgumentException if no share with this id exists
     */
    public ShareDTO findById(Integer id) {
        Share share = this.shareMapper.selectByPrimaryKey(id);
        if (share == null) {
            throw new IllegalArgumentException("参数非法!该分享不存在!");
        }
        Integer userId = share.getUserId();

        UserDTO userDTO = this.userCenterFeignClient.findById(userId);

        ShareDTO shareDTO = new ShareDTO();
        BeanUtils.copyProperties(share, shareDTO);
        // Set the publisher's nickname; a missing user must not fail the whole lookup.
        if (userDTO != null) {
            shareDTO.setWxNickname(userDTO.getWxNickname());
        } else {
            log.warn("publisher not found in user center, shareId={}, userId={}", id, userId);
        }
        return shareDTO;
    }

    /**
     * Audits a share that is still in status {@code NOT_YET}.
     *
     * <p>If the decision is {@code PASS}, a transactional message is sent so that the user center
     * can add bonus points to the publisher; the database update then happens inside the local
     * transaction triggered by that message. For any other decision the share is updated directly.
     *
     * @param id       share id
     * @param auditDTO audit decision and optional reason
     * @return the share entity as it was loaded <em>before</em> the audit was applied
     * @throws IllegalArgumentException if the decision is missing, the share does not exist, or
     *                                  the share has already been audited
     */
    public Share auditById(Integer id, ShareAuditDTO auditDTO) {
        // 0. Reject a missing decision up front instead of failing later with a NullPointerException.
        if (auditDTO == null || auditDTO.getAuditStatusEnum() == null) {
            throw new IllegalArgumentException("参数非法!审核状态不能为空!");
        }

        // 1. The share must exist and must still be waiting for an audit.
        Share share = this.shareMapper.selectByPrimaryKey(id);
        if (share == null) {
            throw new IllegalArgumentException("参数非法!该分享不存在!");
        }
        if (!Objects.equals(AUDIT_STATUS_NOT_YET, share.getAuditStatus())) {
            throw new IllegalArgumentException("参数非法!该分享已审核通过或审核不通过!");
        }

        // 2. PASS: send the half message, the listener performs the DB update.
        //    Otherwise: update the database directly, no message is needed.
        if (AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum())) {
            this.sendAddBonusHalfMessage(id, share, auditDTO);
        } else {
            this.auditByIdInDB(id, auditDTO);
        }
        return share;
    }

    /**
     * Sends the transactional "add bonus" message for a share that passed the audit.
     */
    private void sendAddBonusHalfMessage(Integer id, Share share, ShareAuditDTO auditDTO) {
        // Generated id, carried in the TRANSACTION_ID header and later stored in the transaction log.
        String transactionId = UUID.randomUUID().toString();

        this.rocketMQTemplate.sendMessageInTransaction(
                ADD_BONUS_DESTINATION,
                MessageBuilder.withPayload(UserAddBonusMsgDTO.builder()
                                .userId(share.getUserId())
                                .bonus(AUDIT_PASS_BONUS)
                                .build())
                        // Headers carry what the transaction listener needs besides the payload.
                        .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                        .setHeader(SHARE_ID_HEADER, id)
                        .build(),
                // Handed to the listener's executeLocalTransaction as its "arg".
                auditDTO);
    }

    /**
     * Writes the audit decision (status and reason) of the given share to the database.
     *
     * @param id       share id
     * @param auditDTO audit decision and reason
     */
    public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) {
        Share share = Share.builder()
                .id(id)
                .auditStatus(auditDTO.getAuditStatusEnum().toString())
                .reason(auditDTO.getReason())
                .build();
        this.shareMapper.updateByPrimaryKeySelective(share);
    }

    /**
     * Applies the audit decision and records a transaction log row in one database transaction.
     *
     * <p>The log row is what the transaction check-back looks for, so it must be committed
     * together with the share update.
     *
     * @param id            share id
     * @param auditDTO      audit decision and reason
     * @param transactionId transaction id stored in the log row
     */
    @Transactional(rollbackFor = Exception.class)
    public void auditByIdWithRoketMqlog(Integer id, ShareAuditDTO auditDTO, String transactionId) {
        this.auditByIdInDB(id, auditDTO);
        this.rocketmqTransactionLogMapper.insertSelective(
                RocketmqTransactionLog.builder()
                        .transactionId(transactionId)
                        .log("审核分享..")
                        .build()
        );
    }
}

2.5. RocketMQ 事务消息监听

package com.gblfy.lyrocketmq.listener;

import com.gblfy.common.dto.ShareAuditDTO;
import com.gblfy.lyrocketmq.entity.RocketmqTransactionLog;
import com.gblfy.lyrocketmq.mapper.RocketmqTransactionLogMapper;
import com.gblfy.lyrocketmq.service.ShareService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
private final ShareService shareService;
private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

/**
* 执行本地事务
*
* @param msg 消息header信息
* @param arg 消息体
* @return
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {

MessageHeaders headers = msg.getHeaders();
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
Integer share_id = Integer.valueOf((String) headers.get("share_id"));

try {
this.shareService.auditByIdWithRoketMqlog(share_id, (ShareAuditDTO) arg, transactionId);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
e.printStackTrace();
return RocketMQLocalTransactionState.ROLLBACK;
}
}

/**
* 本地事务的检查,检查本地事务是否成功
*
* @param msg
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
MessageHeaders headers = msg.getHeaders();
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
RocketmqTransactionLog rocketmqTransactionLog = this.rocketmqTransactionLogMapper.selectOne(
RocketmqTransactionLog.builder()
.transactionId(transactionId).build()
);
if (rocketmqTransactionLog != null) {
return RocketMQLocalTransactionState.COMMIT;
}
return RocketMQLocalTransactionState.ROLLBACK;
}
}

详细见源码:本文底部

三、用户中心(客户端)

消息消费端代码编写

3.1. 依赖

<!-- RocketMQ integration (Spring Boot starter) -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>

3.2.配置
  • 项目内部yml配置

server:
  port: 9000
spring:
  application:
    # application name
    name: ly-product
  profiles:
    # active environment
    active: dev
  cloud:
    nacos:
      discovery:
        # service registry address
        server-addr: nacos.server.com:8848
      config:
        # config center address
        server-addr: nacos.server.com:8848
        # config file format
        file-extension: yml
        # shared configuration files
        shared-configs:
          - application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}

  • nacos服务端配置

# MQ name-server address
rocketmq:
  name-server: 127.0.0.1:9876

3.3. 消息监听

package com.gblfy.product.listenner;

import com.gblfy.common.dto.UserAddBonusMsgDTO;
import com.gblfy.product.entity.BonusEventLog;
import com.gblfy.product.entity.User;
import com.gblfy.product.mapper.BonusEventLogMapper;
import com.gblfy.product.mapper.UserMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;

@Slf4j
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@RocketMQMessageListener(topic = "tx-add-bonus-group", consumerGroup = "consumer-group")
public class AddBonusListener implements RocketMQListener<UserAddBonusMsgDTO> {

private final UserMapper userMapper;
private final BonusEventLogMapper bonusEventLogMapper;

@Override
public void onMessage(UserAddBonusMsgDTO message) {
// 1. 为用户添加积分
Integer userId = message.getUserId();
Integer bonus = message.getBonus();

User user = this.userMapper.selectByPrimaryKey(userId);
user.setBonus(user.getBonus() + bonus);
this.userMapper.updateByPrimaryKeySelective(user);

// 2.记录日志到bonus_event_log表中
this.bonusEventLogMapper.insert(
BonusEventLog.builder()
.userId(userId)
.value(bonus)
.event("CONTRIBUTE")
.createTime(new Date())
.description("投稿加积分...")
.build()
);

log.info("积分添加完毕...");
}
}

开源项目:

​​https://gitee.com/gb_90/micro-service-parent​​


举报

相关推荐

0 条评论