MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Redis实现分布式锁保障MySQL数据一致性

2023-12-287.4k 阅读

一、数据库数据一致性问题背景

在现代分布式系统中,多个服务可能同时对 MySQL 数据库进行读写操作,这就容易引发数据一致性问题。例如,在电商场景下,库存扣减是一个常见操作。当多个用户同时下单购买同一款商品时,如果没有合适的并发控制机制,可能会导致库存超卖的情况。假设当前商品库存为 10 件,两个用户同时发起购买请求,每个请求要购买 1 件商品。在并发环境下,如果没有控制,两个请求可能都读取到库存为 10 件,然后各自进行库存减 1 的操作,最终库存变为 9 件,但实际上应该变为 8 件,这就出现了数据不一致。

传统的单机系统可以通过数据库的事务机制来保证数据一致性。例如,在 MySQL 中,可以使用 BEGINCOMMITROLLBACK 语句来定义一个事务。当执行一个事务时,数据库会保证该事务内的所有操作要么全部成功,要么全部失败。然而,在分布式系统中,由于涉及多个节点和服务,单纯依靠数据库事务往往难以满足需求。因为不同服务可能部署在不同的服务器上,访问不同的数据库实例,这就需要一种分布式的机制来协调各个服务之间的操作,保证数据一致性。

二、分布式锁概念与作用

2.1 分布式锁的定义

分布式锁是一种在分布式环境下,用于控制多个进程或线程对共享资源访问的机制。与单机锁(如 Java 中的 synchronized 关键字或 ReentrantLock)不同,分布式锁需要在多个节点之间进行协调,确保在同一时刻只有一个节点能够获取到锁,从而访问共享资源。

2.2 分布式锁在保障数据一致性中的作用

在分布式系统中,当多个服务需要对 MySQL 数据库中的同一数据进行修改操作时,通过分布式锁可以保证在同一时间只有一个服务能够进行修改,其他服务需要等待锁的释放。这样就避免了多个服务同时修改数据导致的数据不一致问题。例如,在上述库存扣减场景中,当一个服务获取到分布式锁后,才可以进行库存扣减操作,其他服务在锁未释放时无法进行相同操作,从而保证了库存数据的一致性。

三、Redis 实现分布式锁原理

3.1 Redis 特性适合实现分布式锁

Redis 是一个基于内存的高性能键值对数据库,具有以下特性使其非常适合实现分布式锁:

  • 原子性操作:Redis 提供了一系列原子性操作,如 SETNX(SET if Not eXists)。SETNX key value 命令只有在键 key 不存在时,才会将键 key 的值设置为 value,并返回 1;如果键 key 已经存在,则不做任何操作,返回 0。这种原子性操作保证了在多节点环境下,锁的获取操作是线程安全的。
  • 高可用性:Redis 可以通过主从复制和 Sentinel 机制实现高可用性。主从复制可以将数据复制到多个从节点,提高读取性能和数据冗余;Sentinel 机制可以监控主节点的状态,当主节点出现故障时,自动将一个从节点提升为主节点,保证系统的可用性。这对于分布式锁来说非常重要,因为如果锁服务不可用,整个分布式系统的并发控制就会失效。
  • 高性能:由于 Redis 数据存储在内存中,读写速度非常快。在分布式锁场景下,快速的锁获取和释放操作可以减少服务等待时间,提高系统的并发性能。

3.2 基于 Redis 的分布式锁实现原理

基于 Redis 实现分布式锁的基本原理是利用 Redis 的原子性操作来模拟锁的获取和释放过程。具体步骤如下:

  1. 获取锁:客户端尝试使用 SETNX 命令设置一个特定的键值对,键通常是锁的名称,值可以是一个唯一标识(如客户端的唯一 ID 或时间戳)。如果 SETNX 命令返回 1,表示获取锁成功;如果返回 0,表示锁已被其他客户端获取,获取锁失败。
  2. 释放锁:当客户端完成对共享资源的操作后,需要释放锁。释放锁可以通过删除 Redis 中对应的键来实现。但在释放锁时需要注意,只能删除自己获取的锁,避免误删其他客户端获取的锁。可以在获取锁时记录下自己设置的锁值,在释放锁时先验证锁值是否匹配,只有匹配时才进行删除操作。

四、Redis 分布式锁实现代码示例

4.1 使用 Java 和 Jedis 实现 Redis 分布式锁

以下是使用 Java 和 Jedis 库实现 Redis 分布式锁的代码示例:

import redis.clients.jedis.Jedis;

public class RedisDistributedLock {
    private static final String LOCK_KEY = "my_distributed_lock";
    private static final String LOCK_VALUE = System.currentTimeMillis() + ":" + Thread.currentThread().getName();
    private static final int LOCK_EXPIRE_TIME = 10 * 1000; // 10 seconds
    private static final int WAIT_TIME = 10 * 1000; // 10 seconds

    public static boolean tryLock(Jedis jedis) {
        long startTime = System.currentTimeMillis();
        while (System.currentTimeMillis() - startTime < WAIT_TIME) {
            if ("OK".equals(jedis.set(LOCK_KEY, LOCK_VALUE, "NX", "EX", LOCK_EXPIRE_TIME / 1000))) {
                return true;
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return false;
    }

    public static void unlock(Jedis jedis) {
        if (LOCK_VALUE.equals(jedis.get(LOCK_KEY))) {
            jedis.del(LOCK_KEY);
        }
    }

    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost", 6379);
        if (tryLock(jedis)) {
            try {
                // 模拟业务操作
                System.out.println("Lock acquired, doing business logic...");
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                unlock(jedis);
                System.out.println("Lock released.");
            }
        } else {
            System.out.println("Failed to acquire lock.");
        }
        jedis.close();
    }
}

在上述代码中:

  • tryLock 方法尝试获取锁,它使用 jedis.set 方法并传入 NX(表示只有键不存在时才设置)和 EX(表示设置过期时间,单位为秒)选项来模拟 SETNX 命令。如果设置成功,返回 true;如果在等待时间内未获取到锁,则返回 false
  • unlock 方法用于释放锁,它先验证当前锁的值是否与自己设置的值相同,只有相同才删除锁键,避免误删其他客户端的锁。

4.2 使用 Python 和 redis - py 实现 Redis 分布式锁

以下是使用 Python 和 redis - py 库实现 Redis 分布式锁的代码示例:

import redis
import time

LOCK_KEY ='my_distributed_lock'
LOCK_VALUE = str(time.time()) + ':' + str(id(redis))
LOCK_EXPIRE_TIME = 10  # 10 seconds
WAIT_TIME = 10  # 10 seconds


def try_lock(redis_client):
    start_time = time.time()
    while time.time() - start_time < WAIT_TIME:
        if redis_client.set(LOCK_KEY, LOCK_VALUE, nx=True, ex=LOCK_EXPIRE_TIME):
            return True
        time.sleep(0.1)
    return False


def unlock(redis_client):
    if redis_client.get(LOCK_KEY).decode('utf - 8') == LOCK_VALUE:
        redis_client.delete(LOCK_KEY)


if __name__ == '__main__':
    r = redis.Redis(host='localhost', port=6379, db=0)
    if try_lock(r):
        try:
            print('Lock acquired, doing business logic...')
            time.sleep(5)
        finally:
            unlock(r)
            print('Lock released.')
    else:
        print('Failed to acquire lock.')

在 Python 代码中:

  • try_lock 函数通过 redis_client.set 方法的 nxex 参数实现类似 SETNX 并设置过期时间的功能。如果成功获取锁,返回 True;否则在等待时间内尝试获取,超时返回 False
  • unlock 函数验证锁值并删除锁键来释放锁。

五、Redis 分布式锁的问题与解决方案

5.1 锁的过期时间问题

在前面的代码示例中,我们设置了锁的过期时间,这是为了防止因程序异常导致锁无法释放,从而造成死锁。然而,如果业务操作的执行时间超过了锁的过期时间,就会出现问题。例如,一个业务操作预计执行 5 秒,但锁的过期时间设置为 3 秒,当锁过期后,其他客户端可能会获取到锁并开始操作,而此时第一个客户端的业务操作还未完成,这就会导致数据不一致。

解决方案

  • 自动续期:可以使用 Redisson 等框架来实现锁的自动续期功能。Redisson 提供了一个看门狗机制,当客户端获取到锁后,会启动一个后台线程,定期检查锁的剩余时间。如果剩余时间小于一定阈值,就会自动延长锁的过期时间,保证在业务操作完成前锁不会过期。
  • 合理设置过期时间:在实际应用中,需要对业务操作的执行时间进行预估,并根据预估结果合理设置锁的过期时间。可以通过统计历史数据或进行性能测试来确定一个合适的过期时间。同时,为了防止极端情况下业务操作执行时间过长,可以在业务代码中添加超时处理逻辑,当操作超过一定时间时,主动放弃操作并释放锁。

5.2 主从复制导致的锁丢失问题

在 Redis 主从复制架构中,当主节点接收到客户端的锁设置请求并成功设置锁后,还未来得及将锁数据同步到从节点,此时主节点发生故障,Sentinel 机制会将一个从节点提升为主节点。由于新的主节点没有之前的锁数据,其他客户端就可以再次获取锁,从而导致锁丢失,数据一致性被破坏。

解决方案

  • Redlock 算法:Redis 作者提出了 Redlock 算法来解决这个问题。Redlock 算法的基本思想是使用多个独立的 Redis 实例(至少 5 个),客户端在获取锁时,需要向大多数(N/2 + 1,N 为 Redis 实例总数)的 Redis 实例发送锁设置请求。只有当大多数实例都成功设置锁时,客户端才认为获取锁成功。在释放锁时,需要向所有实例发送释放锁请求。这样即使某个实例出现故障,也不会影响锁的一致性。 以下是使用 Redlock 的 Python 代码示例(假设已安装 redlock - py 库):
from redlock import Redlock

lock_key ='my_distributed_lock'
resource = 'example_resource'
redlock = Redlock([{
    "host": "localhost",
    "port": 6379,
    "db": 0
}, {
    "host": "localhost",
    "port": 6380,
    "db": 0
}, {
    "host": "localhost",
    "port": 6381,
    "db": 0
}, {
    "host": "localhost",
    "port": 6382,
    "db": 0
}, {
    "host": "localhost",
    "port": 6383,
    "db": 0
}])
lock = redlock.lock(lock_key, 10000)
if lock:
    try:
        print('Lock acquired, doing business logic...')
        time.sleep(5)
    finally:
        redlock.unlock(lock)
        print('Lock released.')
else:
    print('Failed to acquire lock.')

在上述代码中,通过 Redlock 类连接多个 Redis 实例,并使用 lock 方法获取锁,使用 unlock 方法释放锁。

六、结合 Redis 分布式锁保障 MySQL 数据一致性实践

6.1 库存扣减场景实践

以电商库存扣减为例,假设我们有一个 MySQL 数据库表 products 存储商品信息,其中 stock 字段表示商品库存。

  1. 数据库表结构
CREATE TABLE products (
    id INT PRIMARY KEY AUTO_INCREMENT,
    product_name VARCHAR(255),
    stock INT
);
  1. Java 代码实现
import redis.clients.jedis.Jedis;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class StockDeduction {
    private static final String LOCK_KEY = "product_stock_lock";
    private static final String LOCK_VALUE = System.currentTimeMillis() + ":" + Thread.currentThread().getName();
    private static final int LOCK_EXPIRE_TIME = 10 * 1000; // 10 seconds
    private static final int WAIT_TIME = 10 * 1000; // 10 seconds

    public static boolean tryLock(Jedis jedis) {
        long startTime = System.currentTimeMillis();
        while (System.currentTimeMillis() - startTime < WAIT_TIME) {
            if ("OK".equals(jedis.set(LOCK_KEY, LOCK_VALUE, "NX", "EX", LOCK_EXPIRE_TIME / 1000))) {
                return true;
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return false;
    }

    public static void unlock(Jedis jedis) {
        if (LOCK_VALUE.equals(jedis.get(LOCK_KEY))) {
            jedis.del(LOCK_KEY);
        }
    }

    public static void deductStock(int productId, int quantity) {
        Jedis jedis = new Jedis("localhost", 6379);
        if (tryLock(jedis)) {
            Connection conn = null;
            PreparedStatement pstmt = null;
            try {
                Class.forName("com.mysql.cj.jdbc.Driver");
                conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/your_database", "root", "password");
                String sql = "UPDATE products SET stock = stock -? WHERE id =? AND stock >=?";
                pstmt = conn.prepareStatement(sql);
                pstmt.setInt(1, quantity);
                pstmt.setInt(2, productId);
                pstmt.setInt(3, quantity);
                int rowsUpdated = pstmt.executeUpdate();
                if (rowsUpdated == 0) {
                    System.out.println("Insufficient stock for product with id " + productId);
                } else {
                    System.out.println("Stock deducted successfully for product with id " + productId);
                }
            } catch (ClassNotFoundException | SQLException e) {
                e.printStackTrace();
            } finally {
                if (pstmt!= null) {
                    try {
                        pstmt.close();
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                }
                if (conn!= null) {
                    try {
                        conn.close();
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                }
                unlock(jedis);
                jedis.close();
            }
        } else {
            System.out.println("Failed to acquire lock for stock deduction.");
        }
    }

    public static void main(String[] args) {
        deductStock(1, 1);
    }
}

在上述代码中,deductStock 方法先尝试获取 Redis 分布式锁,获取成功后连接 MySQL 数据库执行库存扣减操作。通过 UPDATE 语句的条件判断 stock >=? 来确保库存充足时才进行扣减,并且在操作完成后释放锁。

6.2 订单创建场景实践

在电商订单创建场景中,可能会涉及到多个表的操作,如在 orders 表插入订单信息,在 order_items 表插入订单项信息,同时还要更新商品库存。为了保证数据一致性,同样需要使用 Redis 分布式锁。

  1. 数据库表结构
CREATE TABLE orders (
    id INT PRIMARY KEY AUTO_INCREMENT,
    user_id INT,
    order_date TIMESTAMP
);

CREATE TABLE order_items (
    id INT PRIMARY KEY AUTO_INCREMENT,
    order_id INT,
    product_id INT,
    quantity INT,
    FOREIGN KEY (order_id) REFERENCES orders(id),
    FOREIGN KEY (product_id) REFERENCES products(id)
);
  1. Python 代码实现
import redis
import time
import mysql.connector

LOCK_KEY = 'order_creation_lock'
LOCK_VALUE = str(time.time()) + ':' + str(id(redis))
LOCK_EXPIRE_TIME = 10
WAIT_TIME = 10


def try_lock(redis_client):
    start_time = time.time()
    while time.time() - start_time < WAIT_TIME:
        if redis_client.set(LOCK_KEY, LOCK_VALUE, nx=True, ex=LOCK_EXPIRE_TIME):
            return True
        time.sleep(0.1)
    return False


def unlock(redis_client):
    if redis_client.get(LOCK_KEY).decode('utf - 8') == LOCK_VALUE:
        redis_client.delete(LOCK_KEY)


def create_order(user_id, product_id, quantity):
    r = redis.Redis(host='localhost', port=6379, db=0)
    if try_lock(r):
        try:
            conn = mysql.connector.connect(
                host='localhost',
                user='root',
                password='password',
                database='your_database'
            )
            cursor = conn.cursor()
            # 插入订单
            order_sql = "INSERT INTO orders (user_id, order_date) VALUES (%s, NOW())"
            cursor.execute(order_sql, (user_id,))
            order_id = cursor.lastrowid
            # 插入订单项
            item_sql = "INSERT INTO order_items (order_id, product_id, quantity) VALUES (%s, %s, %s)"
            cursor.execute(item_sql, (order_id, product_id, quantity))
            # 更新库存
            stock_sql = "UPDATE products SET stock = stock - %s WHERE id = %s AND stock >= %s"
            cursor.execute(stock_sql, (quantity, product_id, quantity))
            if cursor.rowcount == 0:
                print("Insufficient stock for product with id", product_id)
                conn.rollback()
            else:
                conn.commit()
                print("Order created successfully")
        except mysql.connector.Error as err:
            print("Error: ", err)
            if conn.is_connected():
                conn.rollback()
        finally:
            if conn.is_connected():
                cursor.close()
                conn.close()
            unlock(r)
    else:
        print("Failed to acquire lock for order creation.")


if __name__ == '__main__':
    create_order(1, 1, 1)

在这个 Python 代码示例中,create_order 函数在获取 Redis 分布式锁后,依次执行插入订单、插入订单项和更新库存的操作。如果库存不足,会回滚整个事务,并且在操作完成后释放锁,确保订单创建过程中的数据一致性。

七、性能优化与监控

7.1 性能优化

  1. 减少锁的持有时间:尽量将业务逻辑中不需要锁保护的部分移出锁的范围。例如,在库存扣减场景中,如果有一些查询商品信息等只读操作,可以在获取锁之前进行,这样可以减少锁的持有时间,提高系统并发性能。
  2. 批量操作:对于一些可以批量执行的数据库操作,尽量进行批量处理。例如,在订单创建场景中,如果一次要创建多个订单项,可以将多个订单项的插入操作合并为一个批量插入语句,减少数据库交互次数,从而提高性能。
  3. 优化 Redis 配置:合理调整 Redis 的内存配置、持久化策略等参数。例如,对于分布式锁这种对性能要求较高的场景,可以选择 AOF(Append - Only - File)持久化方式,并设置合理的刷盘策略(如 appendfsync everysec),在保证数据安全性的同时尽量减少持久化对性能的影响。

7.2 监控

  1. Redis 监控:可以使用 Redis 自带的 INFO 命令来监控 Redis 的运行状态,包括内存使用情况、命令执行次数、连接数等。通过这些指标可以了解 Redis 是否存在性能瓶颈或异常情况。例如,如果发现 keyspace_hitskeyspace_misses 比例异常,可能表示缓存命中率有问题,需要优化缓存策略。
  2. 数据库监控:对于 MySQL 数据库,可以使用 SHOW STATUS 命令来获取数据库的运行状态信息,如查询执行次数、锁等待时间等。通过监控这些指标,可以及时发现数据库性能问题,如锁争用严重等情况,并针对性地进行优化。例如,如果发现 InnoDB_row_lock_time_avg 指标较高,说明行锁等待时间较长,可能需要优化 SQL 语句或调整数据库架构。
  3. 分布式锁监控:可以自定义一些监控指标来跟踪分布式锁的使用情况。例如,记录锁的获取成功率、锁的平均持有时间等。通过这些指标可以评估分布式锁对系统性能的影响,及时发现锁相关的问题,如锁获取失败频繁可能表示锁竞争过于激烈,需要调整锁的粒度或优化业务逻辑。

八、总结

通过使用 Redis 实现分布式锁,可以有效地保障 MySQL 数据在分布式系统中的一致性。在实际应用中,需要充分理解 Redis 分布式锁的原理、实现方式以及可能遇到的问题,并采取相应的解决方案。同时,要注重性能优化和监控,确保系统在高并发环境下能够稳定、高效地运行。在不同的业务场景中,如库存扣减、订单创建等,合理应用分布式锁机制,可以避免数据不一致问题,提升系统的可靠性和用户体验。