Redis探究之跳表

跳表数据结构

c

typedef struct zskiplist {
    struct zskiplistNode *header, *tail; // 指向跳表的头结点
    unsigned long length;				   // 跳跃表的总节点数量, 除了头节点以外的
    int level;							   // 跳跃表的高度
} zskiplist;



typedef struct zskiplistNode {
    sds ele; 							// 用于存储字符串类型的数据
    double score;						// 用于排序的值
    struct zskiplistNode *backward;	   // 后退指针	
    struct zskiplistLevel {
        struct zskiplistNode *forward; // 指向当前层的下一个节点
        unsigned int span               // 指向节点与本节点之间的元素个数
    } level[];
} zskiplistNode;
  • zskiplistNode *header: 跳跃表的头结点是一个特殊的节点, 他的level数组的元素个数为64, 即默认的level可到的最大值。

通过结构可以知道获取跳跃表头结点, 尾节点, 长度和高度的时间复杂度都是O(1)


函数 作用 复杂度
zslCreateNode 创建并返回一个新的跳跃表节点 最坏 O(1)
zslFreeNode 释放给定的跳跃表节点 最坏 O(1)
zslCreate 创建并初始化一个新的跳跃表 最坏 O(1)
zslFree 释放给定的跳跃表 最坏 O(N)
zslInsert 将一个包含给定 scoremember 的新节点添加到跳跃表中 最坏 O(N) 平均 O(logN)
zslDeleteNode 删除给定的跳跃表节点 最坏 O(N)
zslDelete 删除匹配给定 memberscore 的元素 最坏 O(N)平均 O(logN)
zslFirstInRange 找到跳跃表中第一个符合给定范围的元素 最坏 O(N)平均 O(logN)
zslLastInRange 找到跳跃表中最后一个符合给定范围的元素 最坏 O(N)平均 O(logN)
zslDeleteRangeByScore 删除 score 值在给定范围内的所有节点 最坏 O(N2)
zslDeleteRangeByRank 删除给定排序范围内的所有节点 最坏 O(N2)
zslGetRank 返回目标元素在有序集中的排位 最坏 O(N) 平均 O(logN)
zslGetElementByRank 根据给定排位,返回该排位上的元素节点 最坏 O(N)平均 O(logN)


创建跳表

zskiplist 中的跳跃表的高度最小值为0, 最大值为:

server.h

#define ZSKIPLIST_MAXLEVEL 64 /* Should be enough for 2^64 elements */

Redis通过zslRandomLevel() 方法为新生成的节点创建高度, 高度的值为1~64之间随机的一个值, 作为新节点的高度, 值越大出现的概率越低, 高度一旦确定, 后面不会去修改。

z_set.c

/* Returns a random level for the new skiplist node we are going to create.
 * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
 * (both inclusive), with a powerlaw-alike distribution where higher
 * levels are less likely to be returned. 
 **/
int zslRandomLevel(void) {
    // 初始化层级为1
    int level = 1;
    
    // 判断随机数的低16位是否小于(ZSKIPLIST_P * 0xFFFF)
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        // 条件成立,层级 level 增加 1,然后继续判断,直到条件不成立为止。
        level += 1;
    
    // 返回的层级不超过 ZSKIPLIST_MAXLEVEL,即使循环可能将 level 增加到比最大层级还高的值。
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

我们可以通过下面的例子来看一下过程:

  • 第一个随机数生成过程:
    • random() & 0xFFFF 可能生成值为 20000。
    • ZSKIPLIST_P * 0xFFFF 计算结果约为 16384(即 0.25 * 65536)。
    • 由于 20000 大于 16384,循环条件不成立,level 保持为 1。
  • 第二个随机数生成过程:
    • random() & 0xFFFF 可能生成值为 10000。
    • 10000 小于 16384,level 增加到 2。
    • 再次生成随机数,假设结果为 5000。
    • 5000 小于 16384,level 增加到 3。
    • 再次生成随机数,假设结果为 20000。
    • 20000 大于 16384,循环终止,level 最终为 3。

我可可以计算出这个level的期望:

要计算 level 的期望值 $ E[\text{level}] $,我们首先需要定义概率和公式。


设定 level 的初始值为 1,并在每次满足条件(random() & 0xFFFF) < (ZSKIPLIST_P * 0xFFFF) 时增加 level。假设 ( P ) 表示增加 level 的概率。即: $ P = ZSKIPLIST_P $

每次递增 level 的概率是 ( P ),则 level 为 ( k ) 的概率 $P(\text{level} = k) $ 是: $ P(\text{level} = k) = (1 - P) \times P^{k-1} $


期望值 $ E[\text{level}] $ 定义为每个可能的 level 乘以其对应的概率的和:

$ E[\text{level}] = \sum_{k=1}^{\infty} k \times P(\text{level} = k) $

代入概率 $ P(\text{level} = k) $ 得:

$ E[\text{level}] = \sum_{k=1}^{\infty} k \times (1 - P) \times P^{k-1} $

将常数 ( (1 - P) ) 提到求和符号外:

$ E[\text{level}] = (1 - P) \sum_{k=1}^{\infty} k \times P^{k-1} $


为了计算上述求和部分:

$ \sum_{k=1}^{\infty} k \times P^{k-1} $

我们可以使用等比级数的求和公式。首先,设:

$ S = \sum_{k=1}^{\infty} k \times P^{k-1} $

我们可以通过以下变换来求解:

$ S = P^0 + 2P^1 + 3P^2 + 4P^3 + \ldots $

将 ( S ) 乘以 ( P ):

$ PS = P^1 + 2P^2 + 3P^3 + 4P^4 + \ldots $

将这两个式子相减:

$ S - PS = (P^0 + P^1 + P^2 + P^3 + \ldots) = \sum_{k=0}^{\infty} P^k $

这部分是一个无穷等比数列的求和,其和为:

$ \sum_{k=0}^{\infty} P^k = \frac{1}{1 - P} $

所以,得到:

$ S(1 - P) = \frac{1}{1 - P} $

即:

$ S = \frac{1}{(1 - P)^2} $


代回到原来的期望值公式中:

$ E[\text{level}] = (1 - P) \times \frac{1}{(1 - P)^2} = \frac{1}{1 - P} $


因此,level 的期望值$ E[\text{level}] $为:

$ E[\text{level}] = \frac{1}{1 - P} $


如果 ( P = 0.25 ):

$ E[\text{level}] = \frac{1}{1 - 0.25} = \frac{1}{0.75} = \frac{4}{3} \approx 1.33 $

这个结果表示,在每次调用 zslRandomLevel 函数时,生成的层级的平均值为 1.33。这表明大多数节点将会插入到第 1 层或第 2 层


  1. 创建头节点

    头结点是跳表中第一个插入的节点, level数组的每一项forward都是NULL, span是0

    text

        for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
              zsl->header->level[j].forward = NULL;
              zsl->header->level[j].span = 0;
        }
  2. 创建完头结点之后就可以创建跳表了

    1. 创建跳跃表结构体对象zsl
    2. zsl头结点指针指向上面创建的头结点
    3. 跳表的层高初始化为1, 长度初始化为0, 尾节点为NULL

t_zset.c

/* Create a new skiplist. */
zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;

    zsl = zmalloc(sizeof(*zsl));
    zsl->level = 1;
    zsl->length = 0;
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
        zsl->header->level[j].forward = NULL;
        zsl->header->level[j].span = 0;
    }
    zsl->header->backward = NULL;
    zsl->tail = NULL;
    return zsl;
}

新增节点

新增节点的步骤:

  1. 找到位置
  2. 调整高度
  3. 插入节点
  4. 调整backward

新建节点:

c

/* Create a skiplist node with the specified number of levels.
 * The SDS string 'ele' is referenced by the node after the call. */
zskiplistNode *zslCreateNode(int level, double score, sds ele) {
    zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
    zn->score = score;
    zn->ele = ele;
    return zn;
}

由于zskiplistLevel是一个数组, 所有在申请内存空间的时候也需要计算上, 这样一个节点占用的内存大小就是zskiplistNode的大小 + (level 的个数 * zskiplistLevel)的大小。


插入:

c

zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    serverAssert(!isnan(score));
    x = zsl->header;
    // 找到位置, 并更新update[] 和rank[]
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                    sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* we assume the element is not already inside, since we allow duplicated
     * scores, reinserting the same element should never happen since the
     * caller of zslInsert() should test in the hash table if the element is
     * already inside or not. */
    // 生成新的随机高度, 
    level = zslRandomLevel();
    
    // 判断当前的跳跃表的高度是否小于随机生成的level
    if (level > zsl->level) {
        // 调整跳表的高度
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    
    // 创建新节点x
    x = zslCreateNode(level,score,ele);
    for (i = 0; i < level; i++) {
        // 更新x竖向层级上的所有节点的forward属性
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;

        /* update span covered by update[i] as x is inserted here */
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    /* increment span for untouched levels */
    // 更新
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

    // 更新backward
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    
    // 增加总长度
    zsl->length++;
    return x;
}

需要用到两个长度为64的数组来辅助新增:

  1. update[]: 用来存放插入的节点所在的每一层的节点, 因为最大高度为64 所以设置长度为64正好
  2. rank[]: 用来记录当前层从header节点到update[i]节点所经历的步长, 在更新update[i]span和设置新插入节点的span时用


删除节点

  1. 找到节点
  2. 设置span和forward

c

/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
    int i;
    for (i = 0; i < zsl->level; i++) {
        if (update[i]->level[i].forward == x) {
            update[i]->level[i].span += x->level[i].span - 1;
            update[i]->level[i].forward = x->level[i].forward;
        } else {
            update[i]->level[i].span -= 1;
        }
    }
    if (x->level[0].forward) {
        x->level[0].forward->backward = x->backward;
    } else {
        zsl->tail = x->backward;
    }
    while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
        zsl->level--;
    zsl->length--;
}


删除跳表

  1. 先遍历释放所有节点的内存
  2. 再释放跳跃表的内存

c

/* Free a whole skiplist. */
void zslFree(zskiplist *zsl) {
    zskiplistNode *node = zsl->header->level[0].forward, *next;

    zfree(zsl->header);
    while(node) {
        next = node->level[0].forward;
        zslFreeNode(node);
        node = next;
    }
    zfree(zsl);
}


void zslFreeNode(zskiplistNode *node) {
    sdsfree(node->ele);
    zfree(node);
}


应用

跳表主要应用于有序集合的底层实现, 还有一种实现方式为压缩列表

Redis的配置文件中关于有序集合底层实现的两个配置。

  1. zset-max-ziplist-entries 128: zset采用压缩列表时,元素个数最大值。默认值为128。
  2. zset-max-ziplist-value 64: zset采用压缩列表时,每个元素的字符串长度最大值。默认值为64。

默认使用压缩列表


zset添加元素的主要逻辑位于t_zset.c的zaddGenericCommand()函数中。

zset在插入第一个元素的时候会判断下面两种条件:

  • zset-max-ziplist-entries的值是否等于0

  • zset-max-ziplist-value小于要插入元素的字符串长度。

t_zset.c

/* Lookup the key and create the sorted set if does not exist. */
zobj = lookupKeyWrite(c->db,key);
if (zobj == NULL) {
    if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
    if (server.zset_max_ziplist_entries == 0 ||
        server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
    {
        
        zobj = createZsetObject();
    } else {
        zobj = createZsetZiplistObject();
    }
    dbAdd(c->db,key,zobj);
} else {
    if (zobj->type != OBJ_ZSET) {
        addReply(c,shared.wrongtypeerr);
        goto cleanup;
    }
}

注意: 一旦采用跳表作为底层结构的时候, 即使元素被删除到阈值以下, 也无法转变回压缩列表.