MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Redis 跳跃表 API 性能优化的策略

2024-03-054.5k 阅读

Redis 跳跃表概述

Redis 是一款广泛应用于缓存、消息队列等场景的高性能键值对数据库。其数据结构丰富多样,跳跃表(Skip List)作为 Redis 有序集合(Sorted Set)的底层实现之一,具有独特的优势。

跳跃表是一种随机化的数据结构,它通过在每个节点中维持多个指向其他节点的指针,以达到快速查找的目的。与平衡树相比,跳跃表的实现相对简单,而且在插入、删除和查找操作上具有近似于平衡树的时间复杂度。

在 Redis 中,跳跃表主要用于实现有序集合。有序集合中的每个元素都关联一个分数(score),通过跳跃表可以根据分数快速地对元素进行排序和查找。

跳跃表的结构

Redis 中的跳跃表由 zskiplistzskiplistNode 两个结构体组成。

// 跳跃表节点
typedef struct zskiplistNode {
    sds ele;        // 元素值
    double score;   // 分数
    struct zskiplistNode *backward; // 后退指针
    struct zskiplistLevel {
        struct zskiplistNode *forward; // 前进指针
        unsigned int span; // 跨度
    } level[];
} zskiplistNode;

// 跳跃表
typedef struct zskiplist {
    struct zskiplistNode *header, *tail; // 表头和表尾指针
    unsigned long length; // 节点数量
    int level; // 层数
} zskiplist;

zskiplistNode 代表跳跃表中的节点,ele 存储元素值,score 是用于排序的分数。backward 指针用于从后向前遍历跳跃表。level 数组则存储了不同层的 forward 指针和 spanspan 表示从当前节点到 forward 指针指向节点之间的节点数量,用于计算排名。

zskiplist 结构体则是对整个跳跃表的封装,headertail 分别指向表头和表尾节点,length 记录跳跃表中的节点数量,level 表示当前跳跃表的层数。

跳跃表的操作

  1. 插入操作:插入一个新节点时,首先需要确定新节点的层数。Redis 通过随机函数来决定新节点的层数,这样可以保证跳跃表在一定程度上的平衡性。然后,从表头开始,在每一层找到合适的插入位置,并更新指针。
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    serverAssert(!isnan(score));
    x = zsl->header;
    for (i = zsl->level - 1; i >= 0; i--) {
        rank[i] = i == (zsl->level - 1)? 0 : rank[i + 1];
        while (x->level[i].forward &&
               (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                 sdscmp(x->level[i].forward->ele, ele) < 0)))
        {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele, ele) == 0) {
        return x;
    } else {
        level = zslRandomLevel();
        if (level > zsl->level) {
            for (i = zsl->level; i < level; i++) {
                rank[i] = 0;
                update[i] = zsl->header;
                update[i]->level[i].span = zsl->length;
            }
            zsl->level = level;
        }
        x = zslCreateNode(level, score, ele);
        for (i = 0; i < level; i++) {
            x->level[i].forward = update[i]->level[i].forward;
            update[i]->level[i].forward = x;

            x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
            update[i]->level[i].span = (rank[0] - rank[i]) + 1;
        }
        for (i = level; i < zsl->level; i++) {
            update[i]->level[i].span++;
        }
        x->backward = (update[0] == zsl->header)? NULL : update[0];
        if (x->level[0].forward)
            x->level[0].forward->backward = x;
        else
            zsl->tail = x;
        zsl->length++;
        return x;
    }
}
  1. 删除操作:删除节点时,同样从表头开始查找要删除的节点。找到后,更新相应层的指针,将该节点从跳跃表中移除。
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;

    x = zsl->header;
    for (i = zsl->level - 1; i >= 0; i--) {
        while (x->level[i].forward &&
               (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                 sdscmp(x->level[i].forward->ele, ele) < 0)))
            x = x->level[i].forward;
        update[i] = x;
    }
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele, ele) == 0) {
        zslDeleteNode(zsl, x, update);
        if (!node)
            zslFreeNode(x);
        return 1;
    } else {
        return 0;
    }
}
  1. 查找操作:查找节点时,从表头开始,在每一层根据分数和元素值进行比较,逐步向下层查找,直到找到目标节点或者确定目标节点不存在。
zskiplistNode *zslSearch(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *x = zsl->header;
    int i;

    for (i = zsl->level - 1; i >= 0; i--) {
        while (x->level[i].forward &&
               (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                 sdscmp(x->level[i].forward->ele, ele) < 0)))
            x = x->level[i].forward;
    }
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele, ele) == 0) {
        return x;
    } else {
        return NULL;
    }
}

Redis 跳跃表 API 性能优化策略

优化随机层数生成

  1. 原理:跳跃表的性能很大程度上取决于节点层数的分布。Redis 中使用随机函数来生成新节点的层数,默认情况下,生成层数的概率是每增加一层,概率减半。这种方式虽然简单,但可能在某些情况下导致跳跃表的结构不够理想,影响性能。
  2. 优化方法:可以考虑根据跳跃表的当前状态来动态调整生成层数的概率。例如,如果当前跳跃表的层数较少,可以适当增加新节点生成高层数的概率;反之,如果层数较多,可以降低高层数的生成概率。这样可以使跳跃表的结构更加平衡,提高查找、插入和删除操作的性能。
// 优化后的随机层数生成函数
int optimizedZslRandomLevel(void) {
    int level = 1;
    while ((random() & 0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    return (level < ZSKIPLIST_MAXLEVEL)? level : ZSKIPLIST_MAXLEVEL;
}

在插入节点时,使用优化后的函数来生成层数:

zskiplistNode *optimizedZslInsert(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    serverAssert(!isnan(score));
    x = zsl->header;
    for (i = zsl->level - 1; i >= 0; i--) {
        rank[i] = i == (zsl->level - 1)? 0 : rank[i + 1];
        while (x->level[i].forward &&
               (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                 sdscmp(x->level[i].forward->ele, ele) < 0)))
        {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele, ele) == 0) {
        return x;
    } else {
        level = optimizedZslRandomLevel();
        if (level > zsl->level) {
            for (i = zsl->level; i < level; i++) {
                rank[i] = 0;
                update[i] = zsl->header;
                update[i]->level[i].span = zsl->length;
            }
            zsl->level = level;
        }
        x = zslCreateNode(level, score, ele);
        for (i = 0; i < level; i++) {
            x->level[i].forward = update[i]->level[i].forward;
            update[i]->level[i].forward = x;

            x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
            update[i]->level[i].span = (rank[0] - rank[i]) + 1;
        }
        for (i = level; i < zsl->level; i++) {
            update[i]->level[i].span++;
        }
        x->backward = (update[0] == zsl->header)? NULL : update[0];
        if (x->level[0].forward)
            x->level[0].forward->backward = x;
        else
            zsl->tail = x;
        zsl->length++;
        return x;
    }
}

批量操作优化

  1. 原理:在实际应用中,经常需要对有序集合进行批量插入、删除或查找操作。如果每次操作都单独调用相应的 API,会产生较多的系统开销,如函数调用开销、内存分配和释放开销等。
  2. 优化方法:可以提供批量操作的 API。例如,批量插入 API 可以一次性接收多个元素及其分数,然后在内部进行一次性的插入操作。这样可以减少函数调用次数,提高效率。
void zslBulkInsert(zskiplist *zsl, int count, double *scores, sds *eles) {
    int i;
    for (i = 0; i < count; i++) {
        zslInsert(zsl, scores[i], eles[i]);
    }
}

对于批量删除操作,可以类似地实现:

void zslBulkDelete(zskiplist *zsl, int count, double *scores, sds *eles) {
    int i;
    for (i = 0; i < count; i++) {
        zslDelete(zsl, scores[i], eles[i], NULL);
    }
}

减少内存分配和释放

  1. 原理:跳跃表的节点在插入和删除时会频繁进行内存分配和释放操作,这会增加系统的负担,影响性能。特别是在高并发场景下,内存分配和释放的竞争可能会导致性能瓶颈。
  2. 优化方法:可以使用内存池(Memory Pool)技术。内存池预先分配一块较大的内存空间,当需要创建新节点时,从内存池中获取内存,而不是调用系统的内存分配函数。当节点被删除时,将内存返回给内存池,而不是释放给系统。
// 内存池结构体
typedef struct MemoryPool {
    char *start;
    char *current;
    size_t size;
} MemoryPool;

// 初始化内存池
MemoryPool *initMemoryPool(size_t poolSize) {
    MemoryPool *pool = (MemoryPool *)malloc(sizeof(MemoryPool));
    pool->start = (char *)malloc(poolSize);
    pool->current = pool->start;
    pool->size = poolSize;
    return pool;
}

// 从内存池获取内存
void *allocateFromPool(MemoryPool *pool, size_t size) {
    if (pool->current + size > pool->start + pool->size) {
        return NULL;
    }
    void *result = pool->current;
    pool->current += size;
    return result;
}

// 将内存返回给内存池
void freeToPool(MemoryPool *pool, void *ptr) {
    // 这里简单处理,实际应用中可能需要更复杂的管理
    // 假设内存是按顺序分配和释放的
    if (ptr == pool->current - sizeof(zskiplistNode)) {
        pool->current -= sizeof(zskiplistNode);
    }
}

// 使用内存池的插入操作
zskiplistNode *zslInsertWithPool(zskiplist *zsl, double score, sds ele, MemoryPool *pool) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    serverAssert(!isnan(score));
    x = zsl->header;
    for (i = zsl->level - 1; i >= 0; i--) {
        rank[i] = i == (zsl->level - 1)? 0 : rank[i + 1];
        while (x->level[i].forward &&
               (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                 sdscmp(x->level[i].forward->ele, ele) < 0)))
        {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele, ele) == 0) {
        return x;
    } else {
        level = zslRandomLevel();
        if (level > zsl->level) {
            for (i = zsl->level; i < level; i++) {
                rank[i] = 0;
                update[i] = zsl->header;
                update[i]->level[i].span = zsl->length;
            }
            zsl->level = level;
        }
        x = (zskiplistNode *)allocateFromPool(pool, sizeof(zskiplistNode) + (level - 1) * sizeof(struct zskiplistLevel));
        if (!x) {
            return NULL;
        }
        x->score = score;
        x->ele = sdsdup(ele);
        x->backward = NULL;
        for (i = 0; i < level; i++) {
            x->level[i].forward = NULL;
            x->level[i].span = 0;
        }
        for (i = 0; i < level; i++) {
            x->level[i].forward = update[i]->level[i].forward;
            update[i]->level[i].forward = x;

            x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
            update[i]->level[i].span = (rank[0] - rank[i]) + 1;
        }
        for (i = level; i < zsl->level; i++) {
            update[i]->level[i].span++;
        }
        x->backward = (update[0] == zsl->header)? NULL : update[0];
        if (x->level[0].forward)
            x->level[0].forward->backward = x;
        else
            zsl->tail = x;
        zsl->length++;
        return x;
    }
}

// 使用内存池的删除操作
int zslDeleteWithPool(zskiplist *zsl, double score, sds ele, MemoryPool *pool, zskiplistNode **node) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;

    x = zsl->header;
    for (i = zsl->level - 1; i >= 0; i--) {
        while (x->level[i].forward &&
               (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                 sdscmp(x->level[i].forward->ele, ele) < 0)))
            x = x->level[i].forward;
        update[i] = x;
    }
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele, ele) == 0) {
        zslDeleteNode(zsl, x, update);
        if (!node) {
            sdsfree(x->ele);
            freeToPool(pool, x);
        }
        return 1;
    } else {
        return 0;
    }
}

缓存常用操作结果

  1. 原理:在某些应用场景中,可能会频繁地对跳跃表进行相同的查找操作,例如查找某个固定分数范围内的元素。每次都进行完整的查找操作会浪费大量的时间。
  2. 优化方法:可以使用缓存来存储常用操作的结果。例如,维护一个哈希表,将分数范围作为键,将对应的查找结果作为值。当再次进行相同分数范围的查找时,先从缓存中查找,如果找到则直接返回结果,否则进行实际的跳跃表查找,并将结果存入缓存。
// 缓存结构体
typedef struct SkipListCache {
    dict *cacheDict;
} SkipListCache;

// 初始化缓存
SkipListCache *initSkipListCache() {
    SkipListCache *cache = (SkipListCache *)malloc(sizeof(SkipListCache));
    cache->cacheDict = dictCreate(&dictTypeHeapStringCopyKey, NULL);
    return cache;
}

// 从缓存中获取结果
zskiplistNode *getFromCache(SkipListCache *cache, double minScore, double maxScore) {
    char key[64];
    snprintf(key, sizeof(key), "%.10f-%.10f", minScore, maxScore);
    dictEntry *entry = dictFind(cache->cacheDict, key);
    if (entry) {
        return dictGetVal(entry);
    }
    return NULL;
}

// 将结果存入缓存
void setToCache(SkipListCache *cache, double minScore, double maxScore, zskiplistNode *result) {
    char key[64];
    snprintf(key, sizeof(key), "%.10f-%.10f", minScore, maxScore);
    dictAdd(cache->cacheDict, sdsnew(key), result);
}

// 使用缓存的查找操作
zskiplistNode *zslSearchWithCache(zskiplist *zsl, double minScore, double maxScore, SkipListCache *cache) {
    zskiplistNode *result = getFromCache(cache, minScore, maxScore);
    if (result) {
        return result;
    }
    // 实际的跳跃表查找操作
    zskiplistNode *x = zsl->header;
    int i;
    for (i = zsl->level - 1; i >= 0; i--) {
        while (x->level[i].forward && x->level[i].forward->score < minScore) {
            x = x->level[i].forward;
        }
    }
    x = x->level[0].forward;
    zskiplistNode *start = NULL;
    if (x && x->score >= minScore) {
        start = x;
        while (x && x->score <= maxScore) {
            x = x->level[0].forward;
        }
    }
    setToCache(cache, minScore, maxScore, start);
    return start;
}

优化跨度计算

  1. 原理:在跳跃表的插入和删除操作中,跨度(span)的计算和更新是一个较为复杂的过程。跨度用于计算排名,其准确计算对于保证跳跃表的正确性和性能至关重要。
  2. 优化方法:可以在跳跃表节点中增加一个辅助字段,用于快速计算跨度。例如,记录从当前节点到表头节点的总跨度。这样在插入和删除操作时,可以减少跨度的重新计算量。
// 修改后的跳跃表节点结构体
typedef struct zskiplistNode {
    sds ele;
    double score;
    struct zskiplistNode *backward;
    struct zskiplistLevel {
        struct zskiplistNode *forward;
        unsigned int span;
    } level[];
    unsigned int totalSpan; // 新增的辅助字段,记录到表头的总跨度
} zskiplistNode;

// 修改后的插入操作
zskiplistNode *optimizedZslInsert(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    serverAssert(!isnan(score));
    x = zsl->header;
    for (i = zsl->level - 1; i >= 0; i--) {
        rank[i] = i == (zsl->level - 1)? 0 : rank[i + 1];
        while (x->level[i].forward &&
               (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                 sdscmp(x->level[i].forward->ele, ele) < 0)))
        {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele, ele) == 0) {
        return x;
    } else {
        level = zslRandomLevel();
        if (level > zsl->level) {
            for (i = zsl->level; i < level; i++) {
                rank[i] = 0;
                update[i] = zsl->header;
                update[i]->level[i].span = zsl->length;
            }
            zsl->level = level;
        }
        x = zslCreateNode(level, score, ele);
        x->totalSpan = rank[0];
        for (i = 0; i < level; i++) {
            x->level[i].forward = update[i]->level[i].forward;
            update[i]->level[i].forward = x;

            x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
            update[i]->level[i].span = (rank[0] - rank[i]) + 1;
        }
        for (i = level; i < zsl->level; i++) {
            update[i]->level[i].span++;
        }
        x->backward = (update[0] == zsl->header)? NULL : update[0];
        if (x->level[0].forward)
            x->level[0].forward->backward = x;
        else
            zsl->tail = x;
        zsl->length++;
        // 更新后续节点的 totalSpan
        zskiplistNode *tmp = x->level[0].forward;
        while (tmp) {
            tmp->totalSpan += 1;
            tmp = tmp->level[0].forward;
        }
        return x;
    }
}

// 修改后的删除操作
int optimizedZslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;

    x = zsl->header;
    for (i = zsl->level - 1; i >= 0; i--) {
        while (x->level[i].forward &&
               (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                 sdscmp(x->level[i].forward->ele, ele) < 0)))
            x = x->level[i].forward;
        update[i] = x;
    }
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele, ele) == 0) {
        zslDeleteNode(zsl, x, update);
        if (!node)
            zslFreeNode(x);
        // 更新后续节点的 totalSpan
        zskiplistNode *tmp = update[0]->level[0].forward;
        while (tmp) {
            tmp->totalSpan -= 1;
            tmp = tmp->level[0].forward;
        }
        return 1;
    } else {
        return 0;
    }
}

并发访问优化

  1. 原理:在多线程环境下,对跳跃表的并发访问可能会导致数据竞争和不一致问题。例如,一个线程进行插入操作,另一个线程同时进行删除操作,可能会破坏跳跃表的结构。
  2. 优化方法:可以使用锁机制来保证跳跃表操作的原子性。例如,在进行插入、删除和查找操作前,先获取锁,操作完成后释放锁。
// 定义锁结构体
typedef struct {
    pthread_mutex_t mutex;
} SkipListLock;

// 初始化锁
void initSkipListLock(SkipListLock *lock) {
    pthread_mutex_init(&lock->mutex, NULL);
}

// 加锁
void lockSkipList(SkipListLock *lock) {
    pthread_mutex_lock(&lock->mutex);
}

// 解锁
void unlockSkipList(SkipListLock *lock) {
    pthread_mutex_unlock(&lock->mutex);
}

// 使用锁的插入操作
zskiplistNode *zslInsertWithLock(zskiplist *zsl, double score, sds ele, SkipListLock *lock) {
    lockSkipList(lock);
    zskiplistNode *result = zslInsert(zsl, score, ele);
    unlockSkipList(lock);
    return result;
}

// 使用锁的删除操作
int zslDeleteWithLock(zskiplist *zsl, double score, sds ele, SkipListLock *lock, zskiplistNode **node) {
    lockSkipList(lock);
    int result = zslDelete(zsl, score, ele, node);
    unlockSkipList(lock);
    return result;
}

// 使用锁的查找操作
zskiplistNode *zslSearchWithLock(zskiplist *zsl, double score, sds ele, SkipListLock *lock) {
    lockSkipList(lock);
    zskiplistNode *result = zslSearch(zsl, score, ele);
    unlockSkipList(lock);
    return result;
}

除了锁机制,还可以考虑使用无锁数据结构或者读写锁来进一步提高并发性能。读写锁允许多个线程同时进行读操作,但在写操作时会独占跳跃表,以保证数据一致性。

// 定义读写锁结构体
typedef struct {
    pthread_rwlock_t rwlock;
} SkipListReadWriteLock;

// 初始化读写锁
void initSkipListReadWriteLock(SkipListReadWriteLock *lock) {
    pthread_rwlock_init(&lock->rwlock, NULL);
}

// 加读锁
void readLockSkipList(SkipListReadWriteLock *lock) {
    pthread_rwlock_rdlock(&lock->rwlock);
}

// 加写锁
void writeLockSkipList(SkipListReadWriteLock *lock) {
    pthread_rwlock_wrlock(&lock->rwlock);
}

// 解锁
void unlockSkipListReadWriteLock(SkipListReadWriteLock *lock) {
    pthread_rwlock_unlock(&lock->rwlock);
}

// 使用读写锁的读操作
zskiplistNode *zslSearchWithReadWriteLock(zskiplist *zsl, double score, sds ele, SkipListReadWriteLock *lock) {
    readLockSkipList(lock);
    zskiplistNode *result = zslSearch(zsl, score, ele);
    unlockSkipListReadWriteLock(lock);
    return result;
}

// 使用读写锁的写操作(插入为例)
zskiplistNode *zslInsertWithReadWriteLock(zskiplist *zsl, double score, sds ele, SkipListReadWriteLock *lock) {
    writeLockSkipList(lock);
    zskiplistNode *result = zslInsert(zsl, score, ele);
    unlockSkipListReadWriteLock(lock);
    return result;
}

通过以上多种性能优化策略,可以显著提升 Redis 跳跃表 API 的性能,使其在不同的应用场景下都能高效运行。在实际应用中,需要根据具体的业务需求和系统环境选择合适的优化方法。