0
点赞
收藏
分享

微信扫一扫

k8s集群中 spark访问hbase中数据


​​

全栈工程师开发手册 (作者:栾鹏)
​​ 架构系列文章​​


hbase数据分区是按照region进行的,分区的location就是各个region的location。那么后续分配executor时可以按照region所在机器分配对应executor,直接在本机读取数据计算。

我们先来往hbase里面写两个数据

hbase shell
whoami
list # 如果list出错,说正在初始化中,要等待,可以在dashboard中看
exists 't1'
create 't1',{NAME => 'f1', VERSIONS => 2},{NAME => 'f2', VERSIONS => 2}
put 't1','rowkey001','f1:col1','value01'
put 't1','rowkey002','f1:col1','value01'
put 't1','rowkey003','f1:col1','value01'
put 't1','rowkey004','f1:col1','value01'

我们使用python来实现连接

python代码如下

# 这些函数只在driver驱动中执行.  只有DRR的mapreduce才会在exec程序中执行
# 需要设置hbase的hostname和subdomain 并在dns增加一条重定向

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext


spark = SparkSession.builder.appName("hbase_test").getOrCreate()
sc = spark.sparkContext


zookeeper = 'zookeeper-1.cloudai-2.svc.cluster.local,zookeeper-2.cloudai-2.svc.cluster.local,zookeeper-3.cloudai-2.svc.cluster.local'
table = 't1'


# # 读取
conf = {
"hbase.zookeeper.quorum": zookeeper,
"hbase.zookeeper.property.clientPort":"2181",
"hbase.regionserver.port":"60010",
# "hbase.master":"10.233.9.21:60000",
# "hbase.master.port":"60000",
"zookeeper.znode.parent":"/hbase",
"hbase.defaults.for.version.skip":"true",
"hbase.mapreduce.inputtable": table
}

keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat","org.apache.hadoop.hbase.io.ImmutableBytesWritable","org.apache.hadoop.hbase.client.Result",keyConverter=keyConv,valueConverter=valueConv,conf=conf)
count = hbase_rdd.count()
hbase_rdd.cache()
output = hbase_rdd.collect()
for (k, v) in output:
print((k, v))

在代码中我们配置了zookeeper的地址, spark会先访问spark来获取hbase的所有信息,包含master,regionserver的信息.

为了保证正确,我们可以先自己查询一遍看看是否正确.


进入zookeeper的pod,

./bin/zkCli.sh          # 计入zookeeper命令行

ls / # 查看zookeeper根目录存储:
包含zookeeper hbase两个文件夹(数据节点)

ls2 /zookeeper 查看zookeeper节点目录

ls2 /hbase 查看hbase节点信息:
[meta-region-server, backup-masters, table, draining, region-in-transition, table-lock, running, master, namespace, hbaseid, online-snapshot, replication, splitWAL, recovering-regions, rs, flush-table-proc]

查看hbase集群在zookeeper记录的信息,比如:regionserver1-slave-1,regionserver2-slave-2
ls2 /hbase/rs
[hbase-master-deployment-1-ddb859944-ctbrm,16201,1541399059310]
这里可以看出, 记录在zookeeper中的数据是pod的主机名hostname 而不是ip

表锁节点会有所有表。
[zk: localhost:2181(CONNECTED) 10] ls2 /hbase/table-lock
[TestTable, product, device, t1, rec_history, hbase:namespace, face_detect]
cZxid = 0x100000009
ctime = Wed Sep 19 07:25:49 UTC 2018
mZxid = 0x100000009
mtime = Wed Sep 19 07:25:49 UTC 2018
pZxid = 0x300000749
cversion = 21
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 7


查看所有表
[zk: localhost:2181(CONNECTED) 11] ls2 /hbase/table
[TestTable, product, t1, rec_history, device, hbase:meta, hbase:namespace, face_detect]
cZxid = 0x100000006
ctime = Wed Sep 19 07:25:49 UTC 2018
mZxid = 0x100000006
mtime = Wed Sep 19 07:25:49 UTC 2018
pZxid = 0x30000074b
cversion = 22
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 8


查看hbase的meta表信息,包含server信息。
[zk: localhost:2181(CONNECTED) 14] get /hbase/table/hbase:meta

?master:60000o,?OG?e?PBUF
cZxid = 0x100000029
ctime = Wed Sep 19 07:25:57 UTC 2018
mZxid = 0x10000002a
mtime = Wed Sep 19 07:25:57 UTC 2018
pZxid = 0x100000029
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 31
numChildren = 0

由于hbase的通信是通过hostname连接的. 所以我们在python代码中设置了将hostname写入到hosts文件,但是这个代码只在driver驱动中执行,只有DRR数据才会分解到exec驱动中执行. 所以只有driver的pod中成功修改了hosts 而exec仍然无法解析hostname

有几种方案:

1 将hbase的hostname和pod的ip想办法能配置到镜像中 因为pod的ip可能是变化的,所以必须以配置的方式添加进去.

2 添加代理,或者自定义DNS 将hbase的hostname代理到pod的ip

3 重定向 将hostname 重定向到pod的访问地址

{pod-name}.{subdomain}.{namespace}.svc.cluster.local

4 创建service-headless服务的名称和hbase的hostname设置成一样,并且让spark和hbase在同一个命名空间下 ( 这种最简单 )

我们先按照第4种方案在hbase所在的命名空间创建和hbase的pod的hostname相同的service-headless,并且让spark也在这个命名空间下运行

apiVersion: v1
kind: Service
metadata:
name: hbase-master-1
namespace: cloudai-2
spec:
selector:
app: hbase-master-1
clusterIP: None
ports:
- name: rpc
port: 60000
targetPort: 60000 # 配置的hbase的链接端口
- name: info
port: 60001 # hbase运行状态的网页查询端口
targetPort: 60001
- name: region
port: 60010
targetPort: 60010
- name: regioninfo
port: 60011
targetPort: 60011
- name: thrift
port: 9090 # thrift服务器ip,让外界通过thrift来访问hbase
targetPort: 9090

如果我们想让spark在一个独立的命名空间运行,就要另寻它法了.

创建hbase的deployment时指定pod的hostname和subdomain

hostname: hbase-master-1  # 设定主机名
subdomain: hbase-headless # 设定子域名

为subdomain建一个service-headless

apiVersion: v1
kind: Service
metadata:
name: hbase-headless
namespace: cloudai-2
spec:
selector:
app: hbase-master-1
clusterIP: None
ports:
- name: rpc
port: 60000
targetPort: 60000 # 配置的hbase的链接端口
- name: info
port: 60001 # hbase运行状态的网页查询端口
targetPort: 60001
- name: region
port: 60010
targetPort: 60010
- name: regioninfo
port: 60011
targetPort: 60011
- name: thrift
port: 9090 # thrift服务器ip,让外界通过thrift来访问hbase
targetPort: 9090

这样就能访问hbase的pod了,只不过域名必须是

hbase-master-1.hbase-headless.cloudai-2.svc.cluster.local

下面我们来设置代理, 在使用hbase的hostname访问pod时也能正常访问到hbase的pod.

我这里在dns中增加了一个重定向解析记录

我这里的dns使用的coredns,在创建Corefile的configmap时使用下面的方法

apiVersion: v1
data:
Corefile: |
.:53 {
errors
health
rewrite name hbase-master-1 hbase-master-1.hbase-headless.cloudai-2.svc.cluster.local
kubernetes cluster.local in-addr.arpa ip6.arpa {
pods insecure
upstream
fallthrough in-addr.arpa ip6.arpa
}
prometheus :9153
proxy . /etc/resolv.conf
cache 30
reload
}
kind: ConfigMap
metadata:
name: coredns
namespace: kube-system

这样dns在解析hbase-master-1时返回的是hbase-master-1.hbase-headless.cloudai-2.svc.cluster.local的ip




举报

相关推荐

0 条评论