Greenplum Database(下面简称GPDB)主要提供了两种方式:PXF和GPHDFS。虽然二者都利用了GPDB的外部表功能,但是前者需要额外安装部署PXF服务器进程,在复杂的IT环境中流程繁琐、极易出错,终端用户体验不佳。所以,在初期规划和实现HashData数据仓库访问HDFS的功能时,即采用GPHDFS的技术路线:通过增加一种访问HDFS的外部表协议,让各个计算节点直连HDFS集群,不通过任何中间节点或者系统,大幅降低使用门槛的同时,保证两个系统之间数据交换的效率。
技术架构层面,HashData的GPHDFS实现跟GPDB的GPHDFS是一致的,更多的差异是体现在实现细节层面。首先,我们采用C++原生实现的libhdfs3作为访问HDFS的客户端,在规避了安装、部署、配置Java运行环境以及Hadoop客户端等纷繁复杂、极易出错环节的同时,降低系统CPU和内存使用率。
其次,引入类似Oracle数据源配置文件的gphdfs.conf文件,将多个HDFS系统相关的访问信息集中起来,简化访问配置的管理;修改HDFS外部表定义的语法,省略大量的配置选项(放到gphdfs.conf文件),大幅降低用户的使用难度。因为解耦了Hadoop客户端(包括环境变量的配置)和HDFS系统的对应关系,新的GPHDFS能够在同一条SQL语句中同时访问多个HDFS(这些HDFS系统可以由多个不同的Hadoop厂商提供)外部表,极大方便复杂大数据系统中的多源数据融合。
https://mp.weixin.qq.com/s/S-8RcLThuuHVaUtjhafdNg
#include <iostream>
#include <stdio.h>
#include "hdfs.h"
#include <string.h>
#include <stdlib.h>
#define PRINT(args) printf("[%s] - %s,%d,%s", __FILE__, __FUNCTION__, __LINE__, args);
#define INVALID_POINT(p) p == NULL
// desc: 读文件
// param: 参数一/文件系统的句柄 参数二/hdfs上的路径
// return: 读取成功返回true,读取失败返回false
bool hdfs_read(const hdfsFS *pfsdist, const char *hadoop_Path)
{
//获取hdfs上的文件信息(文件大小)
hdfsFileInfo *hdfs_info = (hdfsFileInfo *)malloc(sizeof(hdfsFileInfo));
hdfs_info = hdfsGetPathInfo(*pfsdist, hadoop_Path);
if (INVALID_POINT(hdfs_info))
{
PRINT("file does not exist\n");
return false;
}
long int file_size = (hdfs_info->mSize) + 1;
free(hdfs_info);
hdfs_info = NULL;
//分配内存,准备读取文件
void *buffer = malloc(file_size);
if (INVALID_POINT(buffer))
{
PRINT("malloc rror\n");
return false;
}
else
{
memset(buffer, (int)'\0', file_size);
//打开文件
hdfsFile handle_hdfsFile_w = hdfsOpenFile(*pfsdist, hadoop_Path, O_RDONLY, 0, 0, 0);
if (INVALID_POINT(handle_hdfsFile_w))
{
PRINT("Failed to open file!\n");
free(buffer);
buffer = NULL;
return false;
}
else
{
//读取文件
tSize num_read_bytes = hdfsRead(*pfsdist, handle_hdfsFile_w, buffer, file_size);
if ((file_size - 1) == num_read_bytes){
PRINT("read file successful\n");
}else{
PRINT("read file failure\n");
}
printf("data -> %s\n", (char *)buffer);
free(buffer);
buffer = NULL;
hdfsCloseFile(*pfsdist, handle_hdfsFile_w);
return true;
}
}
}
// desc: 写文件
// param: 参数一/文件系统的句柄 参数二/hdfs上的路径 参数三/要写入文件的内容
// return: 写入成功返回true,写入失败返回false
bool hdfs_write(const hdfsFS *pfsdist, const char *hadoop_Path, const char *buffer)
{
//打开文件
hdfsFile handle_hdfsFile = hdfsOpenFile(*pfsdist, hadoop_Path, O_WRONLY | O_CREAT, 0, 0, 0);
if (INVALID_POINT(handle_hdfsFile))
{
PRINT("Failed to open file!\n");
return false;
}
//读取文件
tSize num_written_bytes = hdfsWrite(*pfsdist, handle_hdfsFile, (void *)buffer, strlen(buffer));
if (num_written_bytes == (strlen(buffer))){
PRINT("write file successful\n");
}else{
PRINT("write file failure\n");
}
hdfsCloseFile(*pfsdist, handle_hdfsFile);
return true;
}
int main()
{
PRINT("start the application\n");
hdfsFS pfsdist = hdfsConnect("hdfs://mycluster", 8020);
if (INVALID_POINT(pfsdist))
{
PRINT("hdfsConnect error\n");
}else{
PRINT("connect successful\n");
}
const char *hadoop_Path = "/data/bigdata/input/xsy_t.txt";
hdfs_write(&pfsdist,hadoop_Path,"xsy_t -> xsy_t");
hdfs_read(&pfsdist, hadoop_Path);
if (!hdfsDisconnect(pfsdist))
PRINT("disconnect successful\n");
return 0;
}