1. HdfsToHBaseMainClass:
Read data from HDFS, aggregate it with MapReduce, and store the result in HBase.
package yqq.study.app02;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/**
 * @Author yqq
 * @Date 2021/11/17 10:46
 * @Version 1.0
 */
public class HdfsToHBaseMainClass {
    public static void main(String[] args) throws Exception {
        String inputPath = "/usr/local";
        Configuration conf = new Configuration(true);
        // The reducer writes its results into HBase, so the job must know the HBase ZooKeeper quorum
        conf.set("hbase.zookeeper.quorum", "node2,node3,node4");
        // Run the MapReduce job locally
        conf.set("mapreduce.framework.name", "local");
        Job job = Job.getInstance(conf);
        job.setJarByClass(HdfsToHBaseMainClass.class);
        job.setJobName("HDFS-TO-HBASE-WORDCOUNT");
        // Input path on HDFS
        FileInputFormat.addInputPath(job, new Path(inputPath));
        // Mapper class and its output key/value types
        job.setMapperClass(HdfsHBaseMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Reducer class and the output table "wordcount"
        TableMapReduceUtil.initTableReducerJob("wordcount", HdfsToHBaseReducer.class,
                job, null, null, null, null, false);
        job.waitForCompletion(true);
    }
}
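The job expects the output table to exist before it runs; the post does not show how "wordcount" was created. Below is a minimal sketch of one way to pre-create it with column family "cf" through the HBase 1.x Admin API. The class name CreateWordCountTable is made up for illustration, and reusing the node2,node3,node4 quorum is an assumption; the HBase shell equivalent is create 'wordcount','cf'.

package yqq.study.app02;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

/**
 * Hypothetical helper (not part of the original post): pre-creates the
 * "wordcount" table with column family "cf" so the MapReduce job has a target.
 */
public class CreateWordCountTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node2,node3,node4"); // same quorum assumption as the job
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            TableName tableName = TableName.valueOf("wordcount");
            if (!admin.tableExists(tableName)) {
                HTableDescriptor desc = new HTableDescriptor(tableName);
                desc.addFamily(new HColumnDescriptor("cf"));
                admin.createTable(desc);
            }
        }
    }
}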
2. HdfsHBaseMapper:
package yqq.study.app02;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @Author yqq
 * @Date 2021/11/17 11:03
 * @Version 1.0
 */
public class HdfsHBaseMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    private Text keyOut = new Text();
    private LongWritable valueOut = new LongWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line looks like: "hello neusoft 1"
        String line = value.toString().trim();
        String[] words = line.split(" ");
        for (String word : words) {
            keyOut.set(word);
            context.write(keyOut, valueOut);
        }
    }
}
3. HdfsToHBaseReducer:
package yqq.study.app02;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

/**
 * @Author yqq
 * @Date 2021/11/17 11:49
 * @Version 1.0
 */
public class HdfsToHBaseReducer extends TableReducer<Text, LongWritable, ImmutableBytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // e.g. hello -> 1,1,1,1,...
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        // e.g. hello 100000 -> wordcount table: rowkey=hello, cf:count=100000
        Put put = new Put(Bytes.toBytes(key.toString()));
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(sum));
        // Emit the Put
        context.write(null, put);
    }
}
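A note on the last line: context.write(null, put) works because TableOutputFormat's record writer only uses the Mutation and ignores the key. If you prefer to pass the row key explicitly (a stylistic alternative, not what the original does), the emit inside reduce() could read:

        // Drop-in replacement for context.write(null, put); TableOutputFormat still ignores the key
        context.write(new ImmutableBytesWritable(Bytes.toBytes(key.toString())), put);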
4. Sample of the word data in HDFS:
hello neusoft 99970
hello neusoft 99971
hello neusoft 99972
hello neusoft 99973
hello neusoft 99974
hello neusoft 99975
hello neusoft 99976
hello neusoft 99977
hello neusoft 99978
hello neusoft 99979
hello neusoft 99980
hello neusoft 99981
hello neusoft 99982
hello neusoft 99983
hello neusoft 99984
hello neusoft 99985
hello neusoft 99986
hello neusoft 99987
hello neusoft 99988
hello neusoft 99989
hello neusoft 99990
hello neusoft 99991
hello neusoft 99992
hello neusoft 99993
hello neusoft 99994
hello neusoft 99995
hello neusoft 99996
hello neusoft 99997
hello neusoft 99998
hello neusoft 99999
hello neusoft 100000
5. Sample rows from the HBase wordcount table:
99975 column=cf:count, timestamp=1637128222027, value=\x00\x00\x00\x00\x00\x00\x00\x01
99976 column=cf:count, timestamp=1637128222027, value=\x00\x00\x00\x00\x00\x00\x00\x01
99977 column=cf:count, timestamp=1637128222027, value=\x00\x00\x00\x00\x00\x00\x00\x01
99978 column=cf:count, timestamp=1637128222027, value=\x00\x00\x00\x00\x00\x00\x00\x01
99979 column=cf:count, timestamp=1637128222027, value=\x00\x00\x00\x00\x00\x00\x00\x01
9998 column=cf:count, timestamp=1637128222027, value=\x00\x00\x00\x00\x00\x00\x00\x01
99980 column=cf:count, timestamp=1637128222027, value=\x00\x00\x00\x00\x00\x00\x00\x01
99981 column=cf:count, timestamp=1637128222027, value=\x00\x00\x00\x00\x00\x00\x00\x01
99982 column=cf:count, timestamp=1637128222027, value=\x00\x00\x00\x00\x00\x00\x00\x01
99983 column=cf:count, timestamp=1637128222027, value=\x00\x00\x00\x00\x00\x00\x00\x01
99984 column=cf:count, timestamp=1637128222027, value=\x00\x00\x00\x00\x00\x00\x00\x01
99985 column=cf:count, timestamp=1637128222027, value=\x00\x00\x00\x00\x00\x00\x00\x01
99986 column=cf:count, timestamp=1637128222027, value=\x00\x00\x00\x00\x00\x00\x00\x01
99987 column=cf:count, timestamp=1637128222027, value=\x00\x00\x00\x00\x00\x00\x00\x01
99988 column=cf:count, timestamp=1637128222027, value=\x00\x00\x00\x00\x00\x00\x00\x01
99989 column=cf:count, timestamp=1637128222027, value=\x00\x00\x00\x00\x00\x00\x00\x01
9999 column=cf:count, timestamp=1637128222027, value=\x00\x00\x00\x00\x00\x00\x00\x01
99990 column=cf:count, timestamp=1637128222027, value=\x00\x00\x00\x00\x00\x00\x00\x01
99991 column=cf:count, timestamp=1637128222027, value=\x00\x00\x00\x00\x00\x00\x00\x01
99992 column=cf:count, timestamp=1637128222027, value=\x00\x00\x00\x00\x00\x00\x00\x01
99993 column=cf:count, timestamp=1637128222027, value=\x00\x00\x00\x00\x00\x00\x00\x01
99994 column=cf:count, timestamp=1637128222027, value=\x00\x00\x00\x00\x00\x00\x00\x01
99995 column=cf:count, timestamp=1637128222027, value=\x00\x00\x00\x00\x00\x00\x00\x01
99996 column=cf:count, timestamp=1637128222027, value=\x00\x00\x00\x00\x00\x00\x00\x01
99997 column=cf:count, timestamp=1637128222027, value=\x00\x00\x00\x00\x00\x00\x00\x01
99998 column=cf:count, timestamp=1637128222027, value=\x00\x00\x00\x00\x00\x00\x00\x01
99999 column=cf:count, timestamp=1637128222027, value=\x00\x00\x00\x00\x00\x00\x00\x01
hello column=cf:count, timestamp=1637128222027, value=\x00\x00\x00\x00\x00\x01\x86\xA0
neusoft column=cf:count, timestamp=1637128222027, value=\x00\x00\x00\x00\x00\x01\x86\xA0
100002 row(s) in 72.9090 seconds
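The values look opaque because the reducer stores them with Bytes.toBytes(sum), i.e. as an 8-byte big-endian long: \x00\x00\x00\x00\x00\x01\x86\xA0 is 100000, matching the 100000 input lines that contain hello and neusoft, while each number appears exactly once (value 1). Below is a minimal read-back sketch for verifying a single counter from Java; the class name ReadWordCount is made up for illustration, and the quorum is the same assumption as in the job.

package yqq.study.app02;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Hypothetical verification class (not part of the original post): reads one
 * counter back from the "wordcount" table and decodes the 8-byte value.
 */
public class ReadWordCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node2,node3,node4"); // same quorum assumption as the job
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("wordcount"))) {
            Result result = table.get(new Get(Bytes.toBytes("hello")));
            long count = Bytes.toLong(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("count")));
            System.out.println("hello -> " + count); // prints 100000 for the data above
        }
    }
}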