使用 RocketMQ 实现消息去重与 Redis
概述
在分布式系统中,消息队列(MQ)是一个重要的组件,可以解耦生产者和消费者,提升系统的可扩展性和可靠性。RocketMQ 是一个分布式消息中间件,而 Redis 则是一个高性能的键值存储,常常用于缓存和数据存储。本篇文章将指导你如何结合 RocketMQ 和 Redis 实现消息的去重功能,以避免重复处理相同的消息。
整体流程
我们将整个流程分为以下几个主要步骤:
步骤 | 描述 |
---|---|
1 | 生产者发送消息到 RocketMQ |
2 | 消费者从 RocketMQ 中消费消息 |
3 | 消费者检查 Redis 中是否已处理过该消息 |
4 | 如果未处理过,消费者处理该消息并将消息 ID 存入 Redis |
5 | 如果已处理过,消费者直接跳过处理 |
实现步骤
接下来,我们将详细介绍每个步骤的具体实现和相关代码。
步骤 1: 生产者发送消息到 RocketMQ
我们首先需要创建一个 RocketMQ 生产者并发送消息。代码如下:
// 引入相关包
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) throws Exception {
// 创建生产者,指定生产组
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
// 设置 NameServer 地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
// 发送消息
for (int i = 0; i < 10; i++) {
// 创建消息对象,指定主题、标签和内容
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes());
// 发送消息
producer.send(msg);
}
// 关闭生产者
producer.shutdown();
}
}
代码说明:
DefaultMQProducer
:创建生产者实例。setNamesrvAddr
:设置 RocketMQ 的 NameServer 地址。send
:发送消息到指定的主题。
步骤 2: 消费者从 RocketMQ 中消费消息
创建一个消费者继续处理消息:
// 引入相关包
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
public class Consumer {
public static void main(String[] args) throws Exception {
// 创建消费者,指定消费组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
// 设置消息监听器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
// 处理消息
System.out.println("Received: " + new String(msg.getBody()));
}
return null;
});
// 启动消费者
consumer.start();
System.out.println("Consumer started.");
}
}
代码说明:
DefaultMQPushConsumer
:创建消费者实例。subscribe
:订阅消息主题。registerMessageListener
:注册消息监听器,以异步的方式处理消息。
步骤 3: 检查 Redis 中是否已处理过该消息
在处理消息之前,我们需要检查 Redis 中是否已有这个消息的记录。使用 Jedis 库与 Redis 交互,以下是相关代码:
import redis.clients.jedis.Jedis;
public class MessageProcessor {
private static final String REDIS_KEY_PREFIX = "msg:";
public boolean isMessageProcessed(String messageId) {
try (Jedis jedis = new Jedis("localhost")) {
// 检查消息 ID 是否在 Redis 中
return jedis.exists(REDIS_KEY_PREFIX + messageId);
}
}
}
代码说明:
Jedis
:用于与 Redis 交互的客户端。exists
:检查 Redis 中是否存在指定的键。
步骤 4: 处理消息并将消息 ID 存入 Redis
这一步我们需要处理消息,并将处理过的消息 ID 存入 Redis,以防止重复处理:
public void processMessage(String messageId) {
// 处理消息逻辑
System.out.println("Processing message with ID: " + messageId);
// 将消息 ID 写入 Redis,设置过期时间以避免内存溢出
try (Jedis jedis = new Jedis("localhost")) {
jedis.setex(REDIS_KEY_PREFIX + messageId, 3600, "processed");
}
}
代码说明:
setex
:将消息 ID 存入 Redis,并设置过期时间(如3600秒),以避免内存溢出。
步骤 5: 跳过已处理的消息
在消费者中整合上述步骤,代码如下:
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
MessageProcessor processor = new MessageProcessor();
for (MessageExt msg : msgs) {
String messageId = msg.getMsgId();
// 检查消息是否处理过
if (!processor.isMessageProcessed(messageId)) {
processor.processMessage(messageId);
} else {
System.out.println("Message already processed: " + messageId);
}
}
return null;
});
代码整合说明:
- 整合消息消费与 Redis 检查逻辑,确保消息不被重复处理。
序列图
接下来我们绘制一个序列图,描述消息的处理流程:
sequenceDiagram
participant P as Producer
participant C as Consumer
participant R as Redis
P->>C: Send Message
C->>R: Check if processed
alt Not processed
C->>C: Process Message
C->>R: Save Message ID
else Already processed
C->>C: Skip Processing
end
状态图
下面是消息处理状态的状态图:
stateDiagram
[*] --> NotProcessed
NotProcessed --> Processing
Processing --> Processed
Processing --> NotProcessed
Processed --> [*]
结尾
本文详细介绍了如何使用 RocketMQ 和 Redis 实现消息去重的完整流程。通过借助 Redis 进行状态管理,可以有效避免消费者对同一消息的重复处理。这种模式在高并发的分布式系统中尤其重要。希望这篇文章能够帮助刚入行的开发者快速理解并实现这一功能。
如有任何问题,欢迎交流与讨论!