0
点赞
收藏
分享

微信扫一扫

rocketmq核心源码分析第七篇一消息处理第一部分一消息发送


文章目录

  • ​​图示​​
  • ​​核心特性​​
  • ​​源码分析一sendDefaultImpl​​
  • ​​源码分析一sendKernelImpl​​
  • ​​源码分析一MQClientAPIImpl.sendMessage​​
  • ​​组件图​​
  • ​​发送功能图​​
  • ​​sendMessage​​
  • ​​底层同步逻辑一sendMessageSync​​
  • ​​总结​​
  • ​​扩展点一tryToFindTopicPublishInfo​​
  • ​​updateTopicRouteInfoFromNameServer​​
  • ​​知识点一topic自动创建​​
  • ​​知识点一autoCreateTopicEnable为什么不推荐开启​​

图示

rocketmq核心源码分析第七篇一消息处理第一部分一消息发送_rocketmq

核心特性

  • 同步消息: 事务,顺序,普通 , 延时
  • 异步消息:延时
  • 单边消息:延时 发送失败丢失

源码分析一sendDefaultImpl

参数

作用

Message

消息体 ,如果是延时消息可设置延时级别

CommunicationMode

通信模式, 同步SYNC,异步ASYNC,单边ONEWAY

SendCallback

异步发送的回调函数

timeout

总发送超时[包含重试]

  • step-1: 消息校验
  • step-2: 获取发送消息主题的相关路由信息
  • step-3: 发送端负载计算
  • step-4: 发送消息
  • step-5: 处理发送延时后的broker可用性
  • step-6: sync处理重试

private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,/* 同步 异步 单向 模式*/
final SendCallback sendCallback,/* 异步模式时候的回调函数 */
final long timeout /* 发送超时时间*/
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
step-1 消息校验 主题合法 消息大小不超过4M 消息body非空
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
step-2 获取发送消息主题的相关路由信息 (1缓存2 nameserver获取 3兜底: 根据【TBW102】topic从broker获取默认相关信息)
其内部有个监听模式 如果新老路由信息不一样 需要更新本地内存生产者和消费者相关路由元信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
同步共三次
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
step-3 轮询 选择broker 选择message queue
负载均衡决定选择哪个队列进行发送
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
if (times > 0) {
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
失败重试发送前减去前面总发送的时间
long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) {
总的发送超时
callTimeout = true;
break;
}
step-4 发送消息
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
step-5 处理 sendLatencyFaultEnable对应的功能 发送失败isolation为true代表被隔离 false 延迟【endTimestamp - beginTimestampPrev】
更新失败策略,主要用于规避发生故障的 broker
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
} catch (RemotingException e) {
endTimestamp = System.currentTimeMillis();
// true 30s @see selectOneMessageQueue 如果选择策略时打开sendLatencyFaultEnable 则30秒不可选择改失败的broker
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
...... 删除不同异常
...... 删除日志
throw e;
}
} else {
break;
}
}

if (sendResult != null) {
return sendResult;
}

...... 删除异常描述谢姐
MQClientException mqClientException = new MQClientException(info, exception);
throw mqClientException;
}
...... 删除其他代码
}

源码分析一sendKernelImpl

  • step1 获取broker信息[本地缓存 > nameserver获取 > 获取TBW102配置 ]
  • step2 设置消息唯一id[ip+进程id+ 类加载器hashcode + 时间戳 + 自增id等]
  • step3 系统压缩[4K以上则尝试压缩][批量消息不压缩][压缩比默认为5]
  • 钩子函数执行
  • 构建请求头
  • 通过MQClientAPI发送消息

private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

long beginStartTime = System.currentTimeMillis();
// step-1 获取broker地址 本地缓存
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
// 远程加载
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
SendMessageContext context = null;
if (brokerAddr != null) {
判读生产者是否配置走vip通道从而发送修改发送端口为vip端口
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
byte[] prevBody = msg.getBody();
try {
//for MessageBatch,ID has been set in the generating process
// step2 设置消息唯一id msgId unqid
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}

boolean topicWithNamespace = false;
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
topicWithNamespace = true;
}

int sysFlag = 0;
// 超过4k则强制压缩
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
msgBodyCompressed = true;
}

final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
...... 删除禁止钩子执行,没有默认实现
// step-2: 发送钩子执行
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
...... 删除context参数设置
this.executeSendMessageHookBefore(context);
}
// 构建请求头
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
...... 删除请求头参数设置

SendResult sendResult = null;
switch (communicationMode) {
case ASYNC:
Message tmpMessage = msg;
boolean messageCloned = false;
if (msgBodyCompressed) {
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
msg.setBody(prevBody);
}

if (topicWithNamespace) {
if (!messageCloned) {
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
}
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}

long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
异步发送其内部管理重试
sendResult = this.mQClientFactory.getMQClientAPIImpl(). sendMessage(
brokerAddr,
mq.getBrokerName(),
tmpMessage,
requestHeader,
timeout - costTimeAsync,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
case ONEWAY:
case SYNC:
发送前判断sendKernelImpl已经耗时是否超时,如超时则不在发送远程请求
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout - costTimeSync,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}
// 发送完毕钩子
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}

return sendResult;
...... 删除其他异常处理
} catch (InterruptedException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} finally {
msg.setBody(prevBody);
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

源码分析一MQClientAPIImpl.sendMessage

组件图

  • 生产者启动时已经介绍过
  • 上层组件消息最终通过通信层完成发送,通信层结构如下

发送功能图

  • client负责构建RemotingCommand rpc消息对象
  • 异步请求通过onexceptionImpl实现消息重试
  • rocketmq核心源码分析第七篇一消息处理第一部分一消息发送_rocketmq_02


sendMessage

  • 构建RemotingCommand
  • 根据communicationMode执行具体发送逻辑
  • 执行同步发送[异步 oneway]

public SendResult sendMessage(
final String addr,
final String brokerName,
final Message msg,
final SendMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final MQClientInstance instance,
final int retryTimesWhenSendFailed,
final SendMessageContext context,
final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
step-1 构建rpc消息头
RemotingCommand request = null;
...... 删除请求头构建细节
step-2 构建rpc消息体
request.setBody(msg.getBody());
step-3 根据communicationMode执行具体发送逻辑
switch (communicationMode) {
case ONEWAY:
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
return null;
case ASYNC:
final AtomicInteger times = new AtomicInteger();
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, context, producer);
return null;
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeSync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
step-4: 执行同步发送
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
default:
assert false;
break;
}
return null;
}

底层同步逻辑一sendMessageSync

  • 通过ResponseFuture等待超时机制完成netty底层channel的Response回调监听,实现通过

异步通过netty底层的callback函数处理上层的sendcallback函数[代码略]

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
final int opaque = request.getOpaque();

try {
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}

responseTable.remove(opaque);
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);
log.warn("send a request command to channel <" + addr + "> failed.");
}
});
同步等待响应
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}

return responseCommand;
} finally {
this.responseTable.remove(opaque);
}
}

总结
  • 生产者发送消息,业务层包含事务,顺序,单边,延时消息
  • 根据发送方式分为单边,异步,同步消息
  • 消息的发送需要处理服务发现,负载选择,重试,异步,消息id计算,消息压缩等逻辑
扩展点一tryToFindTopicPublishInfo

topicPublishInfoTable包含topic以及其对应broker的MessageQueue集合

  • step1 本地缓存获取
  • step2 : step1如果找到可用的路由信息并返回
  • step3 尝试使用默认的topic【TBW102】去找路由配置信息作为本topic参数信息

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
step1 本地缓存获取
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
step-1 尝试从 NameServer获取配置信息并更新本地缓存配置 第一次不会自动创建
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}


// steo2 : step1如果找到可用的路由信息并返回
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
steo3 尝试使用默认的topic 【TBW102】去找路由配置信息作为本topic参数信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}

updateTopicRouteInfoFromNameServer

  • step-1默认TBW102主题的配置构建指定topic的路由信息
  • step-2 通过nameserver获取自身的路由信息
  • step-3: 如果有变化,则需要同步更新发送者、消费者关于该 topic 的缓存。
  • step-3.1 更新生产者信息
  • step-3.2 更新消费者信息

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
----- 获取路由信息的命令对应nameserver的 GET_ROUTEINTO_BY_TOPIC (105)命令

TopicRouteData topicRouteData;
step-1默认TBW102主题的配置构建指定topic的路由信息
if (isDefault && defaultMQProducer != null) {
///获取默认topic的路由信息///
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
注意 虽然 [TBW102]默认有8个读写队列 但是生产者默认的队列数为4
最终取的是TBW102队列数和生产者默认数的小值也就是4
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
step-2 通过nameserver获取自身的路由信息
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
step-3: 如果有变化,则需要同步更新发送者、消费者关于该 topic 的缓存。
拿到最新的 topic 路由信息后,需要与本地缓存中的 topic 发布信息进行比较
if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}

路由存在变化
if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}

step-3.1 更新生产者信息
{
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String /* group name*/ , MQProducerInner/* defaultproducerImpl */>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}

step-3.2 更新订阅者信息
{
Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
Iterator<Entry<String/* consumer group name */, MQConsumerInner/*consumer group instance | DefaultMQPushConsumerImpl*/>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicSubscribeInfo(topic, subscribeInfo);
}
}
}
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
}
} else {
log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
}
return false;
}

知识点一topic自动创建
  • 发送消息时,topic不存在[nameserver无法获取相关信息],则采用TBW102主题作为改topic的配置,broker需要创建topic,
  • broker可能允许自动创建,也可能不允许报错[配置autoCreateTopicEnable决定]
知识点一autoCreateTopicEnable为什么不推荐开启
  • 假设有两台broker机器在处理生产者消息发送
  • TBW102获取的broker有两台机器
  • 生产者发送消息时通过负载策略,依据TBW102配置向其中一个broker发送仅一条消息
  • 被发送消息broker存在30秒自动注册nameserver机制,假设30秒producer无其他消息发送
  • 则producer再次拉取nameserver时获取的broker信息只有一台机器
  • 另外一台broker由于未接受producer发送的消息而导致未注册nameserver
  • 接下来造成只能向其中一台broker发送消息

1 虽存在缓存,但producer会定时拉取nameserver信息
2 打破此负载不均的方式是第一次拉取时不断发送消息,在broker未注册nameserver前,各broker均收到producer发送消息则所有broker都会注册nameserver




举报

相关推荐

0 条评论