0
点赞
收藏
分享

微信扫一扫

RocketMQ ( 一 ) 消息类别

Message类型

  1. 基础类型
  2. 顺序类型
  3. 延迟类型
  4. 事务类型

基础类型

procedure 生产者

  1. 同步 Sync
  2. 异步 Async
  3. 单项 OneWay

同步

public class SyncProducer {
    
    public static void main(String[] args) throws Exception {
        // 1, 创建生产者 并 命名生产者组
        DefaultMQProducer producer = new DefaultMQProducer("group1");

        // 2. 链接RocketMQ 中的 NameServer
        producer.setNamesrvAddr(RocketMQConst.NAME_SRV);

        // 3. 启动生产者
        producer.start();

        // 4. 发送消息
        /**
         *  参数一 : Topic         主题
         *  参数二 : Tags          标签
         *  参数三 : Message       消息内容
         */

        Message msg = new Message(RocketMQConst.TEST_TOPIC, RocketMQConst.TEST_TAG, "hello world".getBytes(StandardCharsets.UTF_8));
        SendResult result = producer.send(msg);
        System.out.println("结果为 : " + result);

        // 5. 关闭
        producer.shutdown();

    }
}

异步

public class AsyncProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group2");

        producer.setNamesrvAddr(RocketMQConst.NAME_SRV);

        producer.start();

        Message msg = new Message();
        producer.send(msg, new SendCallback() {
            // 当消息发送成功时的回调方法
            @Override
            public void onSuccess(SendResult sendResult) {

            }
            // 当消息发送失败时的回调方法
            @Override
            public void onException(Throwable throwable) {

            }
        });

        producer.shutdown();


    }
}

单项

public class OneWayProducer {

    public static void main(String[] args) throws Exception {

        DefaultMQProducer producer = new DefaultMQProducer("oneWay");

        producer.setNamesrvAddr(RocketMQConst.NAME_SRV);

        producer.start();

        // oneWay发送的消息没有结果
        producer.sendOneway(new Message());

        producer.shutdown();

    }
}

consumer 消费者

  1. 广播模式
  2. 负载均衡模式

广播

public class Consumer01 {

    public static void main(String[] args) throws Exception {

        // 1.
        DefaultMQPushConsumer defaultConsumer = new DefaultMQPushConsumer("defaultConsumer");

        defaultConsumer.setNamesrvAddr(RocketMQConst.NAME_SRV);

        // BROADCASTING : 广播模式  当前组的消费者消费一遍Topic中的数据
        // CLUSTERING   : 负载均衡  默认的模式Topic中的数据被组中的消费者分
//        defaultConsumer.setMessageModel(MessageModel.BROADCASTING);
        defaultConsumer.subscribe(RocketMQConst.BROADCASTING_TOPIC, RocketMQConst.ANY_TAG);

        defaultConsumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    System.out.println(new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        defaultConsumer.start();


    }
}

负载均衡

默认的消费者行为

顺序类型

因为RocketMQ在一个主题(Topic)中有多个队列(Queue),那么如果有消息的处理是有顺序的话。这时候顺序可能不对,顺序消息就是让一类的消息都放在同一个队列中。让其保持先进先出的原则 procedure 生产者

public class OrderlyProducer {

    public static void main(String[] args) throws Exception {

        DefaultMQProducer orderlyProducer = new DefaultMQProducer("orderlyProducer");

        orderlyProducer.setNamesrvAddr(RocketMQConst.NAME_SRV);

        orderlyProducer.start();

        Message msg = new Message();
        int userId = 1;
        // 当你需要确保一些消息是有序的话使用 MessageQueueSelector example 下订单的流程  付款 -> 生成订单 这两个操作是有顺序的不能乱
        orderlyProducer.send(msg, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                return list.get(userId % list.size());
            }
        }, userId);
        // 使用lambda
//        orderlyProducer.send(msg, (list, message, o) -> list.get(userId % list.size()), userId);

        orderlyProducer.shutdown();

    }
}

延迟类型

procedure 生产者

public class DelayConsumer {

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer delayConsumer = new DefaultMQPushConsumer("delayConsumer");

        delayConsumer.setNamesrvAddr(RocketMQConst.NAME_SRV);

        delayConsumer.subscribe(RocketMQConst.DELAY_TOPIC, RocketMQConst.ANY_TAG);

        delayConsumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
            for (MessageExt msg : list) {
                System.out.println(new String(msg.getBody()));
                System.out.println(msg.getReconsumeTimes());
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        delayConsumer.start();

    }
}

事务类型

procedure 生产者

public class TransactionProducer {

    public static void main(String[] args) throws Exception {

        TransactionMQProducer transactionProducer = new TransactionMQProducer("transactionProducer");
        
        transactionProducer.setNamesrvAddr(RocketMQConst.NAME_SRV);
        
        // 设置事务监听器
        transactionProducer.setTransactionListener(new TransactionListener() {
            // 判断当前的消息是否提交到MQ中 Commit rollback UNKNOW
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                /**
                 * COMMIT_MESSAGE   提交事务
                 * ROLLBACK_MESSAGE 回滚事务
                 * UNKNOW           不知道状态
                 * 
                 * 如果提交的是UNKNOW这时候MQ会到Procedure中复查
                 */
                return LocalTransactionState.UNKNOW;
            }

            // 复查
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                System.out.println("复查");
                // 还是可以提交三个状态
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        
        transactionProducer.start();

        Message msg = new Message(RocketMQConst.TRANSACTION_TOPIC, "transaction message".getBytes(StandardCharsets.UTF_8));
        transactionProducer.send(msg);
        
        transactionProducer.shutdown();
    }
}

批量发送消息

public class BatchProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer batchProducer = new DefaultMQProducer("BatchProducer");
        
        batchProducer.setNamesrvAddr(RocketMQConst.NAME_SRV);
        
        batchProducer.start();

        List<Message> messageList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Message msg = new Message(RocketMQConst.DELAY_TOPIC, String.valueOf(i).getBytes(StandardCharsets.UTF_8));
            messageList.add(msg);
        }
        // 批量发送消息
        batchProducer.send(messageList);
        
        batchProducer.shutdown();
        
    }
}

消息过滤

  1. Tag过滤
  2. Sql过滤

Tag过滤

public class FilterConsumer {

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer filterConsumer = new DefaultMQPushConsumer("filterConsumer");

        filterConsumer.setNamesrvAddr(RocketMQConst.NAME_SRV);
        // RocketMQConst.FILTER_TAG 标签过滤
        // 第一种方式
//        filterConsumer.subscribe(RocketMQConst.FILTER_TOPIC, RocketMQConst.FILTER_TAG);
        // 第二中方式
        filterConsumer.subscribe(RocketMQConst.FILTER_TOPIC, MessageSelector.byTag(RocketMQConst.FILTER_TAG));
        filterConsumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
            for (MessageExt msg : list) {
                System.out.println(new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        filterConsumer.start();
    }
}

Sql过滤

public class FilterConsumer {

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer filterConsumer = new DefaultMQPushConsumer("filterConsumer");
        
        filterConsumer.setNamesrvAddr(RocketMQConst.NAME_SRV);
        // MessageSelector.byTag("i>5") 可以使用Sql语句
        filterConsumer.subscribe(RocketMQConst.FILTER_TOPIC, MessageSelector.byTag("i>5"));
        
        filterConsumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
            for (MessageExt msg : list) {
                System.out.println(new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        
        filterConsumer.start();
    }
}

补充

代码中的Topic值 和 Tag值是定义了常量,这样可以避免自己打错出现的报错也可以避免出现魔法值。这样代码看起来整洁而且后期维护会更简单,建议使用。

举报

相关推荐

0 条评论