0
点赞
收藏
分享

微信扫一扫

RocketMQ复习

千妈小语 2022-04-13 阅读 141
运维开发

角色

在这里插入图片描述

1、 broker

  • Broker面向producer和consumer接受和发送消息
  • 向nameserver提交自己的信息
  • 是消息中间件的消息存储、转发服务器。
  • 每个Broker节点,在启动时,都会遍历NameServer列表,与每个NameServer建立长连接,注册自己的信息,之后定时上报。
broker集群
  • Broker高可用,可以配成Master/Slave结构,Master可写可读,Slave只可以读,Master将写入的数据同步给Slave。
    • 一个Master可以对应多个Slave,但是一个Slave只能对应一个Master
    • Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义BrokerId为0表示Master,非0表示Slave
  • Master多机负载,可以部署多个broker
    • 每个Broker与nameserver集群中的所有节点建立长连接,定时注册Topic信息到所有nameserver。

2、producer

  • 消息的生产者
  • 通过集群中的其中一个节点(随机选择)建立长连接,获得Topic的路由信息,包括Topic下面有哪些Queue,这些Queue分布在哪些Broker上等
  • 接下来向提供Topic服务的Master建立长连接,且定时向Master发送心跳

3、consumer

  • 消息的消费者,通过NameServer集群获得Topic的路由信息,连接到对应的Broker上消费消息。

  • 注意,由于Master和Slave都可以读取消息,因此Consumer会与Master和Slave都建立连接。

4、nameserver

  • 底层由netty实现,提供了路由管理、服务注册、服务发现的功能,是一个 无状态(无主从) 节点

  • nameserver是服务发现者,集群中各个角色(producer、broker、consumer等)都需要定时想nameserver上报自己的状态,以便互相发现彼此,超时不上报的话,nameserver会把它从列表中剔除

  • nameserver可以部署多个,当多个nameserver存在的时候,其他角色同时向他们上报信息,以保证高可用,

  • NameServer集群间互不通信,没有主备的概念

  • nameserver内存式存储,nameserver中的broker、topic等信息默认不会持久化

  • 为什么不用zookeeper?:rocketmq希望为了提高性能,CAP定理,客户端负载均衡

    • nameserver并不保证所有的nameserver的数据一致;牺牲数据一致性,保证高可用;

5、Topic

对比JSM中的Topic和Queue:
Topic是一个逻辑上的概念,实际上Message是在每个Broker上以Queue的形式记录。
在这里插入图片描述

消息发送方式

1、同步发送

  • 单条消息发送:
		Message msg = new Message("topic001", "body 第1条".getBytes());
		//同步消息发送  阻塞发送
		SendResult sendResult = producer.send(msg1);
  • 批量消息发送:
		Message msg1 = new Message("topic001", "body 第1条".getBytes());
		Message msg2 = new Message("topic001", "body 第2条".getBytes());
		Message msg3 = new Message("topic001", "body 第3条".getBytes());
		//支持批量消息发送
		ArrayList<Message> list = new ArrayList<>();
		list.add(msg1);
		list.add(msg2);
		list.add(msg3);
		//同步消息发送  阻塞发送
		SendResult sendResult = producer.send(list);
  • 单向消息
		Message message = new Message("topic001", "body 单向第1条".getBytes());

        //单向消息
        //有可能丢消息
        producer.sendOneway(message);

        producer.shutdown();
        System.out.println("Producer shutdown...");

2、异步发送

		//异步可靠消息
        //不会阻塞,等待broker的确认
        //采用事件监听方式接受broker返回的确认
        Message message = new Message("topic001", "body 异步第2条".getBytes());

        //设置异步发送失败重投时间
        producer.setRetryTimesWhenSendAsyncFailed(1000);
        producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("消息发送成功。。。");
                System.out.println("sendResult:" + sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                //如果发送异常 case异常,尝试重投
                //或者调整业务逻辑
                throwable.printStackTrace();
                System.out.println("发送异常。。。");
            }
        });

消息消费模式

  • 集群消费:
    1、 一组consumer同时消费一个topic,可以分配消费负载均衡策略分配consumer对应消费topic下的哪些queue
    2、 多个group同时消费一个topic时,每个group都会消费到数据
    3、一条消息只会被group中的一个consumer消费,
    在这里插入图片描述

  • 广播消费
    1、消息将对一 个Consumer Group 下的各个 Consumer 实例都消费一遍。即即使这些 Consumer 属于同一个Consumer Group ,消息也会被 Consumer Group 中的每个 Consumer 都消费一次。
    2、消费进度由consumer维护;
    3、消费失败不会重新投递;

在这里插入图片描述

MessageSelector

TagFilter

  • producer
	DefaultMQProducer producer = new DefaultMQProducer("producer1");
        //设置nameserver地址
        producer.setNamesrvAddr("192.168.100.100:9876");
        producer.start();

        //topic 消息将要发送的地址
        //body 消息中的数据
        Message msg1 = new Message("topic002", "tag-b", "key-1", "body 过滤第1条".getBytes());


        //同步消息发送  阻塞发送
        SendResult sendResult = producer.send(msg1);

        System.out.println("sendResult:" + sendResult);
        producer.shutdown();
  • consumer
 		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
        consumer.setNamesrvAddr("192.168.100.100:9876");
        //每个consumer关注一个topic
        //过滤器 * 表示不过滤
        consumer.subscribe("topic002", "tag-b");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.println("consumeMessage start...");
                for (MessageExt msg : msgs){
                    System.out.println(new String(msg.getBody()));
                }
                //默认情况下 这条消息只会被一个consumer消费到 点对点
                //message 状态修改
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //MessageModel.BROADCASTING 广播模式  可以多个consumer消费同一个消息  只广播一次 不会重投
        //MessageModel.CLUSTERING 集群消费模式(默认) 一个消息一组consumer只消费一次   会重投
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.start();

SQLFilter

  • producer
		DefaultMQProducer producer = new DefaultMQProducer("producer3");
        //设置nameserver地址
        producer.setNamesrvAddr("192.168.100.100:9876");
        producer.start();

        //topic 消息将要发送的地址
        //body 消息中的数据
        for (int i = 0; i < 100; i++) {
            Message msg1 = new Message("topic003", "tag-c", "key-1", ("body SQL过滤第1条"+i).getBytes());
            msg1.putUserProperty("age", i+"");
            producer.send(msg1);
        }
        producer.shutdown();
  • consumer
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer3");
        consumer.setNamesrvAddr("192.168.100.100:9876");
        //每个consumer关注一个topic
        //过滤器 * 表示不过滤

        MessageSelector selector = MessageSelector.bySql("age>= 18 and age <=28");
        consumer.subscribe("topic003", selector);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.println("consumeMessage start...");
                for (MessageExt msg : msgs){
                    System.out.println(new String(msg.getBody()));
                }
                //默认情况下 这条消息只会被一个consumer消费到 点对点
                //message 状态修改
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //MessageModel.BROADCASTING 广播模式  可以多个consumer消费同一个消息  只广播一次 不会重投
        //MessageModel.CLUSTERING 集群消费模式(默认) 一个消息一组consumer只消费一次   会重投
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.start();

事务消息

  • 事务消息执行流程图
    在这里插入图片描述
  • producer
		TransactionMQProducer producer = new TransactionMQProducer("producerTrans");
        //设置nameserver地址
        producer.setNamesrvAddr("192.168.100.100:9876");

        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object o) {
                //执行本地事务
                System.out.println("executeLocalTransaction:");
                System.out.println("msg:" + new String(msg.getBody()));
                System.out.println("transactionId:" + msg.getTransactionId());
                return LocalTransactionState.UNKNOW;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                //broker端回调,检查事务
                System.out.println("checkLocalTransaction:");
                System.out.println("msg:" + new String(msg.getBody()));
                System.out.println("transactionId:" + msg.getTransactionId());
                //事务执行成功
//                return LocalTransactionState.COMMIT_MESSAGE;
                //继续等待
                return LocalTransactionState.UNKNOW;
                //回滚消息
//                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        });




        producer.start();

        //异步可靠消息
        //不会阻塞,等待broker的确认
        //采用事件监听方式接受broker返回的确认
        Message message = new Message("topicTrans", "body 事务消息第2条".getBytes());

        TransactionSendResult result = producer.sendMessageInTransaction(message, null);

        System.out.println("result:"+result);
  • 执行结果
executeLocalTransaction:
msg:body 事务消息第2条
transactionId:7F000001595418B4AAC232D058D90000
result:SendResult [sendStatus=SEND_OK, msgId=7F000001595418B4AAC232D058D90000, offsetMsgId=null, messageQueue=MessageQueue [topic=topicTrans, brokerName=node1, queueId=3], queueOffset=81]
checkLocalTransaction:
msg:body 事务消息第2条
transactionId:7F000001595418B4AAC232D058D90000

Half Message:预处理消息,当broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中

检查事务状态:Broker会开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,每次执行任务会向消息发送者确认事务执行状态(提交、回滚、未知),如果是未知,等待下一次回调。

超时:如果超过回查次数,默认回滚消息

TransactionListener的两个方法

  • executeLocalTransaction

    半消息发送成功触发此方法来执行本地事务

  • checkLocalTransaction

    broker将发送检查消息来检查事务状态,并将调用此方法来获取本地事务状态
    默认:每一分钟回调一次,一共回调15次;15次为返回执行成功状态,则默认失败;

本地事务执行状态

  • LocalTransactionState.COMMIT_MESSAGE

    执行事务成功,确认提交

  • LocalTransactionState.ROLLBACK_MESSAGE

    回滚消息,broker端会删除半消息

  • LocalTransactionState.UNKNOW

    暂时为未知状态,等待broker回查

顺序消费

  • 同一个topic

  • 同一个queue (默认一个topic有4个queue)

  • 同一个线程发送

  • 同一个线程消费

  • producer

		DefaultMQProducer producer = new DefaultMQProducer("producer1");
        //设置nameserver地址
        producer.setNamesrvAddr("192.168.100.100:9876");

        producer.start();

        //topic 消息将要发送的地址
        //body 消息中的数据
        for (int i = 0; i < 98; i++) {
            Message msg1 = new Message("topicSelectQueue", ("body 第"+ i +"条").getBytes());
            SendResult sendResult = producer.send(msg1, new MessageQueueSelector() {
                @Override
                //queue选择器,手动选择一个queue
                public MessageQueue select(
                        //当前topic里面包含的所有queue
                        List<MessageQueue> list,
                        //具体要发送的消息
                        Message message,
                        //对应send()里面的arg参数
                        Object o) {
                    //获取一个固定的queue,向固定的queue里面发送消息
                    MessageQueue queue = list.get((Integer)(o));
                    return queue;
                }

            },0,2000);
        producer.shutdown();
  • consumer
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerSelectQueue");
        consumer.setNamesrvAddr("192.168.100.100:9876");
        //每个consumer关注一个topic
        //过滤器 * 表示不过滤
        consumer.subscribe("topicSelectQueue", "*");
        //对大消费线程数
//        consumer.setConsumeThreadMax(10);
        //对小消费线程数
//        consumer.setConsumeThreadMin(1);

        //MessageListenerOrderly 顺序消费监听 对一个queue开启一个线程,多个queue开启多个线程
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
                for (MessageExt msg : msgs){
                    System.out.println("thread:" + Thread.currentThread().getName()+"   msg:"+new String(msg.getBody()));
                    System.out.println("queueId:"+msg.getQueueId());
                }
                //默认情况下 这条消息只会被一个consumer消费到 点对点
                //message 状态修改
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        //MessageModel.BROADCASTING 广播模式  可以多个consumer消费同一个消息  只广播一次 不会重投
        //MessageModel.CLUSTERING 集群消费模式(默认) 一个消息一组consumer只消费一次   会重投
//        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.start();
        System.out.println("Consumer1 start...");

重投机制

防止重复消费?

举报

相关推荐

0 条评论