0
点赞
收藏
分享

微信扫一扫

rocketmq 重复 redis

小典典Rikako 2024-10-30 阅读 13

使用 RocketMQ 实现消息去重与 Redis

概述

在分布式系统中,消息队列(MQ)是一个重要的组件,可以解耦生产者和消费者,提升系统的可扩展性和可靠性。RocketMQ 是一个分布式消息中间件,而 Redis 则是一个高性能的键值存储,常常用于缓存和数据存储。本篇文章将指导你如何结合 RocketMQ 和 Redis 实现消息的去重功能,以避免重复处理相同的消息。

整体流程

我们将整个流程分为以下几个主要步骤:

步骤 描述
1 生产者发送消息到 RocketMQ
2 消费者从 RocketMQ 中消费消息
3 消费者检查 Redis 中是否已处理过该消息
4 如果未处理过,消费者处理该消息并将消息 ID 存入 Redis
5 如果已处理过,消费者直接跳过处理

实现步骤

接下来,我们将详细介绍每个步骤的具体实现和相关代码。

步骤 1: 生产者发送消息到 RocketMQ

我们首先需要创建一个 RocketMQ 生产者并发送消息。代码如下:

// 引入相关包
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws Exception {
        // 创建生产者,指定生产组
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        // 设置 NameServer 地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动生产者
        producer.start();

        // 发送消息
        for (int i = 0; i < 10; i++) {
            // 创建消息对象,指定主题、标签和内容
            Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes());
            // 发送消息
            producer.send(msg);
        }

        // 关闭生产者
        producer.shutdown();
    }
}

代码说明

  • DefaultMQProducer:创建生产者实例。
  • setNamesrvAddr:设置 RocketMQ 的 NameServer 地址。
  • send:发送消息到指定的主题。

步骤 2: 消费者从 RocketMQ 中消费消息

创建一个消费者继续处理消息:

// 引入相关包
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) throws Exception {
        // 创建消费者,指定消费组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("TopicTest", "*");

        // 设置消息监听器
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (MessageExt msg : msgs) {
                // 处理消息
                System.out.println("Received: " + new String(msg.getBody()));
            }
            return null;
        });

        // 启动消费者
        consumer.start();
        System.out.println("Consumer started.");
    }
}

代码说明

  • DefaultMQPushConsumer:创建消费者实例。
  • subscribe:订阅消息主题。
  • registerMessageListener:注册消息监听器,以异步的方式处理消息。

步骤 3: 检查 Redis 中是否已处理过该消息

在处理消息之前,我们需要检查 Redis 中是否已有这个消息的记录。使用 Jedis 库与 Redis 交互,以下是相关代码:

import redis.clients.jedis.Jedis;

public class MessageProcessor {
    private static final String REDIS_KEY_PREFIX = "msg:";

    public boolean isMessageProcessed(String messageId) {
        try (Jedis jedis = new Jedis("localhost")) {
            // 检查消息 ID 是否在 Redis 中
            return jedis.exists(REDIS_KEY_PREFIX + messageId);
        }
    }
}

代码说明

  • Jedis:用于与 Redis 交互的客户端。
  • exists:检查 Redis 中是否存在指定的键。

步骤 4: 处理消息并将消息 ID 存入 Redis

这一步我们需要处理消息,并将处理过的消息 ID 存入 Redis,以防止重复处理:

public void processMessage(String messageId) {
    // 处理消息逻辑
    System.out.println("Processing message with ID: " + messageId);

    // 将消息 ID 写入 Redis,设置过期时间以避免内存溢出
    try (Jedis jedis = new Jedis("localhost")) {
        jedis.setex(REDIS_KEY_PREFIX + messageId, 3600, "processed");
    }
}

代码说明

  • setex:将消息 ID 存入 Redis,并设置过期时间(如3600秒),以避免内存溢出。

步骤 5: 跳过已处理的消息

在消费者中整合上述步骤,代码如下:

import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    MessageProcessor processor = new MessageProcessor();
    for (MessageExt msg : msgs) {
        String messageId = msg.getMsgId();
        // 检查消息是否处理过
        if (!processor.isMessageProcessed(messageId)) {
            processor.processMessage(messageId);
        } else {
            System.out.println("Message already processed: " + messageId);
        }
    }
    return null;
});

代码整合说明

  • 整合消息消费与 Redis 检查逻辑,确保消息不被重复处理。

序列图

接下来我们绘制一个序列图,描述消息的处理流程:

sequenceDiagram
    participant P as Producer
    participant C as Consumer
    participant R as Redis

    P->>C: Send Message
    C->>R: Check if processed
    alt Not processed
        C->>C: Process Message
        C->>R: Save Message ID
    else Already processed
        C->>C: Skip Processing
    end

状态图

下面是消息处理状态的状态图:

stateDiagram
    [*] --> NotProcessed
    NotProcessed --> Processing
    Processing --> Processed
    Processing --> NotProcessed
    Processed --> [*]

结尾

本文详细介绍了如何使用 RocketMQ 和 Redis 实现消息去重的完整流程。通过借助 Redis 进行状态管理,可以有效避免消费者对同一消息的重复处理。这种模式在高并发的分布式系统中尤其重要。希望这篇文章能够帮助刚入行的开发者快速理解并实现这一功能。

如有任何问题,欢迎交流与讨论!

举报

相关推荐

0 条评论