0
点赞
收藏
分享

微信扫一扫

RabbitMq消费者和生产者模式

导入依赖:

<dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.5</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-nop -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-nop</artifactId>
            <version>1.7.5</version>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.5</version>
            <scope>test</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/commons-logging/commons-logging -->
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.2</version>
        </dependency>

    </dependencies>

建生产者类

/*
* 生产者发消息
* */
public class producer {
    //队列名称
    public  static  final  String QUEUE_NAME="hello";
    //创建一个连接工厂
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        //工厂IP,连接Rabbitmq的队列
        factory.setHost("192.168.184.30");
        //用户名和密码
        factory.setUsername("admin");
        factory.setPassword("admin");


        //创建连接
        Connection connection = factory.newConnection();
        //获取信道
        Channel channel = connection.createChannel();
        /*
        * 生成一个队列
        * 1.队列名称
        * 2.队列里面的消息是否持久化,默认消息是存在内存中
        * 3,改队列是否只提供一个消费者进行消费是否进行消息共享,true可以多个消费者消费,false:只能一个消费这个消费
        * 4,是否自动删除  最后一个消费者端开连接以后,该队一句是否自动删除 true:自动删除  false不自动删除
        * 5.其他参数信息
        * */
         channel.queueDeclare(QUEUE_NAME,false,false,false,null);
     //发消息
        String message="Hello Word";
        /*
        * 发送一个消费
        * 1.发送到哪个交换机
        * 2.路由的key值是哪个,本次是队列的名称
        * 3.其他的参数信息
        * */
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("消息发送完毕");

    }
}

消费者类

public class Consumer {
    //队列名称
    public  static  final  String QUEUE_NAME="hello";
    //创建一个连接工厂
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        //工厂IP,连接Rabbitmq的队列
        factory.setHost("192.168.184.30");
        //用户名和密码
        factory.setUsername("admin");
        factory.setPassword("admin");


        //创建连接
        Connection connection = factory.newConnection();
        //获取信道
        Channel channel = connection.createChannel();

        //声明接收消息
        DeliverCallback deliverCallback=(consumerTag,message)->{

            System.out.println(new String(message.getBody()));

        };

        //声明取消消息
        CancelCallback callback=mess->{
            System.out.println("消息被中断");
        };

    /*
    * 消费者消费消息
    * 1.消费哪个队列
    * 2.消费者成功之后是否要应答 true代表自动应答,false代表树洞应答
    * 3.消费者未成功消费的回调
    * 4.消费者取消消费的回调
    *
    * */
    channel.basicConsume(QUEUE_NAME,true,deliverCallback,callback);
    }
}

报错

connection error; protocol method: #method<connection.close>(reply-code=530, reply-text=NOT_ALLOWED - access to vhost '/' refused for user 'admin', class-id=10, method-id=40)

定位发现是连接rabbitmq使用的用户没有赋予访问权限,我创建的是admin用户,给admin用户赋予‘/’目录的访问权限就可以,执行如下命令:

sudo rabbitmqctl  set_permissions -p / admin '.*' '.*' '.*'

工作队列

工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。
相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进
程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

3.1.轮训分发消息

在这个案例中我们会启动两个工作线程,一个消息发送线程,我们来看看他们两个工作线程
是如何工作的。

创建生产者

public class Producer {
    //队列名称
    public  static  final  String QUEUE_NAME="hello";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getchanel();
        /*
         * 生成一个队列
         * 1.队列名称
         * 2.队列里面的消息是否持久化,默认消息是存在内存中
         * 3,改队列是否只提供一个消费者进行消费是否进行消息共享,true可以多个消费者消费,false:只能一个消费这个消费
         * 4,是否自动删除  最后一个消费者端开连接以后,该队一句是否自动删除 true:自动删除  false不自动删除
         * 5.其他参数信息
         * */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //从控制台当中接收消息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        }
        /*
         * 发送一个消费
         * 1.发送到哪个交换机
         * 2.路由的key值是哪个,本次是队列的名称
         * 3.其他的参数信息
         * */

        System.out.println("消息发送完毕");

    }

创建两个工作者(消费者) 

public class Worker01 {
    //队列名称
    public  static  final  String QUEUE_NAME="hello";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getchanel();
        //消息的接收
        /*
         * 消费者消费消息
         * 1.消费哪个队列
         * 2.消费者成功之后是否要应答 true代表自动应答,false代表树洞应答
         * 3.消费者未成功消费的回调
         * 4.消费者取消消费的回调
         *
         * */
//消息的接收
        DeliverCallback deliverCallback=(consumerTag,message)->{
            System.out.println(new String(message.getBody()));
        };
        //消息接收被取消时,执行下面的内容
        CancelCallback callback=(consumerTag)->{
            System.out.println("消息被取消掉啊");
        };
        System.out.println("C1等待接收信息......");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,callback);
    }
}

3.1抽取工具类

public class RabbitMqUtils {
    //创建一个连接工厂
    public static Channel getchanel() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        //工厂IP,连接Rabbitmq的队列
        factory.setHost("192.168.184.30");
        //用户名和密码
        factory.setUsername("admin");
        factory.setPassword("admin");


        //创建连接
        Connection connection = factory.newConnection();
        //获取信道
        Channel channel = connection.createChannel();
        return channel;
}}

测试

1.启动启动linux虚拟机如何启动rabbitmq

rabbitmqctl start_app 

2.访问rabbitmq的web页面

192.168.184.30:15672

3.下面展示轮训的效果开启生产者输入下面依次回车

4.启动两个消费者,消费者work01接收到的为

 消费者work01接收到的为

 思考?

   如果C1工作时间非常的长,或者直接宕机,导致任务并没有完成,那么C1的消息就丢失了,为了防止消息丢失,才有rabbitmq引入消息应答机制

 3.2.1.概念


3.2.2.自动应答

3.2.3.消息应答的方法(手动应答)

3.2.4.Multiple的解释

手动应答的好处是可以批量应答并且减少网络拥堵

multiple的true和 false代表不同意思

 

3.2.5.消息自动重新入队

下面演示手动应答

1.生产者类

/*
* 消息在手动应答时是不丢失,放回队列中出现消费
* */
public class Task2 {
    //队列名称
    public  static  final String TASK_QUEUE_NAME="ack_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getchanel();
        /*
         * 生成一个队列
         * 1.队列名称
         * 2.队列里面的消息是否持久化,默认消息是存在内存中
         * 3,改队列是否只提供一个消费者进行消费是否进行消息共享,true可以多个消费者消费,false:只能一个消费这个消费
         * 4,是否自动删除  最后一个消费者端开连接以后,该队一句是否自动删除 true:自动删除  false不自动删除
         * 5.其他参数信息
         * */
        //声明队列
        channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
        //从控制台输入
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.next();
            //通过信道把消息发送出去
            channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes("UTF-8"));
            System.out.println("生产者发出消息"+message);
        }

    }
}

两个消费者类

/*
* 消费者,消息在手动应答时是不丢失,放回队列中出现消费
* */
public class Worker01 {
    //队列名称
    public  static  final String TASK_QUEUE_NAME="ack_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getchanel();
        System.out.println("C1等待接收消息处理时间较短");
        //要求沉睡,模拟延时
        Thread.sleep(1000);

        //消息回调接口
        DeliverCallback deliverCallback=(consumerTag,message)->{
            System.out.println("接收到的消息:"+new String(message.getBody()));
            /*
             * 1.消息的标记
             * 2.是否批量应答  false表示不批量应答信道中的消息,true表示批量应答信道中的消息
             * 3.
             * */
            //手动应答
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };

        //消息取消消费接口
        CancelCallback callback=(consumerTag)->{
            System.out.println("消息已经被取消");
        };

        //采用手动应答
        boolean autoAck=false;
        channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,callback);

    }

}
/*
* 消费者,消息在手动应答时是不丢失,放回队列中出现消费
* */
public class Worker02 {
    //队列名称
    public  static  final String TASK_QUEUE_NAME="ack_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getchanel();
        System.out.println("C2等待接收消息处理时间较短");
        //要求沉睡,模拟延时
        Thread.sleep(10000);

        //消息回调接口
        DeliverCallback deliverCallback=(consumerTag,message)->{
            System.out.println("接收到的消息:"+new String(message.getBody()));
            /*
             * 1.消息的标记
             * 2.是否批量应答  false表示不批量应答信道中的消息,true表示批量应答信道中的消息
             * 3.
             * */
            //手动应答
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };

        //消息取消消费接口
        CancelCallback callback=(consumerTag)->{
            System.out.println("消息已经被取消");
        };

        //采用手动应答
        boolean autoAck=false;
        channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,callback);

    }

}

自己演示手动应答的效果

3.3.RabbitMQ持久化


3.3.1.概念


3.3.2.队列如何实现持久化

 

设置持久化会报错

#method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'ack_queue' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)

 

 

 重新启动生产者暖和和消费者,下面就是持久化的队列

3.3.3.消息实现持久化


要想让消息实现持久化需要在消息生产者修改代MessageProperties.PERSISTENT_TEXT_PLAIN
添加这个属性。

将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉RabbitMQ将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候但是还没有存储完,消息还在缓存的一个间隔点。此时并有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。如果需要更强有力的持久化策略,参考后边课件发布确认章节。 

t添加下面属性表示持久化到磁盘上

MessageProperties.PERSISTENT_TEXT_PLAIN

 

3.3.4.不公平分发(能者多劳)

在最开始的时候我们学习到RabbitMQ分发消息采用的轮训分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者1处理任务的速度非常快,而另外一个消费者2处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是RabbitMQ并不知道这种情况它依然很公平的进行分发。
为了避免这种情况,我们可以设置参数channel.basicQos(1); 

在消费者端设置不公平分发

 //设置不公平分发
        channel.basicQos(1);//默认为0是轮训分发,改为1是不公平分发,能者多劳

 

3.3.5.预取值
本身消息的发送就是异步发送的,所以在任何时候,channel上肯定不止只有一个消息另外来自消费者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用basic.g10s.方法设置“预取计数”值来完成的。该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量,RabbitMQ.将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,例如,假设在通道上有未确认的消息5、6、7,8,并且通道的预取计数设置为4,此时 RabbitMQ将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被ack。比方说tag=6这个消息刚刚被确ACK,RabbitMQp.将会感知这个情况到并再发送一条消息。消息应答和QoS,预取值对用户吞吐量有重大影响。通常,增加预取将提高向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的RAM消耗(随机存取存储器)应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同100到300范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为1是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间

举报

相关推荐

0 条评论