1. 前言
2. node_tree
2.1 nodeArray
// nodeArray holds the names of the nodes in one zone together with a cursor
// that next() advances while iterating over them.
type nodeArray struct {
// Names of all nodes in this zone.
nodes []string
// Index in nodes of the element next() will return; reset to 0 by
// NodeTree.resetExhausted.
lastIndex int
}
// next returns the node name at the cursor and advances the cursor.
//
// exhausted becomes true once every node in the array has been handed out,
// and stays true until the cursor is reset. An empty array is treated as an
// inconsistency: it is logged and reported as ("", false), not as exhausted.
func (na *nodeArray) next() (nodeName string, exhausted bool) {
	switch {
	case len(na.nodes) == 0:
		klog.Error("The nodeArray is empty. It should have been deleted from NodeTree.")
		return "", false
	case na.lastIndex >= len(na.nodes):
		return "", true
	}
	idx := na.lastIndex
	na.lastIndex = idx + 1
	return na.nodes[idx], false
}
2.2 NodeTree
// NodeTree groups node names by zone and hands them out round-robin across
// zones via Next.
type NodeTree struct {
// Maps each zone key to the nodeArray holding all of that zone's node names.
tree map[string]*nodeArray // a map from zone (region-zone) to an array of nodes in the zone.
// Every zone key in tree, in the order the zones were first added.
zones []string // a list of all the zones in the tree (keys)
// Position in zones from which Next resumes iterating.
zoneIndex int
// Number of nodes added to the tree.
NumNodes int
// Taken for writing by AddNode and Next.
mu sync.RWMutex
}
2.2.1 AddNode
// AddNode inserts n into the tree while holding the tree's write lock.
// Duplicate handling and zone bookkeeping are done by addNode.
func (nt *NodeTree) AddNode(n *v1.Node) {
	nt.mu.Lock()
	defer nt.mu.Unlock()

	nt.addNode(n)
}
// addNode records n under its zone key.
//
// A zone seen for the first time is appended to zones and given a fresh
// nodeArray. If the node's name is already present in its zone, a warning is
// logged and nothing changes (NumNodes is not incremented). Otherwise the
// name is appended to the zone's nodeArray.
// Called by AddNode with nt.mu held.
func (nt *NodeTree) addNode(n *v1.Node) {
	zone := utilnode.GetZoneKey(n)
	arr, known := nt.tree[zone]
	if !known {
		nt.zones = append(nt.zones, zone)
		nt.tree[zone] = &nodeArray{nodes: []string{n.Name}, lastIndex: 0}
	} else {
		for i := range arr.nodes {
			if arr.nodes[i] == n.Name {
				klog.Warningf("node %v already exist in the NodeTree", n.Name)
				return
			}
		}
		arr.nodes = append(arr.nodes, n.Name)
	}
	klog.V(5).Infof("Added node %v in group %v to NodeTree", n.Name, zone)
	nt.NumNodes++
}
2.2.3 Next
// resetExhausted rewinds the zone cursor and every zone's node cursor to 0,
// so iteration starts over from the first node of the first zone. Next calls
// it once all zones have been exhausted.
func (nt *NodeTree) resetExhausted() {
	nt.zoneIndex = 0
	for zone := range nt.tree {
		nt.tree[zone].lastIndex = 0
	}
}
// Next returns the name of the next node. NodeTree iterates over zones and in each zone iterates
// over nodes in a round robin fashion.
//
// Each call takes one node from the next zone in turn. When every zone has
// reported exhaustion, all cursors are reset and iteration starts again from
// the beginning. An empty tree yields "".
func (nt *NodeTree) Next() string {
	nt.mu.Lock()
	defer nt.mu.Unlock()

	zoneCount := len(nt.zones)
	if zoneCount == 0 {
		return ""
	}

	exhaustedSeen := 0
	for {
		if nt.zoneIndex >= zoneCount {
			nt.zoneIndex = 0
		}
		current := nt.zones[nt.zoneIndex]
		nt.zoneIndex++

		// We do not check the exhausted zones before calling next() on the zone. This ensures
		// that if more nodes are added to a zone after it is exhausted, we iterate over the new nodes.
		name, done := nt.tree[current].next()
		if !done {
			return name
		}

		exhaustedSeen++
		if exhaustedSeen >= zoneCount { // all zones are exhausted. we should reset.
			nt.resetExhausted()
		}
	}
}
2.2.4 总结
3. cache
// Cache collects pod and node information for the scheduler.
type Cache interface {
// AssumePod puts the pod into the assumed state.
AssumePod(pod *v1.Pod) error
// FinishBinding sets bindingFinished=true for the pod.
FinishBinding(pod *v1.Pod) error
// ForgetPod removes the pod from the cache; the pod must be in the assumed state.
ForgetPod(pod *v1.Pod) error
// AddPod either confirms a pod if it's assumed, or adds it back if it's expired.
// If added back, the pod's information would be added again.
AddPod(pod *v1.Pod) error
// UpdatePod may only be called for a pod in the Added state.
UpdatePod(oldPod, newPod *v1.Pod) error
// RemovePod may only be called for a pod in the Added state.
RemovePod(pod *v1.Pod) error
// GetPod returns the cached pod recorded in podStates.
GetPod(pod *v1.Pod) (*v1.Pod, error)
// IsAssumedPod reports whether the pod is in the assumed state.
IsAssumedPod(pod *v1.Pod) (bool, error)
// AddNode adds a node; all of the node's information is stored in the cache.
AddNode(node *v1.Node) error
// UpdateNode updates a node.
UpdateNode(oldNode, newNode *v1.Node) error
// RemoveNode removes a node.
RemoveNode(node *v1.Node) error
// UpdateNodeNameToInfoMap updates the passed infoMap to the current contents of Cache.
// The node info contains aggregated information of pods scheduled (including assumed to be)
// on this node.
UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.NodeInfo) error
// List returns the pods tracked on the cached nodes (presumably restricted to
// those matching the given selector — confirm against the implementation).
List(labels.Selector) ([]*v1.Pod, error)
// FilteredList returns the pods tracked on the cached nodes that satisfy the
// given filter and selector.
FilteredList(filter algorithm.PodFilter, selector labels.Selector) ([]*v1.Pod, error)
// Snapshot returns a copy of the assumed pods and the node infos.
Snapshot() *Snapshot
// NodeTree returns the cache's NodeTree.
NodeTree() *NodeTree
}
// Snapshot is a copy of the cache's assumed pods and node infos, as returned
// by Cache.Snapshot.
type Snapshot struct {
// Keys of the pods that were in the assumed state.
AssumedPods map[string]bool
// Aggregated node information; presumably keyed by node name like
// schedulerCache.nodes — confirm against Snapshot().
Nodes map[string]*schedulercache.NodeInfo
}
// +-------------------------------------------+ +----+
// | Add | | |
// | | | | Update
// + Assume Add v v |
//Initial +--------> Assumed +------------+---> Added <--+
// ^ + + | +
// | | | | |
// | | | Add | | Remove
// | | | | |
// | | | + |
// +----------------+ +-----------> Expired +----> Deleted
// Forget Expire
// Cache collects pods' information and provides node-level aggregated information.
// It's intended for generic scheduler to do efficient lookup.
// Cache's operations are pod centric. It does incremental updates based on pod events.
// Pod events are sent via network. We don't have guaranteed delivery of all events:
// We use Reflector to list and watch from remote.
// Reflector might be slow and do a relist, which would lead to missing events.
// Note that an assumed pod can expire, because if we haven't received Add event notifying us
// for a while, there might be some problems and we shouldn't keep the pod in cache anymore.
//
// Note that "Initial", "Expired", and "Deleted" pods do not actually exist in cache.
// Based on existing use cases, we are making the following assumptions:
// - No pod would be assumed twice
// - A pod could be added without going through scheduler. In this case, we will see Add but not Assume event.
// - If a pod wasn't added, it wouldn't be removed or updated.
// - Both "Expired" and "Deleted" are valid end states. In case of some problems, e.g. network issue,
// a pod might have changed its state (e.g. added and deleted) without delivering notification to the cache.
3.1 schedulerCache
3.1.1 结构
// schedulerCache is the Cache implementation returned by New. It tracks pods
// and nodes and expires assumed pods from a background goroutine started by run.
type schedulerCache struct {
// Closing stop terminates the background cleanup goroutine.
stop <-chan struct{}
// ttl is how long an assumed pod is kept before it expires.
ttl time.Duration
// period is the interval at which expired assumed pods are cleaned up.
period time.Duration
// This mutex guards all fields within this cache struct.
mu sync.RWMutex
// A set of assumed pod keys; each key can be used to look up an entry in
// podStates.
assumedPods map[string]bool
// A map from pod key to podState.
podStates map[string]*podState
// Aggregated per-node information, keyed by node name.
nodes map[string]*schedulercache.NodeInfo
// Zone-grouped view of the node names.
nodeTree *NodeTree
// A map from image name to its imageState.
imageStates map[string]*imageState
}
// podState is the cache's bookkeeping for a single pod.
type podState struct {
pod *v1.Pod
// Used by an assumed pod to determine expiration: the time after which it
// may be expired. AssumePod leaves it nil and AddPod resets it to nil.
deadline *time.Time
// Used to block the cache from expiring an assumed pod while binding is
// still running: an assumed pod is only considered for expiration once
// this is true.
bindingFinished bool
}
// imageState records what the cache knows about one container image.
type imageState struct {
// Size of the image in bytes (taken from the node status's SizeBytes).
size int64
// A set of node names for nodes having this image present.
nodes sets.String
}
3.1.2 AssumePod
// AssumePod records pod as assumed.
//
// The pod is accounted on the NodeInfo for pod.Spec.NodeName and tracked in
// both podStates and assumedPods. The new podState has no deadline and
// bindingFinished=false, so cleanup will not expire it yet. A pod that
// already has an entry in podStates is rejected, because a pod must never be
// assumed twice.
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
	key, err := schedulercache.GetPodKey(pod)
	if err != nil {
		return err
	}

	cache.mu.Lock()
	defer cache.mu.Unlock()

	if _, seen := cache.podStates[key]; seen {
		return fmt.Errorf("pod %v is in the cache, so can't be assumed", key)
	}

	cache.addPod(pod)
	cache.podStates[key] = &podState{pod: pod}
	cache.assumedPods[key] = true
	return nil
}
// addPod accounts pod on the NodeInfo for pod.Spec.NodeName. If the cache has
// no NodeInfo for that node yet, one is created first.
// Assumes that lock is already acquired.
func (cache *schedulerCache) addPod(pod *v1.Pod) {
	nodeName := pod.Spec.NodeName
	info, found := cache.nodes[nodeName]
	if !found {
		info = schedulercache.NewNodeInfo()
		cache.nodes[nodeName] = info
	}
	info.AddPod(pod)
}
3.1.3 New 和 run方法
// cleanAssumedPeriod is the interval at which the goroutine started by New
// scans for expired assumed pods.
var cleanAssumedPeriod = time.Second
// New returns a Cache implementation and starts its background goroutine,
// which cleans up expired assumed pods every cleanAssumedPeriod.
//
// ttl is how long an assumed pod is kept before it expires. Closing stop
// terminates the background goroutine.
func New(ttl time.Duration, stop <-chan struct{}) Cache {
	c := newSchedulerCache(ttl, cleanAssumedPeriod, stop)
	c.run()
	return c
}
// run launches a goroutine that calls cleanupExpiredAssumedPods every
// cache.period until cache.stop is closed.
func (cache *schedulerCache) run() {
	cleanup := cache.cleanupExpiredAssumedPods
	go wait.Until(cleanup, cache.period, cache.stop)
}
// cleanupExpiredAssumedPods expires assumed pods relative to the current time.
func (cache *schedulerCache) cleanupExpiredAssumedPods() {
	now := time.Now()
	cache.cleanupAssumedPods(now)
}
// cleanupAssumedPods exists for making test deterministic by taking time as input argument.
//
// Under the cache lock, every assumed pod whose binding has finished and
// whose deadline is before now is removed via expirePod. Pods still binding
// are skipped. An assumed key with no podState indicates corrupted
// bookkeeping and panics.
func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
	cache.mu.Lock()
	defer cache.mu.Unlock()

	for key := range cache.assumedPods {
		ps, found := cache.podStates[key]
		if !found {
			panic("Key found in assumed set but not in podStates. Potentially a logical error.")
		}
		switch {
		case !ps.bindingFinished:
			// Never expire a pod whose binding is still in flight.
			klog.V(3).Infof("Couldn't expire cache for pod %v/%v. Binding is still in progress.",
				ps.pod.Namespace, ps.pod.Name)
		case now.After(*ps.deadline):
			klog.Warningf("Pod %s/%s expired", ps.pod.Namespace, ps.pod.Name)
			if err := cache.expirePod(key, ps); err != nil {
				klog.Errorf("ExpirePod failed for %s: %v", key, err)
			}
		}
	}
}
// expirePod removes an expired assumed pod from the cache entirely.
//
// It drops the pod's node-level accounting via removePod, then deletes its
// entries in podStates and assumedPods. If removePod fails, the error is
// returned and both maps are left untouched.
// Called from cleanupAssumedPods with cache.mu held.
func (cache *schedulerCache) expirePod(key string, ps *podState) error {
	err := cache.removePod(ps.pod)
	if err != nil {
		return err
	}
	delete(cache.podStates, key)
	delete(cache.assumedPods, key)
	return nil
}
3.1.4 AddPod
// AddPod records that pod has been added, based on the pod's current state in
// the cache:
//   - Assumed: the assumption is confirmed and the pod moves to the Added
//     state. If the pod landed on a different node than it was assumed to,
//     its node-level accounting is moved from the assumed node to the
//     actual one.
//   - Not in the cache (Initial, or Expired and therefore fully removed): the
//     pod is added back.
//   - Already Added: an error is returned.
func (cache *schedulerCache) AddPod(pod *v1.Pod) error {
	key, err := schedulercache.GetPodKey(pod)
	if err != nil {
		return err
	}

	cache.mu.Lock()
	defer cache.mu.Unlock()

	currState, ok := cache.podStates[key]
	switch {
	case ok && cache.assumedPods[key]:
		// Assumed -> Added.
		if currState.pod.Spec.NodeName != pod.Spec.NodeName {
			// The pod was added to a different node than it was assumed to.
			// currState.pod is the assumed copy; pod is the one actually added.
			klog.Warningf("Pod %v was assumed to be on %v but got added to %v", key, currState.pod.Spec.NodeName, pod.Spec.NodeName)
			// Clean this up: move the accounting to the real node. This is
			// best effort; the pod is still recorded on its real node if
			// removing it from the assumed one fails.
			if rmErr := cache.removePod(currState.pod); rmErr != nil {
				klog.Errorf("removing assumed pod %v from node %v: %v", key, currState.pod.Spec.NodeName, rmErr)
			}
			cache.addPod(pod)
		}
		delete(cache.assumedPods, key)
		// Clear the expiration deadline. bindingFinished is left as-is; it no
		// longer matters because cleanup only scans assumedPods.
		currState.deadline = nil
		currState.pod = pod
	case !ok:
		// Initial or Expired -> Added. Pod was expired (or never assumed). We should add it back.
		cache.addPod(pod)
		cache.podStates[key] = &podState{pod: pod}
	default:
		return fmt.Errorf("pod %v was already in added state", key)
	}
	return nil
}
3.1.5 AddNode
// AddNode stores node in the cache.
//
// It reuses the existing NodeInfo for node.Name, first dropping the image
// states recorded for its previous Node object, or creates a new NodeInfo.
// It then registers the node in the NodeTree, records its images, and
// attaches node to the NodeInfo, returning SetNode's error.
func (cache *schedulerCache) AddNode(node *v1.Node) error {
	cache.mu.Lock()
	defer cache.mu.Unlock()

	info, exists := cache.nodes[node.Name]
	if exists {
		cache.removeNodeImageStates(info.Node())
	} else {
		info = schedulercache.NewNodeInfo()
		cache.nodes[node.Name] = info
	}

	cache.nodeTree.AddNode(node)
	cache.addNodeImageStates(node, info)
	return info.SetNode(node)
}
// addNodeImageStates registers node as holding each image name listed in its
// status. It creates an imageState (with the reported size) for names not yet
// known to the cache. It then installs one ImageStateSummary per distinct
// image name on nodeInfo.
func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *schedulercache.NodeInfo) {
	summaries := make(map[string]*schedulercache.ImageStateSummary)
	for _, img := range node.Status.Images {
		for _, imgName := range img.Names {
			st, known := cache.imageStates[imgName]
			if known {
				st.nodes.Insert(node.Name)
			} else {
				st = &imageState{
					size:  img.SizeBytes,
					nodes: sets.NewString(node.Name),
				}
				cache.imageStates[imgName] = st
			}
			// Only the first occurrence of a name produces a summary.
			if _, done := summaries[imgName]; !done {
				summaries[imgName] = cache.createImageStateSummary(st)
			}
		}
	}
	nodeInfo.SetImageStates(summaries)
}
3.1.6 List 和 FilteredList
3.2 总结