0
点赞
收藏
分享

微信扫一扫

redis6.0源码分析:字典扩容与渐进式rehash

盖码范 2023-10-29 阅读 15
redis

文章目录

字典

数据结构

结构设计

redis的字典的结构定义主要分为三块结构体,dict,dictht,dictEntry,它们之间的关系如下:

在这里插入图片描述

从上图中,其实我们可以看出,Redis 的字典设计,是通过数组 + 链表的方式去实现。

代码实现

/* 字典数据结构 */
typedef struct dict {
    dictType *type;		// 字典类型,会跟 hash 函数等方法的具体实现有关
    void *privdata;		// 私有数据
    dictht ht[2];		// 一个字典,含有两个哈希表
    long rehashidx; 	// 代表 rehashing 到了什么位置,rehashidx = -1 						  // 代表未进行 rehash
    unsigned long iterators; // 当前正在迭代的迭代器数, number of iterators currently running 
} dict;

/* 哈希表, HashTable, 简写 ht */
typedef struct dictht {
    dictEntry **table; 		// 节点数组,可知 ht 的结构是数组 + 链表构成
    unsigned long size;		// table 数组的大小,即 ht 的大小
    // table 大小的掩码,等于 size - 1, 就是用于获取 key 索引运算的
    // index = hash(key) & size - 1 = hash(key) & sizemask
    unsigned long sizemask;
    unsigned long used; 	// ht 表中已有键值对的个数,并非 table 数组占用个数
} dictht;

/* 哈希表节点,单个 Node */
typedef struct dictEntry {
    void *key; 				// key, 存储哈希表的 key
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v; 					// value, 存储哈希表的 value
    struct dictEntry *next; // 单链表结构,指向下一个节点,用于解决哈希冲突
} dictEntry;

如果代码不够具象,也可以结合下图一起思考下

在这里插入图片描述

dictType字典类型

dictType 属性的知识点属于额外补充知识啦,跟扩容也没有太大关系。字典类型的概念是为了多态字典而存在的。即每种 DictType 都会实现一簇操作于特定键值的函数。说白了就是 Redis 为用途不同的字典设置了不同类型操作键值的特定函数

typedef struct dict {
    dictType *type;
	...
} dict;

typedef struct dictType {
	// 计算键 hash 值的函数
    uint64_t (*hashFunction)(const void *key);
    // 复制键的函数 
    void *(*keyDup)(void *privdata, const void *key);
    // 复制值的函数
    void *(*valDup)(void *privdata, const void *obj);
    // 对比键的函数
    int (*keyCompare)(void *privdata, const void *key1, const void *key2);
    // 销毁键的函数
    void (*keyDestructor)(void *privdata, void *key);
    // 销毁值的函数
    void (*valDestructor)(void *privdata, void *obj);
} dictType;

为什么字典有两个哈希表?

为什么 redis 的 dict 数据结构有两个哈希表 ht ? 它们的作用和承担的角色分别是什么?

  • 因为 redis 是单进程单线程模型,而且既要支撑一个大容量,还要保持高性能的读写性能,所以不同于 Java HashMap 的扩容是在本体进行。而是由两个哈希表 + 渐进式 rehash 的方式来实现扩容机制的。由此实现平滑扩容,又不阻塞读写
  • 通常时候,字典的数据都是在第一个哈希表 ht[0] 进行的。当字典判断需要扩容的时候,就会停止对 ht[0] 进行写操作,而是对 ht[1] 赋予一个 2 倍大小的新哈希表,并将所有写操作指向 ht[1], 此时表示哈希表扩容完成,随后进入 rehashing 阶段,即开始渐进式数据迁移
  • 在 rehashing 的过程中,ht[0] 会继续保持对原有数据的读操作,而扩容后新写的数据的读操作则在 ht[1] 进行, 直到 ht[0] 的所有数据迁移到 ht[1] 后,则直接 ht[0] = ht[1], 完成整个扩容 & rehash 操作。

所以我们可以简单的总结出两个哈希表分别承担的角色是

  • ht[0] 是日常主要的数据存储表, 对外提供读写能力
  • ht[1] 作为扩容时使用的临时表,保证扩容机制平滑进行

哈希算法

Redis 的字典在 Redis 3.2 以前采用的是 murmurhash2 实现的,在 Redis 4.0 之后则采用 siphash

我们在 src/dict.c 可以看到获取 key 的哈希值是通过 dictHashKey 实现的,所以我们找 dictHashKey 方法

 h = dictHashKey(d, de->key) & d->ht[1].sizemask;

src/dict.h 头文件这么定义了 dictHashKey 方法, 那么 type 是啥玩意?type->hashFunction(key) 又是啥方法?

#define dictHashKey(d, key) (d)->type->hashFunction(key)

这个时候就需要翻到 dict 定义中,有一个 dictType 类型,代表字典的类型

typedef struct dict {
    dictType *type;
	...
} dict;

typedef struct dictType {
    uint64_t (*hashFunction)(const void *key); // 某种 dictType 类型的 hash function
    void *(*keyDup)(void *privdata, const void *key);
    void *(*valDup)(void *privdata, const void *obj);
    int (*keyCompare)(void *privdata, const void *key1, const void *key2);
    void (*keyDestructor)(void *privdata, void *key);
    void (*valDestructor)(void *privdata, void *obj);
} dictType;

好的, dict 的 type 是那种呢?我们看到 src/server.cinitServer 函数的一段代码

void initServer(...) {
	...
   /* Create the Redis databases, and initialize other internal state. */
    for (j = 0; j < server.dbnum; j++) {
        server.db[j].dict = dictCreate(&dbDictType,NULL);
        server.db[j].expires = dictCreate(&keyptrDictType,NULL);
        server.db[j].expires_cursor = 0;
        server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
        server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);
        server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
        server.db[j].id = j;
        server.db[j].avg_ttl = 0;
        server.db[j].defrag_later = listCreate();
        listSetFreeMethod(server.db[j].defrag_later,(void (*)(void*))sdsfree);
    }
    ...
}

/* Db->dict, keys are sds strings, vals are Redis objects. */
dictType dbDictType = {
    dictSdsHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCompare,          /* key compare */
    dictSdsDestructor,          /* key destructor */
    dictObjectDestructor   /* val destructor */
};

我们得知 dict 是 db 的存放数据的字典,它传入了 dbDictType 类型。在定义中,我们也得知 hash function 具体实现是 dictSdsHash, 所以我们就找 dictSdsHash 即可, 在 src/server.c 中,我们找到了

uint64_t dictSdsHash(const void *key) {
    return dictGenHashFunction((unsigned char*)key, sdslen((char*)key));
}

所以得知调用入口是 dictGenHashFunction 方法,回到 src/dict.c 代码如下

//https://github.com/redis/redis/blob/unstable/src/dict.c
uint64_t dictGenHashFunction(const void *key, int len) {
    return siphash(key,len,dict_hash_function_seed);
}

好的,真相了,那就是 spihash 算法。

扩容机制

在上面了解了 dict 的数据结构的基础上,我们来了解 dict 是如何进行扩容,以及扩容后数据是如何迁移的?但在了解扩容机制和数据迁移之间,我们先来问几个问题

  • dict 存在几种状态?
  • dict 初始化?
  • dict 什么时候扩容?扩容阀值是多少?扩容倍数是多少?
  • 哪些地方会触发扩容?怎么扩容?
  • 扩容后,数据如何 rehash ?
  • 一次扩容后的rehash 过程中,由于 key 写入过快,很快又超过了新的扩容阀值,此时怎么办?

然后我们基于以上的问题,一个一个问题来回答和解析

扩容前置知识

字典存在几种状态?

在了解扩容机制之前,我们可以先小小剧透一下, dict 总共就存在 4 种状态

  • table.size 不变,无扩缩容
  • 扩容中
  • 缩容中
  • rehashing 中

了解了状态后,就可以更好的方便我们理解了

容量相关的关键字段定义

扩容状态码

#define DICT_OK 0					// 成功
#define DICT_ERR 1					// 失败

哈希表初始值

#define DICT_HT_INITIAL_SIZE     4	//  哈希表 (ht) size 的初始值

扩容安全阈值

static int dict_can_resize = 1;
static unsigned int dict_force_resize_ratio = 5;

void dictEnableResize(void) {
    dict_can_resize = 1;
}

void dictDisableResize(void) {
    dict_can_resize = 0;
}

字典的容量都是2的幂次方

/* Our hash table capability is a power of two */
static unsigned long _dictNextPower(unsigned long size)
{
    unsigned long i = DICT_HT_INITIAL_SIZE;

    if (size >= LONG_MAX) return LONG_MAX + 1LU;
    while(1) {
        if (i >= size)
            return i;
        i *= 2;
    }
}
  • size 是要扩容的大小,进入 _dictNextPower 后,会计算得到一个接近 size 的值,且又是 2 的幂次方

扩容机制

字典什么时候会扩容?

那么我们就看下 sre/dict.c_dictExpandIfNeeded 方法即可,因为字典的扩容时需要这个方法去判断,所以我们可以看到字典有三种扩容的渠道

  • 当字典还没有被初始化,即字典的 hashtable[0] 为空时,那我们就初始化字典的第一个 hashtable

ht[0].size = 0

  • 当 hashtable[0] 的键值对数量 >= hashtable[0] 数组的 size 时,且全局设置 dict_can_resize = true, 我们就扩容

    d->ht[0].used >= d->ht[0].size && dict_can_resize = true

  • 当 hashtable[0] 的键值对数量 >= hashtable[0] 数组的 size 时, 且键值对数量已经超过数组大小的 5 倍的安全阀值时,就强制触发扩容

    d->ht[0].used >= d->ht[0].size && d->ht[0].used/d->ht[0].size > dict_force_resize_ratio

static int _dictExpandIfNeeded(dict *d)
{
    // 如果当前处于 rehash 状态,则直接返回 0 (代表无需扩容,已扩容,新扩容成功)
    if (dictIsRehashing(d)) return DICT_OK;

    /* If the hash table is empty expand it to the initial size. */
    // 如果 hashtable[0] 的大小为 0, 代表整个 dict 还没有被初始化,所以先初始	  // 化字典的第一个 hashtable,初始大小是 4
    if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

	// 当 hashtable[0] 的键值数 >= hashtable[0] 的 entry 数组大小
	// 且 (dict_can_resize = true 或 hashtable[0] 键值数已超过 hashtable 	 // 节点数组大小的 5 倍的安全阀值) 就会触发扩容
	// 扩容倍数是已有键值数  (ht.used) 的两倍,注意不是 ht 的 size
    if (d->ht[0].used >= d->ht[0].size &&
        (dict_can_resize ||
         d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
    {
        return dictExpand(d, d->ht[0].used*2);
    }
    return DICT_OK;
}

_dictExpandIfNeededdictExpand 的返回值都是 0 (DICT_OK) 或 1 (DICT_ERR),

  • DICT_OK 代表新扩容成功,正在 rehashing ,无需扩容

  • DICT_ERR 代表非法操作,即非法扩容,扩容失败

    • 或是在 rehashing 阶段进入 dictExpand 函数

    • 或是在 dictExpand 阶段传入扩容 size 小于 当前 used

    • 或是在 dictExpand 阶段

扩容的阈值 & 扩容的倍数

扩容阀值是多少?

相较 Java HashMap 的扩容因子为 0.75, 那么 Redis 字典的扩容因子就是 1, 即容量占比百分百才触发扩容。当然从 _dictExpandIfNeeded 函数中,我们可以看到这并不是绝对的,要取决于 dict_can_resize 的设置是否允许。如果不允许扩容时,那么只有等到 键值对数量/数组大小 > 5 时才会触发扩容

扩容倍数是多少?

_dictExpandIfNeeded 方法,我们可以看到,字典的扩容倍数是 2 倍

dictExpand(d, d->ht[0].used*2)

哪些方法会触发扩容?

我们来看下什么地方会调用 _dictExpandIfNeeded 方法,可以看到是 _dictkeyIndex, 可以得知这是一个根据 key 获得其索引位置的函数

/* 方法:获得 key 在 hashtable 的索引
 * 入参:*d 是当前字典,*key 键,hash 是 key 的哈希值,existing 就是 ht 的节点数组
 * 返回值:
 * 	1. -1 代表失败
 * 		- 可能是扩容失败, 有异常,导致不允许后续行为,所以返回 -1
 * 		- 也可能是键值已存在,并且不打算覆盖旧值,所以返回 -1
 *  2. 有值,代表该 key 经过计算,在 ht 的 idx 索引位置
 * 注意:
 * 	1. 如果 existing 指针指向有值,并且该值在 ht 中存在,existing 会隐式将对应 	*		entry 带出去给外层调用方法
 */
static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing)
{
    unsigned long idx, table;
    dictEntry *he;
    if (existing) *existing = NULL;
	
	// 如果需要扩容,则扩容,如果扩容失败,则返回 -1
    /* Expand the hash table if needed */
    if (_dictExpandIfNeeded(d) == DICT_ERR)
        return -1;
    // 遍历 dict 的两个哈希表, 因为 key 可能在 ht[0], 也可能在 ht[1]    
    for (table = 0; table <= 1; table++) {
    	// mod 运算得到 key 的
        idx = hash & d->ht[table].sizemask;
        /* Search if this slot does not already contain the given key */
        he = d->ht[table].table[idx];
        // 如果 key 存在,则遍历链表,看 key 是否存在 existing 中,如果存在则返回 -1
        // 如果 key 不存在,则直接返回该 key 要插入的位置 idx
        while(he) {
            if (key==he->key || dictCompareKeys(d, key, he->key)) {
            	// 如果 existing 有值,则将存在的 entry 赋值给指针,交给外层调用方
                if (existing) *existing = he;
                return -1;
            }
            he = he->next;
        }
        // 如果 dict 不在 rehashing 状态,就不用遍历 ht[1] 了,因为没有数据
        if (!dictIsRehashing(d)) break;
    }
    // 返回 key 在 ht 节点数组的索引
    return idx;
}

那么谁又在调用 _dictkeyIndex 呢?是 *dictAddRaw方法,这个方法又是干嘛的呢?它就是向字典插入一个数据的基础方法,会有很多操作方法调用它,来看看

/* 方法:向 dict 插入一个键值对, 并返回新增的节点 entry
 * 返回值:
 * 	1. NULL 代表键已存在,不更新
 *  2. 有值,代表键不存在,并新增成功
 */
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
    long index;
    dictEntry *entry;
    dictht *ht;

	// 如果当前处于 rehashing 状态,则主动去迁移一个键值数据
    if (dictIsRehashing(d)) _dictRehashStep(d);

    /* Get the index of the new element, or -1 if
     * the element already exists. */
    // 如果该键值已经存在,则 dictKeyIndex 会返回 -1, 则直接返回 null, 代表没有新增
    // 如果该键值不存在,属于新增,则将该 key 在 entry 数组的索引返回,并赋值给 index
    if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
        return NULL;

    /* Allocate the memory and store the new entry.
     * Insert the element in top, with the assumption that in a database
     * system it is more likely that recently added entries are accessed
     * more frequently. */
	// 如果处于 rehashing 状态,则向第二个哈希表 ht[1] 插入数据, 反之 ht[0]	
    ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
    // 分配一个 entry 新节点, 并对 ht->table[index] 链表进行头插入 ,used + 1
    entry = zmalloc(sizeof(*entry));
    entry->next = ht->table[index];
    ht->table[index] = entry;
    ht->used++;

	// 暂不关心,不影响理解,有兴趣看 src/dict.h
    /* Set the hash entry fields. */
    dictSetKey(d, entry, key);
    // 返回新增节点
    return entry;
}

我们知道了 *dictAddRaw 是字典的基本插入方法,那么谁会调用它呢?

  • int dictAdd(dict *d, void *key, void *val)
  • int dictReplace(dict *d, void *key, void *val)
  • dictEntry *dictAddOrFind(dict *d, void *key)
// 如果不存在则插入,存在则插入失败
/* Add an element to the target hash table */
int dictAdd(dict *d, void *key, void *val)
{
    dictEntry *entry = dictAddRaw(d,key,NULL);

    if (!entry) return DICT_ERR;
    dictSetVal(d, entry, val);
    return DICT_OK;
}
/* Add or Overwrite:
 * Add an element, discarding the old value if the key already exists.
 * Return 1 if the key was added from scratch, 0 if there was already an
 * element with such key and dictReplace() just performed a value update
 * operation.
 * 
 * 如果存在则更新,不存在则插入
 * 新增返回 1, 更新返回 0  
 */
int dictReplace(dict *d, void *key, void *val)
{
    dictEntry *entry, *existing, auxentry;

    /* Try to add the element. If the key
     * does not exists dictAdd will succeed. */
    entry = dictAddRaw(d,key,&existing);
    if (entry) {
        dictSetVal(d, entry, val);
        return 1;
    }

    /* Set the new value and free the old one. Note that it is important
     * to do that in this order, as the value may just be exactly the same
     * as the previous one. In this context, think to reference counting,
     * you want to increment (set), and then decrement (free), and not the
     * reverse.
     * 
     * 由 dictAddRaw 隐式返回旧值 entry 的 existing 指向,所以我们可以对 existing 指向的 entry 进行新值更新 
     * 
     * */
    auxentry = *existing;
    dictSetVal(d, existing, val);
    dictFreeVal(d, &auxentry);
    return 0;
}
/* Add or Find:
 * dictAddOrFind() is simply a version of dictAddRaw() that always
 * returns the hash entry of the specified key, even if the key already
 * exists and can't be added (in that case the entry of the already
 * existing key is returned.
 * 没啥好说的
 *
 * See dictAddRaw() for more information. */
dictEntry *dictAddOrFind(dict *d, void *key) {
    dictEntry *entry, *existing;
    entry = dictAddRaw(d,key,&existing);
    return entry ? entry : existing;
}
  • 单纯的对应 redis 的命令,dictAdd 和 dictReplace 就可以实现 setIfpresent, setIfabsent, set 等命令了

触发扩容后会怎么扩容?

在我们知道了触发扩容的时机,扩容的阀值,扩容的倍数,以及会导致触发扩容的方法后。我们就要来看看扩容的中重头戏了,那就是怎么扩容? ,主要依赖 dictExpand 方法,所以重点看

/* 方法:Expand or create the hash table, 扩容或新建哈希表
 * 参数:
 * 	1. *d: 要操作的字典
 * 	2. size: 想为 *d 字典扩容到 size 大小
 * 返回值:
 *  1. DICT_ERR 1 扩容或初始化 ht 失败
 * 		- 正处于 rehashing ,数据未完全迁移,无法进行下一次扩容
 * 		- ht[0].used > size, 扩容无意义
 * 		- ht[0].size == realsize, ht[0] 的 size 已经达到 realsize, 没有扩  	*         容的意义
 * 	2. DICT_OK  0 扩容或初始化 ht 成功
 * 
 */
int dictExpand(dict *d, unsigned long size)
{
    /* the size is invalid if it is smaller than the number of
     * elements already inside the hash table */
    // 如果正在处于 rehashing,则返回 1,代表刚刚已进行过扩容,并且数据仍未完成全	  	// 部迁移,无法进行下一次扩容,扩容失败
    // 或 ht[0] 已有的键值对数量已经大于 size, 则代表将字典继续扩容到 size 大小  		 // 已经没有意义,返回 1, 表示此次扩容无意义
    if (dictIsRehashing(d) || d->ht[0].used > size)
        return DICT_ERR;
	
	// 到达这里,代表允许扩容,并且将 size 调整到接近 2 的幂次方的一个数值
    dictht n; /* the new hash table */
    unsigned long realsize = _dictNextPower(size);

	// 如果此时的 ht[0] 
    /* Rehashing to the same table size is not useful. */
    if (realsize == d->ht[0].size) return DICT_ERR;

	// 为新哈希表赋值
    /* Allocate the new hash table and initialize all pointers to NULL */
    n.size = realsize;
    n.sizemask = realsize-1;
    n.table = zcalloc(realsize*sizeof(dictEntry*));
    n.used = 0;

	// 如果 ht[0] == null, 代表该字典还没有被使用,这是第一次进行初始化,所以将 	// n 赋值给 ht[0]
    /* Is this the first initialization? If so it's not really a rehashing
     * we just set the first hash table so that it can accept keys. */
    if (d->ht[0].table == NULL) {
        d->ht[0] = n;
        return DICT_OK;
    }

	// 如果不是第一次初始化,则将扩容后的新哈希表赋值给 ht[1],并更新 rehashidx 	// = 0 ,代表开始 rehashing, 从 0 开始
    /* Prepare a second hash table for incremental rehashing */
    d->ht[1] = n;
    d->rehashidx = 0;
    // 扩容成功
    return DICT_OK;
}

我们知道 dict 就是 redis 的字典数据结构,它有两个 ht, 当 ht[0].used 达到阀值,就会触发字典的扩容,而扩容就是新分配一个 2*ht[0].used 大小的哈希表给 ht[1], 以此循环完成扩容。既然我们知道了 ht[0], ht[1] 是如何搭配工作,完成字典的扩容,那么扩容之后,数据又是如何从旧哈希表迁移到新哈希表的呢?

看后面的 rehash 机制吧

渐进式rehash

前置知识

为什么要rehash?

为什么要 rehash ? 如果你是 Java 技术栈,那么你肯定了解过 HashMap 的数据 rehash ,一种巧妙的二进制操作,就将数据从一个数组迁移到另一个数组里。同理 Redis 字典扩容后也需要一种手段,将数据从一个容器迁移到另一个容器中,只不过 Redis 迁移的方式与 Java 不一致而已

渐进式rehash?

  • 因为 Redis 的字典和 Java 的 HashMap 定位不同, Redis 承载了更大量的数据,并承诺提供高性能的读写,而类 Java 的一次性同步数据迁移会消费大量的时间,而 Redis 又是单进程单线程模型,更不允许因为主线程因为 rehash 而出现长时的阻塞。
  • 所以 Redis 灵机一动,既然无法一次性全量迁移,那么我就一次迁移一部分,直到完成全部数据的迁移,这样单次数据迁移的时间就大大缩小,从而不影响读写,又能保证数据平滑迁移, 所以这也就是渐进式迁移数据的过程

什么时候会rehash?

我们想知道什么时候回开始出发 rehash ? 我们回想下在看扩容的代码时,也就是 dictExpand方法时,最下面有段代码

int dictExpand(dict *d, unsigned long size) {
	...
    /* Prepare a second hash table for incremental rehashing */
    d->ht[1] = n;
    d->rehashidx = 0;
    ...
}

当把字典的 rehashidx 字典置为 0 时,也就代表了字典开始进行 rehash 了

/* 字典数据结构 */
typedef struct dict {
	...
    dictht ht[2];		// 一个字典,含有两个哈希表
    long rehashidx; 	// 代表 rehashing 到了什么位置,rehashidx = -1 代表							// 未进行 rehash
    ...
} dict;

我们再来看到 src/dict.h 的 dictIsRehashing 方法,可以知道,通过判断 rehashidx 是否等于 -1 就能判断当前字典是否处于 rehashing 状态,也能进一步证明 rehashidx = 0 时,代表 rehash 正式开始进行

// src/dict.h
#define dictIsRehashing(d) ((d)->rehashidx != -1)

rehash流程

那么字典是如何进行渐进式 rehash 的呢?它主要分为两种方式进行

  • [被动式触发] :每次外部调用的 CRUD 都会触发一次数据迁移,每次迁移一份数据
  • [主动式触发] :定时任务,每次扫描一点数据进行迁移

被动式迁移

基本上涉及到查询,删除,修改,新增的方法都有判断该字典是否处于 rehashing 状态,如果处于 rehashing 状态,就调用 _dictRehashStep(d) 进行数据迁移; 例子如下,太多了,就不一一列出来了

dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing) {
	...
    if (dictIsRehashing(d)) _dictRehashStep(d);
	...
}    
static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
	...
    if (dictIsRehashing(d)) _dictRehashStep(d);
    ...
}    
static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
	...
    if (dictIsRehashing(d)) _dictRehashStep(d);
	...

我们看到 _dictRehashStep(d) 是一个入口,那么我们就深入看下去,每次 CRUD 会触发一个怎么样的数据迁移,迁移多少

/* This function performs just a step of rehashing, and only if there are
 * no safe iterators bound to our hash table. When we have iterators in the
 * middle of a rehashing we can't mess with the two hash tables otherwise
 * some element can be missed or duplicated.
 *
 * This function is called by common lookup or update operations in the
 * dictionary so that the hash table automatically migrates from H1 to H2
 * while it is actively used. */
static void _dictRehashStep(dict *d) {
    if (d->iterators == 0) dictRehash(d,1);
}

我们先忽略 iterators 的存在,通常等于 0,总之它调用了 dictRehash 方法, 并且每次只迁移哈希表数组的一个槽位 (因为链表存在,可能迁移多个键值对),继续往下看

/*
 * 方法:rehash, 对数据进行迁移
 * 参数:*d:要操作的字典,n:迁移 n 个数组槽位
 * 返回值:
 * 	1. 返回 1,代表还有数据要迁移
 *  2. 返回 0,代表所有数据已经迁移完了
 *
 */
int dictRehash(dict *d, int n) {
	// 原文注释说有说明, 最多遍历 n*10 个空桶, 避免过于耗时,因为数组中可能有很多	 // 连续为空的数组槽位
	// 避免此次 rehash 过于耗时
    int empty_visits = n*10; /* Max number of empty buckets to visit. */
    // 如果 rehashing 已经结束,或没有开始,那么返回 0 ,代表迁移完毕,或无需迁移
    if (!dictIsRehashing(d)) return 0;
	
	// 遍历 n 次,条件是 ht[0] 数据还没有迁移完,中途如果发现迁移完了,则退出循环
    while(n-- && d->ht[0].used != 0) {
        dictEntry *de, *nextde;

        /* Note that rehashidx can't overflow as we are sure there are more
         * elements because ht[0].used != 0 */
		// rehashidx 代表数据迁移已经迁移到 ht[0] 的rehashidx 位置了,所以 		 
		// rehashidx 不会大于 ht[0].size 
        assert(d->ht[0].size > (unsigned long)d->rehashidx);
		// 如果遇到空槽位,则去检查下一个槽位,顺便做最大空桶检查
        while(d->ht[0].table[d->rehashidx] == NULL) {
            d->rehashidx++;
            if (--empty_visits == 0) return 1;
        }
		
		// 如果非空桶,则此槽位有数据,遍历该槽位的链表,将该链表的数据 rehash, 			
		// 迁移到 ht[1]
        de = d->ht[0].table[d->rehashidx];
        /* Move all the keys in this bucket from the old to the new hash HT */
        while(de) {
            uint64_t h;

            nextde = de->next;
            /* Get the index in the new hash table */
            h = dictHashKey(d, de->key) & d->ht[1].sizemask;
            de->next = d->ht[1].table[h];
            d->ht[1].table[h] = de;
            d->ht[0].used--;
            d->ht[1].used++;
            de = nextde;
        }
        // 每迁移一个槽位,就将 ht[0] 原数据回收, rehashidx++
        d->ht[0].table[d->rehashidx] = NULL;
        d->rehashidx++;
    }

    /* Check if we already rehashed the whole table... */
    // 当发现 ht[0] 已经没有任何数据了,则回收 ht[0] 指向的空间
    if (d->ht[0].used == 0) {
        zfree(d->ht[0].table);
        // 并将 ht[0] 重新指向已完成扩容和数据迁移的新哈希表 ht[1]
        d->ht[0] = d->ht[1];
        _dictReset(&d->ht[1]);
        // 并表示 rehashing 状态已结束,完成数据迁移
        d->rehashidx = -1;
        return 0;
    }

	// 如果跳过了上面的判断,则代表还有很多数据有待迁移
    /* More to rehash... */
    return 1;
}
  • 我们可以看到字典的扩容的终止操作其实是在 rehash 方法中完成的,即 ht[0] 指针被重新指向,且字典的 rehashidx = -1
  • 而且被动式 rehash 只会迁移一个数组槽位的数据,(因为链表,所以迁移的键值对可能大于 1 个)

主动式迁移

入口在 src/server.c 文件里,我们看到 databaseCron方法, 我们可以还知道该方法是一个定时任务方法,会执行诸如键过期, resizeing, rehashing 等操作,不过我们不想看这么多,就省略非重点代码

/* This function handles 'background' operations we are required to do
 * incrementally in Redis databases, such as active key expiring, resizing,
 * rehashing. */
void databasesCron(void) {
  		...
        /* Rehash */
        if (server.activerehashing) {
            for (j = 0; j < dbs_per_call; j++) {
                int work_done = incrementallyRehash(rehash_db);
                if (work_done) {
                    /* If the function did some work, stop here, we'll do
                     * more at the next cron loop. */
                    break;
                } else {
                    /* If this db didn't need rehash, we'll try the next one. */
                    rehash_db++;
                    rehash_db %= server.dbnum;
                }
            }
        }
    }
}

我们看到了会执行 incrementallyRehash 方法,继续往下看

/* Our hash table implementation performs rehashing incrementally while
 * we write/read from the hash table. Still if the server is idle, the hash
 * table will use two tables for a long time. So we try to use 1 millisecond
 * of CPU time at every call of this function to perform some rehashing.
 *
 * The function returns 1 if some rehashing was performed, otherwise 0
 * is returned. */
int incrementallyRehash(int dbid) {
	// 字典 rehashing
    /* Keys dictionary */
    if (dictIsRehashing(server.db[dbid].dict)) {
        dictRehashMilliseconds(server.db[dbid].dict,1);
        return 1; /* already used our millisecond for this loop... */
    }
	// 过期字典 rehashing
    /* Expires */
    if (dictIsRehashing(server.db[dbid].expires)) {
        dictRehashMilliseconds(server.db[dbid].expires,1);
        return 1; /* already used our millisecond for this loop... */
    }
    return 0;
}

(额外知识点, redis 过期类型键会存在另外一个的字典一起维护数据) 我们看到普通的字典会通过 dictRehashMilliseconds 进行 rehashing , 并传入了 1 的参数。所以让我们从 src/server.h 回到 src/dict.c , 继续往下看

/* Rehash in ms+"delta" milliseconds. The value of "delta" is larger 
 * than 0, and is smaller than 1 in most cases. The exact upper bound 
 * depends on the running time of dictRehash(d,100).
 * 
 * 执行 x ms 的 rehash, 并返回 rehash 槽位的个数
 * */
int dictRehashMilliseconds(dict *d, int ms) {
    long long start = timeInMilliseconds();
    int rehashes = 0;
	
	// 每次 rehash 100 个数组槽位,被被动式多 100 倍呢
	// 直到数据完全被迁移完成或 if 打断
    while(dictRehash(d,100)) {
    	// 累计槽位
        rehashes += 100;
        // 如果已经过了 ms 毫秒,则打断
        if (timeInMilliseconds()-start > ms) break;
    }
    return rehashes;
}

从上看可以看到, 主动式每次至少扫描 100 个数组槽位,每次扫描 x ms 时间。反正就是两个退出条件,要么超时,要么迁移完

说明:

  这种主动式迁移是redis处理完网络事件之后才做的,即此时redis处于空闲的时间,开始处理定时事件,然后每次rehash100个数组槽位,移动完100个之后,若超过1ms,则退出定时事件重新等待网络事件;否则继续移动继续判断是否超过1ms。

问题

哈希冲突时,为什么都是头插入?

dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing) {
	...
   	ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
    entry = zmalloc(sizeof(*entry));
    entry->next = ht->table[index];
    ...
}
  • 在头插入的源码中有一段官方注释,是这么说明的,用户最近插入的数据,有更大的概率被频繁访问,有点类似 LRU 的思想;既然新增数据更有概率被访问,那么自然就会将新增数据放在链表的头结点,以减少遍历链表的时间复杂度呀!
  • 当然我个人认为,还有第二个原因就是,当哈希冲突,直接插入头结点可以避免遍历,相比尾插入,少了一个遍历链表的过程,也就提高了写性能啊

rehash阶段遇到读写事件会发生什么?

读事件

  • 当处于 rehashing 阶段时,读线程需要帮忙搬迁数据,同时会遍历两张哈希表
dictEntry *dictFind(dict *d, const void *key)
{
    dictEntry *he;
    uint64_t h, idx, table;

    if (dictSize(d) == 0) return NULL; /* dict is empty */
    // 如果处理 rehashing, 帮忙搬迁数据,一个槽位即可
    if (dictIsRehashing(d)) _dictRehashStep(d);
    h = dictHashKey(d, key);
    // 遍历两个 table
    for (table = 0; table <= 1; table++) {
        idx = h & d->ht[table].sizemask;
        he = d->ht[table].table[idx];
        while(he) {
            if (key==he->key || dictCompareKeys(d, key, he->key))
                return he;
            he = he->next;
        }
        // 如果没有 reshing, 就直接 Return, 不用迭代遍历 ht[1] 了
        // 如果处理 reshing, 则需要继续遍历 ht[1]
        if (!dictIsRehashing(d)) return NULL;
    }
    return NULL;
}

写事件

  • 当初 rehashing 时,写线程要帮忙搬迁数据
    • 如果是插入操作则将数据写到新表中,即 ht[1],而不是旧表
    • 如果是删除操作,根据读的情况,不用想都是要遍历两张表,找到元素并删除
	// 如果处于 rehashing 状态,则向第二个哈希表 ht[1] 插入数据, 反之 ht[0]	
    ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];

扩容 & rehash期间,如果新增过快,又到了扩容的阈值会怎么样?

答案就是 “不会马上扩容,会等待本次扩容结束,再进行下一次扩容”。

什么意思?也就是说当前处于 rehashing 的字典,因为本次扩容的生命周期没有完全结束,所以不会立即触发下一次的扩容,而是继续将数据往 ht[1] 写入,其结果无非就是导致 ht[1] 的哈希冲突概率逐渐加大,直到 ht[0] 的数据全部迁移到 ht[1] 中,并将 ht[0] 重指向 ht[1] 所指向的哈希表, 结束 rehashing 状态,并在本次扩容结束的下一次写入操作,立马触发字典的下一次扩容

rehash每次迁移多少数据?

  • 当由 CRUD 被动式触发的数据迁移,每次只会迁移 1 个数组槽位的数据,而一个数据槽位会含有 n 个键值对数据,具体 n 是多少呢,就看哈希冲突有多强烈了
  • 当由定时任务主动式扫描触发的数据迁移,每次会迁移 1 毫秒的数据,这毫秒内,至少迁移 100 个数组槽位,时间有空余就迁移更多批次,没有空余,执行完第一批 100 个槽位就停下
举报

相关推荐

0 条评论