MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Redis 跳跃表 API 的扩展与定制开发

2022-07-013.3k 阅读

Redis 跳跃表基础回顾

在深入探讨 Redis 跳跃表 API 的扩展与定制开发之前,我们先来回顾一下 Redis 跳跃表的基本概念。Redis 中的跳跃表(skiplist)是一种有序数据结构,它通过在每个节点中维持多个指向其他节点的指针,来达到快速访问节点的目的。

跳跃表的节点结构在 Redis 源码中定义如下(简化版 C 代码示例):

typedef struct zskiplistNode {
    sds ele;
    double score;
    struct zskiplistNode *backward;
    struct zskiplistLevel {
        struct zskiplistNode *forward;
        unsigned long span;
    } level[];
} zskiplistNode;

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;

其中,zskiplistNode 表示跳跃表的节点,ele 是节点存储的元素,score 是用于排序的分值。backward 指针指向前一个节点,而 level 数组中的 forward 指针则指向不同层级的下一个节点,span 记录了当前层级到下一个节点跨越的节点数。zskiplist 结构体则定义了整个跳跃表,包含头节点、尾节点、节点数量和当前跳跃表的最大层级。

跳跃表的插入、删除和查找操作时间复杂度平均为 O(log n),最坏情况下为 O(n),相比链表的 O(n) 查找时间有了显著提升,同时相比于平衡树,跳跃表的实现相对简单,更易于理解和维护。

Redis 跳跃表原生 API 分析

Redis 提供了一系列操作跳跃表的原生 API,主要用于有序集合(Sorted Set)的实现。以下是几个关键的 API 及其分析。

zslInsert 函数 - 插入节点

zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    x = zsl->header;
    for (i = zsl->level - 1; i >= 0; i--) {
        rank[i] = i == (zsl->level - 1)? 0 : rank[i + 1];
        while (x->level[i].forward &&
               (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                 sdscmp(x->level[i].forward->ele, ele) < 0)))
        {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    // 检查是否存在相同分值和元素的节点
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele, ele) == 0) {
        // 存在则返回已有的节点
        return x;
    } else {
        // 不存在则创建新节点
        level = zslRandomLevel();
        if (level > zsl->level) {
            for (i = zsl->level; i < level; i++) {
                rank[i] = 0;
                update[i] = zsl->header;
                update[i]->level[i].span = zsl->length;
            }
            zsl->level = level;
        }
        x = zslCreateNode(level, score, ele);
        for (i = 0; i < level; i++) {
            x->level[i].forward = update[i]->level[i].forward;
            update[i]->level[i].forward = x;

            x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
            update[i]->level[i].span = (rank[0] - rank[i]) + 1;
        }
        for (i = level; i < zsl->level; i++) {
            update[i]->level[i].span++;
        }
        x->backward = (update[0] == zsl->header)? NULL : update[0];
        if (x->level[0].forward)
            x->level[0].forward->backward = x;
        else
            zsl->tail = x;
        zsl->length++;
        return x;
    }
}

这个函数的核心逻辑是,从跳跃表的最高层级开始,逐步找到要插入位置的前驱节点。如果找到相同分值和元素的节点则直接返回。否则,随机生成新节点的层级,然后插入新节点,并调整相关节点的指针和跨度信息。

zslDelete 函数 - 删除节点

int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;

    x = zsl->header;
    for (i = zsl->level - 1; i >= 0; i--) {
        while (x->level[i].forward &&
               (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                 sdscmp(x->level[i].forward->ele, ele) < 0)))
        {
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele, ele) == 0) {
        zslDeleteNode(zsl, x, update);
        if (!node)
            zslFreeNode(x);
        return 1;
    } else {
        return 0;
    }
}

zslDelete 函数首先通过和插入类似的查找过程,找到要删除节点的前驱节点。如果找到了匹配的节点,则调用 zslDeleteNode 函数实际删除节点,并调整跳跃表结构,最后释放节点内存(如果需要)。

zslSearch 函数 - 查找节点

zskiplistNode *zslSearch(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *x = zsl->header;
    int i;

    for (i = zsl->level - 1; i >= 0; i--) {
        while (x->level[i].forward &&
               (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                 sdscmp(x->level[i].forward->ele, ele) < 0)))
        {
            x = x->level[i].forward;
        }
    }
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele, ele) == 0) {
        return x;
    } else {
        return NULL;
    }
}

查找函数从最高层级开始,逐步向下查找,直到找到匹配的节点或者确定不存在匹配节点。

为什么要扩展与定制 Redis 跳跃表 API

虽然 Redis 提供的原生跳跃表 API 对于有序集合的常规操作已经足够,但在一些特定场景下,我们可能需要对其进行扩展与定制。

特定业务需求

例如,在某些实时数据分析场景中,除了基本的插入、删除和查找操作,可能还需要快速获取某个分值区间内排名靠前的节点,并且希望能够按照特定的业务规则进行排序,而不仅仅依赖于 Redis 原生的按分值排序。原生 API 无法直接满足这样的需求,需要对其进行扩展。

性能优化

在高并发、大数据量的情况下,Redis 原生的跳跃表操作在某些场景下可能无法达到最优性能。比如,批量插入操作在原生 API 中是一个个单独插入,这可能会导致频繁的内存分配和指针调整,影响性能。通过定制开发批量插入 API,可以显著提升插入效率。

与其他系统集成

当 Redis 作为底层存储与其他复杂系统集成时,可能需要跳跃表支持一些特殊的接口,以便更好地与其他组件交互。例如,与消息队列系统集成时,可能需要跳跃表能够以消息队列的协议格式接收和处理数据,这就需要对跳跃表 API 进行定制。

扩展与定制 Redis 跳跃表 API 的思路

新增功能接口

  1. 区间查找排名靠前接口:为了满足上述提到的实时数据分析场景需求,我们可以新增一个函数,例如 zslRangeTopSearch,用于在指定分值区间内查找排名靠前的节点。该函数需要接受分值区间和要返回的节点数量作为参数。
  2. 批量操作接口:为了提升性能,我们可以添加批量插入和批量删除接口。批量插入接口 zslBatchInsert 可以接受一个节点数组,一次性完成多个节点的插入,减少内存分配和指针调整的次数。批量删除接口 zslBatchDelete 类似,接受一个节点数组,批量删除跳跃表中的节点。

修改现有接口

  1. 优化查找逻辑:在一些场景下,原生的查找逻辑可能不够高效。例如,当数据量非常大且节点分布不均匀时,按照现有从最高层级开始查找的方式可能会遍历过多无效节点。我们可以考虑在查找过程中加入自适应策略,根据跳跃表的节点分布动态调整查找层级的起始位置,以优化查找性能。这就需要修改 zslSearch 函数的内部逻辑。
  2. 增强插入和删除的原子性:在高并发环境下,原生的插入和删除操作可能会出现竞争条件。我们可以通过引入锁机制或者利用 Redis 的事务特性,对 zslInsertzslDelete 函数进行修改,确保操作的原子性。

数据结构扩展

  1. 增加辅助字段:为了支持特定的业务规则排序,我们可以在跳跃表节点结构中增加一个辅助字段。例如,在电商场景中,除了按商品价格(分值)排序,还可能需要按销量排序。我们可以在节点结构中增加一个 salesVolume 字段,在插入和查找时根据这个字段和分值一起进行排序。这就需要修改 zskiplistNode 结构体定义。
  2. 多级索引结构:对于大数据量的跳跃表,为了进一步提升查找性能,可以考虑构建多级索引结构。在跳跃表之上再构建一层或多层跳跃表作为索引,每个索引跳跃表的节点指向原跳跃表的某些节点,这样在查找时可以通过多级索引快速定位到目标节点所在的大致范围,然后再在原跳跃表中进行精确查找。这需要对跳跃表的整体结构进行重新设计和实现。

扩展与定制开发实例

新增区间查找排名靠前接口

  1. 函数定义
zskiplistNode** zslRangeTopSearch(zskiplist *zsl, double minScore, double maxScore, int topCount, int *count) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;
    zskiplistNode **result = (zskiplistNode **)malloc(topCount * sizeof(zskiplistNode *));
    *count = 0;

    x = zsl->header;
    for (i = zsl->level - 1; i >= 0; i--) {
        while (x->level[i].forward &&
               (x->level[i].forward->score < minScore))
        {
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    x = x->level[0].forward;
    while (x && x->score <= maxScore && *count < topCount) {
        result[(*count)++] = x;
        x = x->level[0].forward;
    }
    return result;
}
  1. 使用示例
// 假设已经有一个跳跃表 zsl
double minScore = 10.0;
double maxScore = 20.0;
int topCount = 5;
int count;
zskiplistNode **result = zslRangeTopSearch(zsl, minScore, maxScore, topCount, &count);
for (int i = 0; i < count; i++) {
    printf("Element: %s, Score: %lf\n", result[i]->ele, result[i]->score);
}
free(result);

这个函数首先定位到分值区间的起始节点,然后遍历跳跃表,收集符合分值区间且排名靠前的节点,并返回这些节点的数组。

批量插入接口

  1. 函数定义
void zslBatchInsert(zskiplist *zsl, double *scores, sds *eles, int count) {
    for (int i = 0; i < count; i++) {
        zslInsert(zsl, scores[i], eles[i]);
    }
}
  1. 使用示例
// 假设已经有一个跳跃表 zsl
double scores[] = {1.0, 2.0, 3.0};
sds eles[] = {sdsnew("a"), sdsnew("b"), sdsnew("c")};
zslBatchInsert(zsl, scores, eles, 3);
for (int i = 0; i < 3; i++) {
    sdsfree(eles[i]);
}

虽然这里的批量插入暂时只是简单地循环调用 zslInsert,但在实际优化中,可以进一步改进,如合并内存分配等操作,以提升性能。

修改查找逻辑优化性能

  1. 修改后的 zslSearch 函数
zskiplistNode *zslSearchOptimized(zskiplist *zsl, double score, sds ele) {
    int level = zsl->level - 1;
    // 根据跳跃表节点数量动态调整起始查找层级
    if (zsl->length > 1000) {
        level = zsl->level / 2;
    }
    zskiplistNode *x = zsl->header;
    int i;

    for (i = level; i >= 0; i--) {
        while (x->level[i].forward &&
               (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                 sdscmp(x->level[i].forward->ele, ele) < 0)))
        {
            x = x->level[i].forward;
        }
    }
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele, ele) == 0) {
        return x;
    } else {
        return NULL;
    }
}
  1. 使用示例
// 假设已经有一个跳跃表 zsl
double score = 10.0;
sds ele = sdsnew("test");
zskiplistNode *node = zslSearchOptimized(zsl, score, ele);
if (node) {
    printf("Found node: %s, Score: %lf\n", node->ele, node->score);
} else {
    printf("Node not found\n");
}
sdsfree(ele);

这里根据跳跃表的节点数量动态调整了起始查找层级,在大数据量时可以减少无效遍历,提升查找性能。

增加辅助字段实现多维度排序

  1. 修改节点结构
typedef struct zskiplistNode {
    sds ele;
    double score;
    int salesVolume;
    struct zskiplistNode *backward;
    struct zskiplistLevel {
        struct zskiplistNode *forward;
        unsigned long span;
    } level[];
} zskiplistNode;
  1. 修改插入函数
zskiplistNode *zslInsertWithSales(zskiplist *zsl, double score, sds ele, int salesVolume) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    x = zsl->header;
    for (i = zsl->level - 1; i >= 0; i--) {
        rank[i] = i == (zsl->level - 1)? 0 : rank[i + 1];
        while (x->level[i].forward &&
               (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                 x->level[i].forward->salesVolume < salesVolume) ||
                (x->level[i].forward->score == score &&
                 x->level[i].forward->salesVolume == salesVolume &&
                 sdscmp(x->level[i].forward->ele, ele) < 0)))
        {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    // 检查是否存在相同分值和元素的节点
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele, ele) == 0) {
        // 存在则更新销量
        x->salesVolume = salesVolume;
        return x;
    } else {
        // 不存在则创建新节点
        level = zslRandomLevel();
        if (level > zsl->level) {
            for (i = zsl->level; i < level; i++) {
                rank[i] = 0;
                update[i] = zsl->header;
                update[i]->level[i].span = zsl->length;
            }
            zsl->level = level;
        }
        x = zslCreateNode(level, score, ele);
        x->salesVolume = salesVolume;
        for (i = 0; i < level; i++) {
            x->level[i].forward = update[i]->level[i].forward;
            update[i]->level[i].forward = x;

            x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
            update[i]->level[i].span = (rank[0] - rank[i]) + 1;
        }
        for (i = level; i < zsl->level; i++) {
            update[i]->level[i].span++;
        }
        x->backward = (update[0] == zsl->header)? NULL : update[0];
        if (x->level[0].forward)
            x->level[0].forward->backward = x;
        else
            zsl->tail = x;
        zsl->length++;
        return x;
    }
}
  1. 使用示例
// 假设已经有一个跳跃表 zsl
double score = 10.0;
sds ele = sdsnew("product1");
int salesVolume = 100;
zskiplistNode *node = zslInsertWithSales(zsl, score, ele, salesVolume);
if (node) {
    printf("Inserted node: %s, Score: %lf, Sales Volume: %d\n", node->ele, node->score, node->salesVolume);
}
sdsfree(ele);

通过增加 salesVolume 字段,并修改插入函数,实现了基于分值和销量的多维度排序。

注意事项与测试

注意事项

  1. 内存管理:在扩展和定制开发过程中,特别是新增接口和修改数据结构时,要注意内存的分配和释放。例如,在新增的 zslRangeTopSearch 函数中,分配了 result 数组,使用完毕后需要及时释放。对于修改后的节点结构,如果涉及到新的内存分配,如在 zslInsertWithSales 中为新字段赋值,也要确保在节点删除时正确释放相关内存。
  2. 兼容性:对 Redis 跳跃表 API 的扩展和定制需要考虑与 Redis 其他功能的兼容性。例如,如果修改了跳跃表的基本操作函数,要确保这些修改不会影响到有序集合的其他操作,如获取集合长度、范围查询等。同时,也要考虑与不同版本 Redis 的兼容性,尽量避免使用特定版本的内部实现细节,以免在版本升级时出现问题。
  3. 性能影响:虽然扩展和定制的目的之一是提升性能,但不当的修改可能会导致性能下降。例如,在修改查找逻辑时,如果调整起始查找层级的策略不合理,可能会增加无效遍历,反而降低查找效率。在进行任何性能优化修改后,都需要进行充分的性能测试。

测试

  1. 功能测试:针对新增的接口和修改后的接口,编写功能测试用例。例如,对于 zslRangeTopSearch 函数,测试不同分值区间和 topCount 设置下是否能正确返回符合条件的节点。对于 zslBatchInsert 函数,测试批量插入后跳跃表的节点数量和结构是否正确。对于修改后的 zslSearchOptimized 函数和 zslInsertWithSales 函数,分别测试查找和插入功能是否符合预期。
  2. 性能测试:使用性能测试工具,如 Redis-benchmark 等,对扩展和定制后的跳跃表操作进行性能测试。对比原生 API 和扩展定制后的 API 在不同数据量、并发度下的性能表现。例如,测试批量插入和单个插入的性能差异,以及优化后的查找函数在大数据量下的查找时间。根据性能测试结果,进一步调整和优化代码。

通过充分考虑这些注意事项并进行全面的测试,可以确保 Redis 跳跃表 API 的扩展与定制开发能够满足特定业务需求,同时保持系统的稳定性和高性能。