Flink流处理API
- Environment
- Source
- 从集合读取数据
- 从文件读取数据
- 从Kafka读取数据
- 自定义Source
- 转换算子Transform
- 简单的转换算子(Map、FlatMAp和Filter)
- 键控流的转换算子(keyBy、滚动聚合和reduce)
- 多流的转换算子(Split、select、connect、CoMap和Union)
- 支持的数据类型
- 自定义UDF函数
- 自定义函数和匿名函数
- 富函数
- Sink
- 存储在文件中
- 使用Kafka作为Sink
- 使用Kafka形成一个管道
- JDBC自定义Sink
Environment
创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment
会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。
创建一个批处理
val env = ExecutionEnvironment.getExecutionEnvironment
创建一个流处理
val env = StreamExecutionEnvironment.getExecutionEnvironment
如果没有设置并行度,会以flink-conf.yaml
中的配置为准,默认是 1。
并行度的优先级是
每个函数 > 整个程序 > 配置文件
跳转顶部
Source
从集合读取数据
package Source
import org.apache.flink.streaming.api.scala._
//定义样例类
case class SensorReading(id: String, timeStamp: Long, temperature: Double)
object SourceFromCollection {
def main(args: Array[String]): Unit = {
//创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//从集合中读取数据
val dataList = (List(
SensorReading("sensor_1", 1547718199, 35.8),
SensorReading("sensor_6", 1547718201, 15.4),
SensorReading("sensor_7", 1547718202, 6.7),
SensorReading("sensor_10", 1547718205, 38.1)
))
//从集合中读取数据
val stream1 = env.fromCollection(dataList)
stream1.print()
env.execute()
}
}
我们可以发现我们输出的数据和输入数据的顺序是不一样的,这是因为我们没有设置并行度,那么他就会按照CPU
的个数来设置并行度,由于是并行的那么输出的顺序就是不一样的
在读取数据的时候有一个特殊的方法fromElements
,他读取的数据类不限,你给他说明他就输出说明
val stream1 = env.fromElements(dataList)
val stream2 = env.fromElements(1.0,"aa",1)
stream1.print()
stream2.print()
跳转顶部
从文件读取数据
先创建文件数据
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
package Source
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object SourceFromFile {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//读取文件
val filePath = "src/main/resources/SensorReading"
val inputStream = env.readTextFile(filePath)
inputStream.print()
env.execute()
}
}
跳转顶部
从Kafka读取数据
在与Kafka
连接时,需要先导入一个依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.1</version>
</dependency>
为什么程序中是消费者?
- 因为我们从
Kafka
的生产者中输出数据,在通过Kafka
的消费者里读取数据
package Source
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.api.scala._
import java.util.Properties
object SourceFromKafka {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//配置项
val properties = new Properties()
properties.setProperty("bootstrap.servers", "master:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
//这边是要读取Kafka的数据,所以算是一个消费者
/**
* 第一个泛型:读取过来的数据类型
* 第一个参数:Kafka名
* 第二个参数:序列化
* 第三个参数:配置项
*/
val stream =
env.addSource(new FlinkKafkaConsumer011[String]
("first", new SimpleStringSchema(), properties))
stream.print()
env.execute()
}
}
FlinkKafkaConsumer011
的参数解释
- 泛型是获取到的数据类型
- 第一个参数是
Kafka
得到管道名 - 第二个参数是获取数据类型的序列化
- 第三个参数是配置项
确保zookeeper
和Kafka
开启
-
zookeeper
开启命令:zkServer.sh start
-
kafka
开启命令:bin/kafka-server-start.sh -daemon config/server.properties
创建Kafka
管道
- 命令如下:
bin/kafka-topics.sh --create --zookeeper 192.168.23.69:2181 --replication-factor 3 --partitions 1 --topic first
- 开生产者:
bin/kafka-console-producer.sh --broker-list a:9092 --topic first
需要修改window
的映射文件,否则会报错
结果展示
跳转顶部
自定义Source
我首先需要先自定义个Source
,需要继承SourceFunction
类,泛型是输出的数据类型
/**
* 自定义Source类
*/
class MySensorSource() extends SourceFunction[SensorReading] {
//定义一个标志位flag,依赖表示数据源是否正常运行发出数据
var flag = true
override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
//定义一个随机数发生器
val rand = new Random()
//随机生成十个初始温度:(id:temp)
var curTemp = 1.to(10).map(i => ("sensor_" + 1, rand.nextDouble() * 100))
//定义无限循环,不停的产生数据,出发呗cancel
while (flag) {
//在上次温度的基础上更新温度值
curTemp = curTemp.map(
data => (data._1, data._2 + rand.nextGaussian())
)
//获取当前时间戳,将时间戳撞倒sensor,然后使用context发出数据
val curTime = System.currentTimeMillis()
curTemp.foreach(
data => sourceContext.collect(SensorReading(data._1, curTime, data._2))
)
//.间隔一段时间
Thread.sleep(1000)
}
}
override def cancel(): Unit = flag = false
}
使用addSource
方法调用类
package Source
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import scala.util.Random
object SourceFunctionTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.addSource(new MySensorSource())
stream.print()
env.execute()
}
}
跳转顶部
转换算子Transform
简单的转换算子(Map、FlatMAp和Filter)
Map
转换操作:
使用语法
val streamMap = stream.map { x => x * 2 }
如图可见,map
算子是将数据从一种形态转换成另一种形态,一对一的转换
flatMap
算子的使用
flatMap
的函数签名:def flatMap[A,B](as: List[A])(f: A ⇒ List[B]): List[B]
例如: flatMap(List(1,2,3))(i ⇒ List(i,i))
结果是 List(1,1,2,2,3,3)
, 而 List("a b", "c d").flatMap(line ⇒ line.split(" "))
结果是 List(a, b, c, d)。
val streamFlatMap = stream.flatMap{
x => x.split(" ")
}
flatMap
算子个一对多的算子
Filter
算子
val streamFilter = stream.filter{
x => x == 1
}
跳转顶部
键控流的转换算子(keyBy、滚动聚合和reduce)
keyBy
算子
DataStream → KeyedStream
:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同 key
的元素,在内部以hash
的形式实现的。
最后的结果就是同一个key
必定在同一个分区,但是不同的分区里面不一定是同一个key
,因为分区数有限
滚动聚合算子
- 滚动聚合算子是针对
KeyedStream
的每一个支流做聚合
算子 | 作用 |
sum | 求和 |
min | 最小值 |
max | 最大值 |
minBy | 最小值的整条数据 |
maxBy | 最大值的整条数据 |
我们来测试一下minBy
和min
的区别,测试数据如下
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.7
package transform
import Source.SensorReading
import org.apache.flink.streaming.api.scala._
object transformatTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val inputPath = "src/main/resources/SensorReading"
val inputStream = env.readTextFile(inputPath)
//转换成样例类类型
val dataStream = inputStream.map(
data => {
val arr = data.split(",")
SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
}
)
//分组聚合输出每个传感器温度最小值
val aggStream = dataStream
.keyBy("id") //根据ID分组
.min(2) //第二个最小值,也可以写属性名
aggStream.print()
env.execute()
}
}
我们可以看到虽然最小值找了出来,但是时间戳数据确实错的,那我们将min
改为minBy
来查看结果是否一致
这个时候数据就匹配了,说明minBy
返回的是整条数据
reduce
算子
KeyedStream
→ DataStream
:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
计算温度的最小值和时间的最新值
val resultStream = dataStream
.keyBy("id")
.reduce(
(curState, newDate) =>
SensorReading(curState.id, newDate.timeStamp, curState.temperature.min(newDate.temperature))
)
resultStream.print()
在例题中:curState
是之前聚合处理留下来的结果
自定义实现一个Reduce
,需要继承ReduceFunction
类
class MyReduceFunction extends ReduceFunction[SensorReading] {
/**
*
* @param t 是之前累计运算的结果
* @param t1 是新传来等待处理的数据
* @return
*/
override def reduce(t: SensorReading, t1: SensorReading): SensorReading = SensorReading(t.id, t1.timeStamp, t.temperature.min(t1.temperature))
}
跳转顶部
多流的转换算子(Split、select、connect、CoMap和Union)
分流操作
Spilt
,其实Spilt
在实际上并没有将流分成俩快。而是类似于分组的操作
DataStream
→ SplitStream
:根据某些特征把一个 DataStream 拆分成两个或者=多个 DataStream
Select
SplitStream
→DataStream
:从一个 SplitStream
中获取一个或者多个DataStream
。
需求练习:以三十度为界,将数据拆分成两个流
package transform
import Source.SensorReading
import org.apache.flink.streaming.api.scala._
object SplitAndSelect {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val inputPath = "src/main/resources/SensorReading"
val inputStream = env.readTextFile(inputPath)
//转换成样例类类型
val dataStream = inputStream.map(
data => {
val arr = data.split(",")
SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
}
)
//分流操作
val spiltStream = dataStream
.split(
data => {
if (data.temperature > 30.0) Seq("High") else Seq("low")
}
)
//获取流
val HighTempStream = spiltStream.select("High")
val LowTempStream = spiltStream.select("low")
val allStream = spiltStream.select("High", "low")
HighTempStream.print("high")
LowTempStream.print("low")
allStream.print("all")
env.execute()
}
}
合流操作
-
DataStream
,DataStream
→ ConnectedStreams
:连接两个保持他们类型的数据流,两个数据流被 Connect
之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。 -
ConnectedStreams
→ DataStream
:作用于 ConnectedStreams
上,功能与 map
和 flatMap
一样,对 ConnectedStreams
中的每一个 Stream
分别进行map
和 flatMap
处理,也就是说,如有两个stream
在内,在使用CoMap
时必须定义两个Map
的用法
//将数据类型转换一下
val warningStream = HighTempStream.map(data => (data.id, data.temperature))
val connectStream = warningStream.connect(LowTempStream)
//CoMap对数据继续处理
val CoMapStream = connectStream
.map(
warningData => (warningData._1, warningData._2, "warning"),
lowData => (lowData.id, "healthy"
)
)
union
算子
DataStream
→ DataStream
:对两个或者两个以上的 DataStream
进行union
操作,产生一个包含所有 DataStream
元素的新 DataStream
Connect
与 Union
区别:
Union
之前两个流的类型必须是一样,Connect
可以不一样,在之后的 coMap
中再去调整成为一样的。
Connect
只能操作两个流,Union
可以操作多个
跳转顶部
支持的数据类型
Flink
流应用程序处理的是以数据对象表示的事件流。所以在Flink
内部,我们需要能够处理这些对象。它们需要被序列化和反序列化,以便通过网络传送它们;或者从状态后端、检查点和保存点读取它们。为了有效地做到这一点,Flink 需要明确知道应用程序所处理的数据类型。Flink
使用类型信息的概念来表示数据类型,并为每个数据类型生成特定的序列化器、反序列化器和比较器。
Flink
还具有一个类型提取系统,该系统分析函数的输入和返回类型,以自动获取类型信息,从而获得序列化器和反序列化器。但是,在某些情况下,例如 lambda
函数或泛型类型,需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。
Flink
支持 Java
和Scala
中所有常见数据类型。使用最广泛的类型有以下几种
基础数据类型
-
Flink
支持所有的 Java
和 Scala
基础数据类型,Int
, Double
, Long
, String
, …
val numbers: DataStream[Long] = env.fromElements(1L, 2L, 3L, 4L)
numbers.map( n => n + 1 )
Java
和 Scala
元组(Tuples
)
- 样例
val persons: DataStream[(String, Integer)] = env.fromElements(
("Adam", 17),
("Sarah", 23) )
persons.filter(p => p._2 > 18
Scala
样例类
- 格式
case class Person(name: String, age: Int)
val persons: DataStream[Person] = env.fromElements(
Person("Adam", 17),
Person("Sarah", 23) )
persons.filter(p => p.age > 18)
** Java
简单对象(POJOs
)**
- 格式如下
public class Person {
public String name;
public int age;
public Person() {}
public Person(String name, int age) {
this.name = name;
this.age = age;
}
}
DataStream<Person> persons = env.fromElements(
new Person("Alex", 42),
new Person("Wendy", 23));
其它(Arrays, Lists, Maps, Enums
) 等
-
Flink
对 Java
和 Scala
中的一些特殊目的的类型也都是支持的,比如 Java
的ArrayList
,HashMap
,Enum
等等。
跳转顶部
自定义UDF函数
自定义函数和匿名函数
Flink
暴露了所有 udf
函数的接口(实现方式为接口或者抽象类)。例如MapFunction
, FilterFunction
, ProcessFunction
等等
Flink
想要自定义函数,只要使自定义类继承对应的Function
类然后重写方法即可
例如,我们下面自定义Filter
函数,我们只保留以sensor_1
开头的数据
class MyFilterFunction extends FilterFunction[SensorReading] {
override def filter(t: SensorReading): Boolean =
t.id.startsWith("sensor_1")
}
函数的使用
object MyFilterTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val inputPath = "src/main/resources/SensorReading"
val inputStream = env.readTextFile(inputPath)
//转换成样例类类型
val dataStream = inputStream.map(
data => {
val arr = data.split(",")
SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
}
)
val result = dataStream.filter(new MyFilterFunction)
result.print()
env.execute()
}
}
结果上显示,我们的筛选使成功的
其实有些时候我们不需要非要创建一个类来实现,我们可以使用匿名类的方式
val result = dataStream.filter(new FilterFunction[SensorReading] {
override def filter(t: SensorReading) = t.id.startsWith("sensor_1")
})
我们 filter
的字符串"sensor_!"还可以当作参数传进去
val flinkTweets = tweets.filter(new KeywordFilter("sensor_1"))
class KeywordFilter(keyWord: SensorReading) extends FilterFunction[SensorReading] {
override def filter(value: SensorReading): Boolean = {
value.id.startsWith(keyWord)
}
}
跳转顶部
富函数
“富函数”是 DataStream API
提供的一个函数类的接口,所有 Flink
函数类都有其 Rich
版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
Rich Function
有一个生命周期的概念。典型的生命周期方法有:
open()
方法是rich function
的初始化方法,当一个算子例如map
或者filter
被调用之前open()
会被调用。
close()
方法是生命周期中的最后一个调用的方法,做一些清理工作。
getRuntimeContext()
方法提供了函数的 RuntimeContext
的一些信息,例如函数执行的并行度,任务的名字,以及 state
状态
在自定义富函数的可以重写方法
非富函数可重写
跳转顶部
Sink
存储在文件中
Flink
中可以将数据保存在本地文件中
dataStream.writeAsCsv("src/main/resources/out.csv")
不仅仅只有上面的一个还有如下的方法
我们通过图片可以看到,上面的方法大多都是已经过时了的,这里我们提供一个新的方法
dataStream.addSink(StreamingFileSink.forRowFormat(
new Path("src/main/resources/out.csv"),
new SimpleStringEncoder[SensorReading]()
).build())
其中第一个参数就是指定文件系统与其存储的路径,第二个参数就是存储的字符编码
我们可以发现,最后的输出结果并不是一个文件,而是一个文件夹,文件夹内之所以有多个文件是因为并行度的问题
使用Kafka作为Sink
为什么程序里的Kafka是生产者?
- 程序处理好的数据通过代码里的
Kafka
生产给Kafka
的消费者输出
package Sink
import Source.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
object SInkFromKafka {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度
env.setParallelism(1)
val inputPath = "src/main/resources/SensorReading"
val inputStream = env.readTextFile(inputPath)
//转换成样例类类型
val dataStream = inputStream.map(
data => {
val arr = data.split(",")
SensorReading(arr(0), arr(1).toLong, arr(2).toDouble).toString
}
)
dataStream.addSink(new FlinkKafkaProducer011[String]("master:9092", "first", new SimpleStringSchema()))
env.execute()
}
}
同样的创建Kafka
的主题:
bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic first
创建一个输出到控制台的消费者
bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic first
跳转顶部
使用Kafka形成一个管道
我们从一个Kafka中读取数据,将数据处理后在输出到另一个Kafka
package Sink
import Source.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
import java.util.Properties
object SInkToKafka {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度
env.setParallelism(3)
// val inputPath = "src/main/resources/SensorReading"
// val inputStream = env.readTextFile(inputPath)
//从Kafka读取数据
val properties = new Properties()
properties.setProperty("bootstrap.servers", "master:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
//这边是要读取Kafka的数据,所以算是一个消费者
/**
* 第一个泛型:读取过来的数据类型
* 第一个参数:Kafka名
* 第二个参数:序列化
* 第三个参数:配置项
*/
val stream =
env.addSource(new FlinkKafkaConsumer011[String]
("first", new SimpleStringSchema(), properties))
//转换成样例类类型
val dataStream = stream.map(
data => {
val arr = data.split(",")
SensorReading(arr(0), arr(1).toLong, arr(2).toDouble).toString
}
)
dataStream.addSink(new FlinkKafkaProducer011[String]("master:9092", "sensor", new SimpleStringSchema()))
env.execute()
}
}
**我们此时需要两个Kafka
的主题,三者的启动顺序是,先启动生产者Kafka
、再启动消费者Kafka
,最后启动程序
**
JDBC自定义Sink
导入依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
</dependency>
在数据库中创建表
CREATE TABLE `temperatures` (
`sensor` varchar(255) DEFAULT NULL,
`temp` double(255,0) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8
package Sink
import Source.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import java.sql.{Connection, DriverManager, PreparedStatement}
object SinkToMySql {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度
env.setParallelism(3)
val inputPath = "src/main/resources/SensorReading"
val inputStream = env.readTextFile(inputPath)
//转换成样例类类型
val dataStream = inputStream.map(
data => {
val arr = data.split(",")
SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
}
)
dataStream.addSink(new MyJdbcSink())
env.execute()
}
}
class MyJdbcSink() extends RichSinkFunction[SensorReading] {
var conn: Connection = _
var insertStmt: PreparedStatement = _
var updateStmt: PreparedStatement = _
// open 主要是创建连接
override def open(parameters: Configuration): Unit = {
super.open(parameters)
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/Flink",
"root", "200028")
insertStmt = conn.prepareStatement("INSERT INTO temperatures (sensor, temp) VALUES(?, ?)")
updateStmt = conn.prepareStatement("UPDATE temperatures SET temp = ? WHERE sensor = ?")
}
// 调用连接,执行 sql
override def invoke(value: SensorReading, context:
SinkFunction.Context[_]): Unit = {
updateStmt.setDouble(1, value.temperature)
updateStmt.setString(2, value.id)
updateStmt.execute()
if (updateStmt.getUpdateCount == 0) {
insertStmt.setString(1, value.id)
insertStmt.setDouble(2, value.temperature)
insertStmt.execute()
}
}
override def close(): Unit = {
insertStmt.close()
updateStmt.close()
conn.close()
}
}
结果
跳转顶部