0
点赞
收藏
分享

微信扫一扫

[k8s源码分析][kube-scheduler]scheduler/algorithmprovider之注册default-scheduler

1. 前言

2. 注册默认scheduler

{
    "kind" : "Policy",
    "apiVersion" : "v1",
    "predicates" : [
      {"name" : "PodFitsHostPorts"},
      {"name" : "PodFitsResources"},
      {"name" : "NoDiskConflict"},
      {"name" : "MatchNodeSelector"},
      {"name" : "HostName"}
    ],
    "priorities" : [
      {"name" : "LeastRequestedPriority", "weight" : 1},
      {"name" : "BalancedResourceAllocation", "weight" : 1},
      {"name" : "ServiceSpreadingPriority", "weight" : 1},
      {"name" : "EqualPriority", "weight" : 1}
    ]
}

3. pkg/scheduler/factory/plugins.go

// PluginFactoryArgs is the single argument handed to every predicate and
// priority factory function (FitPredicateFactory, PriorityFunctionFactory,
// PriorityFunctionFactory2). A factory picks the fields it needs and builds
// its algorithm from them.
type PluginFactoryArgs struct {
    PodLister                      algorithm.PodLister
    ServiceLister                  algorithm.ServiceLister
    ControllerLister               algorithm.ControllerLister
    ReplicaSetLister               algorithm.ReplicaSetLister
    StatefulSetLister              algorithm.StatefulSetLister
    NodeLister                     algorithm.NodeLister
    PDBLister                      algorithm.PDBLister
    NodeInfo                       predicates.NodeInfo
    PVInfo                         predicates.PersistentVolumeInfo
    PVCInfo                        predicates.PersistentVolumeClaimInfo
    StorageClassInfo               predicates.StorageClassInfo
    VolumeBinder                   *volumebinder.VolumeBinder
    HardPodAffinitySymmetricWeight int32
}
// Factory function signatures. Each factory receives the shared
// PluginFactoryArgs and produces the algorithm piece it is responsible for.
type (
    // FitPredicateFactory builds a fit predicate from the plugin arguments.
    FitPredicateFactory func(PluginFactoryArgs) algorithm.FitPredicate

    // PriorityFunctionFactory builds a single priority function from the
    // plugin arguments.
    PriorityFunctionFactory func(PluginFactoryArgs) algorithm.PriorityFunction

    // PriorityFunctionFactory2 builds a priority expressed as a map function
    // plus a reduce function from the plugin arguments.
    PriorityFunctionFactory2 func(PluginFactoryArgs) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction)
)

3.1 基本结构

// PriorityConfigFactory is the value stored in priorityFunctionMap for each
// registered priority: the factory (or factories) that build it and the
// weight applied to it.
type PriorityConfigFactory struct {
    // Function builds a priority as a single function.
    Function          PriorityFunctionFactory
    // MapReduceFunction builds a priority as a map/reduce function pair.
    MapReduceFunction PriorityFunctionFactory2
    // Weight is the weight associated with the priority.
    Weight            int
}

// Package-level registries for scheduling algorithms.
var (
    // schedulerFactoryMutex is locked by the Register*/Get* functions before
    // they touch the registry maps below.
    schedulerFactoryMutex sync.Mutex

    // Maps that hold the registered algorithm types, keyed by name.
    fitPredicateMap        = make(map[string]FitPredicateFactory)
    mandatoryFitPredicates = sets.NewString()
    priorityFunctionMap    = make(map[string]PriorityConfigFactory)
    algorithmProviderMap   = make(map[string]AlgorithmProviderConfig)

    // Registered metadata producer factories.
    priorityMetadataProducer  PriorityMetadataProducerFactory
    predicateMetadataProducer PredicateMetadataProducerFactory
)

// DefaultProvider is the name under which the default algorithm provider is
// registered and later looked up.
const DefaultProvider = "DefaultProvider"
// AlgorithmProviderConfig is the value stored in algorithmProviderMap for a
// named provider: the set of fit-predicate keys and the set of
// priority-function keys that make up that provider.
type AlgorithmProviderConfig struct {
    FitPredicateKeys     sets.String
    PriorityFunctionKeys sets.String
}

3.2 注册预选方法

// pkg/scheduler/factory/plugins.go

// RegisterFitPredicate registers an already-constructed fit predicate under
// name. The predicate is wrapped in a factory that ignores its arguments and
// always returns that same predicate. The result of
// RegisterFitPredicateFactory is returned unchanged.
func RegisterFitPredicate(name string, predicate algorithm.FitPredicate) string {
    constantFactory := func(PluginFactoryArgs) algorithm.FitPredicate {
        return predicate
    }
    return RegisterFitPredicateFactory(name, constantFactory)
}
// validName is the pattern a registered algorithm name must match: it starts
// and ends with an alphanumeric character and may contain alphanumerics or
// '-' in between. The pattern needs at least two characters to match.
var validName = regexp.MustCompile(`^[a-zA-Z0-9]([-a-zA-Z0-9]*[a-zA-Z0-9])$`)

// validateAlgorithmNameOrDie checks name against validName and logs a fatal
// error through klog when it does not match.
func validateAlgorithmNameOrDie(name string) {
    if validName.MatchString(name) {
        return
    }
    klog.Fatalf("Algorithm name %v does not match the name validation regexp \"%v\".", name, validName)
}
func RegisterFitPredicateFactory(name string, predicateFactory FitPredicateFactory) string {
    schedulerFactoryMutex.Lock()
    defer schedulerFactoryMutex.Unlock()
    validateAlgorithmNameOrDie(name)
    fitPredicateMap[name] = predicateFactory
    return name
}
func RegisterFitPredicateFactory(name string, predicateFactory FitPredicateFactory) string {
    schedulerFactoryMutex.Lock()
    defer schedulerFactoryMutex.Unlock()
    validateAlgorithmNameOrDie(name)
    fitPredicateMap[name] = predicateFactory
    return name
}
// RegisterMandatoryFitPredicate registers predicate under name, as
// RegisterFitPredicate does, and also records name in mandatoryFitPredicates.
// It returns name.
func RegisterMandatoryFitPredicate(name string, predicate algorithm.FitPredicate) string {
    schedulerFactoryMutex.Lock()
    defer schedulerFactoryMutex.Unlock()

    validateAlgorithmNameOrDie(name)

    // Wrap the ready-made predicate in a factory that always returns it.
    var fixed FitPredicateFactory = func(PluginFactoryArgs) algorithm.FitPredicate {
        return predicate
    }
    fitPredicateMap[name] = fixed
    mandatoryFitPredicates.Insert(name)

    return name
}
// pkg/scheduler/algorithmprovider/defaults/defaults.go

// defaultPredicates registers the default fit predicates with the factory
// package and returns the set of names they were registered under (each
// Register* call returns the name it was given).
//
// This is an excerpt: the "..." lines stand for registrations omitted here.
func defaultPredicates() sets.String {
    return sets.NewString(
        // Registered through a factory because the predicate is built from
        // the PV, PVC and StorageClass info carried in the plugin arguments.
        factory.RegisterFitPredicateFactory(
            predicates.NoVolumeZoneConflictPred,
            func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
                return predicates.NewVolumeZonePredicate(args.PVInfo, args.PVCInfo, args.StorageClassInfo)
            },
        ),
        ...
        // Registered as mandatory: the name is also added to the factory's
        // mandatory predicate set.
        factory.RegisterMandatoryFitPredicate(predicates.CheckNodeConditionPred, predicates.CheckNodeConditionPredicate),
        factory.RegisterFitPredicate(predicates.PodToleratesNodeTaintsPred, predicates.PodToleratesNodeTaints),
        ...
    )
}

3.3 注册优选方法

// pkg/scheduler/factory/plugins.go
// RegisterPriorityConfigFactory stores pcf in priorityFunctionMap under name
// and returns name. The registry mutex is held for the whole call and the
// name is validated before the map is written.
func RegisterPriorityConfigFactory(name string, pcf PriorityConfigFactory) string {
    schedulerFactoryMutex.Lock()
    defer schedulerFactoryMutex.Unlock()

    validateAlgorithmNameOrDie(name)

    priorityFunctionMap[name] = pcf

    return name
}
// RegisterPriorityFunction2 registers a priority given as a ready-made
// map/reduce function pair with the given weight. The pair is wrapped in a
// factory that ignores its arguments and always returns the same two
// functions. The result of RegisterPriorityConfigFactory is returned.
func RegisterPriorityFunction2(
    name string,
    mapFunction algorithm.PriorityMapFunction,
    reduceFunction algorithm.PriorityReduceFunction,
    weight int) string {
    constantMapReduce := func(PluginFactoryArgs) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
        return mapFunction, reduceFunction
    }
    pcf := PriorityConfigFactory{
        MapReduceFunction: constantMapReduce,
        Weight:            weight,
    }
    return RegisterPriorityConfigFactory(name, pcf)
}
// pkg/scheduler/algorithmprovider/defaults/defaults.go

// defaultPriorities registers the default priorities with the factory package
// and returns the set of names they were registered under.
//
// This is an excerpt: the "..." line stands for registrations omitted here.
func defaultPriorities() sets.String {
    return sets.NewString(
        // spreads pods by minimizing the number of pods (belonging to the same service or replication controller) on the same node.
        factory.RegisterPriorityConfigFactory(
            "SelectorSpreadPriority",
            factory.PriorityConfigFactory{
                MapReduceFunction: func(args factory.PluginFactoryArgs) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
                    return priorities.NewSelectorSpreadPriority(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister)
                },
                Weight: 1,
            },
        ),
        ...
        // Map function only; the reduce function is nil. Weight is 1.
        factory.RegisterPriorityFunction2("ImageLocalityPriority", priorities.ImageLocalityPriorityMap, nil, 1),
    )
}

3.4 注册调度器

// pkg/scheduler/factory/plugins.go

// RegisterAlgorithmProvider stores a provider in algorithmProviderMap under
// name, made of the given predicate-key and priority-key sets, and returns
// name. The registry mutex is held for the whole call and the name is
// validated before the map is written.
func RegisterAlgorithmProvider(name string, predicateKeys, priorityKeys sets.String) string {
    schedulerFactoryMutex.Lock()
    defer schedulerFactoryMutex.Unlock()

    validateAlgorithmNameOrDie(name)

    cfg := AlgorithmProviderConfig{
        FitPredicateKeys:     predicateKeys,
        PriorityFunctionKeys: priorityKeys,
    }
    algorithmProviderMap[name] = cfg

    return name
}
// pkg/scheduler/algorithmprovider/defaults/defaults.go
// registerAlgorithmProvider registers the given predicate and priority key
// sets with the factory package under factory.DefaultProvider.
//
// This is an excerpt: the "..." line stands for statements omitted here.
func registerAlgorithmProvider(predSet, priSet sets.String) {
    factory.RegisterAlgorithmProvider(factory.DefaultProvider, predSet, priSet)
    ...
}
//  pkg/scheduler/factory/plugins.go

// GetAlgorithmProvider looks up the provider registered under name. The
// returned pointer refers to a copy of the map entry, not to the entry
// itself. An error is returned when no provider has been registered under
// that name.
func GetAlgorithmProvider(name string) (*AlgorithmProviderConfig, error) {
    schedulerFactoryMutex.Lock()
    defer schedulerFactoryMutex.Unlock()

    if provider, ok := algorithmProviderMap[name]; ok {
        return &provider, nil
    }
    return nil, fmt.Errorf("plugin %q has not been registered", name)
}

3.5 注册默认调度器

// pkg/scheduler/algorithmprovider/defaults/defaults.go

// init registers the default predicates and priorities and then registers
// the resulting key sets as an algorithm provider.
//
// This is an excerpt: the "..." lines stand for statements omitted here.
func init() {
    ...
    // The arguments are evaluated first, so both registration helpers run
    // before the provider itself is registered.
    registerAlgorithmProvider(defaultPredicates(), defaultPriorities())
    ...
}

3.6 使用默认调度器

// pkg/scheduler/scheduler.go

// New returns a Scheduler.
//
// The part shown here selects the scheduler configuration from
// schedulerAlgorithmSource: a named algorithm provider, or a user-supplied
// policy read from a file or a ConfigMap. Any other source is rejected with
// an error.
//
// This is an excerpt: the "..." lines stand for code omitted here, including
// the definitions of config and configurator.
func New(client clientset.Interface,
    nodeInformer coreinformers.NodeInformer,
    podInformer coreinformers.PodInformer,
    pvInformer coreinformers.PersistentVolumeInformer,
    pvcInformer coreinformers.PersistentVolumeClaimInformer,
    replicationControllerInformer coreinformers.ReplicationControllerInformer,
    replicaSetInformer appsinformers.ReplicaSetInformer,
    statefulSetInformer appsinformers.StatefulSetInformer,
    serviceInformer coreinformers.ServiceInformer,
    pdbInformer policyinformers.PodDisruptionBudgetInformer,
    storageClassInformer storageinformers.StorageClassInformer,
    recorder record.EventRecorder,
    schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,
    stopCh <-chan struct{},
    opts ...func(o *schedulerOptions)) (*Scheduler, error) {
...
    source := schedulerAlgorithmSource
    // Provider is checked first, so it wins when both Provider and Policy are set.
    switch {
    case source.Provider != nil:
        // Create the config from a named algorithm provider.
        sc, err := configurator.CreateFromProvider(*source.Provider)
        if err != nil {
            return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
        }
        config = sc
    case source.Policy != nil:
        // Create the config from a user specified policy source.
        policy := &schedulerapi.Policy{}
        // When neither File nor ConfigMap is set, policy stays at its zero
        // value and is passed to CreateFromConfig as is.
        switch {
        case source.Policy.File != nil:
            if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil {
                return nil, err
            }
        case source.Policy.ConfigMap != nil:
            if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil {
                return nil, err
            }
        }
        sc, err := configurator.CreateFromConfig(*policy)
        if err != nil {
            return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
        }
        config = sc
    default:
        return nil, fmt.Errorf("unsupported algorithm source: %v", source)
    }
...
}
// pkg/scheduler/factory/factory.go

// CreateFromProvider builds a scheduler Config from the algorithm provider
// registered under providerName. It looks the provider up with
// GetAlgorithmProvider and passes its predicate and priority key sets, along
// with an empty extender list, to CreateFromKeys. A lookup error is returned
// as is.
func (c *configFactory) CreateFromProvider(providerName string) (*Config, error) {
    klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)

    provider, err := GetAlgorithmProvider(providerName)
    if err != nil {
        return nil, err
    }

    // Providers carry no extenders; pass an empty, non-nil slice.
    noExtenders := []algorithm.SchedulerExtender{}
    return c.CreateFromKeys(provider.FitPredicateKeys, provider.PriorityFunctionKeys, noExtenders)
}

4. 总结

举报

相关推荐

0 条评论