MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Redis事务实现的底层原理剖析

2022-09-024.4k 阅读

Redis事务概述

Redis 是一种基于键值对的高性能非关系型数据库,广泛应用于缓存、消息队列、分布式锁等场景。事务是 Redis 提供的一项重要功能,它允许用户将多个命令组合在一起,作为一个原子操作执行。在 Redis 事务中,要么所有命令都成功执行,要么都不执行。

Redis 事务主要由三个命令组成:MULTIEXECDISCARDMULTI 用于标记事务的开始,EXEC 用于执行事务中的所有命令,DISCARD 用于取消事务。

以下是一个简单的 Redis 事务示例:

import redis

r = redis.Redis(host='localhost', port=6379, db=0)

# 开启事务
pipe = r.pipeline()

# 加入事务命令
pipe.set('key1', 'value1')
pipe.get('key1')

# 执行事务
result = pipe.execute()
print(result)

在上述 Python 代码中,使用 redis - py 库操作 Redis。首先通过 pipeline() 方法开启一个事务管道,然后向管道中添加 setget 命令,最后通过 execute() 方法执行事务。

Redis事务底层实现数据结构

事务状态(multiState)

在 Redis 内部,使用 multiState 结构体来表示事务状态。multiState 结构体定义在 server.h 文件中,主要包含以下几个重要字段:

typedef struct multiState {
    multiCmd *commands;  // 事务命令数组
    int count;           // 事务命令数量
    int minreplicas;     // 最小副本数
    time_t minreplicas_timeout; // 最小副本超时时间
} multiState;
  1. commands:这是一个 multiCmd 类型的数组,用于存储事务中的所有命令。multiCmd 结构体定义如下:
typedef struct multiCmd {
    robj **argv;        // 命令参数数组
    int argc;           // 命令参数个数
    struct redisCommand *cmd; // 命令指针
} multiCmd;

argv 是一个 robj 类型的指针数组,robj 是 Redis 中表示对象的结构体,用于存储命令的参数。argc 表示命令参数的个数,cmd 是指向 redisCommand 结构体的指针,redisCommand 结构体定义了 Redis 所有命令的相关信息,如命令名、实现函数等。

  1. count:表示事务中命令的数量。它记录了 commands 数组中已经存储的命令个数。

  2. minreplicasminreplicas_timeout:这两个字段主要用于 Redis 集群模式下的同步复制。minreplicas 表示执行事务时需要同步的最小副本数,minreplicas_timeout 表示等待副本同步的超时时间。

客户端状态(client)

Redis 使用 client 结构体来表示每个客户端连接,该结构体也定义在 server.h 文件中。在事务相关的操作中,client 结构体中有几个重要字段与事务相关:

typedef struct client {
    multiState mstate;  // 事务状态
    int flags;          // 客户端标志,包含是否处于事务状态等标志位
    // 其他字段省略
} client;
  1. mstate:类型为 multiState,用于存储当前客户端事务的具体状态,如事务中的命令等信息。

  2. flags:这是一个整数类型的字段,使用位运算来表示不同的标志。其中与事务相关的标志有 CLIENT_MULTI,当这个标志位被设置时,表示当前客户端处于事务状态。

Redis事务命令实现过程

MULTI命令

当客户端发送 MULTI 命令时,Redis 服务器会执行 multiCommand 函数,该函数定义在 t_mset.c 文件中。主要逻辑如下:

void multiCommand(client *c) {
    if (c->flags & CLIENT_MULTI) {
        addReplyError(c, "MULTI calls can not be nested");
        return;
    }
    c->flags |= CLIENT_MULTI;
    addReply(c, shared.ok);
}
  1. 首先检查客户端的 flags 字段是否已经设置了 CLIENT_MULTI 标志。如果已经设置,说明当前客户端已经处于事务状态,不允许嵌套调用 MULTI 命令,此时向客户端返回错误信息。
  2. 如果客户端不在事务状态,则设置 CLIENT_MULTI 标志,表示客户端进入事务状态,并向客户端返回 OK 响应。

事务命令入队

在客户端处于事务状态(即 CLIENT_MULTI 标志被设置)时,客户端发送的命令不会立即执行,而是被放入事务队列中。以 SET 命令为例,当客户端在事务状态下发送 SET 命令时,Redis 会执行 setCommand 函数,但在事务状态下,setCommand 函数会将命令入队,而不是直接执行。具体逻辑如下(简化版):

void setCommand(client *c) {
    if (c->flags & CLIENT_MULTI) {
        robj **argv = zmalloc(sizeof(robj*)*3);
        argv[0] = createStringObject("SET", 3);
        argv[1] = c->argv[1];
        argv[2] = c->argv[2];
        int argc = 3;
        struct redisCommand *cmd = lookupCommand("SET");
        multiCmd *mc = zmalloc(sizeof(multiCmd));
        mc->argv = argv;
        mc->argc = argc;
        mc->cmd = cmd;
        c->mstate.commands = zrealloc(c->mstate.commands, sizeof(multiCmd)*(c->mstate.count + 1));
        c->mstate.commands[c->mstate.count++] = *mc;
        addReply(c, shared.queued);
        return;
    }
    // 非事务状态下的SET命令执行逻辑省略
}
  1. 首先检查客户端是否处于事务状态(c->flags & CLIENT_MULTI)。如果是,则为 multiCmd 结构体分配内存,并将命令参数和命令指针填充到 multiCmd 结构体中。
  2. 然后将 multiCmd 结构体添加到 client 结构体的 mstate.commands 数组中,并增加 mstate.count 的值。
  3. 最后向客户端返回 QUEUED 响应,表示命令已成功入队。

EXEC命令

当客户端发送 EXEC 命令时,Redis 服务器会执行 execCommand 函数,该函数定义在 t_mset.c 文件中。主要逻辑如下:

void execCommand(client *c) {
    int j;
    robj **orig_argv;
    int orig_argc;
    struct redisCommand *orig_cmd;
    if (!(c->flags & CLIENT_MULTI)) {
        addReplyError(c, "EXEC without MULTI");
        return;
    }
    unwatchAllKeys(c);
    c->flags &= ~CLIENT_MULTI;
    if (c->mstate.count == 0) {
        addReply(c, shared.nullmultibulk);
        return;
    }
    orig_argv = c->argv;
    orig_argc = c->argc;
    orig_cmd = c->cmd;
    for (j = 0; j < c->mstate.count; j++) {
        multiCmd *mc = c->mstate.commands + j;
        c->argc = mc->argc;
        c->argv = mc->argv;
        c->cmd = mc->cmd;
        call(c, CMD_CALL_FULL);
    }
    c->argv = orig_argv;
    c->argc = orig_argc;
    c->cmd = orig_cmd;
    resetClientMultiState(c);
    // 处理事务结果并回复客户端省略
}
  1. 首先检查客户端是否处于事务状态。如果客户端没有通过 MULTI 命令开启事务而直接发送 EXEC 命令,向客户端返回错误信息。
  2. 调用 unwatchAllKeys(c) 函数取消客户端对所有键的监视(如果客户端使用了 WATCH 命令)。
  3. 清除客户端的 CLIENT_MULTI 标志,表示事务结束。
  4. 如果事务队列中没有命令(c->mstate.count == 0),向客户端返回空的多批量回复(nullmultibulk)。
  5. 遍历事务队列中的所有命令,将当前命令的参数和命令指针赋值给客户端的 argvargccmd 字段,然后调用 call 函数执行命令。call 函数会根据命令类型执行相应的命令处理逻辑。
  6. 执行完所有命令后,恢复客户端原来的 argvargccmd 字段,并调用 resetClientMultiState(c) 函数重置客户端的事务状态。
  7. 最后处理事务执行结果并回复客户端,这部分逻辑省略。

DISCARD命令

当客户端发送 DISCARD 命令时,Redis 服务器会执行 discardCommand 函数,该函数定义在 t_mset.c 文件中。主要逻辑如下:

void discardCommand(client *c) {
    if (!(c->flags & CLIENT_MULTI)) {
        addReplyError(c, "DISCARD without MULTI");
        return;
    }
    unwatchAllKeys(c);
    c->flags &= ~CLIENT_MULTI;
    resetClientMultiState(c);
    addReply(c, shared.ok);
}
  1. 首先检查客户端是否处于事务状态。如果客户端没有通过 MULTI 命令开启事务而直接发送 DISCARD 命令,向客户端返回错误信息。
  2. 调用 unwatchAllKeys(c) 函数取消客户端对所有键的监视。
  3. 清除客户端的 CLIENT_MULTI 标志,表示事务取消。
  4. 调用 resetClientMultiState(c) 函数重置客户端的事务状态,释放事务队列等相关资源。
  5. 向客户端返回 OK 响应,表示事务已成功取消。

Redis事务的原子性与一致性

原子性保证

Redis 事务的原子性是通过将多个命令放入事务队列,然后在 EXEC 命令执行时顺序执行队列中的命令来实现的。在执行 EXEC 命令期间,Redis 服务器不会中断事务的执行去处理其他客户端的请求,直到事务中的所有命令都执行完毕或者出现错误。

例如,假设有两个客户端同时对同一个键进行操作,客户端 A 开启事务并执行 INCR 命令,客户端 B 也尝试对同一个键执行 SET 命令。由于 Redis 单线程的特性,在客户端 A 执行 EXEC 命令期间,客户端 B 的命令不会被执行,从而保证了客户端 A 事务中 INCR 命令的原子性。

一致性保证

  1. 入队错误:在事务命令入队过程中,如果某个命令的格式不正确,例如参数个数错误等,Redis 会将该命令入队失败,并向客户端返回错误信息。在执行 EXEC 命令时,整个事务会被取消,不会执行任何命令,从而保证了数据的一致性。例如:
import redis

r = redis.Redis(host='localhost', port=6379, db=0)
pipe = r.pipeline()
# 错误的命令,SET命令应该有两个参数
pipe.execute_command('SET', 'key1')
try:
    result = pipe.execute()
except redis.ResponseError as e:
    print(f"Error: {e}")

在上述 Python 代码中,故意向事务管道中添加一个参数个数错误的 SET 命令,执行 execute() 方法时会捕获到 ResponseError 异常,事务不会执行。

  1. 运行时错误:如果在事务执行过程中(即 EXEC 命令执行期间)某个命令执行失败,例如对一个非数字类型的键执行 INCR 命令,Redis 会继续执行事务中的其他命令,不会回滚已经执行的命令。这是因为 Redis 设计事务主要是为了方便用户将多个命令组合在一起执行,而不是像关系型数据库那样提供严格的事务回滚机制。虽然这种方式在某些情况下可能导致数据不一致,但在大多数 Redis 的应用场景中,用户可以通过业务逻辑来处理这种情况。例如:
import redis

r = redis.Redis(host='localhost', port=6379, db=0)
pipe = r.pipeline()
pipe.set('key1', 'value1')
# 对非数字类型的键执行INCR命令,会失败
pipe.incr('key1')
pipe.set('key2', 'value2')
try:
    result = pipe.execute()
except redis.ResponseError as e:
    print(f"Error: {e}")

在上述代码中,key1 被设置为字符串类型的值 value1,然后尝试对其执行 INCR 命令会失败,但 key2SET 命令仍然会执行。

Redis事务与WATCH机制

WATCH命令原理

WATCH 命令用于监视一个或多个键。当执行 EXEC 命令时,如果被监视的键在事务开启后被其他客户端修改过,那么整个事务将被取消,EXEC 命令返回 nil,表示事务执行失败。

Redis 使用 watched_keys 链表来记录每个客户端监视的键。watched_keys 链表节点定义如下:

typedef struct watchedKey {
    client *client;  // 监视该键的客户端
    robj *key;       // 被监视的键
    struct watchedKey *next; // 指向下一个节点的指针
} watchedKey;

当客户端发送 WATCH 命令时,Redis 会将被监视的键添加到客户端的 watched_keys 链表中。例如,执行 WATCH key1 key2 命令,Redis 会为每个键创建一个 watchedKey 节点,并将其添加到客户端的 watched_keys 链表中。

键修改检测

在 Redis 执行写命令(如 SETDEL 等)时,除了执行正常的命令逻辑外,还会检查是否有客户端监视了该键。如果有,则会将所有监视该键的客户端的 flags 字段设置 CLIENT_DIRTY_CAS 标志,表示该客户端的事务可能因为键被修改而需要取消。

当客户端执行 EXEC 命令时,Redis 会检查客户端的 flags 字段是否设置了 CLIENT_DIRTY_CAS 标志。如果设置了,则取消事务的执行,并向客户端返回 nil

以下是一个使用 WATCH 命令的示例:

import redis

r = redis.Redis(host='localhost', port=6379, db=0)

# 客户端1监视key1
r.watch('key1')
value1 = r.get('key1')
# 客户端2修改key1
r.set('key1', 'new_value')
pipe = r.pipeline()
pipe.multi()
pipe.incr('key1')
try:
    result = pipe.execute()
except redis.WatchError:
    print("Transaction cancelled due to key modification")

在上述 Python 代码中,客户端 1 首先使用 watch 方法监视 key1,获取 key1 的值后,客户端 2 修改了 key1 的值。此时客户端 1 执行事务时会捕获到 WatchError 异常,表示事务由于键被修改而取消。

Redis事务在集群模式下的实现

集群模式概述

Redis 集群是一种分布式部署方式,它将数据分布在多个节点上,以实现高可用性和扩展性。在 Redis 集群中,数据根据键的哈希值分布到不同的节点上,每个节点负责一部分数据。

事务实现差异

  1. 命令路由:在集群模式下,客户端发送的事务命令需要经过路由才能到达正确的节点。当客户端发送 MULTI 命令时,集群节点会检查事务中的所有命令,确保这些命令操作的键都在同一个节点上(即符合“同槽规则”)。如果不符合,集群节点会向客户端返回错误信息。
  2. 同步复制:为了保证事务的一致性,Redis 集群引入了 minreplicasminreplicas_timeout 配置参数。当执行 EXEC 命令时,主节点会等待至少 minreplicas 个从节点同步完事务数据后才向客户端返回成功响应。如果等待时间超过 minreplicas_timeout,主节点会向客户端返回错误信息。

例如,在 Redis 集群配置文件中,可以设置 minreplicas 2minreplicas_timeout 5,表示执行事务时需要等待至少 2 个从节点同步,等待时间为 5 秒。

代码示例(集群模式下的事务操作)

使用 redis - py - cluster 库进行集群模式下的事务操作示例:

from rediscluster import RedisCluster

# 初始化集群客户端
startup_nodes = [{"host": "127.0.0.1", "port": "7000"}]
rc = RedisCluster(startup_nodes=startup_nodes, decode_responses=True)

# 开启事务
pipe = rc.pipeline()

# 加入事务命令
pipe.set('key1', 'value1')
pipe.get('key1')

# 执行事务
try:
    result = pipe.execute()
    print(result)
except Exception as e:
    print(f"Error: {e}")

在上述代码中,使用 redis - py - cluster 库连接 Redis 集群,并进行事务操作。如果事务中的命令不符合“同槽规则”,执行 execute() 方法时会抛出异常。

通过对 Redis 事务底层原理的剖析,我们了解了 Redis 事务的实现机制、数据结构、命令执行过程以及在不同场景下的特点。这有助于开发者更好地利用 Redis 事务来构建可靠、高效的应用程序。无论是在单节点环境还是集群模式下,掌握 Redis 事务的原理对于优化应用性能和保证数据一致性都具有重要意义。