0
点赞
收藏
分享

微信扫一扫

[k8s源码分析][kube-scheduler]scheduler之调度之优选(priority)与抢占(preempt)

5.1 优选阶段

func PrioritizeNodes(
    pod *v1.Pod,
    nodeNameToInfo map[string]*schedulercache.NodeInfo,
    meta interface{},
    priorityConfigs []algorithm.PriorityConfig,
    nodes []*v1.Node,
    extenders []algorithm.SchedulerExtender,
) (schedulerapi.HostPriorityList, error) {
    // If no priority configs are provided, then the EqualPriority function is applied
    // This is required to generate the priority list in the required format
    // 如果没有优选方法并且也没有扩展方法 则所有节点都是得分为1分
    if len(priorityConfigs) == 0 && len(extenders) == 0 {
        result := make(schedulerapi.HostPriorityList, 0, len(nodes))
        for i := range nodes {
            hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
            if err != nil {
                return nil, err
            }
            result = append(result, hostPriority)
        }
        return result, nil
    }

    var (
        mu   = sync.Mutex{}
        wg   = sync.WaitGroup{}
        errs []error
    )
    appendError := func(err error) {
        mu.Lock()
        defer mu.Unlock()
        errs = append(errs, err)
    }

    results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))

    // DEPRECATED: we can remove this when all priorityConfigs implement the
    // Map-Reduce pattern.
    // 旧版本的算分
    for i := range priorityConfigs {
        if priorityConfigs[i].Function != nil {
            wg.Add(1)
            go func(index int) {
                defer wg.Done()
                var err error
                results[index], err = priorityConfigs[index].Function(pod, nodeNameToInfo, nodes)
                if err != nil {
                    appendError(err)
                }
            }(i)
        } else {
            results[i] = make(schedulerapi.HostPriorityList, len(nodes))
        }
    }

    workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
        nodeInfo := nodeNameToInfo[nodes[index].Name]
        for i := range priorityConfigs {
            if priorityConfigs[i].Function != nil {
                continue
            }

            var err error
            // 进行map方法算分
            results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
            if err != nil {
                appendError(err)
                results[i][index].Host = nodes[index].Name
            }
        }
    })

    for i := range priorityConfigs {
        if priorityConfigs[i].Reduce == nil {
            continue
        }
        wg.Add(1)
        go func(index int) {
            defer wg.Done()
            // 进行map方法算分
            if err := priorityConfigs[index].Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
                appendError(err)
            }
            if klog.V(10) {
                for _, hostPriority := range results[index] {
                    klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Host, priorityConfigs[index].Name, hostPriority.Score)
                }
            }
        }(i)
    }
    // Wait for all computations to be finished.
    wg.Wait()
    if len(errs) != 0 {
        return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
    }

    // Summarize all scores.
    result := make(schedulerapi.HostPriorityList, 0, len(nodes))

    for i := range nodes {
        result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
        for j := range priorityConfigs {
            result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
        }
    }

    // 如果有扩展器, 就将控制器中的方法计算一次, 把分数加起来
    if len(extenders) != 0 && nodes != nil {
        combinedScores := make(map[string]int, len(nodeNameToInfo))
        for i := range extenders {
            if !extenders[i].IsInterested(pod) {
                continue
            }
            wg.Add(1)
            go func(extIndex int) {
                defer wg.Done()
                prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
                if err != nil {
                    // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
                    return
                }
                mu.Lock()
                for i := range *prioritizedList {
                    host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
                    if klog.V(10) {
                        klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), host, extenders[extIndex].Name(), score)
                    }
                    combinedScores[host] += score * weight
                }
                mu.Unlock()
            }(i)
        }
        // wait for all go routines to finish
        wg.Wait()
        for i := range result {
            result[i].Score += combinedScores[result[i].Host]
        }
    }

    if klog.V(10) {
        for i := range result {
            klog.Infof("Host %s => Score %d", result[i].Host, result[i].Score)
        }
    }
    return result, nil
}

5.2 选择阶段

func findMaxScores(priorityList schedulerapi.HostPriorityList) []int {
    maxScoreIndexes := make([]int, 0, len(priorityList)/2)
    maxScore := priorityList[0].Score
    for i, hp := range priorityList {
        if hp.Score > maxScore {
            maxScore = hp.Score
            maxScoreIndexes = maxScoreIndexes[:0]
            maxScoreIndexes = append(maxScoreIndexes, i)
        } else if hp.Score == maxScore {
            maxScoreIndexes = append(maxScoreIndexes, i)
        }
    }
    return maxScoreIndexes
}


func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList) (string, error) {
    if len(priorityList) == 0 {
        return "", fmt.Errorf("empty priorityList")
    }

    maxScores := findMaxScores(priorityList)
    ix := int(g.lastNodeIndex % uint64(len(maxScores)))
    g.lastNodeIndex++

    return priorityList[maxScores[ix]].Host, nil
}

5.3 抢占

func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
    // Scheduler may return various types of errors. Consider preemption only if
    // the error is of type FitError.
    fitError, ok := scheduleErr.(*FitError)
    if !ok || fitError == nil {
        return nil, nil, nil, nil
    }
    // 判断该pod能不能够进行抢占
    if !podEligibleToPreemptOthers(pod, g.cachedNodeInfoMap) {
        klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
        return nil, nil, nil, nil
    }
    allNodes, err := nodeLister.List()
    if err != nil {
        return nil, nil, nil, err
    }
    if len(allNodes) == 0 {
        return nil, nil, nil, ErrNoNodesAvailable
    }
    // 选一些潜在的节点
    potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError.FailedPredicates)
    if len(potentialNodes) == 0 {
        klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
        // In this case, we should clean-up any existing nominated node name of the pod.
        return nil, nil, []*v1.Pod{pod}, nil
    }
    pdbs, err := g.pdbLister.List(labels.Everything())
    if err != nil {
        return nil, nil, nil, err
    }
    // 发生抢占
    // nodeToVictims返回值为 所有可以发生抢占行为而能够让该pod运行的节点 以及每个节点对应的需要删除的pods
    nodeToVictims, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates,
        g.predicateMetaProducer, g.schedulingQueue, pdbs)
    if err != nil {
        return nil, nil, nil, err
    }

    // We will only check nodeToVictims with extenders that support preemption.
    // Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
    // node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.

    // 如果有扩展项 则调用扩展项进行计算
    nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims)
    if err != nil {
        return nil, nil, nil, err
    }

    // 从这些nodeToVictims中选出一个最终发生抢占的节点candidateNode
    candidateNode := pickOneNodeForPreemption(nodeToVictims)
    if candidateNode == nil {
        return nil, nil, nil, err
    }

    // Lower priority pods nominated to run on this node, may no longer fit on
    // this node. So, we should remove their nomination. Removing their
    // nomination updates these pods and moves them to the active queue. It
    // lets scheduler find another place for them.

    // 那些优先级比当前pod优先级低的nomiated pods有可能应该不适合再运行在candidateNode节点上了
    // 所以需要把这些nominatedPods找到
    nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
    if nodeInfo, ok := g.cachedNodeInfoMap[candidateNode.Name]; ok {
        return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, err
    }

    return nil, nil, nil, fmt.Errorf(
        "preemption failed: the target node %s has been deleted from scheduler cache",
        candidateNode.Name)
}
5.3.1 podEligibleToPreemptOthers
// pkg/scheduler/core/generic_scheduler.go

func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) bool {
    nomNodeName := pod.Status.NominatedNodeName
    if len(nomNodeName) > 0 {
        if nodeInfo, found := nodeNameToInfo[nomNodeName]; found {
            for _, p := range nodeInfo.Pods() {
                if p.DeletionTimestamp != nil && util.GetPodPriority(p) < util.GetPodPriority(pod) {
                    // There is a terminating pod on the nominated node.
                    return false
                }
            }
        }
    }
    return true
}
5.3.2 nodesWherePreemptionMightHelp
func nodesWherePreemptionMightHelp(nodes []*v1.Node, failedPredicatesMap FailedPredicateMap) []*v1.Node {
    potentialNodes := []*v1.Node{}
    for _, node := range nodes {
        unresolvableReasonExist := false
        failedPredicates, _ := failedPredicatesMap[node.Name]
        for _, failedPredicate := range failedPredicates {
            switch failedPredicate {
            case
                predicates.ErrNodeSelectorNotMatch,
                predicates.ErrPodAffinityRulesNotMatch,
                predicates.ErrPodNotMatchHostName,
                predicates.ErrTaintsTolerationsNotMatch,
                predicates.ErrNodeLabelPresenceViolated,
                predicates.ErrNodeNotReady,
                predicates.ErrNodeNetworkUnavailable,
                predicates.ErrNodeUnderDiskPressure,
                predicates.ErrNodeUnderPIDPressure,
                predicates.ErrNodeUnderMemoryPressure,
                predicates.ErrNodeOutOfDisk,
                predicates.ErrNodeUnschedulable,
                predicates.ErrNodeUnknownCondition,
                predicates.ErrVolumeZoneConflict,
                predicates.ErrVolumeNodeConflict,
                predicates.ErrVolumeBindConflict:
                unresolvableReasonExist = true
                break
            }
        }
        if !unresolvableReasonExist {
            klog.V(3).Infof("Node %v is a potential node for preemption.", node.Name)
            potentialNodes = append(potentialNodes, node)
        }
    }
    return potentialNodes
}
5.3.3 selectNodesForPreemption
func selectNodesForPreemption(pod *v1.Pod,
    nodeNameToInfo map[string]*schedulercache.NodeInfo,
    potentialNodes []*v1.Node,
    predicates map[string]algorithm.FitPredicate,
    metadataProducer algorithm.PredicateMetadataProducer,
    queue internalqueue.SchedulingQueue,
    pdbs []*policy.PodDisruptionBudget,
) (map[*v1.Node]*schedulerapi.Victims, error) {
    nodeToVictims := map[*v1.Node]*schedulerapi.Victims{}
    var resultLock sync.Mutex

    // We can use the same metadata producer for all nodes.
    meta := metadataProducer(pod, nodeNameToInfo)
    checkNode := func(i int) {
        nodeName := potentialNodes[i].Name
        var metaCopy algorithm.PredicateMetadata
        if meta != nil {
            metaCopy = meta.ShallowCopy()
        }
        pods, numPDBViolations, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], predicates, queue, pdbs)
        if fits {
            resultLock.Lock()
            victims := schedulerapi.Victims{
                Pods:             pods,
                NumPDBViolations: numPDBViolations,
            }
            nodeToVictims[potentialNodes[i]] = &victims
            resultLock.Unlock()
        }
    }
    workqueue.ParallelizeUntil(context.TODO(), 16, len(potentialNodes), checkNode)
    // nodeToVictims 记录着这些可以运行该pod的节点以及其需要删除的pods
    return nodeToVictims, nil
}
5.3.3.1 selectVictimsOnNode
func selectVictimsOnNode(
    pod *v1.Pod,
    meta algorithm.PredicateMetadata,
    nodeInfo *schedulercache.NodeInfo,
    fitPredicates map[string]algorithm.FitPredicate,
    queue internalqueue.SchedulingQueue,
    pdbs []*policy.PodDisruptionBudget,
) ([]*v1.Pod, int, bool) {
    if nodeInfo == nil {
        return nil, 0, false
    }
    potentialVictims := util.SortableList{CompFunc: util.HigherPriorityPod}
    nodeInfoCopy := nodeInfo.Clone()

    removePod := func(rp *v1.Pod) {
        nodeInfoCopy.RemovePod(rp)
        if meta != nil {
            meta.RemovePod(rp)
        }
    }
    addPod := func(ap *v1.Pod) {
        nodeInfoCopy.AddPod(ap)
        if meta != nil {
            meta.AddPod(ap, nodeInfoCopy)
        }
    }
    // As the first step, remove all the lower priority pods from the node and
    // check if the given pod can be scheduled.

    // 准备潜在要删除的pods
    // 潜在要删除的pods: 当然是那些优先级比当前pod优先级低的pods
    podPriority := util.GetPodPriority(pod)
    for _, p := range nodeInfoCopy.Pods() {
        if util.GetPodPriority(p) < podPriority {
            potentialVictims.Items = append(potentialVictims.Items, p)
            removePod(p)
        }
    }
    // 将这些潜在要删除的pods按照优先级要高从低排序
    potentialVictims.Sort()
    // If the new pod does not fit after removing all the lower priority pods,
    // we are almost done and this node is not suitable for preemption. The only condition
    // that we should check is if the "pod" is failing to schedule due to pod affinity
    // failure.
    // TODO(bsalamat): Consider checking affinity to lower priority pods if feasible with reasonable performance.

    // 将这些潜在的pods全部删除如果还不能让该pod运行到该节点上的话 就可以直接返回了
    if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil); !fits {
        if err != nil {
            klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
        }
        return nil, 0, false
    }
    var victims []*v1.Pod
    numViolatingVictim := 0
    // Try to reprieve as many pods as possible. We first try to reprieve the PDB
    // violating victims and then other non-violating ones. In both cases, we start
    // from the highest priority victims.

    // 按照pdbs把潜在要删除的pods分成两类 violatingVictims和nonViolatingVictims
    violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs)
    reprievePod := func(p *v1.Pod) bool {
        // 尝试把该p(pod)加回到节点中 其实说不删除p
        addPod(p)
        fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil)
        if !fits {
            // p需要删除
            removePod(p)
            victims = append(victims, p)
            klog.V(5).Infof("Pod %v is a potential preemption victim on node %v.", p.Name, nodeInfo.Node().Name)
        }
        return fits
    }
    // 无论violatingVictims还是nonViolatingVictims 但是按优先级从高到低依次尝试reprievePod
    // reprievePod的意思就是说 把p加回去(就是不删除) 如果不满足运行的话 那p就会真正加到victims中 也就是要删除
    // 如果满足运行的话 那p就不用删除了
    
    for _, p := range violatingVictims {
        if !reprievePod(p) {
            numViolatingVictim++
        }
    }
    // Now we try to reprieve non-violating victims.
    for _, p := range nonViolatingVictims {
        reprievePod(p)
    }
    return victims, numViolatingVictim, true
}
5.3.4 processPreemptionWithExtenders
5.3.5 pickOneNodeForPreemption
func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*schedulerapi.Victims) *v1.Node {
    if len(nodesToVictims) == 0 {
        return nil
    }
    minNumPDBViolatingPods := math.MaxInt32
    var minNodes1 []*v1.Node
    lenNodes1 := 0
    for node, victims := range nodesToVictims {
        if len(victims.Pods) == 0 {
            // We found a node that doesn't need any preemption. Return it!
            // This should happen rarely when one or more pods are terminated between
            // the time that scheduler tries to schedule the pod and the time that
            // preemption logic tries to find nodes for preemption.
            // 如果不需要删除任何pod 就直接返回
            return node
        }
        numPDBViolatingPods := victims.NumPDBViolations
        if numPDBViolatingPods < minNumPDBViolatingPods {
            minNumPDBViolatingPods = numPDBViolatingPods
            minNodes1 = nil
            lenNodes1 = 0
        }
        if numPDBViolatingPods == minNumPDBViolatingPods {
            minNodes1 = append(minNodes1, node)
            lenNodes1++
        }
    }
    if lenNodes1 == 1 {
        return minNodes1[0]
    }

    // There are more than one node with minimum number PDB violating pods. Find
    // the one with minimum highest priority victim.
    minHighestPriority := int32(math.MaxInt32)
    var minNodes2 = make([]*v1.Node, lenNodes1)
    lenNodes2 := 0
    for i := 0; i < lenNodes1; i++ {
        node := minNodes1[i]
        victims := nodesToVictims[node]
        // highestPodPriority is the highest priority among the victims on this node.
        highestPodPriority := util.GetPodPriority(victims.Pods[0])
        if highestPodPriority < minHighestPriority {
            minHighestPriority = highestPodPriority
            lenNodes2 = 0
        }
        if highestPodPriority == minHighestPriority {
            minNodes2[lenNodes2] = node
            lenNodes2++
        }
    }
    if lenNodes2 == 1 {
        return minNodes2[0]
    }

    // There are a few nodes with minimum highest priority victim. Find the
    // smallest sum of priorities.
    minSumPriorities := int64(math.MaxInt64)
    lenNodes1 = 0
    for i := 0; i < lenNodes2; i++ {
        var sumPriorities int64
        node := minNodes2[i]
        for _, pod := range nodesToVictims[node].Pods {
            // We add MaxInt32+1 to all priorities to make all of them >= 0. This is
            // needed so that a node with a few pods with negative priority is not
            // picked over a node with a smaller number of pods with the same negative
            // priority (and similar scenarios).
            sumPriorities += int64(util.GetPodPriority(pod)) + int64(math.MaxInt32+1)
        }
        if sumPriorities < minSumPriorities {
            minSumPriorities = sumPriorities
            lenNodes1 = 0
        }
        if sumPriorities == minSumPriorities {
            minNodes1[lenNodes1] = node
            lenNodes1++
        }
    }
    if lenNodes1 == 1 {
        return minNodes1[0]
    }

    // There are a few nodes with minimum highest priority victim and sum of priorities.
    // Find one with the minimum number of pods.
    minNumPods := math.MaxInt32
    lenNodes2 = 0
    for i := 0; i < lenNodes1; i++ {
        node := minNodes1[i]
        numPods := len(nodesToVictims[node].Pods)
        if numPods < minNumPods {
            minNumPods = numPods
            lenNodes2 = 0
        }
        if numPods == minNumPods {
            minNodes2[lenNodes2] = node
            lenNodes2++
        }
    }
    // At this point, even if there are more than one node with the same score,
    // return the first one.
    if lenNodes2 > 0 {
        return minNodes2[0]
    }
    klog.Errorf("Error in logic of node scoring for preemption. We should never reach here!")
    return nil
}
5.3.6 getLowerPriorityNominatedPods
func (g *genericScheduler) getLowerPriorityNominatedPods(pod *v1.Pod, nodeName string) []*v1.Pod {
    pods := g.schedulingQueue.NominatedPodsForNode(nodeName)

    if len(pods) == 0 {
        return nil
    }

    var lowerPriorityPods []*v1.Pod
    podPriority := util.GetPodPriority(pod)
    for _, p := range pods {
        if util.GetPodPriority(p) < podPriority {
            lowerPriorityPods = append(lowerPriorityPods, p)
        }
    }
    return lowerPriorityPods
}
举报

相关推荐

0 条评论