一、简介
Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行状态计算。实现处理的低延迟、高吞吐、准确性和容错性。
需要解决的问题:
1.如何保证处理结果准确性
2.如何保证数据的时序性
3.如何保证容灾
那些行业需要处理流数据:
1.电商和市场营销:数据报表、广告投放、业务流程需要
2.物联网:传感器实时数据采集和显示、实时报警,交通运输业
3.电信业:基站流量调配
4.实时结算和通知推送,实时检测异常行为
二、处理架构对比
2.1 离线处理架构
存储与处理是分离的。
2.2 流式处理框架
2.2.1 第一代流式处理框架(Storm)
-
为了解决内存中数据的可靠性,引入了远程存储的概念,即有状态的流式处理。
与第三代流式处理框架
对比点 | Storm | Spark Streaming |
---|---|---|
实时计算模型 | 纯实时,来一条数据,处理一条数据 | 准实时,对一个时间段内的数据收集起来,作为一个RDD,再处理 |
实时计算延迟度 | 毫秒级 | 秒级 |
吞吐量 | 低 | 高 |
事务机制 | 支持完善 | 支持,但不够完善 |
健壮性 / 容错性 | ZooKeeper,Acker,非常强 | Checkpoint,WAL,一般 |
动态调整并行度 | 支持 | 不支持 |
storm保证了低延迟,但是没有保证数据的吞吐量、时序性和准确性
2.2.2 第二代流式处理架构(lambda架构)
-
使用两套系统,流处理系统保证低延迟,批处理系统来校准结果准确性。
2.2.3 第三代流式处理架构 (Flink/Spark Streaming)
Flink的主要特点:
-
事件驱动(Event-driven):一个事件记录进行一次处理,区别于顺序和流程驱动。
-
基于流的世界观:离线数据是有界的流;实时数据是一个没有界限的流,这就是所谓的有界流和无界流。
-
分层API:越顶层越抽象,表达含义越简明,使用越方便;越底层越具体,表达能力越丰富,使用越灵活。如果顶层API不够用,可以通过底层API进行实现。
SQL/Table API:如果业务逻辑比较简单,可以使用顶层的API进行实现;
DataStream API:可以对流进行自定义转换和操作,如开窗;也可以使用DataSet进行批处理操作。是最为常用的API层级。
ProcessFunction(events,state,time):对于非常复杂的业务场景,DataStream API都不能实现,那么可以使用这个最底层的API,可以自定义任何功能。可以获取当前所有的时间
(state)和状态
(time),可以定义定时器
。 其他特点:支持事件时间和处理时间;
精确一次
的状态一致性保证;低延迟,每秒处理百万个事件,毫秒级延迟
;与众多常用的存储系统的连接;高可用
,动态扩展
,实现7*24小时全天候运行。
Spark和Flink的主要差别就在于计算模型不同。Spark采用了微批处理模型,而Flink采用了基于操作符的连续流模型。因此,对Apache Spark和Apache Flink的选择实际上变成了计算模型的选择,而这种选择需要在延迟、吞吐量和可靠性等多个方面进行权衡。
如果对流数据进行处理,推荐使用Flink,主(显而)要(易见)的原因为:
- Flink灵活的窗口
- Exactly Once语义保证
三、Flink的简单使用
依赖
<properties>
<flink.version>1.12.0</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 该插件用于将scala代码编译为class字节码文件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.4.0</version>
<executions>
<execution>
<!-- 声明编译到maven的compile阶段 -->
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
3.1 Source输入
word_count功能代码
①批处理
需要导入隐式转换:import org.apache.flink.api.scala._
import org.apache.flink.api.scala.DataSet
import org.apache.flink.api.scala._
object WordCount {
// **_ 的用法:①包中所有类②系统默认初始化③将函数不执行返回④参数占位符⑤隐藏导入的类⑥标识符⑦绝对路径**⑧case _ 不管什么值都匹配⑨case _:BigInt =>... 当后面不用该变量,不关心变量时,可以用 _ 代替。
def main(args: Array[String]):Unit = {
// 创建一个批处理的执行环境
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
// 从文件中读取数据
val inputPath: String = "C:\\Users\\Administrator\\Desktop\\不常用的项目\\flink_demo\\src\\main\\resources\\test.txt"
val inputDataSet: DataSet[String] = env.readTextFile(inputPath)
// 对数据进行转换处理统计,先分词,再按照word进行分组,最后进行聚合统计
val resultDataSet: DataSet[(String,Int)] = inputDataSet
.flatMap(_.split(" "))
.map((_, 1))
.groupBy(0) // 以第一个元素作为key进行分组
.sum(1)
resultDataSet.print()
}
}
②流处理
如在192.168.32.242节点上启动命令nc -lk 7777
向端口7777发送数据包
配置程序的参数--host 192.168.32.242 --port 7777
来监听该端口获取流数据。
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
object StreamWordCount {
def main(args: Array[String]): Unit = {
// 创建一个流处理的执行环境 DataStreamApi
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(8) // 设置最大并行度,默认是本机核心数
// 接收一个socket文本流
val paramTool: ParameterTool = ParameterTool.fromArgs(args)
val host: String = paramTool.get("host")
val port: Int = paramTool.getInt("port")
val inputDataStream: DataStream[String] = env.socketTextStream(host, port)
// 进行转化处理统计
val resultDataStream: DataStream[(String, Int)] = inputDataStream
.flatMap(_.split(" "))
.filter(_.nonEmpty)
.map((_, 1))
.keyBy(0) // 流处理没有groupBy,使用keyBy进行聚合
.sum(1)
resultDataStream.print() //分布式处理会导致输出是乱序的
resultDataStream.print().setParallelism(1) //打印的并行度是1,不会输出进程号。
// 定义任务后,开始执行
env.execute("stream word count")
}
}
结果如下,最开始的数字表示运行在哪个线程下,线程号是根据key的hash值来决定的
6> (word,1)
3> (hello,1)
3> (hello,2)
5> (world,1)
原理:会将key计算得到哈希后发送到对应的线程中,这样相同key的数据发送到同一个线程中进行计算,保证了数据的准确性和吞吐量。
问题:分布式会导致时序错乱,如print()算子在不同节点上执行导致输出结果时间错乱。
③从kafka中读取流数据
添加kafka连接依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
代码
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.scala._
object StreamKafka {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 1.从集合中读取数据
val readProperties = new Properties()
readProperties.setProperty("bootstrap.servers", "192.168.32.242:9092")
readProperties.setProperty("group.id", "consumer-group")
readProperties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
readProperties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val myConsumer = new FlinkKafkaConsumer[String]("kafka_test", new SimpleStringSchema(), readProperties)
// myConsumer.setStartFromEarliest()
myConsumer.setStartFromLatest()
val inputStream = env.addSource(myConsumer)
val resultDataStream: DataStream[(String, Int)] = inputStream
.flatMap(_.split(" "))
.filter(_.nonEmpty)
.map((_, 1))
.keyBy(0) // 流处理没有groupBy,使用keyBy进行聚合
.sum(1)
resultDataStream.print() //分布式处理会导致输出是乱序的
resultDataStream.print().setParallelism(1) //打印的并行度是1,不会输出进程号。
env.execute("kafka_source_test")
}
}
在kafka中创建topic:bin/kafka-topics.sh --zookeeper bigdata1:2181 --create --topic kafka_test --partitions 3 --replication-factor 1
启动程序,然后向创建的topic中添加数据bin/kafka-console-producer.sh --broker-list bigdata1:9092 --topic kafka_test
进行测试。
④读取自定义数据源的流式数据
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.source.SourceFunction
import scala.util.Random
object SourceTest {
// 自定义数据源读取流数据
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val stream2 = env.addSource(new MySensorSource())
stream2.print()
env.execute("diy_source_test")
}
}
// 自定义函数实现run和cancel方法
class MySensorSource() extends SourceFunction[SensorReading] {
// 定义一个标志位表示数据源是否正常发出数据
var running: Boolean = true
override def cancel(): Unit = running = false
override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
// 定义一个循环,不停生产数据,除非被cancel
val curTemp = 1.to(10).map(i => ("sensor_" + i, Random.nextDouble() * 100))
while (running) {
curTemp.map(
data => (data._1, data._2 + Random.nextGaussian())
)
val curTime = System.currentTimeMillis()
curTemp.foreach(
data => sourceContext.collect(SensorReading(data._1, data._2, curTime))
)
Thread.sleep(1000)
}
}
}
case class SensorReading(name: String, temp: Double, timestamp: Long)
3.2 计算API
3.3 Sink输出
输出包括文件、Kafka、ES、MySQL等
3.4 窗口API
窗口包括滚动窗口(TumblingWindow)、滑动窗口(SlidingWindow)、会话窗口(SessionWindow)和全局窗口(GlobalWindow)。
KeyedStream有window方法调用窗口函数,
DataStream有windowAll方法调用窗口函数(因为并行度是1,性能不高,所以先进行聚合),
调用窗口函数之后,得到的是WindowedStream,无界流变成了有界流。
调用聚合函数后得到的还是DataStream。
窗口函数分为增量聚合函数(数据来一条计算一次)和全窗口函数(收集后统一计算)。ReduceFunction、AggregateFunction等就是增量聚合函数,ProcessWindowFunction就是全窗口函数。
3.5 其他API
trigger() 触发器:定义window什么时候关闭,触发计算并输出结果
evictor() 移除器:定义删除某些数据逻辑
allowedLateness():允许处理迟到的数据
sideOutputLateData():将迟到的数据放入到侧输出流
getSideOuntput():获取侧输出流
四、服务器中运行程序
4.1单机模式
4.1.1 启动单机模式
下载地址:https://www.apache.org/dyn/closer.lua/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.12.tgz
启动命令
/bin/start-cluster.sh
启动单节点模式。
可以访问UI界面http://192.168.32.242:8081
4.1.2 提交任务
设置并行度的优先级(由高到低)
- 算子设置
- 全局设置
- 提交时UI界面输入设置
- 配置文件设置
4.1.3 上传文件设置参数
4.1.4 查看执行计划
4.1.5 启动任务
可以通过UI界面提交任务,也可以通过命令行提交任务:
bin/flink run -c com.iotmars.wecook.StreamWordCount -p 2 /opt/jar/flink-demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar --host localhost --port 6666
4.1.6 查看任务
bin/flink list [-a]
4.1.7 取消任务
可以通过UI界面取消任务,也可以通过命令行取消任务:
bin/flink cancel 4198899a1a47f496309fe2da2e31c1f5
4.1.8 停止flink集群
/bin/stop-cluster.sh
4.2 Yarn模式
4.2.1 Session-cluster
概念:Session-Cluster模式需要先启动集群,然后再提交作业,接着会向yarn申请一块空间后,资源永远保持不变。如果资源满了,下一个作业无法提交,只能等到yarn中的一个作业执行完成之后释放了资源,才能进行下一个作业任务。
所有作业共享Dispatcher和ResourceManager,共享资源,适合规模小执行时间短的作业。
操作:
①启动yarn-session:
// 添加环境变量
export HADOOP_CLASSPATH=`hadoop classpath`
// 启动yarn-session
yarn-session.sh -n 2 -s 2 -jm 1024 -nm test -d
// 或调度器中创建了多个队列,需要指定队列
nohup ./yarn-session.sh -s 2 -jm 1024 -tm 2048 -nm flink-on-yarn -qu flink -d 1>/opt/module/flink-1.12.0/yarn-session.log 2>/opt/module/flink-1.12.0/yarn-session.err &
关闭yarn-session:
# 找到flink集群任务的id,然后kill
yarn application -kill application_1616059084025_0002
②提交任务(和standalone模式一样):
bin/flink run -c com.iotmars.wecook.StreamWordCount -p 2 /opt/jar/flink-demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar --host localhost --port 6666
③查看任务状态:去yarn控制台查看任务状态
④取消yarn-session:
yarn application --kill job_id
可以通过http://192.168.32.243:37807访问Web页面(会在某台服务器上部署一个Web页面)
4.2.2 Per-Job-Cluster
概念:一个Job会对应一个集群,每提交一个作业会根据自身的情况,都会单独向yarn申请资源,知道作业执行完成,一个作业失败与否不会影响下一个作业的正常提交和运行。独享Dispatcher和ResourceManager,按需接受资源申请,适合大规模长时间运行的作业。
每次提交都会创建一个新的flink集群,任务之间互相独立,互不影响,方便管理。任务执行完成后创建的集群也会消失。
操作:
①不启动yarn-session,直接执行job
flink run `-m yarn-cluster` -c com.iotmars.wecook.StreamWordCount /opt/jar/flink-demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar --host localhost --port 7777
4.3 Kubernetes部署
Flink在最近的版本中也支持了k8s部署模式。
①搭建k8s集群
②配置各组件的yaml文件
在k8s上构建Flink Session Cluster,需要将Flink集群的组件对应的docker镜像分别在k8s上启动,包括JobManager、JobManager、JobManagerService三个镜像服务。每个镜像服务都可以从中央镜像仓库中获取。
③启动Flink Session Cluster
// 启动jobmanager-service服务
kubectl create -f jobmanager-service.yaml
// 启动jobmanager-deployment服务
kubectl create -f jobmanager-deployment.yaml
// 启动taskmanager-deployment服务
kubectl create -f taskmanager-deployment.yaml
④访问Flink UI页面
集群启动后,就可以通过JobManagerServices中配置的WebUI端口进行访问
http://{JobManagerHost:Port}/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy
五、 运行时流程
5.1 Flink的单机模式
- Dispatcher提供了restful风格的接口用于提交任务;
- JobManager分析任务生成执行图,然后向ResourceManager申请slot资源;
- ResourceManager启动TaskManager,TaskManager在启动会向ResourceManager进行注册slot;
- JobManager提交要在slot中执行的任务。
5.2 Yarn的Per-Job-Cluster模式下的任务提交流程
流程概述:
提交Job请求后由Yarn启动所需资源数量的容器,容器中启动的TaskManager节点向ResourceManager进行slots的注册,然后JobManager向slots分发任务执行。
相当于每次提交Job后都启动一个Flink的TaskManager集群进行处理。
流程详情:
- 提交JOB后,任务到YARN的ResourceManager;
- RM启动AM,在AM中启动Flink的JobManager和ResourceManager;
- 在JobManager中进行任务切分,然后向Flink的ResourceManager进行任务请求,再向Yarn的ResourceManager进行资源申请;
- Yarn的ResourceManager根据请求的资源数启动NodeManager,然后在容器中启动TaskManager;
- TaskManager向Flink的ResourceManager进行slots资源注册,然后由Flink的JobManager进行资源分配。
5.3 TaskManager和Slot
两者关系
- 一个TaskManager至少有一个Slot,有多少个slot可以接收多少个任务。
- Slot拥有独立的内存,而不是独立的cpu。可以理解为Flink中每一个TaskManager都是一个JVM进程,它会在独立的线程上执行一个或多个子任务。
任务分配
- 并不是每个算子的任务都会放到不同的Slot中,而是同一个任务会根据并发分配到不同的Slot中,而不同任务会分配到同一个Slot中,所以可能会有slot会保存一个pipeline(即包含了所有的算子任务)。好处是如果算子工作量不同,不会导致某些Slot闲置,可以动态分配任务量,可以充分利用多核CPU并发)。
- 因此所需Slot的最大数量就是任务最大并行度的值。
数据传输形式
- one-to-one:map、filter、flatMap等算子符合one-to-one对应关系。这些算子的子任务得到的元素的个数和顺序跟source算子的子任务生产的元素的个数、顺序相同。
- redistributing:如果前后任务并行度不同,会采用round-robin重新分区;如果是keyBy等类似shuffle的宽依赖操作算子,会进行hashCode重分区;而broadcast和rebalance会随机重分区;
- local forward(本地转发):flink采用了任务链优化技术,采用one-to-one操作且设置算子的并行度相同,可以实现本地转发进行任务连接。这样数据在本地进行流转,减少了序列化和网络传输的花销。
任务链拆分和Slot独占
如上所述,flink会自动优化任务链合并多个任务为一个任务。
- 如果某个算子任务特别复杂需要单独拆出来,可以在算子后面设置
.disableChaining()
,这样这个任务就不会被合并; - 如果想把合并的任务拆分为两个任务,可以在需要拆分的算子后添加
.startNewChain()
。
共享组
可以将不同的任务分配到指定的共享组内。只有在同一组的算子任务会共享Slot,不同组的任务不会存在同一个Slot中。这样需要重新计算需要的Slot的数量。
- 如拆分出来的算子任务如果想独占一个Slot,可以在算子后面添加
.slotSharingGroup("a")
。
六、时间语义和水位线
6.1 时间概念
Event Time:事件创建时间
Ingestion Time:数据进入Flink的时间
Processing Time:执行操作算子的本地系统时间,与机器相关
6.2 设置时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
如果不设置,默认是Processing Time语义。
6.3 水位线
6.3.1 概念
水位线用于标记当前任务的时间,被水位线没过的时间窗口才会触发执行。当前水位线是到达的最大事件时间戳减去设置的延迟时间。
6.3.2 水位线上下游传递
任务会记录所有上游任务传递的水位线,并取最低的水位线传递给下游的任务。
当前事件时间-水位线延迟时间=水位线
水位线-窗口允许延迟时间=窗口关闭时间
注意点:
- 1.如果水位线没过窗口,那么会对窗口进行计算但不关闭,延迟数据进入后会重复触发窗口的计算,直到到达窗口关闭时间。
- 2.如果迟到数据属于多个窗口,每个窗口都会进行计算。
- 3.只有当迟到数据不属于任何一个窗口时,才会进入侧输出流。
水位线生成逻辑:
①
new BoundedOutOfOrdernessTimestampExtractorModelLogQ6 {
override def extractTimestamp(element: ModelLogQ6): Long = element.timestamp
}
如上对象为周期性生成watermark逻辑,默认为每隔200ms生成一个。
// 手动设置时间间隔为1s
env.getConfig().setAutoWatermarkInterval(1000);
②
new AssignerWithPunctuatedWatermarks[ModelLogQ6] {
override def checkAndGetNextWatermark(lastElement: ModelLogQ6, extractedTimestamp: Long): Watermark = ???
override def extractTimestamp(element: ModelLogQ6, recordTimestamp: Long): Long = ???
}
每条数据到达都可以生成一个watermark。
七、Flink中的状态
算子状态(Operator state)
键控状态(keyed state)
7.2 状态后端
MemoryStateBackend:将键控状态存储在taskManager的JVM堆上,checkpoint存储在JobManager的内存中。一般测试时使用。
FsStateBackend:键控状态存储在TaskManager的JVM堆上,而将checkPoint存储在dfs上。Flink默认的配置。
RocksDBStateBackend:所有状态序列化后存储到本地RocksDB中。
7.3 一致性检查点checkpoint
checkpoint策略配置
env.enableCheckpointing(1000L) // 设置checkpoint的间隔时间
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE) //设置checkpoint的模式
env.getCheckpointConfig.setCheckpointTimeout(60000L) //设置checkpoint的超时时间
env.getCheckpointConfig.setMaxConcurrentCheckpoints(2) //允许最多两个checkpoint同时执行
env.getCheckpointconfig.setMinPauseBetweenCheckpoints(500L) //保证两次checkpoint之间至少有500ms的间隔时间
env.getCheckpointconfig.setPreferCheckpointForRecovery(true) //是否直接使用checkpoint进行故障恢复。默认是false,表示如果savepoint更近,则会使用savepoint进行恢复
env.getCheckpointconfig.setTolerableCheckpointFailureNumber(0) // 能容忍多少次的checkpoint失败。如果是0,则checkpoint失败会导致任务失败,进行重启。
重启策略配置
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,10000L)) // 固定时间间隔尝试的重启次数。
env.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(5,TimeUnit.MINUTES),Time.of(10,TimeUnit.SECONDS))) // 在固定时间内间隔多少时间重启多少次
7.4 保存点savepoints
原理同checkpoint,但是需要手动触发savepoint操作。
可以用于手动备份、更新应用程序、版本迁移、暂停和重启应用。
最好在程序中定义这个任务的uid(.uid("1")),这样可以清楚找到任务对应的savepoint。
7.5 状态一致性
AT-MOST-ONCE 最多一次
AT-LEASE-ONCE 至少一次
EXACTLY-ONCE 精确一次
- 内部保证 —— checkpoint
- source端 —— 可重设数据的读取位置
- sink端 —— 从故障恢复时,数据不会重复写入外部系统
7.6 端到端一致性
7.6.1幂等写入**
7.6.2事务写入
概念:
预写日志WAL:将结果先进行保存,收到checkpoint完成通知后一次性写入到sink中。优点是简单;缺点是延迟变大而且可能会丢数据。
两阶段提交2PC:每个checkpoint,sink任务会启动一个事务,将接下来所有接收的数据添加到事务中。然后将数据写入到外部sink系统,但不提交(预提交),当收到checkpoint完成通知后才正式提交事务。Flink提供了TwoPhaseCommitSinkFunction接口。优点:实现exactly-once;缺点:需要提供事务支持的外部sink系统,而且可以在进程失败后恢复事务。如果sink超时关闭也会导致预提交的数据丢失。
Kafka中的两阶段提交
source消费Kafka数据,在checkpoint点插入barrier并记录offset到状态后端,sink会开启事务对到达的数据进行预提交。当barrier到达sink时,会开启一个新的事务对后续的数据进行预提交,直到barrier到达所有的sink,完成checkpoint后JobManager会通知Kafka进行正式提交关闭事务。
注意:1.Kafka隔离级别设置为read-commit;2.Kafka超时时间默认为15m,flink默认的checkpoint超时时间为1h。设置checkpoint超时间小于Kafka。
八、Table API 和 FLink SQL
依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<!-- 引入格式描述器FormatDescriptor,适配Kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.10.1</version>
</dependency>
简单使用
object TableApiTest {
def main(args: Array[String]): Unit = {
// 1. 创建环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val tableEnv = StreamTableEnvironment.create(env)
// 1.1 基于老版本planner的流处理
val oldStreamSettings = EnvironmentSettings.newInstance()
.useOldPlanner()
.inStreamingMode()
.build()
val oldStreamTableEnv = StreamTableEnvironment.create(env, oldStreamSettings)
// 1.2 基于老版本planner的批处理
val batchEnv = ExecutionEnvironment.getExecutionEnvironment
val oldBatchTableEnv = BatchTableEnvironment.create(batchEnv)
// 1.3 基于blink planner的流处理
val blinkStreamSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val blinkStreamEnv = StreamTableEnvironment.create(env, blinkStreamSettings)
// 1.4 基于blink planner的批处理
val blinkBatchSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build()
val blinkBatchEnv = TableEnvironment.create(blinkBatchSettings)
// 2. 连接外部系统,读取数据,注册表
// 2.1 读取文件,进行表注册并读取数据
val path = "C:\\Users\\Administrator\\Desktop\\不常用的项目\\flink_demo\\src\\main\\resources\\test.txt";
tableEnv.connect(new FileSystem().path(path))
.withFormat(new Csv())
.withSchema(new Schema()
.field("name", DataTypes.STRING())
.field("age", DataTypes.INT())
)
.createTemporaryTable("fileInputTable")
val fileInputTable: Table = tableEnv.from("fileInputTable")
fileInputTable.toAppendStream[(String,Int)].print()
// 2.2 从Kafka读取数据,进行表注册并读取数据
tableEnv
.connect(new Kafka()
.version("0.11")
.topic("sensor")
.property("zookeeper.connect", "192.168.32.242:2181")
.property("bootstrap.servers", "192.168.32.242:9092")
)
.withFormat(new Csv())
.withSchema(new Schema()
.field("name", DataTypes.STRING())
.field("age", DataTypes.INT())
)
.createTemporaryTable("kafkaInputTable")
val kafkaInputTable = tableEnv.from("kafkaInputTable")
kafkaInputTable.toAppendStream[(String,Int)].print()
env.execute("table api test")
}
}
表是由一个“标识符”来指定,由3部分组成:Catalog名、数据库(database)名和表名。
表包括常规表和视图View。
两种连接外部数据库的方式