MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Python中的锁与同步机制

2023-07-276.9k 阅读

Python中的锁与同步机制

并发编程中的问题

在并发编程的领域里,当多个线程或进程尝试同时访问和修改共享资源时,会引发一系列棘手的问题,这些问题严重影响程序的正确性和稳定性。

竞态条件(Race Condition)

竞态条件是指多个线程或进程在访问共享资源时,由于执行顺序的不确定性,导致最终结果依赖于这些线程或进程的执行顺序。例如,假设有一个全局变量 counter,初始值为0,两个线程都要对其进行加1操作。如果没有适当的同步机制,以下情况可能发生:

import threading

counter = 0

def increment():
    global counter
    temp = counter
    temp = temp + 1
    counter = temp

threads = []
for _ in range(2):
    t = threading.Thread(target=increment)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(counter)

在理想情况下,两个线程执行完毕后 counter 的值应该是2。但在实际运行中,由于线程调度的不确定性,可能会出现一个线程读取 counter 的值后,还未更新 counter,另一个线程就读取了相同的值,最终导致 counter 只增加了1。

死锁(Deadlock)

死锁是指两个或多个线程或进程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。假设有两个线程 T1T2T1 持有资源 R1 并等待资源 R2,而 T2 持有资源 R2 并等待资源 R1,这样就形成了死锁。以下是一个简单的死锁示例:

import threading

lock1 = threading.Lock()
lock2 = threading.Lock()

def thread1():
    lock1.acquire()
    print('Thread 1 acquired lock1')
    lock2.acquire()
    print('Thread 1 acquired lock2')
    lock2.release()
    print('Thread 1 released lock2')
    lock1.release()
    print('Thread 1 released lock1')

def thread2():
    lock2.acquire()
    print('Thread 2 acquired lock2')
    lock1.acquire()
    print('Thread 2 acquired lock1')
    lock1.release()
    print('Thread 2 released lock1')
    lock2.release()
    print('Thread 2 released lock2')

t1 = threading.Thread(target=thread1)
t2 = threading.Thread(target=thread2)

t1.start()
t2.start()

t1.join()
t2.join()

在上述代码中,如果 thread1 先获取 lock1thread2 先获取 lock2,就会发生死锁,两个线程都无法继续执行。

锁机制

为了解决并发编程中的这些问题,Python 提供了多种锁机制。

互斥锁(Mutex - Mutual Exclusion)

互斥锁是一种最基本的同步工具,它的作用是保证在同一时刻只有一个线程能够访问共享资源,就像给共享资源上了一把锁。在 Python 中,threading.Lock 类实现了互斥锁。

import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    lock.acquire()
    try:
        temp = counter
        temp = temp + 1
        counter = temp
    finally:
        lock.release()

threads = []
for _ in range(2):
    t = threading.Thread(target=increment)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(counter)

在上述代码中,lock.acquire() 用于获取锁,只有获取到锁的线程才能进入临界区(对共享资源 counter 进行操作的代码块)。try - finally 语句确保无论临界区内的代码是否发生异常,锁都会被释放,避免死锁。

递归锁(RLock - Reentrant Lock)

递归锁允许同一个线程多次获取锁,而不会造成死锁。这在一个线程需要多次调用同一个函数,而该函数内部又需要获取锁的情况下非常有用。Python 中的 threading.RLock 类实现了递归锁。

import threading

lock = threading.RLock()

def recursive_function(n):
    lock.acquire()
    try:
        if n > 0:
            print(f'Level {n}: Entering recursive function')
            recursive_function(n - 1)
            print(f'Level {n}: Exiting recursive function')
    finally:
        lock.release()

recursive_function(3)

在这个例子中,recursive_function 会递归调用自身。如果使用普通的互斥锁,在第二次递归调用获取锁时就会发生死锁,因为锁已经被当前线程持有。而递归锁允许同一个线程多次获取锁,每获取一次,内部的计数器加1,每释放一次,计数器减1,只有当计数器为0时,锁才真正被释放。

信号量(Semaphore)

信号量是一个更通用的同步原语,它可以控制同时访问共享资源的线程数量。信号量内部维护一个计数器,当一个线程获取信号量时,计数器减1;当一个线程释放信号量时,计数器加1。当计数器为0时,其他线程无法获取信号量,只能等待。Python 中的 threading.Semaphore 类实现了信号量机制。

import threading
import time

semaphore = threading.Semaphore(2)

def worker():
    semaphore.acquire()
    print(f'{threading.current_thread().name} acquired the semaphore')
    time.sleep(2)
    print(f'{threading.current_thread().name} released the semaphore')
    semaphore.release()

threads = []
for _ in range(5):
    t = threading.Thread(target=worker)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

在上述代码中,Semaphore(2) 表示同时最多允许2个线程访问共享资源。每个线程调用 semaphore.acquire() 获取信号量,如果信号量计数器大于0,则计数器减1并继续执行;否则线程阻塞等待。semaphore.release() 会将计数器加1,唤醒等待的线程。

条件变量(Condition Variable)

条件变量用于线程间的复杂同步,它允许线程在满足特定条件时才执行某些操作。条件变量通常与锁一起使用。Python 中的 threading.Condition 类实现了条件变量机制。

import threading

condition = threading.Condition()
shared_variable = 0

def producer():
    global shared_variable
    with condition:
        shared_variable = 42
        print('Producer set shared_variable to 42')
        condition.notify()

def consumer():
    with condition:
        condition.wait()
        print(f'Consumer got shared_variable: {shared_variable}')

producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

consumer_thread.start()
producer_thread.start()

producer_thread.join()
consumer_thread.join()

在这个例子中,consumer 线程调用 condition.wait() 进入等待状态,同时释放它持有的锁(如果有的话)。producer 线程在设置 shared_variable 后,调用 condition.notify() 唤醒等待在条件变量上的一个线程。当 consumer 被唤醒后,它会重新获取锁并继续执行。

事件(Event)

事件是一种简单的线程同步机制,它允许一个线程通知其他线程发生了某个事件。事件对象内部有一个标志位,线程可以通过设置或清除这个标志位来通知或等待事件的发生。Python 中的 threading.Event 类实现了事件机制。

import threading
import time

event = threading.Event()

def waiter():
    print('Waiter is waiting for the event')
    event.wait()
    print('Waiter got the event')

def notifier():
    time.sleep(2)
    print('Notifier is setting the event')
    event.set()

waiter_thread = threading.Thread(target=waiter)
notifier_thread = threading.Thread(target=notifier)

waiter_thread.start()
notifier_thread.start()

waiter_thread.join()
notifier_thread.join()

在上述代码中,waiter 线程调用 event.wait() 等待事件发生,处于阻塞状态。notifier 线程在休眠2秒后,调用 event.set() 设置事件,从而唤醒 waiter 线程。

多进程中的同步机制

在 Python 中,multiprocessing 模块提供了多进程编程的支持,同时也提供了类似的同步机制,不过由于进程间内存不共享,这些同步机制的实现和使用与线程略有不同。

进程锁(Lock in multiprocessing)

multiprocessing.Lock 用于进程间的同步,保证同一时刻只有一个进程能够访问共享资源。

import multiprocessing

def worker(lock, num):
    lock.acquire()
    try:
        print(f'Process {num} acquired the lock')
        time.sleep(1)
        print(f'Process {num} released the lock')
    finally:
        lock.release()

if __name__ == '__main__':
    lock = multiprocessing.Lock()
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker, args=(lock, i))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

需要注意的是,在使用 multiprocessing 模块时,代码需要放在 if __name__ == '__main__': 块中,这是因为在 Windows 系统下,多进程的启动方式决定了需要这样做,以避免一些启动相关的问题。

进程信号量(Semaphore in multiprocessing)

multiprocessing.Semaphore 同样用于控制同时访问共享资源的进程数量。

import multiprocessing
import time

semaphore = multiprocessing.Semaphore(2)

def worker(sem, num):
    sem.acquire()
    print(f'Process {num} acquired the semaphore')
    time.sleep(2)
    print(f'Process {num} released the semaphore')
    sem.release()

if __name__ == '__main__':
    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(semaphore, i))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

与线程中的信号量类似,进程信号量也维护一个计数器,通过 acquirerelease 方法来控制进程对共享资源的访问。

进程条件变量(Condition in multiprocessing)

multiprocessing.Condition 用于进程间更复杂的同步场景,结合锁和条件判断实现进程间的协作。

import multiprocessing

condition = multiprocessing.Condition()
shared_value = 0

def producer(cond):
    global shared_value
    with cond:
        shared_value = 100
        print('Producer set shared_value to 100')
        cond.notify()

def consumer(cond):
    with cond:
        cond.wait()
        print(f'Consumer got shared_value: {shared_value}')

if __name__ == '__main__':
    producer_process = multiprocessing.Process(target=producer, args=(condition,))
    consumer_process = multiprocessing.Process(target=consumer, args=(condition,))

    consumer_process.start()
    producer_process.start()

    producer_process.join()
    consumer_process.join()

在这个进程间的条件变量示例中,producer 进程设置 shared_value 后通知 consumer 进程,consumer 进程在接收到通知后获取 shared_value

锁与同步机制的性能考量

在使用锁和同步机制时,性能是一个重要的考量因素。过多或不合理地使用锁可能会导致性能瓶颈。

锁的粒度

锁的粒度指的是锁所保护的共享资源的范围。细粒度锁保护的资源范围小,多个线程可以同时访问不同的资源部分,并发度较高,但管理锁的开销也较大;粗粒度锁保护的资源范围大,同一时刻只有一个线程能访问整个资源,并发度较低,但锁的管理开销相对较小。例如,在一个大型数据结构中,如果对整个数据结构使用一把锁(粗粒度锁),每次只有一个线程能操作这个数据结构;如果将数据结构划分为多个部分,每个部分使用一把锁(细粒度锁),不同线程可以同时操作不同部分的数据,但需要小心处理锁之间的关系,避免死锁。

上下文切换开销

当一个线程获取不到锁而进入等待状态时,操作系统会进行上下文切换,将 CPU 资源分配给其他可运行的线程。上下文切换会带来一定的开销,包括保存和恢复线程的寄存器状态、内存映射等。频繁的上下文切换会降低系统的整体性能。因此,在设计并发程序时,应尽量减少锁的竞争,降低上下文切换的频率。

死锁检测与预防

死锁会导致程序无法继续执行,因此死锁的检测和预防非常重要。一些高级的并发编程框架提供了死锁检测工具,可以帮助开发者发现死锁情况。在代码设计层面,避免死锁的方法包括按照相同的顺序获取锁、使用超时机制等。例如,在获取多个锁时,所有线程都按照固定的顺序获取锁,这样可以避免循环等待导致的死锁。

不同应用场景下的选择

不同的锁和同步机制适用于不同的应用场景。

简单互斥场景

如果只是需要保护简单的共享资源,避免竞态条件,互斥锁是一个很好的选择。例如,对全局变量的简单读写操作,使用 threading.Lockmultiprocessing.Lock 可以很方便地实现同步。

复杂同步场景

当涉及到线程或进程间的复杂协作,如生产者 - 消费者模型,条件变量或信号量更为合适。条件变量可以根据特定条件进行线程间的通知和等待,信号量可以控制并发访问的数量,满足不同的同步需求。

递归调用场景

在递归函数中,如果需要对共享资源进行同步,递归锁是必不可少的。它允许同一个线程多次获取锁,不会导致死锁,保证递归函数的正确执行。

资源限制场景

当需要限制同时访问共享资源的线程或进程数量时,信号量是最佳选择。例如,数据库连接池的管理,通过信号量可以控制同时使用连接的数量,避免资源耗尽。

总结

Python 提供了丰富的锁和同步机制,包括互斥锁、递归锁、信号量、条件变量和事件等,用于解决并发编程中的竞态条件、死锁等问题。在多进程编程中,也有相应的同步机制可供使用。在实际应用中,需要根据具体的应用场景和性能需求,合理选择和使用这些同步机制,以确保程序的正确性和高效性。同时,要注意锁的粒度、上下文切换开销以及死锁的检测与预防,优化并发程序的性能。通过深入理解和熟练运用这些同步机制,开发者可以编写出健壮、高效的并发 Python 程序。

以上从并发编程问题引入,详细介绍了 Python 中的各种锁与同步机制,包括其原理、使用方法、性能考量以及应用场景选择,并通过丰富的代码示例帮助理解,希望能让读者对 Python 中的锁与同步机制有全面且深入的认识。