1. 前言
2. SchedulingQueue (以PriorityQueue为例)
2.1 activeQ
// Heap is a producer/consumer queue that implements a heap data structure.
// It can be used to implement priority queues and similar data structures.
type Heap struct {
// data stores objects and has a queue that keeps their ordering according
// to the heap invariant.
data *heapData
}
2.1.1 heapData
// LessFunc is a function type to compare two objects.
type LessFunc func(interface{}, interface{}) bool
// KeyFunc is a function type to get the key from an object.
type KeyFunc func(obj interface{}) (string, error)
type heapItem struct {
obj interface{} // The object which is stored in the heap.
index int // The index of the object's key in the Heap.queue.
}
type itemKeyValue struct {
key string
obj interface{}
}
// heapData is an internal struct that implements the standard heap interface
// and keeps the data stored in the heap.
type heapData struct {
// items is a map from key of the objects to the objects and their index.
// We depend on the property that items in the map are in the queue and vice versa.
items map[string]*heapItem
// queue implements a heap data structure and keeps the order of elements
// according to the heap invariant. The queue keeps the keys of objects stored
// in "items".
queue []string
// keyFunc is used to make the key used for queued item insertion and retrieval, and
// should be deterministic.
keyFunc KeyFunc
// lessFunc is used to compare two objects in the heap.
lessFunc LessFunc
}
// Less compares two objects and returns true if the first one should go
// in front of the second one in the heap.
func (h *heapData) Less(i, j int) bool {
if i > len(h.queue) || j > len(h.queue) {
return false
}
itemi, ok := h.items[h.queue[i]]
if !ok {
return false
}
itemj, ok := h.items[h.queue[j]]
if !ok {
return false
}
return h.lessFunc(itemi.obj, itemj.obj)
}
// Len returns the number of items in the Heap.
func (h *heapData) Len() int { return len(h.queue) }
// Swap implements swapping of two elements in the heap. This is a part of standard
// heap interface and should never be called directly.
func (h *heapData) Swap(i, j int) {
h.queue[i], h.queue[j] = h.queue[j], h.queue[i]
item := h.items[h.queue[i]]
item.index = i
item = h.items[h.queue[j]]
item.index = j
}
// Push is supposed to be called by heap.Push only.
func (h *heapData) Push(kv interface{}) {
keyValue := kv.(*itemKeyValue)
n := len(h.queue)
h.items[keyValue.key] = &heapItem{keyValue.obj, n}
h.queue = append(h.queue, keyValue.key)
}
// Pop is supposed to be called by heap.Pop only.
func (h *heapData) Pop() interface{} {
key := h.queue[len(h.queue)-1]
h.queue = h.queue[0 : len(h.queue)-1]
item, ok := h.items[key]
if !ok {
// This is an error
return nil
}
delete(h.items, key)
return item.obj
}
2.1.2 Heap
// 1. 计算key
// 2. 根据item的map结构检查该obj是否存在
// 3. 如果存在 则更新该obj 并重新调整堆结构
// 4. 如果不存在 则添加该obj
func (h *Heap) Add(obj interface{}) error {
key, err := h.data.keyFunc(obj)
if err != nil {
return cache.KeyError{Obj: obj, Err: err}
}
if _, exists := h.data.items[key]; exists {
h.data.items[key].obj = obj
heap.Fix(h.data, h.data.items[key].index)
} else {
heap.Push(h.data, &itemKeyValue{key, obj})
}
return nil
}
// Pop returns the head of the heap.
func (h *Heap) Pop() (interface{}, error) {
obj := heap.Pop(h.data)
if obj != nil {
return obj, nil
}
return nil, fmt.Errorf("object was removed from heap data")
}
2.1.3 总结
// NewPriorityQueue creates a PriorityQueue object.
func NewPriorityQueue(stop <-chan struct{}) *PriorityQueue {
pq := &PriorityQueue{
clock: util.RealClock{},
stop: stop,
activeQ: newHeap(cache.MetaNamespaceKeyFunc, activeQComp),
unschedulableQ: newUnschedulablePodsMap(),
nominatedPods: newNominatedPodMap(),
moveRequestCycle: -1,
}
pq.cond.L = &pq.lock
pq.run()
return pq
}
// newHeap returns a Heap which can be used to queue up items to process.
func newHeap(keyFn KeyFunc, lessFn LessFunc) *Heap {
return &Heap{
data: &heapData{
items: map[string]*heapItem{},
queue: []string{},
keyFunc: keyFn,
lessFunc: lessFn,
},
}
}
func podTimestamp(pod *v1.Pod) *metav1.Time {
_, condition := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
if condition == nil {
return &pod.CreationTimestamp
}
if condition.LastProbeTime.IsZero() {
return &condition.LastTransitionTime
}
return &condition.LastProbeTime
}
// activeQComp is the function used by the activeQ heap algorithm to sort pods.
// It sorts pods based on their priority. When priorities are equal, it uses
// podTimestamp.
func activeQComp(pod1, pod2 interface{}) bool {
p1 := pod1.(*v1.Pod)
p2 := pod2.(*v1.Pod)
prio1 := util.GetPodPriority(p1)
prio2 := util.GetPodPriority(p2)
return (prio1 > prio2) || (prio1 == prio2 && podTimestamp(p1).Before(podTimestamp(p2)))
}
3. unschedulableQ
// UnschedulablePodsMap 就是一个Map结构
// key为keyFunc计算出来的key
// value就是对应的pod
type UnschedulablePodsMap struct {
// pods is a map key by a pod's full-name and the value is a pointer to the pod.
pods map[string]*v1.Pod
keyFunc func(*v1.Pod) string
}
3.1 方法
// 添加或者更新
func (u *UnschedulablePodsMap) addOrUpdate(pod *v1.Pod) {
u.pods[u.keyFunc(pod)] = pod
}
// Delete deletes a pod from the unschedulable pods.
// 删除
func (u *UnschedulablePodsMap) delete(pod *v1.Pod) {
delete(u.pods, u.keyFunc(pod))
}
4. nominatedPodMap
// nominatedPodMap is a structure that stores pods nominated to run on nodes.
// It exists because nominatedNodeName of pod objects stored in the structure
// may be different than what scheduler has here. We should be able to find pods
// by their UID and update/delete them.
type nominatedPodMap struct {
// 这些pods可能在activeQ或者unschedulableQ中
// nodeName -> pods
nominatedPods map[string][]*v1.Pod
// pod_UID -> nodeName
nominatedPodToNode map[ktypes.UID]string
}
func newNominatedPodMap() *nominatedPodMap {
return &nominatedPodMap{
nominatedPods: make(map[string][]*v1.Pod),
nominatedPodToNode: make(map[ktypes.UID]string),
}
}
4.1 对应的方法
func (npm *nominatedPodMap) add(p *v1.Pod, nodeName string) {
// 无论是否存在 先删除
npm.delete(p)
nnn := nodeName
if len(nnn) == 0 {
nnn = NominatedNodeName(p)
if len(nnn) == 0 {
return
}
}
// 1. 如果nodeName和pod.Status.NominatedNodeName都为空 直接返回
// 2. 如果nodeName不为空 nnn = nodeName 否则 nnn = pod.Status.NominatedNodeName
npm.nominatedPodToNode[p.UID] = nnn
for _, np := range npm.nominatedPods[nnn] {
if np.UID == p.UID {
klog.V(4).Infof("Pod %v/%v already exists in the nominated map!", p.Namespace, p.Name)
return
}
}
npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn], p)
}
// 如果该pod存在nominatedPodMap中 就删除
// 不存在 就直接返回
func (npm *nominatedPodMap) delete(p *v1.Pod) {
nnn, ok := npm.nominatedPodToNode[p.UID]
if !ok {
return
}
for i, np := range npm.nominatedPods[nnn] {
if np.UID == p.UID {
npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn][:i], npm.nominatedPods[nnn][i+1:]...)
if len(npm.nominatedPods[nnn]) == 0 {
delete(npm.nominatedPods, nnn)
}
break
}
}
delete(npm.nominatedPodToNode, p.UID)
}
func (npm *nominatedPodMap) update(oldPod, newPod *v1.Pod) {
// We update irrespective of the nominatedNodeName changed or not, to ensure
// that pod pointer is updated.
npm.delete(oldPod)
npm.add(newPod, "")
}
// 取该节点下所有nominated Pods
func (npm *nominatedPodMap) podsForNode(nodeName string) []*v1.Pod {
if list, ok := npm.nominatedPods[nodeName]; ok {
return list
}
return nil
}