0
点赞
收藏
分享

微信扫一扫

【Flink Scala】Flink流处理API

zhongjh 2022-03-30 阅读 41



Flink流处理API

  • ​​Environment​​
  • ​​Source​​
  • ​​从集合读取数据​​
  • ​​从文件读取数据​​
  • ​​从Kafka读取数据​​
  • ​​自定义Source​​
  • ​​换算子Transform​​
  • ​​简单的换算子(Map、FlatMAp和Filter)​​
  • ​​键控流的换算子(keyBy、滚动聚合和reduce)​​
  • ​​多流的换算子(Split、select、connect、CoMap和Union)​​
  • ​​支持的数据类型​​
  • ​​自定义UDF函数​​
  • ​​自定义函数和匿名函数​​
  • ​​富函数​​
  • ​​Sink​​
  • ​​存储在文件中​​
  • ​​使用Kafka作为Sink​​
  • ​​使用Kafka形成一个管道​​
  • ​​JDBC自定义Sink​​



Environment

【Flink Scala】Flink流处理API_apache

创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,​​getExecutionEnvironment​​会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。

创建一个批处理

val env = ExecutionEnvironment.getExecutionEnvironment

创建一个流处理

val env = StreamExecutionEnvironment.getExecutionEnvironment

如果没有设置并行度,会以​​flink-conf.yaml​​中的配置为准,默认是 1。

【Flink Scala】Flink流处理API_apache_02

并行度的优先级是

每个函数 > 整个程序 > 配置文件

​​跳顶部​​

Source

从集合读取数据

package Source

import org.apache.flink.streaming.api.scala._

//定义样例类
case class SensorReading(id: String, timeStamp: Long, temperature: Double)

object SourceFromCollection {
def main(args: Array[String]): Unit = {
//创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment

//从集合中读取数据
val dataList = (List(
SensorReading("sensor_1", 1547718199, 35.8),
SensorReading("sensor_6", 1547718201, 15.4),
SensorReading("sensor_7", 1547718202, 6.7),
SensorReading("sensor_10", 1547718205, 38.1)
))
//从集合中读取数据
val stream1 = env.fromCollection(dataList)
stream1.print()
env.execute()
}
}

【Flink Scala】Flink流处理API_apache_03

我们可以发现我们输出的数据和输入数据的顺序是不一样的,这是因为我们没有设置并行度,那么他就会按照​​CPU​​的个数来设置并行度,由于是并行的那么输出的顺序就是不一样的

在读取数据的时候有一个特殊的方法​​fromElements​​,他读取的数据类不限,你给他说明他就输出说明

val stream1 = env.fromElements(dataList)
val stream2 = env.fromElements(1.0,"aa",1)
stream1.print()
stream2.print()

【Flink Scala】Flink流处理API_apache_04

​​跳顶部​​

从文件读取数据

先创建文件数据


sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1


package Source

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object SourceFromFile {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment

//读取文件
val filePath = "src/main/resources/SensorReading"
val inputStream = env.readTextFile(filePath)

inputStream.print()

env.execute()
}
}

【Flink Scala】Flink流处理API_apache_05

​​跳顶部​​

从Kafka读取数据

在与​​Kafka​​连接时,需要先导入一个依赖

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.1</version>
</dependency>

为什么程序中是消费者?

  • 因为我们从​​Kafka​​的生产者中输出数据,在通过​​Kafka​​的消费者里读取数据
package Source

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.api.scala._
import java.util.Properties

object SourceFromKafka {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment

//配置项
val properties = new Properties()
properties.setProperty("bootstrap.servers", "master:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")

//这边是要读取Kafka的数据,所以算是一个消费者
/**
* 第一个泛型:读取过来的数据类型
* 第一个参数:Kafka名
* 第二个参数:序列化
* 第三个参数:配置项
*/
val stream =
env.addSource(new FlinkKafkaConsumer011[String]
("first", new SimpleStringSchema(), properties))

stream.print()
env.execute()
}
}

​FlinkKafkaConsumer011​​的参数解释

  • 泛型是获取到的数据类型
  • 第一个参数是​​Kafka​​得到管道名
  • 第二个参数是获取数据类型的序列化
  • 第三个参数是配置项

确保​​zookeeper​​和​​Kafka​​开启

  • ​zookeeper​​开启命令:​​zkServer.sh start​
  • ​kafka​​开启命令:​​bin/kafka-server-start.sh -daemon config/server.properties​

创建​​Kafka​​管道

  • 命令如下:​​bin/kafka-topics.sh --create --zookeeper 192.168.23.69:2181 --replication-factor 3 --partitions 1 --topic first​
  • 开生产者:​​bin/kafka-console-producer.sh --broker-list a:9092 --topic first​需要修改​​window​​的映射文件,否则会报错

【Flink Scala】Flink流处理API_apache_06

结果展示

【Flink Scala】Flink流处理API_kafka_07

​​跳顶部​​

自定义Source

我首先需要先自定义个​​Source​​,需要继承​​SourceFunction​​类,泛型是输出的数据类型

/**
* 自定义Source类
*/
class MySensorSource() extends SourceFunction[SensorReading] {
//定义一个标志位flag,依赖表示数据源是否正常运行发出数据
var flag = true

override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
//定义一个随机数发生器
val rand = new Random()

//随机生成十个初始温度:(id:temp)
var curTemp = 1.to(10).map(i => ("sensor_" + 1, rand.nextDouble() * 100))

//定义无限循环,不停的产生数据,出发呗cancel
while (flag) {
//在上次温度的基础上更新温度值
curTemp = curTemp.map(
data => (data._1, data._2 + rand.nextGaussian())
)
//获取当前时间戳,将时间戳撞倒sensor,然后使用context发出数据
val curTime = System.currentTimeMillis()
curTemp.foreach(
data => sourceContext.collect(SensorReading(data._1, curTime, data._2))
)
//.间隔一段时间
Thread.sleep(1000)
}
}

override def cancel(): Unit = flag = false
}

使用​​addSource​​方法调用类

package Source

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

import scala.util.Random

object SourceFunctionTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.addSource(new MySensorSource())

stream.print()
env.execute()
}
}

【Flink Scala】Flink流处理API_kafka_08

​​跳顶部​​

转换算子Transform

简单的转换算子(Map、FlatMAp和Filter)

​Map​换操作:

【Flink Scala】Flink流处理API_flink_09

使用语法

val streamMap = stream.map { x => x * 2 }

如图可见,​​map​​算子是将数据从一种形态换成另一种形态,一对一的转换

​flatMap​​算子的使用

【Flink Scala】Flink流处理API_apache_10

​flatMap​​ 的函数签名:​​def flatMap[A,B](as: List[A])(f: A ⇒ List[B]): List[B]​

例如: ​​flatMap(List(1,2,3))(i ⇒ List(i,i))​

结果是 ​​List(1,1,2,2,3,3)​​, 而 ​​List("a b", "c d").flatMap(line ⇒ line.split(" "))​

结果是 ​​List(a, b, c, d)。​

val streamFlatMap = stream.flatMap{
x => x.split(" ")
}

​flatMap​​算子个一对多的算子

​Filter​​算子

【Flink Scala】Flink流处理API_apache_11

val streamFilter = stream.filter{
x => x == 1
}

​​跳顶部​​

键控流的转换算子(keyBy、滚动聚合和reduce)

​keyBy​​算子

【Flink Scala】Flink流处理API_scala_12

​DataStream → KeyedStream​​:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同 ​​key​​的元素,在内部以​​hash​​的形式实现的。

最后的结果就是同一个​​key​​必定在同一个分区,但是不同的分区里面不一定是同一个​​key​​,因为分区数有限

滚动聚合算子

  • 滚动聚合算子是针对​​KeyedStream​​的每一个支流做聚合

算子

作用

sum

求和

min

最小值

max

最大值

minBy

最小值的整条数据

maxBy

最大值的整条数据

我们来测试一下​​minBy​​和​​min​​的区别,测试数据如下


sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.7


package transform

import Source.SensorReading
import org.apache.flink.streaming.api.scala._

object transformatTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val inputPath = "src/main/resources/SensorReading"
val inputStream = env.readTextFile(inputPath)

//转换成样例类类型
val dataStream = inputStream.map(
data => {
val arr = data.split(",")
SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
}
)
//分组聚合输出每个传感器温度最小值
val aggStream = dataStream
.keyBy("id") //根据ID分组
.min(2) //第二个最小值,也可以写属性名

aggStream.print()
env.execute()
}
}

【Flink Scala】Flink流处理API_kafka_13

我们可以看到虽然最小值找了出来,但是时间戳数据确实错的,那我们将​​min​​改为​​minBy​​来查看结果是否一致

【Flink Scala】Flink流处理API_scala_14

这个时候数据就匹配了,说明​​minBy​​返回的是整条数据

​reduce​​算子

​KeyedStream​​ → ​​DataStream​​:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。

计算温度的最小值和时间的最新值

val resultStream = dataStream
.keyBy("id")
.reduce(
(curState, newDate) =>
SensorReading(curState.id, newDate.timeStamp, curState.temperature.min(newDate.temperature))
)
resultStream.print()

【Flink Scala】Flink流处理API_scala_15

在例题中:​​curState​​是之前聚合处理留下来的结果

自定义实现一个​​Reduce​​,需要继承​​ReduceFunction​​类

class MyReduceFunction extends ReduceFunction[SensorReading] {
/**
*
* @param t 是之前累计运算的结果
* @param t1 是新传来等待处理的数据
* @return
*/
override def reduce(t: SensorReading, t1: SensorReading): SensorReading = SensorReading(t.id, t1.timeStamp, t.temperature.min(t1.temperature))
}

​​跳顶部​​

多流的转换算子(Split、select、connect、CoMap和Union)

分流操作

  • ​Spilt​​,其实​​Spilt​​在实际上并没有将流分成俩快。而是类似于分组的操作

    【Flink Scala】Flink流处理API_apache_16

  • ​DataStream​​ → ​​SplitStream​​:根据某些特征把一个 DataStream 拆分成两个或者=多个 ​​DataStream​

  • ​Select​【Flink Scala】Flink流处理API_apache_17

  • ​SplitStream​​→​​DataStream​​:从一个 ​​SplitStream​​ 中获取一个或者多个​​DataStream​​。

需求练习:以三十度为界,将数据拆分成两个流

package transform

import Source.SensorReading
import org.apache.flink.streaming.api.scala._

object SplitAndSelect {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val inputPath = "src/main/resources/SensorReading"
val inputStream = env.readTextFile(inputPath)

//转换成样例类类型
val dataStream = inputStream.map(
data => {
val arr = data.split(",")
SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
}
)
//分流操作
val spiltStream = dataStream
.split(
data => {
if (data.temperature > 30.0) Seq("High") else Seq("low")
}
)
//获取流
val HighTempStream = spiltStream.select("High")
val LowTempStream = spiltStream.select("low")
val allStream = spiltStream.select("High", "low")

HighTempStream.print("high")
LowTempStream.print("low")
allStream.print("all")

env.execute()
}
}

【Flink Scala】Flink流处理API_scala_18

合流操作

【Flink Scala】Flink流处理API_kafka_19

  • ​DataStream​​,​​DataStream​​ → ​​ConnectedStreams​​:连接两个保持他们类型的数据流,两个数据流被 ​​Connect​​之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。【Flink Scala】Flink流处理API_scala_20
  • ​ConnectedStreams​​ → ​​DataStream​​:作用于 ​​ConnectedStreams​​上,功能与 ​​map​​和 ​​flatMap​​ 一样,对 ​​ConnectedStreams​​ 中的每一个 ​​Stream​​分别进行​​map​​和 ​​flatMap​​处理,也就是说,如有两个​​stream​​在内,在使用​​CoMap​​时必须定义两个​​Map​​的用法
//将数据类型转换一下
val warningStream = HighTempStream.map(data => (data.id, data.temperature))

val connectStream = warningStream.connect(LowTempStream)

//CoMap对数据继续处理
val CoMapStream = connectStream
.map(
warningData => (warningData._1, warningData._2, "warning"),
lowData => (lowData.id, "healthy"
)
)

​union​​算子

【Flink Scala】Flink流处理API_flink_21

​DataStream​​ → ​​DataStream​​:对两个或者两个以上的 ​​DataStream​​ 进行​​union​​ 操作,产生一个包含所有 ​​DataStream​​元素的新 ​​DataStream​

​Connect​​与 ​​Union​​ 区别:

  • ​Union​​ 之前两个流的类型必须是一样,​​Connect​​可以不一样,在之后的 ​​coMap​​中再去调整成为一样的。

  • ​Connect​​只能操作两个流,​​Union​​可以操作多个

​​跳顶部​​

支持的数据类型

​Flink​​流应用程序处理的是以数据对象表示的事件流。所以在​​Flink​​内部,我们需要能够处理这些对象。它们需要被序列化和反序列化,以便通过网络传送它们;或者从状态后端、检查点和保存点读取它们。为了有效地做到这一点,Flink 需要明确知道应用程序所处理的数据类型。​​Flink​​使用类型信息的概念来表示数据类型,并为每个数据类型生成特定的序列化器、反序列化器和比较器。

​Flink​​还具有一个类型提取系统,该系统分析函数的输入和返回类型,以自动获取类型信息,从而获得序列化器和反序列化器。但是,在某些情况下,例如 ​​lambda​​函数或泛型类型,需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。

​Flink​​支持 ​​Java​​和​​Scala​​ 中所有常见数据类型。使用最广泛的类型有以下几种

基础数据类型

  • ​Flink​​ 支持所有的 ​​Java​​和 ​​Scala​​基础数据类型,​​Int​​, ​​Double​​, ​​Long​​, ​​String​​, …
  • val numbers: DataStream[Long] = env.fromElements(1L, 2L, 3L, 4L)
    numbers.map( n => n + 1 )

​Java​​和 ​​Scala​​元组(​​Tuples​​)

  • 样例
  • val persons: DataStream[(String, Integer)] = env.fromElements(
    ("Adam", 17),
    ("Sarah", 23) )
    persons.filter(p => p._2 > 18

​Scala​​样例类

  • 格式
  • case class Person(name: String, age: Int)
    val persons: DataStream[Person] = env.fromElements(
    Person("Adam", 17),
    Person("Sarah", 23) )
    persons.filter(p => p.age > 18)

** ​​Java​​​ 简单对象(​​POJOs​​)**

  • 格式如下
  • public class Person {
    public String name;
    public int age;
    public Person() {}
    public Person(String name, int age) {
    this.name = name;
    this.age = age;
    }
    }
    DataStream<Person> persons = env.fromElements(
    new Person("Alex", 42),
    new Person("Wendy", 23));

其它(​​Arrays, Lists, Maps, Enums​​) 等

  • ​Flink​​ 对 ​​Java​​和 ​​Scala​​ 中的一些特殊目的的类型也都是支持的,比如 ​​Java​​的​​ArrayList​​,​​HashMap​​,​​Enum​​ 等等。

​​跳顶部​​

自定义UDF函数

自定义函数和匿名函数

​Flink​​ 暴露了所有 ​​udf​​函数的接口(实现方式为接口或者抽象类)。例如​​MapFunction​​, ​​FilterFunction​​, ​​ProcessFunction​​ 等等

​Flink​​想要自定义函数,只要使自定义类继承对应的​​Function​​类然后重写方法即可

例如,我们下面自定义​​Filter​​函数,我们只保留以​​sensor_1​​开头的数据

class MyFilterFunction extends FilterFunction[SensorReading] {
override def filter(t: SensorReading): Boolean =
t.id.startsWith("sensor_1")
}

函数的使用

object MyFilterTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val inputPath = "src/main/resources/SensorReading"
val inputStream = env.readTextFile(inputPath)

//转换成样例类类型
val dataStream = inputStream.map(
data => {
val arr = data.split(",")
SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
}
)
val result = dataStream.filter(new MyFilterFunction)
result.print()
env.execute()
}
}

【Flink Scala】Flink流处理API_kafka_22

结果上显示,我们的筛选使成功的

其实有些时候我们不需要非要创建一个类来实现,我们可以使用匿名类的方式

val result = dataStream.filter(new FilterFunction[SensorReading] {
override def filter(t: SensorReading) = t.id.startsWith("sensor_1")
})

我们 ​​filter​​ 的字符串"sensor_!"还可以当作参数传进去

val flinkTweets = tweets.filter(new KeywordFilter("sensor_1"))

class KeywordFilter(keyWord: SensorReading) extends FilterFunction[SensorReading] {
override def filter(value: SensorReading): Boolean = {
value.id.startsWith(keyWord)
}
}

​​跳顶部​​

富函数

“富函数”是 ​​DataStream API​​提供的一个函数类的接口,所有 ​​Flink​​函数类都有其 ​​Rich​​ 版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

​Rich Function​​ 有一个生命周期的概念。典型的生命周期方法有:

  • ​open()​​方法是​​rich function​​ 的初始化方法,当一个算子例如​​map​​或者​​filter​​被调用之前​​open()​​会被调用。

  • ​close()​​方法是生命周期中的最后一个调用的方法,做一些清理工作。

  • ​getRuntimeContext()​​方法提供了函数的 ​​RuntimeContext​​的一些信息,例如函数执行的并行度,任务的名字,以及 ​​state​​状态

在自定义富函数的可以重写方法

【Flink Scala】Flink流处理API_apache_23

非富函数可重写

【Flink Scala】Flink流处理API_flink_24

​​跳顶部​​

Sink

存储在文件中

​Flink​​中可以将数据保存在本地文件中

dataStream.writeAsCsv("src/main/resources/out.csv")

不仅仅只有上面的一个还有如下的方法

【Flink Scala】Flink流处理API_kafka_25

我们通过图片可以看到,上面的方法大多都是已经过时了的,这里我们提供一个新的方法

dataStream.addSink(StreamingFileSink.forRowFormat(
new Path("src/main/resources/out.csv"),
new SimpleStringEncoder[SensorReading]()
).build())

其中第一个参数就是指定文件系统与其存储的路径,第二个参数就是存储的字符编码

【Flink Scala】Flink流处理API_apache_26

我们可以发现,最后的输出结果并不是一个文件,而是一个文件夹,文件夹内之所以有多个文件是因为并行度的问题

使用Kafka作为Sink

为什么程序里的Kafka是生产者?

  • 程序处理好的数据通过代码里的​​Kafka​​生产给​​Kafka​​的消费者输出
package Sink

import Source.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011

object SInkFromKafka {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度
env.setParallelism(1)

val inputPath = "src/main/resources/SensorReading"
val inputStream = env.readTextFile(inputPath)

//转换成样例类类型
val dataStream = inputStream.map(
data => {
val arr = data.split(",")
SensorReading(arr(0), arr(1).toLong, arr(2).toDouble).toString
}
)

dataStream.addSink(new FlinkKafkaProducer011[String]("master:9092", "first", new SimpleStringSchema()))

env.execute()
}
}

同样的创建​​Kafka​​的主题:

bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic first

创建一个输出到控制台的消费者

bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic first

【Flink Scala】Flink流处理API_kafka_27

​​跳顶部​​

使用Kafka形成一个管道

我们从一个Kafka中读取数据,将数据处理后在输出到另一个Kafka

package Sink

import Source.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}

import java.util.Properties

object SInkToKafka {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度
env.setParallelism(3)

// val inputPath = "src/main/resources/SensorReading"
// val inputStream = env.readTextFile(inputPath)

//从Kafka读取数据
val properties = new Properties()
properties.setProperty("bootstrap.servers", "master:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")

//这边是要读取Kafka的数据,所以算是一个消费者
/**
* 第一个泛型:读取过来的数据类型
* 第一个参数:Kafka名
* 第二个参数:序列化
* 第三个参数:配置项
*/
val stream =
env.addSource(new FlinkKafkaConsumer011[String]
("first", new SimpleStringSchema(), properties))

//转换成样例类类型
val dataStream = stream.map(
data => {
val arr = data.split(",")
SensorReading(arr(0), arr(1).toLong, arr(2).toDouble).toString
}
)

dataStream.addSink(new FlinkKafkaProducer011[String]("master:9092", "sensor", new SimpleStringSchema()))

env.execute()
}
}

**我们此时需要两个​​Kafka​​的主题,三者的启动顺序是,先启动生产者​​Kafka​​、再启动消费者​​Kafka​​,最后启动程序【Flink Scala】Flink流处理API_apache_28

**

JDBC自定义Sink

导入依赖

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
</dependency>

在数据库中创建表

CREATE TABLE `temperatures` (
`sensor` varchar(255) DEFAULT NULL,
`temp` double(255,0) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8
package Sink

import Source.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

import java.sql.{Connection, DriverManager, PreparedStatement}

object SinkToMySql {
def main(args: Array[String]): Unit = {

val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度
env.setParallelism(3)

val inputPath = "src/main/resources/SensorReading"
val inputStream = env.readTextFile(inputPath)

//转换成样例类类型
val dataStream = inputStream.map(
data => {
val arr = data.split(",")
SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
}
)

dataStream.addSink(new MyJdbcSink())

env.execute()
}
}

class MyJdbcSink() extends RichSinkFunction[SensorReading] {
var conn: Connection = _
var insertStmt: PreparedStatement = _
var updateStmt: PreparedStatement = _

// open 主要是创建连接
override def open(parameters: Configuration): Unit = {
super.open(parameters)
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/Flink",
"root", "200028")
insertStmt = conn.prepareStatement("INSERT INTO temperatures (sensor, temp) VALUES(?, ?)")
updateStmt = conn.prepareStatement("UPDATE temperatures SET temp = ? WHERE sensor = ?")
}

// 调用连接,执行 sql
override def invoke(value: SensorReading, context:
SinkFunction.Context[_]): Unit = {
updateStmt.setDouble(1, value.temperature)
updateStmt.setString(2, value.id)
updateStmt.execute()
if (updateStmt.getUpdateCount == 0) {
insertStmt.setString(1, value.id)
insertStmt.setDouble(2, value.temperature)
insertStmt.execute()
}
}

override def close(): Unit = {
insertStmt.close()
updateStmt.close()
conn.close()
}
}

结果

【Flink Scala】Flink流处理API_scala_29

​​跳顶部​​


举报

相关推荐

0 条评论