0
点赞
收藏
分享

微信扫一扫

Postgres-xl GTM(全局事务管理器 Globale Transaction Manager) Node 管理器

小月亮06 2022-07-28 阅读 98


Node Manager

节点数据结构主要成员:节点类型(Proxy、Coordinator、Datanode),节点序号、Proxy序号(如果节点不经过proxy或节点类型为proxy,该成员设置为0),Postgresql端口、GTM可见的UP,节点的数据目录。

Postgres-xl GTM(全局事务管理器 Globale Transaction Manager) Node 管理器_初始化

void GTM_InitNodeManager(void);
size_t pgxcnode_get_all(GTM_PGXCNodeInfo **data, size_t maxlen, bool locked);
size_t pgxcnode_find_by_type(GTM_PGXCNodeType type, GTM_PGXCNodeInfo **data, size_t maxlen);

int Recovery_PGXCNodeRegister(GTM_PGXCNodeType type, char *nodename, GTM_PGXCNodePort port, char *proxyname, GTM_PGXCNodeStatus status, char *ipaddress, char *datafolder, bool in_recovery, int socket, bool is_session);
int Recovery_PGXCNodeUnregister(GTM_PGXCNodeType type, char *node_name, bool in_recovery, int socket);
void Recovery_SaveRegisterInfo(void);
void Recovery_PGXCNodeDisconnect(Port *myport);
void Recovery_SaveRegisterFileName(char *dir);
int Recovery_PGXCNodeRegisterCoordProcess(char *coord_node, int coord_procid, int coord_backendid);

void ProcessPGXCRegisterSession(Port *myport, StringInfo message);
void ProcessPGXCNodeRegister(Port *myport, StringInfo message, bool is_backup);
void ProcessPGXCNodeUnregister(Port *myport, StringInfo message, bool is_backup);
void ProcessPGXCNodeBackendDisconnect(Port *myport, StringInfo message);
void ProcessPGXCNodeList(Port *myport, StringInfo message);

void ProcessGTMBeginBackup(Port *myport, StringInfo message);
void ProcessGTMEndBackup(Port *myport, StringInfo message);


GlobalTransactionId GTM_HandleGlobalXmin(GTM_PGXCNodeType type, char *node_name, GlobalTransactionId reported_xmin, int *errcode);

节点管理器

GTM_InitNodeManager函数初始化GTM_PGXCNodes桶中的gtm_list指针为null,并初始化RegisterFileLock和PGXCNodesLock锁。

Postgres-xl GTM(全局事务管理器 Globale Transaction Manager) Node 管理器_初始化_02

void GTM_InitNodeManager(void) {
int ii;
for (ii = 0; ii < NODE_HASH_TABLE_SIZE; ii++) {
GTM_PGXCNodes[ii].nhb_list = gtm_NIL;
}
GTM_RWLockInit(&RegisterFileLock);
GTM_RWLockInit(&PGXCNodesLock);
}

pgxcnode_get_all函数将GTM_NodeInfoHashBucket桶中gtm_List中存储的所有GTMPGXCNodeInfo都存放到GTM_PGXCNodeInfo **data所指向的内存空间中。pgxcnode_find_by_type函数查找相同GTM_PGXCNodeType的所有节点信息,存放到GTM_PGXCNodeInfo **data所指向的内存空间中。

Postgres-xl GTM(全局事务管理器 Globale Transaction Manager) Node 管理器_初始化_03

size_t pgxcnode_get_all(GTM_PGXCNodeInfo **data, size_t maxlen, bool locked) {
GTM_PGXCNodeInfoHashBucket *bucket;
gtm_ListCell *elem;
int node = 0;
int i;
if (!locked)
GTM_RWLockAcquire(&PGXCNodesLock, GTM_LOCKMODE_READ);
for (i = 0; i < NODE_HASH_TABLE_SIZE; i++) {
bucket = &GTM_PGXCNodes[i];
gtm_foreach(elem, bucket->nhb_list){
GTM_PGXCNodeInfo *curr_nodeinfo = NULL;
curr_nodeinfo = (GTM_PGXCNodeInfo *) gtm_lfirst(elem);
if (curr_nodeinfo != NULL) {
data[node] = curr_nodeinfo;
node++;
}
if (node == maxlen)
break;
}
}
if (!locked)
GTM_RWLockRelease(&PGXCNodesLock);
return node;
}

size_t pgxcnode_find_by_type(GTM_PGXCNodeType type, GTM_PGXCNodeInfo **data, size_t maxlen) {
GTM_PGXCNodeInfoHashBucket *bucket;
gtm_ListCell *elem;
int node = 0;
int i;
GTM_RWLockAcquire(&PGXCNodesLock, GTM_LOCKMODE_READ);
for (i = 0; i < NODE_HASH_TABLE_SIZE; i++){
bucket = &GTM_PGXCNodes[i];
gtm_foreach(elem, bucket->nhb_list){
GTM_PGXCNodeInfo *cur = NULL;
cur = (GTM_PGXCNodeInfo *) gtm_lfirst(elem);
if (cur != NULL && cur->type == type){
data[node] = cur;
elog(DEBUG1, "pgxcnode_find_by_type: cur=%p, ipaddress=%s", cur, cur->ipaddress);
node++;
}
if (node == maxlen)
break;
}
}
GTM_RWLockRelease(&PGXCNodesLock);
return node;
}

节点注册

节点Recovery注册

register_common.c
向GTM_PGXCNODES全局哈希桶中注册GTM_PGXCNodeInfo: int Recovery_PGXCNodeRegister(GTM_PGXCNodeType type, char *nodename, GTM_PGXCNodePort port, char *proxyname, GTM_PGXCNodeStatus status, char *ipaddress, char *datafolder, bool in_recovery, int socket, bool is_session);
向GTM_PGXCNODES全局哈希桶中取消注册GTM_PGXCNodeInfo:int Recovery_PGXCNodeUnregister(GTM_PGXCNodeType type, char *node_name, bool in_recovery, int socket);
当GTM关闭时调用,向磁盘中写入注册信息(GTM_PGXCNODES全局哈希桶中注册的节点信息):void Recovery_SaveRegisterInfo(void);
关闭已经和GTM master断开连接的节点的连接:void Recovery_PGXCNodeDisconnect(Port *myport);
void Recovery_SaveRegisterFileName(char *dir);
注册active分布session:int Recovery_PGXCNodeRegisterCoordProcess(char *coord_node, int coord_procid, int coord_backendid);

Recovery_PGXCNodeRegister函数创建GTM_PGXCNodeInfo结构体,并利用形参进行成员的填充,将该PGXC Node Info结构体加入到全局hasn table(GTM_PGXCNODES)中。最后向磁盘上的PGXC节点文件添加注册记录。

int Recovery_PGXCNodeRegister(GTM_PGXCNodeType  type, char *nodename, GTM_PGXCNodePort port, char *proxyname, GTM_PGXCNodeStatus  status, char *ipaddress, char *datafolder, bool  in_recovery, int socket, bool is_session) {
GTM_PGXCNodeInfo *nodeinfo = NULL;
int errcode = 0;
nodeinfo = (GTM_PGXCNodeInfo *) palloc0(sizeof(GTM_PGXCNodeInfo));
if (nodeinfo == NULL)
ereport(ERROR, (ENOMEM, errmsg("Out of memory")));
GTM_RWLockInit(&nodeinfo->node_lock);
/* Fill in structure */
nodeinfo->type = type;
if (nodename)
nodeinfo->nodename = pgxcnode_copy_char(nodename);
nodeinfo->port = port;
if (proxyname)
nodeinfo->proxyname = pgxcnode_copy_char(proxyname);
if (datafolder)
nodeinfo->datafolder = pgxcnode_copy_char(datafolder);
if (ipaddress)
nodeinfo->ipaddress = pgxcnode_copy_char(ipaddress);
nodeinfo->status = status;
nodeinfo->socket = socket;
nodeinfo->reported_xmin = InvalidGlobalTransactionId;
nodeinfo->reported_xmin_time = 0;
nodeinfo->is_session = is_session;
elog(DEBUG1, "Recovery_PGXCNodeRegister Request info: type=%d, nodename=%s, port=%d, datafolder=%s, ipaddress=%s, status=%d", type, nodename, port, datafolder, ipaddress, status);
elog(DEBUG1, "Recovery_PGXCNodeRegister Node info: type=%d, nodename=%s, port=%d, datafolder=%s, ipaddress=%s, status=%d", nodeinfo->type, nodeinfo->nodename, nodeinfo->port, nodeinfo->datafolder, nodeinfo->ipaddress, nodeinfo->status);
/* Add PGXC Node Info to the global hash table */
errcode = pgxcnode_add_info(nodeinfo);
/* Add a Record to file disk saying that this node with given data has been correctly registered */
if (!in_recovery && errcode == 0)
Recovery_RecordRegisterInfo(nodeinfo, true);
return errcode;
}

节点注册处理

register_gtm.c
处理MSG_REGISTER_SESSION消息:void ProcessPGXCRegisterSession(Port *myport, StringInfo message);
void ProcessPGXCNodeRegister(Port *myport, StringInfo message, bool is_backup);
处理MSG_NODE_UNREGISTER/MSG_BKUP_NODE_UNREGISTER消息:void ProcessPGXCNodeUnregister(Port *myport, StringInfo message, bool is_backup);
处理MSG_BACKEND_DISCONNECT消息:void ProcessPGXCNodeBackendDisconnect(Port *myport, StringInfo message);
处理MSG_NODE_LIST消息:void ProcessPGXCNodeList(Port *myport, StringInfo message);void ProcessGTMBeginBackup(Port *myport, StringInfo message);
void ProcessGTMEndBackup(Port *myport, StringInfo message);


举报

相关推荐

0 条评论