Python threading模块详解
1. Python 多线程基础
在 Python 中,threading
模块是实现多线程编程的重要工具。多线程编程允许程序在同一时间执行多个任务,这在处理 I/O 密集型任务(如网络请求、文件读写等)时能显著提高程序的效率。
1.1 线程与进程的区别
在深入了解 threading
模块之前,有必要先明确线程与进程的区别。
- 进程:是程序在操作系统中的一次执行过程,是系统进行资源分配和调度的基本单位。每个进程都有自己独立的内存空间,不同进程之间的通信相对复杂,需要借助特定的进程间通信(IPC)机制,如管道、消息队列、共享内存等。
- 线程:是进程中的一个执行单元,是程序执行的最小单位。一个进程可以包含多个线程,这些线程共享进程的内存空间,因此线程间的通信相对简单,但同时也带来了数据同步的问题。
1.2 创建线程的基本方式
在 threading
模块中,创建线程主要有两种方式:一种是通过继承 threading.Thread
类,另一种是直接使用 threading.Thread
类并传入目标函数。
方式一:继承 threading.Thread
类
import threading
class MyThread(threading.Thread):
def __init__(self, name):
super().__init__(name=name)
def run(self):
print(f"线程 {self.name} 正在运行")
if __name__ == "__main__":
thread = MyThread("示例线程")
thread.start()
在上述代码中,我们定义了一个 MyThread
类,它继承自 threading.Thread
类。通过重写 run
方法,我们定义了线程启动后要执行的代码。在 if __name__ == "__main__"
块中,我们创建了 MyThread
类的实例,并调用 start
方法启动线程。
方式二:使用 threading.Thread
类并传入目标函数
import threading
def target_function():
print("线程正在运行")
if __name__ == "__main__":
thread = threading.Thread(target=target_function)
thread.start()
这里我们直接创建了 threading.Thread
类的实例,并将目标函数 target_function
作为参数传入。同样调用 start
方法启动线程。
2. 线程同步
由于多个线程共享进程的内存空间,当多个线程同时访问和修改共享数据时,可能会导致数据不一致的问题。为了解决这类问题,threading
模块提供了多种线程同步机制。
2.1 锁(Lock)
锁是最基本的线程同步工具。它只有两种状态:锁定(locked)和未锁定(unlocked)。当一个线程获取到锁(将锁的状态设置为锁定)时,其他线程就无法再获取该锁,直到该线程释放锁(将锁的状态设置为未锁定)。
import threading
lock = threading.Lock()
shared_variable = 0
def increment():
global shared_variable
lock.acquire()
try:
shared_variable = shared_variable + 1
finally:
lock.release()
threads = []
for _ in range(10):
thread = threading.Thread(target=increment)
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
print(f"共享变量的值: {shared_variable}")
在上述代码中,我们定义了一个锁 lock
和一个共享变量 shared_variable
。在 increment
函数中,我们先调用 lock.acquire()
获取锁,确保在修改 shared_variable
时不会有其他线程同时修改。使用 try - finally
块来保证无论在修改过程中是否发生异常,锁都会被释放。
2.2 信号量(Semaphore)
信号量是一个计数器,它允许一定数量的线程同时访问共享资源。
import threading
semaphore = threading.Semaphore(3)
def limited_access():
semaphore.acquire()
try:
print(f"{threading.current_thread().name} 获得信号量,正在访问受限资源")
finally:
semaphore.release()
print(f"{threading.current_thread().name} 释放信号量")
threads = []
for i in range(5):
thread = threading.Thread(target=limited_access, name=f"线程 {i}")
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
这里我们创建了一个信号量 semaphore
,其初始值为 3,表示最多允许 3 个线程同时访问受限资源。每个线程在访问资源前调用 semaphore.acquire()
获取信号量,访问结束后调用 semaphore.release()
释放信号量。
2.3 事件(Event)
事件是一种简单的线程同步机制,它允许一个线程通知其他线程某件事情已经发生。
import threading
import time
event = threading.Event()
def waiting_thread():
print("等待线程开始等待事件")
event.wait()
print("等待线程接收到事件,继续执行")
def signaling_thread():
time.sleep(2)
print("信号线程发送事件")
event.set()
wait_thread = threading.Thread(target=waiting_thread)
signal_thread = threading.Thread(target=signaling_thread)
wait_thread.start()
signal_thread.start()
wait_thread.join()
signal_thread.join()
在上述代码中,waiting_thread
调用 event.wait()
进入等待状态,直到 signaling_thread
调用 event.set()
发送事件,waiting_thread
才会继续执行。
2.4 条件变量(Condition)
条件变量用于复杂的线程同步场景,它结合了锁和事件的功能。线程可以在条件变量上等待特定条件满足,其他线程可以通知条件变量条件已满足。
import threading
condition = threading.Condition()
queue = []
def producer():
with condition:
for i in range(5):
queue.append(i)
print(f"生产者添加元素 {i}")
condition.notify()
time.sleep(1)
def consumer():
with condition:
while True:
if not queue:
condition.wait()
item = queue.pop(0)
print(f"消费者取出元素 {item}")
time.sleep(1)
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
在这段代码中,生产者线程向队列中添加元素,并通过 condition.notify()
通知消费者线程。消费者线程在队列空时调用 condition.wait()
等待,直到收到生产者线程的通知。
3. 线程池
在实际应用中,频繁地创建和销毁线程会带来一定的性能开销。线程池可以有效地管理一组线程,复用线程资源,减少线程创建和销毁的开销。
3.1 使用 concurrent.futures
模块实现线程池
虽然 threading
模块本身没有直接提供线程池的实现,但 concurrent.futures
模块提供了 ThreadPoolExecutor
类来方便地创建和管理线程池。
import concurrent.futures
def task_function(x):
return x * x
if __name__ == "__main__":
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
results = list(executor.map(task_function, range(5)))
print(results)
在上述代码中,我们使用 ThreadPoolExecutor
创建了一个最大工作线程数为 3 的线程池。通过 executor.map
方法,我们将 task_function
应用到 range(5)
的每个元素上,并收集结果。
3.2 自定义线程池
我们也可以基于 threading
模块自定义一个简单的线程池。
import threading
import queue
class ThreadPool:
def __init__(self, num_threads):
self.task_queue = queue.Queue()
self.threads = []
for _ in range(num_threads):
thread = threading.Thread(target=self.worker)
thread.daemon = True
thread.start()
self.threads.append(thread)
def worker(self):
while True:
try:
task = self.task_queue.get()
task()
except queue.Empty:
break
finally:
self.task_queue.task_done()
def submit(self, task):
self.task_queue.put(task)
def wait_completion(self):
self.task_queue.join()
def example_task():
print(f"{threading.current_thread().name} 执行任务")
if __name__ == "__main__":
pool = ThreadPool(3)
for _ in range(5):
pool.submit(example_task)
pool.wait_completion()
在这个自定义线程池中,ThreadPool
类包含一个任务队列 task_queue
和一组线程。worker
方法是每个线程执行的函数,它从任务队列中取出任务并执行。submit
方法用于向任务队列中添加任务,wait_completion
方法用于等待所有任务完成。
4. 线程的生命周期
线程从创建到结束,经历了几个不同的阶段,了解这些阶段有助于更好地管理线程。
4.1 新建(New)
当我们创建一个 threading.Thread
实例时,线程处于新建状态。此时线程还没有开始执行,只是一个对象。
import threading
def target():
print("线程执行")
thread = threading.Thread(target=target)
这里创建了 thread
线程对象,它处于新建状态。
4.2 就绪(Runnable)
当我们调用线程的 start
方法后,线程进入就绪状态。此时线程已经准备好执行,但还没有获得 CPU 时间片。
thread.start()
调用 start
方法后,thread
线程进入就绪状态,等待 CPU 调度执行。
4.3 运行(Running)
当 CPU 调度到该线程时,线程进入运行状态,开始执行 run
方法中的代码。
4.4 阻塞(Blocked)
在运行过程中,线程可能会因为某些原因进入阻塞状态,例如等待 I/O 操作完成、获取锁失败、等待事件等。
import threading
import time
lock = threading.Lock()
def blocked_thread():
lock.acquire()
try:
print("线程获取锁,开始执行")
time.sleep(2)
finally:
lock.release()
thread = threading.Thread(target=blocked_thread)
thread.start()
这里 blocked_thread
线程在获取锁之前处于就绪状态,获取锁后进入运行状态,调用 time.sleep
时进入阻塞状态,等待 2 秒后继续执行并释放锁。
4.5 死亡(Dead)
当线程的 run
方法执行完毕,或者因为异常而提前终止时,线程进入死亡状态。此时线程的生命周期结束,不能再重新启动。
import threading
def end_thread():
print("线程执行结束")
thread = threading.Thread(target=end_thread)
thread.start()
thread.join()
print("线程已死亡")
在上述代码中,thread
线程执行完 end_thread
函数后进入死亡状态,join
方法等待线程死亡后,打印出“线程已死亡”。
5. 线程安全与全局解释器锁(GIL)
在 Python 中,虽然 threading
模块提供了多线程编程的能力,但由于全局解释器锁(GIL)的存在,在同一时刻只有一个线程能真正执行 Python 字节码。
5.1 GIL 的原理
GIL 是 CPython 解释器中的一个机制,它确保在任何时刻,只有一个线程能够执行 Python 字节码。这是因为 CPython 的内存管理不是线程安全的,为了避免多个线程同时操作内存导致内存错误,引入了 GIL。
5.2 GIL 对多线程的影响
- I/O 密集型任务:对于 I/O 密集型任务,如网络请求、文件读写等,由于线程在等待 I/O 操作完成时会释放 GIL,其他线程可以趁机获取 GIL 并执行,因此多线程在 I/O 密集型任务中能提高程序的效率。
- CPU 密集型任务:对于 CPU 密集型任务,如大量的数学计算,由于线程在执行计算时不会释放 GIL,多线程并不能充分利用多核 CPU 的优势,反而会因为线程切换带来额外的开销,导致性能下降。
import threading
import time
def cpu_bound_task():
start = time.time()
result = 0
for i in range(100000000):
result += i
end = time.time()
print(f"CPU 密集型任务执行时间: {end - start} 秒")
def io_bound_task():
start = time.time()
with open('test.txt', 'w') as f:
for i in range(1000000):
f.write(str(i) + '\n')
end = time.time()
print(f"I/O 密集型任务执行时间: {end - start} 秒")
cpu_thread = threading.Thread(target=cpu_bound_task)
io_thread = threading.Thread(target=io_bound_task)
cpu_thread.start()
io_thread.start()
cpu_thread.join()
io_thread.join()
在上述代码中,cpu_bound_task
是一个 CPU 密集型任务,io_bound_task
是一个 I/O 密集型任务。通过多线程运行这两个任务,可以观察到 I/O 密集型任务在多线程下能提高效率,而 CPU 密集型任务效率提升不明显甚至可能下降。
5.3 绕过 GIL 的方法
- 使用多进程:
multiprocessing
模块提供了多进程编程的能力,每个进程都有自己独立的 Python 解释器和 GIL,因此可以充分利用多核 CPU 的优势,适用于 CPU 密集型任务。 - 使用 C 扩展:对于性能要求极高的部分代码,可以使用 C 语言编写并作为 Python 的扩展模块,在 C 代码中可以直接控制线程,避免 GIL 的限制。
6. 线程的高级应用
除了上述基本的线程使用方法,threading
模块还支持一些高级应用场景。
6.1 守护线程(Daemon Thread)
守护线程是一种特殊的线程,当主线程结束时,所有守护线程会自动结束,不管它们是否完成任务。守护线程通常用于执行一些后台任务,如日志记录、数据清理等。
import threading
import time
def daemon_task():
while True:
print("守护线程正在运行")
time.sleep(1)
daemon_thread = threading.Thread(target=daemon_task)
daemon_thread.daemon = True
daemon_thread.start()
time.sleep(3)
print("主线程结束")
在上述代码中,我们将 daemon_thread
设置为守护线程(daemon_thread.daemon = True
)。主线程运行 3 秒后结束,守护线程也会随之结束。
6.2 线程本地存储(Thread - Local Storage)
线程本地存储允许每个线程拥有自己独立的变量副本,避免了多线程访问共享变量时的数据冲突问题。在 threading
模块中,可以通过 threading.local
类来实现线程本地存储。
import threading
local_data = threading.local()
def thread_function():
local_data.value = threading.current_thread().name
print(f"线程 {local_data.value} 设置了本地数据")
print(f"线程 {local_data.value} 访问本地数据: {local_data.value}")
threads = []
for _ in range(3):
thread = threading.Thread(target=thread_function)
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
在上述代码中,local_data
是一个 threading.local
的实例。每个线程都可以独立地设置和访问 local_data.value
,互不干扰。
通过对 threading
模块的详细介绍,从基础的线程创建到高级的线程同步、线程池、线程生命周期以及 GIL 等内容,希望能帮助你全面掌握 Python 多线程编程,在实际项目中灵活运用多线程技术,提高程序的性能和效率。在实际应用中,需要根据任务的特点(如 I/O 密集型还是 CPU 密集型)合理选择使用多线程或其他并发编程方式,以达到最佳的效果。同时,要特别注意线程同步问题,避免出现数据竞争和死锁等错误。