基于分布式锁的资源竞争控制策略
分布式系统中的资源竞争问题
在分布式系统中,多个节点可能同时尝试访问和修改共享资源,这就导致了资源竞争的问题。例如,在电商系统中,库存是一种共享资源,当多个用户同时下单购买同一款商品时,如果不加以控制,就可能出现超卖的情况。
传统单机锁在分布式系统中的局限性
在单机环境下,我们可以使用诸如互斥锁(Mutex)、读写锁(Read - Write Lock)等机制来控制对共享资源的访问。例如,在 Java 中,我们可以使用 synchronized
关键字或者 ReentrantLock
来实现线程安全的操作。
// 使用 synchronized 关键字
public class SynchronizedExample {
private static int count = 0;
public static synchronized void increment() {
count++;
}
}
// 使用 ReentrantLock
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockExample {
private static int count = 0;
private static ReentrantLock lock = new ReentrantLock();
public static void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
}
然而,在分布式系统中,这些单机锁机制无法工作。因为分布式系统由多个独立的节点组成,每个节点都有自己独立的内存空间,单机锁只能保证在单个 JVM 进程内的线程安全,无法跨节点进行同步。
分布式锁的概念与原理
分布式锁是一种用于在分布式系统中控制对共享资源访问的机制。其核心原理是通过在多个节点之间共享一个锁资源,只有获取到该锁的节点才能访问共享资源。
分布式锁的特性
- 互斥性:这是分布式锁最基本的特性,同一时刻只能有一个节点获取到锁,从而保证对共享资源的互斥访问。
- 可重入性:同一个节点在持有锁的情况下,可以再次获取锁,而不会造成死锁。例如,一个方法调用另一个需要相同锁的方法时,可重入性确保该节点不会被自己阻塞。
- 高可用性:分布式锁服务应该具备高可用性,即使部分节点出现故障,也不能影响锁的正常获取和释放,以保证系统的正常运行。
- 容错性:在网络分区、节点故障等异常情况下,分布式锁系统应该能够正确处理,避免出现锁无法获取或释放的情况。
实现分布式锁的常见方式
- 基于数据库:可以通过在数据库中创建一个表,表中包含锁的状态信息。例如,创建一个
locks
表,有lock_key
和lock_status
字段,获取锁时插入一条记录,释放锁时删除记录。
-- 创建 locks 表
CREATE TABLE locks (
lock_key VARCHAR(255) PRIMARY KEY,
lock_status BOOLEAN DEFAULT FALSE
);
-- 获取锁
INSERT INTO locks (lock_key, lock_status) VALUES ('resource_lock', TRUE) ON DUPLICATE KEY UPDATE lock_status = FALSE;
-- 释放锁
DELETE FROM locks WHERE lock_key ='resource_lock';
- 基于缓存(如 Redis):Redis 提供了原子操作命令,如
SETNX
(SET if Not eXists)可以用来实现分布式锁。SETNX key value
当且仅当key
不存在时,将key
的值设为value
,返回 1;若key
已经存在,则 SETNX 不做任何动作,返回 0 。
import redis
r = redis.Redis(host='localhost', port=6379, db = 0)
def acquire_lock(lock_key, value):
return r.setnx(lock_key, value)
def release_lock(lock_key):
r.delete(lock_key)
- 基于 ZooKeeper:ZooKeeper 是一个分布式协调服务,它通过树形结构来管理数据。可以利用 ZooKeeper 的临时顺序节点特性来实现分布式锁。当一个节点获取锁时,在指定路径下创建一个临时顺序节点,所有节点获取锁时创建的节点按顺序排列,序号最小的节点获得锁。当持有锁的节点释放锁时,对应的临时节点会被删除,后续序号最小的节点获得锁。
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class ZooKeeperDistributedLock implements Watcher {
private static final String ZK_SERVERS = "localhost:2181";
private static final int SESSION_TIMEOUT = 5000;
private static final String LOCK_PATH = "/locks";
private ZooKeeper zk;
private String lockNode;
private CountDownLatch latch;
public ZooKeeperDistributedLock() throws IOException, InterruptedException {
this.latch = new CountDownLatch(1);
this.zk = new ZooKeeper(ZK_SERVERS, SESSION_TIMEOUT, this);
latch.await();
}
@Override
public void process(WatchedEvent event) {
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
latch.countDown();
}
}
public boolean acquireLock() throws KeeperException, InterruptedException {
lockNode = zk.create(LOCK_PATH + "/lock-", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
List<String> children = zk.getChildren(LOCK_PATH, false);
Collections.sort(children);
String firstLock = LOCK_PATH + "/" + children.get(0);
if (lockNode.equals(firstLock)) {
return true;
} else {
int index = children.indexOf(lockNode.substring(LOCK_PATH.length() + 1));
String waitPath = LOCK_PATH + "/" + children.get(index - 1);
Stat stat = zk.exists(waitPath, true);
if (stat == null) {
return acquireLock();
}
this.latch = new CountDownLatch(1);
this.latch.await();
return acquireLock();
}
}
public void releaseLock() throws KeeperException, InterruptedException {
zk.delete(lockNode, -1);
zk.close();
}
}
基于分布式锁的资源竞争控制策略
乐观锁与悲观锁策略
- 悲观锁策略:悲观锁认为在任何时候,共享资源都可能被其他节点修改,所以在访问共享资源前,先获取锁,确保在自己访问期间没有其他节点修改资源。例如,在基于 Redis 的分布式锁实现中,先使用
SETNX
获取锁,获取成功后再对资源进行操作。
import redis
r = redis.Redis(host='localhost', port=6379, db = 0)
def update_resource_with_pessimistic_lock(resource_key, new_value):
lock_key = "lock:" + resource_key
lock_value = "unique_value"
if acquire_lock(lock_key, lock_value):
try:
current_value = r.get(resource_key)
# 这里进行实际的资源更新操作
r.set(resource_key, new_value)
finally:
release_lock(lock_key)
return True
else:
return False
- 乐观锁策略:乐观锁假设在大多数情况下,共享资源不会被其他节点修改。因此,在读取资源时并不加锁,而是在更新资源时,检查资源是否在读取后被其他节点修改过。通常使用版本号或者时间戳来实现。例如,在数据库中,可以为表增加一个
version
字段,每次更新时version
自增。
-- 创建资源表
CREATE TABLE resources (
id INT PRIMARY KEY,
data VARCHAR(255),
version INT DEFAULT 0
);
-- 更新资源
UPDATE resources
SET data = 'new_data', version = version + 1
WHERE id = 1 AND version = 0;
在分布式系统中,基于缓存实现乐观锁时,可以在缓存中存储资源的版本号。
def update_resource_with_optimistic_lock(resource_key, new_value):
current_value, version = r.get(resource_key), r.get(resource_key + ":version")
if current_value is not None:
new_version = int(version) + 1
if r.set(resource_key + ":version", new_version, condition=redis.WatchCondition.EQUAL, watch_keys=[resource_key + ":version"]):
r.set(resource_key, new_value)
return True
return False
锁的粒度控制策略
- 粗粒度锁:粗粒度锁是对整个共享资源集合加锁。例如,在一个电商系统中,如果将所有商品的库存视为一个共享资源,对其加锁,那么当一个用户下单时,会锁住整个库存,其他用户都无法下单,直到锁被释放。这种方式实现简单,但并发性能较差,因为同一时间只有一个节点能访问共享资源。
def place_order_with_coarse_lock(product_id, quantity):
lock_key = "inventory_lock"
lock_value = "unique_value"
if acquire_lock(lock_key, lock_value):
try:
inventory = r.get("inventory:" + product_id)
if int(inventory) >= quantity:
r.decrby("inventory:" + product_id, quantity)
return True
else:
return False
finally:
release_lock(lock_key)
else:
return False
- 细粒度锁:细粒度锁是对共享资源的更小单位加锁。还是以电商库存为例,可以为每个商品的库存单独加锁。这样,当一个用户下单某个商品时,只会锁住该商品的库存,其他商品的库存仍然可以被其他用户访问,大大提高了并发性能。但细粒度锁的实现相对复杂,需要更多的锁管理和协调。
def place_order_with_fine_lock(product_id, quantity):
lock_key = "inventory_lock:" + product_id
lock_value = "unique_value"
if acquire_lock(lock_key, lock_value):
try:
inventory = r.get("inventory:" + product_id)
if int(inventory) >= quantity:
r.decrby("inventory:" + product_id, quantity)
return True
else:
return False
finally:
release_lock(lock_key)
else:
return False
锁的超时策略
- 固定超时时间:为分布式锁设置一个固定的超时时间,当持有锁的节点在超时时间内没有释放锁时,锁会自动过期,其他节点可以重新获取锁。在 Redis 中,可以在使用
SETNX
获取锁时同时设置过期时间。
def acquire_lock_with_timeout(lock_key, lock_value, timeout):
return r.set(lock_key, lock_value, ex=timeout, nx=True)
- 动态超时时间:根据实际业务情况动态调整锁的超时时间。例如,对于一些复杂的业务操作,可能需要更长的锁持有时间。可以在获取锁时,根据业务操作的预估时间来设置超时时间。
def acquire_lock_with_dynamic_timeout(lock_key, lock_value, estimated_time):
return r.set(lock_key, lock_value, ex=estimated_time, nx=True)
分布式锁实现中的问题与解决方案
锁的误释放问题
在基于 Redis 的分布式锁实现中,如果一个节点获取了锁,在处理业务逻辑过程中因为某些原因(如网络延迟)导致锁超时自动释放,而此时该节点还未完成业务操作,另一个节点获取了锁并开始操作共享资源,就会出现数据不一致的问题。
解决方案:在获取锁时,为每个锁设置一个唯一的标识(如 UUID),在释放锁时,先检查当前锁的标识是否与自己获取锁时的标识一致,如果一致才释放锁。
import uuid
def acquire_lock(lock_key):
lock_value = str(uuid.uuid4())
if r.setnx(lock_key, lock_value):
r.expire(lock_key, 10) # 设置锁的过期时间
return lock_value
return None
def release_lock(lock_key, lock_value):
stored_value = r.get(lock_key)
if stored_value == lock_value:
r.delete(lock_key)
return True
return False
锁的死锁问题
死锁是指多个节点相互等待对方释放锁,导致所有节点都无法继续执行的情况。在分布式系统中,死锁可能由于网络分区、节点故障等原因发生。
解决方案:
- 超时机制:设置合理的锁超时时间,当一个节点获取锁后,如果在超时时间内没有完成操作,锁会自动释放,避免死锁。
- 资源分配图算法:类似于单机系统中的死锁检测算法,在分布式系统中可以通过构建资源分配图来检测死锁。当检测到死锁时,选择一个节点作为牺牲者,释放其持有的锁,以打破死锁。
网络分区问题
网络分区是指分布式系统中的节点被划分成多个相互隔离的区域,不同区域之间无法通信。在网络分区情况下,可能会出现多个分区内的节点同时获取到锁的情况,导致数据不一致。
解决方案:
- 使用一致性协议:如 Paxos、Raft 等,这些协议可以保证在网络分区恢复后,数据的一致性。在基于 ZooKeeper 的分布式锁实现中,ZooKeeper 内部使用了 Zab 协议来保证数据一致性。
- 多数派投票:在获取锁时,采用多数派投票的方式。例如,在一个由 5 个节点组成的分布式系统中,只有当至少 3 个节点同意获取锁时,才认为获取锁成功。这样即使出现网络分区,也能保证只有一个分区内的节点可以获取锁。
分布式锁在实际项目中的应用案例
电商系统中的库存控制
在电商系统中,库存是一个关键的共享资源。为了防止超卖,需要使用分布式锁来控制对库存的访问。以基于 Redis 的分布式锁为例:
def decrease_inventory(product_id, quantity):
lock_key = "inventory_lock:" + product_id
lock_value = "unique_value"
if acquire_lock(lock_key, lock_value):
try:
inventory = r.get("inventory:" + product_id)
if int(inventory) >= quantity:
r.decrby("inventory:" + product_id, quantity)
return True
else:
return False
finally:
release_lock(lock_key)
else:
return False
分布式任务调度系统中的任务排重
在分布式任务调度系统中,可能会出现多个调度节点同时尝试执行同一个任务的情况。为了避免重复执行任务,可以使用分布式锁。例如,在一个基于 Spring Boot 和 Redis 的任务调度系统中:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class TaskScheduler {
@Autowired
private StringRedisTemplate stringRedisTemplate;
private static final String LOCK_KEY = "task_lock";
private static final String LOCK_VALUE = "unique_value";
@Scheduled(cron = "0 0 0 * * *") // 每天凌晨执行
public void executeTask() {
Boolean isLocked = stringRedisTemplate.opsForValue().setIfAbsent(LOCK_KEY, LOCK_VALUE);
if (isLocked) {
try {
// 执行任务逻辑
System.out.println("Task is running...");
} finally {
stringRedisTemplate.delete(LOCK_KEY);
}
} else {
System.out.println("Task is already running in another node.");
}
}
}
微服务架构中的分布式事务协调
在微服务架构中,不同的微服务可能需要共同完成一个业务事务。例如,在一个订单处理系统中,订单微服务需要与库存微服务、支付微服务等协同工作。为了保证事务的一致性,可以使用分布式锁来协调各个微服务的操作顺序。
假设订单微服务在创建订单时,需要先获取库存微服务的锁,确保库存足够后再进行支付操作。
// 订单微服务
@Service
public class OrderService {
@Autowired
private InventoryClient inventoryClient;
@Autowired
private PaymentClient paymentClient;
@Autowired
private RedisTemplate<String, String> redisTemplate;
private static final String INVENTORY_LOCK_KEY = "inventory_lock";
private static final String INVENTORY_LOCK_VALUE = "unique_value";
public void createOrder(Order order) {
Boolean isLocked = redisTemplate.opsForValue().setIfAbsent(INVENTORY_LOCK_KEY, INVENTORY_LOCK_VALUE);
if (isLocked) {
try {
boolean hasInventory = inventoryClient.checkInventory(order.getProductId(), order.getQuantity());
if (hasInventory) {
inventoryClient.decreaseInventory(order.getProductId(), order.getQuantity());
paymentClient.processPayment(order.getOrderId(), order.getAmount());
// 创建订单
System.out.println("Order created successfully.");
} else {
System.out.println("Insufficient inventory.");
}
} finally {
redisTemplate.delete(INVENTORY_LOCK_KEY);
}
} else {
System.out.println("Inventory is being processed by another service.");
}
}
}
通过以上方式,分布式锁在实际项目中有效地控制了资源竞争,保证了系统的一致性和稳定性。在实际应用中,需要根据具体的业务场景和系统架构,选择合适的分布式锁实现方式和资源竞争控制策略。