0
点赞
收藏
分享

微信扫一扫

【kafka原理】kafka Log存储解析以及索引机制


本文设置到的配置项有

名称

描述

类型

默认

num.partitions

topic的默认分区数

int

1

log.dirs

保存日志数据的目录。如果未设置,则使用log.dir中的值

string

/tmp/kafka-logs

offsets.topic.replication.factor

offset topic复制因子(ps:就是备份数,设置的越高来确保可用性)。为了确保offset topic有效的复制因子,第一次请求offset topic时,活的broker的数量必须最少最少是配置的复制因子数。 如果不是,offset topic将创建失败或获取最小的复制因子(活着的broker,复制因子的配置)

short

3

log.index.interval.bytes

添加一个条目到offset的间隔

int

4096

首先启动kafka集群,集群中有三台Broker; 设置3个分区,3个副本;

发送topic消息

启动之后​​kafka-client​​​发送一个topic为消息​​szz-test-topic​​的消息

public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "xxx1:9092,xxx2:9092,xxx3:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 5; i++){
producer.send(new ProducerRecord<String, String>("szz-test-topic", Integer.toString(i), Integer.toString(i)));
}
producer.close();
}

发送了之后可以去​​log.dirs​​路径下看看

【kafka原理】kafka Log存储解析以及索引机制_索引

这里的3个文件夹分别代表的是3个分区; 那是因为我们配置了这个topic的分区数​​num.partitions=3​​; 和备份数​​offsets.topic.replication.factor=3​​; 这3个文件夹中的3个分区有​​Leader​​有​​Fllower​​; 那么我们怎么知道谁是谁的Leader呢?

查看topic的分区和副本

bin/kafka-topics.sh  --describe --topic szz-test-topic --zookeeper localhost:2181

【kafka原理】kafka Log存储解析以及索引机制_日志存储_02

可以看到查询出来显示

分区Partition-0​​在broker.id=0​​中,其余的是副本​​Replicas​​ 2,1

分区Partition-1​​在broker.id=1​​中,其余的是副本​​Replicas​​ 0,2

或者也可以通过zk来 查看​​leader​​在哪个broker上

get /brokers/topics/src-test-topic/partitions/0/state
[zk: localhost:2181(CONNECTED) 0] get /brokers/topics/szz-test-topic/partitions/0/state
{"controller_epoch":5,"leader":0,"version":1,"leader_epoch":0,"isr":[0,1,2]}
cZxid = 0x1001995bf

分区文件都有啥

进入文件夹看到如下文件:

【kafka原理】kafka Log存储解析以及索引机制_日志存储_03

【kafka原理】kafka Log存储解析以及索引机制_时间戳_04

名称

描述

类型

默认

log.segment.bytes

单个日志文件的最大大小

int

1073741824

我们试试多发送一些消息,看它会不会生成新的 ​​segment​

public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "xxx1:9092,xxx2:9092,xxx3:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 163840);
props.put("linger.ms", 10);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 1200; i++){
//将一个消息设置大一点
byte[] log = new byte[904800];
String slog = new String(log);
producer.send(new ProducerRecord<String, String>("szz-test-topic",0, Integer.toString(i), slog));
}
producer.close();
}

【kafka原理】kafka Log存储解析以及索引机制_索引_05

从图中可以看到第一个segment文件​​00000000000000000000.log​​快要满​​log.segment.bytes​​的时候就开始创建了​​00000000000000005084.log​​了;

并且​​.log​​和​​.index​​、​​.timeindex​​文件是一起出现的; 并且名称是以文件第一个offset命名的

  • .log存储消息文件
  • .index存储消息的索引
  • .timeIndex,时间索引文件,通过时间戳做索引

消息文件

上面的几个文件我们来使用kafka自带工具​​bin/kafka-run-class.sh​​ 来读取一下都是些啥

​bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log​

【kafka原理】kafka Log存储解析以及索引机制_时间戳_06

最后一行:

baseoffset:5083  position: 1072592768  CreateTime: 1603703296169

.index 消息索引

​bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.index​

【kafka原理】kafka Log存储解析以及索引机制_时间戳_07

最后一行:

offset:5083  position:1072592768

.timeindex 时间索引文件

​bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.timeindex​

【kafka原理】kafka Log存储解析以及索引机制_索引_08

最后一行:

timestamp: 1603703296169 offset: 5083

Kafka如何查找指定offset的Message的

找了个博主的图 @lizhitao

【kafka原理】kafka Log存储解析以及索引机制_kafka_09

比如:要查找绝对offset为7的Message:

  1. 首先是用二分查找确定它是在哪个LogSegment中,自然是在第一个Segment中。
  2. 打开这个Segment的index文件,也是用二分查找找到offset小于或者等于指定offset的索引条目中最大的那个offset。自然offset为6的那个索引是我们要找的,通过索引文件我们知道offset为6的Message在数据文件中的位置为9807。
  3. 打开数据文件,从位置为9807的那个地方开始顺序扫描直到找到offset为7的那条Message。

Kafka 中的索引文件,以​​稀疏索引(sparse index)​​​的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。每当写入一定量(由 broker 端参数 ​​log.index.interval.bytes​​​ 指定,默认值为 4096,即 4KB)的消息时,偏移量索引文件 和 时间戳索引文件 分别增加一个偏移量索引项和时间戳索引项,增大或减小 ​​log.index.interval.bytes​​ 的值,对应地可以缩小或增加索引项的密度。

稀疏索引通过 ​​MappedByteBuffer​​ 将索引文件映射到内存中,以加快索引的查询速度。

leader-epoch-checkpoint


leader-epoch-checkpoint 中保存了每一任leader开始写入消息时的offset; 会定时更新
follower被选为leader时会根据这个确定哪些消息可用
【kafka原理】kafka Log存储解析以及索引机制_时间戳_10


日常运维问题排查=> ​​滴滴开源LogiKM一站式Kafka监控与管控平台​​


【kafka原理】kafka Log存储解析以及索引机制_时间戳_11



举报

相关推荐

0 条评论