Reading data from Kafka with Flink

Maven dependency

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.10.0</version>
</dependency>

Code

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

import java.util.Properties

object SourceTest2 {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "zjj101:9092")
    //properties.setProperty("group.id", "ab")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    // Add the Kafka consumer source.
    // Arguments: 1) topic name, 2) deserialization schema, 3) Properties object
    val stream1 = env.addSource(new FlinkKafkaConsumer011[String]("hello", new SimpleStringSchema(), properties))

    stream1.print("stream1")
    env.execute("source test job")
  }
}
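Besides the `auto.offset.reset` property, the Flink Kafka consumer also exposes explicit start-position methods on the source object itself, which take precedence over the Kafka property. A minimal sketch (same broker and topic as above; the `group.id` value is a hypothetical example):

```scala
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

val props = new Properties()
props.setProperty("bootstrap.servers", "zjj101:9092")
props.setProperty("group.id", "ab") // hypothetical consumer group id

val consumer = new FlinkKafkaConsumer011[String]("hello", new SimpleStringSchema(), props)
consumer.setStartFromLatest()      // read only records produced after the job starts
// consumer.setStartFromEarliest() // or replay the topic from the beginning
```

With `setStartFromLatest()`, restarting the job (without a checkpoint) skips anything produced while it was down, which matches the behavior seen in the test run below.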

Result

After starting the program, start a console producer on the Linux side and send messages to the hello topic:
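If the hello topic does not exist yet, it can be created first. A sketch for the Kafka 2.1.0 setup used here, assuming ZooKeeper runs on zjj101 at the default port 2181:

```shell
# Create a single-partition, single-replica topic named "hello"
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper zjj101:2181 \
  --replication-factor 1 --partitions 1 --topic hello
```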

[root@zjj101 kafka_2.11-2.1.0]# $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list zjj101:9092  --topic hello

>111
>222
>333
>444

Check the output in the IDEA console:

stream1> 111
stream1> 222
stream1> 333
stream1> 444
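To double-check from the shell that the messages actually landed in the topic (independently of the Flink job), Kafka's console consumer can be pointed at the same broker; `--from-beginning` replays the topic from the start:

```shell
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server zjj101:9092 \
  --topic hello --from-beginning
```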
