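Broker in Depth
Broker Overview
The Broker is one of the core components of RocketMQ. Storage is its central function and determines the throughput, reliability, and availability of the whole RocketMQ system.
The Broker handles incoming TCP requests (compute) and persists messages (storage). Brokers are divided into Master and Slave roles: the Master serves requests, and a Slave serves consumption after the Master goes down.
The Broker's storage directory contains the following entries:
- Commitlog: A directory containing the actual commitlog files. Each file name is 20 characters long and consists of the file's starting physical offset, left-padded with zeros. Each file is 1 GB by default, configurable through mapedFileSizeCommitLog.
- consumequeue: A directory containing the consume queue files for every Topic on this Broker. The path format is "./consumequeue/Topic name/queue id/consume queue file". Each consume queue is effectively an index over the commitlog; consumers use it to pull messages and update their offsets.
- Index: A directory whose files are all hash indexes built on message keys. Each file is named with the timestamp at which it was created.
- Config: A directory holding all Topics, subscription relationships, and consumption progress of the current Broker. The Broker periodically persists this data from memory to disk so that it can be recovered after a crash.
- abort: A marker showing whether the Broker was shut down abnormally. The file is deleted on a normal shutdown and left in place on an abnormal one. On restart, the Broker uses it to decide whether recovery work such as rebuilding the Index is needed.
- checkpoint: The state of the Broker when it last ran normally, such as the time of the last successful flush and the time of the last successful index build.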
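The file-naming rule for commitlog files can be illustrated with a small sketch. This is not RocketMQ's own code; the class and method names are hypothetical, and it only assumes the rule stated above (starting physical offset, zero-padded to 20 digits).

```java
// Hypothetical helper, not part of RocketMQ: derives a commitlog-style
// file name from the starting physical offset of the file.
final class CommitLogFileNames {
    private CommitLogFileNames() {
    }

    /** Left-pads the starting offset with zeros to a width of 20 digits. */
    static String fileNameForOffset(long startOffset) {
        return String.format("%020d", startOffset);
    }
}
```

With 1 GB files, the first file starts at offset 0 and the second at offset 1073741824, so the names sort in offset order.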
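Broker Startup and Shutdown
Startup involves two scripts: ./bin/mqbroker and ./bin/runbroker.sh. mqbroker prepares the environment that RocketMQ itself needs, and runbroker.sh mainly sets the JVM startup options.
The BrokerStartup.java class prepares for the actual startup: it parses the arguments passed in by the scripts, initializes the Broker configuration, and creates the BrokerController instance.
The BrokerController.java class is in charge of the Broker. It manages and controls the Broker's modules, including the remoting module, the storage module, the index module, and scheduled tasks. Once all modules have been initialized and started, it logs the info message "boot success".
The startup proceeds as follows:
- Initialize the startup environment. This is done by the two scripts ./bin/mqbroker and ./bin/runbroker.sh.
- Initialize BrokerController. This covers parsing the RocketMQ command-line arguments, parsing the configuration of each Broker module, initializing each module, and registering the process shutdown hook. After the configuration objects have been initialized, the program calls BrokerController.initialize() to initialize each module.
- Start the RocketMQ components.
Shutting down the Broker simply runs the BrokerController.shutdown() method that BrokerStartup.java registered as a JVM hook. That method calls the shutdown method of each module and then lets the process exit normally. When shutdown handling is complete, the log shows the info message "Shutdown hook over".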
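The shutdown path described above follows the usual JVM shutdown-hook pattern. The following is a minimal sketch of that pattern only; the class, the module names, and the in-memory log are hypothetical and do not reproduce BrokerStartup.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of a controller whose shutdown() is registered as a JVM hook.
final class ShutdownSketch {
    private final AtomicBoolean hasShutdown = new AtomicBoolean(false);
    final List<String> log = new ArrayList<>();

    /** Stops each module exactly once; repeated calls are ignored. */
    void shutdown() {
        if (!hasShutdown.compareAndSet(false, true)) {
            return;
        }
        log.add("stop remoting module"); // module names are placeholders
        log.add("stop store module");
        log.add("Shutdown hook over");
    }

    /** Registers shutdown() so the JVM runs it on normal exit or on SIGTERM. */
    void registerHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown, "ShutdownHook"));
    }
}
```

Guarding the method with a flag keeps it safe to call both explicitly and from the hook.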
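Broker Storage Mechanism
The org.apache.rocketmq.store.CommitLog class handles the storage logic for all messages: normal, scheduled, and ordered messages, whether consumed or not yet consumed. The Broker stores messages as org.apache.rocketmq.store.MessageExtBrokerInner, which extends org.apache.rocketmq.common.message.MessageExt, which in turn extends org.apache.rocketmq.common.message.Message:
// Message
public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;
    // Topic
    private String topic;
    // Message flag; its purpose is not covered here
    private int flag;
    // Extended properties: tag, keys, delay level, and whether to wait for the store to complete are all kept here
    private Map<String, String> properties;
    // Message body as a byte array; producer and consumer must agree on encoding and decoding
    private byte[] body;
    // Transaction id, used for transactional messages
    private String transactionId;
    ……
}
public class MessageExt extends Message {
    private static final long serialVersionUID = 5720810158625748049L;
    // Name of the broker that holds the message
    private String brokerName;
    // Id of the queue that holds the message
    private int queueId;
    // Stored size of the message
    private int storeSize;
    // Offset of the message within its queue
    private long queueOffset;
    // System flag
    private int sysFlag;
    // Timestamp at which the message was sent
    private long bornTimestamp;
    // Host that sent the message
    private SocketAddress bornHost;
    // Timestamp at which the message was stored
    private long storeTimestamp;
    // Host that stored the message
    private SocketAddress storeHost;
    // Message id
    private String msgId;
    // Offset of the message in the commitlog
    private long commitLogOffset;
    // CRC of the message body
    private int bodyCRC;
    // Number of times a retry message has been re-consumed
    private int reconsumeTimes;
    // Offset of the prepared transactional message
    private long preparedTransactionOffset;
    ……
}
public class MessageExtBrokerInner extends MessageExt {
    private static final long serialVersionUID = 7256001576878700634L;
    // The properties serialized to a String
    private String propertiesString;
    // Hash code of the message tag
    private long tagsCode;
    // Encoded bytes of the message; this buffer is what gets written to the file
    private ByteBuffer encodedBuff;
    ……
}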
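As shown earlier, the CommitLog directory holds multiple CommitLog files. Logically the CommitLog is a single file that is split into sub-files to make storage and access easier; the sub-files are chained together by the physical offsets of the first and last messages they hold.
The Broker appends to the CommitLog sequentially, in time and physical-offset order, and takes a lock for every write. Each sub-file is 1 GB by default, configurable through mapedFileSizeCommitLog. When a sub-file is full, a new one is created and writing continues from the offset where the previous file ended. Offsets are contiguous across sub-files, so writes always go to the last file. If the file selected for writing does not have enough room for the current message, a new CommitLog file is created.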
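Because offsets are contiguous across fixed-size sub-files, a global physical offset maps to a file and a position inside it with plain arithmetic. The sketch below is illustrative only: the class is hypothetical, the 1 GB size is the default mentioned above, and the room check is simplified (the real append logic may reserve extra bytes at the end of a file).

```java
// Hypothetical sketch: mapping a global physical offset onto fixed-size sub-files.
final class CommitLogLocator {
    /** Assumed sub-file size: the 1 GB default. */
    static final long FILE_SIZE = 1024L * 1024 * 1024;

    private CommitLogLocator() {
    }

    /** Starting offset (and therefore file name) of the sub-file that holds the offset. */
    static long fileStartOffset(long physicalOffset) {
        return physicalOffset - (physicalOffset % FILE_SIZE);
    }

    /** Position of the offset inside its sub-file. */
    static long positionInFile(long physicalOffset) {
        return physicalOffset % FILE_SIZE;
    }

    /** Simplified check: does a message of this length still fit at the write position? */
    static boolean fits(long writePosition, int messageLength) {
        return writePosition + messageLength <= FILE_SIZE;
    }
}
```

When fits returns false, the writer would roll over to a new file that starts at fileStartOffset plus FILE_SIZE.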
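Why File Writes Are So Fast
RocketMQ is a message middleware written in Java that moves and stores messages at very large scale. Its fast file writes come from the following techniques:
- Page Cache: Modern operating system kernels read files in pages, 4 KB each by default. Because programs generally exhibit locality, when the operating system reads part of a file it also reads the neighboring content into kernel pages (read-ahead). A later read that hits the Page Cache returns directly without going to disk again. Page Cache is not without drawbacks: dirty-page writeback, memory reclamation, and swapping can all cause noticeable read and write latency. RocketMQ applies several optimizations for these cases, such as memory pre-allocation, file warm-up, and the mlock system call, to get the most out of Page Cache while keeping latency as low as possible. (Prefer a dedicated SSD, which gives the best read and write performance.)
- Virtual Memory: To give every program enough space to run, memory data that is temporarily unused can be moved to the swap area (which is on disk). Virtual memory in this sense is not real memory. The memory the operating system can allocate = swap size + physical memory size.
- Zero copy and Java file mapping: In a regular file read, the operating system first reads the file content from disk into a kernel-space buffer and then copies it into a user-space buffer owned by the Java process. With memory mapping, the process maps the kernel's pages for the file into its own address space, so the copy between kernel space and user space is skipped. java.nio.MappedByteBuffer provides this in Java: the Java process accesses the file through memory mapped onto the kernel's pages.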
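As a rough illustration of Java file mapping, the sketch below writes through a MappedByteBuffer and reads the bytes back from the file. It uses only standard java.nio calls; the class name and the temp-file setup are made up for the example and have nothing to do with RocketMQ's MappedFile.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch: write via a memory mapping, then read the file normally.
final class MmapSketch {
    private MmapSketch() {
    }

    static String writeAndReadBack(String text) {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        try {
            Path file = Files.createTempFile("mmap-sketch", ".dat");
            file.toFile().deleteOnExit();
            try (FileChannel channel = FileChannel.open(file,
                    StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                // Map the first bytes.length bytes of the file; the file grows to that size.
                MappedByteBuffer buffer =
                        channel.map(FileChannel.MapMode.READ_WRITE, 0, bytes.length);
                buffer.put(bytes); // a memory write, no explicit write() call
                buffer.force();    // ask the OS to flush the mapped pages to disk
            }
            return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The put call only touches memory; force is the point at which the data is pushed to the storage device, which is roughly what a flush policy controls.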
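How the Broker Stores a Message
For each request it handles, the Broker first writes the message data into the operating system's Page Cache and then flushes it to disk (that is, persists it) according to the synchronous or asynchronous flush policy.
(1) The Broker receives the send request from the client and pre-processes it
The org.apache.rocketmq.broker.processor.SendMessageProcessor class handles messages sent by Producers. It has a synchronous processRequest method and an asynchronous asyncProcessRequest method: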
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
RemotingCommand response = null;
try {
response = asyncProcessRequest(ctx, request).get();
} catch (InterruptedException | ExecutionException e) {
log.error("process SendMessage error, request : " + request.toString(), e);
}
return response;
}
@Override
public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception {
asyncProcessRequest(ctx, request).thenAcceptAsync(responseCallback::callback, this.brokerController.getSendMessageExecutor());
}
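As the code shows, both the synchronous and the asynchronous path call asyncProcessRequest to handle the request asynchronously. The synchronous processRequest method additionally calls CompletableFuture.get(), which blocks until a result is available, and that is how the synchronous behavior is achieved.
The asynchronous asyncProcessRequest method instead waits for the result and then runs the supplied callback on the thread pool brokerController.getSendMessageExecutor().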
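The same "synchronous wrapper over an asynchronous core" pattern can be sketched with the standard CompletableFuture API. The class and the string-based request and response below are stand-ins invented for the example, not RocketMQ types.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical sketch: one async implementation, exposed both synchronously and with a callback.
final class SyncOverAsyncSketch {
    private SyncOverAsyncSketch() {
    }

    /** The single real implementation; always asynchronous. */
    static CompletableFuture<String> handleAsync(String request) {
        return CompletableFuture.supplyAsync(() -> "response:" + request);
    }

    /** Synchronous variant: blocks on get() until the async result is ready. */
    static String handleSync(String request) {
        try {
            return handleAsync(request).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            return null;
        }
    }

    /** Callback variant: runs the callback on the given executor once the result is ready. */
    static CompletableFuture<Void> handleWithCallback(String request,
            Consumer<String> callback, Executor executor) {
        return handleAsync(request).thenAcceptAsync(callback, executor);
    }
}
```

Keeping one asynchronous implementation avoids duplicating the processing logic in two code paths.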
The actual processing logic is in the asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request) method:
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final SendMessageContext mqtraceContext;
switch (request.getCode()) {
// A consumer failed to consume the message and sent it back to the Broker
case RequestCode.CONSUMER_SEND_MSG_BACK:
// Handle the message for retry consumption
return this.asyncConsumerSendMsgBack(ctx, request);
default:
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {
return CompletableFuture.completedFuture(null);
}
// Build a SendMessageContext from the request to record information about the message
mqtraceContext = buildMsgContext(ctx, requestHeader);
// Run the hooks
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
if (requestHeader.isBatch()) {
// Batch messages go through the batch handling logic
return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
} else {
return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
}
}
}
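(2) The Broker pre-processes the message before storing it
Taking a single message as the example, processing continues in asyncSendMessage, whose main job is to pre-process the message: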
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
SendMessageContext mqtraceContext,
SendMessageRequestHeader requestHeader) {
// 1. Build the RemotingCommand response object
final RemotingCommand response = preSend(ctx, request, requestHeader);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
if (response.getCode() != -1) {
return CompletableFuture.completedFuture(response);
}
final byte[] body = request.getBody();
int queueIdInt = requestHeader.getQueueId();
// Look up the Topic configuration recorded in the Broker
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
// If the requested queue id is negative, pick one at random
if (queueIdInt < 0) {
queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
}
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
// 2. Handle retry and dead-letter messages
if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
return CompletableFuture.completedFuture(response);
}
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
MessageAccessor.setProperties(msgInner, origProps);
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {
// There is no need to store "WAIT=true", remove it from propertiesString to save 9 bytes for each message.
// It works for most case. In some cases msgInner.setPropertiesString invoked later and replace it.
String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
// Reput to properties, since msgInner.isWaitStoreMsgOK() will be invoked later
origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);
} else {
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
}
CompletableFuture<PutMessageResult> putMessageResult = null;
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (transFlag != null && Boolean.parseBoolean(transFlag)) {
// Transactional message handling
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return CompletableFuture.completedFuture(response);
}
// Transactional messages still end up in asyncPutMessage; msgInner is only adjusted for transactional handling first
putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
// Normal single message
putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}
1. The preSend method builds the RemotingCommand response object and validates the message:
private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request,
SendMessageRequestHeader requestHeader) {
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
response.setOpaque(request.getOpaque());
response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
log.debug("Receive SendMessage request command {}", request);
final long startTimestamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
if (this.brokerController.getMessageStore().now() < startTimestamp) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimestamp)));
return response;
}
response.setCode(-1);
// Validate the message
super.msgCheck(ctx, requestHeader, response);
if (response.getCode() != -1) {
return response;
}
return response;
}
// msgCheck, called above through super
protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
// Permission check: reject when the broker is not writable and the topic is an ordered topic
if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
&& this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending message is forbidden");
return response;
}
// Validate the topic format
if (!TopicValidator.validateTopic(requestHeader.getTopic(), response)) {
return response;
}
// Reject topics in NOT_ALLOWED_SEND_TOPIC_SET, a set of topics defined internally by the MQ
if (TopicValidator.isNotAllowedSendTopic(requestHeader.getTopic(), response)) {
return response;
}
// Get the configuration for the topic
TopicConfig topicConfig =
this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (null == topicConfig) {
int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
} else {
topicSysFlag = TopicSysFlag.buildSysFlag(true, false);
}
}
log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());
// topicConfig is null: try to create the topic from defaultTopic, which is usually "TBW102"
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
requestHeader.getTopic(),
requestHeader.getDefaultTopic(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
if (null == topicConfig) {
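// If this is a retry message
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
// Do not rely on the AUTO_CREATE_TOPIC_KEY_TOPIC configuration; create a new Topic configuration directly. Similar to the above, but with only 1 read/write queue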
topicConfig =
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,
topicSysFlag);
}
}
if (null == topicConfig) {
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
}
int queueIdInt = requestHeader.getQueueId();
int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
if (queueIdInt >= idValid) {
String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",
queueIdInt,
topicConfig.toString(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(errorInfo);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorInfo);
return response;
}
return response;
}
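Besides validating permissions and the topic, msgCheck also calls createTopicInSendMessageMethod to create the TopicConfig when none is found for the Topic carried by the message. The defaultTopic passed in here corresponds to AUTO_CREATE_TOPIC_KEY_TOPIC as configured on the Producer and Broker (default value TBW102). When the Broker starts, it creates the TopicConfig for this createTopicKey if the Broker setting autoCreateTopicEnable (whether Topics may be created automatically, true by default) allows it. The code is in the constructor of org.apache.rocketmq.broker.topic.TopicConfigManager: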
public TopicConfigManager(BrokerController brokerController) {
……
{
if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
String topic = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC;
TopicConfig topicConfig = new TopicConfig(topic);
TopicValidator.addSystemTopic(topic);
topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums());
topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums());
int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
topicConfig.setPerm(perm);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
}
……
}
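org.apache.rocketmq.broker.topic.TopicConfigManager#createTopicInSendMessageMethod works as follows. When the Broker's autoCreateTopicEnable is true (automatic topic creation is allowed), it creates the TopicConfig for the message's topic from the TopicConfig of AUTO_CREATE_TOPIC_KEY_TOPIC that was created at Broker startup. When autoCreateTopicEnable is false, it updates the permission of the AUTO_CREATE_TOPIC_KEY_TOPIC TopicConfig to remove PERM_INHERIT; without that permission, the TopicConfig for the message's topic cannot be created.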
public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,
final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {
TopicConfig topicConfig = null;
boolean createNew = false;
try {
if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
topicConfig = this.topicConfigTable.get(topic);
if (topicConfig != null)
return topicConfig;
TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);
if (defaultTopicConfig != null) {
// If defaultTopic is AUTO_CREATE_TOPIC_KEY_TOPIC, i.e. "TBW102"
if (defaultTopic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
// If the broker's autoCreateTopicEnable is false, i.e. automatic topic creation is disabled (it is true by default)
if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
// Reset the permission to PERM_READ | PERM_WRITE, dropping PERM_INHERIT, so that the inheritance check in the next step fails
defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
}
}
// If defaultTopicConfig has the inherit permission (PERM_INHERIT), create the TopicConfig for the given topic from it
if (PermName.isInherited(defaultTopicConfig.getPerm())) {
topicConfig = new TopicConfig(topic);
int queueNums = Math.min(clientDefaultTopicQueueNums, defaultTopicConfig.getWriteQueueNums());
if (queueNums < 0) {
queueNums = 0;
}
topicConfig.setReadQueueNums(queueNums);
topicConfig.setWriteQueueNums(queueNums);
int perm = defaultTopicConfig.getPerm();
// AND with the complement clears the PERM_INHERIT bit, so the derived TopicConfig cannot itself be inherited from
perm &= ~PermName.PERM_INHERIT;
topicConfig.setPerm(perm);
topicConfig.setTopicSysFlag(topicSysFlag);
topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
} else {
log.warn("Create new topic failed, because the default topic[{}] has no perm [{}] producer:[{}]",
defaultTopic, defaultTopicConfig.getPerm(), remoteAddress);
}
} else {
log.warn("Create new topic failed, because the default topic[{}] not exist. producer:[{}]",
defaultTopic, remoteAddress);
}
if (topicConfig != null) {
log.info("Create new topic by default topic:[{}] config:[{}] producer:[{}]",
defaultTopic, topicConfig, remoteAddress);
// Update topicConfigTable
this.topicConfigTable.put(topic, topicConfig);
// Increment dataVersion
this.dataVersion.nextVersion();
createNew = true;
// Persist the TopicConfig
this.persist();
}
} finally {
this.topicConfigTableLock.unlock();
}
}
} catch (InterruptedException e) {
log.error("createTopicInSendMessageMethod exception", e);
}
if (createNew) {
// A new Topic was created automatically, so force-register the Broker with every NameServer
this.brokerController.registerBrokerAll(false, true, true);
}
return topicConfig;
}
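The permission handling in this method is plain bit manipulation. The sketch below shows the idea in isolation; the class is hypothetical, and the constant values are assumptions chosen to resemble RocketMQ's PermName rather than values taken from this article.

```java
// Hypothetical sketch of the inherit-permission logic; bit values are assumed.
final class PermSketch {
    static final int PERM_READ = 1 << 2;
    static final int PERM_WRITE = 1 << 1;
    static final int PERM_INHERIT = 1;

    private PermSketch() {
    }

    /** True when the inherit bit is set. */
    static boolean isInherited(int perm) {
        return (perm & PERM_INHERIT) == PERM_INHERIT;
    }

    /** Permission for a topic derived from the default topic: same bits, inherit bit cleared. */
    static int derivePerm(int defaultTopicPerm) {
        return defaultTopicPerm & ~PERM_INHERIT;
    }

    /** Queue count for the derived topic: the smaller of the two values, never negative. */
    static int queueNums(int clientQueueNums, int defaultWriteQueueNums) {
        return Math.max(0, Math.min(clientQueueNums, defaultWriteQueueNums));
    }
}
```

Clearing the bit with AND-NOT is what prevents an automatically created topic from serving as a template for further topics.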
2. Back in asyncSendMessage, the next important step is handleRetryAndDLQ, which handles retry and dead-letter messages:
private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response,
RemotingCommand request,
MessageExt msg, TopicConfig topicConfig) {
String newTopic = requestHeader.getTopic();
// If the message is a retry message
if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
// Look up the subscription group by groupName
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
if (null == subscriptionGroupConfig) {
// The subscription group does not exist: return an error
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark(
"subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
return false;
}
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}
int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
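// If the retry count has reached the maximum retry count (16 by default)
if (reconsumeTimes >= maxReconsumeTimes) {
// Put the message into the dead-letter queue; its topic starts with %DLQ%
newTopic = MixAll.getDLQTopic(groupName);
int queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP;
// Get or create the TopicConfig of the dead-letter topic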
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE | PermName.PERM_READ, 0
);
msg.setTopic(newTopic);
msg.setQueueId(queueIdInt);
msg.setDelayTimeLevel(0);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return false;
}
}
}
int sysFlag = requestHeader.getSysFlag();
// If the topic's filter type is MULTI_TAG
if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
}
msg.setSysFlag(sysFlag);
return true;
}
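If the received message is a retry message, the maximum re-consumption count is taken from the subscription group configuration. If the message's re-consumption count is greater than or equal to that maximum (16 by default), the message is put into the dead-letter queue, whose topic starts with %DLQ%, and the TopicConfig for that topic is fetched or created.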
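The routing decision can be reduced to a few lines. The sketch below is a simplified stand-in for handleRetryAndDLQ, not a copy of it: the class is hypothetical, and the "%RETRY%" prefix is an assumption about the value of MixAll.RETRY_GROUP_TOPIC_PREFIX.

```java
// Hypothetical sketch: decide whether a retry message goes to the dead-letter topic.
final class RetryRoutingSketch {
    static final String RETRY_PREFIX = "%RETRY%"; // assumed retry-topic prefix
    static final String DLQ_PREFIX = "%DLQ%";

    private RetryRoutingSketch() {
    }

    /** Returns the topic the message should be stored under. */
    static String route(String topic, int reconsumeTimes, int maxReconsumeTimes) {
        if (topic == null || !topic.startsWith(RETRY_PREFIX)) {
            return topic; // not a retry message, leave it alone
        }
        String groupName = topic.substring(RETRY_PREFIX.length());
        // Note the comparison is >=, so a message is dead-lettered once the limit is reached.
        return reconsumeTimes >= maxReconsumeTimes ? DLQ_PREFIX + groupName : topic;
    }
}
```

For example, a message on the retry topic of group "g" with 16 retries and a limit of 16 would be routed to "%DLQ%g".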
(3) Message validation and store validation
Taking a normal single message as the example, the pre-processed message is passed as the argument to org.apache.rocketmq.store.DefaultMessageStore.asyncPutMessage:
@Override
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
// 1. Check the state of the store service
PutMessageStatus checkStoreStatus = this.checkStoreStatus();
if (checkStoreStatus != PutMessageStatus.PUT_OK) {
return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
}
// 2. Validate the message
PutMessageStatus msgCheckStatus = this.checkMessage(msg);
if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
}
long beginTime = this.getSystemClock().now();
// Where the message is actually stored
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
putResultFuture.thenAccept((result) -> {
long elapsedTime = this.getSystemClock().now() - beginTime;
if (elapsedTime > 500) {
log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
}
// Update putMessageEntireTimeMax
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().add(1);
}
});
return putResultFuture;
}
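1. checkStoreStatus does the following:
- Checks whether the store module has been shut down; if it has, storing is not allowed.
- Checks whether the Broker is a SLAVE; if it is, storing is not allowed.
- Checks the Broker's running flags; if they say the store is not writable, storing is not allowed. A full disk, or an error while writing the consume queue or the index, puts the store into that state.
- Checks whether the store service is busy; if it is, OS_PAGECACHE_BUSY is returned. The check is implemented as follows:
@Override
public boolean isOSPageCacheBusy() {
    // Start time of the most recent write (0 initially, reset to 0 after a successful write)
    long begin = this.getCommitLog().getBeginTimeInLock();
    // Busy when now minus that start time is below 10000000 and above the timeout (1000)
    long diff = this.systemClock.now() - begin;
    return diff < 10000000 && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();
}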
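The two-sided comparison in the busy check is easier to follow with concrete numbers. The sketch below restates it as a pure function with the clock and timeout passed in; the class and parameter names are hypothetical.

```java
// Hypothetical sketch of the page-cache-busy test as a pure function.
final class PageCacheBusySketch {
    private PageCacheBusySketch() {
    }

    /**
     * beginTimeInLock is 0 when no write is in progress, which makes diff equal to
     * the current epoch time and therefore far above the 10000000 upper bound.
     */
    static boolean isBusy(long nowMillis, long beginTimeInLock, long busyTimeoutMillis) {
        long diff = nowMillis - beginTimeInLock;
        return diff < 10_000_000 && diff > busyTimeoutMillis;
    }
}
```

The upper bound is what filters out the idle case, and the lower bound flags a write that has held the lock longer than the timeout.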
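2. checkMessage does the following:
- Checks that the length of the message topic does not exceed Byte.MAX_VALUE, i.e. 127.
- Checks that the message's properties string, when present, does not exceed Short.MAX_VALUE in length, i.e. 32767.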
(4) Writing the message to the CommitLog
Next comes the method that actually stores the message, org.apache.rocketmq.store.CommitLog#asyncPutMessage:
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// 1、Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
int queueId = msg.getQueueId();
// 2. Delay handling applies only to TRANSACTION_NOT_TYPE and TRANSACTION_COMMIT_TYPE messages
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Reset the topic and queueId of the delayed message (applied below, after the real ones are backed up)
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
if (bornSocketAddress.getAddress() instanceof Inet6Address) {
msg.setBornHostV6Flag();
}
InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
if (storeSocketAddress.getAddress() instanceof Inet6Address) {
msg.setStoreHostAddressV6Flag();
}
// putMessageThreadLocal holds one PutMessageThreadLocal object per thread, so it is thread-safe
PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
// Encode msg into bytes, kept in the encoderBuffer of this thread's MessageExtEncoder; a non-null result means encoding failed
PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
if (encodeResult != null) {
return CompletableFuture.completedFuture(encodeResult);
}
msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);
PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));
long elapsedTimeInLock = 0;
MappedFile unlockMappedFile = null;
// putMessageLock is a reentrant lock or a CAS-based spin lock, chosen by useReentrantLockWhenPutMessage in the store config (true by default, i.e. reentrant lock)
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
// 3. Get the last MappedFile
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
// Record the start time of this write; used to judge whether the CommitLog store service is busy
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
msg.setStoreTimestamp(beginLockTimestamp);
if (null == mappedFile || mappedFile.isFull()) {
// If the last MappedFile is null or full, fetch the newest one or create a new MappedFile
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
// Create a new mappedFile and retry the write
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
}
// 4. Store the message
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
default:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
}
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
putMessageLock.unlock();
}
if (elapsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
}
// When warm-up is enabled, a MappedFile is pre-loaded and locked with mlock at creation time; if unlockMappedFile is not null, it is released here with munlock
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());
// 5. Synchronous or asynchronous flush
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
// 6. Replication
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
if (flushStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(flushStatus);
}
if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus);
if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
log.error("do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {}",
msg.getTopic(), msg.getTags(), msg.getBornHostNameString());
}
}
return putMessageResult;
});
}
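The delayed-message branch of the method above can be sketched on its own. Everything here is a simplified stand-in: the Msg class is hypothetical, the property keys "REAL_TOPIC" and "REAL_QID" and the mapping "queue id = delay level - 1" are assumptions about what MessageConst and ScheduleMessageService.delayLevel2QueueId use.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: redirect a delayed message to the schedule topic, keeping the real target.
final class DelayRewriteSketch {
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";

    /** Minimal stand-in for a message; not a RocketMQ class. */
    static final class Msg {
        String topic;
        int queueId;
        int delayLevel;
        final Map<String, String> properties = new HashMap<>();
    }

    private DelayRewriteSketch() {
    }

    static void rewrite(Msg msg, int maxDelayLevel) {
        if (msg.delayLevel <= 0) {
            return; // not a delayed message
        }
        if (msg.delayLevel > maxDelayLevel) {
            msg.delayLevel = maxDelayLevel; // cap at the highest configured level
        }
        // Back up the real destination so the message can be delivered there later.
        msg.properties.put("REAL_TOPIC", msg.topic);
        msg.properties.put("REAL_QID", String.valueOf(msg.queueId));
        msg.topic = SCHEDULE_TOPIC;
        msg.queueId = msg.delayLevel - 1; // assumed level-to-queue mapping
    }
}
```

Storing the real topic and queue id in the properties is what lets the message be restored to its original destination when the delay expires.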
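1. Set the message's store time to the current timestamp and set its integrity checksum, a CRC (cyclic redundancy check).
2. Handle delayed messages. For a delayed message, the delay level is set, the topic is set to TopicValidator.RMQ_SYS_SCHEDULE_TOPIC (that is, SCHEDULE_TOPIC_XXXX) along with the matching queueId, and the original topic and queueId are backed up so that consumers can consume the message after it is delivered.
3. Lock the code that follows with putMessageLock. There are two lock implementations, selected by useReentrantLockWhenPutMessage in StoreConfig; the default is true, meaning a reentrant lock. mappedFileQueue.getLastMappedFile() fetches the last MappedFile. If it is null or already full (1 GB), the Broker tries to fetch the newest MappedFile or create a new one: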
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
long createOffset = -1;
MappedFile mappedFileLast = getLastMappedFile();
if (mappedFileLast == null) {
createOffset = startOffset - (startOffset % this.mappedFileSize);
}
if (mappedFileLast != null && mappedFileLast.isFull()) {
// mappedFileLast.getFileFromOffset() is the starting offset of the file, which was set from createOffset
createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
}
if (createOffset != -1 && needCreate) {
// Create the MappedFile
return tryCreateMappedFile(createOffset);
}
return mappedFileLast;
}
protected MappedFile tryCreateMappedFile(long createOffset) {
// Path of the file being created now, and path of the next file to create
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset
+ this.mappedFileSize);
return doCreateMappedFile(nextFilePath, nextNextFilePath);
}
protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) {
MappedFile mappedFile = null;
if (this.allocateMappedFileService != null) {
// Hand the request to the asynchronous mmap thread through requestTable and requestQueue, then wait for the mappedFile it creates
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
} else {
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}
if (mappedFile != null) {
if (this.mappedFiles.isEmpty()) {
mappedFile.setFirstCreateInQueue(true);
}
this.mappedFiles.add(mappedFile);
}
return mappedFile;
}
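In doCreateMappedFile, if the allocateMappedFileService of the MappedFileQueue (the queue class that holds multiple MappedFile objects) is not null, AllocateMappedFileService#putRequestAndReturnMappedFile is executed. The CommitLog's MappedFileQueue is initialized with an allocateMappedFileService, so that branch is taken here. Otherwise a MappedFile object is created directly from nextFilePath and mappedFileSize (1 GB by default). The MappedFile constructor shows that the fileFromOffset mentioned above is taken from file.getName(), that is, createOffset.
AllocateRequest defines its own equals/hashCode methods; the source is not listed here. If the filePath and fileSize are already in requestTable, the MappedFile needed now was already pre-allocated during the previous allocation. Note also that AllocateRequest implements Comparable, so it is sorted automatically in the priority queue and files with smaller offsets are allocated first.
AllocateMappedFileService#putRequestAndReturnMappedFile builds AllocateRequest objects from nextFilePath, nextNextFilePath, and fileSize (1 GB by default) and puts them into the requestQueue blocking queue and the requestTable concurrent map. It then fetches the AllocateRequest for nextFilePath from requestTable and waits on its CountDownLatch (initial count 1). If the wait succeeds, the request is removed from requestTable and the created MappedFile object is returned.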
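The hand-off between the requesting thread and the allocation thread can be sketched with standard concurrency utilities. This is a heavily simplified, hypothetical model: a string stands in for the MappedFile, only one path is requested at a time, and none of the class or method names come from RocketMQ.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: request table + priority queue + latch, served by one worker thread.
final class AllocateSketch {
    static final class Request implements Comparable<Request> {
        final String filePath;
        final CountDownLatch latch = new CountDownLatch(1);
        volatile String result; // stands in for the created MappedFile

        Request(String filePath) {
            this.filePath = filePath;
        }

        @Override
        public int compareTo(Request other) {
            // Zero-padded names sort in offset order, so smaller offsets come first.
            return filePath.compareTo(other.filePath);
        }
    }

    private final PriorityBlockingQueue<Request> queue = new PriorityBlockingQueue<>();
    private final ConcurrentMap<String, Request> table = new ConcurrentHashMap<>();

    AllocateSketch() {
        Thread worker = new Thread(this::serve, "allocate-sketch");
        worker.setDaemon(true);
        worker.start();
    }

    private void serve() {
        try {
            while (true) {
                Request req = queue.take();
                if (table.get(req.filePath) != req) {
                    continue; // expired or replaced request, skip it
                }
                req.result = "mapped:" + req.filePath;
                req.latch.countDown(); // wake up the waiting requester
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Submits a request and waits up to timeoutMillis for the worker to complete it. */
    String requestAndWait(String filePath, long timeoutMillis) {
        Request req = new Request(filePath);
        if (table.putIfAbsent(filePath, req) == null) {
            queue.offer(req);
        }
        Request pending = table.get(filePath);
        try {
            if (pending != null && pending.latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                table.remove(filePath);
                return pending.result;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return null;
    }
}
```

The table lets the worker discard requests that have timed out, while the latch gives the requester a bounded wait.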
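The key point is that AllocateMappedFileService extends ServiceThread. After it is initialized in the DefaultMessageStore constructor, Thread#start is called to start the thread, and a started thread runs its Runnable#run implementation: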
public void run() {
log.info(this.getServiceName() + " service started");
// Keep running mmapOperation
while (!this.isStopped() && this.mmapOperation()) {
}
log.info(this.getServiceName() + " service end");
}
private boolean mmapOperation() {
boolean isSuccess = false;
AllocateRequest req = null;
try {
// Take the AllocateRequest at the head of the queue
req = this.requestQueue.take();
// Look up the object stored in requestTable under this request's filePath
AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
if (null == expectedRequest) {
log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
+ req.getFileSize());
return true;
}
// The two objects differ: something went wrong
if (expectedRequest != req) {
log.warn("never expected here, maybe cause timeout " + req.getFilePath() + " "
+ req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
return true;
}
if (req.getMappedFile() == null) {
long beginTime = System.currentTimeMillis();
MappedFile mappedFile;
// Create the MappedFile instance
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
try {
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
} catch (RuntimeException e) {
log.warn("Use default implementation.");
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
}
} else {
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
}
long elapsedTime = UtilAll.computeElapsedTimeMilliseconds(beginTime);
if (elapsedTime > 10) {
int queueSize = this.requestQueue.size();
log.warn("create mappedFile spent time(ms) " + elapsedTime + " queue size " + queueSize
+ " " + req.getFilePath() + " " + req.getFileSize());
}
// pre write mappedFile
if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
.getMappedFileSizeCommitLog()
&&
this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
}
// attach the MappedFile to the request
req.setMappedFile(mappedFile);
this.hasException = false;
isSuccess = true;
}
// on error hasException is set to true; the next putRequestAndReturnMappedFile call logs a warning and returns null
} catch (InterruptedException e) {
log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
this.hasException = true;
return false;
} catch (IOException e) {
log.warn(this.getServiceName() + " service has exception. ", e);
this.hasException = true;
if (null != req) {
// on an IO error, put the AllocateRequest back into requestQueue so that a later mmapOperation retries it
requestQueue.offer(req);
try {
Thread.sleep(1);
} catch (InterruptedException ignored) {
}
}
} finally {
if (req != null && isSuccess)
// decrement the count of the AllocateRequest's countDownLatch by 1
req.getCountDownLatch().countDown();
}
return true;
}
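mmapOperation takes AllocateRequest objects from the head of requestQueue (in priority order) and creates the MappedFile for each. On success it decrements the request's countDownLatch from 1 to 0. At that point the await in AllocateMappedFileService#putRequestAndReturnMappedFile, which waits up to a timeout, returns true, so the method can return the MappedFile created above and remove the request from requestTable: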
……
AllocateRequest result = this.requestTable.get(nextFilePath);
try {
if (result != null) {
boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
if (!waitOK) {
log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
return null;
} else {
this.requestTable.remove(nextFilePath);
return result.getMappedFile();
}
} else {
log.error("find preallocate mmap failed, this never happen");
}
} catch (InterruptedException e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
……
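The handoff between the caller and the allocation thread can be sketched in isolation. This is a simplified illustration and not the RocketMQ code: the `Request` class, the `String` result standing in for a MappedFile, and the plain LinkedBlockingQueue are all assumptions made to keep it short.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class LatchHandoffSketch {
    // Hypothetical request: the latch starts at 1 and is released by the worker.
    static final class Request {
        final String path;
        final CountDownLatch latch = new CountDownLatch(1);
        volatile String result; // stands in for the created MappedFile

        Request(String path) {
            this.path = path;
        }
    }

    private final BlockingQueue<Request> requestQueue = new LinkedBlockingQueue<>();
    private final ConcurrentMap<String, Request> requestTable = new ConcurrentHashMap<>();

    // Caller side: register the request, enqueue it, then wait with a timeout.
    String putRequestAndReturn(String path, long timeoutMs) {
        Request fresh = new Request(path);
        if (requestTable.putIfAbsent(path, fresh) == null) {
            requestQueue.offer(fresh);
        }
        Request pending = requestTable.get(path);
        if (pending == null) {
            return null;
        }
        try {
            if (!pending.latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
                return null; // timed out waiting for the worker
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        requestTable.remove(path);
        return pending.result;
    }

    // Worker side: one iteration of the allocation loop.
    void serveOne() {
        try {
            Request req = requestQueue.take();
            req.result = "mapped:" + req.path;
            req.latch.countDown(); // 1 -> 0 releases the waiting caller
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs one caller and one worker and returns what the caller received.
    static String demo(String path) {
        LatchHandoffSketch service = new LatchHandoffSketch();
        Thread worker = new Thread(service::serveOne);
        worker.start();
        String result = service.putRequestAndReturn(path, 5_000);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(demo("00000000000000000000"));
    }
}
```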
4. With the latest MappedFile obtained or created, we return to asyncPutMessage. Next comes the logic that actually stores the message, the org.apache.rocketmq.store.MappedFile.appendMessagesInner method:
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
PutMessageContext putMessageContext) {
assert messageExt != null;
assert cb != null;
// current write position of this mappedFile
int currentPos = this.wrotePosition.get();
// only write if the current position is below fileSize (1 GB)
if (currentPos < this.fileSize) {
// writeBuffer is borrowed from the availableBuffers queue of transientStorePool; by default each byteBuffer is 1 GB and there are 5 of them
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
// after slice(), byteBuffer shares the content of writeBuffer/mappedByteBuffer but its position is 0; move it to currentPos
byteBuffer.position(currentPos);
AppendMessageResult result;
// single message
if (messageExt instanceof MessageExtBrokerInner) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
(MessageExtBrokerInner) messageExt, putMessageContext);
// batch of messages
} else if (messageExt instanceof MessageExtBatch) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
(MessageExtBatch) messageExt, putMessageContext);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
// advance wrotePosition by the number of bytes written
this.wrotePosition.addAndGet(result.getWroteBytes());
// update the MappedFile's store time, i.e. the store timestamp of the message just written
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
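DefaultAppendMessageCallback#doAppend, an inner class of CommitLog, is then called to copy the previously encoded message content (encodedBuff) into the byteBuffer held by the MappedFile described above and to fill in the other required fields. Note that when the message length plus the length of the end-of-file marker exceeds maxBlank (the remaining writable bytes, fileSize - currentPos), END_OF_FILE is returned: the store attempt fails, a new MappedFile is created, and the write is retried. The exhausted MappedFile still gets two fields written, the remaining size (4 bytes) and BLANK_MAGIC_CODE (the end-of-file marker, 4 bytes). Although only these 8 bytes are written, the reported number of written bytes is the whole remaining size: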
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset,
maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */
msgIdSupplier, msgInner.getStoreTimestamp(),
queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
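wrotePosition is then advanced by the number of bytes written, and the latest store timestamp storeTimestamp is updated, so that the next write starts from the correct position.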
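Once org.apache.rocketmq.store.MappedFile#appendMessagesInner has run, the message is stored in memory (in the MappedFile's writeBuffer or mappedByteBuffer).
The status of the returned AppendMessageResult decides what happens next: PUT_OK continues with the following steps, END_OF_FILE creates a new MappedFile and retries the write as described above, and any other status is returned as an error.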
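The END_OF_FILE accounting can be sketched with plain ByteBuffers. This is an illustration under assumptions, not the RocketMQ implementation: the magic-code value is a placeholder, messages are raw byte arrays with no header, and every message is assumed to fit into an empty file.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class EndOfFileSketch {
    static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4; // remaining size + magic code
    static final int BLANK_MAGIC_CODE = 0xBAADF00D;     // placeholder value, not RocketMQ's

    enum Status { PUT_OK, END_OF_FILE }

    static final class Result {
        final Status status;
        final int wroteBytes;

        Result(Status status, int wroteBytes) {
            this.status = status;
            this.wroteBytes = wroteBytes;
        }
    }

    // Appends at the buffer's current position; maxBlank is the space left in the file.
    static Result doAppend(ByteBuffer byteBuffer, int maxBlank, byte[] encodedMsg) {
        if (encodedMsg.length + END_FILE_MIN_BLANK_LENGTH > maxBlank) {
            // Only 8 bytes are physically written...
            byteBuffer.putInt(maxBlank);
            byteBuffer.putInt(BLANK_MAGIC_CODE);
            // ...but the whole remaining space is reported as written.
            return new Result(Status.END_OF_FILE, maxBlank);
        }
        byteBuffer.put(encodedMsg);
        return new Result(Status.PUT_OK, encodedMsg.length);
    }

    // Writes all messages, rolling to a new file on END_OF_FILE.
    // Returns the final wrotePosition of every file that was used.
    static List<Integer> appendAll(int fileSize, List<byte[]> messages) {
        List<Integer> wrotePositions = new ArrayList<>();
        ByteBuffer file = ByteBuffer.allocate(fileSize);
        int wrotePosition = 0;
        for (byte[] msg : messages) {
            file.position(wrotePosition);
            Result result = doAppend(file, fileSize - wrotePosition, msg);
            wrotePosition += result.wroteBytes;
            if (result.status == Status.END_OF_FILE) {
                wrotePositions.add(wrotePosition); // now equal to fileSize
                file = ByteBuffer.allocate(fileSize);
                result = doAppend(file, fileSize, msg); // retry in the new file
                wrotePosition = result.wroteBytes;
            }
        }
        wrotePositions.add(wrotePosition);
        return wrotePositions;
    }

    public static void main(String[] args) {
        // Two 20-byte messages in 32-byte files: the second one does not fit.
        System.out.println(appendAll(32, List.of(new byte[20], new byte[20])));
    }
}
```

Reporting maxBlank rather than 8 moves the first file's wrotePosition exactly to fileSize, so position-based lookups continue in the next file.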
5. Next comes the method that handles synchronous and asynchronous flushing, org.apache.rocketmq.store.CommitLog#submitFlushRequest:
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
// Synchronization flush
// the Broker is configured for synchronous flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
// synchronous flush: flushCommitLogService is a GroupCommitService instance
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
// the message asks to wait until it has been stored before the result is returned
if (messageExt.isWaitStoreMsgOK()) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
// add the request to the requestsWrite list
service.putRequest(request);
return request.future();
} else {
// the message does not wait for the flush: wake up the flush thread and return immediately
service.wakeup();
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
// Asynchronous flush
// asynchronous flush
else {
// the transient store pool is not enabled
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
// flushCommitLogService is a FlushRealTimeService instance
flushCommitLogService.wakeup();
} else {
// asynchronous transfer (commit) out of the transient store pool
commitLogService.wakeup();
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
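RocketMQ has two flush modes:
- Synchronous flush: handled by the inner thread class org.apache.rocketmq.store.CommitLog.GroupCommitService (GroupCommitService implements Runnable), the synchronous flush service. After the Broker stores the message in the MappedFile's mappedByteBuffer (which is effectively the Page Cache), it synchronously flushes mappedByteBuffer to disk and only then returns the write result to the client. GroupCommitService#putRequest adds the newly created GroupCommitRequest to the requestsWrite list and wakes up the service thread:
public synchronized void putRequest(final GroupCommitRequest request) {
    lock.lock();
    try {
        this.requestsWrite.add(request);
    } finally {
        lock.unlock();
    }
    // try to count the waitPoint CountDownLatch2 down to 0
    this.wakeup();
}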
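GroupCommitService#run loops while the thread is not stopped. Each iteration waits up to 10 ms (waitForRunning(10)) and then calls GroupCommitService#doCommit to flush. When the thread is shut down, it sleeps 10 ms so that outstanding requests can arrive and then flushes the remaining requests in one pass.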
public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            this.waitForRunning(10);
            this.doCommit();
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
    // Under normal circumstances shutdown, wait for the arrival of the
    // request, and then flush
    try {
        Thread.sleep(10);
    } catch (InterruptedException e) {
        CommitLog.log.warn("GroupCommitService Exception, ", e);
    }
    // on shutdown, flush whatever is left
    synchronized (this) {
        this.swapRequests();
    }
    this.doCommit();
    CommitLog.log.info(this.getServiceName() + " service end");
}
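ServiceThread#waitForRunning above first checks whether hasNotified has been set (the wakeup method above tries to set hasNotified to true). If it has, hasNotified is reset to false for the next wait/wakeup cycle and the overridden GroupCommitService#onWaitEnd runs, which swaps the pending flush requests in requestsWrite into requestsRead so that the next flush step reads from requestsRead. If hasNotified is false, waitPoint's count is reset to 1 and the thread waits up to interval milliseconds (wakeup brings waitPoint's count down to 0, which ends the wait early); afterwards hasNotified is set to false and the overridden GroupCommitService#onWaitEnd runs. RocketMQ uses a pair of buffer queues (requestsWrite/requestsRead) to separate reads from writes. The benefit is that the flush thread can consume the synchronous flush requests without holding a lock, which improves concurrency.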
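The read/write double buffer can be shown with a minimal sketch. It is an assumption-laden simplification: requests are plain `Long` offsets, and `doCommit` swaps the lists itself instead of relying on onWaitEnd.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

final class DoubleBufferSketch {
    // Producers append to requestsWrite; the flush thread iterates requestsRead.
    private volatile LinkedList<Long> requestsWrite = new LinkedList<>();
    private volatile LinkedList<Long> requestsRead = new LinkedList<>();

    // Called by many producer threads; only the short add is synchronized.
    synchronized void putRequest(long nextOffset) {
        requestsWrite.add(nextOffset);
    }

    // Swapping the two references is the only step that contends with producers.
    private synchronized void swapRequests() {
        LinkedList<Long> tmp = requestsWrite;
        requestsWrite = requestsRead;
        requestsRead = tmp;
    }

    // Called by the single flush thread: processes a private batch without a lock.
    List<Long> doCommit() {
        swapRequests();
        List<Long> handled = new ArrayList<>(requestsRead);
        requestsRead = new LinkedList<>(); // reset for the next swap
        return handled;
    }

    public static void main(String[] args) {
        DoubleBufferSketch service = new DoubleBufferSketch();
        service.putRequest(100L);
        service.putRequest(200L);
        System.out.println(service.doCommit()); // first batch
        service.putRequest(300L);
        System.out.println(service.doCommit()); // second batch
    }
}
```

While the flush thread iterates its batch, producers keep appending to the other list, so a slow disk flush does not block putRequest.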
Next, GroupCommitService#doCommit runs:
private void doCommit() {
    if (!this.requestsRead.isEmpty()) {
        for (GroupCommitRequest req : this.requestsRead) {
            // There may be a message in the next file, so a maximum of
            // two times the flush
            // The previous file may not have had enough room left, so the message may sit in the next file
            // while the previous file's end-of-file marker still needs flushing; hence at most two flushes.
            // If the flush position recorded by mappedFileQueue is >= the start of the next message, the flush has succeeded.
            boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
            for (int i = 0; i < 2 && !flushOK; i++) {
                // flush
                CommitLog.this.mappedFileQueue.flush(0);
                flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
            }
            req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
        }
        long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
        if (storeTimestamp > 0) {
            CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
        }
        // reset requestsRead
        this.requestsRead = new LinkedList<>();
    } else {
        // Because of individual messages is set to not sync flush, it
        // will come to this process
        CommitLog.this.mappedFileQueue.flush(0);
    }
}
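Note that each request is flushed at most twice. The previous file may not have had enough bytes left for the message, so the message may be in the next file while the previous file's end-of-file marker still needs to be flushed. As the source above shows, although only the remaining size and the end-of-file marker are written, the reported number of written bytes is the whole remaining free space. This ensures that the second flush finds the correct MappedFile when it looks one up by flush position.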
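org.apache.rocketmq.store.MappedFileQueue#flush uses flushedWhere to find the MappedFile that needs flushing, calls that instance's flush method, and updates the flush position flushedPosition (this is the synchronous service path, which executes MappedByteBuffer#force()). Note that MappedFile#flush calls hold to increment the reference count of the current MappedFile, so that destroying the MappedFile (the mapped file) does not disturb data that is being read or written. The flush proceeds only if hold succeeds, that is, while the MappedFile is still available and its reference count is greater than 0.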
public boolean flush(final int flushLeastPages) {
    boolean result = true;
    // use flushedWhere (the previous flush position) to find the mappedFile to flush next
    MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
    if (mappedFile != null) {
        long tmpTimeStamp = mappedFile.getStoreTimestamp();
        // flush
        int offset = mappedFile.flush(flushLeastPages);
        long where = mappedFile.getFileFromOffset() + offset;
        result = where == this.flushedWhere;
        // update flushedWhere
        this.flushedWhere = where;
        if (0 == flushLeastPages) {
            this.storeTimestamp = tmpTimeStamp;
        }
    }
    return result;
}

public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
    try {
        // first MappedFile in mappedFiles
        MappedFile firstMappedFile = this.getFirstMappedFile();
        // last MappedFile in mappedFiles
        MappedFile lastMappedFile = this.getLastMappedFile();
        if (firstMappedFile != null && lastMappedFile != null) {
            // offset is before the start of the first file or at/after the end of the last file
            if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
                LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
                    offset,
                    firstMappedFile.getFileFromOffset(),
                    lastMappedFile.getFileFromOffset() + this.mappedFileSize,
                    this.mappedFileSize,
                    this.mappedFiles.size());
            } else {
                // index of the mappedFile that contains offset
                int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
                MappedFile targetFile = null;
                try {
                    // fetch the MappedFile at that index
                    targetFile = this.mappedFiles.get(index);
                } catch (Exception ignored) {
                }
                if (targetFile != null && offset >= targetFile.getFileFromOffset()
                    && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
                    return targetFile;
                }
                // offset is not inside targetFile: scan mappedFiles for the file whose range contains it
                for (MappedFile tmpMappedFile : this.mappedFiles) {
                    if (offset >= tmpMappedFile.getFileFromOffset()
                        && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
                        return tmpMappedFile;
                    }
                }
            }
            // returnFirstOnNotFound == true: fall back to the first MappedFile when none matches
            if (returnFirstOnNotFound) {
                return firstMappedFile;
            }
        }
    } catch (Exception e) {
        log.error("findMappedFileByOffset Exception", e);
    }
    return null;
}

public int flush(final int flushLeastPages) {
    // flushing is allowed
    if (this.isAbleToFlush(flushLeastPages)) {
        // increment the reference count of this MappedFile
        if (this.hold()) {
            // written / committed position
            int value = getReadPosition();
            try {
                //We only append data to fileChannel or mappedByteBuffer, never both.
                // the transient store pool is in use
                if (writeBuffer != null || this.fileChannel.position() != 0) {
                    // force the fileChannel to disk
                    this.fileChannel.force(false);
                } else {
                    // force the mappedByteBuffer to disk
                    this.mappedByteBuffer.force();
                }
            } catch (Throwable e) {
                log.error("Error occurred when force data to disk.", e);
            }
            // update the flush position
            this.flushedPosition.set(value);
            // release the reference; when the MappedFile is no longer available, mappedByteBuffer is cleaned up
            this.release();
        } else {
            log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
            this.flushedPosition.set(getReadPosition());
        }
    }
    return this.getFlushedPosition();
}
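The index arithmetic in findMappedFileByOffset is easy to check with concrete numbers. The sketch below copies only that formula; the class name and the `fileName` helper (a zero-padded 20-digit name, matching the commitlog naming described earlier) are illustrative.

```java
final class FileIndexSketch {
    // Same arithmetic as in findMappedFileByOffset: the position of the file that
    // contains offset, relative to the first file that still exists.
    static int indexOf(long offset, long firstFileFromOffset, int mappedFileSize) {
        return (int) ((offset / mappedFileSize) - (firstFileFromOffset / mappedFileSize));
    }

    // Commitlog file names are the start offset, left-padded with zeros to 20 digits.
    static String fileName(long fileFromOffset) {
        return String.format("%020d", fileFromOffset);
    }

    public static void main(String[] args) {
        int oneGb = 1 << 30;
        long firstFileFromOffset = 2L * oneGb; // older files have already been deleted
        long offset = 3L * oneGb + 100;        // somewhere inside the second remaining file
        int index = indexOf(offset, firstFileFromOffset, oneGb);
        System.out.println(index + " -> " + fileName(firstFileFromOffset + (long) index * oneGb));
    }
}
```

Subtracting the first file's position keeps the lookup correct after expired files at the front of the queue are removed.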
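- Asynchronous flush: if the Broker enables read/write separation (the transient store pool, transientStorePoolEnable), asynchronous flush consists of an asynchronous data transfer followed by the actual asynchronous flush. Without read/write separation, the asynchronous flush runs directly.
  - The asynchronous data transfer service is org.apache.rocketmq.store.CommitLog.CommitRealTimeService. When read/write separation is enabled on the Broker, commitLogService.wakeup(); wakes CommitRealTimeService so that its run method continues: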
public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        // transfer interval, 200 ms by default
        int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
        // minimum number of pages to transfer, 4 by default
        int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
        // maximum interval between two transfers, 200 ms by default
        int commitDataThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
        long begin = System.currentTimeMillis();
        if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
            this.lastCommitTimestamp = begin;
            commitDataLeastPages = 0;
        }
        try {
            // transfer the data to the Page Cache
            boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
            long end = System.currentTimeMillis();
            if (!result) {
                this.lastCommitTimestamp = end; // result = false means some data committed.
                //now wake up flush thread.
                flushCommitLogService.wakeup();
            }
            if (end - begin > 500) {
                log.info("Commit data to file costs {} ms", end - begin);
            }
            this.waitForRunning(interval);
        } catch (Throwable e) {
            CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
        }
    }
    boolean result = false;
    for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
        result = CommitLog.this.mappedFileQueue.commit(0);
        CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
    }
    CommitLog.log.info(this.getServiceName() + " service end");
}
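The asynchronous data transfer happens in org.apache.rocketmq.store.MappedFileQueue#commit, which works much like flush. MappedFileQueue records the transfer position committedWhere and uses it to find the corresponding MappedFile. It then calls MappedFile#commit, which uses the MappedFile's committedPosition and wrotePosition to move the pending data from writeBuffer (direct memory, DM) into fileChannel (the Page Cache), and updates committedWhere and committedPosition. When data was actually transferred, commit returns false (the new position no longer equals the previous committedWhere), and this wakes up the actual asynchronous flush service.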
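  - The actual asynchronous flush service is org.apache.rocketmq.store.CommitLog.FlushRealTimeService. The key part is its run method, which performs a flush every 500 ms by default. The flush logic is the same as in the synchronous case above, except that when the transient store pool is in use (writeBuffer is not null), org.apache.rocketmq.store.MappedFile#flush executes this.fileChannel.force(false); to do the actual flush.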
public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        // default is true
        boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
        // flush interval, default 500
        int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
        // default is 4
        int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
        // maximum interval between two flushes, default 1000 * 10
        int flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
        boolean printFlushProgress = false;
        // Print flush progress
        long currentTimeMillis = System.currentTimeMillis();
        // if more than flushPhysicQueueThoroughInterval has passed since the last flush,
        // set flushPhysicQueueLeastPages = 0 so that all pending data is flushed
        if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
            this.lastFlushTimestamp = currentTimeMillis;
            flushPhysicQueueLeastPages = 0;
            printFlushProgress = (printTimes++ % 10) == 0;
        }
        try {
            if (flushCommitLogTimed) {
                Thread.sleep(interval);
            } else {
                this.waitForRunning(interval);
            }
            if (printFlushProgress) {
                this.printFlushProgress();
            }
            long begin = System.currentTimeMillis();
            // flush
            CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
            long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
            long past = System.currentTimeMillis() - begin;
            if (past > 500) {
                log.info("Flush data to disk costs {} ms", past);
            }
        } catch (Throwable e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            this.printFlushProgress();
        }
    }
    // Normal shutdown, to ensure that all the flush before exit
    boolean result = false;
    for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
        result = CommitLog.this.mappedFileQueue.flush(0);
        CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
    }
    this.printFlushProgress();
    CommitLog.log.info(this.getServiceName() + " service end");
}
6. Replication: see Broker主从同步详解.md
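Broker read/write separation mechanisms
RocketMQ uses "read/write separation" in two places.
- Broker Master-Slave read/write separation: messages are written to the Master Broker and read from a Slave Broker. The Broker is configured with slaveReadEnable = true (default false), and the percentage of memory that messages may occupy is configured with accessMessageInMemoryMaxRatio = 40 (the default).
- Broker Direct Memory-Page Cache read/write separation: messages are written to Direct Memory (DM for short) and read from the operating system's Page Cache. On the Master Broker the switch is transientStorePoolEnable = true (default false), the number of DM buffers transientStorePoolSize (default 5) must be greater than 0, and the flush type must be flushDiskType=FlushDiskType.ASYNC_FLUSH, i.e. asynchronous flush (this is the transient store pool configuration described above).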
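Master-Slave read/write separation
This mechanism is implemented in two steps:
- While handling a Pull request, the Broker works out whether the next pull should go to a Slave. This is done in org.apache.rocketmq.store.DefaultMessageStore#getMessage: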
SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
if (bufferConsumeQueue != null) {
    try {
        ……
        // how much message data has not been pulled yet
        long diff = maxOffsetPy - maxPhyOffsetPulling;
        // TOTAL_PHYSICAL_MEMORY_SIZE is the total physical memory of the current Master Broker;
        // memory is the maximum the Broker considers usable, 40% of total physical memory by default
        long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
        // suggest whether the next pull goes to the Master or to a Slave
        getResult.setSuggestPullingFromSlave(diff > memory);
    } finally {
        bufferConsumeQueue.release();
    }
}
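When diff > memory is true, the volume of messages not yet pulled exceeds the memory budget, which indicates that the Master Broker's memory is under pressure, so the next pull should go to a Slave.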
- The client is told which Broker to pull from next. When the pull result is returned to the consumer, the suggestPullingFromSlave value set in the first step is passed back. This happens in org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest:
// set suggestWhichBrokerId from the suggestPullingFromSlave value returned by getMessage above
if (getMessageResult.isSuggestPullingFromSlave()) {
    // whichBrokerWhenConsumeSlowly defaults to 1, i.e. pull from the Slave
    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
} else {
    responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
    case ASYNC_MASTER:
    case SYNC_MASTER:
        break;
    case SLAVE:
        // if the Slave is not allowed to serve reads, point suggestWhichBrokerId back at the Master Broker
        if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
            response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
            responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
        }
        break;
}
if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
    // consume too slow ,redirect to another machine
    if (getMessageResult.isSuggestPullingFromSlave()) {
        responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
    }
    // consume ok
    else {
        responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
    }
} else {
    responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
The source above shows that for a Consumer to pull from a Slave, slaveReadEnable = true must be set. The consumer uses the returned suggestWhichBrokerId to decide which Broker to pull from next.
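The two steps can be condensed into a small sketch with plain parameters. The method names and the flat parameter lists are illustrative assumptions; only the arithmetic and the final branch on slaveReadEnable mirror the excerpts above.

```java
final class PullSuggestionSketch {
    static final long MASTER_ID = 0L; // same value as MixAll.MASTER_ID

    // Step 1: is the unpulled backlog larger than the memory the Broker is willing to use?
    static boolean suggestPullingFromSlave(long maxOffsetPy, long maxPhyOffsetPulling,
                                           long totalPhysicalMemory, int accessMessageInMemoryMaxRatio) {
        long diff = maxOffsetPy - maxPhyOffsetPulling;
        long memory = (long) (totalPhysicalMemory * (accessMessageInMemoryMaxRatio / 100.0));
        return diff > memory;
    }

    // Step 2: which broker id is suggested to the consumer for the next pull.
    static long suggestWhichBrokerId(boolean slaveReadEnable, boolean suggestSlave,
                                     long whichBrokerWhenConsumeSlowly, long groupBrokerId) {
        if (!slaveReadEnable) {
            return MASTER_ID;
        }
        return suggestSlave ? whichBrokerWhenConsumeSlowly : groupBrokerId;
    }

    public static void main(String[] args) {
        long gb = 1L << 30;
        // 16 GB of RAM at a 40% ratio gives a budget of about 6.4 GB; the backlog is 8 GB.
        boolean slow = suggestPullingFromSlave(10 * gb, 2 * gb, 16 * gb, 40);
        System.out.println(slow);
        System.out.println(suggestWhichBrokerId(true, slow, 1L, MASTER_ID));
        System.out.println(suggestWhichBrokerId(false, slow, 1L, MASTER_ID));
    }
}
```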
Direct Memory-Page Cache read/write separation
The org.apache.rocketmq.store.MappedFile#appendMessagesInner method shows:
// writeBuffer is borrowed from the availableBuffers queue of transientStorePool; by default each byteBuffer is 1 GB and there are 5 of them
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
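writeBuffer is a buffer allocated from DM, and mappedByteBuffer is a buffer backed by the Page Cache. If the Broker sets transientStorePoolEnable=true, uses asynchronous flush, and is a Master, the storage layer org.apache.rocketmq.store.DefaultMessageStore calls TransientStorePool#init during initialization and allocates writeBuffer instances according to the configured number of buffers.
Once writeBuffer is initialized, the Broker writes each message sent by a producer into writeBuffer. The asynchronous transfer service then keeps committing from DM to the Page Cache (writing writeBuffer's data into fileChannel), while consumers always read from mappedByteBuffer, i.e. the Page Cache (data written through fileChannel is visible through mappedByteBuffer).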
Read/write separation maximizes throughput, but it also increases the risk of data inconsistency, so use it with caution in production.
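The write, commit and read path can be tried out with standard NIO classes. This is a minimal sketch under assumptions, not the RocketMQ code: a temporary 4 KB file stands in for a commitlog file, and a single direct buffer stands in for a pooled writeBuffer. Whether writes made through a FileChannel become visible through a mapping of the same file is platform-dependent according to the Java documentation; the sketch assumes an operating system with a unified page cache.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

final class TransientPoolSketch {
    // Writes message into a direct buffer, commits it to the file channel,
    // then reads it back through a memory mapping of the same file.
    static String writeCommitRead(String message) {
        int fileSize = 4096;
        try {
            Path path = Files.createTempFile("commitlog-sketch", ".bin");
            path.toFile().deleteOnExit();
            try (RandomAccessFile raf = new RandomAccessFile(path.toFile(), "rw");
                 FileChannel fileChannel = raf.getChannel()) {
                MappedByteBuffer mappedByteBuffer =
                    fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
                ByteBuffer writeBuffer = ByteBuffer.allocateDirect(fileSize); // the "DM" side

                // 1. Producer path: append to direct memory only.
                int committedPosition = 0;
                writeBuffer.put(message.getBytes(StandardCharsets.UTF_8));
                int wrotePosition = writeBuffer.position();

                // 2. Commit: copy [committedPosition, wrotePosition) into the file channel.
                ByteBuffer pending = writeBuffer.duplicate();
                pending.position(committedPosition);
                pending.limit(wrotePosition);
                fileChannel.position(committedPosition);
                while (pending.hasRemaining()) {
                    fileChannel.write(pending);
                }
                committedPosition = wrotePosition;

                // 3. Flush: force the channel's content to disk.
                fileChannel.force(false);

                // 4. Consumer path: read the committed range through the mapping.
                ByteBuffer reader = mappedByteBuffer.duplicate();
                reader.position(0);
                reader.limit(committedPosition);
                byte[] out = new byte[committedPosition];
                reader.get(out);
                return new String(out, StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(writeCommitRead("hello commitlog"));
    }
}
```

Data that has reached only step 1 lives solely in direct memory, so it is lost if the process dies before the commit; this is one source of the inconsistency risk mentioned above.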