0
点赞
收藏
分享

微信扫一扫

Java实现Flink集成Kafka消费数据


​​精选30+云产品,助力企业轻松上云!>>>

Java实现Flink集成Kafka消费数据_大数据

​​

1.引入相关的依赖

2.代码如下

/**
* 消费Kafka中得数据
* @author 王一宁
* @date 2020/1/2 12:12
*/
public class StreamingFromKafka {
public static void main(String[] args) throws Exception{
//获取环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//kafka配置
String topic = "wang";
Properties prop = new Properties();
prop.setProperty("bootstrap.servers","hadoop1:9092");//多个的话可以指定
prop.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
prop.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
prop.setProperty("auto.offset.reset","latest");
prop.setProperty("group.id","consumer1");

FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<String>(topic, new SimpleStringSchema(), prop);
//获取数据
DataStream<String> text = env.addSource(myConsumer);

//打印
text.print().setParallelism(1);
//执行
//env.execute("StreamingFormCollection");
env.execute();
}
}

举报

相关推荐

0 条评论