1、kafka的集群
2、使用flink消费kafka的数据
(1)导包
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.11.2</version>
</dependency>
(2)代码实现读取数据
object Demo03KafkaSource {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "master:9092")
properties.setProperty("group.id", "test1")
//创建kafka的消费者
val flinkKafkaCusumor = new FlinkKafkaConsumer[String]("test_topic1", new SimpleStringSchema(), properties)
/**
* flinkKafkaCusumor.setStartFromEarliest() // 读取所有的数据
* flinkKafkaCusumor.setStartFromLatest() // 读取最新的数据
* flinkKafkaCusumor.setStartFromTimestamp(...) // start from specified epoch timestamp (milliseconds)
* flinkKafkaCusumor.setStartFromGroupOffsets() // 如果消费者组之前存在,接着之前的数据读取
* 如果消费者组之前不存在,读取最新的数据
*/
flinkKafkaCusumor.setStartFromGroupOffsets()
//其实就是一个自定义source
val kafkaDS = env.addSource(flinkKafkaCusumor)
kafkaDS.print()
env.execute()
}
}