Standalone
开始(Getting Started)
该部分指导你在一台计算机的单独进程中搭建一个本地Flink集群,同时,它很容易扩展成一个分布式的独立集群,这部分的内容在参考中做了介绍。
介绍(Introduction)
Standalone模式是部署Flink最基础的方式,在部署总览页面介绍的Flink服务以进程的形式运行在操作系统中。不同于结合K8S、Yarn部署,你必须负责重启失败的任务或在操作期间分配资源。
在Standalone的其他子页面,我们介绍了其他的部署方法,即基于standalone模式的docker容器部署或K8S容器部署。
准备(Preparation)
Flink可以运行在所有的类UNIX系统上,如Linux、MacOS、Windows等。在部署之前,请确认你的系统满足如下要求:
- 安装Java8或以上
- 下载Flink安装包并完成解压
启动Standalone集群(Session模式)(Starting a Standalone Cluster (Session Mode) )
下列步骤展示了如何启动一个Standalone集群,并提交一个样例任务 :
# we assume to be in the root directory of the unzipped Flink distribution
# (1) Start Cluster
$ ./bin/start-cluster.sh
# (2) You can now access the Flink Web Interface on http://localhost:8081
# (3) Submit example job
$ ./bin/flink run ./examples/streaming/TopSpeedWindowing.jar
# (4) Stop the cluster again
$ ./bin/stop-cluster.sh
在(1)中,我们将会进行两个操作:启动一个JobManager的JVM,一个TaskManager的JVM。JobManager会启动一个web服务,可以通过localhost:8081访问。
在(3)中,我们启动一个Flink客户端(一个短周期的JVM进程),并将一个应用提交到JobManager。
部署模式(Deployment Modes)
Application Mode
更多关于application mode的高级内容,请跳转到部署模式总览。
为了在嵌入式应用程序中启动一个Flink Manager,我们使用bin/standalone-job.sh脚本。
我们通过本地运行TopSpeedWindowing.jar案例来演示这个模式,运行在一个单节点的TaskManager。
应用Jar包所在路径需要可以被访问,最简单的方法是将jar包放入lib目录下。
cp ./examples/streaming/TopSpeedWindowing.jar lib/
然后,我们可以启动JobManager:
./bin/standalone-job.sh start --job-classname org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
现在,可以使用localhost:8081访问web接口。然而,该应用好不能运行,因为没有TaskManager在运行。
./bin/taskmanager.sh start
小贴士:如果你的应用需要更多资源,你可以启动多个TaskManager。
通过脚本也可以停止服务,调用多次脚本或使用stop-all脚本,可以停止多个实例。
./bin/taskmanager.sh stop
./bin/standalone-job.sh stop
Per-Job Mode
Standalone集群不支持Per-Job模式。
Session Mode
本地Session模式部署已经在上文中介绍过。
Standalone集群相关介绍(Standalone Cluster Reference)
配置(Configuration)
所有开放的配置项都在配置页面列出来了,特别是基础设置部分,包含了关于端口、内存、并行等的好的建议。
调试(Debugging)
如果Flink没有按我们期望的运行,我们建议查看Flink的日志文件,作为更进一步排查的起点。
日志文件位于logs目录下。运行在当前机器上的每个Flink服务都有一个.log文件。默认配置下,每次启动Flink服务都会轮转日志文件,旧服务的日志文件将会在末尾添加一个数字编号。
此外,日志文件可以通过FlinkWeb访问(JobManager和TaskManager都可以)。
默认情况下,Flink日志级别为“INFO”级别,它为所有明显的问题提供了基础信息。例如,当Flink似乎有错误行为时,建议将日志级别设置为“DEBUG”,日志级别由conf/log4.properties控制。设置rootLogger.level = DEBUG将日志级别更新为DEBUG。
这有一个专门的页面,用于介绍Flink日志的相关内容。
组件管理脚本(Component Management Scripts)
启动停止集群(Starting and Stopping a cluster)
bin/start-cluster.sh和bin/stop-cluster.sh依赖conf/masters和conf/workers来决定集群组件实例的数量。
如果集群机器配置了SSH免密登录,并且它们共享同样的目录结构,那么脚本还可以远程启动、关闭实例。
例一:本地启动有两个TaskManager的集群
conf/masters配置内容:
localhost
conf/workers配置内容:
localhost
localhost
例二:启动一个分布式集群的JobManagers
假设一个集群有4台机器(master1、worker1、worker2、worker3),每一台都可通过网络互相访问。
conf/masters 配置内容:
master1
conf/workers配置内容:
worker1
worker2
worker3
注意jobmanager.rpc.address需要设置为master1。
例三:通过配置待命状态的JobManager实现高可用。
启动或停止Flink组件(Starting and Stopping Flink Components)
bin/jobmanager.sh和bin/taskmanager.sh脚本支持在后台启动相关守护进程(通过启动参数),或在前台(使用start-foreground)。前台启动时,日志会打印在标准输出。这个模式对于另一个进程控制Flink守护进程的场景非常有用。
脚本可以被调用多次,例如需要多个TaskManager的场景。这些实例由脚本跟踪,并且可以一对一停止(使用stop命令)或一起停止所有(使用stop-all命令)。
Windows用户(Windows Cygwin Users)
If you are installing Flink from the git repository and you are using the Windows git shell, Cygwin can produce a failure similar to this one:
如果你是通过git仓库安装Flink,并且你使用的是Windows git shell,你可能遇到如下错误:
这个错误发生的原因是,Windows运行环境下,git自动将UNIX行结束符转换为Windows风格的行结束符,然而Windows只能处理Unix风格的行结束符。解决方法是设置通过如下三个步骤设置明确的行结束符:
- 启动Windows Shell终端。
- 跳转到你的家目录下
cd; pwd
这将返回Windows root路径下的一个路径。
- 使用NotePad、WordPad或其他文本编辑器打开家目录下的文件.bash_profile,并且追加如下内容:(如果文件不存在,你需要创建它)
export SHELLOPTS
set -o igncr
- 保存文件并且打开一个新的终端。
设置高可用(Setting up High-Availability)
为了激活standalone集群的HA,你必须使用ZookeeperHA服务。
此外,你必须配置你的集群启动多个JobManager。
为了启动高可用集群,需要配置conf/masters的masters文件:
masters文件:masters文件包含所有ip(JobManager将要运行的机器),以及web服务需要绑定的端口
master1:webUIPort1
[...]
masterX:webUIPortX
默认情况下,JobManager将会随机选择一个端口用于内部进程通信。你可以通过high-availablity.jobmanager.port进行配置,这个参数可以设置单个端口(如50010),随机端口(50000-50025),或绑定所有端口(50010,50011,50020-50025,50050-50075)
样例:2个JobManager的Standalone高可用集群(Example: Standalone HA Cluster with 2 JobManagers)
- 在conf/flink-conf.yaml配置高可用以及zookeeper:
high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_one # important: customize per cluster
high-availability.storageDir: hdfs:///flink/recovery
- 在conf/masters配置masters
localhost:8081
localhost:8082
- 在conf/zoo.cfg中配置zookeeper服务(目前它只能在每台机器上运行一个单机的Zookeeper服务)
server.0=localhost:2888:3888
- 启动zookeeper服务
./bin/start-zookeeper-quorum.sh
# Starting zookeeper daemon on host localhost.
- 启动HA集群
./bin/start-cluster.sh
#Starting HA cluster with 2 masters and 1 peers in ZooKeeper quorum.
#Starting standalonesession daemon on host localhost.
#Starting standalonesession daemon on host localhost.
#Starting taskexecutor daemon on host localhost.
- 停止zookeeper服务和集群
./bin/stop-cluster.sh
#Stopping taskexecutor daemon (pid: 7647) on localhost.
#Stopping standalonesession daemon (pid: 7495) on host localhost.
#Stopping standalonesession daemon (pid: 7349) on host localhost.
./bin/stop-zookeeper-quorum.sh
#Stopping zookeeper daemon (pid: 7101) on host localhost.
Yarn
开始(Getting Started)
该部分介绍如何集成YARN配置一个功能齐全的Flink集群。
介绍(Introduction)
Apache Hadoop YARN是一个资源调度框架,广受许多数据处理框架欢迎。Flink的服务将提交到YARN的ResourceManager,这些服务会在集群中生成容器并由YARN的NodeManager管理。Flink在这些容器中部署它的JobManager和TaskManager实例。
Flink可以根据运行在JobManager上的作业所需的slot数量动态分配或取消分配TaskManager资源。
准备(Preparation)
该部分假设了一个可用的YARN环境,版本为2.4.1及以上。YARN环境可以通过Amazon EMR、Google Cloud DataProc或Cloudera等产品快速提供。不建议在本地或集群上手动设置YARN环境。
- 通过yarn top命令,确认你的YARN集群已经准备好接收Flink任务,它应该没有任何错误信息。
- 从下载页面下载Flink并解压
- 确保HADOOP_CLASSPATH环境变量完成设置(可以通过echo $HADOOP_CLASSPATH检查),如果没有设置,使用如下命令完成设置
export HADOOP_CLASSPATH=`hadoop classpath`
在Yarn上启动Flink会话(Starting a Flink Session on YARN)
一旦确认配置了HADOOP_CLASSPATH环境变量,你可以启动一个Flink on Yarn会话,并提交一个样例任务:
# we assume to be in the root directory of
# the unzipped Flink distribution
# (0) export HADOOP_CLASSPATH
export HADOOP_CLASSPATH=`hadoop classpath`
# (1) Start YARN Session
./bin/yarn-session.sh --detached
# (2) You can now access the Flink Web Interface through the
# URL printed in the last lines of the command output, or through
# the YARN ResourceManager web UI.
# (3) Submit example job
./bin/flink run ./examples/streaming/TopSpeedWindowing.jar
# (4) Stop YARN session (replace the application id based
# on the output of the yarn-session.sh command)
echo "stop" | ./bin/yarn-session.sh -id application_XXXXX_XXX
祝贺你,你成功的在Flink on YARN上运行了一个Flink应用。
在YARN上部署Flink的可选模式(Deployment Modes Supported by Flink on YARN)
我们建议使用Per-Job或Application模式部署Flink,这些模式提供了更好的任务隔离机制。
Application Mode
Application模式将会在yarn上启动一个Flink集群,应用Jar包的main方法会运行在JobManager上。应用运行结束后集群将会关闭。你可以使用yarn application -kill 命令手动停止集群。
./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar
一旦Application模式集群部署成功,你就可以与它进行交互操作,如取消或设置保存点。
# List running job on the cluster
./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
# Cancel running job
./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
注意,取消你的任务将会停止集群。
为了使用application模式的全部功能,可以考虑配置yarn.provided.lib.dirs,并提前上传你的应用jar包到集群所有节点都可以访问的位置,可以使用如下命令完成操作:
./bin/flink run-application -t yarn-application \
-Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir" \
hdfs://myhdfs/jars/my-application.jar
上面的操作将允许作业的提交是非常轻量的,因为所需的Flink jar和应用程序jar将由指定的远程位置获取,而不是由客户端发送到集群。
Per-Job Mode
Per-Job集群模式将会在yarn启动一个Flink集群,然后本地运行提供的应用Jar包并最后提交JobGraph到Yarn上的JobManager。如果你传入了–detached参数,client将会在提交被接收后停止。
yarn集群将在任务结束后停止。
./bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar
Per-Job集群部署成功后,你可以与它进行交互操作,如取消或设置保存点。
# List running job on the cluster
./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
# Cancel running job
./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>
注意,取消你的任务会导致Per-Job集群关闭。
Session Mode
我们在当前页面的Getting Started中介绍了Session模式部署。
Session模式有两种操作模式:
连接模式(默认):yarn-session.sh提交Flink集群到YARN,但是client任然保持运行,跟踪集群的运行状态。如果集群失败,client会显示错误信息。如果client被终止,它会通知集群停止运行。
分离模式(-d 或 --detached):yarn-session.sh 客户端提交Flink集群到yarn,然后客户端返回。停止Flink集群需要调用客户端或yarn工具箱。
session模式将在/tmp/.yarn-properties-创建一个隐藏的yarn配置文件,当提交任务时,这个文件将被集群通过命令行接口选中。
在提交Flink任务时,你可以通过命令行接口手动指定yarn集群目标。例如:
./bin/flink run -t yarn-session \
-Dyarn.application.id=application_XXXX_YY \
./examples/streaming/TopSpeedWindowing.jar
你可以通过如下命令再次访问yarn session:
./bin/yarn-session.sh -id application_XXXX_YY
除了通过conf/flink-conf.yaml文件配置,你也可以在提交时使用-Dkey=value参数传递任何参数到./bin/yarn-session.sh客户端。
yarn session client也有一些快捷参数用于公共配置。可以通过./bin/yarn-session.sh -h查看。
Flink on YARN相关内容(Flink on YARN Reference)
配置Flink on yarn(Configuring Flink on YARN)
yarn相关的配置在配置页面中已经列出。
下列的配置参数由Flink on yarn管理,因为在运行时它们可能被覆盖。
jobmanager.rpc.address(Flink on yarn可以动态设置JobManager容器的地址)
io.tmp.dirs(如果未设置,yarn将会为flink设置临时目录)
high-availability.cluster-id(自动生成ID用于区分HA中的多个集群)
如果你需要给flink传递额外的Hadoop配置文件,你可以配置HADOOP_CONF_DIR环境变量,该变量接收一个目录名,这个目录包含了Hadoop配置文件。默认情况下,所有必须的Hadoop配置文件通过HADOOP_CLASSPATH环境变量进行加载。
资源分配行为(Resource Allocation Behavior)
现存的资源无法运行所有提交的任务时,yarn上的JobManager需要额外的TaskManager。运行在session模式时,当其他任务被提交,如果需要,JobManager将分配额外的TaskManager。未使用的TaskManager在超时后将会被释放。
JobManager和TaskManager进程的内存配置将受到yarn实现的重视。默认情况下,上报的VCores数量等于每个TaskManager配置的slot数。yarn.containers.vcores允许用一个自定义的数值覆盖VCores数量。为了让这个参数起作用,你需要激活yarn集群的CPU调度。
失败的容器会被yarn替换(包括JobManager)。JobManager容器重启的最大次数通过yarn.application-attempts配置(默认为1)。当所有重试都失败时,yarn应用将会失败。
yarn高可用(High-Availability on YARN)
通过为YARN绑定一个高可用服务可以配置yarn的高可用。
一旦高可用服务配置好,它将持久化JobManager的元数据并展示leader票数。
yarn关注的是JobManager的失败重启。JobManager重启的最大次数由两个配置参数决定。第一个是yarn.application-attempts,默认值是2,这个值受到参数yarn.resourcemanager.am.max-attemps参数的限制,它的默认值也是2。
注意flink部署在yarn上时,管理了high-availability.cluster-id配置参数。Flink将其默认值设置为yarn应用id。在yarn上部署HA集群时,不应该覆盖此参数。集群ID用于在HA后端(如zookeeper)区分多个HA集群。重写次配置参数会导致多个yarn集群互相影响。
容器关闭行为(Container Shutdown Behaviour)
yarn 2.3.0<version<2.4.0 当application master故障时,所有的容器都将重启。
yarn 2.4.0<version<2.6.0 在application master故障时,TaskManager容器将保持活跃。这样做的优点是启动快速并且用户无需再次等待资源的获取。
yarn 2.6.0<=version 设置失败尝试有效间隔时间为flink akka超时时间。失败尝试有效间隔表示一个应用只有当系统在一个间隔时间内尝试了最大尝试次数后才会被终止。这避免了一个长期的任务耗尽它的应用尝试次数。
Hadoop yarn2.4.0有一个重大Bug(在2.5.0中已解决),它阻止了容器从一个重启应用的Master/Job Manager容器中重启。FLINK-4142记载了细节。我们推荐使用最低Hadoop2.5.0来配置高可用。
支持的Hadoop版本(Supported Hadoop versions)
Flink on yarn被Hadoop2.4.1再次编译了,并且所有大于2.4.1的版本都支持,包括Hadoop3.x。
为了给Flink提供必要的Hadoop依赖,我们推荐设置HADOOP_CLASSPATH环境变量,在Getting Started/Preparation部分介绍过。
如果无法配置,相关依赖可以放到flink的lib目录下。
Flink也在lib目录下提供了预绑定的hadoop jar包,在Downloads / Additional组件网页有相关介绍。这些预绑定的jar包是隐藏的,从而避免了公共jar包的依赖冲突。Flink社区没有对这些预绑定的jar包进行测试。
在防火墙后运行Flink on yarn(Running Flink on YARN behind Firewalls)
一些yarn集群使用防火墙来控制集群和互联网间的网络攻击。这种情况下,Flink任务只能被提交到集群内部网络(防火墙后)的yarn session。如果对于生产这是不可用的,flink允许为它的REST端配置一个端口范围,用于client-cluster的通信。端口范围配置好后,用户可以越过防火墙提交任务到Flink。
指定REST端端口的配置参数是rest.bind-port。这个参数可以选择单个端口、端口范围或多个端口。
用户jar包和类路径(User jars & Classpath)
默认情况下,在运行一个单一任务时,flink会将用户jar包加载到系统的classpath。这个行为可以使用yarn.per-job-cluster,include-user-jar参数控制。
当设置设个参数为DISABLED时,flink将会把jar包加载到用户classpath。
用户jar包在classpath中的位置可以使用下面的参数来配置:
ORDER:默认,添加jar包到系统classpath,以字典顺序排序
FIRST:添加jar包到系统classpath的首位
LAST:添加jar包到系统classpath的末尾