参考API : Overview (RabbitMQ Java Client 5.20.0 API)
参考文档: RabbitMQ: One broker to queue them all | RabbitMQ
目录
在此之前您需要了解生产者消费者模型... 学习的时候, 应该结合下面这张图一起看.
结构
Hello World
consumer
package one;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者
*/
public class Consumer {
public static final String QUEUE_NAME = "hello";
// 接受消息
public static void main(String[] args) throws IOException, TimeoutException {
// 创建链接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("106.14.165.91");
factory.setPassword("123");
factory.setUsername("admin");
// 进行连接
Connection connection = factory.newConnection();
// 链接成功之后创建一个信道
Channel channel = connection.createChannel();
// 消费者消费消息
/**
* 参数
* 1.消费哪个队列
* 2.消费成功之后,是否要自动应答,true表示自动应答, 否则false
* 3.未消费成功的回调方法
* 4.消费者取消消费的回调
*/
channel.basicConsume(QUEUE_NAME, true, new DeliverCallback() {
@Override
public void handle(String var1, Delivery var2) throws IOException {
String msg = new String(var2.getBody());
System.out.println(msg);
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println("发生错误:" + s);
}
});
}
}
producer
package one;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static final String QUEUE_NAME = "hello";
// 发消息
public static void main(String[] args) throws IOException, TimeoutException {
// 创建一个链接工厂
ConnectionFactory factory = new ConnectionFactory();
// 工厂的ip, 链接rabbit队列
factory.setHost("106.14.165.91");
factory.setUsername("admin");
factory.setPassword("123");
// 建立连接
Connection connection = factory.newConnection();
// 获取信道
Channel channel1 = connection.createChannel();
// 生成一个队列
/**
* 第一个参数: 队列名称
* 第二个参数: 消息是否持久化, true表示存储在磁盘上, 否则表示存储在内存中(默认)
* 第三个参数: 该队列是否消息共享, true表示可以多个消费者消费, 否则只能一个消费者消费
* 第四个参数: 是否自动删除, 最后一个消费者断开连接之后, 该队列是否自动删除,true表示自动删除
* 其他参数:
*/
channel1.queueDeclare(QUEUE_NAME,false,false,false,null);
long nextPublishSeqNo = channel1.getNextPublishSeqNo();
System.out.println(nextPublishSeqNo);
// 发送消息
String msg = "hello world";
/** 参数列表
* 1 : 发送到哪个交换机
* 2 : 路由的key值, 本次是队列名称
* 3 : 其他参数
* 4 : 消息体
*/
channel1.basicPublish("",QUEUE_NAME,null,msg.getBytes());
long nextPublishSeqNo2 = channel1.getNextPublishSeqNo();
System.out.println(nextPublishSeqNo);
channel1.basicPublish("",QUEUE_NAME,null,msg.getBytes());
long nextPublishSeqNo3 = channel1.getNextPublishSeqNo();
System.out.println("消息发送完毕over");
}
}
创建连接API解析
官方api网址: Channel (RabbitMQ Java Client 5.20.0 API)declaration: package: com.rabbitmq.client, interface: Channelhttps://rabbitmq.github.io/rabbitmq-java-client/api/current/com/rabbitmq/client/Channel.html
Connection:publisher/consumer和broker之间的TCP连接, Channel:如果每一次访问 RabbitMQ 都建立一个Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id 帮助客户端和message broker 识别 channel,所以channel之间是完全隔离的。
Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销.
创建连接工厂
创建一个连接工厂之后, 设置对应Rabbitmq在哪个服务器上面, 并提供安全访问的验证.
在建立连接工厂之后进行连接, 就可以使用工厂创建连接.
ConnectionFactory factory = new ConnectionFactory();
// 工厂的ip, 链接rabbit队列
factory.setHost("106.14.165.11");
factory.setUsername("usr");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
创建链接之后就可以使用这个链接的对象来创建channel.
生产者生产消息
生产者生产消息, 然后通过channel发送给队列. 通过创建的channel对象, 调用其中的basicPublish方法来将消息发送给队列.
basicPublish
是 RabbitMQ 中用于发布消息到指定交换机的方法。它的主要作用是允许生产者将消息发送到 RabbitMQ 的交换机,然后交换机根据路由规则将消息发送到相应的队列中,以供消费者消费。
basicPublish参数解析:
basicPublish有三个重载版本:
void basicPublish(String exchange , String routingKey , AMQP.BasicProperties props, byte[] body ) throws IOException;
- exchange : 指定要发送的交换机的名称, 如果设置空字符串, 那么消息会被发送到RabbitMQ的默认交换机.
- routingKey : 路由键, 用于指定消息要路由到的队列.
- props : 消息的属性, 这是一个可选参数, 里面有: 消息类型, 格式, 优先级, 过期时间等等
- body : 消息体, 也就是要发送的消息本身
void basicPublish(String exchange, String routingKey, boolean var3, AMQP.BasicProperties props, byte[] body) throws IOException;
- exchange和
routingKey
:与第一个方法中的意义相同,分别是交换机名称和路由键。 var3
(boolean):是否强制路由(mandatory routing)。如果设置为true
,并且消息无法路由到任何队列(没有匹配的绑定),那么RabbitMQ会返回一个错误给生产者。如果设置为false
,消息将被丢弃。- props和 body:与第一个方法中的意义相同,分别是消息属性和消息体。
void basicPublish(String var1, String var2, boolean var3, boolean var4, AMQP.BasicProperties var5, byte[] var6) throws IOException;
var1
和var2
:与前两个方法中的意义相同,分别是交换机名称和路由键。var3
(boolean):是否强制路由,与第二个方法中的意义相同。var4
(boolean):是否立即发布(immediate flag)。如果设置为true
,并且消息无法路由到任何消费者(没有匹配的队列或消费者不在线),那么RabbitMQ会返回一个错误给生产者。如果设置为false
,消息将被存储在队列中等待消费者。var5
和var6
:与第一个方法中的意义相同,分别是消息属性和消息体。
消费者消费消息
消费者消费消息的方法为basicConsume() 这个方法有很多个重载, 如下:
地址: Channel (RabbitMQ Java Client 5.20.0 API)
这里只讲解最常见的, 也是初学者最常用的一个方法:
basicConsume(
String queue,
boolean autoAck,
Map<String,Object> arguments,
DeliverCallback deliverCallback,
CancelCallback cancelCallback
)
参数解析:
- String queue:
- 这个参数指定了消费者要从中接收消息的队列名称。
- boolean autoAck:
- 这个参数决定了是否自动确认消息。如果设置为
true
,则一旦消息被交付给消费者,RabbitMQ 会自动将其标记为已确认,即使消费者还没有实际处理完这条消息。这种模式下,如果消费者在处理消息时崩溃或发生错误,那么这条消息就会丢失,因为 RabbitMQ 认为它已经被成功处理了。 - 如果设置为
false
,则消费者需要显式地调用basicAck
方法来确认消息已被成功处理。这样,如果消费者在处理消息时崩溃,RabbitMQ 会重新将这条消息放回队列中,等待其他消费者处理,从而保证了消息的可靠性。
- 这个参数决定了是否自动确认消息。如果设置为
- Map<String,Object> arguments:
- 这个参数允许你传递额外的参数到消费者,这些参数可以用来配置消费者的行为。例如,你可以使用它来设置消费者标签(consumer tag),该标签用于唯一标识这个消费者,或者在后续的操作中引用它。
- DeliverCallback deliverCallback:
- 这是一个回调函数,当 RabbitMQ 向消费者发送消息时,会自动调用这个回调。回调函数通常包含处理消息的逻辑,比如解析消息内容、执行业务逻辑等。
- 回调函数的参数通常包含消息的内容、消息的元数据(如消息的交换机、路由键、消息ID等)以及一个通道(Channel)对象,通过这个通道对象,消费者可以发送消息确认、拒绝消息或进行其他操作。
- CancelCallback cancelCallback:
- 这是一个可选的回调函数,当消费者被取消(例如,由于连接断开或消费者显式地调用
basicCancel
)时,会自动调用这个回调。这个回调可以用于执行清理工作,比如释放资源、记录日志等。
- 这是一个可选的回调函数,当消费者被取消(例如,由于连接断开或消费者显式地调用
下面是 DeliverCallback 和CancelCallback 两个接口的代码:
@FunctionalInterface
public interface DeliverCallback {
void handle(String var1, Delivery var2) throws IOException;
}
@FunctionalInterface
public interface CancelCallback {
void handle(String var1) throws IOException;
}
我们需要重写里面的handle方法, 示例如下:
channel.basicConsume(QUEUE_NAME, true, new DeliverCallback() {
@Override
public void handle(String var1, Delivery var2) throws IOException {
// ...
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
// ...
}
});
当然你也可以使用lambda表达式;
队列声明
生产者使用的是basicPublish来将消息推送至队列, 也就是:
channel1.basicPublish("",QUEUE_NAME,null,msg.getBytes());
但是存在一个问题, 如果你basicPublish指定的交换机不存在? 那么你推送消息到你指定的交换机, 就会发生异常, 所以除非你的RabbitMQ-server本地已经创建了这个交换机, 那么就不需要其他操作, 但是如果你没有你指定的名称的交换机, 那么就应该去声明一个交换机.
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String,Object> arguments)
queueDeclare()
也就是方法: queueDeclare() , 它可以指定参数, 也可以不指定, 下面是他们的解释:
queueDeclare
是 RabbitMQ Java 客户端库中用于声明队列的方法。这个方法有两个版本,一个不带参数,另一个带有多个参数以提供队列的详细配置。下面我将详细解释这两个方法及其参数的作用。
第一个方法:queueDeclare()
Actively declare a server-named exclusive, autodelete, non-durable queue.
这个方法不带任何参数。当你调用这个方法时,RabbitMQ 会为你创建一个新的队列,该队列的名称将由 RabbitMQ 自动生成,并且这个队列是非持久的、排他的、自动删除的,且不带任何额外的参数。
由于没有指定队列名称,你通常无法预先知道队列的确切名称,这可能会在某些场景下造成不便,比如当你需要多个消费者共享同一个队列时。此外,由于队列是非持久的,如果 RabbitMQ 服务器重启,这个队列将会丢失,所有在队列中的消息也会丢失。
第二个方法:queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String,Object> arguments)
这个方法允许你更细致地配置队列的属性。下面是每个参数的解释:
- String queue:
- 队列的名称。这个名称必须是非空的,并且在 RabbitMQ 服务器上是唯一的。
- boolean durable:
- 是否持久化队列。如果设置为
true
,队列会在 RabbitMQ 服务器重启后依然存在。如果设置为false
,队列则是非持久的,服务器重启后队列将不存在。
- 是否持久化队列。如果设置为
- boolean exclusive:
- 是否排他。如果设置为
true
,队列只能被声明它的连接使用,并且当连接关闭时,队列会被自动删除。这通常用于临时队列。
- 是否排他。如果设置为
- boolean autoDelete:
- 是否自动删除。如果设置为
true
,当最后一个消费者断开连接后,队列会自动删除。如果设置为false
,则不会自动删除队列。
- 是否自动删除。如果设置为
- Map<String,Object> arguments:
- 一组额外的队列参数,可以用来设置队列的更多高级特性。例如,你可以设置队列的最大长度、消息生存时间等。
对比两个方法
第一个方法(无参数版本)非常简单易用,但功能有限。它适用于那些不需要复杂队列配置的场景,比如临时测试或简单应用。然而,由于它创建的队列是非持久的,且名称不可预知,因此它可能不适用于需要持久化存储或精确控制队列名称的场景。
第二个方法(带参数版本)提供了更丰富的队列配置选项,使得你可以更精确地控制队列的行为。通过设置不同的参数,你可以创建持久化队列、排他队列、自动删除队列,以及带有额外属性的队列。这使得这个方法适用于那些需要复杂队列配置和高级特性的场景。
在实际应用中,你应该根据应用的需求来选择使用哪个方法。如果你只是需要一个简单的、临时的队列来传递消息,那么无参数版本可能足够了。但如果你需要确保队列的持久性、控制队列的名称、设置队列的额外属性等,那么你应该使用带参数版本。
对于消费者同样如此
AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String,Object> arguments)
这个方法确实是用于声明(或创建)一个队列的。在RabbitMQ中,队列的声明是一个幂等操作,这意味着即使队列已经存在,再次声明它也不会产生错误或导致任何不期望的行为。
当调用这个方法时,RabbitMQ会检查是否已经存在具有相同名称的队列:
-
如果队列不存在:RabbitMQ会根据提供的参数(如
durable
、exclusive
、autoDelete
和arguments
)创建一个新的队列。 -
如果队列已经存在:RabbitMQ会忽略声明请求中的大多数参数(除了
exclusive
和autoDelete
,这两个参数仅在首次声明队列时生效),并返回队列的当前属性。重要的是要注意,即使队列已经存在,durable
标志也不会影响现有队列的持久性。如果队列在原始声明时是持久的,那么它将继续是持久的,即使后续的声明将其标记为非持久的。
因此,如果你尝试声明一个已经存在的队列,RabbitMQ不会报错或采取任何特别的行动,除了验证提供的exclusive
和autoDelete
标志是否与原始声明一致(如果不一致,操作会失败)。其他参数(如durable
)将不会影响已存在的队列(但是不报错并不是绝对的, 这个需要根据版本说明去判断, 不能肯定它不报错)。
最后,需要注意的是,虽然声明队列本身不会抛出IOException
,但如果在与RabbitMQ服务器通信时发生网络问题或其他I/O问题,这个方法可能会抛出IOException
。因此,在实际使用中,你应该妥善处理这些潜在的异常。
工作队列Work Queues
工作队列, 主要是避免立即执行资源密集型任务, 而不得不等待它完成, 相反我们安装任务之后执行, 我们把任务封装为消息并将其发送给队列, 在后台运行的工作进程将弹出任务并最终执行作业, 当有多个线程工作时, 这些工作线程讲义气处理这些任务.
RabbitMQ的工作队列(Work Queue)是一种消息队列模式,它允许你将任务(通常表示为消息)分发给多个消费者(工作进程)进行并行处理。这种模式特别适用于那些可以并行处理且不需要按照特定顺序完成的任务。
在工作队列模式中,生产者发送消息到队列中,一个或多个消费者从队列中接收并处理这些消息。每个消息都会被一个消费者处理,并且通常不会被多个消费者处理(除非有明确的路由或复制逻辑)。这种模式非常适合用于处理后台任务,如批量电子邮件发送、日志处理、图像处理等。
RabbitMQ的工作队列模式有以下几个关键特点:
-
任务分发:生产者将任务作为消息发送到队列中。RabbitMQ负责将消息从队列中取出并分发给一个或多个消费者。分发通常基于消息的先进先出(FIFO)顺序,但也可以通过其他策略(如优先级队列)进行定制。
-
并行处理:多个消费者可以同时从队列中接收消息并处理任务。这使得任务可以并行执行,从而提高了整体的处理速度。
-
消息确认:为了确保消息的可靠处理,消费者通常在处理完消息后会向RabbitMQ发送一个确认信号(ack)。这样,即使消费者在处理消息时崩溃,RabbitMQ也可以将未确认的消息重新放回队列中,等待其他消费者处理。这种机制保证了消息的可靠性。
-
持久化:通过配置队列和消息的持久化属性,可以确保即使在RabbitMQ服务器重启后,消息也不会丢失。这对于处理重要任务至关重要。
-
扩展性:工作队列模式具有很好的扩展性。你可以根据需要添加更多的消费者来处理更多的任务,从而轻松应对负载的增加。
使用RabbitMQ的工作队列模式,你可以构建高效、可靠且可扩展的后台任务处理系统,以满足各种应用场景的需求。
下面我们来一一列举出案例来解析工作队列的特性....
公平分发
RabbitMQ 在默认情况下,其分发机制是公平的,它试图将消息平均地分发给各个消费者,确保每个消费者都有机会处理大致相同数量的消息。这种分发并不是随机的,而是按照一定的顺序或规则进行。
但是这种分发模式会有一个很大的问题, 那么就是如果一个消费者处理消息的速度慢, 一个快, 那么就会有一个消费者产生饥饿的情况, 而另外一个消费者非常忙碌, 严重的队列会出现消息积压的情况. 此时产生饥饿的消费者没有完全利用cpu来消费消息, 所以就产生了资源的浪费, 为了避免这个情况
在 RabbitMQ 中,如果消息被平均分发到多个消费者(如消费者a和b),但消费者的处理速度不同(如a处理速度很快,b处理速度很慢),那么未被消费的消息会继续保留在队列中,等待消费者处理。具体来说,当消费者a迅速处理完自己的消息后,它会继续从队列中获取并处理新的消息(如果有的话)。而消费者b由于处理速度慢,它还未消费完的消息会留在队列中,等待其逐渐处理。
RabbitMQ 本身并没有为每个消费者设置单独的缓存来存储未处理的消息。消息的处理和存储都是在队列层面进行的。队列是消息的缓冲区,它负责存储和分发消息给消费者。消费者按照自己的速度从队列中拉取(或在某些配置下由队列推送)消息进行处理。
为了解决这种情况, 可以使用basicQos(1)方法来设置每个消费者同时只能消费一个消息, 这个设置将会告诉队列, 给我发送的消息, 同时不能超过一个 , 或者说是"别给我发送消息, 除非我上一个消息已经处理并应答", 同时, 他会将第二个消息发送给另外一个空闲的消费者来处理.
int prefetchCount = 1;
channel.basicQos(prefetchCount);
但是如果所有的消费者都处于忙碌状态, 消息无法即使处理, 那么如果你还有必要维护这个队列, 那么推荐您多创建几个消费者去消费.
轮训分发
首先创建两个消费者, 创建一个生产者, 看看他们之间的任务是如何分配的:
消费者1 :
package MutiThreadWorkQueue;
import Util.RabbitMQUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import java.io.IOException;
/**
* 工作队列1
* 也就是消费者1
*/
public class Worker1 {
// 接收消息
public static void main(String[] args) throws IOException {
Channel channel = RabbitMQUtil.getChannel();
// sout
System.out.println("worker1 : ");
// 接收消息
channel.basicConsume(RabbitMQUtil.QUEUE_NAME, true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("worker1: " + new String(delivery.getBody()));
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println("worker1 发生错误");
}
});
}
}
消费者2同消费者1一样, 只不过里面的一些向控制台输出的提示信息发生了一些修改, 例如:
System.out.println("worker2 : ");
生产者
package MutiThreadWorkQueue;
import Util.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.Scanner;
public class Producer {
public static void main(String[] args) throws IOException {
Channel channel = RabbitMQUtil.getChannel();
// 从控制台输入 接收信息
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()) {
String msg = scanner.next();
channel.basicPublish("",RabbitMQUtil.QUEUE_NAME,null,msg.getBytes());
System.out.println("发送消息: " + msg + ",发送完毕");
}
}
}
RabbitMQUtil :
package Util;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMQUtil {
public static final String QUEUE_NAME = "hello";
public static Channel getChannel() {
ConnectionFactory factory = new ConnectionFactory();
// 工厂的ip, 链接rabbit队列
factory.setHost("106.14.165.91");
factory.setUsername("admin");
factory.setPassword("123");
// 建立连接
Connection connection = null;
try {
connection = factory.newConnection();
return connection.createChannel();
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
// 获取信道
}
}
查看rabbitmq是否已经存在RabbitMQUtil.QUEUE_NAME这个字符串对应的队列:
已经存在, 直接启动生产者和消费者, 然后在生产者中多次输入信息:
查看消费者1和消费者2 :
如果你多次重复的去实验你就会发现, 总是奇数的在woker1或者是woker2.
为什么?
我们首先看看消费者的消费的代码:
channel.basicConsume(RabbitMQUtil.QUEUE_NAME, true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("worker1: " + new String(delivery.getBody()));
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println("worker1 发生错误");
}
});
然后根据上面的参数解析, 可以发现, 其实他是使用的这个构造方法:
这个构造方法有什么特别之处? 那就是它没有指定exclusive, 也就是没有指定它是否是排他的, 但是不设置就不代表没有隐式设置.
官方文档给出的这个构造方法的描述是:
也就是non-sxclusive, 我们知道exclusive是排他的意思, 那么non-exclusive就是不拍他的, 也就是说, 消费的时候允许其他消费者一起共享处理. 但是每一个任务只能分发给一个消费者.
消息应答
首先你得确认一个东西, 那就是, 消息从生产者这里发送出去, 就可以不管了吗, 队列将消息分配给消费者, 就可以不管了吗, 当然不是, 还需要使用一种应答机制, 你可以将它和TCP协议的应答报文机制和超时重传进行一个对比.
RabbitMQ 的消息应答机制是一个确保消息在发送和接收过程中可靠性的重要手段。这种机制主要用于处理消费者在处理消息时可能出现的异常情况,如消费者在处理消息过程中宕机,导致消息丢失。
RabbitMQ 一旦向消费者传递了一条消息,通常会将该消息标记为已发送。然而,如果消费者在处理消息的过程中发生宕机,未处理的消息可能会丢失。为了保证消息在发送过程中不丢失,RabbitMQ 引入了消息应答机制。
消息应答机制的工作原理是:消费者在接收到消息并且处理完该消息之后,会向 RabbitMQ 发送一个确认信号,告诉 RabbitMQ 它已经处理了该消息,此时 RabbitMQ 可以安全地将该消息从队列中删除。
RabbitMQ 提供了两种消息应答模式:
- 自动应答(Auto Acknowledgment):在这种模式下,一旦消息被消费者接收,RabbitMQ 会立即将消息标记为已被消费,而不需要消费者明确地向 RabbitMQ 发送确认。这种模式对消息的处理时机和可靠性要求不高,可以容忍一定程度的消息丢失。但是,如果消费者在处理消息的过程中发生错误,消息仍然会从队列中删除,这可能导致消息丢失。
- 手动应答(Manual Acknowledgment):在手动应答模式下,消费者在处理完消息之后,需要向 RabbitMQ 发送明确的确认信号。这种模式下,消费者可以更精确地控制消息的删除时机,只有在确认消息已经成功处理后才通知 RabbitMQ 删除消息。这有助于防止因消费者处理错误或宕机而导致的消息丢失。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
}
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
在实际使用中,可以根据应用的需求和消息的重要性来选择适合的消息应答模式。对于要求消息可靠传递的场景,建议使用手动应答模式;而对于对消息丢失容忍度较高的场景,可以选择自动应答模式以提高处理效率。
对于自动应答, 你想想,你有没有在什么地方见过?? 那肯定见过呀, 就在我们的api解析中, basicConsume的构造方法中, 存在一个参数, 名为AutoAck, 也就是Auto acknowledgement, 自动确认. 此时, 如果你设置为true, 那么就表明他是自动确认, 当队列将消息发送给消费者, 只要消费者接收到消息之后, RabbitMQ会立即将消息标记为已消费, 然后删除.
但是这样不安全, 得换一个更安全的方法 : 手动应答.
开启消息应答(手动) 你首先需要设置消费者的消费autoack为false :
官方有这样一句话描述basicAck:
简而言之就是, 如果你没有进行手动应答, 虽然是一个很容易犯的错误, 但是他会造成严重的后果, 也就是当你的客户端退出的时候消息会被重新推送(就像一些消息被无规则的推送), 但是RabbitMQ将会占用越来越多的内存, 这是因为这些消息没有得到正确的处理.
接下来我们看看basicAck这个方法的声明:
// Acknowledge one or several received messages.
basicAck(long deliveryTag, boolean multiple)
参数解析:
- deliveryTag: 这是一个长整型(
long
)参数,代表要确认的消息的投递标签(delivery tag)。投递标签是 RabbitMQ 在发送消息给消费者时附带的,用于唯一标识这个消息。通过确认特定的投递标签,消费者可以告诉 RabbitMQ 它已经处理了哪个消息。 - multiple : 这是一个布尔型(
boolean
)参数,指示是否确认一个投递标签范围内的多个消息。如果multiple
设置为true
,则 RabbitMQ 会将投递标签小于或等于指定deliveryTag
的所有未确认消息标记为已确认。如果multiple
设置为false
,则仅确认具有指定deliveryTag
的单个消息。
那么, 这个deliveryTag和multiple从哪里来? 还记得处理接口DeliverCallback 吗, 每次消息队列向这个消费者发送消息, 消费者就会调用这个接口.
DeliverCallback
在 RabbitMQ 的 Java 客户端中是一个回调接口,用于处理从 RabbitMQ 队列接收到的消息。当 RabbitMQ 服务器向消费者发送消息时,它会调用这个回调接口,并将消息作为参数传递给 DeliverCallback
的实现方法。
具体来说,DeliverCallback
的实现方法接收两个参数:
consumerTag
:这是一个唯一标识消费者的标签,用于在多个消费者之间区分不同的消费实例, 是队列发送给消费者的时候自动为消费者分配的。delivery
:这是一个Delivery
对象,它包含了从 RabbitMQ 接收到的消息的内容以及其他相关信息,如消息的包体(body)、消息的头部(headers)、消息的投递标签(delivery tag)等。
DeliverCallback
不是一个缓存。它仅仅是一个回调函数,用于实时处理从 RabbitMQ 服务器接收到的消息。每当有新消息到达时,RabbitMQ 就会调用这个回调函数,并将消息传递给它。因此,你的消费者代码需要在 DeliverCallback
的实现中编写处理消息的逻辑。
例如,在上面的代码示例中,当接收到消息时,DeliverCallback
的实现会打印出消息内容,模拟一些处理过程(在这个例子中是等待两秒),然后发送一个确认信号给 RabbitMQ,告诉它消息已经被成功处理。
需要注意的是,DeliverCallback
的实现应该尽可能快地处理消息并发送确认信号,以避免消息在队列中堆积。如果处理消息的过程非常耗时,或者有可能失败,你可能需要考虑使用更复杂的错误处理机制,比如重试逻辑、死信队列等。
channel.basicConsume(TASK_QUEUEN_NAME, false, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println("c1 消费者接收到 取消接口消费回调逻辑");
}
});
至于multiple, 批量应答以减少网络拥堵:
处理消息:
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或TCP连接丢失),导致消息未发送ACK确认,RabbitMQ将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息
发布确认
为了保证消息被安全的发送给broker, 也就是RabbitMQ队列, 你应该需要使用到一些策略, 来让发布确认生效.
开启发布确认模式
Channel channel = connection.createChannel();
channel.confirmSelect();
发布确认模式是, AMQP 0.9.1 协议对于RabbitMQ的扩展, 所以发布确认模式不是默认启动的, 发布确认需要再channel 频道开启, 使用上述的confirmSelect()方法来开启发布确认.
开启发布确认之后, producer每次发送消息之后, 都会遵循相应的确认策略, 可以单个确认, 也可以批量确认, 下面是发布确认的一些常用确认方法:
- waitForConfirms()
这个方法会阻塞当前线程,直到自上次调用此方法以来发布的所有消息都被Broker确认(ack)或拒绝(nack)。如果没有设置超时时间,它可能会无限期地等待,直到所有消息都被处理。
返回类型是boolean
,但在大多数情况下,此方法可能会因为阻塞而不返回任何值。实际上,其返回值的意义可能取决于具体的RabbitMQ客户端库实现,但通常这种同步等待方法不会使用其返回值来进行流控制或错误处理。
2. waitForConfirms(long timeout)
与上一个方法类似,这个方法也会阻塞当前线程,等待Broker对消息的确认或拒绝。但是,它接受一个超时参数timeout
,表示等待的最大时间(以毫秒为单位)。如果在指定的超时时间内Broker没有对所有消息进行确认或拒绝,那么该方法将停止等待并返回。
返回类型是boolean
,但同样,返回值的意义可能取决于具体的RabbitMQ客户端库实现。通常,如果所有消息都在超时前得到了确认,则返回true
;如果超时了,则返回false
。
3. waitForConfirmsOrDie()
这个方法的行为与waitForConfirms()
类似,也会阻塞当前线程,等待Broker对所有消息进行确认或拒绝。但是,如果Broker没有对所有消息进行确认或拒绝,那么这个方法不会返回,而是会抛出异常(通常是运行时异常),导致当前线程终止。
由于这个方法可能导致线程终止,因此它通常用于那些对消息确认有严格要求的场景,并且愿意在消息未得到确认时让整个程序失败。
4. waitForConfirmsOrDie(long timeout)
这个方法结合了waitForConfirms(long timeout)
和waitForConfirmsOrDie()
的特点。它会在指定的超时时间内等待Broker对所有消息进行确认或拒绝。如果超时时间到了,而Broker还没有对所有消息进行确认或拒绝,那么这个方法会抛出异常,导致当前线程终止。
这种方法在需要确保消息被处理但又不想无限期等待的情况下非常有用。它允许设置一个合理的超时时间,以便在消息处理失败时能够及时地采取其他措施。
单独的发送消息
while (thereAreMessagesToPublish()) {
byte[] body = ...;
BasicProperties properties = ...;
channel.basicPublish(exchange, queue, properties, body);
// uses a 5 second timeout
channel.waitForConfirmsOrDie(5_000);
}
上面的例子中我们发送了一个消息, 然后等待他的确认(waitForConfirmsOrDie(5_000)), 这个方法将会再消息得到队列的确认之后返回, 如果消息没有在指定time内确认, 或者是由于某些原因队列无法返回确认消息(比如网络原因) , 那么该方法就会抛出异常, 这种异常的处理一般是记录日志, 或者重新将消息发送.
不同的客户端库拥有不同的方法区同步处理发布者确认模式, 所以确保仔细阅读你所使用的客户端的文件.
缺点:
这种方法虽然是很简便的, 但是也有一些主要的缺点, 它大大降低了发布者发布的效率, 因为一个消息的确认, 阻止了发布随后将要发布的所有消息.这种方法, 将不能提供每秒发送几百条消息的吞吐量, 但是这种方法对于某一些应用来说, 还是很不错的, 足够支持一个应用了.
批量发布
int batchSize = 100;
int outstandingMessageCount = 0;
while (thereAreMessagesToPublish()) {
byte[] body = ...;
BasicProperties properties = ...;
channel.basicPublish(exchange, queue, properties, body);
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
channel.waitForConfirmsOrDie(5_000);
outstandingMessageCount = 0;
}
}
if (outstandingMessageCount > 0) {
channel.waitForConfirmsOrDie(5_000);
}
对比上面的一次性发布确认, 可以看到这个java代码有很大的不同, 首先, 他不并不是每一次循环都进行一个等待确认, 而是当 outstandingMessageCount == batchSize 这个条件成立再进行确认.
等待批量发送的消息被确认, 这个提高了吞吐量(对比于单独确认), 差不多时单独确认的20 ~ 30倍的效率提升, 但是他的一个缺点就是, 我们不能明确知道在失败的情况中, 是什么原因造成这种失败. 所以我们需要让整个批量发送维护在内存中来记录一些有用的东西, 或者重新发送该消息, 并且这种方法依然是同步的, 也就是在等待确认的时候, 会阻塞当前线程, 也就会阻止当前线程继续publish消息.
异步确认
Channel channel = connection.createChannel();
channel.confirmSelect();
channel.addConfirmListener((sequenceNumber, multiple) -> {
// code when message is confirmed
}, (sequenceNumber, multiple) -> {
// code when message is nack-ed
});
broker异步的确认发送过来的消息, 仅仅只需要在客户端上注册一个回调函数, 来监视这些确认信息.
这里有两个回调, 一个是已经确认的消息, 一个是被拒绝的消息(你可以理解为被RabbitMQ丢弃的消息), 每一次回调都有两个参数:
- sequenceNumber: 序列号, 这个序列号码用来标记被确认或者被拒绝的消息,
- multiple : boolean类型的数据, 如果为false, 那么仅仅是一个消息被确认或者拒绝. 如果为true, 所有的小于等于sequenceNumber的消息都会被确认或者拒绝.
每个消息在发布之前, 你可以通过下面的方法来获取到序列号:
int sequenceNumber = channel.getNextPublishSeqNo());
ch.basicPublish(exchange, queue, properties, body);
你可以使用这个序列号来找到对应的被拒绝或者是被确认的消息, 然后做出相关的处理操作. 但是在此之前, 你应该首先维护一个 key -value 的map, 以便记录sequenceNumber和对应消息的关联.
下面是一些代码案例:
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
// ... code for confirm callbacks will come later
String body = "...";
outstandingConfirms.put(channel.getNextPublishSeqNo(), body);
channel.basicPublish(exchange, queue, properties, body.getBytes());
如何使用这个ConcurrentSkipListMap?
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
if (multiple) {
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(
sequenceNumber, true
);
confirmed.clear();
} else {
outstandingConfirms.remove(sequenceNumber);
}
};
channel.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
String body = outstandingConfirms.get(sequenceNumber);
System.err.format(
"Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
body, sequenceNumber, multiple
);
cleanOutstandingConfirms.handle(sequenceNumber, multiple);
});
// ... publishing code
相关的实体类说明
Delivery消息体
源码:
public class Delivery {
private final Envelope _envelope;
private final AMQP.BasicProperties _properties;
private final byte[] _body;
public Delivery(Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
this._envelope = envelope;
this._properties = properties;
this._body = body;
}
public Envelope getEnvelope() {
return this._envelope;
}
public AMQP.BasicProperties getProperties() {
return this._properties;
}
public byte[] getBody() {
return this._body;
}
}
解析:
这个Delivery
类是在RabbitMQ的Java客户端中使用的,用于封装从RabbitMQ服务器接收到的消息。下面我将详细解释类中的参数和方法的作用:
参数:
Envelope envelope
:- 作用:这个参数包含了消息的元数据,比如消息的
deliveryTag
(投递标签)、exchange
(交换机)名称、routingKey
(路由键)等。 - 重要字段:
deliveryTag
是一个唯一的标识符,用于确认(ack)或拒绝(nack)特定的消息。
- 作用:这个参数包含了消息的元数据,比如消息的
AMQP.BasicProperties properties
:- 作用:这个参数包含了消息的附加属性,比如消息的内容类型、消息头部、消息的优先级、消息的发布和过期时间等。
- 重要字段:
contentType
表示消息的内容类型(例如,text/plain
或application/json
),headers
可以包含自定义的键值对,用于传递额外的信息。
byte[] body
:- 作用:这个参数包含了消息的实际内容。在RabbitMQ中,消息的内容被表示为字节数组,这意味着你可以发送任何类型的数据,只要你能将其转换为字节。
- 处理方式:通常,你需要根据
properties
中的contentType
字段来确定如何解析这个字节数组。例如,如果contentType
是text/plain
,你可能需要将其转换为字符串;如果是application/json
,你可能需要将其解析为JSON对象。
方法:
public Envelope getEnvelope()
:- 作用:这个方法返回消息的元数据(
Envelope
对象)。通过这个方法,你可以获取到消息的deliveryTag
,进而在处理完消息后进行确认或拒绝操作。
- 作用:这个方法返回消息的元数据(
public AMQP.BasicProperties getProperties()
:- 作用:这个方法返回消息的附加属性(
AMQP.BasicProperties
对象)。你可以使用这个方法获取到消息的contentType
、headers
等字段,以便正确地解析和处理消息内容。
- 作用:这个方法返回消息的附加属性(
public byte[] getBody()
:- 作用:这个方法返回消息的实际内容(字节数组)。你需要根据
getProperties()
返回的属性来确定如何解析这个字节数组。
- 作用:这个方法返回消息的实际内容(字节数组)。你需要根据
使用场景:
当你在RabbitMQ的Java客户端中消费消息时,RabbitMQ服务器会将消息封装为一个Delivery
对象,并通过DeliverCallback
回调给你。你可以在回调中处理这个消息,例如解析消息内容、执行业务逻辑,并在处理完后通过channel.basicAck
方法发送确认。
总之,Delivery
类及其参数和方法在RabbitMQ的Java客户端中起到了封装和传递消息的作用,使得开发者能够方便地获取和处理从RabbitMQ服务器接收到的消息。
Envelope元数据
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package com.rabbitmq.client;
public class Envelope {
private final long _deliveryTag;
private final boolean _redeliver;
private final String _exchange;
private final String _routingKey;
public Envelope(long deliveryTag, boolean redeliver, String exchange, String routingKey) {
this._deliveryTag = deliveryTag;
this._redeliver = redeliver;
this._exchange = exchange;
this._routingKey = routingKey;
}
public long getDeliveryTag() {
return this._deliveryTag;
}
public boolean isRedeliver() {
return this._redeliver;
}
public String getExchange() {
return this._exchange;
}
public String getRoutingKey() {
return this._routingKey;
}
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Envelope(deliveryTag=").append(this._deliveryTag);
sb.append(", redeliver=").append(this._redeliver);
sb.append(", exchange=").append(this._exchange);
sb.append(", routingKey=").append(this._routingKey);
sb.append(")");
return sb.toString();
}
}
Envelope
类是 RabbitMQ Java 客户端库中的一个类,它用于封装从 RabbitMQ 服务器接收到的消息的元数据。这个类包含了关于消息的一些重要信息,比如投递标签(deliveryTag
)、是否重新投递(redeliver
)、交换机名称(exchange
)和路由键(routingKey
)。
下面是 Envelope
类中每个字段和方法的详细解释:
字段:
_deliveryTag
:- 类型:
long
- 描述:这是 RabbitMQ 为每条消息分配的唯一标识符。当消费者处理完消息后,需要使用此标签来确认(ack)或拒绝(nack)消息。
- 类型:
_redeliver
:- 类型:
boolean
- 描述:这个字段表示消息是否被重新投递。如果消息之前被投递过但因为某些原因(例如消费者未正确确认)而被 RabbitMQ 重新放入队列,这个字段就会是
true
。
- 类型:
_exchange
:- 类型:
String
- 描述:这个字段表示消息最初被发送到的交换机名称。交换机是 RabbitMQ 中用于路由消息的关键组件。
- 类型:
_routingKey
:- 类型:
String
- 描述:这个字段表示消息在发送时使用的路由键。路由键用于确定消息应该被路由到哪个队列。
- 类型:
方法:
getDeliveryTag()
:- 返回值:
long
- 描述:这个方法返回消息的投递标签。
- 返回值:
isRedeliver()
:- 返回值:
boolean
- 描述:这个方法返回一个布尔值,表示消息是否被重新投递。
- 返回值:
getExchange()
:- 返回值:
String
- 描述:这个方法返回消息最初被发送到的交换机名称。
- 返回值:
getRoutingKey()
:- 返回值:
String
- 描述:这个方法返回消息在发送时使用的路由键。
- 返回值:
toString()
:- 返回值:
String
- 描述:这个方法覆盖了
Object
类中的toString
方法,用于返回Envelope
对象的字符串表示形式,方便调试和日志记录。
- 返回值:
使用场景:
当消费者从 RabbitMQ 接收消息时,每条消息都会附带一个 Envelope
对象。消费者可以使用 Envelope
对象中的方法来获取消息的元数据,并根据这些信息来决定如何处理消息。例如,消费者可以使用 getDeliveryTag
方法获取投递标签,以便在处理完消息后发送确认。
持久化
我们上面已经了解如何保证任务不会丢失, 即使消费者连接丢失. 但是我们的任务依然会有丢失的风险, 例如RabbitMQ服务器崩掉.
当RabbitMQ服务器退出或者崩溃的时候, 他将会清除队列和消息, 除非你指定它不清除. 我们需要做两件事情, 来保证即使是服务器崩溃也不会丢失数据.
首先我们需要确保队列会在RabbitMQ节点重启之后存活, 要想做到这样, 就需要声明这个队列为持久化模式
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
但是前面我们讲到, 我们应该避免对一个已经存在的队列重新定义, 因为他不会生效, RabbitMQ是不允许使用不同的参数(durable, autoDelete,exclusive等)重新定义一个已经存在的queue的. 即使这个语句本身是正确的. 如果你这样做将会返回一个错误信息.
你可以声明一个不同名称的queue:
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
设置了以上的信息之后, 就可以保证此时这个队列将不会在RabbitMQ重启的时候丢失了, 但是这并不意味着RabbitMQ重启之后, 消息不会丢失, 因为你仅仅只是持久化了queue, 而不是消息, 现在我们需要将我们的消息同时也标记为持久化模式.
如何将消息体设置为durable? 我们思考一下, 首先消息是从producer那边publish过来的, 那么我们可不可以从basicPublish这个方法中找线索?? 还真被你找到了, 如下:
basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
我们在推送消息的时候, 可以设置一个属性AMQP.BasicProperties props, 这个属性定义如下:
public static class BasicProperties extends AMQBasicProperties {
private String contentType;
private String contentEncoding;
private Map<String, Object> headers;
private Integer deliveryMode;
private Integer priority;
private String correlationId;
private String replyTo;
private String expiration;
private String messageId;
private Date timestamp;
private String type;
private String userId;
private String appId;
private String clusterId;
// 方法体 ... 省略
}
这个BasicProperties
类继承自AMQBasicProperties
,它扩展了AMQP协议中消息属性的基础定义。AMQP(高级消息队列协议)是一个开放、可靠、面向消息的中间件协议,它支持多种消息传递模式,包括发布/订阅、点对点、请求/响应等。下面是对该类中一些属性和方法的基本解释:
属性:
- contentType:
- 用途:表示消息体的MIME类型,例如
text/plain
或application/json
。这有助于接收方知道如何解析消息内容。
- 用途:表示消息体的MIME类型,例如
- contentEncoding:
- 用途:表示消息内容使用的字符编码,如
UTF-8
。
- 用途:表示消息内容使用的字符编码,如
- headers:
- 用途:一个自定义的键值对集合,允许发送方和接收方传递额外的信息。
- deliveryMode:
- 用途:定义消息的持久性。通常有两个值:1表示非持久(消息不存储在服务器上),2表示持久(消息存储在服务器上,直到被消费)。
- priority:
- 用途:消息的优先级,用于在多个消息等待消费时决定先处理哪个消息。
- correlationId:
- 用途:用于将回复与请求关联起来,通常用于RPC(远程过程调用)模式。
- replyTo:
- 用途:用于指定一个队列名,用于接收对这条消息的回复。这在RPC场景中特别有用。
- expiration:
- 用途:定义消息的生存时间(TTL,Time-To-Live)。如果在这段时间内消息没有被消费,它将被丢弃。
- messageId:
- 用途:为消息提供一个全局唯一的标识符。
- timestamp:
- 用途:表示消息创建或发送的时间。
- type:
- 用途:表示消息的类型或名称,用于在多个不同类型的消息中进行区分。
- userId:
- 用途:创建或发送消息的用户ID。
- appId:
- 用途:标识创建消息的应用程序的名称。
- clusterId:
- 用途:表示消息来自的RabbitMQ集群的ID。
方法:
通常,该类还会包含一些用于获取和设置这些属性的getter和setter方法,以及可能的其他方法用于序列化、反序列化或比较属性等。具体的方法实现取决于这个类的完整源代码。
使用场景:
这些属性通常用于确保消息的正确路由、处理和持久化。例如,发送方可能会设置replyTo
和correlationId
以接收RPC回复;或者设置priority
来确保某些关键消息优先被处理。接收方则会使用这些属性来正确地处理或路由消息。
其中有一个deliveryMode, 这个表示消息的持久化 .
所以我们第一个想到的就是通过构建一个BasicProperties对象,然后设置里面的属性,然后传入给basicPublish, 如下:
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
// ... 其他代码 ...
// 创建消息属性
Map<String, Object> headers = new HashMap<>();
headers.put("custom-header", "some-value");
BasicProperties properties = new BasicProperties.Builder()
.contentType("text/plain")
.contentEncoding("UTF-8")
.headers(headers)
.deliveryMode(2) // 设置为持久化消息
.priority(1)
.correlationId("my-correlation-id")
.replyTo("my-reply-queue")
.expiration("60000") // 消息将在60秒后过期
.messageId("my-message-id")
.timestamp(new java.util.Date())
.type("my-message-type")
.userId("my-user-id")
.appId("my-app-id")
.clusterId("my-cluster-id")
.build();
// 获取RabbitMQ的Channel
Channel channel = connection.createChannel();
// 发布消息到指定的交换机和路由键,并带上属性
String exchange = "my-exchange";
String routingKey = "my.routing.key";
String messageBody = "Hello, RabbitMQ!";
channel.basicPublish(exchange, routingKey, properties, messageBody.getBytes(StandardCharsets.UTF_8));
// ... 其他代码 ...
除此之外, 官方还提供了第二种方法, 你可以不用build一个BasicProperties,而是直接使用封装好的AMQP.BasicProperties实例对象MessageProperties来直接传入:
import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
下面是MessageProperties的原码:
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package com.rabbitmq.client;
import java.util.Date;
import java.util.Map;
public class MessageProperties {
public static final AMQP.BasicProperties MINIMAL_BASIC = new AMQP.BasicProperties((String)null, (String)null, (Map)null, (Integer)null, (Integer)null, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
public static final AMQP.BasicProperties MINIMAL_PERSISTENT_BASIC = new AMQP.BasicProperties((String)null, (String)null, (Map)null, 2, (Integer)null, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
public static final AMQP.BasicProperties BASIC = new AMQP.BasicProperties("application/octet-stream", (String)null, (Map)null, 1, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
public static final AMQP.BasicProperties PERSISTENT_BASIC = new AMQP.BasicProperties("application/octet-stream", (String)null, (Map)null, 2, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
public static final AMQP.BasicProperties TEXT_PLAIN = new AMQP.BasicProperties("text/plain", (String)null, (Map)null, 1, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
public static final AMQP.BasicProperties PERSISTENT_TEXT_PLAIN = new AMQP.BasicProperties("text/plain", (String)null, (Map)null, 2, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
public MessageProperties() {
}
}
此处的消息持久化为最后一个PERSISTENT_TEXT_PLAIN, 使用的构造方法为:
public BasicProperties(String contentType, String contentEncoding, Map<String, Object> headers, Integer deliveryMode, Integer priority, String correlationId, String replyTo, String expiration, String messageId, Date timestamp, String type, String userId, String appId, String clusterId) {
this.contentType = contentType;
this.contentEncoding = contentEncoding;
this.headers = headers == null ? null : Collections.unmodifiableMap(new HashMap(headers));
this.deliveryMode = deliveryMode;
this.priority = priority;
this.correlationId = correlationId;
this.replyTo = replyTo;
this.expiration = expiration;
this.messageId = messageId;
this.timestamp = timestamp;
this.type = type;
this.userId = userId;
this.appId = appId;
this.clusterId = clusterId;
}
发布订阅
这个部分我们将做一些完全不一样的事情 -- 我们将会把一个消息发送给多个消费者, 这个模式就被称为发布订阅模式.
为了用图解寿命这个模式, 我们将会建立一个简单的日志系统, 他将会包含两个项目, 第一个会发送日志消息, 第二个会接受然后打印这些消息.
在这个日志系统中, 每一个接受者的副本都会收到消息, 因此我们可以启动一个接受者, 也可以称为消费者, and将这些log消息导向硬盘, 与此同时, 我们将会跑起另外一个消费者并且看到这些日志打印到屏幕上.
交换机
其实一个消息并不是直接传递给队列的, 而是指定交换机, 然后由交换机传递给对应的队列.
我们之前所构造的例子中, 包含这三个部分:
- 一个生产者来生产消息, 然后发给队列
- 一个队列, 这个队列来转发消息给消费者
- 一个消费者, 消费者接受并处理来自队列的消息
RabbitMQ的核心消息模式, 是生产者永远都不会直接给队列发送任何消息, 事实上大多数情况下, 生产者会并不知道它生产的消息将会被发送到哪个队列.
相反, 生产者仅仅只能发送消息给交换机, 一个交换机是一个很简单的实现, 一方面它接受来自生产者的消息, 另外一方面,它将这些消息转发给队列. 交换机必定确切的知道它收到消息之后, 这个消息将会被发送到哪个队列. 比如说它是否会被添加到一个指定的队列, 或者是是其他的队列. 亦或是将其丢弃. 不管是哪种, 这些规则都是由交换机的类型决定
首先创建一个交换机:
然后给这个交换机绑定一个队列, 如下:
可以看到这个test交换机绑定了一个test队列, 绑定之后指定routingKey, 后期producer发送消息的时候可以通过exchangeName来指定交换机, 然后通过routingKey来指定要传入哪个队列.
那我可以将两个交换机绑定的队列, 并且将其指定的routingKey的值设置为一样的吗?
一个交换机确实可以绑定两个队列,并且这两个绑定队列的routingKey可以设置为一样。但是,这样做的话,当消息使用这个特定的routingKey发送到交换机时,交换机会将消息路由到这两个队列中,实现消息的广播效果。
在实际应用中,是否使用相同的routingKey取决于你的业务需求。如果你希望消息被发送到多个队列进行处理,那么可以设置相同的routingKey。但如果你希望根据不同的routingKey将消息路由到不同的队列,以实现更细粒度的控制,那么就应该为每个队列设置不同的routingKey。
此外,需要注意的是,routingKey的匹配规则还受到交换机类型的影响。例如,在Direct Exchange中,routingKey必须与队列的绑定键完全匹配;而在Topic Exchange中,routingKey可以与绑定键进行模式匹配。因此,在设置routingKey时,还需要考虑你使用的交换机类型。
交换机的类型:
- Direct Exchange(直连交换机):
- 特点:消息会传送给绑定键(BindingKey)与消息的路由键(RoutingKey)完全匹配的那个队列。
- 工作原理:在发送消息时,需要指定一个RoutingKey。当消息到达交换机时,交换机会查找与这个RoutingKey完全匹配的BindingKey,并将消息转发给对应的队列。如果找不到匹配的队列,消息则会被丢弃。
- 应用场景:适用于需要精确匹配RoutingKey的场景,如简单的请求-响应模型或者路由到特定服务或处理流程的队列。
在这个设置中,我们可以看到direct exchange X绑定了两个队列。第一个队列用绑定密钥橙色绑定,第二个队列有两个绑定,一个绑定密钥黑色,另一个绑定密钥绿色。
在这样的设置中,发布到交换机的带有路由关键字橙色的消息将被路由到队列Q1。路由关键字为黑色或绿色的消息将发送到Q2。所有其他消息都将被丢弃。
多次绑定:
使用同一个RoutingKey绑定多个队列是完全合法的, 例如上图, 我们可以给c1和c2这两个队列绑定同一个direct类型的交换机, 并且使用同一个RoutingKey : black. 上图这个案例中, 这个direct交换机的作用就类似于一个fanout交换机.
- Topic Exchange(主题交换机):
- 特点:与Direct类型的交换器类似,也是将消息路由到RoutingKey与BindingKey匹配的队列中,但它支持模糊匹配。
- 工作原理:BindingKey可以包含通配符(如
.
和*
),使得RoutingKey可以与多个BindingKey匹配。这样,一个消息可以被路由到多个队列中。 - 应用场景:适用于需要将消息发送到一组相关的队列的场景,如基于主题或模式的消息发布和订阅。
- 使用: topic类型的交换机, 其RoutingKey不能乱写, 必须满足一定的要求, 它必须要求是一个单词列表, 单词之间谁用 点号分开. 例如 stock.usr.notice
- routingKey的匹配规则: * 可以代表一个单词, 使用#代表多个单词, 例如 *.test可以匹配 a.test和b.test, test.#可以匹配test.a.b和test.a.c
- Headers Exchange(头交换机):
- 特点:不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
- 工作原理:在绑定队列和交换器时,会制定一组键值对。当发送消息到交换器时,RabbitMQ会获取到该消息的headers,并对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对。如果匹配,则消息会被路由到该队列中。
- 应用场景:适用于需要根据消息内容中的特定属性进行路由的场景,提供了更灵活的消息路由机制。
- Fanout Exchange(扇型交换机):
- 特点:发布/订阅的广播模式,它会将发送到该交换机的消息发送到所有与该交换机绑定的队列中。
- 工作原理:当一个消息发送到扇形交换机时,交换机会将消息分别发送给所有绑定到该交换机上的队列,无论它们的RoutingKey或BindingKey是什么。
- 应用场景:适用于需要将消息广播到多个队列的场景,如通知系统或需要多个服务或组件同时处理同一消息的情况。
创建交换机, 可以通过RabbitMQ提供的web插件来生成 :
可以通过java client来生成:
channel.exchangeDeclare("logs", "fanout");
其声明如下:
现在我们就可以通过这个交换机来推送消息:
channel.basicPublish( "logs", "", null, message.getBytes());
但是有人可能会想起来, 这和我们之前写的不一样, 我们之前没有指定这个交换机name啊, 或者是指定了一个空字符串, 如下:
channel.basicPublish("", "hello", null, message.getBytes());
为什么它还是能够指定到hello这个队列??
那是因为:
案例
实现一个fanout交换机, 实现一个生产者, 两个队列, 两个队列bind到这个fanout交换机, 创建两个消费者, 分别接受两个队列的消息.
实现一个生产者, 可以不断地输入数据 :
package fanoutExchangeTest;
import Util.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
// declaring an exchange named logs and its type is fanout
channel.exchangeDeclare("logs","fanout");
// bind queue
channel.queueBind("queue1","logs", "logsToQueue1");
channel.queueBind("queue2","logs", "logsToQueue2");
// declaring two queues the one named queue1 and the other one named queue2
// channel.queueDeclare("queue1",true,true,false,null);
// channel.queueDeclare("queue2",true,true,false,null);
// manage message
ConcurrentNavigableMap<Long, String> map = new ConcurrentSkipListMap<>();
// publish and confirm
channel.confirmSelect();
// callback : success
ConfirmCallback success = (sequenceNumber,multiple) -> {
if (multiple) {
ConcurrentNavigableMap<Long, String> longStringConcurrentNavigableMap = map.headMap(sequenceNumber,true);
longStringConcurrentNavigableMap.clear();
} else {
map.remove(sequenceNumber);
}
};
// callback : fail
ConfirmCallback fail = (sequenceNumber,multiple) -> {
String body = map.get(sequenceNumber);
System.err.format(
"Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
body, sequenceNumber, multiple
);
success.handle(sequenceNumber,multiple);
};
// add non - sycn listener
channel.addConfirmListener(success,fail);
// publis code
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String msg = scanner.next();
channel.basicPublish("logs", "testFanout",null, msg.getBytes());
channel.waitForConfirmsOrDie(3000L);
}
}
}
创建两个消费者:
package fanoutExchangeTest;
import Util.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) throws IOException {
Channel channel = RabbitMQUtil.getChannel();
channel.queueDeclare("queue1",false,false,false,null);
channel.basicConsume("queue1", false, (s, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [Consumer1] Received '" + message + "'");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}, s -> {
System.out.println("nothing");
});
}
}
package fanoutExchangeTest;
import Util.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) throws IOException {
Channel channel = RabbitMQUtil.getChannel();
channel.queueDeclare("queue2",false,false,false,null);
channel.basicConsume("queue2", false, (s, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [Consumer2] Received '" + message + "'");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}, s -> {
System.out.println("nothing");
});
}
}
首先启动两个消费者, 然后启动生产者, 随后输入数据, 输出:
临时队列
有时候我们需要一些流动性, 变化性很强的数据, 就可以创建临时队列, 他有如下特性:
- 匿名性:临时队列通常没有明确的名称,而是由RabbitMQ服务器在创建时自动分配一个唯一的名称。这使得它们非常适合于一次性使用或短暂存在的场景。
- 自动删除:当最后一个消费者断开连接时,临时队列会自动被删除。这种特性使得队列的管理变得简单,因为您不需要手动跟踪和删除不再使用的队列。
- 非持久化:临时队列通常也是非持久化的,这意味着它们不会存储在磁盘上,因此当RabbitMQ服务器重启时,这些队列及其内容会丢失。
- 使用场景:临时队列在RPC(远程过程调用)场景中特别有用,其中客户端发送一个请求并等待一个响应。在这种情况下,客户端可以创建一个临时队列来接收响应,一旦响应被接收,队列就可以被自动删除。
- 创建方式:在代码中,您可以使用RabbitMQ的客户端库来创建临时队列。例如,在RabbitMQ的Java客户端中,您可以通过不指定队列名称,并设置某些参数来创建一个临时队列。当您声明一个队列但不提供名称时,RabbitMQ会自动为您生成一个唯一的名称。
- 注意事项:虽然临时队列提供了便利性和简化管理的好处,但您也应该意识到它们的局限性。由于它们是非持久化的,并且会在最后一个消费者断开连接时自动删除,因此不适合用于需要长期保存数据或需要在多个会话之间共享数据的场景。
下面是如何进行获取一个临时队列:
String queueName = channel.queueDeclare().getQueue();
死信队列
死信队列(Dead-Letter Queue,DLQ)是一种特殊的队列,用于存放无法被正常处理的消息。这些消息可能由于各种原因,如消息被拒绝、消息过期、队列达到最大长度、消息格式错误或处理过程中抛出异常等,无法被消费者正常消费。通过将这些无法处理的消息放入死信队列,可以防止它们阻塞正常的消息处理流程,同时也方便进行后续的问题排查和处理。
死信队列在消息中间件中是一个重要的概念,它增强了消息的可靠性,有效避免了因消息处理失败而引起的数据丢失和系统异常。此外,死信队列中的消息可以进行特殊处理,如记录日志、统计失败次数、发送告警通知等,有助于监控系统的健康状况,并对处理失败的消息进行进一步的分析和处理。
值得注意的是,死信队列通常需要手动维护,而不是自动清空,因为死信消息往往需要人工分析和处理。在实际应用中,可以通过查询、导出和重新发送进入死信队列的死信消息,按需管理死信消息,避免消息漏处理。
消费者C1代码:
package DeadQueue;
import Util.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.sql.SQLOutput;
import java.util.HashMap;
import java.util.Map;
/**
* 消费者1
*/
public class Consumer1 {
// 有两个交换机
public static final String normal_exchange = "normal_exchange";
// 死信交换机
public static final String dead_exchange = "dead_exchange";
// 普通队列
public static final String normal_queue = "normal_queue";
// 死信队列
public static final String dead_queue = "dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQUtil.getChannel();
// 声明两个交换机: 死信交换机和普通交换机
channel.exchangeDeclare(normal_exchange, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(dead_exchange,BuiltinExchangeType.DIRECT);
Map<String, Object> map = new HashMap<>();
// 过期时间
map.put("x-message-ttl",10000);
// 正常队列设置死信交换机
map.put("x-dead-letter-exchange",dead_exchange);
// 设置死信消息的RoutingKey
map.put("x-dead-letter-routing-key", "lisi");
// 声明两个队列
channel.queueDeclare(normal_queue,false,false,false,map); // 声明将死信发送给死信交换机
channel.queueDeclare(dead_queue,false,false,false,null);
// 绑定交换机和队列
// 绑定普通队列和消费者1
channel.queueBind(normal_queue,normal_exchange,"zhangsan");
channel.queueBind(dead_queue,dead_exchange,"lisi");
DeliverCallback deliverCallback = (tag,msg) -> {
System.out.println("consumer1接收到消息: " + new String(msg.getBody(),"UTF-8"));
};
channel.basicConsume(normal_queue,true, deliverCallback, tag-> {});
}
}
消费者C2代码:
package DeadQueue;
import Util.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.util.HashMap;
import java.util.Map;
/**
* 消费者1
*/
public class Consumer2 {
// 死信队列
public static final String dead_queue = "dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQUtil.getChannel();
DeliverCallback deliverCallback = (tag,msg) -> {
System.out.println("consumer2接收到消息: " + new String(msg.getBody(),"UTF-8"));
};
channel.basicConsume(dead_queue,true, deliverCallback, tag-> {});
}
}
生产者代码:
package DeadQueue;
import Util.RabbitMQUtil;
import com.rabbitmq.client.Channel;
public class Producer {
// 定义一个普通交换机即可
public static final String normal_exchange = "normal_exchange";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQUtil.getChannel();
// 死信消息
for (int i = 0; i < 10; i++) {
String msg = "info" + i;
channel.basicPublish(normal_exchange,"zhangsan", null/* 这里消息的过期时间已经在队列声明的时候设置*/, msg.getBytes());
}
}
}
首先启动消费者C1, 让其创建相关队列和交换机, 随后关闭消费者C1模拟其崩溃, 然后开启生产者, 发现normal队列里面产生了10条无法被消费消息;
随后开启消费者C2, 来消费死信队列的消息:
当然, 一个消息被放入死信队列当然不止 设置过期时间这一种, 还可以设置队列最大长度, 当普通队列的长度到达最大值的时候, 这个时候额外的消息会被放入死信队列
Map<String, Object> props = new HashMap<>();
// 过期时间
// props.put("x-message-ttl",10000);
// 设置最大长度为6
props.put("x-max-length",6);
当然你也可以主动拒绝消息, 而不是被动的触发转发给死信队列.
如何设置主动拒绝?
// 其他代码
DeliverCallback deliverCallback = (tag,msg) -> {
String getMsg = new String(msg.getBody(), StandardCharsets.UTF_8);
if (getMsg.equals("info")) {
System.out.println("消息:" + getMsg + " 被拒绝");
// 拒绝策略需要开启手动应答
// 第二个参数设置为false表示 不会重新将此消息返回原来的队列.
channel.basicReject(msg.getEnvelope().getDeliveryTag(),false);
} else {
System.out.println("consumer1接收到消息: " + getMsg);
}
};
channel.basicConsume(normal_queue,false, deliverCallback, tag-> {});