0
点赞
收藏
分享

微信扫一扫

Kafka-序列化器与反序列化器的使用(自定义消息类型)

Kafka-序列化器与反序列化器的使用(自定义消息类型)

代码如下

Customer

/**
 * Entity class used as the message value in the custom-serializer example.
 *
 * <p>A plain mutable holder for a numeric customer id and a customer name.
 *
 * @author FengZhen
 * @since 2020-03-30
 */
public class Customer {

    /** Numeric identifier of the customer. */
    private int customerID;

    /** Display name of the customer; stored exactly as supplied. */
    private String customerName;

    /**
     * Creates a customer with the given id and name.
     *
     * @param customerID   numeric identifier
     * @param customerName customer name, stored without validation
     */
    public Customer(int customerID, String customerName) {
        this.customerID = customerID;
        this.customerName = customerName;
    }

    // ---- accessors ----

    /** @return the customer id */
    public int getCustomerID() {
        return this.customerID;
    }

    /** @return the customer name, exactly as last set */
    public String getCustomerName() {
        return this.customerName;
    }

    // ---- mutators ----

    /** @param customerID the new customer id */
    public void setCustomerID(int customerID) {
        this.customerID = customerID;
    }

    /** @param customerName the new customer name */
    public void setCustomerName(String customerName) {
        this.customerName = customerName;
    }
}

序列化器

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.ByteBuffer;
import java.util.Map;

/**
* @Author FengZhen
* @Date 2020-03-30 22:49
* @Description 自定义序列化器:不建议使用,因为如果修改序列化器,就会出现新旧消息不兼容。
* 建议使用已有的序列化器和反序列化器,如JSON、Avro、Thrift或Protobuf
*/
public class CustomerSerializer implements Serializer<Customer> {

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
//不做任何配置
}

/**
* Customer对象被序列化成:
* 表示customerID的4字节整数
* 表示customerName长度的4字节整数(如果customerName为空,则长度为0)
* 表示customerName的N个字节
* @param topic
* @param data
* @return
*/
@Override
public byte[] serialize(String topic, Customer data) {
try {
byte[] serializedName;
int stringSize;
if (null == data){
return null;
}else{
if (data.getCustomerName() != ""){
serializedName = data.getCustomerName().getBytes("UTF-8");
stringSize = serializedName.length;
}else{
serializedName = new byte[0];
stringSize = 0;
}
}
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
buffer.putInt(data.getCustomerID());
buffer.putInt(stringSize);
buffer.put(serializedName);
return buffer.array();
} catch (Exception e){
throw new SerializationException("Error when serializing Customer to byte[] " + e);
}
}

@Override
public void close() {
//不需要关闭任何东西
}
}

反序列化器

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;

import java.nio.ByteBuffer;
import java.util.Map;

/**
 * Hand-written Kafka deserializer for {@link Customer} values; the counterpart of
 * {@code CustomerSerializer}.
 *
 * <p>Expected layout: a 4-byte int customer id, a 4-byte int name length, then that many
 * bytes of UTF-8 encoded customer name.
 *
 * @author FengZhen
 * @since 2020-04-06
 */
public class CustomerDeserializer implements Deserializer<Customer> {

    /** Charset used to decode the customer name. */
    private static final java.nio.charset.Charset NAME_CHARSET =
            java.nio.charset.StandardCharsets.UTF_8;

    /** Size in bytes of the two leading int fields (customer id + name length). */
    private static final int HEADER_SIZE = Integer.BYTES + Integer.BYTES;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Nothing to configure.
    }

    /**
     * Decodes a {@link Customer} from its binary form.
     *
     * @param topic topic the record was read from (not used by this deserializer)
     * @param data  encoded bytes; may be {@code null}
     * @return the decoded customer, or {@code null} when {@code data} is {@code null}
     * @throws SerializationException if {@code data} is shorter than the 8-byte header, or if
     *         the encoded name length is negative or exceeds the bytes actually present
     */
    @Override
    public Customer deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length < HEADER_SIZE) {
            throw new SerializationException(
                    "Size of data received by CustomerDeserializer is shorter than expected");
        }

        ByteBuffer buffer = ByteBuffer.wrap(data);
        int id = buffer.getInt();
        int nameSize = buffer.getInt();

        // Validate the length read from the wire before allocating: a corrupt value could
        // otherwise request a huge array (OutOfMemoryError) or underflow the buffer.
        if (nameSize < 0 || nameSize > buffer.remaining()) {
            throw new SerializationException(
                    "Error when deserializing byte[] to Customer: invalid name length " + nameSize
                            + ", remaining bytes " + buffer.remaining());
        }

        byte[] nameBytes = new byte[nameSize];
        buffer.get(nameBytes);
        return new Customer(id, new String(nameBytes, NAME_CHARSET));
    }

    @Override
    public void close() {
        // Nothing to close.
    }
}

生产者发送消息

import com.chinaventure.kafka.serializer.Customer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

/**
 * Demo Kafka producer that sends {@link Customer} values using the custom serializer.
 *
 * @author FengZhen
 * @since 2020-03-29
 */
public class KafkaProducerTest {

    /** Producer configuration shared by the demo methods of this class. */
    private static Properties kafkaProps = new Properties();
    static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    }

    public static void main(String[] args) {
        udfSerializer();
    }

    /**
     * Sends ten {@link Customer} records to the {@code test_udf_serializer} topic using the
     * custom value serializer, waits for the sends to complete, then closes the producer.
     *
     * <p>Records whose index is divisible by 3 use the key {@code "Apple"}; all others use
     * {@code "Banana" + i}.
     */
    public static void udfSerializer() {
        kafkaProps.put("value.serializer", "com.chinaventure.kafka.serializer.CustomerSerializer");

        // try-with-resources guarantees the producer is closed; the diamond operator replaces
        // the raw-type construction so the key/value types are checked by the compiler.
        try (KafkaProducer<String, Customer> producer = new KafkaProducer<>(kafkaProps)) {
            for (int i = 0; i < 10; i++) {
                String key = i % 3 == 0 ? "Apple" : "Banana" + i;
                ProducerRecord<String, Customer> record =
                        new ProducerRecord<>("test_udf_serializer", key, new Customer(i, "我是" + i));
                producer.send(record, new DemonProducerCallback());
            }
            // send() is asynchronous. Block until every buffered record has been sent and
            // acknowledged instead of keeping the JVM alive with an endless sleep loop.
            producer.flush();
        }
    }

}

消费者读取数据

import com.chinaventure.kafka.serializer.Customer;
import com.chinaventure.util.ExceptionUtil;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.*;

/**
 * Demo Kafka consumer that reads {@link Customer} values using the custom deserializer.
 *
 * @author FengZhen
 * @since 2020-04-06
 */
public class KafkaConsumerTest {

    /** Consumer configuration shared by the demo methods of this class. */
    private static Properties kafkaProps = new Properties();
    static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put("group.id", "test");
        kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    // Not referenced by the code visible in this class; kept as declared.
    private static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

    public static void main(String[] args) {
        udfDeserializer();
    }

    /**
     * Subscribes to the {@code test_udf_serializer} topic with the custom value deserializer
     * and prints every record it receives, polling until an exception ends the loop.
     */
    public static void udfDeserializer() {
        kafkaProps.put("value.deserializer", "com.chinaventure.kafka.serializer.CustomerDeserializer");

        // try-with-resources closes the consumer on every exit path, including a failure in
        // subscribe(). Closing explicitly releases network connections and sockets and
        // triggers a rebalance immediately, rather than leaving the group coordinator to
        // notice the missing heartbeats later, during which the group could not read messages.
        try (KafkaConsumer<String, Customer> consumer = new KafkaConsumer<>(kafkaProps)) {
            // subscribe() accepts a collection of topics; a regular expression (e.g. test.*)
            // may be used instead, in which case a newly created matching topic triggers a
            // rebalance and the consumer starts reading from it.
            consumer.subscribe(Collections.singleton("test_udf_serializer"));
            System.out.println("==== subscribe success ====");
            try {
                while (true) {
                    // The consumer must keep polling, otherwise it is considered dead and its
                    // partitions are handed to other consumers in the group. The argument is
                    // the maximum time poll() blocks when no data is buffered; with zero it
                    // returns immediately. Each returned record carries its topic, partition,
                    // offset, key and value.
                    ConsumerRecords<String, Customer> records = consumer.poll(Duration.ofMillis(100));
                    System.out.println("==== data get ====");
                    for (ConsumerRecord<String, Customer> record : records) {
                        System.out.println(String.format("topic=%s, partition=%s, offset=%d, key=%s, value=%s",
                                record.topic(), record.partition(), record.offset(), record.key(), record.value()));
                    }
                }
            } catch (Exception e) {
                // Best-effort demo: report the failure and fall through to close the consumer.
                e.printStackTrace();
            }
        }
    }
}

 

生产者打印内容

topic:test_udf_serializer
partition:0
offset:0
metaData:test_udf_serializer-0@0
topic:test_udf_serializer
partition:0
offset:1
metaData:test_udf_serializer-0@1
topic:test_udf_serializer
partition:0
offset:2
metaData:test_udf_serializer-0@2
topic:test_udf_serializer
partition:0
offset:3
metaData:test_udf_serializer-0@3
topic:test_udf_serializer
partition:0
offset:4
metaData:test_udf_serializer-0@4
topic:test_udf_serializer
partition:0
offset:5
metaData:test_udf_serializer-0@5
topic:test_udf_serializer
partition:0
offset:6
metaData:test_udf_serializer-0@6
topic:test_udf_serializer
partition:0
offset:7
metaData:test_udf_serializer-0@7
topic:test_udf_serializer
partition:0
offset:8
metaData:test_udf_serializer-0@8
topic:test_udf_serializer
partition:0
offset:9
metaData:test_udf_serializer-0@9

消费者打印内容

topic=test_udf_serializer, partition=0, offset=0, key=Apple, value=com.chinaventure.kafka.serializer.Customer@63798ca7
topic=test_udf_serializer, partition=0, offset=1, key=Banana1, value=com.chinaventure.kafka.serializer.Customer@4612b856
topic=test_udf_serializer, partition=0, offset=2, key=Banana2, value=com.chinaventure.kafka.serializer.Customer@22875539
topic=test_udf_serializer, partition=0, offset=3, key=Apple, value=com.chinaventure.kafka.serializer.Customer@5674e1f2
topic=test_udf_serializer, partition=0, offset=4, key=Banana4, value=com.chinaventure.kafka.serializer.Customer@79c7532f
topic=test_udf_serializer, partition=0, offset=5, key=Banana5, value=com.chinaventure.kafka.serializer.Customer@2a448449
topic=test_udf_serializer, partition=0, offset=6, key=Apple, value=com.chinaventure.kafka.serializer.Customer@32f232a5
topic=test_udf_serializer, partition=0, offset=7, key=Banana7, value=com.chinaventure.kafka.serializer.Customer@43f82e78
topic=test_udf_serializer, partition=0, offset=8, key=Banana8, value=com.chinaventure.kafka.serializer.Customer@e54303
topic=test_udf_serializer, partition=0, offset=9, key=Apple, value=com.chinaventure.kafka.serializer.Customer@e8df99a

Done.

举报

相关推荐

0 条评论