1. Install nc on the Linux machine
yum install -y nc
Follow the prompts to complete the installation. nc makes testing convenient later on.
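To confirm that nc works, you can, for example, listen in one terminal and connect from another (port 9999 here is just an arbitrary test port); anything typed on one side should appear on the other:
nc -l 9999          # terminal 1: listen
nc 127.0.0.1 9999   # terminal 2: connect and type a line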
2. Create a Maven project in IDEA and add the dependencies
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.10.1</flink.version>
    <log4j.version>1.2.17</log4j.version>
    <slf4j.version>1.7.7</slf4j.version>
    <scala.version>2.11</scala.version>
    <!-- Referenced by the <scope> tags below; typically switched to "provided" when building a jar for a cluster -->
    <project.build.scope>compile</project.build.scope>
</properties>
<dependencies>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
    <!-- Flink Java API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>${project.build.scope}</scope>
    </dependency>
    <!-- Flink Streaming Java API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${project.build.scope}</scope>
    </dependency>
    <!-- Flink Web UI -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${project.build.scope}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
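The pom declares slf4j-log4j12 and log4j but no logging configuration; without one, log4j prints a "no appenders" warning at startup. A minimal sketch, assuming console logging and the default src/main/resources location:
# src/main/resources/log4j.properties
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c - %m%n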
3. Write the Java code
package com.demo;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * author:HUAWEI
 */
public class SocketWordCount {

    public static void main(String[] args) throws Exception {
        // Local environment with the Web UI enabled (requires flink-runtime-web on the classpath)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        // The data source is the Linux host, which sends data over a socket
        DataStreamSource<String> socket = env.socketTextStream("192.168.0.181", 9000, "\n");
        lambda(socket);
        // function(socket);
        // lambdaAndFunction(socket);
        // richFunction(socket);
        env.execute();
    }
    // Rich function approach: RichFlatMapFunction adds open()/close() lifecycle hooks
    private static void richFunction(DataStreamSource<String> socket) {
        SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = socket.flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
            }

            @Override
            public void close() throws Exception {
                super.close();
            }

            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] strings = value.split(" ");
                for (String s : strings) {
                    out.collect(Tuple2.of(s, 1));
                }
            }
        });
        KeyedStream<Tuple2<String, Integer>, String> tuple2StringKeyedStream = flatMap.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });
        tuple2StringKeyedStream.sum(1).print();
    }
    // Mixed approach: anonymous FlatMapFunction for splitting, lambda key selector for keyBy
    private static void lambdaAndFunction(DataStreamSource<String> socket) {
        SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = socket.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] strings = value.split(" ");
                for (String s : strings) {
                    out.collect(Tuple2.of(s, 1));
                }
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = flatMap.keyBy(f -> f.f0).sum(1);
        sum.print();
    }
    // Anonymous function classes only: flatMap to words, map to (word, 1), keyBy on the tuple field name
    private static void function(DataStreamSource<String> socket) {
        SingleOutputStreamOperator<String> flatMap = socket.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] strings = value.split(" ");
                for (String s : strings) {
                    out.collect(s);
                }
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Integer>> map = flatMap.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = map.keyBy("f0").sum(1);
        sum.print();
    }
    // Pure lambda approach: returns(...) supplies the type information that Java erases from lambdas
    private static void lambda(DataStreamSource<String> socket) {
        socket.flatMap((String value, Collector<String> out) -> {
                    Arrays.stream(value.split(" ")).forEach(out::collect);
                }).returns(Types.STRING)
                .map(f -> Tuple2.of(f, 1)).returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(0)
                .sum(1)
                .print();
    }
}
Note the socketTextStream settings here: the IP is the address of the server running nc, and the port is the port nc listens on.
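To avoid hard-coding the address, the socketTextStream line in main could instead read the host and port from the program arguments, for example with ParameterTool (shipped with flink-java); the --host/--port argument names below are just this sketch's choice:
// import org.apache.flink.api.java.utils.ParameterTool;
ParameterTool params = ParameterTool.fromArgs(args);
String host = params.get("host", "192.168.0.181");   // default falls back to the nc server above
int port = params.getInt("port", 9000);
DataStreamSource<String> socket = env.socketTextStream(host, port, "\n");
The program would then be started with arguments such as --host 192.168.0.181 --port 9000.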
4. Start nc on the corresponding server
nc -l 9000
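Note: with the nmap ncat that yum typically installs, nc stops listening once the client disconnects; if you want the listener to survive reconnects (for example when restarting the Flink job), -k keeps it open:
nc -lk 9000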
5. Run the Flink program
6. Run the test
Type a string on the nc server and press Enter.
Watch the program's console output.
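For example, typing hello flink hello on the nc side should produce console output roughly like the following; the number before > is the index of the printing subtask and depends on your local parallelism, and counts for the same word accumulate:
3> (hello,1)
1> (flink,1)
3> (hello,2)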
View the running job in a browser; with createLocalEnvironmentWithWebUI the Flink Web UI is served at http://localhost:8081 by default.