0
点赞
收藏
分享

微信扫一扫

Flink流式处理百万数据量CSV文件

前言

最近公司让做一个'没有必要'的需求

​需求针对的对象​

这是同一个csv文件的不同展示形式

  • Excel展示形式

Flink流式处理百万数据量CSV文件_字符串

  • 文本展示形式

Flink流式处理百万数据量CSV文件_flink_02

这个csv文件中可能有数百万条数据

​需求​

将所有的异常数据检测出来

  • 什么样的数据是异常数据

Flink流式处理百万数据量CSV文件_数据_03

圈红的数据我手动添加了一个a

原本是数字类型

现在变成了一个字符串类型

那么程序中将字符串类型转换为数字类型的话

就会报错

那这个值就是异常数据

  • 为什么我说是 '没有必要的需求'
  • 百万级别的数据量的csv一般都是由数据库导出来的
  • 数据库的列字段都是定义好的 比如是decimal类型的数据类型 导出来的话 那么也肯定是数字而不会是字符串
  • 而出现数字成字符串 是由于人工手动录入的情况下 才会出现 而这种情况又比较少
  • 该csv数据集用于跑python算法 比如通过pandas读取csv 统计某一个列数据的和 若全是数字则可以统计 若某一行数据是字符串 则会出现异常 那么可以通过pandas的方式做异常值数据处理 比如剔除这一行即可
  • 综上 花费人力物力去处理这一个没有必要的需求真的有些'没必要'

但领导发话了呀 这是客户的需求

所以do it

​大致实现思路​

读取该csv文件

解析csv每一行数据

检验每一行数据是否是异常数据

​实现方式​

  • 普通方式

通过java读取csv 然后一行一行处理

这种方式 若单机内存太小 很容易造成内存溢出

而且方式很low 没有多大挑战性

对个人技术能力没有提升

所以这种方式pass

  • Flink 流式处理

刚好头段时间 自己学习到了Flink

之前一直是纸上谈兵

现在终于有了用武之地

选好了技术方案 let's do it!

业务逻辑图

Flink流式处理百万数据量CSV文件_字符串_04

接下来简要说说此流程上的核心技术的实现原理

rabbitmq

DEMO源码

https://gitee.com/pingfanrenbiji/resource/tree/master/flink/code/rabbitmq

发送消息

Flink流式处理百万数据量CSV文件_字符串_05Flink流式处理百万数据量CSV文件_数据_06

​重点配置说明​

  • durable
    是否持久化,是否将队列持久化到mnesia数据库中,有专门的表保存队列声明
  • exclusive
    ①当前定义的队列是connection的channel是共享的,其他的connection是访问不到的
    ②当connection关闭的时候,队列将被删除
  • autoDelete
    当最后一个consumer(消费者)断开之后,队列将自动删除

监听消息

方式一

Flink流式处理百万数据量CSV文件_数据_07Flink流式处理百万数据量CSV文件_数据_08

​重要参数说明​

  • autoack

Flink流式处理百万数据量CSV文件_flink_09

autoAck(同no-ack)为true的时候
消息发送到操作系统的套接字缓冲区时即任务消息已经被消费
但如果此时套接字缓冲区崩溃
消息在未被消费者应用程序消费的情况下就被队列删除

所以,如果想要保证消息可靠的达到消费者端
建议将autoAck字段设置为false
这样当上面套接字缓冲区崩溃的情况同样出现
仍然能保证消息被重新消费

方式二 注解方式

Flink流式处理百万数据量CSV文件_数据_10

  • 对类添加@RabbitListener(queues = "${java.flink.queue}")注解
  • 指定队列名称 可从配置文件中读取
  • 对方法添加 @RabbitHandler 注解

​三个参数​

  • Object message

任意类型的消息

# 解析mq消息
String messageString=JsonUtils.toJson(message);
Message message1=JsonUtils.fromJsonObject(messageString,Message.class);
String message2 = new String(message1.getBody(), "UTF-8");

  • Message msg

手动确认

//如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
final long deliveryTag = msg.getMessageProperties().getDeliveryTag();

//通知 MQ 消息已被成功消费,可以ACK了
channel.basicAck(deliveryTag, false);

  • Channel channel

// 处理失败,重新压入MQ
channel.basicRecover();

线程池

源码

https://gitee.com/pingfanrenbiji/resource/tree/master/flink/code/thread

Flink流式处理百万数据量CSV文件_数据_11

​spring线程相关注解​

  • @EnableAsync
    使用多线程
  • @Async

加在线程任务的方法上(需要异步执行的任务)
定义一个线程任务
通过spring提供的ThreadPoolTaskExecutor就可以使用线程池

​重要参数​

  • corePoolSize
    核心线程数
  • maxPoolSize
    最大线程数
  • queueCapacity
    队列容量
  • keepAliveSeconds
    活跃时间
  • waitForTasksToCompleteOnShutdown
    设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean
  • rejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy())
  • setRejectedExecutionHandler:当pool已经达到max size的时候,如何处理新任务
  • CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行

​使用线程池中的线程去执行异步任务​

Flink流式处理百万数据量CSV文件_数据_12

分布式内存文件系统Alluxio

​环境搭建​

  • 自定义dokcer网络

docker network create alluxio_nw

  • 安装alluxio master

docker run -d  --rm \
-p 19999:19999 \
-p 19998:19998 \
--net=alluxio_nw \
--name=alluxio-master \
-e ALLUXIO_JAVA_OPTS=" \
-Dalluxio.master.hostname=alluxio-master" \
alluxio/alluxio master

  • 安装alluxio worker

docker run -d --rm \
-p 29999:29999 \
-p 30000:30000 \
--net=alluxio_nw \
--name=alluxio-worker \
--shm-size=3971.64MB \
-e ALLUXIO_JAVA_OPTS=" \
-Dalluxio.worker.memory.size=3971.64MB \
-Dalluxio.master.hostname=alluxio-master \
-Dalluxio.worker.hostname=alluxio-worker" \
alluxio/alluxio worker

​域名转发配置​

sudo vim /etc/hosts

127.0.0.1 alluxio-worker

​上传alluxio文件​

Flink流式处理百万数据量CSV文件_字符串_13

​下载alluxio文件​

Flink流式处理百万数据量CSV文件_字符串_14

​将文件流写入本地​

Flink流式处理百万数据量CSV文件_flink_15

​源码​

https://gitee.com/pingfanrenbiji/resource/tree/master/flink/code/alluxio

Flink流式处理数据

Flink流式处理百万数据量CSV文件_数据_16

结合当前业务梳理流程

来源数据源:数百万数据量的CSV文件
结果保存数据:CSV或Mysql

Flink流式处理百万数据量CSV文件_数据_17

​读取目标数据​

Flink流式处理百万数据量CSV文件_字符串_18

  • 略过表头
  • 在已知几列的情况下 执行上图代码

比如有6列
那么读取csv的时候
flink均认为是String类型去读取(人为指定的类型)

​筛选异常数据​

Flink流式处理百万数据量CSV文件_flink_19

​异常数据的判断标准​

比如输入数据源CSV中一行数据为

Flink流式处理百万数据量CSV文件_字符串_20

若认定圈红的那一列是数字类型

那么此时因为是字符串 无法转换为数字类型

那么这一行是异常数据

​将异常数据保存​

Flink流式处理百万数据量CSV文件_flink_21

根据业务灵活处理

  • 第一个全红的 2: 表示第二行
  • 第二个圈红的部分 表示 当前列数据应为Double类型但实际上是非数字类型 所以该行是异常数据

在方法内部对于全局变量的使用仅限于在方法内部使用 不会对方法之后的作用域有效

​比如​

Flink流式处理百万数据量CSV文件_数据_22

​过滤函数​

filter 是过滤函数 
对每一行数据进行过滤
返回false则会被过滤掉

​全局变量​

List<Integer> rowList=new ArrayList<>();
在filter函数作用域之外
可以在filter函数中使用
仅限于filter函数之间才有效
在filter函数之后 则无法使用filter对该变量的修改

  • 保存到CSV

Flink流式处理百万数据量CSV文件_数据_23

  • 缺陷

需要指定Tuple类

比如生成的csv文件一行有25列 那么就使用Tuple25

还需要定义25个泛型 比如Tuple25<String,String,....>

最多可支持25列

如果是超过25列那么就不支持

所以使用起来非常不方便 而且使用范围有限

我当时在这块费了时间,因为csv列数超过了25列 比如26列,我想着在增加一个Tuple26或TupleN 尝试了之后 不可以 后来找到了国内Flink钉钉群 请教了下里面的大佬 说是建议保存到Mysql中

  • 保存到Mysql

Flink流式处理百万数据量CSV文件_flink_24

配置mysql信息和要执行的sql语句

​局限性​

假如我有1000个列 那么需要建立一个表有1000个列吗

如果有5000个列呢 所以这种方式 也不太好

此时已经到了项目的最后期限了 很多同事在等着我的结果 我的压力也倍增 差点准备放弃flink 用low的方式实现 最后灵机一动看到了保存到txt文本文件的方法

  • 保存到Text

Flink流式处理百万数据量CSV文件_数据_25

这种方式简单有效

​DEMO源码​

https://gitee.com/pingfanrenbiji/resource/tree/master/flink/code/flink

Flink国内钉钉群号

群号 : 23138101

后记

上面这点东西 忙活了我3-4天时间 

自我感觉 真是太笨了

国内相关的资料目前还比较少

写下这点心得和经验给需要的朋友们

避免走弯路


举报

相关推荐

0 条评论