
HBase Scanners and Filters


Scanners

When HBase scans data, it reads rows through a table scanner.

HTable returns a scanner when you call getScanner(scan) with a Scan instance; the Scan can be configured with start and stop row keys as well as other filter conditions.

Query results come back through an iterator. It is not the most convenient interface, but it is not complicated either.
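A minimal sketch of such a range scan, using the same 0.94-era client API and hConn connection field as the demo class below; the table name and row keys are only illustrative:

Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes("row-010")); // inclusive
scan.setStopRow(Bytes.toBytes("row-020"));  // exclusive

HTableInterface table = hConn.getTable("students");
ResultScanner scanner = table.getScanner(scan);
try {
    for (Result result : scanner) {
        System.out.println(Bytes.toString(result.getRow()));
    }
} finally {
    scanner.close(); // release the server-side scanner
    table.close();
}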

One point that is easy to overlook: with the default configuration, every next() call on the returned scanner makes a round trip to the RegionServer to fetch a single row. When the network is poor, this hurts performance badly, so configuring the scanner cache is strongly recommended.

Scanner Caching

The hbase.client.scanner.caching property controls how many rows a scanner fetches from the server in one round trip; by default it fetches one row at a time. Setting it to a reasonable value reduces the time spent in next() calls during a scan, at the cost of the client-side memory needed to hold the cached rows.

It can be configured in three places:

1. In the HBase conf configuration file.

2. By calling HTable.setScannerCaching(int scannerCaching).

3. By calling Scan.setCaching(int caching).

These are listed in increasing order of precedence: a value set on the Scan overrides the table-level setting, which in turn overrides the configuration file.
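A minimal sketch of all three levels, again with the 0.94-era API; the value 1000 is only illustrative:

// 1. Configuration (what hbase-site.xml feeds into) -- lowest precedence
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.client.scanner.caching", "1000");

// 2. Table level -- overrides the configuration for this table instance
HTable table = new HTable(conf, "students");
table.setScannerCaching(1000);

// 3. Scan level -- highest precedence, overrides both for this scan
Scan scan = new Scan();
scan.setCaching(1000);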

Scanner Demo

package Scanner;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class Scanner {

    private String rootDir;
    private String zkServer;
    private String port;
    private Configuration conf;
    private HConnection hConn = null;

    private Scanner(String rootDir, String zkServer, String port) throws IOException {
        this.rootDir = rootDir;
        this.zkServer = zkServer;
        this.port = port;

        conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", rootDir);
        conf.set("hbase.zookeeper.quorum", zkServer);
        conf.set("hbase.zookeeper.property.clientPort", port);

        hConn = HConnectionManager.createConnection(conf);
    }

    public void scanTable(String tablename) {
        Scan scan = new Scan();
        // Set the scanner cache: fetch 1000 rows per round trip instead of one
        scan.setCaching(1000);
        try {
            HTableInterface table = hConn.getTable(tablename);
            ResultScanner scanner = table.getScanner(scan);
            try {
                for (Result result : scanner) {
                    format(result);
                }
            } finally {
                scanner.close(); // release the server-side scanner
                table.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void format(Result result) {
        // Row key
        String rowkey = Bytes.toString(result.getRow());

        // The cells of the Result as an array of KeyValues
        KeyValue[] kvs = result.raw();

        for (KeyValue kv : kvs) {
            // Column family name
            String family = Bytes.toString(kv.getFamily());
            // Column qualifier
            String qualifier = Bytes.toString(kv.getQualifier());
            // Cell value
            String value = Bytes.toString(kv.getValue());

            System.out.println("rowkey->" + rowkey + ", family->"
                    + family + ", qualifier->" + qualifier);
            System.out.println("value->" + value);
        }
    }

    // Shell equivalent: scan 'students'
    public static void main(String[] args) throws IOException {
        String rootDir = "hdfs://hadoop1:8020/hbase";
        String zkServer = "hadoop1";
        String port = "2181";
        // Initialize the connection
        Scanner conn = new Scanner(rootDir, zkServer, port);

        conn.scanTable("students");
    }
}

Filters

1. Filters make reading a table more efficient. Both of HBase's read operations, get() and scan(), accept filters. On their own, get() and scan() support only direct access and access over a start/stop row-key range; they lack fine-grained selection, such as filtering row keys or values by regular expression, and filters fill exactly that gap.

2. You can use the predefined filters or implement your own custom filter.

3. A filter is created on the client, shipped to the server over RPC, and executed on the server side; only the data that passes the filter is returned to the client.
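Filters work with get() as well as scan(), though the demos below only exercise the scan path. A minimal sketch of a get() with a filter, reusing the hConn field and format() helper from the FilterDemo class below; the row key "Tom" and qualifier "age" are hypothetical, and imports for Get and QualifierFilter (from org.apache.hadoop.hbase.client and .filter) are assumed:

// Fetch a single row, but keep only the cells whose qualifier is "age"
Get get = new Get(Bytes.toBytes("Tom"));
get.setFilter(new QualifierFilter(CompareFilter.CompareOp.EQUAL,
        new BinaryComparator(Bytes.toBytes("age"))));

HTableInterface table = hConn.getTable("students");
format(table.get(get));
table.close();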

Filter Categories

1. Comparison Filters

RowFilter
FamilyFilter
QualifierFilter
ValueFilter
DependentColumnFilter

2. Dedicated Filters

SingleColumnValueFilter (see the sketch after this list)
SingleColumnValueExcludeFilter
PrefixFilter
PageFilter
KeyOnlyFilter
FirstKeyOnlyFilter
TimestampsFilter
RandomRowFilter
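Of these, SingleColumnValueFilter is worth a sketch of its own, since the demo below does not cover it: it keeps or drops entire rows based on the value of one column. A minimal sketch, assuming a hypothetical basicInfo:age column on the students table and the usual filter imports:

// Keep only the rows whose basicInfo:age cell equals "20"
SingleColumnValueFilter filter = new SingleColumnValueFilter(
        Bytes.toBytes("basicInfo"),
        Bytes.toBytes("age"),
        CompareFilter.CompareOp.EQUAL,
        Bytes.toBytes("20"));
// Without this, rows that lack the column entirely would also be returned
filter.setFilterIfMissing(true);

Scan scan = new Scan();
scan.setFilter(filter);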

3. Decorating Filters

SkipFilter
WhileMatchFilter (see the sketch after this list)
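Decorating filters wrap another filter to change how its decisions are applied. A minimal sketch of WhileMatchFilter, which ends the entire scan the first time the wrapped filter rejects something; the row key "Tom" is hypothetical, and imports for Filter and WhileMatchFilter are assumed:

// Scan from the start of the table and stop completely at row "Tom":
// NOT_EQUAL fails there, and WhileMatchFilter then ends the scan
Filter inner = new RowFilter(CompareFilter.CompareOp.NOT_EQUAL,
        new BinaryComparator(Bytes.toBytes("Tom")));

Scan scan = new Scan();
scan.setFilter(new WhileMatchFilter(inner));

SkipFilter is the complement: whenever the wrapped filter rejects a cell, SkipFilter skips that whole row instead of ending the scan.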

Filter Demo

package Filter;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class FilterDemo {

    private String rootDir;
    private String zkServer;
    private String port;
    private Configuration conf;
    private HConnection hConn = null;

    private FilterDemo(String rootDir, String zkServer, String port) throws IOException {
        this.rootDir = rootDir;
        this.zkServer = zkServer;
        this.port = port;

        conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", rootDir);
        conf.set("hbase.zookeeper.quorum", zkServer);
        conf.set("hbase.zookeeper.property.clientPort", port);

        hConn = HConnectionManager.createConnection(conf);
    }

    // Comparison filter: keep only the row whose key equals "Tom"
    public void filterTable(String tablename) {
        Scan scan = new Scan();
        scan.setCaching(1000);

        RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL,
                new BinaryComparator(Bytes.toBytes("Tom")));
        scan.setFilter(filter);
        try {
            HTableInterface table = hConn.getTable(tablename);
            ResultScanner scanner = table.getScanner(scan);
            try {
                for (Result result : scanner) {
                    format(result);
                }
            } finally {
                scanner.close();
                table.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Comparison filter with a regex comparator: keep rows whose key matches T\w+
    public void filterTableRegex(String tablename) {
        Scan scan = new Scan();
        scan.setCaching(1000);

        RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL,
                new RegexStringComparator("T\\w+"));
        scan.setFilter(filter);
        try {
            HTableInterface table = hConn.getTable(tablename);
            ResultScanner scanner = table.getScanner(scan);
            try {
                for (Result result : scanner) {
                    format(result);
                }
            } finally {
                scanner.close();
                table.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Dedicated filter: page through the table 3 records at a time
    public void filterTablePage(String tablename) {
        // Page size 4: print 3 records per page and reserve one extra row,
        // whose key becomes the (inclusive) start row of the next scan
        PageFilter pageFilter = new PageFilter(4);
        byte[] lastRow = null; // last row key read; seeds the next scan
        int pageCount = 0;     // current page number
        try {
            HTableInterface table = hConn.getTable(tablename);

            while (++pageCount > 0) {
                System.out.println("pageCount = " + pageCount);
                Scan scan = new Scan();
                scan.setFilter(pageFilter);
                if (lastRow != null) {
                    scan.setStartRow(lastRow); // setStartRow is inclusive
                }

                ResultScanner resultScanner = table.getScanner(scan);
                int count = 0;
                for (Result result : resultScanner) {
                    lastRow = result.getRow();
                    if (++count > 3) {
                        break; // the 4th row is the reserved one; don't print it
                    }
                    format(result);
                }
                resultScanner.close();
                if (count < 4) {
                    // fewer than 4 rows came back, so there is no next page
                    break;
                }
            }
            table.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void format(Result result) {
        // Row key
        String rowkey = Bytes.toString(result.getRow());

        // The cells of the Result as an array of KeyValues
        KeyValue[] kvs = result.raw();

        for (KeyValue kv : kvs) {
            // Column family name
            String family = Bytes.toString(kv.getFamily());
            // Column qualifier
            String qualifier = Bytes.toString(kv.getQualifier());
            // Cell value
            String value = Bytes.toString(kv.getValue());

            System.out.println("rowkey->" + rowkey + ", family->"
                    + family + ", qualifier->" + qualifier);
            System.out.println("value->" + value);
        }
    }

    public static void main(String[] args) throws IOException {
        String rootDir = "hdfs://hadoop1:8020/hbase";
        String zkServer = "hadoop1";
        String port = "2181";
        // Initialize the connection
        FilterDemo filterDemo = new FilterDemo(rootDir, zkServer, port);
        //filterDemo.filterTable("students");
        //filterDemo.filterTableRegex("students");
        filterDemo.filterTablePage("students");
    }
}


