0
点赞
收藏
分享

微信扫一扫

自定义kafka高效的protoStuff序列化

微笑沉默 2022-03-11 阅读 75

目前序列化领域中,谷歌的 protobuf 应该是性能好,效率高的了,并且 protobuf 支持多种语言,可跨平台,跨语言

但使用起来并不像其他序列化那么简单(首先要写.proto文件,然后编译.proto文件,生成对应的.java文件)

protostuff 基于Google protobuf,但是提供了更多的功能和更简易的用法。其中,protostuff-runtime 实现了无需预编译对java bean进行protobuf序列化/反序列化的能力。protostuff-runtime的局限是序列化前需预先传入schema,反序列化不负责对象的创建只负责复制,因而必须提供默认构造函数。此外,protostuff 还可以按照protobuf的配置序列化成json/yaml/xml等格式。

在性能上,protostuff不输原生的protobuf,甚至有反超之势。但 protostuff应该只支持 java端,目前没看到可支持其他端

---------------------------------------------------------------------------------------------------------------------------------

自定义kafka高效的protoStuff序列化

最核心的点是Schema,思想都是共通的,像mysql、plusar、prestod等,都会有类似的Schema的概念,数据的结构化,protostuff 使用起来比 protobuf简单了很多,核心代码如下:

1,自定义序列化器

public class ProtostuffSerializer <T> implements Serializer<T> {

    private static final LinkedBuffer BUFFER = LinkedBuffer.allocate(LinkedBuffer.MIN_BUFFER_SIZE);

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {

    }

    @Override
    @SuppressWarnings("unchecked")
    public byte[] serialize(String topic, T data) {
        Class<T> dataClass = (Class<T>) data.getClass();
        Schema<T> schema = RuntimeSchema.getSchema(dataClass);
        byte[] byteData;
        try {
            byteData = ProtostuffIOUtil.toByteArray(data, schema, BUFFER);
        } finally {
            BUFFER.clear();
        }
        return byteData;
    }

    @Override
    public void close() {

    }
}

从protoStuff的的源码可以看到,内部会将我们传递进去的dataClass,以一个

ConcurrentHashMap<String, HasSchema<?>> pojoMapping,保存在内存,所有直接调用RuntimeSchema.getSchema 就可以了,不需要外部在自己缓存这些schema

 

2,反序列化

@KafkaListener(topics = {"protostuff_topic"}, groupId = "group1", containerFactory="kafkaListenerContainerFactory")
    public void test(List<ConsumerRecord<String, byte[]>> records) {
        System.out.println("拉取消息量: " + records.size());
        for (ConsumerRecord<String, byte[]> consumerRecord : records){
            System.out.println("消费消息:partition " + consumerRecord.partition() + " value " + consumerRecord.value() );

            Schema<T> schema = RuntimeSchema.getSchema(User.class);
            User user = schema.newMessage();
            ProtostuffIOUtil.mergeFrom(data, user, schema);
            System.out.println(user);
        }
    }
举报

相关推荐

0 条评论