0
点赞
收藏
分享

微信扫一扫

[k8s源码分析][client-go] informer之reflector

1. 前言

2. 类

type Reflector struct {
    // 名字
    name string
    metrics *reflectorMetrics
    // 该reflector接收的类型
    expectedType reflect.Type
    // 要存的地方 会是DeltaFIFO
    store Store
    // 与api-server打交道的listh和watcher
    listerWatcher ListerWatcher
    period       time.Duration
    resyncPeriod time.Duration
    ShouldResync func() bool
    clock clock.Clock
    // 最后一次sync的resourceversion
    lastSyncResourceVersion string
    // 用于resourceversion的锁
    lastSyncResourceVersionMutex sync.RWMutex
    // WatchListPageSize is the requested chunk size of initial and resync watch lists.
    WatchListPageSize int64
}
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    r := &Reflector{
        name:          name,
        listerWatcher: lw,
        store:         store,
        expectedType:  reflect.TypeOf(expectedType),
        period:        time.Second,
        resyncPeriod:  resyncPeriod,
        clock:         &clock.RealClock{},
    }
    return r
}
var internalPackages = []string{"client-go/tools/cache/"}

3. 方法

Run

func (r *Reflector) Run(stopCh <-chan struct{}) {
    klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
    wait.Until(func() {
        if err := r.ListAndWatch(stopCh); err != nil {
            utilruntime.HandleError(err)
        }
    }, r.period, stopCh)
}

ListAndWatch

func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
    found := make([]interface{}, 0, len(items))
    for _, item := range items {
        found = append(found, item)
    }
    return r.store.Replace(found, resourceVersion)
}
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    klog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
    var resourceVersion string
    // ResourceVersion从0开始 可以获得该对象在api-server全部操作的情况
    options := metav1.ListOptions{ResourceVersion: "0"}

    if err := func() error {
        initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
        defer initTrace.LogIfLong(10 * time.Second)
        var list runtime.Object
        var err error
        listCh := make(chan struct{}, 1)
        panicCh := make(chan interface{}, 1)
        go func() {
            defer func() {
                if r := recover(); r != nil {
                    panicCh <- r
                }
            }()
            // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
            // list request will return the full response.
            pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
                return r.listerWatcher.List(opts)
            }))
            if r.WatchListPageSize != 0 {
                pager.PageSize = r.WatchListPageSize
            }
            // Pager falls back to full list if paginated list calls fail due to an "Expired" error.
            list, err = pager.List(context.Background(), options)
            close(listCh)
        }()
        // 等待获得上面的list
        select {
        case <-stopCh:
            return nil
        case r := <-panicCh:
            panic(r)
        case <-listCh:
        }
        if err != nil {
            return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
        }
        initTrace.Step("Objects listed")
        listMetaInterface, err := meta.ListAccessor(list)
        if err != nil {
            return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
        }
        resourceVersion = listMetaInterface.GetResourceVersion()
        initTrace.Step("Resource version extracted")
        items, err := meta.ExtractList(list)
        if err != nil {
            return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
        }
        initTrace.Step("Objects extracted")
        // 调用replace函数替换deltaFIFO中的元素
        if err := r.syncWith(items, resourceVersion); err != nil {
            return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
        }
        initTrace.Step("SyncWith done")
        r.setLastSyncResourceVersion(resourceVersion)
        initTrace.Step("Resource version updated")
        return nil
    }(); err != nil {
        return err
    }

    // 这里主要是启动一个异步goroutine
    // 每隔r.resyncPeriod时间调用DeltaFIFO的Resync
    resyncerrc := make(chan error, 1)
    cancelCh := make(chan struct{})
    defer close(cancelCh)
    go func() {
        resyncCh, cleanup := r.resyncChan()
        defer func() {
            cleanup() // Call the last one written into cleanup
        }()
        for {
            select {
            case <-resyncCh:
            case <-stopCh:
                return
            case <-cancelCh:
                return
            }
            if r.ShouldResync == nil || r.ShouldResync() {
                klog.V(4).Infof("%s: forcing resync", r.name)
                if err := r.store.Resync(); err != nil {
                    resyncerrc <- err
                    return
                }
            }
            cleanup()
            resyncCh, cleanup = r.resyncChan()
        }
    }()

    // 根据当前的ResourceVersion生成一个watch
    // 监控该ResourceVersion后面的一系列变化 然后对应加入到DeltaFIFO中
    for {
        // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
        select {
        case <-stopCh:
            return nil
        default:
        }

        timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
        options = metav1.ListOptions{
            ResourceVersion: resourceVersion,
            ...
        }

        // 从当前resourceVersion后面开始监控
        w, err := r.listerWatcher.Watch(options)
        ...
        // 调用watchHandler
        if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
            if err != errorStopRequested {
                switch {
                case apierrs.IsResourceExpired(err):
                    klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
                default:
                    klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
                }
            }
            return nil
        }
    }
}

watchHandler

func (r *Reflector) LastSyncResourceVersion() string {
    r.lastSyncResourceVersionMutex.RLock()
    defer r.lastSyncResourceVersionMutex.RUnlock()
    return r.lastSyncResourceVersion
}

func (r *Reflector) setLastSyncResourceVersion(v string) {
    r.lastSyncResourceVersionMutex.Lock()
    defer r.lastSyncResourceVersionMutex.Unlock()
    r.lastSyncResourceVersion = v
}
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
    start := r.clock.Now()
    eventCount := 0

    // Stopping the watcher should be idempotent and if we return from this function there's no way
    // we're coming back in with the same watch interface.
    defer w.Stop()

loop:
    for {
        select {
        case <-stopCh:
            // stopCh被close了 退出watchHandler方法
            return errorStopRequested
        case err := <-errc:
            // DeltaFIFO的Resync出现错误,退出watchHandler方法
            return err
        case event, ok := <-w.ResultChan():
            if !ok {
                // watch这个channel已经被关闭 跳出loop
                break loop
            }
            if event.Type == watch.Error {
                // 退出watchHandler方法
                return apierrs.FromObject(event.Object)
            }
            // 得到的对象类型与该reflector监控的类型不一致
            // 比如该reflector负责的是pod对象 来了一个Service对象
            if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
                utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
                continue
            }
            meta, err := meta.Accessor(event.Object)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
                continue
            }
            // 获得新的ResourceVersion
            newResourceVersion := meta.GetResourceVersion()
            switch event.Type {
            case watch.Added:
                // 往Delta添加一个对象
                err := r.store.Add(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            case watch.Modified:
                // 往Delta更新一个对象
                err := r.store.Update(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            case watch.Deleted:
                // TODO: Will any consumers need access to the "last known
                // state", which is passed in event.Object? If so, may need
                // to change this.
                // 往Delta删除一个对象
                err := r.store.Delete(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
                }
            case watch.Bookmark:
                // A `Bookmark` means watch has synced here, just update the resourceVersion
            default:
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
            }
            // 更新ResourceVersion
            // 处理的event总数加1
            *resourceVersion = newResourceVersion
            r.setLastSyncResourceVersion(newResourceVersion)
            eventCount++
        }
    }

    // 当前的watch channel被关闭了
    watchDuration := r.clock.Since(start)
    // 如果该watch一个event都没有处理 并且1秒钟都不到
    // 那有可能问题 所以会返回一个错误 此时退出watchHandler后, 会整个退出ListAndWatch, Run中的util会再次调用ListAndWatch方法.
    if watchDuration < 1*time.Second && eventCount == 0 {
        return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
    }
    // 退出watchHandler后 不会整个退出ListAndWatch 在for循环里面再生成一个watch再次调用watchHandler
    klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
    return nil
}

4. 总结

informer整体

举报

相关推荐

0 条评论