0
点赞
收藏
分享

微信扫一扫

RocketMQ-borker

沪钢木子 2022-02-22 阅读 10

首先,borker需要做什么?

  1. 启动
  2. 处理producer发送的消息
  3. 存储(持久化)消息
  4. 给consumer提供消息
  5. 主从之间的数据同步
  6. 消息查询

borker的启动过程

主要在BrokerController.initialize() 这个方法里面.

1.从持久化文件中加载数据到内存中

boolean result = this.topicConfigManager.load();

result = result && this.consumerOffsetManager.load();
result = result && this.subscriptionGroupManager.load();
result = result && this.consumerFilterManager.load();

2.messageStore初始化

 //消息存取的核心接口初始化,提供put/get message接口,提供根据offset获取消息的接口
 this.messageStore =
     new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
         this.brokerConfig);
 //messageStore的指标统计类,提供最近一天的消息吞吐量的统计数据
 this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
 //load plugin
 MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
 this.messageStore = MessageStoreFactory.build(context, this.messageStore);
 //添加消息分发器,分发到布隆过滤器
 this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
           

3.message store加载内存映射文件,commit log文件,consumer queue文件,index文件

 result = result && this.messageStore.load();

4.启动其他东西

            //初始化netty server
            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
            NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
            //传说中的VIP channel,端口是broker端口-2(10909),不接收consumer的Pull请求
            fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
            this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
            // 启动一系列的线程池
            // 注册处理器
            this.registerProcessor();

5.启动一系列定时任务

  • 打印broker的消息吞吐信息到日志文件
  • 记录consumerOffet到文件
  • 记录consumer filter到文件中
  • 定时检查consumer的消费记录,如果延时太大,则disable consumer,不再往这个consumer投递消息
  • 打印当前Queue size日志
  • 打印dispatch落后情况
  • 更新nameserv address信息
  • 如果是slave,启动定时任务,每分钟从master同步配置和offset;如果是master,定时打印slave延时情况

6.TLS链接,监控签名文件有没有更新

7.初始化事务消息service

8.最后调用BrokerController.start

这里会启动定时任务,执行registerBrokerAll,注册自己的信息到nameSerivce

处理producer发送的消息

BrokerController#registerProcessor里面注册了一系列的processor。其中就有SendMessageProcessor。

里面有个asyncProcessRequest方法,会去调用DefaultMessageStore#asyncPutMessage去存储数据。然后调用commitLog.asyncPutMessage,每个commitLog其实是一系列mappedFile,找到其中一个mappedFile去存储消息。

然后有个定时任务ReputMessageService,会定时执行doReput方法,从commitLog里面查出数据,dispatch出去。CommitLogDispatcherBuildConsumeQueue和CommitLogDispatcherBuildIndex会把数据存在consumerQueue和index文件中。

给consumer提供消息

rocketMq消费分为pull模式和push模式,底层其实都是使用pull模式的

pull模式主要是在PullMessageProcessor中。

刚开始是一大堆的校验。然后是通过messageStore去获取消息。

final GetMessageResult getMessageResult =
            this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);

首先根据topic和queueId获取consumeQueue。然后consumeQueue中有mepperFile。找到对应的mapperFile。从mapperFile获取指定数量的数据,然后进行过滤。然后得到offset,去commitLog里面获取数据。

如果没有获取到消息,则会判断是否为push模式。如果是则发送一个hold请求

this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);

PullRequestHoldService

    public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
        String key = this.buildKey(topic, queueId);
        ManyPullRequest mpr = this.pullRequestTable.get(key);
        if (null == mpr) {
            mpr = new ManyPullRequest();
            ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
            if (prev != null) {
                mpr = prev;
            }
        }

        mpr.addPullRequest(pullRequest);
    }

这里会根据topic+queueId把请求放到一个table里面去异步执行。

有两种情况会执行到。一个是定时执行,一个是broker有新消息到来的时候。

PullRequestHoldService本身就是一个线程,它的run方法会定时去调用checkHoldRequest方法。

实际是去调用notifyMessageArriving方法。这也是broker有新消息到来的时候调用方法。

notifyMessageArriving这个方法里面就是一直判断是否有新消息到来,如果有新消息就执行request。如果超时了,也执行request,让其返回。

举报

相关推荐

0 条评论