hbase的协处理器功能能够让开发者把自己的代码放到hbase里执行,简而言之就是让hbase在运行期间遵守开发者编写的代码逻辑进行功能增加。
这篇文章主要是将put、delete这两项DML操作进行记录日志,同时对create这项DDL建表语句也进行记录,这两项在协处理器中分属不同的接口控制,https://hbase.apache.org/devapidocs/index.html,hbase的官方文档中有详细的类和接口介绍,协处理器在org.apache.hadoop.hbase.coprocessor类下,put、delete相关操作在regionobserver接口下。create操作在masterobserver下。
可以看到regionobserver下的method很多,这些post开头的就是指在相应操作结束后会调用的方法,同理在操作执行前进行操作的方法也有,是pre开头。
既然是要记录put、delete操作,那我们就重载postput、postdelete方法,增加记录命令信息、写入文件的代码即可实现记录功能。
一、环境测试:
我使用的是林子雨老师的大数据镜像文件大数据Linux实验环境虚拟机镜像文件基本集成了相关组件,唯一缺点就是伪分布式的,如果要进行业务验证还是得另开一个集群测试。
首先eclipse新建maven项目,eclipse+HBASE开发环境搭建基本按这个步骤来,创建好之后,可以先使用HBase协处理器实现两个表数据的同步插入步骤测试下环境和基础的协处理器。这里也可以看到,协处理器的代码非常简单,postput方法由hbase调用时会自动获得这三个参数(ObserverContext e, Put put, WALEdit edit)其中e是当前执行操作的region的环境信息,是获取信息的关键参数,这里暂时不提;put就直接当成一个结构体看,里面包含了put操作的各项信息,包括rowkey、family、column、value等等,edit目前没用上。
在实现了上述代码之后,我们应该能够实现向student里put数据之后,cw:student会自动put相同的数据实现同步操作,可以在后面添加:
public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {
// 获取表
Table table = e.getEnvironment().getTable(TableName.valueOf("cw:student"));
// 删除数据
table.delete(delete);
// 关闭表
table.close();
}
至此已可以实现同步put、delete
但现在有两个问题:
1、我们要记录信息,并不是同步
2、这是对单表的操作、如何对全库进行操作?
二、记录put操作:
对于记录信息,我们需要提取put、delete的全部信息,而put支持toString()方法,我们尝试输出put.toString(),结果如下:
put ‘student’,‘22222’,‘info:name’,‘1234567’
{“totalColumns”:1,“row”:“22222”,“families”:{“info”: [{“qualifier”:“name”, “vlen”:7,“tag”:[],“timestamp”:1640328174554}]}}
虽然我们可以看到输出中由row、families、column信息,但缺少了value信息,但又存在一个vlen指出了value的长度,这意味着put里包含了value的信息只是我们提取方法不对。
JSONObject jso1=JSON.parseObject(put.toJSON());
String str_row=jso1.getString("row");
JSONObject jso2=JSON.parseObject(put.toJSON());
String str_fam=jso2.getString("families").substring(2, jso2.getString("families").indexOf(":")-1);
String str_col=jso2.getString("families").substring(jso2.getString("families").indexOf("qualifier")+12,jso2.getString("families").indexOf("vlen")-3);
String str_ts=jso2.getString("families").substring(jso2.getString("families").indexOf("timestamp")+11,jso2.getString("families").indexOf("timestamp")+24);
// timestamp转日期
// Long ts=Long.parseLong(str_ts);
// SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// Date date = new Date(ts);
// str_ts = simpleDateFormat.format(date);
final List<Cell> cells = put.get(Bytes.toBytes(str_fam), Bytes.toBytes(str_col));
// we assume that there is only one cell in this Put transaction
String valueAsString = new String(value);
String str_val = valueAsString;
至此,我们就从put中提取了str_row,str_fam,str_col,str_val,这四个信息,将这四条信息组合起来就得到一条完整的put命令了,后续你想写入本地还是hdfs还是kafka都随便,但这里似乎少了些什么。。
对,缺了表名,这是因为我们这个jar包是通过单表动态加载的,这就使得只有这段代码无需考虑到底是哪一个表在put,反正只有一个表。但在业务中肯定存在很多表,对每个表的操作读取之后记录会混在一起,要不就每个表加载一个与众不同jar包,不同的表写入不同的地方,但一旦表的数量级太大,或者跨月重新建表,你都得重新加载一次jar包,这当然是不推荐的。
所以首先我们来解决第二个问题:如何对全库进行操作?
三、全局加载+表名读取
全局加载jar包的操作网上很多,就是在配置文件里添加配置,然后将jar包放在lib/下,这样hbase的所有表都是自动加载这个jar包,包括新建的表。当然程序也得改改了,需要删除对cw:student的同步操作,也就是那三行代码。
全局加载之后我们就可以着手表名的读取了,因为java里的put属于一个信息集合,其中是不包含表名信息的,表名可以通过e.getEnvironment().getTable(TableName.valueOf("cw:student")
成为一个表对象,然后table.put(put)调用put函数,所以put属于是表尽可用,所以可以确定其中肯定不包含表名信息。
表名信息在Region中,而Region信息在enviroment中,这就可以通过参数e来提取表名了,String tablename=(e.getEnvironment().getRegion().getRegionInfo().getTable()).toString()
获得表名后就可以搭配put的四个信息组成一个完整的命令了。
四、记录create操作
因为delete操作和put操作基本相同,而且delete操作不需要读取value信息,所以比较简单,就不再赘述,这里主要介绍下create操作的记录。
create属于DDL命令,相关方法在协处理器的MasterRegion下,新建一个类继承BaseMasterObserver,再重载postCreateTable方法来对建表进行读取,其实这个读取比较简单,因为其调用时传入参数的desc中直接可以用getTableName()获得表名,直接记录下来就可以了。
在全局加载的时候也需要额外配置这个类
五、总结
上述操作其实总结下来也不复杂,后续生产到kafka等其他集群抓,或者写到文件系统换种方式传输都可以,不过如果要使用第三方的包都得放到lib/下,或者都打在一个jar包里。
难点其实都在于里面各类方法的调用,在逻辑方面其实比较简单,hbase的这个功能也很有意思,封装好了环境,只提供给你功能实现的地方。不过注意不要直接在业务上调试,虽然代码逻辑简单,但是jar包一旦漏传,hbase加载完协处理器代码之后master就无法启动了。