0
点赞
收藏
分享

微信扫一扫

[k8s源码分析][client-go] informer之delta_fifo

1. 前言

2. 整体接口与实现类

DeltaFIFO is like FIFO, but it lets you process the changes (deltas) made to items. 
DeltaFIFO is a producer-consumer queue, where a Reflector is intended to be the producer, 
and the consumer is whatever calls the Pop() method.
A note on the KeyLister used by the DeltaFIFO: Its main purpose is to list keys that are "known", 
for the purpose of figuring out which items have been deleted when Replace() or Delete() is 
called. The deleted object will be included in the DeletedFinalStateUnknown markers.
// DeltaFIFO is a queue of per-key change lists (Deltas). Changes are recorded
// through Add/Update/Delete/Replace and drained one key at a time by Pop.
type DeltaFIFO struct {
    // lock is taken by the methods in this file before they touch items,
    // queue, populated or initialPopulationCount.
    lock sync.RWMutex
    // cond is waited on by Pop while the queue is empty and is broadcast by
    // queueActionLocked whenever a key gains deltas.
    cond sync.Cond
    // items maps each key to the changes accumulated for the object with that key.
    // queue holds the keys in the order in which Pop will hand them out.
    items map[string]Deltas
    queue []string
    // populated is true if the first batch of items inserted by Replace() has been populated
    // or Delete/Add/Update was called first.
    populated bool
    // initialPopulationCount is the number of items inserted by the first call of Replace().
    // Pop decrements it and HasSynced reports true once it reaches zero.
    initialPopulationCount int
    // keyFunc derives the key for an object; KeyOf falls back to it.
    keyFunc KeyFunc
    // knownObjects list keys that are "known", for the
    // purpose of figuring out which items have been deleted
    // when Replace() or Delete() is called.
    // Put plainly, this is the local cache (newInformer passes its store here).
    knownObjects KeyListerGetter
    // closed and closedLock are not touched by the code shown here apart from
    // Pop calling IsClosed; presumably closedLock guards closed -- confirm
    // against Close/IsClosed.
    closed     bool
    closedLock sync.Mutex
}
// Delta records a single change: Type says what kind of change happened and
// Object is the value that was supplied with it. For deletions detected by
// Replace, Object is a DeletedFinalStateUnknown wrapper.
type Delta struct {
    Type   DeltaType
    Object interface{}
}
// Deltas is the list of changes recorded for one key. queueActionLocked appends
// new entries at the end, so the slice runs from oldest to newest.
type Deltas []Delta
// DeltaType is the type of a change (addition, deletion, etc.). It is a string
// so that the values are readable when printed.
type DeltaType string

// Change type definitions.
const (
    // Added is queued by DeltaFIFO.Add.
    Added   DeltaType = "Added"
    // Updated is queued by DeltaFIFO.Update.
    Updated DeltaType = "Updated"
    // Deleted is queued by DeltaFIFO.Delete, and by Replace for objects that
    // are missing from the replacement list.
    Deleted DeltaType = "Deleted"
    // The other types are obvious. You'll get Sync deltas when:
    //  * A watch expires/errors out and a new list/watch cycle is started.
    //  * You've turned on periodic syncs.
    // (Anything that triggers DeltaFIFO's Replace() method.)
    Sync DeltaType = "Sync"
)
// DeletedFinalStateUnknown is the Object carried by the Deleted deltas that
// Replace queues for keys absent from the new list. Key is the key of the
// missing object; Obj is the last value Replace could find for it and is nil
// when no value was available. KeyOf returns Key directly for this type.
type DeletedFinalStateUnknown struct {
    Key string
    Obj interface{}
}

// A KeyListerGetter is anything that knows how to list its keys and look up by key.
// It is simply the combination of KeyLister and KeyGetter.
type KeyListerGetter interface {
    KeyLister
    KeyGetter
}

// A KeyLister is anything that knows how to list its keys.
type KeyLister interface {
    // ListKeys returns all keys currently held.
    ListKeys() []string
}

// A KeyGetter is anything that knows how to get the value stored under a given key.
type KeyGetter interface {
    // GetByKey returns the value stored under key, whether such a value
    // exists, and any error hit during the lookup.
    GetByKey(key string) (interface{}, bool, error)
}

3. 方法

// testFifoObjectKeyFunc is the key function used by the examples: the key of a
// testFifoObject is its name. The type assertion is unchecked, so passing any
// other type panics; the returned error is always nil.
func testFifoObjectKeyFunc(obj interface{}) (string, error) {
    fifoObj := obj.(testFifoObject)
    return fifoObj.name, nil
}
// testFifoObject is the value type used by the helpers and examples here:
// name serves as the queue key (see testFifoObjectKeyFunc) and val is an
// arbitrary payload.
type testFifoObject struct {
    name string
    val  interface{}
}
// mkFifoObj builds a testFifoObject with the given name and payload.
func mkFifoObj(name string, val interface{}) testFifoObject {
    var obj testFifoObject
    obj.name = name
    obj.val = val
    return obj
}
// testPop is a helper that reduces stuttering in the examples: it pops the next
// entry from f and returns the object of its newest delta as a testFifoObject.
// Both type assertions are unchecked and panic on an unexpected type.
func testPop(f *DeltaFIFO) testFifoObject {
    popped := Pop(f)
    newest := popped.(Deltas).Newest()
    return newest.Object.(testFifoObject)
}

// keyLookupFunc adapts a raw function returning the currently "known" objects
// so that, through its ListKeys and GetByKey methods, it can be handed to
// NewDeltaFIFO as the known-objects store.
type keyLookupFunc func() []testFifoObject

// ListKeys calls kl once and returns the names of the objects it yields, in the
// same order. The result is an empty, non-nil slice when kl yields nothing.
func (kl keyLookupFunc) ListKeys() []string {
    objs := kl()
    names := []string{}
    for i := range objs {
        names = append(names, objs[i].name)
    }
    return names
}

// GetByKey returns the first object yielded by kl whose name equals key,
// together with true. When no object matches it returns (nil, false, nil).
// The error result is always nil.
func (kl keyLookupFunc) GetByKey(key string) (interface{}, bool, error) {
    objs := kl()
    for i := 0; i < len(objs); i++ {
        if candidate := objs[i]; candidate.name == key {
            return candidate, true, nil
        }
    }
    return nil, false, nil
}

Add 和 Update

// Add records an Added delta for obj.
//
// It takes f.lock, marks the queue as populated and delegates to
// queueActionLocked, whose error (if any) is returned unchanged.
func (f *DeltaFIFO) Add(obj interface{}) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    // Any Add counts as population, so HasSynced no longer waits for Replace.
    f.populated = true
    return f.queueActionLocked(Added, obj)
}
// Update records an Updated delta for obj.
//
// It takes f.lock, marks the queue as populated and delegates to
// queueActionLocked, whose error (if any) is returned unchanged.
func (f *DeltaFIFO) Update(obj interface{}) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    // Any Update counts as population, so HasSynced no longer waits for Replace.
    f.populated = true
    return f.queueActionLocked(Updated, obj)
}
// Delete records a Deleted delta for obj, unless the object is unknown
// everywhere, in which case it is a no-op.
//
// "Unknown" means: with no knownObjects store configured, the key has no
// pending deltas in f.items; with a store configured, the store lookup
// succeeded, reported the key absent, and f.items has no pending deltas either.
// If the store lookup fails, the deletion is queued anyway.
//
// It returns a KeyError when no key can be computed for obj, otherwise the
// result of queueActionLocked.
func (f *DeltaFIFO) Delete(obj interface{}) error {
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }

    f.lock.Lock()
    defer f.lock.Unlock()
    f.populated = true

    if f.knownObjects != nil {
        // A local cache is configured: skip only when neither the cache nor
        // the pending items know about this key.
        _, known, lookupErr := f.knownObjects.GetByKey(id)
        _, queued := f.items[id]
        if lookupErr == nil && !known && !queued {
            return nil
        }
    } else if _, queued := f.items[id]; !queued {
        // No local cache: skip when nothing is pending for this key.
        return nil
    }

    return f.queueActionLocked(Deleted, obj)
}
// KeyOf returns the queue key for obj.
//
// obj may be a plain object, a DeletedFinalStateUnknown, or a Deltas list.
// For a Deltas list the key is taken from the object of its newest delta; an
// empty list yields a KeyError wrapping ErrZeroLengthDeltasObject. A
// DeletedFinalStateUnknown carries its own key. Anything else is handed to
// f.keyFunc.
func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
    target := obj
    if deltas, isDeltas := obj.(Deltas); isDeltas {
        if len(deltas) == 0 {
            return "", KeyError{obj, ErrZeroLengthDeltasObject}
        }
        // Use the object from the most recent change.
        target = deltas.Newest().Object
    }

    switch v := target.(type) {
    case DeletedFinalStateUnknown:
        // A deletion marker already records the key it stands for.
        return v.Key, nil
    default:
        return f.keyFunc(target)
    }
}
// dedupDeltas collapses the last two entries of deltas into one when isDup
// reports them as duplicates (currently only two consecutive deletions).
//
// When nothing is merged the input slice is returned as is. When a merge
// happens, a freshly allocated slice is returned holding all but the last two
// entries followed by the single surviving delta; the input is not modified.
func dedupDeltas(deltas Deltas) Deltas {
    n := len(deltas)
    if n < 2 {
        return deltas
    }

    last, prev := &deltas[n-1], &deltas[n-2]
    merged := isDup(last, prev)
    if merged == nil {
        return deltas
    }

    kept := append(Deltas{}, deltas[:n-2]...)
    return append(kept, *merged)
}
// isDup reports whether a and b are duplicates by returning the delta that
// should be kept, or nil when they are not duplicates. Deletion duplicates are
// the only kind detected at present.
func isDup(a, b *Delta) *Delta {
    // TODO: Detect other duplicate situations? Are there any?
    return isDeletionDup(a, b)
}
// isDeletionDup returns the delta to keep when a (the newest entry) and b (the
// one before it) are both deletions, or nil when either is not a deletion.
//
// It prefers the delta that is not a DeletedFinalStateUnknown marker: if b
// carries such a marker, a is kept; otherwise b is kept.
func isDeletionDup(a, b *Delta) *Delta {
    bothDeleted := b.Type == Deleted && a.Type == Deleted
    if !bothDeleted {
        return nil
    }

    // Do more sophisticated checks, or is this sufficient?
    switch b.Object.(type) {
    case DeletedFinalStateUnknown:
        return a
    default:
        return b
    }
}
// willObjectBeDeletedLocked reports whether the most recent pending delta for
// id is a deletion. It returns false when nothing is pending for id.
// It reads f.items without locking; the "Locked" suffix suggests the caller
// must already hold f.lock.
func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool {
    pending := f.items[id]
    if len(pending) == 0 {
        return false
    }
    newest := pending[len(pending)-1]
    return newest.Type == Deleted
}
// queueActionLocked appends a delta of the given type for obj to the pending
// changes of obj's key. It touches f.items and f.queue without locking; the
// public callers in this file take f.lock first.
//
// A Sync is dropped when the key's newest pending delta is already a deletion:
// adding a Sync after a delete is pointless, and it lets consumers rely on a
// trailing Deleted entry. (Both Resync and Replace may issue Sync actions.)
// After appending, the last two deltas are deduplicated. A key that had no
// pending deltas is appended to f.queue, and waiters on f.cond are woken.
//
// It returns a KeyError when no key can be computed for obj, nil otherwise.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }

    if actionType == Sync && f.willObjectBeDeletedLocked(id) {
        return nil
    }

    existing, alreadyQueued := f.items[id]
    merged := dedupDeltas(append(existing, Delta{actionType, obj}))

    if len(merged) == 0 {
        // Nothing left for this key after deduplication; forget it.
        delete(f.items, id)
        return nil
    }

    if !alreadyQueued {
        // First pending change for this key: give it a place in the pop order.
        f.queue = append(f.queue, id)
    }
    f.items[id] = merged
    f.cond.Broadcast()
    return nil
}
例子
// Build a DeltaFIFO keyed by object name whose known-objects store always
// reports foo=5, bar=6 and baz=7.
f := NewDeltaFIFO(
        testFifoObjectKeyFunc,
        keyLookupFunc(func() []testFifoObject {
            return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
        }),
    )
    // Queue five changes. Assuming the FIFO starts out empty, the pop order
    // becomes baz, foo, bar and the pending deltas are:
    //   baz: Updated{baz,18}, Deleted{baz,18}
    //   foo: Added{foo,10}, Updated{foo,15}
    //   bar: Updated{bar,15}
    // The returned errors are ignored in this example.
    f.Update(mkFifoObj("baz", 18))
    f.Add(mkFifoObj("foo", 10))
    f.Update(mkFifoObj("bar", 15))
    f.Update(mkFifoObj("foo", 15))
    f.Delete(mkFifoObj("baz", 18))
// newInformer builds a Controller whose queue is a DeltaFIFO backed by
// clientState and whose Process step replays popped deltas into clientState
// while notifying h.
//
// Parameters:
//   - lw, objType, resyncPeriod: passed through to the Config as
//     ListerWatcher, ObjectType and FullResyncPeriod.
//   - h: receives OnAdd/OnUpdate/OnDelete for each applied delta.
//   - clientState: the local cache; it also serves as the FIFO's knownObjects.
func newInformer(
    lw ListerWatcher,
    objType runtime.Object,
    resyncPeriod time.Duration,
    h ResourceEventHandler,
    clientState Store,
) Controller {
    // clientState is the local cache, i.e. the FIFO's knownObjects.
    fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState)

    // applyDeltas walks the popped Deltas from oldest to newest, applying each
    // one to the local cache and then calling the matching handler. The first
    // cache error aborts processing and is returned.
    applyDeltas := func(obj interface{}) error {
        for _, d := range obj.(Deltas) {
            switch d.Type {
            case Deleted:
                // Remove from the local cache.
                if delErr := clientState.Delete(d.Object); delErr != nil {
                    return delErr
                }
                h.OnDelete(d.Object)
            case Sync, Added, Updated:
                old, exists, getErr := clientState.Get(d.Object)
                if getErr != nil || !exists {
                    // Not (reliably) cached yet: add it to the local cache.
                    if addErr := clientState.Add(d.Object); addErr != nil {
                        return addErr
                    }
                    h.OnAdd(d.Object)
                    continue
                }
                // Already cached: refresh the local cache.
                if updErr := clientState.Update(d.Object); updErr != nil {
                    return updErr
                }
                h.OnUpdate(old, d.Object)
            }
        }
        return nil
    }

    return New(&Config{
        Queue:            fifo,
        ListerWatcher:    lw,
        ObjectType:       objType,
        FullResyncPeriod: resyncPeriod,
        RetryOnError:     false,
        Process:          applyDeltas,
    })
}

pop方法

// Pop blocks until the queue holds a key, removes that key's Deltas from the
// FIFO and passes them to process.
//
// process runs while f.lock is still held (the unlock is deferred). If it
// returns an ErrRequeue, the Deltas are put back via addIfNotPresent and the
// wrapped e.Err is returned in its place. When the queue is empty and
// IsClosed reports true, Pop returns (nil, ErrFIFOClosed).
//
// On the normal path it returns the popped Deltas together with the error
// from process.
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
    f.lock.Lock()
    defer f.lock.Unlock()
    for {
        // Sleep until a key is queued; queueActionLocked broadcasts on f.cond.
        for len(f.queue) == 0 {
            // Check for closure before sleeping so that a closed, empty queue
            // returns instead of waiting.
            if f.IsClosed() {
                return nil, ErrFIFOClosed
            }

            f.cond.Wait()
        }
        // Take the key at the head of the queue.
        id := f.queue[0]
        f.queue = f.queue[1:]
        // Count this pop against the initial batch recorded by Replace;
        // HasSynced reports true once the counter reaches zero.
        if f.initialPopulationCount > 0 {
            f.initialPopulationCount--
        }
        item, ok := f.items[id]
        if !ok {
            // The key was queued but has no deltas; move on to the next key.
            continue
        }
        delete(f.items, id)
        err := process(item)
        if e, ok := err.(ErrRequeue); ok {
            // The consumer asked for a retry: put the deltas back and
            // surface the underlying error.
            f.addIfNotPresent(id, item)
            err = e.Err
        }
        return item, err
    }
}
例子

Replace

// Replace queues a Sync delta for every item in list and a Deleted delta
// (carrying a DeletedFinalStateUnknown) for every known key that is missing
// from list.
//
// "Known" keys come from f.knownObjects when it is set, otherwise from the
// keys currently pending in f.items. On the first population of the queue it
// also records initialPopulationCount so that HasSynced can tell when the
// initial batch has been popped. resourceVersion is not used by this
// implementation.
//
// It returns a KeyError when a key cannot be computed for a list item, or the
// error from queueActionLocked.
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    keys := make(sets.String, len(list))

    // Collect the keys of the incoming list into keys,
    // and queue a Sync action for every item in the list.
    for _, item := range list {
        key, err := f.KeyOf(item)
        if err != nil {
            return KeyError{item, err}
        }
        keys.Insert(key)
        if err := f.queueActionLocked(Sync, item); err != nil {
            return fmt.Errorf("couldn't enqueue object: %v", err)
        }
    }

    if f.knownObjects == nil {
        // Do deletion detection against our own list.
        // This branch runs when no local cache has been configured.
        queuedDeletions := 0
        for k, oldItem := range f.items {
            // Keys present in the new list already received a Sync above, so skip them.
            if keys.Has(k) {
                continue
            }
            var deletedObj interface{}
            if n := oldItem.Newest(); n != nil {
                deletedObj = n.Object
            }
            queuedDeletions++
            // Anything not in the new list has to be deleted.
            if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
                return err
            }
        }

        if !f.populated {
            f.populated = true
            // While there shouldn't be any queued deletions in the initial
            // population of the queue, it's better to be on the safe side.
            f.initialPopulationCount = len(list) + queuedDeletions
        }

        return nil
    }

    // Detect deletions not already in the queue.
    // Only the local cache is scanned here, not f.items.
    // NOTE(review): the original note argues that every key in f.items also
    // shows up in the local cache, so scanning the cache is enough -- confirm
    // that this holds for keys that were queued but never popped.
    knownKeys := f.knownObjects.ListKeys()
    queuedDeletions := 0
    for _, k := range knownKeys {
        if keys.Has(k) {
            continue
        }

        // Look up the last known value; on failure the marker is queued
        // without an object.
        deletedObj, exists, err := f.knownObjects.GetByKey(k)
        if err != nil {
            deletedObj = nil
            klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
        } else if !exists {
            deletedObj = nil
            klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
        }
        queuedDeletions++
        if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
            return err
        }
    }

    if !f.populated {
        f.populated = true
        // The initial batch is every listed item plus every deletion queued above.
        f.initialPopulationCount = len(list) + queuedDeletions
    }

    return nil
}
例子
1.pop() --> 本地缓存中就会有foo的记录, queue和item中会删除foo
2.pop() --> 本地缓存中就会有bar的记录, queue和item中会删除bar
3.Add {baz, 15} pop() --> 本地缓存中就会有baz的记录
4.Update {foo, 10} Update {foo, 15} --> foo会加入到queue和item中
5.Update {bar, 15} --> bar会加入到queue和item中
6.Update {baz, 18} Delete {baz, 18} --> baz会加入到queue和item中
7.Replace [{foo, 15}, {baz, 18}]

Resync

// syncKey is the locking wrapper around syncKeyLocked: it takes f.lock, queues
// a Sync for key if appropriate, and returns syncKeyLocked's result.
func (f *DeltaFIFO) syncKey(key string) error {
    f.lock.Lock()
    defer f.lock.Unlock()

    return f.syncKeyLocked(key)
}

// syncKeyLocked queues a Sync delta for the object stored under key in
// f.knownObjects. It touches f.items without locking; syncKey takes f.lock
// before calling it.
//
// Nothing is queued, and nil is returned, when the lookup fails (logged as an
// error), when the key is absent from the store (logged as info), or when the
// object's key already has pending deltas. It returns a KeyError when no key
// can be computed for the stored object, and a wrapped error when queueing
// fails.
func (f *DeltaFIFO) syncKeyLocked(key string) error {
    obj, exists, err := f.knownObjects.GetByKey(key)
    switch {
    case err != nil:
        klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
        return nil
    case !exists:
        // The object is not in the local cache, so there is nothing to sync.
        klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
        return nil
    }

    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }

    // Changes are already pending for this key; a Sync would add nothing.
    if pending := f.items[id]; len(pending) > 0 {
        return nil
    }

    if queueErr := f.queueActionLocked(Sync, obj); queueErr != nil {
        return fmt.Errorf("couldn't queue object: %v", queueErr)
    }
    return nil
}

HasSynced

// HasSynced reports whether the queue has been populated (by Replace, or by an
// earlier Add/Update/Delete) and every entry counted in
// initialPopulationCount has since been popped.
func (f *DeltaFIFO) HasSynced() bool {
    f.lock.Lock()
    defer f.lock.Unlock()
    return f.populated && f.initialPopulationCount == 0
}

informer整体

举报

相关推荐

0 条评论