0
点赞
收藏
分享

微信扫一扫

Flink on Zeppelin使用示例及填坑记录

德州spark 2022-01-20 阅读 118

flink在zeppelin上的使用文档,参见flink interpreter

Flink on Zeppelin 基本概念

Flink解释器组

在Zeppelin中,支持Apache Flink的是由下面列出的五个解释器组成的Flink解释器组。

NameClassDescription
%flinkFlinkInterpreterCreates ExecutionEnvironment/StreamExecutionEnvironment/BatchTableEnvironment/StreamTableEnvironment and provides a Scala environment
%flink.pyflinkPyFlinkInterpreterProvides a python environment
%flink.ipyflinkIPyFlinkInterpreterProvides an ipython environment
%flink.ssqlFlinkStreamSqlInterpreterProvides a stream sql environment
%flink.bsqlFlinkBatchSqlInterpreterProvides a batch sql environment

内置的变量

Scala是Flink在Zeppelin上的默认语言,Flink Interpreter (%flink) 自动创建下面 6 个变量作为 Flink Scala 程序的入口。

  • senv (StreamExecutionEnvironment),
  • benv (ExecutionEnvironment)
  • stenv (StreamTableEnvironment for blink planner)
  • btenv (BatchTableEnvironment for blink planner)
  • stenv_2 (StreamTableEnvironment for flink planner)
  • btenv_2 (BatchTableEnvironment for flink planner)
  • z (ZeppelinContext)

Flink Interpreter的运行模式

Flink Interpreter在zeppelin中的三种运行模式,正式环境使用YARN模式

  • Local 模式: 在本地创建一个MiniCluster,适合做POC或者小数据量的试验,必须配置如下:
    • FLINK_HOME /opt/flink/flink-1.13.2
    • flink.execution.mode local
  • Remote 模式: 连接一个已经创建好的Flink集群,一般是Flink standalone集群
    • FLINK_HOME /opt/flink/flink-1.13.2
    • flink.execution.mode remote
    • flink.execution.remote.host 172.25.xx.xx
    • flink.execution.remote.port 8081
  • YARN 模式: 在Yarn集群中创建Flink Cluster
    • FLINK_HOME /opt/flink/flink-1.13.2
    • flink.execution.mode yarn
    • HADOOP_CONF_DIR /etc/hadoop/conf
  • Yarn Application Mode: 是Zeppelin服务器主机上的一个独立的Flink解释器进程,Flink解释器运行在yarn容器中的JobManager中。
    • flink.execution.mode yarn-application
    • HADOOP_CONF_DIR /etc/hadoop/conf
    • 在内部,flink会调用命令hadoop classpath,并在flink解释器进程中加载所有hadoop相关的jar文件: export HADOOP_CLASSPATH=hadoop classpath``

如果连接超时,则需要配置如下参数
zeppelin.interpreter.connect.timeout 600000

代码示例

简单示例

%flink
val data=benv.fromElements("hello world","hello flink","hello hadoop")
val wc=data.flatMap(line=>line.split("\\s"))
.map(w=>(w,1))
.groupBy(0)
.sum(1)
z.show(wc)

在这里插入图片描述

Batch ETL

基于 Bank (https://archive.ics.uci.edu/ml/datasets/bank+marketing)数据来做 Batch ETL 任务

下载数据curl -O https://archive.ics.uci.edu/ml/machine-learning-databases/00222/bank.zip

注意:手工删除bank.csv的第一行数据

%flink.bsql
DROP TABLE IF EXISTS bank;
CREATE TABLE bank (
    age INT,
    job STRING,
    marital STRING,
    education STRING,
    `default` STRING,
    balance STRING,
    housing STRING,
    loan STRING,
    contact STRING, 
    `day` STRING,
    `month` STRING,
    duration INT,
    campaign INT,
    pdays INT,
    privious INT,
    poutcome STRING,
    y STRING
) WITH (
'connector'='filesystem',
'path'='hdfs://172.25.xx.xx:8020/tmp/bank.csv',
'format'='csv',
'csv.field-delimiter'=';',
'csv.quote-character'='"',
'csv.ignore-parse-errors' = 'true'
);

select * from bank limit 10;

BI 数据分析

基于 Bank (https://archive.ics.uci.edu/ml/datasets/bank+marketing)数据来做 BI 数据分析

%flink.bsql
select age,count(1) as total from bank
where age<${maxAge=38}
group by age order by age

在这里插入图片描述

%flink.bsql
select age,count(1) as total from bank
where marital='${marital=married,single|divorced|married}'
group by age order by age

在这里插入图片描述

使用 Flink UDF

%flink-yarn
class ScalaUpper extends ScalarFunction {
  def eval(str: String) = str.toUpperCase
} 
btenv.registerFunction("scala_upper", new ScalaUpper())

%flink-yarn.bsql
select scala_upper(education),count(1) from bank group by education

在这里插入图片描述

访问hive中的数据

为了在Flink中使用Hive,你必须做以下设置。

  • 将zeppelin.flink.enableHive设置为true
  • 将zeppelin.flink.hive.version设置为您正在使用的hive版本,如3.1.0
  • 将HIVE_CONF_DIR设置为hive-site.xml所在的位置。
  • 确保hive metastore已经启动,并且在hive-site.xml中配置了hive.metastore.uris
  • 将以下依赖项复制到flink安装的lib文件夹中。(以HDP 3.1.5.0-152,Flink 1.13.2为例,需要的依赖包如下)
    • flink-sql-connector-hive-3.1.2_2.11。 注意:这是一个bundled hive jar,详见:flink hive
show tables;
describe t1;
select * from t1 limit 1;

在这里插入图片描述

异常处理

checkpoint is not supported for batch jobs

在执行批处理SQL时,报如下异常:checkpoint is not supported for batch jobs
需要将zeppelin.flink.job.check_interval设置为-1,此值默认值为1000.

NoSuchMethodError: com.google.common.base.Preconditions.checkArgument

flink SQL连接hive报错​​java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V​​,查询资料后发现就是guava版本冲突造成的。
hive-exec-3.1.1内置的guava是19.0版本的,而hadoop中的guava是28.0-jre版本的,flink内置的guava也有多个版本。彼此之间版本就冲突了。

解决方案参见:link SQL报错java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLj
将flink-sql-connector-hive-3.1.2_2.11 jar中的shade的guava包删除 或 重新编译前将guava依赖删除

ClassNotFoundException:org.apache.hadoop.mapred.JobConf

flink SQL连接hiver后,查询hive 表中的数据,报如下异常:
Caused by: java.lang.ClassNotFoundException: **org.apache.hadoop.mapred.JobConf** at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 52 more

注意:hadoop集群的所有节点已经配置了export HADOOP_CLASSPATH=hadoop classpath``但没有作用。
需要将如下jar包从hadoop集群复制到flink lib目录下:
在这里插入图片描述

参考

connectors table formats csv
最新版本Flink 1.12.0 的sql-cli配置连接yarn-session 问题处理
link SQL报错java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLj

举报

相关推荐

0 条评论