0
点赞
收藏
分享

微信扫一扫

springboot集成rabbitmq的两种方式

向上的萝卜白菜 2021-09-25 阅读 65

一、使用springboot配置类集成
1.添加依赖

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.6.RELEASE</version>
 </parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>

2.创建mq-provider微服务,配置文件添加如下内容

server.port=8090
spring.application.name=mq-provider

spring.rabbitmq.addresses=ip:5672,ip:5673,ip:5674
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=/
# 消息发送者与交换器(exchange)确认机制,开启确认回调
spring.rabbitmq.publisher-confirm-type=correlated
# 消息发送者与队列(queue)错误机制,是否开启:消息没有到达queue返回回调
spring.rabbitmq.publisher-returns=true

3.添加rabbitmq的配置类,需要创建一个队列(queue),一个交换器(exchange),并把队列和交换器通过路由键绑定

@Configuration
public class RabbitMqConfig {
    @Bean
    public Queue stockQueue() {
        /**
         * 队列名称
         * 是否持久化队列
         */
        return new Queue("StockQueue",true);
    }

    /**
     * 针对消费者配置
     * 1. 设置交换机类型
     * 2. 将队列绑定到交换机
     FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
     HeadersExchange :通过添加属性key-value匹配
     DirectExchange:按照routingkey分发到指定队列
     TopicExchange:多关键字匹配
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("OrderDirectExchange",false,false);
    }
    
    @Bean
    public Binding directStockBinding() {
        return BindingBuilder.bind(this.stockQueue()).to(this.directExchange()).with("order.direct.stock.routingKey");
    }
}

4.发送消息,通过指定exchange和routingKey,即可将消息发送到queue中

@Service
public class MessageService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
   
    public void sendMessage(String exchange, String routingKey, String message) {
        exchange = "OrderDirectExchange";
        routingKey = "order.direct.stock.routingKey";
        log.info("开始发送消息" + message);
        rabbitTemplate.convertAndSend(exchange,routingKey,message);
    }
    
}

5.监听消息发送端和服务端(broker)的确认机制,前提是配置文件中配置

spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
@Slf4j
@Component
public class ConfirmCallbackImpl implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        // 指定调用 confirm 方法的对象,该对象必须实现RabbitTemplate.ConfirmCallback接口,且一个 RabbitTemplate 实例只能指定一个对象
        // returnedMessage 方法同理
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }
    /**
     * 消息发送到exchange,确认是否成功的回调方法
     * @param data
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData data, boolean ack, String cause) {
        log.info(" 回调id: {}", data);
        if (ack) {
            log.info("消息成功发送");
        } else {
            log.info("消息发送失败:" + cause);
        }
    }

    /**
     * 消息发送到exchange,但进入queue失败时,回调的方法
     * @param message 消息体
     * @param replyCode 应答码
     * @param replyText 应答内容
     * @param exchange 交换器
     * @param routingKey 路由键
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("replyCode: " + replyCode);
        log.info("replyText: " + replyText);
        log.info("exchange: " + exchange);
        log.info("routingKey: " + routingKey);
        log.info("message: " + message);
    }

6.至此,消息发送端编写完成,消息已经能成功发送到队列中,可以打开rabbitmq自带的web管理页面查看


7.编写消息消费端,创建mq-consumer微服务,配置文件中添加如下配置

server.port=8091
spring.application.name=mq-consumer

spring.rabbitmq.addresses=124.70.69.70:5672,124.70.69.70:5673,124.70.69.70:5674
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=/

# 设置消费者消费消息为手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual

8.编写mq消费类,只需在消费端监听对应的queue,即可消费消息

@Slf4j
@Component
@RabbitListener(queues = {"StockQueue"})
public class ConsumerProcess {
    @RabbitHandler
    public void receive(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        // todo 要处理的业务逻辑
        
        /**
         * 手动确认消息,确认之后代表已消费
         * tag: 消息进入channel的唯一标识,自增的序列
         * multiple=true,表示确认所有小于等于tag的消息
         */
        channel.basicAck(tag,false);
        
        /**
         * 手动否认消息,否认之后根据 requeue=true/false 决定是否重新进入队列
         * tag: 消息进入channel的唯一标识,自增的序列
         * multiple=true,表示否认所有小于等于tag的消息
         */
        //channel.basicNack(tag,false,false);
        
        /**
         * 手动拒绝消息,拒绝之后不会重新进入队列
         * tag: 消息进入channel的唯一标识,自增的序列
         * multiple=true,表示拒绝所有小于等于tag的消息
         */
        //channel.basicReject(tag,false);
        
        System.out.println("消费消息: " + msg);
    }
    
}

9.至此,消费端编写完成,需要注意的是,消费端监听的queue必须是已经存在的,即发送端必须先将消息发送出去,否则服务启动报错
二、上面使用springboot配置类的集成方式已经介绍完,下面介绍使用注解集成的方式
1.发送端的RabbitMqConfig配置类取消掉,其它不变
2.消费端的RabbitListener注解使用如下,其它不变

@RabbitListener(
    bindings = {
    @QueueBinding(value = @Queue(value = "StockQueue",autoDelete = "false"),
        exchange = @Exchange(value = "OrderDirectExchange"),
        key = {"order.direct.stock.routingKey"}
    )}
)

3.至此,使用注解集成rabbitmq已介绍完,需要注意的是,exchange和queue的创建在消费端的微服务中,消费端的服务起来了,把exchange和queue创建好,发送端才能发送消息
4.springboot集成rabbitmq的两种方式介绍已完成,注解更简洁,配置类更灵活,看个人喜好了
5.demo源码地址
https://github.com/lanjiangit/mq-parent.git
6.关于rabbitmq集群的搭建可以参考
https://www.jianshu.com/p/d231844b9c46

举报

相关推荐

0 条评论