RabbitMQ模式
RabbitMQ提供了六种模式,这六种模式就是RabbitMQ将消息发送到消费者的六种方式,方式的不同,RabbitMQ会将消息发送到不同的消费者程序。
Simple(简单模式)
一个队列,一个消费者。一个队列中的消息只能被一个消费者消费。
生产者代码
package rabbitmq.ced.pattern.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 六种模式
* Simple 简单模式
* 创建生产者
*
* @author 崔二旦
* @since now
*/
public class SimplePatternProducer {
public static void main(String[] args) {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2. 设置连接属性
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3. 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 3. 在连接中创建信道
channel = connection.createChannel();
// 声明队列,如果队列不存在会创建队列
channel.queueDeclare("simple_pattern", false, false, true, null);
// 准备要被发送的消息内容
String message = "崔二旦";
//推送消息
channel.basicPublish("", "simple_pattern", null, message.getBytes());
System.out.println("消息发送成功");
} catch (Exception e) {
e.printStackTrace();
System.out.println("消息发送异常");
} finally {
// 释放关闭连接信道与连接
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
消费者代码
package rabbitmq.ced.pattern.simple;
import com.rabbitmq.client.*;
/**
* 六种模式
* Simple 简单模式
* 创建消费者
*
* @author 崔二旦
* @since now
*/
public class SimplePatternConsumer {
public static void main(String[] args) {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2. 设置连接属性
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3. 从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者");
// 3. 在连接中创建信道,RabbitMQ中的所有操作都是在信道中完成的
channel = connection.createChannel();
System.out.println("等待接收消息......");
/*
* RabbitMQ推送给消费者消息回调接口,在该接口中用于编写如何对消息进行处理。
* @param1 消费者注册到RabbitMQ之后,RabbitMQ给生成的一个该消费者的唯一标识
* @param2 推送过来的消息的信息。其中包括真正的数据body(消息体),
* Properties(消息的属性信息)和其它信息。
*/
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println("接收到队列发送来的消息:" + message);
};
/*
* rabbitmq取消该消费者对信道中队列的订阅时,调用的回调接口。
* 当我们在RabbitMQ管理界面手动删除该队列时,就会调用该接口。
* @param1 消费者注册到RabbitMQ之后,RabbitMQ给生成的一个该消费者的唯一标识
*/
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息消费被中断"+consumerTag);
};
//自动应答 ,稍后介绍
boolean autoAck = true;
// 向队列注册自己
String consumerTag = channel.basicConsume("simple_pattern",
autoAck, deliverCallback, cancelCallback);
System.out.println("注册到RabbitMQ中后,RabbitMQ给的唯一标识是:"
+ consumerTag);
} catch (Exception e) {
e.printStackTrace();
System.out.println("消息接收异常");
} finally {
// 释放关闭连接信道与连接
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
Work(工作模式)
一个队列,多个消费者。一个队列将队列中的消息发送给多个消费者,每个消费者接收到的消息是不重复的。
工作模式的消息分发还分两种方式,公平分发,不公平分发。
-
公平分发
公平分发就是轮询分发的概念,就是你一条,我一条,你一条,我一条,消费者收到的消息数量基本上一样的,公平分发是不管你处理速度快慢的。
生产者代码
package rabbitmq.ced.pattern.work.fair;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Scanner;
/**
* 六种模式
* Work 工作模式 公平分发
* 创建生产者
*
* @author 崔二旦
* @since now
*/
public class WorkPatternFairProducer {
public static void main(String[] args) {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2. 设置连接属性
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3. 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 3. 在连接中创建信道
channel = connection.createChannel();
// 声明队列,如果队列不存在会创建队列
channel.queueDeclare("work_pattern_fair", false,
false, true, null);
// 通过控制台输入信息
Scanner scanner = new Scanner(System.in);
System.out.println("等待发送消息。。。");
while (scanner.hasNext()) {
String message = scanner.next();
// 记录即将被发送的消息信息
channel.basicPublish("", "work_pattern_fair",
null, message.getBytes());
}
} catch (Exception e) {
e.printStackTrace();
System.out.println("消息发送异常");
} finally {
// 释放关闭连接信道与连接
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
消费者1代码
package rabbitmq.ced.pattern.work.fair;
import com.rabbitmq.client.*;
/**
* 六种模式
* Work 工作模式 公平分发
* 创建消费者1
*
* @author 崔二旦
* @since now
*/
public class WorkPatternFairConsumer1 {
public static void main(String[] args) {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2. 设置连接属性
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection ;
Channel channel ;
try {
// 3. 从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者");
// 3. 在连接中创建信道
channel = connection.createChannel();
System.out.println("消费者1 等待接收消息......");
/*
* RabbitMQ推送给消费者消息回调接口,在该接口中用于编写如何对消息进行处理。
* @param1 消费者注册到RabbitMQ之后,RabbitMQ给生成的一个该消费者的唯一标识
* @param2 推送过来的消息的信息。其中包括真正的数据body(消息体),
* Properties(消息的属性信息)和其它信息。
*/
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println("接收到队列发送来的消息:" + message);
};
/*
* rabbitmq取消该消费者对信道中队列的订阅时,调用的回调接口。
* 当我们在RabbitMQ管理界面手动删除该队列时,就会调用该接口。
* @param1 消费者注册到RabbitMQ之后,RabbitMQ给生成的一个该消费者的唯一标识
*/
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息消费被中断" + consumerTag);
};
//自动应答 ,稍后介绍
boolean autoAck = true;
// 向队列注册自己
channel.basicConsume("work_pattern_fair",
autoAck, deliverCallback, cancelCallback);
} catch (Exception e) {
e.printStackTrace();
System.out.println("消息接收异常");
}
}
}
消费者2代码
package rabbitmq.ced.pattern.work.fair;
import com.rabbitmq.client.*;
/**
* 六种模式
* Work 工作模式 公平分发
* 创建消费者2
*
* @author 崔二旦
* @since now
*/
public class WorkPatternFairConsumer2 {
public static void main(String[] args) {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2. 设置连接属性
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection;
Channel channel;
try {
// 3. 从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者");
// 3. 在连接中创建信道
channel = connection.createChannel();
System.out.println("消费者2 等待接收消息......");
/*
* RabbitMQ推送给消费者消息回调接口,在该接口中用于编写如何对消息进行处理。
* @param1 消费者注册到RabbitMQ之后,RabbitMQ给生成的一个该消费者的唯一标识
* @param2 推送过来的消息的信息。其中包括真正的数据body(消息体),
* Properties(消息的属性信息)和其它信息。
*/
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println("接收到队列发送来的消息:" + message);
};
/*
* rabbitmq取消该消费者对信道中队列的订阅时,调用的回调接口。
* 当我们在RabbitMQ管理界面手动删除该队列时,就会调用该接口。
* @param1 消费者注册到RabbitMQ之后,RabbitMQ给生成的一个该消费者的唯一标识
*/
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息消费被中断" + consumerTag);
};
//自动应答 ,稍后介绍
boolean autoAck = true;
// 向队列注册自己
channel.basicConsume("work_pattern_fair",
autoAck, deliverCallback, cancelCallback);
} catch (Exception e) {
e.printStackTrace();
System.out.println("消息接收异常");
}
}
}
发送效果展示
总结
根据控制台打印的效果就是,凡是注册到这个队列上的消费者都会收到消息,你一条,我一条,他一条,你一条,我一条,他一条,这种效果,RabbitMQ不会管你处理速度如何,就是按照这种效果发送消息。所以当某个消费者的处理很慢时,就会出现消息积压的情况。
-
不公平分发
不公平分发,是按照消费者的能力强弱来发送的,处理速度快的就多处理,处理速度慢的就少处理。这样的处理更加合理,因为不会出现一个堆积消息很忙,而另一个则是在闲着。
生产者代码
消费者1代码
消费者2代码
总结
Publish/Subscribe(订阅模式)
订阅模式队列将消息分发给多个消费者,每个消费者都对应有自己的队列,当消息发送到交换机,交换机会将消息发送给所有的队列中,RabbitMQ将队列中的消息分发给指定的消费者。
生产者代码
消费者代码
总结
Routing(路由模式)
路由模式是发布订阅模式的升级,每个消费者也是有自己指定的队列,只是在交换机将消息发送到队列中时需要进行规则匹配,只有完全匹配之后,RabbitMQ才会将消息分发给队列对应的消费者。
交换机类型设置为Direct类型
生产者代码
消费者代码
总结
Topic(主题模式)
主题模式是Routing(路由模式)的一个升级,在路由规则中加入了模糊匹配规则,根据模糊匹配的结果将消息分发到相应的消费者。模糊匹配的规则是符号“#”匹配一个或多个词,符号“*”只能匹配一个词。
交换机类型设置为Topic类型
生产者代码
消费者代码
总结
RPC(模式)
RPC(远程过程调用)因基本上用不到,所以略。