0
点赞
收藏
分享

微信扫一扫

二、RocketMQ常用消息实战

书写经典 2022-03-26 阅读 83
java

准备

Rocket部署

下载源码并构建

下载地址:https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.9.3/rocketmq-all-4.9.3-source-release.zip

> unzip rocketmq-all-4.9.3-source-release.zip
> cd rocketmq-all-4.9.3/
> mvn -Prelease-all -DskipTests clean install -U
> cd distribution/target/rocketmq-4.9.3/rocketmq-4.9.3

启动NameServer

> nohup sh bin/mqnamesrv &
> tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

启动Broker

> nohup sh bin/mqbroker -n localhost:9876 &
> tail -f ~/logs/rocketmqlogs/broker.log 
The broker[%s, 172.30.30.233:10911] boot success...

测试消息的发送和消费

> export NAMESRV_ADDR=localhost:9876
 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
 SendResult [sendStatus=SEND_OK, msgId= ...

 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
 ConsumeMessageThread_%d Receive New Messages: [MessageExt...

停止服务

如果需要停止服务,执行如下命令

> sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK

> sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK

部署RocketMQ Dashbord

为了方便管理,我们还需要Dashbord
下载地址:https://gitee.com/nswish/rocketmq-dashboard/repository/archive/master.zip

修改配置文件application.properties,增加配置rocketmq.config.namesrvAddr地址

# 省略...
rocketmq.config.namesrvAddr=localhost:9876
# 省略...

构建并启动

# Maven spring-boot run
mvn spring-boot:run

# 或 Maven build and run
mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar

访问:http://localhost:8080。
至此,RocketMQ已经部署完成。

项目中导入依赖

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-client</artifactId> 
  <version>4.9.3</version>
</dependency>

普通消息

消息发送

消息发送的步骤:

  1. 创建消息生产者 producer,并指定生产者组名
  2. 指定 Nameserver 地址
  3. 启动 producer
  4. 创建消息对象,指定 Topic、Tag 和消息体
  5. 发送消息
  6. 关闭生产者 producer

同步发送

这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知等。

代码示例:

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("test_group");
        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动Producer实例
        producer.start();
        for (int i = 0; i < 100; i++) {
            // 创建消息,并指定Topic,Tag和消息体
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // 发送消息到一个Broker
            SendResult sendResult = producer.send(msg);
            // 通过sendResult返回消息是否成功送达
            System.out.printf("%s%n", sendResult);
        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

异步发送

异步消息通常用在对响应时间敏感的业务场景。

示例代码:

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("test_group");
        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动Producer实例
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);
        //启用Broker故障延迟机制
        producer.setSendLatencyFaultEnable(true);

        for (int i = 0; i < 100; i++) {
            final int index = i;
            // 创建消息,并指定Topic,Tag和消息体
            Message msg = new Message("TopicTest", "TagA", "OrderID888",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // SendCallback接收异步返回结果的回调
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                }

                @Override
                public void onException(Throwable e) {
                    System.out.printf("%-10d Exception %s %n", index, e);
                    e.printStackTrace();
                }
            });
        }
        Thread.sleep(10000);
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

单向发送

这种方式主要用在不需要关心发送结果的场景,例如日志发送。

示例代码:

public class OnewayProducer {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者Producer   对象。
        DefaultMQProducer producer = new DefaultMQProducer("test_group");
        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动Producer实例
        producer.start();
        for (int i = 0; i < 20; i++) {
            // 创建消息,并指定Topic,Tag和消息体
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // 发送单向消息,没有任何返回结果
            producer.sendOneway(msg);

        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

消息发送的权衡

发送方式发送TPS发送结果反馈可靠性适用场景
同步发送可靠适用广泛,如重要的消息通知,短信通知等。
异步发送可靠对响应时间敏感的应用场景
单向发送最快不 可靠可靠性要求不高的场景,如日志采集

消息消费

  1. 创建消费者 Consumer,指定消费者组名
  2. 指定 Nameserver 地址
  3. 订阅主题 Topic 和 Tag
  4. 设置回调函数,处理消息
  5. 启动消费者 consumer

集群消费

在这里插入图片描述
一个 Consumer Group 中的各个 Consumer 实例分摊去消费消息,即一条消息只会投递到一个 Consumer Group 下面的一个实例。实际上,每个 Consumer 是平均分摊 Message Queue 的。例如,一个 Topic 有3个 Queue,其中一个Consumer Group 有3个实例,那么每个实例只消费其中一个Queue。
这种模式下,消费进度(Consumer Offset)的存储会持久化到 Broker。

代码示例,启动同一分组下的两个消费者

public class BalanceComuser {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者,指定组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test");
        // 指定Namesrv地址信息.
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅Topic
        consumer.setMaxReconsumeTimes(1);
        consumer.subscribe("TopicTest", "*"); //tag  tagA|TagB|TagC
        //负载均衡模式消费
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // 注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                try {
                    for (MessageExt msg : msgs) {
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(), "utf-8");
                        String tags = msg.getTags();
                        System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;

                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消息者
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

广播消费

在这里插入图片描述
消息将对一个 Consumer Group 下的各个 Consumer 实例都投递一遍。实际上,是一个消费组下的每个消费者实例都获取到了 topic 下面的每个 Message Queue 去拉取消费。
这种模式下,消费进度(Consumer Offset)会存储持久化到实例本地。

代码示例,启动统一分组下的两个消费者

public class BroadcastComuser {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者,指定组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test");
        // 指定Namesrv地址信息.
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅Topic
        consumer.subscribe("TopicTest", "*");
        //广播模式消费
        consumer.setMessageModel(MessageModel.BROADCASTING);
        // 如果非第一次启动,那么按照上次消费的位置继续消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                try {
                    for (MessageExt msg : msgs) {
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(), "utf-8");
                        String tags = msg.getTags();
                        System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;

                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消息者
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

消息消费的权衡

集群模式

  1. 消费端集群化部署,每条消息只需要被处理一次。
  2. 由于消费进度在服务端维护,可靠性更高。
  3. 集群消费模式下,每一条消息都只会被分发到一台机器上处理。

广播模式

  1. 每条消息都需要被相同逻辑的多台机器处理。
  2. 消费进度在客户端维护,出现重复的概率稍大于集群模式。
  3. 不支持顺序消息、不支持重置消费位点。
  4. 广播模式下, RocketMQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。
  5. 广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。
  6. 广播模式下服务端不维护消费进度,所以消息队列 RocketMQ 控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。

顺序消息

在默认的情况下,消息发送会采取轮询方式把消息发送到不同的 queue;而消费消息的时候是从多个 queue 上拉取消息,这种情况发送和消费是不能保证顺序的。

但是如果控制发送的顺序消息只依次发送到同一个 queue 中,消费的时候只从这个 queue 上依次拉取,则就保证了顺序。当发送和消费参与的 queue 只有一个,则是全局有序;如果多个 queue 参与,则为分区有序,即相对每个 queue,消息都是有序的。
全局顺序消息:
全局顺序消息
分区顺序消息:
分区顺序消息

一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,下面是订单进行分区有序的示例代码。

public class ProducerInOrder {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        String[] tags = new String[]{"TagA", "TagC", "TagD"};
        // 订单列表
        List<Order> orderList = new ProducerInOrder().buildOrders();

        Date date = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String dateStr = sdf.format(date);
        for (int i = 0; i < orderList.size(); i++) {
            // 加个时间前缀
            String body = dateStr + " Order:" + orderList.get(i);
            Message msg = new Message("PartOrder", tags[i % tags.length], "KEY" + i, body.getBytes());

            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Long id = (Long) arg;  //根据订单id选择发送queue
                    long index = id % mqs.size();
                    return mqs.get((int) index);
                }
            }, orderList.get(i).getOrderId());//订单id

            System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
                    sendResult.getSendStatus(),
                    sendResult.getMessageQueue().getQueueId(),
                    body));
        }

        producer.shutdown();
    }

    /**
     * 订单
     */
     @Getter
     @Setter
     @ToString
    private static class Order {
        private long orderId;
        private String desc;
    }

    /**
     * 生成模拟订单数据  3个订单   每个订单4个状态
     */
    private List<Order> buildOrders() {
        List<Order> orderList = new ArrayList<Order>();

        Order orderDemo = new Order();
        orderDemo.setOrderId(20210406001L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406002L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406001L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406003L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406002L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406003L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406002L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406003L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406002L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406001L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406003L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406001L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        return orderList;
    }
}

运行结果:

SendResult status:SEND_OK, queueId:1, body:2022-03-20 21:50:55 Order:Order{orderId=20210406001, desc='创建'}
SendResult status:SEND_OK, queueId:2, body:2022-03-20 21:50:55 Order:Order{orderId=20210406002, desc='创建'}
SendResult status:SEND_OK, queueId:1, body:2022-03-20 21:50:55 Order:Order{orderId=20210406001, desc='付款'}
SendResult status:SEND_OK, queueId:3, body:2022-03-20 21:50:55 Order:Order{orderId=20210406003, desc='创建'}
SendResult status:SEND_OK, queueId:2, body:2022-03-20 21:50:55 Order:Order{orderId=20210406002, desc='付款'}
SendResult status:SEND_OK, queueId:3, body:2022-03-20 21:50:55 Order:Order{orderId=20210406003, desc='付款'}
SendResult status:SEND_OK, queueId:2, body:2022-03-20 21:50:55 Order:Order{orderId=20210406002, desc='推送'}
SendResult status:SEND_OK, queueId:3, body:2022-03-20 21:50:55 Order:Order{orderId=20210406003, desc='推送'}
SendResult status:SEND_OK, queueId:2, body:2022-03-20 21:50:55 Order:Order{orderId=20210406002, desc='完成'}
SendResult status:SEND_OK, queueId:1, body:2022-03-20 21:50:55 Order:Order{orderId=20210406001, desc='推送'}
SendResult status:SEND_OK, queueId:3, body:2022-03-20 21:50:55 Order:Order{orderId=20210406003, desc='完成'}
SendResult status:SEND_OK, queueId:1, body:2022-03-20 21:50:55 Order:Order{orderId=20210406001, desc='完成'}

使用顺序消息时,首先要保证消息是有序进入 MQ 的,对 id 等关键字进行取模后,放入指定 messageQueue中,Consume 消费消息失败时, 不能返回 reconsume_later,这样会导致乱序,应该返回 suspend_current_queue_a_moment。

消费时,同一个 OrderId 获取到的肯定是同一个队列。从而确保一个订单中处理的顺序。

public class ConsumerInOrder {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderConsumer");
        consumer.setNamesrvAddr("localhost:9876");
        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
         * 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("PartOrder", "TagA || TagC || TagD");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            Random random = new Random();

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {
                    // 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
                }
                try {
                    //模拟业务逻辑处理中...
                    TimeUnit.MILLISECONDS.sleep(random.nextInt(300));
                } catch (Exception e) {
                    e.printStackTrace();
                    //这个点要注意:意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }

运行结果:

consumeThread=ConsumeMessageThread_4queueId=1, content:2022-03-20 21:50:55 Order:Order{orderId=20210406001, desc='创建'}
consumeThread=ConsumeMessageThread_5queueId=2, content:2022-03-20 21:50:55 Order:Order{orderId=20210406002, desc='创建'}
consumeThread=ConsumeMessageThread_4queueId=1, content:2022-03-20 21:50:55 Order:Order{orderId=20210406001, desc='付款'}
consumeThread=ConsumeMessageThread_6queueId=3, content:2022-03-20 21:50:55 Order:Order{orderId=20210406003, desc='创建'}
consumeThread=ConsumeMessageThread_5queueId=2, content:2022-03-20 21:50:55 Order:Order{orderId=20210406002, desc='付款'}
consumeThread=ConsumeMessageThread_6queueId=3, content:2022-03-20 21:50:55 Order:Order{orderId=20210406003, desc='付款'}
consumeThread=ConsumeMessageThread_6queueId=3, content:2022-03-20 21:50:55 Order:Order{orderId=20210406003, desc='推送'}
consumeThread=ConsumeMessageThread_5queueId=2, content:2022-03-20 21:50:55 Order:Order{orderId=20210406002, desc='推送'}
consumeThread=ConsumeMessageThread_6queueId=3, content:2022-03-20 21:50:55 Order:Order{orderId=20210406003, desc='完成'}
consumeThread=ConsumeMessageThread_4queueId=1, content:2022-03-20 21:50:55 Order:Order{orderId=20210406001, desc='推送'}
consumeThread=ConsumeMessageThread_5queueId=2, content:2022-03-20 21:50:55 Order:Order{orderId=20210406002, desc='完成'}
consumeThread=ConsumeMessageThread_4queueId=1, content:2022-03-20 21:50:55 Order:Order{orderId=20210406001, desc='完成'}

延时消息

Producer 将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到 Consumer 进行消费, 该消息即延时消息。

消息生产和消费有时间窗口要求;比如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略。

Apache RocketMQ 目前只支持固定精度的定时消息,因为如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化, 那么消息排序要不可避免的产生巨大性能开销。

延迟消息是根据延迟队列的 level 来的,延迟队列默认是msg.setDelayTimeLevel(3)代表延迟 10 秒;、“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”。源码中:org/apache/rocketmq/store/config/MessageStoreConfig.java

生产者示例代码:

public class ScheduledMessageProducer {
    public static void main(String[] args) throws Exception {
        // 实例化一个生产者来产生延时消息
        DefaultMQProducer producer = new DefaultMQProducer("ScheduledProducer");
        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动Producer实例
        producer.start();
        int totalMessagesToSend = 10;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("ScheduledTopic", ("Hello scheduled message " + i).getBytes());
            // 设置延时等级3,这个消息将在10s之后投递给消费者(详看delayTimeLevel)
            // delayTimeLevel:(1~18个等级)"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
            message.setDelayTimeLevel(4);
            // 发送消息
            producer.send(message);
        }
        // 关闭生产者
        producer.shutdown();
    }
}

消费者示例代码:

public class ScheduledMessageConsumer {
    public static void main(String[] args) throws Exception {
        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ScheduledConsumer");
        // 指定Namesrv地址信息.
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅Topics
        consumer.subscribe("ScheduledTopic", "*");
        // 注册消息监听者
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    // Print approximate delay time period
                    System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
                            + (message.getStoreTimestamp() - message.getBornTimestamp()) + "ms later");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者
        consumer.start();
    }
}

批量消息

批量发送消息能显著提高传递消息的性能。限制是,这些批量消息应该有相同的 topic,相同的 waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过 4MB。

生产者示例代码:

public class BatchProducer {

    public static void main(String[] args) throws Exception {
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动Producer实例
        producer.start();

        String topic = "BatchTest";
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
        try {
            producer.send(messages);
        } catch (Exception e) {
            producer.shutdown();
            e.printStackTrace();
        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

消费者示例代码:

public class BatchComuser {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者,指定组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BatchComsuer");
        // 指定Namesrv地址信息.
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅Topic
        consumer.subscribe("BatchTest", "*");
        //负载均衡模式消费
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // 注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n",
                        Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消息者
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

批量切分

如果消息的总长度可能大于 4MB 时,这时候需要把消息进行分割。

生产者示例代码:

public class SplitBatchProducer {

    public static void main(String[] args) throws Exception {
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动Producer实例
        producer.start();

        //large batch
        String topic = "BatchTest";
        List<Message> messages = new ArrayList<>(100 * 1000);
        //10万元素的数组
        for (int i = 0; i < 100 * 1000; i++) {
            messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
        }

        //把大的消息分裂成若干个小的消息(1M左右)
        ListSplitter splitter = new ListSplitter(messages);
        while (splitter.hasNext()) {
            List<Message> listItem = splitter.next();
            producer.send(listItem);
            Thread.sleep(100);
        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
        System.out.printf("Consumer Started.%n");
    }

}

class ListSplitter implements Iterator<List<Message>> {
    private int sizeLimit = 1000 * 1000;//1M
    private final List<Message> messages;
    private int currIndex;

    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; // 增加日志的开销20字节
            if (tmpSize > sizeLimit) {
                //单个消息超过了最大的限制(1M)
                //忽略,否则会阻塞分裂的进程
                if (nextIndex - currIndex == 0) {
                    //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
                    nextIndex++;
                }
                break;
            }
            if (tmpSize + totalSize > sizeLimit) {
                break;
            } else {
                totalSize += tmpSize;
            }

        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException("Not allowed to remove");
    }
}

过滤消息

Tag 过滤

在大多数情况下,TAG 是一个简单而有用的设计,其可以来选择您想要的消息。

生产者示例代码:

public class TagFilterProducer {

    public static void main(String[] args) throws Exception {

        DefaultMQProducer producer = new DefaultMQProducer("TagFilterProducer");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        String[] tags = new String[]{"TagA", "TagB", "TagC"};

        for (int i = 0; i < 60; i++) {
            Message msg = new Message("TagFilterTest",
                    tags[i % tags.length],
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }

        producer.shutdown();
    }
}

消费者示例代码:

public class TagFilterConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException, IOException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagFilterComsumer");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("TagFilterTest", "TagA || TAGB || TAGC");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                try {
                    for (MessageExt msg : msgs) {
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(), "utf-8");
                        String msgPro = msg.getProperty("a");

                        String tags = msg.getTags();
                        System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,a : " + msgPro + " ,msg : " + msgBody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;

                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

消费者将接收包含 TAGA 或 TAGB 或 TAGC 的消息。但是一个消息只能有一个标签。在这种情况下,可以使用 SQL 表达式筛选消息。SQL 特性可以通过发送消息时的属性来进行计算。

Sql 过滤

SQL 基本语法

RocketMQ 定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。只有使用 push 模式的消费者才能用使用 SQL92 标准的 sql 语句,常用的语句如下:
数值比较: 比如:>,>=,<,<=,BETWEEN,=;
字符比较: 比如:=,<>,IN; IS NULL 或者 IS NOT NULL; 逻辑符号:AND,OR,NOT;
常量支持类型为: 数值,比如:123,3.1415; 字符,比如:‘abc’,必须用单引号包裹起来; NULL,特殊的常量;布尔值,TRUE 或 FALSE
生产者示例代码:

public class SqlFilterProducer {

    public static void main(String[] args) throws Exception {

        DefaultMQProducer producer = new DefaultMQProducer("SqlFilterProducer");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC"};

        for (int i = 0; i < 10; i++) {
            Message msg = new Message("SqlFilterTest",
                tags[i % tags.length],
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            // 设置一些属性
            msg.putUserProperty("a", String.valueOf(i));

            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }

        producer.shutdown();
    }
}

消费者示例代码:

public class SqlFilterConsumer {

    public static void main(String[] args) throws Exception {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SqlFilterConsumer");
        consumer.setNamesrvAddr("localhost:9876");
        // Don't forget to set enablePropertyFilter=true in broker
        consumer.subscribe("SqlFilterTest",
                MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
                        "and (a is not null and a between 0 and 3)"));

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                try {
                    for (MessageExt msg : msgs) {
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(), "utf-8");
                        String msgPro = msg.getProperty("a");

                        String tags = msg.getTags();
                        System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,a : " + msgPro + " ,msg : " + msgBody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;

                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

如果出现如下报错错误:说明 Sql92 功能没有开启
在这里插入图片描述
需要修改 Broker.conf 配置文件。
加入 enablePropertyFilter=true 然后重启 Broker 服务。

事物消息

在这里插入图片描述
如图,事务消息分为两个流程:

  • 正常事务消息的发送和提交(1,2,3,4)
  • 事务消息的补偿流程(4,6,7)

正常事务流程

  1. 发送半事务消息。
  2. 服务端响应半事务消息的发送结果。
  3. 根据发送结果执行本地事务。如果写入失败,此时半消息对业务不可见,本地逻辑不执行。
  4. 根据本地事务状态执行 Commit 或者 Rollback(Commit 操作生成消息索引,消息对消费者可见)。

事务补偿流程

补偿阶段用于解决消息 Commit 或者 Rollback 发生超时或者失败的情况。

  1. 对没有 Commit/Rollback 的事务消息(pending 状态的消息),从服务端发起一次“回查”。
  2. Producer 收到回查消息,检查回查消息对应的本地事务的状态。
  3. 根据本地事务状态,重新 Commit 或者 Rollback。

事务消息状态

事务消息共有三种状态:
TransactionStatus.CommitTransaction:提交事务,它允许消费者消费此消息。
TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。
TransactionStatus.Unknown:它代表需要检查消息队列来确定状态。

使用 TransactionMQProducer 类创建生产者,并指定唯一的 ProducerGroup,通过设置自定义线程池来处理事务回查请求。
执行本地事务后、需要根据执行结果对消息队列进行回复。

生产者示例代码

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        //创建事务监听器
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("hzy_produce");
        producer.setNamesrvAddr("localhost:9876");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        //设置生产者回查线程池
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

当发送半消息成功时,我们使用 executeLocalTransaction 方法来执行本地事务。checkLocalTransaction 方法用于检查本地事务状态。

事务监听器示例代码

public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
举报

相关推荐

0 条评论