0
点赞
收藏
分享

微信扫一扫

RocketMQ源码分析系列之二:负载均衡之TopicPublishInfo与路由消息查找

路由信息查找示意图

RocketMQ源码分析系列之二:负载均衡之TopicPublishInfo与路由消息查找_消息中间件

结合上图,我们先概括介绍下路由信息查找:

①路由信息决定了消息被发送到哪一个Broker上;

②tryToFindTopicPublishInfo是查找路由信息的入口方法;

③如果生产者中已经缓存则直接使用;

④如果没有缓存,则从NameServer中查询该topic的路由信息;

⑤查询到则先缓存,然后返回,如果没有查询到则抛出异常。


核心方法

首先看下核心方法tryToFindTopicPublishInfo的内容:

/**
* 根据topic从name server获取路由信息
* @param topic 主题
* @return
*/
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
//查看本地缓存
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
//缓存失效的情况处理
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
//如果没有缓存,则向NameServer请求更新路由信息————注意:如果查询不到则抛出异常,关注下面的方法
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}

//路由信息有效,则if,否则else
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
//路由信息无效,则向NameServer请求更新路由信息————注意:如果查询不到则抛出异常,关注下面的方法
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.

更新路由信息

接下来我们看下更新路由信息方法updateTopicRouteInfoFromNameServer的实现:

public boolean updateTopicRouteInfoFromNameServer(final String topic) {
return updateTopicRouteInfoFromNameServer(topic, false, null);
}
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;
//isDefault为true,使用默认主题查询
if (isDefault && defaultMQProducer != null) {
//使用默认主题查询
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
clientConfig.getMqClientApiTimeout());
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
//将路由信息中读写队列数量替换为 消息生产者默认队列数与路由中读队列数之中的较小值
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
//根据主题进行查询,详见方法体
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
}

if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic);
//查询到路由信息,与本地对比是否发生了变化,详见方法体
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}
//如果发生变化,进行下面操作
if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}

// Update Pub info
if (!producerTable.isEmpty()) {
//进行数据转换,详见方法体
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
//更新topic的路由信息(MQProducerInner子类执行)
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}

// Update sub info
if (!consumerTable.isEmpty()) {
//进行数据转换,详见方法体
Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
if (impl != null) {
//更新topic的路由信息(MQConsumerInner子类执行)
impl.updateTopicSubscribeInfo(topic, subscribeInfo);
}
}
}
log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
}
} else {
log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId);
}
} catch (MQClientException e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
} catch (RemotingException e) {
log.error("updateTopicRouteInfoFromNameServer Exception", e);
throw new IllegalStateException(e);
} finally {
this.lockNamesrv.unlock();
}
} else {
log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms. [{}]", LOCK_TIMEOUT_MILLIS, this.clientId);
}
} catch (InterruptedException e) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}

return false;
}
1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.56.57.58.59.60.61.62.63.64.65.66.67.68.69.70.71.72.73.74.75.76.77.78.79.80.81.82.83.84.85.86.87.88.89.90.91.92.93.94.95.96.97.98.99.100.

根据默认主题或者指定主题查询路由信息的方法最终都会调用到下面的方法,这个方法主要就是发起了远程调用(remotingClient.invokeSync):

public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
requestHeader.setTopic(topic);

RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);

RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.TOPIC_NOT_EXIST: {
if (allowTopicNotExist) {
log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
}

break;
}
case ResponseCode.SUCCESS: {
byte[] body = response.getBody();
if (body != null) {
return TopicRouteData.decode(body, TopicRouteData.class);
}
}
default:
break;
}

throw new MQClientException(response.getCode(), response.getRemark());
}
1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.

数据格式转换

数据格式转换涉及两个方法topicRouteData2TopicPublishInfo和topicRouteData2TopicSubscribeInfo:

public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
TopicPublishInfo info = new TopicPublishInfo();
info.setTopicRouteData(route);
if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
String[] brokers = route.getOrderTopicConf().split(";");
//遍历Broker,从而得到每个Broker的队列数量
for (String broker : brokers) {
String[] item = broker.split(":");
int nums = Integer.parseInt(item[1]);
//向路由信息中增加消息队列
for (int i = 0; i < nums; i++) {
MessageQueue mq = new MessageQueue(topic, item[0], i);
info.getMessageQueueList().add(mq);
}
}

info.setOrderTopic(true);
} else {
List<QueueData> qds = route.getQueueDatas();
Collections.sort(qds);
//遍历路由中的QueueData信息
for (QueueData qd : qds) {
//当有写权限时执行if
if (PermName.isWriteable(qd.getPerm())) {
BrokerData brokerData = null;
for (BrokerData bd : route.getBrokerDatas()) {
if (bd.getBrokerName().equals(qd.getBrokerName())) {
brokerData = bd;
break;
}
}

if (null == brokerData) {
continue;
}

if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
continue;
}

for (int i = 0; i < qd.getWriteQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
info.getMessageQueueList().add(mq);
}
}
}

info.setOrderTopic(false);
}

return info;
}

public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {
Set<MessageQueue> mqList = new HashSet<MessageQueue>();
List<QueueData> qds = route.getQueueDatas();
//遍历路由信息中的QueueData
for (QueueData qd : qds) {
//当有读权限时,执行if
if (PermName.isReadable(qd.getPerm())) {
//根据读队列的数量,创建MessageQueue实例
for (int i = 0; i < qd.getReadQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
mqList.add(mq);
}
}
}

return mqList;
}
1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.56.57.58.59.60.61.62.63.64.65.66.67.68.69.70.

结束

作为自己阅读源码的随记,不可避免存在问题和遗漏,欢迎指正错误和讨论。



原文在我的公众号“平凡的程序员”。

RocketMQ源码分析系列之二:负载均衡之TopicPublishInfo与路由消息查找_RocketMQ_02

举报

相关推荐

0 条评论