首先,borker需要做什么?
- 启动
- 处理producer发送的消息
- 存储(持久化)消息
- 给consumer提供消息
- 主从之间的数据同步
- 消息查询
borker的启动过程
主要在BrokerController.initialize() 这个方法里面.
1.从持久化文件中加载数据到内存中
boolean result = this.topicConfigManager.load();
result = result && this.consumerOffsetManager.load();
result = result && this.subscriptionGroupManager.load();
result = result && this.consumerFilterManager.load();
2.messageStore初始化
//消息存取的核心接口初始化,提供put/get message接口,提供根据offset获取消息的接口
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
//messageStore的指标统计类,提供最近一天的消息吞吐量的统计数据
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
//load plugin
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
this.messageStore = MessageStoreFactory.build(context, this.messageStore);
//添加消息分发器,分发到布隆过滤器
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
3.message store加载内存映射文件,commit log文件,consumer queue文件,index文件
result = result && this.messageStore.load();
4.启动其他东西
//初始化netty server
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
//传说中的VIP channel,端口是broker端口-2(10909),不接收consumer的Pull请求
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
// 启动一系列的线程池
// 注册处理器
this.registerProcessor();
5.启动一系列定时任务
- 打印broker的消息吞吐信息到日志文件
- 记录consumerOffet到文件
- 记录consumer filter到文件中
- 定时检查consumer的消费记录,如果延时太大,则disable consumer,不再往这个consumer投递消息
- 打印当前Queue size日志
- 打印dispatch落后情况
- 更新nameserv address信息
- 如果是slave,启动定时任务,每分钟从master同步配置和offset;如果是master,定时打印slave延时情况
6.TLS链接,监控签名文件有没有更新
7.初始化事务消息service
8.最后调用BrokerController.start
这里会启动定时任务,执行registerBrokerAll,注册自己的信息到nameSerivce
处理producer发送的消息
BrokerController#registerProcessor里面注册了一系列的processor。其中就有SendMessageProcessor。
里面有个asyncProcessRequest方法,会去调用DefaultMessageStore#asyncPutMessage去存储数据。然后调用commitLog.asyncPutMessage,每个commitLog其实是一系列mappedFile,找到其中一个mappedFile去存储消息。
然后有个定时任务ReputMessageService,会定时执行doReput方法,从commitLog里面查出数据,dispatch出去。CommitLogDispatcherBuildConsumeQueue和CommitLogDispatcherBuildIndex会把数据存在consumerQueue和index文件中。
给consumer提供消息
rocketMq消费分为pull模式和push模式,底层其实都是使用pull模式的
pull模式主要是在PullMessageProcessor中。
刚开始是一大堆的校验。然后是通过messageStore去获取消息。
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
首先根据topic和queueId获取consumeQueue。然后consumeQueue中有mepperFile。找到对应的mapperFile。从mapperFile获取指定数量的数据,然后进行过滤。然后得到offset,去commitLog里面获取数据。
如果没有获取到消息,则会判断是否为push模式。如果是则发送一个hold请求
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
PullRequestHoldService
public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
String key = this.buildKey(topic, queueId);
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (null == mpr) {
mpr = new ManyPullRequest();
ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
if (prev != null) {
mpr = prev;
}
}
mpr.addPullRequest(pullRequest);
}
这里会根据topic+queueId把请求放到一个table里面去异步执行。
有两种情况会执行到。一个是定时执行,一个是broker有新消息到来的时候。
PullRequestHoldService本身就是一个线程,它的run方法会定时去调用checkHoldRequest方法。
实际是去调用notifyMessageArriving方法。这也是broker有新消息到来的时候调用方法。
notifyMessageArriving这个方法里面就是一直判断是否有新消息到来,如果有新消息就执行request。如果超时了,也执行request,让其返回。