0
点赞
收藏
分享

微信扫一扫

RocketMQ源码分析系列之四:消息发送

​三种消息发送方式​

  • 同步:发送者向RocketMQ发送消息时,同步等待直到消息服务器返回发送结果。

  • 异步:发送者向RocketMQ发送消息时,指定消息发送成功后的回调函,调用消息发送API后立即返回,消息发送者线程不阻塞,消息发送成功或失败的回调任务在另一个新的线程中执行。

  • 单向:发送者向RocketMQ发送消息时直接返回,不等待消息服务器的执行结果,也不注册回调函数,即只管发送,不关心是否成功。


​消息发送过程示意图

RocketMQ源码分析系列之四:消息发送_RocketMQ


​消息体结构​

消息实体​org.apache.rocketmq.common.message.Message,先来看一下它的成员变量及注释:

/**
* 消息主题
*/
private String topic;
/**
* 消息标签,用于消息过滤
*/
private int flag;
/**
* 消息属性,key值参考 org.apache.rocketmq.common.message.MessageConst
*/
private Map<String, String> properties;
/**
* 消息体
*/
private byte[] body;
/**
* 事务消息的事务ID
*/
private String transactionId;


消息发送方法​

类​DefaultMQProducerImpl的私有方法sendDefaultImpl,包含了状态检查、消息检查、路由查找、故障延迟等功能,具体代码及注释如下:

/**
* 私有消息发送方法
*/
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//检查ServiceState
this.makeSureStateOK();
//检查消息
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
//查找路由信息(负载均衡),详见方法体
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
//注意mq的赋值,第一次一定为null,每执行一次for循环就把当前选择的MessageQueue实例赋值给mq
//如果发送消息成功,则跳出for循环,如果失败则执行下一次循环,此时mq的值就是上一个循环中失败的那个
String lastBrokerName = null == mq ? null : mq.getBrokerName();
//消息队列选择(负载均衡),详见方法体
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
if (times > 0) {
//Reset topic with namespace during resend.
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) {
callTimeout = true;
break;
}

sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
//更新失败Item
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}

return sendResult;
default:
break;
}
} catch (RemotingException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQClientException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQBrokerException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {
continue;
} else {
if (sendResult != null) {
return sendResult;
}

throw e;
}
} catch (InterruptedException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());

log.warn("sendKernelImpl exception", e);
log.warn(msg.toString());
throw e;
}
} else {
break;
}
}

if (sendResult != null) {
return sendResult;
}

String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
times,
System.currentTimeMillis() - beginTimestampFirst,
msg.getTopic(),
Arrays.toString(brokersSent));

info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

MQClientException mqClientException = new MQClientException(info, exception);
if (callTimeout) {
throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
}

if (exception instanceof MQBrokerException) {
mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
} else if (exception instanceof RemotingConnectException) {
mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
} else if (exception instanceof RemotingTimeoutException) {
mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
} else if (exception instanceof MQClientException) {
mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
}

throw mqClientException;
}

validateNameServerSetting();

throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

路由信息查找(tryToFindTopicPublishInfo)、消息队列选择(selectOneMessageQueue)、故障延迟(updateFaultItem)我们在之前已经介绍过了,接下来我们看下另一个私有的方法sendKernelImpl:

/**
* 当前类实际执行发送的方法
* @param msg 消息
* @param mq 将消息发送到该队列上
* @param communicationMode 消息发送模式:SYNC, ASYNC, ONEWAY
* @param sendCallback 异步消息回调
* @param topicPublishInfo 路由信息
* @param timeout 发送消息超时时间
* @return SendResult
* @throws MQClientException
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
*/
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
//获取到broker地址,详见方法体
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
//当没有获取到broker地址时,尝试从name server获取路由信息
tryToFindTopicPublishInfo(mq.getTopic());
//再次获取broker地址
//注意,这里先执行tryToFindTopicPublishInfo,然后再执行findBrokerAddressInPublish,而不是在当前方法直接获取到broker地址
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}

SendMessageContext context = null;
if (brokerAddr != null) {
//vip,虚拟ip信道
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

byte[] prevBody = msg.getBody();
try {
//for MessageBatch,ID has been set in the generating process
//非MessageBatch设置全局唯一ID,因为MessageBatch已经设置过了
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}

//判断使用使用namespace
boolean topicWithNamespace = false;
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
topicWithNamespace = true;
}

int sysFlag = 0;
boolean msgBodyCompressed = false;
//判断是否要对消息进行压缩,默认超过4k即压缩
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
msgBodyCompressed = true;
}

final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}

if (hasCheckForbiddenHook()) {
CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
checkForbiddenContext.setCommunicationMode(communicationMode);
checkForbiddenContext.setBrokerAddr(brokerAddr);
checkForbiddenContext.setMessage(msg);
checkForbiddenContext.setMq(mq);
checkForbiddenContext.setUnitMode(this.isUnitMode());
this.executeCheckForbiddenHook(checkForbiddenContext);
}

//如果有消息发送的hook,则在消息发送前执行
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
context.setNamespace(this.defaultMQProducer.getNamespace());
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);
}

if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);
}
this.executeSendMessageHookBefore(context);
}

SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}

String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}

SendResult sendResult = null;
switch (communicationMode) {
case ASYNC:
Message tmpMessage = msg;
boolean messageCloned = false;
if (msgBodyCompressed) {
//If msg body was compressed, msgbody should be reset using prevBody.
//Clone new message using commpressed message body and recover origin massage.
//Fix bug:https://github.com/apache/rocketmq-externals/issues/66
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
msg.setBody(prevBody);
}

if (topicWithNamespace) {
if (!messageCloned) {
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
}
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}

long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
//执行发送,详见方法体
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
tmpMessage,
requestHeader,
timeout - costTimeAsync,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
case ONEWAY:
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
//执行发送
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout - costTimeSync,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}

if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}

return sendResult;
} catch (RemotingException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} catch (MQBrokerException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} catch (InterruptedException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} finally {
msg.setBody(prevBody);
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
}

throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

接下来我们看下执行发送的MQClientAPIImpl类的sendMessage方法:

/**
* 构造消息体并发送消息
*/
public SendResult sendMessage(
final String addr,
final String brokerName,
final Message msg,
final SendMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final MQClientInstance instance,
final int retryTimesWhenSendFailed,
final SendMessageContext context,
final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
RemotingCommand request = null;
String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
if (isReply) {
if (sendSmartMsg) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
}
} else {
if (sendSmartMsg || msg instanceof MessageBatch) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
}
}
request.setBody(msg.getBody());

switch (communicationMode) {
//只发送,不关心结果
case ONEWAY:
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
return null;
//异步发送
case ASYNC:
final AtomicInteger times = new AtomicInteger();
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, context, producer);
return null;
//同步发送
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeSync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
default:
assert false;
break;
}

return null;
}

在这个方法里,我们能看到ONEWAY、SYNC、ASYNC三种发送方式的具体实现。

​关于MQClientInstance​

我们已经不止一次的提到了MQClientInstance这个类,那么接下来我们来看下这个类及成员变量的说明:

/**
* MQ客户端实例,每台应用服务器将持有一个MQClientInstance对象,供该应用服务器的消费者,生产者使用
* 该类是消费者、生产者处理的核心类
*/
public class MQClientInstance {
private final static long LOCK_TIMEOUT_MILLIS = 3000;
private final InternalLogger log = ClientLogger.getLog();
/**
* 客户端配置,DefaultMQProducer是其子类,赋值是多数会使用DefaultMQProducer实例
* 如:DefaultMQProducerImpl 类的 MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook)
*/
private final ClientConfig clientConfig;
/**
* 实例索引
*/
private final int instanceIndex;
/**
* 客户端ID
*/
private final String clientId;
private final long bootTimestamp = System.currentTimeMillis();
/**
* 生产组-消息生产者,每个生产者组在同一台应用服务器只会初始化一个生产者实例
*/
private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
/**
* 消费组-消息消费者,每个消费者组在同一台应用服务器只会初始化一个消费者实例
*/
private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
/**
* 处理运维命令
*/
private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
/**
* netty客户端配置
*/
private final NettyClientConfig nettyClientConfig;
/**
* MQ客户端实现类
*/
private final MQClientAPIImpl mQClientAPIImpl;
/**
* MQ管理命令实现类
*/
private final MQAdminImpl mQAdminImpl;
/**
* topic路由信息
*/
private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
private final Lock lockNamesrv = new ReentrantLock();
private final Lock lockHeartbeat = new ReentrantLock();
/**
* broker地址信息,这些信息存在于NameServer,本地客户端进行缓存,供生产者、消费者共同使用
*/
private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
new ConcurrentHashMap<String, HashMap<Long, String>>();
private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
new ConcurrentHashMap<String, HashMap<String, Integer>>();
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "MQClientFactoryScheduledThread");
}
});
/**
* 客户端命令处理器
*/
private final ClientRemotingProcessor clientRemotingProcessor;
/**
* 一个MQClientInstance只会启动一个消息拉取线程,为ServiceThread子类
*/
private final PullMessageService pullMessageService;
/**
* 队列动态负载线程,为ServiceThread子类
*/
private final RebalanceService rebalanceService;
/**
* 默认的消息生产者,注意这个实例,是在MQClientInstance构造器中new出来的
*/
private final DefaultMQProducer defaultMQProducer;
/**
* 消费者统计
*/
private final ConsumerStatsManager consumerStatsManager;
/**
* 心跳发送次数
*/
private final AtomicLong sendHeartbeatTimesTotal = new AtomicLong(0);
/**
* 状态
*/
private ServiceState serviceState = ServiceState.CREATE_JUST;

//以下省略
}

​两个截图​

最后我们看两个关于消息发送方法的截图,消息发送有很多重载方法,具体使用时根据需要选择即可:

RocketMQ源码分析系列之四:消息发送_RocketMQ_02

RocketMQ源码分析系列之四:消息发送_消息队列_03


结束

作为自己阅读源码的随记,不可避免存在问题和遗漏,欢迎指正错误和讨论。



原文在我的公众号“平凡的程序员”。

RocketMQ源码分析系列之四:消息发送_消息队列_04

举报

相关推荐

0 条评论