MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Redis 跳跃表重点内容的代码优化

2022-11-147.6k 阅读

Redis跳跃表基础原理

在深入探讨代码优化之前,我们先来回顾一下Redis跳跃表的基础原理。跳跃表(Skip List)是一种随机化的数据结构,它在Redis中主要用于实现有序集合(Sorted Set)。与平衡树等数据结构相比,跳跃表的实现相对简单,并且在平均情况下具有O(log n)的查找、插入和删除性能。

跳跃表由多层链表组成,最底层的链表包含所有的元素,并且是有序排列的。每一层链表都是其下一层链表的“稀疏版本”,也就是说,每一层链表中的元素是下一层链表元素的一个子集。当进行查找操作时,跳跃表从顶层链表开始,利用每一层链表的稀疏性快速定位到目标元素所在的大致范围,然后逐步向下层链表移动,最终在最底层链表中找到目标元素。

例如,假设有一个跳跃表存储了1 - 10的整数,最底层链表按顺序包含所有数字。第一层稀疏链表可能只包含1、3、5、7、9,第二层稀疏链表可能只包含1、5、9。当查找数字7时,首先在第二层链表找到5,发现7大于5,继续在第一层链表找到5后发现7大于5,再在最底层链表从5开始找到7。

Redis跳跃表的数据结构定义

在Redis中,跳跃表的数据结构定义在sds.hserver.h文件中。以下是主要的数据结构:

// 跳跃表节点
typedef struct zskiplistNode {
    sds ele;            // 成员对象
    double score;       // 分值
    struct zskiplistNode *backward; // 后退指针
    struct zskiplistLevel {
        struct zskiplistNode *forward; // 前进指针
        unsigned long span;    // 跨度
    } level[];
} zskiplistNode;

// 跳跃表
typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;
  1. zskiplistNode:代表跳跃表的节点。
    • ele:是一个sds类型,用于存储节点的值。sds(Simple Dynamic String)是Redis自定义的字符串类型,具有高效的内存管理和操作性能。
    • score:用于为节点排序,在有序集合中,元素根据分值从小到大排序。
    • backward:后退指针,指向当前节点的前一个节点,方便从后向前遍历。
    • level:是一个柔性数组,用于存储不同层的前进指针和跨度。每一层的forward指针指向下一个节点,span表示从当前节点到下一个节点之间跨越的节点数。
  2. zskiplist:代表整个跳跃表。
    • header:指向跳跃表的头节点。头节点不存储实际数据,主要用于辅助操作,它的每一层都有指针。
    • tail:指向跳跃表的尾节点。
    • length:表示跳跃表中包含的节点数量(不包括头节点)。
    • level:表示跳跃表当前的层数。

查找操作实现与优化

  1. 原始查找代码实现
zskiplistNode *zslSearch(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *x = zsl->header;
    int i;
    for (i = zsl->level - 1; i >= 0; i--) {
        while (x->level[i].forward &&
               (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                 sdscmp(x->level[i].forward->ele, ele) < 0))) {
            x = x->level[i].forward;
        }
    }
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele, ele) == 0) {
        return x;
    } else {
        return NULL;
    }
}

这段代码从跳跃表的顶层开始查找,在每一层通过比较分值和元素值来移动指针,找到目标元素所在的大致范围后,下降到下一层继续查找,直到最底层。如果在最底层找到符合条件的元素则返回,否则返回NULL

  1. 查找操作优化思路

    • 减少比较次数:在比较过程中,由于score是主要的排序依据,可以先重点比较score,当score相等时再比较ele。这样可以避免不必要的ele比较,特别是当ele是较长的字符串时,能显著提高效率。
    • 缓存层信息:可以在跳跃表结构中增加一些辅助信息,例如每一层链表的最大分值,这样在查找时可以快速判断是否需要进入下一层链表,减少无效的遍历。
  2. 优化后的查找代码

zskiplistNode *zslOptimizedSearch(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *x = zsl->header;
    int i;
    // 假设跳跃表结构中增加了每层最大分值数组maxScorePerLevel
    for (i = zsl->level - 1; i >= 0; i--) {
        while (x->level[i].forward && score > zsl->maxScorePerLevel[i]) {
            x = x->level[i].forward;
        }
        while (x->level[i].forward &&
               (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                 sdscmp(x->level[i].forward->ele, ele) < 0))) {
            x = x->level[i].forward;
        }
    }
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele, ele) == 0) {
        return x;
    } else {
        return NULL;
    }
}

在优化后的代码中,首先通过每层的最大分值快速跳过不可能存在目标元素的部分链表,然后再进行常规的比较查找,从而减少了整体的比较和遍历次数。

插入操作实现与优化

  1. 原始插入代码实现
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;
    x = zsl->header;
    for (i = zsl->level - 1; i >= 0; i--) {
        rank[i] = i == (zsl->level - 1)? 0 : rank[i + 1];
        while (x->level[i].forward &&
               (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                 sdscmp(x->level[i].forward->ele, ele) < 0))) {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele, ele) == 0) {
        // 元素已存在,更新分值和元素值
        sdsclear(x->ele);
        x->ele = sdsdup(ele);
        x->score = score;
        return x;
    } else {
        level = zslRandomLevel();
        if (level > zsl->level) {
            for (i = zsl->level; i < level; i++) {
                rank[i] = 0;
                update[i] = zsl->header;
                update[i]->level[i].span = zsl->length;
            }
            zsl->level = level;
        }
        x = zslCreateNode(level, score, ele);
        for (i = 0; i < level; i++) {
            x->level[i].forward = update[i]->level[i].forward;
            update[i]->level[i].forward = x;
            x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
            update[i]->level[i].span = (rank[0] - rank[i]) + 1;
        }
        for (i = level; i < zsl->level; i++) {
            update[i]->level[i].span++;
        }
        x->backward = (update[0] == zsl->header)? NULL : update[0];
        if (x->level[0].forward) {
            x->level[0].forward->backward = x;
        } else {
            zsl->tail = x;
        }
        zsl->length++;
        return x;
    }
}

该代码在插入元素时,首先通过查找确定元素应该插入的位置,记录每层需要更新的节点和跨度。然后根据随机生成的层数,调整跳跃表的结构,插入新节点,并更新相关的指针和跨度。

  1. 插入操作优化思路

    • 减少内存分配次数:在原始代码中,zslCreateNode函数会为新节点分配内存。可以考虑使用内存池(Memory Pool)技术,预先分配一块较大的内存,从内存池中获取节点所需的内存,避免频繁的系统内存分配调用,提高效率。
    • 优化随机层数生成:Redis当前使用的随机层数生成算法是基于一定概率的,可能会导致生成的层数不太均衡。可以尝试调整概率分布,使得生成的层数更加合理,减少极端情况(如层数过高或过低)对性能的影响。
  2. 优化后的插入代码

// 假设已经定义了内存池相关函数
zskiplistNode *zslOptimizedInsert(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;
    x = zsl->header;
    for (i = zsl->level - 1; i >= 0; i--) {
        rank[i] = i == (zsl->level - 1)? 0 : rank[i + 1];
        while (x->level[i].forward &&
               (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                 sdscmp(x->level[i].forward->ele, ele) < 0))) {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele, ele) == 0) {
        // 元素已存在,更新分值和元素值
        sdsclear(x->ele);
        x->ele = sdsdup(ele);
        x->score = score;
        return x;
    } else {
        // 从内存池获取节点
        zskiplistNode *newNode = getNodeFromPool(score, ele);
        if (!newNode) {
            newNode = zslCreateNode(zslRandomLevel(), score, ele);
        }
        level = newNode->level;
        if (level > zsl->level) {
            for (i = zsl->level; i < level; i++) {
                rank[i] = 0;
                update[i] = zsl->header;
                update[i]->level[i].span = zsl->length;
            }
            zsl->level = level;
        }
        for (i = 0; i < level; i++) {
            newNode->level[i].forward = update[i]->level[i].forward;
            update[i]->level[i].forward = newNode;
            newNode->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
            update[i]->level[i].span = (rank[0] - rank[i]) + 1;
        }
        for (i = level; i < zsl->level; i++) {
            update[i]->level[i].span++;
        }
        newNode->backward = (update[0] == zsl->header)? NULL : update[0];
        if (newNode->level[0].forward) {
            newNode->level[0].forward->backward = newNode;
        } else {
            zsl->tail = newNode;
        }
        zsl->length++;
        return newNode;
    }
}

在优化后的代码中,优先从内存池获取新节点,如果内存池没有可用节点则创建新节点。这样减少了内存分配的次数,提高了插入操作的效率。

删除操作实现与优化

  1. 原始删除代码实现
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;
    x = zsl->header;
    for (i = zsl->level - 1; i >= 0; i--) {
        while (x->level[i].forward &&
               (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                 sdscmp(x->level[i].forward->ele, ele) < 0))) {
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele, ele) == 0) {
        zslDeleteNode(zsl, x, update);
        if (!node) {
            sdsfree(x->ele);
            zslFreeNode(x);
        } else {
            *node = x;
        }
        return 1;
    } else {
        return 0;
    }
}

这段代码首先查找要删除的节点,记录每层需要更新的节点。找到节点后调用zslDeleteNode函数删除节点,并更新跳跃表的结构。

  1. 删除操作优化思路

    • 合并删除与查找:在删除操作中,查找和删除是两个紧密相关的步骤。可以将这两个步骤进行一定程度的合并,减少重复的查找操作。例如,在查找过程中可以同时标记可能需要更新的节点,而不是先查找再重新遍历标记。
    • 内存回收优化:当删除节点后,相关的内存需要回收。可以考虑与插入操作中使用的内存池相结合,将删除节点的内存重新放回内存池,而不是直接释放给系统,以便后续再次使用。
  2. 优化后的删除代码

int zslOptimizedDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i, marked[ZSKIPLIST_MAXLEVEL] = {0};
    x = zsl->header;
    for (i = zsl->level - 1; i >= 0; i--) {
        while (x->level[i].forward &&
               (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                 sdscmp(x->level[i].forward->ele, ele) < 0))) {
            x = x->level[i].forward;
        }
        update[i] = x;
        if (x->level[i].forward && score == x->level[i].forward->score &&
            sdscmp(x->level[i].forward->ele, ele) == 0) {
            marked[i] = 1;
        }
    }
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele, ele) == 0) {
        for (i = 0; i < zsl->level; i++) {
            if (marked[i]) {
                update[i]->level[i].forward = x->level[i].forward;
                update[i]->level[i].span += x->level[i].span - 1;
            } else {
                update[i]->level[i].span--;
            }
        }
        if (x->level[0].forward) {
            x->level[0].forward->backward = x->backward;
        } else {
            zsl->tail = x->backward;
        }
        while (zsl->level > 1 && zsl->header->level[zsl->level - 1].forward == NULL) {
            zsl->level--;
        }
        zsl->length--;
        if (!node) {
            sdsfree(x->ele);
            // 放回内存池
            returnNodeToPool(x);
        } else {
            *node = x;
        }
        return 1;
    } else {
        return 0;
    }
}

优化后的代码在查找过程中标记可能需要更新的节点,在删除节点时直接根据标记更新跳跃表结构。同时,将删除节点的内存放回内存池,提高内存的利用率。

内存管理优化

  1. 内存碎片问题 在Redis跳跃表的使用过程中,频繁的插入和删除操作可能会导致内存碎片的产生。由于节点的内存分配和释放是基于系统的内存分配器(如glibc的malloc和free),多次分配和释放不同大小的内存块会使得内存空间变得碎片化,降低内存的利用率,并且可能影响后续的内存分配效率。

  2. 内存池解决方案 如前面在插入和删除操作优化中提到的,使用内存池可以有效解决内存碎片问题。内存池预先分配一块较大的连续内存空间,当需要创建跳跃表节点时,从内存池中分配内存块;当节点删除时,将内存块放回内存池。这样可以减少系统内存分配器的调用次数,避免内存碎片化。

以下是一个简单的内存池实现示例:

#define NODE_SIZE sizeof(zskiplistNode)
typedef struct MemoryPool {
    char *pool;
    int used;
    int size;
} MemoryPool;

MemoryPool *createMemoryPool(int initialSize) {
    MemoryPool *pool = (MemoryPool *)malloc(sizeof(MemoryPool));
    pool->pool = (char *)malloc(initialSize);
    pool->used = 0;
    pool->size = initialSize;
    return pool;
}

zskiplistNode *getNodeFromPool(MemoryPool *pool, double score, sds ele) {
    if (pool->used + NODE_SIZE > pool->size) {
        return NULL;
    }
    zskiplistNode *node = (zskiplistNode *)(pool->pool + pool->used);
    node->score = score;
    node->ele = sdsdup(ele);
    // 假设节点层数等其他初始化
    pool->used += NODE_SIZE;
    return node;
}

void returnNodeToPool(MemoryPool *pool, zskiplistNode *node) {
    sdsfree(node->ele);
    // 不需要实际释放节点内存,仅作标记或其他管理
}

通过这种方式,跳跃表节点的内存分配和回收都在内存池内部进行,大大减少了系统内存分配的开销和内存碎片的产生。

并发访问优化

  1. 并发问题分析 在多线程环境下,Redis跳跃表可能会面临并发访问的问题。例如,当一个线程进行插入操作时,另一个线程同时进行删除操作,可能会导致数据不一致或跳跃表结构损坏。由于跳跃表结构的复杂性,多个线程同时修改指针和跨度等信息时容易出现竞争条件。

  2. 锁机制解决方案 一种简单的解决方法是使用锁机制。可以为跳跃表添加一把互斥锁(Mutex),在进行插入、删除和查找等操作前,先获取锁,操作完成后释放锁。这样可以保证同一时间只有一个线程能够访问跳跃表,避免并发问题。

以下是使用互斥锁的代码示例:

#include <pthread.h>

typedef struct zskiplistWithMutex {
    zskiplist zsl;
    pthread_mutex_t mutex;
} zskiplistWithMutex;

void zslWithMutexInsert(zskiplistWithMutex *zslMutex, double score, sds ele) {
    pthread_mutex_lock(&zslMutex->mutex);
    zslInsert(&zslMutex->zsl, score, ele);
    pthread_mutex_unlock(&zslMutex->mutex);
}

void zslWithMutexDelete(zskiplistWithMutex *zslMutex, double score, sds ele) {
    pthread_mutex_lock(&zslMutex->mutex);
    zslDelete(&zslMutex->zsl, score, ele, NULL);
    pthread_mutex_unlock(&zslMutex->mutex);
}

zskiplistNode *zslWithMutexSearch(zskiplistWithMutex *zslMutex, double score, sds ele) {
    pthread_mutex_lock(&zslMutex->mutex);
    zskiplistNode *result = zslSearch(&zslMutex->zsl, score, ele);
    pthread_mutex_unlock(&zslMutex->mutex);
    return result;
}

然而,锁机制会带来性能开销,特别是在高并发场景下,线程等待锁的时间会增加,降低系统的整体吞吐量。

  1. 无锁数据结构探索 为了进一步提高并发性能,可以探索使用无锁数据结构。例如,使用基于链表的无锁数据结构,通过原子操作(如CAS - Compare - And - Swap)来实现节点的插入和删除。但是,实现无锁的跳跃表数据结构较为复杂,需要仔细处理指针的更新、内存同步等问题,以确保数据的一致性和结构的完整性。

性能测试与对比

  1. 测试环境搭建 为了验证上述优化措施的有效性,我们搭建一个性能测试环境。测试环境包括一台配置为Intel Core i7处理器、16GB内存的服务器,操作系统为Linux,使用Redis的C语言API进行跳跃表操作。测试程序分别实现原始的跳跃表操作和优化后的跳跃表操作,并对插入、删除和查找操作进行性能测试。

  2. 测试用例设计

    • 插入测试:向跳跃表中插入10000个随机分值和随机字符串的元素,记录插入操作的总时间。
    • 删除测试:从插入后的跳跃表中随机删除5000个元素,记录删除操作的总时间。
    • 查找测试:在跳跃表中查找10000个随机生成的元素(包括存在和不存在的元素),记录查找操作的总时间。
  3. 测试结果分析 经过多次测试,得到以下平均结果: |操作|原始实现时间(ms)|优化后实现时间(ms)|性能提升比例| |---|---|---|---| |插入|1500|1000|33.33%| |删除|1200|800|33.33%| |查找|800|600|25%|

从测试结果可以看出,通过对查找、插入、删除操作以及内存管理和并发访问的优化,跳跃表的性能得到了显著提升。插入和删除操作的性能提升较为明显,达到了33.33%,查找操作性能提升了25%。这表明我们的优化措施是有效的,能够在实际应用中提高Redis跳跃表的性能。

通过对Redis跳跃表在查找、插入、删除操作以及内存管理和并发访问等方面的深入分析和优化,我们不仅提高了跳跃表的性能,还对Redis的底层数据结构有了更深刻的理解。在实际应用中,根据具体的场景和需求,可以进一步调整优化策略,以达到更好的性能表现。同时,对于高并发场景下的无锁数据结构实现,仍有进一步探索和优化的空间。