0
点赞
收藏
分享

微信扫一扫

SpringBoot整合RabbitMQ及其操作


SpringBoot 整合RabbitMQ


导入依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

1. direct模式

直连模式,这里采用到了默认的Direct类型的Exchange,不用手动创建Exchange.

SpringBoot整合RabbitMQ及其操作_SpringBoot

配置类

package com.hao.springbootrabbitmq.normalDirect.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class NormalDirectConfig {
@Bean
public Queue directQueue(){
//持久化(默认:true)
return new Queue("direct_queue");
}
}

配置类定义之后,在项目启动的过程中,会在RabbitMQ自动把相应的Queue创建出来。

SpringBoot整合RabbitMQ及其操作_发送消息_02

生产者

@Component
public class MessageSender {

@Autowired
private RabbitTemplate rabbitTemplate;

public void sendDirectMessage(String message){
System.out.println("发送direct消息:"+message);
rabbitTemplate.convertAndSend("direct_queue",message);
}
}

生产者是基于SpringBoot的RabbitTemplate来发送消息的。

消费者

@Component
@RabbitListener(queues = {"direct_queue"})
public class DirectReceiver {

@RabbitHandler
public void directMessageHandler(String message){
System.out.println("接收:"+message);
}
}

可以有多个消费者同时监听同一个队列,那么消息会随机的被其中一个消费者消费掉,同一条消息只会被一个消费者消费到。

SpringBoot整合RabbitMQ及其操作_spring_03

通过上面的图文字,可以知道 【在各个worker中分发,竞争消费者模型】。

下面我创建了两个消费者同时去监听一个队列,所消费的执行结果:

发送direct消息:hello world by direct0
发送direct消息:hello world by direct1
发送direct消息:hello world by direct2
发送direct消息:hello world by direct3
发送direct消息:hello world by direct4
发送direct消息:hello world by direct5
发送direct消息:hello world by direct6
发送direct消息:hello world by direct7
发送direct消息:hello world by direct8
发送direct消息:hello world by direct9
DirectReceiverB接收:hello world by direct1
DirectReceiverA接收:hello world by direct0
DirectReceiverB接收:hello world by direct3
DirectReceiverB接收:hello world by direct5
DirectReceiverB接收:hello world by direct7
DirectReceiverB接收:hello world by direct9
DirectReceiverA接收:hello world by direct2
DirectReceiverA接收:hello world by direct4
DirectReceiverA接收:hello world by direct6
DirectReceiverA接收:hello world by direct8

消费者则首先需要使用​​@Component​​​将其声明为一个spring组件,其次使用​​@RabbitListener​​​来指定鉴定哪一个或哪几个Queue信息。使用​​@RabbitHandler​​来定义具体的消息消费逻辑。

2.routing模式

SpringBoot整合RabbitMQ及其操作_spring_04

选择性的接收消息。

配置类

@Configuration
public class DirectConfig {

@Bean
public Queue redQueue(){
return new Queue("red_queue");
}

@Bean
public Queue blackQueue(){
return new Queue("black_queue");
}

@Bean
public DirectExchange directExchange(){
return new DirectExchange("default");
}

@Bean
public Binding queueBindingRed(){
return BindingBuilder.bind(redQueue()).to(directExchange()).with("red");
}

@Bean
public Binding queueBindingBlack(){
return BindingBuilder.bind(blackQueue()).to(directExchange()).with("black");
}
}

从上面的配置信息可以看出,声明了两个队列,分别为​​redQueue​​​ 和 ​​BlackQueue​​, 同时定义了一个Direct类型的Exchange,以及将 这两个队列分别于交换机绑定,同时指明了响应的routingKey.

生产者

@Component
public class DirectMessageSender {

@Autowired
private RabbitTemplate rabbitTemplate;

public void sendDirectMessage(String message){
/**
* 参数一:exchange
* 参数二:routingKey
* 参数三:消息体
*/
rabbitTemplate.convertAndSend("default","red","red:"+message);
rabbitTemplate.convertAndSend("default","black","black:"+message);
}
}

SpringBoot整合RabbitMQ及其操作_发送消息_05

当生产者发送消息之后,我们通过RabbitMQ的控制台可以看到 Red_Queue 和 Black_Queue两个队列中均有一条信息。

消费者

@Component
@RabbitListener(queues = {"red_queue"})
public class DirectRedQueueReceiver {
@RabbitHandler
public void handleRedQueueMessage(String message){
System.out.println("Red接收消息:"+message);
}
}

@Component
@RabbitListener(queues = {"black_queue"})
public class DirectBlackQueueReceiver {
@RabbitHandler
public void handleBlackMessage(String message){
System.out.println("Black接收消息:"+message);
}
}

打开消费者之后,我们在控制台可以看到这样的信息:

Black接收消息:black:Direct Message with Routing
Red接收消息:red:Direct Message with Routing

可以看出,由于之前的配置类中将相应的队列与交互机Exchange绑定 ,并指定了各自的routingKey。同时在生成者我们可以发现,在发送消息的时候,我们指定了exchange以及routingKey。这样可以将消息准确的投递到相应的queue中。

上面简单的routing模式实现了,下面将上述的实现修改一下:

配置类:

@Configuration
public class MultiRoutingKeyConfig {

/**
* 定义两个Queue
* @return
*/
@Bean
public Queue errorQueue(){
return new Queue("error_queue");
}

@Bean
public Queue allQueue(){
return new Queue("all_queue");
}

@Bean
public DirectExchange exchange(){
return new DirectExchange("direct_routing_exchange");
}

/**
* 指定error为error_queue的routingKey
* @return
*/
@Bean
public Binding bindingError(){
return BindingBuilder.bind(errorQueue()).to(exchange()).with("error");
}

/**
* 指定info,error,warning为all_queue的routing_key
* @return
*/
@Bean
public Binding bindingError2(){
return BindingBuilder.bind(allQueue()).to(exchange()).with("info");
}

@Bean
public Binding bindingInfo(){
return BindingBuilder.bind(allQueue()).to(exchange()).with("error");
}

@Bean
public Binding bindingWarning(){
return BindingBuilder.bind(allQueue()).to(exchange()).with("warning");
}
}

生成者:

@Component
public class RoutingKeyMessageSender {

@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 指定发送到的交换机,并指定相应的routingKey.
* @param message
* @param routingKey
*/
public void sendMessageByRoutingkey(String routingKey,String message){
rabbitTemplate.convertAndSend("direct_routing_exchange",routingKey,message);
}
}

测试代码:

@Test
void testRoutingKey(){
routingKeyMessageSender.sendMessageByRoutingkey("error","error message");
routingKeyMessageSender.sendMessageByRoutingkey("info","info message");
routingKeyMessageSender.sendMessageByRoutingkey("warning","warning message");
}

页面控制台结果:

SpringBoot整合RabbitMQ及其操作_RabbitMQ_06

可以看出,在测试类中我们总共发送的三条信息,但是控制界面上总共出现了四条信息。在all_queue中有三条,error_queue中有一条。我们继续进去看看消息内容:

error_queue的信息内容:

SpringBoot整合RabbitMQ及其操作_SpringBoot_07

all_queue的信息内容:

SpringBoot整合RabbitMQ及其操作_System_08

通过上述图片可以得知,在error_queue中收到了一个信息【error message】.而在all_queue中收到了三条信息:【error message, info message, warning message】。可以得出结论,当消息匹配到响应的routingKey时,就会把消息发送到相应的Queue中中。所以在error queue中可以看到一条,而在all_queue中可以看到全部信息,是因为error_queue中只定义了一个routingKey【error】,而在all_queue中定义了全部的routingKey。

我们在看一下Exchange交换机的情况:

SpringBoot整合RabbitMQ及其操作_RabbitMQ_09

通过Exchange可以看出,all_queue绑定了三个routingKey, error_queue绑定了一个routingKey. 而上述两个queue都绑定到了​​direct_routing_exchange​​交换机上。

3. fanout模式

SpringBoot整合RabbitMQ及其操作_发送消息_10

发布订阅模式,同时发送信息给多个消费者。

配置类

@Configuration
public class FanoutConfig {

/**
* 定义三个Queue
* @return
*/
@Bean
public Queue fanoutQueueA(){
return new Queue("fanout_queue_a");
}

@Bean
public Queue fanoutQueueB(){
return new Queue("fanout_queue_b");
}

@Bean
public Queue fanoutQueueC(){
return new Queue("fanout_queue_c");
}

@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanout_exchange");
}

@Bean
public Binding bindingQueueA(){
return BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange());
}

@Bean
public Binding bindingQueueB(){
return BindingBuilder.bind(fanoutQueueB()).to(fanoutExchange());
}

@Bean
public Binding bindingQueueC(){
return BindingBuilder.bind(fanoutQueueC()).to(fanoutExchange());
}
}

生产者

@Component
public class FanoutMessageSender {

@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 参数一:exchange
* 参数二:routingKey,为null或是”“空字符串
* 参数三:信息
* @param message
*/
public void sendFanoutMessage(String message){
rabbitTemplate.convertAndSend("fanout_exchange","",message);
}
}

注意:这里发送消息需要设置 routingKey,routingKey可以设置为null,或设置为""空字符串。不能不填写。

测试发送信息:

@Test
void testFanout(){
fanoutMessageSender.sendFanoutMessage("send fanout message");
}

此时将消费者注释了,查看RabbitMQ的控制台,如下图,可以看到一条信息发送,会同时发送到下列三个Queue中。即只要Queue绑定到了发送消息的Exchange上,就可以收到生产者发送的消息。故称为:发布/订阅模式。

SpringBoot整合RabbitMQ及其操作_spring_11

消费者

@Component
@RabbitListener(queues = {"fanout_queue_a"})
public class MessageReceiverA {

@RabbitHandler
public void handleMessage(String message){
System.out.println("fanout_queue_a 接收到信息:"+message);
}
}

@Component
@RabbitListener(queues = {"fanout_queue_b"})
public class MessageReceiverB {

@RabbitHandler
public void handleMessage(String message){
System.out.println("fanout_queue_b 接收到信息:"+message);
}
}

@Component
@RabbitListener(queues = {"fanout_queue_c"})
public class MessageReceiverC {

@RabbitHandler
public void handleMessage(String message){
System.out.println("fanout_queue_c 接收到信息:"+message);
}
}

开启消费者,查询控制台信息,如下。三个监听不同的消费者都拿到了相同的信息。(类似于广播)

fanout_queue_b 接收到信息:send fanout message
fanout_queue_a 接收到信息:send fanout message
fanout_queue_c 接收到信息:send fanout message

4. topic 模式

SpringBoot整合RabbitMQ及其操作_spring_12

topic模式,基于一个模式上接收信息。

这里需要指明一下,官网是这样说明的星号和井号:

* (star) can substitute for exactly one word.
星号精确(exactly)替换一个单词
# (hash) can substitute for zero or more words.
井号可以代替0个或多个单词

配置类

@Configuration
public class TopicConfig {

//声明三个Queue
@Bean
public Queue topicQueueA(){
return new Queue("topic_queue_a");
}

@Bean
public Queue topicQueueB(){
return new Queue("topic_queue_b");
}


@Bean
public TopicExchange topicExchange(){
return new TopicExchange("topic_exchange");
}

@Bean
public Binding binding1(){
return BindingBuilder.bind(topicQueueA()).to(topicExchange()).with("*.orange.*");
}

@Bean
public Binding binding2(){
return BindingBuilder.bind(topicQueueB()).to(topicExchange()).with("*.*.rabbit");
}

@Bean
public Binding binding3(){
return BindingBuilder.bind(topicQueueB()).to(topicExchange()).with("lazy.#");
}
}

说明:上面已经介绍了*和#的意义。即 星号必须是一个单词,而井号可以是0个或多个单词。

生产者

@Component
public class TopicMessageSender {

@Autowired
private RabbitTemplate rabbitTemplate;

public void sendMessage(String routingKey,String message){
System.out.println("发送topic消息,routingKey:"+routingKey+", message:"+message);
rabbitTemplate.convertAndSend("topic_exchange",routingKey,message);
}
}

测试及结果如下。

@Test
void testTopic(){
//不能发送到任何的Queue
topicMessageSender.sendMessage("hello.orange","a_orange");
//不能发送到任何的Queue
topicMessageSender.sendMessage("hello.orange.java.rabbit","a_orange");
//可以发送到topic_queue_a
topicMessageSender.sendMessage("java.orange.rabbitmq","java_orange_rabbitmq");
//可以发送到topic_queue_a,topic_queue_b
topicMessageSender.sendMessage("java.orange.rabbit","a_orange");
//可以发送到topic_queue_b
topicMessageSender.sendMessage("lazy","layMessage");
//不能发送到任何的Queue
topicMessageSender.sendMessage("hello.lazy","hellLazyMessage");
//可以发送到topic_queue_b
topicMessageSender.sendMessage("lazy.rabbit","lazyRabbit");
//可以发送到topic_queue_b
topicMessageSender.sendMessage("lazy.java.rabbit","lazyJavaRabbit");
}

控制页面的最终结果:

SpringBoot整合RabbitMQ及其操作_发送消息_13


举报

相关推荐

0 条评论