MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

基于分布式锁的资源竞争控制策略

2024-03-085.6k 阅读

分布式系统中的资源竞争问题

在分布式系统中,多个节点可能同时尝试访问和修改共享资源,这就导致了资源竞争的问题。例如,在电商系统中,库存是一种共享资源,当多个用户同时下单购买同一款商品时,如果不加以控制,就可能出现超卖的情况。

传统单机锁在分布式系统中的局限性

在单机环境下,我们可以使用诸如互斥锁(Mutex)、读写锁(Read - Write Lock)等机制来控制对共享资源的访问。例如,在 Java 中,我们可以使用 synchronized 关键字或者 ReentrantLock 来实现线程安全的操作。

// 使用 synchronized 关键字
public class SynchronizedExample {
    private static int count = 0;

    public static synchronized void increment() {
        count++;
    }
}

// 使用 ReentrantLock
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockExample {
    private static int count = 0;
    private static ReentrantLock lock = new ReentrantLock();

    public static void increment() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }
}

然而,在分布式系统中,这些单机锁机制无法工作。因为分布式系统由多个独立的节点组成,每个节点都有自己独立的内存空间,单机锁只能保证在单个 JVM 进程内的线程安全,无法跨节点进行同步。

分布式锁的概念与原理

分布式锁是一种用于在分布式系统中控制对共享资源访问的机制。其核心原理是通过在多个节点之间共享一个锁资源,只有获取到该锁的节点才能访问共享资源。

分布式锁的特性

  1. 互斥性:这是分布式锁最基本的特性,同一时刻只能有一个节点获取到锁,从而保证对共享资源的互斥访问。
  2. 可重入性:同一个节点在持有锁的情况下,可以再次获取锁,而不会造成死锁。例如,一个方法调用另一个需要相同锁的方法时,可重入性确保该节点不会被自己阻塞。
  3. 高可用性:分布式锁服务应该具备高可用性,即使部分节点出现故障,也不能影响锁的正常获取和释放,以保证系统的正常运行。
  4. 容错性:在网络分区、节点故障等异常情况下,分布式锁系统应该能够正确处理,避免出现锁无法获取或释放的情况。

实现分布式锁的常见方式

  1. 基于数据库:可以通过在数据库中创建一个表,表中包含锁的状态信息。例如,创建一个 locks 表,有 lock_keylock_status 字段,获取锁时插入一条记录,释放锁时删除记录。
-- 创建 locks 表
CREATE TABLE locks (
    lock_key VARCHAR(255) PRIMARY KEY,
    lock_status BOOLEAN DEFAULT FALSE
);

-- 获取锁
INSERT INTO locks (lock_key, lock_status) VALUES ('resource_lock', TRUE) ON DUPLICATE KEY UPDATE lock_status = FALSE;

-- 释放锁
DELETE FROM locks WHERE lock_key ='resource_lock';
  1. 基于缓存(如 Redis):Redis 提供了原子操作命令,如 SETNX(SET if Not eXists)可以用来实现分布式锁。SETNX key value 当且仅当 key 不存在时,将 key 的值设为 value ,返回 1;若 key 已经存在,则 SETNX 不做任何动作,返回 0 。
import redis

r = redis.Redis(host='localhost', port=6379, db = 0)

def acquire_lock(lock_key, value):
    return r.setnx(lock_key, value)

def release_lock(lock_key):
    r.delete(lock_key)
  1. 基于 ZooKeeper:ZooKeeper 是一个分布式协调服务,它通过树形结构来管理数据。可以利用 ZooKeeper 的临时顺序节点特性来实现分布式锁。当一个节点获取锁时,在指定路径下创建一个临时顺序节点,所有节点获取锁时创建的节点按顺序排列,序号最小的节点获得锁。当持有锁的节点释放锁时,对应的临时节点会被删除,后续序号最小的节点获得锁。
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class ZooKeeperDistributedLock implements Watcher {

    private static final String ZK_SERVERS = "localhost:2181";
    private static final int SESSION_TIMEOUT = 5000;
    private static final String LOCK_PATH = "/locks";
    private ZooKeeper zk;
    private String lockNode;
    private CountDownLatch latch;

    public ZooKeeperDistributedLock() throws IOException, InterruptedException {
        this.latch = new CountDownLatch(1);
        this.zk = new ZooKeeper(ZK_SERVERS, SESSION_TIMEOUT, this);
        latch.await();
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
            latch.countDown();
        }
    }

    public boolean acquireLock() throws KeeperException, InterruptedException {
        lockNode = zk.create(LOCK_PATH + "/lock-", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        List<String> children = zk.getChildren(LOCK_PATH, false);
        Collections.sort(children);
        String firstLock = LOCK_PATH + "/" + children.get(0);
        if (lockNode.equals(firstLock)) {
            return true;
        } else {
            int index = children.indexOf(lockNode.substring(LOCK_PATH.length() + 1));
            String waitPath = LOCK_PATH + "/" + children.get(index - 1);
            Stat stat = zk.exists(waitPath, true);
            if (stat == null) {
                return acquireLock();
            }
            this.latch = new CountDownLatch(1);
            this.latch.await();
            return acquireLock();
        }
    }

    public void releaseLock() throws KeeperException, InterruptedException {
        zk.delete(lockNode, -1);
        zk.close();
    }
}

基于分布式锁的资源竞争控制策略

乐观锁与悲观锁策略

  1. 悲观锁策略:悲观锁认为在任何时候,共享资源都可能被其他节点修改,所以在访问共享资源前,先获取锁,确保在自己访问期间没有其他节点修改资源。例如,在基于 Redis 的分布式锁实现中,先使用 SETNX 获取锁,获取成功后再对资源进行操作。
import redis

r = redis.Redis(host='localhost', port=6379, db = 0)

def update_resource_with_pessimistic_lock(resource_key, new_value):
    lock_key = "lock:" + resource_key
    lock_value = "unique_value"
    if acquire_lock(lock_key, lock_value):
        try:
            current_value = r.get(resource_key)
            # 这里进行实际的资源更新操作
            r.set(resource_key, new_value)
        finally:
            release_lock(lock_key)
        return True
    else:
        return False
  1. 乐观锁策略:乐观锁假设在大多数情况下,共享资源不会被其他节点修改。因此,在读取资源时并不加锁,而是在更新资源时,检查资源是否在读取后被其他节点修改过。通常使用版本号或者时间戳来实现。例如,在数据库中,可以为表增加一个 version 字段,每次更新时 version 自增。
-- 创建资源表
CREATE TABLE resources (
    id INT PRIMARY KEY,
    data VARCHAR(255),
    version INT DEFAULT 0
);

-- 更新资源
UPDATE resources
SET data = 'new_data', version = version + 1
WHERE id = 1 AND version = 0;

在分布式系统中,基于缓存实现乐观锁时,可以在缓存中存储资源的版本号。

def update_resource_with_optimistic_lock(resource_key, new_value):
    current_value, version = r.get(resource_key), r.get(resource_key + ":version")
    if current_value is not None:
        new_version = int(version) + 1
        if r.set(resource_key + ":version", new_version, condition=redis.WatchCondition.EQUAL, watch_keys=[resource_key + ":version"]):
            r.set(resource_key, new_value)
            return True
    return False

锁的粒度控制策略

  1. 粗粒度锁:粗粒度锁是对整个共享资源集合加锁。例如,在一个电商系统中,如果将所有商品的库存视为一个共享资源,对其加锁,那么当一个用户下单时,会锁住整个库存,其他用户都无法下单,直到锁被释放。这种方式实现简单,但并发性能较差,因为同一时间只有一个节点能访问共享资源。
def place_order_with_coarse_lock(product_id, quantity):
    lock_key = "inventory_lock"
    lock_value = "unique_value"
    if acquire_lock(lock_key, lock_value):
        try:
            inventory = r.get("inventory:" + product_id)
            if int(inventory) >= quantity:
                r.decrby("inventory:" + product_id, quantity)
                return True
            else:
                return False
        finally:
            release_lock(lock_key)
    else:
        return False
  1. 细粒度锁:细粒度锁是对共享资源的更小单位加锁。还是以电商库存为例,可以为每个商品的库存单独加锁。这样,当一个用户下单某个商品时,只会锁住该商品的库存,其他商品的库存仍然可以被其他用户访问,大大提高了并发性能。但细粒度锁的实现相对复杂,需要更多的锁管理和协调。
def place_order_with_fine_lock(product_id, quantity):
    lock_key = "inventory_lock:" + product_id
    lock_value = "unique_value"
    if acquire_lock(lock_key, lock_value):
        try:
            inventory = r.get("inventory:" + product_id)
            if int(inventory) >= quantity:
                r.decrby("inventory:" + product_id, quantity)
                return True
            else:
                return False
        finally:
            release_lock(lock_key)
    else:
        return False

锁的超时策略

  1. 固定超时时间:为分布式锁设置一个固定的超时时间,当持有锁的节点在超时时间内没有释放锁时,锁会自动过期,其他节点可以重新获取锁。在 Redis 中,可以在使用 SETNX 获取锁时同时设置过期时间。
def acquire_lock_with_timeout(lock_key, lock_value, timeout):
    return r.set(lock_key, lock_value, ex=timeout, nx=True)
  1. 动态超时时间:根据实际业务情况动态调整锁的超时时间。例如,对于一些复杂的业务操作,可能需要更长的锁持有时间。可以在获取锁时,根据业务操作的预估时间来设置超时时间。
def acquire_lock_with_dynamic_timeout(lock_key, lock_value, estimated_time):
    return r.set(lock_key, lock_value, ex=estimated_time, nx=True)

分布式锁实现中的问题与解决方案

锁的误释放问题

在基于 Redis 的分布式锁实现中,如果一个节点获取了锁,在处理业务逻辑过程中因为某些原因(如网络延迟)导致锁超时自动释放,而此时该节点还未完成业务操作,另一个节点获取了锁并开始操作共享资源,就会出现数据不一致的问题。

解决方案:在获取锁时,为每个锁设置一个唯一的标识(如 UUID),在释放锁时,先检查当前锁的标识是否与自己获取锁时的标识一致,如果一致才释放锁。

import uuid

def acquire_lock(lock_key):
    lock_value = str(uuid.uuid4())
    if r.setnx(lock_key, lock_value):
        r.expire(lock_key, 10)  # 设置锁的过期时间
        return lock_value
    return None

def release_lock(lock_key, lock_value):
    stored_value = r.get(lock_key)
    if stored_value == lock_value:
        r.delete(lock_key)
        return True
    return False

锁的死锁问题

死锁是指多个节点相互等待对方释放锁,导致所有节点都无法继续执行的情况。在分布式系统中,死锁可能由于网络分区、节点故障等原因发生。

解决方案

  1. 超时机制:设置合理的锁超时时间,当一个节点获取锁后,如果在超时时间内没有完成操作,锁会自动释放,避免死锁。
  2. 资源分配图算法:类似于单机系统中的死锁检测算法,在分布式系统中可以通过构建资源分配图来检测死锁。当检测到死锁时,选择一个节点作为牺牲者,释放其持有的锁,以打破死锁。

网络分区问题

网络分区是指分布式系统中的节点被划分成多个相互隔离的区域,不同区域之间无法通信。在网络分区情况下,可能会出现多个分区内的节点同时获取到锁的情况,导致数据不一致。

解决方案

  1. 使用一致性协议:如 Paxos、Raft 等,这些协议可以保证在网络分区恢复后,数据的一致性。在基于 ZooKeeper 的分布式锁实现中,ZooKeeper 内部使用了 Zab 协议来保证数据一致性。
  2. 多数派投票:在获取锁时,采用多数派投票的方式。例如,在一个由 5 个节点组成的分布式系统中,只有当至少 3 个节点同意获取锁时,才认为获取锁成功。这样即使出现网络分区,也能保证只有一个分区内的节点可以获取锁。

分布式锁在实际项目中的应用案例

电商系统中的库存控制

在电商系统中,库存是一个关键的共享资源。为了防止超卖,需要使用分布式锁来控制对库存的访问。以基于 Redis 的分布式锁为例:

def decrease_inventory(product_id, quantity):
    lock_key = "inventory_lock:" + product_id
    lock_value = "unique_value"
    if acquire_lock(lock_key, lock_value):
        try:
            inventory = r.get("inventory:" + product_id)
            if int(inventory) >= quantity:
                r.decrby("inventory:" + product_id, quantity)
                return True
            else:
                return False
        finally:
            release_lock(lock_key)
    else:
        return False

分布式任务调度系统中的任务排重

在分布式任务调度系统中,可能会出现多个调度节点同时尝试执行同一个任务的情况。为了避免重复执行任务,可以使用分布式锁。例如,在一个基于 Spring Boot 和 Redis 的任务调度系统中:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class TaskScheduler {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    private static final String LOCK_KEY = "task_lock";
    private static final String LOCK_VALUE = "unique_value";

    @Scheduled(cron = "0 0 0 * * *") // 每天凌晨执行
    public void executeTask() {
        Boolean isLocked = stringRedisTemplate.opsForValue().setIfAbsent(LOCK_KEY, LOCK_VALUE);
        if (isLocked) {
            try {
                // 执行任务逻辑
                System.out.println("Task is running...");
            } finally {
                stringRedisTemplate.delete(LOCK_KEY);
            }
        } else {
            System.out.println("Task is already running in another node.");
        }
    }
}

微服务架构中的分布式事务协调

在微服务架构中,不同的微服务可能需要共同完成一个业务事务。例如,在一个订单处理系统中,订单微服务需要与库存微服务、支付微服务等协同工作。为了保证事务的一致性,可以使用分布式锁来协调各个微服务的操作顺序。

假设订单微服务在创建订单时,需要先获取库存微服务的锁,确保库存足够后再进行支付操作。

// 订单微服务
@Service
public class OrderService {

    @Autowired
    private InventoryClient inventoryClient;
    @Autowired
    private PaymentClient paymentClient;
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    private static final String INVENTORY_LOCK_KEY = "inventory_lock";
    private static final String INVENTORY_LOCK_VALUE = "unique_value";

    public void createOrder(Order order) {
        Boolean isLocked = redisTemplate.opsForValue().setIfAbsent(INVENTORY_LOCK_KEY, INVENTORY_LOCK_VALUE);
        if (isLocked) {
            try {
                boolean hasInventory = inventoryClient.checkInventory(order.getProductId(), order.getQuantity());
                if (hasInventory) {
                    inventoryClient.decreaseInventory(order.getProductId(), order.getQuantity());
                    paymentClient.processPayment(order.getOrderId(), order.getAmount());
                    // 创建订单
                    System.out.println("Order created successfully.");
                } else {
                    System.out.println("Insufficient inventory.");
                }
            } finally {
                redisTemplate.delete(INVENTORY_LOCK_KEY);
            }
        } else {
            System.out.println("Inventory is being processed by another service.");
        }
    }
}

通过以上方式,分布式锁在实际项目中有效地控制了资源竞争,保证了系统的一致性和稳定性。在实际应用中,需要根据具体的业务场景和系统架构,选择合适的分布式锁实现方式和资源竞争控制策略。