0
点赞
收藏
分享

微信扫一扫

SpringBoot操作RabbitMQ

一、SpringBoot操作RabbitMQ

springboot整合rabbitMQ只需要添加如下依赖即可:

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>

在application.yaml中配置RabbitMQ信息

spring:
  rabbitmq:
    host: 192.168.42.146
    port: 5672
    username: guest
    password: guest
    virtual-host: / #虚拟主机用于在RabbitMQ服务器上创建逻辑上的隔离环境。在这里,虚拟主机的名称是/,表示使用默认虚拟主机
    #生产端
    publisher-returns: true # 启用生产者消息返回机制。当消息无法路由到队列时,生产者将收到返回的消息
    template:
      mandatory: true # 设置生产者发送消息时是否要求消息必须成功路由到队列。如果设置为true,当消息无法路由到队列时,将抛出异常。
    #消费端
    listener:
      simple:
        acknowledge-mode: manual # 手动ack
        #初始连接数量
        concurrency: 5 #设置消费者的初始连接数量。这表示同时处理的消息数量
        #最大连接数量
        max-concurrency: 10 # 设置消费者的最大连接数量。这表示允许的最大同时处理的消息数量。
        #限流
        prefetch: 1 # 设置消费者的预取计数。这表示在消费者处理完当前消息之前,不会从RabbitMQ服务器获取更多的消息。

二、创建配置RabbitMQ配置类

在com.augus.config包下创建配置类:RabbitMQConfig,在里面创建交换机、队列,然后进行绑定,代码如下:

@Configuration
public class RabbitMQConfig {
    public static final String EXCHANGE = "springboot_exchange";
    public static final String QUEUE = "springboot_queue";
    public static final String ROUTING_KEY = "*.black.*";

    /**
     * 创建一个交换机对象,注入到bean中
     * @return 返回一个主题交换机对象
     */
    @Bean
    public Exchange bootExchange(){
        //创建一个主题交换机(Topic Exchange)对象并指定了交换机的名称为 EXCHANGE。然后,通过调用 build() 方法,创建并返回一个主题交换机对象
        return ExchangeBuilder.topicExchange(EXCHANGE).build();
    }

    /**
     * 创建一个队列(Queue)对象。
     * @return 返回队列对象
     */
    @Bean
    public Queue bootQueue(){
        return QueueBuilder.durable(QUEUE).build();
    }

    /**
     * 创建一个绑定(Binding)对象,将队列绑定到交换机上。
     * @param bootExchange 交换机对象,通过 bootExchange() 方法获取
     * @param bootQueue 队列对象,通过 bootQueue()方法获取
     * @return
     */
    @Bean
    public Binding bootBinding(Exchange bootExchange, Queue bootQueue) {
        //通过 bootExchange() 方法创建了一个名为 EXCHANGE 的主题交换机(Topic Exchange),
        // 通过 bootQueue() 方法创建了一个名为 QUEUE 的持久化队列(Durable Queue)。然后,通过 bootBinding() 方法将队列和交换机进行绑定,
        // 绑定的路由键为 ROUTING_KEY
        return BindingBuilder.bind(bootQueue).to(bootExchange).with(ROUTING_KEY).noargs();
    }

}

三、创建生产者

创建测试类,模拟生产者产生数据发到队列,代码如下:

import com.augus.config.RabbitMQConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class PublisherTest {
    @Autowired
    public RabbitTemplate rabbitTemplate;

    @Test
    public void publish(){
        //将消息 "你好,国庆节去哪里玩呢?" 发送到名为 EXCHANGE 的交换机,并使用路由键 "topic.black.dog" 进行消息路由。根据交换机和队列的绑定关系,消息将被路由到与该路由键匹配的队列中,供消费者进行消费。
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"topic.black.dog","你好,张飒,明天有空吗?");
        System.out.println("消息发送成功");
    }

    /**
     * 向 RabbitMQ 发布一条带有自定义属性的消息, 和上一个方法别,这里可以自定义属性
     * new MessagePostProcessor() 是一个匿名内部类,用于对消息进行后处理操作。
     * postProcessMessage() 方法是 MessagePostProcessor 接口的实现方法,用于对消息进行进一步处理。
     */
    @Test
    public void publishWithProps(){
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "topic.black.dog", "风雨飘摇", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //通过 message.getMessageProperties().setCorrelationId("111111") 设置了消息的关联 ID(correlation ID)为 "123"。
                message.getMessageProperties().setCorrelationId("111111");
                return message;
            }
        });
        System.out.println("消息发送成功!!!");
    }

}

四、创建消费者

在com.augus包下创建ConsumeListener,作为消费者,让其监听配置类RabbitMQConfig中定义的队列中的消息,代码如下:

import com.augus.config.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Component
public class ConsumeListener {

    //该注解表示该方法监听名为 RabbitMQConfig.QUEUE 的队列,当队列中有消息到达时,该方法将被触发执行。
    @RabbitListener(queues = RabbitMQConfig.QUEUE)
    public void consume(String msg, Channel channel, Message message) throws IOException {
        /**
         * String msg:表示接收到的消息内容,即队列中的消息。
         * Channel channel:表示 RabbitMQ 的通道(Channel),可以用于进行消息确认、拒绝等操作。
         * Message message:表示接收到的消息对象,包含了消息的内容和属性等信息。
         */
        System.out.println("队列的消息为:"+msg);

        //通过 message.getMessageProperties().getClusterId() 获取消息的唯一标识(cluster ID)
        String id = message.getMessageProperties().getCorrelationId();

        System.out.println("唯一标识为:"+id);

        /**
         * 进行消息确认,告知 RabbitMQ 已成功处理该消息。
         * message.getMessageProperties().getDeliveryTag():获取消息的交付标签(delivery tag),表示消息在通道中的唯一标识。
         * false:表示是否批量确认模式。在这里设置为 false,表示只确认当前的消息。
         */
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
}

五、测试

先启动消费者,让其处于监听状态,监听队列中的信息,然后启动消费者,发送消息到队列,如下:

SpringBoot操作RabbitMQ_spring

在查看消费者这里监听到信息如下:

SpringBoot操作RabbitMQ_spring_02



举报

相关推荐

0 条评论