031-云E办_RabbitMQ_工作队列
一、工作模式队列-消息轮巡分发(Round-robin)
1、介绍
通过Helloworld工程我们已经能够构建一个简单的消息队列的基本项目,项目中存在几个角色:生产者、消费者、队列,而对于我们真实的开发中,对于消息的消费者通过是有多个的,比如在实现用户注册功能时,用户注册成功,会给响对应用户发送邮件,同时给用户发送手机短信,告诉用户已成功注册网站或者app 应用,这种功能在大部分项目开发中都比较常见,而对于helloworld 的应用中虽然能够对消息进行消费,但是有很大问题:消息消费者只有一个,当消息量非常大时,单个消费者处理消息就会变得很慢,同时给节点页带来很大压力,导致消息堆积越来越多。对于这种情况,RabbitMQ 提供了工作队列模式,通过工作队列提供做个消费者,对MQ产生的消息进行消费,提高MQ消息的吞吐率,降低消息的处理时间。处理模型图如下
2、生产者send
package com.xxxx.work.rr.send;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 工作模式队列-轮巡-消息发送者
*/
public class Send {
// 队列名称
private final static String QUEUE_NAME = "work_rr";
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.75.100");
factory.setPort(5672);
factory.setUsername("yeb");
factory.setPassword("yeb");
factory.setVirtualHost("/yeb");
Connection connection = null;
Channel channel = null;
try {
// 通过工厂创建连接
connection = factory.newConnection();
// 获取通道
channel = connection.createChannel();
/**
* 声明队列
* 第一个参数queue:队列名称
* 第二个参数durable:是否持久化
* 第三个参数Exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次
声明它的连接可见,并在连接断开时自动删除。
* 这里需要注意三点:
* 1. 排他队列是基于连接可见的,同一连接的不同通道是可以同时访问同一个
连接创建的排他队列的。
* 2. "首次",如果一个连接已经声明了一个排他队列,其他连接是不允许建
立同名的排他队列的,这个与普通队列不同。
* 3. 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会
被自动删除的。
* 这种队列适用于只限于一个客户端发送读取消息的应用场景。
* 第四个参数Auto-delete:自动删除,如果该队列没有任何订阅的消费者的话,该队列
会被自动删除。
* 这种队列适用于临时队列。
*/
//绑定队列:
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//模拟生产者的发送非常多的消息
for(int i = 0; i < 20; i++){
//创建消息
String message = "Hello World!" + i;
/*// 创建消息
String message = "Hello World!";*/
// 将产生的消息放入队列
// 发送消息(交换机、队列名称、额外发送消息、消息实体)
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
try {
// 关闭通道
if(null != channel && channel.isOpen()){
channel.close();
}
// 关闭连接
if (null != connection && connection.isOpen()){
connection.close();
}
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
3、消费者:Recv01和02
package com.xxxx.work.rr.recv;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 工作模式队列-轮巡-消息接收者。消费者
*/
public class Recv01 {
// 队列名称
private final static String QUEUE_NAME = "work_rr";
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.75.100");
factory.setPort(5672);
factory.setUsername("yeb");
factory.setPassword("yeb");
factory.setVirtualHost("/yeb");
try {
// 通过工厂创建连接
Connection connection = factory.newConnection();
// 获取通道,创建信道
Channel channel = connection.createChannel();
// 指定队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit pressCTRL+C");
// ---------------------之前旧版本的写法-------begin-----------
/*
// 获取消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope
envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
// 获取消息并在控制台打印
String message = new String(body, "utf-8");
System.out.println(" [x] Received '" + message + "'");
}
};
// 监听队列
channel.basicConsume(QUEUE_NAME, true, consumer);
*/
// ---------------------之前旧版本的写法--------end------------
// 获取消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
//模拟消费耗时:
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
//手动确认:收到确认消息给队列. 可能接受一条,也能同时接受多条数据。 false:一条一条的确认。
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
// 监听队列消费(队列名称、自动回值(当我的消费者收到消息后,告诉队列我收到消息了))
//false:手动去确认消息给队列
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag
-> {
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
4、测试:
先开启消费者01和02:
查看http服务台:开启两个接受者:两个信号和连接、多了一个信道
开启生产者:
查看消费者:01和02
轮流去信道获取消息:
5、总结:
从结果可以看出消息被平均分配到两个消费方,来对消息进行处理,提高了消息处理效率,创建多个消费者来对消息进行处理。这里RabitMQ采用轮询来对消息进行分发时保证了消息被平均分配到每个消费方,但是引入新的问题:真正的生产环境下,对于消息的处理基本不会像我们现在看到的这样,每个消费方处理的消息数量是平均分配的,比如因为网络原因,机器cpu,内存等硬件问题,消费方处理消息时同类消息不同机器进行处理时消耗时间也是不一样的,比如1号消费者消费1条消息时1秒,2号消费者消费1条消息是5秒,**对于1号消费者比2号消费者处理消息快,那么在分配消息时就应该让1号消费者多收到消息进行处理,**也即是我们通常所说的”能者多劳”,同样Rabbitmq对于这种消息分配模式提供了支持。
- 问题:任务量很大,消息虽然得到了及时的消费,单位时间内消息处理速度加快,提高了吞吐量,可是不同消费者处理消息的时间不同,导致部分消费者的资源被浪费。
为什么资源浪费: 是因为当消费者1消费的快,消费者2消费的慢,那么当消费者1消费完了以后,要等待消费者2消费完了以后,消费者1才能进行下一次消费。所以会导致资源的浪费哦。
解决:采用消息公平分发。
总结:工作队列-消息轮询分发-消费者收到的消息数量平均分配,单位时间内消息处理速度加快,提高了吞吐量。
二、工作模式队列-消息公平分发(fair dispatch)
在案例01中对于消息分发采用的是默认轮询分发,消息应答采用的自动应答模式,这是因为当消息进入队列,RabbitMQ就会分派消息。它不看消费者为应答的数目,只是盲目的将第n条消息发给第n个消费者。
为了解决这个问题,我们使用 basicQos(prefetchCount = 1) 方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。执行模型图如下
1、生产者:
package com.xxxx.work.fair.send;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 工作模式队列-公平-消息发送者
*/
public class Send {
// 队列名称
private final static String QUEUE_NAME = "work_fair";
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.75.100");
factory.setPort(5672);
factory.setUsername("yeb");
factory.setPassword("yeb");
factory.setVirtualHost("/yeb");
Connection connection = null;
Channel channel = null;
try {
// 通过工厂创建连接
connection = factory.newConnection();
// 获取通道
channel = connection.createChannel();
/**
* 声明队列
* 第一个参数queue:队列名称
* 第二个参数durable:是否持久化
* 第三个参数Exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次
声明它的连接可见,并在连接断开时自动删除。
* 这里需要注意三点:
* 1. 排他队列是基于连接可见的,同一连接的不同通道是可以同时访问同一个
连接创建的排他队列的。
* 2. "首次",如果一个连接已经声明了一个排他队列,其他连接是不允许建
立同名的排他队列的,这个与普通队列不同。
* 3. 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会
被自动删除的。
* 这种队列适用于只限于一个客户端发送读取消息的应用场景。
* 第四个参数Auto-delete:自动删除,如果该队列没有任何订阅的消费者的话,该队列
会被自动删除。
* 这种队列适用于临时队列。
*/
//绑定队列:
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//模拟生产者的发送非常多的消息
for(int i = 0; i < 20; i++){
//创建消息
String message = "Hello World!" + i;
/*// 创建消息
String message = "Hello World!";*/
// 将产生的消息放入队列
// 发送消息(交换机、队列名称、额外发送消息、消息实体)
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
try {
// 关闭通道
if(null != channel && channel.isOpen()){
channel.close();
}
// 关闭连接
if (null != connection && connection.isOpen()){
connection.close();
}
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
2、消费者01 和 02
package com.xxxx.work.fair.recv;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 工作模式队列-公平-消息接收者。消费者
*/
public class Recv01 {
// 队列名称
private final static String QUEUE_NAME = "work_fair";
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.75.100");
factory.setPort(5672);
factory.setUsername("yeb");
factory.setPassword("yeb");
factory.setVirtualHost("/yeb");
try {
// 通过工厂创建连接
Connection connection = factory.newConnection();
// 获取通道,创建信道
Channel channel = connection.createChannel();
// 指定队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit pressCTRL+C");
//限制消费者每次只能接受一条消息,处理完才能接受下一条消息
int prefetchCount=1;
channel.basicQos(prefetchCount);
// 获取消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
//模拟消费耗时:
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
//手动确认:收到确认消息给队列. 可能接受一条,也能同时接受多条数据。 false:一条一条的确认。
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
// 监听队列消费(队列名称、自动回值(当我的消费者收到消息后,告诉队列我收到消息了))
//false:手动去确认消息给队列
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag
-> {
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
3、测试:能者多劳
开启两个消费者
然后开启生成者