kafka生产者——java Api 分区发送
一、简介
在生产者介绍阶段我们回顾了在数据发送时,可以有拦截器、序列化器、分区器三道处理。而这里就是介绍对应分区器的处理步骤。我们知道在kafka的每个topic中会分为多个partition,那为什么分多个partition呢?换个简单的理解如果我们把topic看做一张表,但是这张表由多个块组成,每个块则对应着每个partition。那么由于这个表分成了多个块。只要这些块我可以找到就可以。那么我是不是可以把这些块分布到不同的机器上进行存储,只要我合理的控制每个块的大小,我就可以做到数据均匀分布!那我分了很多个块,他们是相互组成就可以得到表,且其相互之间还互不影响,我是不是又可以以块
为单位,启动对应个数的进程去进行发送或者消费,从而增加吞吐量!
二、生产者分区策略
2.1 逻辑思考
其实这个网络上一搜还挺多讲解的,所以这里我们就直接去看下对应源码的方法有哪些。打开kafka 对应代码,会发现一个 Partitioner接口,ctrl+h
就可以看到对应接口的实现类比如DefaultPartitioner
我们可以看到类上的简介
其实DefaultPartitioner
就已经介绍了我们的默认分区策略,如果有指定对应分区则使用,如果没有则看其是否指定key对key取hash,都没有则直接选择Sticky Partition(黏性分区器)
进行分区。那我们该如何指定呢,回到咱们之前java api发送消息的案例中你会发现我们在发送消息时的代码
kafkaProducer.send(new ProducerRecord("demo", "lsl " + i));
其中new 了一个ProducerRecord类
去封装我们发送的消息。那么我们点到对应ProducerRecord类中,会发现其构造函数就有6个,分别为
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers){}
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value){}
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers){}
public ProducerRecord(String topic, Integer partition, K key, V value){}
public ProducerRecord(String topic, K key, V value){}
public ProducerRecord(String topic, V value){}
1)会发现其实我们只要指定Integer partition
参数就可以做到将数据写入到我们想要写的分区中!!
2)还有一种是没有Integer partition
参数但是有K key
,在没有指定partition时则会用key算hash值然后与topic的partition数进行求余得到发入哪个分区中。
3)最后一种也就是前一章代码中使用的情况,既没有key也没有partition,在这种情况下其就会采用对应的分区器去进行分区选择,咱们的案例中采用的是Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)。 举例:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到, Kafka再随机一个分区进行使用(如果还是0会继续随机)。
2.2 代码验证
先查看下topic情况:3个分区0,1,2
2.2.1 验证指定分区发送
1)由于需要验证发送情况,所以我们会使用带回调的发送函数去进行验证
package com.lsl.kafkaAll.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducerCallbackPartitions {
public static void main(String[] args) {
// 1. 创建kafka生产者的配置对象
Properties properties = new Properties();
// 2. 给kafka配置对象添加配置信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"lsl101:9092");
// key,value序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> kafkaProducer = new KafkaProducer(properties);
for (int i = 0; i < 5; i++) {
// 指定数据发送到1号分区
kafkaProducer.send(new ProducerRecord("demo", 1,"","lsl " + i), new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e == null){
System.out.println("主题:" + metadata.topic() + "->" + "分区:" + metadata.partition()
);
}else {
e.printStackTrace();
}
}
});
}
kafkaProducer.close();
}
}
2)执行情况
idea 执行情况:
消费者消费情况:
2.2.2 验证只指定key发送
1)代码,指定key为aa
package com.lsl.kafkaAll.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducerCallback {
public static void main(String[] args) throws InterruptedException {
// 1. 创建kafka生产者的配置对象
Properties properties = new Properties();
// 2. 给kafka配置对象添加配置信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "lsl101:9092");
// key,value序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 3. 创建kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
// 4. 调用send方法,发送消息
for (int i = 0; i < 5; i++) {
// 添加回调
kafkaProducer.send(new ProducerRecord("demo", "aa","lsl " + i), new Callback() {
// 该方法在Producer收到ack时调用,为异步调用
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
// 没有异常,输出信息到控制台
System.out.println("主题:" + metadata.topic() + "->" + "分区:" + metadata.partition());
} else {
// 出现异常打印
exception.printStackTrace();
}
}
});
}
// 5. 关闭资源
kafkaProducer.close();
}
}
2)执行结果
idea:
消费者:
3)换成key=“bb”
kafkaProducer.send(new ProducerRecord("demo", "bb","lsl " + i), new Callback() {
4)bb执行情况
三、自定义分区器
比如现在有一个需求,当某条数据包含lsl时则发到0号分区,不包含时则发到1号分区中。假如该需求我们想自己写一个分区器去实现该怎么处理呢?其实只要实现接口Partitioner
重写其中的partition()
方法就可以了
1)代码实现
package com.lsl.kafkaAll.producer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducerCallbackMyPartitions {
public static void main(String[] args) throws InterruptedException {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "lsl101:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 添加自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.lsl.kafkaAll.producer.MyPartitioner");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer(properties);
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord("demo", "lsl " + i), new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e == null) {
System.out.println("主题:" + metadata.topic() + "->" + "分区:" + metadata.partition()
);
} else {
e.printStackTrace();
}
}
});
}
kafkaProducer.close();
}
}
2)执行情况
3)换成发送消息:www.baodi.com执行结果