0
点赞
收藏
分享

微信扫一扫

将指定的groupid相关kafka的topic中的offset更新到latest的最新位置

杰森wang 2023-05-11 阅读 26


将指定的groupid相关kafka的topic中的offset更新到latest的最新位置

如果使用java api的方式,将offset更新到特定位置(此处为最新的数据位置),详见下面的代码;

package mykafka.gainoffset;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.ArrayList;
import java.util.Properties;

/**将指定的groupid相关kafka的topic中的offset更新到latest的最新位置
 * @author fangct
 * @date 2020-10-10 13:55
 */
public class UpdateKafkaOffset2Latest {

    private static final String topic = "fang-topic-105";

    public static void main(String[] args) {
        Properties props = new Properties();
        //
        props.put("bootstrap.servers", "172.19.32.101:9192,172.19.32.106:9192,172.19.32.111:9192,172.19.32.116:9192,172.19.32.131:9192,172.19.32.136:9192");
        props.put("group.id", "fang-groupid-105"); //定义消费组

        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "latest");
        props.put("max.partition.fetch.bytes", "30000000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");


        KafkaConsumer<Byte[], Byte[]> kafkaConsumer = new KafkaConsumer<>(props);
        //获取该topic下的分区数量
        int topicCnt = kafkaConsumer.partitionsFor(topic).size();


        final ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        for (int i = 0; i < topicCnt; i++) {
            topicPartitions.add(new TopicPartition(topic, i));
        }
        //需要该步骤,不然报错;分配分区给消费者
        kafkaConsumer.assign(topicPartitions);
        //声明移动offset到latest的最新位置,采用latest的策略,但是并不会立即执行,需要poll()方法或者其他方法触发执行;
        kafkaConsumer.seekToEnd(topicPartitions);


        //由于seekToEnd移动offset是在惰性操作,需要使用poll();触发该操作,移动offset到latest的位置
        kafkaConsumer.poll(1000);
        //此处并不是多余,因为offset的自动提交动作,是在下一次拉取的时候在会同步到zk或者kafka中;
        kafkaConsumer.poll(1000);

        System.out.println("[note]: update kafka offset to latest finished!!! ");
    }

}

如果想要使用shell脚本命令的方式,详情查看我的其他博客文章;

举报

相关推荐

0 条评论