MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Redis 跳跃表 API 的扩展开发思路

2023-10-313.1k 阅读

Redis 跳跃表概述

Redis 中的跳跃表(skiplist)是一种有序的数据结构,它通过在每个节点中维持多个指向其他节点的指针,从而达到快速查找的目的。跳跃表在 Redis 中主要用于实现有序集合(Sorted Set)结构。

跳跃表的结构

  1. 节点结构 在 Redis 中,跳跃表节点的结构体定义如下:
typedef struct zskiplistNode {
    sds ele;
    double score;
    struct zskiplistNode *backward;
    struct zskiplistLevel {
        struct zskiplistNode *forward;
        unsigned int span;
    } level[];
} zskiplistNode;
  • ele:保存成员对象,例如有序集合中的成员元素。
  • score:分值,用于对节点进行排序。
  • backward:指向当前节点的前一个节点,用于从后向前遍历跳跃表。
  • level:是一个柔性数组,其大小在创建节点时动态分配。level[i].forward 指向第 i 层的下一个节点,span 表示从当前节点到 forward 节点之间跨越的节点数。
  1. 跳跃表结构 跳跃表本身的结构体定义为:
typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;
  • header:指向跳跃表的头节点。
  • tail:指向跳跃表的尾节点。
  • length:跳跃表中节点的数量(不包括头节点)。
  • level:当前跳跃表的最大层数。

跳跃表的工作原理

  1. 查找操作 当进行查找时,从跳跃表的头节点开始,首先在最高层进行查找。如果当前节点的 forward 节点的 score 大于要查找的 score,则降低一层继续查找;如果 forward 节点的 score 等于要查找的 score,则检查 ele 是否匹配,如果匹配则找到,否则继续在当前层查找下一个节点。通过这种多层次的查找方式,大大减少了比较的次数,提高了查找效率。例如,在一个高度为 h,节点数为 n 的跳跃表中,平均查找长度为 O(log n)

  2. 插入操作 插入节点时,首先通过查找操作确定插入位置。然后为新节点随机生成一个层数 level(通常使用一种概率算法,使得层数越高的节点出现的概率越低)。接着在各个层插入新节点,并更新相应的指针和 span 值。例如,假设新节点的层数为 3,那么需要在第 1 层、第 2 层和第 3 层分别插入该节点,并调整各层的指针关系。

  3. 删除操作 删除节点时,首先通过查找操作找到要删除的节点。然后从各个层删除该节点,并更新相应的指针和 span 值。同时,如果删除节点后某一层没有任何节点(除了头节点),则降低跳跃表的层数。

Redis 跳跃表现有 API 分析

Redis 中跳跃表相关的 API 主要用于有序集合的操作,虽然这些 API 已经能够满足基本的有序集合需求,但在一些特定场景下,可能需要进行扩展。

现有 API 功能

  1. 插入操作 zslInsert 函数用于向跳跃表中插入一个新节点。它首先查找插入位置,然后为新节点分配内存,设置节点的 elescore 等属性,最后将新节点插入到跳跃表的相应位置。示例代码如下:
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    serverAssert(!isnan(score));
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == (zsl->level-1)? 0 : rank[i+1];
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                        sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* we assume the element is not already inside, since we allow duplicated
     * scores, reinserting the same element should never happen since the
     * caller of zslInsert() should test in the hash table if the element is
     * already present or not. */
    level = zslRandomLevel();
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    x = zslCreateNode(level,score,ele);
    for (i = 0; i < level; i++) {
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;

        /* update span covered by update[i] as x is inserted here */
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }
    /* increment span for untouched levels */
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

    x->backward = (update[0] == zsl->header)? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}
  1. 删除操作 zslDelete 函数用于从跳跃表中删除一个节点。它首先查找要删除的节点,如果找到则删除该节点,并更新跳跃表的指针和 span 值。示例代码如下:
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                        sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* We may have multiple elements with the same score, what we need
     * is to find the element with both the right score and object. */
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
        zslDeleteNode(zsl, x, update);
        if (!node)
            zslFreeNode(x);
        else
            *node = x;
        return 1;
    }
    return 0; /* not found */
}
  1. 查找操作 虽然 Redis 没有专门为跳跃表的查找操作提供独立的 API,但在 zslInsertzslDelete 等函数中都包含了查找逻辑。基本的查找逻辑是从最高层开始,逐步向下查找,直到找到目标节点或者确定目标节点不存在。

现有 API 的局限性

  1. 范围查询功能有限 Redis 现有跳跃表 API 中,范围查询主要通过 ZRANGEBYSCORE 等命令实现,但这些命令在一些复杂的范围查询场景下不够灵活。例如,如果需要查询分值在某个范围内,且成员对象满足特定正则表达式的节点,现有 API 无法直接实现。

  2. 缺乏批量操作 API 在处理大量数据时,频繁调用单个插入或删除操作会带来性能问题。例如,需要一次性插入或删除多个节点,现有 API 没有提供批量操作的接口,需要在应用层进行多次调用,增加了应用层的复杂性和网络开销。

  3. 自定义排序支持不足 现有跳跃表基于分值进行排序,但在某些场景下,可能需要根据其他属性或者自定义的比较函数进行排序。例如,在一个包含用户信息的有序集合中,可能需要根据用户的年龄和注册时间等多个属性进行综合排序,现有 API 难以满足这种需求。

Redis 跳跃表 API 扩展开发思路

为了克服现有 Redis 跳跃表 API 的局限性,可以从以下几个方面进行扩展。

扩展范围查询功能

  1. 支持复杂条件的范围查询 可以增加一个新的 API 函数,例如 zslRangeByComplexCondition。该函数接受多个条件参数,包括分值范围、成员对象的匹配模式等。在实现时,首先根据分值范围在跳跃表中进行查找,然后对查找到的节点进行成员对象的匹配。示例代码如下:
zskiplistNode* zslRangeByComplexCondition(zskiplist *zsl, double minScore, double maxScore, const char *pattern) {
    zskiplistNode *resultList = NULL;
    zskiplistNode *x = zsl->header->level[0].forward;

    while (x) {
        if (x->score >= minScore && x->score <= maxScore) {
            if (pattern == NULL || strstr(x->ele, pattern)) {
                // 将符合条件的节点添加到结果列表中
                zskiplistNode *newNode = zslCreateNode(1, x->score, sdsdup(x->ele));
                newNode->level[0].forward = resultList;
                resultList = newNode;
            }
        }
        x = x->level[0].forward;
    }
    return resultList;
}
  1. 支持区间内的反向查询 增加一个反向查询的 API,如 zslReverseRangeByScore,用于从尾节点开始向头节点方向进行范围查询。这在某些场景下非常有用,例如在需要按照分值从大到小获取数据时,可以提高查询效率。示例代码如下:
zskiplistNode* zslReverseRangeByScore(zskiplist *zsl, double minScore, double maxScore) {
    zskiplistNode *resultList = NULL;
    zskiplistNode *x = zsl->tail;

    while (x && x != zsl->header) {
        if (x->score >= minScore && x->score <= maxScore) {
            // 将符合条件的节点添加到结果列表中
            zskiplistNode *newNode = zslCreateNode(1, x->score, sdsdup(x->ele));
            newNode->level[0].forward = resultList;
            resultList = newNode;
        }
        x = x->backward;
    }
    return resultList;
}

实现批量操作 API

  1. 批量插入 添加一个 zslBulkInsert 函数,用于一次性插入多个节点。该函数可以接受一个节点数组作为参数,然后批量分配内存,进行节点插入操作。这样可以减少内存分配和指针调整的次数,提高插入效率。示例代码如下:
void zslBulkInsert(zskiplist *zsl, zskiplistNode **nodes, int count) {
    for (int i = 0; i < count; i++) {
        zskiplistNode *node = nodes[i];
        double score = node->score;
        sds ele = sdsdup(node->ele);
        zslInsert(zsl, score, ele);
        zslFreeNode(node);
    }
}
  1. 批量删除 实现 zslBulkDelete 函数,用于一次性删除多个节点。该函数可以接受一个包含要删除节点的分值和成员对象的数组,然后在跳跃表中查找并删除这些节点。示例代码如下:
void zslBulkDelete(zskiplist *zsl, double *scores, sds *eles, int count) {
    for (int i = 0; i < count; i++) {
        zslDelete(zsl, scores[i], eles[i], NULL);
    }
}

支持自定义排序

  1. 增加自定义比较函数接口 在跳跃表节点结构体中增加一个自定义比较函数指针的成员,例如 compareFunc。在插入节点时,可以根据需要设置这个比较函数。在查找和排序时,使用这个自定义的比较函数代替默认的基于分值的比较。示例代码如下:
typedef int (*CompareFunc)(zskiplistNode *a, zskiplistNode *b);

typedef struct zskiplistNode {
    sds ele;
    double score;
    struct zskiplistNode *backward;
    struct zskiplistLevel {
        struct zskiplistNode *forward;
        unsigned int span;
    } level[];
    CompareFunc compareFunc;
} zskiplistNode;

zskiplistNode* zslInsertWithCustomCompare(zskiplist *zsl, double score, sds ele, CompareFunc compareFunc) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    serverAssert(!isnan(score));
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        rank[i] = i == (zsl->level-1)? 0 : rank[i+1];
        while (x->level[i].forward &&
                (compareFunc? compareFunc(x->level[i].forward, (zskiplistNode *) &(score, ele)) < 0 :
                    x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                        sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    level = zslRandomLevel();
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    x = zslCreateNode(level,score,ele);
    x->compareFunc = compareFunc;
    for (i = 0; i < level; i++) {
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;

        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

    x->backward = (update[0] == zsl->header)? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}
  1. 根据自定义属性排序 如果需要根据节点的其他属性进行排序,可以在节点结构体中增加相应的属性成员。例如,如果要根据用户年龄进行排序,可以在节点结构体中增加一个 age 成员。然后在自定义比较函数中根据 age 进行比较。示例代码如下:
typedef struct zskiplistNode {
    sds ele;
    double score;
    int age;
    struct zskiplistNode *backward;
    struct zskiplistLevel {
        struct zskiplistNode *forward;
        unsigned int span;
    } level[];
    CompareFunc compareFunc;
} zskiplistNode;

int compareByAge(zskiplistNode *a, zskiplistNode *b) {
    return a->age - b->age;
}

扩展 API 的应用场景

复杂数据分析场景

在大数据分析场景中,经常需要对数据进行复杂的范围查询。例如,在一个存储股票交易数据的有序集合中,需要查询某段时间内价格在一定范围内,且成交量大于某个值的交易记录。通过扩展的范围查询 API,可以方便地实现这种复杂条件的查询。

高性能数据处理场景

在处理大量数据的场景下,批量操作 API 可以显著提高性能。例如,在一个实时数据采集系统中,需要将大量的传感器数据快速插入到 Redis 有序集合中进行存储和分析。使用批量插入 API 可以减少网络开销和 Redis 内部的操作次数,提高系统的整体性能。

个性化数据排序场景

在一些个性化推荐系统中,需要根据用户的特定需求进行数据排序。例如,在一个音乐推荐系统中,用户可能希望根据歌曲的播放次数、评分以及自己的收藏偏好等多个因素进行综合排序。通过支持自定义排序的 API,可以方便地实现这种个性化的排序需求。

扩展 API 的实现注意事项

内存管理

在扩展 API 时,尤其是在批量操作和自定义节点结构的情况下,需要特别注意内存管理。例如,在批量插入和删除操作中,要确保正确分配和释放节点内存,避免内存泄漏。在自定义节点结构中,如果增加了新的成员变量,也要确保在节点创建、复制和删除时对这些变量进行正确的内存操作。

兼容性

在扩展 Redis 跳跃表 API 时,要考虑与现有 Redis 版本的兼容性。尽量避免修改现有 API 的接口和行为,以免影响依赖现有 API 的应用程序。可以通过增加新的函数和命令来实现扩展功能,同时在文档中明确说明新功能与现有功能的关系和使用方法。

性能优化

虽然扩展 API 的目的之一是提高性能,但在实现过程中也要注意性能优化。例如,在复杂范围查询的实现中,要尽可能减少不必要的节点遍历和比较操作。在批量操作中,要合理分配内存,减少内存碎片的产生,以提高内存使用效率和操作速度。

通过对 Redis 跳跃表 API 的扩展开发,可以使其更好地适应各种复杂的应用场景,提高 Redis 在数据处理和存储方面的灵活性和性能。在实际应用中,开发人员可以根据具体需求选择合适的扩展 API,并结合 Redis 的其他功能,构建出高效、灵活的数据处理系统。