Python concurrent.futures模块实现线程池与进程池
Python concurrent.futures 模块概述
在Python的并发编程领域,concurrent.futures
模块是一个强大的工具,它提供了高层次的异步执行调用接口。这个模块抽象了启动异步任务的细节,使得开发者能够轻松地使用线程池或者进程池来管理并发任务,从而有效地利用多核CPU的优势以及提高I/O密集型任务的执行效率。concurrent.futures
模块在Python 3.2中被引入,它包含了两个主要的类:ThreadPoolExecutor
(线程池执行器)和ProcessPoolExecutor
(进程池执行器),以及Future
类来处理异步执行的结果。
为什么需要线程池和进程池
在深入了解concurrent.futures
模块如何实现线程池和进程池之前,我们先来探讨一下为什么需要这些概念。
资源管理
在处理大量并发任务时,如果为每个任务都创建一个新的线程或者进程,会消耗大量的系统资源。线程和进程的创建、销毁都需要一定的时间和内存开销。线程池和进程池可以复用已有的线程或进程,避免了频繁创建和销毁带来的开销,从而提高了资源的利用率。
控制并发度
如果并发任务过多,可能会导致系统资源耗尽,性能下降甚至系统崩溃。通过线程池和进程池,可以限制同时执行的任务数量,也就是控制并发度。这样可以确保系统在稳定的状态下运行,不会因为过多的并发任务而出现问题。
简化编程模型
使用线程池和进程池,开发者不需要手动管理线程或进程的生命周期、同步机制等复杂问题。concurrent.futures
模块提供了简洁统一的接口,使得并发编程变得更加简单和直观。
ThreadPoolExecutor
实现线程池
基本使用
ThreadPoolExecutor
类用于创建一个线程池,它可以在后台异步执行任务。下面是一个简单的示例,展示了如何使用ThreadPoolExecutor
来并行执行多个任务:
import concurrent.futures
import time
def task_function(task_number):
print(f"Task {task_number} started")
time.sleep(2) # 模拟一个耗时任务
print(f"Task {task_number} finished")
return task_number * task_number
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
future_to_task = {executor.submit(task_function, i): i for i in range(5)}
for future in concurrent.futures.as_completed(future_to_task):
task_number = future_to_task[future]
try:
result = future.result()
except Exception as e:
print(f"Task {task_number} generated an exception: {e}")
else:
print(f"Task {task_number} result: {result}")
在这个示例中:
- 我们定义了一个
task_function
,它模拟了一个耗时2秒的任务,并返回任务编号的平方。 - 使用
ThreadPoolExecutor
创建了一个线程池,max_workers
参数设置为3,表示线程池中最多同时执行3个任务。 - 通过
executor.submit(task_function, i)
提交任务到线程池,submit
方法返回一个Future
对象。我们将Future
对象和任务编号关联起来,存储在future_to_task
字典中。 - 使用
concurrent.futures.as_completed
函数来迭代已完成的任务。as_completed
会在任务完成时返回一个迭代器,我们可以通过future.result()
获取任务的返回值。如果任务执行过程中抛出异常,result
方法会重新抛出该异常,我们可以在try - except
块中捕获并处理。
ThreadPoolExecutor
的构造函数参数
ThreadPoolExecutor
的构造函数如下:
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
max_workers
:指定线程池中最多可以同时运行的线程数。如果未指定,默认值为机器CPU核心数的5倍。在I/O密集型任务中,较大的max_workers
值可能会提高性能,但如果设置过大,可能会导致系统资源耗尽。thread_name_prefix
:线程名称的前缀。这在调试和日志记录时很有用,通过设置前缀可以方便地区分不同线程池中的线程。
线程池中的任务调度
线程池中的任务调度遵循以下原则:
- 任务提交:当调用
submit
方法提交任务时,如果线程池中有空闲线程,任务会立即分配给空闲线程执行。 - 队列机制:如果线程池中的所有线程都在忙碌,新提交的任务会被放入一个内部队列中等待执行。这个队列的大小没有限制,只要系统内存足够,就可以一直添加任务。
- 线程复用:当一个线程完成任务后,它不会被销毁,而是返回线程池等待新的任务分配。这样就实现了线程的复用,减少了线程创建和销毁的开销。
线程池的关闭
ThreadPoolExecutor
提供了两种关闭线程池的方法:shutdown
和__exit__
(在使用with
语句时自动调用)。
shutdown
方法
shutdown(wait=True, cancel_futures=False)
wait
:如果为True
,调用shutdown
方法后,会等待所有已提交的任务(包括正在执行的和队列中等待的)完成后再关闭线程池。如果为False
,会立即返回,未完成的任务会继续执行,但不再接受新的任务提交。cancel_futures
:如果为True
,会尝试取消队列中等待执行的任务。已在执行的任务不会被取消。
with
语句
使用with
语句可以更方便地管理线程池的生命周期。当with
块结束时,会自动调用shutdown(wait = True)
方法,确保所有任务完成后关闭线程池。例如:
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
executor.submit(task_function, 1)
executor.submit(task_function, 2)
在这个例子中,当with
块结束时,线程池会自动关闭,等待所有任务完成。
ProcessPoolExecutor
实现进程池
基本使用
ProcessPoolExecutor
类用于创建一个进程池,与ThreadPoolExecutor
类似,但它使用进程来执行任务,适用于CPU密集型任务。下面是一个使用ProcessPoolExecutor
的示例:
import concurrent.futures
import time
def cpu_intensive_task(task_number):
print(f"Task {task_number} started")
result = 1
for i in range(1, 1000000):
result = result * i
print(f"Task {task_number} finished")
return result
with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
future_to_task = {executor.submit(cpu_intensive_task, i): i for i in range(3)}
for future in concurrent.futures.as_completed(future_to_task):
task_number = future_to_task[future]
try:
result = future.result()
except Exception as e:
print(f"Task {task_number} generated an exception: {e}")
else:
print(f"Task {task_number} result: {result}")
在这个示例中:
- 定义了一个
cpu_intensive_task
函数,它是一个CPU密集型任务,通过计算1到1000000的乘积来模拟。 - 使用
ProcessPoolExecutor
创建了一个进程池,max_workers
设置为2,表示最多同时执行2个进程。 - 同样通过
submit
方法提交任务到进程池,并使用as_completed
迭代已完成的任务获取结果。
ProcessPoolExecutor
的构造函数参数
ProcessPoolExecutor
的构造函数如下:
class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
max_workers
:指定进程池中最多可以同时运行的进程数。如果未指定,默认值为机器CPU核心数。mp_context
:指定使用的multiprocessing
上下文。默认使用None
,即使用默认的上下文。在一些特殊情况下,例如在Windows系统上可能需要指定特定的上下文。initializer
:在每个工作进程启动时调用的可调用对象。可以用来初始化每个进程的状态。initargs
:传递给initializer
的参数元组。
进程池中的任务调度
进程池的任务调度与线程池有一些不同之处:
- 任务提交:与线程池类似,调用
submit
方法提交任务。如果进程池中有空闲进程,任务会立即分配给空闲进程执行。 - 队列机制:同样,当所有进程都在忙碌时,新提交的任务会被放入队列等待执行。但与线程池不同的是,进程池的队列大小默认是有上限的(
multiprocessing.Queue
的默认大小)。如果队列已满,再次提交任务可能会阻塞,直到有任务完成,队列有空间为止。 - 进程复用:进程完成任务后,也会被复用。但进程的创建和销毁开销比线程大得多,因此在设计任务时,要尽量让每个进程执行足够多的工作,以充分利用进程复用的优势。
进程池的关闭
ProcessPoolExecutor
也有shutdown
方法和__exit__
(with
语句自动调用)来关闭进程池。其行为与ThreadPoolExecutor
类似,但由于进程的特性,在关闭进程池时需要注意一些额外的问题。
shutdown
方法
shutdown(wait=True, cancel_futures=False)
wait
:与线程池相同,wait
为True
时等待所有任务完成,为False
时立即返回,未完成任务继续执行。cancel_futures
:同样,为True
时尝试取消队列中等待的任务。但由于进程间通信的复杂性,取消任务可能并不总是成功。
with
语句
使用with
语句管理进程池生命周期时,with
块结束时会自动调用shutdown(wait = True)
。但在进程池中,由于进程间资源的释放和清理可能比较复杂,使用with
语句能更可靠地确保资源的正确清理。
Future
类详解
在concurrent.futures
模块中,Future
类扮演着重要的角色。当我们使用submit
方法提交任务到线程池或进程池时,会返回一个Future
对象。这个对象代表了一个异步执行的任务,可以用来查询任务的状态、获取任务的结果或者取消任务。
Future
对象的方法
cancel()
cancel()
尝试取消任务。如果任务尚未开始执行,任务会被取消,返回True
。如果任务已经在执行或者已经完成,取消操作无效,返回False
。
cancelled()
cancelled()
返回任务是否已被取消。如果cancel
方法成功取消任务,这个方法会返回True
。
running()
running()
返回任务是否正在执行。如果任务正在执行,返回True
;如果任务尚未开始或者已经完成(包括已取消),返回False
。
done()
done()
返回任务是否已完成(包括正常完成、异常结束或已取消)。如果任务已完成,返回True
,否则返回False
。
result(timeout=None)
result(timeout=None)
获取任务的返回值。如果任务尚未完成,这个方法会阻塞,直到任务完成或者超时。timeout
参数指定了等待的最长时间(单位为秒)。如果任务在等待期间完成,会返回任务的返回值;如果超时,会抛出TimeoutError
异常。如果任务执行过程中抛出异常,result
方法会重新抛出该异常。
exception(timeout=None)
exception(timeout=None)
获取任务执行过程中抛出的异常(如果有)。如果任务尚未完成,这个方法会阻塞,直到任务完成或者超时。timeout
参数与result
方法中的含义相同。如果任务正常完成,返回None
;如果任务抛出异常,返回该异常对象。
使用Future
对象的示例
import concurrent.futures
import time
def simple_task():
time.sleep(2)
return 42
executor = concurrent.futures.ThreadPoolExecutor(max_workers = 1)
future = executor.submit(simple_task)
print(f"Is the task running? {future.running()}")
time.sleep(1)
print(f"Is the task running? {future.running()}")
try:
result = future.result(timeout = 3)
print(f"Task result: {result}")
except concurrent.futures.TimeoutError:
print("Task timed out")
except Exception as e:
print(f"Task generated an exception: {e}")
print(f"Is the task done? {future.done()}")
在这个示例中:
- 定义了一个简单的任务
simple_task
,它睡眠2秒后返回42。 - 使用
ThreadPoolExecutor
提交任务并获取Future
对象。 - 通过
running
方法检查任务是否正在执行,在不同时间点调用可以看到任务状态的变化。 - 使用
result
方法获取任务结果,并设置了3秒的超时时间。如果任务在3秒内完成,会打印任务结果;否则会捕获TimeoutError
。 - 最后通过
done
方法检查任务是否已完成。
线程池与进程池的选择
在实际应用中,选择使用线程池还是进程池取决于任务的性质。
CPU密集型任务
对于CPU密集型任务,由于Python的全局解释器锁(GIL)的存在,多线程并不能充分利用多核CPU的优势。在这种情况下,应该选择进程池。每个进程都有自己独立的Python解释器和内存空间,不受GIL的限制,能够真正地并行执行CPU密集型任务。例如,上述的计算1到1000000乘积的任务就是典型的CPU密集型任务,使用进程池能显著提高执行效率。
I/O密集型任务
对于I/O密集型任务,如网络请求、文件读写等,线程池是更好的选择。因为在I/O操作时,线程会释放GIL,其他线程可以继续执行。而且线程的创建和切换开销比进程小得多,更适合处理大量的I/O任务。例如,同时发起多个网络请求获取数据的场景,使用线程池可以高效地管理这些请求。
混合任务
如果任务中既有CPU密集型部分,又有I/O密集型部分,可以考虑将任务拆分成不同的子任务,分别使用线程池和进程池来执行。或者根据任务中CPU和I/O操作的比例,在两者之间进行权衡。
最佳实践与注意事项
- 合理设置并发度:根据任务的性质和系统资源,合理设置
max_workers
参数。对于I/O密集型任务,可以适当增加并发度;对于CPU密集型任务,并发度不宜过高,一般设置为CPU核心数或略小于CPU核心数。 - 异常处理:在获取任务结果时,要使用
try - except
块捕获可能出现的异常,如任务执行过程中的异常、超时异常等。这样可以确保程序的稳定性,避免因为某个任务的异常导致整个程序崩溃。 - 资源清理:无论是线程池还是进程池,都要注意资源的清理。使用
with
语句可以自动管理资源的关闭和清理,避免手动调用shutdown
方法可能出现的遗漏。 - 数据共享与同步:在多线程或多进程编程中,要注意数据共享和同步问题。线程之间共享数据可能会导致竞争条件,需要使用锁、信号量等同步机制。而进程之间数据是隔离的,如果需要共享数据,需要使用
multiprocessing
模块提供的共享内存、队列等机制。
通过深入理解concurrent.futures
模块以及线程池和进程池的使用,开发者可以更加高效地编写并发程序,充分利用系统资源,提高程序的性能。无论是处理CPU密集型还是I/O密集型任务,都能通过合理的选择和配置,实现最优的执行效果。