调用方法
# http方法
data = {
"url": "url地址",
"token": "token",
"verify_ssl": False,
"ssl_ca_cert": '',
"method": 'http'
}
# 实例化
t = K3SExtManage(**data)
# 配置文件方法
conf = {
"method": 'config',
"type": "",
"conf": "配置文件或配置文件内容"
}
t = K3SExtManage(**conf)
# 获取所有pod
t.AllPodList()
脚本内容
import urllib3
import kubernetes
from io import StringIO
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# base client for k3s http\https
class K3STokenManage:
def __init__(self, *args, **kwargs) -> None:
self.url = kwargs.get("url")
self.token = kwargs.get('token')
self.verify_ssl = kwargs.get('verify_ssl')
if self.verify_ssl:
self.ssl_ca_cert = kwargs.get('ssl_ca_cert')
def K3sV1Api(self):
k3s_conf = kubernetes.client.Configuration()
k3s_conf.host = self.url
k3s_conf.verify_ssl = self.verify_ssl
if self.verify_ssl:
k3s_conf.ssl_ca_cert = self.ssl_ca_cert
k3s_conf.api_key = {"authorization": "Bearer " + self.token}
k3s_client = kubernetes.client.ApiClient(k3s_conf)
api = kubernetes.client.CoreV1Api(k3s_client)
# return api
return api
# base client for k3s config.yaml
class K3SManage:
def __init__(self, conf) -> None:
self.config = conf
def K3sV1Api(self):
kubernetes.config.load_kube_config(self.config)
api = kubernetes.client.CoreV1Api()
return api
# k3s 扩展接口
class K3SExtManage:
"""
http 方式:
:param method 请求方式, 值为http
:param url k3s的api地址
:param token k3s的用户token值
:param verify_ssl 是否开启htps值为: False或者True
:param ssl_ca_cert_type 当verify_ssl为True可用,值为file或者空
:param ssl_ca_cert 当verify_ssl为True可用, 值为ca证书的内容或者ca证书的路径
配置文件方式:
:param method 请求方式, 值为config
:param type 配置文件的类型,值为file或空
:param conf 配置文件, 值为配置文件的内容或者路径
"""
def __init__(self, *args, **kwargs) -> None:
method = kwargs.get("method")
if method == "http":
if kwargs.get("ssl_ca_cert_type") == "file":
ssl_ca_cert = kwargs.get("ssl_ca_cert")
else:
ssl_ca_cert = StringIO(kwargs.get("ssl_ca_cert"))
data = {
"url": kwargs.get("url"),
"token": kwargs.get("token"),
"verify_ssl": kwargs.get("verify_ssl"),
"ssl_ca_cert": ssl_ca_cert
}
self.K3SApi = K3STokenManage(**data).K3sV1Api()
elif method == "config":
if kwargs.get("type") == "file":
conf = kwargs.get("conf")
else:
conf = StringIO(kwargs.get("conf"))
self.K3SApi = K3SManage(conf=conf).K3sV1Api()
# 获取所有pod信息, 返回list
def AllPodList(self):
all_pod_list = []
all_pod_list_meta = self.K3SApi.list_pod_for_all_namespaces()
for pod in all_pod_list_meta.items:
all_pod_list.append({
"pod_uid": pod.metadata.uid,
"pod_name": pod.metadata.name,
"pod_namespace": pod.metadata.namespace,
"pod_labels": pod.metadata.labels,
"pod_creation_timestamp": pod.metadata.creation_timestamp
})
return all_pod_list