目录
一、准备工作
(1)打开RabbitMQ的服务
systemctl start rabbitmq-server
(2)Maven项目导入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
(3)定义工具类
package com.yixin.util;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import com.rabbitmq.client.Connection;
import java.util.concurrent.TimeoutException;
public class ConnectionUtil {
public static Connection getConnection(String connectName,String host,int port,String vHost,String name,String password){
//1、定义连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2、设置服务器地址
connectionFactory.setHost(host);
//3、设置端口
connectionFactory.setPort(port);
//4、设置虚拟主机、用户名、密码
connectionFactory.setVirtualHost(vHost);
connectionFactory.setUsername(name);
connectionFactory.setPassword(password);
//5、通过连接工厂获取连接
Connection connection = null;
try {
connection = connectionFactory.newConnection(connectName);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return connection;
}
}
二、 Channel的方法讲解
2.1 queueDeclare
🌵 源码:
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)
throws IOException;
🌵 参数讲解:
🚀 queue: 队列名字
🚀 durable:是否持久化。
关于持久化:
队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失队列(即便队列中有数据),如果想重启之后还存在就要使队列持久化 ( 设置durable为true ),保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库。
🔥 知识点补充:
channel.basicPublish("交换机名称", "路由类型", MessageProperties.PERSISTENT_TEXT_PLAIN, "消息主体".getBytes());
MessageProperties.PERSISTENT_TEXT_PLAIN 就是设置消息持久化
🚀 autoDelete:当最后一个消费者断开连接之后队列是否自动被删除,可以通过RabbitMQ Management,查看某个队列的消费者数量,当consumers = 0时队列就会自动删除.
🚀 exclusive:是否排外。
两个作用:
🚀 arguments:其他参数
2.2 basicPublish
🌵源码:
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
🌵参数讲解:
🚀exchange:要将消息发送到的Exchange(交换器)。
🚀routingKey:路由Key。
🚀props:其它的一些属性。
🚀body:消息内容,字节数组类型。
2.3 basicConsume
🌵源码:
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
🌵参数:
🚀queue:队列名
🚀autoAck:是否自动ack,如果不自动ack,需要使用channel.ack、channel.nack、channel.basicReject 进行消息应答 。
🚀deliverCallback: 当一个消息发送过来后的回调接口 。
🚀cancelCallback:当一个消费者取消订阅时的回调接口;取消消费者订阅队列时除了使用{@link Channel#basicCancel}之外的所有方式都会调用该回调方法 。
2.4 basicQos
🌵源码:
void basicQos(int prefetchCount) throws IOException;
🌵作用:
🌵参数讲解:
🚀prefetchCount:表示同一时刻服务器只会发送一条消息给消费者 如果没有设置Qos的值,那么队列中有多少消息就发送多少消息给消费者,完全不管消费者是否能够消费完,这样可能就会形成大量未ack的消息在缓存区堆积 。
三、简单队列
🌵 解读
🌴 生产者
package com.yixin.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.yixin.util.ConnectionUtil;
public class Producer {
private final static String QUEUE_NAME = "queue1";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
// 1: 从连接工厂中获取连接
connection = ConnectionUtil.getConnection("生产者","服务器地址",5672,"/","yixin","123456");
// 2: 从连接中获取通道channel
channel = connection.createChannel();
// 3: 申明队列queue存储消息
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4: 准备发送消息的内容
String message = "你好,一心同学";
// 5: 发送消息给中间件rabbitmq-server
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
此时我们点击运行生产者,那么在我们的队列中就会存在一条消息:
🌴 消费者
package com.yixin.simple;
import com.rabbitmq.client.*;
import com.yixin.util.ConnectionUtil;
import java.io.IOException;
public class Consumer {
private final static String QUEUE_NAME = "queue1";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
// 1: 从连接工厂中获取连接
connection = connection = ConnectionUtil.getConnection("消费者","服务器地址",5672,"/","yixin","123456");
// 2: 从连接中获取通道channel
channel = connection.createChannel();
// 3: 申明队列queue存储消息
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//4:接收消息
channel.basicConsume(QUEUE_NAME, true, new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("收到的消息是"+new String(delivery.getBody(),"UTF-8"));
}
},new CancelCallback(){
public void handle(String consumerTag) throws IOException{
System.out.println("接收失败");
}
});
//使我们的程序一直处于运行状态,也就是一直处于监听的状态,这样就不需要使用while(true)了。
System.out.println("开始接收消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
🔥 分析
现在点击运行消费者,查看控制台:
可以看到,我们的消费者已经从队列中取出这条消息了,我们再去管理页面查看:
发现这条消息已经被消费了。
🚀 应用场景
四、Work模式
🌵 解读
4.1 轮询模式
🌴 生产者
package com.yixin.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.yixin.util.ConnectionUtil;
public class Producer {
private final static String QUEUE_NAME = "work_queue";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
// 1: 从连接工厂中获取连接
connection = ConnectionUtil.getConnection("生产者","服务器地址",5672,"/","yixin","123456");
// 2: 从连接中获取通道channel
channel = connection.createChannel();
// 3: 申明队列queue存储消息
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4: 定义消息内容(发布多条消息)
for(int i=0;i<10;i++) {
String message = "你好,一心同学,rabbitmq:" + i;
// 5: 发送消息给中间件rabbitmq-server
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("消息发送成功:"+i);
//模拟发送消息延时,便于演示多个消费者竞争接受消息
Thread.sleep(i*10);
}
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
🌵 消费者
消费者1:每接收一条消息后休眠10毫秒
package com.yixin.work;
import com.rabbitmq.client.*;
import com.yixin.util.ConnectionUtil;
import java.io.IOException;
public class Consumer1 {
private final static String QUEUE_NAME = "work_queue";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
// 1: 从连接工厂中获取连接
connection = connection = ConnectionUtil.getConnection("消费者1","服务器地址",5672,"/","yixin","123456");
// 2: 从连接中获取通道channel
channel = connection.createChannel();
// 3: 申明队列queue存储消息
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4: 定义接受消息的回调
Channel finalChannel = channel;
finalChannel.basicConsume(QUEUE_NAME, false, new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("收到的消息是"+new String(delivery.getBody(),"UTF-8"));
//消费者1接收一条消息后休眠10毫秒
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回确定状态,由于我们是设置手动返回状态的,如果没有调用该方法,那么消息仍然在我们队列中,不会被删除
finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
},new CancelCallback(){
public void handle(String consumerTag) throws IOException{
System.out.println("接收失败");
}
});
//使我们的程序一直处于运行状态,也就是一直处于监听的状态,这样就不需要使用while(true)了。
System.out.println("开始接收消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
消费者2:每接收一条消息后休眠1000毫秒
package com.yixin.work;
import com.rabbitmq.client.*;
import com.yixin.util.ConnectionUtil;
import java.io.IOException;
public class Consumer2 {
private final static String QUEUE_NAME = "work_queue";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
// 1: 从连接工厂中获取连接
connection = connection = ConnectionUtil.getConnection("消费者2","服务器地址",5672,"/","yixin","123456");
// 2: 从连接中获取通道channel
channel = connection.createChannel();
// 3: 申明队列queue存储消息
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4: 定义接受消息的回调
Channel finalChannel = channel;
finalChannel.basicConsume(QUEUE_NAME, false, new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("收到的消息是:"+new String(delivery.getBody(),"UTF-8"));
//消费者2接收一条消息后休眠1000毫秒
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回确定状态,由于我们是设置手动返回状态的,如果没有调用该方法,那么消息仍然在我们队列中,不会被删除
finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
},new CancelCallback(){
public void handle(String consumerTag) throws IOException{
System.out.println("接收失败");
}
});
//使我们的程序一直处于运行状态,也就是一直处于监听的状态,这样就不需要使用while(true)了。
System.out.println("开始接收消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
🔥 分析
我们开始进行测试,首先生产者进行打印10(0-9)条数据:
接着我们看消费者1,结果为打印偶数条消息:
再查看消费者2,结果为打印奇数条消息 :
分析结果:
(1)消费者1和消费者2获取到的消息内容是不同的,也就是说同一个消息只能被一个消费者获取。
(2)消费者1和消费者2分别获取奇数条消息和偶数条消息,两种获取消息的条数是一样的。
4.2 公平分发
只需在我们的消费者中添加以下这行代码即可:
channel.basicQos(1);
我们再重新查看消费者1和消费者2的消息结果:
消费者1:
消费者2:
🚀 应用场景
五、发布/订阅模式
🌵 解读
在RabbitMQ中,交换器主要有四种类型:direct、fanout、topic、headers,这里的交换器是 fanout。
🌴 生产者
package com.yixin.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.yixin.util.ConnectionUtil;
public class Producer {
//定义交换机
private final static String EXCHANGE_NAME = "fanout-exchange";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
// 1: 从连接工厂中获取连接
connection = connection = ConnectionUtil.getConnection("生产者","服务器地址",5672,"/","yixin","123456");
// 2: 从连接中获取通道channel
channel = connection.createChannel();
// 3: 准备发送消息的内容
String message = "hello,world";
String routingKey = "";
//4:声明交换器,并且其类型为fanout
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
// 5: 发送消息给中间件rabbitmq-server
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 6: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
🌴 消费者
消费者1:
package com.yixin.fanout;
import com.rabbitmq.client.*;
import com.yixin.util.ConnectionUtil;
import java.io.IOException;
public class Consumer1 {
//需要绑定的交换机
private final static String EXCHANGE_NAME = "fanout-exchange";
//队列的名称
private final static String QUEUE_NAME = "fanout-queue-1";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
// 1: 从连接工厂中获取连接
connection = connection = ConnectionUtil.getConnection("消费者1","服务器地址",5672,"/","yixin","123456");
// 2: 从连接中获取通道channel
channel = connection.createChannel();
// 3: 申明队列queue存储消息
// 4:声明队列,如果queue已经被创建过一次了,则可以不需要定义
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//5:绑定队列到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
//6:同一时刻服务器只会发送一条消息给消费者
channel.basicQos(1);
// 7: 定义接受消息的回调
Channel finalChannel = channel;
//参数2:手动返回完成状态
finalChannel.basicConsume(QUEUE_NAME, false, new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(QUEUE_NAME + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
//返回确定状态,由于我们是设置手动返回状态的,如果没有调用该方法,那么消息仍然在我们队列中,不会被删除
finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
}
});
System.out.println(QUEUE_NAME + ":开始接受消息");
//使我们的程序一直处于运行状态,也就是一直处于监听的状态,这样就不需要使用while(true)了。
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
消费者2:
package com.yixin.fanout;
import com.rabbitmq.client.*;
import com.yixin.util.ConnectionUtil;
import java.io.IOException;
public class Consumer2 {
//需要绑定的交换机
private final static String EXCHANGE_NAME = "fanout-exchange";
//队列的名称
private final static String QUEUE_NAME = "fanout-queue-2";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
// 1: 从连接工厂中获取连接
connection = connection = ConnectionUtil.getConnection("消费者2","服务器地址",5672,"/","yixin","123456");
// 2: 从连接中获取通道channel
channel = connection.createChannel();
// 3: 申明队列queue存储消息
// 4:声明队列,如果queue已经被创建过一次了,则可以不需要定义
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//5:绑定队列到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
//6:同一时刻服务器只会发送一条消息给消费者
channel.basicQos(1);
// 7: 定义接受消息的回调
Channel finalChannel = channel;
//参数2:手动返回完成状态
finalChannel.basicConsume(QUEUE_NAME, false, new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(QUEUE_NAME + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
//返回确定状态,由于我们是设置手动返回状态的,如果没有调用该方法,那么消息仍然在我们队列中,不会被删除
finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
}
});
System.out.println(QUEUE_NAME + ":开始接受消息");
//使我们的程序一直处于运行状态,也就是一直处于监听的状态,这样就不需要使用while(true)了。
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
🔥 分析
这个时候我们启动生产者,消费者1和消费者2,查看控制台:
🚀 应用场景
六、路由模式
🌵解读
🌴 生产者
package com.yixin.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.yixin.util.ConnectionUtil;
public class Producer {
//定义交换机
private final static String EXCHANGE_NAME = "direct-exchange";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
// 1: 从连接工厂中获取连接
connection = connection = ConnectionUtil.getConnection("生产者","服务器地址",5672,"/","yixin","123456");
// 2: 从连接中获取通道channel
channel = connection.createChannel();
// 3: 准备发送消息的内容
String message = "hello,world";
String routingKey = "update";
//4:声明交换器,并且其类型为direct
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
// 5: 发送消息给中间件rabbitmq-server
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 6: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
🌴 消费者
消费者1:
package com.yixin.routing;
import com.rabbitmq.client.*;
import com.yixin.util.ConnectionUtil;
import java.io.IOException;
public class Consumer1 {
//需要绑定的交换机
private final static String EXCHANGE_NAME = "direct-exchange";
//队列的名称
private final static String QUEUE_NAME = "direct-queue-1";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
// 1: 从连接工厂中获取连接
connection = connection = ConnectionUtil.getConnection("消费者1","服务器地址",5672,"/","yixin","123456");
// 2: 从连接中获取通道channel
channel = connection.createChannel();
// 3: 声明队列,如果queue已经被创建过一次了,则可以不需要定义
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//绑定队列到交换机
//4:绑定队列到交换机,指定路由key为update、delete、add(可以多个也可以单个)
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"update");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"delete");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"add");
//5:同一时刻服务器只会发送一条消息给消费者
channel.basicQos(1);
// 6: 定义接受消息的回调
Channel finalChannel = channel;
//参数2:手动返回完成状态
finalChannel.basicConsume(QUEUE_NAME, false, new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(QUEUE_NAME + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
//返回确定状态,由于我们是设置手动返回状态的,如果没有调用该方法,那么消息仍然在我们队列中,不会被删除
finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
}
});
System.out.println(QUEUE_NAME + ":开始接受消息");
//使我们的程序一直处于运行状态,也就是一直处于监听的状态,这样就不需要使用while(true)了。
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
消费者2:
package com.yixin.routing;
import com.rabbitmq.client.*;
import com.yixin.util.ConnectionUtil;
import java.io.IOException;
public class Consumer2 {
//需要绑定的交换机
private final static String EXCHANGE_NAME = "direct-exchange";
//队列的名称
private final static String QUEUE_NAME = "direct-queue-2";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
// 1: 从连接工厂中获取连接
connection = connection = ConnectionUtil.getConnection("消费者2","服务器地址",5672,"/","yixin","123456");
// 2: 从连接中获取通道channel
channel = connection.createChannel();
// 3: 声明队列,如果queue已经被创建过一次了,则可以不需要定义
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//绑定队列到交换机
//4:绑定队列到交换机,指定路由key为update、delete、add(可以多个也可以单个)
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"select");
//5:同一时刻服务器只会发送一条消息给消费者
channel.basicQos(1);
// 6: 定义接受消息的回调
Channel finalChannel = channel;
//参数2:手动返回完成状态
finalChannel.basicConsume(QUEUE_NAME, false, new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(QUEUE_NAME + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
//返回确定状态,由于我们是设置手动返回状态的,如果没有调用该方法,那么消息仍然在我们队列中,不会被删除
finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
}
});
System.out.println(QUEUE_NAME + ":开始接受消息");
//使我们的程序一直处于运行状态,也就是一直处于监听的状态,这样就不需要使用while(true)了。
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
🔥 分析
我们进行启动生产者,接着同时启动消费者1和消费2,查看控制台:
🚀 应用场景
七、主题模式
🌵 解读
🌴 生产者
package com.yixin.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.yixin.util.ConnectionUtil;
public class Producer {
//定义交换机
private final static String EXCHANGE_NAME = "topic-exchange";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
// 1: 从连接工厂中获取连接
connection = connection = ConnectionUtil.getConnection("生产者","服务器地址",5672,"/","yixin","123456");
// 2: 从连接中获取通道channel
channel = connection.createChannel();
// 3: 准备发送消息的内容
String message = "hello,world";
//定义路由key
String routingKey = "update.Name";
//4:声明交换器,并且其类型为direct
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
// 5: 发送消息给中间件rabbitmq-server
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 6: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
🌴 消费者
消费者1:
package com.yixin.topic;
import com.rabbitmq.client.*;
import com.yixin.util.ConnectionUtil;
import java.io.IOException;
public class Consumer1 {
//需要绑定的交换机
private final static String EXCHANGE_NAME = "topic-exchange";
//队列的名称
private final static String QUEUE_NAME = "topic-queue-1";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
// 1: 从连接工厂中获取连接
connection = connection = ConnectionUtil.getConnection("消费者1","服务器地址",5672,"/","yixin","123456");
// 2: 从连接中获取通道channel
channel = connection.createChannel();
// 3: 声明队列,如果queue已经被创建过一次了,则可以不需要定义
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//4:绑定队列到交换机,指定路由key为update.#
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"update.#");
//5:同一时刻服务器只会发送一条消息给消费者
channel.basicQos(1);
// 6: 定义接受消息的回调
Channel finalChannel = channel;
//参数2:手动返回完成状态
finalChannel.basicConsume(QUEUE_NAME, false, new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(QUEUE_NAME + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
//返回确定状态,由于我们是设置手动返回状态的,如果没有调用该方法,那么消息仍然在我们队列中,不会被删除
finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
}
});
System.out.println(QUEUE_NAME + ":开始接受消息");
//使我们的程序一直处于运行状态,也就是一直处于监听的状态,这样就不需要使用while(true)了。
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
消费者2:
package com.yixin.topic;
import com.rabbitmq.client.*;
import com.yixin.util.ConnectionUtil;
import java.io.IOException;
public class Consumer2 {
//需要绑定的交换机
private final static String EXCHANGE_NAME = "topic-exchange";
//队列的名称
private final static String QUEUE_NAME = "topic-queue-2";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
// 1: 从连接工厂中获取连接
connection = connection = ConnectionUtil.getConnection("消费者2","服务器地址",5672,"/","yixin","123456");
// 2: 从连接中获取通道channel
channel = connection.createChannel();
// 3: 声明队列,如果queue已经被创建过一次了,则可以不需要定义
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//4:绑定队列到交换机,指定路由key为select.#
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"select.#");
//5:同一时刻服务器只会发送一条消息给消费者
channel.basicQos(1);
// 6: 定义接受消息的回调
Channel finalChannel = channel;
//参数2:手动返回完成状态
finalChannel.basicConsume(QUEUE_NAME, false, new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(QUEUE_NAME + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
//返回确定状态,由于我们是设置手动返回状态的,如果没有调用该方法,那么消息仍然在我们队列中,不会被删除
finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
}
});
System.out.println(QUEUE_NAME + ":开始接受消息");
//使我们的程序一直处于运行状态,也就是一直处于监听的状态,这样就不需要使用while(true)了。
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
🔥 分析
我们把生产者和消费者启动后,到控制台进行查看结果:
八、交换器分析
我们的交换器分为四种,分别是:direct、fanout、topic和 headers。
小结
以上就是【一心同学】认真整理的关于【RabbitMQ的五种队列】,对每一个知识的讲解也可以说是保姆级别的了,大家务必要掌握这【五种队列】,每一种队列都有不同的【应用场景】,我们要根据实际开发需要去选择我们需要的【模式】。