0
点赞
收藏
分享

微信扫一扫

带你玩转RabbitMQ的五种队列


目录

一、准备工作

二、 Channel的方法讲解

2.1 queueDeclare 

2.2 basicPublish

2.3 basicConsume 

2.4 basicQos

三、简单队列

🌵 解读

🌴 生产者

🌴 消费者

🔥 分析

🚀 应用场景

四、Work模式

🌵 解读

4.1 轮询模式

🌴 生产者

🌵 消费者

🔥 分析

4.2 公平分发

🚀 应用场景

五、发布/订阅模式

🌵 解读

🌴 生产者

🌴 消费者

🔥 分析

🚀 应用场景

六、路由模式

🌵解读  

🌴 生产者

🌴 消费者

🔥 分析

🚀 应用场景

七、主题模式

🌵 解读

🌴 生产者

🌴 消费者

🔥 分析

八、交换器分析

小结


一、准备工作

(1)打开RabbitMQ的服务

 systemctl start rabbitmq-server

(2)Maven项目导入依赖

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.10.0</version>
        </dependency>

(3)定义工具类

package com.yixin.util;

import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import com.rabbitmq.client.Connection;
import java.util.concurrent.TimeoutException;

public class ConnectionUtil {

    public static Connection getConnection(String connectName,String host,int port,String vHost,String name,String password){
        //1、定义连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2、设置服务器地址
        connectionFactory.setHost(host);
        //3、设置端口
        connectionFactory.setPort(port);
        //4、设置虚拟主机、用户名、密码
        connectionFactory.setVirtualHost(vHost);
        connectionFactory.setUsername(name);
        connectionFactory.setPassword(password);

        //5、通过连接工厂获取连接
        Connection connection = null;
        try {
            connection = connectionFactory.newConnection(connectName);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

        return connection;
    }
}

二、 Channel的方法讲解

2.1 queueDeclare 

🌵 源码:

 Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) 
throws IOException;

🌵 参数讲解:

🚀 queue: 队列名字

🚀   durable:是否持久化。

关于持久化:
        队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失队列(即便队列中有数据),如果想重启之后还存在就要使队列持久化 ( 设置durable为true ),保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库。


🔥 知识点补充:

  channel.basicPublish("交换机名称", "路由类型", MessageProperties.PERSISTENT_TEXT_PLAIN, "消息主体".getBytes());

MessageProperties.PERSISTENT_TEXT_PLAIN 就是设置消息持久化

🚀 autoDelete:当最后一个消费者断开连接之后队列是否自动被删除,可以通过RabbitMQ Management,查看某个队列的消费者数量,当consumers = 0时队列就会自动删除.

🚀 exclusive:是否排外。
两个作用:

🚀 arguments:其他参数

2.2 basicPublish

🌵源码:

 void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

🌵参数讲解:

🚀exchange:要将消息发送到的Exchange(交换器)。

🚀routingKey:路由Key。

🚀props:其它的一些属性。

🚀body:消息内容,字节数组类型。

2.3 basicConsume 

🌵源码:

 String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;

🌵参数:

🚀queue:队列名 

🚀autoAck:是否自动ack,如果不自动ack,需要使用channel.ack、channel.nack、channel.basicReject 进行消息应答 。

🚀deliverCallback: 当一个消息发送过来后的回调接口 。

🚀cancelCallback:当一个消费者取消订阅时的回调接口;取消消费者订阅队列时除了使用{@link Channel#basicCancel}之外的所有方式都会调用该回调方法 。

 

2.4 basicQos

🌵源码:

void basicQos(int prefetchCount) throws IOException;

🌵作用:

🌵参数讲解:

🚀prefetchCount:表示同一时刻服务器只会发送一条消息给消费者 如果没有设置Qos的值,那么队列中有多少消息就发送多少消息给消费者,完全不管消费者是否能够消费完,这样可能就会形成大量未ack的消息在缓存区堆积 。

 

三、简单队列

🌵 解读

🌴 生产者

package com.yixin.simple;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.yixin.util.ConnectionUtil;


public class Producer {

    private final static String QUEUE_NAME = "queue1";
    public static void main(String[] args) {

        Connection connection = null;
        Channel channel = null;
        try {
            // 1: 从连接工厂中获取连接
            connection = ConnectionUtil.getConnection("生产者","服务器地址",5672,"/","yixin","123456");
            // 2: 从连接中获取通道channel
            channel = connection.createChannel();
            // 3: 申明队列queue存储消息

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 4: 准备发送消息的内容
            String message = "你好,一心同学";
            // 5: 发送消息给中间件rabbitmq-server

            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

            System.out.println("消息发送成功!");
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

此时我们点击运行生产者,那么在我们的队列中就会存在一条消息

🌴 消费者

package com.yixin.simple;

import com.rabbitmq.client.*;
import com.yixin.util.ConnectionUtil;

import java.io.IOException;


public class Consumer {

    private final static String QUEUE_NAME = "queue1";
    public static void main(String[] args) {

        Connection connection = null;
        Channel channel = null;
        try {
            // 1: 从连接工厂中获取连接
            connection =  connection = ConnectionUtil.getConnection("消费者","服务器地址",5672,"/","yixin","123456");
            // 2: 从连接中获取通道channel
            channel = connection.createChannel();
            // 3: 申明队列queue存储消息

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            //4:接收消息
            channel.basicConsume(QUEUE_NAME, true, new DeliverCallback() {
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println("收到的消息是"+new String(delivery.getBody(),"UTF-8"));
                }
            },new CancelCallback(){
                public void handle(String consumerTag) throws IOException{
                    System.out.println("接收失败");
                }
            });

            //使我们的程序一直处于运行状态,也就是一直处于监听的状态,这样就不需要使用while(true)了。
            System.out.println("开始接收消息");

            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

🔥 分析

现在点击运行消费者,查看控制台:

可以看到,我们的消费者已经从队列中取出这条消息了,我们再去管理页面查看:

 发现这条消息已经被消费了

🚀 应用场景

四、Work模式

🌵 解读

4.1 轮询模式

🌴 生产者

package com.yixin.work;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.yixin.util.ConnectionUtil;


public class Producer {

    private final static String QUEUE_NAME = "work_queue";
    public static void main(String[] args) {

        Connection connection = null;
        Channel channel = null;
        try {
            // 1: 从连接工厂中获取连接
            connection = ConnectionUtil.getConnection("生产者","服务器地址",5672,"/","yixin","123456");
            // 2: 从连接中获取通道channel
            channel = connection.createChannel();
            // 3: 申明队列queue存储消息

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // 4: 定义消息内容(发布多条消息)

            for(int i=0;i<10;i++) {

                String message = "你好,一心同学,rabbitmq:" + i;
                // 5: 发送消息给中间件rabbitmq-server
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

                System.out.println("消息发送成功:"+i);

                //模拟发送消息延时,便于演示多个消费者竞争接受消息
                Thread.sleep(i*10);
            }

        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

🌵 消费者

消费者1:每接收一条消息后休眠10毫秒

package com.yixin.work;

import com.rabbitmq.client.*;
import com.yixin.util.ConnectionUtil;

import java.io.IOException;


public class Consumer1 {

    private final static String QUEUE_NAME = "work_queue";
    public static void main(String[] args)  {

        Connection connection = null;
        Channel channel = null;
        try {
            // 1: 从连接工厂中获取连接
            connection =  connection = ConnectionUtil.getConnection("消费者1","服务器地址",5672,"/","yixin","123456");
            // 2: 从连接中获取通道channel
            channel = connection.createChannel();
            // 3: 申明队列queue存储消息

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // 4: 定义接受消息的回调
            Channel finalChannel = channel;

            finalChannel.basicConsume(QUEUE_NAME, false, new DeliverCallback() {
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println("收到的消息是"+new String(delivery.getBody(),"UTF-8"));

                    //消费者1接收一条消息后休眠10毫秒
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    //返回确定状态,由于我们是设置手动返回状态的,如果没有调用该方法,那么消息仍然在我们队列中,不会被删除
                    finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

                }
            },new CancelCallback(){
                public void handle(String consumerTag) throws IOException{
                    System.out.println("接收失败");
                }
            });

            //使我们的程序一直处于运行状态,也就是一直处于监听的状态,这样就不需要使用while(true)了。
            System.out.println("开始接收消息");

            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

消费者2:每接收一条消息后休眠1000毫秒

package com.yixin.work;

import com.rabbitmq.client.*;
import com.yixin.util.ConnectionUtil;

import java.io.IOException;


public class Consumer2 {

    private final static String QUEUE_NAME = "work_queue";
    public static void main(String[] args)  {

        Connection connection = null;
        Channel channel = null;
        try {
            // 1: 从连接工厂中获取连接
            connection =  connection = ConnectionUtil.getConnection("消费者2","服务器地址",5672,"/","yixin","123456");
            // 2: 从连接中获取通道channel
            channel = connection.createChannel();
            // 3: 申明队列queue存储消息

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // 4: 定义接受消息的回调
            Channel finalChannel = channel;

            finalChannel.basicConsume(QUEUE_NAME, false, new DeliverCallback() {
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println("收到的消息是:"+new String(delivery.getBody(),"UTF-8"));

                    //消费者2接收一条消息后休眠1000毫秒
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    //返回确定状态,由于我们是设置手动返回状态的,如果没有调用该方法,那么消息仍然在我们队列中,不会被删除
                    finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

                }
            },new CancelCallback(){
                public void handle(String consumerTag) throws IOException{
                    System.out.println("接收失败");
                }
            });

            //使我们的程序一直处于运行状态,也就是一直处于监听的状态,这样就不需要使用while(true)了。
            System.out.println("开始接收消息");

            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

🔥 分析

我们开始进行测试,首先生产者进行打印10(0-9)条数据:

接着我们看消费者1,结果为打印偶数条消息:

再查看消费者2,结果为打印奇数条消息 :

 分析结果:

(1)消费者1和消费者2获取到的消息内容是不同的,也就是说同一个消息只能被一个消费者获取。

(2)消费者1和消费者2分别获取奇数条消息和偶数条消息,两种获取消息的条数是一样的。

4.2 公平分发

只需在我们的消费者中添加以下这行代码即可:

 channel.basicQos(1);

我们再重新查看消费者1和消费者2的消息结果:

消费者1:

消费者2:

🚀 应用场景

五、发布/订阅模式

🌵 解读

在RabbitMQ中,交换器主要有四种类型:directfanouttopicheaders,这里的交换器是 fanout

🌴 生产者

package com.yixin.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.yixin.util.ConnectionUtil;


public class Producer {

    //定义交换机
    private final static String EXCHANGE_NAME = "fanout-exchange";

    public static void main(String[] args) {

        Connection connection = null;
        Channel channel = null;
        try {
            // 1: 从连接工厂中获取连接
            connection =  connection = ConnectionUtil.getConnection("生产者","服务器地址",5672,"/","yixin","123456");
            // 2: 从连接中获取通道channel
            channel = connection.createChannel();
            // 3: 准备发送消息的内容
            String message = "hello,world";

            String routingKey = "";

            //4:声明交换器,并且其类型为fanout
            channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

            // 5: 发送消息给中间件rabbitmq-server
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
            System.out.println("消息发送成功!");
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 6: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

🌴 消费者

消费者1:

package com.yixin.fanout;


import com.rabbitmq.client.*;
import com.yixin.util.ConnectionUtil;

import java.io.IOException;

public class Consumer1 {

    //需要绑定的交换机
    private final static String EXCHANGE_NAME = "fanout-exchange";

    //队列的名称
    private final static String QUEUE_NAME = "fanout-queue-1";

    public static void main(String[] args) {

        Connection connection = null;
        Channel channel = null;
        try {
            // 1: 从连接工厂中获取连接
            connection =  connection = ConnectionUtil.getConnection("消费者1","服务器地址",5672,"/","yixin","123456");
            // 2: 从连接中获取通道channel
            channel = connection.createChannel();
            // 3: 申明队列queue存储消息

            // 4:声明队列,如果queue已经被创建过一次了,则可以不需要定义
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            //5:绑定队列到交换机
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");


            //6:同一时刻服务器只会发送一条消息给消费者
            channel.basicQos(1);
            // 7: 定义接受消息的回调
            Channel finalChannel = channel;

            //参数2:手动返回完成状态
            finalChannel.basicConsume(QUEUE_NAME, false, new DeliverCallback() {

                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println(QUEUE_NAME + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));

                    //返回确定状态,由于我们是设置手动返回状态的,如果没有调用该方法,那么消息仍然在我们队列中,不会被删除
                    finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
                }
            }, new CancelCallback() {
                public void handle(String consumerTag) throws IOException {

                }
            });

            System.out.println(QUEUE_NAME + ":开始接受消息");

            //使我们的程序一直处于运行状态,也就是一直处于监听的状态,这样就不需要使用while(true)了。
            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

消费者2:

package com.yixin.fanout;


import com.rabbitmq.client.*;
import com.yixin.util.ConnectionUtil;

import java.io.IOException;

public class Consumer2 {

    //需要绑定的交换机
    private final static String EXCHANGE_NAME = "fanout-exchange";

    //队列的名称
    private final static String QUEUE_NAME = "fanout-queue-2";

    public static void main(String[] args) {

        Connection connection = null;
        Channel channel = null;
        try {
            // 1: 从连接工厂中获取连接
            connection =  connection = ConnectionUtil.getConnection("消费者2","服务器地址",5672,"/","yixin","123456");
            // 2: 从连接中获取通道channel
            channel = connection.createChannel();
            // 3: 申明队列queue存储消息

            // 4:声明队列,如果queue已经被创建过一次了,则可以不需要定义
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            //5:绑定队列到交换机
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");


            //6:同一时刻服务器只会发送一条消息给消费者
            channel.basicQos(1);
            // 7: 定义接受消息的回调
            Channel finalChannel = channel;

            //参数2:手动返回完成状态
            finalChannel.basicConsume(QUEUE_NAME, false, new DeliverCallback() {

                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println(QUEUE_NAME + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));

                    //返回确定状态,由于我们是设置手动返回状态的,如果没有调用该方法,那么消息仍然在我们队列中,不会被删除
                    finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
                }
            }, new CancelCallback() {
                public void handle(String consumerTag) throws IOException {

                }
            });

            System.out.println(QUEUE_NAME + ":开始接受消息");

            //使我们的程序一直处于运行状态,也就是一直处于监听的状态,这样就不需要使用while(true)了。
            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

🔥 分析

这个时候我们启动生产者,消费者1和消费者2,查看控制台:

 

🚀 应用场景

六、路由模式

🌵解读  

🌴 生产者

package com.yixin.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.yixin.util.ConnectionUtil;


public class Producer {

    //定义交换机
    private final static String EXCHANGE_NAME = "direct-exchange";

    public static void main(String[] args) {

        Connection connection = null;
        Channel channel = null;
        try {
            // 1: 从连接工厂中获取连接
            connection =  connection = ConnectionUtil.getConnection("生产者","服务器地址",5672,"/","yixin","123456");
            // 2: 从连接中获取通道channel
            channel = connection.createChannel();
            // 3: 准备发送消息的内容
            String message = "hello,world";

            String routingKey = "update";

            //4:声明交换器,并且其类型为direct
            channel.exchangeDeclare(EXCHANGE_NAME,"direct");

            // 5: 发送消息给中间件rabbitmq-server
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
            System.out.println("消息发送成功!");
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 6: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

🌴 消费者

消费者1:

package com.yixin.routing;


import com.rabbitmq.client.*;
import com.yixin.util.ConnectionUtil;

import java.io.IOException;

public class Consumer1 {

    //需要绑定的交换机
    private final static String EXCHANGE_NAME = "direct-exchange";

    //队列的名称
    private final static String QUEUE_NAME = "direct-queue-1";

    public static void main(String[] args) {

        Connection connection = null;
        Channel channel = null;
        try {
            // 1: 从连接工厂中获取连接
            connection =  connection = ConnectionUtil.getConnection("消费者1","服务器地址",5672,"/","yixin","123456");
            // 2: 从连接中获取通道channel
            channel = connection.createChannel();

            // 3: 声明队列,如果queue已经被创建过一次了,则可以不需要定义
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            //绑定队列到交换机
            //4:绑定队列到交换机,指定路由key为update、delete、add(可以多个也可以单个)
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"update");
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"delete");
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"add");

           //5:同一时刻服务器只会发送一条消息给消费者
            channel.basicQos(1);

            // 6: 定义接受消息的回调
            Channel finalChannel = channel;



            //参数2:手动返回完成状态
            finalChannel.basicConsume(QUEUE_NAME, false, new DeliverCallback() {

                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println(QUEUE_NAME + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));

                    //返回确定状态,由于我们是设置手动返回状态的,如果没有调用该方法,那么消息仍然在我们队列中,不会被删除
                    finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
                }
            }, new CancelCallback() {
                public void handle(String consumerTag) throws IOException {

                }
            });

            System.out.println(QUEUE_NAME + ":开始接受消息");
            //使我们的程序一直处于运行状态,也就是一直处于监听的状态,这样就不需要使用while(true)了。
            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

消费者2:

package com.yixin.routing;


import com.rabbitmq.client.*;
import com.yixin.util.ConnectionUtil;

import java.io.IOException;

public class Consumer2 {

    //需要绑定的交换机
    private final static String EXCHANGE_NAME = "direct-exchange";

    //队列的名称
    private final static String QUEUE_NAME = "direct-queue-2";

    public static void main(String[] args) {

        Connection connection = null;
        Channel channel = null;
        try {
            // 1: 从连接工厂中获取连接
            connection =  connection = ConnectionUtil.getConnection("消费者2","服务器地址",5672,"/","yixin","123456");
            // 2: 从连接中获取通道channel
            channel = connection.createChannel();

            // 3: 声明队列,如果queue已经被创建过一次了,则可以不需要定义
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            //绑定队列到交换机
            //4:绑定队列到交换机,指定路由key为update、delete、add(可以多个也可以单个)
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"select");


            //5:同一时刻服务器只会发送一条消息给消费者
            channel.basicQos(1);

            // 6: 定义接受消息的回调
            Channel finalChannel = channel;



            //参数2:手动返回完成状态
            finalChannel.basicConsume(QUEUE_NAME, false, new DeliverCallback() {

                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println(QUEUE_NAME + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));

                    //返回确定状态,由于我们是设置手动返回状态的,如果没有调用该方法,那么消息仍然在我们队列中,不会被删除
                    finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
                }
            }, new CancelCallback() {
                public void handle(String consumerTag) throws IOException {

                }
            });

            System.out.println(QUEUE_NAME + ":开始接受消息");
            //使我们的程序一直处于运行状态,也就是一直处于监听的状态,这样就不需要使用while(true)了。
            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

🔥 分析

我们进行启动生产者,接着同时启动消费者1和消费2,查看控制台:

🚀 应用场景

七、主题模式

🌵 解读

🌴 生产者

package com.yixin.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.yixin.util.ConnectionUtil;


public class Producer {

    //定义交换机
    private final static String EXCHANGE_NAME = "topic-exchange";

    public static void main(String[] args) {

        Connection connection = null;
        Channel channel = null;
        try {
            // 1: 从连接工厂中获取连接
            connection =  connection = ConnectionUtil.getConnection("生产者","服务器地址",5672,"/","yixin","123456");
            // 2: 从连接中获取通道channel
            channel = connection.createChannel();
            // 3: 准备发送消息的内容
            String message = "hello,world";

            //定义路由key
            String routingKey = "update.Name";

            //4:声明交换器,并且其类型为direct
            channel.exchangeDeclare(EXCHANGE_NAME,"topic");

            // 5: 发送消息给中间件rabbitmq-server
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
            System.out.println("消息发送成功!");
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 6: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

🌴 消费者

消费者1:

package com.yixin.topic;


import com.rabbitmq.client.*;
import com.yixin.util.ConnectionUtil;

import java.io.IOException;

public class Consumer1 {

    //需要绑定的交换机
    private final static String EXCHANGE_NAME = "topic-exchange";

    //队列的名称
    private final static String QUEUE_NAME = "topic-queue-1";

    public static void main(String[] args) {

        Connection connection = null;
        Channel channel = null;
        try {
            // 1: 从连接工厂中获取连接
            connection =  connection = ConnectionUtil.getConnection("消费者1","服务器地址",5672,"/","yixin","123456");
            // 2: 从连接中获取通道channel
            channel = connection.createChannel();

            // 3: 声明队列,如果queue已经被创建过一次了,则可以不需要定义
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);


            //4:绑定队列到交换机,指定路由key为update.#
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"update.#");

           //5:同一时刻服务器只会发送一条消息给消费者
            channel.basicQos(1);

            // 6: 定义接受消息的回调
            Channel finalChannel = channel;

            //参数2:手动返回完成状态
            finalChannel.basicConsume(QUEUE_NAME, false, new DeliverCallback() {

                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println(QUEUE_NAME + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));

                    //返回确定状态,由于我们是设置手动返回状态的,如果没有调用该方法,那么消息仍然在我们队列中,不会被删除
                    finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
                }
            }, new CancelCallback() {
                public void handle(String consumerTag) throws IOException {

                }
            });

            System.out.println(QUEUE_NAME + ":开始接受消息");
            //使我们的程序一直处于运行状态,也就是一直处于监听的状态,这样就不需要使用while(true)了。
            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

消费者2:

package com.yixin.topic;


import com.rabbitmq.client.*;
import com.yixin.util.ConnectionUtil;

import java.io.IOException;

public class Consumer2 {

    //需要绑定的交换机
    private final static String EXCHANGE_NAME = "topic-exchange";

    //队列的名称
    private final static String QUEUE_NAME = "topic-queue-2";

    public static void main(String[] args) {

        Connection connection = null;
        Channel channel = null;
        try {
            // 1: 从连接工厂中获取连接
            connection =  connection = ConnectionUtil.getConnection("消费者2","服务器地址",5672,"/","yixin","123456");
            // 2: 从连接中获取通道channel
            channel = connection.createChannel();

            // 3: 声明队列,如果queue已经被创建过一次了,则可以不需要定义
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);


            //4:绑定队列到交换机,指定路由key为select.#
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"select.#");

            //5:同一时刻服务器只会发送一条消息给消费者
            channel.basicQos(1);

            // 6: 定义接受消息的回调
            Channel finalChannel = channel;

            //参数2:手动返回完成状态
            finalChannel.basicConsume(QUEUE_NAME, false, new DeliverCallback() {

                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println(QUEUE_NAME + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));

                    //返回确定状态,由于我们是设置手动返回状态的,如果没有调用该方法,那么消息仍然在我们队列中,不会被删除
                    finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
                }
            }, new CancelCallback() {
                public void handle(String consumerTag) throws IOException {

                }
            });

            System.out.println(QUEUE_NAME + ":开始接受消息");
            //使我们的程序一直处于运行状态,也就是一直处于监听的状态,这样就不需要使用while(true)了。
            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

🔥 分析

我们把生产者和消费者启动后,到控制台进行查看结果:

八、交换器分析

我们的交换器分为四种,分别是:directfanouttopic和 headers。


 


小结

以上就是【一心同学】认真整理的关于【RabbitMQ的五种队列】,对每一个知识的讲解也可以说是保姆级别的了,大家务必要掌握这【五种队列】,每一种队列都有不同的【应用场景】,我们要根据实际开发需要去选择我们需要的【模式】。

举报

相关推荐

0 条评论