MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Python中的信号量与锁的使用

2022-01-102.2k 阅读

Python 中的信号量与锁:基础概念

在多线程编程中,共享资源的访问控制是一个核心问题。信号量(Semaphore)和锁(Lock)是两种重要的同步原语,它们有助于防止多个线程同时访问共享资源,从而避免数据竞争和不一致性问题。

锁(Lock)

锁是一种最基本的同步工具,它只有两种状态:锁定(locked)和未锁定(unlocked)。当一个线程获取了锁(将其状态变为锁定),其他线程就不能再获取该锁,直到第一个线程释放锁(将其状态变为未锁定)。

想象一下,有一个公共资源,比如银行账户。多个线程可能试图同时对该账户进行取款操作。如果没有适当的同步机制,一个线程可能在另一个线程还未完成操作时就开始操作,导致账户余额出现错误。锁就像是这个银行账户的一把钥匙,只有持有钥匙(获取锁)的线程才能对账户进行操作,操作完成后释放钥匙(释放锁),其他线程才能获取钥匙进行操作。

在 Python 中,使用 threading.Lock 类来创建锁对象。以下是一个简单的示例:

import threading

# 创建一个锁对象
lock = threading.Lock()
counter = 0

def increment():
    global counter
    # 获取锁
    lock.acquire()
    try:
        counter = counter + 1
    finally:
        # 释放锁
        lock.release()


threads = []
for _ in range(10):
    t = threading.Thread(target=increment)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"Final counter value: {counter}")

在这个示例中,lock.acquire() 方法用于获取锁。如果锁当前未被锁定,调用此方法会将锁锁定并立即返回。如果锁已被锁定,调用线程会被阻塞,直到锁被释放。lock.release() 方法用于释放锁,将其状态变为未锁定,允许其他线程获取锁。

try - finally 块的使用非常重要。即使在 try 块中的代码引发异常,finally 块中的代码也会执行,确保锁一定会被释放。如果不这样做,一旦在获取锁之后发生异常,锁可能永远不会被释放,导致其他线程永远阻塞。

信号量(Semaphore)

信号量是一个更通用的同步原语,它内部维护一个计数器。当调用 acquire() 方法时,计数器会减 1;当调用 release() 方法时,计数器会加 1。如果计数器的值为 0,调用 acquire() 方法的线程会被阻塞,直到计数器的值大于 0。

信号量可以用来控制同时访问某个资源的线程数量。例如,假设你有一个数据库连接池,最多只能同时有 5 个连接被使用。你可以创建一个初始值为 5 的信号量,每个线程在获取数据库连接之前先获取信号量,使用完连接后释放信号量。这样就可以确保同时使用数据库连接的线程不会超过 5 个。

在 Python 中,使用 threading.Semaphore 类来创建信号量对象。以下是一个示例:

import threading
import time

# 创建一个信号量,初始值为 3
semaphore = threading.Semaphore(3)


def worker():
    semaphore.acquire()
    try:
        print(f"{threading.current_thread().name} acquired the semaphore.")
        time.sleep(2)
        print(f"{threading.current_thread().name} released the semaphore.")
    finally:
        semaphore.release()


threads = []
for i in range(5):
    t = threading.Thread(target=worker)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

在这个示例中,我们创建了一个初始值为 3 的信号量。这意味着最多可以有 3 个线程同时获取信号量并执行 worker 函数中的代码。当一个线程调用 semaphore.acquire() 时,如果信号量的计数器大于 0,计数器会减 1 并允许线程继续执行;如果计数器为 0,线程会被阻塞,直到其他线程调用 semaphore.release() 增加计数器的值。

try - finally 块同样用于确保信号量在使用完后一定会被释放,避免死锁的发生。

信号量与锁的比较

功能区别

  • :锁主要用于确保同一时间只有一个线程能够访问共享资源,它是一种二元状态(锁定或未锁定)的同步机制。锁适用于需要独占访问资源的场景,比如对共享变量的修改操作,以防止数据竞争。
  • 信号量:信号量可以控制同时访问共享资源的线程数量,它通过一个计数器来实现。信号量更适用于资源有限的场景,比如数据库连接池、线程池等,需要限制并发访问的数量。

使用场景举例

假设你有一个打印机资源,同一时间只能有一个打印任务在执行,这时候就可以使用锁。因为打印机资源必须被独占访问,不允许多个打印任务同时进行,否则会导致打印内容混乱。

而对于一个 Web 服务器,它可能有一个固定数量的数据库连接可供使用。为了避免过多的线程同时请求数据库连接导致资源耗尽,就可以使用信号量来限制同时获取数据库连接的线程数量。

实现原理差异

锁的实现相对简单,通常基于操作系统的原子操作。当一个线程获取锁时,它会将锁的状态设置为锁定,其他线程尝试获取锁时会检测到锁已被锁定,从而进入等待状态。

信号量的实现则相对复杂一些。它内部维护一个计数器,当计数器大于 0 时,线程可以获取信号量并将计数器减 1;当计数器为 0 时,线程会被阻塞。信号量的实现通常依赖于操作系统的同步机制,如互斥锁和条件变量等。

死锁问题与预防

死锁的概念

死锁是多线程编程中一个严重的问题,它发生在两个或多个线程相互等待对方释放资源,导致所有线程都无法继续执行的情况。

例如,假设有两个线程 Thread AThread B,以及两把锁 Lock 1Lock 2Thread A 获取了 Lock 1Thread B 获取了 Lock 2。然后 Thread A 尝试获取 Lock 2,但由于 Lock 2Thread B 持有,Thread A 被阻塞。同时,Thread B 尝试获取 Lock 1,但由于 Lock 1Thread A 持有,Thread B 也被阻塞。这样,Thread AThread B 就陷入了死锁。

死锁的原因

  1. 资源竞争:多个线程竞争有限的资源,并且对资源的获取顺序没有合理规划。
  2. 循环等待:线程之间形成了一个循环依赖关系,每个线程都在等待下一个线程释放资源。

使用锁时的死锁示例

import threading

lock1 = threading.Lock()
lock2 = threading.Lock()


def thread1():
    lock1.acquire()
    print("Thread 1 acquired lock1")
    time.sleep(1)
    lock2.acquire()
    print("Thread 1 acquired lock2")
    lock2.release()
    lock1.release()


def thread2():
    lock2.acquire()
    print("Thread 2 acquired lock2")
    time.sleep(1)
    lock1.acquire()
    print("Thread 2 acquired lock1")
    lock1.release()
    lock2.release()


t1 = threading.Thread(target=thread1)
t2 = threading.Thread(target=thread2)

t1.start()
t2.start()

t1.join()
t2.join()

在这个示例中,thread1 先获取 lock1,然后尝试获取 lock2,而 thread2 先获取 lock2,然后尝试获取 lock1。如果 thread1 先获取 lock1thread2 先获取 lock2,就会发生死锁。

死锁的预防

  1. 破坏死锁的必要条件:死锁的发生需要满足四个必要条件,即互斥、占有并等待、不可剥夺和循环等待。通过破坏其中任何一个条件都可以预防死锁。例如,破坏循环等待条件可以通过对资源进行编号,规定所有线程按照相同的顺序获取资源。
  2. 资源分配图算法:可以使用资源分配图算法(如银行家算法)来检测和预防死锁。这些算法通过分析系统的资源分配情况和线程的资源请求,确保系统始终处于安全状态,避免死锁的发生。
  3. 使用超时机制:在获取锁或信号量时设置超时时间。如果在指定时间内未能获取到资源,线程可以放弃获取并进行其他操作,从而避免无限期等待导致的死锁。

信号量与锁在实际项目中的应用

数据库连接池

在 Web 应用开发中,数据库连接是一种宝贵的资源。为了提高性能和资源利用率,通常会使用数据库连接池。信号量在数据库连接池中扮演着重要角色。

假设我们有一个数据库连接池,最多可以容纳 10 个连接。我们可以创建一个初始值为 10 的信号量。当一个线程需要获取数据库连接时,它先获取信号量,如果获取成功,就从连接池中取出一个连接;使用完连接后,释放连接并释放信号量。

以下是一个简化的数据库连接池示例:

import threading
import time


class DatabaseConnection:
    def __init__(self):
        self.is_connected = False

    def connect(self):
        self.is_connected = True
        print(f"{threading.current_thread().name} connected to the database.")

    def disconnect(self):
        self.is_connected = False
        print(f"{threading.current_thread().name} disconnected from the database.")


class ConnectionPool:
    def __init__(self, max_connections):
        self.max_connections = max_connections
        self.semaphore = threading.Semaphore(max_connections)
        self.connections = [DatabaseConnection() for _ in range(max_connections)]
        self.available_connections = set(range(max_connections))

    def get_connection(self):
        self.semaphore.acquire()
        connection_index = self.available_connections.pop()
        connection = self.connections[connection_index]
        connection.connect()
        return connection

    def release_connection(self, connection):
        connection.disconnect()
        connection_index = self.connections.index(connection)
        self.available_connections.add(connection_index)
        self.semaphore.release()


pool = ConnectionPool(5)


def worker():
    connection = pool.get_connection()
    try:
        time.sleep(2)
    finally:
        pool.release_connection(connection)


threads = []
for _ in range(10):
    t = threading.Thread(target=worker)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

在这个示例中,ConnectionPool 类使用信号量来控制同时获取数据库连接的线程数量。get_connection 方法获取信号量并从连接池中取出一个连接,release_connection 方法释放连接并释放信号量。

多线程爬虫中的资源限制

在多线程爬虫项目中,为了避免对目标服务器造成过大压力,通常需要限制同时发起请求的线程数量。信号量可以很好地满足这个需求。

假设我们要爬取一个网站,为了防止对该网站造成过多请求导致被封禁,我们限制同时只能有 5 个线程发起请求。可以使用信号量来实现:

import threading
import requests


semaphore = threading.Semaphore(5)


def crawl(url):
    semaphore.acquire()
    try:
        response = requests.get(url)
        print(f"{threading.current_thread().name} crawled {url}, status code: {response.status_code}")
    finally:
        semaphore.release()


urls = [
    "http://example.com/page1",
    "http://example.com/page2",
    "http://example.com/page3",
    "http://example.com/page4",
    "http://example.com/page5",
    "http://example.com/page6",
    "http://example.com/page7",
    "http://example.com/page8",
    "http://example.com/page9",
    "http://example.com/page10"
]

threads = []
for url in urls:
    t = threading.Thread(target=crawl, args=(url,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

在这个示例中,每个线程在发起请求前先获取信号量,确保同时发起请求的线程数量不超过 5 个。

共享数据结构的保护

在多线程环境下,对共享数据结构(如列表、字典等)的操作需要使用锁来确保数据的一致性。

例如,假设我们有一个共享的字典,多个线程可能会同时对其进行读取和写入操作。为了防止数据竞争,我们可以使用锁:

import threading

shared_dict = {}
lock = threading.Lock()


def write_to_dict(key, value):
    lock.acquire()
    try:
        shared_dict[key] = value
    finally:
        lock.release()


def read_from_dict(key):
    lock.acquire()
    try:
        return shared_dict.get(key)
    finally:
        lock.release()


def worker1():
    write_to_dict("key1", "value1")


def worker2():
    value = read_from_dict("key1")
    print(f"Read value: {value}")


t1 = threading.Thread(target=worker1)
t2 = threading.Thread(target=worker2)

t1.start()
t2.start()

t1.join()
t2.join()

在这个示例中,write_to_dictread_from_dict 函数在操作共享字典时都获取锁,确保同一时间只有一个线程可以访问字典,从而避免数据竞争。

高级应用与优化

递归锁(RLock)

递归锁是一种特殊的锁,它允许同一个线程多次获取锁而不会造成死锁。普通的锁如果被同一个线程多次获取,会导致死锁,因为第一次获取锁后,锁处于锁定状态,再次获取时线程会被阻塞。

递归锁内部维护一个计数器,每次获取锁时计数器加 1,每次释放锁时计数器减 1。当计数器为 0 时,锁处于未锁定状态。

在 Python 中,使用 threading.RLock 类来创建递归锁对象。以下是一个示例:

import threading

# 创建一个递归锁
rlock = threading.RLock()


def recursive_function(n):
    rlock.acquire()
    try:
        if n > 0:
            print(f"Recursive call {n}")
            recursive_function(n - 1)
    finally:
        rlock.release()


t = threading.Thread(target=recursive_function, args=(5,))
t.start()
t.join()

在这个示例中,recursive_function 是一个递归函数,它在每次调用时都会获取递归锁。如果使用普通锁,递归调用时第二次获取锁会导致死锁。而递归锁允许同一个线程多次获取锁,确保递归函数能够正常执行。

条件变量(Condition)

条件变量是一种更高级的同步工具,它通常与锁一起使用,用于线程之间的复杂同步。条件变量允许线程在满足特定条件时等待,在条件满足时被唤醒。

例如,假设有一个生产者 - 消费者模型,生产者线程生成数据并放入队列中,消费者线程从队列中取出数据。当队列已满时,生产者线程需要等待;当队列已空时,消费者线程需要等待。这时候就可以使用条件变量。

import threading
import queue


class ProducerConsumer:
    def __init__(self):
        self.queue = queue.Queue(maxsize = 5)
        self.condition = threading.Condition()

    def producer(self):
        for i in range(10):
            self.condition.acquire()
            try:
                while self.queue.full():
                    print("Queue is full, producer waiting...")
                    self.condition.wait()
                self.queue.put(i)
                print(f"Produced {i}")
                self.condition.notify()
            finally:
                self.condition.release()

    def consumer(self):
        for _ in range(10):
            self.condition.acquire()
            try:
                while self.queue.empty():
                    print("Queue is empty, consumer waiting...")
                    self.condition.wait()
                item = self.queue.get()
                print(f"Consumed {item}")
                self.condition.notify()
            finally:
                self.condition.release()


pc = ProducerConsumer()

producer_thread = threading.Thread(target = pc.producer)
consumer_thread = threading.Thread(target = pc.consumer)

producer_thread.start()
consumer_thread.start()

producer_thread.join()
consumer_thread.join()

在这个示例中,condition.wait() 方法使线程进入等待状态,并释放与之关联的锁。当其他线程调用 condition.notify()condition.notify_all() 方法时,等待的线程会被唤醒,并重新获取锁。

性能优化

在使用信号量和锁时,性能是一个需要考虑的重要因素。过多的锁竞争会导致线程上下文切换频繁,降低程序的执行效率。

  1. 减少锁的粒度:尽量缩小锁保护的代码块范围,只在对共享资源进行关键操作时获取锁,操作完成后尽快释放锁。这样可以减少锁的持有时间,降低锁竞争的可能性。
  2. 使用读写锁:如果共享资源的读取操作远多于写入操作,可以使用读写锁。读写锁允许多个线程同时进行读取操作,但只允许一个线程进行写入操作。在 Python 中,可以使用 threading.RLock 结合自定义逻辑来实现简单的读写锁功能。

总结

信号量和锁是 Python 多线程编程中重要的同步原语。锁用于确保同一时间只有一个线程能够访问共享资源,而信号量可以控制同时访问共享资源的线程数量。了解它们的原理、使用方法以及在实际项目中的应用,对于编写高效、稳定的多线程程序至关重要。同时,要注意死锁问题的预防和性能优化,合理使用这些同步工具,以充分发挥多线程编程的优势。