消息追踪
设置消息追踪需要修改 broker 启动的配置文件,添加一行配置:traceTopicEnable=true 即可,操作如下:
# 进入到 rocketmq 的安装目录中
# 先复制一份配置文件
cp broker.conf custom.conf
# 在自定义配置文件中添加一行配置
vi custom.conf
## 添加配置
traceTopicEnable=true
# 杀死原来的 broker 进程,再重新启动即可
# 先查看原来 broker 进程 id
jps 
# 杀死 broker
kill -9 [进程id]
# 重新启动 broker,并指定配置文件
nohup sh bin/mqbroker -c conf/custom.conf ‐n localhost:9876 & autoCreateTopicEnable=true在发送消息的时候,指定消息的 keys 就可以在 DashBoard 中观看到消息的追踪记录了
public class GlobalProducer {
    public static void main(String[] args) throws Exception {
        // true 即设置允许消息追踪
        DefaultMQProducer producer = new DefaultMQProducer(
                "producer_group",
                true);
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        for (int i = 0; i < 12; i++) {
            Message msg = new Message(
                    "Global-Orderly-Topic",
                    "Global_Orderly_Tag",
                    ("( " + i + " )message from GlobalProducer").getBytes());
            // 设置消息的 keys
            msg.setKeys("Global_Orderly_Tag");
            producer.send(msg);
        }
        System.out.println("Send Finished.");
    }
}
之后就可以在 DashBoard 中查看消息的追踪记录了:

点击进去,查看消息追踪详细信息如下:

延时消息实战
上边的案例使用了 SpringCloudStream 的 API 进行消息的收发,这里使用原生 API 进行消息收发实战,通过设置消息的延时时间,可以让消息等待指定时间之后再发送
5.x 之前,只能设置固定时间的延时消息
5.x 之后,可以自定义任意时间的延时消息
由于这里引入的 SpringCloudAlibaba 整合的 RocketMQ 是 4.9.4 版本的,因此只能设置固定时间的延时消息
延时时间有以下几种,通过 Leven 进行定位,如果 delayTimeLevel = 2,就是第二个延时时间 5s
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h消费者代码如下:
public class Consumer {
    public static void main(String[] args) throws Exception {
        // 1、创建消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_group");
        // 2、为消费者对象设置 NameServer 地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 3、订阅主题
        consumer.subscribe("custom-delay-topic", "*");
        // 4、注册监听消息,并打印消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    String printMsg = new String(msg.getBody()) + ", recvTime: "
                            + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date());
                    System.out.println(printMsg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 5、把消费者直接启动起来
        consumer.start();
    }
}生产者代码如下:
public class Producer {
    public static void main(String[] args) throws Exception {
        // 1、创建生产者对象
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        // 2、为生产者对象设置 NameServer 地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 3、把我们的生产者直接启动起来
        producer.start();
        // 4、创建消息、并发送消息
        for (int i = 0; i < 3; i++) {
            // public Message(String topic, String tags, String keys, byte[] body) {
            Message message = new Message(
                    "custom-delay-topic",
                    "delayTag",
                    "CUSTOM_DELAY",
                    ("("+i+")Hello Message From Delay Producer, " +
                            "date="+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())).getBytes()
            );
            // 设置定时的逻辑
            // "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
            message.setDelayTimeLevel(2);
            // 利用生产者对象,将消息直接发送出去
            producer.send(message);
        }
        System.out.println("Send Finished.");
    }
}









