1. Preface
2. Without Extenders

2.1 Example
2.1.1 Preparing the configuration files
First, write a KubeSchedulerConfiguration for the custom scheduler (named my-scheduler here), which loads its scheduling policy from a file:
apiVersion: kubescheduler.config.k8s.io/v1alpha1
kind: KubeSchedulerConfiguration
schedulerName: my-scheduler
algorithmSource:
  policy:
    file:
      path: policy.yaml
leaderElection:
  leaderElect: true
  lockObjectName: my-scheduler
  lockObjectNamespace: kube-system
The policy file (policy.yaml) referenced above lists the predicates and priorities this scheduler uses:
{
  "kind" : "Policy",
  "apiVersion" : "v1",
  "predicates" : [
    {"name" : "PodFitsHostPorts"},
    {"name" : "PodFitsResources"},
    {"name" : "NoDiskConflict"},
    {"name" : "MatchNodeSelector"},
    {"name" : "HostName"}
  ],
  "priorities" : [
    {"name" : "LeastRequestedPriority", "weight" : 1},
    {"name" : "BalancedResourceAllocation", "weight" : 1},
    {"name" : "ServiceSpreadingPriority", "weight" : 1},
    {"name" : "EqualPriority", "weight" : 1}
  ],
  "hardPodAffinitySymmetricWeight" : 10
}
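The custom scheduler is just a kube-scheduler process started with this configuration. Assuming the KubeSchedulerConfiguration above is saved as scheduler-config.yaml next to policy.yaml (and that the API-server connection is configured via the clientConnection section of the file, which is omitted here), the scheduler can be launched roughly as follows:

[root@master kubectl]# ./kube-scheduler --config=scheduler-config.yaml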
2.1.2 Running and testing
Create two Pods: one that explicitly requests my-scheduler via spec.schedulerName, and one that does not:
[root@master kubectl]# cat pod-scheduler.yaml
apiVersion: v1
kind: Pod
metadata:
  name: test-schduler
spec:
  schedulerName: my-scheduler
  containers:
  - name: podtest-scheduler
    image: nginx
    ports:
    - containerPort: 80
[root@master kubectl]# cat pod.yaml
apiVersion: v1
kind: Pod
metadata:
  name: test
spec:
  containers:
  - name: podtest
    image: nginx
    ports:
    - containerPort: 80
[root@master kubectl]# ./kubectl apply -f pod-scheduler.yaml
[root@master kubectl]# ./kubectl apply -f pod.yaml
[root@master kubectl]# ./kubectl get pods
NAME            READY   STATUS    RESTARTS   AGE
test            0/1     Pending   0          83s
test-schduler   1/1     Running   0          13m
[root@master kubectl]# ./kubectl describe pod test-schduler
...
Events:
  Type     Reason             Age                 From                  Message
  ----     ------             ----                ----                  -------
  Normal   Scheduled          12m                 my-scheduler          Successfully assigned default/test-schduler to 172.21.0.12
  Normal   Pulling            12m                 kubelet, 172.21.0.12  pulling image "nginx"
  Normal   Pulled             11m                 kubelet, 172.21.0.12  Successfully pulled image "nginx"
  Normal   Created            11m                 kubelet, 172.21.0.12  Created container
  Normal   Started            11m                 kubelet, 172.21.0.12  Started container
  Warning  MissingClusterDNS  62s (x12 over 12m)  kubelet, 172.21.0.12  pod: "test-schduler_default(213933b8-efda-11e9-9434-525400d54f7e)". kubelet does not have ClusterDNS IP configured and cannot create Pod using "ClusterFirst" policy. Falling back to "Default" policy.
The pod that sets spec.schedulerName: my-scheduler was picked up and bound by my-scheduler (see the From column of the Scheduled event), while the test pod stays Pending, presumably because no scheduler named default-scheduler is running in this test environment to handle it.
2.2 Source code analysis
2.2.1 Parsing the configuration file
The configuration file is parsed along the following call chain:
NewSchedulerCommand -> runCommand -> opts.Config() -> o.ApplyTo(c)
// cmd/kube-scheduler/app/options/options.go
func (o *Options) ApplyTo(c *schedulerappconfig.Config) error {
	// If kube-scheduler was started without --config, use the default configuration (o.ComponentConfig).
	if len(o.ConfigFile) == 0 {
		...
	} else {
		// If --config was specified, load the configuration from that file.
		cfg, err := loadConfigFromFile(o.ConfigFile)
		if err != nil {
			return err
		}
		// use the loaded config file only, with the exception of --address and --port. This means that
		// none of the deprecated flags in o.Deprecated are taken into consideration. This is the old
		// behaviour of the flags we have to keep.
		c.ComponentConfig = *cfg
		if err := o.CombinedInsecureServing.ApplyToFromLoadedConfig(c, &c.ComponentConfig); err != nil {
			return err
		}
	}
	...
}
// cmd/kube-scheduler/app/options/configfile.go
func loadConfigFromFile(file string) (*kubeschedulerconfig.KubeSchedulerConfiguration, error) {
	data, err := ioutil.ReadFile(file)
	if err != nil {
		return nil, err
	}
	return loadConfig(data)
}

func loadConfig(data []byte) (*kubeschedulerconfig.KubeSchedulerConfiguration, error) {
	configObj := &kubeschedulerconfig.KubeSchedulerConfiguration{}
	if err := runtime.DecodeInto(kubeschedulerscheme.Codecs.UniversalDecoder(), data, configObj); err != nil {
		return nil, err
	}
	return configObj, nil
}
// pkg/scheduler/apis/config/types.go
type KubeSchedulerConfiguration struct {
	metav1.TypeMeta
	// SchedulerName is name of the scheduler, used to select which pods
	// will be processed by this scheduler, based on pod's "spec.SchedulerName".
	SchedulerName string
	// AlgorithmSource specifies the scheduler algorithm source.
	AlgorithmSource SchedulerAlgorithmSource
	// RequiredDuringScheduling affinity is not symmetric, but there is an implicit PreferredDuringScheduling affinity rule
	// corresponding to every RequiredDuringScheduling affinity rule.
	// HardPodAffinitySymmetricWeight represents the weight of implicit PreferredDuringScheduling affinity rule, in the range 0-100.
	HardPodAffinitySymmetricWeight int32
	// LeaderElection is covered later, when discussing high availability.
	LeaderElection KubeSchedulerLeaderElectionConfiguration
	ClientConnection apimachineryconfig.ClientConnectionConfiguration
	// HealthzBindAddress is the IP address and port for the health check server to serve on,
	// defaulting to 0.0.0.0:10251.
	HealthzBindAddress string
	// MetricsBindAddress is the IP address and port for the metrics server to serve on,
	// defaulting to 0.0.0.0:10251.
	MetricsBindAddress string
	apiserverconfig.DebuggingConfiguration
	// DisablePreemption controls whether pod preemption is disabled.
	DisablePreemption bool
	PercentageOfNodesToScore int32
	FailureDomains string
	BindTimeoutSeconds *int64
}
type SchedulerAlgorithmSource struct {
	// Policy is a policy-based algorithm source.
	Policy *SchedulerPolicySource
	// Provider is the name of a scheduling algorithm provider to use.
	Provider *string
}

type SchedulerPolicySource struct {
	// File reads the policy from a file.
	File *SchedulerPolicyFileSource
	// ConfigMap reads the policy from a ConfigMap.
	ConfigMap *SchedulerPolicyConfigMapSource
}
// Used for high availability (leader election).
type KubeSchedulerLeaderElectionConfiguration struct {
	apiserverconfig.LeaderElectionConfiguration
	// LockObjectNamespace defines the namespace of the lock object
	LockObjectNamespace string
	// LockObjectName defines the lock object name
	LockObjectName string
}
type LeaderElectionConfiguration struct {
	LeaderElect bool
	LeaseDuration metav1.Duration
	RenewDeadline metav1.Duration
	RetryPeriod metav1.Duration
	ResourceLock string
}

// k8s.io/apimachinery/pkg/apis/meta/v1/types.go
type TypeMeta struct {
	Kind string `json:"kind,omitempty" protobuf:"bytes,1,opt,name=kind"`
	APIVersion string `json:"apiVersion,omitempty" protobuf:"bytes,2,opt,name=apiVersion"`
}
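To make the mapping between the YAML in 2.1.1 and these types concrete, here is a small standalone sketch (illustration only, not the real code path): it decodes the same YAML into hand-written stand-in structs using sigs.k8s.io/yaml, whereas the real loader uses runtime.DecodeInto with the internal types shown above.

package main

import (
	"fmt"

	"sigs.k8s.io/yaml"
)

// Hand-written stand-ins for the KubeSchedulerConfiguration fields exercised in 2.1.1.
// Illustration only; the real types live in pkg/scheduler/apis/config.
type schedulerConfig struct {
	SchedulerName   string `json:"schedulerName"`
	AlgorithmSource struct {
		Policy struct {
			File struct {
				Path string `json:"path"`
			} `json:"file"`
		} `json:"policy"`
	} `json:"algorithmSource"`
}

func main() {
	data := []byte(`
apiVersion: kubescheduler.config.k8s.io/v1alpha1
kind: KubeSchedulerConfiguration
schedulerName: my-scheduler
algorithmSource:
  policy:
    file:
      path: policy.yaml
`)
	var cfg schedulerConfig
	if err := yaml.Unmarshal(data, &cfg); err != nil {
		panic(err)
	}
	// Prints: my-scheduler policy.yaml
	fmt.Println(cfg.SchedulerName, cfg.AlgorithmSource.Policy.File.Path)
}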
2.2.2 Parsing algorithmSource
The algorithm source is consumed in scheduler.New, reached along this call chain:
runCommand -> Run(cc, stopCh) -> scheduler.New
// New returns a Scheduler
func New(client clientset.Interface,
	...
	schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,
	stopCh <-chan struct{},
	opts ...func(o *schedulerOptions)) (*Scheduler, error) {
	...
	var config *factory.Config
	source := schedulerAlgorithmSource
	switch {
	case source.Provider != nil:
		// The default scheduler takes this branch (*source.Provider == DefaultProvider).
		...
	case source.Policy != nil:
		// A scheduler configured with a policy (such as our custom scheduler) takes this branch.
		// Create the config from a user specified policy source.
		policy := &schedulerapi.Policy{}
		switch {
		case source.Policy.File != nil:
			if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil {
				return nil, err
			}
		case source.Policy.ConfigMap != nil:
			if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil {
				return nil, err
			}
		}
		sc, err := configurator.CreateFromConfig(*policy)
		if err != nil {
			return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
		}
		config = sc
	default:
		return nil, fmt.Errorf("unsupported algorithm source: %v", source)
	}
	...
}
type Policy struct {
	metav1.TypeMeta
	Predicates []PredicatePolicy
	Priorities []PriorityPolicy
	ExtenderConfigs []ExtenderConfig
	HardPodAffinitySymmetricWeight int32
	AlwaysCheckAllPredicates bool
}

type PredicatePolicy struct {
	Name string
	Argument *PredicateArgument
}

type PriorityPolicy struct {
	Name string
	Weight int
	Argument *PriorityArgument
}

type PredicateArgument struct {
	ServiceAffinity *ServiceAffinity
	LabelsPresence *LabelsPresence
}

type PriorityArgument struct {
	ServiceAntiAffinity *ServiceAntiAffinity
	LabelPreference *LabelPreference
	RequestedToCapacityRatioArguments *RequestedToCapacityRatioArguments
}

type ExtenderConfig struct {
	URLPrefix string
	FilterVerb string
	PreemptVerb string
	PrioritizeVerb string
	Weight int
	BindVerb string
	EnableHTTPS bool
	TLSConfig *restclient.TLSClientConfig
	HTTPTimeout time.Duration
	NodeCacheCapable bool
	ManagedResources []ExtenderManagedResource
	Ignorable bool
}
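scheduler.New above delegates the actual reading of the policy file to initPolicyFromFile, which the excerpt does not show. Paraphrased as a simplified sketch (not the verbatim upstream source; here schedulerapi stands for pkg/scheduler/api and latestschedulerapi for its "latest" codec package), it reads the file and decodes it into the Policy struct, much like loadConfig does for the KubeSchedulerConfiguration:

// Simplified sketch of initPolicyFromFile; details may differ from the actual source.
func initPolicyFromFile(policyFile string, policy *schedulerapi.Policy) error {
	// Read the policy file (policy.yaml in the example from 2.1.1) from disk.
	data, err := ioutil.ReadFile(policyFile)
	if err != nil {
		return fmt.Errorf("couldn't read policy config: %v", err)
	}
	// Decode the bytes into the Policy struct using the scheduler API codec.
	if err := runtime.DecodeInto(latestschedulerapi.Codec, data, policy); err != nil {
		return fmt.Errorf("invalid policy: %v", err)
	}
	return nil
}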
2.2.3 Generating a factory.Config from the Policy
func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*Config, error) {
	klog.V(2).Infof("Creating scheduler from configuration: %v", policy)

	// validate the policy configuration
	if err := validation.ValidatePolicy(policy); err != nil {
		return nil, err
	}

	// Collect the predicate keys.
	// If the policy does not define any predicates, fall back to the default provider's predicate keys.
	predicateKeys := sets.NewString()
	if policy.Predicates == nil {
		klog.V(2).Infof("Using predicates from algorithm provider '%v'", DefaultProvider)
		provider, err := GetAlgorithmProvider(DefaultProvider)
		if err != nil {
			return nil, err
		}
		predicateKeys = provider.FitPredicateKeys
	} else {
		for _, predicate := range policy.Predicates {
			klog.V(2).Infof("Registering predicate: %s", predicate.Name)
			predicateKeys.Insert(RegisterCustomFitPredicate(predicate))
		}
	}

	// Collect the priority keys.
	// If the policy does not define any priorities, fall back to the default provider's priority keys.
	priorityKeys := sets.NewString()
	if policy.Priorities == nil {
		klog.V(2).Infof("Using priorities from algorithm provider '%v'", DefaultProvider)
		provider, err := GetAlgorithmProvider(DefaultProvider)
		if err != nil {
			return nil, err
		}
		priorityKeys = provider.PriorityFunctionKeys
	} else {
		for _, priority := range policy.Priorities {
			klog.V(2).Infof("Registering priority: %s", priority.Name)
			priorityKeys.Insert(RegisterCustomPriorityFunction(priority))
		}
	}

	// Build the extenders.
	var extenders []algorithm.SchedulerExtender
	if len(policy.ExtenderConfigs) != 0 {
		ignoredExtendedResources := sets.NewString()
		for ii := range policy.ExtenderConfigs {
			klog.V(2).Infof("Creating extender with config %+v", policy.ExtenderConfigs[ii])
			extender, err := core.NewHTTPExtender(&policy.ExtenderConfigs[ii])
			if err != nil {
				return nil, err
			}
			extenders = append(extenders, extender)
			for _, r := range policy.ExtenderConfigs[ii].ManagedResources {
				if r.IgnoredByScheduler {
					ignoredExtendedResources.Insert(string(r.Name))
				}
			}
		}
		predicates.RegisterPredicateMetadataProducerWithExtendedResourceOptions(ignoredExtendedResources)
	}

	// Providing HardPodAffinitySymmetricWeight in the policy config is the new and preferred way of providing the value.
	// Give it higher precedence than scheduler CLI configuration when it is provided.
	if policy.HardPodAffinitySymmetricWeight != 0 {
		c.hardPodAffinitySymmetricWeight = policy.HardPodAffinitySymmetricWeight
	}

	// When AlwaysCheckAllPredicates is set to true, scheduler checks all the configured
	// predicates even after one or more of them fails.
	if policy.AlwaysCheckAllPredicates {
		c.alwaysCheckAllPredicates = policy.AlwaysCheckAllPredicates
	}

	// Build the Config from the predicate, priority, and extender keys.
	return c.CreateFromKeys(predicateKeys, priorityKeys, extenders)
}
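For predicates listed in the policy, RegisterCustomFitPredicate either reuses an already-registered predicate with the same name (which is what happens for the built-in names used in 2.1.1) or, when an Argument is supplied, registers a new predicate built from that argument. As an illustration only (the JSON field names below are my reading of the v1 policy API and should be double-checked), a label-presence predicate entry in the policy file could look roughly like this:

{
  "name" : "RequireZoneLabel",
  "argument" : {
    "labelsPresence" : {
      "labels" : ["zone"],
      "presence" : true
    }
  }
}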
func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) {
	klog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys)

	if c.GetHardPodAffinitySymmetricWeight() < 1 || c.GetHardPodAffinitySymmetricWeight() > 100 {
		return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 1-100", c.GetHardPodAffinitySymmetricWeight())
	}

	// Resolve the predicate keys into the actual predicate functions.
	predicateFuncs, err := c.GetPredicates(predicateKeys)
	if err != nil {
		return nil, err
	}

	// Resolve the priority keys into the actual priority function configs.
	priorityConfigs, err := c.GetPriorityFunctionConfigs(priorityKeys)
	if err != nil {
		return nil, err
	}

	// priorityMetaProducer is used later when scoring nodes.
	priorityMetaProducer, err := c.GetPriorityMetadataProducer()
	if err != nil {
		return nil, err
	}

	// predicateMetaProducer is used later during the actual predicate (filtering) phase.
	predicateMetaProducer, err := c.GetPredicateMetadataProducer()
	if err != nil {
		return nil, err
	}

	// Init equivalence class cache (which speeds up predicates) if it is enabled.
	if c.enableEquivalenceClassCache {
		c.equivalencePodCache = equivalence.NewCache(predicates.Ordering())
		klog.Info("Created equivalence class cache")
	}

	// Create the algorithm.ScheduleAlgorithm that performs the actual scheduling computation.
	algo := core.NewGenericScheduler(
		c.schedulerCache,
		c.equivalencePodCache,
		c.podQueue,
		predicateFuncs,
		predicateMetaProducer,
		priorityConfigs,
		priorityMetaProducer,
		extenders,
		c.volumeBinder,
		c.pVCLister,
		c.pdbLister,
		c.alwaysCheckAllPredicates,
		c.disablePreemption,
		c.percentageOfNodesToScore,
	)
	...
}
3. Summary