在 Flink 项目中创建一个包 com.gong.stream,并在该包下新建一个 WordCount 类:
package com.gong.stream;
import java.util.Locale;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * Streaming word-count job: reads lines of text, splits them into lower-cased
 * words and keeps a running count per word.
 *
 * <p>Command-line arguments (parsed with {@link ParameterTool}):
 * <ul>
 *   <li>{@code --input <path>}: text file to read. Required; without it the job
 *       prints a message and exits without building a pipeline.</li>
 *   <li>{@code --output <path>}: where the counts are written as text. Optional;
 *       when absent the counts are printed instead.</li>
 * </ul>
 */
public class WordCount {

    /**
     * Builds and runs the word-count pipeline.
     *
     * @param args command-line arguments, see the class description
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        // Parse the command-line arguments passed in via args.
        ParameterTool params = ParameterTool.fromArgs(args);

        // Without an input there is nothing to build a pipeline from. Stop here
        // rather than carrying a null stream into the transformations below.
        if (!params.has("input")) {
            System.out.println("数据不存在");
            return;
        }

        // The pipeline is expressed with DataStream, so it must be created from the
        // streaming environment; the batch ExecutionEnvironment yields DataSet
        // sources that cannot be assigned to a DataStream.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> dataStream = env.readTextFile(params.get("input"));

        // Word frequency: tokenize each line, group by the word (tuple field 0)
        // and sum the per-occurrence counts (tuple field 1).
        DataStream<Tuple2<String, Integer>> counts = dataStream.flatMap(new Tokenizer())
                .keyBy(0)
                .sum(1);

        if (params.has("output")) {
            counts.writeAsText(params.get("output"));
        } else {
            counts.print();
        }

        env.execute("Streaming wordcount ");
    }

    /**
     * Splits a line into lower-cased words and emits one {@code (word, 1)} tuple
     * per word.
     */
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        /**
         * Tokenizes one input line.
         *
         * @param value the line of text to split
         * @param out collector receiving a {@code (word, 1)} tuple for every non-empty word
         * @throws Exception declared by the {@link FlatMapFunction} contract
         */
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            // Locale.ROOT keeps lower-casing independent of the JVM's default locale, so the
            // result stays within the ASCII word characters that \W+ treats as part of a word.
            String[] tokens = value.toLowerCase(Locale.ROOT).split("\\W+");
            for (String token : tokens) {
                // split() yields an empty string for blank lines and for lines that start
                // with a separator; those are not words and must not be counted.
                if (!token.isEmpty()) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}