gphdfs_fopen 函数首先调用 hadoop_env.sh 脚本建立 Hadoop 环境变量（env_cmd 环境变量命令为 source $GPHOME/gp_hadoop_connector_jardir/hadoop_env.sh）；然后确定 java 执行命令（写操作时 java_cmd 为 java $GP_JAVA_OPT -classpath $CLASSPATH com.emc.greenplum.gpdb.hdfsconnector.HDFSWriter $GP_SEGMENT_ID $GP_XID，读操作时为 java $GP_JAVA_OPT -classpath $CLASSPATH com.emc.greenplum.gpdb.hdfsconnector.HDFSReader $GP_SEGMENT_ID $GP_SEGMENT_COUNT）；再通过 pg_exttable 系统表确定数据源格式（text/csv -> TEXT；avro -> AVRO；parquet -> PARQUET；其他情况为 GPDBWritable）；如果是读取操作，还需要序列化表的 schema 和 attr_names。最后将 env_cmd.data、java_cmd、format、gp_hadoop_connector_version、url、table_schema.data、table_attr_names.data 拼接为 cmd，并调用 url_execute_fopen(cmd.data, forwrite, &extvar, NULL) 函数执行该命令。
/*
 * gphdfs_fopen -- build and launch the external Hadoop connector command.
 *
 * Assembles a shell command of the form
 *   <env setup>; java ... HDFSWriter|HDFSReader ... <format> <conn ver> '<url>' ['<schema>' '<attr names>']
 * and hands it to url_execute_fopen().
 *
 * forwrite selects the HDFSWriter (true) or HDFSReader (false) java entry
 * point.  For reads, the table's column schema and attribute names are
 * serialized and appended as extra quoted arguments.
 *
 * Returns the URL_FILE handle produced by url_execute_fopen(); errors are
 * reported via ereport().  Caller owns the returned handle.
 */
static URL_FILE *gphdfs_fopen(PG_FUNCTION_ARGS, bool forwrite)
{
	StringInfoData cmd;
	StringInfoData env_cmd;
	StringInfoData table_schema;
	StringInfoData table_attr_names;

	/*
	 * Before we start, make sure that all the GUCs are set properly.  This
	 * also sets the gp_hadoop_connector_version global var.
	 */
	checkHadoopGUCs();

	/* The env setup script */
	initStringInfo(&env_cmd);
	appendStringInfo(&env_cmd, "source $GPHOME/%s/hadoop_env.sh;",
					 gp_hadoop_connector_jardir);

	/*
	 * The java program.  See the java program for details.  (The string
	 * literals are already NUL-terminated; no explicit "\0" is needed.)
	 */
	char	   *java_cmd;

	if (forwrite)
		java_cmd = "java $GP_JAVA_OPT -classpath $CLASSPATH com.emc.greenplum.gpdb.hdfsconnector.HDFSWriter $GP_SEGMENT_ID $GP_XID";
	else
		java_cmd = "java $GP_JAVA_OPT -classpath $CLASSPATH com.emc.greenplum.gpdb.hdfsconnector.HDFSReader $GP_SEGMENT_ID $GP_SEGMENT_COUNT";

	/*
	 * NOTE: we have to assume that if it's not TEXT/CSV/AVRO/PARQUET, it's
	 * going to be the RIGHT custom format.  There's no easy way to find out
	 * the name of the formatter here.  If the wrong formatter is used, we'll
	 * see some error in the protocol.  No big deal.
	 */
	Relation	rel = EXTPROTOCOL_GET_RELATION(fcinfo);
	ExtTableEntry *exttbl = GetExtTableEntry(rel->rd_id);
	char	   *format;

	if (fmttype_is_text(exttbl->fmtcode) || fmttype_is_csv(exttbl->fmtcode))
		format = "TEXT";
	else if (fmttype_is_avro(exttbl->fmtcode))
		format = "AVRO";
	else if (fmttype_is_parquet(exttbl->fmtcode))
		format = "PARQUET";
	else
		format = "GPDBWritable";

	/* For reads, we transfer the table's schema info together with its url. */
	if (!forwrite)
	{
		initStringInfo(&table_schema);
		initStringInfo(&table_attr_names);

		int			colnum = rel->rd_att->natts;

		for (int i = 0; i < colnum; i++)
		{
			Oid			typid = rel->rd_att->attrs[i]->atttypid;

			/* type delimiter; for a UDT it can be any char */
			char		delim = 0;
			int16		typlen;
			bool		typbyval;
			char		typalign;
			Oid			typioparam;
			Oid			typiofunc;

			get_type_io_data(typid, IOFunc_input, &typlen, &typbyval,
							 &typalign, &delim, &typioparam, &typiofunc);

			/*
			 * Append directly into the StringInfos rather than going through
			 * fixed-size stack buffers: the previous sprintf() into char[20]
			 * could overflow for a multi-digit attndims (10 typid digits + 1
			 * + up to 10 ndims digits + 3 delim digits + NUL > 20).
			 */
			appendStringInfo(&table_schema, "%010d%d%d%03d",
							 typid,
							 rel->rd_att->attrs[i]->attnotnull,
							 rel->rd_att->attrs[i]->attndims,
							 delim);
			appendStringInfo(&table_attr_names, "%s,",
							 rel->rd_att->attrs[i]->attname.data);
		}
	}

	/*
	 * Form the actual command:
	 * 1. call the env setup script
	 * 2. append the remaining arguments: <format>, <conn ver> and <url> to
	 *    the java command
	 *
	 * Note: "url" has to be quoted because it's an unverified user input,
	 * and since a url passed with an E'' prefix defeats simple quoting, we
	 * also reject dangerous characters outright.
	 * Note: gp_hadoop_connector_version does not need to be quoted because
	 * we've verified it in checkHadoopGUCs().
	 */
	char	   *url_user = EXTPROTOCOL_GET_URL(fcinfo);

	if (hasIllegalCharacters(url_user))
		ereport(ERROR, (0, errmsg("illegal char in url")));

	char	   *url = quoteArgument(EXTPROTOCOL_GET_URL(fcinfo));

	initStringInfo(&cmd);
	appendStringInfo(&cmd, EXEC_URL_PREFIX "%s%s %s %s %s",
					 env_cmd.data, java_cmd, format,
					 gp_hadoop_connector_version, url);
	pfree(env_cmd.data);		/* no longer needed once copied into cmd */

	if (!forwrite)
	{
		appendStringInfo(&cmd, " '%s'", table_schema.data);
		pfree(table_schema.data);
		appendStringInfo(&cmd, " '%s'", table_attr_names.data);
		pfree(table_attr_names.data);
	}

	/*
	 * Set up the env and run the script.  NOTE: the last argument to
	 * external_set_env_vars is ZERO because we don't have access to the scan
	 * counter at all.  It's OK because we don't need it.
	 */
	extvar_t	extvar;

	external_set_env_vars(&extvar, url, false, NULL, NULL, false, 0);

	URL_FILE   *myData = url_execute_fopen(cmd.data, forwrite, &extvar, NULL);

	/* Free the command string */
	pfree(cmd.data);

	return myData;
}