MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Python 同步原语的工作原理与使用

2021-06-197.7k 阅读

Python 同步原语概述

在多线程或多进程编程中,同步原语是至关重要的工具,用于协调不同执行单元之间的操作,避免出现竞争条件、死锁等问题。Python 提供了一系列同步原语,如锁(Lock)、信号量(Semaphore)、事件(Event)、条件变量(Condition)和屏障(Barrier)等。这些同步原语基于操作系统提供的底层同步机制,为 Python 开发者提供了方便易用的同步控制手段。

锁(Lock)

工作原理

锁是一种最基本的同步原语,它只有两种状态:锁定(locked)和未锁定(unlocked)。当一个线程获取到锁时,锁进入锁定状态,其他线程如果尝试获取锁,就会被阻塞,直到锁被释放(即回到未锁定状态)。在 Python 中,threading.Lock 类实现了锁的功能。它基于操作系统的互斥锁(Mutex)机制,在底层通过系统调用实现线程间的同步。

使用示例

import threading


def worker(lock):
    with lock:
        print(f"{threading.current_thread().name} has acquired the lock.")
        # 模拟一些工作
        for _ in range(3):
            print(f"{threading.current_thread().name} is working.")


if __name__ == "__main__":
    lock = threading.Lock()
    threads = []
    for i in range(3):
        t = threading.Thread(target=worker, args=(lock,))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

在上述代码中,worker 函数尝试获取锁,获取成功后才会执行后续操作。with lock 语句块会自动在进入时获取锁,在离开时释放锁,确保代码的安全性。

信号量(Semaphore)

工作原理

信号量是一个计数器,它允许一定数量的线程同时访问某个资源或执行某个操作。当一个线程获取信号量时,计数器减 1;当一个线程释放信号量时,计数器加 1。如果计数器的值为 0,那么尝试获取信号量的线程就会被阻塞,直到有其他线程释放信号量使得计数器的值大于 0。Python 中的 threading.Semaphore 类实现了信号量的功能,其底层也是基于操作系统的信号量机制。

使用示例

import threading


def limited_resource(semaphore):
    with semaphore:
        print(f"{threading.current_thread().name} has access to the limited resource.")
        # 模拟对资源的使用
        for _ in range(3):
            print(f"{threading.current_thread().name} is using the resource.")


if __name__ == "__main__":
    semaphore = threading.Semaphore(2)
    threads = []
    for i in range(5):
        t = threading.Thread(target=limited_resource, args=(semaphore,))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

在这个例子中,Semaphore(2) 表示最多允许两个线程同时访问有限资源。每个线程在访问资源前先获取信号量,访问完后释放信号量,保证了对资源的并发访问在可控范围内。

事件(Event)

工作原理

事件是一种简单的线程间通信机制,它允许一个线程通知其他线程某个事件已经发生。事件有两种状态:设置(set)和未设置(unset)。线程可以等待事件的发生(即等待事件被设置),当事件被设置时,等待该事件的线程将被唤醒。threading.Event 类通过内部的标志位来表示事件的状态,利用操作系统的线程唤醒机制实现线程间的通知。

使用示例

import threading
import time


def waiter(event):
    print(f"{threading.current_thread().name} is waiting for the event.")
    event.wait()
    print(f"{threading.current_thread().name} has received the event.")


def signaler(event):
    time.sleep(3)
    print(f"{threading.current_thread().name} is setting the event.")
    event.set()


if __name__ == "__main__":
    event = threading.Event()
    t1 = threading.Thread(target=waiter, args=(event,))
    t2 = threading.Thread(target=signaler, args=(event,))

    t1.start()
    t2.start()

    t1.join()
    t2.join()

在上述代码中,waiter 线程等待事件的发生,signaler 线程在延迟 3 秒后设置事件,从而唤醒 waiter 线程。

条件变量(Condition)

工作原理

条件变量结合了锁和事件的功能,它允许线程在满足特定条件时才执行某些操作。一个条件变量总是与一个锁相关联,线程在等待条件变量时会自动释放相关的锁,当条件满足被唤醒时,又会自动获取锁。threading.Condition 类实现了条件变量的功能,它基于操作系统的条件变量机制,通过线程的挂起和唤醒操作实现复杂的同步控制。

使用示例

import threading


class Queue:
    def __init__(self):
        self.items = []
        self.max_size = 5
        self.condition = threading.Condition()

    def enqueue(self, item):
        with self.condition:
            while len(self.items) >= self.max_size:
                print(f"Queue is full. {threading.current_thread().name} is waiting.")
                self.condition.wait()
            self.items.append(item)
            print(f"{threading.current_thread().name} added {item} to the queue.")
            self.condition.notify()

    def dequeue(self):
        with self.condition:
            while not self.items:
                print(f"Queue is empty. {threading.current_thread().name} is waiting.")
                self.condition.wait()
            item = self.items.pop(0)
            print(f"{threading.current_thread().name} removed {item} from the queue.")
            self.condition.notify()
            return item


def producer(queue, item):
    queue.enqueue(item)


def consumer(queue):
    queue.dequeue()


if __name__ == "__main__":
    queue = Queue()
    producer_threads = []
    consumer_threads = []

    for i in range(3):
        p = threading.Thread(target=producer, args=(queue, i))
        producer_threads.append(p)
        p.start()

    for i in range(2):
        c = threading.Thread(target=consumer, args=(queue,))
        consumer_threads.append(c)
        c.start()

    for p in producer_threads:
        p.join()

    for c in consumer_threads:
        c.join()

在这个队列示例中,enqueuedequeue 方法使用条件变量来确保队列在满或空时,相应的线程能够正确等待和被唤醒,实现了生产者 - 消费者模型的同步。

屏障(Barrier)

工作原理

屏障是一种同步原语,它允许一定数量的线程在某个点上等待,直到所有线程都到达该点后,才继续执行后续操作。threading.Barrier 类实现了屏障的功能,它通过内部计数器记录到达屏障的线程数量,当计数器达到指定数量时,所有等待的线程将被同时释放。这在需要多个线程协同完成某个阶段任务,然后再一起进入下一阶段的场景中非常有用。

使用示例

import threading


def worker(barrier):
    print(f"{threading.current_thread().name} has reached the barrier.")
    barrier.wait()
    print(f"{threading.current_thread().name} has passed the barrier.")


if __name__ == "__main__":
    barrier = threading.Barrier(3)
    threads = []
    for i in range(3):
        t = threading.Thread(target=worker, args=(barrier,))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

在上述代码中,三个线程分别到达屏障,当所有线程都到达后,它们会同时通过屏障,继续执行后续操作。

多进程中的同步原语

在多进程编程中,Python 的 multiprocessing 模块也提供了类似的同步原语,如 multiprocessing.Lockmultiprocessing.Semaphoremultiprocessing.Eventmultiprocessing.Conditionmultiprocessing.Barrier。这些同步原语的工作原理与 threading 模块中的类似,但由于进程间的资源隔离,它们基于操作系统的进程间通信机制(如共享内存、信号等)来实现同步。

锁(Lock)在多进程中的使用

import multiprocessing


def worker(lock, num):
    with lock:
        print(f"Process {num} has acquired the lock.")
        # 模拟一些工作
        for _ in range(3):
            print(f"Process {num} is working.")


if __name__ == "__main__":
    lock = multiprocessing.Lock()
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker, args=(lock, i))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

在多进程场景下,锁的使用方式与多线程类似,通过获取和释放锁来保证同一时间只有一个进程能访问共享资源。

信号量(Semaphore)在多进程中的使用

import multiprocessing


def limited_resource(semaphore, num):
    with semaphore:
        print(f"Process {num} has access to the limited resource.")
        # 模拟对资源的使用
        for _ in range(3):
            print(f"Process {num} is using the resource.")


if __name__ == "__main__":
    semaphore = multiprocessing.Semaphore(2)
    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=limited_resource, args=(semaphore, i))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

多进程中的信号量同样控制着对有限资源的并发访问,原理与多线程中的信号量一致。

事件(Event)在多进程中的使用

import multiprocessing
import time


def waiter(event, num):
    print(f"Process {num} is waiting for the event.")
    event.wait()
    print(f"Process {num} has received the event.")


def signaler(event, num):
    time.sleep(3)
    print(f"Process {num} is setting the event.")
    event.set()


if __name__ == "__main__":
    event = multiprocessing.Event()
    p1 = multiprocessing.Process(target=waiter, args=(event, 1))
    p2 = multiprocessing.Process(target=signaler, args=(event, 2))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

多进程中的事件用于进程间的通知,其原理与多线程中的事件类似,但实现基于进程间通信机制。

条件变量(Condition)在多进程中的使用

import multiprocessing


class Queue:
    def __init__(self):
        self.items = multiprocessing.Manager().list()
        self.max_size = 5
        self.condition = multiprocessing.Condition()

    def enqueue(self, item):
        with self.condition:
            while len(self.items) >= self.max_size:
                print(f"Queue is full. Process {multiprocessing.current_process().name} is waiting.")
                self.condition.wait()
            self.items.append(item)
            print(f"Process {multiprocessing.current_process().name} added {item} to the queue.")
            self.condition.notify()

    def dequeue(self):
        with self.condition:
            while not self.items:
                print(f"Queue is empty. Process {multiprocessing.current_process().name} is waiting.")
                self.condition.wait()
            item = self.items.pop(0)
            print(f"Process {multiprocessing.current_process().name} removed {item} from the queue.")
            self.condition.notify()
            return item


def producer(queue, item):
    queue.enqueue(item)


def consumer(queue):
    queue.dequeue()


if __name__ == "__main__":
    queue = Queue()
    producer_processes = []
    consumer_processes = []

    for i in range(3):
        p = multiprocessing.Process(target=producer, args=(queue, i))
        producer_processes.append(p)
        p.start()

    for i in range(2):
        c = multiprocessing.Process(target=consumer, args=(queue,))
        consumer_processes.append(c)
        c.start()

    for p in producer_processes:
        p.join()

    for c in consumer_processes:
        c.join()

在多进程的队列实现中,条件变量用于控制进程间对队列的操作,确保队列在满或空时进程的正确同步。

屏障(Barrier)在多进程中的使用

import multiprocessing


def worker(barrier, num):
    print(f"Process {num} has reached the barrier.")
    barrier.wait()
    print(f"Process {num} has passed the barrier.")


if __name__ == "__main__":
    barrier = multiprocessing.Barrier(3)
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker, args=(barrier, i))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

多进程中的屏障同样使多个进程在某个点同步,等待所有进程到达后一起继续执行。

同步原语的选择与注意事项

在选择使用哪种同步原语时,需要根据具体的应用场景来决定。如果只是简单地保护共享资源,避免竞争条件,锁是一个不错的选择。当需要限制对有限资源的并发访问数量时,信号量更为合适。如果是线程或进程间的通知机制,事件是首选。对于需要在特定条件满足时才执行操作的场景,条件变量是最佳选择。而当多个线程或进程需要在某个点同步时,屏障则能发挥作用。

在使用同步原语时,还需要注意死锁问题。死锁通常发生在多个线程或进程相互等待对方释放资源的情况下。为了避免死锁,应尽量减少锁的嵌套使用,按照一定的顺序获取锁,并且在合适的时机释放锁。同时,合理设置信号量的初始值、事件的触发条件以及条件变量的判断逻辑,也是确保程序正确运行的关键。

另外,在多进程编程中,由于进程间的资源隔离,同步原语的实现和开销与多线程有所不同。使用共享内存等机制时,要注意数据的一致性和同步的正确性。

总之,深入理解 Python 同步原语的工作原理和正确使用方法,对于编写高效、可靠的多线程和多进程程序至关重要。通过合理选择和使用同步原语,可以有效地避免并发编程中的常见问题,提高程序的性能和稳定性。