Kafka - Serializers

Custom Serializers

Writing your own serializer is not recommended: if the serializer later needs an extra field, old and new messages become incompatible. Prefer an established format with existing serializers and deserializers, such as JSON, Avro, Thrift, or Protobuf.


/**
 * @Author FengZhen
 * @Date 2020-03-30 22:49
 * @Description Entity class used by the custom serializer
 */
public class Customer {
    private int customerID;
    private String customerName;

    public Customer(int customerID, String customerName) {
        this.customerID = customerID;
        this.customerName = customerName;
    }

    public int getCustomerID() {
        return customerID;
    }

    public void setCustomerID(int customerID) {
        this.customerID = customerID;
    }

    public String getCustomerName() {
        return customerName;
    }

    public void setCustomerName(String customerName) {
        this.customerName = customerName;
    }
}

 

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
 * @Author FengZhen
 * @Date 2020-03-30 22:49
 * @Description Custom serializer. Not recommended: changing the serializer makes old and new messages incompatible.
 * Prefer existing serializers and deserializers such as JSON, Avro, Thrift, or Protobuf.
 */
public class CustomerSerializer implements Serializer<Customer> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    /**
     * A Customer object is serialized as:
     * a 4-byte int for customerID,
     * a 4-byte int for the length of customerName (0 if customerName is null or empty),
     * the N UTF-8 bytes of customerName.
     * @param topic
     * @param data
     * @return
     */
    @Override
    public byte[] serialize(String topic, Customer data) {
        try {
            byte[] serializedName;
            int stringSize;
            if (null == data) {
                return null;
            } else {
                if (data.getCustomerName() != null && !data.getCustomerName().isEmpty()) {
                    serializedName = data.getCustomerName().getBytes(StandardCharsets.UTF_8);
                    stringSize = serializedName.length;
                } else {
                    serializedName = new byte[0];
                    stringSize = 0;
                }
            }
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
            buffer.putInt(data.getCustomerID());
            buffer.putInt(stringSize);
            buffer.put(serializedName);
            return buffer.array();
        } catch (Exception e) {
            throw new SerializationException("Error when serializing Customer to byte[] " + e);
        }
    }

    @Override
    public void close() {
        // nothing to close
    }
}
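
For completeness, here is a matching deserializer sketch (not part of the original post) that reads back the same layout: a 4-byte customerID, a 4-byte name length, then the UTF-8 name bytes. It also makes the compatibility problem concrete: the byte layout is hard-coded on both sides, so a Customer with an extra field could no longer read messages written in the old format.

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
 * Sketch of the deserializer matching CustomerSerializer above.
 */
public class CustomerDeserializer implements Deserializer<Customer> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    @Override
    public Customer deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length < 8) {
            throw new SerializationException("Received " + data.length + " bytes, expected at least 8");
        }
        ByteBuffer buffer = ByteBuffer.wrap(data);
        int id = buffer.getInt();        // 4-byte customerID
        int nameSize = buffer.getInt();  // 4-byte length of customerName
        byte[] nameBytes = new byte[nameSize];
        buffer.get(nameBytes);
        String name = new String(nameBytes, StandardCharsets.UTF_8);
        return new Customer(id, name);
    }

    @Override
    public void close() {
        // nothing to close
    }
}

The pair would be wired up on the producer with properties.put("value.serializer", CustomerSerializer.class.getName()) and on the consumer with properties.put("value.deserializer", CustomerDeserializer.class.getName()).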

 

Serializing with Avro

An Avro data file stores the full schema once, and for a file that overhead is acceptable. Embedding the schema in every Kafka record, however, would multiply the size of each record, and the reader still needs the whole schema to deserialize it. The usual solution is a schema registry.

The schema registry is not part of Kafka itself; there are several open-source implementations, such as the Confluent Schema Registry.

All schemas used for writing data are stored in the registry, and each record only carries the identifier of its schema. The application that reads the data uses this identifier to fetch the schema from the registry and deserialize the record. The serializer and deserializer take care of registering and fetching schemas, respectively.


import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * @Author FengZhen
 * @Date 2020-03-30 23:06
 * @Description Avro serializer examples
 */
public class AvroSerializerTest {
    public static void main(String[] args) {
        // call genericValue() or udfValue() to run one of the examples
    }

    /**
     * Producing generic Avro objects with this schema:
     * {
     *   "namespace": "customerManagement.avro",
     *   "type": "record",
     *   "name": "Customer",
     *   "fields": [
     *     {"name": "id", "type": "int"},
     *     {"name": "name", "type": "string"},
     *     {"name": "email", "type": ["null", "string"], "default": null}
     *   ]
     * }
     */
    public static void genericValue() {
        String schemaUrl = "";

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        properties.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // schema registry URL
        properties.put("schema.registry.url", schemaUrl);

        String schemaString = "{\n" +
                "  \"namespace\": \"customerManagement.avro\",\n" +
                "  \"type\": \"record\",\n" +
                "  \"name\": \"Customer\",\n" +
                "  \"fields\": [\n" +
                "    {\"name\": \"id\", \"type\": \"int\"},\n" +
                "    {\"name\": \"name\", \"type\": \"string\"},\n" +
                "    {\"name\": \"email\", \"type\": [\"null\", \"string\"], \"default\": null}\n" +
                "  ]\n" +
                "}";

        String topic = "customerContacts";

        Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(properties);
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(schemaString);

        int i = 0;
        while (true) {
            i++;
            String name = "example:" + i;
            String email = "email:" + i;
            GenericRecord genericRecord = new GenericData.Record(schema);
            genericRecord.put("id", i);
            genericRecord.put("name", name);
            genericRecord.put("email", email);

            ProducerRecord<String, GenericRecord> record = new ProducerRecord<String, GenericRecord>(topic, name, genericRecord);
            producer.send(record);
        }
    }

    /**
     * Producing user-defined Avro objects.
     * Note: for KafkaAvroSerializer to handle it, Customer here must be a class generated
     * from an Avro schema (a SpecificRecord), not the plain POJO from the custom-serializer example.
     */
    public static void udfValue() {
        String schemaUrl = "";

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        properties.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // schema registry URL
        properties.put("schema.registry.url", schemaUrl);

        String topic = "customerContacts";

        Producer<String, Customer> producer = new KafkaProducer<String, Customer>(properties);
        int i = 0;
        while (true) {
            Customer customer = new Customer(++i, "name:" + i);
            ProducerRecord<String, Customer> record = new ProducerRecord<String, Customer>(topic, String.valueOf(customer.getCustomerID()), customer);
            producer.send(record);
        }
    }
}
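
The schema-registry description above also implies the consumer side: the deserializer reads the schema id embedded in each record and fetches the schema from the registry before decoding. Below is a minimal consumer sketch, assuming Confluent's KafkaAvroDeserializer; the group id and registry URL are placeholders, not values from the original post.

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class AvroConsumerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "customerContacts-reader"); // placeholder group id
        properties.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        properties.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        // the deserializer uses the schema id stored in each record to fetch the schema from the registry
        properties.put("schema.registry.url", "http://localhost:8081"); // placeholder registry URL

        KafkaConsumer<Object, GenericRecord> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList("customerContacts"));

        while (true) {
            ConsumerRecords<Object, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<Object, GenericRecord> record : records) {
                GenericRecord value = record.value();
                System.out.println(value.get("id") + ", " + value.get("name") + ", " + value.get("email"));
            }
        }
    }
}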

 
