maven依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.10.0</version>
</dependency>
代码
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import java.util.Properties
object SourceTest2 {
def main(args: Array[String]): Unit = {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val properties = new Properties()
properties.setProperty("bootstrap.servers", "zjj101:9092")
//properties.setProperty("group.id", "ab")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
//添加kafka消费者source
// 参数1是topic,参数2是序列化器 参数3是properties对象
val stream1 = env.addSource(new FlinkKafkaConsumer011[String]("hello", new SimpleStringSchema(), properties))
stream1.print("stream1")
env.execute("source test job")
}
}
结果
启动程序之后,然后Linux启动一个生产者,hello这个topic发送消息
[root@zjj101 kafka_2.11-2.1.0]# $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list zjj101:9092 --topic hello
>111
>222
>333
>444
查看idea窗口输出结果:
stream1> 111
stream1> 222
stream1> 333
stream1> 444