flink在zeppelin上的使用文档,参见flink interpreter
Flink on Zeppelin 基本概念
Flink解释器组
在Zeppelin中,支持Apache Flink的是由下面列出的五个解释器组成的Flink解释器组。
Name | Class | Description |
---|---|---|
%flink | FlinkInterpreter | Creates ExecutionEnvironment/StreamExecutionEnvironment/BatchTableEnvironment/StreamTableEnvironment and provides a Scala environment |
%flink.pyflink | PyFlinkInterpreter | Provides a python environment |
%flink.ipyflink | IPyFlinkInterpreter | Provides an ipython environment |
%flink.ssql | FlinkStreamSqlInterpreter | Provides a stream sql environment |
%flink.bsql | FlinkBatchSqlInterpreter | Provides a batch sql environment |
内置的变量
Scala是Flink在Zeppelin上的默认语言,Flink Interpreter (%flink
) 自动创建下面 6 个变量作为 Flink Scala 程序的入口。
- senv (StreamExecutionEnvironment),
- benv (ExecutionEnvironment)
- stenv (StreamTableEnvironment for blink planner)
- btenv (BatchTableEnvironment for blink planner)
- stenv_2 (StreamTableEnvironment for flink planner)
- btenv_2 (BatchTableEnvironment for flink planner)
- z (ZeppelinContext)
Flink Interpreter的运行模式
Flink Interpreter在zeppelin中的三种运行模式,正式环境使用YARN模式
- Local 模式: 在本地创建一个MiniCluster,适合做POC或者小数据量的试验,必须配置如下:
- FLINK_HOME /opt/flink/flink-1.13.2
- flink.execution.mode local
- Remote 模式: 连接一个已经创建好的Flink集群,一般是Flink standalone集群
- FLINK_HOME /opt/flink/flink-1.13.2
- flink.execution.mode remote
- flink.execution.remote.host 172.25.xx.xx
- flink.execution.remote.port 8081
- YARN 模式: 在Yarn集群中创建Flink Cluster
- FLINK_HOME /opt/flink/flink-1.13.2
- flink.execution.mode yarn
- HADOOP_CONF_DIR /etc/hadoop/conf
- Yarn Application Mode: 是Zeppelin服务器主机上的一个独立的Flink解释器进程,Flink解释器运行在yarn容器中的JobManager中。
- flink.execution.mode yarn-application
- HADOOP_CONF_DIR /etc/hadoop/conf
- 在内部,flink会调用命令hadoop classpath,并在flink解释器进程中加载所有hadoop相关的jar文件:
export HADOOP_CLASSPATH=
hadoop classpath``
如果连接超时,则需要配置如下参数
zeppelin.interpreter.connect.timeout 600000
代码示例
简单示例
%flink
val data=benv.fromElements("hello world","hello flink","hello hadoop")
val wc=data.flatMap(line=>line.split("\\s"))
.map(w=>(w,1))
.groupBy(0)
.sum(1)
z.show(wc)
Batch ETL
基于 Bank (https://archive.ics.uci.edu/ml/datasets/bank+marketing)数据来做 Batch ETL 任务
下载数据curl -O https://archive.ics.uci.edu/ml/machine-learning-databases/00222/bank.zip
注意:手工删除bank.csv的第一行数据
%flink.bsql
DROP TABLE IF EXISTS bank;
CREATE TABLE bank (
age INT,
job STRING,
marital STRING,
education STRING,
`default` STRING,
balance STRING,
housing STRING,
loan STRING,
contact STRING,
`day` STRING,
`month` STRING,
duration INT,
campaign INT,
pdays INT,
privious INT,
poutcome STRING,
y STRING
) WITH (
'connector'='filesystem',
'path'='hdfs://172.25.xx.xx:8020/tmp/bank.csv',
'format'='csv',
'csv.field-delimiter'=';',
'csv.quote-character'='"',
'csv.ignore-parse-errors' = 'true'
);
select * from bank limit 10;
BI 数据分析
基于 Bank (https://archive.ics.uci.edu/ml/datasets/bank+marketing)数据来做 BI 数据分析
%flink.bsql
select age,count(1) as total from bank
where age<${maxAge=38}
group by age order by age
%flink.bsql
select age,count(1) as total from bank
where marital='${marital=married,single|divorced|married}'
group by age order by age
使用 Flink UDF
%flink-yarn
class ScalaUpper extends ScalarFunction {
def eval(str: String) = str.toUpperCase
}
btenv.registerFunction("scala_upper", new ScalaUpper())
%flink-yarn.bsql
select scala_upper(education),count(1) from bank group by education
访问hive中的数据
为了在Flink中使用Hive,你必须做以下设置。
- 将zeppelin.flink.enableHive设置为true
- 将zeppelin.flink.hive.version设置为您正在使用的hive版本,如3.1.0
- 将HIVE_CONF_DIR设置为hive-site.xml所在的位置。
- 确保hive metastore已经启动,并且在hive-site.xml中配置了hive.metastore.uris
- 将以下依赖项复制到flink安装的lib文件夹中。(以HDP 3.1.5.0-152,Flink 1.13.2为例,需要的依赖包如下)
- flink-sql-connector-hive-3.1.2_2.11。 注意:这是一个bundled hive jar,详见:flink hive
show tables;
describe t1;
select * from t1 limit 1;
异常处理
checkpoint is not supported for batch jobs
在执行批处理SQL时,报如下异常:checkpoint is not supported for batch jobs
需要将zeppelin.flink.job.check_interval
设置为-1,此值默认值为1000.
NoSuchMethodError: com.google.common.base.Preconditions.checkArgument
flink SQL连接hive报错java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V,查询资料后发现就是guava版本冲突造成的。
hive-exec-3.1.1内置的guava是19.0版本的,而hadoop中的guava是28.0-jre版本的,flink内置的guava也有多个版本。彼此之间版本就冲突了。
解决方案参见:link SQL报错java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLj
将flink-sql-connector-hive-3.1.2_2.11 jar中的shade的guava包删除 或 重新编译前将guava依赖删除
ClassNotFoundException:org.apache.hadoop.mapred.JobConf
flink SQL连接hiver后,查询hive 表中的数据,报如下异常:
Caused by: java.lang.ClassNotFoundException: **org.apache.hadoop.mapred.JobConf** at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 52 more
注意:hadoop集群的所有节点已经配置了export HADOOP_CLASSPATH=
hadoop classpath``但没有作用。
需要将如下jar包从hadoop集群复制到flink lib目录下:
参考
connectors table formats csv
最新版本Flink 1.12.0 的sql-cli配置连接yarn-session 问题处理
link SQL报错java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLj