0
点赞
收藏
分享

微信扫一扫

轻松入门进阶Flink第二课 Flink基础二

small_Sun 2022-05-06 阅读 85

第05讲:Flink SQL & Table 编程和案例

我们在第 02 课时中使用 Flink Table & SQL 的 API 实现了最简单的 WordCount 程序。在这一课时中,将分别从 Flink Table & SQL 的背景和编程模型、常见的 API、算子和内置函数等对 Flink Table & SQL 做一个详细的讲解和概括,最后模拟了一个实际业务场景使用 Flink Table & SQL 开发。

Flink Table & SQL 概述

背景

我们在前面的课时中讲过 Flink 的分层模型,Flink 自身提供了不同级别的抽象来支持我们开发流式或者批量处理程序,下图描述了 Flink 支持的 4 种不同级别的抽象。

image.png

Table APISQL 处于最顶端,是 Flink 提供的高级 API 操作。Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。

我们在第 04 课时中提到过,Flink 在编程模型上提供了 DataStream 和 DataSet 两套 API,并没有做到事实上的批流统一,因为用户和开发者还是开发了两套代码。正是因为 Flink Table & SQL 的加入,可以说 Flink 在某种程度上做到了事实上的批流一体。

原理

你之前可能都了解过 Hive,在离线计算场景下 Hive 几乎扛起了离线数据处理的半壁江山。它的底层对 SQL 的解析用到了 Apache Calcite,Flink 同样把 SQL 的解析、优化和执行教给了 Calcite。

下图是一张经典的 Flink Table & SQL 实现原理图,可以看到 Calcite 在整个架构中处于绝对核心地位。

image (1).png
从图中可以看到无论是批查询 SQL 还是流式查询 SQL,都会经过对应的转换器 Parser 转换成为节点树 SQLNode tree,然后生成逻辑执行计划 Logical Plan,逻辑执行计划在经过优化后生成真正可以执行的物理执行计划,交给 DataSet 或者 DataStream 的 API 去执行。

在这里我们不对 Calcite 的原理过度展开,有兴趣的可以直接在官网上学习。

一个完整的 Flink Table & SQL Job 也是由 Source、Transformation、Sink 构成:

image (2).png

  • Source 部分来源于外部数据源,我们经常用的有 Kafka、MySQL 等;

  • Transformation 部分则是 Flink Table & SQL 支持的常用 SQL 算子,比如简单的 Select、Groupby 等,当然在这里也有更为复杂的多流 Join、流与维表的 Join 等;

  • Sink 部分是指的结果存储比如 MySQL、HBase 或 Kakfa 等。

动态表

与传统的表 SQL 查询相比,Flink Table & SQL 在处理流数据时会时时刻刻处于动态的数据变化中,所以便有了一个动态表的概念。

动态表的查询与静态表一样,但是,在查询动态表的时候,SQL 会做连续查询,不会终止。

我们举个简单的例子,Flink 程序接受一个 Kafka 流作为输入,Kafka 中为用户的购买记录:

image (3).png

首先,Kafka 的消息会被源源不断的解析成一张不断增长的动态表,我们在动态表上执行的 SQL 会不断生成新的动态表作为结果表。

Flink Table & SQL 算子和内置函数

我们在讲解 Flink Table & SQL 所支持的常用算子前,需要说明一点,Flink 自从 0.9 版本开始支持 Table & SQL 功能一直处于完善开发中,且在不断进行迭代。

我们在官网中也可以看到这样的提示:

Flink Table & SQL 的开发一直在进行中,并没有支持所有场景下的计算逻辑。从我个人实践角度来讲,在使用原生的 Flink Table & SQL 时,务必查询官网当前版本对 Table & SQL 的支持程度,尽量选择场景明确,逻辑不是极其复杂的场景。

常用算子

目前 Flink SQL 支持的语法主要如下:

query:
  values
  | {
      select
      | selectWithoutFrom
      | query UNION [ ALL ] query
      | query EXCEPT query
      | query INTERSECT query
    }
    [ ORDER BY orderItem [, orderItem ]* ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start { ROW | ROWS } ]
    [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]

orderItem:
expression [ ASC | DESC ]

select:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
FROM tableExpression
[ WHERE booleanExpression ]
[ GROUP BY { groupItem [, groupItem ]* } ]
[ HAVING booleanExpression ]
[ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]

selectWithoutFrom:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }

projectItem:
expression [ [ AS ] columnAlias ]
| tableAlias . *

tableExpression:
tableReference [, tableReference ]*
| tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]

joinCondition:
ON booleanExpression
| USING ‘(’ column [, column ]* ‘)’

tableReference:
tablePrimary
[ matchRecognize ]
[ [ AS ] alias [ ‘(’ columnAlias [, columnAlias ]* ‘)’ ] ]

tablePrimary:
[ TABLE ] [ [ catalogName . ] schemaName . ] tableName
| LATERAL TABLE ‘(’ functionName ‘(’ expression [, expression ]* ‘)’ ‘)’
| UNNEST ‘(’ expression ‘)’

values:
VALUES expression [, expression ]*

groupItem:
expression
| ‘(’ ‘)’
| ‘(’ expression [, expression ]* ‘)’
| CUBE ‘(’ expression [, expression ]* ‘)’
| ROLLUP ‘(’ expression [, expression ]* ‘)’
| GROUPING SETS ‘(’ groupItem [, groupItem ]* ‘)’

windowRef:
windowName
| windowSpec

windowSpec:
[ windowName ]
‘(’
[ ORDER BY orderItem [, orderItem ]* ]
[ PARTITION BY expression [, expression ]* ]
[
RANGE numericOrIntervalExpression {PRECEDING}
| ROWS numericExpression {PRECEDING}
]
‘)’

可以看到 Flink SQL 和传统的 SQL 一样,支持了包含查询、连接、聚合等场景,另外还支持了包括窗口、排序等场景。下面我就以最常用的算子来做详细的讲解。

SELECT/AS/WHERE

SELECT、WHERE 和传统 SQL 用法一样,用于筛选和过滤数据,同时适用于 DataStream 和 DataSet。

SELECT * FROM Table;
SELECT name,age FROM Table;

当然我们也可以在 WHERE 条件中使用 =、<、>、<>、>=、<=,以及 AND、OR 等表达式的组合:

SELECT name,age FROM Table where name LIKE '%小明%';
SELECT * FROM Table WHERE age = 20;
SELECT name, age
FROM Table
WHERE name IN (SELECT name FROM Table2)

GROUP BY / DISTINCT/HAVING

GROUP BY 用于进行分组操作,DISTINCT 用于结果去重。
HAVING 和传统 SQL 一样,可以用来在聚合函数之后进行筛选。

SELECT DISTINCT name FROM Table;
SELECT name, SUM(score) as TotalScore FROM Table GROUP BY name;
SELECT name, SUM(score) as TotalScore FROM Table GROUP BY name HAVING
SUM(score) > 300;

JOIN

JOIN 可以用于把来自两个表的数据联合起来形成结果表,目前 Flink 的 Join 只支持等值连接。Flink 支持的 JOIN 类型包括:

JOIN - INNER JOIN
LEFT JOIN - LEFT OUTER JOIN
RIGHT JOIN - RIGHT OUTER JOIN
FULL JOIN - FULL OUTER JOIN

例如,用用户表和商品表进行关联:

SELECT *
FROM User LEFT JOIN Product ON User.name = Product.buyer

SELECT *
FROM User RIGHT JOIN Product ON User.name = Product.buyer

SELECT *
FROM User FULL OUTER JOIN Product ON User.name = Product.buyer

LEFT JOIN、RIGHT JOIN 、FULL JOIN 相与我们传统 SQL 中含义一样。

WINDOW

根据窗口数据划分的不同,目前 Apache Flink 有如下 3 种:

  • 滚动窗口,窗口数据有固定的大小,窗口中的数据不会叠加;

  • 滑动窗口,窗口数据有固定大小,并且有生成间隔;

  • 会话窗口,窗口数据没有固定的大小,根据用户传入的参数进行划分,窗口数据无叠加;

滚动窗口

滚动窗口的特点是:有固定大小、窗口中的数据不会重叠,如下图所示:
image (4).png

滚动窗口的语法:

SELECT 
    [gk],
    [TUMBLE_START(timeCol, size)], 
    [TUMBLE_END(timeCol, size)], 
    agg1(col1), 
    ... 
    aggn(colN)
FROM Tab1
GROUP BY [gk], TUMBLE(timeCol, size)

举例说明,我们需要计算每个用户每天的订单数量:

SELECT user, TUMBLE_START(timeLine, INTERVAL '1' DAY) as winStart, SUM(amount) FROM Orders GROUP BY TUMBLE(timeLine, INTERVAL '1' DAY), user;

其中,TUMBLE_START 和 TUMBLE_END 代表窗口的开始时间和窗口的结束时间,TUMBLE (timeLine, INTERVAL '1' DAY) 中的 timeLine 代表时间字段所在的列,INTERVAL '1' DAY 表示时间间隔为一天。

滑动窗口

滑动窗口有固定的大小,与滚动窗口不同的是滑动窗口可以通过 slide 参数控制滑动窗口的创建频率。需要注意的是,多个滑动窗口可能会发生数据重叠,具体语义如下:

image (5).png

滑动窗口的语法与滚动窗口相比,只多了一个 slide 参数:

SELECT 
    [gk], 
    [HOP_START(timeCol, slide, size)] ,
    [HOP_END(timeCol, slide, size)],
    agg1(col1), 
    ... 
    aggN(colN) 
FROM Tab1
GROUP BY [gk], HOP(timeCol, slide, size)

例如,我们要每间隔一小时计算一次过去 24 小时内每个商品的销量:

SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product

上述案例中的 INTERVAL '1' HOUR 代表滑动窗口生成的时间间隔。

会话窗口

会话窗口定义了一个非活动时间,假如在指定的时间间隔内没有出现事件或消息,则会话窗口关闭。

image (6).png
会话窗口的语法如下:

SELECT 
    [gk], 
    SESSION_START(timeCol, gap) AS winStart,
    SESSION_END(timeCol, gap) AS winEnd,
    agg1(col1),
     ... 
    aggn(colN)
FROM Tab1
GROUP BY [gk], SESSION(timeCol, gap)

举例,我们需要计算每个用户过去 1 小时内的订单量:

SELECT user, SESSION_START(rowtime, INTERVAL '1' HOUR) AS sStart, SESSION_ROWTIME(rowtime, INTERVAL '1' HOUR) AS sEnd, SUM(amount) FROM Orders GROUP BY SESSION(rowtime, INTERVAL '1' HOUR), user

内置函数

Flink 中还有大量的内置函数,我们可以直接使用,将内置函数分类如下:

  • 比较函数

  • 逻辑函数

  • 算术函数

  • 字符串处理函数

  • 时间函数

比较函数
比较函数.png

逻辑函数
逻辑函数.png

算术函数
算术函数.png

字符串处理函数
字符串处理函数.png

时间函数
时间函数.png

Flink Table & SQL 案例

上面分别介绍了 Flink Table & SQL 的原理和支持的算子,我们模拟一个实时的数据流,然后讲解 SQL JOIN 的用法。

在上一课时中,我们利用 Flink 提供的自定义 Source 功能来实现一个自定义的实时数据源,具体实现如下:

public class MyStreamingSource implements SourceFunction<Item> {
<span class="hljs-keyword">private</span> <span class="hljs-keyword">boolean</span> isRunning = <span class="hljs-keyword">true</span>;

<span class="hljs-comment">/**
 * 重写run方法产生一个源源不断的数据发送源
 * <span class="hljs-doctag">@param</span> ctx
 * <span class="hljs-doctag">@throws</span> Exception
 */</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">run</span><span class="hljs-params">(SourceContext&lt;Item&gt; ctx)</span> <span class="hljs-keyword">throws</span> Exception </span>{
    <span class="hljs-keyword">while</span>(isRunning){
        Item item = generateItem();
        ctx.collect(item);

        <span class="hljs-comment">//每秒产生一条数据</span>
        Thread.sleep(<span class="hljs-number">1000</span>);
    }
}
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">cancel</span><span class="hljs-params">()</span> </span>{
    isRunning = <span class="hljs-keyword">false</span>;
}

<span class="hljs-comment">//随机产生一条商品数据</span>
<span class="hljs-function"><span class="hljs-keyword">private</span> Item <span class="hljs-title">generateItem</span><span class="hljs-params">()</span></span>{
    <span class="hljs-keyword">int</span> i = <span class="hljs-keyword">new</span> Random().nextInt(<span class="hljs-number">100</span>);
    ArrayList&lt;String&gt; list = <span class="hljs-keyword">new</span> ArrayList();
    list.add(<span class="hljs-string">"HAT"</span>);
    list.add(<span class="hljs-string">"TIE"</span>);
    list.add(<span class="hljs-string">"SHOE"</span>);
    Item item = <span class="hljs-keyword">new</span> Item();
    item.setName(list.get(<span class="hljs-keyword">new</span> Random().nextInt(<span class="hljs-number">3</span>)));
    item.setId(i);
    <span class="hljs-keyword">return</span> item;
}

}

我们把实时的商品数据流进行分流,分成 even 和 odd 两个流进行 JOIN,条件是名称相同,最后,把两个流的 JOIN 结果输出。

class StreamingDemo {
    public static void main(String[] args) throws Exception {
    EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

    SingleOutputStreamOperator&lt;Item&gt; source = bsEnv.addSource(<span class="hljs-keyword">new</span> MyStreamingSource()).map(<span class="hljs-keyword">new</span> MapFunction&lt;Item, Item&gt;() {
        <span class="hljs-meta">@Override</span>
        <span class="hljs-function"><span class="hljs-keyword">public</span> Item <span class="hljs-title">map</span><span class="hljs-params">(Item item)</span> <span class="hljs-keyword">throws</span> Exception </span>{
            <span class="hljs-keyword">return</span> item;
        }
    });

    DataStream&lt;Item&gt; evenSelect = source.split(<span class="hljs-keyword">new</span> OutputSelector&lt;Item&gt;() {
        <span class="hljs-meta">@Override</span>
        <span class="hljs-function"><span class="hljs-keyword">public</span> Iterable&lt;String&gt; <span class="hljs-title">select</span><span class="hljs-params">(Item value)</span> </span>{
            List&lt;String&gt; output = <span class="hljs-keyword">new</span> ArrayList&lt;&gt;();
            <span class="hljs-keyword">if</span> (value.getId() % <span class="hljs-number">2</span> == <span class="hljs-number">0</span>) {
                output.add(<span class="hljs-string">"even"</span>);
            } <span class="hljs-keyword">else</span> {
                output.add(<span class="hljs-string">"odd"</span>);
            }
            <span class="hljs-keyword">return</span> output;
        }
    }).select(<span class="hljs-string">"even"</span>);

    DataStream&lt;Item&gt; oddSelect = source.split(<span class="hljs-keyword">new</span> OutputSelector&lt;Item&gt;() {
        <span class="hljs-meta">@Override</span>
        <span class="hljs-function"><span class="hljs-keyword">public</span> Iterable&lt;String&gt; <span class="hljs-title">select</span><span class="hljs-params">(Item value)</span> </span>{
            List&lt;String&gt; output = <span class="hljs-keyword">new</span> ArrayList&lt;&gt;();
            <span class="hljs-keyword">if</span> (value.getId() % <span class="hljs-number">2</span> == <span class="hljs-number">0</span>) {
                output.add(<span class="hljs-string">"even"</span>);
            } <span class="hljs-keyword">else</span> {
                output.add(<span class="hljs-string">"odd"</span>);
            }
            <span class="hljs-keyword">return</span> output;
        }
    }).select(<span class="hljs-string">"odd"</span>);


    bsTableEnv.createTemporaryView(<span class="hljs-string">"evenTable"</span>, evenSelect, <span class="hljs-string">"name,id"</span>);
    bsTableEnv.createTemporaryView(<span class="hljs-string">"oddTable"</span>, oddSelect, <span class="hljs-string">"name,id"</span>);

    Table queryTable = bsTableEnv.sqlQuery(<span class="hljs-string">"select a.id,a.name,b.id,b.name from evenTable as a join oddTable as b on a.name = b.name"</span>);

    queryTable.printSchema();

    bsTableEnv.toRetractStream(queryTable, TypeInformation.of(<span class="hljs-keyword">new</span> TypeHint&lt;Tuple4&lt;Integer,String,Integer,String&gt;&gt;(){})).print();

    bsEnv.execute(<span class="hljs-string">"streaming sql job"</span>);
}

}

直接右键运行,在控制台可以看到输出:

image (7).png

点击这里下载本课程源码

第06讲:Flink 集群安装部署和 HA 配置

我们在这一课时将讲解 Flink 常见的部署模式:本地模式、Standalone 模式和 Flink On Yarn 模式,然后分别讲解三种模式的使用场景和部署中常见的问题,最后将讲解在生产环境中 Flink 集群的高可用配置。

Flink 常见的部署模式

环境准备

在绝大多数情况下,我们的 Flink 都是运行在 Unix 环境中的,推荐在 Mac OS 或者 Linux 环境下运行 Flink。如果是集群模式,那么可以在自己电脑上安装虚拟机,保证有一个 master 节点和两个 slave 节点。

同时,要注意在所有的机器上都应该安装 JDK 和 SSH。JDK 是我们运行 JVM 语言程序必须的,而 SSH 是为了在服务器之间进行跳转和执行命令所必须的。关于服务器之间通过 SSH 配置公钥登录,你可以直接搜索安装和配置方法,我们不做过度展开。

Flink 的安装包可以在这里下载。需要注意的是,如果你要和 Hadoop 进行集成,那么我们需要使用到对应的 Hadoop 依赖,下面将会详细讲解。

Local 模式

Local 模式是 Flink 提供的最简单部署模式,一般用来本地测试和演示使用。

我们在这里下载 Apache Flink 1.10.0 for Scala 2.11 版本进行演示,该版本对应 Scala 2.11 版本。

将压缩包下载到本地,并且直接进行解压,使用 Flink 默认的端口配置,直接运行脚本启动:

➜  [SoftWare]# tar -zxvf flink-1.10.0-bin-scala_2.11.tgz

image.png

上图则为解压完成后的目录情况。

然后,我们可以直接运行脚本启动 Flink :

➜  [flink-1.10.0]# ./bin/start-cluster.sh

image (1).png

上图显示我们的 Flink 启动成功。

我们直接访问本地的 8081 端口,可以看到 Flink 的后台管理界面,验证 Flink 是否成功启动。

image (2).png

可以看到 Flink 已经成功启动。当然,我们也可以查看运行日志来确认 Flink 是不是成功启动了,在 log 目录下有程序的启动日志:

image (3).png

我们尝试提交一个测试任务:

./bin/flink run examples/batch/WordCount.jar

image (4).png

我们在控制台直接看到输出。同样,在 Flink 的后台管理界面 Completed Jobs 一栏可以看到刚才提交执行的程序:

image (5).png

Standalone 模式

Standalone 模式是集群模式的一种,但是这种模式一般并不运行在生产环境中,原因和 on yarn 模式相比:

  • Standalone 模式的部署相对简单,可以支持小规模,少量的任务运行;

  • Stabdalone 模式缺少系统层面对集群中 Job 的管理,容易遭成资源分配不均匀;

  • 资源隔离相对简单,任务之间资源竞争严重。

我们在 3 台虚拟机之间搭建 standalone 集群:

图片1.png

在 master 节点,将 Apache Flink 1.10.0 for Scala 2.11 包进行解压:

➜  [SoftWare]# tar -zxvf flink-1.10.0-bin-scala_2.11.tgz

重点来啦,我们需要修改 Flink 的配置文件,并且将修改好的解压目录完整的拷贝到两个从节点中去。在这里,我强烈建议主节点和从节点的目录要保持一致。

我们修改 conf 目录下的 flink-conf.yaml:

image (6).png

flink-conf.yaml 文件中有大量的配置参数,我们挑选其中必填的最基本参数进行修改:

jobmanager.rpc.address: master
jobmanager.heap.size: 1024m
jobmanager.rpc.port: 6123
taskmanager.memory.process.size: 1568m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
jobmanager.execution.failover-strategy: region
io.tmp.dirs: /tmp

它们分别代表:

图片2.png

如果你对其他的参数有兴趣的话,可以直接参考官网。

接下来我们修改 conf 目录下的 master 和 slave 文件。

vim master,将内容修改为:

master

vim slave,将内容修改为:

slave01
slave02

然后,将整个修改好的 Flink 解压目录使用 scp 远程拷贝命令发送到从节点:

scp -r /SoftWare/flink-1.10.0 slave01:/SoftWare/
scp -r /SoftWare/flink-1.10.0 slave02:/SoftWare/

在 master、slave01、slave02 上分别配置环境变量,vim /etc/profile,将内容修改为:

export FLINK_HOME=/SoftWare/flink-1.10.0
export PATH=$PATH:$FLINK_HOME/bin

到此为止,我们整个的基础配置已经完成,下面需要启动集群,登录 master 节点执行:

/SoftWare/flink-1.10.0/bin/start-cluster.sh

可以在浏览器访问:http://192.168.2.100:8081/ 检查集群是否启动成功。

集群搭建过程中,可能出现的问题:

  • 端口被占用,我们需要手动杀掉占用端口的程序;

  • 目录找不到或者文件找不到,我们在 flink-conf.yaml 中配置过 io.tmp.dirs ,这个目录需要手动创建。

On Yarn 模式和 HA 配置

image (7).png

上图是 Flink on Yarn 模式下,Flink 和 Yarn 的交互流程。Yarn 是 Hadoop 三驾马车之一,主要用来做资源管理。我们在 Flink on Yarn 模式中也是借助 Yarn 的资源管理优势,需要在三个节点中配置 YARN_CONF_DIR、HADOOP_CONF_DIR、HADOOP_CONF_PATH 中的任意一个环境变量即可。

本课时中集群的高可用 HA 配置是基于独立的 ZooKeeper 集群。当然,Flink 本身提供了内置 ZooKeeper 插件,可以直接修改 conf/zoo.cfg,并且使用 /bin/start-zookeeper-quorum.sh 直接启动。

环境准备:

  • ZooKeeper-3.x

  • Flink-1.10.0

  • Hadoop-2.6.5

我们使用 5 台虚拟机搭建 on yarn 的高可用集群:

图片3.png

如果你在使用 Flink 的最新版本 1.10.0 时,那么需要在本地安装 Hadoop 环境并进行下面的操作。

首先,添加环境变量:

vi /etc/profile
# 添加环境变量
export HADOOP_CONF_DIR=/Software/hadoop-2.6.5/etc/hadoop
# 环境变量生效
source /etc/profile

其次,下载对应的的依赖包,并将对应的 Hadoop 依赖复制到 flink 的 lib 目录下,对应的 hadoop 依赖可以在这里下载。

image (8).png

与 standalone 集群不同的是,我们需要修改 flink-conf.yaml 文件中的一些配置:

high-availability: zookeeper
high-availability.storageDir: hdfs://cluster/flinkha/
high-availability.zookeeper.quorum: slave01:2181,slave02:2181,slave03:2181

它们分别代表:

图片4.png

然后分别修改 master、slave、zoo.cfg 三个配置文件。
vim master,将内容修改为:

master01:8081
master02:8081

vim slave,将内容修改为:

slave01
slave02
slave03

vim zoo.cfg,将内容修改为:

server.1=slave01:2888:3888
server.2=slave02:2888:3888
server.3=slave03:2888:3888

然后,我们将整个修改好的 Flink 解压目录使用 scp 远程拷贝命令发送到从节点:

scp -r /SoftWare/flink-1.10.0 slave01:/SoftWare/
scp -r /SoftWare/flink-1.10.0 slave02:/SoftWare/
scp -r /SoftWare/flink-1.10.0 slave03:/SoftWare/

分别启动 Hadoop 和 ZooKeeper,然后在主节点,使用命令启动集群:

/SoftWare/flink-1.10.0/bin/start-cluster.sh

我们同样直接访问 http://192.168.2.100:8081/ 端口,可以看到 Flink 的后台管理界面,验证 Flink 是否成功启动。

在 Flink on yarn 模式下,启动集群的方式有两种:

  • 直接在 yarn 上运行任务

  • yarn session 模式

直接在 yarn 上运行任务相当于将 job 直接提交到 yarn 上,每个任务会根据用户的指定进行资源申请,任务之间互不影响。

./bin/flink run -yjm 1024m -ytm 4096m -ys 2  ./examples/batch/WordCount.jar

更多关于参数的含义,可以参考官网。

使用 yarn session 模式,我们需要先启动一个 yarn-session 会话,相当于启动了一个 yarn 任务,这个任务所占用的资源不会变化,并且一直运行。我们在使用 flink run 向这个 session 任务提交作业时,如果 session 的资源不足,那么任务会等待,直到其他资源释放。当这个 yarn-session 被杀死时,所有任务都会停止。

例如我们启动一个 yarn session 任务,该任务拥有 8G 内存、32 个槽位。

./bin/yarn-session.sh -tm 8192 -s 32

我们在 yarn 的界面上可以看到这个任务的 ID,然后向这个 session ID 提交 Flink 任务:

./bin/flink run -m yarn-cluster -yid application_xxxx ./examples/batch/WordCount.jar

其中,application_xxxx 即为上述的 yarn session 任务 ID。

点击这里下载本课程源码

总结

本课时我们讲解了 Flink 的三种部署模式和高可用配置,并且对这三种部署模式的适用场景进行了讲解。在生产上,我们最常用的方式当然是 Flink on Yarn,借助 Yarn 在资源管理上的绝对优势,确保集群和任务的稳定。


第07讲:Flink 常见核心概念分析

在 Flink 这个框架中,有很多独有的概念,比如分布式缓存、重启策略、并行度等,这些概念是我们在进行任务开发和调优时必须了解的,这一课时我将会从原理和应用场景分别介绍这些概念。

分布式缓存

熟悉 Hadoop 的你应该知道,分布式缓存最初的思想诞生于 Hadoop 框架,Hadoop 会将一些数据或者文件缓存在 HDFS 上,在分布式环境中让所有的计算节点调用同一个配置文件。在 Flink 中,Flink 框架开发者们同样将这个特性进行了实现。

Flink 提供的分布式缓存类型 Hadoop,目的是为了在分布式环境中让每一个 TaskManager 节点保存一份相同的数据或者文件,当前计算节点的 task 就像读取本地文件一样拉取这些配置。

分布式缓存在我们实际生产环境中最广泛的一个应用,就是在进行表与表 Join 操作时,如果一个表很大,另一个表很小,那么我们就可以把较小的表进行缓存,在每个 TaskManager 都保存一份,然后进行 Join 操作。

那么我们应该怎样使用 Flink 的分布式缓存呢?举例如下:

public static void main(String[] args) throws Exception {

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.registerCachedFile(“/Users/wangzhiwu/WorkSpace/quickstart/distributedcache.txt”, “distributedCache”);
//1:注册一个文件,可以使用hdfs上的文件 也可以是本地文件进行测试
DataSource<String> data = env.fromElements(“Linea”, “Lineb”, “Linec”, “Lined”);

   DataSet&lt;String&gt; result = data.map(<span class="hljs-keyword">new</span> RichMapFunction&lt;String, String&gt;() {
       <span class="hljs-keyword">private</span> ArrayList&lt;String&gt; dataList = <span class="hljs-keyword">new</span> ArrayList&lt;String&gt;();

       <span class="hljs-meta">@Override</span>
       <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">open</span><span class="hljs-params">(Configuration parameters)</span> <span class="hljs-keyword">throws</span> Exception </span>{
           <span class="hljs-keyword">super</span>.open(parameters);
           <span class="hljs-comment">//2:使用该缓存文件</span>
           File myFile = getRuntimeContext().getDistributedCache().getFile(<span class="hljs-string">"distributedCache"</span>);
           List&lt;String&gt; lines = FileUtils.readLines(myFile);
           <span class="hljs-keyword">for</span> (String line : lines) {
               <span class="hljs-keyword">this</span>.dataList.add(line);
               System.err.println(<span class="hljs-string">"分布式缓存为:"</span> + line);
           }
       }

       <span class="hljs-meta">@Override</span>
       <span class="hljs-function"><span class="hljs-keyword">public</span> String <span class="hljs-title">map</span><span class="hljs-params">(String value)</span> <span class="hljs-keyword">throws</span> Exception </span>{
           <span class="hljs-comment">//在这里就可以使用dataList</span>
           System.err.println(<span class="hljs-string">"使用datalist:"</span> + dataList + <span class="hljs-string">"-------"</span> +value);
           <span class="hljs-comment">//业务逻辑</span>
           <span class="hljs-keyword">return</span> dataList +<span class="hljs-string">":"</span> +  value;
       }
   });

   result.printToErr();

}

从上面的例子中可以看出,使用分布式缓存有两个步骤。

  • 第一步:首先需要在 env 环境中注册一个文件,该文件可以来源于本地,也可以来源于 HDFS ,并且为该文件取一个名字。

  • 第二步:在使用分布式缓存时,可根据注册的名字直接获取。

可以看到,在上述案例中,我们把一个本地的 distributedcache.txt 文件注册为 distributedCache,在下面的 map 算子中直接通过这个名字将缓存文件进行读取并且进行了处理。

我们直接运行该程序,在控制台可以看到如下输出:

image (13).png

image (14).png

在使用分布式缓存时也需要注意一些问题,需要我们缓存的文件在任务运行期间最好是只读状态,否则会造成数据的一致性问题。另外,缓存的文件和数据不宜过大,否则会影响 Task 的执行速度,在极端情况下会造成 OOM。

故障恢复和重启策略

自动故障恢复是 Flink 提供的一个强大的功能,在实际运行环境中,我们会遇到各种各样的问题从而导致应用挂掉,比如我们经常遇到的非法数据、网络抖动等。

Flink 提供了强大的可配置故障恢复和重启策略来进行自动恢复。

故障恢复

我们在上一课时中介绍过 Flink 的配置文件,其中有一个参数 jobmanager.execution.failover-strategy: region。

Flink 支持了不同级别的故障恢复策略,jobmanager.execution.failover-strategy 的可配置项有两种:full 和 region。

当我们配置的故障恢复策略为 full 时,集群中的 Task 发生故障,那么该任务的所有 Task 都会发生重启。而在实际生产环境中,我们的大作业可能有几百个 Task,出现一次异常如果进行整个任务重启,那么经常会导致长时间任务不能正常工作,导致数据延迟。

但是事实上,我们可能只是集群中某一个或几个 Task 发生了故障,只需要重启有问题的一部分即可,这就是 Flink 基于 Region 的局部重启策略。在这个策略下,Flink 会把我们的任务分成不同的 Region,当某一个 Task 发生故障时,Flink 会计算需要故障恢复的最小 Region。

Flink 在判断需要重启的 Region 时,采用了以下的判断逻辑:

  • 发生错误的 Task 所在的 Region 需要重启;

  • 如果当前 Region 的依赖数据出现损坏或者部分丢失,那么生产数据的 Region 也需要重启;

  • 为了保证数据一致性,当前 Region 的下游 Region 也需要重启。

重启策略

Flink 提供了多种类型和级别的重启策略,常用的重启策略包括:

  • 固定延迟重启策略模式

  • 失败率重启策略模式

  • 无重启策略模式

Flink 在判断使用的哪种重启策略时做了默认约定,如果用户配置了 checkpoint,但没有设置重启策略,那么会按照固定延迟重启策略模式进行重启;如果用户没有配置 checkpoint,那么默认不会重启。

下面我们分别对这三种模式进行详细讲解。

无重启策略模式

在这种情况下,如果我们的作业发生错误,任务会直接退出。
我们可以在 flink-conf.yaml 中配置:

restart-strategy: none

也可以在程序中使用代码指定:

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());

固定延迟重启策略模式

固定延迟重启策略会通过在 flink-conf.yaml 中设置如下配置参数,来启用此策略:

restart-strategy: fixed-delay

固定延迟重启策略模式需要指定两个参数,首先 Flink 会根据用户配置的重试次数进行重试,每次重试之间根据配置的时间间隔进行重试,如下表所示:

图片1.png

举个例子,假如我们需要任务重试 3 次,每次重试间隔 5 秒,那么需要进行一下配置:

restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 5 s

当前我们也可以在代码中进行设置:

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        3, // 重启次数
        Time.of(5, TimeUnit.SECONDS) // 时间间隔
));

失败率重启策略模式

首先我们在 flink-conf.yaml 中指定如下配置:

restart-strategy: failure-rate

这种重启模式需要指定三个参数,如下表所示。失败率重启策略在 Job 失败后会重启,但是超过失败率后,Job 会最终被认定失败。在两个连续的重启尝试之间,重启策略会等待一个固定的时间。

图片2.png

这种策略的配置理解较为困难,我们举个例子,假如 5 分钟内若失败了 3 次,则认为该任务失败,每次失败的重试间隔为 5 秒。

那么我们的配置应该是:

restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 5 s

当然,也可以在代码中直接指定:

env.setRestartStrategy(RestartStrategies.failureRateRestart(
        3, // 每个时间间隔的最大故障次数
        Time.of(5, TimeUnit.MINUTES), // 测量故障率的时间间隔
        Time.of(5, TimeUnit.SECONDS) //  每次任务失败时间间隔
));

最后,需要注意的是,在实际生产环境中由于每个任务的负载和资源消耗不一样,我们推荐在代码中指定每个任务的重试机制和重启策略

并行度

并行度是 Flink 执行任务的核心概念之一,它被定义为在分布式运行环境中我们的一个算子任务被切分成了多少个子任务并行执行。我们提高任务的并行度(Parallelism)在很大程度上可以大大提高任务运行速度。

一般情况下,我们可以通过四种级别来设置任务的并行度。

  • 算子级别

在代码中可以调用 setParallelism 方法来设置每一个算子的并行度。例如:

DataSet<Tuple2<String, Integer>> counts =
      text.flatMap(new LineSplitter())
            .groupBy(0)
            .sum(1).setParallelism(1);

事实上,Flink 的每个算子都可以单独设置并行度。这也是我们最推荐的一种方式,可以针对每个算子进行任务的调优。

  • 执行环境级别

我们在创建 Flink 的上下文时可以显示的调用 env.setParallelism() 方法,来设置当前执行环境的并行度,这个配置会对当前任务的所有算子、Source、Sink 生效。当然你还可以在算子级别设置并行度来覆盖这个设置。

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(5);
  • 提交任务级别

用户在提交任务时,可以显示的指定 -p 参数来设置任务的并行度,例如:

./bin/flink run -p 10 WordCount.jar
  • 系统配置级别

我们在上一课时中提到了 flink-conf.yaml 中的一个配置:parallelism.default,该配置即是在系统层面设置所有执行环境的并行度配置。

整体上讲,这四种级别的配置生效优先级如下:算子级别 > 执行环境级别 > 提交任务级别 > 系统配置级别

在这里,要特别提一下 Flink 中的 Slot 概念。我们知道,Flink 中的 TaskManager 是执行任务的节点,那么在每一个 TaskManager 里,还会有“槽位”,也就是 Slot。Slot 个数代表的是每一个 TaskManager 的并发执行能力。

假如我们指定 taskmanager.numberOfTaskSlots:3,即每个 taskManager 有 3 个 Slot ,那么整个集群就有 3 * taskManager 的个数多的槽位。这些槽位就是我们整个集群所拥有的所有执行任务的资源。

点击这里下载本课程源码

第08讲:Flink 窗口、时间和水印

本课时主要介绍 Flink 中的时间和水印。

我们在之前的课时中反复提到过窗口时间的概念,Flink 框架中支持事件时间、摄入时间和处理时间三种。而当我们在流式计算环境中数据从 Source 产生,再到转换和输出,这个过程由于网络和反压的原因会导致消息乱序。因此,需要有一个机制来解决这个问题,这个特别的机制就是“水印”。

Flink 的窗口和时间

我们在第 05 课时中讲解过 Flink 窗口的实现,根据窗口数据划分的不同,目前 Flink 支持如下 3 种:

  • 滚动窗口,窗口数据有固定的大小,窗口中的数据不会叠加;

  • 滑动窗口,窗口数据有固定的大小,并且有生成间隔;

  • 会话窗口,窗口数据没有固定的大小,根据用户传入的参数进行划分,窗口数据无叠加。

Flink 中的时间分为三种:

  • 事件时间(Event Time),即事件实际发生的时间;

  • 摄入时间(Ingestion Time),事件进入流处理框架的时间;

  • 处理时间(Processing Time),事件被处理的时间。

下面的图详细说明了这三种时间的区别和联系:

image (18).png

事件时间(Event Time)

事件时间(Event Time)指的是数据产生的时间,这个时间一般由数据生产方自身携带,比如 Kafka 消息,每个生成的消息中自带一个时间戳代表每条数据的产生时间。Event Time 从消息的产生就诞生了,不会改变,也是我们使用最频繁的时间。

利用 Event Time 需要指定如何生成事件时间的“水印”,并且一般和窗口配合使用,具体会在下面的“水印”内容中详细讲解。

我们可以在代码中指定 Flink 系统使用的时间类型为 EventTime:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置时间属性为 EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));

stream
.keyBy( (event) -> event.getUser() )
.timeWindow(Time.hours(1))
.reduce( (a, b) -> a.add(b) )
.addSink(…);

Flink 注册 EventTime 是通过 InternalTimerServiceImpl.registerEventTimeTimer 来实现的:

image (19).png

可以看到,该方法有两个入参:namespace 和 time,其中 time 是触发定时器的时间,namespace 则被构造成为一个 TimerHeapInternalTimer 对象,然后将其放入 KeyGroupedInternalPriorityQueue 队列中。

那么 Flink 什么时候会使用这些 timer 触发计算呢?答案在这个方法里:

InternalTimeServiceImpl.advanceWatermark。
public void advanceWatermark(long time) throws Exception {
   currentWatermark = time;

InternalTimer<K, N> timer;

while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
eventTimeTimersQueue.poll();
keyContext.setCurrentKey(timer.getKey());
triggerTarget.onEventTime(timer);
}
}

这个方法中的 while 循环部分会从 eventTimeTimersQueue 中依次取出触发时间小于参数 time 的所有定时器,调用 triggerTarget.onEventTime() 方法进行触发。

这就是 EventTime 从注册到触发的流程。

处理时间(Processing Time)

处理时间(Processing Time)指的是数据被 Flink 框架处理时机器的系统时间,Processing Time 是 Flink 的时间系统中最简单的概念,但是这个时间存在一定的不确定性,比如消息到达处理节点延迟等影响。

我们同样可以在代码中指定 Flink 系统使用的时间为 Processing Time:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

同样,也可以在源码中找到 Flink 是如何注册和使用 Processing Time 的。

image (20).png

registerProcessingTimeTimer() 方法为我们展示了如何注册一个 ProcessingTime 定时器:
每当一个新的定时器被加入到 processingTimeTimersQueue 这个优先级队列中时,如果新来的 Timer 时间戳更小,那么更小的这个 Timer 会被重新注册 ScheduledThreadPoolExecutor 定时执行器上。

Processing Time 被触发是在 InternalTimeServiceImpl 的 onProcessingTime() 方法中:

image (21).png

一直循环获取时间小于入参 time 的所有定时器,并运行 triggerTarget 的 onProcessingTime() 方法。

摄入时间(Ingestion Time)

摄入时间(Ingestion Time)是事件进入 Flink 系统的时间,在 Flink 的 Source 中,每个事件会把当前时间作为时间戳,后续做窗口处理都会基于这个时间。理论上 Ingestion Time 处于 Event Time 和 Processing Time之间。

Ingestion Time 的时间类型生成相关的代码在 AutomaticWatermarkContext 中:

image (22).png

image (23).png

我们可以看出,这里会设置一个 watermark 发送定时器,在 watermarkInterval 时间之后触发。

处理数据的代码在 processAndCollect() 方法中:

水印(WaterMark)

水印(WaterMark)是 Flink 框架中最晦涩难懂的概念之一,有很大一部分原因是因为翻译的原因。

WaterMark 在正常的英文翻译中是水位,但是在 Flink 框架中,翻译为“水位线”更为合理,它在本质上是一个时间戳。

在上面的时间类型中我们知道,Flink 中的时间:
EventTime 每条数据都携带时间戳;

  • ProcessingTime 数据不携带任何时间戳的信息;

  • IngestionTime 和 EventTime 类似,不同的是 Flink 会使用系统时间作为时间戳绑定到每条数据,可以防止 Flink 内部处理数据是发生乱序的情况,但无法解决数据到达 Flink 之前发生的乱序问题。

所以,我们在处理消息乱序的情况时,会用 EventTime 和 WaterMark 进行配合使用。

首先我们要明确几个基本问题。

水印的本质是什么

水印的出现是为了解决实时计算中的数据乱序问题,它的本质是 DataStream 中一个带有时间戳的元素。如果 Flink 系统中出现了一个 WaterMark T,那么就意味着 EventTime < T 的数据都已经到达,窗口的结束时间和 T 相同的那个窗口被触发进行计算了。

也就是说:水印是 Flink 判断迟到数据的标准,同时也是窗口触发的标记。

在程序并行度大于 1 的情况下,会有多个流产生水印和窗口,这时候 Flink 会选取时间戳最小的水印。

水印是如何生成的

Flink 提供了 assignTimestampsAndWatermarks() 方法来实现水印的提取和指定,该方法接受的入参有 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks 两种。

整体的类图如下:

image (25).png

水印种类

周期性水印

我们在使用 AssignerWithPeriodicWatermarks 周期生成水印时,周期默认的时间是 200ms,这个时间的指定位置为:

@PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
    this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
    if (characteristic == TimeCharacteristic.ProcessingTime) {
        getConfig().setAutoWatermarkInterval(0);
    } else {
        getConfig().setAutoWatermarkInterval(200);
    }
}

是否还记得上面我们在讲时间类型时会通过 env.setStreamTimeCharacteristic() 方法指定 Flink 系统的时间类型,这个 setStreamTimeCharacteristic() 方法中会做判断,如果用户传入的是 TimeCharacteristic.eventTime 类型,那么 AutoWatermarkInterval 的值则为 200ms ,如上述代码所示。当前我们也可以使用 ExecutionConfig.setAutoWatermarkInterval() 方法来指定自动生成的时间间隔。

在上述的类图中可以看出,我们需要通过 TimestampAssigner 的 extractTimestamp() 方法来提取 EventTime。

Flink 在这里提供了 3 种提取 EventTime() 的方法,分别是:

  • AscendingTimestampExtractor

  • BoundedOutOfOrdernessTimestampExtractor

  • IngestionTimeExtractor

这三种方法中 BoundedOutOfOrdernessTimestampExtractor() 用的最多,需特别注意,在这个方法中的 maxOutOfOrderness 参数,该参数指的是允许数据乱序的时间范围。简单说,这种方式允许数据迟到 maxOutOfOrderness 这么长的时间。

    public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
        if (maxOutOfOrderness.toMilliseconds() < 0) {
            throw new RuntimeException("Tried to set the maximum allowed " +
                "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
        }
        this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
        this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
    }
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">abstract</span> <span class="hljs-keyword">long</span> <span class="hljs-title">extractTimestamp</span><span class="hljs-params">(T element)</span></span>;

<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">final</span> Watermark <span class="hljs-title">getCurrentWatermark</span><span class="hljs-params">()</span> </span>{
    <span class="hljs-keyword">long</span> potentialWM = currentMaxTimestamp - maxOutOfOrderness;
    <span class="hljs-keyword">if</span> (potentialWM &gt;= lastEmittedWatermark) {
        lastEmittedWatermark = potentialWM;
    }
    <span class="hljs-keyword">return</span> <span class="hljs-keyword">new</span> Watermark(lastEmittedWatermark);
}

<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">final</span> <span class="hljs-keyword">long</span> <span class="hljs-title">extractTimestamp</span><span class="hljs-params">(T element, <span class="hljs-keyword">long</span> previousElementTimestamp)</span> </span>{
    <span class="hljs-keyword">long</span> timestamp = extractTimestamp(element);
    <span class="hljs-keyword">if</span> (timestamp &gt; currentMaxTimestamp) {
        currentMaxTimestamp = timestamp;
    }
    <span class="hljs-keyword">return</span> timestamp;
}

PunctuatedWatermark 水印

这种水印的生成方式 Flink 没有提供内置实现,它适用于根据接收到的消息判断是否需要产生水印的情况,用这种水印生成的方式并不多见。

举个简单的例子,假如我们发现接收到的数据 MyData 中以字符串 watermark 开头则产生一个水印:

data.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<UserActionRecord>() {
  <span class="hljs-meta">@Override</span>
  <span class="hljs-function"><span class="hljs-keyword">public</span> Watermark <span class="hljs-title">checkAndGetNextWatermark</span><span class="hljs-params">(MyData data, <span class="hljs-keyword">long</span> l)</span> </span>{
    <span class="hljs-keyword">return</span> data.getRecord.startsWith(<span class="hljs-string">"watermark"</span>) ? <span class="hljs-keyword">new</span> Watermark(l) : <span class="hljs-keyword">null</span>;
  }

  <span class="hljs-meta">@Override</span>
  <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">long</span> <span class="hljs-title">extractTimestamp</span><span class="hljs-params">(MyData data, <span class="hljs-keyword">long</span> l)</span> </span>{
    <span class="hljs-keyword">return</span> data.getTimestamp();
  }
});

class MyData{
private String record;
private Long timestamp;
public String getRecord() {
return record;
}
public void setRecord(String record) {
this.record = record;
}
public Timestamp getTimestamp() {
return timestamp;
}
public void setTimestamp(Timestamp timestamp) {
this.timestamp = timestamp;
}
}

案例

我们上面讲解了 Flink 关于水印和时间的生成,以及使用,下面举一个例子来讲解。

模拟一个实时接收 Socket 的 DataStream 程序,代码中使用 AssignerWithPeriodicWatermarks 来设置水印,将接收到的数据进行转换,分组并且在一个 5
秒的窗口内获取该窗口中第二个元素最小的那条数据。

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

<span class="hljs-comment">//设置为eventtime事件类型</span>
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
<span class="hljs-comment">//设置水印生成时间间隔100ms</span>
env.getConfig().setAutoWatermarkInterval(<span class="hljs-number">100</span>);

DataStream&lt;String&gt; dataStream = env
        .socketTextStream(<span class="hljs-string">"127.0.0.1"</span>, <span class="hljs-number">9000</span>)
        .assignTimestampsAndWatermarks(<span class="hljs-keyword">new</span> AssignerWithPeriodicWatermarks&lt;String&gt;() {
            <span class="hljs-keyword">private</span> Long currentTimeStamp = <span class="hljs-number">0L</span>;
            <span class="hljs-comment">//设置允许乱序时间</span>
            <span class="hljs-keyword">private</span> Long maxOutOfOrderness = <span class="hljs-number">5000L</span>;

            <span class="hljs-meta">@Override</span>
            <span class="hljs-function"><span class="hljs-keyword">public</span> Watermark <span class="hljs-title">getCurrentWatermark</span><span class="hljs-params">()</span> </span>{

                <span class="hljs-keyword">return</span> <span class="hljs-keyword">new</span> Watermark(currentTimeStamp - maxOutOfOrderness);
            }

            <span class="hljs-meta">@Override</span>
            <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">long</span> <span class="hljs-title">extractTimestamp</span><span class="hljs-params">(String s, <span class="hljs-keyword">long</span> l)</span> </span>{
                String[] arr = s.split(<span class="hljs-string">","</span>);
                <span class="hljs-keyword">long</span> timeStamp = Long.parseLong(arr[<span class="hljs-number">1</span>]);
                currentTimeStamp = Math.max(timeStamp, currentTimeStamp);
                System.err.println(s + <span class="hljs-string">",EventTime:"</span> + timeStamp + <span class="hljs-string">",watermark:"</span> + (currentTimeStamp - maxOutOfOrderness));
                <span class="hljs-keyword">return</span> timeStamp;
            }
        });

dataStream.map(<span class="hljs-keyword">new</span> MapFunction&lt;String, Tuple2&lt;String, Long&gt;&gt;() {
    <span class="hljs-meta">@Override</span>
    <span class="hljs-function"><span class="hljs-keyword">public</span> Tuple2&lt;String, Long&gt; <span class="hljs-title">map</span><span class="hljs-params">(String s)</span> <span class="hljs-keyword">throws</span> Exception </span>{

        String[] split = s.split(<span class="hljs-string">","</span>);
        <span class="hljs-keyword">return</span> <span class="hljs-keyword">new</span> Tuple2&lt;String, Long&gt;(split[<span class="hljs-number">0</span>], Long.parseLong(split[<span class="hljs-number">1</span>]));
    }
})
        .keyBy(<span class="hljs-number">0</span>)
        .window(TumblingEventTimeWindows.of(Time.seconds(<span class="hljs-number">5</span>)))
        .minBy(<span class="hljs-number">1</span>)
        .print();

env.execute(<span class="hljs-string">"WaterMark Test Demo"</span>);

}

我们第一次试验的数据如下:

flink,1588659181000
flink,1588659182000
flink,1588659183000
flink,1588659184000
flink,1588659185000

可以做一个简单的判断,第一条数据的时间戳为 1588659181000,窗口的大小为 5 秒,那么应该会在 flink,1588659185000 这条数据出现时触发窗口的计算。

我们用 nc -lk 9000 命令启动端口,然后输出上述试验数据,看到控制台的输出:

image (26).png

很明显,可以看到当第五条数据出现后,窗口触发了计算。

下面再模拟一下数据乱序的情况,假设我们的数据来源如下:

flink,1588659181000
flink,1588659182000
flink,1588659183000
flink,1588659184000
flink,1588659185000
flink,1588659180000
flink,1588659186000
flink,1588659187000
flink,1588659188000
flink,1588659189000
flink,1588659190000

其中的 flink,1588659180000 为乱序消息,来看看会发生什么?

image (27).png

可以看到,时间戳为 1588659180000 的这条消息并没有被处理,因为此时代码中的允许乱序时间 private Long maxOutOfOrderness = 0L 即不处理乱序消息。

下面修改 private Long maxOutOfOrderness = 5000L,即代表允许消息的乱序时间为 5 秒,然后把同样的数据发往 socket 端口。

可以看到,我们把所有数据发送出去仅触发了一次窗口计算,并且输出的结果中 watermark 的时间往后顺延了 5 秒钟。所以,maxOutOfOrderness 的设置会影响窗口的计算时间和水印的时间,如下图所示:

image (28).png

假如我们继续向 socket 中发送数据:

flink,1588659191000
flink,1588659192000
flink,1588659193000
flink,1588659194000
flink,1588659195000

可以看到下一次窗口的触发时间:

image (29).png

在这里要特别说明,Flink 在用时间 + 窗口 + 水印来解决实际生产中的数据乱序问题,有如下的触发条件:

  • watermark 时间 >= window_end_time;

  • 在 [window_start_time,window_end_time) 中有数据存在,这个窗口是左闭右开的。

此外,因为 WaterMark 的生成是以对象的形式发送到下游,同样会消耗内存,因此水印的生成时间和频率都要进行严格控制,否则会影响我们的正常作业。

点击这里下载本课程源码

总结

这一课时我们学习了 Flink 的时间类型和水印生成,内容偏多并且水印部分理解起来需要时间,建议你结合源码再进一步学习。


举报

相关推荐

0 条评论