Reading HBase from Spark in Java

中间件小哥 2022-11-24

1、pom.xml

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_2.11</artifactId>
            <version>6.0.0</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.2.6</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.6</version>
        </dependency>
    </dependencies>
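
A note on the versions: the two Spark artifacts must share the same Scala binary suffix (_2.11 here), and hbase-client/hbase-server should stay on the same 1.2.x line; hbase-server is what provides the TableInputFormat used in the demo below, while elasticsearch-spark and fastjson are not referenced by it. Before wiring HBase into Spark, a standalone connectivity check helps confirm that the client and cluster config resolve on their own. A minimal sketch (the class name HBaseSmokeTest is mine; it assumes hbase-site.xml is on the classpath and uses the demo's table t2):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class HBaseSmokeTest {
    public static void main(String[] args) throws Exception {
        // Picks up hbase-site.xml from the classpath
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            // Verifies ZooKeeper/master connectivity and that the demo table exists
            System.out.println("t2 exists: " + admin.tableExists(TableName.valueOf("t2")));
        }
    }
}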

2、SparkHbaseDemo1.java

package com.jun.hbase_prac;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;

import java.io.IOException;

public class SparkHbaseDemo1 {
    public static void main(String[] args) throws IOException {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SparkHbaseDemo1");
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        JavaSparkContext jsc = new JavaSparkContext(conf);

        String tableName = "t2";

        // Full-table scan over the selected families/columns
        Scan scan = new Scan();
        byte[] cf1 = Bytes.toBytes("info");
        byte[] cf2 = Bytes.toBytes("cf1");

        byte[] cn1 = Bytes.toBytes("name");
        byte[] cn2 = Bytes.toBytes("gender");
        byte[] cn3 = Bytes.toBytes("cn1");

        scan.addFamily(cf1);
        scan.addColumn(cf1, cn1);
        scan.addColumn(cf1, cn2);

        // A second column family
        scan.addFamily(cf2);
        scan.addColumn(cf2, cn3);
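        // Note: addColumn(family, qualifier) replaces an earlier addFamily(family)
        // selection for that family, so effectively only the qualifiers listed
        // above (info:name, info:gender, cf1:cn1) are fetched.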

        // Serialize the Scan and Base64-encode it so it can be passed to
        // TableInputFormat through the job configuration
        ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
        String scanToString = Base64.encodeBytes(proto.toByteArray());
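        // Equivalent one-liner shipped with hbase-server (assuming the 1.2.x API;
        // it performs the same protobuf + Base64 round trip):
        //   String scanToString = TableMapReduceUtil.convertScanToString(scan);
        //   (requires: import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;)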

        Configuration configuration = HBaseConfiguration.create();
        configuration.set(TableInputFormat.INPUT_TABLE, tableName);
        configuration.set(TableInputFormat.SCAN, scanToString);

        // Cluster configuration
        // ZooKeeper quorum (set explicitly if the XML files are unavailable)
//        configuration.set("hbase.zookeeper.quorum", "hadoop1,hadoop2,hadoop3");
        configuration.addResource(new Path("core-site.xml"));
        configuration.addResource(new Path("hbase-site.xml"));

        // Load the HBase table as a pair RDD of (row key, Result)
        JavaPairRDD<ImmutableBytesWritable, Result> hbaseRdd = jsc.newAPIHadoopRDD(configuration, TableInputFormat.class,
                ImmutableBytesWritable.class, Result.class);
        System.out.println(hbaseRdd.count());

        // Map each Result to a readable String (a generic variant that walks all
        // cells is sketched after this listing)
        JavaRDD<String> hbaseResult = hbaseRdd.map(new Function<Tuple2<ImmutableBytesWritable, Result>, String>() {
            @Override
            public String call(Tuple2<ImmutableBytesWritable, Result> tuple2) throws Exception {
                Result result = tuple2._2;
                String rowKey = Bytes.toString(result.getRow());
                String name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
                String gender = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("gender")));

                String cn1 = Bytes.toString(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("cn1")));

                // Debug print; with local[*] this lands in the driver console
                System.out.println(tuple2);

                return "rowKey: " + rowKey + " name: " + name + " gender: " + gender + " cn1: " + cn1;
            }
        });

        System.out.println(hbaseResult.collect());  // [rowKey: 10001 name: rose gender: female cn1: value1]

        jsc.stop();
    }
}
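
The demo extracts a fixed set of columns. When the schema is not known up front, each Result can instead be walked generically through its cells; a minimal sketch against the same 1.2.x API, reusing hbaseRdd from the listing above (needs two extra imports: org.apache.hadoop.hbase.Cell and org.apache.hadoop.hbase.CellUtil):

JavaRDD<String> allCells = hbaseRdd.map(new Function<Tuple2<ImmutableBytesWritable, Result>, String>() {
    @Override
    public String call(Tuple2<ImmutableBytesWritable, Result> tuple2) throws Exception {
        Result result = tuple2._2;
        StringBuilder sb = new StringBuilder("rowKey: ").append(Bytes.toString(result.getRow()));
        // Walk every cell in the row instead of asking for fixed columns
        for (Cell cell : result.listCells()) {
            sb.append(' ')
              .append(Bytes.toString(CellUtil.cloneFamily(cell))).append(':')
              .append(Bytes.toString(CellUtil.cloneQualifier(cell))).append('=')
              .append(Bytes.toString(CellUtil.cloneValue(cell)));
        }
        return sb.toString();
    }
});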
