0
点赞
收藏
分享

微信扫一扫

[k8s源码分析][kube-scheduler]scheduler/internal/queue之优先队列scheduling_queue(2)

1. 前言

2. 方法

func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
    stopEverything := args.StopCh
    if stopEverything == nil {
        stopEverything = wait.NeverStop
    }
    ...
    c := &configFactory{
        client:                         args.Client,
        podLister:                      schedulerCache,
        podQueue:                       internalqueue.NewSchedulingQueue(stopEverything),
        nodeLister:                     args.NodeInformer.Lister(),
        pVLister:                       args.PvInformer.Lister(),
        pVCLister:                      args.PvcInformer.Lister(),
        serviceLister:                  args.ServiceInformer.Lister(),
        controllerLister:               args.ReplicationControllerInformer.Lister(),
        replicaSetLister:               args.ReplicaSetInformer.Lister(),
        statefulSetLister:              args.StatefulSetInformer.Lister(),
        pdbLister:                      args.PdbInformer.Lister(),
        storageClassLister:             storageClassLister,
        schedulerCache:                 schedulerCache,
        StopEverything:                 stopEverything,
        schedulerName:                  args.SchedulerName,
        hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,
        enableEquivalenceClassCache:    args.EnableEquivalenceClassCache,
        disablePreemption:              args.DisablePreemption,
        percentageOfNodesToScore:       args.PercentageOfNodesToScore,
    }
...
}

2.1 PodInformer (scheduled pod cache)

// assignedPod selects pods that are assigned (scheduled and running).
func assignedPod(pod *v1.Pod) bool {
    return len(pod.Spec.NodeName) != 0
}
// scheduled pod cache
    args.PodInformer.Informer().AddEventHandler(
        cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
                case *v1.Pod:
                    return assignedPod(t)
                case cache.DeletedFinalStateUnknown:
                    if pod, ok := t.Obj.(*v1.Pod); ok {
                        return assignedPod(pod)
                    }
                    runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
                    return false
                default:
                    runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
                    return false
                }
            },
            Handler: cache.ResourceEventHandlerFuncs{
                AddFunc:    c.addPodToCache,
                UpdateFunc: c.updatePodInCache,
                DeleteFunc: c.deletePodFromCache,
            },
        },
    )
2.1.1 addPodToCache
func (c *configFactory) addPodToCache(obj interface{}) {
    pod, ok := obj.(*v1.Pod)
    if !ok {
        klog.Errorf("cannot convert to *v1.Pod: %v", obj)
        return
    }

    if err := c.schedulerCache.AddPod(pod); err != nil {
        klog.Errorf("scheduler cache AddPod failed: %v", err)
    }

    c.podQueue.AssignedPodAdded(pod)

    // NOTE: Updating equivalence cache of addPodToCache has been
    // handled optimistically in: pkg/scheduler/scheduler.go#assume()
}
func (p *PriorityQueue) AssignedPodAdded(pod *v1.Pod) {
    p.lock.Lock()
    p.movePodsToActiveQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod))
    p.lock.Unlock()
}
// NOTE: this function assumes lock has been acquired in caller
func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {
    for _, pod := range pods {
        if err := p.activeQ.Add(pod); err == nil {
            p.unschedulableQ.delete(pod)
        } else {
            klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
        }
    }
    p.moveRequestCycle = p.schedulingCycle
    p.cond.Broadcast()
}
func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*v1.Pod {
    var podsToMove []*v1.Pod
    for _, up := range p.unschedulableQ.pods {
        affinity := up.Spec.Affinity
        if affinity != nil && affinity.PodAffinity != nil {
            terms := predicates.GetPodAffinityTerms(affinity.PodAffinity)
            for _, term := range terms {
                namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(up, &term)
                selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
                if err != nil {
                    klog.Errorf("Error getting label selectors for pod: %v.", up.Name)
                }
                if priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {
                    podsToMove = append(podsToMove, up)
                    break
                }
            }
        }
    }
    return podsToMove
}
2.1.2 UpdatePodInCache
/pkg/scheduler/factory/factory.go
func (c *configFactory) updatePodInCache(oldObj, newObj interface{}) {
    oldPod, ok := oldObj.(*v1.Pod)
    if !ok {
        klog.Errorf("cannot convert oldObj to *v1.Pod: %v", oldObj)
        return
    }
    newPod, ok := newObj.(*v1.Pod)
    ...
    c.podQueue.AssignedPodUpdated(newPod)
}

/pkg/scheduler/interal/queue/scheduling_queu.go
func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) {
    p.lock.Lock()
    p.movePodsToActiveQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod))
    p.lock.Unlock()
}
2.1.3 deletePodFromCache
// /pkg/scheduler/factory/factory.go
func (c *configFactory) deletePodFromCache(obj interface{}) {
    var pod *v1.Pod
    switch t := obj.(type) {
    case *v1.Pod:
        pod = t
    case cache.DeletedFinalStateUnknown:
        var ok bool
        pod, ok = t.Obj.(*v1.Pod)
        if !ok {
            klog.Errorf("cannot convert to *v1.Pod: %v", t.Obj)
            return
        }
    default:
        klog.Errorf("cannot convert to *v1.Pod: %v", t)
        return
    }
    ...
    c.podQueue.MoveAllToActiveQueue()
}
// /pkg/scheduler/interal/queue/scheduling_queu.go
func (p *PriorityQueue) MoveAllToActiveQueue() {
    p.lock.Lock()
    defer p.lock.Unlock()
    for _, pod := range p.unschedulableQ.pods {
        if err := p.activeQ.Add(pod); err != nil {
            klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
        }
    }
    p.unschedulableQ.clear()
    p.moveRequestCycle = p.schedulingCycle
    p.cond.Broadcast()
}
// TODO(bsalamat): We should add a back-off mechanism here so that a high priority
// pod which is unschedulable does not go to the head of the queue frequently. For
// example in a cluster where a lot of pods being deleted, such a high priority
// pod can deprive other pods from getting scheduled.
2.1.4 总结

2.2 PodInformer (unscheduled pod queue)

// pkg/scheduler/factory/factory.go

// responsibleForPod returns true if the pod has asked to be scheduled by the given scheduler.
func responsibleForPod(pod *v1.Pod, schedulerName string) bool {
    return schedulerName == pod.Spec.SchedulerName
}
// unscheduled pod queue
    args.PodInformer.Informer().AddEventHandler(
        cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
                case *v1.Pod:
                    return !assignedPod(t) && responsibleForPod(t, args.SchedulerName)
                case cache.DeletedFinalStateUnknown:
                    if pod, ok := t.Obj.(*v1.Pod); ok {
                        return !assignedPod(pod) && responsibleForPod(pod, args.SchedulerName)
                    }
                    runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
                    return false
                default:
                    runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
                    return false
                }
            },
            Handler: cache.ResourceEventHandlerFuncs{
                AddFunc:    c.addPodToSchedulingQueue,
                UpdateFunc: c.updatePodInSchedulingQueue,
                DeleteFunc: c.deletePodFromSchedulingQueue,
            },
        },
    )
2.2.1 addPodToSchedulingQueue
// pkg/scheduler/factory/factory.go
func (c *configFactory) addPodToSchedulingQueue(obj interface{}) {
    if err := c.podQueue.Add(obj.(*v1.Pod)); err != nil {
        runtime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
    }
}

// pkg/scheduler/internal/queue/scheduling_queue
func (p *PriorityQueue) Add(pod *v1.Pod) error {
    p.lock.Lock()
    defer p.lock.Unlock()
    err := p.activeQ.Add(pod)
    if err != nil {
        klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
    } else {
        if p.unschedulableQ.get(pod) != nil {
            klog.Errorf("Error: pod %v/%v is already in the unschedulable queue.", pod.Namespace, pod.Name)
            p.unschedulableQ.delete(pod)
        }
        p.nominatedPods.add(pod, "")
        p.cond.Broadcast()
    }
    return err
}
2.2.2 updatePodInSchedulingQueue
// pkg/scheduler/factory/factory.go
func (c *configFactory) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
    pod := newObj.(*v1.Pod)
    if c.skipPodUpdate(pod) {
        return
    }
    if err := c.podQueue.Update(oldObj.(*v1.Pod), pod); err != nil {
        runtime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err))
    }
}

// pkg/scheduler/internal/queue/scheduling_queue.go

// 1. 如果在actvieQ中存在, 所以直接更新activeQ和nominatedPods
// 2. 如果在unschedulableQ中存在, 更新nominatedPods
//    2.1 如果spec中发生了改变, 有可能成为schedulable, 因此加入到activeQ中并从unscheduableQ中删除
//    2.2 如果没有任何改变 直接更新unscheduableQ
// 3. 在activeQ和unschedulableQ中都不存在, 添加到activeQ和p.nominatedPods.add(newPod, "")

func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
    p.lock.Lock()
    defer p.lock.Unlock()
    // If the pod is already in the active queue, just update it there.
    if _, exists, _ := p.activeQ.Get(newPod); exists {
        p.nominatedPods.update(oldPod, newPod)
        err := p.activeQ.Update(newPod)
        return err
    }
    // If the pod is in the unschedulable queue, updating it may make it schedulable.
    if usPod := p.unschedulableQ.get(newPod); usPod != nil {
        p.nominatedPods.update(oldPod, newPod)
        // 如果发生了改变, 所以就有可能变成schedulable, 因此加入到activeQ中并从unscheduableQ中删除
        if isPodUpdated(oldPod, newPod) {
            p.unschedulableQ.delete(usPod)
            err := p.activeQ.Add(newPod)
            if err == nil {
                p.cond.Broadcast()
            }
            return err
        }
        // 如果没有改变 直接在unschedulabeQ中更新
        p.unschedulableQ.addOrUpdate(newPod)
        return nil
    }
    // If pod is not in any of the two queue, we put it in the active queue.
    err := p.activeQ.Add(newPod)
    if err == nil {
        p.nominatedPods.add(newPod, "")
        p.cond.Broadcast()
    }
    return err
}
func isPodUpdated(oldPod, newPod *v1.Pod) bool {
    strip := func(pod *v1.Pod) *v1.Pod {
        p := pod.DeepCopy()
        p.ResourceVersion = ""
        p.Generation = 0
        p.Status = v1.PodStatus{}
        return p
    }
    return !reflect.DeepEqual(strip(oldPod), strip(newPod))
}
2.2.3 deletePodFromSchedulingQueue
// pkg/scheduler/factory/factory.go
func (c *configFactory) deletePodFromSchedulingQueue(obj interface{}) {
    var pod *v1.Pod
    switch t := obj.(type) {
    case *v1.Pod:
        pod = obj.(*v1.Pod)
    case cache.DeletedFinalStateUnknown:
        var ok bool
        pod, ok = t.Obj.(*v1.Pod)
        if !ok {
            runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
            return
        }
    default:
        runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
        return
    }
    if err := c.podQueue.Delete(pod); err != nil {
        runtime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err))
    }
    ...
}

// pkg/scheduler/internal/queue/scheduling_queue.go
func (p *PriorityQueue) Delete(pod *v1.Pod) error {
    p.lock.Lock()
    defer p.lock.Unlock()
     // 有就删 没有什么都不做
    p.nominatedPods.delete(pod)
    err := p.activeQ.Delete(pod)
    if err != nil { // The item was probably not found in the activeQ.
        p.unschedulableQ.delete(pod)
    }
    return nil
}

2.3 NodeInformer

args.NodeInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    c.addNodeToCache,
            UpdateFunc: c.updateNodeInCache,
            DeleteFunc: c.deleteNodeFromCache,
        },
    )
func (c *configFactory) addNodeToCache(obj interface{}) {
       ...
    c.podQueue.MoveAllToActiveQueue()
    // NOTE: add a new node does not affect existing predicates in equivalence cache
}
func (c *configFactory) updateNodeInCache(oldObj, newObj interface{}) {
    ...
    // Only activate unschedulable pods if the node became more schedulable.
    // We skip the node property comparison when there is no unschedulable pods in the queue
    // to save processing cycles. We still trigger a move to active queue to cover the case
    // that a pod being processed by the scheduler is determined unschedulable. We want this
    // pod to be reevaluated when a change in the cluster happens.
    if c.podQueue.NumUnschedulablePods() == 0 || nodeSchedulingPropertiesChanged(newNode, oldNode) {
        c.podQueue.MoveAllToActiveQueue()
    }
}
func nodeSchedulingPropertiesChanged(newNode *v1.Node, oldNode *v1.Node) bool {
    if nodeSpecUnschedulableChanged(newNode, oldNode) {
        return true
    }
    if nodeAllocatableChanged(newNode, oldNode) {
        return true
    }
    if nodeLabelsChanged(newNode, oldNode) {
        return true
    }
    if nodeTaintsChanged(newNode, oldNode) {
        return true
    }
    if nodeConditionsChanged(newNode, oldNode) {
        return true
    }
    return false
}

2.4 PvInformer

args.PvInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            // MaxPDVolumeCountPredicate: since it relies on the counts of PV.
            AddFunc:    c.onPvAdd,
            UpdateFunc: c.onPvUpdate,
            DeleteFunc: c.onPvDelete,
        },
    )
func (c *configFactory) onPvAdd(obj interface{}) {
    ...
    c.podQueue.MoveAllToActiveQueue()
}
func (c *configFactory) onPvUpdate(old, new interface{}) {
    ...
    c.podQueue.MoveAllToActiveQueue()
}

2.5 PvcInformer

args.PvcInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    c.onPvcAdd,
            UpdateFunc: c.onPvcUpdate,
            DeleteFunc: c.onPvcDelete,
        },
    )
func (c *configFactory) onPvcAdd(obj interface{}) {
    ...
    c.podQueue.MoveAllToActiveQueue()
}

func (c *configFactory) onPvcUpdate(old, new interface{}) {
    ...
    c.podQueue.MoveAllToActiveQueue()
}

2.6 ServiceInformer

args.ServiceInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    c.onServiceAdd,
            UpdateFunc: c.onServiceUpdate,
            DeleteFunc: c.onServiceDelete,
        },
    )
func (c *configFactory) onServiceAdd(obj interface{}) {
    ...
    c.podQueue.MoveAllToActiveQueue()
}

func (c *configFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) {
    ...
    c.podQueue.MoveAllToActiveQueue()
}

func (c *configFactory) onServiceDelete(obj interface{}) {
    ...
    c.podQueue.MoveAllToActiveQueue()
}

2.7 run方法

// run starts the goroutine to pump from unschedulableQ to activeQ
func (p *PriorityQueue) run() {
    go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}
func (p *PriorityQueue) flushUnschedulableQLeftover() {
    p.lock.Lock()
    defer p.lock.Unlock()

    var podsToMove []*v1.Pod
    currentTime := p.clock.Now()
    for _, pod := range p.unschedulableQ.pods {
        lastScheduleTime := podTimestamp(pod)
        if !lastScheduleTime.IsZero() && currentTime.Sub(lastScheduleTime.Time) > unschedulableQTimeInterval {
            podsToMove = append(podsToMove, pod)
        }
    }

    if len(podsToMove) > 0 {
        p.movePodsToActiveQueue(podsToMove)
    }
}
const unschedulableQTimeInterval = 60 * time.Second

2.8 schedulingCycle 和 moveRequestCycle属性

func (p *PriorityQueue) MoveAllToActiveQueue() {
    ...
    p.moveRequestCycle = p.schedulingCycle
    p.cond.Broadcast()
}

// NOTE: this function assumes lock has been acquired in caller
func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {
    ...
    p.moveRequestCycle = p.schedulingCycle
    p.cond.Broadcast()
}
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error {
    p.lock.Lock()
    defer p.lock.Unlock()
    if p.unschedulableQ.get(pod) != nil {
        return fmt.Errorf("pod is already present in unschedulableQ")
    }
    if _, exists, _ := p.activeQ.Get(pod); exists {
        return fmt.Errorf("pod is already present in the activeQ")
    }
    if podSchedulingCycle > p.moveRequestCycle && isPodUnschedulable(pod) {
        p.unschedulableQ.addOrUpdate(pod)
        p.nominatedPods.add(pod, "")
        return nil
    }
    err := p.activeQ.Add(pod)
    if err == nil {
        p.nominatedPods.add(pod, "")
        p.cond.Broadcast()
    }
    return err
}
举报

相关推荐

0 条评论