Spring整合RabbitMQ大纲
- 第一章、RabbitAdmin管理交换机/队列/绑定
- 1.1 创建SpringBoot工程
- 1.2 测试四种类型的交换机的创建个队列以及绑定的操作方法
- 1.3 RabbitAdmin清空队列消息
- 第二章 RabbitTemplate用于发送和接收消息
- 2.1 将RabbitTemplate 写入到Bean对象中,注意参数是**连接工厂**
- 2.2 使用 RabbitTemolate发送消息到交换机
- 2.3 使用Receive方法接收消息
- 2.4 使用SImpleMessageListenerContainer监听消息队列
重要的类:
- RabbitAdmin类 可以很好的处理交换机和队列,修改,删除并且可以注解到spring里(就像系统管理员一样管理我们的消息中间件)。
- RabbitTemplate类 负责收发消息。
第一章、RabbitAdmin管理交换机/队列/绑定
1.1 创建SpringBoot工程
添加包依赖
<!--rabbitmq 基本依赖-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
<!--spring-rabbit依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
1.2 测试四种类型的交换机的创建个队列以及绑定的操作方法
编写Rabbit配置类:ConfigureRabbit类
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConfigureRabbit {
/**
* 连接工厂
* @return
*/
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
// setAddresses()方法,可同时设置服务器地址和端口
//connectionFactory.setAddresses("121.36.146.10:5672");
connectionFactory.setHost("121.36.146.10");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
/**
* 管理交换机和队列的RabbitAdmin
* @param connectionFactory 连接工厂
* @return
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
return rabbitAdmin;
}
}
编写测试类:
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class Rabbitmq01ApplicationTests {
@Autowired
RabbitAdmin rabbitAdmin;
/**
* 测试 声明队列 和 创建交换机,以及之间的绑定
*/
@Test
void test01() {
// 声明队列
Queue queue = new Queue("队列spring短信",
true, false, false);
rabbitAdmin.declareQueue(queue);
// 声明Direct型(直连)交换机 , DirectExchange子类 直连型交换机
Exchange exchangeDirect = new DirectExchange("交换机spring短信-direct",
true, false, null);
rabbitAdmin.declareExchange(exchangeDirect);
// 声明交换机 fanout型 (废除路由键类型的)交换机
Exchange exchangeFanout = new FanoutExchange("交换机spring短信-fanout",
true, false, null);
rabbitAdmin.declareExchange(exchangeFanout);
// 声明交换机 topic型(主题)交换机
Exchange exchangeTopic = new TopicExchange("交换机spring短信-topic",
true, false, null);
rabbitAdmin.declareExchange(exchangeTopic);
// 声明交换机 header型交换机
Exchange exchangeHeader = new HeadersExchange("交换机spring短信-header",
true, false, null);
rabbitAdmin.declareExchange(exchangeHeader);
/*
// 第一种绑定方式
Binding binding = new Binding("队列spring短信",
Binding.DestinationType.QUEUE,
"交换机spring短信",
"发送短信",
null);
rabbitAdmin.declareBinding(binding);
*/
// 第二种绑定方式(个人倾向于第二种)
// bind队列 to将队列绑定到哪一个交换机上 with routingKey and附加数据 ,
// and()方法返回的是Binding对象,和第一种其实差不多。
rabbitAdmin.declareBinding(
BindingBuilder
.bind(queue)
.to(exchangeDirect)
.with("发送短信")
.and(null)
);
// fanout型交换机废除了路由键 , 所以加不加发送routingKey都一样 可以直接传入空字符串
rabbitAdmin.declareBinding(
BindingBuilder
.bind(queue)
.to(exchangeFanout)
.with("")
.and(null)
);
// topic型交换机
rabbitAdmin.declareBinding(
BindingBuilder
.bind(queue)
.to(exchangeTopic)
.with("0086.*")
.and(null)
);
// header型交换机也同样废除了路由键 , 一般用的少 主要是验证 附加参数 是否一致的情况 , 主要是设置要求传入匹配的键值对
Map<String, Object> props = new HashMap<>();
props.put("x-match", "all"); // 指的是要求匹配当前Map对象里面所有的键值对,除了当前x-match,其他的都要比配上这样的消息才能被路由到队列里面去
props.put("用户名", "华为");
props.put("密码", "123456");
rabbitAdmin.declareBinding(
BindingBuilder
.bind(queue)
.to(exchangeHeader)
.with("")
.and(props)
);
}
}
spring-rabbit依赖中,对每一种类型的交换机都创建了一个 单独的子类对象,我们只需要通过子类去创建父类交换机对象即可。
DirectExchange(direct直连型)、FanoutExchange( fanout型)、TopicExchange( topic主题型)、HeadersExchange(header型交换机)
上诉测试代码执行后,可以在RabbitMQ服务中 ,看到如下效果:
在看一下队列是否成功
1.3 RabbitAdmin清空队列消息
// 清空队列消息 参数为队列的名称即可
rabbitAdmin.purgeQueue("队列-direct");
// 删除队列
rabbitAdmin.deleteQueue("队列spring短信");
// 删除交换机
rabbitAdmin.deleteExchange("交换机spring短信-direct");
rabbitAdmin.deleteExchange("交换机spring短信-fanout");
rabbitAdmin.deleteExchange("交换机spring短信-header");
rabbitAdmin.deleteExchange("交换机spring短信-topic");
第二章 RabbitTemplate用于发送和接收消息
2.1 将RabbitTemplate 写入到Bean对象中,注意参数是连接工厂
/**
* 创建一RabbitTemplate类 , 用于收发消息的模板对象
* @param connectionFactory
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
2.2 使用 RabbitTemolate发送消息到交换机
@Autowired
RabbitTemplate rabbitTemplate; //获取 RabbitTemplate 对象
/**
* 第一种使用 RabbitTemplate 发送消息
*/
@Test
void test03(){
String loginSms = "登录业务短信";
rabbitTemplate.convertAndSend("交换机spring短信-direct",
"发送短信",
loginSms, new MessagePostProcessor() {
// 添加附加信息
@Override
public Message postProcessMessage(Message message) throws AmqpException {
Map<String, Object> headers = message.getMessageProperties().getHeaders();
headers.put("登录次数" , 2); //添加附加信息
return message;
}
});
}
那么使用 RabbitTemplate 发送消息一共有两种方法:
一种是上面的 rabbitTemplate.convertAndSend( ) ;
二种是使用rabbitTemplate.send( )方法;
如下完整junit测试案例代码:
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 第一种使用 RabbitTemplate 发送消息
*/
@Test
void test03(){
String loginSms = "登录业务短信";
rabbitTemplate.convertAndSend("交换机spring短信-direct",
"发送短信",
loginSms, new MessagePostProcessor() {
// 添加附加信息
@Override
public Message postProcessMessage(Message message) throws AmqpException {
Map<String, Object> headers = message.getMessageProperties().getHeaders();
headers.put("登录次数" , 2); //添加附加信息
return message;
}
});
}
/**
* 第二种使用 RabbitTemplate 发送消息
*/
@Test
void test04(){
MessageProperties mp = new MessageProperties();
mp.setContentEncoding("UTF-8"); // 设置编码 防止附加消息 乱码
Map<String, Object> headers = mp.getHeaders();
headers.put("登录次数" , 3);
String loginSms = "登录业务发送短信Send";
Message message = new Message(loginSms.getBytes() , mp); // 自定消息 和 附加信息
rabbitTemplate.send("交换机spring短信-fanout" , "" , message); // 调用send方法发送信息 fanout 废弃了路由键
}
执行结果可以在 rabbitmq服务中查看到,确认是否写入成功
那么topic型header型交换机完整代码如下:
import org.junit.jupiter.api.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.HashMap;
import java.util.Map;
@SpringBootTest
class Rabbitmq01ApplicationTests {
@Autowired
RabbitAdmin rabbitAdmin;
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 发送消息 topic和header型交换机发送案例
*/
@Test
void test05(){
MessageProperties mp = new MessageProperties();
mp.setContentEncoding("UTF-8"); // 设置编码 防止附加消息 乱码
Map<String, Object> headers = mp.getHeaders();
headers.put("登录次数" , 4);
// topic发送
String loginSmsTopic = "登录业务发送短信SendTopic";
Message message01 = new Message(loginSmsTopic.getBytes() , mp); // 自定消息 和 附加信息
// 发送到哪一个交换机 , 指定 路由键匹配规则 , 和 发送的消息
rabbitTemplate.send("交换机spring短信-topic" , "0086.13760081226", message01);
// header型发送消息 header 匹配规则是附加属性要求一致
String loginSmsHeader = "录业务发送短信SendHeader";
mp.getHeaders().put("用户名", "华为");
mp.getHeaders().put("密码", "123456");
Message message02 = new Message(loginSmsHeader.getBytes() , mp); // 自定消息 和 附加信息
rabbitTemplate.send("交换机spring短信-header" , "", message02);
}
/**
* 第一种使用 RabbitTemplate 发送消息direct型案例
*/
@Test
void test03(){
String loginSms = "登录业务短信";
rabbitTemplate.convertAndSend("交换机spring短信-direct",
"发送短信",
loginSms, new MessagePostProcessor() {
// 添加附加信息
@Override
public Message postProcessMessage(Message message) throws AmqpException {
Map<String, Object> headers = message.getMessageProperties().getHeaders();
headers.put("登录次数" , 2); //添加附加信息
return message;
}
});
}
/**
* 第二种使用 RabbitTemplate 发送消息fanout型案例
*/
@Test
void test04(){
MessageProperties mp = new MessageProperties();
mp.setContentEncoding("UTF-8"); // 设置编码 防止附加消息 乱码
Map<String, Object> headers = mp.getHeaders();
headers.put("登录次数" , 3);
String loginSms = "登录业务发送短信Send";
Message message = new Message(loginSms.getBytes() , mp); // 自定消息 和 附加信息
rabbitTemplate.send("交换机spring短信-fanout" , "" , message); // 调用send方法发送信息 fanout 废弃了路由键
}
/**
* 删除交换机或删除队列以及清空队列
*/
@Test
void test02(){
// 清空队列消息 参数为队列的名称即可
rabbitAdmin.purgeQueue("队列-direct");
// 删除队列
rabbitAdmin.deleteQueue("队列spring短信");
// 删除交换机
rabbitAdmin.deleteExchange("交换机spring短信-direct");
rabbitAdmin.deleteExchange("交换机spring短信-fanout");
rabbitAdmin.deleteExchange("交换机spring短信-header");
rabbitAdmin.deleteExchange("交换机spring短信-topic");
}
/**
* 声明队列 和 创建交换机,以及之间的绑定
*/
@Test
void test01() {
// 声明队列
Queue queue = new Queue("队列spring短信",
true, false, false);
rabbitAdmin.declareQueue(queue);
// 声明Direct型(直连)交换机 , DirectExchange子类 直连型交换机
Exchange exchangeDirect = new DirectExchange("交换机spring短信-direct",
true, false, null);
rabbitAdmin.declareExchange(exchangeDirect);
// 声明交换机 fanout型 (废除路由键类型的)交换机
Exchange exchangeFanout = new FanoutExchange("交换机spring短信-fanout",
true, false, null);
rabbitAdmin.declareExchange(exchangeFanout);
// 声明交换机 topic型(主题)交换机
Exchange exchangeTopic = new TopicExchange("交换机spring短信-topic",
true, false, null);
rabbitAdmin.declareExchange(exchangeTopic);
// 声明交换机 header型交换机
Exchange exchangeHeader = new HeadersExchange("交换机spring短信-header",
true, false, null);
rabbitAdmin.declareExchange(exchangeHeader);
/*
// 第一种绑定方式
Binding binding = new Binding("队列spring短信",
Binding.DestinationType.QUEUE,
"交换机spring短信",
"发送短信",
null);
rabbitAdmin.declareBinding(binding);
*/
// 第二种绑定方式(个人倾向于第二种)
// bind队列 to将队列绑定到哪一个交换机上 with routingKey and附加数据 ,
// and()方法返回的是Binding对象,和第一种其实差不多。
rabbitAdmin.declareBinding(
BindingBuilder
.bind(queue)
.to(exchangeDirect)
.with("发送短信")
.and(null)
);
// fanout型交换机废除了路由键 , 所以加不加发送routingKey都一样 可以直接传入空字符串
rabbitAdmin.declareBinding(
BindingBuilder
.bind(queue)
.to(exchangeFanout)
.with("")
.and(null)
);
// topic型交换机
rabbitAdmin.declareBinding(
BindingBuilder
.bind(queue)
.to(exchangeTopic)
.with("0086.*")
.and(null)
);
// header型交换机也同样废除了路由键 , 一般用的少 主要是验证 附加参数 是否一致的情况 , 主要是设置要求传入匹配的键值对
Map<String, Object> props = new HashMap<>();
props.put("x-match", "all"); // 指的是要求匹配当前Map对象里面所有的键值对,除了当前x-match,其他的都要比配上这样的消息才能被路由到队列里面去
props.put("用户名", "华为");
props.put("密码", "123456");
rabbitAdmin.declareBinding(
BindingBuilder
.bind(queue)
.to(exchangeHeader)
.with("")
.and(props)
);
}
}
2.3 使用Receive方法接收消息
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 使用 Recive方法接收消息
*/
@Test
void testReciveTest(){
while (true){
// 如果队列中不是空的 拿到的对象不会是null的 ,
// 2000的参数指得是 如果队列里面没有数据,会等上两秒钟是否有数据,如果两秒钟还没有消息那么就返回一个null
Message msg = rabbitTemplate.receive("队列spring短信", 2000);
if(msg == null){ // 如果拿到的Message对象是空的话 , 就说明没有拿到数据
break;
}
// 打印 消息队列内容
String str = new String(msg.getBody()); // 可以获取队列中的一条消息
System.out.println("收到了: "+str+"进行短信发送处理");
// 打印 附加消息
Map<String, Object> headers = msg.getMessageProperties().getHeaders();
for(String key : headers.keySet()){
System.out.println( key +" : "+headers.get(key));
}
}
}
2.4 使用SImpleMessageListenerContainer监听消息队列
需要运行在springboot程序下
- MessageListener (无法得到信道)
- ChannelAwareMessageListener (可以得到多个信道)
等待后期更新…