消息中间件
- 消息:应用间传送的数据
- 消息中间件:利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成
- 消息中间件的传递模式:P2P、Pub/Sub
- 适用:需要可靠的数据传送的分布式环境
- 优点:能够在客户端和服务器之间提供同步或异步的连接,并且在任何时刻都可以将消息进行传送或存储转发
- 作用:解耦、存储、扩展、削峰、可恢复、顺序保证、缓冲、异步通信
RabbitMQ入门
1.基础示例
<!-- RabbitMQ依赖 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>${rabbitmq.version}</version>
</dependency>
/**
* 生产者代码演示
*/
public class Producer {
public final String IP_ADDRESS = "127.0.0.1"; // 地址
public final int PORT = 5672; // 端口,默认5672
public final String USERNAME = "guest"; // 默认用户名
public final String PASSWORD = "guest"; // 默认密码
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
var factory = new ConnectionFactory();
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
// 2.创建连接
var connection = factory.newConnection();
// 3.创建信道
var channel = connection.createChannel();
// 4.声明(创建)交换器
channel.exchangeDeclare("exchange_demo", "direct", true, false, null);
// 5.声明(创建)队列
channel.queueDeclare("queue_demo", true, false, false, null);
// 6.绑定交换器与队列
channel.queueBind("queue_demo", "exchange_demo", "routing_key");
// 7.发送消息
var msg = "Hello World!"
channel.basicPublish("exchange_demo", "routing_key", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
// 8.关闭资源
channel.close();
connection.close();
}
}
/**
* 消费者代码演示
*/
public class Consumer {
public final String IP_ADDRESS = "127.0.0.1"; // 地址
public final int PORT = 5672; // 端口,默认5672
public final String USERNAME = "guest"; // 默认用户名
public final String PASSWORD = "guest"; // 默认密码
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 1.创建连接工厂
var factory = new ConnectionFactory();
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
// 2.创建连接
var connection = factory.newConnection();
// 3.创建信道
var channel = connection.createChannel();
// 4.消费消息
channel.basicConsume("queue_demo", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 消息
System.out.println("receive message: " + new String(body));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 确认消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
// 5.等待消息消费完成
TimeUnit.SECONDS.sleep(5);
// 6.关闭资源
channel.close();
connection.close();
}
}
2.相关概念介绍
RabbitMQ是一个生产者消费者模型,主要负责接收、存储和转发消息。
RabbitMQ的模型结构
- Producer:生产者,投递消息的一方
- Consumer:消费者,接收消息的一方
- Broker:消息中间件的服务结点
- Queue:队列,用于存储消息
- Exchange:交换器,用于将消息路由给队列
- RoutingKey:路由键,交换器将消息路由给队列的规则
- BindingKey:绑定键,将交换器与队列绑定
3.交换器的类型
- fanout:该类型会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中
- direct:该类型会把消息路由到BindingKey和RoutingKey完全匹配的队列中
- topic:该类型会把消息路由到BindingKey和RoutingKey模糊匹配的队列中
- BindingKey需要是由"."分隔的字符串
- BindingKey可存在两种特殊字符:"*",表示匹配一个单词,"#",表示匹配多个单词(可以是0个)
- headers:根据消息中的headers属性匹配,性能很差
RabbitMQ Java客户端使用
1.声明(创建)交换器
Exchange.DeclareOk exchangeDeclare(String exchange,
String type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange,
BuiltinExchangeType type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;
void exchangeDeclareNoWait(String exchange,
String type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;
void exchangeDeclareNoWait(String exchange,
BuiltinExchangeType type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;
- exchange:交换器的名称
- type:交换器类型,可以用枚举类或者字符串
- durable:是否持久化,即存储到磁盘,服务重启时不会丢失
- autoDelete:是否自动删除
- internal:是否是内置的,客户端不能发送消息到内置交换器,只能通过其他交换器路由到内置交换器
- arguments:其他参数
交换器的一些其他方法:
Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;
Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException;
void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;
exchangeDeclarePassive:检测交换器是否存在
- name:交换器的名称
exchangeDelete:删除交换器
- exchange:交换器的名称
- ifUnused:表示交换机没有被使用的情况下才会被删除
2.声明(创建)队列
Queue.DeclareOk queueDeclare(String queue,
boolean durable,
boolean exclusive,
boolean autoDelete,
Map<String, Object> arguments) throws IOException;
void queueDeclareNoWait(String queue,
boolean durable,
boolean exclusive,
boolean autoDelete,
Map<String, Object> arguments) throws IOException;
- queue:队列的名称
- durable:是否持久化
- exclusive:是否排他
- autoDelete:是否自动删除
- arguments:其他参数
队列的一些其他方法:
// 参考交换器的这些方法
Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;
Queue.DeleteOk queueDelete(String queue) throws IOException;
Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;
void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;
// 清空队列
Queue.PurgeOk queuePurge(String queue) throws IOException;
- ifEmpty:表示队列为空的情况下才会被删除
3.交换器和队列的绑定、解绑
Queue.BindOk queueBind(String queue,
String exchange,
String routingKey,
Map<String, Object> arguments) throws IOException;
void queueBindNoWait(String queue,
String exchange,
String routingKey,
Map<String, Object> arguments) throws IOException;
Queue.UnbindOk queueUnbind(String queue,
String exchange,
String routingKey,
Map<String, Object> arguments) throws IOException;
- queue:队列的名称
- exchange:交换器的名称
- routingKey:路由键
- arguments:其他参数
4.交换器和交换器的绑定、解绑
Exchange.BindOk exchangeBind(String destination,
String source,
String routingKey,
Map<String, Object> arguments) throws IOException;
void exchangeBindNoWait(String destination,
String source,
String routingKey,
Map<String, Object> arguments) throws IOException;
Exchange.UnbindOk exchangeUnbind(String destination,
String source,
String routingKey,
Map<String, Object> arguments) throws IOException;
void exchangeUnbindNoWait(String destination,
String source,
String routingKey,
Map<String, Object> arguments) throws IOException;
- destination:可看作交换器和队列绑定时队列的角色
- source:可看作交换器和队列绑定时交换器的角色
- routingKey:路由键
- arguments:其他参数
5.发送消息
void basicPublish(String exchange,
String routingKey,
boolean mandatory,
boolean immediate,
BasicProperties props,
byte[] body) throws IOException;
- exchange:交换器
- routingKey:路由键
- mandatory:后续说明
- immediate:后续说明
- props:消息的基本属性集
- body:消息体
6.消费消息
消费模式:
- 推(Push)模式
- 拉(Pull)模式
推模式
// 订阅队列
String basicConsume(String queue,
boolean autoAck,
String consumerTag,
boolean noLocal,
boolean exclusive,
Map<String, Object> arguments,
Consumer callback) throws IOException;
// 取消订阅
void basicCancel(String consumerTag) throws IOException;
// 最多可接受的未被确认的消息个数
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
-
queue:队列的名称
-
autoAck:是否自动确认,建议设置为false,防止消息丢失
-
consumerTag:消费者标签,用来区分多个消费者
-
noLocal:表示不能将同一个Connection中生产者发送的消息传递给这个Connection中的消费者
-
exclusive:是否排他
-
arguments:其他参数
-
callback:回调函数,用来处理推送过来的消息
-
prefetchSize:服务器将提供的最大内容量(以八位字节为单位),如果不受限制,则为0
-
prefetchCount:最大未被确认的消息个数
-
global:表示应用于整个Channel
Consumer接口(混个眼熟先):
public interface Consumer {
/**
* 其他方法之前调用
*/
void handleConsumeOk(String consumerTag);
/**
* 显示取消订阅时调用,即调用了basicCancel方法
*/
void handleCancelOk(String consumerTag);
/**
* 隐式取消订阅时调用,如订阅的队列已删除
*/
void handleCancel(String consumerTag) throws IOException;
/**
* Channel或者Connection关闭的时候调用
*/
void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);
void handleRecoverOk(String consumerTag);
/**
* 处理RabbitMQ投递过来的消息
*/
void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException;
}
拉模式
GetResponse basicGet(String queue, boolean autoAck) throws IOException;
区别
- 推模式在消费者订阅队列期间,RabbitMQ会不断地推送消息给消费者
- 拉模式只会从队列中获取单条消息
- 若要持续订阅,不能使用拉模式放在循环中,这样会严重影响性能
7.消息的确认、拒绝
// 消息确认
void basicAck(long deliveryTag, boolean multiple) throws IOException;
// 批量拒绝消息
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
// 拒绝单条消息
void basicReject(long deliveryTag, boolean requeue) throws IOException;
// 重新发送未被确认的消息
Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
- deliveryTag:可看作消息的编号
- multiple:表示多个,在ack中表示确认编号在deliveryTag前的所有消息,在nack中表示拒绝编号在deliveryTag前的所有消息
- requeue:表示拒绝该消息后消息是否重回队列
RabbitMQ进阶1
1.mandatory、immediate
void basicPublish(String exchange,
String routingKey,
boolean mandatory,
boolean immediate,
BasicProperties props,
byte[] body) throws IOException;
前面在发送消息的方法中提到mandatory和immediate,这两个参数都是用于将消息返回给生产者
mandatory
当该参数设置为true时,如果交换器无法根据自身的类型和路由键找到一个符合条件的队列时,该消息会返回给生产者
当该参数设置为false时,消息直接丢弃
immediate
当该参数设置为true时,交换器根据自身的类型和路由键所匹配到的所有队列中,必须存在至少一个消费者,否则该消息会返回给生产者,不会存入队列等待消费者
RabbitMQ3.0开始去掉对该参数的支持,采用TTL和DLX替代(后续说明)
2.备份交换器
设置mandatory时,需要添加ReturnListener监听器处理返回的消息,如果不想复杂化编程逻辑,又不想丢失消息,可使用备份交换器
未被路由的消息都会存储在备份交换器的队列中,需要的时候再去处理这些消息
// 声明交换器时设置参数alternate-exchange实现备胎交换器
var args = new HashMap<String, Object>();
args.put("alternate-exchange", "备胎");
channel.exchangeDeclare("normal_exchange", "direct", true, false, args);
channel.exchangeDeclare("备胎", "direct", true, false, null);
channel.queueDeclare("normal_queue", true, false, false, null);
channel.queueBind("normal_queue", "normal_exchange", "normal_routing_key");
channel.queueDeclare("备胎队列", true, false, false, null);
channel.queueBind("备胎队列", "备胎", "备胎路由键");
当normal_exchange中消息未被路由到normal_queue时,就会路由到备胎队列
3.过期时间(TTL)
设置消息的TTL
通过设置队列的属性,让队列中的所有消息有相同的过期时间
var args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000); //单位毫秒
channel.queueDeclare("queue_demo", true, false, false, args);
针对每条消息单独设置过期时间
var builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000"); //单位毫秒
var properties = builder.build();
channel.basicPublish("exchange_demo", "routing_key", properties, "test".getBytes());
var properties = new AMQP.BasicProperties();
properties.setExpiration("6000"); //单位毫秒
channel.basicPublish("exchange_demo", "routing_key", properties, "test".getBytes());
- 两种方式同时设置时,取较小值
- 不设置TTL,则消息不会过期
- 设置TTL=0,除非此时消息就能投递到消费者,否则该消息会被立即丢弃
设置队列的TTL
var args = new HashMap<String, Object>();
args.put("x-expires", 6000); //单位毫秒,不能设置为0
channel.queueDeclare("queue_demo", true, false, false, args);
RabbitMQ重启后,持久化的队列的过期时间会被重新计算
4.死信队列
DLX,Dead-Letter-Exchange,死信交换器,当消息在一个队列中变成死信之后,它能够被发送到另一个交换器,这个交换器就是DLX,与DLX绑定的队列就叫做死信队列
消息变成死信的情况:
- 消息被拒绝(basicReject/basicNack),并设置requeue=false
- 消息过期
- 队列达到最大长度
// 创建DLX
channel.exchangeDeclare("dlx_exchange", "direct", true, false, null);
var args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "dlx_exchange");
channel.queueDeclare("queue_demo", true, false, false, args);
利用死信队列和设置TTL=0,可以替代immediate
5.延迟队列、优先级队列
- 延迟队列:当消息发送后,消费者不会立即拿到消息,而是等待特定时间后消费者才能拿到这个消息
- 优先级队列:队列通过x-max-priority设置,消息通过priority设置