0
点赞
收藏
分享

微信扫一扫

Python面试题:如何在 Python 中读取和写入 JSON 文件?

源码之路 2024-07-24 阅读 28

流程图以及总体概述

拦截器

自定义实现拦截器,帮助自己更好地了解拦截器。

分区器以及分区计算策略

为啥进行分区计算?

producer生产者怎么知道有哪些分区?

分区计算

¹²³⁴ 如果参数中指定了分区编号就直接返回

如何自定义实现分区器?

1.实现partitioner接口, 重写相关方法。感觉主要就是实现partition方法。

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;

public class CustomPartitioner implements Partitioner {

    /**
     * 配置分区器
     *
     * @param configs 配置信息
     */
    @Override
    public void configure(Map<String, ?> configs) {
       

    }

    /**
     * 计算分区
     *
     * @param topic       主题名称
     * @param key         消息键,可以为null
     * @param keyBytes    消息键的字节数组表示,可以为null
     * @param value       消息值
     * @param valueBytes  消息值的字节数组表示
     * @param cluster     Kafka集群信息
     * @return 分配的分区ID
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        // 如果键为null,则使用轮询分区策略
        if (keyBytes == null) {
            return Utils.toPositive(Utils.murmur2(valueBytes)) % numPartitions;
        }

        // 使用键的hashCode来计算分区
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    /**
     * 关闭分区器
     */
    @Override
    public void close() {
        // 可以在这里进行资源的清理操作,通常分区器不需要进行额外的关闭操作
    }
}

数据校验

数据收集器

注意

Sender发送线程

举报

相关推荐

0 条评论