MessageQueue --- RabbitMQ
RabbitMQ Intro
RabbitMQ 核心概念
消息的路由过程如下:
名词解释:
RabbitMQ 分发类型
Direct Exchange
Topic Exchange
Fanout exchange
Header exchange
配置RabbitMQ 示例代码
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class DirectExchangeExample {
private static final String EXCHANGE_NAME = "direct_logs";
private static final String QUEUE_NAME = "my_queue";
private static final String ROUTING_KEY = "info";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明一个 Direct Exchange
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到 Direct Exchange,并指定绑定键
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
// 定义消息处理函数
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("Received message: " + message);
};
// 消费消息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
System.out.println("Press any key to exit.");
System.in.read();
}
}
}
Dead letter (死信)
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
public class DeadLetterExample {
private static final String EXCHANGE_NAME = "normal_exchange";
private static final String QUEUE_NAME = "normal_queue";
private static final String DLX_EXCHANGE_NAME = "dlx_exchange";
private static final String DLX_QUEUE_NAME = "dlx_queue";
private static final String DLX_ROUTING_KEY = "dlx_routing_key";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 创建普通交换机和队列
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 创建死信交换机和队列
channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare(DLX_QUEUE_NAME, false, false, false, null);
channel.queueBind(DLX_QUEUE_NAME, DLX_EXCHANGE_NAME, DLX_ROUTING_KEY);
// 设置普通队列的死信参数
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
arguments.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
// 定义消息处理函数
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("Received message: " + message);
// 手动确认消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
// 消费消息
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
System.out.println("Press any key to exit.");
System.in.read();
}
}
}
保证消息的可靠传递
要确保消息的可靠传递,可以采取以下几个步骤:
发布者确认机制Example
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ReliableMessagingExample {
private static final String QUEUE_NAME = "my_queue";
private static final String EXCHANGE_NAME = "my_exchange";
private static final String ROUTING_KEY = "my_routing_key";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接和信道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列和交换机
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
// 启用发布者确认模式
channel.confirmSelect();
// 添加发布者确认监听器
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Message confirmed, delivery tag: " + deliveryTag);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Message not confirmed, delivery tag: " + deliveryTag);
// 可以在这里进行相应的处理,例如重新发送消息
}
});
// 发布消息
String message = "Hello, RabbitMQ!";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
try {
// 等待发布者确认
channel.waitForConfirmsOrDie();
} catch (InterruptedException e) {
// 可以在这里进行相应的处理,例如重新发送消息
e.printStackTrace();
}
// 关闭信道和连接
channel.close();
connection.close();
}
}
事务保证消息可靠性Example
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ReliableMessagingExample {
private static final String QUEUE_NAME = "my_queue";
private static final String EXCHANGE_NAME = "my_exchange";
private static final String ROUTING_KEY = "my_routing_key";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接和信道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
try {
// 开启事务
channel.txSelect();
// 声明队列和交换机
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
// 发布消息
String message = "Hello, RabbitMQ!";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
// 提交事务
channel.txCommit();
System.out.println("Transaction committed successfully.");
} catch (IOException e) {
// 发生异常,回滚事务
channel.txRollback();
System.out.println("Transaction rolled back due to an exception.");
e.printStackTrace();
} finally {
// 关闭信道和连接
channel.close();
connection.close();
}
}
}