0
点赞
收藏
分享

微信扫一扫

Flink 1.12 基本概念和集群部署

Gascognya 2021-09-21 阅读 71

一、简介

Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行状态计算。实现处理的低延迟、高吞吐、准确性和容错性。

需要解决的问题:
1.如何保证处理结果准确性
2.如何保证数据的时序性
3.如何保证容灾

那些行业需要处理流数据:
1.电商和市场营销:数据报表、广告投放、业务流程需要
2.物联网:传感器实时数据采集和显示、实时报警,交通运输业
3.电信业:基站流量调配
4.实时结算和通知推送,实时检测异常行为

二、处理架构对比

2.1 离线处理架构


存储与处理是分离的。


2.2 流式处理框架

2.2.1 第一代流式处理框架(Storm)


  • 为了解决内存中数据的可靠性,引入了远程存储的概念,即有状态的流式处理。

与第三代流式处理框架

对比点 Storm Spark Streaming
实时计算模型 纯实时,来一条数据,处理一条数据 准实时,对一个时间段内的数据收集起来,作为一个RDD,再处理
实时计算延迟度 毫秒级 秒级
吞吐量
事务机制 支持完善 支持,但不够完善
健壮性 / 容错性 ZooKeeper,Acker,非常强 Checkpoint,WAL,一般
动态调整并行度 支持 不支持

storm保证了低延迟,但是没有保证数据的吞吐量、时序性和准确性


2.2.2 第二代流式处理架构(lambda架构)


  • 使用两套系统,流处理系统保证低延迟,批处理系统来校准结果准确性。


2.2.3 第三代流式处理架构 (Flink/Spark Streaming)

Flink的主要特点:

  • 事件驱动(Event-driven):一个事件记录进行一次处理,区别于顺序和流程驱动。


  • 基于流的世界观:离线数据是有界的流;实时数据是一个没有界限的流,这就是所谓的有界流和无界流。


  • 分层API:越顶层越抽象,表达含义越简明,使用越方便;越底层越具体,表达能力越丰富,使用越灵活。如果顶层API不够用,可以通过底层API进行实现。


    SQL/Table API:如果业务逻辑比较简单,可以使用顶层的API进行实现;
    DataStream API:可以对流进行自定义转换和操作,如开窗;也可以使用DataSet进行批处理操作。是最为常用的API层级。
    ProcessFunction(events,state,time):对于非常复杂的业务场景,DataStream API都不能实现,那么可以使用这个最底层的API,可以自定义任何功能。可以获取当前所有的时间(state)和状态(time),可以定义定时器

  • 其他特点:支持事件时间和处理时间;精确一次的状态一致性保证;低延迟,每秒处理百万个事件,毫秒级延迟;与众多常用的存储系统的连接;高可用动态扩展,实现7*24小时全天候运行。

Spark和Flink的主要差别就在于计算模型不同。Spark采用了微批处理模型,而Flink采用了基于操作符的连续流模型。因此,对Apache Spark和Apache Flink的选择实际上变成了计算模型的选择,而这种选择需要在延迟、吞吐量和可靠性等多个方面进行权衡。
如果对流数据进行处理,推荐使用Flink,主(显而)要(易见)的原因为:

  • Flink灵活的窗口
  • Exactly Once语义保证


三、Flink的简单使用

依赖

    <properties>
        <flink.version>1.12.0</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- 该插件用于将scala代码编译为class字节码文件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.4.0</version>
                <executions>
                    <execution>
                        <!-- 声明编译到maven的compile阶段 -->
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

3.1 Source输入

word_count功能代码
①批处理
需要导入隐式转换:import org.apache.flink.api.scala._

import org.apache.flink.api.scala.DataSet
import org.apache.flink.api.scala._

object WordCount {
  //  **_  的用法:①包中所有类②系统默认初始化③将函数不执行返回④参数占位符⑤隐藏导入的类⑥标识符⑦绝对路径**⑧case _ 不管什么值都匹配⑨case _:BigInt =>...  当后面不用该变量,不关心变量时,可以用 _ 代替。
  def main(args: Array[String]):Unit = {
    // 创建一个批处理的执行环境
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    // 从文件中读取数据
    val inputPath: String = "C:\\Users\\Administrator\\Desktop\\不常用的项目\\flink_demo\\src\\main\\resources\\test.txt"
    val inputDataSet: DataSet[String] = env.readTextFile(inputPath)

    // 对数据进行转换处理统计,先分词,再按照word进行分组,最后进行聚合统计
    val resultDataSet: DataSet[(String,Int)] = inputDataSet
      .flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(0)  // 以第一个元素作为key进行分组
      .sum(1)

    resultDataSet.print()
  }
}

②流处理
如在192.168.32.242节点上启动命令nc -lk 7777向端口7777发送数据包
配置程序的参数--host 192.168.32.242 --port 7777来监听该端口获取流数据。

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

object StreamWordCount {
  def main(args: Array[String]): Unit = {
    // 创建一个流处理的执行环境  DataStreamApi
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(8) // 设置最大并行度,默认是本机核心数

    // 接收一个socket文本流
    val paramTool: ParameterTool = ParameterTool.fromArgs(args)
    val host: String = paramTool.get("host")
    val port: Int = paramTool.getInt("port")
    val inputDataStream: DataStream[String] = env.socketTextStream(host, port)

    // 进行转化处理统计
    val resultDataStream: DataStream[(String, Int)] = inputDataStream
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0) // 流处理没有groupBy,使用keyBy进行聚合
      .sum(1)

    resultDataStream.print() //分布式处理会导致输出是乱序的
    resultDataStream.print().setParallelism(1) //打印的并行度是1,不会输出进程号。

    // 定义任务后,开始执行
    env.execute("stream word count")
  }
}

结果如下,最开始的数字表示运行在哪个线程下,线程号是根据key的hash值来决定的

6> (word,1)
3> (hello,1)
3> (hello,2)
5> (world,1)

原理:会将key计算得到哈希后发送到对应的线程中,这样相同key的数据发送到同一个线程中进行计算,保证了数据的准确性和吞吐量。
问题:分布式会导致时序错乱,如print()算子在不同节点上执行导致输出结果时间错乱。

③从kafka中读取流数据
添加kafka连接依赖

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

代码

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.scala._

object StreamKafka {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // 1.从集合中读取数据
    val readProperties = new Properties()
    readProperties.setProperty("bootstrap.servers", "192.168.32.242:9092")
    readProperties.setProperty("group.id", "consumer-group")
    readProperties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    readProperties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val myConsumer = new FlinkKafkaConsumer[String]("kafka_test", new SimpleStringSchema(), readProperties)
    //    myConsumer.setStartFromEarliest()
    myConsumer.setStartFromLatest()

    val inputStream = env.addSource(myConsumer)

    val resultDataStream: DataStream[(String, Int)] = inputStream
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0) // 流处理没有groupBy,使用keyBy进行聚合
      .sum(1)

    resultDataStream.print() //分布式处理会导致输出是乱序的
    resultDataStream.print().setParallelism(1) //打印的并行度是1,不会输出进程号。

    env.execute("kafka_source_test")
  }
}

在kafka中创建topic:bin/kafka-topics.sh --zookeeper bigdata1:2181 --create --topic kafka_test --partitions 3 --replication-factor 1
启动程序,然后向创建的topic中添加数据bin/kafka-console-producer.sh --broker-list bigdata1:9092 --topic kafka_test进行测试。

④读取自定义数据源的流式数据

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.source.SourceFunction

import scala.util.Random

object SourceTest {
  // 自定义数据源读取流数据
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val stream2 = env.addSource(new MySensorSource())

    stream2.print()

    env.execute("diy_source_test")

  }

}

// 自定义函数实现run和cancel方法
class MySensorSource() extends SourceFunction[SensorReading] {
  // 定义一个标志位表示数据源是否正常发出数据
  var running: Boolean = true

  override def cancel(): Unit = running = false

  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    // 定义一个循环,不停生产数据,除非被cancel
    val curTemp = 1.to(10).map(i => ("sensor_" + i, Random.nextDouble() * 100))

    while (running) {

      curTemp.map(
        data => (data._1, data._2 + Random.nextGaussian())
      )
      val curTime = System.currentTimeMillis()
      curTemp.foreach(
        data => sourceContext.collect(SensorReading(data._1, data._2, curTime))
      )

      Thread.sleep(1000)
    }
  }

}

case class SensorReading(name: String, temp: Double, timestamp: Long)


3.2 计算API


3.3 Sink输出

输出包括文件、Kafka、ES、MySQL等


3.4 窗口API

窗口包括滚动窗口(TumblingWindow)、滑动窗口(SlidingWindow)、会话窗口(SessionWindow)和全局窗口(GlobalWindow)。

KeyedStream有window方法调用窗口函数,
DataStream有windowAll方法调用窗口函数(因为并行度是1,性能不高,所以先进行聚合),
调用窗口函数之后,得到的是WindowedStream,无界流变成了有界流。
调用聚合函数后得到的还是DataStream。

窗口函数分为增量聚合函数(数据来一条计算一次)和全窗口函数(收集后统一计算)。ReduceFunction、AggregateFunction等就是增量聚合函数,ProcessWindowFunction就是全窗口函数。


3.5 其他API

trigger() 触发器:定义window什么时候关闭,触发计算并输出结果
evictor() 移除器:定义删除某些数据逻辑
allowedLateness():允许处理迟到的数据
sideOutputLateData():将迟到的数据放入到侧输出流
getSideOuntput():获取侧输出流


四、服务器中运行程序

4.1单机模式

4.1.1 启动单机模式
下载地址:https://www.apache.org/dyn/closer.lua/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.12.tgz
启动命令

/bin/start-cluster.sh

启动单节点模式。
可以访问UI界面http://192.168.32.242:8081

4.1.2 提交任务

设置并行度的优先级(由高到低)

  • 算子设置
  • 全局设置
  • 提交时UI界面输入设置
  • 配置文件设置

4.1.3 上传文件设置参数

4.1.4 查看执行计划

4.1.5 启动任务
可以通过UI界面提交任务,也可以通过命令行提交任务:

bin/flink run -c com.iotmars.wecook.StreamWordCount -p 2 /opt/jar/flink-demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar --host localhost --port 6666

4.1.6 查看任务

bin/flink list [-a]

4.1.7 取消任务
可以通过UI界面取消任务,也可以通过命令行取消任务:

bin/flink cancel 4198899a1a47f496309fe2da2e31c1f5

4.1.8 停止flink集群

/bin/stop-cluster.sh


4.2 Yarn模式

4.2.1 Session-cluster

概念:Session-Cluster模式需要先启动集群,然后再提交作业,接着会向yarn申请一块空间后,资源永远保持不变。如果资源满了,下一个作业无法提交,只能等到yarn中的一个作业执行完成之后释放了资源,才能进行下一个作业任务。
所有作业共享Dispatcher和ResourceManager,共享资源,适合规模小执行时间短的作业。
操作:
①启动yarn-session:

// 添加环境变量
export HADOOP_CLASSPATH=`hadoop classpath`
// 启动yarn-session
yarn-session.sh -n 2 -s 2 -jm 1024 -nm test -d
// 或调度器中创建了多个队列,需要指定队列
nohup ./yarn-session.sh -s 2 -jm 1024 -tm 2048 -nm flink-on-yarn -qu flink -d 1>/opt/module/flink-1.12.0/yarn-session.log 2>/opt/module/flink-1.12.0/yarn-session.err &

关闭yarn-session:

# 找到flink集群任务的id,然后kill
yarn application -kill application_1616059084025_0002

②提交任务(和standalone模式一样):

bin/flink run -c com.iotmars.wecook.StreamWordCount -p 2 /opt/jar/flink-demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar --host localhost --port 6666

③查看任务状态:去yarn控制台查看任务状态
④取消yarn-session:

yarn application --kill job_id

可以通过http://192.168.32.243:37807访问Web页面(会在某台服务器上部署一个Web页面)

4.2.2 Per-Job-Cluster

概念:一个Job会对应一个集群,每提交一个作业会根据自身的情况,都会单独向yarn申请资源,知道作业执行完成,一个作业失败与否不会影响下一个作业的正常提交和运行。独享Dispatcher和ResourceManager,按需接受资源申请,适合大规模长时间运行的作业。
每次提交都会创建一个新的flink集群,任务之间互相独立,互不影响,方便管理。任务执行完成后创建的集群也会消失。
操作:
①不启动yarn-session,直接执行job

flink run `-m yarn-cluster` -c com.iotmars.wecook.StreamWordCount  /opt/jar/flink-demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar --host localhost --port 7777


4.3 Kubernetes部署

Flink在最近的版本中也支持了k8s部署模式。
①搭建k8s集群
②配置各组件的yaml文件
在k8s上构建Flink Session Cluster,需要将Flink集群的组件对应的docker镜像分别在k8s上启动,包括JobManager、JobManager、JobManagerService三个镜像服务。每个镜像服务都可以从中央镜像仓库中获取。
③启动Flink Session Cluster

// 启动jobmanager-service服务
kubectl create -f jobmanager-service.yaml
// 启动jobmanager-deployment服务
kubectl create -f jobmanager-deployment.yaml
// 启动taskmanager-deployment服务
kubectl create -f taskmanager-deployment.yaml

④访问Flink UI页面
集群启动后,就可以通过JobManagerServices中配置的WebUI端口进行访问
http://{JobManagerHost:Port}/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy


五、 运行时流程

5.1 Flink的单机模式

  1. Dispatcher提供了restful风格的接口用于提交任务;
  2. JobManager分析任务生成执行图,然后向ResourceManager申请slot资源;
  3. ResourceManager启动TaskManager,TaskManager在启动会向ResourceManager进行注册slot;
  4. JobManager提交要在slot中执行的任务。

5.2 Yarn的Per-Job-Cluster模式下的任务提交流程

流程概述:
提交Job请求后由Yarn启动所需资源数量的容器,容器中启动的TaskManager节点向ResourceManager进行slots的注册,然后JobManager向slots分发任务执行。
相当于每次提交Job后都启动一个Flink的TaskManager集群进行处理。

流程详情:

  1. 提交JOB后,任务到YARN的ResourceManager;
  2. RM启动AM,在AM中启动Flink的JobManager和ResourceManager;
  3. 在JobManager中进行任务切分,然后向Flink的ResourceManager进行任务请求,再向Yarn的ResourceManager进行资源申请;
  4. Yarn的ResourceManager根据请求的资源数启动NodeManager,然后在容器中启动TaskManager;
  5. TaskManager向Flink的ResourceManager进行slots资源注册,然后由Flink的JobManager进行资源分配。

5.3 TaskManager和Slot

两者关系

  • 一个TaskManager至少有一个Slot,有多少个slot可以接收多少个任务。
  • Slot拥有独立的内存,而不是独立的cpu。可以理解为Flink中每一个TaskManager都是一个JVM进程,它会在独立的线程上执行一个或多个子任务。

任务分配

  • 并不是每个算子的任务都会放到不同的Slot中,而是同一个任务会根据并发分配到不同的Slot中,而不同任务会分配到同一个Slot中,所以可能会有slot会保存一个pipeline(即包含了所有的算子任务)。好处是如果算子工作量不同,不会导致某些Slot闲置,可以动态分配任务量,可以充分利用多核CPU并发)。
  • 因此所需Slot的最大数量就是任务最大并行度的值。

数据传输形式

  • one-to-one:map、filter、flatMap等算子符合one-to-one对应关系。这些算子的子任务得到的元素的个数和顺序跟source算子的子任务生产的元素的个数、顺序相同。
  • redistributing:如果前后任务并行度不同,会采用round-robin重新分区;如果是keyBy等类似shuffle的宽依赖操作算子,会进行hashCode重分区;而broadcast和rebalance会随机重分区;
  • local forward(本地转发):flink采用了任务链优化技术,采用one-to-one操作且设置算子的并行度相同,可以实现本地转发进行任务连接。这样数据在本地进行流转,减少了序列化和网络传输的花销。

任务链拆分和Slot独占
如上所述,flink会自动优化任务链合并多个任务为一个任务。

  • 如果某个算子任务特别复杂需要单独拆出来,可以在算子后面设置.disableChaining(),这样这个任务就不会被合并;
  • 如果想把合并的任务拆分为两个任务,可以在需要拆分的算子后添加.startNewChain()

共享组
可以将不同的任务分配到指定的共享组内。只有在同一组的算子任务会共享Slot,不同组的任务不会存在同一个Slot中。这样需要重新计算需要的Slot的数量。

  • 如拆分出来的算子任务如果想独占一个Slot,可以在算子后面添加.slotSharingGroup("a")


六、时间语义和水位线

6.1 时间概念

Event Time:事件创建时间
Ingestion Time:数据进入Flink的时间
Processing Time:执行操作算子的本地系统时间,与机器相关

6.2 设置时间语义

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
如果不设置,默认是Processing Time语义。

6.3 水位线

6.3.1 概念

水位线用于标记当前任务的时间,被水位线没过的时间窗口才会触发执行。当前水位线是到达的最大事件时间戳减去设置的延迟时间。

6.3.2 水位线上下游传递

任务会记录所有上游任务传递的水位线,并取最低的水位线传递给下游的任务。

当前事件时间-水位线延迟时间=水位线
水位线-窗口允许延迟时间=窗口关闭时间
注意点:

  • 1.如果水位线没过窗口,那么会对窗口进行计算但不关闭,延迟数据进入后会重复触发窗口的计算,直到到达窗口关闭时间。
  • 2.如果迟到数据属于多个窗口,每个窗口都会进行计算。
  • 3.只有当迟到数据不属于任何一个窗口时,才会进入侧输出流。

水位线生成逻辑:

new BoundedOutOfOrdernessTimestampExtractorModelLogQ6 {
override def extractTimestamp(element: ModelLogQ6): Long = element.timestamp
}
如上对象为周期性生成watermark逻辑,默认为每隔200ms生成一个。

// 手动设置时间间隔为1s
env.getConfig().setAutoWatermarkInterval(1000);


new AssignerWithPunctuatedWatermarks[ModelLogQ6] {
override def checkAndGetNextWatermark(lastElement: ModelLogQ6, extractedTimestamp: Long): Watermark = ???

    override def extractTimestamp(element: ModelLogQ6, recordTimestamp: Long): Long = ???

}
每条数据到达都可以生成一个watermark。


七、Flink中的状态

算子状态(Operator state)

键控状态(keyed state)

7.2 状态后端

MemoryStateBackend:将键控状态存储在taskManager的JVM堆上,checkpoint存储在JobManager的内存中。一般测试时使用。
FsStateBackend:键控状态存储在TaskManager的JVM堆上,而将checkPoint存储在dfs上。Flink默认的配置。
RocksDBStateBackend:所有状态序列化后存储到本地RocksDB中。

7.3 一致性检查点checkpoint

checkpoint策略配置
env.enableCheckpointing(1000L) // 设置checkpoint的间隔时间
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE) //设置checkpoint的模式
env.getCheckpointConfig.setCheckpointTimeout(60000L) //设置checkpoint的超时时间
env.getCheckpointConfig.setMaxConcurrentCheckpoints(2) //允许最多两个checkpoint同时执行
env.getCheckpointconfig.setMinPauseBetweenCheckpoints(500L) //保证两次checkpoint之间至少有500ms的间隔时间
env.getCheckpointconfig.setPreferCheckpointForRecovery(true) //是否直接使用checkpoint进行故障恢复。默认是false,表示如果savepoint更近,则会使用savepoint进行恢复
env.getCheckpointconfig.setTolerableCheckpointFailureNumber(0) // 能容忍多少次的checkpoint失败。如果是0,则checkpoint失败会导致任务失败,进行重启。

重启策略配置
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,10000L)) // 固定时间间隔尝试的重启次数。
env.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(5,TimeUnit.MINUTES),Time.of(10,TimeUnit.SECONDS))) // 在固定时间内间隔多少时间重启多少次

7.4 保存点savepoints

原理同checkpoint,但是需要手动触发savepoint操作。
可以用于手动备份、更新应用程序、版本迁移、暂停和重启应用。
最好在程序中定义这个任务的uid(.uid("1")),这样可以清楚找到任务对应的savepoint。

7.5 状态一致性

AT-MOST-ONCE 最多一次
AT-LEASE-ONCE 至少一次
EXACTLY-ONCE 精确一次

  • 内部保证 —— checkpoint
  • source端 —— 可重设数据的读取位置
  • sink端 —— 从故障恢复时,数据不会重复写入外部系统

7.6 端到端一致性

7.6.1幂等写入**

7.6.2事务写入

概念:

  • 预写日志WAL:将结果先进行保存,收到checkpoint完成通知后一次性写入到sink中。优点是简单;缺点是延迟变大而且可能会丢数据。

  • 两阶段提交2PC:每个checkpoint,sink任务会启动一个事务,将接下来所有接收的数据添加到事务中。然后将数据写入到外部sink系统,但不提交(预提交),当收到checkpoint完成通知后才正式提交事务。Flink提供了TwoPhaseCommitSinkFunction接口。优点:实现exactly-once;缺点:需要提供事务支持的外部sink系统,而且可以在进程失败后恢复事务。如果sink超时关闭也会导致预提交的数据丢失。

Kafka中的两阶段提交
source消费Kafka数据,在checkpoint点插入barrier并记录offset到状态后端,sink会开启事务对到达的数据进行预提交。当barrier到达sink时,会开启一个新的事务对后续的数据进行预提交,直到barrier到达所有的sink,完成checkpoint后JobManager会通知Kafka进行正式提交关闭事务。
注意:1.Kafka隔离级别设置为read-commit;2.Kafka超时时间默认为15m,flink默认的checkpoint超时时间为1h。设置checkpoint超时间小于Kafka。

八、Table API 和 FLink SQL

依赖

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>
        <!-- 引入格式描述器FormatDescriptor,适配Kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>1.10.1</version>
        </dependency>

简单使用

object TableApiTest {
  def main(args: Array[String]): Unit = {
    // 1. 创建环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val tableEnv = StreamTableEnvironment.create(env)

    // 1.1 基于老版本planner的流处理
    val oldStreamSettings = EnvironmentSettings.newInstance()
      .useOldPlanner()
      .inStreamingMode()
      .build()
    val oldStreamTableEnv = StreamTableEnvironment.create(env, oldStreamSettings)

    // 1.2 基于老版本planner的批处理
    val batchEnv = ExecutionEnvironment.getExecutionEnvironment
    val oldBatchTableEnv = BatchTableEnvironment.create(batchEnv)

    // 1.3 基于blink planner的流处理
    val blinkStreamSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val blinkStreamEnv = StreamTableEnvironment.create(env, blinkStreamSettings)

    // 1.4 基于blink planner的批处理
    val blinkBatchSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inBatchMode()
      .build()
    val blinkBatchEnv = TableEnvironment.create(blinkBatchSettings)


    // 2. 连接外部系统,读取数据,注册表
    // 2.1 读取文件,进行表注册并读取数据
    val path = "C:\\Users\\Administrator\\Desktop\\不常用的项目\\flink_demo\\src\\main\\resources\\test.txt";

    tableEnv.connect(new FileSystem().path(path))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("name", DataTypes.STRING())
        .field("age", DataTypes.INT())
      )
      .createTemporaryTable("fileInputTable")

    val fileInputTable: Table = tableEnv.from("fileInputTable")
    fileInputTable.toAppendStream[(String,Int)].print()


    // 2.2 从Kafka读取数据,进行表注册并读取数据
    tableEnv
      .connect(new Kafka()
        .version("0.11")
        .topic("sensor")
        .property("zookeeper.connect", "192.168.32.242:2181")
        .property("bootstrap.servers", "192.168.32.242:9092")
      )
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("name", DataTypes.STRING())
        .field("age", DataTypes.INT())
      )
      .createTemporaryTable("kafkaInputTable")

    val kafkaInputTable = tableEnv.from("kafkaInputTable")
    kafkaInputTable.toAppendStream[(String,Int)].print()

    env.execute("table api test")
  }
}

表是由一个“标识符”来指定,由3部分组成:Catalog名、数据库(database)名和表名。
表包括常规表和视图View。

两种连接外部数据库的方式

举报

相关推荐

0 条评论