1. Preface
2. Example
2.1 Preparation
2.1.1 Start the kube-scheduler service with a configuration file
apiVersion: kubescheduler.config.k8s.io/v1alpha1
kind: KubeSchedulerConfiguration
schedulerName: my-scheduler
algorithmSource:
  policy:
    file:
      path: policy.yaml
leaderElection:
  leaderElect: true
  lockObjectName: my-scheduler
  lockObjectNamespace: kube-system
{
  "kind" : "Policy",
  "apiVersion" : "v1",
  "predicates" : [
    {"name" : "PodFitsHostPorts"},
    {"name" : "PodFitsResources"},
    {"name" : "NoDiskConflict"},
    {"name" : "MatchNodeSelector"},
    {"name" : "HostName"}
  ],
  "priorities" : [
    {"name" : "LeastRequestedPriority", "weight" : 1},
    {"name" : "BalancedResourceAllocation", "weight" : 1},
    {"name" : "ServiceSpreadingPriority", "weight" : 1},
    {"name" : "EqualPriority", "weight" : 1}
  ],
  "extenders" : [{
    "urlPrefix": "http://localhost/scheduler",
    "filterVerb": "predicates/always_true",
    "prioritizeVerb": "priorities/zero_score",
    "preemptVerb": "preemption",
    "bindVerb": "",
    "weight": 1,
    "enableHttps": false,
    "nodeCacheCapable": false
  }],
  "hardPodAffinitySymmetricWeight" : 10
}
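With this policy, kube-scheduler talks to the extender by POSTing a JSON-encoded ExtenderArgs body to urlPrefix joined with the configured verb, so filtering requests go to http://localhost/scheduler/predicates/always_true. The snippet below is only an illustration of that wire format, not part of the example repository: it uses hand-rolled stand-in types, and since nodeCacheCapable is false here the real scheduler would actually send a full NodeList in the Nodes field rather than the NodeNames shortcut used to keep the sketch small.

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
)

// Local stand-ins for schedulerapi.ExtenderArgs / ExtenderFilterResult,
// reduced to the fields needed for this illustration.
type extenderArgs struct {
    Pod       map[string]interface{} `json:"Pod"`
    NodeNames *[]string              `json:"NodeNames"`
}

type extenderFilterResult struct {
    NodeNames   *[]string         `json:"NodeNames"`
    FailedNodes map[string]string `json:"FailedNodes"`
    Error       string            `json:"Error"`
}

func main() {
    // urlPrefix + "/" + filterVerb from the policy above.
    url := "http://localhost/scheduler" + "/" + "predicates/always_true"

    // NOTE: with nodeCacheCapable=false the real scheduler sends the full
    // NodeList in Nodes; NodeNames is used here only to keep the sketch short.
    names := []string{"172.21.0.12", "172.21.0.16"}
    args := extenderArgs{
        Pod:       map[string]interface{}{"metadata": map[string]interface{}{"name": "test-schduler"}},
        NodeNames: &names,
    }
    body, _ := json.Marshal(args)

    resp, err := http.Post(url, "application/json", bytes.NewReader(body))
    if err != nil {
        fmt.Println("request failed:", err)
        return
    }
    defer resp.Body.Close()

    var result extenderFilterResult
    if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
        fmt.Println("decode failed:", err)
        return
    }
    fmt.Printf("filtered nodes: %v, failed nodes: %v\n", result.NodeNames, result.FailedNodes)
}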
2.1.2 Start the extender service
[root@worker nicktming]# pwd
/root/go/src/github.com/nicktming
[root@worker nicktming]# git clone https://github.com/nicktming/k8s-scheduler-extender-example.git
[root@worker nicktming]# cd k8s-scheduler-extender-example
[root@worker k8s-scheduler-extender-example]# go build .
# If there is no Go environment available, you can directly use the pre-built binary k8s-scheduler-extender-example included in the repository
[root@worker k8s-scheduler-extender-example]# ./k8s-scheduler-extender-example
Take the predicate (filtering) method as an example:
type ExtenderArgs struct {
    // The pod that is being scheduled
    Pod *v1.Pod
    // Node information
    Nodes     *v1.NodeList
    NodeNames *[]string
}
type ExtenderFilterResult struct {
    // Filtered set of nodes where the pod can be scheduled; to be populated
    // only if ExtenderConfig.NodeCacheCapable == false
    Nodes *v1.NodeList
    // Filtered set of nodes where the pod can be scheduled; to be populated
    // only if ExtenderConfig.NodeCacheCapable == true
    NodeNames *[]string
    // Filtered out nodes where the pod can't be scheduled and the failure messages
    FailedNodes FailedNodesMap
    // Error message
    Error string
}
// main.go
TruePredicate = Predicate{
    Name: "always_true",
    Func: func(pod v1.Pod, node v1.Node) (bool, error) {
        if node.Name == "172.21.0.16" {
            return false, nil
        }
        return true, nil
    },
}
// predicate.go
type Predicate struct {
    Name string
    Func func(pod v1.Pod, node v1.Node) (bool, error)
}

func (p Predicate) Handler(args schedulerapi.ExtenderArgs) *schedulerapi.ExtenderFilterResult {
    pod := args.Pod
    canSchedule := make([]v1.Node, 0, len(args.Nodes.Items))
    canNotSchedule := make(map[string]string)
    for _, node := range args.Nodes.Items {
        // Call our own predicate logic to decide whether this pod can run on this node
        result, err := p.Func(*pod, node)
        fmt.Printf("===>extender node:%v, result:%v\n", node.Name, result)
        if err != nil {
            canNotSchedule[node.Name] = err.Error()
        } else {
            if result {
                canSchedule = append(canSchedule, node)
            }
        }
    }
    // Assemble the return value
    result := schedulerapi.ExtenderFilterResult{
        Nodes: &v1.NodeList{
            Items: canSchedule,
        },
        FailedNodes: canNotSchedule,
        Error:       "",
    }
    return &result
}
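The scheduler only ever sees this Handler over HTTP, so the extender service has to expose it on the path formed by urlPrefix + "/" + filterVerb. The following is a minimal sketch of such wiring using the standard library; it is not the repository's actual routes.go. It reuses the Predicate and TruePredicate definitions above and assumes the same schedulerapi alias, plus "encoding/json" and "net/http" imports.

// Hypothetical HTTP wiring for the Handler above (sketch, not the repo's routes.go).
// Assumes: import ("encoding/json"; "net/http") and the schedulerapi alias used above.
func filterRoute(p Predicate) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        // Decode the ExtenderArgs that kube-scheduler POSTs as JSON.
        var args schedulerapi.ExtenderArgs
        if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
        }
        // Run the predicate and write the ExtenderFilterResult back as JSON.
        result := p.Handler(args)
        w.Header().Set("Content-Type", "application/json")
        json.NewEncoder(w).Encode(result)
    }
}

func main() {
    // The path must match urlPrefix + "/" + filterVerb from policy.yaml.
    http.HandleFunc("/scheduler/predicates/always_true", filterRoute(TruePredicate))
    http.ListenAndServe(":80", nil)
}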
2.2 Verification
2.2.1 Create a pod with schedulerName: my-scheduler
[root@master kubectl]# cat pod-scheduler.yaml
apiVersion: v1
kind: Pod
metadata:
  name: test-schduler
spec:
  schedulerName: my-scheduler
  containers:
  - name: podtest-scheduler
    image: nginx
    ports:
    - containerPort: 80
[root@master kubectl]# ./kubectl apply -f pod-scheduler.yaml
pod/test-schduler created
[root@master kubectl]#
[root@master kubectl]# ./kubectl get pods
NAME READY STATUS RESTARTS AGE
test-schduler 1/1 Running 0 17s
[root@master kubectl]# ./kubectl get pods test-schduler -o yaml | grep -i nodeName
nodeName: 172.21.0.12
[root@master kubectl]# ./kubectl get pods test-schduler -o yaml | grep -i schedulerName
{"apiVersion":"v1","kind":"Pod","metadata":{"annotations":{},"name":"test-schduler","namespace":"default"},"spec":{"containers":[{"image":"nginx","name":"podtest-scheduler","ports":[{"containerPort":80}]}],"schedulerName":"my-scheduler"}}
schedulerName: my-scheduler
[root@master kubectl]#
[root@worker k8s-scheduler-extender-example]# ./k8s-scheduler-extender-example
[ warn ] 2019/10/16 16:16:50 main.go:87: LOG_LEVEL="" is empty or invalid, fallling back to "INFO".
[ info ] 2019/10/16 16:16:50 main.go:101: Log level was set to INFO
[ info ] 2019/10/16 16:16:50 main.go:119: server starting on the port :80
[ info ] 2019/10/16 16:19:51 routes.go:29: always_true ExtenderArgs =
===>extender node:172.21.0.16, result:false
===>extender node:172.21.0.12, result:true
[ info ] 2019/10/16 16:19:51 routes.go:49: always_true extenderFilterResult = {"Nodes":{"metadata":{},"items":[{"metadata":{"name":"172.21.0.12","selfLink":"/api/v1/nodes/172.21.0.12","uid":"ec8685f1-ef5f-11e9-8482-525400d54f7e","resourceVersion":"23957","creationTimestamp":"2019-10-15T15:24:48Z","labels":{"beta.kubernetes.io/arch":"amd64","beta.kubernetes.io/os":"linux","kubernetes.io/hostname":"172.21.0.12"},"annotations":{"node.alpha.kubernetes.io/ttl":"0","volumes.kubernetes.io/controller-managed-attach-detach":"true"}},"spec":{},"status":{"capacity":{"cpu":"2","ephemeral-storage":"51473888Ki","hugepages-1Gi":"0","hugepages-2Mi":"0","memory":"3880944Ki","pods":"110"},"allocatable":{"cpu":"2","ephemeral-storage":"47438335103","hugepages-1Gi":"0","hugepages-2Mi":"0","memory":"3778544Ki","pods":"110"},"conditions":[{"type":"MemoryPressure","status":"False","lastHeartbeatTime":"2019-10-16T08:19:49Z","lastTransitionTime":"2019-10-16T04:15:19Z","reason":"KubeletHasSufficientMemory","message":"kubelet has sufficient memory available"},{"type":"DiskPressure","status":"False","lastHeartbeatTime":"2019-10-16T08:19:49Z","lastTransitionTime":"2019-10-16T04:15:19Z","reason":"KubeletHasNoDiskPressure","message":"kubelet has no disk pressure"},{"type":"PIDPressure","status":"False","lastHeartbeatTime":"2019-10-16T08:19:49Z","lastTransitionTime":"2019-10-16T04:15:19Z","reason":"KubeletHasSufficientPID","message":"kubelet has sufficient PID available"},{"type":"Ready","status":"True","lastHeartbeatTime":"2019-10-16T08:19:49Z","lastTransitionTime":"2019-10-16T04:15:19Z","reason":"KubeletReady","message":"kubelet is posting ready status"}],"addresses":[{"type":"InternalIP","address":"172.21.0.12"},{"type":"Hostname","address":"172.21.0.12"}],"daemonEndpoints":{"kubeletEndpoint":{"Port":10250}},"nodeInfo":{"machineID":"c28d40cbc8e3adcb4e32d9779a77b39e","systemUUID":"2C6B0169-85AC-48F3-9377-35EFC668E23C","bootID":"f5081260-8e17-446c-9b2c-8c2766e49e0e","kernelVersion":"3.10.0-862.el7.x86_64","osImage":"CentOS Linux 7 (Core)","containerRuntimeVersion":"docker://17.9.1","kubeletVersion":"v0.0.0-master+$Format:%h$","kubeProxyVersion":"v0.0.0-master+$Format:%h$","operatingSystem":"linux","architecture":"amd64"},"images":[{"names":["nginx@sha256:aeded0f2a861747f43a01cf1018cf9efe2bdd02afd57d2b11fcc7fcadc16ccd1","nginx:latest"],"sizeBytes":125952483},{"names":["mirrorgooglecontainers/pause@sha256:59eec8837a4d942cc19a52b8c09ea75121acc38114a2c68b98983ce9356b8610","mirrorgooglecontainers/pause:3.1"],"sizeBytes":742472},{"names":["hello-world@sha256:c3b4ada4687bbaa170745b3e4dd8ac3f194ca95b2d0518b417fb47e5879d9b5f","hello-world:latest"],"sizeBytes":1840}]}}]},"NodeNames":null,"FailedNodes":{},"Error":""}
2.2.2 Create a pod without schedulerName
[root@master kubectl]# cat pod.yaml
apiVersion: v1
kind: Pod
metadata:
  name: test
spec:
  containers:
  - name: podtest
    image: nginx
    ports:
    - containerPort: 80
[root@master kubectl]# ./kubectl apply -f pod.yaml
[root@master kubectl]# ./kubectl get pods
NAME READY STATUS RESTARTS AGE
test 1/1 Running 0 2m39s
test-schduler 1/1 Running 0 24m
[root@master kubectl]# ./kubectl get pod test -o yaml | grep -i nodeName
nodeName: 172.21.0.16
[root@master kubectl]# ./kubectl get pod test -o yaml | grep -i schedulerName
schedulerName: default-scheduler
[root@master kubectl]#
3. Analysis of the relevant source code
// pkg/scheduler/factory/factory.go
// Creates a scheduler from the configuration file
func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*Config, error) {
    ...
    // Build the extenders
    var extenders []algorithm.SchedulerExtender
    if len(policy.ExtenderConfigs) != 0 {
        ignoredExtendedResources := sets.NewString()
        for ii := range policy.ExtenderConfigs {
            klog.V(2).Infof("Creating extender with config %+v", policy.ExtenderConfigs[ii])
            extender, err := core.NewHTTPExtender(&policy.ExtenderConfigs[ii])
            if err != nil {
                return nil, err
            }
            extenders = append(extenders, extender)
            for _, r := range policy.ExtenderConfigs[ii].ManagedResources {
                if r.IgnoredByScheduler {
                    ignoredExtendedResources.Insert(string(r.Name))
                }
            }
        }
        predicates.RegisterPredicateMetadataProducerWithExtendedResourceOptions(ignoredExtendedResources)
    }
    ...
}
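For the policy.yaml used in this example, each element of policy.ExtenderConfigs decodes into a schedulerapi.ExtenderConfig value roughly like the fragment below. This is a reconstruction for illustration only; the field names follow pkg/scheduler/api of this code version, and the values mirror the JSON policy above.

// Roughly what policy.ExtenderConfigs[0] looks like after the policy file
// above has been decoded (illustrative fragment, not scheduler source).
cfg := schedulerapi.ExtenderConfig{
    URLPrefix:        "http://localhost/scheduler",
    FilterVerb:       "predicates/always_true",
    PrioritizeVerb:   "priorities/zero_score",
    PreemptVerb:      "preemption",
    BindVerb:         "",
    Weight:           1,
    EnableHTTPS:      false,
    NodeCacheCapable: false,
}
// CreateFromConfig then turns it into an HTTPExtender:
extender, err := core.NewHTTPExtender(&cfg)
if err != nil {
    klog.Fatalf("failed to create extender: %v", err)
}
_ = extender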
3.1 The extender interface
type SchedulerExtender interface {
    // Name returns a unique name that identifies the extender.
    // For HTTPExtender this is config.URLPrefix.
    Name() string

    // Filter method, i.e. the equivalent of the predicate (filtering) phase
    Filter(pod *v1.Pod,
        nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo,
    ) (filteredNodes []*v1.Node, failedNodesMap schedulerapi.FailedNodesMap, err error)

    // Scoring
    Prioritize(pod *v1.Pod, nodes []*v1.Node) (hostPriorities *schedulerapi.HostPriorityList, weight int, err error)

    // Bind delegates the action of binding a pod to a node to the extender.
    Bind(binding *v1.Binding) error

    // IsBinder returns whether this extender is configured for the Bind method.
    IsBinder() bool

    // IsInterested returns true if at least one extended resource requested by
    // this pod is managed by this extender.
    IsInterested(pod *v1.Pod) bool

    // ProcessPreemption returns nodes with their victim pods processed by extender based on
    // given:
    //   1. Pod to schedule
    //   2. Candidate nodes and victim pods (nodeToVictims) generated by previous scheduling process.
    //   3. nodeNameToInfo to restore v1.Node from node name if extender cache is enabled.
    // The possible changes made by extender may include:
    //   1. Subset of given candidate nodes after preemption phase of extender.
    //   2. A different set of victim pods for every given candidate node after preemption phase of extender.
    ProcessPreemption(
        pod *v1.Pod,
        nodeToVictims map[*v1.Node]*schedulerapi.Victims,
        nodeNameToInfo map[string]*schedulercache.NodeInfo,
    ) (map[*v1.Node]*schedulerapi.Victims, error)

    // SupportsPreemption returns whether the scheduler extender supports preemption or not.
    SupportsPreemption() bool

    // IsIgnorable returns whether errors from this extender can be tolerated.
    // When true, an error from this extender is tolerated and the extender is simply skipped;
    // when false, an error from this extender aborts scheduling and is returned by the scheduler.
    IsIgnorable() bool
}
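To make the shape of this interface concrete, here is a minimal in-process stub that satisfies it. This is purely an illustration (it does not exist in the Kubernetes tree); it passes every node through unchanged and assumes the same v1, schedulerapi and schedulercache imports used in the excerpts above.

// fakeExtender is a hypothetical no-op implementation of algorithm.SchedulerExtender,
// shown only to illustrate what implementing the interface entails.
type fakeExtender struct{}

func (f *fakeExtender) Name() string { return "fake-extender" }

func (f *fakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node,
    nodeNameToInfo map[string]*schedulercache.NodeInfo,
) ([]*v1.Node, schedulerapi.FailedNodesMap, error) {
    // Pass every node through unchanged.
    return nodes, schedulerapi.FailedNodesMap{}, nil
}

func (f *fakeExtender) Prioritize(pod *v1.Pod, nodes []*v1.Node) (*schedulerapi.HostPriorityList, int, error) {
    // Give every node a zero score with weight 1.
    result := schedulerapi.HostPriorityList{}
    for _, node := range nodes {
        result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: 0})
    }
    return &result, 1, nil
}

func (f *fakeExtender) Bind(binding *v1.Binding) error { return nil }
func (f *fakeExtender) IsBinder() bool                 { return false }
func (f *fakeExtender) IsInterested(pod *v1.Pod) bool  { return true }

func (f *fakeExtender) ProcessPreemption(pod *v1.Pod,
    nodeToVictims map[*v1.Node]*schedulerapi.Victims,
    nodeNameToInfo map[string]*schedulercache.NodeInfo,
) (map[*v1.Node]*schedulerapi.Victims, error) {
    // Leave the candidate nodes and victims untouched.
    return nodeToVictims, nil
}

func (f *fakeExtender) SupportsPreemption() bool { return false }
func (f *fakeExtender) IsIgnorable() bool        { return true }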
3.2 HTTPExtender, the implementation of the extender interface
// HTTPExtender implements the algorithm.SchedulerExtender interface.
type HTTPExtender struct {
    extenderURL      string
    preemptVerb      string
    filterVerb       string
    prioritizeVerb   string
    bindVerb         string
    weight           int
    client           *http.Client
    nodeCacheCapable bool
    managedResources sets.String
    ignorable        bool
}

func NewHTTPExtender(config *schedulerapi.ExtenderConfig) (algorithm.SchedulerExtender, error) {
    if config.HTTPTimeout.Nanoseconds() == 0 {
        config.HTTPTimeout = time.Duration(DefaultExtenderTimeout)
    }

    transport, err := makeTransport(config)
    if err != nil {
        return nil, err
    }
    client := &http.Client{
        Transport: transport,
        Timeout:   config.HTTPTimeout,
    }
    managedResources := sets.NewString()
    for _, r := range config.ManagedResources {
        managedResources.Insert(string(r.Name))
    }
    return &HTTPExtender{
        extenderURL:      config.URLPrefix,
        preemptVerb:      config.PreemptVerb,
        filterVerb:       config.FilterVerb,
        prioritizeVerb:   config.PrioritizeVerb,
        bindVerb:         config.BindVerb,
        weight:           config.Weight,
        client:           client,
        nodeCacheCapable: config.NodeCacheCapable,
        managedResources: managedResources,
        ignorable:        config.Ignorable,
    }, nil
}
3.3 Interacting with the extender
// pkg/scheduler/core/generic_scheduler.go
func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
    ...
    // Run the nodes that passed the built-in predicates through each extender in turn
    if len(filtered) > 0 && len(g.extenders) != 0 {
        for _, extender := range g.extenders {
            if !extender.IsInterested(pod) {
                continue
            }
            filteredList, failedMap, err := extender.Filter(pod, filtered, g.cachedNodeInfoMap)
            // If the extender call failed, skip the extender when it is ignorable, otherwise abort
            if err != nil {
                if extender.IsIgnorable() {
                    klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
                        extender, err)
                    continue
                } else {
                    return []*v1.Node{}, FailedPredicateMap{}, err
                }
            }
            for failedNodeName, failedMsg := range failedMap {
                // If the node is not yet in failedPredicateMap, create an entry for it
                if _, found := failedPredicateMap[failedNodeName]; !found {
                    failedPredicateMap[failedNodeName] = []algorithm.PredicateFailureReason{}
                }
                failedPredicateMap[failedNodeName] = append(failedPredicateMap[failedNodeName], predicates.NewFailureReason(failedMsg))
            }
            filtered = filteredList
            if len(filtered) == 0 {
                break
            }
        }
    }
    // Return the final filtered list of suitable nodes
    // and failedPredicateMap with the failed nodes and the reasons they failed
    return filtered, failedPredicateMap, nil
}
func (h *HTTPExtender) Filter(
    pod *v1.Pod,
    nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo,
) ([]*v1.Node, schedulerapi.FailedNodesMap, error) {
    var (
        result     schedulerapi.ExtenderFilterResult
        nodeList   *v1.NodeList
        nodeNames  *[]string
        nodeResult []*v1.Node
        args       *schedulerapi.ExtenderArgs
    )

    if h.filterVerb == "" {
        return nodes, schedulerapi.FailedNodesMap{}, nil
    }

    // If nodeCacheCapable is true, send only the node names;
    // otherwise send the full node list
    if h.nodeCacheCapable {
        nodeNameSlice := make([]string, 0, len(nodes))
        for _, node := range nodes {
            nodeNameSlice = append(nodeNameSlice, node.Name)
        }
        nodeNames = &nodeNameSlice
    } else {
        nodeList = &v1.NodeList{}
        for _, node := range nodes {
            nodeList.Items = append(nodeList.Items, *node)
        }
    }

    // Assemble the ExtenderArgs to send to the extender service
    args = &schedulerapi.ExtenderArgs{
        Pod:       pod,
        Nodes:     nodeList,
        NodeNames: nodeNames,
    }

    // Send a POST request to the corresponding API of the extender
    if err := h.send(h.filterVerb, args, &result); err != nil {
        return nil, nil, err
    }
    if result.Error != "" {
        return nil, nil, fmt.Errorf(result.Error)
    }

    // Extract the result
    if h.nodeCacheCapable && result.NodeNames != nil {
        nodeResult = make([]*v1.Node, 0, len(*result.NodeNames))
        for i := range *result.NodeNames {
            nodeResult = append(nodeResult, nodeNameToInfo[(*result.NodeNames)[i]].Node())
        }
    } else if result.Nodes != nil {
        nodeResult = make([]*v1.Node, 0, len(result.Nodes.Items))
        for i := range result.Nodes.Items {
            nodeResult = append(nodeResult, &result.Nodes.Items[i])
        }
    }
    return nodeResult, result.FailedNodes, nil
}
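h.send is where the HTTP round trip happens. Below is a simplified sketch of what it does, not the verbatim source (the real implementation lives alongside Filter in pkg/scheduler/core): marshal the args to JSON, POST them to extenderURL/<verb> with the configured client, and decode the JSON response into result.

// Simplified sketch of h.send (illustration only; assumes imports
// "bytes", "encoding/json", "fmt", "net/http", "strings").
func (h *HTTPExtender) send(action string, args interface{}, result interface{}) error {
    out, err := json.Marshal(args)
    if err != nil {
        return err
    }

    // extenderURL + "/" + verb, e.g. http://localhost/scheduler/predicates/always_true
    url := strings.TrimRight(h.extenderURL, "/") + "/" + action

    req, err := http.NewRequest("POST", url, bytes.NewReader(out))
    if err != nil {
        return err
    }
    req.Header.Set("Content-Type", "application/json")

    resp, err := h.client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("failed %v with extender at URL %v, code %v", action, url, resp.StatusCode)
    }

    return json.NewDecoder(resp.Body).Decode(result)
}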
4. Summary

A custom scheduler and its extender can be deployed together by running them as two containers of the same pod, as in the snippet below: the my-scheduler-ctr container runs kube-scheduler with the configuration file from section 2.1.1, and the my-scheduler-extender-ctr container runs the extender service.
containers:
- name: my-scheduler-ctr
  image: gcr.io/google_containers/hyperkube:v1.11.1
  imagePullPolicy: IfNotPresent
  args:
  - kube-scheduler
  - --config=/my-scheduler/config.yaml
  - -v=4
  volumeMounts:
  - name: my-scheduler-config
    mountPath: /my-scheduler
- name: my-scheduler-extender-ctr
  image: a/b:c
  imagePullPolicy: IfNotPresent