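Startup process diagram
The startup process is shown in the figure below. You can follow it step by step alongside the source code, which this article also walks through.
Let's first look at the class diagram of DefaultMQProducer and DefaultMQProducerImpl. As the diagram shows, there is no inheritance relationship between the two classes.
Startup entry point
The entry point for starting a RocketMQ message producer is the start method of DefaultMQProducer. That method calls the no-argument start() of DefaultMQProducerImpl, which in turn delegates to start(final boolean startFactory). Here is the source, with comments I have added: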
public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;
            // Validate the configuration
            this.checkConfig();
            // If the producer group is not MixAll.CLIENT_INNER_PRODUCER_GROUP, change the instance name to the PID
            // Note: MixAll.CLIENT_INNER_PRODUCER_GROUP is a group used internally by the client
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }
            // Create the MQClientInstance
            // MQClientManager is a singleton in the JVM; it keeps a Map from ClientID to MQClientInstance,
            // i.e. ConcurrentMap<String/* clientId */, MQClientInstance>
            this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
            // Register this producer with the MQClientInstance
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
            // Start the MQClientInstance
            if (startFactory) {
                mQClientFactory.start();
            }
            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    RequestFutureHolder.getInstance().startScheduledTask(this);
}
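The state handling in the method above can be modeled in isolation. The following is a minimal sketch, not the RocketMQ source: LifecycleSketch and the initStep parameter are hypothetical names standing in for the real class and for the configuration check, registration, and factory start. It only illustrates why the state is set to START_FAILED before any work is done.

```java
// Simplified model of the state guard in start(); not the RocketMQ source.
class LifecycleSketch {
    enum ServiceState { CREATE_JUST, RUNNING, SHUTDOWN_ALREADY, START_FAILED }

    private ServiceState serviceState = ServiceState.CREATE_JUST;

    ServiceState state() {
        return serviceState;
    }

    // initStep is a hypothetical stand-in for the real initialization work.
    void start(Runnable initStep) {
        switch (serviceState) {
            case CREATE_JUST:
                // Mark as failed up front; only a fully successful start reaches RUNNING.
                serviceState = ServiceState.START_FAILED;
                initStep.run();
                serviceState = ServiceState.RUNNING;
                break;
            default:
                // Any other state means start() was already attempted.
                throw new IllegalStateException("service state not OK: " + serviceState);
        }
    }

    public static void main(String[] args) {
        LifecycleSketch ok = new LifecycleSketch();
        ok.start(() -> { });
        System.out.println(ok.state()); // prints RUNNING

        LifecycleSketch broken = new LifecycleSketch();
        try {
            broken.start(() -> { throw new RuntimeException("boom"); });
        } catch (RuntimeException e) {
            System.out.println(broken.state()); // prints START_FAILED
        }
    }
}
```

Because the failed state is written first, an exception thrown midway leaves the object in START_FAILED, and a later call to start is rejected instead of running the initialization a second time.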
Process walkthrough
Next, let's look at three other important methods that the start method above calls:
this.defaultMQProducer.changeInstanceNameToPID();
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
mQClientFactory.start();
First, what does changeInstanceNameToPID do?
public void changeInstanceNameToPID() {
    // If the current instance name is DEFAULT, replace it with the client process PID plus a nanoTime suffix
    if (this.instanceName.equals("DEFAULT")) {
        this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
    }
}
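To see what the resulting instance name looks like, here is a small standalone sketch. It does not use RocketMQ: resolveInstanceName is a hypothetical helper, and ProcessHandle (Java 9+) is used here in place of UtilAll.getPid(), which is an assumption of this sketch rather than what the library does internally.

```java
// Standalone illustration of the "PID#nanoTime" instance name; not the RocketMQ source.
class InstanceNameSketch {
    static final String DEFAULT_INSTANCE_NAME = "DEFAULT";

    // Hypothetical helper: only the default name is replaced, explicit names are kept.
    static String resolveInstanceName(String current) {
        if (DEFAULT_INSTANCE_NAME.equals(current)) {
            long pid = ProcessHandle.current().pid(); // requires Java 9 or later
            return pid + "#" + System.nanoTime();
        }
        return current;
    }

    public static void main(String[] args) {
        System.out.println(resolveInstanceName("DEFAULT"));     // e.g. <pid>#<nanoTime>
        System.out.println(resolveInstanceName("my-instance")); // prints my-instance
    }
}
```

An instance name set explicitly by the user is left untouched, so keeping it unique across processes on the same host is then the user's responsibility.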
Next, let's look at getOrCreateMQClientInstance:
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    // Build the ClientID from ClientIP + InstanceName + UnitName, where UnitName is optional
    // changeInstanceNameToPID has already run, so two applications deployed on the same machine
    // will not end up with the same InstanceName
    String clientId = clientConfig.buildMQClientId();
    MQClientInstance instance = this.factoryTable.get(clientId);
    if (null == instance) {
        instance =
            new MQClientInstance(clientConfig.cloneClientConfig(),
                this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
        MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
        if (prev != null) {
            instance = prev;
            log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
        } else {
            log.info("Created new MQClientInstance for clientId:[{}]", clientId);
        }
    }
    return instance;
}
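The get-then-putIfAbsent pattern used above is worth isolating, since it is what guarantees one instance per client ID even when several threads race. A minimal sketch with a plain ConcurrentHashMap follows; ClientRegistrySketch and its nested Client type are hypothetical stand-ins for MQClientManager and MQClientInstance.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch of the get / putIfAbsent idiom; the types here are illustrative only.
class ClientRegistrySketch {
    static final class Client {
        final String clientId;

        Client(String clientId) {
            this.clientId = clientId;
        }
    }

    private final ConcurrentMap<String, Client> table = new ConcurrentHashMap<>();

    Client getOrCreate(String clientId) {
        Client instance = table.get(clientId);
        if (instance == null) {
            instance = new Client(clientId);
            // putIfAbsent returns the existing value if another thread inserted first.
            Client prev = table.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev; // discard our copy and reuse the winner
            }
        }
        return instance;
    }

    public static void main(String[] args) {
        ClientRegistrySketch registry = new ClientRegistrySketch();
        Client a = registry.getOrCreate("10.0.0.1@1234#1");
        Client b = registry.getOrCreate("10.0.0.1@1234#1");
        System.out.println(a == b); // prints true
    }
}
```

The cost of this idiom is that a losing thread may construct an object that is thrown away immediately, which is acceptable when construction has no side effects.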
While we are here, let's also look at what buildMQClientId does:
/**
 * One client IP + InstanceName maps to exactly one MQClientInstance
 */
public String buildMQClientId() {
    StringBuilder sb = new StringBuilder();
    sb.append(this.getClientIP());
    sb.append("@");
    sb.append(this.getInstanceName());
    if (!UtilAll.isBlank(this.unitName)) {
        sb.append("@");
        sb.append(this.unitName);
    }
    return sb.toString();
}
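A concrete example makes the ID format easier to picture. The sketch below reproduces the same concatenation as a standalone function; buildClientId is a hypothetical helper, the sample IP and instance name are made up, and the blank check is an approximation of UtilAll.isBlank.

```java
// Standalone illustration of the clientIP@instanceName[@unitName] format.
class ClientIdSketch {
    // Hypothetical helper mirroring the concatenation shown above.
    static String buildClientId(String clientIp, String instanceName, String unitName) {
        StringBuilder sb = new StringBuilder();
        sb.append(clientIp).append('@').append(instanceName);
        // Assumption: null, empty, and whitespace-only unit names are all treated as blank.
        if (unitName != null && !unitName.trim().isEmpty()) {
            sb.append('@').append(unitName);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(buildClientId("192.168.0.10", "1234#5678", null)); // prints 192.168.0.10@1234#5678
        System.out.println(buildClientId("192.168.0.10", "1234#5678", "u1")); // prints 192.168.0.10@1234#5678@u1
    }
}
```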
Finally, let's look at the start method of MQClientInstance (that is, mQClientFactory.start):
public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                // Starts the MQ client, which ultimately starts NettyRemotingClient
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                // Starts the service class that pulls messages
                this.pullMessageService.start();
                // Start rebalance service
                // Starts the service class that performs load balancing
                this.rebalanceService.start();
                // Start push service
                // Starts the producer held by this client instance; startFactory is false,
                // so the factory is not started again
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}
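MQClientInstance startup
Next, let's go through what the start method of MQClientInstance calls, again referring to the annotated source above:
1) this.mQClientAPIImpl.start();
There is not much to say about this method. It ultimately calls the start method of NettyRemotingClient; remote communication will be covered in a later article.
2) this.startScheduledTask();
I have not added extra comments to this method. In short, it starts the following scheduled tasks: fetching the name server address; updating topic route information; cleaning up offline brokers; sending heartbeats; persisting consumer offsets; and adjusting the thread pool. The corresponding calls are: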
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
MQClientInstance.this.persistAllConsumerOffset();
MQClientInstance.this.adjustThreadPool();
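Each of the calls above runs as a periodic task. As a rough illustration of that mechanism, here is a sketch built on the JDK's ScheduledExecutorService. It is an assumption-laden model, not the RocketMQ code: ScheduledTasksSketch and runPeriodically are hypothetical names, and the periods and timeouts are arbitrary.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of a periodic task that tolerates failures; names and timings are illustrative.
class ScheduledTasksSketch {
    // Runs the task at a fixed rate and returns true once it has fired `runs` times
    // within the timeout.
    static boolean runPeriodically(Runnable task, int runs, long periodMillis, long timeoutMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(runs);
        try {
            scheduler.scheduleAtFixedRate(() -> {
                try {
                    task.run();
                } catch (Exception e) {
                    // Swallow the exception: an uncaught one would cancel all later runs.
                } finally {
                    latch.countDown();
                }
            }, 0, periodMillis, TimeUnit.MILLISECONDS);
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean done = runPeriodically(() -> System.out.println("heartbeat"), 3, 50, 2000);
        System.out.println(done);
    }
}
```

Wrapping the body in try/catch matters with scheduleAtFixedRate, because an exception that escapes the task suppresses every subsequent execution.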
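3) this.pullMessageService.start(); and this.rebalanceService.start();
Both calls start subclasses of ServiceThread (thread-backed service classes).
4) this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
This calls back into the same method as the startup entry point, but on the internal producer held by MQClientInstance (the one in the MixAll.CLIENT_INNER_PRODUCER_GROUP group) and with the argument false, so the following code is not executed:
// Start the MQClientInstance
if (startFactory) {
    mQClientFactory.start();
}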
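The role of the startFactory flag is easier to see in a stripped-down model. In the sketch below, Factory and Producer are hypothetical stand-ins for MQClientInstance and DefaultMQProducerImpl; all real work is removed and only the mutual start calls remain.

```java
// Minimal model of the producer/factory start calls; not the RocketMQ classes.
class StartFactoryFlagSketch {
    static final class Factory {
        int startCount = 0;
        // The factory owns an internal producer, analogous to the inner-group producer.
        final Producer innerProducer = new Producer(this);

        void start() {
            startCount++;
            // Passing false prevents the inner producer from starting the factory again.
            innerProducer.start(false);
        }
    }

    static final class Producer {
        final Factory factory;
        boolean running = false;

        Producer(Factory factory) {
            this.factory = factory;
        }

        void start(boolean startFactory) {
            if (startFactory) {
                factory.start();
            }
            running = true;
        }
    }

    public static void main(String[] args) {
        Factory factory = new Factory();
        Producer userProducer = new Producer(factory);
        userProducer.start(true);
        System.out.println(factory.startCount);            // prints 1
        System.out.println(factory.innerProducer.running); // prints true
    }
}
```

If the inner producer were started with true, this model would recurse between the two start methods without end; in the real code the state check in MQClientInstance.start would reject the nested call instead.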
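Closing remarks
That concludes the walkthrough of the message producer startup process. These are my own notes from reading the source, so mistakes and omissions are inevitable; corrections and discussion are welcome.
Originally published on my WeChat official account "平凡的程序员".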