0
点赞
收藏
分享

微信扫一扫

RabbitMQ:路由模式

zmhc 2022-10-06 阅读 190

1.基本介绍

image.png
在路由工作模式中,我们需要配置一个类型为direct的交换机,并且需要指定不同的路由键(routing key),把对应的消息从交换机路由到不同的消息队列进行存储,由消费者进行消费。

  • P:生产者,向交换机发送消息的时候,会指定一个routing key
  • X:Exchange(交换机),接收生产者的消息,然后把消息传递给和routing key完全匹配的队列
  • C1:消费者,它所在队列指定了需要routing key为error的信息
  • C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

2.生产者

public class Producer {
    public static String DIRECT_EXCHANGE = " direct_exchange";
    public static String DIRECT_QUEUE_1 = "direct_queue_1";
    public static String DIRECT_QUEUE_2 = "direct_queue_2";

    public static void main(String[] args) {
        try {
            Channel channel = ConnectUtil.getChannel();
            //声明交换机(交换机名称,交换机类型)
            channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);
            //声明队列
            channel.queueDeclare(DIRECT_QUEUE_1,true,false,false,null);
            channel.queueDeclare(DIRECT_QUEUE_2,true,false,false,null);
            //把交换机和队列1进行绑定
            channel.queueBind(DIRECT_QUEUE_1,DIRECT_EXCHANGE,"error");
            //把交换机和队列2进行绑定
            channel.queueBind(DIRECT_QUEUE_2,DIRECT_EXCHANGE,"info");
            channel.queueBind(DIRECT_QUEUE_2,DIRECT_EXCHANGE,"error");
            channel.queueBind(DIRECT_QUEUE_2,DIRECT_EXCHANGE,"warning");
            //发送消息
            String msg="日志信息:调用了xxx方法,日志级别是info";
             channel.basicPublish(DIRECT_EXCHANGE,"info",null,msg.getBytes());
            System.out.println("消息发送成功");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }


    }


}


3.消费者

消费者1

public class Consumer1 {
    public static void main(String[] args) {

        try {
            //获取信道对象
            Channel channel = ConnectUtil.getChannel();
            //消费消息
            DefaultConsumer consumer=new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1接收到消息:"+new String(body,"UTF-8"));
                    System.out.println("消费者1把日志信息保存到数据库");
                }
            };
            channel.basicConsume(Producer.DIRECT_QUEUE_1,true,consumer);



        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

消费者2

public class Consumer2 {
    public static void main(String[] args) {

        try {
            //获取信道对象
            Channel channel = ConnectUtil.getChannel();
            //消费消息
            DefaultConsumer consumer=new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2接收到消息:"+new String(body,"UTF-8"));
                    System.out.println("消费者2把日志信息输出到控制台");
                }
            };
            channel.basicConsume(Producer.DIRECT_QUEUE_2,true,consumer);



        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

4.测试

image.png
image.png
第一次测试,发送日志级别为info的信息
image.png
image.png
image.png
第二次测试,发送日志级别为error的信息
image.png
image.png

举报

相关推荐

0 条评论