0
点赞
收藏
分享

微信扫一扫

AliMQ(RocketMQ)源码(六)MQClientInstance的start()方法


MQClientInstance的start()方法,客户端的start()

// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);

一、this.mQClientAPIImpl.start();

这个方法实则是NettyRemotingClient的start()方法,这个方法中先是创建了Netty客户端,然后调用了两个定时执行的任务:

this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingClient.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);

this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingClient.this.scanChannelTablesOfNameServer();
} catch (Exception e) {
log.error("scanChannelTablesOfNameServer exception", e);
}
}
}, 1000 * 3, 10 * 1000);

这两个任务一个是删除过期的请求,一个是将没有连接响应的nameServer断开连接。

二、this.startScheduledTask();

这个方法中执行了多个定时任务,包括:

  • 2分钟更新一次nameServer的地址
  • 30秒更新一次topic的路由信息
  • 30秒对Broker发送一次心跳检测,并将下线的broker删除
  • 5秒持久化一次consumer的offset
  • 1分钟调整一次线程池,这个定时任务其实什么都没有执行。

三、this.pullMessageService.start();

这个方法调用了PullMessageService的run()方法,里面主要是调用了DefaultMQPushConsumerImpl.pullMessage方法,

pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

  • 设置最后一次拉取的时间

this.makeSureStateOK();

  • 检验客户端状态是否正常

long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}

if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}

  • 查看已缓存的message的条数和message的内存大小,如果超过了设置的缓存值,就不拉取消息了。

pullCallback

  • 拉取消息后的回调处理

if (sd != null) {
if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
subExpression = sd.getSubString();
subProperties = sd.getPropertiesStr();
}

classFilter = sd.isClassFilterMode();
}

  • 如果设置了isPostSubscriptionWhenPull=true,则以后拉取消息会带上最新的订阅信息

this.pullAPIWrapper.pullKernelImpl(

  • 真实的拉取消息

四、this.pullAPIWrapper.pullKernelImpl

FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);

  • 根据brokeName、队列节点获取拉取消息的broker地址

PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
requestHeader.setSubProperties(subProperties);
requestHeader.setExpressionType(expressionType);

  • 设置请求的消息头

case ASYNC:
this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);

  • 拉取消息默认调用ASYNC的方法,如果拉取消息成功,会调用pullCallBack的OnSuccess和OnException方法

五、this.rebalanceService.start();

RebalanceImpl是consume的重新负载,什么意思呢?就是消费者与消费队列的对应关系,我们来思考一个问题,比如现在有4个消息队列(q1,q2,q3,q4),3个消费者(m1,m2,m3),那么消费者与消息队列的对应关系是什么呢?我们按照一个轮询算法来表示, m1(q1,q4) m2(q2) m3(q3),如果此时q2消息队列失效(所在的broker挂了),那么消息队列的消费就需要重新分配,RebalanceImpl就是干这事的,该类的调用轨迹如下:(MQClientInstance start --> (this.rebalanceService.start()) —> RebalanceService.run(this.mqClientFactory.doRebalance()) —> MQConsumerInner.doRebalance(DefaultMQPushConsumerImpl) —>RebalanceImpl.doRebalance
在这里着重说明一点:消息队列数量与消费者关系:1个消费者可以消费多个队列,但1个消息队列只会被一个消费者消费;如果消费者数量大于消息队列数量,则有的消费者会消费不到消息(集群模式)


举报

相关推荐

0 条评论