0
点赞
收藏
分享

微信扫一扫

如何使用Avro文件


本文介绍了如何使用ApacheSqoop实用程序将数据从Oracle数据库传输到S3。数据将以Avro数据格式存储。

数据传输采用了以下技术:

  • Apache Sqoop 1.4.7
  • Oracle 12c
  • Amazon EMR 5.16.0(Hadoop发行版2.8.4)

Sqoop命令以Avro格式存储数据

ApacheSqoop1.4.7支持Avro数据文件。要以Avro格式存储数据,应将以下参数添加到Sqoop命令中:


--as-avrodatafile # imports data to Avro data files
 
--compression-codec snappy # use Hadoop codec (in this case - snappy)


Sqoop命令的模板如下:


sqoop import \
 
--bindir ./ \
 
--connect 'dbc:oracle:thin:<username>/password@<host>:<port>/<instance_name>' \
 
# 'jdbc:sqlserver://<host>:<port>;databasename=<database_name>' \ # SQL Server 2008 and higher
 
# 'jdbc:jtds:sqlserver://<host>:<port>/<database_name>' \ - #SQL Server 2000 \
 
--username <username> \
 
--driver <driver_class> # manually specify JDBC driver class to use
 
# example: --driver net.sourceforge.jtds.jdbc.Driver
 
--connection-manager # Specify connection manager class to use
 
# example: --connection-manager org.apache.sqoop.manager.SQLServerManager
 
--password <password> \
 
--num-mappers <n> \
 
--fields-terminated-by '\t' \ # sets the field separator character
 
--lines-terminated-by '\n' \  # sets the end-of-line character
 
--as-avrodatafile \           # imports data to Avro data files
 
--compression-codec snappy \  # use Hadoop codec (in this case - snappy)
 
--options-file <path_to_options_file> \
 
--split-by <field_name> \ # only used if number of mappers > 1
 
--target-dir s3://<path> \
 
# example for HDFS: --target-dir hdfs:///<path>
 
--null-string '' \
 
--null-non-string ''
 
--boundary-query # if used then --split-by should also be present


Oracle将数据转储到S3的Sqoop命令示例:

sqoop import \
 
-Dmapreduce.job.user.classpath.first=true \
 
--connect "jdbc:oracle:thin:user/password@host_address.com:1521/orcl" \
 
--num-mappers 1 \
 
--query 'select * from employee where $CONDITIONS' \
 
--target-dir s3://my-bucket/staging/employee \
 
--as-avrodatafile \
 
--compression-codec snappy \
 
--null-string '' \
 
--null-non-string ''


请注意,当您运行该命令时,目标目录不存在,否则Sqoop命令将失败。

您可以使用一个简单的AWSCLI命令来删除目标目录:


aws s3 rm s3://my-bucket/staging/employee --recursive
 
Oracle将数据转储到Hadoop的Sqoop命令示例:
 
sqoop import \
 
-Dmapreduce.job.user.classpath.first=true \
 
--connect "jdbc:oracle:thin:user/password@host_address.com:1521/orcl" \
 
--num-mappers 1 \
 
--query 'select * from employee where $CONDITIONS' \
 
--delete-target-dir
 
--target-dir /user/hive/warehouse/employee \
 
--as-avrodatafile \
 
--compression-codec snappy \
 
--null-string '' \
 
--null-non-string ''


注意,有一个参数, -删除-目标-dir,在删除目标目录的命令中,只有当目标目录位于HDFS中时才能使用。

Sqoop可以将数据传输到Hadoop(HDFS)或AWS(S3)。要查询传输的数据,需要在物理文件之上创建表。如果数据被传输到Hadoop,您可以创建Hive表。如果数据被传输到S3,则可以创建Hive表或AmazonAthena表。在这两种情况下,您都需要一个可以从物理文件中检索的表模式。从1.4.7版(EMR5.14.0)开始,Hadoop发行版:Amazon2.8.3 Sqoop自动检索表模式并将其存储在AutoGeneratedSchema.avsc文件在同一个文件夹中。如果使用Sqoop版本1.4.6(EMR 5.13.0的一部分)或更低版本,则可以手动检索表模式。

如果数据的目的地是HDFS,则可以使用以下命令检索表架构:


hadoop jar avro-tools-1.8.1.jar getschema /user/hive/warehouse/employee/part-m-00000.avro > employee.avsc


如果数据的目标是S3,则需要将Avro数据文件复制到本地文件系统,然后检索架构:


java -jar avro-tools-1.8.1.jar getschema part-m-00000.avro > employee.avsc


Avro-tools-1.8.1.jar是Avro工具的一部分,它提供用于处理Avro文件的CLI接口。

在检索表模式之后,可以使用它来进一步创建表。

在蜂巢中创建Avro表

要在Hive中创建Avro表(在Hadoop集群上或在EMR上),您必须提供从Avro数据文件中检索到的表模式位置:


CREATE TABLE employee


STORED AS AVRO


LOCATION '/user/hive/warehouse/employee'


TBLPROPERTIES('avro.schema.url'='hdfs:///user/hive/warehouse/avsc/employee.avsc');


还可以在S3:中指定表位置:


CREATE TABLE employee


STORED AS AVRO


location 's3://my-bucket/staging/employee'


TBLPROPERTIES ('avro.schema.url'='hdfs:///user/hive/warehouse/avsc/employee.avsc');


您甚至可以在S3中保留一个表模式:

CREATE EXTERNAL TABLE employee
 
STORED AS AVRO
 
location 's3:/my-bucket/staging/employee'
 
TBLPROPERTIES ('avro.schema.url'='s3://my-bucket/staging/avsc/employee.avsc');
 
Employee表的Avro架构如下所示:
 
{

 
"type" : "record",
 
"name" : "AutoGeneratedSchema",
 
"doc" : "Sqoop import of QueryResult",
 
"fields" : [ {

 
"name" : "ID",
 
"type" : [ "null", "string" ],
 
"default" : null,
 
"columnName" : "ID",
 
"sqlType" : "2"
 
}, {

 
"name" : "NAME",
 
"type" : [ "null", "string" ],
 
"default" : null,
 
"columnName" : "NAME",
 
"sqlType" : "12"
 
}, {

 
"name" : "AGE",
 
"type" : [ "null", "string" ],
 
"default" : null,
 
"columnName" : "AGE",
 
"sqlType" : "2"
 
}, {

 
"name" : "GEN",
 
"type" : [ "null", "string" ],
 
"default" : null,
 
"columnName" : "GEN",
 
"sqlType" : "12"
 
}, {

 
"name" : "CREATE_DATE",
 
"type" : [ "null", "long" ],
 
"default" : null,
 
"columnName" : "CREATE_DATE",
 
"sqlType" : "93"
 
}, {

 
"name" : "PROCESS_NAME",
 
"type" : [ "null", "string" ],
 
"default" : null,
 
"columnName" : "PROCESS_NAME",
 
"sqlType" : "12"
 
}, {

 
"name" : "UPDATE_DATE",
 
"type" : [ "null", "long" ],
 
"default" : null,
 
"columnName" : "UPDATE_DATE",
 
"sqlType" : "93"
 
} ],
 
"tableName" : "QueryResult"
 
}

注意,所有时间戳列都定义为短的.

重要*在蜂巢中创建的所有表都使用创建表语句是托管表。这意味着,如果删除表,HDFS或S3中的相应目录也将被删除。要保留数据为HDFS或S3,应将表创建为外部表:


CREATE EXTERNAL TABLE employee


在这种情况下,即使外部表被删除,HDFS或S3中的物理文件也将保持不变。

在AmazonAthena中创建一个Avro表

AmazonAthena不支持table属性。avro.schema.url-模式需要显式地添加在avro.schema.literal:


CREATE EXTERNAL TABLE employee
 
(
 
ID string,
 
NAME string,
 
AGE string,
 
GEN string,
 
CREATE_DATE bigint,
 
PROCESS_NAME string,
 
UPDATE_DATE bigint
 
)
 
STORED AS AVRO
 
LOCATION 's3://my-bucket/staging/employees'
 
TBLPROPERTIES (
 
'avro.schema.literal'='
 
{

 
"type" : "record",
 
"name" : "AutoGeneratedSchema",
 
"doc" : "Sqoop import of QueryResult",
 
"fields" : [ {

 
"name" : "ID",
 
"type" : [ "null", "string" ],
 
"default" : null,
 
"columnName" : "ID",
 
"sqlType" : "2"
 
}, {

 
"name" : "NAME",
 
"type" : [ "null", "string" ],
 
"default" : null,
 
"columnName" : "NAME",
 
"sqlType" : "12"
 
}, {

 
"name" : "AGE",
 
"type" : [ "null", "string" ],
 
"default" : null,
 
"columnName" : "AGE",
 
"sqlType" : "2"
 
}, {

 
"name" : "GEN",
 
"type" : [ "null", "string" ],
 
"default" : null,
 
"columnName" : "GEN",
 
"sqlType" : "12"
 
}, {

 
"name" : "CREATE_DATE",
 
"type" : [ "null", "long" ],
 
"default" : null,
 
"columnName" : "CREATE_DATE",
 
"sqlType" : "93"
 
}, {

 
"name" : "PROCESS_NAME",
 
"type" : [ "null", "string" ],
 
"default" : null,
 
"columnName" : "PROCESS_NAME",
 
"sqlType" : "12"
 
}, {

 
"name" : "UPDATE_DATE",
 
"type" : [ "null", "long" ],
 
"default" : null,
 
"columnName" : "UPDATE_DATE",
 
"sqlType" : "93"
 
} ],
 
"tableName" : "QueryResult"
 
}
 
');


注意,表定义中的所有时间戳列都定义为比金.对此的解释如下。

使用Avro中的时间戳

当Sqoop将数据从Oracle导入到Avro时(使用-as-avrodatafile)它以unix时间格式存储所有“时间戳”值。(时代),即短的.

在蜂巢

在Hive中创建Avro表时不会发生任何更改:


CREATE TABLE employee
 
STORED AS AVRO
 
LOCATION '/user/hive/warehouse/employee'
 
TBLPROPERTIES ('avro.schema.url'='hdfs:///user/hive/warehouse/avsc/employee.avsc');
 
查询数据时,只需将毫秒转换为弦:
 
from_unixtime(<Unix time column> div 1000)
 
不使用时间戳转换的结果数据集如下所示:
 
hive> select id, name, age, gen, create_date, process_name, update_date
 
> from employee limit 2;
 
OK
 
id  name    age  gen  create_date    process_name  update_date
 
--  ----    ---  ---  -----------    ------------  -----------
 
2   John    30   M    1538265652000  BACKFILL      1538269659000
 
3   Jennie  25   F    1538265652000  BACKFILL      1538269659000
 
使用时间戳转换生成的数据集如下所示:
 
hive> select
 
>     id, name, age, gen,
 
>     from_unixtime(create_date div 1000) as create_date,
 
>     process_name,
 
>     from_unixtime(update_date div 1000) as update_date
 
> from employee limit 2;
 
OK
 
id  name    age  gen  create_date          process_name  update_date
 
--  ----    ---  ---  -----------          ------------  -----------
 
2   John    30   M    2018-09-30 00:00:52  BACKFILL      2018-09-30 01:07:39
 
3   Jennie  25   F    2018-09-30 00:00:52  BACKFILL      2018-09-30 01:07:39
 
重要*在蜂巢中,如果保留字用作列名(如时间戳)您需要使用反引号来转义它们:
 
select from_unixtime(`timestamp` div 1000) as time_stamp
 
from employee;
 
在亚马逊雅典娜
创建雅典娜表时,所有短的 字段应创建为比金 在创建表中 语句(在Avro模式中不存在!):
 
CREATE EXTERNAL TABLE employee
 
(
 
ID string,
 
NAME string,
 
AGE string,
 
GEN string,
 
CREATE_DATE bigint,
 
PROCESS_NAME string,
 
UPDATE_DATE bigint
 
)
 
STORED AS AVRO
 
LOCATION 's3://my-bucket/staging/employee'
 
TBLPROPERTIES (
 
'avro.schema.literal'='
 
{

 
"type" : "record",
 
"name" : "AutoGeneratedSchema",
 
"doc" : "Sqoop import of QueryResult",
 
"fields" : [ {

 
"name" : "ID",
 
"type" : [ "null", "string" ],
 
"default" : null,
 
"columnName" : "ID",
 
"sqlType" : "2"
 
}, {

 
"name" : "NAME",
 
"type" : [ "null", "string" ],
 
"default" : null,
 
"columnName" : "NAME",
 
"sqlType" : "12"
 
}, {

 
"name" : "AGE",
 
"type" : [ "null", "string" ],
 
"default" : null,
 
"columnName" : "AGE",
 
"sqlType" : "2"
 
}, {

 
"name" : "GEN",
 
"type" : [ "null", "string" ],
 
"default" : null,
 
"columnName" : "GEN",
 
"sqlType" : "12"
 
}, {

 
"name" : "CREATE_DATE",
 
"type" : [ "null", "long" ],
 
"default" : null,
 
"columnName" : "CREATE_DATE",
 
"sqlType" : "93"
 
}, {

 
"name" : "PROCESS_NAME",
 
"type" : [ "null", "string" ],
 
"default" : null,
 
"columnName" : "PROCESS_NAME",
 
"sqlType" : "12"
 
}, {

 
"name" : "UPDATE_DATE",
 
"type" : [ "null", "long" ],
 
"default" : null,
 
"columnName" : "UPDATE_DATE",
 
"sqlType" : "93"
 
} ],
 
"tableName" : "QueryResult"
 
}
 
');
 
查询数据时,只需将毫秒转换为弦:
 
from_unixtime(<Unix time column> / 1000)
 
不使用时间戳转换的结果数据集如下所示:
 
select id, name, age, gen, create_date, process_name, update_date
 
from employee limit 2;
 
id  name    age  gen  create_date    process_name  update_date
 
--  ----    ---  ---  -----------    ------------  -----------
 
2   John    30 M    1538265652000  BACKFILL      1538269659000
 
3   Jennie  25 F    1538265652000  BACKFILL      1538269659000
 
使用时间戳转换生成的数据集如下所示:
 
select id, name, age, gen,
 
from_unixtime(create_date / 1000) as create_date,
 
process_name,
 
from_unixtime(update_date / 1000) as update_date
 
from employee limit 2;
 
id  name    age  gen  create_date              process_name  update_date
 
--  ----    ---  ---  -----------              ------------  -----------
 
2   John    30   M    2018-09-30 00:00:52.000  BACKFILL      2018-09-30 01:07:39.000
 
3   Jennie  25   F    2018-09-30 00:00:52.000  BACKFILL      2018-09-30 01:07:39.000
 
将时间戳存储为文本
如果您不希望每次运行查询时都从Unix时间转换时间戳,则可以通过向Sqoop添加以下参数将时间戳值存储为文本:
 
--map-column-java CREATE_DATE=String,UPDATE_DATE=String
 
在应用此参数并运行Sqoop之后,表模式如下所示:
 
{

 
"type" : "record",
 
"name" : "AutoGeneratedSchema",
 
"doc" : "Sqoop import of QueryResult",
 
"fields" : [ {

 
"name" : "ID",
 
"type" : [ "null", "string" ],
 
"default" : null,
 
"columnName" : "ID",
 
"sqlType" : "2"
 
}, {

 
"name" : "NAME",
 
"type" : [ "null", "string" ],
 
"default" : null,
 
"columnName" : "NAME",
 
"sqlType" : "12"
 
}, {

 
"name" : "AGE",
 
"type" : [ "null", "string" ],
 
"default" : null,
 
"columnName" : "AGE",
 
"sqlType" : "2"
 
}, {

 
"name" : "GEN",
 
"type" : [ "null", "string" ],
 
"default" : null,
 
"columnName" : "GEN",
 
"sqlType" : "12"
 
}, {

 
"name" : "CREATE_DATE",
 
"type" : [ "null", "string" ],
 
"default" : null,
 
"columnName" : "CREATE_DATE",
 
"sqlType" : "93"
 
}, {

 
"name" : "PROCESS_NAME",
 
"type" : [ "null", "string" ],
 
"default" : null,
 
"columnName" : "PROCESS_NAME",
 
"sqlType" : "12"
 
}, {

 
"name" : "UPDATE_DATE",
 
"type" : [ "null", "string" ],
 
"default" : null,
 
"columnName" : "UPDATE_DATE",
 
"sqlType" : "93"
 
} ],
 
"tableName" : "QueryResult"
 
}
 
注意,表架构中的时间戳列定义为弦.
用于以字符串格式存储时间戳字段的Sqoop命令:
 
sqoop import \
 
-Dmapreduce.job.user.classpath.first=true \
 
--connect "jdbc:oracle:thin:user/password@host_address.com:1521/orcl" \
 
--num-mappers 1 \
 
--query 'select * from employee where $CONDITIONS' \
 
--target-dir s3://my-bucket/staging/employee_ts_str \
 
--as-avrodatafile \
 
--compression-codec snappy \
 
--null-string '' \
 
--null-non-string '' \
 
--map-column-java CREATE_DATE=String,UPDATE_DATE=String
 
对于将数据转储到HDFS,Sqoop命令将与-目标-DIR参数:
 
--target-dir hdfs:.///user/hive/warehouse/employee_ts_str
 
 
在蜂巢
使用新表模式在Hive中创建一个新表:
 
CREATE TABLE employee_ts_str
 
STORED AS AVRO
 
LOCATION '/user/hive/warehouse/employee_ts_str'
 
TBLPROPERTIES('avro.schema.url'='hdfs:///user/hive/warehouse/avsc/employee_ts_str.avsc');
 
选择不使用时间戳转换的数据:
 
hive> select id, name, age, gen, create_date, process_name, update_date
 
> from employee_ts_str limit 2;
 
OK
 
id  name   age  gen  create_date          process_name  update_date
 
--  ----   ---  ---  -----------          ------------  -----------
 
2  John    30   M    2018-09-30 00:00:52  BACKFILL      2018-09-30 01:07:39
 
3  Jennie  25   F    2018-09-30 00:00:52  BACKFILL      2018-09-30 01:07:39
 
在亚马逊雅典娜
使用新的表模式在AmazonAthena中创建一个新表:
 
CREATE EXTERNAL TABLE employee_ts_str
 
(
 
ID string,
 
NAME string,
 
AGE string,
 
GEN string,
 
CREATE_DATE string,
 
PROCESS_NAME string,
 
UPDATE_DATE string
 
)
 
STORED AS AVRO
 
LOCATION 's3://my-bucket/staging/employee_ts_str'
 
TBLPROPERTIES (
 
'avro.schema.literal'='
 
{

 
"type" : "record",
 
"name" : "AutoGeneratedSchema",
 
"doc" : "Sqoop import of QueryResult",
 
"fields" : [ {

 
"name" : "ID",
 
"type" : [ "null", "string" ],
 
"default" : null,
 
"columnName" : "ID",
 
"sqlType" : "2"
 
}, {

 
"name" : "NAME",
 
"type" : [ "null", "string" ],
 
"default" : null,
 
"columnName" : "NAME",
 
"sqlType" : "12"
 
}, {

 
"name" : "AGE",
 
"type" : [ "null", "string" ],
 
"default" : null,
 
"columnName" : "AGE",
 
"sqlType" : "2"
 
}, {

 
"name" : "GEN",
 
"type" : [ "null", "string" ],
 
"default" : null,
 
"columnName" : "GEN",
 
"sqlType" : "12"
 
}, {

 
"name" : "CREATE_DATE",
 
"type" : [ "null", "string" ],
 
"default" : null,
 
"columnName" : "CREATE_DATE",
 
"sqlType" : "93"
 
}, {

 
"name" : "PROCESS_NAME",
 
"type" : [ "null", "string" ],
 
"default" : null,
 
"columnName" : "PROCESS_NAME",
 
"sqlType" : "12"
 
}, {

 
"name" : "UPDATE_DATE",
 
"type" : [ "null", "string" ],
 
"default" : null,
 
"columnName" : "UPDATE_DATE",
 
"sqlType" : "93"
 
} ],
 
"tableName" : "QueryResult"
 
}
 
');


注意,表定义中的时间戳列定义为.

选择不使用时间戳转换的数据:


select id, name, age, gen, create_date, process_name, update_date
 
from employee_ts_str limit 2;
 
id  name    age gen  create_date          process_name  update_date
 
--  ----   ---  ---  -----------          ------------  -----------
 
2   John    30  M    2018-09-30 00:00:52  BACKFILL      2018-09-30 01:07:39
 
3   Jennie  25  F    2018-09-30 00:00:52  BACKFILL      2018-09-30 01:07:39


Avro文件连接

如果有多个输出文件(有多个映射器),并且希望将它们组合成一个文件,则可以使用级联:


hadoop jar avro-tools-1.8.1.jar part-m-00000.avro part-m-00001.avro cons_file.avro

文件可以是本地文件,也可以是S3文件:

hadoop jar avro-tools-1.8.1.jar concat s3://my-bucket/staging/employee/part-m-00000.avro s3://my-bucket/staging/employee/part-m-00001.avro s3://my-bucket/staging/employee/employee_final.avro

摘要

本文解释了如何将数据从关系数据库(Oracle)传输到S3或HDFS,并使用ApacheSqoop将其存储在Avro数据文件中。本文还演示了如何使用Avro表模式,以及如何处理Avro中的时间戳字段(使它们保持在Unix Time(Epoch Time)或转换为数据类型)。

举报

相关推荐

0 条评论