0
点赞
收藏
分享

微信扫一扫

[k8s源码分析][kube-scheduler]scheduler/internal/queue之优先队列scheduling_queue(1)

1. 前言

2. SchedulingQueue (以PriorityQueue为例)

2.1 activeQ

// Heap is a producer/consumer queue that implements a heap data structure.
// It can be used to implement priority queues and similar data structures.
// It is a thin wrapper: all of its state is held by the heapData it points to.
type Heap struct {
    // data stores objects and has a queue that keeps their ordering according
    // to the heap invariant.
    data *heapData
}
2.1.1 heapData
// LessFunc is a function type to compare two objects. heapData.Less uses it
// to decide whether the first object should go in front of the second one.
type LessFunc func(interface{}, interface{}) bool

// KeyFunc is a function type to get the key from an object. The key is what
// the heap's item map and queue are indexed by; a non-nil error means no key
// could be produced for obj.
type KeyFunc func(obj interface{}) (string, error)

// heapItem pairs an object stored in the heap with the current position of
// its key in heapData.queue. Push sets the position and Swap keeps it up to
// date.
type heapItem struct {
    obj   interface{} // The object which is stored in the heap.
    index int         // The index of the object's key in the Heap.queue.
}

// itemKeyValue carries an object together with its precomputed key. It is the
// value heapData.Push expects to receive (as *itemKeyValue).
type itemKeyValue struct {
    key string      // Key under which obj is stored in the heap.
    obj interface{} // The object to store.
}

// heapData is an internal struct that implements the standard heap interface
// and keeps the data stored in the heap. Objects are kept in a map keyed by
// their key, while the heap ordering is maintained over a slice of those keys.
type heapData struct {
    // items is a map from key of the objects to the objects and their index.
    // We depend on the property that items in the map are in the queue and vice versa.
    items map[string]*heapItem
    // queue implements a heap data structure and keeps the order of elements
    // according to the heap invariant. The queue keeps the keys of objects stored
    // in "items".
    queue []string

    // keyFunc is used to make the key used for queued item insertion and retrieval, and
    // should be deterministic.
    keyFunc KeyFunc
    // lessFunc is used to compare two objects in the heap.
    lessFunc LessFunc
}
// Less compares the objects at queue positions i and j and returns true if
// the first one should go in front of the second one in the heap, as decided
// by lessFunc.
//
// It returns false, instead of panicking, when either position lies outside
// the queue or when a queued key has no entry in items.
func (h *heapData) Less(i, j int) bool {
    // Valid positions are 0..len(h.queue)-1. An index equal to the length is
    // already out of range and would panic on the slice access below.
    n := len(h.queue)
    if i < 0 || i >= n || j < 0 || j >= n {
        return false
    }
    itemi, ok := h.items[h.queue[i]]
    if !ok {
        return false
    }
    itemj, ok := h.items[h.queue[j]]
    if !ok {
        return false
    }
    return h.lessFunc(itemi.obj, itemj.obj)
}

// Len reports how many keys are currently held in the heap's queue.
func (h *heapData) Len() int {
    return len(h.queue)
}

// Swap exchanges the keys at queue positions i and j and records the new
// position on each of the two affected items. It is part of the standard heap
// interface and should never be called directly.
func (h *heapData) Swap(i, j int) {
    q := h.queue
    q[i], q[j] = q[j], q[i]
    // After the exchange, each item's index must match where its key now sits.
    h.items[q[i]].index = i
    h.items[q[j]].index = j
}

// Push appends a new entry at the end of the queue and registers its object
// in items. kv must be an *itemKeyValue. Push is supposed to be called by
// heap.Push only.
func (h *heapData) Push(kv interface{}) {
    entry := kv.(*itemKeyValue)
    // The new key lands at the current end of the queue.
    h.items[entry.key] = &heapItem{obj: entry.obj, index: len(h.queue)}
    h.queue = append(h.queue, entry.key)
}

// Pop removes the last key from the queue and returns the object stored under
// it, deleting that object from items as well. Pop is supposed to be called by
// heap.Pop only.
func (h *heapData) Pop() interface{} {
    last := len(h.queue) - 1
    key := h.queue[last]
    h.queue = h.queue[:last]
    if item, found := h.items[key]; found {
        delete(h.items, key)
        return item.obj
    }
    // A queued key without an entry in items is an error; it is reported to
    // the caller as a nil object.
    return nil
}
2.1.2 Heap
// Add inserts obj into the heap, or updates it when its key is already there:
//  1. Compute the key of obj with keyFunc.
//  2. Look the key up in the items map to see whether obj is already stored.
//  3. If it is, replace the stored object and re-adjust the heap.
//  4. If it is not, push obj as a new entry.
//
// A keyFunc failure is returned as a cache.KeyError.
func (h *Heap) Add(obj interface{}) error {
    key, err := h.data.keyFunc(obj)
    if err != nil {
        return cache.KeyError{Obj: obj, Err: err}
    }
    item, exists := h.data.items[key]
    if !exists {
        heap.Push(h.data, &itemKeyValue{key: key, obj: obj})
        return nil
    }
    // The object may now compare differently, so its position is re-established.
    item.obj = obj
    heap.Fix(h.data, item.index)
    return nil
}
// Pop removes the head of the heap and returns it.
//
// It returns an error, rather than panicking, when the heap is empty. It also
// returns an error when the popped key had no object recorded in the heap
// data.
func (h *Heap) Pop() (interface{}, error) {
    // heap.Pop starts by swapping positions 0 and Len()-1, which indexes out
    // of range on an empty queue, so the empty case is handled up front.
    if h.data.Len() == 0 {
        return nil, fmt.Errorf("heap is empty")
    }
    obj := heap.Pop(h.data)
    if obj == nil {
        return nil, fmt.Errorf("object was removed from heap data")
    }
    return obj, nil
}
2.1.3 总结
// NewPriorityQueue builds a PriorityQueue that uses the real clock, the given
// stop channel, an activeQ heap keyed by cache.MetaNamespaceKeyFunc and
// ordered by activeQComp, an empty unschedulableQ and an empty nominated-pod
// map. The queue's condition variable is bound to its own lock, and run is
// called on the queue before it is returned.
func NewPriorityQueue(stop <-chan struct{}) *PriorityQueue {
    queue := new(PriorityQueue)
    queue.clock = util.RealClock{}
    queue.stop = stop
    queue.activeQ = newHeap(cache.MetaNamespaceKeyFunc, activeQComp)
    queue.unschedulableQ = newUnschedulablePodsMap()
    queue.nominatedPods = newNominatedPodMap()
    queue.moveRequestCycle = -1
    queue.cond.L = &queue.lock

    queue.run()
    return queue
}

// newHeap returns an empty Heap which can be used to queue up items to
// process. keyFn derives the key of each object and lessFn defines the
// ordering of objects inside the heap.
func newHeap(keyFn KeyFunc, lessFn LessFunc) *Heap {
    data := &heapData{
        items:    make(map[string]*heapItem),
        queue:    make([]string, 0),
        keyFunc:  keyFn,
        lessFunc: lessFn,
    }
    return &Heap{data: data}
}
// podTimestamp picks the timestamp associated with pod from its PodScheduled
// condition: the pod's creation timestamp when the condition is absent, the
// condition's last transition time when its last probe time is zero, and the
// last probe time otherwise.
func podTimestamp(pod *v1.Pod) *metav1.Time {
    _, scheduled := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
    switch {
    case scheduled == nil:
        return &pod.CreationTimestamp
    case scheduled.LastProbeTime.IsZero():
        return &scheduled.LastTransitionTime
    default:
        return &scheduled.LastProbeTime
    }
}

// activeQComp is the comparison function used by the activeQ heap to sort
// pods. Both arguments must be *v1.Pod. A pod with a higher priority goes
// first; when priorities are equal, the pod whose podTimestamp is earlier
// goes first.
func activeQComp(pod1, pod2 interface{}) bool {
    first, second := pod1.(*v1.Pod), pod2.(*v1.Pod)
    firstPrio, secondPrio := util.GetPodPriority(first), util.GetPodPriority(second)
    if firstPrio != secondPrio {
        return firstPrio > secondPrio
    }
    return podTimestamp(first).Before(podTimestamp(second))
}

3. unschedulableQ

// UnschedulablePodsMap is simply a map structure:
// the key is the key computed by keyFunc,
// and the value is the corresponding pod.
type UnschedulablePodsMap struct {
    // pods is a map key by a pod's full-name and the value is a pointer to the pod.
    pods    map[string]*v1.Pod
    keyFunc func(*v1.Pod) string
}

3.1 方法

// addOrUpdate stores pod under the key computed by keyFunc, replacing any pod
// already stored under that key.
func (u *UnschedulablePodsMap) addOrUpdate(pod *v1.Pod) {
    key := u.keyFunc(pod)
    u.pods[key] = pod
}

// delete removes the entry stored under pod's key from the unschedulable
// pods. It does nothing when no entry exists for that key.
func (u *UnschedulablePodsMap) delete(pod *v1.Pod) {
    key := u.keyFunc(pod)
    delete(u.pods, key)
}

4. nominatedPodMap

// nominatedPodMap is a structure that stores pods nominated to run on nodes.
// It exists because nominatedNodeName of pod objects stored in the structure
// may be different than what scheduler has here. We should be able to find pods
// by their UID and update/delete them.
type nominatedPodMap struct {
    // These pods may be in activeQ or in unschedulableQ.
    // nodeName -> pods
    nominatedPods map[string][]*v1.Pod
    // pod_UID -> nodeName
    nominatedPodToNode map[ktypes.UID]string
}

// newNominatedPodMap returns a nominatedPodMap whose two lookup maps are
// allocated and empty.
func newNominatedPodMap() *nominatedPodMap {
    npm := new(nominatedPodMap)
    npm.nominatedPods = map[string][]*v1.Pod{}
    npm.nominatedPodToNode = map[ktypes.UID]string{}
    return npm
}

4.1 对应的方法

// add records p as nominated for a node. The node is nodeName when that is
// non-empty, otherwise whatever NominatedNodeName(p) returns; when both are
// empty nothing is recorded.
func (npm *nominatedPodMap) add(p *v1.Pod, nodeName string) {
    // Always delete first, whether or not the pod is already present.
    npm.delete(p)

    // 1. If nodeName is non-empty it is the target; otherwise the target is
    //    the node name obtained from NominatedNodeName(p).
    // 2. If both are empty, return without recording anything.
    target := nodeName
    if target == "" {
        target = NominatedNodeName(p)
    }
    if target == "" {
        return
    }

    npm.nominatedPodToNode[p.UID] = target
    pods := npm.nominatedPods[target]
    for idx := range pods {
        if pods[idx].UID == p.UID {
            klog.V(4).Infof("Pod %v/%v already exists in the nominated map!", p.Namespace, p.Name)
            return
        }
    }
    npm.nominatedPods[target] = append(pods, p)
}

// delete removes p from the nominated pod map if it is present, matching by
// UID. If p is not present it returns without changing anything.
func (npm *nominatedPodMap) delete(p *v1.Pod) {
    node, tracked := npm.nominatedPodToNode[p.UID]
    if !tracked {
        return
    }
    pods := npm.nominatedPods[node]
    for idx, candidate := range pods {
        if candidate.UID != p.UID {
            continue
        }
        remaining := append(pods[:idx], pods[idx+1:]...)
        if len(remaining) == 0 {
            // Drop the node entry entirely once its last pod is gone.
            delete(npm.nominatedPods, node)
        } else {
            npm.nominatedPods[node] = remaining
        }
        break
    }
    delete(npm.nominatedPodToNode, p.UID)
}
// update replaces oldPod with newPod in the nominated pod map. oldPod is
// deleted first, then newPod is added with an empty node name, so add
// resolves the node through NominatedNodeName(newPod) and records nothing if
// that is empty as well.
func (npm *nominatedPodMap) update(oldPod, newPod *v1.Pod) {
    // We update irrespective of the nominatedNodeName changed or not, to ensure
    // that pod pointer is updated.
    npm.delete(oldPod)
    npm.add(newPod, "")
}
// podsForNode returns all pods nominated for the given node, or nil when the
// node has no entry.
func (npm *nominatedPodMap) podsForNode(nodeName string) []*v1.Pod {
    // Indexing a map with a missing key yields the zero value, a nil slice.
    return npm.nominatedPods[nodeName]
}
举报

相关推荐

0 条评论