1. Loading data from a local file into the HBase sentence table
package yqq.study.app03;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.*;

/**
 * @Author yqq
 * @Date 2021/11/17 14:36
 * @Version 1.0
 */
public class InsertSentence {

    private HTable table;
    private String tableName = "sentence";

    @Before
    public void init() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        // point the client at the ZooKeeper quorum of the HBase cluster
        conf.set("hbase.zookeeper.quorum", "node2,node3,node4");
        table = new HTable(conf, tableName);
    }

    @After
    public void close() throws IOException {
        if (table != null)
            table.close();
    }

    @Test
    public void insertSentence() throws IOException {
        BufferedReader br = new BufferedReader(new InputStreamReader(
                new FileInputStream(System.getProperty("user.dir") + "/hello.txt")));
        String line = null;
        int rowkey = 1;
        while ((line = br.readLine()) != null) {
            // rowkey is the 4-byte big-endian encoding of the line number
            Put put = new Put(Bytes.toBytes(rowkey));
            put.add("cf".getBytes(), "line".getBytes(), Bytes.toBytes(line));
            table.put(put);
            rowkey++;
        }
        br.close();
    }
}
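Before running the loader, the sentence input table (and the wordcount output table used in step 2) must already exist with a cf column family. Below is a minimal sketch using the same legacy client API as above; the class name CreateTables is only for illustration, and the tables can just as well be created from the HBase shell.

package yqq.study.app03;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

/**
 * Creates the 'sentence' and 'wordcount' tables with a 'cf' column family.
 * Sketch only; assumes the legacy HBaseAdmin API used elsewhere in this post.
 */
public class CreateTables {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node2,node3,node4");
        HBaseAdmin admin = new HBaseAdmin(conf);
        for (String name : new String[]{"sentence", "wordcount"}) {
            if (!admin.tableExists(name)) {
                HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(name));
                desc.addFamily(new HColumnDescriptor("cf"));
                admin.createTable(desc);
            }
        }
        admin.close();
    }
}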
Partial data (scan of the sentence table):
\x00\x01A\xC1 column=cf:line, timestamp=1637139080362, value=hello neusoft 82369
\x00\x01A\xC2 column=cf:line, timestamp=1637139080365, value=hello neusoft 82370
\x00\x01A\xC3 column=cf:line, timestamp=1637139080376, value=hello neusoft 82371
\x00\x01A\xC4 column=cf:line, timestamp=1637139080385, value=hello neusoft 82372
\x00\x01A\xC5 column=cf:line, timestamp=1637139080397, value=hello neusoft 82373
\x00\x01A\xC6 column=cf:line, timestamp=1637139080401, value=hello neusoft 82374
\x00\x01A\xC7 column=cf:line, timestamp=1637139080405, value=hello neusoft 82375
\x00\x01A\xC8 column=cf:line, timestamp=1637139080407, value=hello neusoft 82376
\x00\x01A\xC9 column=cf:line, timestamp=1637139080409, value=hello neusoft 82377
\x00\x01A\xCA column=cf:line, timestamp=1637139080412, value=hello neusoft 82378
\x00\x01A\xCB column=cf:line, timestamp=1637139080413, value=hello neusoft 82379
\x00\x01A\xCC column=cf:line, timestamp=1637139080415, value=hello neusoft 82380
\x00\x01A\xCD column=cf:line, timestamp=1637139080417, value=hello neusoft 82381
\x00\x01A\xCE column=cf:line, timestamp=1637139080419, value=hello neusoft 82382
\x00\x01A\xCF column=cf:line, timestamp=1637139080424, value=hello neusoft 82383
\x00\x01A\xD0 column=cf:line, timestamp=1637139080434, value=hello neusoft 82384
2. Job submission class
package yqq.study.app03;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

/**
 * @Author yqq
 * @Date 2021/11/17 17:45
 * @Version 1.0
 */
public class HBase2MR2HBaseMain {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(true);
        conf.set("hbase.zookeeper.quorum", "node2,node3,node4");
        // run the job locally; switch to "yarn" to submit to the cluster
        conf.set("mapreduce.framework.name", "local");

        Job job = Job.getInstance(conf, "HBASE2MR2HBASE");

        // initialize the Mapper: read every row of the sentence table via a full scan
        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob("sentence",
                scan,
                HBase2MR2HBaseMapper.class,
                Text.class,
                LongWritable.class,
                job,
                false);

        // initialize the Reducer: write the word counts into the wordcount table
        TableMapReduceUtil.initTableReducerJob("wordcount",
                HBase2MR2HBaseReducer.class,
                job,
                null,
                null,
                null,
                null,
                false);

        job.waitForCompletion(true);
    }
}
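The Scan passed to initTableMapperJob above is left at its defaults, which is fine for a small table. For larger tables a common, optional tweak is to raise the scanner caching and disable block caching for the full-table scan; the caching value below is only illustrative.

// optional tuning for a MapReduce full-table scan (value is illustrative)
Scan scan = new Scan();
scan.setCaching(500);        // fetch 500 rows per RPC instead of the default
scan.setCacheBlocks(false);  // avoid filling the region server block cache with scan data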
3. Mapper class
package yqq.study.app03;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

/**
 * @Author yqq
 * @Date 2021/11/17 18:01
 * @Version 1.0
 */
public class HBase2MR2HBaseMapper extends TableMapper<Text, LongWritable> {

    private Text keyOut = new Text();
    private LongWritable valOut = new LongWritable(1);

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        // key   : the rowkey of the current row in the sentence table
        // value : the Result holding that row's columns
        // read the content of the cf:line cell of the sentence table
        Cell cell = value.getColumnLatestCell("cf".getBytes(), "line".getBytes());
        // decode the cell value into a string
        String line = Bytes.toString(CellUtil.cloneValue(cell));
        String[] words = line.split(" ");
        for (String word : words) {
            keyOut.set(word);
            context.write(keyOut, valOut);
        }
    }
}
4. Reducer class
package yqq.study.app03;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

/**
 * @Author yqq
 * @Date 2021/11/17 18:14
 * @Version 1.0
 */
public class HBase2MR2HBaseReducer extends TableReducer<Text, LongWritable, Text> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        // the word becomes the rowkey, the sum becomes the cf:count value
        Put put = new Put(key.toString().getBytes());
        put.add("cf".getBytes(), "count".getBytes(), Bytes.toBytes(sum));
        context.write(null, put);
    }
}
Partial data (scan of the wordcount table):
11450 column=cf:count, timestamp=1637159843985, value=\x00\x00\x00\x00\x00\x00\x00\x01
11451 column=cf:count, timestamp=1637159843985, value=\x00\x00\x00\x00\x00\x00\x00\x01
11452 column=cf:count, timestamp=1637159843985, value=\x00\x00\x00\x00\x00\x00\x00\x01
11453 column=cf:count, timestamp=1637159843985, value=\x00\x00\x00\x00\x00\x00\x00\x01
11454 column=cf:count, timestamp=1637159843985, value=\x00\x00\x00\x00\x00\x00\x00\x01
11455 column=cf:count, timestamp=1637159843985, value=\x00\x00\x00\x00\x00\x00\x00\x01
11456 column=cf:count, timestamp=1637159843985, value=\x00\x00\x00\x00\x00\x00\x00\x01
11457 column=cf:count, timestamp=1637159843985, value=\x00\x00\x00\x00\x00\x00\x00\x01
11458 column=cf:count, timestamp=1637159843985, value=\x00\x00\x00\x00\x00\x00\x00\x01
11459 column=cf:count, timestamp=1637159843985, value=\x00\x00\x00\x00\x00\x00\x00\x01
1146 column=cf:count, timestamp=1637159843985, value=\x00\x00\x00\x00\x00\x00\x00\x01
11460 column=cf:count, timestamp=1637159843985, value=\x00\x00\x00\x00\x00\x00\x00\x01
11461 column=cf:count, timestamp=1637159843985, value=\x00\x00\x00\x00\x00\x00\x00\x01
11462 column=cf:count, timestamp=1637159843985, value=\x00\x00\x00\x00\x00\x00\x00\x01
11463 column=cf:count, timestamp=1637159843985, value=\x00\x00\x00\x00\x00\x00\x00\x01
11464 column=cf:count, timestamp=1637159843985, value=\x00\x00\x00\x00\x00\x00\x00\x01
11465 column=cf:count, timestamp=1637159843985, value=\x00\x00\x00\x00\x00\x00\x00\x01
11466 column=cf:count, timestamp=1637159843985, value=\x00\x00\x00\x00\x00\x00\x00\x01
11467 column=cf:count, timestamp=1637159843985, value=\x00\x00\x00\x00\x00\x00\x00\x01
11468 column=cf:count, timestamp=1637159843985, value=\x00\x00\x00\x00\x00\x00\x00\x01
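The values shown above are the 8-byte big-endian longs produced by Bytes.toBytes(sum) in the reducer, so \x00\x00\x00\x00\x00\x00\x00\x01 is a count of 1. Below is a minimal sketch for reading one count back with the same legacy HTable API; the rowkey "11450" is taken from the sample above and the class name ReadCount is only for illustration.

package yqq.study.app03;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Reads a single word count from the 'wordcount' table and decodes the
 * 8-byte value written by the reducer with Bytes.toBytes(long).
 */
public class ReadCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node2,node3,node4");
        HTable table = new HTable(conf, "wordcount");
        Get get = new Get("11450".getBytes());   // rowkey is the word itself
        Result result = table.get(get);
        byte[] raw = result.getValue("cf".getBytes(), "count".getBytes());
        System.out.println("count = " + Bytes.toLong(raw));   // prints 1 for this sample row
        table.close();
    }
}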