0
点赞
收藏
分享

微信扫一扫

读书笔记:Kafka源码解析与实战——Broker概述


Broker启动

Kafka安装包目录结构如下:bin、config、libs、LICENSE、logs、NOTICE,bin目录存放的是Kafka提供的管理工具(其中包括Broker的启动脚本),config目录存放的是Broker的配置文件;libs目录存放的是相关的jar包。
进入bin目录,执行以下命令后台启动Broker:​​​nohup ./bin/kafka-server-start.sh config/server.properties &​​,可见Broker是通过脚本kafka-server-start.sh调用起来的。脚本的内容主要逻辑如下:

if [ $# -lt 1];
then
echo "USAGE: $0
exit 1
fi
base_dir = $(dirname $0)
//省略中间步骤
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka $@

最终执行的是kafka.Kafka这个类,即内部package kafka里面的Kafka类。

object Kafka extends Logging {
def main(args: Array[string]): Unit = {
if (args.length != 1) {
println("USAGE: java [options] %s server.properties".format(classOf[KafkaServer].getSimpleName()))
System.exit(1)
}
try {
val props = Utils.loadProps(args(0))
val serverConfig = new KafkaConfig(props)
KafkaMetricsReporter.startReporters(serverConfig.props)
val kafkaServerStartable = new KafkaServerStartable(serverConfig) // KafkaServerStartable封装了KafkaServer
Runtime.getRuntime().addShutdownHook(new Thread() {
override def run() = {kafkaServerStartable.shutdown}
})
kafkaServerStartable.startup // 启动Kafka Server
kafkaServerStartable.awaitShutdown // 停住,进行服务,如果运行kafkaServerStartable.shutdown,则继续走下去,Broker退出
} catch {
case e: Throwable => fatal(e)
}
System.exit(0)
}
}

Broker内部的模块组成

KafkaServer这个类包含SocketServer(监听Socker请求)、KafkaRequestHandlerPool(请求处理资源池)、LogManage(日志管理)、ReplicaManager(分区副本管理)、OffsetManager(偏移量管理)、KafkaScheduler(后台任务调度资源池)、KafkaApis(业务逻辑实现层)、KafkaHealthcheck(提供Broker健康状态)、TopicConfigManager(Topic配置信息管理)和KafkaController(Kafka集群控制管理)。

KafkaController:由于Zookeeper上保存了Kafka集群的元数据信息,因此KafkaController通过在不同目录注册不同的回调函数来达到监测集群状态的目的,及时响应集群状态的变化。

  1. /controller目录保存了Kafka集群中状态为Leader的KafkaController标识,通过监测这个目录的变化可以及时响应KafkaController状态的切换。
  2. /admin/reassign_partitions目录保存了Topic重分区的信息,通过监测这个目录的变化可以及时响应Topic分区变化的请求。
  3. /admin/preferred_replica_election目录保存了Topic重分区的信息,通过监测这个目录的变化可以及时响应Topic分区变化的请求。
  4. /brokers/topics目录保存了Topic的信息,通过监测这个目录的变化可以及时响应Topic创建和删除的请求。
  5. /brokers/ids目录保存了Broker的状态,通过监测这个目录的变化可以及时响应Broker的上下线情况等。
    KafkaHealthcheck:Broker Server在/brokers/ids上注册自己的ID,当Broker在线的时候,则对应的ID存在;当Broker离线的时候,则对应的ID不存在,以此来达到集群状态监测的目的。
    TopicConfigManage:在/config/changes上注册自己的回调函数来监测Topic配置信息的变化。

SocketServer:首先开启1个Acceptor线程用于监听默认端口号为9092上的Socket链接,然后当有新的Socket链接成功建立时会将对应的SocketChannel以轮询的方式转发给N个Processor线程中的某一个,并由其处理接下来该SocketChannel上的读写请求,其中N=numm.neteork.threads,默认为3。当Processor线程监听来自SocketChannel的请求时,会将请求放置在RequestChannel中的请求队列;当Processor线程监听到SocketChannel请求的响应时,会将响应从RequestChannel中的响应队列中取出来并发送给客户端。
KafkaRequestHandlerPool:真正处理Socket请求的线程池,其个数默认为8个,由参数num.io.threads决定。该线程池里面的线程KafkaRequestHandler从RequestChannel的请求队列中获取Socket的请求,然后调用KafkaApis完成真正的业务逻辑,最后将响应写回至RequestChannel中的响应队列,并交由SocketServer中对应的Processor线程发送给客户端。
KafkaApis:Kafka的业务逻辑实现层,根据不同的Request执行不同的操作,其中利用LogManage、OffsetManger和ReplicaManager来完成内部的处理。KafkaApis处理的请求包括ProducerRequest、TopicMetadataRequest、FetchRequest、OffsetRequest、OffsetCommitRequest、OffsetFetchRequest、LeaderAndIsrRequest、StopReplicaRequest、OffsetCommitRequest、OffsetFetchRequest、LeaderAndIsrRequest、StopRelicaRequest、UpdateMetadataRequest、BrokerControlledShutdownRequest和ConsumerMetadataRequest。

LogManager:Kafka的日志管理模块。主要提供删除任何过期数据和冗余数据,刷新脏数据,对日志文件进行Checkpoint以及日志合并的功能。
ReplicaManager:Kafka的副本管理模块。主要提供针对Topic分区副本数据的管理功能,包括有关副本的Leader和ISR的状态变化、副本的删除、副本的监测等。
OffsetManager:Kafka的偏移量管理模式。主要提供针对偏移量的保存和读取的功能,Kafka管理Topic的偏移量存在两种方式:一种为Zookeeper,就是把偏移量提交至Zookeeper;另一种为Kafka,就是把偏移量提交至Kafka内部Topic为"__consumer_offsets"的日志里面,主要由offset.storage参数决定,默认为zookeeper。

KafkaScheduler:Kafka的后台任务调度资源池。提供后台定期任务的调度,主要为LogManage、OffsetManger和ReplicaManager提供调度服务。


举报

相关推荐

0 条评论