案例需求:
需求:使用netcat工具向9999端口不断的发送数据,通过flink读取端口数据并统计不同单词出现的次数。
代码实现:
import org.apache.flink.streaming.api.scala._
//导入隐式函数依赖
//import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object WordCountFromSocket {
case class WordWithCount(word: String, count: Int)
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置分区的数量为1
env.setParallelism(1)
//建立数据源
//需要先启动 'nc -lk 9999' 用来发送数据
val stream = env.socketTextStream("192.168.91.128", 9999, '\n')
//写对流的转换处理逻辑
val transformed = stream
//使用空格切分输入的字符串
.flatMap(line => line.split("\\s"))
//类似MR中的map
.map(w => WordWithCount(w, 1))
//使用word字段进行分组,类似于shuffle
.keyBy(0)
//开了个5秒钟的滚动窗口
.timeWindow(Time.seconds(5))
//针对count字段进行累加操作,类似MR中的reduce
.sum(1)
//将计算结果输出到标准输出
transformed.print()
//执行计算逻辑
env.execute()
}
}
使用netcat发送数据:
命令nc -lk 9999
运行结果: