0
点赞
收藏
分享

微信扫一扫

如何使用StreamSets实时采集Kafka中嵌套JSON数据并写入Hive表



1.文档编写目的



在前面的文章Fayson介绍了关于StreamSets的一些文章​​如何在CDH中安装和使用StreamSets​​》、《​​如何使用StreamSets从MySQL增量更新数据到Hive​​》、《​​如何使用StreamSets实现MySQL中变化数据实时写入Kudu​​》、《​​如何使用StreamSets实现MySQL中变化数据实时写入HBase​​》、《​​如何使用StreamSets实时采集Kafka并入库Kudu​​》和《​​如何使用StreamSets实时采集Kafka数据并写入Hive表​​,本篇文章Fayson主要介绍如何使用StreamSets实时采集Kafka中嵌套的JSON数据并将采集的数据写入Hive,StreamSets的流程处理如下:


如何使用StreamSets实时采集Kafka中嵌套JSON数据并写入Hive表_数据


  • 内容概述

1.测试环境准备

2.配置StreamSets

3.创建Pipline及测试

4.总结


  • 测试环境

1.RedHat7.3

2.CM和CDH版本为cdh5.13.3

3.Kafka2.2.0(0.10.0)

4.StreamSets3.3.0


  • 前置条件

1.集群已启用Sentry


2.测试环境准备



1.准备测试的JSON数据


{
"school": 1,
"address": 2,
"no": "page",
"class": 3,
"students": [{
"name": "page1",
"teacher": "larry",
"age": 40
}, {
"name": "page2",
"teacher": "larry",
"age": 50
}, {
"name": "page3",
"teacher": "larry",
"age": 51
}]
}

(可左右滑动)


2.为sdc用户授权

由于集群已启用Sentry,所以这里需要为sdc用户授权,否则sdc用户无法向Hive库中创建表及写入数据


如何使用StreamSets实时采集Kafka中嵌套JSON数据并写入Hive表_hive_02


3.创建StreamSets的Pipline



1.登录StreamSets,创建一个kafka2hive_json的Pipline



如何使用StreamSets实时采集Kafka中嵌套JSON数据并写入Hive表_kafka_03


2.在Pipline流程中添加Kafka Consumer作为源并配置Kafka基础信息


如何使用StreamSets实时采集Kafka中嵌套JSON数据并写入Hive表_kafka_04


配置Kafka相关信息,如Broker、ZK、Group、Topic及Kerberos信息


如何使用StreamSets实时采集Kafka中嵌套JSON数据并写入Hive表_hive_05


配置数据格式化方式,写入Kafka的数据为JSON格式,所以这里选择JSON


如何使用StreamSets实时采集Kafka中嵌套JSON数据并写入Hive表_kafka_06


3.添加JavaScript Evaluator模块,主要用于处理嵌套的JSON数据


如何使用StreamSets实时采集Kafka中嵌套JSON数据并写入Hive表_hive_07


编写JSON数据解析代码,将嵌套JSON解析为多个Record,传输给HiveMetadata


如何使用StreamSets实时采集Kafka中嵌套JSON数据并写入Hive表_数据_08


解析脚本如下:


for(var i = 0; i < records.length; i++) {
try {
var students = records[i].value['students'];
log.error("---------++++++++------" + students.length);
for(var j = 0; j< students.length; j++) {
log.info("============" + students[0].name)
var newRecord = sdcFunctions.createRecord(true);
var studentMap = sdcFunctions.createMap(true);
studentMap.no = records[i].value['no'];
studentMap.school = records[i].value['school'];
studentMap.class = records[i].value['class'];
studentMap.address = records[i].value['address'];
studentMap.name = students[j].name;
studentMap.teacher = students[j].teacher;
studentMap.age = students[j].age;
newRecord.value = studentMap;
log.info("-------------" + newRecord.value['school'])
output.write(newRecord);
}
} catch (e) {
// Send record to error
error.write(records[i], e);
}
}

(可左右滑动)


4.添加Hive Metadata中间处理模块,选择对应的CDH版本


如何使用StreamSets实时采集Kafka中嵌套JSON数据并写入Hive表_数据_09


配置Hive的JDBC信息


如何使用StreamSets实时采集Kafka中嵌套JSON数据并写入Hive表_hive_10


配置Hive的表信息,指定表名和库名


如何使用StreamSets实时采集Kafka中嵌套JSON数据并写入Hive表_数据_11


指定数据格式,指定为Avro,选项中有parquet格式,但在后续处理中并不支持parquet格式


如何使用StreamSets实时采集Kafka中嵌套JSON数据并写入Hive表_kafka_12


5.添加Hadoop FS处理模块,主要用于将HiveMetadata的数据写入HDFS


如何使用StreamSets实时采集Kafka中嵌套JSON数据并写入Hive表_hive_13


 

配置Hadoop FS,配置HDFS URL和是否启用Kerberos认证


如何使用StreamSets实时采集Kafka中嵌套JSON数据并写入Hive表_数据_14


配置Hadoop FS的Out Files


如何使用StreamSets实时采集Kafka中嵌套JSON数据并写入Hive表_数据_15


注意:勾选“Directory in Header”使HDFS写入数据时使用上一步中Hive Metadata模块传递的目录,Idle Timeout主要是用于指定Hadoop FS模块空闲多久则将数据刷到HDFS数据目录。



配置Late Records参数,使用默认参数即可


如何使用StreamSets实时采集Kafka中嵌套JSON数据并写入Hive表_hive_16


指定写入到HDFS的数据格式


如何使用StreamSets实时采集Kafka中嵌套JSON数据并写入Hive表_数据_17


6.添加Hive Metastore模块,该模块主要用于向Hive库中创建表


如何使用StreamSets实时采集Kafka中嵌套JSON数据并写入Hive表_hive_18


配置Hive信息,JDBC访问URL


如何使用StreamSets实时采集Kafka中嵌套JSON数据并写入Hive表_数据_19


Hive Metastore的高级配置


如何使用StreamSets实时采集Kafka中嵌套JSON数据并写入Hive表_数据_20


7.点击校验流程,如下图所示则说明流程正常


如何使用StreamSets实时采集Kafka中嵌套JSON数据并写入Hive表_kafka_21


到此为止完成了Kafka数据到Hive的流程配置。


4.流程测试验证



1.启动kafka2hive_json的Pipline,启动成功如下图显示


如何使用StreamSets实时采集Kafka中嵌套JSON数据并写入Hive表_kafka_22


2.使用Kafka的Producer脚本向kafka_hive_topic生产消息


kafka-console-producer \
--topic kafka_hive_topic \
--broker-list cdh02.fayson.com:9092,cdh03.fayson.com:9092,cdh04.fayson.com:9092

(可左右滑动)


如何使用StreamSets实时采集Kafka中嵌套JSON数据并写入Hive表_数据_23


3.在StreamSets中查看kafka2hive_json的pipline运行情况


如何使用StreamSets实时采集Kafka中嵌套JSON数据并写入Hive表_hive_24


4.使用sdc用户登录Hue查看ods_user表数据


如何使用StreamSets实时采集Kafka中嵌套JSON数据并写入Hive表_数据_25


将嵌套的JSON数据解析为3条数据插入到ods_user表中。


5.总结



1.在使用StreamSets的Kafka Consumer模块接入Kafka嵌套的JSON数据后,无法直接将数据入库到Hive,需要将嵌套的JSON数据解析,这里可以使用Evaluator模块,StreamSets支持多种语言的Evaluator(如:JavaScprit、Jython、Groovy、Expression及Spark)。


2.由于集群启用了Sentry,StreamSets默认使用sdc用户访问Hive,在想Hive库中创建表时需要为sdc用户授权,否则会报权限异常。


3.在配置Hive的JDBC是,我们需要在JDBC URL后指定user和password否则会报匿名用户无权限访问的问题,注意必须带上password。


4.HDFS模块在接收到HiveMetadata模块的数据后生成的为临时文件,不是立即将数据写入到HDFS,可以通过“Idle Timeout”参数来控制刷新数据到HDFS的频率。


举报

相关推荐

0 条评论