流程图以及总体概述
拦截器
自定义实现拦截器,帮助自己更好地了解拦截器。
分区器以及分区计算策略
为啥进行分区计算?
producer生产者怎么知道有哪些分区?
分区计算
¹²³⁴ 如果参数中指定了分区编号就直接返回
如何自定义实现分区器?
1.实现partitioner接口, 重写相关方法。感觉主要就是实现partition方法。
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
import java.util.List;
import java.util.Map;
public class CustomPartitioner implements Partitioner {
/**
* 配置分区器
*
* @param configs 配置信息
*/
@Override
public void configure(Map<String, ?> configs) {
}
/**
* 计算分区
*
* @param topic 主题名称
* @param key 消息键,可以为null
* @param keyBytes 消息键的字节数组表示,可以为null
* @param value 消息值
* @param valueBytes 消息值的字节数组表示
* @param cluster Kafka集群信息
* @return 分配的分区ID
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// 如果键为null,则使用轮询分区策略
if (keyBytes == null) {
return Utils.toPositive(Utils.murmur2(valueBytes)) % numPartitions;
}
// 使用键的hashCode来计算分区
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
/**
* 关闭分区器
*/
@Override
public void close() {
// 可以在这里进行资源的清理操作,通常分区器不需要进行额外的关闭操作
}
}