一、SpringBoot操作RabbitMQ
springboot整合rabbitMQ只需要添加如下依赖即可:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在application.yaml中配置RabbitMQ信息
spring:
rabbitmq:
host: 192.168.42.146
port: 5672
username: guest
password: guest
virtual-host: / #虚拟主机用于在RabbitMQ服务器上创建逻辑上的隔离环境。在这里,虚拟主机的名称是/,表示使用默认虚拟主机
#生产端
publisher-returns: true # 启用生产者消息返回机制。当消息无法路由到队列时,生产者将收到返回的消息
template:
mandatory: true # 设置生产者发送消息时是否要求消息必须成功路由到队列。如果设置为true,当消息无法路由到队列时,将抛出异常。
#消费端
listener:
simple:
acknowledge-mode: manual # 手动ack
#初始连接数量
concurrency: 5 #设置消费者的初始连接数量。这表示同时处理的消息数量
#最大连接数量
max-concurrency: 10 # 设置消费者的最大连接数量。这表示允许的最大同时处理的消息数量。
#限流
prefetch: 1 # 设置消费者的预取计数。这表示在消费者处理完当前消息之前,不会从RabbitMQ服务器获取更多的消息。
二、创建配置RabbitMQ配置类
在com.augus.config包下创建配置类:RabbitMQConfig,在里面创建交换机、队列,然后进行绑定,代码如下:
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE = "springboot_exchange";
public static final String QUEUE = "springboot_queue";
public static final String ROUTING_KEY = "*.black.*";
/**
* 创建一个交换机对象,注入到bean中
* @return 返回一个主题交换机对象
*/
@Bean
public Exchange bootExchange(){
//创建一个主题交换机(Topic Exchange)对象并指定了交换机的名称为 EXCHANGE。然后,通过调用 build() 方法,创建并返回一个主题交换机对象
return ExchangeBuilder.topicExchange(EXCHANGE).build();
}
/**
* 创建一个队列(Queue)对象。
* @return 返回队列对象
*/
@Bean
public Queue bootQueue(){
return QueueBuilder.durable(QUEUE).build();
}
/**
* 创建一个绑定(Binding)对象,将队列绑定到交换机上。
* @param bootExchange 交换机对象,通过 bootExchange() 方法获取
* @param bootQueue 队列对象,通过 bootQueue()方法获取
* @return
*/
@Bean
public Binding bootBinding(Exchange bootExchange, Queue bootQueue) {
//通过 bootExchange() 方法创建了一个名为 EXCHANGE 的主题交换机(Topic Exchange),
// 通过 bootQueue() 方法创建了一个名为 QUEUE 的持久化队列(Durable Queue)。然后,通过 bootBinding() 方法将队列和交换机进行绑定,
// 绑定的路由键为 ROUTING_KEY
return BindingBuilder.bind(bootQueue).to(bootExchange).with(ROUTING_KEY).noargs();
}
}
三、创建生产者
创建测试类,模拟生产者产生数据发到队列,代码如下:
import com.augus.config.RabbitMQConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class PublisherTest {
@Autowired
public RabbitTemplate rabbitTemplate;
@Test
public void publish(){
//将消息 "你好,国庆节去哪里玩呢?" 发送到名为 EXCHANGE 的交换机,并使用路由键 "topic.black.dog" 进行消息路由。根据交换机和队列的绑定关系,消息将被路由到与该路由键匹配的队列中,供消费者进行消费。
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"topic.black.dog","你好,张飒,明天有空吗?");
System.out.println("消息发送成功");
}
/**
* 向 RabbitMQ 发布一条带有自定义属性的消息, 和上一个方法别,这里可以自定义属性
* new MessagePostProcessor() 是一个匿名内部类,用于对消息进行后处理操作。
* postProcessMessage() 方法是 MessagePostProcessor 接口的实现方法,用于对消息进行进一步处理。
*/
@Test
public void publishWithProps(){
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "topic.black.dog", "风雨飘摇", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//通过 message.getMessageProperties().setCorrelationId("111111") 设置了消息的关联 ID(correlation ID)为 "123"。
message.getMessageProperties().setCorrelationId("111111");
return message;
}
});
System.out.println("消息发送成功!!!");
}
}
四、创建消费者
在com.augus包下创建ConsumeListener,作为消费者,让其监听配置类RabbitMQConfig中定义的队列中的消息,代码如下:
import com.augus.config.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class ConsumeListener {
//该注解表示该方法监听名为 RabbitMQConfig.QUEUE 的队列,当队列中有消息到达时,该方法将被触发执行。
@RabbitListener(queues = RabbitMQConfig.QUEUE)
public void consume(String msg, Channel channel, Message message) throws IOException {
/**
* String msg:表示接收到的消息内容,即队列中的消息。
* Channel channel:表示 RabbitMQ 的通道(Channel),可以用于进行消息确认、拒绝等操作。
* Message message:表示接收到的消息对象,包含了消息的内容和属性等信息。
*/
System.out.println("队列的消息为:"+msg);
//通过 message.getMessageProperties().getClusterId() 获取消息的唯一标识(cluster ID)
String id = message.getMessageProperties().getCorrelationId();
System.out.println("唯一标识为:"+id);
/**
* 进行消息确认,告知 RabbitMQ 已成功处理该消息。
* message.getMessageProperties().getDeliveryTag():获取消息的交付标签(delivery tag),表示消息在通道中的唯一标识。
* false:表示是否批量确认模式。在这里设置为 false,表示只确认当前的消息。
*/
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
五、测试
先启动消费者,让其处于监听状态,监听队列中的信息,然后启动消费者,发送消息到队列,如下:
在查看消费者这里监听到信息如下: