全栈工程师开发手册 (作者:栾鹏)
架构系列文章
示例:
为了简单实用,我们来创建一个简单的operator:当出现新的namespace,或者被监听的ConfigMap/Secret状态发生变化时,把该ConfigMap或Secret复制到其他namespace。(注意:下面的示例代码只监听ConfigMap/Secret的事件,并没有监听namespace的创建事件。)从实用的角度来看,这个operator可以用于批量更新应用程序的配置(通过更新ConfigMap),或重置Secret,例如Docker镜像仓库的访问密钥(当一个Secret被添加到namespace时)。
各模块功能
crd:自定义资源类型(CustomResourceDefinition),用来向集群注册一种新的资源种类
cr:自定义资源实例(Custom Resource),也就是你的配置项
operator:读取cr中的配置项,并据此控制集群中的模块
operator代码
from argparse import ArgumentParser
from os import getenv
from functools import partial
from operator import methodcaller
import kubernetes
# CRD settings: API group, version and plural name of the custom resource
# that load_cr() reads.
CRD_GROUP = 'luanpeng.com'
CRD_VERSION = 'v1'
CRD_PLURAL = 'copyrators'

# Maps from the CR's source_type to the name of the CoreV1Api method to call
# (see kubernetes.client.CoreV1Api for the method definitions).
LIST_TYPES_MAP = {
    'configmap': 'list_namespaced_config_map',
    'secret': 'list_namespaced_secret',
}
CREATE_TYPES_MAP = {
    'configmap': 'create_namespaced_config_map',
    'secret': 'create_namespaced_secret',
}
DELETE_TYPES_MAP = {
    'configmap': 'delete_namespaced_config_map',
    'secret': 'delete_namespaced_secret',
}

# Event types the operator reacts to. The Kubernetes watch API reports a
# change to an existing object as 'MODIFIED'; 'UPDATED' is not a type the API
# server emits and is retained only so code comparing against the old name
# keeps working.
ALLOWED_EVENT_TYPES = {'ADDED', 'MODIFIED', 'UPDATED', 'DELETED'}
# Load the in-cluster client configuration once at import time: load_cr()
# builds its ApiClient without loading any configuration itself, and the
# script calls load_cr() before handle().
kubernetes.config.load_incluster_config()
print('============load config')
# Reads the custom resource named ``rule_name`` in ``namespace``.
def load_cr(namespace, rule_name):
    """
    Fetch the custom resource that configures the operator.

    Looks up the object ``rule_name`` of the CRD_GROUP / CRD_VERSION /
    CRD_PLURAL resource type in ``namespace`` and returns a dict holding only
    its 'source_type', 'selector' and 'namespace' entries (the watch
    settings). A KeyError is raised when the resource lacks one of those keys.
    """
    print('==================get cr',CRD_GROUP,CRD_VERSION,namespace,CRD_PLURAL,rule_name)
    api_client = kubernetes.client.ApiClient()
    objects_api = kubernetes.client.CustomObjectsApi(api_client)
    # Positional order: group, version, namespace, plural, object name.
    cr = objects_api.get_namespaced_custom_object(
        CRD_GROUP, CRD_VERSION, namespace, CRD_PLURAL, rule_name,
    )
    print(cr)
    # Keep only the configuration fields the operator uses.
    settings = {}
    for field in ('source_type', 'selector', 'namespace'):
        settings[field] = cr[field]
    return settings
def _call_api(v1, types_map, source_type, *args):
    """
    Invoke the CoreV1Api method registered for ``source_type`` in ``types_map``.

    Replication is best-effort: any error (including an unknown source_type)
    is printed and swallowed so one failing call does not stop the others.
    """
    try:
        methodcaller(types_map[source_type], *args)(v1)
    except Exception as e:
        print(e)


def handle_event(v1, specs, event):
    """
    Replicate one watched ConfigMap/Secret event into the other namespaces.

    Parameters:
        v1: API object providing ``list_namespace`` and the create/delete
            methods named in CREATE_TYPES_MAP / DELETE_TYPES_MAP.
        specs: dict with 'source_type', 'selector' and (optionally)
            'namespace', as returned by load_cr().
        event: watch event; ``event['type']`` is the event type and
            ``event['object']`` is indexed like a dict.
            NOTE(review): confirm the watch delivers the raw dict here rather
            than a deserialized model object.

    Behaviour:
        * Events of a type outside ALLOWED_EVENT_TYPES are ignored, except
          that 'MODIFIED' (the type the watch API uses for changes) is always
          handled the same way as 'UPDATED'.
        * Objects whose labels do not match every selector entry are ignored.
        * ADDED creates a copy, MODIFIED/UPDATED deletes and recreates the
          copy, DELETED removes it - in every Active namespace except the one
          the source object lives in.
    """
    event_type = event['type']
    print('===================================',event_type)
    is_update = event_type in ('MODIFIED', 'UPDATED')
    if not is_update and event_type not in ALLOWED_EVENT_TYPES:
        return

    object_ = event['object']
    source_meta = object_['metadata']
    # Unlabeled objects may carry no 'labels' entry (or a null one).
    labels = source_meta.get('labels') or {}

    # Every selector entry must be present on the object with the same value.
    for key, value in specs['selector'].items():
        if labels.get(key) != value:
            return

    name = source_meta['name']
    source_type = specs['source_type']
    # Never touch the source object itself: deleting/recreating it on update
    # would generate further DELETED/ADDED events for the same object.
    source_namespace = source_meta.get('namespace', specs.get('namespace'))

    targets = [
        ns.metadata.name
        for ns in v1.list_namespace().items
        if ns.status.phase == 'Active' and ns.metadata.name != source_namespace
    ]

    for namespace in targets:
        # Work on a shallow copy with cleaned metadata so the incoming event
        # object is left untouched.
        payload = dict(object_)
        payload['metadata'] = {
            'labels': labels,
            'namespace': namespace,
            'name': name,
        }
        print(payload)
        if event_type == 'ADDED':
            _call_api(v1, CREATE_TYPES_MAP, source_type, namespace, payload)
        elif is_update:
            # The delete may fail when the namespace has no copy yet; the
            # create must still run so that namespace receives the object.
            _call_api(v1, DELETE_TYPES_MAP, source_type, name, namespace)
            _call_api(v1, CREATE_TYPES_MAP, source_type, namespace, payload)
        elif event_type == 'DELETED':
            _call_api(v1, DELETE_TYPES_MAP, source_type, name, namespace)
# ``specs`` holds the settings read from the custom resource, for example:
# {
#     source_type: configmap
#     selector:
#         copyrator: "true"
#     namespace: default
# }
def handle(specs):
    """
    Watch the configured namespace and pass every event to handle_event.

    Loads the in-cluster configuration, picks the CoreV1Api list method that
    LIST_TYPES_MAP registers for ``specs['source_type']``, and streams the
    events of ``specs['namespace']`` through a Watch.
    """
    kubernetes.config.load_incluster_config()
    core_api = kubernetes.client.CoreV1Api()
    list_method_name = LIST_TYPES_MAP[specs['source_type']]
    # The namespace is bound with partial() so the watch receives a
    # zero-argument-ready callable for that namespace only.
    # NOTE(review): handle_event indexes event['object'] like a dict; confirm
    # before replacing partial() that Watch would not start returning model
    # objects instead.
    list_in_namespace = partial(
        getattr(core_api, list_method_name),
        specs['namespace'],
    )
    watcher = kubernetes.watch.Watch()
    for event in watcher.stream(list_in_namespace):
        handle_event(core_api, specs, event)
if __name__ == "__main__":
    # Each option falls back to an environment variable, then to a literal
    # default: (flag, environment variable, default, help text).
    cli = ArgumentParser(
        prog='copyrator',
        description='Copyrator - copy operator.',
    )
    for flag, env_var, fallback, help_text in (
        ('--namespace', 'NAMESPACE', 'default',
         'Operator Namespace (or ${NAMESPACE}), default: default'),
        ('--cr_name', 'CR_NAME', 'main-rule',
         'CR Name (or ${CR_NAME}), default: main-rule'),
    ):
        cli.add_argument(
            flag,
            type=str,
            default=getenv(env_var, fallback),
            help=help_text,
        )
    args = cli.parse_args()

    try:
        print('=========================:',args)
        # Read the watch settings from the CR, then watch until interrupted.
        specs = load_cr(args.namespace, args.cr_name)
        print('=========================:',specs)
        handle(specs)
    except KeyboardInterrupt:
        # Ctrl-C is a normal way to stop the operator.
        pass
    except Exception as err:
        raise RuntimeError('Oh no! I am dying...') from err
operator的dockerfile
# Build with: docker build -t luanpeng/lp:k8s-operator-copyrator .
FROM python:3.7.3-alpine3.9
# --no-cache-dir keeps pip's download cache out of the image layer.
RUN pip3 install --no-cache-dir kubernetes
COPY copyrators.py /copyrators.py
# Replaces the client library's configuration module with a local copy.
# NOTE(review): the target path is tied to Python 3.7 and to the installed
# client version; confirm what the local configuration.py changes.
COPY configuration.py /usr/local/lib/python3.7/site-packages/kubernetes/client/configuration.py
# -u disables stdout buffering so the operator's print() output reaches the
# container log immediately; the absolute path does not depend on WORKDIR.
CMD ["python", "-u", "/copyrators.py"]
部署的yaml文件
---
# Source: copyrator/templates/rbac.yaml
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: copyrator-acc
  namespace: default
---
# rbac.authorization.k8s.io/v1beta1 was removed in Kubernetes 1.22; v1 is the
# stable version of the same API.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: copyrator
rules:
  # Only the permissions the operator code uses, instead of "*" on everything.
  # Read the CopyratorRule custom resource that holds the operator settings.
  - apiGroups: ["luanpeng.com"]
    resources: ["copyrators"]
    verbs: ["get"]
  # Enumerate the namespaces to copy into.
  - apiGroups: [""]
    resources: ["namespaces"]
    verbs: ["list"]
  # Watch the source objects and create/delete their copies.
  - apiGroups: [""]
    resources: ["configmaps", "secrets"]
    verbs: ["list", "watch", "create", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: copyrator
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: copyrator
subjects:
  - kind: ServiceAccount
    name: copyrator-acc
    namespace: default
---
# Define the custom resource type.
# apiextensions.k8s.io/v1beta1 was removed in Kubernetes 1.22; the v1 API
# (available since 1.16) requires a structural schema declared per version.
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: copyrators.luanpeng.com   # must have the form <plural>.<group>
spec:
  group: luanpeng.com             # group in the REST path /apis/<group>/<version>; needs at least one dot
  scope: Namespaced               # Namespaced or Cluster: whether instances live in a namespace or cluster-wide
  names:
    plural: copyrators            # plural name used in the URL: /apis/<group>/<version>/<plural>
    singular: copyrator           # singular name used on the CLI
    kind: CopyratorRule           # CamelCased singular type; the value of `kind` in CR manifests
    shortNames:                   # short resource names usable on the CLI
      - copyr
  versions:                       # versions served under /apis/<group>/<version>
    - name: v1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            source_type:
              type: string
            namespace:
              type: string
            selector:
              type: object
              # Arbitrary label keys with string values; without this the
              # entries of `selector` would be pruned as unknown fields.
              additionalProperties:
                type: string
---
# Create an instance (CR) of the custom resource type.
apiVersion: luanpeng.com/v1
kind: CopyratorRule
metadata:
  name: main-rule          # name of the CR the operator reads; it could also be selected by label
  labels:
    module: copyrator
source_type: configmap     # type of resource copied to the other namespaces
selector:
  copyrator: "true"        # label a resource must carry to be copied
namespace: default         # namespace that holds the resources to copy
---
# Source: copyrator/templates/operator.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: copyrator
spec:
  selector:
    matchLabels:
      name: copyrator
  template:
    metadata:
      labels:
        name: copyrator
    spec:
      serviceAccountName: copyrator-acc
      containers:
        - name: copyrator
          image: luanpeng/lp:k8s-operator-copyrator
          # imagePullPolicy: Always
          # args: ["--cr_name", "main-rule"]
          env:
            # Name of the CR the operator loads its settings from.
            - name: CR_NAME
              value: main-rule
            # Namespace the operator pod runs in, injected via the downward API.
            - name: NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace