1. 前言
2. equivalence
3. NodeCache
// nodeMap stores one *NodeCache per node.
// The key is the node name; the value is that node's NodeCache.
type nodeMap map[string]*NodeCache
// predicateMap stores resultMaps with predicate ID as the key.
// The predicate ID is the index into the slice.
type predicateMap []resultMap
// resultMap stores PredicateResult with pod equivalence hash as the key.
// The key is the hash value of the pod's equivalence class.
type resultMap map[uint64]predicateResult
// predicateResult stores the output of a FitPredicate.
// It records whether the pod passed the predicate and, if not, the reasons.
type predicateResult struct {
// Fit is the pass/fail outcome of the predicate.
Fit bool
// FailReasons holds the failure reasons reported alongside Fit.
FailReasons []algorithm.PredicateFailureReason
}
// NodeCache holds the cached predicate results for one node, together with the
// generation counters that are compared against their snapshots before a
// result is written to the cache.
type NodeCache struct {
// mu is locked by the code in this file before the fields below are read
// or written.
mu sync.RWMutex
// cache holds, for each predicate, the results recorded for pods.
cache predicateMap
// generation is current generation of node cache, incremented on node
// invalidation.
// It acts as a version marker: invalidating the node bumps it by one.
generation uint64
// snapshotGeneration saves snapshot of generation of node cache.
// Same idea as generation, but it holds the snapshotted value.
snapshotGeneration uint64
// predicateGenerations stores generation numbers for predicates, incremented on
// predicate invalidation. Created on first update. Use 0 if does not
// exist.
// There is one generation counter per predicate.
predicateGenerations []uint64
// snapshotPredicateGenerations saves snapshot of generation numbers for predicates.
snapshotPredicateGenerations []uint64
}
3.1 equivalencePod
// equivalencePod gathers the pod fields that getEquivalencePod copies out of a
// v1.Pod. NewClass hashes a value of this type to obtain the pod's
// equivalence hash.
type equivalencePod struct {
Namespace *string
Labels map[string]string
Affinity *v1.Affinity
Containers []v1.Container // See note about ordering
InitContainers []v1.Container // See note about ordering
NodeName *string
NodeSelector map[string]string
Tolerations []v1.Toleration
Volumes []v1.Volume // See note about ordering
}
// getEquivalencePod builds the equivalencePod view of pod.
//
// Scalar fields (namespace, node name, affinity) are always carried over.
// Slices and maps are carried over only when they contain at least one
// element; otherwise the corresponding field stays nil. DeepHashObject
// considers nil and empty collections to be different, so this keeps the
// two cases from producing different hashes.
//
// The returned struct shares the slices, maps and pointers of pod; nothing is
// deep-copied.
func getEquivalencePod(pod *v1.Pod) *equivalencePod {
	spec := &pod.Spec
	result := &equivalencePod{
		Namespace: &pod.Namespace,
		Affinity:  spec.Affinity,
		NodeName:  &spec.NodeName,
	}

	// Slices: leave the field nil unless there is something to store.
	if len(spec.Containers) > 0 {
		result.Containers = spec.Containers
	}
	if len(spec.InitContainers) > 0 {
		result.InitContainers = spec.InitContainers
	}
	if len(spec.Tolerations) > 0 {
		result.Tolerations = spec.Tolerations
	}
	if len(spec.Volumes) > 0 {
		result.Volumes = spec.Volumes
	}

	// Maps get the same treatment.
	if len(pod.Labels) > 0 {
		result.Labels = pod.Labels
	}
	if len(spec.NodeSelector) > 0 {
		result.NodeSelector = spec.NodeSelector
	}

	// TODO(misterikkit): Also normalize nested maps and slices.
	return result
}
// Class carries the equivalence hash computed for a pod by NewClass.
type Class struct {
// Equivalence hash
hash uint64
}
// NewClass computes the equivalence Class of pod.
//
// The pod is first reduced to its equivalencePod form, which is then fed
// through DeepHashObject into a 32-bit FNV-1a hasher; the 32-bit sum is
// widened to uint64 and stored in the returned Class. A nil Class is returned
// only if no equivalencePod could be obtained.
func NewClass(pod *v1.Pod) *Class {
	ep := getEquivalencePod(pod)
	if ep == nil {
		return nil
	}

	hasher := fnv.New32a()
	hashutil.DeepHashObject(hasher, ep)
	return &Class{hash: uint64(hasher.Sum32())}
}
3.2 方法
lookupResult
// lookupResult returns the cached result stored for the given predicate ID and
// equivalence hash, holding the read lock for the duration of the lookup.
//
// ok reports whether an entry was present. A hit increments
// metrics.EquivalenceCacheHits and a miss increments
// metrics.EquivalenceCacheMisses. podName, nodeName and predicateKey are not
// used by the body.
func (n *NodeCache) lookupResult(
	podName, nodeName, predicateKey string,
	predicateID int,
	equivalenceHash uint64,
) (value predicateResult, ok bool) {
	n.mu.RLock()
	defer n.mu.RUnlock()

	value, ok = n.cache[predicateID][equivalenceHash]
	if !ok {
		metrics.EquivalenceCacheMisses.Inc()
		return value, false
	}
	metrics.EquivalenceCacheHits.Inc()
	return value, true
}
updateResult
// updateResult records the outcome (fit, reasons) of a predicate for the given
// equivalence hash in this node's cache.
//
// The write is dropped, and counted in metrics.EquivalenceCacheWrites, when:
//   - nodeInfo or its node is nil (label "discarded_bad_node"), or
//   - the node generation or this predicate's generation no longer matches
//     its snapshot (label "discarded_stale"), which indicates an invalidation
//     arrived after the last snapshot.
//
// NOTE(review): after a successful write the predicate's generation is
// incremented, so a further write for the same predicate is treated as stale
// until the snapshot counters are refreshed — confirm this is intended.
func (n *NodeCache) updateResult(
	podName, predicateKey string,
	predicateID int,
	fit bool,
	reasons []algorithm.PredicateFailureReason,
	equivalenceHash uint64,
	nodeInfo *schedulercache.NodeInfo,
) {
	if nodeInfo == nil || nodeInfo.Node() == nil {
		// This may happen during tests.
		metrics.EquivalenceCacheWrites.WithLabelValues("discarded_bad_node").Inc()
		return
	}

	entry := predicateResult{Fit: fit, FailReasons: reasons}

	n.mu.Lock()
	defer n.mu.Unlock()

	// Compare the live generations with their snapshots. The per-predicate
	// comparison is only evaluated when the node generation is unchanged.
	nodeUnchanged := n.snapshotGeneration == n.generation
	if !nodeUnchanged || n.snapshotPredicateGenerations[predicateID] != n.predicateGenerations[predicateID] {
		// An invalidation request was received since the last snapshot, so
		// the result being written may be stale; skip the update.
		metrics.EquivalenceCacheWrites.WithLabelValues("discarded_stale").Inc()
		return
	}

	// Fetch the predicate's result map, creating it on first use, then store
	// the entry under the equivalence hash.
	results := n.cache[predicateID]
	if results == nil {
		results = resultMap{}
		n.cache[predicateID] = results
	}
	results[equivalenceHash] = entry
	n.predicateGenerations[predicateID]++

	klog.V(5).Infof("Cache update: node=%s, predicate=%s,pod=%s,value=%v",
		nodeInfo.Node().Name, predicateKey, podName, entry)
}
RunPredicate
// RunPredicate returns the result of pred for pod on the node described by
// nodeInfo, using the cache when possible.
//
// If a result is already cached for (predicateID, equivClass.hash) it is
// returned directly. Otherwise pred is executed once; when it succeeds its
// result is handed to updateResult, and in either case the values produced by
// pred are returned. An error is returned without running pred when nodeInfo
// or its node is nil.
func (n *NodeCache) RunPredicate(
	pred algorithm.FitPredicate,
	predicateKey string,
	predicateID int,
	pod *v1.Pod,
	meta algorithm.PredicateMetadata,
	nodeInfo *schedulercache.NodeInfo,
	equivClass *Class,
) (bool, []algorithm.PredicateFailureReason, error) {
	if nodeInfo == nil || nodeInfo.Node() == nil {
		// This may happen during tests.
		return false, []algorithm.PredicateFailureReason{}, fmt.Errorf("nodeInfo is nil or node is invalid")
	}

	// Cache hit: reuse the stored result.
	if cached, hit := n.lookupResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, predicateID, equivClass.hash); hit {
		return cached.Fit, cached.FailReasons, nil
	}

	// Cache miss: run the predicate and store its result only on success.
	fit, reasons, err := pred(pod, meta, nodeInfo)
	if err == nil {
		n.updateResult(pod.GetName(), predicateKey, predicateID, fit, reasons, equivClass.hash, nodeInfo)
	}
	return fit, reasons, err
}
invalidate
// invalidatePreds drops the cached results of every predicate listed in
// predicateIDs and increments each of those predicates' generation counters.
// The write lock is held for the whole operation.
func (n *NodeCache) invalidatePreds(predicateIDs []int) {
	n.mu.Lock()
	defer n.mu.Unlock()

	for i := range predicateIDs {
		id := predicateIDs[i]
		n.cache[id] = nil
		n.predicateGenerations[id]++
	}
}
// invalidate wipes the whole node cache: the predicate map is replaced by a
// fresh one of the same length and the node generation is incremented.
func (n *NodeCache) invalidate() {
	n.mu.Lock()
	defer n.mu.Unlock()

	size := len(n.cache)
	n.generation++
	n.cache = make(predicateMap, size)
}
3. cache
// nodeMap stores one *NodeCache per node, keyed by node name.
type nodeMap map[string]*NodeCache
// Cache is the first-level cache: it holds a NodeCache per node and the
// mapping from predicate key to predicate ID.
type Cache struct {
// NOTE(harry): Theoretically sync.Map has better performance in machine with 8+ CPUs, while
// the reality is lock contention in first level cache is rare.
mu sync.RWMutex
// nodeToCache holds the per-node caches.
nodeToCache nodeMap
// predicateIDMap translates a predicate key (name) into its predicate ID.
predicateIDMap map[string]int
}
3.1 方法
Snapshot
// Snapshot records, for every NodeCache in the cache, the current node
// generation and the current per-predicate generations into their snapshot
// fields. The Cache read lock is held throughout, and each NodeCache is
// locked while its snapshot fields are written.
func (c *Cache) Snapshot() {
	c.mu.RLock()
	defer c.mu.RUnlock()

	for _, nodeCache := range c.nodeToCache {
		nodeCache.mu.Lock()
		// Snapshot the node generation.
		nodeCache.snapshotGeneration = nodeCache.generation
		// Snapshot the predicate generations.
		copy(nodeCache.snapshotPredicateGenerations, nodeCache.predicateGenerations)
		nodeCache.mu.Unlock()
	}
}
invalidate
// predicateKeysToIDs resolves predicate names to their predicate IDs using
// predicateIDMap. Names that have no registered ID are logged as errors and
// left out of the result.
func (c *Cache) predicateKeysToIDs(predicateKeys sets.String) []int {
	ids := make([]int, 0, len(predicateKeys))
	for key := range predicateKeys {
		id, found := c.predicateIDMap[key]
		if !found {
			klog.Errorf("predicate key %q not found", key)
			continue
		}
		ids = append(ids, id)
	}
	return ids
}
// InvalidatePredicates clears all cached results for the given predicates.
//
// The predicate keys are translated to IDs once, and those IDs (for example
// the one for PodFitsHostPorts) are then invalidated in the NodeCache of
// every node. An empty key set is a no-op.
func (c *Cache) InvalidatePredicates(predicateKeys sets.String) {
	if len(predicateKeys) == 0 {
		return
	}

	c.mu.RLock()
	defer c.mu.RUnlock()

	ids := c.predicateKeysToIDs(predicateKeys)
	for _, nodeCache := range c.nodeToCache {
		nodeCache.invalidatePreds(ids)
	}
	klog.V(5).Infof("Cache invalidation: node=*,predicates=%v", predicateKeys)
}
4. 总结