背景介绍
由于平时工作使用 Spark 较多,在 WordCount 这一个小 Demo 中,我也好奇,同为大数据计算组件,且同为 scala,会有多少不同?我认为能在小的 Demo 中获得思考,管中窥豹的了解不同。
带着疑问去学习 Flink 的 WordCount 程序:
Spark, Flink 批处理,Flink 流处理 3者,在有何不同?
- 声明会话方式?
- 处理中的 map 算子有何不同?
- 如何执行?(如 .show())
- 变量格式
Flink 流处理
// Flink 流处理
object StreamWordCount {
def main(args: Array[String]): Unit = {
// 创建流处理的执行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 从外部命令中提取参数,作为socket主机名和端口号
val paramTool: ParameterTool = ParameterTool.fromArgs(args)
val host: String = paramTool.get("host")
val port: Int = paramTool.getInt("port")
// 接收一个socket文本流
val inputDataStream: DataStream[String] = env.socketTextStream(host, port)
// 进行转化处理统计
val resultDataStream: DataStream[(String, Int)] = inputDataStream
.flatMap(_.split(" "))
.filter(_.nonEmpty)
.map((_, 1))
.keyBy(0)
.sum(1)
resultDataStream.print().setParallelism(1)
// 启动任务执行
env.execute("stream word count")
}
}
Flink 批处理
// Flink 批处理
import org.apache.flink.api.scala._
object WordCount {
def main(args: Array[String]): Unit = {
// 创建批处理的执行环境
val env:ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val inputDataSet:DataSet[String] = env.readTextFile("D:/work/demo.txt")
val resultDataSet: AggregateDataSet[(String, Int)] = inputDataSet
.flatMap(_.split(" "))
.map((_, 1))
.groupBy(0)
.sum(1)
// 打印输出
resultDataSet.print()
}
}
Spark 批处理
object ScalaWordCount {
// https://www.jianshu.com/p/9451ffb052bf 时间原因,引用该作者 wordcount
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("ScalaWordCount").setMaster("local");
//创建一个SparkContext对象
val sc = new SparkContext(conf)
//执行WordCount
val result = sc
.textFile("wordcount.txt")
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
//打印在屏幕上
result.foreach(println)
//释放资源
sc.stop()
}
}
回到上述问题,我们来看看
-
声明会话方式?
ExecutionEnvironment.getExecutionEnvironment()
StreamExecutionEnvironment.getExecutionEnvironment()
SparkContext -
处理中的 map 算子有何不同?
Flink 流处理:KeyBy(0),Flink 批处理:groupBy(0);
Flink 批处理:.groupBy(0).sum(1); Spark 批处理:.reduceByKey(_ + _)
Spark 批处理:sc.stop() -
如何执行?(如 .show())
两者通过 .print() 触发流程,或者叫通过 .print() 输出。而不一样的是,流处理需要额外使用 env.execute()方法,因为流处理需要一个进程监听(Socket),等待数据输入,如果没有该进程则不会实际运行。。 -
变量类型
DataStream;DataSet;RDD