0
点赞
收藏
分享

微信扫一扫

使用Flink集群环境进行数据处理

前言

上篇文章记录了搭建分布式Flink集群环境的过程​​搭建Flink集群环境​​

这篇文章咱们聊一聊Flink客户端如何对接Flink集群环境的过程

示例:Flink读取Hadoop中的文件 然后通过集群环境进行数据处理的过程

Hadoop

Hadoop集群环境搭建

​​搭建大数据运行环境之一​​

​​搭建大数据运行环境之二​​

Hadoop集群端口说明

使用Flink集群环境进行数据处理_hdfs

Hadoop集群搭建过程异常情况

不能格式化存储目录

​详细异常信息​

org.apache.hadoop.hdfs.qjournal.client.QuorumException: Could not format one or more JournalNodes. 1 exceptions thrown:
192.168.84.132:8485: Directory /usr/local/hadoop/jn/data/nameservices001 is in an inconsistent state: Can't format the storage directory because the current directory is not empty

journalnode的端口是8485

​处理方式​

每一个hadoop journalnode节点上将指定目录删除即可

rm -rf /usr/local/hadoop/jn/data/nameservices001

上传文件到hdfs

cd /usr/local/hadoop/sbin
# 创建文件夹
hdfs dfs -mkdir /hdfsdata
# 文件
sudo vi /home/aaa.txt
# 上传文件到指定文件夹
hdfs dfs -put /home/aaa.txt /hdfsdata

上传文件异常

Hadoop DataNode 节点启不来

​详细异常信息​

File /hdfsdata/aaa.txt._COPYING_ could only be replicated to 0 nodes instead of minReplication (=1). 
There are 0 datanode(s) running and no node(s) are excluded in this operation

​查看WebUI DataNode情况​

http://192.168.84.128:50070/dfshealth.html#tab-datanode

使用Flink集群环境进行数据处理_hdfs_02

解决方法一
停止集群

cd /usr/local/hadoop/sbin
./stop-all.sh

删除在hdfs中配置的data目录
  • 查看data目录

在core-site.xml中配置的hadoop.tmp.dir对应文件件

cat /usr/local/hadoop/etc/hadoop/core-site.xml

  • 删除

rm -rf /usr/local/hadoop/tmp/*

重新格式化

./hadoop namenode -format

重新启动集群

./start-all.sh

解决方法二

如果上面的方法还是不能启动DataNode那么使用这个方法

当执行文件系统格式化时
会在namenode数据文件夹
(即配置文件中dfs.name.dir在本地系统的路径)
中保存一个current/VERSION文件
记录namespaceID
标志了所有格式化的namenode版本
如果我们频繁的格式化namenode
那么datanode中保存(即dfs.data.dir在本地系统的路径)的current/VERSION文件只是你地第一次格式化时保存的namenode的ID
因此就会造成namenode和datanode之间的ID不一致

  • 解决方法A:(推荐)

删除DataNode的所有资料及将集群中每个datanode节点的/dfs/data/current中的VERSION删除
然后重新执行hadoop namenode -format进行格式化
重启集群,错误消失

  • 解决方法B:

将name/current下的VERSION中的clusterID复制到data/current下的VERSION中,覆盖掉原来的clusterID

查看DataNode情况

使用Flink集群环境进行数据处理_hdfs_03

DataNode已经起来了

查看上传文件

http://192.168.84.128:50070

使用Flink集群环境进行数据处理_hadoop_04

​该文件路径​

使用Flink集群环境进行数据处理_hadoop_05

hdfs://192.168.84.128:8020/hdfsdata/aaa.txt

Flink读取数据源并处理数据

DEMO源码

https://gitee.com/pingfanrenbiji/flink-examples-streaming

Flink读取hdfs文件并处理数据

使用Flink集群环境进行数据处理_hadoop_06

创建flink执行环境

  • 第一个参数:远程flink集群 jobmanager ip地址
  • 第二个参数:8081是jobmanager webui端口
  • 第三个参数:是当前文件夹所在的jar包

数据源

读取hdfs文件数据

各种算子简介

以单词计数为例

使用Flink集群环境进行数据处理_hdfs_07

先要将字符串数据解析成单词和次数 使用tuple2表示
第一个字段是单词 第二个字段是次数
次数初始值设置成1

​flatmap​

flatmap来做解析的工作
一行数据可能有多个单词

​keyBy​


将数据流按照单词字段即0号索引字段做分组
keyBy(int index) 得到一个以单词为key的tuple2数据流

​timeWindow​


在流上指定想要的窗口
并根据窗口中的数据计算结果
每5秒聚合一次单词数
每个窗口都是从零开始统计的

timeWindow 指定想要5秒的翻滚窗口(Tumble)

​sum​


第三个调用为每个key每个窗口指定了sum聚合函数
按照次数字段(即1号索引字段想家)
得到结果数据流
将每5秒输出一次 这5秒内每个单词出现的次数

将数据打印到控制台

所有算子操作(创建源、聚合、打印)只是构建了内部算子操作的图形

只有在execute被调用时才会在提交到集群或本地计算机上执行

执行报错 找不到代码异常

​具体异常信息​

Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: org.apache.flink.streaming.scala.examples.remotejob.RemoteJobTest$$anon$2

​解决方法​

  • 将当前目录文件夹打包成jar包

使用Flink集群环境进行数据处理_hadoop_08

使用maven插件maven-jar-plugin

  • 第三个参数指向该jar包

在FLink Web UI查看该任务的执行过程

使用Flink集群环境进行数据处理_hadoop_09

编译异常

无效的标记

--add-exports=java.base/sun.net.util=ALL-UNNAMED

使用Flink集群环境进行数据处理_hadoop_10

不支持hdfs文件系统

​具体异常信息​

org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded

​处理方式​

  • 下载 flink hadoop资源jar包

https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-7.0/flink-shaded-hadoop-2-uber-2.7.5-7.0.jar

  • 放入flink 安装包 lib目录下

使用Flink集群环境进行数据处理_flink_11

每个节点都需要放上该jar包 然后重启flink集群环境

当前操作节点hadoop namenode节点为standby状态

​具体详细信息​

org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby

​解决方法​

重新格式化2个namenode节点即可

具体详见

​​搭建大数据运行环境之二​​

遗留问题

​flink数据源来自于socket数据​

使用Flink集群环境进行数据处理_hdfs_12

​启动socket服务并输入数据​

使用Flink集群环境进行数据处理_hdfs_13

​问题是​

Flink并没有监听到该socket数据
暂时还没有找到原因
了解的朋友们请联系我
指导我一下哦

​如果本地环境是可以监听到的​

使用Flink集群环境进行数据处理_flink_14

后记

为了解决这个问题
我请教了下 “Apache Flink China社区”钉钉群里面的谢波老师
他告诉我:

通过java或scala一般创建本地执行环境 即

'final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();'

很少有

'final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(ip,port,jarfiles);'

这样用的

若使用flink分布式环境 那么通过web ui界面 上传jar包的方式来完成

使用Flink集群环境进行数据处理_flink_15

这也就解释了为什么我没有找到相关资料
只能靠自己'摸着石头过河'了

结语

在了解一件新事物的时候 
按照自己的想法 一番努力和挣扎之后
也许方向是错误的
但也会对它更进一步的了解了


举报

相关推荐

0 条评论