消息队列选择示意图
消息队列选择涉及到一个名词“故障延迟”,其含义是:在预设时间内不再使用该Broker。是否启用故障延迟机制的标识就是MQFaultStrategy类的成员变量sendLatencyFaultEnable,默认值FALSE。
核心方法
消息队列选择的核心方法是MQFaultStrategy的selectOneMessageQueue,下面源码及中文注释初步介绍了这个方法。
/**
* 选择消息队列
*/
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
//如果启用故障延迟机制,则进入if(默认值FALSE)
if (this.sendLatencyFaultEnable) {
try {
int index = tpInfo.getSendWhichQueue().incrementAndGet();
//循环获取一个消息队列,找到一个可用MessageQueue实例返回
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
//寻找可用的MessageQueue实例,isAvailable是用来计算规避时长的,详见方法体
//故障延迟,就是在预设时间内不在使用该broker
if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
return mq;
}
//尝试从失败的Item中寻找一个可用的Broker,如果找不到返回null,详见方法体
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
//可用Broker的写入队列数
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
//不启动故障延迟机制,详见方法体
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
规避时长计算
计算规避时长的方法是类LatencyFaultToleranceImpl的isAvailable方法,这个类的类图和该方法内容如下所示:
/**
* 判断名称为name的broker是否可用
*/
public boolean isAvailable(final String name) {
final FaultItem faultItem = this.faultItemTable.get(name);
if (faultItem != null) {
//计算规避时长
return faultItem.isAvailable();
}
return true;
}
这里需要进一步解释下FaultItem这个类的数据更新时机。在发送消息之后(无论成功还是失败),都会调用DefaultMQProducerImpl的updateFaultItem方法,从而进一步调用MQFaultStrategy.updateFaultItem,只是传入的第三个参数不一样,我们来看下一下这个方法的内容:
/**
* 更新失败Item,在DefaultMQProducerImpl的sendDefaultImpl方法中会调用
* @param brokerName broker名
* @param currentLatency 本次消息发送的延迟时间
* @param isolation 是否规避broker,默认规避30秒,如果不规避,则使用本次消息发送延迟时间来计算规避时长
*/
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
if (this.sendLatencyFaultEnable) {
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}
接下来我们分别看下computeNotAvailableDuration和latencyFaultTolerance.updateFaultItem的详细信息:
private long computeNotAvailableDuration(final long currentLatency) {
for (int i = latencyMax.length - 1; i >= 0; i--) {
if (currentLatency >= latencyMax[i])
return this.notAvailableDuration[i];
}
return 0;
}
上面方法latencyMax和notAvailableDuration是类MQFaultStrategy的两个成员变量,定义信息如下:
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
两个变量是通过数组下标的对应关系来使用的。接下来再看另一个方法updateFaultItem:
/**
* 更新失败Item,在DefaultMQProducerImpl的sendDefaultImpl方法中会调用
*/
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
FaultItem old = this.faultItemTable.get(name);
if (null == old) {
final FaultItem faultItem = new FaultItem(name);
faultItem.setCurrentLatency(currentLatency);
faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
old = this.faultItemTable.putIfAbsent(name, faultItem);
if (old != null) {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
} else {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
}
最终执行
接下来我们看下最终执行的queue选择的方法(位于类TopicPublishInfo中):
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
//如果lastBrokerName是null,即第一次,选择一个MessageQueue返回,详见方法体
return selectOneMessageQueue();
} else {
//如果lastBrokerName不是null,说明上一次发送失败
for (int i = 0; i < this.messageQueueList.size(); i++) {
int index = this.sendWhichQueue.incrementAndGet();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
//上一次发送失败,所以本次需要找到brokerName不是lastBrokerName的那个MessageQueue实例
//问题:上一次发送失败了,为什么这个MessageQueue实例还存在呢?怎么没删除?
// Broker与NameServer的心跳是10s,消息生产者更新路由信息的间隔是30s,时间差带来的问题需要在程序上进行处理
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.incrementAndGet();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
注意一下lastBrokerName,即上一次发送失败的Broker,本次需要规避。
最后我们再看下TopicPublishInfo的部分成员变量含义:
/**
* 是否顺序消息
*/
private boolean orderTopic = false;
/**
* 该topic的queue集合
*/
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
/**
* 每选择一个消息队列,这个值会增加1,超过Integer最大值后从0开始
*/
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
结束
作为自己阅读源码的随记,不可避免存在问题和遗漏,欢迎指正错误和讨论。
原文在我的公众号“平凡的程序员”。