Define
外部协议的创建与删除流程实现在src/backend/commands/extprotocolcmds.c文件中,主要提供两个函数:DefineExtProtocol(创建外部协议)和RemoveExtProtocolById(删除外部协议)。
ProcessUtilitySlow
| -- case T_DefineStmt:
| -- DefineStmt *stmt = (DefineStmt *) parsetree
| -- switch (stmt->kind)
| -- case OBJECT_EXTPROTOCOL:
| -- DefineExtProtocol(stmt->defnames, stmt->definition, stmt->trusted)
performDeletion/performMultipleDeletions
| -- deleteObjectsInList
| -- deleteOneObject
| -- doDeletion
| -- switch (getObjectClass(object))
| -- case OCLASS_EXTPROTOCOL:
| -- RemoveExtProtocolById(object->objectId)
DefineExtProtocol函数的定义为void DefineExtProtocol(List *name, List *parameters, bool trusted),其执行流程为:首先从parameters列表中提取出readfunc、writefunc和validatorfunc的名字(readfunc和writefunc至少需要指定一个,出现无法识别的属性则报错);然后调用ExtProtocolCreate函数创建外部协议;最后,当Gp_role为GP_ROLE_DISPATCH时,调用CdbDispatchUtilityStatement向segment发送DefineStmt节点信息,以在segment上创建外部协议。
ListCell *pl;
foreach(pl, parameters)
{
DefElem *defel = (DefElem *) lfirst(pl);
if (pg_strcasecmp(defel->defname, "readfunc") == 0) List *readfuncName = defGetQualifiedName(defel);
else if (pg_strcasecmp(defel->defname, "writefunc") == 0) List *writefuncName = defGetQualifiedName(defel);
else if (pg_strcasecmp(defel->defname, "validatorfunc") == 0) List *validatorfuncName = defGetQualifiedName(defel);
else ereport(ERROR,(errcode(ERRCODE_SYNTAX_ERROR), errmsg("protocol attribute \"%s\" not recognized", defel->defname)));
}
if (readfuncName == NULL && writefuncName == NULL) /* make sure we have our required definitions */
ereport(ERROR,(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), errmsg("protocol must be specify at least a readfunc or a writefunc")));
char * protName = strVal(linitial(name));
/* Most of the argument-checking is done inside of ExtProtocolCreate */
Oid protOid = ExtProtocolCreate(protName, /* protocol name */ readfuncName, /* read function name */ writefuncName, /* write function name */ validatorfuncName, /* validator function name */ trusted);
if (Gp_role == GP_ROLE_DISPATCH) {
DefineStmt * stmt = makeNode(DefineStmt);
stmt->kind = OBJECT_EXTPROTOCOL;
stmt->oldstyle = false;
stmt->defnames = name;
stmt->args = NIL;
stmt->definition = parameters;
stmt->trusted = trusted;
CdbDispatchUtilityStatement((Node *) stmt, DF_CANCEL_ON_ERROR| DF_WITH_SNAPSHOT| DF_NEED_TWO_PHASE, GetAssignedOidsForDispatch(), NULL);
}
RemoveExtProtocolById函数定义为void RemoveExtProtocolById(Oid protOid),通过protOid删除相应的外部协议。首先以RowExclusiveLock打开pg_extprotocol系统表,并利用ExtprotocolOidIndexId索引进行扫描,获取protOid对应的条目,调用CatalogTupleDelete函数进行删除;如果没有找到对应的条目,则报错。
ScanKeyData skey;
HeapTuple tup;
bool found = false;
/* Search pg_extprotocol. */
Relation rel = table_open(ExtprotocolRelationId, RowExclusiveLock);
ScanKeyInit(&skey,Anum_pg_extprotocol_oid,BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(protOid));
SysScanDesc scan = systable_beginscan(rel, ExtprotocolOidIndexId, true, NULL, 1, &skey);
while (HeapTupleIsValid(tup = systable_getnext(scan))){
CatalogTupleDelete(rel, &tup->t_self);
found = true;
}
systable_endscan(scan);
if (!found) elog(ERROR, "protocol %u could not be found", protOid);
table_close(rel, NoLock);
关联函数
关联函数【ExtProtocolCreate、LookupExtProtocolFunction、ExtProtocolGetNameByOid】定义在src/backend/catalog/pg_extprotocol.c文件中。以Oid ExtProtocolCreate(const char *protocolName, List *readfuncName, List *writefuncName, List *validatorfuncName, bool trusted)函数为例,其执行流程如下所示:参数检查(协议名不能为空,readfunc和writefunc至少提供一个,且协议名不能与内置的file、http、gpfdist、gpfdists重名);以RowExclusiveLock打开pg_extprotocol系统表,并利用ExtprotocolPtcnameIndexId索引按协议名进行扫描,检查同名外部协议是否已经存在;调用ValidateProtocolFunction从系统表中获取readfunc、writefunc和validatorfunc的OID;利用上述结果创建pg_extprotocol条目并插入系统表(simple_heap_insert和CatalogUpdateIndexes);在pg_depend系统表中创建protOid到readfunc、writefunc的依赖关系,并记录对owner以及当前extension的依赖。
bool nulls[Natts_pg_extprotocol];
Datum values[Natts_pg_extprotocol];
NameData prtname;
ObjectAddress myself, referenced;
Oid ownerId = GetUserId();
ScanKeyData skey;
/* sanity checks (caller should have caught these) */
if (!protocolName) elog(ERROR, "no protocol name supplied");
if (!readfuncName && !writefuncName) elog(ERROR, "protocol must have at least one of readfunc or writefunc");
/* Until we add system protocols to pg_extprotocol, make sure no protocols with the same name are created. */
if (strcasecmp(protocolName, "file") == 0 || strcasecmp(protocolName, "http") == 0 || strcasecmp(protocolName, "gpfdist") == 0 || strcasecmp(protocolName, "gpfdists") == 0){
ereport(ERROR,(errcode(ERRCODE_RESERVED_NAME), errmsg("protocol \"%s\" already exists",protocolName),errhint("pick a different protocol name")));
}
Relation rel = heap_open(ExtprotocolRelationId, RowExclusiveLock);
/* make sure there is no existing protocol of same name */
ScanKeyInit(&skey,Anum_pg_extprotocol_ptcname,BTEqualStrategyNumber, F_NAMEEQ,CStringGetDatum(protocolName));
SysScanDesc scan = systable_beginscan(rel, ExtprotocolPtcnameIndexId, true,SnapshotNow, 1, &skey);
HeapTuple tup = systable_getnext(scan);
if (HeapTupleIsValid(tup)) ereport(ERROR,(errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("protocol \"%s\" already exists", protocolName)));
systable_endscan(scan);
/* function checks: if supplied, check existence and correct signature in the catalog */
if (readfuncName) Oid readfn = ValidateProtocolFunction(readfuncName, EXTPTC_FUNC_READER);
if (writefuncName) Oid writefn = ValidateProtocolFunction(writefuncName, EXTPTC_FUNC_WRITER);
if (validatorfuncName) Oid validatorfn = ValidateProtocolFunction(validatorfuncName, EXTPTC_FUNC_VALIDATOR);
/* Everything looks okay. Try to create the pg_extprotocol entry for the protocol. (This could fail if there's already a conflicting entry.) */
/* initialize nulls and values */
for (int i = 0; i < Natts_pg_extprotocol; i++){
nulls[i] = false;
values[i] = (Datum) 0;
}
namestrcpy(&prtname, protocolName);
values[Anum_pg_extprotocol_ptcname - 1] = NameGetDatum(&prtname);
values[Anum_pg_extprotocol_ptcreadfn - 1] = ObjectIdGetDatum(readfn);
values[Anum_pg_extprotocol_ptcwritefn - 1] = ObjectIdGetDatum(writefn);
values[Anum_pg_extprotocol_ptcvalidatorfn - 1] = ObjectIdGetDatum(validatorfn);
values[Anum_pg_extprotocol_ptcowner - 1] = ObjectIdGetDatum(ownerId);
values[Anum_pg_extprotocol_ptctrusted - 1] = BoolGetDatum(trusted);
nulls[Anum_pg_extprotocol_ptcacl - 1] = true;
tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
/* insert a new tuple */
Oid protOid = simple_heap_insert(rel, tup);
CatalogUpdateIndexes(rel, tup);
heap_close(rel, RowExclusiveLock);
/* Create dependencies for the protocol */
myself.classId = ExtprotocolRelationId;
myself.objectId = protOid;
myself.objectSubId = 0;
if (OidIsValid(readfn)){ /* Depends on read function, if any */
referenced.classId = ProcedureRelationId;
referenced.objectId = readfn;
referenced.objectSubId = 0;
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
}
if (OidIsValid(writefn)){ /* Depends on write function, if any */
referenced.classId = ProcedureRelationId;
referenced.objectId = writefn;
referenced.objectSubId = 0;
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
}
/* dependency on owner */
recordDependencyOnOwner(ExtprotocolRelationId, protOid, GetUserId());
/* dependency on extension */
recordDependencyOnCurrentExtension(&myself, false);
return protOid;
LookupExtProtocolFunction函数主要在src/backend/access/external/url_custom.c文件的url_custom_fopen函数中被调用(调用关系:url_custom_fopen --> LookupExtProtocolFunction)。