MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Python threading模块详解

2023-09-103.6k 阅读

1. Python 多线程基础

在 Python 中,threading 模块是实现多线程编程的重要工具。多线程编程允许程序在同一时间执行多个任务,这在处理 I/O 密集型任务(如网络请求、文件读写等)时能显著提高程序的效率。

1.1 线程与进程的区别

在深入了解 threading 模块之前,有必要先明确线程与进程的区别。

  • 进程:是程序在操作系统中的一次执行过程,是系统进行资源分配和调度的基本单位。每个进程都有自己独立的内存空间,不同进程之间的通信相对复杂,需要借助特定的进程间通信(IPC)机制,如管道、消息队列、共享内存等。
  • 线程:是进程中的一个执行单元,是程序执行的最小单位。一个进程可以包含多个线程,这些线程共享进程的内存空间,因此线程间的通信相对简单,但同时也带来了数据同步的问题。

1.2 创建线程的基本方式

threading 模块中,创建线程主要有两种方式:一种是通过继承 threading.Thread 类,另一种是直接使用 threading.Thread 类并传入目标函数。

方式一:继承 threading.Thread

import threading


class MyThread(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        print(f"线程 {self.name} 正在运行")


if __name__ == "__main__":
    thread = MyThread("示例线程")
    thread.start()

在上述代码中,我们定义了一个 MyThread 类,它继承自 threading.Thread 类。通过重写 run 方法,我们定义了线程启动后要执行的代码。在 if __name__ == "__main__" 块中,我们创建了 MyThread 类的实例,并调用 start 方法启动线程。

方式二:使用 threading.Thread 类并传入目标函数

import threading


def target_function():
    print("线程正在运行")


if __name__ == "__main__":
    thread = threading.Thread(target=target_function)
    thread.start()

这里我们直接创建了 threading.Thread 类的实例,并将目标函数 target_function 作为参数传入。同样调用 start 方法启动线程。

2. 线程同步

由于多个线程共享进程的内存空间,当多个线程同时访问和修改共享数据时,可能会导致数据不一致的问题。为了解决这类问题,threading 模块提供了多种线程同步机制。

2.1 锁(Lock)

锁是最基本的线程同步工具。它只有两种状态:锁定(locked)和未锁定(unlocked)。当一个线程获取到锁(将锁的状态设置为锁定)时,其他线程就无法再获取该锁,直到该线程释放锁(将锁的状态设置为未锁定)。

import threading

lock = threading.Lock()
shared_variable = 0


def increment():
    global shared_variable
    lock.acquire()
    try:
        shared_variable = shared_variable + 1
    finally:
        lock.release()


threads = []
for _ in range(10):
    thread = threading.Thread(target=increment)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print(f"共享变量的值: {shared_variable}")

在上述代码中,我们定义了一个锁 lock 和一个共享变量 shared_variable。在 increment 函数中,我们先调用 lock.acquire() 获取锁,确保在修改 shared_variable 时不会有其他线程同时修改。使用 try - finally 块来保证无论在修改过程中是否发生异常,锁都会被释放。

2.2 信号量(Semaphore)

信号量是一个计数器,它允许一定数量的线程同时访问共享资源。

import threading

semaphore = threading.Semaphore(3)


def limited_access():
    semaphore.acquire()
    try:
        print(f"{threading.current_thread().name} 获得信号量,正在访问受限资源")
    finally:
        semaphore.release()
        print(f"{threading.current_thread().name} 释放信号量")


threads = []
for i in range(5):
    thread = threading.Thread(target=limited_access, name=f"线程 {i}")
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

这里我们创建了一个信号量 semaphore,其初始值为 3,表示最多允许 3 个线程同时访问受限资源。每个线程在访问资源前调用 semaphore.acquire() 获取信号量,访问结束后调用 semaphore.release() 释放信号量。

2.3 事件(Event)

事件是一种简单的线程同步机制,它允许一个线程通知其他线程某件事情已经发生。

import threading
import time


event = threading.Event()


def waiting_thread():
    print("等待线程开始等待事件")
    event.wait()
    print("等待线程接收到事件,继续执行")


def signaling_thread():
    time.sleep(2)
    print("信号线程发送事件")
    event.set()


wait_thread = threading.Thread(target=waiting_thread)
signal_thread = threading.Thread(target=signaling_thread)

wait_thread.start()
signal_thread.start()

wait_thread.join()
signal_thread.join()

在上述代码中,waiting_thread 调用 event.wait() 进入等待状态,直到 signaling_thread 调用 event.set() 发送事件,waiting_thread 才会继续执行。

2.4 条件变量(Condition)

条件变量用于复杂的线程同步场景,它结合了锁和事件的功能。线程可以在条件变量上等待特定条件满足,其他线程可以通知条件变量条件已满足。

import threading


condition = threading.Condition()
queue = []


def producer():
    with condition:
        for i in range(5):
            queue.append(i)
            print(f"生产者添加元素 {i}")
            condition.notify()
            time.sleep(1)


def consumer():
    with condition:
        while True:
            if not queue:
                condition.wait()
            item = queue.pop(0)
            print(f"消费者取出元素 {item}")
            time.sleep(1)


producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
consumer_thread.start()

producer_thread.join()
consumer_thread.join()

在这段代码中,生产者线程向队列中添加元素,并通过 condition.notify() 通知消费者线程。消费者线程在队列空时调用 condition.wait() 等待,直到收到生产者线程的通知。

3. 线程池

在实际应用中,频繁地创建和销毁线程会带来一定的性能开销。线程池可以有效地管理一组线程,复用线程资源,减少线程创建和销毁的开销。

3.1 使用 concurrent.futures 模块实现线程池

虽然 threading 模块本身没有直接提供线程池的实现,但 concurrent.futures 模块提供了 ThreadPoolExecutor 类来方便地创建和管理线程池。

import concurrent.futures


def task_function(x):
    return x * x


if __name__ == "__main__":
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(task_function, range(5)))
        print(results)

在上述代码中,我们使用 ThreadPoolExecutor 创建了一个最大工作线程数为 3 的线程池。通过 executor.map 方法,我们将 task_function 应用到 range(5) 的每个元素上,并收集结果。

3.2 自定义线程池

我们也可以基于 threading 模块自定义一个简单的线程池。

import threading
import queue


class ThreadPool:
    def __init__(self, num_threads):
        self.task_queue = queue.Queue()
        self.threads = []
        for _ in range(num_threads):
            thread = threading.Thread(target=self.worker)
            thread.daemon = True
            thread.start()
            self.threads.append(thread)

    def worker(self):
        while True:
            try:
                task = self.task_queue.get()
                task()
            except queue.Empty:
                break
            finally:
                self.task_queue.task_done()

    def submit(self, task):
        self.task_queue.put(task)

    def wait_completion(self):
        self.task_queue.join()


def example_task():
    print(f"{threading.current_thread().name} 执行任务")


if __name__ == "__main__":
    pool = ThreadPool(3)
    for _ in range(5):
        pool.submit(example_task)
    pool.wait_completion()

在这个自定义线程池中,ThreadPool 类包含一个任务队列 task_queue 和一组线程。worker 方法是每个线程执行的函数,它从任务队列中取出任务并执行。submit 方法用于向任务队列中添加任务,wait_completion 方法用于等待所有任务完成。

4. 线程的生命周期

线程从创建到结束,经历了几个不同的阶段,了解这些阶段有助于更好地管理线程。

4.1 新建(New)

当我们创建一个 threading.Thread 实例时,线程处于新建状态。此时线程还没有开始执行,只是一个对象。

import threading


def target():
    print("线程执行")


thread = threading.Thread(target=target)

这里创建了 thread 线程对象,它处于新建状态。

4.2 就绪(Runnable)

当我们调用线程的 start 方法后,线程进入就绪状态。此时线程已经准备好执行,但还没有获得 CPU 时间片。

thread.start()

调用 start 方法后,thread 线程进入就绪状态,等待 CPU 调度执行。

4.3 运行(Running)

当 CPU 调度到该线程时,线程进入运行状态,开始执行 run 方法中的代码。

4.4 阻塞(Blocked)

在运行过程中,线程可能会因为某些原因进入阻塞状态,例如等待 I/O 操作完成、获取锁失败、等待事件等。

import threading
import time


lock = threading.Lock()


def blocked_thread():
    lock.acquire()
    try:
        print("线程获取锁,开始执行")
        time.sleep(2)
    finally:
        lock.release()


thread = threading.Thread(target=blocked_thread)
thread.start()

这里 blocked_thread 线程在获取锁之前处于就绪状态,获取锁后进入运行状态,调用 time.sleep 时进入阻塞状态,等待 2 秒后继续执行并释放锁。

4.5 死亡(Dead)

当线程的 run 方法执行完毕,或者因为异常而提前终止时,线程进入死亡状态。此时线程的生命周期结束,不能再重新启动。

import threading


def end_thread():
    print("线程执行结束")


thread = threading.Thread(target=end_thread)
thread.start()
thread.join()
print("线程已死亡")

在上述代码中,thread 线程执行完 end_thread 函数后进入死亡状态,join 方法等待线程死亡后,打印出“线程已死亡”。

5. 线程安全与全局解释器锁(GIL)

在 Python 中,虽然 threading 模块提供了多线程编程的能力,但由于全局解释器锁(GIL)的存在,在同一时刻只有一个线程能真正执行 Python 字节码。

5.1 GIL 的原理

GIL 是 CPython 解释器中的一个机制,它确保在任何时刻,只有一个线程能够执行 Python 字节码。这是因为 CPython 的内存管理不是线程安全的,为了避免多个线程同时操作内存导致内存错误,引入了 GIL。

5.2 GIL 对多线程的影响

  • I/O 密集型任务:对于 I/O 密集型任务,如网络请求、文件读写等,由于线程在等待 I/O 操作完成时会释放 GIL,其他线程可以趁机获取 GIL 并执行,因此多线程在 I/O 密集型任务中能提高程序的效率。
  • CPU 密集型任务:对于 CPU 密集型任务,如大量的数学计算,由于线程在执行计算时不会释放 GIL,多线程并不能充分利用多核 CPU 的优势,反而会因为线程切换带来额外的开销,导致性能下降。
import threading
import time


def cpu_bound_task():
    start = time.time()
    result = 0
    for i in range(100000000):
        result += i
    end = time.time()
    print(f"CPU 密集型任务执行时间: {end - start} 秒")


def io_bound_task():
    start = time.time()
    with open('test.txt', 'w') as f:
        for i in range(1000000):
            f.write(str(i) + '\n')
    end = time.time()
    print(f"I/O 密集型任务执行时间: {end - start} 秒")


cpu_thread = threading.Thread(target=cpu_bound_task)
io_thread = threading.Thread(target=io_bound_task)

cpu_thread.start()
io_thread.start()

cpu_thread.join()
io_thread.join()

在上述代码中,cpu_bound_task 是一个 CPU 密集型任务,io_bound_task 是一个 I/O 密集型任务。通过多线程运行这两个任务,可以观察到 I/O 密集型任务在多线程下能提高效率,而 CPU 密集型任务效率提升不明显甚至可能下降。

5.3 绕过 GIL 的方法

  • 使用多进程multiprocessing 模块提供了多进程编程的能力,每个进程都有自己独立的 Python 解释器和 GIL,因此可以充分利用多核 CPU 的优势,适用于 CPU 密集型任务。
  • 使用 C 扩展:对于性能要求极高的部分代码,可以使用 C 语言编写并作为 Python 的扩展模块,在 C 代码中可以直接控制线程,避免 GIL 的限制。

6. 线程的高级应用

除了上述基本的线程使用方法,threading 模块还支持一些高级应用场景。

6.1 守护线程(Daemon Thread)

守护线程是一种特殊的线程,当主线程结束时,所有守护线程会自动结束,不管它们是否完成任务。守护线程通常用于执行一些后台任务,如日志记录、数据清理等。

import threading
import time


def daemon_task():
    while True:
        print("守护线程正在运行")
        time.sleep(1)


daemon_thread = threading.Thread(target=daemon_task)
daemon_thread.daemon = True
daemon_thread.start()

time.sleep(3)
print("主线程结束")

在上述代码中,我们将 daemon_thread 设置为守护线程(daemon_thread.daemon = True)。主线程运行 3 秒后结束,守护线程也会随之结束。

6.2 线程本地存储(Thread - Local Storage)

线程本地存储允许每个线程拥有自己独立的变量副本,避免了多线程访问共享变量时的数据冲突问题。在 threading 模块中,可以通过 threading.local 类来实现线程本地存储。

import threading


local_data = threading.local()


def thread_function():
    local_data.value = threading.current_thread().name
    print(f"线程 {local_data.value} 设置了本地数据")
    print(f"线程 {local_data.value} 访问本地数据: {local_data.value}")


threads = []
for _ in range(3):
    thread = threading.Thread(target=thread_function)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

在上述代码中,local_data 是一个 threading.local 的实例。每个线程都可以独立地设置和访问 local_data.value,互不干扰。

通过对 threading 模块的详细介绍,从基础的线程创建到高级的线程同步、线程池、线程生命周期以及 GIL 等内容,希望能帮助你全面掌握 Python 多线程编程,在实际项目中灵活运用多线程技术,提高程序的性能和效率。在实际应用中,需要根据任务的特点(如 I/O 密集型还是 CPU 密集型)合理选择使用多线程或其他并发编程方式,以达到最佳的效果。同时,要特别注意线程同步问题,避免出现数据竞争和死锁等错误。