PostgreSQL High Availability: patroni RestApiServer


The RestApiServer class inherits from ThreadingMixIn, HTTPServer, and Thread, and implements the patroni REST API service. The Patroni class drives it from four places. Its constructor runs self.api = RestApiServer(self, self.config['restapi']), which calls the RestApiServer constructor. Its run function calls self.api.start(). Its _shutdown function calls self.api.shutdown(). These three calls initialize, start, and stop the patroni REST API service. In addition, the reload_config function of the Patroni class calls self.api.reload_config(self.config['restapi']) to load the configuration of the REST API service.

def __init__(self, patroni, config):
    self.patroni = patroni
    self.__listen = None
    self.__ssl_options = None
    self.__ssl_serial_number = None
    self._received_new_cert = False
    self.reload_config(config)
    self.daemon = True

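Most of the initialization work happens in the reload_config function. The allowlist parameter specifies the set of hosts that are allowed to call unsafe REST API endpoints. A single element can be a host name, an IP address, or a network address in CIDR notation. By default, "allow all" is used. If allowlist or allowlist_include_members is set, any access that is not covered by them is rejected. If allowlist_include_members is set to "true", other cluster members registered in the DCS are allowed to access unsafe REST API endpoints (the IP address or host name is taken from the member's "api_url"). Note that the operating system may use a different IP address for outgoing connections.
Note that self.__initialize(config['listen'], ssl_options) is called to rebuild the REST API service only when the listen address changes, the SSL options change, or a new certificate has been received.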
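The allowlist matching described above can be sketched with the standard ipaddress module. This is a minimal illustration, not Patroni's implementation: the helper names build_allowlist and is_allowed are hypothetical, host names are not resolved here, and an empty allowlist is assumed to mean "allow all".

```python
import ipaddress


def build_allowlist(entries):
    """Convert strings such as '10.0.0.0/8' or '192.168.1.5' into network objects.

    Hypothetical helper: host names are NOT handled in this sketch.
    """
    # strict=False also accepts a host address with a prefix, e.g. '10.0.0.1/8'
    return tuple(ipaddress.ip_network(entry, strict=False) for entry in entries or ())


def is_allowed(client_ip, allowlist):
    """Return True if the allowlist is empty (assumed 'allow all') or covers client_ip."""
    if not allowlist:
        return True
    address = ipaddress.ip_address(client_ip)
    # A bare IP entry becomes a /32 (or /128) network, so one check covers both cases
    return any(address in network for network in allowlist)


allowlist = build_allowlist(['10.0.0.0/8', '192.168.1.5'])
print(is_allowed('10.1.2.3', allowlist))     # inside the CIDR range
print(is_allowed('192.168.1.6', allowlist))  # not listed
```

Storing every entry as a network keeps the membership test uniform for single addresses and CIDR ranges.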

def reload_config(self, config):
    if 'listen' not in config:  # changing config in runtime: the listen parameter must be present
        raise ValueError('Can not find "restapi.listen" config')
    self.__allowlist = tuple(self._build_allowlist(config.get('allowlist')))
    self.__allowlist_include_members = config.get('allowlist_include_members')

    ssl_options = {n: config[n] for n in ('certfile', 'keyfile', 'keyfile_password', 'cafile', 'ciphers') if n in config}
    self.http_extra_headers = config.get('http_extra_headers') or {}
    self.http_extra_headers.update((config.get('https_extra_headers') or {}) if ssl_options.get('certfile') else {})
    if isinstance(config.get('verify_client'), six.string_types):
        ssl_options['verify_client'] = config['verify_client'].lower()
    if self.__listen != config['listen'] or self.__ssl_options != ssl_options or self._received_new_cert:
        self.__initialize(config['listen'], ssl_options)
    self.__auth_key = base64.b64encode(config['auth'].encode('utf-8')) if 'auth' in config else None
    self.connection_string = uri(self.__protocol, config.get('connect_address') or self.__listen, 'patroni')
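The __auth_key line above stores the base64 form of the configured credentials, which is the same encoding a client sends in an HTTP Basic Authorization header. The sketch below shows how such a key could be compared against an incoming header. The function names make_auth_key and check_basic_auth are hypothetical, and the use of hmac.compare_digest is an assumption of this sketch rather than something shown in the listing.

```python
import base64
import hmac


def make_auth_key(auth):
    """Encode 'username:password' the same way reload_config does."""
    return base64.b64encode(auth.encode('utf-8'))


def check_basic_auth(auth_key, header):
    """Hypothetical check of an 'Authorization' header value against the stored key."""
    if auth_key is None:  # no 'auth' in the config: nothing to verify
        return True
    if header is None or not header.startswith('Basic '):
        return False
    # Constant-time comparison of the base64 payloads (assumption of this sketch)
    return hmac.compare_digest(auth_key, header[len('Basic '):].encode('utf-8'))


key = make_auth_key('admin:secret')
print(check_basic_auth(key, 'Basic ' + key.decode('ascii')))  # matching credentials
print(check_basic_auth(key, 'Basic d3Jvbmc6Y3JlZHM='))         # different credentials
```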

__initialize

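The __initialize function first checks whether the configuration is being changed at runtime, that is, whether self.__listen has already been set. If so, it must call shutdown and server_close to stop the running REST API service before the service can be rebuilt. It then initializes the HTTP server and re-initializes the parent Thread, with self.serve_forever as the function it runs. HTTPS gets some extra handling: the listening socket is wrapped with an SSL context. Finally, on a runtime reload (whether triggered by a new listen address, changed ssl_options, or a new certificate), the REST API service is started right here by calling the start function of Thread, which executes serve_forever. Otherwise this is the first initialization from the constructor, and the thread is started later by self.api.start() in the run function of the Patroni class.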

def __initialize(self, listen, ssl_options):
    try:
        host, port = split_host_port(listen, None)
    except Exception:
        raise ValueError('Invalid "restapi" config: expected <HOST>:<PORT> for "listen", but got "{0}"'.format(listen))
    reloading_config = self.__listen is not None  # changing config in runtime
    if reloading_config:
        self.shutdown()
        self.server_close()  # Rely on ThreadingMixIn.server_close() to have all requests terminate before we continue

    self.__listen = listen  # store the new listen address
    self.__ssl_options = ssl_options  # store the new ssl_options
    self._received_new_cert = False  # reset to False after reload_config()

    self.__httpserver_init(host, port)  # initialize the HTTP server
    Thread.__init__(self, target=self.serve_forever)
    self._set_fd_cloexec(self.socket)

    # wrap socket with ssl if 'certfile' is defined in a config.yaml
    # Sometime it's also needed to pass reference to a 'keyfile'.
    self.__protocol = 'https' if ssl_options.get('certfile') else 'http'
    if self.__protocol == 'https':
        import ssl
        ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH, cafile=ssl_options.get('cafile'))
        if ssl_options.get('ciphers'):
            ctx.set_ciphers(ssl_options['ciphers'])
        ctx.load_cert_chain(certfile=ssl_options['certfile'], keyfile=ssl_options.get('keyfile'),
                            password=ssl_options.get('keyfile_password'))
        verify_client = ssl_options.get('verify_client')
        if verify_client:
            modes = {'none': ssl.CERT_NONE, 'optional': ssl.CERT_OPTIONAL, 'required': ssl.CERT_REQUIRED}
            if verify_client in modes:
                ctx.verify_mode = modes[verify_client]
            else:
                logger.error('Bad value in the "restapi.verify_client": %s', verify_client)
        self.__ssl_serial_number = self.get_certificate_serial_number()
        self.socket = ctx.wrap_socket(self.socket, server_side=True)
    if reloading_config:
        self.start()

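The while loop in serve_forever keeps waiting for incoming requests and periodically checks whether a shutdown has been requested. A multithreaded socketserver would normally use the ThreadingTCPServer class, which inherits from ThreadingMixIn and TCPServer. RestApiServer instead combines ThreadingMixIn directly with HTTPServer, which is itself a subclass of TCPServer. ThreadingMixIn is the part that handles requests in threads, while serve_forever is inherited by TCPServer from the BaseServer class. For now it is enough to know that serve_forever is provided by the standard library.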
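The lifecycle described in this section (build the server, run serve_forever in a thread, then shutdown and server_close) can be tried out with a small stand-alone sketch. MiniServer, PingHandler, fetch, and run_once are hypothetical names used only for illustration. Unlike Patroni, which re-runs the initializers on the same object when the configuration changes, this sketch creates a fresh server object for every run.

```python
import http.client
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn
from threading import Thread


class PingHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        body = b'pong'
        self.send_response(200)
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass  # keep the demo output quiet


class MiniServer(ThreadingMixIn, HTTPServer, Thread):
    daemon_threads = True  # request threads do not block interpreter exit

    def __init__(self, host, port):
        HTTPServer.__init__(self, (host, port), PingHandler)  # bind and listen
        Thread.__init__(self, target=self.serve_forever)      # the thread runs the accept loop
        self.daemon = True


def fetch(port):
    """Send one GET request to the local server and return the response body."""
    conn = http.client.HTTPConnection('127.0.0.1', port, timeout=5)
    try:
        conn.request('GET', '/')
        return conn.getresponse().read()
    finally:
        conn.close()


def run_once():
    server = MiniServer('127.0.0.1', 0)  # port 0: let the OS pick a free port
    server.start()                       # Thread.start() -> serve_forever()
    try:
        return fetch(server.server_address[1])
    finally:
        server.shutdown()      # ask serve_forever to exit and wait for it
        server.server_close()  # release the listening socket


print(run_once())
```

Calling shutdown from the thread that runs serve_forever would deadlock, which is why the stop sequence here runs in the main thread.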

__httpserver_init

The __httpserver_init function simply calls the HTTPServer constructor and registers RestApiHandler as the request handler class.

def __httpserver_init(self, host, port):
    dual_stack = self.__has_dual_stack()  # check whether the environment supports a dual (IPv4/IPv6) network stack
    if host in ('', '*'):
        host = None
    info = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, socket.AI_PASSIVE)
    # in case dual stack is not supported we want IPv4 to be preferred over IPv6
    info.sort(key=lambda x: x[0] == socket.AF_INET, reverse=not dual_stack)
    self.address_family = info[0][0]
    try:
        HTTPServer.__init__(self, info[0][-1][:2], RestApiHandler)
    except socket.error:
        logger.error("Couldn't start a service on '%s:%s', please check your `restapi.listen` configuration", host, port)
        raise
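The effect of the info.sort call is easy to misread, so here is a small sketch that applies the same sort key to hand-written getaddrinfo-style tuples. The function name pick_bind_address and the sample addresses are made up for illustration; real getaddrinfo results depend on the host.

```python
import socket


def pick_bind_address(info, dual_stack):
    """Order getaddrinfo-style tuples like __httpserver_init does and return the winner."""
    # The key is True for IPv4 entries. Ascending order puts False (IPv6) first,
    # so IPv6 wins with dual stack; reverse=True puts IPv4 first without it.
    ordered = sorted(info, key=lambda x: x[0] == socket.AF_INET, reverse=not dual_stack)
    family, sockaddr = ordered[0][0], ordered[0][-1]
    return family, sockaddr[:2]  # (host, port) only, dropping IPv6 flowinfo/scope id


# Hand-written sample in the (family, type, proto, canonname, sockaddr) layout
sample = [
    (socket.AF_INET, socket.SOCK_STREAM, 6, '', ('0.0.0.0', 8008)),
    (socket.AF_INET6, socket.SOCK_STREAM, 6, '', ('::', 8008, 0, 0)),
]
print(pick_bind_address(sample, dual_stack=True))
print(pick_bind_address(sample, dual_stack=False))
```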

RestApiHandler

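The RestApiHandler class inherits from BaseHTTPRequestHandler. It overrides the parse_request method to enrich the basic functionality of the BaseHTTPRequestHandler class. The original class can only invoke do_GET, do_POST, do_PUT, and similar method implementations if they are defined. But we would like to have at least a simple routing mechanism, i.e.:

  • A GET /uri1/part2 request should invoke do_GET_uri1()
  • A POST /other request should invoke do_POST_other()

If the do_<REQUEST_METHOD>_<first_part_url> method does not exist, we fall back to the original behavior of BaseHTTPRequestHandler.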

def parse_request(self):
    """Override parse_request method to enrich basic functionality of `BaseHTTPRequestHandler` class
    Original class can only invoke do_GET, do_POST, do_PUT, etc method implementations if they are defined.
    But we would like to have at least some simple routing mechanism, i.e.:
    GET /uri1/part2 request should invoke `do_GET_uri1()`
    POST /other should invoke `do_POST_other()`
    If the `do_<REQUEST_METHOD>_<first_part_url>` method does not exists we'll fallback to original behavior."""

    ret = BaseHTTPRequestHandler.parse_request(self)
    if ret:
        urlpath = urlparse(self.path)
        self.path = urlpath.path
        self.path_query = parse_qs(urlpath.query) or {}
        mname = self.path.lstrip('/').split('/')[0]
        mname = self.command + ('_' + mname if mname else '')
        if hasattr(self, 'do_' + mname):  # check whether a do_<REQUEST_METHOD>_<first_part_url> method exists
            self.command = mname
    return ret  # the caller aborts the request when parse_request returns a false value
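The routing rule can be isolated into a pure function to see which method name a given request resolves to. This is a sketch only: resolve_handler and DemoHandler are hypothetical and merely mirror the name-building logic of parse_request without any HTTP machinery.

```python
from urllib.parse import parse_qs, urlparse


def resolve_handler(handler, command, raw_path):
    """Return the method name that would be dispatched, plus the parsed query string."""
    urlpath = urlparse(raw_path)
    query = parse_qs(urlpath.query) or {}
    first_part = urlpath.path.lstrip('/').split('/')[0]
    mname = command + ('_' + first_part if first_part else '')
    if not hasattr(handler, 'do_' + mname):
        mname = command  # fall back to the plain do_<REQUEST_METHOD>
    return 'do_' + mname, query


class DemoHandler:
    """Stand-in with a few handler names; the bodies are irrelevant here."""
    def do_GET(self): pass
    def do_GET_cluster(self): pass
    def do_POST_restart(self): pass


handler = DemoHandler()
print(resolve_handler(handler, 'GET', '/cluster'))
print(resolve_handler(handler, 'GET', '/replica?lag=10MB'))
print(resolve_handler(handler, 'POST', '/restart/now'))
```

Only the first path segment takes part in routing, so /restart/now and /restart reach the same handler.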

Default methods

The do_GET function is the default method for processing all GET requests that cannot be routed to other methods.

def do_GET(self, write_status_code_only=False):
    """Default method for processing all GET requests which can not be routed to other methods"""
    path = '/master' if self.path == '/' else self.path
    response = self.get_postgresql_status()  # fetch the status of the PostgreSQL database
    patroni = self.server.patroni
    cluster = patroni.dcs.cluster
    leader_optime = cluster and cluster.last_lsn or 0
    replayed_location = response.get('xlog', {}).get('replayed_location', 0)
    max_replica_lag = parse_int(self.path_query.get('lag', [sys.maxsize])[0], 'B')
    if max_replica_lag is None:
        max_replica_lag = sys.maxsize
    is_lagging = leader_optime and leader_optime > replayed_location + max_replica_lag
    replica_status_code = 200 if not patroni.noloadbalance and not is_lagging and \
        response.get('role') == 'replica' and response.get('state') == 'running' else 503
    if not cluster and patroni.ha.is_paused():
        leader_status_code = 200 if response.get('role') in ('master', 'standby_leader') else 503
        primary_status_code = 200 if response.get('role') == 'master' else 503
        standby_leader_status_code = 200 if response.get('role') == 'standby_leader' else 503
    elif patroni.ha.is_leader():
        leader_status_code = 200
        if patroni.ha.is_standby_cluster():
            primary_status_code = replica_status_code = 503
            standby_leader_status_code = 200 if response.get('role') in ('replica', 'standby_leader') else 503
        else:
            primary_status_code = 200
            standby_leader_status_code = 503
    else:
        leader_status_code = primary_status_code = standby_leader_status_code = 503

    status_code = 503
    ignore_tags = False
    if 'standby_leader' in path or 'standby-leader' in path:
        status_code = standby_leader_status_code
        ignore_tags = True
    elif 'leader' in path:
        status_code = leader_status_code
        ignore_tags = True
    elif 'master' in path or 'primary' in path or 'read-write' in path:
        status_code = primary_status_code
        ignore_tags = True
    elif 'replica' in path:
        status_code = replica_status_code
    elif 'read-only' in path:
        status_code = 200 if 200 in (primary_status_code, standby_leader_status_code) else replica_status_code
    elif 'health' in path:
        status_code = 200 if response.get('state') == 'running' else 503
    elif cluster:  # dcs is available
        is_synchronous = cluster.is_synchronous_mode() and cluster.sync \
            and patroni.postgresql.name in cluster.sync.members
        if path in ('/sync', '/synchronous') and is_synchronous:
            status_code = replica_status_code
        elif path in ('/async', '/asynchronous') and not is_synchronous:
            status_code = replica_status_code
    # check for user defined tags in query params
    if not ignore_tags and status_code == 200:
        qs_tag_prefix = "tag_"
        for qs_key, qs_value in self.path_query.items():
            if not qs_key.startswith(qs_tag_prefix):
                continue
            qs_key = qs_key[len(qs_tag_prefix):]
            qs_value = qs_value[0]
            instance_tag_value = patroni.tags.get(qs_key)
            # tag not registered for instance
            if instance_tag_value is None:
                status_code = 503
                break
            if not isinstance(instance_tag_value, six.string_types):
                instance_tag_value = str(instance_tag_value).lower()
            if instance_tag_value != qs_value:
                status_code = 503
                break
    if write_status_code_only:  # when haproxy sends OPTIONS request it reads only status code and nothing more
        self._write_status_code_only(status_code)  # write the status code only
    else:
        self._write_status_response(status_code, response)  # write the status code plus the response body
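Two of the checks inside do_GET, the replication lag test and the tag_ query parameter matching, are easier to follow when pulled out as pure functions. The sketch below mirrors that logic. The function names is_lagging and tags_match are hypothetical, LSN values are plain byte counts, and the Python 3 str type stands in for six.string_types.

```python
import sys


def is_lagging(leader_lsn, replayed_lsn, max_lag=sys.maxsize):
    """True when the leader position is known and the replica trails it by more than max_lag bytes."""
    return bool(leader_lsn and leader_lsn > replayed_lsn + max_lag)


def tags_match(instance_tags, path_query, prefix='tag_'):
    """True when every tag_<name>=<value> query parameter equals the instance tag."""
    for key, values in path_query.items():
        if not key.startswith(prefix):
            continue  # parameters such as 'lag' are not tags
        wanted = values[0]
        actual = instance_tags.get(key[len(prefix):])
        if actual is None:  # tag not registered for this instance
            return False
        if not isinstance(actual, str):
            actual = str(actual).lower()  # e.g. True -> 'true'
        if actual != wanted:
            return False
    return True


print(is_lagging(leader_lsn=1000, replayed_lsn=900, max_lag=50))
print(tags_match({'nofailover': True, 'dc': 'eu'},
                 {'tag_nofailover': ['true'], 'tag_dc': ['eu'], 'lag': ['1MB']}))
```

In do_GET a failed tag match turns a 200 into a 503, and the tag check is skipped for the leader, primary, and standby leader endpoints (ignore_tags = True).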

The do_OPTIONS function

def do_OPTIONS(self):
    self.do_GET(write_status_code_only=True)

Route-specific handler functions

do_GET_liveness, do_GET_readiness, do_GET_patroni, do_GET_cluster, do_GET_history, do_GET_config, do_GET_metrics

do_PATCH_config, do_PUT_config

do_POST_reload, do_POST_restart, do_POST_reinitialize, do_POST_failover, do_POST_switchover

do_DELETE_restart, do_DELETE_switchover

