0
点赞
收藏
分享

微信扫一扫

Broker注册到NameServer源码分析


写作目的

RocketMQ一个用Java写的开源项目,而且也是阿里开源的,所以想看一看设计思路以及一些细节,所以就写了这篇博客,记录一下Broker注册到Nameserver的过程以及心跳逻辑。

前提

要会Netty吧,如果不会的话,感觉应该看不懂吧。

跟源码思路

其实很多源码的讲解都是把一个类都标上注释,其实我感觉这样的人很厉害,因为他确实对这个代码很精通。我的风格比较偷懒,我们想看哪一部分就跟哪一部分和哪个分支,其他的没必要看,这样你就能偷懒了,所以这篇文章想跟的是Broker注册到NameServer源码以及Broker与NameServer的心跳。你调试的时候不也是这样吗,哪报错了你就进哪个分支,不关心的分支我们不去分析

注意:本文只关心Broker注册到NameServer和心跳逻辑,其他都不关心。

启动源码分析

源码版本

​​RocketMQ4.9.1​​

NameServer启动流程

Broker注册到NameServer源码分析_ide


启动入口我们从NamesrvStartup#main0开始

public static NamesrvController main0(String[] args) {

try {
//创建NameServer,其实就初始化一些变量,跳过
NamesrvController controller = createNamesrvController(args);
//启动controller
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}

return null;
}

接下来就是NamesrvStartup#start方法

public static NamesrvController start(final NamesrvController controller) throws Exception {
//初始化,就是给一些属性初始化,其中remotingServer得到初始化
boolean initResult = controller.initialize();
//省略
//启动controller
controller.start();
return controller;
}

下面跟NamesrvController#start方法

public void start() throws Exception {
//启动NettyServer服务器
this.remotingServer.start();

if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}

此时在NettyRemotingServer#start就启动了一个ServerBootstrap服务端

public void start() {
//省略
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});

//省略
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}

//省略
}

综上:NameServer的启动流程的核心就是在NettyRemotingServer#start就启动了一个ServerBootstrap并监听9876端口

Broker启动流程

Broker注册到NameServer源码分析_java_02


还是直接看BrokerController#start方法吧,反正前面也是debug

public void start() throws Exception {

//省略

//向NameServer注册自己的信息
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
startProcessorByHa(messageStoreConfig.getBrokerRole());
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
向NameServer注册自己的信息
this.registerBrokerAll(true, false, true);
}

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
//心跳,重复注册自己
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

if (this.brokerStatsManager != null) {
this.brokerStatsManager.start();
}

if (this.brokerFastFailure != null) {
this.brokerFastFailure.start();
}


}

Broker给NameServer发心跳

Broker启动一个定时任务,每次都会向NameServer注册自己,不断覆盖到NameServer存的Broker的信息,从而达到心跳的效果,我只能说一个字,秀。

//BrokerController#start
public void start() throws Exception {

//省略
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
//心跳,重复注册自己
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

//省略
}

broker和NameServer之间维护连接

NameServer维护和Broker之间的连接

//NamesrvController#initialize
public boolean initialize() {
//省略

//定时任务,根据broker注册到nameServer的时间与此时此刻时间的阈值去判断该broker是否还存活
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);

//省略

return true;
}

总结

1 broker和NameServer之间的心跳我以为是发送心跳包去实现的,结果是通过不断的向nameserver注册自己实现的
2 NameServer通过定时任务不断的扫描brokerLiveTable去根据时间阈值(broker注册的时间和此时此刻的时间差距)实现维护连接

参考

​​https://www.bilibili.com/video/BV1fE411V7Ho?p=6​​


举报

相关推荐

0 条评论