[k8s source code analysis][controller-manager] ReplicaSetController (ReplicaSet) Analysis (2)

4.3 The consumer

syncReplicaSet
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
    startTime := time.Now()
    defer func() {
        klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
    }()
    // Parse the key into the ReplicaSet's namespace and name
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }
    // Look the ReplicaSet up in the local informer cache
    rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
    if errors.IsNotFound(err) {
        klog.V(4).Infof("%v %v has been deleted", rsc.Kind, key)
        // The ReplicaSet is gone, so drop its expectations entry
        rsc.expectations.DeleteExpectations(key)
        return nil
    }
    if err != nil {
        return err
    }
    // Decide whether this ReplicaSet needs a sync, i.e. whether all expected pod creations/deletions have been observed
    rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
    // Convert rs.Spec.Selector into a labels.Selector
    selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("Error converting pod selector to selector: %v", err))
        return nil
    }
    // List all pods in the ReplicaSet's namespace from the cache
    allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
    if err != nil {
        return err
    }
    // Ignore inactive pods.
    // Keep only active pods (not succeeded, not failed, not being deleted)
    var filteredPods []*v1.Pod
    for _, pod := range allPods {
        if controller.IsPodActive(pod) {
            filteredPods = append(filteredPods, pod)
        }
    }
    // Claim the pods that belong to this ReplicaSet: adopt matching orphans,
    // release owned pods that no longer match; the result replaces filteredPods
    filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
    if err != nil {
        return err
    }
    var manageReplicasErr error
    if rsNeedsSync && rs.DeletionTimestamp == nil {
        // Create or delete pods to converge on the desired replica count
        manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
    }
    rs = rs.DeepCopy()
    newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)

    // Always updates status as pods come up or die.
    updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
    if err != nil {
        // Multiple things could lead to this update failing. Returning an error
        // causes a requeue without forcing a hotloop.
        return err
    }
    // Resync the ReplicaSet after MinReadySeconds as a last line of defense to guard against clock-skew.
    if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
        updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
        updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
        rsc.enqueueReplicaSetAfter(updatedRS, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
    }
    return manageReplicasErr
}
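
Before any of the above can run, the key popped off the work queue has to be split back into namespace and name. A tiny, self-contained sketch of the key format that cache.SplitMetaNamespaceKey expects:

package main

import (
    "fmt"

    "k8s.io/client-go/tools/cache"
)

func main() {
    // Queue keys have the form "<namespace>/<name>"
    // (or just "<name>" for cluster-scoped objects).
    namespace, name, err := cache.SplitMetaNamespaceKey("default/replicatest")
    if err != nil {
        panic(err)
    }
    fmt.Printf("namespace=%s name=%s\n", namespace, name) // namespace=default name=replicatest
}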
claimPods
func (rsc *ReplicaSetController) claimPods(rs *apps.ReplicaSet, selector labels.Selector, filteredPods []*v1.Pod) ([]*v1.Pod, error) {
    canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
        fresh, err := rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace).Get(rs.Name, metav1.GetOptions{})
        if err != nil {
            return nil, err
        }
        // Double-check against the API server that this is still the same ReplicaSet (same UID)
        if fresh.UID != rs.UID {
            return nil, fmt.Errorf("original %v %v/%v is gone: got uid %v, wanted %v", rsc.Kind, rs.Namespace, rs.Name, fresh.UID, rs.UID)
        }
        return fresh, nil
    })
    cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, rsc.GroupVersionKind, canAdoptFunc)
    return cm.ClaimPods(filteredPods)
}
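
ClaimPods itself lives in the shared controller package. Roughly, for each candidate pod it decides to keep it, adopt it, or release it. The following is a simplified, illustrative sketch of that per-pod decision (the real PodControllerRefManager also patches ownerReferences through the API server and consults canAdoptFunc before adopting):

import (
    apps "k8s.io/api/apps/v1"
    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/labels"
)

// claimDecision sketches the core of PodControllerRefManager.ClaimPods.
func claimDecision(rs *apps.ReplicaSet, selector labels.Selector, pod *v1.Pod) bool {
    if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
        if controllerRef.UID != rs.UID {
            // Owned by a different controller: not ours.
            return false
        }
        if selector.Matches(labels.Set(pod.Labels)) {
            // Already ours and still matching the selector.
            return true
        }
        // Ours but no longer matching: the real code releases the pod
        // by removing its ownerReference.
        return false
    }
    // Orphan: the real code adopts it (adds an ownerReference) only if it
    // matches the selector and neither object is being deleted.
    return selector.Matches(labels.Set(pod.Labels)) && pod.DeletionTimestamp == nil
}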
manageReplicas
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
    // diff = owned pods - desired replicas: negative means too few pods, positive too many
    diff := len(filteredPods) - int(*(rs.Spec.Replicas))
    rsKey, err := controller.KeyFunc(rs)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("Couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
        return nil
    }
    if diff < 0 {
        // Too few pods: diff more need to be created
        diff *= -1
        if diff > rsc.burstReplicas {
            // Create at most burstReplicas pods in a single sync
            diff = rsc.burstReplicas
        }
        rsc.expectations.ExpectCreations(rsKey, diff)
        klog.V(2).Infof("Too few replicas for %v %s/%s, need %d, creating %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
        // Create the pods in slow-start batches (see the sketch of slowStartBatch below)
        successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
            boolPtr := func(b bool) *bool { return &b }
            controllerRef := &metav1.OwnerReference{
                APIVersion:         rsc.GroupVersion().String(),
                Kind:               rsc.Kind,
                Name:               rs.Name,
                UID:                rs.UID,
                BlockOwnerDeletion: boolPtr(true),
                Controller:         boolPtr(true),
            }
            err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef)
            if err != nil && errors.IsTimeout(err) {
                return nil
            }
            return err
        })
        // skippedPods creations were skipped or failed; decrement the creation
        // expectations so the controller is not left waiting for them
        if skippedPods := diff - successfulCreations; skippedPods > 0 {
            klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for %v %v/%v", skippedPods, rsc.Kind, rs.Namespace, rs.Name)
            for i := 0; i < skippedPods; i++ {
                rsc.expectations.CreationObserved(rsKey)
            }
        }
        return err
    } else if diff > 0 {
        if diff > rsc.burstReplicas {
            diff = rsc.burstReplicas
        }
        klog.V(2).Infof("Too many replicas for %v %s/%s, need %d, deleting %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
        podsToDelete := getPodsToDelete(filteredPods, diff)
        rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))
        errCh := make(chan error, diff)
        var wg sync.WaitGroup
        wg.Add(diff)
        for _, pod := range podsToDelete {
            go func(targetPod *v1.Pod) {
                defer wg.Done()
                if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil {
                    // Decrement the expected number of deletes because the informer won't observe this deletion
                    podKey := controller.PodKey(targetPod)
                    klog.V(2).Infof("Failed to delete %v, decrementing expectations for %v %s/%s", podKey, rsc.Kind, rs.Namespace, rs.Name)
                    rsc.expectations.DeletionObserved(rsKey, podKey)
                    errCh <- err
                }
            }(pod)
        }
        wg.Wait()
        select {
        case err := <-errCh:
            if err != nil {
                return err
            }
        default:
        }
    }
    return nil
}
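
manageReplicas relies on slowStartBatch, which runs fn in exponentially growing batches (initialBatchSize, then 2x, 4x, ...) and stops after the first batch that reports an error, so a persistently failing create costs a handful of calls rather than burstReplicas calls. A sketch that mirrors the upstream logic:

import "sync"

// slowStartBatch calls fn up to count times, in batches that start at
// initialBatchSize and double each round. It returns the number of
// successful calls; if any call in a batch fails, the remaining batches
// are skipped and the first error is returned.
func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, error) {
    remaining := count
    successes := 0
    for batchSize := min(remaining, initialBatchSize); batchSize > 0; batchSize = min(2*batchSize, remaining) {
        errCh := make(chan error, batchSize)
        var wg sync.WaitGroup
        wg.Add(batchSize)
        for i := 0; i < batchSize; i++ {
            go func() {
                defer wg.Done()
                if err := fn(); err != nil {
                    errCh <- err
                }
            }()
        }
        wg.Wait()
        successes += batchSize - len(errCh)
        if len(errCh) > 0 {
            // At least one call in this batch failed: stop ramping up.
            return successes, <-errCh
        }
        remaining -= batchSize
    }
    return successes, nil
}

func min(a, b int) int {
    if a < b {
        return a
    }
    return b
}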
calculateStatus
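calculateStatus recomputes the status counters from the filtered pods instead of trusting the stored status. A simplified sketch of its counting logic (the condition management around manageReplicasErr is omitted here):

import (
    apps "k8s.io/api/apps/v1"
    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/labels"
    podutil "k8s.io/kubernetes/pkg/api/v1/pod"
)

func calculateStatus(rs *apps.ReplicaSet, filteredPods []*v1.Pod, manageReplicasErr error) apps.ReplicaSetStatus {
    newStatus := rs.Status
    // Count pods that carry all of the template's labels, pods that are
    // ready, and pods that have been ready for at least MinReadySeconds.
    fullyLabeledReplicasCount := 0
    readyReplicasCount := 0
    availableReplicasCount := 0
    templateLabel := labels.Set(rs.Spec.Template.Labels).AsSelectorPreValidated()
    for _, pod := range filteredPods {
        if templateLabel.Matches(labels.Set(pod.Labels)) {
            fullyLabeledReplicasCount++
        }
        if podutil.IsPodReady(pod) {
            readyReplicasCount++
            if podutil.IsPodAvailable(pod, rs.Spec.MinReadySeconds, metav1.Now()) {
                availableReplicasCount++
            }
        }
    }
    // (ReplicaFailure condition handling based on manageReplicasErr omitted.)
    newStatus.Replicas = int32(len(filteredPods))
    newStatus.FullyLabeledReplicas = int32(fullyLabeledReplicasCount)
    newStatus.ReadyReplicas = int32(readyReplicasCount)
    newStatus.AvailableReplicas = int32(availableReplicasCount)
    return newStatus
}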

4.4 Understanding expectations

Take the expected-creation (add) counter as an example:
func (e *ControlleeExpectations) Fulfilled() bool {
    return atomic.LoadInt64(&e.add) <= 0 && atomic.LoadInt64(&e.del) <= 0
}
func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool {
    if exp, exists, err := r.GetExpectations(controllerKey); exists {
        // Have all expected creations/deletions been observed?
        if exp.Fulfilled() {
            klog.V(4).Infof("Controller expectations fulfilled %#v", exp)
            return true
        } else if exp.isExpired() {
            // The expectations expired after waiting too long, so force a sync
            klog.V(4).Infof("Controller expectations expired %#v", exp)
            return true
        } else {
            // Not every expected pod has reached the pod informer's addPod handler yet
            klog.V(4).Infof("Controller still waiting on expectations %#v", exp)
            return false
        }
    } else if err != nil {
        klog.V(2).Infof("Error encountered while checking expectations %#v, forcing sync", err)
    } else {
        klog.V(4).Infof("Controller %v either never recorded expectations, or the ttl expired.", controllerKey)
    }
    // No expectations were recorded (or an error occurred reading them), so sync
    return true
}
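
To make the mechanism concrete, here is a tiny, runnable model of the add counter (toyExpectations is illustrative, not the real ControllerExpectations type): ExpectCreations raises the counter before the pods are created, each observed informer add event lowers it, and a sync is only considered satisfied once the counter reaches zero or the entry expires.

package main

import (
    "fmt"
    "sync/atomic"
)

// toyExpectations models one ControlleeExpectations entry.
type toyExpectations struct {
    add int64
    del int64
}

func (e *toyExpectations) ExpectCreations(n int64) { atomic.AddInt64(&e.add, n) }
func (e *toyExpectations) CreationObserved()       { atomic.AddInt64(&e.add, -1) }
func (e *toyExpectations) Fulfilled() bool {
    return atomic.LoadInt64(&e.add) <= 0 && atomic.LoadInt64(&e.del) <= 0
}

func main() {
    e := &toyExpectations{}
    e.ExpectCreations(2)       // manageReplicas is about to create 2 pods
    fmt.Println(e.Fulfilled()) // false: syncs are skipped until the events arrive

    e.CreationObserved()       // addPod handler saw the first pod
    e.CreationObserved()       // addPod handler saw the second pod
    fmt.Println(e.Fulfilled()) // true: the next sync may run manageReplicas again
}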

5. The problem with the example

6. An open question

In the transcript below, the two pods are still Running well after their ReplicaSet has been deleted, and they only start terminating a couple of minutes later. Why does the deletion of the pods lag behind the deletion of the ReplicaSet?

[root@master kubectl]# ./kubectl apply -f replicaset.yaml 
replicaset.apps/replicatest created
[root@master kubectl]# ./kubectl get pods
NAME                READY   STATUS    RESTARTS   AGE
replicatest-9q9sm   1/1     Running   0          4s
replicatest-rdc8c   1/1     Running   0          4s
[root@master kubectl]# ./kubectl get rs
NAME          DESIRED   CURRENT   READY   AGE
replicatest   2         2         2       7s
[root@master kubectl]# ./kubectl delete rs replicatest
replicaset.extensions "replicatest" deleted
[root@master kubectl]# ./kubectl get rs
No resources found.
[root@master kubectl]# ./kubectl get pods
NAME                READY   STATUS    RESTARTS   AGE
replicatest-9q9sm   1/1     Running   0          38s
replicatest-rdc8c   1/1     Running   0          38s
[root@master kubectl]# 
[root@master kubectl]# ./kubectl get pods
NAME                READY   STATUS    RESTARTS   AGE
replicatest-9q9sm   1/1     Running   0          2m48s
replicatest-rdc8c   1/1     Running   0          2m48s
[root@master kubectl]# ./kubectl get pods
NAME                READY   STATUS        RESTARTS   AGE
replicatest-9q9sm   0/1     Terminating   0          2m49s
replicatest-rdc8c   0/1     Terminating   0          2m49s
[root@master kubectl]# ./kubectl get pods
No resources found.
[root@master kubectl]# 
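
For readers who want to experiment with this behavior from code, the same deletion can be issued with an explicit propagation policy, which controls whether and how the dependent pods are cleaned up. A minimal sketch against the client-go API of this era (it assumes clientset is an already-initialized *kubernetes.Clientset):

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// deleteRS deletes a ReplicaSet with an explicit propagation policy:
// Background asks the garbage collector to remove dependent pods
// asynchronously, Orphan leaves them running, and Foreground blocks the
// owner's deletion until the dependents are gone.
func deleteRS(clientset *kubernetes.Clientset, namespace, name string) error {
    policy := metav1.DeletePropagationBackground
    return clientset.AppsV1().ReplicaSets(namespace).Delete(
        name,
        &metav1.DeleteOptions{PropagationPolicy: &policy},
    )
}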

7. Summary
