Redis 跳跃表 API 性能优化的策略
Redis 跳跃表概述
Redis 是一款广泛应用于缓存、消息队列等场景的高性能键值对数据库。其数据结构丰富多样,跳跃表(Skip List)作为 Redis 有序集合(Sorted Set)的底层实现之一,具有独特的优势。
跳跃表是一种随机化的数据结构,它通过在每个节点中维持多个指向其他节点的指针,以达到快速查找的目的。与平衡树相比,跳跃表的实现相对简单,而且在插入、删除和查找操作上具有近似于平衡树的时间复杂度。
在 Redis 中,跳跃表主要用于实现有序集合。有序集合中的每个元素都关联一个分数(score),通过跳跃表可以根据分数快速地对元素进行排序和查找。
跳跃表的结构
Redis 中的跳跃表由 zskiplist
和 zskiplistNode
两个结构体组成。
// 跳跃表节点
typedef struct zskiplistNode {
sds ele; // 元素值
double score; // 分数
struct zskiplistNode *backward; // 后退指针
struct zskiplistLevel {
struct zskiplistNode *forward; // 前进指针
unsigned int span; // 跨度
} level[];
} zskiplistNode;
// 跳跃表
typedef struct zskiplist {
struct zskiplistNode *header, *tail; // 表头和表尾指针
unsigned long length; // 节点数量
int level; // 层数
} zskiplist;
zskiplistNode
代表跳跃表中的节点,ele
存储元素值,score
是用于排序的分数。backward
指针用于从后向前遍历跳跃表。level
数组则存储了不同层的 forward
指针和 span
。span
表示从当前节点到 forward
指针指向节点之间的节点数量,用于计算排名。
zskiplist
结构体则是对整个跳跃表的封装,header
和 tail
分别指向表头和表尾节点,length
记录跳跃表中的节点数量,level
表示当前跳跃表的层数。
跳跃表的操作
- 插入操作:插入一个新节点时,首先需要确定新节点的层数。Redis 通过随机函数来决定新节点的层数,这样可以保证跳跃表在一定程度上的平衡性。然后,从表头开始,在每一层找到合适的插入位置,并更新指针。
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
serverAssert(!isnan(score));
x = zsl->header;
for (i = zsl->level - 1; i >= 0; i--) {
rank[i] = i == (zsl->level - 1)? 0 : rank[i + 1];
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele, ele) < 0)))
{
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
x = x->level[0].forward;
if (x && score == x->score && sdscmp(x->ele, ele) == 0) {
return x;
} else {
level = zslRandomLevel();
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
x = zslCreateNode(level, score, ele);
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
x->backward = (update[0] == zsl->header)? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}
}
- 删除操作:删除节点时,同样从表头开始查找要删除的节点。找到后,更新相应层的指针,将该节点从跳跃表中移除。
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;
x = zsl->header;
for (i = zsl->level - 1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele, ele) < 0)))
x = x->level[i].forward;
update[i] = x;
}
x = x->level[0].forward;
if (x && score == x->score && sdscmp(x->ele, ele) == 0) {
zslDeleteNode(zsl, x, update);
if (!node)
zslFreeNode(x);
return 1;
} else {
return 0;
}
}
- 查找操作:查找节点时,从表头开始,在每一层根据分数和元素值进行比较,逐步向下层查找,直到找到目标节点或者确定目标节点不存在。
zskiplistNode *zslSearch(zskiplist *zsl, double score, sds ele) {
zskiplistNode *x = zsl->header;
int i;
for (i = zsl->level - 1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele, ele) < 0)))
x = x->level[i].forward;
}
x = x->level[0].forward;
if (x && score == x->score && sdscmp(x->ele, ele) == 0) {
return x;
} else {
return NULL;
}
}
Redis 跳跃表 API 性能优化策略
优化随机层数生成
- 原理:跳跃表的性能很大程度上取决于节点层数的分布。Redis 中使用随机函数来生成新节点的层数,默认情况下,生成层数的概率是每增加一层,概率减半。这种方式虽然简单,但可能在某些情况下导致跳跃表的结构不够理想,影响性能。
- 优化方法:可以考虑根据跳跃表的当前状态来动态调整生成层数的概率。例如,如果当前跳跃表的层数较少,可以适当增加新节点生成高层数的概率;反之,如果层数较多,可以降低高层数的生成概率。这样可以使跳跃表的结构更加平衡,提高查找、插入和删除操作的性能。
// 优化后的随机层数生成函数
int optimizedZslRandomLevel(void) {
int level = 1;
while ((random() & 0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level < ZSKIPLIST_MAXLEVEL)? level : ZSKIPLIST_MAXLEVEL;
}
在插入节点时,使用优化后的函数来生成层数:
zskiplistNode *optimizedZslInsert(zskiplist *zsl, double score, sds ele) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
serverAssert(!isnan(score));
x = zsl->header;
for (i = zsl->level - 1; i >= 0; i--) {
rank[i] = i == (zsl->level - 1)? 0 : rank[i + 1];
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele, ele) < 0)))
{
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
x = x->level[0].forward;
if (x && score == x->score && sdscmp(x->ele, ele) == 0) {
return x;
} else {
level = optimizedZslRandomLevel();
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
x = zslCreateNode(level, score, ele);
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
x->backward = (update[0] == zsl->header)? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}
}
批量操作优化
- 原理:在实际应用中,经常需要对有序集合进行批量插入、删除或查找操作。如果每次操作都单独调用相应的 API,会产生较多的系统开销,如函数调用开销、内存分配和释放开销等。
- 优化方法:可以提供批量操作的 API。例如,批量插入 API 可以一次性接收多个元素及其分数,然后在内部进行一次性的插入操作。这样可以减少函数调用次数,提高效率。
void zslBulkInsert(zskiplist *zsl, int count, double *scores, sds *eles) {
int i;
for (i = 0; i < count; i++) {
zslInsert(zsl, scores[i], eles[i]);
}
}
对于批量删除操作,可以类似地实现:
void zslBulkDelete(zskiplist *zsl, int count, double *scores, sds *eles) {
int i;
for (i = 0; i < count; i++) {
zslDelete(zsl, scores[i], eles[i], NULL);
}
}
减少内存分配和释放
- 原理:跳跃表的节点在插入和删除时会频繁进行内存分配和释放操作,这会增加系统的负担,影响性能。特别是在高并发场景下,内存分配和释放的竞争可能会导致性能瓶颈。
- 优化方法:可以使用内存池(Memory Pool)技术。内存池预先分配一块较大的内存空间,当需要创建新节点时,从内存池中获取内存,而不是调用系统的内存分配函数。当节点被删除时,将内存返回给内存池,而不是释放给系统。
// 内存池结构体
typedef struct MemoryPool {
char *start;
char *current;
size_t size;
} MemoryPool;
// 初始化内存池
MemoryPool *initMemoryPool(size_t poolSize) {
MemoryPool *pool = (MemoryPool *)malloc(sizeof(MemoryPool));
pool->start = (char *)malloc(poolSize);
pool->current = pool->start;
pool->size = poolSize;
return pool;
}
// 从内存池获取内存
void *allocateFromPool(MemoryPool *pool, size_t size) {
if (pool->current + size > pool->start + pool->size) {
return NULL;
}
void *result = pool->current;
pool->current += size;
return result;
}
// 将内存返回给内存池
void freeToPool(MemoryPool *pool, void *ptr) {
// 这里简单处理,实际应用中可能需要更复杂的管理
// 假设内存是按顺序分配和释放的
if (ptr == pool->current - sizeof(zskiplistNode)) {
pool->current -= sizeof(zskiplistNode);
}
}
// 使用内存池的插入操作
zskiplistNode *zslInsertWithPool(zskiplist *zsl, double score, sds ele, MemoryPool *pool) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
serverAssert(!isnan(score));
x = zsl->header;
for (i = zsl->level - 1; i >= 0; i--) {
rank[i] = i == (zsl->level - 1)? 0 : rank[i + 1];
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele, ele) < 0)))
{
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
x = x->level[0].forward;
if (x && score == x->score && sdscmp(x->ele, ele) == 0) {
return x;
} else {
level = zslRandomLevel();
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
x = (zskiplistNode *)allocateFromPool(pool, sizeof(zskiplistNode) + (level - 1) * sizeof(struct zskiplistLevel));
if (!x) {
return NULL;
}
x->score = score;
x->ele = sdsdup(ele);
x->backward = NULL;
for (i = 0; i < level; i++) {
x->level[i].forward = NULL;
x->level[i].span = 0;
}
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
x->backward = (update[0] == zsl->header)? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}
}
// 使用内存池的删除操作
int zslDeleteWithPool(zskiplist *zsl, double score, sds ele, MemoryPool *pool, zskiplistNode **node) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;
x = zsl->header;
for (i = zsl->level - 1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele, ele) < 0)))
x = x->level[i].forward;
update[i] = x;
}
x = x->level[0].forward;
if (x && score == x->score && sdscmp(x->ele, ele) == 0) {
zslDeleteNode(zsl, x, update);
if (!node) {
sdsfree(x->ele);
freeToPool(pool, x);
}
return 1;
} else {
return 0;
}
}
缓存常用操作结果
- 原理:在某些应用场景中,可能会频繁地对跳跃表进行相同的查找操作,例如查找某个固定分数范围内的元素。每次都进行完整的查找操作会浪费大量的时间。
- 优化方法:可以使用缓存来存储常用操作的结果。例如,维护一个哈希表,将分数范围作为键,将对应的查找结果作为值。当再次进行相同分数范围的查找时,先从缓存中查找,如果找到则直接返回结果,否则进行实际的跳跃表查找,并将结果存入缓存。
// 缓存结构体
typedef struct SkipListCache {
dict *cacheDict;
} SkipListCache;
// 初始化缓存
SkipListCache *initSkipListCache() {
SkipListCache *cache = (SkipListCache *)malloc(sizeof(SkipListCache));
cache->cacheDict = dictCreate(&dictTypeHeapStringCopyKey, NULL);
return cache;
}
// 从缓存中获取结果
zskiplistNode *getFromCache(SkipListCache *cache, double minScore, double maxScore) {
char key[64];
snprintf(key, sizeof(key), "%.10f-%.10f", minScore, maxScore);
dictEntry *entry = dictFind(cache->cacheDict, key);
if (entry) {
return dictGetVal(entry);
}
return NULL;
}
// 将结果存入缓存
void setToCache(SkipListCache *cache, double minScore, double maxScore, zskiplistNode *result) {
char key[64];
snprintf(key, sizeof(key), "%.10f-%.10f", minScore, maxScore);
dictAdd(cache->cacheDict, sdsnew(key), result);
}
// 使用缓存的查找操作
zskiplistNode *zslSearchWithCache(zskiplist *zsl, double minScore, double maxScore, SkipListCache *cache) {
zskiplistNode *result = getFromCache(cache, minScore, maxScore);
if (result) {
return result;
}
// 实际的跳跃表查找操作
zskiplistNode *x = zsl->header;
int i;
for (i = zsl->level - 1; i >= 0; i--) {
while (x->level[i].forward && x->level[i].forward->score < minScore) {
x = x->level[i].forward;
}
}
x = x->level[0].forward;
zskiplistNode *start = NULL;
if (x && x->score >= minScore) {
start = x;
while (x && x->score <= maxScore) {
x = x->level[0].forward;
}
}
setToCache(cache, minScore, maxScore, start);
return start;
}
优化跨度计算
- 原理:在跳跃表的插入和删除操作中,跨度(span)的计算和更新是一个较为复杂的过程。跨度用于计算排名,其准确计算对于保证跳跃表的正确性和性能至关重要。
- 优化方法:可以在跳跃表节点中增加一个辅助字段,用于快速计算跨度。例如,记录从当前节点到表头节点的总跨度。这样在插入和删除操作时,可以减少跨度的重新计算量。
// 修改后的跳跃表节点结构体
typedef struct zskiplistNode {
sds ele;
double score;
struct zskiplistNode *backward;
struct zskiplistLevel {
struct zskiplistNode *forward;
unsigned int span;
} level[];
unsigned int totalSpan; // 新增的辅助字段,记录到表头的总跨度
} zskiplistNode;
// 修改后的插入操作
zskiplistNode *optimizedZslInsert(zskiplist *zsl, double score, sds ele) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
serverAssert(!isnan(score));
x = zsl->header;
for (i = zsl->level - 1; i >= 0; i--) {
rank[i] = i == (zsl->level - 1)? 0 : rank[i + 1];
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele, ele) < 0)))
{
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
x = x->level[0].forward;
if (x && score == x->score && sdscmp(x->ele, ele) == 0) {
return x;
} else {
level = zslRandomLevel();
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
x = zslCreateNode(level, score, ele);
x->totalSpan = rank[0];
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
x->backward = (update[0] == zsl->header)? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
// 更新后续节点的 totalSpan
zskiplistNode *tmp = x->level[0].forward;
while (tmp) {
tmp->totalSpan += 1;
tmp = tmp->level[0].forward;
}
return x;
}
}
// 修改后的删除操作
int optimizedZslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;
x = zsl->header;
for (i = zsl->level - 1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele, ele) < 0)))
x = x->level[i].forward;
update[i] = x;
}
x = x->level[0].forward;
if (x && score == x->score && sdscmp(x->ele, ele) == 0) {
zslDeleteNode(zsl, x, update);
if (!node)
zslFreeNode(x);
// 更新后续节点的 totalSpan
zskiplistNode *tmp = update[0]->level[0].forward;
while (tmp) {
tmp->totalSpan -= 1;
tmp = tmp->level[0].forward;
}
return 1;
} else {
return 0;
}
}
并发访问优化
- 原理:在多线程环境下,对跳跃表的并发访问可能会导致数据竞争和不一致问题。例如,一个线程进行插入操作,另一个线程同时进行删除操作,可能会破坏跳跃表的结构。
- 优化方法:可以使用锁机制来保证跳跃表操作的原子性。例如,在进行插入、删除和查找操作前,先获取锁,操作完成后释放锁。
// 定义锁结构体
typedef struct {
pthread_mutex_t mutex;
} SkipListLock;
// 初始化锁
void initSkipListLock(SkipListLock *lock) {
pthread_mutex_init(&lock->mutex, NULL);
}
// 加锁
void lockSkipList(SkipListLock *lock) {
pthread_mutex_lock(&lock->mutex);
}
// 解锁
void unlockSkipList(SkipListLock *lock) {
pthread_mutex_unlock(&lock->mutex);
}
// 使用锁的插入操作
zskiplistNode *zslInsertWithLock(zskiplist *zsl, double score, sds ele, SkipListLock *lock) {
lockSkipList(lock);
zskiplistNode *result = zslInsert(zsl, score, ele);
unlockSkipList(lock);
return result;
}
// 使用锁的删除操作
int zslDeleteWithLock(zskiplist *zsl, double score, sds ele, SkipListLock *lock, zskiplistNode **node) {
lockSkipList(lock);
int result = zslDelete(zsl, score, ele, node);
unlockSkipList(lock);
return result;
}
// 使用锁的查找操作
zskiplistNode *zslSearchWithLock(zskiplist *zsl, double score, sds ele, SkipListLock *lock) {
lockSkipList(lock);
zskiplistNode *result = zslSearch(zsl, score, ele);
unlockSkipList(lock);
return result;
}
除了锁机制,还可以考虑使用无锁数据结构或者读写锁来进一步提高并发性能。读写锁允许多个线程同时进行读操作,但在写操作时会独占跳跃表,以保证数据一致性。
// 定义读写锁结构体
typedef struct {
pthread_rwlock_t rwlock;
} SkipListReadWriteLock;
// 初始化读写锁
void initSkipListReadWriteLock(SkipListReadWriteLock *lock) {
pthread_rwlock_init(&lock->rwlock, NULL);
}
// 加读锁
void readLockSkipList(SkipListReadWriteLock *lock) {
pthread_rwlock_rdlock(&lock->rwlock);
}
// 加写锁
void writeLockSkipList(SkipListReadWriteLock *lock) {
pthread_rwlock_wrlock(&lock->rwlock);
}
// 解锁
void unlockSkipListReadWriteLock(SkipListReadWriteLock *lock) {
pthread_rwlock_unlock(&lock->rwlock);
}
// 使用读写锁的读操作
zskiplistNode *zslSearchWithReadWriteLock(zskiplist *zsl, double score, sds ele, SkipListReadWriteLock *lock) {
readLockSkipList(lock);
zskiplistNode *result = zslSearch(zsl, score, ele);
unlockSkipListReadWriteLock(lock);
return result;
}
// 使用读写锁的写操作(插入为例)
zskiplistNode *zslInsertWithReadWriteLock(zskiplist *zsl, double score, sds ele, SkipListReadWriteLock *lock) {
writeLockSkipList(lock);
zskiplistNode *result = zslInsert(zsl, score, ele);
unlockSkipListReadWriteLock(lock);
return result;
}
通过以上多种性能优化策略,可以显著提升 Redis 跳跃表 API 的性能,使其在不同的应用场景下都能高效运行。在实际应用中,需要根据具体的业务需求和系统环境选择合适的优化方法。