0
点赞
收藏
分享

微信扫一扫

kafka系列--消费

小禹说财 2022-02-22 阅读 154



     public String title;

    public  ConsumerRecords<byte[], byte[]> records;


    public KafkaConsumerSimple(String title, ConsumerRecords<byte[], byte[]> records) {

        this.title = title;

        this.records = records;

    }

    @Override

    public void run() {

        System.out.println("开始运行 " + title);

        for (ConsumerRecord<byte[], byte[]> record : records) {

            if(record!=null){

                String topic = record.topic();

                int partition = record.partition();


                long offset = record.offset();

                String msg = new String(record.value());

                String key=new String(record.key());

                //System.out.println(String.format(

                        "Consumer: [%s],  Topic: [%s],  PartitionId: [%d], Offset: [%d], msg: [%s],key:[%s]",

                        title, topic, partition, offset, msg,key));

            }

        }

        //System.out.println(String.format("Consumer: [%s] exiting ...", title));



    }

    public static void main(String[] args) {

        Properties properties = new Properties();


        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "testgroupname");

        //默认自动提交


        //properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG , "false");

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,

                "ip:port,ip:port");

        /**

         * earliest 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费。

         * latest 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据。

         * none 当该topic下所有分区中存在未提交的offset时,抛出异常。

         */

        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        /**

         * consumer向zookeeper提交offset的频率,单位是秒

         */

        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

        /**

         * RoundRobin策略有两个前提条件必须满足:

         * 同一个Consumer Group里面的所有消费者的num.streams必须相等;

         * 每个消费者订阅的主题必须相同

         *

         * Range 均分

         */

        //properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "range");

        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,

                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,

                "org.apache.kafka.common.serialization.ByteArrayDeserializer");



        String topic = "test";

        TopicPartition partition0 = new TopicPartition(topic, 0);

        TopicPartition partition1 = new TopicPartition(topic, 1);


        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(properties);


        kafkaConsumer.subscribe(Arrays.asList(topic));

        //指定分区消费

        //kafkaConsumer.assign(Arrays.asList(partition0, partition1));


        boolean isRunning = true;

        //创建一个容量3的线程池

        ExecutorService executor = Executors.newFixedThreadPool(3);

        int index=0;

        while(isRunning) {

            ++index;

            ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(Long.MAX_VALUE);

            executor.execute(new KafkaConsumerSimple("消费者" + (index), records));

        }

        kafkaConsumer.close();


    }


举报

相关推荐

0 条评论