目录
管理Kafka
管理Kafka集群意味着需要使用额外的工具对主题、配置等做出修改,可以通过命令行脚本和编程的方式对Kafka进行管理。
1 命令行操作
Kafka提供了一些命令行工具,以用于对集群做出变更。这些工具使用Java类实现,Kafka提供了一些脚本来调用它们。不过,这些工具只提供了基本的功能,无法完成复杂的操作,难以被用于管理大规模的集群。
虽然Kafka为主题操作提供了身份验证和授权机制,但默认配置并没有对这些工具的使用进行严格的限制。也就是说,不需要经过身份验证也可以使用这些工具,并可以在没有安全检查和审计的情况下执行诸如修改主题之类的操作。为防止发生未经授权的变更,需要确保只有管理员可以使用这些工具。
脚本 | 说明 |
---|---|
kafka-topics.sh | 执行大部分与主题相关的操作 |
通用参数 | 参数值示例 | 说明 |
---|---|---|
--bootstrap-server | 172.26.143.96:9092 | 要连接的集群入口主机和端口 |
--command-config | ||
--config |
1.1 Topic操作
1.1.1 创建主题
脚本 | 参数 | 参数值示例 | 说明 |
---|---|---|---|
kafka-topics.sh | –create | N/A | 要用脚本执行的命令名:创建主题 |
--topic | peter-test-1 | 要创建的主题的名称 | |
--replication-factor | 3 | 主题的副本数 | |
--partitions | 8 | 主图的分区数 | |
--disable-rack-aware | N/A | 关闭机架感知分配策略 | |
--if-not-exists | N/A | 一般用于自动化执行脚本中,如果主题不存在就创建,如果存在就忽略,不会报错 |
例如,以下命令创建名为peter-test-1的topic, topic拥有8个分区,每个分区有3个副本。
1.1.2 列出集群所有主题
脚本 | 参数 | 参数值示例 | 说明 |
---|---|---|---|
kafka-topics.sh | –list | N/A | 要用脚本执行的命令名:列出集群所有主题 |
例如:
1.1.3 列出主题详情
此命令还可按照参数过滤主题。
脚本 | 参数 | 参数值示例 | 说明 |
---|---|---|---|
kafka-topics.sh | –describe | N/A | 要用脚本执行的命令名:列出主题详细信息。 |
--topic | peter-test-1 | 要创建的主题的名称。 | |
--topics-with-overrides | N/A | 这个参数只会列出配置参数与集群默认值不同的主题。 | |
--exclude-internal | N/A | 排除内部主题,它可以将所有名字以双下划线开头的主题排除。 | |
--under-replicated-partitions | N/A | 这个参数可以找出一个或多个副本与首领不同步的分区。这并不一定是坏事,因为集群维护、部署和再均衡都会导致分区副本不同步,但还是要注意一下。 | |
--at-min-isr-partitions | N/A | 这个参数可以找出副本数量(包括首领在内)与配置的最少同步副(ISR)数完全匹配的分区。这些主题对生产者客户端或消费者客户端来说仍然可用,但已无冗余,有会变得不可用的风险。 | |
--under-min-isr-partitions | N/A | 这个参数可以找出ISR数低于配置的最小值的分区。这些分区实际上处于只读模式,不能向其写入数据。 | |
--unavailable-partitions | N/A | 这个参数可以找出所有没有首领的分区。这种情况很严重,说明分区已离线,对生产者客户端或消费者客户端来说已经是不可用的。 |
例如,列出主题peter-test-1的详情:
列出全部主题的详情:
列出所有包含覆盖配置的主题:
1.1.4 增加主题分区数
脚本 | 参数 | 参数值示例 | 说明 |
---|---|---|---|
kafka-topics.sh | –alter | N/A | 要用脚本执行的命令名: |
--topic | peter-test-1 | 要修改的主题的名称 | |
--partitions | 8 | 主题的分区数 | |
--if-exists | N/A | 一般用于自动化执行脚本中,如果主题存在就执行命令,如果不存在就忽略,不会报错。不建议使用它。这是因为如果指定了这个选项,并且要修改的主题不存在,那么 – alter命令就不会返回任何错误,从而掩盖了本应创建这个不存在的主题的错误。 |
例如,增加主题的分区数(replication-factor无法修改):
1.1.5 减少主题分区数
我们不可能减少主题的分区数量。如果删除了主题的一个分区,那么这个分区里的数据也会被删除,导致客户端看到的数据不一致。此外,将数据重新分布到其余的分区中是非常困难的,即使能够做到,也无法保证消息的顺序。
如果要减少分区数量,则建议册除整个主题并重新创建,或者(如果不能删除的话)创建一个新主题(比如叫作“mytopic-v2”),并将所有流量重定向到新主题。
1.1.6 删除主题
脚本 | 参数 | 参数值示例 | 说明 |
---|---|---|---|
kafka-topics.sh | –delete | N/A | 要用脚本执行的命令名:删除主题 |
--topic | peter-test-1 | 要修改的主题的名称 |
例如,删除主题peter-test-1:
1.2 生产和消费
在使用Kafka时,为了验证应用程序的逻辑,经常需要手动生成或消费一些示例消息。
Kafka为此提供了两个工具,即kafka-console-consumer.sh和kafka-console-producer.sh在第2章中,我们用它们验证过安装好的Kafka是否可以正常运行。这些工具对Java客端库进行了包装,让我们可以在不编写代码的情况下与Kafka主题发生交互。
1.2.1 控制台生产者
脚本 | 参数 | 参数值示例 | 说明 |
---|---|---|---|
kafka-console-producer.sh | --topic | peter-test-1 | 要写入的主题的名称 |
--producer.config | $KAFKA_HOME/config/producer.properties | 要传递给控制台生产者的参数的配置文件路径 | |
--producer-property | batch.size=1024 timeout=1000 compression.type=none/gzip/snappy/zstd/lz4 | 直接传递给控制台生产者的参数 | |
--batch-size | 1024 | ||
--timeout | 1000 | ||
--compression-codex | none/gzip/snappy/zstd/lz4 | ||
--sync | true/false | ||
--property | ignore.error=true/false parse.key=true/false key.separator=true/fase | 用来传递参数给消息格式化器 | |
--line-reader | kafka.tools.Console Producer$LineMessageReader。 | 可以使用自定义类来读取命令行输入。自定义类必须继承kafka.common.MessageReader,并负责创建ProducerRecord对象。确保包含这个类的JAR包已经被加入类路径中。 |
例如,向主题写入三条命令:
使用配置文件:
让生产者等待5秒钟再发送消息:
设置键值的分隔符为!:
启动一个异步生产者,批量发送1M数据,10秒钟数据量不够,直接发送
1.2.2 控制台消费者
kafka-console-consumer.sh为我们提供了另外一种从Kafka集群的一个或多个主题读取消息的方式。它读取的消息会被打印在标准输出中,并用换行符分隔。在默认情况下,它将输出消息的原始字节,没有键,也不进行格式化(使用DefaultFormatter)。
脚本 | 参数 | 参数值示例 | 说明 |
---|---|---|---|
kafka-console-consumer.sh | --topic | peter-test-1 | 要读取的主题的名称 |
--include | ‘peter.*’ | 这个参数是一个正则表达式,其匹配所有要读取的主题(要记得转义正则表达式,以免命令行解析错误)。 | |
--consumer.config | $KAFKA_HOME/config/consumer.properties | 要传递给控制台生产者的参数的配置文件路径 | |
--consumer-property | 直接传递给控制台消费者的参数 | ||
--formatter | kafka.tools.DefaultMessageFormatter默认值 kafka.tools.LoggingMessageFormatter将消息输出到日志而不是标准输出。对应的日志级别为INFO,打印内容包含消息的时间戳、键和值。 kafka.tools.NoOpMessageFormatter读取但不打印消息。 | 指定用于解码消息的消息格式化器的类名,默认是kafka.tools.DefaultMessageFormatter。 | |
--from-beginning | N/A | 指定从最旧的偏移量开始读取数据。如果不指定这个参数,就从最新的偏移量开始读取。 | |
--max-messages | 1 | 指定在退出之前最多读取多少条消息。 | |
--partition | 1 | 只读取指定ID的分区。 | |
--offset | 1 | 如果提供的是整数,就从指定位置开始读取数据。其他有效的值为earliest(将从起始位置开始读取)和latest(将从最近的位置开始读取)。 | |
--skip-message-on-error | N/A | 如果在处理消息时出现错误就跳过消息,而不是一直挂起,这在调试问题时会非常有用。 | |
--property | print.timestamp 如果被设置为true,那么将打印每条消息的时间戳(如果有的话) print.key 如果被设置为true,那么除了打印消息的值,还会打印消息的键 print.offset 如果被设置为true,那么除了打印消息的值,还会打印消息的偏移量 print.partition 如果被设置为true,那么将打印消息来自哪个分区 key. separator 指定打印消息的键和值时所使用的分隔符 line.separator 指定消息之间的分隔符 key.deserializer 指定打印消息的键所使用的反序列化器的类名 value.deserializer 指定打印消息的值所使用的反序列化器的类名 | 用来传递参数给消息格式化器 |
例如:
有时候,需要查有集群的消费者群组提交了哪些偏移量。例如,你可能想知道某个消费者我组是否在提交偏核量,或者在以怎样的频率提交偏移量。这可以通过使用控制消费者__consumer_offsets 这个特殊的内部主题来实现。所有的消费者偏移量都会被写入这个主题。要解码这个主题中的消息,必须使用kafka.coordinator.group.GroupmetadataManager$offsetsMessageFormatter 这个格式化器。
1.3 消费者群组
1.3.1 列出并描述群组
脚本 | 参数 | 参数值示例 | 说明 |
---|---|---|---|
kafka-consumer-groups.sh | --list | N/A | 要用脚本执行的命令名:列出群组 |
--describe | N/A | 要用脚本执行的命令名:描述群组详情 | |
--all-groups | N/A | 全部消费者群组 | |
--group | consumer-group-1 | 要描述的目标群组名称 | |
--members | N/A |
列出所有的消费者群组:
描述群组详情:
GROUP | TOPIC | PARTITION | CURRENT-OFFSET | LOG-END-OFFSET | LAG | CONSUMER-ID | HOST | CLIENT-ID |
---|---|---|---|---|---|---|---|---|
group name | topic name | parttion id | 消费者最近提交的offset,也即在分区读取的当前位置 | 最高水位offset,最近一个被提交的offset | consumer和broker的offset的差距 | consumer id | 主机 | 客户端ID |
consumer-group-1 | string | 0 | 19 | 19 | 0 | concurrency-listener-0-64b7c3e4-ddcc-44e8-8ce7-092580eeea0a | /172.26.128.1 | concurrency-listener-0 |
列出全部的消费者群组详情:
1.3.2 删除消费者群组
脚本 | 参数 | 参数值示例 | 说明 |
---|---|---|---|
kafka-consumer-groups.sh | --delete | N/A | 要用脚本执行的命令名:删除群组 |
--group | console-consumer-53634 | 要描述的目标群组名称 |
delete命令将删除整个群组,包括所有已保存的偏移量。在删除群组之前,必须将群组里所有的消费者都关闭。如果你试图删除一个非空的群组,那么它将抛出“群组不为空”异常。也可以用这个命令删除单个主题的偏移量,只是需要额外提供 – topic参数,并指定要删除的偏移量。
删除失败:
删除成功:
1.3.3 删除偏移量
脚本 | 参数 | 参数值示例 | 说明 |
---|---|---|---|
kafka-consumer-groups.sh | --delete-offsets | N/A | 要用脚本执行的命令名:删除偏移量 |
--group | console-consumer-4771 | 目标群组名称 |
删除偏移量失败:
删除偏移量成功:
1.3.4 重置偏移量
脚本 | 参数 | 参数值示例 | 说明 |
---|---|---|---|
kafka-consumer-groups.sh | --reset-offsets | N/A | 要用脚本执行的命令名:重置偏移量 |
--group | console-consumer-1 | 目标群组名称 | |
--topic | string | 目标主题名称 | |
--dry-run | N/A | 实际执行之前演练一下 | |
--execute | N/A | 实际执行 | |
--export | > /home/peter/offsets.csv | 导出的文件路径 | |
--to-current | N/A | 重置到当前位置 | |
--to-datetime | 2024-03-30T16:32:32.000 | 允许你指定一个时间,然后将位移重置到该时间之后的最早位移处。常见的使用场景是,你想重新消费昨天的数据,那么你可以使用该策略重设位移到昨天 0 点。参数格式为YYYY-MM-DDTHH:mm:SS.sss。 | |
--to-earliest | N/A | 重置为最旧的位置 | |
--to-latest | N/A | 重置为最新的位置 | |
--to-offset | 10 | 重置到指定的位置 | |
--shift-by | -9 | 相对当前的offset进行加减操作,可以是正负数, | |
--by-duration | P0DT8H0M0S | 相对当前的offset时间进行加减操作,参数格式为PnDTnHnMnS,例如P0DT8H0M0S表示移动到0天8小时0分钟0秒之后 |
查看当前偏移量:
重置偏移量执行失败,因为群组中还有活动的消费者:
关闭所有活动的消费者,可以先用参数–dry-run演习一下:
确定要执行之后,使用参数–execute实际执行
按照时间重置偏移量:
查询指定时间戳后面最近的一个偏移量:
然后就可以根据主题,分区,偏移量查询数据了:
1.3.5 导出/导入偏移量
除了显示和删除消费者群组的偏移量,还可以批量获取和保存消费者群组的偏移量,这在重置消费者偏移量时非常有用。当消费者因为某些原因需要重新读取消息或因为无法正常处理某些消息(比如格式错误的消息)需要跳过这些消息时就可以进行偏移量重置。
导出偏移量到指定文件:
查看文件内容:
从指定文件导入偏移量:
1.4 动态配置变更
客户端、broker等都有大量的配置参数可以在运行时动态更新,无须关闭或重新部
零集群。可以使用kafka-config.sh来修改这些配置参数。目前,可以进行动态变更的配署参数主要有4种类型:主题、broker、用户和客户端。对于每一种类型,都有一些可以覆盖的配置。随着Kafka不断发布新版本,新的动态配置参数也会不断被添加进来,所以最好确保你使用的工具版本与Kafka版本相匹配。为了便于自动化管理动态配置参数,可以通过 – add-config-file参数来指定包含了你想要配置的参数的文件。
脚本 | 参数 | 参数值示例 | 说明 |
---|---|---|---|
kafka-configs.sh | --alter | N/A | 要用脚本执行的命令名:修改配置 |
--describe | N/A | 描述被覆盖的配置详情 | |
--entity-type | topics/clients/users/brokers/broker-loggers/ips | 要修改配置的实体类型 | |
--entity-name | peter-test-1 | 实体名称 | |
--add-config | compression.type | 具体要添加的配置项 |
1.4.1 覆盖主题的默认配置
在默认情况下,静态的broker配置文件中已经提供了主题的一些默认设置(比如数据保留策略)。可以通过动态配置覆盖个别主题的默认值,以满足不同应用场景的需求。
配置项 | 描述 |
---|---|
cleanup.policy | 如果被设置为compact,则只有包含了键的最新消息会被保留下来(日志被压实),其他消息会被丢弃 |
compression.type | broker在将消息批次写入磁盘时所使用的压缩类型 |
delete.retention.ms | 墓碑消息能够保留多久,以毫秒为单位。这个参数只对压缩日志类型的主题有效。墓碑消息key不为null,value为null。日志清理线程发现墓碑消息时,先进行常规清理,保留一段时间。 |
file.delete.delay.ms | 从磁盘上删除日志片段和索引之前可以等待多长时间,以毫秒为单位 |
flush.messages | 在冲刷到磁盘之前可以接收多少条消息 |
flush.ms | 在将消息冲刷到磁盘之前可以等待多长时间,以毫秒为单位 |
follower.replication.throttled.replicas | 在复制日志时需要根据跟随者副本进行节流的副本清单 |
index.interval.bytes | 日志片段索引之间能够容纳的消息字节数 |
leader.replication.throttled.replica | 在复制日志时需要根据首领副本进行节流的副本清单 |
max.compaction.lag.ms | 一条消息可以不被压实的最长时间 |
Max.nessage.bytes | 消息的最大字节数 |
message.downconversion.enable | 如果启用了这个参数,则消息格式可以被转换成之前的版本 |
message.format.version | broker将消息写入磁盘时所使用的消息格式版本,必须是有效的API版本号 |
message.timestamp.difference.max.ms | 消息自带的时间戳和broker收到消息时的时间戳之间最大的差值,以毫秒为单位。这个参数只在messsage.timestamp.type 被设为CreateTime时有效 |
message.timestamp.type | 在将消息写入磁盘时使用哪一种时间戳。目前支持两个值:一个是CreateTime,指客户端指定的时间戳;一个是LogAppendTime,指消息被写入分区时的时间戳 |
min.cleanable.dirty.ratio | 压实分区的频率,表示为未压缩日志片段数与总日志分段数之间的比例。这个参数只对压缩日志类型的主题有效 |
min.compaction.lag.ms | 一条消息不被压实的最短时间 |
min.insync.replicas | 可用分区的最少ISR |
preallocate | 如果被设置为true,那么需要为新的日志片段预分配空间 |
retention.bytes | 主题能够保留多少消息,以字节为单位 |
retention.ms | 主题需要保留消息多长时间,以毫秒为单位 |
segment.bytes | 分区的单个日志片段可以保存的消息字节数 |
segment.index.bytes | 单个日志片段的最大索引字节数 |
segment.jitter.ms | 在滚动日志片断时,在segment.ms基础上随机增加的毫秒数 |
segment.ms | 日志片段多长时间滚动一次,以毫秒为单位 |
unclean. leader.election.enable | 如果被设置为false,就不进行不彻底的首领选举 |
将主题的数据保存时间设置为1小时
查看被覆盖的默认配置
删除被覆盖的配置
1.4.2 覆盖客户端和用户的默认配置
对Kafka客户端和用户配置来说,只有少数参数可以覆盖,而且基本上都与配额有关。常见的两个需要修改的参数是生产者的生产速率和消费者的消费速率,以字节/秒为单位。
配置项 | 描述 |
---|---|
consumer_bytes_rate | 单个消费者每秒可以从单个broker 读取的消息字节数 |
producer_bytes_rate | 单个生产者每秒可以向单个broker生成的消息字节数 |
controller_mutations_rate | 可接受的创建主题请求、创建分区请求和删除主题请求的速率。这个速率是根据创建或删除的分区数量累计计算出来的 |
request_percentage | 用户请求或客户端请求的配额窗口百分比((num.io.threads+num.network.threads) × 100%) |
同时修改用户和客户端的控制器突变率:
1.4.3 覆盖broker的默认配置
broker和集群级别的配置主要被放在静态的集群配置文件中,但仍然有大量的参数可以在行时修改,也就是说修改这些参数无须重新部署Kafka。我们可以用kafka-config.sh修以80多个broker配置参数,下面是其中几个比较重要的。
配置项 | 描述 |
---|---|
min.insync.replicas | 当生产者的acks 被设置为all或-1时,用于确认消息写入成功所需的最少ISR数。 |
unclean.leader.election.enable | 允许一个副本被选举为首领,即使可能会导致数据丢失。当允许丢失一些数据,或者在发生不可恢复的数据丢失后需要快速恢复Kafka集群时,可以短暂启用这个功能。 |
max.connections | broker 允许的最大连接数。还可以用 max.connections.per.ipmax.connections.per.ip.override进行更细粒度的节流。 |
允许broker 0在不同步时也能参与不彻底的首领选举:
查看修改的配置:
1.5 分区管理
Kafka 提供了一些用于管理分区的脚本,其中一个用于重新选举首领,另一个用于将分区分配给broker。有了这两个工具,就可以通过手动的方式让消息流量均衡地分布在集群的broker上。
1.5.1 首选首领选举
Kafka集群所有的写入操作和读取操作都发生在分区首领所在的broker上。
为了保证负载均衡地分布在整个Kafka集群中,需要保持分区首领均衡地分布在broker上。
Kafka会将分区副本清单中的第一个ISR定义为首选首领。如果不启用自动首领均衡,那么在进行跨集群部署后可能会出现非常低效的均衡。因此,建议启用这个功能。
如果发现Kafka集群变得不均衡了,则可以考虑进行首选首领选举,这是一个轻量级的首领选举过程,一般来说不会造成负面影响。集群控制器会为分区选择理想的首领。
这个过程可以用kafka-leader-election.sh来手动触发。
脚本 | 参数 | 参数值示例 | 说明 |
---|---|---|---|
kafka-leader-collection.sh | --election-type | PREFERRED/UNCLEAN | 选举的类型 |
--topic | peter-test-1 | 主题名称 | |
--partion | 0 | 分区号码 | |
--all-topic-partitions | N/A | 应用到所有主题的所有分区 | |
--path-to-json-file | /home/pater/partitions.json | 要配置的分区很多时,但又不是全部分区,可以把特定分区的配置放在文件中,方便维护 |
指定分区进行首选首领选举:
从配置文件中读取分区信息:
{
"partitions": [{
"partition": 0,
"topic": "peter-test-1"
},
{
"partition": 1,
"topic": "peter-test-1"
}
]
}
1.5.2 修改分区的副本
在某些情况下,可能需要手动修改分区的副本,下面是需要这样做的几种场景。
- broker的负载分布不均衡,自动首领选举也无法解决这个问题。
- broker 离线,造成分区不同步。
- 新加了broker,你想快速给它分配分区。
- 你想修改主题的复制系数。
可以用 kafka-reassign-partitions.sh来调整分区的副本。这个过程包含了多个步骤,具体步骤如下:
- 需要基于broker和主题生成一个迁移清单。要生成迁移清单,需要一个JSON文件,其中包含了要调整的主题。
- 执行调整。
- 这个工具可用于跟踪和验证分区调整的进度或完成情况。
脚本 | 参数 | 参数值示例 | 说明 |
---|---|---|---|
kafka-reassignment.sh | --generate | N/A | 命令:生成迁移文件 |
--broker-list | 1,2 | 要迁移去的目标broker列表 | |
--topics-to-move-json-file | /home/peter/topics.json | 要迁移的主题列表文件 | |
--execute | N/A | 命令:执行迁移 | |
--reassignment-json-file | /home/peter/reassignment.json | 迁移清单文件路径 | |
--additional | N/A | 如果已经有进行中的重分配过程,就加入其中,这样就不会出现中断,也不需要等待已有的重分配完毕之后再启动一个新的批次。 | |
--disable-rack-aware | N/A | 有时候,因为启用了机架感知,可能无法实现想要的重分配状态。如果有必要,那么可以用这个参数禁用机架感知。 | |
--throttle | 1024 | 这个参数以字节/秒为单位。分区重分配对集群性能有很大的影响,因为它们会导致内存缓存页的一致性发生变化,并占用额外的网络带宽和磁盘IO。对分区移动流量进行节流可以有效缓解这个问题。这个参数可以与 – additional结合使用,以便对进行中的可能导致上述问题的重分配过程进行节流。 | |
--verify | N/A | 命令:验证迁移的执行结果 | |
--cancel | N/A | 命令:我们可以用它来取消集群中正在进行的重分配过程。如果指定了 --cancel选项,那么副本集将会被恢复到重分配之前的状态。如果正在从已经失效或过载的broker中移除副本,则取消分区重分配有可能会导致集群处于非预期的状态,也不能保证恢复后的副本集的顺序与之前相同。 |
新建一个包含2分区2副本的主题:
查看默认的分区分配情况:
假如我们计划停止broker 0,我们需要先将两个分区迁移到broker 1和2。首先准备要移动的topic清单,保存到文件:topics.json
{
"topics": [{
"topic": "peter-test-2"
}],
"version": 1
}
执行命令生成移动清单:
可以看到在重新分配的结果中,已经将broker 0排除了。将上述命令的输出保存到两个文件中,一个作为备份,一个作为下一步的输入。
备份文件:assignment.json
{
"version": 1,
"partitions": [{
"topic": "peter-test-2",
"partition": 0,
"replicas": [1, 0],
"log_dirs": ["any", "any"]
}, {
"topic": "peter-test-2",
"partition": 1,
"replicas": [0, 2],
"log_dirs": ["any", "any"]
}]
}
重分配的输入文件:reassignment.json
{
"version": 1,
"partitions": [{
"topic": "peter-test-2",
"partition": 0,
"replicas": [2, 1],
"log_dirs": ["any", "any"]
}, {
"topic": "peter-test-2",
"partition": 1,
"replicas": [1, 2],
"log_dirs": ["any", "any"]
}]
}
执行移动清单:
验证移动清单结果:
最后查看主题详情:
1.5.3 修改复制系数
也可以用 kafka-reassign-partitions.sh来增加或减少一个分区的RF。
如果在创建分区时没有选择合适的RF,那么当你在扩展集群时想要增加冗余或为了节约成本想要减少冗余,就需要修改RF。一个很明显的例子是,如果集群的默认RF被修改了,但已有主题的RF个数不会自动随之发生变化,那么这个时候就可以用这个工具增加已有分区的RF。
假设需要为主题peter-test-2增加一个副本,将对分区的调整保存到文件:increase-rf.json
{
"version": 1,
"partitions": [{
"topic": "peter-test-2",
"partition": 0,
"replicas": [2, 1, 0],
"log_dirs": ["any", "any", "any"]
}, {
"topic": "peter-test-2",
"partition": 1,
"replicas": [1, 2, 0],
"log_dirs": ["any", "any", "any"]
}]
}
执行命令:
验证结果:
最后查看主题详情:
1.5.4 取消分区重分配
1.5.5 转储日志片段
有时候,你可能需要查看消息的内容,比如主题中出现了已损坏的“毒药”消息,而用户
无法处理它们。可以用kafka-dump-log.sh来解码分区的日志片段,这样就可以在不读取和解码消息的情况下查看消息的内容。这个工具以日志片段文件列表(以逗号分隔)作为参数,可以输出消息的摘要信息或详细内容。
在下面的例子中,对主题“peter-test-2”主题进行了日志转储。首先,简单地解码日志片段文件00000000000000000000.log,并获取每条消息的基本元数据信息,但不打印消息内容。Kafka的数据目录是/usr/local/bin/kafka_2.13-3.6.0/kafka-logs。因此,可以在/usr/local/bin/kafka_2.13-3.6.0/kafka-logs/-< partition>目录下找到转储文件,对这个例子来说就是/usr/local/bin/kafka_2.13-3.6.0/kafka-logs/peter-test-2-0/。
打印消息内容:
校验索引是否处于可用状态:
检查不匹配的索引:
1.5.6 验证副本
分区复制的原理与普通的消费者客户端类似:跟随者broker会定期将上一个偏移量到当前偏移量之间的数据复制到磁盘上。如果复制过程停止,那么在重启之后将从上一个检查点继续复制。这个时候,如果之前复制的日志片段被删除,那么跟随者将不会填补这个缺口。
可以用kafka-replica-verification.sh来验证集群分区副本的一致性。它会从指定分区的副本读取消息,检查所有副本是否包含了相同的消息,并打印出指定分区的最大延迟。这个过程会一直重复,直到被取消。要使用这个工具,必须提供broker的连接串(如果有多个broker地址,就用逗号隔开)。在默认情况下,它会验证所有的主题。还可以用正则表达式匹配想要验证的主题。
1.6 其他工具
kafka-acls.sh
kafka-mirror-maker.sh
1.7 不安全操作
移动集权控制器
移除待删除的主题
手动删除主题
2 编程方式
Only have the code now. This chapter to be completed…
示例代码