Redis 整数集合实现的底层逻辑探秘
Redis 整数集合的基本概念
Redis 是一款基于内存的高性能键值对数据库,其丰富的数据结构和高效的实现方式使得它在众多应用场景中脱颖而出。整数集合(intset)作为 Redis 中一种较为底层的数据结构,主要用于存储整数类型的元素,并且这些元素在集合中是唯一的。它为 Redis 的集合类型提供了基础支持,在某些场景下可以有效地节省内存空间。
整数集合被设计用来在内存使用和操作效率之间达到一种平衡。当一个 Redis 集合只包含整数值元素,并且元素数量不是特别多的时候,Redis 就会使用整数集合来存储这个集合。这样做的好处是,相比使用哈希表等更为通用的数据结构,整数集合可以显著减少内存的占用。
整数集合的结构定义
在 Redis 的源码中,整数集合的结构定义在 intset.h
文件中,如下所示:
typedef struct intset {
// 编码方式
uint32_t encoding;
// 集合包含的元素数量
uint32_t length;
// 保存元素的数组
int8_t contents[];
} intset;
- encoding 字段:该字段用于表示整数集合中元素的编码方式。Redis 支持三种不同的编码方式,分别对应不同范围的整数:
INTSET_ENC_INT16
:表示集合中的元素都是 16 位有符号整数(int16_t
),取值范围为-32768
到32767
。INTSET_ENC_INT32
:表示集合中的元素都是 32 位有符号整数(int32_t
),取值范围为-2147483648
到2147483647
。INTSET_ENC_INT64
:表示集合中的元素都是 64 位有符号整数(int64_t
),取值范围为-9223372036854775808
到9223372036854775807
。
- length 字段:该字段记录了整数集合中当前包含的元素数量。通过这个字段,我们可以快速获取集合的大小,时间复杂度为 $O(1)$。
- contents 数组:这是一个柔性数组(flexible array member),它实际存储了整数集合中的所有元素。虽然在定义中它的类型是
int8_t
,但根据encoding
的值,实际存储的元素类型会有所不同。这个数组中的元素是按照从小到大的顺序排列的,这一特性在后续的查找、插入和删除操作中起到了关键作用。
整数集合的编码升级
编码升级的触发条件
当我们向一个整数集合中插入新元素时,如果新元素的类型无法用当前整数集合的编码来表示,就会触发编码升级。例如,当前整数集合使用 INTSET_ENC_INT16
编码,而要插入的元素值超出了 int16_t
的范围,这时就需要将编码升级为 INTSET_ENC_INT32
或者 INTSET_ENC_INT64
。
编码升级的具体过程
- 确定新的编码:根据要插入元素的大小,选择合适的新编码。如果要插入的元素在
int16_t
范围内,但集合当前编码为INTSET_ENC_INT8
,则升级为INTSET_ENC_INT16
;如果超出int16_t
范围但在int32_t
范围内,则升级为INTSET_ENC_INT32
;如果超出int32_t
范围,则升级为INTSET_ENC_INT64
。 - 重新分配内存:根据新的编码和当前集合的元素数量,重新计算需要分配的内存大小。由于新编码可能需要更大的空间来存储每个元素,所以新分配的内存会比原来的大。
- 数据迁移:将原来集合中的所有元素按照新的编码方式重新存储到新分配的内存空间中。这个过程中,需要注意保持元素的顺序不变。因为原来的元素是有序的,所以在迁移过程中,只需要按照顺序依次将每个元素转换为新编码并存储即可。
- 插入新元素:完成数据迁移后,将新元素按照正确的顺序插入到整数集合中。
以下是一个简单的代码示例,模拟整数集合编码升级的过程:
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
// 模拟整数集合结构
typedef struct intset {
uint32_t encoding;
uint32_t length;
int8_t contents[];
} intset;
// 创建一个新的整数集合
intset* intsetNew(void) {
intset *is = (intset*)malloc(sizeof(intset) + sizeof(int16_t));
is->encoding = INTSET_ENC_INT16;
is->length = 0;
return is;
}
// 插入元素
intset* intsetAdd(intset *is, int64_t value) {
// 检查是否需要编码升级
if (value > INT16_MAX || value < INT16_MIN) {
// 这里简单假设升级为 INTSET_ENC_INT32
uint32_t oldLen = is->length;
uint32_t newLen = oldLen + 1;
intset *newIs = (intset*)malloc(sizeof(intset) + newLen * sizeof(int32_t));
newIs->encoding = INTSET_ENC_INT32;
newIs->length = newLen;
// 数据迁移
for (uint32_t i = 0; i < oldLen; i++) {
int16_t oldValue = *(int16_t*)&is->contents[i * sizeof(int16_t)];
*(int32_t*)&newIs->contents[i * sizeof(int32_t)] = oldValue;
}
// 插入新元素
int32_t newValue = (int32_t)value;
int inserted = 0;
for (uint32_t i = 0; i < newLen; i++) {
if (!inserted && newValue < *(int32_t*)&newIs->contents[i * sizeof(int32_t)]) {
for (uint32_t j = newLen - 1; j > i; j--) {
*(int32_t*)&newIs->contents[j * sizeof(int32_t)] = *(int32_t*)&newIs->contents[(j - 1) * sizeof(int32_t)];
}
*(int32_t*)&newIs->contents[i * sizeof(int32_t)] = newValue;
inserted = 1;
}
}
if (!inserted) {
*(int32_t*)&newIs->contents[(newLen - 1) * sizeof(int32_t)] = newValue;
}
free(is);
return newIs;
} else {
// 不需要编码升级,直接插入
uint32_t oldLen = is->length;
uint32_t newLen = oldLen + 1;
intset *newIs = (intset*)malloc(sizeof(intset) + newLen * sizeof(int16_t));
newIs->encoding = INTSET_ENC_INT16;
newIs->length = newLen;
// 数据迁移
for (uint32_t i = 0; i < oldLen; i++) {
*(int16_t*)&newIs->contents[i * sizeof(int16_t)] = *(int16_t*)&is->contents[i * sizeof(int16_t)];
}
// 插入新元素
int16_t newValue = (int16_t)value;
int inserted = 0;
for (uint32_t i = 0; i < newLen; i++) {
if (!inserted && newValue < *(int16_t*)&newIs->contents[i * sizeof(int16_t)]) {
for (uint32_t j = newLen - 1; j > i; j--) {
*(int16_t*)&newIs->contents[j * sizeof(int16_t)] = *(int16_t*)&newIs->contents[(j - 1) * sizeof(int16_t)];
}
*(int16_t*)&newIs->contents[i * sizeof(int16_t)] = newValue;
inserted = 1;
}
}
if (!inserted) {
*(int16_t*)&newIs->contents[(newLen - 1) * sizeof(int16_t)] = newValue;
}
free(is);
return newIs;
}
}
// 打印整数集合
void printIntset(intset *is) {
if (is->encoding == INTSET_ENC_INT16) {
for (uint32_t i = 0; i < is->length; i++) {
int16_t value = *(int16_t*)&is->contents[i * sizeof(int16_t)];
printf("%d ", value);
}
} else if (is->encoding == INTSET_ENC_INT32) {
for (uint32_t i = 0; i < is->length; i++) {
int32_t value = *(int32_t*)&is->contents[i * sizeof(int32_t)];
printf("%d ", value);
}
}
printf("\n");
}
int main() {
intset *is = intsetNew();
is = intsetAdd(is, 10);
is = intsetAdd(is, 20);
is = intsetAdd(is, 30000); // 触发编码升级
printIntset(is);
free(is);
return 0;
}
编码升级的意义
编码升级的机制使得整数集合在面对不同范围的整数元素时,能够动态地调整自身的存储方式,从而在保证元素能够正确存储的同时,尽可能地节省内存。这种动态调整的策略避免了一开始就使用大编码(如 INTSET_ENC_INT64
)来存储所有元素导致的内存浪费,也避免了因为元素类型不匹配而频繁进行数据类型转换的开销。
整数集合的查找操作
查找的实现原理
由于整数集合中的元素是按照从小到大的顺序排列的,因此在整数集合中查找元素可以使用二分查找法。二分查找的基本思想是,每次将查找范围缩小一半,直到找到目标元素或者确定目标元素不存在为止。这种查找方式的时间复杂度为 $O(\log n)$,其中 $n$ 是整数集合中元素的数量。
代码示例
以下是使用 C 语言实现的在整数集合中进行二分查找的代码:
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
// 模拟整数集合结构
typedef struct intset {
uint32_t encoding;
uint32_t length;
int8_t contents[];
} intset;
// 二分查找函数
int intsetFind(intset *is, int64_t value) {
int low = 0;
int high = is->length - 1;
while (low <= high) {
int mid = (low + high) / 2;
int64_t midValue;
if (is->encoding == INTSET_ENC_INT16) {
midValue = *(int16_t*)&is->contents[mid * sizeof(int16_t)];
} else if (is->encoding == INTSET_ENC_INT32) {
midValue = *(int32_t*)&is->contents[mid * sizeof(int32_t)];
} else {
midValue = *(int64_t*)&is->contents[mid * sizeof(int64_t)];
}
if (midValue == value) {
return 1;
} else if (midValue < value) {
low = mid + 1;
} else {
high = mid - 1;
}
}
return 0;
}
int main() {
// 假设已经有一个整数集合
intset is;
is.encoding = INTSET_ENC_INT16;
is.length = 3;
int16_t data[] = {10, 20, 30};
for (int i = 0; i < 3; i++) {
*(int16_t*)&is.contents[i * sizeof(int16_t)] = data[i];
}
int found = intsetFind(&is, 20);
if (found) {
printf("元素 20 存在\n");
} else {
printf("元素 20 不存在\n");
}
return 0;
}
查找操作的性能分析
二分查找的高效性得益于整数集合的有序性。与无序集合的查找操作(时间复杂度为 $O(n)$)相比,整数集合在查找元素时具有明显的性能优势。这种高效的查找机制使得整数集合在需要频繁查找元素的场景下表现出色,例如在一些需要快速判断某个整数是否存在于集合中的应用中。
整数集合的插入操作
插入操作的步骤
- 查找元素是否已存在:在插入新元素之前,首先使用二分查找法检查该元素是否已经存在于整数集合中。如果存在,则直接返回,不进行插入操作,以保证集合元素的唯一性。
- 检查是否需要编码升级:如果元素不存在,则检查新元素的类型是否可以用当前整数集合的编码表示。如果不能表示,则触发编码升级,按照编码升级的流程重新分配内存、迁移数据。
- 确定插入位置:由于整数集合是有序的,在编码升级完成后(如果需要的话),根据元素的大小确定新元素在数组中的插入位置。可以通过二分查找找到合适的插入点,使得插入后集合仍然保持有序。
- 移动元素并插入:确定插入位置后,将插入位置及之后的所有元素向后移动一个位置,为新元素腾出空间,然后将新元素插入到指定位置。同时,更新整数集合的
length
字段。
代码示例
以下是一个完整的 C 语言实现的整数集合插入操作的代码:
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
// 模拟整数集合结构
typedef struct intset {
uint32_t encoding;
uint32_t length;
int8_t contents[];
} intset;
// 创建一个新的整数集合
intset* intsetNew(void) {
intset *is = (intset*)malloc(sizeof(intset) + sizeof(int16_t));
is->encoding = INTSET_ENC_INT16;
is->length = 0;
return is;
}
// 二分查找函数
int intsetFind(intset *is, int64_t value) {
int low = 0;
int high = is->length - 1;
while (low <= high) {
int mid = (low + high) / 2;
int64_t midValue;
if (is->encoding == INTSET_ENC_INT16) {
midValue = *(int16_t*)&is->contents[mid * sizeof(int16_t)];
} else if (is->encoding == INTSET_ENC_INT32) {
midValue = *(int32_t*)&is->contents[mid * sizeof(int32_t)];
} else {
midValue = *(int64_t*)&is->contents[mid * sizeof(int64_t)];
}
if (midValue == value) {
return 1;
} else if (midValue < value) {
low = mid + 1;
} else {
high = mid - 1;
}
}
return 0;
}
// 插入元素
intset* intsetAdd(intset *is, int64_t value) {
if (intsetFind(is, value)) {
return is;
}
// 检查是否需要编码升级
if (value > INT16_MAX || value < INT16_MIN) {
// 这里简单假设升级为 INTSET_ENC_INT32
uint32_t oldLen = is->length;
uint32_t newLen = oldLen + 1;
intset *newIs = (intset*)malloc(sizeof(intset) + newLen * sizeof(int32_t));
newIs->encoding = INTSET_ENC_INT32;
newIs->length = newLen;
// 数据迁移
for (uint32_t i = 0; i < oldLen; i++) {
int16_t oldValue = *(int16_t*)&is->contents[i * sizeof(int16_t)];
*(int32_t*)&newIs->contents[i * sizeof(int32_t)] = oldValue;
}
// 确定插入位置
int inserted = 0;
for (uint32_t i = 0; i < newLen; i++) {
if (!inserted && value < *(int32_t*)&newIs->contents[i * sizeof(int32_t)]) {
for (uint32_t j = newLen - 1; j > i; j--) {
*(int32_t*)&newIs->contents[j * sizeof(int32_t)] = *(int32_t*)&newIs->contents[(j - 1) * sizeof(int32_t)];
}
*(int32_t*)&newIs->contents[i * sizeof(int32_t)] = (int32_t)value;
inserted = 1;
}
}
if (!inserted) {
*(int32_t*)&newIs->contents[(newLen - 1) * sizeof(int32_t)] = (int32_t)value;
}
free(is);
return newIs;
} else {
// 不需要编码升级,直接插入
uint32_t oldLen = is->length;
uint32_t newLen = oldLen + 1;
intset *newIs = (intset*)malloc(sizeof(intset) + newLen * sizeof(int16_t));
newIs->encoding = INTSET_ENC_INT16;
newIs->length = newLen;
// 数据迁移
for (uint32_t i = 0; i < oldLen; i++) {
*(int16_t*)&newIs->contents[i * sizeof(int16_t)] = *(int16_t*)&is->contents[i * sizeof(int16_t)];
}
// 确定插入位置
int inserted = 0;
for (uint32_t i = 0; i < newLen; i++) {
if (!inserted && value < *(int16_t*)&newIs->contents[i * sizeof(int16_t)]) {
for (uint32_t j = newLen - 1; j > i; j--) {
*(int16_t*)&newIs->contents[j * sizeof(int16_t)] = *(int16_t*)&newIs->contents[(j - 1) * sizeof(int16_t)];
}
*(int16_t*)&newIs->contents[i * sizeof(int16_t)] = (int16_t)value;
inserted = 1;
}
}
if (!inserted) {
*(int16_t*)&newIs->contents[(newLen - 1) * sizeof(int16_t)] = (int16_t)value;
}
free(is);
return newIs;
}
}
// 打印整数集合
void printIntset(intset *is) {
if (is->encoding == INTSET_ENC_INT16) {
for (uint32_t i = 0; i < is->length; i++) {
int16_t value = *(int16_t*)&is->contents[i * sizeof(int16_t)];
printf("%d ", value);
}
} else if (is->encoding == INTSET_ENC_INT32) {
for (uint32_t i = 0; i < is->length; i++) {
int32_t value = *(int32_t*)&is->contents[i * sizeof(int32_t)];
printf("%d ", value);
}
}
printf("\n");
}
int main() {
intset *is = intsetNew();
is = intsetAdd(is, 10);
is = intsetAdd(is, 20);
is = intsetAdd(is, 15);
printIntset(is);
free(is);
return 0;
}
插入操作的性能分析
插入操作的时间复杂度主要由查找元素是否存在($O(\log n)$)和移动元素(最坏情况下 $O(n)$)两部分组成。因此,插入操作的最坏时间复杂度为 $O(n)$,平均时间复杂度为 $O(\log n)$。虽然最坏情况下时间复杂度较高,但由于整数集合通常用于存储相对较少的元素,在实际应用中,插入操作的性能仍然是可以接受的。同时,编码升级的过程虽然会带来额外的开销,但这种情况并不经常发生,只有在插入元素类型不匹配时才会触发。
整数集合的删除操作
删除操作的步骤
- 查找元素位置:使用二分查找法确定要删除的元素在整数集合中的位置。如果元素不存在,则直接返回,不进行删除操作。
- 移动元素:找到元素位置后,将该位置之后的所有元素向前移动一个位置,覆盖要删除的元素。这样就完成了元素的删除操作。
- 更新长度:更新整数集合的
length
字段,使其反映删除元素后的实际元素数量。
代码示例
以下是 C 语言实现的整数集合删除操作的代码:
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
// 模拟整数集合结构
typedef struct intset {
uint32_t encoding;
uint32_t length;
int8_t contents[];
} intset;
// 二分查找函数
int intsetFind(intset *is, int64_t value) {
int low = 0;
int high = is->length - 1;
while (low <= high) {
int mid = (low + high) / 2;
int64_t midValue;
if (is->encoding == INTSET_ENC_INT16) {
midValue = *(int16_t*)&is->contents[mid * sizeof(int16_t)];
} else if (is->encoding == INTSET_ENC_INT32) {
midValue = *(int32_t*)&is->contents[mid * sizeof(int32_t)];
} else {
midValue = *(int64_t*)&is->contents[mid * sizeof(int64_t)];
}
if (midValue == value) {
return mid;
} else if (midValue < value) {
low = mid + 1;
} else {
high = mid - 1;
}
}
return -1;
}
// 删除元素
intset* intsetRemove(intset *is, int64_t value) {
int pos = intsetFind(is, value);
if (pos == -1) {
return is;
}
uint32_t oldLen = is->length;
uint32_t newLen = oldLen - 1;
intset *newIs = (intset*)malloc(sizeof(intset) + newLen * (is->encoding == INTSET_ENC_INT16? sizeof(int16_t) : sizeof(int32_t)));
newIs->encoding = is->encoding;
newIs->length = newLen;
for (uint32_t i = 0, j = 0; i < oldLen; i++) {
if (i != pos) {
if (is->encoding == INTSET_ENC_INT16) {
*(int16_t*)&newIs->contents[j * sizeof(int16_t)] = *(int16_t*)&is->contents[i * sizeof(int16_t)];
} else {
*(int32_t*)&newIs->contents[j * sizeof(int32_t)] = *(int32_t*)&is->contents[i * sizeof(int32_t)];
}
j++;
}
}
free(is);
return newIs;
}
// 打印整数集合
void printIntset(intset *is) {
if (is->encoding == INTSET_ENC_INT16) {
for (uint32_t i = 0; i < is->length; i++) {
int16_t value = *(int16_t*)&is->contents[i * sizeof(int16_t)];
printf("%d ", value);
}
} else if (is->encoding == INTSET_ENC_INT32) {
for (uint32_t i = 0; i < is->length; i++) {
int32_t value = *(int32_t*)&is->contents[i * sizeof(int32_t)];
printf("%d ", value);
}
}
printf("\n");
}
int main() {
// 假设已经有一个整数集合
intset is;
is.encoding = INTSET_ENC_INT16;
is.length = 3;
int16_t data[] = {10, 20, 30};
for (int i = 0; i < 3; i++) {
*(int16_t*)&is.contents[i * sizeof(int16_t)] = data[i];
}
intset *newIs = intsetRemove(&is, 20);
printIntset(newIs);
free(newIs);
return 0;
}
删除操作的性能分析
删除操作的时间复杂度主要由查找元素位置($O(\log n)$)和移动元素(最坏情况下 $O(n)$)组成。因此,删除操作的最坏时间复杂度为 $O(n)$,平均时间复杂度为 $O(\log n)$。与插入操作类似,虽然最坏时间复杂度较高,但在实际应用中,由于整数集合元素数量通常较少,删除操作的性能也能满足大多数场景的需求。
整数集合与其他数据结构的比较
与哈希表的比较
- 内存占用:哈希表通常需要更多的内存来存储键值对,即使对于只包含整数的集合也是如此。因为哈希表除了存储元素值外,还需要存储哈希值、指针等额外信息。而整数集合通过紧凑的数组结构,并且根据元素类型动态调整编码,能够在存储整数集合时显著节省内存。
- 查找性能:哈希表的查找操作平均时间复杂度为 $O(1)$,但在最坏情况下(如哈希冲突严重时)会退化为 $O(n)$。整数集合使用二分查找,平均和最坏时间复杂度为 $O(\log n)$。对于元素数量较少的集合,两者的查找性能差异不明显,但随着元素数量的增加,哈希表的最坏情况性能可能会受到哈希冲突的影响,而整数集合的性能相对更稳定。
- 插入和删除性能:哈希表的插入和删除操作平均时间复杂度为 $O(1)$,但同样在最坏情况下会退化为 $O(n)$。整数集合的插入和删除操作最坏时间复杂度为 $O(n)$,平均时间复杂度为 $O(\log n)$。总体来说,在元素数量较少时,哈希表的插入和删除性能更优,但在元素数量较多且哈希冲突不可控的情况下,整数集合的性能表现可能更稳定。
与数组的比较
- 内存管理:普通数组在存储整数集合时,无法动态调整每个元素的存储类型,可能会造成内存浪费。例如,如果数组中所有元素都在
int16_t
范围内,但却使用int32_t
类型来存储,就会浪费一半的内存空间。而整数集合可以根据元素的实际范围动态调整编码,从而更有效地利用内存。 - 元素唯一性:普通数组本身不保证元素的唯一性,需要额外的逻辑来检查和处理重复元素。而整数集合在插入操作时会自动检查元素的唯一性,避免重复元素的插入。
- 查找、插入和删除性能:普通数组的查找操作时间复杂度为 $O(n)$,除非数组是有序的且使用二分查找(此时与整数集合查找性能相同)。插入和删除操作在普通数组中通常需要移动大量元素,时间复杂度为 $O(n)$。整数集合利用有序性进行二分查找,查找性能更好,插入和删除操作虽然最坏时间复杂度也是 $O(n)$,但平均时间复杂度为 $O(\log n)$,在性能上更有优势。
整数集合在 Redis 中的应用场景
集合类型的底层存储
当 Redis 的集合类型只包含整数值元素,并且元素数量较少时,Redis 会使用整数集合作为底层存储结构。例如,在一些统计用户 ID 集合、商品 ID 集合等场景中,如果这些 ID 都是整数类型且数量不是特别庞大,使用整数集合可以有效地节省内存,同时保证一定的操作性能。
有序集合的部分实现
在 Redis 的有序集合(Sorted Set)中,当所有成员都是整数值且数量较小时,Redis 会使用一种特殊的编码方式,其中就可能涉及到整数集合。这种优化可以在保证有序集合基本功能的同时,减少内存占用和提高操作效率。
总结整数集合的特点与优势
整数集合作为 Redis 中一种重要的底层数据结构,具有以下特点和优势:
- 内存高效:通过动态调整编码方式,整数集合能够根据元素的实际范围选择最合适的存储类型,从而最大限度地节省内存空间。这在内存资源有限的环境中尤为重要,使得 Redis 能够在存储大量整数集合时保持高效。
- 操作性能较好:虽然插入、删除操作的最坏时间复杂度为 $O(n)$,但由于元素有序,可以使用二分查找,平均时间复杂度为 $O(\log n)$,在查找、插入和删除操作上都有不错的性能表现。特别是对于元素数量较少的集合,性能优势更加明显。
- 元素唯一性:整数集合在插入操作时会自动检查元素的唯一性,确保集合中不会出现重复元素,这对于很多需要保证数据唯一性的应用场景非常方便。
- 简单易用:整数集合的结构和实现相对简单,易于理解和维护。这使得 Redis 的开发者能够更高效地对其进行优化和扩展,同时也方便其他开发者在自己的项目中借鉴和使用类似的设计思路。
整数集合在 Redis 的数据存储和处理中扮演着重要的角色,它的设计理念和实现方式为我们在处理整数集合类型数据时提供了很好的参考范例,无论是在内存管理还是操作性能方面,都有许多值得学习和借鉴的地方。