文章目录
字典
数据结构
结构设计
redis的字典的结构定义主要分为三块结构体,dict,dictht,dictEntry,它们之间的关系如下:
从上图中,其实我们可以看出,Redis 的字典设计,是通过数组 + 链表的方式去实现。
代码实现
/* 字典数据结构 */
typedef struct dict {
dictType *type; // 字典类型,会跟 hash 函数等方法的具体实现有关
void *privdata; // 私有数据
dictht ht[2]; // 一个字典,含有两个哈希表
long rehashidx; // 代表 rehashing 到了什么位置,rehashidx = -1 // 代表未进行 rehash
unsigned long iterators; // 当前正在迭代的迭代器数, number of iterators currently running
} dict;
/* 哈希表, HashTable, 简写 ht */
typedef struct dictht {
dictEntry **table; // 节点数组,可知 ht 的结构是数组 + 链表构成
unsigned long size; // table 数组的大小,即 ht 的大小
// table 大小的掩码,等于 size - 1, 就是用于获取 key 索引运算的
// index = hash(key) & size - 1 = hash(key) & sizemask
unsigned long sizemask;
unsigned long used; // ht 表中已有键值对的个数,并非 table 数组占用个数
} dictht;
/* 哈希表节点,单个 Node */
typedef struct dictEntry {
void *key; // key, 存储哈希表的 key
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v; // value, 存储哈希表的 value
struct dictEntry *next; // 单链表结构,指向下一个节点,用于解决哈希冲突
} dictEntry;
如果代码不够具象,也可以结合下图一起思考下
dictType字典类型
dictType 属性的知识点属于额外补充知识啦,跟扩容也没有太大关系。字典类型的概念是为了多态字典而存在的。即每种 DictType 都会实现一簇操作于特定键值的函数。说白了就是 Redis 为用途不同的字典设置了不同类型操作键值的特定函数
typedef struct dict {
dictType *type;
...
} dict;
typedef struct dictType {
// 计算键 hash 值的函数
uint64_t (*hashFunction)(const void *key);
// 复制键的函数
void *(*keyDup)(void *privdata, const void *key);
// 复制值的函数
void *(*valDup)(void *privdata, const void *obj);
// 对比键的函数
int (*keyCompare)(void *privdata, const void *key1, const void *key2);
// 销毁键的函数
void (*keyDestructor)(void *privdata, void *key);
// 销毁值的函数
void (*valDestructor)(void *privdata, void *obj);
} dictType;
为什么字典有两个哈希表?
为什么 redis 的 dict 数据结构有两个哈希表 ht ? 它们的作用和承担的角色分别是什么?
- 因为 redis 是单进程单线程模型,而且既要支撑一个大容量,还要保持高性能的读写性能,所以不同于 Java HashMap 的扩容是在本体进行。而是由两个哈希表 + 渐进式 rehash 的方式来实现扩容机制的。由此实现平滑扩容,又不阻塞读写
- 通常时候,字典的数据都是在第一个哈希表 ht[0] 进行的。当字典判断需要扩容的时候,就会停止对 ht[0] 进行写操作,而是对 ht[1] 赋予一个 2 倍大小的新哈希表,并将所有写操作指向 ht[1], 此时表示哈希表扩容完成,随后进入 rehashing 阶段,即开始渐进式数据迁移
- 在 rehashing 的过程中,ht[0] 会继续保持对原有数据的读操作,而扩容后新写的数据的读操作则在 ht[1] 进行, 直到 ht[0] 的所有数据迁移到 ht[1] 后,则直接 ht[0] = ht[1], 完成整个扩容 & rehash 操作。
所以我们可以简单的总结出两个哈希表分别承担的角色是
- ht[0] 是日常主要的数据存储表, 对外提供读写能力
- ht[1] 作为扩容时使用的临时表,保证扩容机制平滑进行
哈希算法
Redis 的字典在 Redis 3.2 以前采用的是 murmurhash2 实现的,在 Redis 4.0 之后则采用 siphash
我们在 src/dict.c 可以看到获取 key 的哈希值是通过 dictHashKey 实现的,所以我们找 dictHashKey 方法
h = dictHashKey(d, de->key) & d->ht[1].sizemask;
在 src/dict.h
头文件这么定义了 dictHashKey
方法, 那么 type
是啥玩意?type->hashFunction(key)
又是啥方法?
#define dictHashKey(d, key) (d)->type->hashFunction(key)
这个时候就需要翻到 dict 定义中,有一个 dictType 类型,代表字典的类型
typedef struct dict {
dictType *type;
...
} dict;
typedef struct dictType {
uint64_t (*hashFunction)(const void *key); // 某种 dictType 类型的 hash function
void *(*keyDup)(void *privdata, const void *key);
void *(*valDup)(void *privdata, const void *obj);
int (*keyCompare)(void *privdata, const void *key1, const void *key2);
void (*keyDestructor)(void *privdata, void *key);
void (*valDestructor)(void *privdata, void *obj);
} dictType;
好的, dict 的 type 是那种呢?我们看到 src/server.c
的 initServer
函数的一段代码
void initServer(...) {
...
/* Create the Redis databases, and initialize other internal state. */
for (j = 0; j < server.dbnum; j++) {
server.db[j].dict = dictCreate(&dbDictType,NULL);
server.db[j].expires = dictCreate(&keyptrDictType,NULL);
server.db[j].expires_cursor = 0;
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
server.db[j].id = j;
server.db[j].avg_ttl = 0;
server.db[j].defrag_later = listCreate();
listSetFreeMethod(server.db[j].defrag_later,(void (*)(void*))sdsfree);
}
...
}
/* Db->dict, keys are sds strings, vals are Redis objects. */
dictType dbDictType = {
dictSdsHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
dictSdsDestructor, /* key destructor */
dictObjectDestructor /* val destructor */
};
我们得知 dict 是 db 的存放数据的字典,它传入了 dbDictType
类型。在定义中,我们也得知 hash function 具体实现是 dictSdsHash
, 所以我们就找 dictSdsHash
即可, 在 src/server.c
中,我们找到了
uint64_t dictSdsHash(const void *key) {
return dictGenHashFunction((unsigned char*)key, sdslen((char*)key));
}
所以得知调用入口是 dictGenHashFunction
方法,回到 src/dict.c
代码如下
//https://github.com/redis/redis/blob/unstable/src/dict.c
uint64_t dictGenHashFunction(const void *key, int len) {
return siphash(key,len,dict_hash_function_seed);
}
好的,真相了,那就是 spihash 算法。
扩容机制
在上面了解了 dict 的数据结构的基础上,我们来了解 dict 是如何进行扩容,以及扩容后数据是如何迁移的?但在了解扩容机制和数据迁移之间,我们先来问几个问题
- dict 存在几种状态?
- dict 初始化?
- dict 什么时候扩容?扩容阀值是多少?扩容倍数是多少?
- 哪些地方会触发扩容?怎么扩容?
- 扩容后,数据如何 rehash ?
- 一次扩容后的rehash 过程中,由于 key 写入过快,很快又超过了新的扩容阀值,此时怎么办?
然后我们基于以上的问题,一个一个问题来回答和解析
扩容前置知识
字典存在几种状态?
在了解扩容机制之前,我们可以先小小剧透一下, dict 总共就存在 4 种状态
- table.size 不变,无扩缩容
- 扩容中
- 缩容中
- rehashing 中
了解了状态后,就可以更好的方便我们理解了
容量相关的关键字段定义
扩容状态码
#define DICT_OK 0 // 成功
#define DICT_ERR 1 // 失败
哈希表初始值
#define DICT_HT_INITIAL_SIZE 4 // 哈希表 (ht) size 的初始值
扩容安全阈值
static int dict_can_resize = 1;
static unsigned int dict_force_resize_ratio = 5;
void dictEnableResize(void) {
dict_can_resize = 1;
}
void dictDisableResize(void) {
dict_can_resize = 0;
}
字典的容量都是2的幂次方
/* Our hash table capability is a power of two */
static unsigned long _dictNextPower(unsigned long size)
{
unsigned long i = DICT_HT_INITIAL_SIZE;
if (size >= LONG_MAX) return LONG_MAX + 1LU;
while(1) {
if (i >= size)
return i;
i *= 2;
}
}
- size 是要扩容的大小,进入
_dictNextPower
后,会计算得到一个接近 size 的值,且又是 2 的幂次方
扩容机制
字典什么时候会扩容?
那么我们就看下 sre/dict.c
的 _dictExpandIfNeeded
方法即可,因为字典的扩容时需要这个方法去判断,所以我们可以看到字典有三种扩容的渠道
- 当字典还没有被初始化,即字典的 hashtable[0] 为空时,那我们就初始化字典的第一个 hashtable
ht[0].size = 0
-
当 hashtable[0] 的键值对数量 >= hashtable[0] 数组的 size 时,且全局设置 dict_can_resize = true, 我们就扩容
d->ht[0].used >= d->ht[0].size && dict_can_resize = true
-
当 hashtable[0] 的键值对数量 >= hashtable[0] 数组的 size 时, 且键值对数量已经超过数组大小的 5 倍的安全阀值时,就强制触发扩容
d->ht[0].used >= d->ht[0].size && d->ht[0].used/d->ht[0].size > dict_force_resize_ratio
static int _dictExpandIfNeeded(dict *d)
{
// 如果当前处于 rehash 状态,则直接返回 0 (代表无需扩容,已扩容,新扩容成功)
if (dictIsRehashing(d)) return DICT_OK;
/* If the hash table is empty expand it to the initial size. */
// 如果 hashtable[0] 的大小为 0, 代表整个 dict 还没有被初始化,所以先初始 // 化字典的第一个 hashtable,初始大小是 4
if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);
// 当 hashtable[0] 的键值数 >= hashtable[0] 的 entry 数组大小
// 且 (dict_can_resize = true 或 hashtable[0] 键值数已超过 hashtable // 节点数组大小的 5 倍的安全阀值) 就会触发扩容
// 扩容倍数是已有键值数 (ht.used) 的两倍,注意不是 ht 的 size
if (d->ht[0].used >= d->ht[0].size &&
(dict_can_resize ||
d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
{
return dictExpand(d, d->ht[0].used*2);
}
return DICT_OK;
}
_dictExpandIfNeeded
和 dictExpand
的返回值都是 0 (DICT_OK) 或 1 (DICT_ERR),
-
DICT_OK 代表新扩容成功,正在 rehashing ,无需扩容
-
DICT_ERR 代表非法操作,即非法扩容,扩容失败
-
或是在 rehashing 阶段进入 dictExpand 函数
-
或是在 dictExpand 阶段传入扩容 size 小于 当前 used
-
或是在 dictExpand 阶段
-
扩容的阈值 & 扩容的倍数
扩容阀值是多少?
相较 Java HashMap 的扩容因子为 0.75
, 那么 Redis 字典的扩容因子就是 1
, 即容量占比百分百才触发扩容。当然从 _dictExpandIfNeeded
函数中,我们可以看到这并不是绝对的,要取决于 dict_can_resize 的设置是否允许。如果不允许扩容时,那么只有等到 键值对数量/数组大小 > 5 时才会触发扩容
扩容倍数是多少?
从 _dictExpandIfNeeded
方法,我们可以看到,字典的扩容倍数是 2 倍
dictExpand(d, d->ht[0].used*2)
哪些方法会触发扩容?
我们来看下什么地方会调用 _dictExpandIfNeeded
方法,可以看到是 _dictkeyIndex
, 可以得知这是一个根据 key 获得其索引位置的函数
/* 方法:获得 key 在 hashtable 的索引
* 入参:*d 是当前字典,*key 键,hash 是 key 的哈希值,existing 就是 ht 的节点数组
* 返回值:
* 1. -1 代表失败
* - 可能是扩容失败, 有异常,导致不允许后续行为,所以返回 -1
* - 也可能是键值已存在,并且不打算覆盖旧值,所以返回 -1
* 2. 有值,代表该 key 经过计算,在 ht 的 idx 索引位置
* 注意:
* 1. 如果 existing 指针指向有值,并且该值在 ht 中存在,existing 会隐式将对应 * entry 带出去给外层调用方法
*/
static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing)
{
unsigned long idx, table;
dictEntry *he;
if (existing) *existing = NULL;
// 如果需要扩容,则扩容,如果扩容失败,则返回 -1
/* Expand the hash table if needed */
if (_dictExpandIfNeeded(d) == DICT_ERR)
return -1;
// 遍历 dict 的两个哈希表, 因为 key 可能在 ht[0], 也可能在 ht[1]
for (table = 0; table <= 1; table++) {
// mod 运算得到 key 的
idx = hash & d->ht[table].sizemask;
/* Search if this slot does not already contain the given key */
he = d->ht[table].table[idx];
// 如果 key 存在,则遍历链表,看 key 是否存在 existing 中,如果存在则返回 -1
// 如果 key 不存在,则直接返回该 key 要插入的位置 idx
while(he) {
if (key==he->key || dictCompareKeys(d, key, he->key)) {
// 如果 existing 有值,则将存在的 entry 赋值给指针,交给外层调用方
if (existing) *existing = he;
return -1;
}
he = he->next;
}
// 如果 dict 不在 rehashing 状态,就不用遍历 ht[1] 了,因为没有数据
if (!dictIsRehashing(d)) break;
}
// 返回 key 在 ht 节点数组的索引
return idx;
}
那么谁又在调用 _dictkeyIndex
呢?是 *dictAddRaw
方法,这个方法又是干嘛的呢?它就是向字典插入一个数据的基础方法,会有很多操作方法调用它,来看看
/* 方法:向 dict 插入一个键值对, 并返回新增的节点 entry
* 返回值:
* 1. NULL 代表键已存在,不更新
* 2. 有值,代表键不存在,并新增成功
*/
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
long index;
dictEntry *entry;
dictht *ht;
// 如果当前处于 rehashing 状态,则主动去迁移一个键值数据
if (dictIsRehashing(d)) _dictRehashStep(d);
/* Get the index of the new element, or -1 if
* the element already exists. */
// 如果该键值已经存在,则 dictKeyIndex 会返回 -1, 则直接返回 null, 代表没有新增
// 如果该键值不存在,属于新增,则将该 key 在 entry 数组的索引返回,并赋值给 index
if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
return NULL;
/* Allocate the memory and store the new entry.
* Insert the element in top, with the assumption that in a database
* system it is more likely that recently added entries are accessed
* more frequently. */
// 如果处于 rehashing 状态,则向第二个哈希表 ht[1] 插入数据, 反之 ht[0]
ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
// 分配一个 entry 新节点, 并对 ht->table[index] 链表进行头插入 ,used + 1
entry = zmalloc(sizeof(*entry));
entry->next = ht->table[index];
ht->table[index] = entry;
ht->used++;
// 暂不关心,不影响理解,有兴趣看 src/dict.h
/* Set the hash entry fields. */
dictSetKey(d, entry, key);
// 返回新增节点
return entry;
}
我们知道了 *dictAddRaw
是字典的基本插入方法,那么谁会调用它呢?
int dictAdd(dict *d, void *key, void *val)
int dictReplace(dict *d, void *key, void *val)
dictEntry *dictAddOrFind(dict *d, void *key)
// 如果不存在则插入,存在则插入失败
/* Add an element to the target hash table */
int dictAdd(dict *d, void *key, void *val)
{
dictEntry *entry = dictAddRaw(d,key,NULL);
if (!entry) return DICT_ERR;
dictSetVal(d, entry, val);
return DICT_OK;
}
/* Add or Overwrite:
* Add an element, discarding the old value if the key already exists.
* Return 1 if the key was added from scratch, 0 if there was already an
* element with such key and dictReplace() just performed a value update
* operation.
*
* 如果存在则更新,不存在则插入
* 新增返回 1, 更新返回 0
*/
int dictReplace(dict *d, void *key, void *val)
{
dictEntry *entry, *existing, auxentry;
/* Try to add the element. If the key
* does not exists dictAdd will succeed. */
entry = dictAddRaw(d,key,&existing);
if (entry) {
dictSetVal(d, entry, val);
return 1;
}
/* Set the new value and free the old one. Note that it is important
* to do that in this order, as the value may just be exactly the same
* as the previous one. In this context, think to reference counting,
* you want to increment (set), and then decrement (free), and not the
* reverse.
*
* 由 dictAddRaw 隐式返回旧值 entry 的 existing 指向,所以我们可以对 existing 指向的 entry 进行新值更新
*
* */
auxentry = *existing;
dictSetVal(d, existing, val);
dictFreeVal(d, &auxentry);
return 0;
}
/* Add or Find:
* dictAddOrFind() is simply a version of dictAddRaw() that always
* returns the hash entry of the specified key, even if the key already
* exists and can't be added (in that case the entry of the already
* existing key is returned.
* 没啥好说的
*
* See dictAddRaw() for more information. */
dictEntry *dictAddOrFind(dict *d, void *key) {
dictEntry *entry, *existing;
entry = dictAddRaw(d,key,&existing);
return entry ? entry : existing;
}
- 单纯的对应 redis 的命令,dictAdd 和 dictReplace 就可以实现 setIfpresent, setIfabsent, set 等命令了
触发扩容后会怎么扩容?
在我们知道了触发扩容的时机,扩容的阀值,扩容的倍数,以及会导致触发扩容的方法后。我们就要来看看扩容的中重头戏了,那就是怎么扩容? ,主要依赖 dictExpand
方法,所以重点看
/* 方法:Expand or create the hash table, 扩容或新建哈希表
* 参数:
* 1. *d: 要操作的字典
* 2. size: 想为 *d 字典扩容到 size 大小
* 返回值:
* 1. DICT_ERR 1 扩容或初始化 ht 失败
* - 正处于 rehashing ,数据未完全迁移,无法进行下一次扩容
* - ht[0].used > size, 扩容无意义
* - ht[0].size == realsize, ht[0] 的 size 已经达到 realsize, 没有扩 * 容的意义
* 2. DICT_OK 0 扩容或初始化 ht 成功
*
*/
int dictExpand(dict *d, unsigned long size)
{
/* the size is invalid if it is smaller than the number of
* elements already inside the hash table */
// 如果正在处于 rehashing,则返回 1,代表刚刚已进行过扩容,并且数据仍未完成全 // 部迁移,无法进行下一次扩容,扩容失败
// 或 ht[0] 已有的键值对数量已经大于 size, 则代表将字典继续扩容到 size 大小 // 已经没有意义,返回 1, 表示此次扩容无意义
if (dictIsRehashing(d) || d->ht[0].used > size)
return DICT_ERR;
// 到达这里,代表允许扩容,并且将 size 调整到接近 2 的幂次方的一个数值
dictht n; /* the new hash table */
unsigned long realsize = _dictNextPower(size);
// 如果此时的 ht[0]
/* Rehashing to the same table size is not useful. */
if (realsize == d->ht[0].size) return DICT_ERR;
// 为新哈希表赋值
/* Allocate the new hash table and initialize all pointers to NULL */
n.size = realsize;
n.sizemask = realsize-1;
n.table = zcalloc(realsize*sizeof(dictEntry*));
n.used = 0;
// 如果 ht[0] == null, 代表该字典还没有被使用,这是第一次进行初始化,所以将 // n 赋值给 ht[0]
/* Is this the first initialization? If so it's not really a rehashing
* we just set the first hash table so that it can accept keys. */
if (d->ht[0].table == NULL) {
d->ht[0] = n;
return DICT_OK;
}
// 如果不是第一次初始化,则将扩容后的新哈希表赋值给 ht[1],并更新 rehashidx // = 0 ,代表开始 rehashing, 从 0 开始
/* Prepare a second hash table for incremental rehashing */
d->ht[1] = n;
d->rehashidx = 0;
// 扩容成功
return DICT_OK;
}
我们知道 dict 就是 redis 的字典数据结构,它有两个 ht, 当 ht[0].used 达到阀值,就会触发字典的扩容,而扩容就是新分配一个 2*ht[0].used
大小的哈希表给 ht[1], 以此循环完成扩容。既然我们知道了 ht[0], ht[1] 是如何搭配工作,完成字典的扩容,那么扩容之后,数据又是如何从旧哈希表迁移到新哈希表的呢?
看后面的 rehash 机制吧
渐进式rehash
前置知识
为什么要rehash?
为什么要 rehash ? 如果你是 Java 技术栈,那么你肯定了解过 HashMap 的数据 rehash ,一种巧妙的二进制操作,就将数据从一个数组迁移到另一个数组里。同理 Redis 字典扩容后也需要一种手段,将数据从一个容器迁移到另一个容器中,只不过 Redis 迁移的方式与 Java 不一致而已
渐进式rehash?
- 因为 Redis 的字典和 Java 的 HashMap 定位不同, Redis 承载了更大量的数据,并承诺提供高性能的读写,而类 Java 的一次性同步数据迁移会消费大量的时间,而 Redis 又是单进程单线程模型,更不允许因为主线程因为 rehash 而出现长时的阻塞。
- 所以 Redis 灵机一动,既然无法一次性全量迁移,那么我就一次迁移一部分,直到完成全部数据的迁移,这样单次数据迁移的时间就大大缩小,从而不影响读写,又能保证数据平滑迁移, 所以这也就是渐进式迁移数据的过程
什么时候会rehash?
我们想知道什么时候回开始出发 rehash ? 我们回想下在看扩容的代码时,也就是 dictExpand
方法时,最下面有段代码
int dictExpand(dict *d, unsigned long size) {
...
/* Prepare a second hash table for incremental rehashing */
d->ht[1] = n;
d->rehashidx = 0;
...
}
当把字典的 rehashidx 字典置为 0
时,也就代表了字典开始进行 rehash 了
/* 字典数据结构 */
typedef struct dict {
...
dictht ht[2]; // 一个字典,含有两个哈希表
long rehashidx; // 代表 rehashing 到了什么位置,rehashidx = -1 代表 // 未进行 rehash
...
} dict;
我们再来看到 src/dict.h 的 dictIsRehashing
方法,可以知道,通过判断 rehashidx 是否等于 -1
就能判断当前字典是否处于 rehashing 状态,也能进一步证明 rehashidx = 0 时,代表 rehash 正式开始进行
// src/dict.h
#define dictIsRehashing(d) ((d)->rehashidx != -1)
rehash流程
那么字典是如何进行渐进式 rehash 的呢?它主要分为两种方式进行
- [被动式触发] :每次外部调用的 CRUD 都会触发一次数据迁移,每次迁移一份数据
- [主动式触发] :定时任务,每次扫描一点数据进行迁移
被动式迁移
基本上涉及到查询,删除,修改,新增的方法都有判断该字典是否处于 rehashing 状态,如果处于 rehashing 状态,就调用 _dictRehashStep(d)
进行数据迁移; 例子如下,太多了,就不一一列出来了
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing) {
...
if (dictIsRehashing(d)) _dictRehashStep(d);
...
}
static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
...
if (dictIsRehashing(d)) _dictRehashStep(d);
...
}
static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
...
if (dictIsRehashing(d)) _dictRehashStep(d);
...
我们看到 _dictRehashStep(d)
是一个入口,那么我们就深入看下去,每次 CRUD 会触发一个怎么样的数据迁移,迁移多少
/* This function performs just a step of rehashing, and only if there are
* no safe iterators bound to our hash table. When we have iterators in the
* middle of a rehashing we can't mess with the two hash tables otherwise
* some element can be missed or duplicated.
*
* This function is called by common lookup or update operations in the
* dictionary so that the hash table automatically migrates from H1 to H2
* while it is actively used. */
static void _dictRehashStep(dict *d) {
if (d->iterators == 0) dictRehash(d,1);
}
我们先忽略 iterators 的存在,通常等于 0,总之它调用了 dictRehash
方法, 并且每次只迁移哈希表数组的一个槽位 (因为链表存在,可能迁移多个键值对),继续往下看
/*
* 方法:rehash, 对数据进行迁移
* 参数:*d:要操作的字典,n:迁移 n 个数组槽位
* 返回值:
* 1. 返回 1,代表还有数据要迁移
* 2. 返回 0,代表所有数据已经迁移完了
*
*/
int dictRehash(dict *d, int n) {
// 原文注释说有说明, 最多遍历 n*10 个空桶, 避免过于耗时,因为数组中可能有很多 // 连续为空的数组槽位
// 避免此次 rehash 过于耗时
int empty_visits = n*10; /* Max number of empty buckets to visit. */
// 如果 rehashing 已经结束,或没有开始,那么返回 0 ,代表迁移完毕,或无需迁移
if (!dictIsRehashing(d)) return 0;
// 遍历 n 次,条件是 ht[0] 数据还没有迁移完,中途如果发现迁移完了,则退出循环
while(n-- && d->ht[0].used != 0) {
dictEntry *de, *nextde;
/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
// rehashidx 代表数据迁移已经迁移到 ht[0] 的rehashidx 位置了,所以
// rehashidx 不会大于 ht[0].size
assert(d->ht[0].size > (unsigned long)d->rehashidx);
// 如果遇到空槽位,则去检查下一个槽位,顺便做最大空桶检查
while(d->ht[0].table[d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) return 1;
}
// 如果非空桶,则此槽位有数据,遍历该槽位的链表,将该链表的数据 rehash,
// 迁移到 ht[1]
de = d->ht[0].table[d->rehashidx];
/* Move all the keys in this bucket from the old to the new hash HT */
while(de) {
uint64_t h;
nextde = de->next;
/* Get the index in the new hash table */
h = dictHashKey(d, de->key) & d->ht[1].sizemask;
de->next = d->ht[1].table[h];
d->ht[1].table[h] = de;
d->ht[0].used--;
d->ht[1].used++;
de = nextde;
}
// 每迁移一个槽位,就将 ht[0] 原数据回收, rehashidx++
d->ht[0].table[d->rehashidx] = NULL;
d->rehashidx++;
}
/* Check if we already rehashed the whole table... */
// 当发现 ht[0] 已经没有任何数据了,则回收 ht[0] 指向的空间
if (d->ht[0].used == 0) {
zfree(d->ht[0].table);
// 并将 ht[0] 重新指向已完成扩容和数据迁移的新哈希表 ht[1]
d->ht[0] = d->ht[1];
_dictReset(&d->ht[1]);
// 并表示 rehashing 状态已结束,完成数据迁移
d->rehashidx = -1;
return 0;
}
// 如果跳过了上面的判断,则代表还有很多数据有待迁移
/* More to rehash... */
return 1;
}
- 我们可以看到字典的扩容的终止操作其实是在 rehash 方法中完成的,即 ht[0] 指针被重新指向,且字典的
rehashidx = -1
- 而且被动式 rehash 只会迁移一个数组槽位的数据,(因为链表,所以迁移的键值对可能大于 1 个)
主动式迁移
入口在 src/server.c
文件里,我们看到 databaseCron
方法, 我们可以还知道该方法是一个定时任务方法,会执行诸如键过期, resizeing, rehashing 等操作,不过我们不想看这么多,就省略非重点代码
/* This function handles 'background' operations we are required to do
* incrementally in Redis databases, such as active key expiring, resizing,
* rehashing. */
void databasesCron(void) {
...
/* Rehash */
if (server.activerehashing) {
for (j = 0; j < dbs_per_call; j++) {
int work_done = incrementallyRehash(rehash_db);
if (work_done) {
/* If the function did some work, stop here, we'll do
* more at the next cron loop. */
break;
} else {
/* If this db didn't need rehash, we'll try the next one. */
rehash_db++;
rehash_db %= server.dbnum;
}
}
}
}
}
我们看到了会执行 incrementallyRehash
方法,继续往下看
/* Our hash table implementation performs rehashing incrementally while
* we write/read from the hash table. Still if the server is idle, the hash
* table will use two tables for a long time. So we try to use 1 millisecond
* of CPU time at every call of this function to perform some rehashing.
*
* The function returns 1 if some rehashing was performed, otherwise 0
* is returned. */
int incrementallyRehash(int dbid) {
// 字典 rehashing
/* Keys dictionary */
if (dictIsRehashing(server.db[dbid].dict)) {
dictRehashMilliseconds(server.db[dbid].dict,1);
return 1; /* already used our millisecond for this loop... */
}
// 过期字典 rehashing
/* Expires */
if (dictIsRehashing(server.db[dbid].expires)) {
dictRehashMilliseconds(server.db[dbid].expires,1);
return 1; /* already used our millisecond for this loop... */
}
return 0;
}
(额外知识点, redis 过期类型键会存在另外一个的字典一起维护数据) 我们看到普通的字典会通过 dictRehashMilliseconds
进行 rehashing , 并传入了 1
的参数。所以让我们从 src/server.h
回到 src/dict.c
, 继续往下看
/* Rehash in ms+"delta" milliseconds. The value of "delta" is larger
* than 0, and is smaller than 1 in most cases. The exact upper bound
* depends on the running time of dictRehash(d,100).
*
* 执行 x ms 的 rehash, 并返回 rehash 槽位的个数
* */
int dictRehashMilliseconds(dict *d, int ms) {
long long start = timeInMilliseconds();
int rehashes = 0;
// 每次 rehash 100 个数组槽位,被被动式多 100 倍呢
// 直到数据完全被迁移完成或 if 打断
while(dictRehash(d,100)) {
// 累计槽位
rehashes += 100;
// 如果已经过了 ms 毫秒,则打断
if (timeInMilliseconds()-start > ms) break;
}
return rehashes;
}
从上看可以看到, 主动式每次至少扫描 100 个数组槽位,每次扫描 x ms 时间。反正就是两个退出条件,要么超时,要么迁移完
说明:
这种主动式迁移是redis处理完网络事件之后才做的,即此时redis处于空闲的时间,开始处理定时事件,然后每次rehash100个数组槽位,移动完100个之后,若超过1ms,则退出定时事件重新等待网络事件;否则继续移动继续判断是否超过1ms。
问题
哈希冲突时,为什么都是头插入?
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing) {
...
ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
entry = zmalloc(sizeof(*entry));
entry->next = ht->table[index];
...
}
- 在头插入的源码中有一段官方注释,是这么说明的,用户最近插入的数据,有更大的概率被频繁访问,有点类似 LRU 的思想;既然新增数据更有概率被访问,那么自然就会将新增数据放在链表的头结点,以减少遍历链表的时间复杂度呀!
- 当然我个人认为,还有第二个原因就是,当哈希冲突,直接插入头结点可以避免遍历,相比尾插入,少了一个遍历链表的过程,也就提高了写性能啊
rehash阶段遇到读写事件会发生什么?
读事件
- 当处于 rehashing 阶段时,读线程需要帮忙搬迁数据,同时会遍历两张哈希表
dictEntry *dictFind(dict *d, const void *key)
{
dictEntry *he;
uint64_t h, idx, table;
if (dictSize(d) == 0) return NULL; /* dict is empty */
// 如果处理 rehashing, 帮忙搬迁数据,一个槽位即可
if (dictIsRehashing(d)) _dictRehashStep(d);
h = dictHashKey(d, key);
// 遍历两个 table
for (table = 0; table <= 1; table++) {
idx = h & d->ht[table].sizemask;
he = d->ht[table].table[idx];
while(he) {
if (key==he->key || dictCompareKeys(d, key, he->key))
return he;
he = he->next;
}
// 如果没有 reshing, 就直接 Return, 不用迭代遍历 ht[1] 了
// 如果处理 reshing, 则需要继续遍历 ht[1]
if (!dictIsRehashing(d)) return NULL;
}
return NULL;
}
写事件
- 当初 rehashing 时,写线程要帮忙搬迁数据
- 如果是插入操作则将数据写到新表中,即 ht[1],而不是旧表
- 如果是删除操作,根据读的情况,不用想都是要遍历两张表,找到元素并删除
// 如果处于 rehashing 状态,则向第二个哈希表 ht[1] 插入数据, 反之 ht[0]
ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
扩容 & rehash期间,如果新增过快,又到了扩容的阈值会怎么样?
答案就是 “不会马上扩容,会等待本次扩容结束,再进行下一次扩容”。
什么意思?也就是说当前处于 rehashing 的字典,因为本次扩容的生命周期没有完全结束,所以不会立即触发下一次的扩容,而是继续将数据往 ht[1] 写入,其结果无非就是导致 ht[1] 的哈希冲突概率逐渐加大,直到 ht[0] 的数据全部迁移到 ht[1] 中,并将 ht[0] 重指向 ht[1] 所指向的哈希表, 结束 rehashing 状态,并在本次扩容结束的下一次写入操作,立马触发字典的下一次扩容
rehash每次迁移多少数据?
- 当由 CRUD 被动式触发的数据迁移,每次只会迁移 1 个数组槽位的数据,而一个数据槽位会含有 n 个键值对数据,具体 n 是多少呢,就看哈希冲突有多强烈了
- 当由定时任务主动式扫描触发的数据迁移,每次会迁移 1 毫秒的数据,这毫秒内,至少迁移 100 个数组槽位,时间有空余就迁移更多批次,没有空余,执行完第一批 100 个槽位就停下