0
点赞
收藏
分享

微信扫一扫

11.4.3、kafka__kafka集群,flink消费kafka数据

老牛走世界 2022-03-17 阅读 79

1、kafka的集群

在这里插入图片描述

2、使用flink消费kafka的数据

(1)导包

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-kafka_2.11</artifactId>
	<version>1.11.2</version>
</dependency>

(2)代码实现读取数据

object Demo03KafkaSource {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092")
    properties.setProperty("group.id", "test1")

    //创建kafka的消费者
    val flinkKafkaCusumor = new FlinkKafkaConsumer[String]("test_topic1", new SimpleStringSchema(), properties)

    /**
      * flinkKafkaCusumor.setStartFromEarliest()      // 读取所有的数据
      * flinkKafkaCusumor.setStartFromLatest()        // 读取最新的数据
      * flinkKafkaCusumor.setStartFromTimestamp(...)  // start from specified epoch timestamp (milliseconds)
      * flinkKafkaCusumor.setStartFromGroupOffsets()  // 如果消费者组之前存在,接着之前的数据读取
      * 如果消费者组之前不存在,读取最新的数据
      */
    flinkKafkaCusumor.setStartFromGroupOffsets()

    //其实就是一个自定义source
    val kafkaDS = env.addSource(flinkKafkaCusumor)
    kafkaDS.print()

    env.execute()
  }

}
举报

相关推荐

0 条评论