0
点赞
收藏
分享

微信扫一扫

[k8s源码分析][client-go] informer之store和index

1. 前言

2. 整体接口与实现类

2.1 Store

// tools/cache/store.go
// Store is the object-storage interface. Its methods take whole objects;
// the cache implementation shown in this file derives each storage key from
// the object with its KeyFunc.
type Store interface {
    // Add stores obj.
    Add(obj interface{}) error
    // Update stores obj, replacing whatever was stored under the same key.
    Update(obj interface{}) error
    // Delete removes obj from the store.
    Delete(obj interface{}) error
    // List returns all stored objects.
    List() []interface{}
    // ListKeys returns the keys of the stored objects.
    ListKeys() []string
    // Get looks up the stored item that shares obj's key.
    Get(obj interface{}) (item interface{}, exists bool, err error)
    // GetByKey looks up the stored item under key.
    GetByKey(key string) (item interface{}, exists bool, err error)

    // Replace will delete the contents of the store, using instead the
    // given list. Store takes ownership of the list, you should not reference
    // it after calling this function.
    // 1. The existing contents of the store are dropped.
    // 2. The supplied list takes the place of the previous contents.
    Replace([]interface{}, string) error
    // Resync re-synchronizes the store. The threadSafeMap-backed
    // implementation shown in this file does nothing and returns nil.
    Resync() error
}

2.2 Indexer

// tools/cache/thread_safe_store.go
// Indexer extends Store with named secondary indexes over the stored objects.
type Indexer interface {
    Store
    // Index returns the stored objects whose set of indexed values
    // intersects the set of indexed values of the given object, for
    // the named index
    Index(indexName string, obj interface{}) ([]interface{}, error)
    // IndexKeys returns the storage keys of the stored objects whose
    // set of indexed values for the named index includes the given
    // indexed value
    IndexKeys(indexName, indexedValue string) ([]string, error)
    // ListIndexFuncValues returns all the indexed values of the given index
    ListIndexFuncValues(indexName string) []string
    // ByIndex returns the stored objects whose set of indexed values
    // for the named index includes the given indexed value
    ByIndex(indexName, indexedValue string) ([]interface{}, error)
    // GetIndexers returns the registered indexers.
    GetIndexers() Indexers

    // AddIndexers adds more indexers to this store.  If you call this after you already have data
    // in the store, the results are undefined.
    AddIndexers(newIndexers Indexers) error
}
// IndexFunc knows how to compute the set of indexed values for an object.
// One object may yield several indexed values.
type IndexFunc func(obj interface{}) ([]string, error)

// Index maps the indexed value to a set of keys in the store that match on that value
type Index map[string]sets.String

// Indexers maps a name to an IndexFunc
type Indexers map[string]IndexFunc

// Indices maps a name to an Index
type Indices map[string]Index

2.3 ThreadSafeStore

// tools/cache/thread_safe_store.go
// ThreadSafeStore is the key-addressed storage that cache delegates to.
// Unlike Store, callers supply the key explicitly. The threadSafeMap
// implementation in this file guards its state with a sync.RWMutex.
type ThreadSafeStore interface {
    // Add stores obj under key.
    Add(key string, obj interface{})
    // Update stores obj under key, replacing any previous entry.
    Update(key string, obj interface{})
    // Delete removes the entry stored under key, if any.
    Delete(key string)
    // Get returns the entry stored under key and whether it exists.
    Get(key string) (item interface{}, exists bool)
    // List returns all stored objects.
    List() []interface{}
    // ListKeys returns the keys of the stored objects.
    ListKeys() []string
    // Replace swaps the whole contents for the given key -> object map.
    Replace(map[string]interface{}, string)
    // Index, IndexKeys, ListIndexFuncValues, ByIndex, GetIndexers and
    // AddIndexers mirror the methods of the same names on Indexer.
    Index(indexName string, obj interface{}) ([]interface{}, error)
    IndexKeys(indexName, indexKey string) ([]string, error)
    ListIndexFuncValues(name string) []string
    ByIndex(indexName, indexKey string) ([]interface{}, error)
    GetIndexers() Indexers
    AddIndexers(newIndexers Indexers) error
    // Resync is a no-op in the threadSafeMap implementation.
    Resync() error
}

// threadSafeMap is the ThreadSafeStore implementation: a key -> object map
// plus the named indexes built over it, all guarded by lock.
type threadSafeMap struct {
    // lock guards items and indices; read paths take RLock, mutating paths Lock.
    lock  sync.RWMutex
    // items holds the key -> object mapping.
    items map[string]interface{}

    // indexers maps a name to an IndexFunc
    // i.e. each index name and the function that computes its indexed values.
    indexers Indexers
    // indices maps a name to an Index
    indices Indices
}
// NewThreadSafeStore returns a ThreadSafeStore backed by a threadSafeMap that
// starts with no items and uses the supplied indexers and indices as given.
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
    store := new(threadSafeMap)
    store.items = make(map[string]interface{})
    store.indexers = indexers
    store.indices = indices
    return store
}

2.4 cache

// tools/cache/store.go
// KeyFunc computes the storage key for an object.
type KeyFunc func(obj interface{}) (string, error)

// cache pairs a ThreadSafeStore with the KeyFunc used to derive keys from
// objects; its methods compute the key and delegate to cacheStorage.
type cache struct {
    // cacheStorage is the key-addressed backing store.
    cacheStorage ThreadSafeStore
    // keyFunc turns an object into its storage key.
    keyFunc KeyFunc
}
// NewStore returns a plain Store: a cache with no indexers registered, exposed
// through the Store interface only (no index-related methods).
func NewStore(keyFunc KeyFunc) Store {
    storage := NewThreadSafeStore(Indexers{}, Indices{})
    return &cache{keyFunc: keyFunc, cacheStorage: storage}
}
// NewIndexer returns a cache that carries the given indexers and is exposed
// through the Indexer interface, so the index-related methods are available.
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
    storage := NewThreadSafeStore(indexers, Indices{})
    return &cache{keyFunc: keyFunc, cacheStorage: storage}
}
// ExplicitKey is a string that MetaNamespaceKeyFunc returns verbatim as the
// key instead of deriving one from object metadata.
type ExplicitKey string
// MetaNamespaceKeyFunc is a KeyFunc producing keys of the form
// <namespace>/<name>, or just <name> when the namespace is empty.
// An ExplicitKey is returned unchanged. Objects whose metadata cannot be
// accessed yield an "object has no meta" error.
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
    // An ExplicitKey already is the key; hand it back as a plain string.
    if explicit, isKey := obj.(ExplicitKey); isKey {
        return string(explicit), nil
    }

    // Use a local name that does not shadow the meta package.
    accessor, err := meta.Accessor(obj)
    if err != nil {
        return "", fmt.Errorf("object has no meta: %v", err)
    }

    namespace := accessor.GetNamespace()
    if len(namespace) == 0 {
        return accessor.GetName(), nil
    }
    return namespace + "/" + accessor.GetName(), nil
}

3. 方法

3.1 生成一个Indexer实例

// testUsersIndexFunc is an IndexFunc for tests: it reads the pod's "users"
// annotation and returns its comma-separated parts as the indexed values.
// obj must be a *v1.Pod; any other type makes the type assertion panic.
func testUsersIndexFunc(obj interface{}) ([]string, error) {
    users := obj.(*v1.Pod).Annotations["users"]
    return strings.Split(users, ","), nil
}

// TestMultiIndexKeys builds an Indexer keyed by MetaNamespaceKeyFunc with a
// single index, "byUser", whose values are computed by testUsersIndexFunc.
// NOTE(review): as excerpted here `index` is never used, which the Go compiler
// rejects ("declared and not used"); the statements shown in section 3.2
// appear to continue this function body — confirm against the full test.
func TestMultiIndexKeys(t *testing.T) {
    index := NewIndexer(MetaNamespaceKeyFunc, Indexers{"byUser": testUsersIndexFunc})
}

3.2 Add

    // Three pods with no namespace, so their store keys are just the names
    // "one", "two" and "tre". The "users" annotation drives the byUser index.
    pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "one", Annotations: map[string]string{"users": "ernie,bert"}}}
    pod2 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "two", Annotations: map[string]string{"users": "bert,oscar"}}}
    pod3 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "tre", Annotations: map[string]string{"users": "ernie,elmo"}}}

    // Add each pod; the error results are not checked in this excerpt.
    index.Add(pod1)
    index.Add(pod2)
    index.Add(pod3)
// tools/cache/store.go
// Add computes obj's key with the cache's keyFunc and stores obj under that
// key in the backing ThreadSafeStore. A key computation failure is returned
// wrapped in a KeyError and nothing is stored.
func (c *cache) Add(obj interface{}) error {
    objKey, keyErr := c.keyFunc(obj)
    if keyErr != nil {
        return KeyError{obj, keyErr}
    }

    // Delegates to the ThreadSafeStore (threadSafeMap.Add in this file).
    c.cacheStorage.Add(objKey, obj)
    return nil
}
// Add stores obj under key while holding the write lock, then brings the
// indices up to date, passing along whatever was previously stored under key.
func (c *threadSafeMap) Add(key string, obj interface{}) {
    c.lock.Lock()
    defer c.lock.Unlock()

    previous := c.items[key]
    c.items[key] = obj
    c.updateIndices(previous, obj, key)
}
// updateIndices records key in every index under the values computed for
// newObj. When oldObj is non-nil its index entries for key are removed first.
// It takes no lock itself; the callers in this file hold the write lock.
// Panics if an IndexFunc returns an error.
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
    // Drop the entries of the object being replaced before re-adding.
    if oldObj != nil {
        c.deleteFromIndices(oldObj, key)
    }

    for indexName, compute := range c.indexers {
        // Indexed values of the new object for this index.
        values, err := compute(newObj)
        if err != nil {
            panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, indexName, err))
        }

        // Fetch this index's value -> keys map, creating it on first use.
        bucket := c.indices[indexName]
        if bucket == nil {
            bucket = Index{}
            c.indices[indexName] = bucket
        }

        // Register key under each indexed value, creating key sets lazily.
        for _, value := range values {
            keys := bucket[value]
            if keys == nil {
                keys = sets.String{}
                bucket[value] = keys
            }
            keys.Insert(key)
        }
    }
}
c.indexers = {"byUser": testUsersIndexFunc}
// 所以只有一次循环
name = "byUser"
indexFunc = testUsersIndexFunc
===> indexValues = ["ernie", "bert"]
===> indices["byUser"] = {}
======> indices["byUser"]["ernie"] = [{"one": Empty}]
======> indices["byUser"]["bert"] = [{"one": Empty}]

3.3 查询方法

3.3.1 ByIndex
// Called from upstream (caller) code: fetch the objects indexed under "ernie"
// in the "byUser" index.
index.ByIndex("byUser", "ernie")

// tools/cache/store.go
// ByIndex returns the objects stored under indexKey in the named index by
// delegating to the backing ThreadSafeStore.
func (c *cache) ByIndex(indexName, indexKey string) ([]interface{}, error) {
    objs, err := c.cacheStorage.ByIndex(indexName, indexKey)
    return objs, err
}
// tools/cache/thread_safe_store.go
// ByIndex returns the stored objects whose key is recorded under indexKey in
// the named index. It returns an error when no indexer is registered under
// indexName, and an empty list when nothing matches. Runs under the read lock.
func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) {
    c.lock.RLock()
    defer c.lock.RUnlock()

    if c.indexers[indexName] == nil {
        return nil, fmt.Errorf("Index with name %s does not exist", indexName)
    }

    // Looking up a missing index or value yields a nil set, which ranges as empty.
    keys := c.indices[indexName][indexKey]
    objs := make([]interface{}, 0, keys.Len())
    for k := range keys {
        objs = append(objs, c.items[k])
    }
    return objs, nil
}
ListIndexFuncValues
// Called from upstream (caller) code: list every indexed value currently
// present in the "byUser" index.
index.ListIndexFuncValues("byUser")

// tools/cache/store.go
// ListIndexFuncValues returns the indexed values of the named index by
// delegating to the backing ThreadSafeStore.
func (c *cache) ListIndexFuncValues(indexName string) []string {
    values := c.cacheStorage.ListIndexFuncValues(indexName)
    return values
}

// tools/cache/thread_safe_store.go
// ListIndexFuncValues returns every indexed value present in the named index,
// in map iteration order. An unknown index name yields an empty slice.
// Runs under the read lock.
func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
    c.lock.RLock()
    defer c.lock.RUnlock()

    idx := c.indices[indexName]
    values := make([]string, 0, len(idx))
    for value := range idx {
        values = append(values, value)
    }
    return values
}
List() 和 Get(obj interface{})
// Called from upstream (caller) code. Get takes the object itself and derives
// its key with the store's KeyFunc, so pass the pod rather than a bare string:
// a plain string is neither an ExplicitKey nor an object with metadata, and
// MetaNamespaceKeyFunc would reject it with an error.
index.List()
index.Get(pod1)

// tools/cache/store.go
// List returns every object held by the backing ThreadSafeStore.
func (c *cache) List() []interface{} {
    all := c.cacheStorage.List()
    return all
}
// Get computes obj's key with keyFunc and returns the item stored under it,
// along with whether it exists. A key computation failure is returned as a
// KeyError with a nil item and exists == false.
func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) {
    objKey, keyErr := c.keyFunc(obj)
    if keyErr != nil {
        return nil, false, KeyError{obj, keyErr}
    }
    item, exists, err = c.GetByKey(objKey)
    return item, exists, err
}
// GetByKey returns the item stored under key and whether it exists.
// The returned error is always nil.
func (c *cache) GetByKey(key string) (item interface{}, exists bool, err error) {
    found, ok := c.cacheStorage.Get(key)
    return found, ok, nil
}

// tools/cache/thread_safe_store.go
// List returns a new slice holding every stored object, in map iteration
// order. Runs under the read lock.
func (c *threadSafeMap) List() []interface{} {
    c.lock.RLock()
    defer c.lock.RUnlock()

    objs := make([]interface{}, 0, len(c.items))
    for _, obj := range c.items {
        objs = append(objs, obj)
    }
    return objs
}
// Get returns the object stored under key and whether such an entry exists.
// Runs under the read lock.
func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
    c.lock.RLock()
    defer c.lock.RUnlock()

    value, found := c.items[key]
    return value, found
}
Delete
// Called from upstream (caller) code: remove pod3 from the store.
index.Delete(pod3)

// tools/cache/store.go
// Delete computes obj's key with keyFunc and removes that key from the
// backing ThreadSafeStore. A key computation failure is returned as a
// KeyError and nothing is removed.
func (c *cache) Delete(obj interface{}) error {
    objKey, keyErr := c.keyFunc(obj)
    if keyErr != nil {
        return KeyError{obj, keyErr}
    }
    c.cacheStorage.Delete(objKey)
    return nil
}

// tools/cache/thread_safe_store.go
// Delete removes the entry stored under key, first clearing its index
// entries. It does nothing when key is absent. Runs under the write lock.
func (c *threadSafeMap) Delete(key string) {
    c.lock.Lock()
    defer c.lock.Unlock()

    stored, found := c.items[key]
    if !found {
        return
    }
    c.deleteFromIndices(stored, key)
    delete(c.items, key)
}
// deleteFromIndices removes key from every index entry that obj maps to.
// For each registered indexer it recomputes obj's indexed values and deletes
// key from the matching key sets; a set that becomes empty is removed from
// the index. It takes no lock itself; the callers in this file hold the write
// lock. Panics if an IndexFunc returns an error.
func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) {
    for name, indexFunc := range c.indexers {
        indexValues, err := indexFunc(obj)
        if err != nil {
            panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
        }

        index := c.indices[name]
        if index == nil {
            // Nothing has been indexed under this name yet.
            continue
        }
        for _, indexValue := range indexValues {
            set := index[indexValue]
            if set == nil {
                continue
            }
            set.Delete(key)

            // Drop sets that became empty. Otherwise indexes with many
            // short-lived values accumulate empty sets forever, and
            // ListIndexFuncValues keeps reporting values with no objects.
            if len(set) == 0 {
                delete(index, indexValue)
            }
        }
    }
}
Update
// tools/cache/store.go
// Update computes obj's key with keyFunc and stores obj under that key in the
// backing ThreadSafeStore, replacing any previous entry. A key computation
// failure is returned as a KeyError and nothing is stored.
func (c *cache) Update(obj interface{}) error {
    objKey, keyErr := c.keyFunc(obj)
    if keyErr != nil {
        return KeyError{obj, keyErr}
    }
    c.cacheStorage.Update(objKey, obj)
    return nil
}
// tools/cache/thread_safe_store.go
// Update stores obj under key while holding the write lock and refreshes the
// indices, passing along whatever was previously stored under key. Its body
// is the same as Add's.
func (c *threadSafeMap) Update(key string, obj interface{}) {
    c.lock.Lock()
    defer c.lock.Unlock()

    previous := c.items[key]
    c.items[key] = obj
    c.updateIndices(previous, obj, key)
}
Resync
// tools/cache/store.go
// Resync forwards to the backing ThreadSafeStore's Resync and returns its
// result.
func (c *cache) Resync() error {
    err := c.cacheStorage.Resync()
    return err
}
// tools/cache/thread_safe_store.go
// Resync is a no-op for threadSafeMap: there is nothing to re-synchronize,
// so it always reports success.
func (c *threadSafeMap) Resync() error {
    return nil
}
Replace
// tools/cache/store.go
// Replace keys every element of list with keyFunc and hands the resulting
// key -> object map, together with resourceVersion, to the backing
// ThreadSafeStore. If any key cannot be computed it returns a KeyError and
// leaves the store untouched. Later elements win when keys collide.
func (c *cache) Replace(list []interface{}, resourceVersion string) error {
    keyed := make(map[string]interface{}, len(list))
    for i := range list {
        objKey, keyErr := c.keyFunc(list[i])
        if keyErr != nil {
            return KeyError{list[i], keyErr}
        }
        keyed[objKey] = list[i]
    }

    c.cacheStorage.Replace(keyed, resourceVersion)
    return nil
}

// tools/cache/thread_safe_store.go
// Replace adopts items as the new contents (the map is kept, not copied) and
// rebuilds every index from scratch. resourceVersion is not used by this
// implementation. Runs under the write lock.
func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
    c.lock.Lock()
    defer c.lock.Unlock()

    // Swap in the new contents and start from empty indices.
    c.items, c.indices = items, Indices{}

    // Re-index every object; there is no old object to remove, hence nil.
    for objKey, obj := range items {
        c.updateIndices(nil, obj, objKey)
    }
}

informer整体

举报

相关推荐

0 条评论