MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

基于分布式锁实现分布式事务

2021-12-285.7k 阅读

分布式事务概述

在传统的单体应用中,事务管理相对简单,通过数据库自身的事务机制,就能轻松保证数据的一致性和完整性。例如,在一个简单的银行转账操作中,从账户 A 向账户 B 转账 100 元,这涉及到两个数据库操作:从账户 A 中扣除 100 元,向账户 B 中增加 100 元。在单体应用里,我们可以使用数据库的事务,确保这两个操作要么都成功,要么都失败。

然而,随着业务的不断发展和系统规模的日益扩大,单体应用逐渐演变成分布式系统。在分布式系统中,不同的服务可能运行在不同的服务器上,并且可能使用不同的数据库。继续以银行转账为例,假设账户 A 和账户 B 分别由不同的服务管理,这两个服务可能部署在不同的服务器上,并且使用不同的数据库实例。此时,要保证转账操作的原子性,就不能简单地依赖单个数据库的事务机制,因为涉及到多个数据库的操作,这就引入了分布式事务的概念。

分布式事务旨在保证在分布式环境下,多个操作要么全部成功,要么全部失败,以维护数据的一致性。常见的分布式事务模型有 XA 事务模型、TCC(Try - Confirm - Cancel)模型、Saga 模型等。但每种模型都有其优缺点,例如 XA 事务模型虽然强一致性,但性能较差,对资源的锁定时间较长;TCC 模型虽然性能较好,但实现复杂,需要业务代码参与事务控制;Saga 模型适用于长事务,但恢复机制相对复杂。

分布式锁基础

分布式锁的概念

分布式锁是一种在分布式系统环境下,用于控制多个进程对共享资源进行访问的机制。它的作用类似于传统单进程应用中的锁,只不过应用场景从单进程扩展到了分布式系统中的多个进程。

以电商系统的库存扣减为例,当多个用户同时下单购买同一款商品时,如果没有合适的控制机制,就可能出现超卖的情况。通过使用分布式锁,在扣减库存操作前获取锁,只有获取到锁的进程才能执行库存扣减操作,其他进程则需要等待,这样就能保证库存扣减操作的原子性,避免超卖问题。

分布式锁的实现方式

  1. 基于数据库的实现方式
    • 表锁方式:在数据库中创建一张锁表,例如 lock_table,表结构可以包含 lock_key(唯一标识锁的键)、lock_value(锁的持有信息,如持有锁的进程 ID 等)、expire_time(锁的过期时间)等字段。当一个进程需要获取锁时,向锁表插入一条记录,例如:
    INSERT INTO lock_table (lock_key, lock_value, expire_time) VALUES ('product_stock_lock', 'process1', '2024 - 01 - 01 12:00:00') ON DUPLICATE KEY UPDATE lock_value = 'process1', expire_time = '2024 - 01 - 01 12:00:00';
    
    如果插入成功,说明获取锁成功;如果因为 lock_key 唯一约束导致插入失败,说明锁已被其他进程持有,获取锁失败。
    • 行锁方式:利用数据库的行锁机制,通过对特定行数据的锁定来实现分布式锁。例如,假设有一张 product 表,包含 product_idstock 等字段。当要对某个商品的库存进行操作时,可以通过以下 SQL 语句获取锁:
    SELECT * FROM product WHERE product_id = 1 FOR UPDATE;
    
    这条语句会对 product 表中 product_id 为 1 的行加锁,其他进程无法同时获取该行的锁,直到当前持有锁的进程提交事务释放锁。
  2. 基于 Redis 的实现方式 Redis 是一种高性能的内存数据库,支持原子性操作,非常适合实现分布式锁。常用的命令是 SETNX(SET if Not eXists),例如:
SETNX lock_key lock_value

如果 lock_key 不存在,该命令会设置 lock_key 的值为 lock_value 并返回 1,表示获取锁成功;如果 lock_key 已经存在,命令返回 0,表示获取锁失败。为了防止死锁,通常还会给锁设置一个过期时间,可以使用 EXPIRE 命令:

EXPIRE lock_key expire_time

或者在 Redis 2.6.12 及以上版本,可以使用 SET 命令的扩展参数同时设置值和过期时间:

SET lock_key lock_value EX expire_time NX
  1. 基于 ZooKeeper 的实现方式 ZooKeeper 是一个分布式协调服务,提供了树形结构的命名空间,通过在 ZooKeeper 上创建临时有序节点来实现分布式锁。当一个进程需要获取锁时,在指定的父节点下创建一个临时有序节点。例如,父节点为 /lock,进程创建的节点可能是 /lock/lock - 0000000001。然后获取父节点下所有子节点,并对节点编号进行排序。如果当前进程创建的节点编号最小,说明获取锁成功;否则,监听比自己编号小的前一个节点,当前一个节点被删除时,重新尝试获取锁。

基于分布式锁实现分布式事务的原理

核心思想

基于分布式锁实现分布式事务的核心思想是通过获取分布式锁,将分布式环境下的并发操作串行化,从而保证事务的原子性。在分布式系统中,不同的服务或节点在执行事务相关操作前,先尝试获取分布式锁。只有获取到锁的节点才能继续执行事务操作,其他节点等待锁的释放。这样就避免了多个节点同时对共享资源进行操作导致的数据不一致问题。

以一个简单的订单创建和库存扣减的分布式事务为例,订单服务和库存服务在不同的节点上。订单服务在创建订单前,先获取分布式锁,库存服务在扣减库存前同样获取分布式锁。由于分布式锁的唯一性,同一时间只有一个服务能获取到锁,从而保证了订单创建和库存扣减这两个操作的原子性。

事务流程

  1. 初始化阶段:在分布式事务开始前,各个参与事务的服务需要确定要操作的共享资源,并确定用于分布式锁的锁键。例如,在上述订单创建和库存扣减的场景中,锁键可以是商品的唯一标识,如 product_id
  2. 获取锁阶段:每个服务按照预定的策略尝试获取分布式锁。以 Redis 实现的分布式锁为例,服务通过执行 SET lock_key lock_value EX expire_time NX 命令来获取锁。如果获取成功,服务可以继续执行后续的事务操作;如果获取失败,服务需要等待一段时间后重试,或者根据业务需求进行相应的处理,如返回错误信息给用户。
  3. 事务执行阶段:获取到锁的服务开始执行事务操作。例如,订单服务创建订单记录,库存服务扣减库存。这些操作需要保证在锁的持有期间内完成,以确保数据的一致性。
  4. 释放锁阶段:事务操作完成后,无论是成功还是失败,持有锁的服务都需要释放分布式锁。在 Redis 中,通过执行 DEL lock_key 命令来释放锁。这样其他等待锁的服务就有机会获取锁并执行相应的事务操作。

代码示例(基于 Redis 分布式锁实现简单分布式事务)

环境搭建

  1. 安装 Redis:可以通过官方网站下载 Redis 安装包,并按照安装指南进行安装。例如,在 Linux 系统下,可以使用以下命令安装 Redis:
wget http://download.redis.io/releases/redis - 6.2.6.tar.gz
tar xzf redis - 6.2.6.tar.gz
cd redis - 6.2.6
make
sudo make install
  1. 安装开发依赖:假设我们使用 Python 语言进行开发,需要安装 redis - py 库来操作 Redis。可以使用 pip 进行安装:
pip install redis

代码实现

import redis
import time


class RedisDistributedLock:
    def __init__(self, redis_client, lock_key, expire_time=10):
        self.redis_client = redis_client
        self.lock_key = lock_key
        self.expire_time = expire_time

    def acquire(self):
        result = self.redis_client.set(self.lock_key, "locked", ex=self.expire_time, nx=True)
        return result

    def release(self):
        self.redis_client.delete(self.lock_key)


def create_order(product_id, quantity):
    redis_client = redis.Redis(host='localhost', port=6379, db = 0)
    lock = RedisDistributedLock(redis_client, f'product_{product_id}_lock')

    try:
        if lock.acquire():
            # 模拟订单创建操作
            print(f"成功获取锁,开始创建订单,商品 {product_id},数量 {quantity}")
            # 这里可以添加实际的订单创建逻辑,如写入数据库等
            time.sleep(2)
            print(f"订单创建成功,商品 {product_id},数量 {quantity}")
        else:
            print(f"获取锁失败,无法创建订单,商品 {product_id},数量 {quantity}")
    finally:
        lock.release()


def deduct_stock(product_id, quantity):
    redis_client = redis.Redis(host='localhost', port=6379, db = 0)
    lock = RedisDistributedLock(redis_client, f'product_{product_id}_lock')

    try:
        if lock.acquire():
            # 模拟库存扣减操作
            print(f"成功获取锁,开始扣减库存,商品 {product_id},数量 {quantity}")
            # 这里可以添加实际的库存扣减逻辑,如更新数据库库存字段等
            time.sleep(2)
            print(f"库存扣减成功,商品 {product_id},数量 {quantity}")
        else:
            print(f"获取锁失败,无法扣减库存,商品 {product_id},数量 {quantity}")
    finally:
        lock.release()


# 模拟分布式事务,先创建订单,再扣减库存
def simulate_distributed_transaction(product_id, quantity):
    create_order(product_id, quantity)
    deduct_stock(product_id, quantity)


if __name__ == "__main__":
    simulate_distributed_transaction(1, 5)


在上述代码中,RedisDistributedLock 类封装了基于 Redis 的分布式锁操作,包括获取锁和释放锁。create_orderdeduct_stock 函数分别模拟了订单创建和库存扣减操作,在操作前先获取分布式锁,操作完成后释放锁。simulate_distributed_transaction 函数模拟了一个简单的分布式事务,先调用 create_order 创建订单,再调用 deduct_stock 扣减库存。

优点与局限性

优点

  1. 简单易懂:基于分布式锁实现分布式事务的原理相对直观,对于熟悉锁机制的开发者来说容易理解和实现。在代码示例中,通过简单的 Redis 操作实现分布式锁,进而控制事务操作,代码逻辑清晰。
  2. 兼容性好:分布式锁的实现方式多样,如基于数据库、Redis、ZooKeeper 等,能够适应不同的分布式系统架构和技术栈。例如,在已经使用 Redis 作为缓存的系统中,基于 Redis 实现分布式锁可以复用现有的 Redis 资源,降低系统复杂度。
  3. 部分保证一致性:在获取锁期间,能有效避免并发操作对共享资源的影响,从而在一定程度上保证数据的一致性。例如在库存扣减场景中,通过分布式锁可以防止超卖现象,保证库存数据的准确性。

局限性

  1. 性能问题:由于分布式锁将并发操作串行化,在高并发场景下,大量的进程等待获取锁会导致性能瓶颈。例如,在电商大促期间,大量的订单创建和库存扣减请求同时到达,获取锁的竞争激烈,会严重影响系统的响应时间。
  2. 死锁风险:虽然可以通过设置锁的过期时间来避免死锁,但如果在锁的过期时间内事务操作未完成,可能会导致数据不一致。例如,一个进程获取锁后,在执行事务操作过程中出现故障,未能及时释放锁,而锁过期后其他进程获取锁继续操作,可能会导致数据冲突。
  3. 锁的可靠性:分布式锁依赖于特定的存储系统(如数据库、Redis、ZooKeeper 等),如果这些存储系统出现故障,可能会导致锁机制失效,进而影响分布式事务的正常执行。例如,Redis 集群出现网络分区等问题时,可能会导致分布式锁的获取和释放出现异常。

优化策略

优化锁的获取与释放

  1. 优化重试机制:在获取锁失败时,采用合理的重试策略可以提高获取锁的成功率。例如,使用指数退避算法,即每次重试的时间间隔逐渐增大。以下是 Python 实现的指数退避重试获取锁的代码示例:
import redis
import time


class RedisDistributedLock:
    def __init__(self, redis_client, lock_key, expire_time=10):
        self.redis_client = redis_client
        self.lock_key = lock_key
        self.expire_time = expire_time

    def acquire_with_backoff(self, max_retries = 5):
        base_delay = 1
        for i in range(max_retries):
            result = self.redis_client.set(self.lock_key, "locked", ex=self.expire_time, nx=True)
            if result:
                return True
            delay = base_delay * (2 ** i)
            time.sleep(delay)
        return False

    def release(self):
        self.redis_client.delete(self.lock_key)


  1. 异步释放锁:在事务操作完成后,可以考虑使用异步方式释放锁,以减少事务操作的整体时间。例如,在 Python 中可以使用 asyncio 库实现异步释放锁:
import asyncio
import redis


class RedisDistributedLock:
    def __init__(self, redis_client, lock_key, expire_time=10):
        self.redis_client = redis_client
        self.lock_key = lock_key
        self.expire_time = expire_time

    async def acquire(self):
        result = self.redis_client.set(self.lock_key, "locked", ex=self.expire_time, nx=True)
        return result

    async def release_async(self):
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self.redis_client.delete, self.lock_key)


提高锁的可靠性

  1. 多副本与故障转移:对于基于 Redis 实现的分布式锁,可以使用 Redis 集群的多副本机制,提高锁的可靠性。当主节点出现故障时,从节点可以自动晋升为主节点,保证锁服务的可用性。同样,对于基于数据库实现的分布式锁,可以采用主从复制和故障转移机制,确保在数据库节点故障时,锁机制仍然能够正常工作。
  2. 心跳检测:在持有锁的进程中,可以引入心跳检测机制,定期向锁存储系统发送心跳消息,表明自己仍然在正常运行。如果锁存储系统在一定时间内未收到心跳消息,可以认为持有锁的进程出现故障,自动释放锁,避免死锁情况的发生。以下是一个简单的心跳检测示例代码:
import redis
import threading
import time


class Heartbeat:
    def __init__(self, redis_client, lock_key, interval = 5):
        self.redis_client = redis_client
        self.lock_key = lock_key
        self.interval = interval
        self.thread = threading.Thread(target = self.send_heartbeat)
        self.thread.daemon = True
        self.thread.start()

    def send_heartbeat(self):
        while True:
            self.redis_client.set(self.lock_key, "alive", ex = self.interval * 2)
            time.sleep(self.interval)


在上述代码中,Heartbeat 类启动一个线程,每隔 interval 时间向 Redis 发送一次心跳消息,更新锁的状态,确保锁不会因为进程故障而长时间被持有。

实际应用场景与案例分析

电商场景

在电商系统中,分布式事务处理非常常见。例如,在用户下单购买商品时,涉及到订单创建、库存扣减、支付处理等多个操作,这些操作可能分布在不同的服务中。

以某电商平台为例,该平台在处理订单时,使用基于 Redis 分布式锁实现分布式事务。当用户下单后,订单服务首先获取商品对应的分布式锁,然后创建订单记录。接着,库存服务获取相同的分布式锁,进行库存扣减操作。如果库存扣减成功,支付服务获取锁并处理支付流程。通过这种方式,保证了订单、库存和支付之间的数据一致性。在高并发的促销活动中,虽然获取锁的竞争较为激烈,但通过优化重试机制和异步释放锁等策略,系统依然能够稳定运行,有效避免了超卖和订单数据不一致等问题。

金融场景

在金融领域,分布式事务的一致性要求更为严格。例如,在跨行转账业务中,涉及到转出银行和转入银行的多个操作。

假设 A 银行向 B 银行转账,A 银行的转账服务和 B 银行的收款服务需要通过分布式事务来保证数据的一致性。该业务采用基于数据库的分布式锁实现分布式事务。A 银行在进行转账操作前,先获取数据库锁,锁定转出账户的资金,然后进行转账操作。B 银行在收到转账请求后,获取相同的数据库锁,进行收款操作。通过这种方式,确保了跨行转账过程中资金的准确性和一致性。同时,为了提高可靠性,数据库采用了主从复制和故障转移机制,防止因数据库故障导致分布式事务失败。

综上所述,基于分布式锁实现分布式事务在实际应用中有广泛的场景,通过合理的优化策略和技术选型,可以有效解决分布式系统中的数据一致性问题,提升系统的稳定性和可靠性。但同时也需要注意其性能瓶颈和可靠性风险等局限性,在实际应用中进行权衡和优化。