0
点赞
收藏
分享

微信扫一扫

RabbitMQ 和 Spring 的整合

Spring整合RabbitMQ大纲

  • 第一章、RabbitAdmin管理交换机/队列/绑定
  • 1.1 创建SpringBoot工程
  • 1.2 测试四种类型的交换机的创建个队列以及绑定的操作方法
  • 1.3 RabbitAdmin清空队列消息
  • 第二章 RabbitTemplate用于发送和接收消息
  • 2.1 将RabbitTemplate 写入到Bean对象中,注意参数是**连接工厂**
  • 2.2 使用 RabbitTemolate发送消息到交换机
  • 2.3 使用Receive方法接收消息
  • 2.4 使用SImpleMessageListenerContainer监听消息队列



重要的类:

  • RabbitAdmin类 可以很好的处理交换机和队列,修改,删除并且可以注解到spring里(就像系统管理员一样管理我们的消息中间件)。
  • RabbitTemplate类 负责收发消息。



第一章、RabbitAdmin管理交换机/队列/绑定

1.1 创建SpringBoot工程

添加包依赖

<!--rabbitmq 基本依赖-->
 <dependency>
     <groupId>com.rabbitmq</groupId>
     <artifactId>amqp-client</artifactId>
 </dependency>
 <!--spring-rabbit依赖-->
 <dependency>
     <groupId>org.springframework.amqp</groupId>
     <artifactId>spring-rabbit</artifactId>
 </dependency>



1.2 测试四种类型的交换机的创建个队列以及绑定的操作方法

编写Rabbit配置类:ConfigureRabbit类

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConfigureRabbit {

    /**
     * 连接工厂
     * @return
     */
    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        // setAddresses()方法,可同时设置服务器地址和端口
        //connectionFactory.setAddresses("121.36.146.10:5672");

        connectionFactory.setHost("121.36.146.10");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    /**
     * 管理交换机和队列的RabbitAdmin
     * @param connectionFactory 连接工厂
     * @return
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        return rabbitAdmin;
    }
}

编写测试类:

import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class Rabbitmq01ApplicationTests {

    @Autowired
    RabbitAdmin rabbitAdmin;

    /**
     * 测试 声明队列 和 创建交换机,以及之间的绑定
     */
    @Test
    void test01() {
        // 声明队列
        Queue queue = new Queue("队列spring短信",
                true, false, false);
        rabbitAdmin.declareQueue(queue);

        // 声明Direct型(直连)交换机   , DirectExchange子类 直连型交换机
        Exchange exchangeDirect = new DirectExchange("交换机spring短信-direct",
                true, false, null);
        rabbitAdmin.declareExchange(exchangeDirect);


        // 声明交换机 fanout型 (废除路由键类型的)交换机
        Exchange exchangeFanout = new FanoutExchange("交换机spring短信-fanout",
                true, false, null);
        rabbitAdmin.declareExchange(exchangeFanout);


        // 声明交换机 topic型(主题)交换机
        Exchange exchangeTopic = new TopicExchange("交换机spring短信-topic",
                true, false, null);
        rabbitAdmin.declareExchange(exchangeTopic);


        // 声明交换机 header型交换机
        Exchange exchangeHeader = new HeadersExchange("交换机spring短信-header",
                true, false, null);
        rabbitAdmin.declareExchange(exchangeHeader);

        /*
        // 第一种绑定方式
        Binding binding = new Binding("队列spring短信",
                Binding.DestinationType.QUEUE,
                "交换机spring短信",
                "发送短信",
                null);
        rabbitAdmin.declareBinding(binding);
        */

        // 第二种绑定方式(个人倾向于第二种)
        //  bind队列  to将队列绑定到哪一个交换机上  with routingKey  and附加数据 ,
        //  and()方法返回的是Binding对象,和第一种其实差不多。
        rabbitAdmin.declareBinding(
                BindingBuilder
                        .bind(queue)
                        .to(exchangeDirect)
                        .with("发送短信")
                        .and(null)
        );

        // fanout型交换机废除了路由键 , 所以加不加发送routingKey都一样 可以直接传入空字符串
        rabbitAdmin.declareBinding(
                BindingBuilder
                        .bind(queue)
                        .to(exchangeFanout)
                        .with("")
                        .and(null)
        );

        // topic型交换机
        rabbitAdmin.declareBinding(
                BindingBuilder
                        .bind(queue)
                        .to(exchangeTopic)
                        .with("0086.*")
                        .and(null)
        );

        // header型交换机也同样废除了路由键 , 一般用的少 主要是验证 附加参数 是否一致的情况  , 主要是设置要求传入匹配的键值对
        Map<String, Object> props = new HashMap<>();
        props.put("x-match", "all"); // 指的是要求匹配当前Map对象里面所有的键值对,除了当前x-match,其他的都要比配上这样的消息才能被路由到队列里面去
        props.put("用户名", "华为");
        props.put("密码", "123456");

        rabbitAdmin.declareBinding(
                BindingBuilder
                        .bind(queue)
                        .to(exchangeHeader)
                        .with("")
                        .and(props)
        );
    }

}

spring-rabbit依赖中,对每一种类型的交换机都创建了一个 单独的子类对象,我们只需要通过子类去创建父类交换机对象即可。


DirectExchange(direct直连型)、FanoutExchange( fanout型)、TopicExchange( topic主题型)、HeadersExchange(header型交换机)



上诉测试代码执行后,可以在RabbitMQ服务中 ,看到如下效果:

RabbitMQ 和 Spring 的整合_队列

在看一下队列是否成功

RabbitMQ 和 Spring 的整合_队列_02


RabbitMQ 和 Spring 的整合_队列_03





1.3 RabbitAdmin清空队列消息

// 清空队列消息  参数为队列的名称即可
        rabbitAdmin.purgeQueue("队列-direct");

        // 删除队列
        rabbitAdmin.deleteQueue("队列spring短信");

        // 删除交换机
        rabbitAdmin.deleteExchange("交换机spring短信-direct");
        rabbitAdmin.deleteExchange("交换机spring短信-fanout");
        rabbitAdmin.deleteExchange("交换机spring短信-header");
        rabbitAdmin.deleteExchange("交换机spring短信-topic");





第二章 RabbitTemplate用于发送和接收消息

2.1 将RabbitTemplate 写入到Bean对象中,注意参数是连接工厂

/**
     * 创建一RabbitTemplate类 , 用于收发消息的模板对象
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }

2.2 使用 RabbitTemolate发送消息到交换机

@Autowired
    RabbitTemplate rabbitTemplate;    //获取 RabbitTemplate 对象
	
	 /**
     * 第一种使用 RabbitTemplate 发送消息
     */
    @Test
    void test03(){
        String  loginSms = "登录业务短信";

        rabbitTemplate.convertAndSend("交换机spring短信-direct",
                "发送短信",
                loginSms, new MessagePostProcessor() {
                    // 添加附加信息
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        Map<String, Object> headers = message.getMessageProperties().getHeaders();
                        headers.put("登录次数" , 2);     //添加附加信息
                        return message;
                    }
                });
    }

RabbitMQ 和 Spring 的整合_rabbitmq_04

那么使用 RabbitTemplate 发送消息一共有两种方法:
一种是上面的 rabbitTemplate.convertAndSend( ) ;
二种是使用rabbitTemplate.send( )方法;

如下完整junit测试案例代码:

@Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 第一种使用 RabbitTemplate 发送消息
     */
    @Test
    void test03(){
        String  loginSms = "登录业务短信";

        rabbitTemplate.convertAndSend("交换机spring短信-direct",
                "发送短信",
                loginSms, new MessagePostProcessor() {
                    // 添加附加信息
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        Map<String, Object> headers = message.getMessageProperties().getHeaders();
                        headers.put("登录次数" , 2);     //添加附加信息
                        return message;
                    }
                });
    }

    /**
     * 第二种使用 RabbitTemplate 发送消息
     */
    @Test
    void test04(){
        MessageProperties mp = new MessageProperties();
        mp.setContentEncoding("UTF-8");  // 设置编码 防止附加消息 乱码

        Map<String, Object> headers = mp.getHeaders();
        headers.put("登录次数" , 3);

        String loginSms = "登录业务发送短信Send";
        Message message = new Message(loginSms.getBytes() , mp);   // 自定消息 和 附加信息
        rabbitTemplate.send("交换机spring短信-fanout" , "" , message);   // 调用send方法发送信息 fanout 废弃了路由键
        }

执行结果可以在 rabbitmq服务中查看到,确认是否写入成功

RabbitMQ 和 Spring 的整合_spring_05





那么topic型header型交换机完整代码如下:

import org.junit.jupiter.api.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.HashMap;
import java.util.Map;

@SpringBootTest
class Rabbitmq01ApplicationTests {

    @Autowired
    RabbitAdmin rabbitAdmin;

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 发送消息 topic和header型交换机发送案例
     */
    @Test
    void test05(){
        MessageProperties mp = new MessageProperties();
        mp.setContentEncoding("UTF-8");  // 设置编码 防止附加消息 乱码

        Map<String, Object> headers = mp.getHeaders();
        headers.put("登录次数" , 4);
        // topic发送
        String loginSmsTopic = "登录业务发送短信SendTopic";
        Message message01 = new Message(loginSmsTopic.getBytes() , mp);   // 自定消息 和 附加信息
        // 发送到哪一个交换机 , 指定 路由键匹配规则 , 和 发送的消息
        rabbitTemplate.send("交换机spring短信-topic" , "0086.13760081226", message01);

        // header型发送消息  header 匹配规则是附加属性要求一致
        String loginSmsHeader = "录业务发送短信SendHeader";
        mp.getHeaders().put("用户名", "华为");
        mp.getHeaders().put("密码", "123456");

        Message message02 = new Message(loginSmsHeader.getBytes() , mp);   // 自定消息 和 附加信息
        rabbitTemplate.send("交换机spring短信-header" , "", message02);

    }



    /**
     * 第一种使用 RabbitTemplate 发送消息direct型案例
     */
    @Test
    void test03(){
        String  loginSms = "登录业务短信";

        rabbitTemplate.convertAndSend("交换机spring短信-direct",
                "发送短信",
                loginSms, new MessagePostProcessor() {
                    // 添加附加信息
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        Map<String, Object> headers = message.getMessageProperties().getHeaders();
                        headers.put("登录次数" , 2);     //添加附加信息
                        return message;
                    }
                });
    }

    /**
     * 第二种使用 RabbitTemplate 发送消息fanout型案例
     */
    @Test
    void test04(){
        MessageProperties mp = new MessageProperties();
        mp.setContentEncoding("UTF-8");  // 设置编码 防止附加消息 乱码

        Map<String, Object> headers = mp.getHeaders();
        headers.put("登录次数" , 3);

        String loginSms = "登录业务发送短信Send";
        Message message = new Message(loginSms.getBytes() , mp);   // 自定消息 和 附加信息
        rabbitTemplate.send("交换机spring短信-fanout" , "" , message);   // 调用send方法发送信息 fanout 废弃了路由键
    }

    /**
     * 删除交换机或删除队列以及清空队列
     */
    @Test
    void test02(){

        // 清空队列消息  参数为队列的名称即可
        rabbitAdmin.purgeQueue("队列-direct");

        // 删除队列
        rabbitAdmin.deleteQueue("队列spring短信");

        // 删除交换机
        rabbitAdmin.deleteExchange("交换机spring短信-direct");
        rabbitAdmin.deleteExchange("交换机spring短信-fanout");
        rabbitAdmin.deleteExchange("交换机spring短信-header");
        rabbitAdmin.deleteExchange("交换机spring短信-topic");

    }

    /**
     * 声明队列 和 创建交换机,以及之间的绑定
     */
    @Test
    void test01() {
        // 声明队列
        Queue queue = new Queue("队列spring短信",
                true, false, false);
        rabbitAdmin.declareQueue(queue);

        // 声明Direct型(直连)交换机   , DirectExchange子类 直连型交换机
        Exchange exchangeDirect = new DirectExchange("交换机spring短信-direct",
                true, false, null);
        rabbitAdmin.declareExchange(exchangeDirect);


        // 声明交换机 fanout型 (废除路由键类型的)交换机
        Exchange exchangeFanout = new FanoutExchange("交换机spring短信-fanout",
                true, false, null);
        rabbitAdmin.declareExchange(exchangeFanout);


        // 声明交换机 topic型(主题)交换机
        Exchange exchangeTopic = new TopicExchange("交换机spring短信-topic",
                true, false, null);
        rabbitAdmin.declareExchange(exchangeTopic);


        // 声明交换机 header型交换机
        Exchange exchangeHeader = new HeadersExchange("交换机spring短信-header",
                true, false, null);
        rabbitAdmin.declareExchange(exchangeHeader);

        /*
        // 第一种绑定方式
        Binding binding = new Binding("队列spring短信",
                Binding.DestinationType.QUEUE,
                "交换机spring短信",
                "发送短信",
                null);
        rabbitAdmin.declareBinding(binding);
        */

        // 第二种绑定方式(个人倾向于第二种)
        //  bind队列  to将队列绑定到哪一个交换机上  with routingKey  and附加数据 ,
        //  and()方法返回的是Binding对象,和第一种其实差不多。
        rabbitAdmin.declareBinding(
                BindingBuilder
                        .bind(queue)
                        .to(exchangeDirect)
                        .with("发送短信")
                        .and(null)
        );

        // fanout型交换机废除了路由键 , 所以加不加发送routingKey都一样 可以直接传入空字符串
        rabbitAdmin.declareBinding(
                BindingBuilder
                        .bind(queue)
                        .to(exchangeFanout)
                        .with("")
                        .and(null)
        );

        // topic型交换机
        rabbitAdmin.declareBinding(
                BindingBuilder
                        .bind(queue)
                        .to(exchangeTopic)
                        .with("0086.*")
                        .and(null)
        );

        // header型交换机也同样废除了路由键 , 一般用的少 主要是验证 附加参数 是否一致的情况  , 主要是设置要求传入匹配的键值对
        Map<String, Object> props = new HashMap<>();
        props.put("x-match", "all"); // 指的是要求匹配当前Map对象里面所有的键值对,除了当前x-match,其他的都要比配上这样的消息才能被路由到队列里面去
        props.put("用户名", "华为");
        props.put("密码", "123456");

        rabbitAdmin.declareBinding(
                BindingBuilder
                        .bind(queue)
                        .to(exchangeHeader)
                        .with("")
                        .and(props)
        );
    }
}





2.3 使用Receive方法接收消息

@Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 使用 Recive方法接收消息
     */
    @Test
    void testReciveTest(){
        while (true){
            // 如果队列中不是空的 拿到的对象不会是null的 ,
            // 2000的参数指得是 如果队列里面没有数据,会等上两秒钟是否有数据,如果两秒钟还没有消息那么就返回一个null
            Message msg = rabbitTemplate.receive("队列spring短信", 2000);
            if(msg == null){  // 如果拿到的Message对象是空的话 , 就说明没有拿到数据
                break;
            }
            
            // 打印 消息队列内容
            String str = new String(msg.getBody());  // 可以获取队列中的一条消息
            System.out.println("收到了: "+str+"进行短信发送处理");

            // 打印 附加消息
            Map<String, Object> headers = msg.getMessageProperties().getHeaders();
            for(String key : headers.keySet()){
                System.out.println( key +"  :  "+headers.get(key));
            }
        }
    }




2.4 使用SImpleMessageListenerContainer监听消息队列

需要运行在springboot程序下

  1. MessageListener (无法得到信道)
  2. ChannelAwareMessageListener (可以得到多个信道)

等待后期更新…


举报

相关推荐

0 条评论