数据导入HBase中常用的有三种方式:sqoop, HBase importTsv, HBase Bulkload,这三种方式,各有优缺点,下面将逐一介绍这三种方案的优缺点.
1. Sqoop直接导入
可以使用 SQOOP 将 MySQL 表的数据导入到 HBase 表中,指定 表的名称、列簇及 RowKey ,范
例如下所示:
参数含义解释:
知识拓展:如何使用SQOOP进行增量导入数据至HBase表,范例命令如下:
例一:
/export/servers/sqoop/bin/sqoop import \
-D sqoop.hbase.add.row.key=true \
--connect jdbc:mysql://bigdata-cdh01.itcast.cn:3306/tags_dat \
--username root \
--password 123456 \
--table tbl_users \
--hbase-create-table \
--hbase-table tbl_users \
--column-family detail \
--hbase-row-key id \
--num-mappers 2
1 、 -D sqoop.hbase.add.row.key=true
是否将 rowkey 相关字段写入列族中,默认为 false ,默认情况下你将在列族中看不到任何 row
key 中的字段。注意,该参数必须放在 import 之后。
2 、 --hbase-create-table 如果 hbase 中该表不存在则创建
3 、 --hbase-table 对应的 hbase 表名
4 、 --hbase-row-key hbase 表中的 rowkey, 注意格式
5 、 --column-family hbase 表的列族
例二:
/export/servers/sqoop/bin/sqoop import \
-D sqoop.hbase.add.row.key=true \
--connect jdbc:mysql://bigdata-cdh01.itcast.cn:3306/tags_dat \
--username root \
--password 123456 \
--table tbl_logs \
--hbase-create-table \
--hbase-table tag_logs \
--column-family detail \
--hbase-row-key id \
--num-mappers 20 \
--incremental lastmodified \
--check-column log_time \
相关增量导入参数说明:
使用 SQOOP 导入数据到 HBase 表中,有一个限制:
需要指定 RDBMs 表中的某个字段作为 HBase 表的 ROWKEY ,如果 HBase 表的 ROWKEY 为多
个字段组合,就无法指定,所以此种方式有时候不能使用。
2. HBase ImportTSV
ImportTSV 功能描述:
将tsv(也可以是csv,每行数据中各个字段使用分隔符分割)格式文本数据,加载到HBase表中。
1)、采用Put方式加载导入
2)、采用BulkLoad方式批量加载导入
使用如下命令,查看 HBase 官方自带工具类使用说明:
HADOOP_HOME=/export/servers/hadoop
HBASE_HOME=/export/servers/hbase
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf ${HADOOP_HOME}/bin/yarn jar ${HBASE_HOME}/lib/hbase-server-1.2.0- cdh5.14.0.jar
执行上述命令提示如下信息:
其中 importtsv 就是将文本文件(比如 CSV 、 TSV 等格式)数据导入 HBase 表工具类,使用
说明如下:
An example program must be given as the first argument.
Valid program names are:
CellCounter: Count cells in HBase table.
WALPlayer: Replay WAL files.
completebulkload: Complete a bulk data load.
copytable: Export a table from local cluster to peer cluster.
export: Write table data to HDFS.
exportsnapshot: Export the specific snapshot to a given FileSystem.
import: Import data written by Export.
importtsv: Import data in TSV format.
rowcounter: Count rows in HBase table.
verifyrep: Compare the data from tables in two different clusters.
Usage: importtsv -Dimporttsv.columns=a,b,c <tablename> <inputdir>
The column names of the TSV data must be specified using the -
Dimporttsv.columns
option. This option takes the form of comma-separated column names, where
each
column name is either a simple column family, or a columnfamily:qualifier.
The special column name HBASE_ROW_KEY is used to designate that this column
should be used as the row key for each imported record.
To instead generate HFiles of data to prepare for a bulk data load, pass
the option:
-Dimporttsv.bulk.output=/path/for/output
'-Dimporttsv.separator=|' - eg separate on pipes instead of tabs
For performance consider the following options:
-Dmapreduce.map.speculative=false
-Dmapreduce.reduce.speculative=false
分别演示采用直接 Put 方式和 HFile 文件方式将数据导入 HBase 表,命令如下:
2.1 直接导入Put方式
HADOOP_HOME=/export/servers/hadoop
HBASE_HOME=/export/servers/hbase
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf
${HADOOP_HOME}/bin/yarn jar ${HBASE_HOME}/lib/hbase-server-1.2.0-
cdh5.14.0.jar \
importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,detail:log_id,detail:remote_ip,detail:s
ite_global_ticket,detail:site_global_session,detail:global_user_id,detai
l:cookie_text,detail:user_agent,detail:ref_url,detail:loc_url,detail:log
_time \
tbl_logs \
/user/hive/warehouse/tags_dat.db/tbl_logs
上述命令本质上运行一个 MapReduce 应用程序,将文本文件中每行数据转换封装到 Put
对象,然后插入到 HBase 表中。
回顾一下:
采用Put方式向HBase表中插入数据流程:
Put
-> WAL 预写日志
-> MemStore(内存) ,当达到一定大写Spill到磁盘上:
StoreFile(
HFile)
思考:
对海量数据插入,能否将数据直接保存为HFile文件,然后加载到HBase表中
2.2 转换为HFile文件,再加载至表
# 1. 生成HFILES文件
HADOOP_HOME=/export/servers/hadoop
HBASE_HOME=/export/servers/hbase
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf
${HADOOP_HOME}/bin/yarn jar ${HBASE_HOME}/lib/hbase-server-1.2.0-
cdh5.14.0.jar \
importtsv \
-Dimporttsv.bulk.output=hdfs://bigdata-
cdh01.itcast.cn:8020/datas/output_hfile/tbl_logs \
-
Dimporttsv.columns=HBASE_ROW_KEY,detail:log_id,detail:remote_ip,detail:
site_global_ticket,detail:site_global_session,detail:global_user_id,det
ail:cookie_text,detail:user_agent,detail:ref_url,detail:loc_url,detail:
log_time \
tbl_logs \
/user/hive/warehouse/tags_dat.db/tbl_logs
# 2. 将HFILE文件加载到表中
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf ${HADOOP_HOME}/bin/yarn jar ${HBASE_HOME}/lib/hbase-server-1.2.0- cdh5.14.0.jar \ completebulkload \
hdfs://bigdata-cdh01.itcast.cn:8020/datas/output_hfile/tbl_logs \ tbl_logs
缺点:
1 )、 ROWKEY 不能是组合主键 只能是某一个字段
2 )、当表中列很多时,书写 -Dimporttsv.columns 值时很麻烦,容易出错
3. HBase Bulkload
在大量数据需要写入 HBase 时,通常有 put 方式和 bulkLoad 两种方式。
1 、 put 方式为单条插入,在 put 数据时会先将数据的更新操作信息和数据信息 写入 WAL ,
在写入到 WAL 后, 数据就会被放到 MemStore 中 ,当 MemStore 满后数据就会被 flush 到磁盘
( 即形成 HFile 文件 ) ,在这种写操作过程会涉及到 flush 、 split 、 compaction 等操作,容易造
成节点不稳定,数据导入慢,耗费资源等问题,在海量数据的导入过程极大的消耗了系统
性能 ,避免这些问题最好的方法就是使用 BulkLoad 的方式来加载数据到 HBase 中。
val put = new Put(rowKeyByts)
put.addColumn(cf, column, value)
put.addColumn(cf, column, value)
put.addColumn(cf, column, value)
put.addColumn(cf, column, value)
table.put(put)
2 、 BulkLoader 利用 HBase 数据 按照 HFile 格式存储在 HDFS 的原理,使用 MapReduce 直接批量
生成 HFile 格式文件后, RegionServers 再将 HFile 文件移动到相应的 Region 目录下 。
1)、Extract,异构数据源数据导入到 HDFS 之上。
2)、Transform,通过用户代码,可以是 MR 或者 Spark 任务将数据转化为 HFile。
3)、Load,HFile 通过 loadIncrementalHFiles 调用将 HFile 放置到 Region 对应的 HDFS 目录上,该过程可能涉及到文件切分。
1、不会触发WAL预写日志,当表还没有数据时进行数据导入不会产生Flush和Split。
2、减少接口调用的消耗,是一种快速写入的优化方式。
Spark读写HBase之使用Spark自带的API以及使用Bulk Load将大量数据导入HBase: https://www.jianshu.com/p/b6c5a5ba30af
Bulkload 过程主要包括三部分:
1 )、 Extract ,异构数据源数据导入到 HDFS 之上。
2 )、 Transform ,通过用户代码,可以是 MR 或者 Spark 任务将数据转化为 HFile 。
3 )、 Load , HFile 通过 loadIncrementalHFiles 调用将 HFile 放置到 Region 对应的
HDFS 目录上,该过程可能涉及到文件切分。
1 、不会触发 WAL 预写日志,当表还没有数据时进行数据导入不会产生 Flush 和 Split 。
2 、减少接口调用的消耗,是一种快速写入的优化方式。
Spark 读写 HBase 之使用 Spark 自带的 API 以及使用 Bulk Load 将大量数据导入 HBase :
https://www.jianshu.com/p/b6c5a5ba30af
Bulkload过程主要包括三部分:
1、从数据源(通常是文本文件或其他的数据库)提取数据并上传到HDFS。
抽取数据到HDFS和Hbase并没有关系,所以大家可以选用自己擅长的方式进行。
2、利用MapReduce作业处理事先准备的数据 。
这一步需要一个MapReduce作业,并且大多数情况下还需要我们自己编写Map函数,而Reduce
函数不需要我们考虑,由HBase提供。
该作业需要使用rowkey(行键)作为输出Key;KeyValue、Put或者Delete作为输出Value。
MapReduce作业需要使用HFileOutputFormat2来生成HBase数据文件。
为了有效的导入数据,需要配置HFileOutputFormat2使得每一个输出文件都在一个合适的区
域中。为了达到这个目的,MapReduce作业会使用Hadoop的TotalOrderPartitioner类根据表的
key值将输出分割开来。
HFileOutputFormat2的方法configureIncrementalLoad()会自动的完成上面的工作。
3、告诉RegionServers数据的位置并导入数据。
这一步是最简单的,通常需要使用LoadIncrementalHFiles(更为人所熟知是
completebulkload工具),将文件在HDFS上的位置传递给它,它就会利用RegionServer将数据导
入到相应的区域。