Redis源码剖析(一)

基础数据结构部分

动态字符串 SDS

实现在 sds.h/sds.c。

设计原则

为什么不使用c语言原生的字符串操作库? c字符串用'\0'作为终止符,不能满足二进制安全,而且求字符串长度,拼接等操作都要遍历到'\0'来实现,需要自己控制内存使用,操作复杂度高。

前置知识

由于我对C语言没有深入了解,有很多知识点会在前面补充。

数据结构

这里以sdshdr8为例

// 一个字节 8位
// __attribute__ ((__packed__)) 用来告诉编译器取消结构在编译中的优化对齐,按照实际占用。
// 因为内存是按照2的倍数读取的,否则可能读两次,速度变慢。这里是用时间换空间
// 保证整个结构体的空间紧密
struct __attribute__ ((__packed__)) sdshdr8 {
    // buf 中已经使用的字节数
    uint8_t len; /* used */
    // 去掉头和null结束符,已经分配的字节数=有效长度+数据长度
    uint8_t alloc; /* excluding the header and null terminator */ 
    // 8位,只用前三位
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    // 柔性数组,没有分配之前不占内存
    char buf[];
};

记录了已经使用的空间和分配的空间,比C字符串操作效率更高。和 C 语言中的字符串操作相比,SDS 通过记录字符数组的使用长度和分配空间大小,避免了对字符串的遍历操作,降低了操作开销,进一步就可以帮助诸多字符串操作更加高效地完成,比如创建、追加、复制、比较等。

二进制安全

什么是二进制安全?通俗地讲,C语言中,用“\0”表示字符串的结束,如果字符串中本身就有“\0”字符,字符串就会被截断,即非二进制安全;若通过某种机制,保证读写字符串时不损害其内容,则是二进制安全。在网络报文中常常需要二进制安全。

sds使用 len 来控制字符串长度,而不是使用"\0",保障了二进制安全。 

极致的内存使用

与c字符串函数

始终将buf指针暴露给上层,可以和c字符串函数切合。同时可以很容易地通过减去一个sdshdr大小偏移到结构体首部来调用结构体属性。
	
如下面的宏定义:

```c
#define SDS_HDR_VAR(T,s) struct sdshdr##T *sh = (void*)((s)-(sizeof(struct sdshdr##T)));
#define SDS_HDR(T,s) ((struct sdshdr##T *)((s)-(sizeof(struct sdshdr##T))))
```

基本操作

创建

sds _sdsnewlen(const void *init, size_t initlen, int trymalloc) {
    void *sh;
    sds s;
    char type = sdsReqType(initlen);
    // 因为通常总是有空字符串,而使用type5每增加一次就需要扩容,所以直接使用type 8 ---- 为什么不把type5直接删了?
    /* Empty strings are usually created in order to append. Use type 8
     * since type 5 is not good at this. */
    if (type == SDS_TYPE_5 && initlen == 0) type = SDS_TYPE_8;
    int hdrlen = sdsHdrSize(type);
    unsigned char *fp; /* flags pointer. */
    size_t usable;

    // initlen 是buf中实际装的大小,hdrlen是sds header大小,+1 是\0终止符
    assert(initlen + hdrlen + 1 > initlen); /* Catch size_t overflow */
    //malloc_usable还是调用tyymalloc_usable
    sh = trymalloc?
        s_trymalloc_usable(hdrlen+initlen+1, &usable) :
        s_malloc_usable(hdrlen+initlen+1, &usable);
    if (sh == NULL) return NULL;
    if (init==SDS_NOINIT)
        init = NULL;
    else if (!init)
        memset(sh, 0, hdrlen+initlen+1);
    // 指向buf
    s = (char*)sh+hdrlen;
    fp = ((unsigned char*)s)-1;
    usable = usable-hdrlen-1;
    // 可能申请的超过类型MaxSize
    if (usable > sdsTypeMaxSize(type))
        usable = sdsTypeMaxSize(type);
    switch(type) {
        case SDS_TYPE_5: {
            *fp = type | (initlen << SDS_TYPE_BITS);
            break;
        }
        case SDS_TYPE_8: {
            SDS_HDR_VAR(8,s);
            sh->len = initlen;
            sh->alloc = usable;
            *fp = type;
            break;
        }
    // ...
    }
    if (initlen && init)
        memcpy(s, init, initlen);
    s[initlen] = '\0';
    return s;
}
  1. 首先根据申请的初始大小来确定类型,通过类型可以确定hdr大小,然后来申请空间。
  2. 这里使用的s_trymalloc_usable与s_malloc_usable都是文件zmallo.c实现的内存管理函数,后面会专门讲解。这里只用知道它会申请前一个参数大小的空间,并且将空间大小赋值给后一个参数usable。
  3. 得到空间的首地址,加上头大小得到buf地址s,s[-1] 得到类型指针fp,usable减去头大小hdrlen和类型大小1得到实际可用大小。
  4. 根据类型来构建一个sds结构体,最后返回是buf的指针,补上终止符'\0'。这里使用的是一个宏,可以借鉴这种写法,一个经常使用的操作,如果写成函数,会增加堆栈调度消耗,写成宏可以提高性能,代价是编译后的文件大小会增加。

这是sds创建的底层实现,实际使用的是上层的封装,只是对这个函数的封装调用。

销毁

有两种方法,一种是直接销毁:

void sdsfree(sds s) {
    if (s == NULL) return;
    s_free((char*)s-sdsHdrSize(s[-1]));
}

一种是仅仅将sds的len标记为0,但是实际的buf并不会释放,而是等待覆写。这样可以优化性能。

扩容

sds _sdsMakeRoomFor(sds s, size_t addlen, int greedy) {
    void *sh, *newsh;
    size_t avail = sdsavail(s);
    size_t len, newlen, reqlen;
    char type, oldtype = s[-1] & SDS_TYPE_MASK;
    int hdrlen;
    size_t usable;

    /* Return ASAP if there is enough space left. */
    if (avail >= addlen) return s;

    len = sdslen(s);
    sh = (char*)s-sdsHdrSize(oldtype);
    reqlen = newlen = (len+addlen);
    // 这里是防止溢出
    assert(newlen > len);   /* Catch size_t overflow */
    if (greedy == 1) {
        // SDS_MAX_PREALLOC 是 1MB
        if (newlen < SDS_MAX_PREALLOC)
            newlen *= 2;
        else
            newlen += SDS_MAX_PREALLOC;
    }

    type = sdsReqType(newlen);

    /* Don't use type 5: the user is appending to the string and type 5 is
     * not able to remember empty space, so sdsMakeRoomFor() must be called
     * at every appending operation. */
    if (type == SDS_TYPE_5) type = SDS_TYPE_8;

    hdrlen = sdsHdrSize(type);
    assert(hdrlen + newlen + 1 > reqlen);  /* Catch size_t overflow */
    if (oldtype==type) {
        // 和原类型相同,则不用释放内存,直接将buf扩容即可
        newsh = s_realloc_usable(sh, hdrlen+newlen+1, &usable);
        if (newsh == NULL) return NULL;
        s = (char*)newsh+hdrlen;
    } else {
        /* Since the header size changes, need to move the string forward,
         * and can't use realloc */
        // 类型改变,需要重新申请内存,原内存释放
        newsh = s_malloc_usable(hdrlen+newlen+1, &usable);
        if (newsh == NULL) return NULL;
        memcpy((char*)newsh+hdrlen, s, len+1);
        s_free(sh);
        s = (char*)newsh+hdrlen;
        s[-1] = type;
        sdssetlen(s, len);
    }
    usable = usable-hdrlen-1;
    // type 是通过newlen判断得到的,而usable 是 hdrlen + newlen + 1 可能出现超出的情况
    if (usable > sdsTypeMaxSize(type))
        usable = sdsTypeMaxSize(type);
    sdssetalloc(s, usable);
    return s;
}
  1. 首先判断newlen加上len是否超出可用的大小avail,没超就不扩容。
  2. 和之前不同,这一版本加入了greedy参数,来调节扩容策略,当greedy为1时,每次会扩大的比所需要的更多,这样可以减少扩容频率。而greedy为0时,就是节约内存
  3. greedy为1时启用此策略: 如果newlen小于1MB,每次扩容二背,大于1MB时每次增加1MB。(每次2倍内存很快就耗尽了)
  4. 这里根据新的newlen来确定类型,如果类型不变,只需要扩展buf数组,而类型改变的话就需要重新申请内存。

跳表zskiplist

对应的代码在 server.h 和 t_zset.c。

跳表可以看作链表加上都多层索引,一般每两个

skiplist

/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
    sds ele; // 存储字符串类型的数据
    double score; // 储存排序的分值
    struct zskiplistNode *backward; // 后向指针 头节点和第一个节点都为NULL
    struct zskiplistLevel {
        struct zskiplistNode *forward; // 指向本层下一个节点
        unsigned long span; // 跨度 指向本层下一个节点中间跨越的节点个数
    } level[]; // 柔性数组,未分配内存时不占空间。初始化时,level 随机分配1~32
} zskiplistNode;

typedef struct zskiplist {
    struct zskiplistNode *header, *tail; 
    unsigned long length; // 除了头节点以外节点总数
    int level; // 跳表的高度
} zskiplist;

用的图片是网图 链接,其中obj一般为sds,在Redis6中已经改为sds ele。

可以从图和代码很清晰地看出跳跃表的结构。

跳跃表是Redis有序集合的底层实现方式之一,所以每个节点的ele存储有序集合的成员member值,score存储成员score值。所有节点的分值是按从小到大的方式排序的,当有序集合的成员分值相同时,节点会按member的字典序进行排序。

通过跳跃表结构体的属性我们可以看到,程序可以在O(1)的时间复杂度下,快速获取到跳跃表的头节点、尾节点、长度和高度。

创建

Redis通过zslRandomLevel函数随机生成一个1~32的值,作为新建节点的高度,值越大出现的概率越低。节点层高确定之后便不会再修改。生成随机层高的代码如下。

// ZSKIPLIST 为 0.25
int zslRandomLevel(void) {
    static const int threshold = ZSKIPLIST_P*RAND_MAX;
    int level = 1;
    while (random() < threshold)
        level += 1;
    // 这里 ZSKIPLIST_MAXLEVEL 为32 
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

当p=0.25时,跳跃表节点的期望层高为1/(1-0.25)≈1.33。

下面是创建函数

/* Create a skiplist node with the specified number of levels.
 * The SDS string 'ele' is referenced by the node after the call. */
zskiplistNode *zslCreateNode(int level, double score, sds ele) {
    zskiplistNode *zn =
		    // level 柔性数组加上头大小 来申请内存
        zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
    zn->score = score;
    zn->ele = ele;
    return zn;
}

/* Create a new skiplist. */
zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;

    zsl = zmalloc(sizeof(*zsl));
    zsl->level = 1;
    zsl->length = 0;
		// 头节点的level是最大层数
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
        zsl->header->level[j].forward = NULL;
        zsl->header->level[j].span = 0;
    }
    zsl->header->backward = NULL;
    zsl->tail = NULL;
    return zsl;
}

头节点是一个特殊的节点,不存储有序集合的member信息。头节点是跳跃表中第一个插入的节点,其level数组的每项forward都为NULL, span值都为0

插入

/* Insert a new node in the skiplist. Assumes the element does not already
 * exist (up to the caller to enforce that). The skiplist takes ownership
 * of the passed SDS string 'ele'. */
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    // 记录每层所能到达的最右边节点
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    // 记录每层从header到update[i}所需的步长
    unsigned long rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    // 判断score 是不是NAN
    serverAssert(!isnan(score));
    x = zsl->header;
    // 从最高层索引开始遍历
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
        // 当在最高层时,先将rank赋值为0,先假设
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        
        // 在第i层一直向前移动比较,因为是按照score 从小到大排列的
        // 找到这层大于插入score 的位置然后下移
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                    sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            // 更新总的span
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        // 记录这层的终点
        update[i] = x;
    }
    /* we assume the element is not already inside, since we allow duplicated
     * scores, reinserting the same element should never happen since the
     * caller of zslInsert() should test in the hash table if the element is
     * already inside or not. */
    // zslInsert不能应用在插入节点已经存在的情况下。
    // 所以不用检查存在
    
    //为插入节点计算随机层数
    level = zslRandomLevel();
    //大于原来层高的部分,只需要调整header就行。
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            // 为啥是这个?可能是用来占位
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    
    x = zslCreateNode(level,score,ele);
    for (i = 0; i < level; i++) {
        // 插入到每层最右侧能到达的节点之后
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;

        /* update span covered by update[i] as x is inserted here */
        // 插入节点每层的span更新,这个看下图
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }
    
    /* increment span for untouched levels */
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}

下图来源于 链接

以节点19插入为例,其中 黑色箭头的表示的跨度为update[i]->level[i].span 蓝色箭头表示的跨度为rank[0] - rank[i]即节点19在level_0的update[0]为11, 在level_1的update[1]为7,rank[0] - rank[i]为节点7与节点11之间的跨度 绿色箭头表示的跨度为节点19到节点37的span

删除

首先查找到对应的节点,将每层最右边到达的节点记录下来,对应的update。 辅助函数:

void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
    // 调整对应的span和forward
    int i;
    for (i = 0; i < zsl->level; i++) {
        if (update[i]->level[i].forward == x) {
            update[i]->level[i].span += x->level[i].span - 1;
            update[i]->level[i].forward = x->level[i].forward;
        } else {
            update[i]->level[i].span -= 1;
        }
    }
    if (x->level[0].forward) {
        x->level[0].forward->backward = x->backward;
    } else {
        zsl->tail = x->backward;
    }

    // 调整level 
    while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
        zsl->level--;
    zsl->length--;
}

删除函数:

int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;
    
    //查找位置
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                     sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            x = x->level[i].forward;
        }
        // 保存每层最右边的节点
        update[i] = x;
    }
    /* We may have multiple elements with the same score, what we need
     * is to find the element with both the right score and object. */
    // 可能同一个score有多个ele
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
        zslDeleteNode(zsl, x, update);
        if (!node)
            zslFreeNode(x);
        else
            *node = x;
        return 1;
    }
    return 0; /* not found */
}

压缩列表

具体的实现在ziplist.h和ziplist.c

压缩列表是Redis中一种高效利用内存的数据结构,用来储存字符串和数字,它的push和pop操作都是 O(1) 。Redis的有序集合、散列和列表都直接或者间接使用了压缩列表。当有序集合或散列表的元素个数比较少,且元素都是短字符串时,Redis便使用压缩列表作为其底层数据存储结构。列表使用快速链表(quicklist)数据结构存储,而快速链表就是双向链表与压缩列表的组合。

// ziplist 结构
<zlbytes> <zltail> <zllen> <entry> <entry> ... <entry> <zlend>

这里的所有结构都是按照小端存储。

这里可以清楚地感受到C语言对内存的掌控,通过指针位移来获取结构信息。这里使用宏又是C语言的一个特色,比起inline只是建议编译器内联,宏真正是内联,对于一些细小而频繁的操作提高了性能。

/* Return total bytes a ziplist is composed of. */
#define ZIPLIST_BYTES(zl)       (*((uint32_t*)(zl)))

/* Return the offset of the last item inside the ziplist. */
#define ZIPLIST_TAIL_OFFSET(zl) (*((uint32_t*)((zl)+sizeof(uint32_t))))

/* Return the length of a ziplist, or UINT16_MAX if the length cannot be
 * determined without scanning the whole ziplist. */
#define ZIPLIST_LENGTH(zl)      (*((uint16_t*)((zl)+sizeof(uint32_t)*2)))

/* The size of a ziplist header: two 32 bit integers for the total
 * bytes count and last item offset. One 16 bit integer for the number
 * of items field. */
#define ZIPLIST_HEADER_SIZE     (sizeof(uint32_t)*2+sizeof(uint16_t))

/* Size of the "end of ziplist" entry. Just one byte. */
#define ZIPLIST_END_SIZE        (sizeof(uint8_t))

/* Return the pointer to the first entry of a ziplist. */
#define ZIPLIST_ENTRY_HEAD(zl)  ((zl)+ZIPLIST_HEADER_SIZE)

/* Return the pointer to the last entry of a ziplist, using the
 * last entry offset inside the ziplist header. */
#define ZIPLIST_ENTRY_TAIL(zl)  ((zl)+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl)))

/* Return the pointer to the last byte of a ziplist, which is, the
 * end of ziplist FF entry. */
#define ZIPLIST_ENTRY_END(zl)   ((zl)+intrev32ifbe(ZIPLIST_BYTES(zl))-ZIPLIST_END_SIZE)

对于 结构如下:

<prevlen> <encoding> <entry-data>

previous_entry_length字段表示前一个元素的字节长度,占1个或者5个字节,当前一个元素的长度小于254字节时,用1个字节表示;当前一个元素的长度大于或等于254字节时,用5个字节来表示。而此时previous_entry_length字段的第1个字节是固定的0xFE,后面4个字节才真正表示前一个元素的长度。假设已知当前元素的首地址为p,那么p-previous_entry_length就是前一个元素的首地址,从而实现压缩列表从尾到头的遍历。

encoding字段表示当前元素的编码,即content字段存储的数据类型(整数或者字节数组),数据内容存储在content字段。为了节约内存,encoding字段同样长度可变。

Redis使用宏来表示

#define ZIP_STR_MASK 0xc0
#define ZIP_INT_MASK 0x30
#define ZIP_STR_06B (0 << 6)
#define ZIP_STR_14B (1 << 6)
#define ZIP_STR_32B (2 << 6)
#define ZIP_INT_16B (0xc0 | 0<<4)
#define ZIP_INT_32B (0xc0 | 1<<4)
#define ZIP_INT_64B (0xc0 | 2<<4)
#define ZIP_INT_24B (0xc0 | 3<<4)
#define ZIP_INT_8B 0xfe

这里使用位运算来代表类型,既节省了内存又提高了性能。

结构

typedef struct zlentry {
    unsigned int prevrawlensize; /* Bytes used to encode the previous entry len*/
    unsigned int prevrawlen;     /* Previous entry len. */
    unsigned int lensize;        /* Bytes used to encode this entry type/len.
                                    For example strings have a 1, 2 or 5 bytes
                                    header. Integers always use a single byte.*/
    unsigned int len;            /* Bytes used to represent the actual entry.
                                    For strings this is just the string length
                                    while for integers it is 1, 2, 3, 4, 8 or
                                    0 (for 4 bit immediate) depending on the
                                    number range. */
    unsigned int headersize;     /* prevrawlensize + lensize. */
    unsigned char encoding;      /* Set to ZIP_STR_* or ZIP_INT_* depending on
                                    the entry encoding. However for 4 bits
                                    immediate integers this can assume a range
                                    of values and must be range-checked. */
    unsigned char *p;            /* Pointer to the very start of the entry, that
                                    is, this points to prev-entry-len field. */
} zlentry;

对于压缩列表的任意元素,获取前一个元素的长度、判断存储的数据类型、获取数据内容都需要经过复杂的解码运算。解码后的结果应该被缓存起来,为此定义了结构体zlentry,用于表示解码后的压缩列表元素。

解码操作,主要用宏实现:

static inline void zipEntry(unsigned char *p, zlentry *e) {
    ZIP_DECODE_PREVLEN(p, e->prevrawlensize, e->prevrawlen);
    ZIP_ENTRY_ENCODING(p + e->prevrawlensize, e->encoding);
    ZIP_DECODE_LENGTH(p + e->prevrawlensize, e->encoding, e->lensize, e->len);
    assert(e->lensize != 0); /* check that encoding was valid. */
    e->headersize = e->prevrawlensize + e->lensize;
    e->p = p;
}

这里主要就是对字节的读取,可以去看源代码。

操作

创建

/* Create a new empty ziplist. */
// 先申请初始的空间(4+4+2+1),再对zlbytes,zltail,zllen,zlend逐个初始化
unsigned char *ziplistNew(void) {
    unsigned int bytes = ZIPLIST_HEADER_SIZE+ZIPLIST_END_SIZE;
    unsigned char *zl = zmalloc(bytes);
    ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);
    ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);
    ZIPLIST_LENGTH(zl) = 0;
    zl[bytes-1] = ZIP_END;
    return zl;
}

插入元素

  1. 编码。计算previous_entry_length字段、encoding字段和content字段的内容。

字典

结构

节点:

typedef struct dictEntry {
    void *key;
    // 节省内存 不同场景下使用不同字段
    union {
        void *val; // db.dict 储存值
        uint64_t u64; 
        int64_t s64;  // db.expires 储存过期时间
        double d;
    } v;
    // 单链表法 解决哈希冲突。
    struct dictEntry *next;     /* Next entry in the same hash bucket. */
    void *metadata[];           /* An arbitrary number of bytes (starting at a
                                 * pointer-aligned address) of size as returned
                                 * by dictType's dictEntryMetadataBytes(). */
} dictEntry;

可以看出是使用链表法来解决hash冲突的。

struct dict {
    dictType *type; // 对应特定类型操作函数
    
    dictEntry **ht_table[2]; // 哈希表。有两个,一个正常使用,另外一个在rehash时使用
    unsigned long ht_used[2]; // 记录每个哈希表被使用的数目。

    long rehashidx; /* rehashing not in progress if rehashidx == -1 */

    /* Keep small vars at end for optimal (minimal) struct padding */
    int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */
    // size 的 系数,size 是2 的N次幂
    signed char ht_size_exp[2]; /* exponent of size. (size = 1<<exp) */
};

这里可以看到一个dictType 用来对应特定类型的操作函数,这些函数体现了面向对象编程的思想,会在后面合适的时机用到。

比如找个hashFunction 用来控制dict使用的hash函数,默认为siphash。

typedef struct dictType {
    uint64_t (*hashFunction)(const void *key); // hash函数
    void *(*keyDup)(dict *d, const void *key); // key的 复制函数
    void *(*valDup)(dict *d, const void *obj); // val 的复制函数
    int (*keyCompare)(dict *d, const void *key1, const void *key2); // key 对比函数
    void (*keyDestructor)(dict *d, void *key); // key 销毁函数
    void (*valDestructor)(dict *d, void *obj); // val 销毁函数
    int (*expandAllowed)(size_t moreMem, double usedRatio); //扩展函数 
    /* Allow a dictEntry to carry extra caller-defined metadata.  The
     * extra memory is initialized to 0 when a dictEntry is allocated. */
    size_t (*dictEntryMetadataBytes)(dict *d); // 元数据
} dictType;

创建

先申请空间,再初始化参数

/* Reset hash table parameters already initialized with _dictInit()*/
static void _dictReset(dict *d, int htidx)
{
    d->ht_table[htidx] = NULL;
    d->ht_size_exp[htidx] = -1;
    d->ht_used[htidx] = 0;
}

/* Create a new hash table */
dict *dictCreate(dictType *type)
{
    dict *d = zmalloc(sizeof(*d));

    _dictInit(d,type);
    return d;
}

/* Initialize the hash table */
int _dictInit(dict *d, dictType *type)
{
    _dictReset(d, 0);
    _dictReset(d, 1);
    d->type = type;
    d->rehashidx = -1;
    d->pauserehash = 0;
    return DICT_OK; // 使用一些宏来反馈结果
}

增加与扩容

这里先提前讲一下Rehash的概念,便于理解增加扩容中的一些操作:

扩容后,字典容量及掩码值会发生改变,同一个键与掩码经位运算后得到的索引值就会发生改变,从而导致根据键查找不到值的情况。解决这个问题的方法是,新扩容的内存放到一个全新的Hash表中(ht[1]),并给字典打上在进行rehash操作中的标识(即rehashidx! =-1)。此后,新添加的键值对都往新的Hash表中存储;而修改、删除、查找操作需要在ht[0]、ht[1]中进行检查,然后再决定去对哪个Hash表操作。除此之外,还需要把老Hash表(ht[0])中的数据重新计算索引值后全部迁移插入到新的Hash表(ht[1])中,此迁移过程称作rehash。

先看增加单个entry的操作:

/* Add an element to the target hash table */
int dictAdd(dict *d, void *key, void *val)
{
    dictEntry *entry = dictAddRaw(d,key,NULL);
    if (!entry) return DICT_ERR;
    dictSetVal(d, entry, val);
    return DICT_OK;
}

dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
    long index;
    dictEntry *entry;
    int htidx;

    // 如果正在rehash,在add时进行一步rehash,这里是将大范围的rehash分散来减小资源集中消耗
    if (dictIsRehashing(d)) _dictRehashStep(d);

    /* Get the index of the new element, or -1 if
     * the element already exists. */
    if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
        return NULL;

    /* Allocate the memory and store the new entry.
     * Insert the element in top, with the assumption that in a database
     * system it is more likely that recently added entries are accessed
     * more frequently. */
    htidx = dictIsRehashing(d) ? 1 : 0;
    size_t metasize = dictMetadataSize(d);
    entry = zmalloc(sizeof(*entry) + metasize);
    if (metasize > 0) {
        memset(dictMetadata(entry), 0, metasize);
    }
    // 插入在顶部:根据时空局限性
    entry->next = d->ht_table[htidx][index];
    d->ht_table[htidx][index] = entry;
    d->ht_used[htidx]++;

    /* Set the hash entry fields. */
    dictSetKey(d, entry, key);
    return entry;
}

可以看出,add调用了一个底层的addraw函数。addraw首先使用dictkeyindex来查找一个合适的插入位置,如果这个key已经存在就退出add操作。然后确定是否在rehash,上面我们讲过如果在rehash那么 新添加的键值对都往新的Hash表中存储。后面就申请空间在相应位置顶部插入,这是数据库时空局限性的体现。

这里看一下dictSetKey和dictSetVal:

#define dictSetKey(d, entry, _key_) do { \
    if ((d)->type->keyDup) \
        (entry)->key = (d)->type->keyDup((d), _key_); \
    else \
        (entry)->key = (_key_); \
} while(0)

#define dictSetVal(d, entry, _val_) do { \
    if ((d)->type->valDup) \
        (entry)->v.val = (d)->type->valDup((d), _val_); \
    else \
        (entry)->v.val = (_val_); \
} while(0)

可以看出是用宏的形式调用dict的dicttype函数,也就是说这些操作是可以调整的。

扩容操作:

// 将d扩容到2^size的大小
int _dictExpand(dict *d, unsigned long size, int* malloc_failed)
{
    if (malloc_failed) *malloc_failed = 0;

    /* the size is invalid if it is smaller than the number of
     * elements already inside the hash table */
    if (dictIsRehashing(d) || d->ht_used[0] > size)
        return DICT_ERR;

    /* the new hash table */
    dictEntry **new_ht_table;
    unsigned long new_ht_used;
    signed char new_ht_size_exp = _dictNextExp(size);

    /* Detect overflows */
    size_t newsize = 1ul<<new_ht_size_exp;
    // 后者判断在什么时候成立?
    if (newsize < size || newsize * sizeof(dictEntry*) < newsize)
        return DICT_ERR;

    /* Rehashing to the same table size is not useful. */
    if (new_ht_size_exp == d->ht_size_exp[0]) return DICT_ERR;

    /* Allocate the new hash table and initialize all pointers to NULL */
    if (malloc_failed) {
        new_ht_table = ztrycalloc(newsize*sizeof(dictEntry*));
        *malloc_failed = new_ht_table == NULL;
        if (*malloc_failed)
            return DICT_ERR;
    } else
        new_ht_table = zcalloc(newsize*sizeof(dictEntry*));

    // 新的hash表被使用的数量
    new_ht_used = 0;

    /* Is this the first initialization? If so it's not really a rehashing
     * we just set the first hash table so that it can accept keys. */
    if (d->ht_table[0] == NULL) {
        d->ht_size_exp[0] = new_ht_size_exp;
        d->ht_used[0] = new_ht_used;
        d->ht_table[0] = new_ht_table;
        return DICT_OK;
    }

    /* Prepare a second hash table for incremental rehashing */
    d->ht_size_exp[1] = new_ht_size_exp;
    d->ht_used[1] = new_ht_used;
    d->ht_table[1] = new_ht_table;
    d->rehashidx = 0;
    return DICT_OK;
}

首先判断是否在rehash,在rehash中不能扩容。然后创建一个新的hash table,这个newsize是2的n次幂。expand操作在刚开始初始化时会使用,也会在这里做一个判断。更常用的是在扩容后进行rehash操作。

获得size的函数:

// 确保hash cap 为2的N次幂
static signed char _dictNextExp(unsigned long size)
{
    unsigned char e = DICT_HT_INITIAL_EXP;

    if (size >= LONG_MAX) return (8*sizeof(long)-1);
    // 1 << e == 1 * 2^e  
    // 找到一个大于size 的2^e
    while(1) {
        if (((unsigned long)1<<e) >= size)
            return e;
        e++;
    }
}

渐进式Rehash

直接看函数:

int dictRehash(dict *d, int n) {
    int empty_visits = n*10; /* Max number of empty buckets to visit. */
    if (!dictIsRehashing(d)) return 0;

    while(n-- && d->ht_used[0] != 0) {
        dictEntry *de, *nextde;

        /* Note that rehashidx can't overflow as we are sure there are more
         * elements because ht[0].used != 0 */
        assert(DICTHT_SIZE(d->ht_size_exp[0]) > (unsigned long)d->rehashidx);
        while(d->ht_table[0][d->rehashidx] == NULL) {
            d->rehashidx++;
            if (--empty_visits == 0) return 1;
        }
        de = d->ht_table[0][d->rehashidx];
        /* Move all the keys in this bucket from the old to the new hash HT */
        while(de) {
            uint64_t h;

            nextde = de->next;
            /* Get the index in the new hash table */
            h = dictHashKey(d, de->key) & DICTHT_SIZE_MASK(d->ht_size_exp[1]);
            de->next = d->ht_table[1][h];
            d->ht_table[1][h] = de;
            d->ht_used[0]--;
            d->ht_used[1]++;
            de = nextde;
        }
        d->ht_table[0][d->rehashidx] = NULL;
        d->rehashidx++;
    }

    /* Check if we already rehashed the whole table... */
    if (d->ht_used[0] == 0) {
        zfree(d->ht_table[0]);
        /* Copy the new ht onto the old one */
        d->ht_table[0] = d->ht_table[1];
        d->ht_used[0] = d->ht_used[1];
        d->ht_size_exp[0] = d->ht_size_exp[1];
        _dictReset(d, 1);
        d->rehashidx = -1;
        return 0;
    }

    /* More to rehash... */
    return 1;
}

rehash除了扩容时会触发,缩容时也会触发。Redis整个rehash的实现,主要分为如下几步完成。

  1. 给Hash表ht[1]申请足够的空间;扩容时空间大小为当前容量2,即d->ht[0]. used2;当使用量不到总空间10%时,则进行缩容。缩容时空间大小则为能恰好包含d->ht[0].used个节点的2^N次方幂整数,并把字典中字段rehashidx标识为0
  2. 进行rehash操作调用的是dictRehash函数,重新计算ht[0]中每个键的Hash值与索引值(重新计算就叫rehash),依次添加到新的Hash表ht[1],并把老Hash表中该键值对删除。把字典中字段rehashidx字段修改为Hash表ht[0]中正在进行rehash操作节点的索引值.
  3. rehash操作后,清空ht[0],然后对调一下ht[1]与ht[0]的值,并把字典中rehashidx字段标识为-1。

我们知道,Redis可以提供高性能的线上服务,而且是单进程模式,当数据库中键值对数量达到了百万、千万、亿级别时,整个rehash过程将非常缓慢,如果不优化rehash过程,可能会造成很严重的服务不可用现象。Redis优化的思想很巧妙,利用分而治之的思想了进行rehash操作,大致的步骤如下。

执行插入、删除、查找、修改等操作前,都先判断当前字典rehash操作是否在进行中,进行中则调用dictRehashStep函数进行rehash操作(每次只对1个节点进行rehash操作,共执行1次)。除这些操作之外,当服务空闲时,如果当前字典也需要进行rehsh操作,则会调用incrementallyRehash函数进行批量rehash操作(每次对100个节点进行rehash操作,共执行1毫秒)。在经历N次rehash操作后,整个ht[0]的数据都会迁移到ht[1]中,这样做的好处就把是本应集中处理的时间分散到了上百万、千万、亿次操作中,所以其耗时可忽略不计。

/* This function performs just a step of rehashing, and only if hashing has
 * not been paused for our hash table. When we have iterators in the
 * middle of a rehashing we can't mess with the two hash tables otherwise
 * some elements can be missed or duplicated.
 *
 * This function is called by common lookup or update operations in the
 * dictionary so that the hash table automatically migrates from H1 to H2
 * while it is actively used. */
static void _dictRehashStep(dict *d) {
    if (d->pauserehash == 0) dictRehash(d,1);
}

删除

static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
    uint64_t h, idx;
    dictEntry *he, *prevHe;
    int table;

    /* dict is empty */
    if (dictSize(d) == 0) return NULL;

    if (dictIsRehashing(d)) _dictRehashStep(d);
    h = dictHashKey(d, key);

    for (table = 0; table <= 1; table++) {
        idx = h & DICTHT_SIZE_MASK(d->ht_size_exp[table]);
        he = d->ht_table[table][idx];
        prevHe = NULL;
        // 查找
        while(he) {
            if (key==he->key || dictCompareKeys(d, key, he->key)) {
                /* Unlink the element from the list */
                if (prevHe)
                    prevHe->next = he->next;
                else // 在bucket顶部,直接略过
                    d->ht_table[table][idx] = he->next;
                if (!nofree) {
                    dictFreeUnlinkedEntry(d, he);
                }
                d->ht_used[table]--;
                return he;
            }
            prevHe = he;
            he = he->next;
        }
        if (!dictIsRehashing(d)) break;
    }
    return NULL; /* not found */
}

遍历

遍历Redis整个数据库主要有两种方式:全遍历(例如keys命令)、间断遍历(hscan命令):

迭代器——可在容器(容器可为字典、链表等数据结构)上遍访的接口,设计人员无须关心容器的内容,调用迭代器固定的接口就可遍历数据,在很多高级语言中都有实现。

字典迭代器主要用于迭代字典这个数据结构中的数据,既然是迭代字典中的数据,必然会出现一个问题,迭代过程中,如果发生了数据增删,则可能导致字典触发rehash操作,或迭代开始时字典正在进行rehash操作,从而导致一条数据可能多次遍历到。

typedef struct dictIterator {
    dict *d;
    long index; // 迭代hash中的索引值
    // safe 为1表示是安全迭代器,可以在add,find等rehash场景中使用
    int table, safe;
    // entry 当前读取节点,nextEntry entry 节点的next字段
    dictEntry *entry, *nextEntry;
    /* unsafe iterator fingerprint for misuse detection. */
    unsigned long long fingerprint;// 字典指纹,字典发生改变随之改变
} dictIterator;

fingerprint字段是一个64位的整数,表示在给定时间内字典的状态。在这里称其为字典的指纹,因为该字段的值为字典(dict结构体)中所有字段值组合在一起生成的Hash值,所以当字典中数据发生任何变化时,其值都会不同,生成算法可参见源码dict.c文件中的dictFingerprint函数。

/* A fingerprint is a 64 bit number that represents the state of the dictionary
 * at a given time, it's just a few dict properties xored together.
 * When an unsafe iterator is initialized, we get the dict fingerprint, and check
 * the fingerprint again when the iterator is released.
 * If the two fingerprints are different it means that the user of the iterator
 * performed forbidden operations against the dictionary while iterating. */
unsigned long long dictFingerprint(dict *d) {
    unsigned long long integers[6], hash = 0;
    int j;

    integers[0] = (long) d->ht_table[0];
    integers[1] = d->ht_size_exp[0];
    integers[2] = d->ht_used[0];
    integers[3] = (long) d->ht_table[1];
    integers[4] = d->ht_size_exp[1];
    integers[5] = d->ht_used[1];

    /* We hash N integers by summing every successive integer with the integer
     * hashing of the previous sum. Basically:
     *
     * Result = hash(hash(hash(int1)+int2)+int3) ...
     *
     * This way the same set of integers in a different order will (likely) hash
     * to a different number. */
    for (j = 0; j < 6; j++) {
        hash += integers[j];
        /* For the hashing step we use Tomas Wang's 64 bit integer hash. */
        hash = (~hash) + (hash << 21); // hash = (hash << 21) - hash - 1;
        hash = hash ^ (hash >> 24);
        hash = (hash + (hash << 3)) + (hash << 8); // hash * 265
        hash = hash ^ (hash >> 14);
        hash = (hash + (hash << 2)) + (hash << 4); // hash * 21
        hash = hash ^ (hash >> 28);
        hash = hash + (hash << 31);
    }
    return hash;
}

根据迭代器结构中的safe字段,将迭代器分为普通迭代器和安全迭代器:

普通迭代器

普通迭代器迭代字典中数据时,会对迭代器中fingerprint字段的值作严格的校验,来保证迭代过程中字典结构不发生任何变化,确保读取出的数据不出现重复

当Redis执行部分命令时会使用普通迭代器迭代字典数据,例如sort命令。sort命令主要作用是对给定列表、集合、有序集合的元素进行排序,如果给定的是有序集合,其成员名存储用的是字典,分值存储用的是跳跃表,则执行sort命令读取数据的时候会用到迭代器来遍历整个字典。

     dict *set = ((zset*)sortval->ptr)->dict;
        dictIterator *di;
        dictEntry *setele;
        sds sdsele;
        di = dictGetIterator(set);
        while((setele = dictNext(di)) != NULL) {
            sdsele =  dictGetKey(setele);
            vector[j].obj = createStringObject(sdsele,sdslen(sdsele));
            vector[j].u.score = 0;
            vector[j].u.cmpobj = NULL;
            j++;
        }
        dictReleaseIterator(di);
  1. 调用dictGetIterator函数初始化一个普通迭代器,此时会把iter->safe值置为0,表示初始化的迭代器为普通迭代器

    void dictInitIterator(dictIterator *iter, dict *d)
    {
        iter->d = d;
        iter->table = 0;
        iter->index = -1;
        iter->safe = 0;
        iter->entry = NULL;
        iter->nextEntry = NULL;
    }
    
    dictIterator *dictGetIterator(dict *d)
    {
      dictIterator *iter = zmalloc(sizeof(*iter));
      dictInitIterator(iter, d);
      return iter;
    }
    
  2. 循环调用dictNext函数依次遍历字典中Hash表的节点,首次遍历时会通过dictFingerprint函数拿到当前字典的指纹值。

安全迭代器