4.3 消费者
syncReplicaSet
// syncReplicaSet reconciles the ReplicaSet identified by key. It looks the
// ReplicaSet up through the lister, collects the active pods in its namespace,
// claims the ones that belong to it, calls manageReplicas when expectations
// are satisfied and the ReplicaSet is not being deleted, and finally writes
// the recalculated status back through the API client.
//
// It returns an error from key parsing, the listers, claimPods or the status
// update; otherwise it returns whatever manageReplicas returned (nil if
// manageReplicas was not called).
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
}()
// Split key into the ReplicaSet's namespace and name.
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
// Read the ReplicaSet through the lister (the local cache) rather than the API client.
rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
if errors.IsNotFound(err) {
klog.V(4).Infof("%v %v has been deleted", rsc.Kind, key)
// The ReplicaSet no longer exists: drop its expectations and finish
// without an error.
rsc.expectations.DeleteExpectations(key)
return nil
}
if err != nil {
return err
}
// Decide whether the desired replica count should be compared against the
// pods that actually exist during this sync.
rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
// Convert the ReplicaSet's label selector into a labels.Selector.
selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
if err != nil {
// The conversion error is reported through HandleError and nil is
// returned, so it is not surfaced to the caller as a failure.
utilruntime.HandleError(fmt.Errorf("Error converting pod selector to selector: %v", err))
return nil
}
// List every pod in the ReplicaSet's namespace.
allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
if err != nil {
return err
}
// Ignore inactive pods: keep only the pods for which
// controller.IsPodActive reports true.
var filteredPods []*v1.Pod
for _, pod := range allPods {
if controller.IsPodActive(pod) {
filteredPods = append(filteredPods, pod)
}
}
// Narrow the active pods down to the ones claimPods returns for this
// ReplicaSet; the result replaces filteredPods.
filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
if err != nil {
return err
}
var manageReplicasErr error
if rsNeedsSync && rs.DeletionTimestamp == nil {
// Create or delete pods to move towards the desired replica count.
manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
}
// NOTE(review): the copy is presumably taken so the object obtained from
// the lister is not mutated by the status update — confirm.
rs = rs.DeepCopy()
newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
// Always updates status as pods come up or die.
updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
if err != nil {
// Multiple things could lead to this update failing. Returning an
// error causes a requeue without forcing a hotloop.
return err
}
// Resync the ReplicaSet after MinReadySeconds as a last line of defense to guard against clock-skew.
if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
rsc.enqueueReplicaSetAfter(updatedRS, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
}
// The status update succeeded; report the outcome of manageReplicas.
return manageReplicasErr
}
claimPods
// claimPods builds a pod controller-ref manager for rs and selector and
// returns the result of its ClaimPods call on filteredPods.
//
// The manager is given an adoption check that re-reads the ReplicaSet through
// the API client and fails if the object now stored under the same
// namespace/name has a different UID than rs.
func (rsc *ReplicaSetController) claimPods(rs *apps.ReplicaSet, selector labels.Selector, filteredPods []*v1.Pod) ([]*v1.Pod, error) {
	// fetchCurrent performs a live read of the ReplicaSet and verifies that it
	// is still the same object (same UID) as the one being synced.
	fetchCurrent := func() (metav1.Object, error) {
		latest, getErr := rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace).Get(rs.Name, metav1.GetOptions{})
		if getErr != nil {
			return nil, getErr
		}
		if latest.UID == rs.UID {
			return latest, nil
		}
		return nil, fmt.Errorf("original %v %v/%v is gone: got uid %v, wanted %v", rsc.Kind, rs.Namespace, rs.Name, latest.UID, rs.UID)
	}

	adoptionCheck := controller.RecheckDeletionTimestamp(fetchCurrent)
	refManager := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, rsc.GroupVersionKind, adoptionCheck)
	return refManager.ClaimPods(filteredPods)
}
manageReplicas
// manageReplicas compares the number of pods in filteredPods with
// rs.Spec.Replicas and creates or deletes pods to close the gap, changing at
// most rsc.burstReplicas pods per call. Expected creations/deletions are
// recorded in rsc.expectations before the API calls are made, and are
// decremented again for every creation or deletion that did not happen.
//
// It returns the error from the batch creation, or one of the deletion
// errors; it returns nil when nothing failed, when the counts already match,
// or when the ReplicaSet's key could not be computed.
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
// diff is the number of pods currently owned minus the desired replica
// count: negative means too few pods, positive means too many.
diff := len(filteredPods) - int(*(rs.Spec.Replicas))
rsKey, err := controller.KeyFunc(rs)
if err != nil {
// The key error is reported through HandleError and nil is returned, so
// it is not surfaced to the caller as a failure.
utilruntime.HandleError(fmt.Errorf("Couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
return nil
}
if diff < 0 {
// Too few pods exist; flip diff to the (positive) number to create.
diff *= -1
if diff > rsc.burstReplicas {
// Create at most burstReplicas pods in a single call.
diff = rsc.burstReplicas
}
// Record the expected number of creations before issuing them.
rsc.expectations.ExpectCreations(rsKey, diff)
klog.V(2).Infof("Too few replicas for %v %s/%s, need %d, creating %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
// Create the pods in batches via slowStartBatch, which reports how many
// creations succeeded.
successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
boolPtr := func(b bool) *bool { return &b }
// Owner reference that marks this ReplicaSet as the controller of the
// new pod.
controllerRef := &metav1.OwnerReference{
APIVersion: rsc.GroupVersion().String(),
Kind: rsc.Kind,
Name: rs.Name,
UID: rs.UID,
BlockOwnerDeletion: boolPtr(true),
Controller: boolPtr(true),
}
err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef)
// NOTE(review): timeout errors are deliberately reported as success
// here — presumably the pod may still have been created; confirm.
if err != nil && errors.IsTimeout(err) {
return nil
}
return err
})
// skippedPods creations were expected but not reported as successful;
// lower the creation expectation once for each of them.
if skippedPods := diff - successfulCreations; skippedPods > 0 {
klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for %v %v/%v", skippedPods, rsc.Kind, rs.Namespace, rs.Name)
for i := 0; i < skippedPods; i++ {
rsc.expectations.CreationObserved(rsKey)
}
}
return err
} else if diff > 0 {
// Too many pods exist; delete at most burstReplicas in a single call.
if diff > rsc.burstReplicas {
diff = rsc.burstReplicas
}
klog.V(2).Infof("Too many replicas for %v %s/%s, need %d, deleting %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
podsToDelete := getPodsToDelete(filteredPods, diff)
// Record the expected deletions (by pod key) before issuing them.
rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))
// errCh has room for diff errors and the WaitGroup waits for diff
// goroutines. NOTE(review): both assume getPodsToDelete returns exactly
// diff pods — confirm.
errCh := make(chan error, diff)
var wg sync.WaitGroup
wg.Add(diff)
// Delete the selected pods concurrently, one goroutine per pod.
for _, pod := range podsToDelete {
go func(targetPod *v1.Pod) {
defer wg.Done()
if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil {
// Decrement the expected number of deletes because the informer won't observe this deletion
podKey := controller.PodKey(targetPod)
klog.V(2).Infof("Failed to delete %v, decrementing expectations for %v %s/%s", podKey, rsc.Kind, rs.Namespace, rs.Name)
rsc.expectations.DeletionObserved(rsKey, podKey)
errCh <- err
}
}(pod)
}
wg.Wait()
// Non-blocking receive: return one recorded deletion error if there is
// any; further errors left in the channel are not returned.
select {
case err := <-errCh:
if err != nil {
return err
}
default:
}
}
return nil
}
calculateStatus
4.4 关于expectations的理解
以期望增加为例
// Fulfilled reports whether both the add and the del counters have dropped to
// zero or below. The add counter is read first; del is only read when add is
// already non-positive.
func (e *ControlleeExpectations) Fulfilled() bool {
	if pendingAdds := atomic.LoadInt64(&e.add); pendingAdds > 0 {
		return false
	}
	pendingDels := atomic.LoadInt64(&e.del)
	return pendingDels <= 0
}
// SatisfiedExpectations reports whether the controller identified by
// controllerKey should be synced now. It returns true when no expectations
// are recorded for the key, when looking them up produced an error, when the
// recorded expectations are fulfilled, or when they have expired. It returns
// false only while recorded, unexpired expectations are still outstanding.
func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool {
	exp, exists, err := r.GetExpectations(controllerKey)

	// Nothing recorded (or the lookup failed): there is nothing to wait for,
	// so the caller should sync.
	if !exists {
		if err != nil {
			klog.V(2).Infof("Error encountered while checking expectations %#v, forcing sync", err)
		} else {
			klog.V(4).Infof("Controller %v either never recorded expectations, or the ttl expired.", controllerKey)
		}
		return true
	}

	switch {
	case exp.Fulfilled():
		// Every expected creation/deletion has been observed.
		klog.V(4).Infof("Controller expectations fulfilled %#v", exp)
		return true
	case exp.isExpired():
		// The expectations have been outstanding for too long; stop waiting.
		klog.V(4).Infof("Controller expectations expired %#v", exp)
		return true
	default:
		// Some expected pod events have not been observed yet.
		klog.V(4).Infof("Controller still waiting on expectations %#v", exp)
		return false
	}
}
5. 例子的问题
6. 留一个问题
[root@master kubectl]# ./kubectl apply -f replicaset.yaml
replicaset.apps/replicatest created
[root@master kubectl]# ./kubectl get pods
NAME READY STATUS RESTARTS AGE
replicatest-9q9sm 1/1 Running 0 4s
replicatest-rdc8c 1/1 Running 0 4s
[root@master kubectl]# ./kubectl get rs
NAME DESIRED CURRENT READY AGE
replicatest 2 2 2 7s
[root@master kubectl]# ./kubectl delete rs replicatest
replicaset.extensions "replicatest" deleted
[root@master kubectl]# ./kubectl get rs
No resources found.
[root@master kubectl]# ./kubectl get pods
NAME READY STATUS RESTARTS AGE
replicatest-9q9sm 1/1 Running 0 38s
replicatest-rdc8c 1/1 Running 0 38s
[root@master kubectl]#
[root@master kubectl]# ./kubectl get pods
NAME READY STATUS RESTARTS AGE
replicatest-9q9sm 1/1 Running 0 2m48s
replicatest-rdc8c 1/1 Running 0 2m48s
[root@master kubectl]# ./kubectl get pods
NAME READY STATUS RESTARTS AGE
replicatest-9q9sm 0/1 Terminating 0 2m49s
replicatest-rdc8c 0/1 Terminating 0 2m49s
[root@master kubectl]# ./kubectl get pods
No resources found.
[root@master kubectl]#
7. 总结