0
点赞
收藏
分享

微信扫一扫

4、Flink流处理案例实现-Java

kiliwalk 2022-06-17 阅读 65

在 Flink 项目里面创建一个包(com.gong.stream),同时在该包下新建一个 WordCount 类。

4、Flink流处理案例实现-Java_flink

 

 

package com.gong.stream;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Streaming word-count job built on the Flink DataStream API.
 *
 * <p>Command-line arguments (parsed with {@link ParameterTool}):
 * <ul>
 *   <li>{@code --input <path>}  - text file to read; required.</li>
 *   <li>{@code --output <path>} - optional; when present the counts are written
 *       there as text, otherwise they are printed to stdout.</li>
 * </ul>
 */
public class WordCount {

    /**
     * Builds and runs the word-count pipeline.
     *
     * @param args command-line arguments, see the class description
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        // Parse the arguments passed on the command line.
        ParameterTool params = ParameterTool.fromArgs(args);

        // Without an input there is nothing to build a pipeline from; stop here
        // instead of continuing with a null stream and failing with an NPE.
        if (!params.has("input")) {
            System.out.println("数据不存在");
            return;
        }

        // A DataStream program needs the streaming environment; the batch
        // ExecutionEnvironment produces DataSet sources, not DataStreams.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> dataStream = env.readTextFile(params.get("input"));

        // Split lines into (word, 1) pairs, group by the word and sum the counts.
        DataStream<Tuple2<String, Integer>> counts = dataStream
                .flatMap(new Tokenizer())
                .keyBy(value -> value.f0) // key selector instead of deprecated positional keyBy(0)
                .sum(1);

        if (params.has("output")) {
            counts.writeAsText(params.get("output"));
        } else {
            counts.print();
        }

        // Streaming programs are lazy: nothing runs until execute() is called.
        env.execute("Streaming wordcount ");
    }

    /**
     * Splits a line into lower-cased words and emits a {@code (word, 1)} pair
     * for each non-empty word.
     */
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        /**
         * @param value one line of input text
         * @param out   collector receiving a {@code (word, 1)} pair per word
         */
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                // split() yields an empty first token for blank lines or lines that
                // start with a non-word character; do not count it as a word.
                if (!token.isEmpty()) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}

 

举报

相关推荐

0 条评论