一、使用springboot配置类集成
1.添加依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.6.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
2.创建mq-provider微服务,配置文件添加如下内容
server.port=8090
spring.application.name=mq-provider
spring.rabbitmq.addresses=ip:5672,ip:5673,ip:5674
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=/
# 消息发送者与交换器(exchange)确认机制,开启确认回调
spring.rabbitmq.publisher-confirm-type=correlated
# 消息发送者与队列(queue)错误机制,是否开启:消息没有到达queue返回回调
spring.rabbitmq.publisher-returns=true
3.添加rabbitmq的配置类,需要创建一个队列(queue),一个交换器(exchange),并把队列和交换器通过路由键绑定
@Configuration
public class RabbitMqConfig {
@Bean
public Queue stockQueue() {
/**
* 队列名称
* 是否持久化队列
*/
return new Queue("StockQueue",true);
}
/**
* 针对消费者配置
* 1. 设置交换机类型
* 2. 将队列绑定到交换机
FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
HeadersExchange :通过添加属性key-value匹配
DirectExchange:按照routingkey分发到指定队列
TopicExchange:多关键字匹配
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange("OrderDirectExchange",false,false);
}
@Bean
public Binding directStockBinding() {
return BindingBuilder.bind(this.stockQueue()).to(this.directExchange()).with("order.direct.stock.routingKey");
}
}
4.发送消息,通过指定exchange和routingKey,即可将消息发送到queue中
@Service
public class MessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String exchange, String routingKey, String message) {
exchange = "OrderDirectExchange";
routingKey = "order.direct.stock.routingKey";
log.info("开始发送消息" + message);
rabbitTemplate.convertAndSend(exchange,routingKey,message);
}
}
5.监听消息发送端和服务端(broker)的确认机制,前提是配置文件中配置
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
@Slf4j
@Component
public class ConfirmCallbackImpl implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
// 指定调用 confirm 方法的对象,该对象必须实现RabbitTemplate.ConfirmCallback接口,且一个 RabbitTemplate 实例只能指定一个对象
// returnedMessage 方法同理
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
/**
* 消息发送到exchange,确认是否成功的回调方法
* @param data
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData data, boolean ack, String cause) {
log.info(" 回调id: {}", data);
if (ack) {
log.info("消息成功发送");
} else {
log.info("消息发送失败:" + cause);
}
}
/**
* 消息发送到exchange,但进入queue失败时,回调的方法
* @param message 消息体
* @param replyCode 应答码
* @param replyText 应答内容
* @param exchange 交换器
* @param routingKey 路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("replyCode: " + replyCode);
log.info("replyText: " + replyText);
log.info("exchange: " + exchange);
log.info("routingKey: " + routingKey);
log.info("message: " + message);
}
6.至此,消息发送端编写完成,消息已经能成功发送到队列中,可以打开rabbitmq自带的web管理页面查看
7.编写消息消费端,创建mq-consumer微服务,配置文件中添加如下配置
server.port=8091
spring.application.name=mq-consumer
spring.rabbitmq.addresses=124.70.69.70:5672,124.70.69.70:5673,124.70.69.70:5674
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=/
# 设置消费者消费消息为手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
8.编写mq消费类,只需在消费端监听对应的queue,即可消费消息
@Slf4j
@Component
@RabbitListener(queues = {"StockQueue"})
public class ConsumerProcess {
@RabbitHandler
public void receive(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
// todo 要处理的业务逻辑
/**
* 手动确认消息,确认之后代表已消费
* tag: 消息进入channel的唯一标识,自增的序列
* multiple=true,表示确认所有小于等于tag的消息
*/
channel.basicAck(tag,false);
/**
* 手动否认消息,否认之后根据 requeue=true/false 决定是否重新进入队列
* tag: 消息进入channel的唯一标识,自增的序列
* multiple=true,表示否认所有小于等于tag的消息
*/
//channel.basicNack(tag,false,false);
/**
* 手动拒绝消息,拒绝之后不会重新进入队列
* tag: 消息进入channel的唯一标识,自增的序列
* multiple=true,表示拒绝所有小于等于tag的消息
*/
//channel.basicReject(tag,false);
System.out.println("消费消息: " + msg);
}
}
9.至此,消费端编写完成,需要注意的是,消费端监听的queue必须是已经存在的,即发送端必须先将消息发送出去,否则服务启动报错
二、上面使用springboot配置类的集成方式已经介绍完,下面介绍使用注解集成的方式
1.发送端的RabbitMqConfig配置类取消掉,其它不变
2.消费端的RabbitListener注解使用如下,其它不变
@RabbitListener(
bindings = {
@QueueBinding(value = @Queue(value = "StockQueue",autoDelete = "false"),
exchange = @Exchange(value = "OrderDirectExchange"),
key = {"order.direct.stock.routingKey"}
)}
)
3.至此,使用注解集成rabbitmq已介绍完,需要注意的是,exchange和queue的创建在消费端的微服务中,消费端的服务起来了,把exchange和queue创建好,发送端才能发送消息
4.springboot集成rabbitmq的两种方式介绍已完成,注解更简洁,配置类更灵活,看个人喜好了
5.demo源码地址
https://github.com/lanjiangit/mq-parent.git
6.关于rabbitmq集群的搭建可以参考
https://www.jianshu.com/p/d231844b9c46