生产者发送消息流程
以下是生产者发送消息的概要时序图:
先了解发送同步消息的详细流程和源码解析,后面再分析发送其他类型消息的区别:
用户将要发送的消息包装成Message对象,调用DefaultMQProducer的send方法
Message是MQ发送的消息的载体,其包含的属性如下:
// 消息
public class Message implements Serializable {
private static final long serialVersionUID = 8445773977080406428L;
// 主题
private String topic;
// 暂时没确定什么用处
private int flag;
// 消息拓展信息,tag、keys、延迟级别、是否等待消息存储完成都存放在这里
private Map<String, String> properties;
// 消息体,byte数组,注意生产者和消费者的编码解码需要一致
private byte[] body;
// 事务id
private String transactionId;
.....
}
而DefaultMQProducer的send方法实际上调用的是DefaultMQProducerImpl的sendDefaultImpl方法,sendDefaultImpl方法解析如下:
/**
 * Sends {@code msg} in the given communication mode, retrying on failure in SYNC mode.
 *
 * Looks up the topic's publish route, then loops up to timesTotal times: select a queue,
 * call sendKernelImpl with the remaining time budget, and record the broker's latency.
 * ASYNC and ONEWAY return null after the first hand-off that does not throw; SYNC returns
 * the SendResult, or moves on to the next attempt when the status is not SEND_OK and
 * retryAnotherBrokerWhenNotStoreOK is set.
 *
 * @param msg message to send
 * @param communicationMode SYNC, ASYNC or ONEWAY
 * @param sendCallback callback forwarded unchanged to sendKernelImpl
 * @param timeout overall time budget in milliseconds, shared by all attempts
 * @return the SendResult in SYNC mode; null for ASYNC and ONEWAY
 * @throws MQClientException when the topic has no usable route, or no attempt produced a result
 * @throws RemotingException a RemotingTooMuchRequestException when the time budget ran out before an attempt
 * @throws MQBrokerException when the broker answers with a response code that is not in retryResponseCodes
 * @throws InterruptedException rethrown immediately, without retrying
 */
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// Verify the producer's service state before sending (makeSureStateOK is defined elsewhere).
this.makeSureStateOK();
// Validate the message.
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
// Look up the publish route for the topic (local table first, then the name server).
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
// SYNC mode gets 1 + retryTimesWhenSendFailed attempts; every other mode gets exactly one.
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
// Broker used by the previous attempt; null on the first pass.
String lastBrokerName = null == mq ? null : mq.getBrokerName();
// Pick a queue from the route info, taking the previously used broker into account.
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
if (times > 0) {
// sendKernelImpl strips the namespace from the topic in its finally block, so re-apply it before a resend.
//Reset topic with namespace during resend.
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
long costTime = beginTimestampPrev - beginTimestampFirst;
// Time already spent exceeds the budget: flag it and leave the loop; the exception is thrown after the loop.
if (timeout < costTime) {
callTimeout = true;
break;
}
// Hand the message to sendKernelImpl with the remaining time budget.
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
// Record this broker's observed latency: endTimestamp - beginTimestampPrev.
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
/**
 * In SYNC mode a result whose status is not SEND_OK is still a normal return
 * (e.g. FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE).
 * If retryAnotherBrokerWhenNotStoreOK is true, move on to the next attempt;
 * otherwise return the result as-is.
 */
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
/**
 * On an exception: mark the broker as faulty (isolation = true), remember the
 * exception, and continue with the next attempt.
 */
} catch (RemotingException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQClientException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQBrokerException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
// Only response codes listed in retryResponseCodes are retried.
if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {
continue;
} else {
// A result left over from an earlier attempt takes precedence over rethrowing.
if (sendResult != null) {
return sendResult;
}
throw e;
}
} catch (InterruptedException e) {
endTimestamp = System.currentTimeMillis();
// Interruption is not treated as a broker fault (isolation = false) and is never retried.
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
// NOTE(review): the next two log calls repeat the two above — looks redundant; confirm before removing.
log.warn("sendKernelImpl exception", e);
log.warn(msg.toString());
throw e;
}
} else {
// No queue could be selected: stop trying.
break;
}
}
// A result from the last completed attempt (even if not SEND_OK) is returned rather than throwing.
if (sendResult != null) {
return sendResult;
}
// No queue was found, or every attempt failed: build the error describing what was tried.
String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
times,
System.currentTimeMillis() - beginTimestampFirst,
msg.getTopic(),
Arrays.toString(brokersSent));
info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
MQClientException mqClientException = new MQClientException(info, exception);
// A budget timeout is reported as its own exception type instead of the generic failure.
if (callTimeout) {
throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
}
// Map the last recorded exception to a response code on the outgoing exception.
if (exception instanceof MQBrokerException) {
mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
} else if (exception instanceof RemotingConnectException) {
mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
} else if (exception instanceof RemotingTimeoutException) {
mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
} else if (exception instanceof MQClientException) {
mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
}
throw mqClientException;
}
// No usable route for the topic: check the name server setting, then report the missing route.
validateNameServerSetting();
throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
Validators#checkMessage方法实现了以下校验:消息不能为空;topic不允许为空,只允许%a-zA-Z0-9-_字符,长度不能超过127;topic不能是内部的topic;消息体body
不能为空且长度必须大于0;消息体长度不能超过默认的消息体最大值。这个方法源码实现简单,就不再赘述。
tryToFindTopicPublishInfo方法则是从nameServ中获取topic路由信息:
/**
 * Resolves the publish route for {@code topic}.
 *
 * Uses the locally cached entry when it is usable; otherwise registers a placeholder,
 * refreshes from the name server and re-reads the table. If the entry still carries no
 * route, refreshes once more in "default topic" mode using this producer's settings.
 *
 * @param topic topic to resolve
 * @return whatever the local table holds for the topic after the refresh attempts
 */
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    TopicPublishInfo route = this.topicPublishInfoTable.get(topic);

    final boolean cachedRouteUsable = route != null && route.ok();
    if (!cachedRouteUsable) {
        // Register the topic first (without clobbering a concurrent entry), then refresh it.
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        route = this.topicPublishInfoTable.get(topic);
    }

    if (!route.isHaveTopicRouterInfo() && !route.ok()) {
        // Still nothing for this topic: fall back to the default-topic route lookup.
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        return this.topicPublishInfoTable.get(topic);
    }
    return route;
}
topicPublishInfoTable
是DefaultMQProducerImpl中存有topic(主题)和TopicPublishInfo(路由信息)的映射Map集合。topicPublishInfoTable
开始的时候只有默认的key为“TBW102”的路由信息,每当新的topic进来都需要从nameServ中获取对应的路由信息。
此外,producer启动时会启动一个定时任务,定时更新topicPublishInfoTable
中每个topic的路由信息(见Producer启动流程的(3)小步启动各个定时任务方法)。
从上面我们可以知道,producer会先根据传入的topic去找topicPublishInfoTable
是否存在对应的路由信息,找不到则尝试调用updateTopicRouteInfoFromNameServer方法从nameServ中获取,并更新topic的路由信息。
如果还是找不到,证明Broker中没有这个topic存在,此时则会获取默认的“TBW102”的路由信息作为传入的topic的路由信息并更新对应的路由信息(两个地方的updateTopicRouteInfoFromNameServer传参不一致:第二次传入了isDefault=true和defaultMQProducer)。producer根据这个路由信息发送消息:若Broker设置了autoCreateTopicEnable=true,Broker启动时会创建“TBW102”主题,Broker端接收到消息后,如果“TBW102”主题路由信息中的perm带有PERM_INHERIT权限,就会根据“TBW102”的路由信息继承并创建该消息topic的路由信息;如果Broker设置autoCreateTopicEnable=false,则会返回topic不存在的错误信息。
updateTopicRouteInfoFromNameServer源码解析如下:
/**
 * Fetches route data for {@code topic} from the name server and, when it differs from the
 * cached copy (or a local producer/consumer still needs it), pushes it into the broker
 * address table, every registered producer and consumer, and the local route table.
 *
 * The whole update runs under lockNamesrv, acquired with a LOCK_TIMEOUT_MILLIS timeout.
 *
 * @param topic topic whose route is refreshed
 * @param isDefault when true (and defaultMQProducer is non-null) the route of the producer's
 *        createTopicKey is fetched instead and stored under {@code topic}
 * @param defaultMQProducer supplies createTopicKey and defaultTopicQueueNums for the default lookup; may be null
 * @return true only when the local tables were updated; false otherwise (no data, no change,
 *         lock timeout, interruption, or an MQClientException)
 * @throws IllegalStateException wrapping a RemotingException raised by the lookup
 */
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
// Ask the name server for the route of the producer's createTopicKey (the default topic).
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
clientConfig.getMqClientApiTimeout());
if (topicRouteData != null) {
// Cap both read and write queue counts at the producer's defaultTopicQueueNums.
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
// isDefault is false or no producer was supplied: look the route up by the topic itself.
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
}
if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic);
// Decide whether the cached route must be replaced:
// changed == true means the fresh data differs from the cached copy.
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
// Same data, but a local producer or consumer may still be missing it.
changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}
if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
// Refresh the local broker-name -> addresses cache.
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
// Update Pub info
{
// Convert the route data into publish info and mark it as carrying a real route.
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
// Store/refresh the topic's publish info on every registered producer.
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}
// Update sub info
{
// Convert the route data into the set of queues used for subscription.
Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
// Walk every registered consumer.
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
if (impl != null) {
// Store/refresh the topic -> queues mapping on the consumer.
impl.updateTopicSubscribeInfo(topic, subscribeInfo);
}
}
}
log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
// Cache the cloned route data locally.
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
}
} else {
log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId);
}
} catch (MQClientException e) {
// Failures for retry-group topics are not logged.
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
} catch (RemotingException e) {
log.error("updateTopicRouteInfoFromNameServer Exception", e);
throw new IllegalStateException(e);
} finally {
this.lockNamesrv.unlock();
}
} else {
log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms. [{}]", LOCK_TIMEOUT_MILLIS, this.clientId);
}
} catch (InterruptedException e) {
// NOTE(review): the interrupt flag is not restored here — confirm this is intentional.
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
return false;
}
/**
 * Tells whether any registered producer or consumer still needs route data for {@code topic}.
 *
 * Producers are asked first via isPublishTopicNeedUpdate, then consumers via
 * isSubscribeTopicNeedUpdate; the first positive answer ends the search.
 *
 * @param topic topic to check
 * @return true as soon as one non-null producer or consumer reports it needs an update
 */
private boolean isNeedUpdateTopicRouteInfo(final String topic) {
    for (MQProducerInner producer : this.producerTable.values()) {
        if (producer != null && producer.isPublishTopicNeedUpdate(topic)) {
            return true;
        }
    }

    for (MQConsumerInner consumer : this.consumerTable.values()) {
        if (consumer != null && consumer.isSubscribeTopicNeedUpdate(topic)) {
            return true;
        }
    }

    return false;
}
这里需要注意,获取到topic的路由信息后,会将topic旧的路由信息(从topicRouteTable
中获取)和通过nameServ获取到的路由信息进行比对,判断是否一致:如果不一致,change=true;如果一致,则判断生产者的对应topic路由信息是否为空或者消息队列是否为空 或者 消费者是否存在对应topic的路由信息,如果为空或者不存在,change=true。在change=true的情况下,首先会更新`brokerAddrTable`嵌套Map集合:
// Local cache of broker addresses: broker name -> (broker id -> address).
private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
new ConcurrentHashMap<String, HashMap<Long, String>>();
然后更新Producer或Consumer的topic对应的路由、订阅信息,并更新MQClientInstance的topicRouteTable
本地缓存topic路由信息
回到DefaultMQProducerImpl#sendDefaultImpl方法,如果获取topic对应的TopicPublishInfo
成功,接下来就会选择消息队列发送消息。同步模式下发送次数默认是三次(第1次+2次重试),其他模式只有1次发送次数。
MQFaultStrategy#selectOneMessageQueue方法用于选择一个消息队列:
/**
 * Chooses the queue for the next send attempt.
 *
 * With latency fault tolerance off, delegates to the route info and passes the last broker
 * name along. With it on, walks the queue list from a rotating index and returns the first
 * queue whose broker is reported available; failing that, asks the fault-tolerance
 * component for a broker and retargets a rotated queue at it.
 *
 * @param tpInfo publish route of the topic
 * @param lastBrokerName broker used by the previous attempt, or null
 * @return the selected queue
 */
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (!this.sendLatencyFaultEnable) {
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

    try {
        int cursor = tpInfo.getSendWhichQueue().incrementAndGet();
        for (int attempt = 0; attempt < tpInfo.getMessageQueueList().size(); attempt++) {
            int slot = Math.abs(cursor++) % tpInfo.getMessageQueueList().size();
            if (slot < 0) {
                slot = 0;
            }
            MessageQueue candidate = tpInfo.getMessageQueueList().get(slot);
            if (latencyFaultTolerance.isAvailable(candidate.getBrokerName())) {
                return candidate;
            }
        }

        // No queue sits on an available broker: let the fault-tolerance component pick one.
        final String fallbackBroker = latencyFaultTolerance.pickOneAtLeast();
        final int writeQueueNums = tpInfo.getQueueIdByBroker(fallbackBroker);
        if (writeQueueNums <= 0) {
            // The picked broker has no queues for this topic: drop it from the fault table.
            latencyFaultTolerance.remove(fallbackBroker);
        } else {
            final MessageQueue rotated = tpInfo.selectOneMessageQueue();
            if (fallbackBroker != null) {
                rotated.setBrokerName(fallbackBroker);
                rotated.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
            }
            return rotated;
        }
    } catch (Exception e) {
        log.error("Error occurred when selecting message queue", e);
    }

    // Last resort: plain rotation over the queue list.
    return tpInfo.selectOneMessageQueue();
}
这里需要了解latencyFaultTolerance
对象的*LatencyFaultTolerance*接口类,LatencyFaultToleranceImpl类是它的实现类。
public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {
private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);
private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex();
……
}
LatencyFaultToleranceImpl类有两个属性:faultItemTable和whichItemWorst。faultItemTable是以BrokerName作为key,FaultItem延迟信息作为value的Map集合。
FaultItem类是LatencyFaultToleranceImpl类的内部类,记录Broker的相关延迟信息:
class FaultItem implements Comparable<FaultItem> {
// brokerName
private final String name;
// 当前延迟
private volatile long currentLatency;
// 最早启动时间,System.currentTimeMillis() + notAvailableDuration
private volatile long startTimestamp;
……
}
LatencyFaultToleranceImpl类的实现都是根据faultItemTable和FaultItem的信息实现的。
选择完消息队列,Producer就调用sendKernelImpl方法发送消息:
/**
 * Sends one message to the broker that owns {@code mq}.
 *
 * Resolves the broker address (refreshing the topic route once if it is unknown), prepares
 * the message (unique id, namespace instance id, optional compression, transaction flag),
 * runs the registered hooks, fills the request header and calls MQClientAPIImpl#sendMessage
 * in the requested mode. The caller's message body and topic are restored in the finally block.
 *
 * @param msg message to send; its body and topic may be changed temporarily during the call
 * @param mq target queue
 * @param communicationMode SYNC, ASYNC or ONEWAY
 * @param sendCallback callback passed on for ASYNC sends
 * @param topicPublishInfo publish route, passed on for ASYNC sends
 * @param timeout time budget in milliseconds for this attempt
 * @return the result returned by MQClientAPIImpl#sendMessage
 * @throws MQClientException when no address can be found for the queue's broker
 * @throws RemotingException from the remoting layer, or RemotingTooMuchRequestException when the budget is already spent
 * @throws MQBrokerException propagated from sendMessage
 * @throws InterruptedException propagated from sendMessage
 */
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
// 1. Resolve the publish address for the queue's broker name.
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
// 2. Unknown broker: refresh the topic's route information.
tryToFindTopicPublishInfo(mq.getTopic());
// 3. Try the lookup again after the refresh.
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
SendMessageContext context = null;
if (brokerAddr != null) {
// 4. Let MixAll.brokerVIPChannel adjust the address when the VIP channel is enabled
//    (presumably a port rewrite — verify against MixAll).
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
// Keep the original body so it can be restored after a possible compression.
byte[] prevBody = msg.getBody();
try {
//for MessageBatch, ID has been set in the generating process
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
boolean topicWithNamespace = false;
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
// Use the client's namespace as the message's instance id.
msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
topicWithNamespace = true;
}
int sysFlag = 0;
boolean msgBodyCompressed = false;
// Compress the body if tryToCompressMessage decides to, and flag it in sysFlag.
// (The size threshold lives in tryToCompressMessage and is not visible here.)
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
msgBodyCompressed = true;
}
// Mark prepared (half) transactional messages in sysFlag.
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
// Run the check-forbidden hooks, if any are registered.
if (hasCheckForbiddenHook()) {
CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
checkForbiddenContext.setCommunicationMode(communicationMode);
checkForbiddenContext.setBrokerAddr(brokerAddr);
checkForbiddenContext.setMessage(msg);
checkForbiddenContext.setMq(mq);
checkForbiddenContext.setUnitMode(this.isUnitMode());
this.executeCheckForbiddenHook(checkForbiddenContext);
}
// Build the hook context and run the before-send hooks, if any are registered.
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
context.setNamespace(this.defaultMQProducer.getNamespace());
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);
}
// A delay property overrides the transactional type set just above.
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);
}
this.executeSendMessageHookBefore(context);
}
// Fill the request header from the producer settings, the queue and the message.
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
// For retry-group topics, move the reconsume counters from message properties into the header.
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}
// 5. Send the message.
SendResult sendResult = null;
switch (communicationMode) {
// ASYNC: when the body was compressed or a namespace is set, a clone is sent and the
// caller's msg is restored right away.
case ASYNC:
Message tmpMessage = msg;
boolean messageCloned = false;
if (msgBodyCompressed) {
//If msg body was compressed, msgbody should be reset using prevBody.
//Clone new message using commpressed message body and recover origin massage.
//Fix bug:https://github.com/apache/rocketmq-externals/issues/66
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
msg.setBody(prevBody);
}
if (topicWithNamespace) {
if (!messageCloned) {
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
}
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
// Fail fast if the preparation above already used up the time budget.
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
tmpMessage,
requestHeader,
timeout - costTimeAsync,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
// ONEWAY and SYNC share the same sendMessage overload.
case ONEWAY:
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout - costTimeSync,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}
// Run the after-send hooks with the result.
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}
return sendResult;
} catch (RemotingException e) {
// On failure the after-send hooks still run, with the exception attached, before rethrowing.
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} catch (MQBrokerException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} catch (InterruptedException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} finally {
// Always hand the message back with its original body and a namespace-free topic.
msg.setBody(prevBody);
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
}
// Still no address after the route refresh.
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
1、从brokerAddrTable
中获取brokerName对应的Master Broker的brokerAddr信息,这个brokerAddrTable
在上面获取topic路由信息时程序已经将broker对应的addr保存,但是在Producer启动的各个定时任务中,有个定时任务是用于定时清理下线的Broker,会将下线的Broker从brokerAddrTable
中移除,源码在MQClientInstance#start时调用的startScheduledTask方法中
private void startScheduledTask() {
……
// 定时清除下线broker,发送心跳
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// 通过判断broker的addr是否存在nameServ的topic路由信息中,来移除brokerAddrTable中下线的broker
MQClientInstance.this.cleanOfflineBroker();
// 发送心跳到所有的Broker
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
……
}
/**
 * Drops broker addresses that no longer appear in any cached topic route.
 *
 * Runs under lockNamesrv (acquired with a LOCK_TIMEOUT_MILLIS timeout). For each broker
 * name, a copy of its address map is filtered; a broker whose copy ends up empty is removed
 * from brokerAddrTable, the others have their entry replaced by the filtered copy.
 */
private void cleanOfflineBroker() {
    try {
        if (!this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            return;
        }
        try {
            ConcurrentHashMap<String, HashMap<Long, String>> survivors = new ConcurrentHashMap<String, HashMap<Long, String>>();

            for (Iterator<Entry<String, HashMap<Long, String>>> brokers = this.brokerAddrTable.entrySet().iterator(); brokers.hasNext(); ) {
                Entry<String, HashMap<Long, String>> brokerEntry = brokers.next();
                String brokerName = brokerEntry.getKey();

                // Filter a copy so the live map is never mutated while it may be in use.
                HashMap<Long, String> liveAddrs = new HashMap<Long, String>(brokerEntry.getValue());
                for (Iterator<Entry<Long, String>> addrs = liveAddrs.entrySet().iterator(); addrs.hasNext(); ) {
                    String addr = addrs.next().getValue();
                    if (!this.isBrokerAddrExistInTopicRouteTable(addr)) {
                        addrs.remove();
                        log.info("the broker addr[{} {}] is offline, remove it", brokerName, addr);
                    }
                }

                if (liveAddrs.isEmpty()) {
                    brokers.remove();
                    log.info("the broker[{}] name's host is offline, remove it", brokerName);
                } else {
                    survivors.put(brokerName, liveAddrs);
                }
            }

            if (!survivors.isEmpty()) {
                this.brokerAddrTable.putAll(survivors);
            }
        } finally {
            this.lockNamesrv.unlock();
        }
    } catch (InterruptedException e) {
        log.warn("cleanOfflineBroker Exception", e);
    }
}
/**
 * Checks whether {@code addr} is listed as a broker address in any cached topic route.
 *
 * @param addr broker address to look for
 * @return true if some BrokerData in topicRouteTable contains the address among its values
 */
private boolean isBrokerAddrExistInTopicRouteTable(final String addr) {
    for (TopicRouteData route : this.topicRouteTable.values()) {
        for (BrokerData broker : route.getBrokerDatas()) {
            if (broker.getBrokerAddrs() != null && broker.getBrokerAddrs().containsValue(addr)) {
                return true;
            }
        }
    }
    return false;
}
从上面的源码可知,客户端启动1s后,每隔heartbeatBrokerInterval(默认30s),就会检查是否有下线的Broker并发送心跳给所有Broker。通过遍历brokerAddrTable
集合中每个Broker对应的addr,检查每个addr是否存在topicRouteTable
集合保存的路由信息中,如果不存在,则移除来更新brokerAddrTable
集合。
2、这里尝试从nameServ中获取、更新topic信息,这里之前已经讲解过,不再赘述
3、重新更新了brokerAddrTable
集合,尝试再次获取topic对应的Broker Addr信息
4、接下来就是发送消息前对消息的预处理和消息发送前的钩子的执行(钩子Hook请看消息轨迹)
5、根据传入的模式(同步、异步、单向)调用MQClientAPIImpl#sendMessage发送消息:
/**
 * Builds the remoting request for {@code msg} and dispatches it in the requested mode.
 *
 * The request code depends on whether the message is a reply message, whether the compact
 * (V2) header is enabled via sendSmartMsg, and whether the message is a batch. ONEWAY and
 * ASYNC return null; SYNC returns the result of sendMessageSync.
 *
 * @param addr broker address
 * @param brokerName broker name
 * @param msg message whose body becomes the request body
 * @param requestHeader prepared send header
 * @param timeoutMillis time budget in milliseconds
 * @param communicationMode ONEWAY, ASYNC or SYNC
 * @param sendCallback callback for ASYNC sends
 * @param topicPublishInfo publish route, forwarded to the async path
 * @param instance client instance, forwarded to the async path
 * @param retryTimesWhenSendFailed retry limit forwarded to the async path
 * @param context hook context, forwarded to the async path
 * @param producer producer, forwarded to the async path
 * @return the send result for SYNC, otherwise null
 * @throws RemotingException from the remoting client, or RemotingTooMuchRequestException when the budget is already spent
 * @throws MQBrokerException propagated from the sync path
 * @throws InterruptedException propagated from the remoting client
 */
public SendResult sendMessage(
    final String addr,
    final String brokerName,
    final Message msg,
    final SendMessageRequestHeader requestHeader,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final MQClientInstance instance,
    final int retryTimesWhenSendFailed,
    final SendMessageContext context,
    final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
    final long startedAt = System.currentTimeMillis();
    final String messageType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
    final boolean replyMessage = messageType != null && messageType.equals(MixAll.REPLY_MESSAGE_FLAG);
    final boolean batchMessage = msg instanceof MessageBatch;

    final RemotingCommand request;
    if (replyMessage && sendSmartMsg) {
        // Reply message with the compact header.
        SendMessageRequestHeaderV2 compactHeader = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
        request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, compactHeader);
    } else if (replyMessage) {
        request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
    } else if (sendSmartMsg || batchMessage) {
        // Ordinary message: batches always use the compact header.
        SendMessageRequestHeaderV2 compactHeader = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
        if (batchMessage) {
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_BATCH_MESSAGE, compactHeader);
        } else {
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, compactHeader);
        }
    } else {
        request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
    }
    request.setBody(msg.getBody());

    switch (communicationMode) {
        case ONEWAY: {
            this.remotingClient.invokeOneway(addr, request, timeoutMillis);
            return null;
        }
        case ASYNC: {
            final AtomicInteger retryCounter = new AtomicInteger();
            final long elapsed = System.currentTimeMillis() - startedAt;
            if (elapsed > timeoutMillis) {
                throw new RemotingTooMuchRequestException("sendMessage call timeout");
            }
            this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - elapsed, request, sendCallback, topicPublishInfo,
                instance, retryTimesWhenSendFailed, retryCounter, context, producer);
            return null;
        }
        case SYNC: {
            final long elapsed = System.currentTimeMillis() - startedAt;
            if (elapsed > timeoutMillis) {
                throw new RemotingTooMuchRequestException("sendMessage call timeout");
            }
            return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - elapsed, request);
        }
        default:
            assert false;
            break;
    }

    return null;
}
可以看到同步、单向和异步发送消息的传参是不一样的,主要是异步多了sendCallback
(发送消息成功后、失败后的回调,由用户实现SendCallback接口传入),retryTimesWhenSendAsyncFailed(异步发送消息的重试次数,默认为2),路由信息等。
MQClientAPIImpl#sendMessage中同步消息和单向消息发送都是通过netty发送,处理返回结果。特殊的是异步发送消息,异步发送消息的流程前面和同步、单向发送消息的逻辑一致,而在sendMessageAsync方法则不同:
/**
 * Issues the request asynchronously and handles the response in an InvokeCallback.
 *
 * When a response arrives it is turned into a SendResult, the after-send hooks run, the
 * user callback (if any) is notified and the broker's latency is recorded. When no response
 * arrives, or processing the response throws, the broker is marked faulty and
 * onExceptionImpl decides whether to retry or to report the failure.
 *
 * @param addr broker address
 * @param brokerName broker name, used for latency bookkeeping
 * @param msg message being sent
 * @param timeoutMillis time budget in milliseconds
 * @param request prepared remoting request
 * @param sendCallback user callback; may be null
 * @param topicPublishInfo publish route, forwarded to onExceptionImpl
 * @param instance client instance, forwarded to onExceptionImpl
 * @param retryTimesWhenSendFailed retry limit, forwarded to onExceptionImpl
 * @param times retry counter shared across retries
 * @param context hook context; may be null
 * @param producer producer whose fault table is updated
 * @throws InterruptedException from invokeAsync
 * @throws RemotingException from invokeAsync
 */
private void sendMessageAsync(
final String addr,
final String brokerName,
final Message msg,
final long timeoutMillis,
final RemotingCommand request,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final MQClientInstance instance,
final int retryTimesWhenSendFailed,
final AtomicInteger times,
final SendMessageContext context,
final DefaultMQProducerImpl producer
) throws InterruptedException, RemotingException {
final long beginStartTime = System.currentTimeMillis();
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
// Time spent on this attempt; subtracted from the budget handed to a retry.
long cost = System.currentTimeMillis() - beginStartTime;
RemotingCommand response = responseFuture.getResponseCommand();
// Response received but no user callback: only run the hooks and record latency.
if (null == sendCallback && response != null) {
try {
SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
if (context != null && sendResult != null) {
context.setSendResult(sendResult);
context.getProducer().executeSendMessageHookAfter(context);
}
} catch (Throwable e) {
// NOTE(review): any failure here is swallowed silently.
}
// 1. Response received: record the broker's latency (isolation = false).
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
return;
}
if (response != null) {
try {
SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
assert sendResult != null;
if (context != null) {
context.setSendResult(sendResult);
context.getProducer().executeSendMessageHookAfter(context);
}
try {
sendCallback.onSuccess(sendResult);
} catch (Throwable e) {
// NOTE(review): exceptions thrown by the user's onSuccess are swallowed silently.
}
// 1. Success: record the broker's latency (isolation = false).
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
} catch (Exception e) {
// 1. Processing the response failed: mark the broker as faulty (isolation = true).
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
// 2. Hand over to onExceptionImpl with needRetry = false, so the failure is reported without a retry.
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, e, context, false, producer);
}
} else {
// No response at all: mark the broker as faulty, then classify the failure.
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
if (!responseFuture.isSendRequestOK()) {
MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
// 2. The request was not sent: retry (needRetry = true).
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex, context, true, producer);
} else if (responseFuture.isTimeout()) {
MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
responseFuture.getCause());
// 2. Timed out waiting for the response: retry (needRetry = true).
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex, context, true, producer);
} else {
MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
// 2. Any other failure: retry (needRetry = true).
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex, context, true, producer);
}
}
}
});
}
/**
 * Handles a failed asynchronous send: either retries or reports the failure.
 *
 * Increments the shared counter; while retries are allowed and the counter does not exceed
 * timesTotal, the request is re-sent through sendMessageAsync under a fresh request id.
 * Otherwise the after-send hooks run with the exception attached and
 * sendCallback.onException is invoked.
 *
 * @param brokerName broker that was used for the failed attempt
 * @param msg message being sent
 * @param timeoutMillis remaining time budget in milliseconds
 * @param request remoting request, reused for the retry
 * @param sendCallback user callback notified on final failure
 * @param topicPublishInfo publish route used to pick the retry target; may be null
 * @param instance client instance used to resolve the retry broker's address
 * @param timesTotal maximum number of retries
 * @param curTimes retry counter shared across retries
 * @param e the failure being handled
 * @param context hook context; may be null
 * @param needRetry whether this kind of failure may be retried
 * @param producer producer used for queue selection and fault bookkeeping
 */
private void onExceptionImpl(final String brokerName,
final Message msg,
final long timeoutMillis,
final RemotingCommand request,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final MQClientInstance instance,
final int timesTotal,
final AtomicInteger curTimes,
final Exception e,
final SendMessageContext context,
final boolean needRetry,
final DefaultMQProducerImpl producer
) {
int tmp = curTimes.incrementAndGet();
if (needRetry && tmp <= timesTotal) {
String retryBrokerName = brokerName;//by default, it will send to the same broker
if (topicPublishInfo != null) { //select one message queue accordingly, in order to determine which broker to send
MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
retryBrokerName = mqChosen.getBrokerName();
}
String addr = instance.findBrokerAddressInPublish(retryBrokerName);
log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,
retryBrokerName);
try {
// Give the reused request a new id before sending it again.
request.setOpaque(RemotingCommand.createNewRequestId());
sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
timesTotal, curTimes, context, producer);
} catch (InterruptedException e1) {
// Interrupted: report the failure without further retries (needRetry = false).
onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
context, false, producer);
} catch (RemotingConnectException e1) {
// NOTE(review): the fault is recorded against brokerName (the previous broker), not
// retryBrokerName (the one that just failed) — confirm this is intended.
producer.updateFaultItem(brokerName, 3000, true);
onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
context, true, producer);
} catch (RemotingTooMuchRequestException e1) {
// Too many requests: report the failure without further retries (needRetry = false).
onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
context, false, producer);
} catch (RemotingException e1) {
// NOTE(review): same brokerName vs retryBrokerName question as in the connect-exception arm.
producer.updateFaultItem(brokerName, 3000, true);
onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
context, true, producer);
}
} else {
// Retries exhausted or not allowed: run the hooks, then notify the user callback.
if (context != null) {
context.setException(e);
context.getProducer().executeSendMessageHookAfter(context);
}
try {
sendCallback.onException(e);
} catch (Exception ignored) {
// Exceptions from the user's onException (including an NPE when sendCallback is null) are dropped.
}
}
}
从上面的源码可以知道,异步发送消息无论发送成功还是失败,都会更新Broker的延迟信息updateFaultItem:
/**
 * Records a broker's latency and how long it should be avoided.
 *
 * Does nothing unless sendLatencyFaultEnable is set. When {@code isolation} is true the
 * avoidance duration is computed as if the latency were 30000 ms; the latency stored is
 * always the measured {@code currentLatency}.
 *
 * @param brokerName broker being updated
 * @param currentLatency measured latency in milliseconds
 * @param isolation whether to penalise the broker as if it had a 30000 ms latency
 */
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    if (!this.sendLatencyFaultEnable) {
        return;
    }

    final long penaltyLatency;
    if (isolation) {
        penaltyLatency = 30000;
    } else {
        penaltyLatency = currentLatency;
    }
    final long unavailableFor = computeNotAvailableDuration(penaltyLatency);
    this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, unavailableFor);
}
/**
 * Maps a latency to an avoidance duration.
 *
 * Scans latencyMax from its last entry towards the first and returns the
 * notAvailableDuration entry at the first index whose threshold is not above the latency.
 *
 * @param currentLatency latency to classify
 * @return the matching notAvailableDuration value, or 0 when the latency is below every threshold
 */
private long computeNotAvailableDuration(final long currentLatency) {
    int level = latencyMax.length;
    while (--level >= 0) {
        if (latencyMax[level] <= currentLatency) {
            return this.notAvailableDuration[level];
        }
    }
    return 0;
}
1、更新延迟信息首先确认Broker的不可用时间:从后往前遍历`latencyMax`数组,找到第一个小于等于当前延迟时间`currentLatency`的值的下标,再根据该下标取`notAvailableDuration`数组中保存的不可用持续时间(`notAvailableDuration`与`latencyMax`数组的下标一一对应);如果`currentLatency`小于`latencyMax`中的所有值,则不可用时间为0。
// Latency thresholds, compared against the latency passed to computeNotAvailableDuration.
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
// Avoidance duration returned for the threshold at the same index in latencyMax.
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
然后根据计算得到的不可用的时间更新延迟信息faultItemTable
集合,设置startTimestamp为System.currentTimeMillis() + notAvailableDuration,表示当前时间加上不可用时间内,broker都属于不可用状态。只有当System.currentTimeMillis() - startTimestamp >= 0才表示Broker可用(FaultItem#isAvailable)
/**
 * Stores the latest latency for {@code name} and the time from which it counts as available again.
 *
 * Creates the entry on first use; if another thread registers one at the same moment, that
 * entry is updated instead.
 *
 * @param name key of the fault item
 * @param currentLatency latency to store
 * @param notAvailableDuration offset added to the current time to form the start timestamp
 */
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
    FaultItem existing = this.faultItemTable.get(name);
    if (existing == null) {
        final FaultItem created = new FaultItem(name);
        created.setCurrentLatency(currentLatency);
        created.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

        existing = this.faultItemTable.putIfAbsent(name, created);
        if (existing == null) {
            // Our freshly built item won the race and already holds the new values.
            return;
        }
    }

    existing.setCurrentLatency(currentLatency);
    existing.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
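2、当出现异常时,除了更新延迟信息,还会尝试重新发送消息,重试次数由retryTimesWhenSendAsyncFailed
值决定,默认为2。需要注意的是,从onExceptionImpl的源码可以看到,异步发送重试时默认仍发往原来的broker(retryBrokerName初始值为brokerName),只有在topicPublishInfo不为空时才会调用selectOneMessageQueue重新选择消息队列,并以其brokerName作为重试目标;而同步发送每次重试都会调用selectOneMessageQueue(传入上一次的brokerName)选择消息队列,如果开启了sendLatencyFaultEnable
发送延迟容错开关,还会结合broker的延迟信息进行选择。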
回到同步发送流程开头:DefaultMQProducerImpl#sendDefaultImpl方法,同步发送完消息后,会和异步发送消息一样,无论成功与否都更新Broker的延迟信息(异步消息模式下在这里会直接返回)。
至此同步、异步、单向消息发送流程完毕。