MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Python concurrent.futures模块实现线程池与进程池

2024-07-078.0k 阅读

Python concurrent.futures 模块概述

在Python的并发编程领域,concurrent.futures模块是一个强大的工具,它提供了高层次的异步执行调用接口。这个模块抽象了启动异步任务的细节,使得开发者能够轻松地使用线程池或者进程池来管理并发任务,从而有效地利用多核CPU的优势以及提高I/O密集型任务的执行效率。concurrent.futures模块在Python 3.2中被引入,它包含了两个主要的类:ThreadPoolExecutor(线程池执行器)和ProcessPoolExecutor(进程池执行器),以及Future类来处理异步执行的结果。

为什么需要线程池和进程池

在深入了解concurrent.futures模块如何实现线程池和进程池之前,我们先来探讨一下为什么需要这些概念。

资源管理

在处理大量并发任务时,如果为每个任务都创建一个新的线程或者进程,会消耗大量的系统资源。线程和进程的创建、销毁都需要一定的时间和内存开销。线程池和进程池可以复用已有的线程或进程,避免了频繁创建和销毁带来的开销,从而提高了资源的利用率。

控制并发度

如果并发任务过多,可能会导致系统资源耗尽,性能下降甚至系统崩溃。通过线程池和进程池,可以限制同时执行的任务数量,也就是控制并发度。这样可以确保系统在稳定的状态下运行,不会因为过多的并发任务而出现问题。

简化编程模型

使用线程池和进程池,开发者不需要手动管理线程或进程的生命周期、同步机制等复杂问题。concurrent.futures模块提供了简洁统一的接口,使得并发编程变得更加简单和直观。

ThreadPoolExecutor实现线程池

基本使用

ThreadPoolExecutor类用于创建一个线程池,它可以在后台异步执行任务。下面是一个简单的示例,展示了如何使用ThreadPoolExecutor来并行执行多个任务:

import concurrent.futures
import time


def task_function(task_number):
    print(f"Task {task_number} started")
    time.sleep(2)  # 模拟一个耗时任务
    print(f"Task {task_number} finished")
    return task_number * task_number


with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    future_to_task = {executor.submit(task_function, i): i for i in range(5)}
    for future in concurrent.futures.as_completed(future_to_task):
        task_number = future_to_task[future]
        try:
            result = future.result()
        except Exception as e:
            print(f"Task {task_number} generated an exception: {e}")
        else:
            print(f"Task {task_number} result: {result}")

在这个示例中:

  1. 我们定义了一个task_function,它模拟了一个耗时2秒的任务,并返回任务编号的平方。
  2. 使用ThreadPoolExecutor创建了一个线程池,max_workers参数设置为3,表示线程池中最多同时执行3个任务。
  3. 通过executor.submit(task_function, i)提交任务到线程池,submit方法返回一个Future对象。我们将Future对象和任务编号关联起来,存储在future_to_task字典中。
  4. 使用concurrent.futures.as_completed函数来迭代已完成的任务。as_completed会在任务完成时返回一个迭代器,我们可以通过future.result()获取任务的返回值。如果任务执行过程中抛出异常,result方法会重新抛出该异常,我们可以在try - except块中捕获并处理。

ThreadPoolExecutor的构造函数参数

ThreadPoolExecutor的构造函数如下:

class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
  • max_workers:指定线程池中最多可以同时运行的线程数。如果未指定,默认值为机器CPU核心数的5倍。在I/O密集型任务中,较大的max_workers值可能会提高性能,但如果设置过大,可能会导致系统资源耗尽。
  • thread_name_prefix:线程名称的前缀。这在调试和日志记录时很有用,通过设置前缀可以方便地区分不同线程池中的线程。

线程池中的任务调度

线程池中的任务调度遵循以下原则:

  1. 任务提交:当调用submit方法提交任务时,如果线程池中有空闲线程,任务会立即分配给空闲线程执行。
  2. 队列机制:如果线程池中的所有线程都在忙碌,新提交的任务会被放入一个内部队列中等待执行。这个队列的大小没有限制,只要系统内存足够,就可以一直添加任务。
  3. 线程复用:当一个线程完成任务后,它不会被销毁,而是返回线程池等待新的任务分配。这样就实现了线程的复用,减少了线程创建和销毁的开销。

线程池的关闭

ThreadPoolExecutor提供了两种关闭线程池的方法:shutdown__exit__(在使用with语句时自动调用)。

shutdown方法

shutdown(wait=True, cancel_futures=False)
  • wait:如果为True,调用shutdown方法后,会等待所有已提交的任务(包括正在执行的和队列中等待的)完成后再关闭线程池。如果为False,会立即返回,未完成的任务会继续执行,但不再接受新的任务提交。
  • cancel_futures:如果为True,会尝试取消队列中等待执行的任务。已在执行的任务不会被取消。

with语句

使用with语句可以更方便地管理线程池的生命周期。当with块结束时,会自动调用shutdown(wait = True)方法,确保所有任务完成后关闭线程池。例如:

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    executor.submit(task_function, 1)
    executor.submit(task_function, 2)

在这个例子中,当with块结束时,线程池会自动关闭,等待所有任务完成。

ProcessPoolExecutor实现进程池

基本使用

ProcessPoolExecutor类用于创建一个进程池,与ThreadPoolExecutor类似,但它使用进程来执行任务,适用于CPU密集型任务。下面是一个使用ProcessPoolExecutor的示例:

import concurrent.futures
import time


def cpu_intensive_task(task_number):
    print(f"Task {task_number} started")
    result = 1
    for i in range(1, 1000000):
        result = result * i
    print(f"Task {task_number} finished")
    return result


with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
    future_to_task = {executor.submit(cpu_intensive_task, i): i for i in range(3)}
    for future in concurrent.futures.as_completed(future_to_task):
        task_number = future_to_task[future]
        try:
            result = future.result()
        except Exception as e:
            print(f"Task {task_number} generated an exception: {e}")
        else:
            print(f"Task {task_number} result: {result}")

在这个示例中:

  1. 定义了一个cpu_intensive_task函数,它是一个CPU密集型任务,通过计算1到1000000的乘积来模拟。
  2. 使用ProcessPoolExecutor创建了一个进程池,max_workers设置为2,表示最多同时执行2个进程。
  3. 同样通过submit方法提交任务到进程池,并使用as_completed迭代已完成的任务获取结果。

ProcessPoolExecutor的构造函数参数

ProcessPoolExecutor的构造函数如下:

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
  • max_workers:指定进程池中最多可以同时运行的进程数。如果未指定,默认值为机器CPU核心数。
  • mp_context:指定使用的multiprocessing上下文。默认使用None,即使用默认的上下文。在一些特殊情况下,例如在Windows系统上可能需要指定特定的上下文。
  • initializer:在每个工作进程启动时调用的可调用对象。可以用来初始化每个进程的状态。
  • initargs:传递给initializer的参数元组。

进程池中的任务调度

进程池的任务调度与线程池有一些不同之处:

  1. 任务提交:与线程池类似,调用submit方法提交任务。如果进程池中有空闲进程,任务会立即分配给空闲进程执行。
  2. 队列机制:同样,当所有进程都在忙碌时,新提交的任务会被放入队列等待执行。但与线程池不同的是,进程池的队列大小默认是有上限的(multiprocessing.Queue的默认大小)。如果队列已满,再次提交任务可能会阻塞,直到有任务完成,队列有空间为止。
  3. 进程复用:进程完成任务后,也会被复用。但进程的创建和销毁开销比线程大得多,因此在设计任务时,要尽量让每个进程执行足够多的工作,以充分利用进程复用的优势。

进程池的关闭

ProcessPoolExecutor也有shutdown方法和__exit__with语句自动调用)来关闭进程池。其行为与ThreadPoolExecutor类似,但由于进程的特性,在关闭进程池时需要注意一些额外的问题。

shutdown方法

shutdown(wait=True, cancel_futures=False)
  • wait:与线程池相同,waitTrue时等待所有任务完成,为False时立即返回,未完成任务继续执行。
  • cancel_futures:同样,为True时尝试取消队列中等待的任务。但由于进程间通信的复杂性,取消任务可能并不总是成功。

with语句

使用with语句管理进程池生命周期时,with块结束时会自动调用shutdown(wait = True)。但在进程池中,由于进程间资源的释放和清理可能比较复杂,使用with语句能更可靠地确保资源的正确清理。

Future类详解

concurrent.futures模块中,Future类扮演着重要的角色。当我们使用submit方法提交任务到线程池或进程池时,会返回一个Future对象。这个对象代表了一个异步执行的任务,可以用来查询任务的状态、获取任务的结果或者取消任务。

Future对象的方法

cancel()

cancel()

尝试取消任务。如果任务尚未开始执行,任务会被取消,返回True。如果任务已经在执行或者已经完成,取消操作无效,返回False

cancelled()

cancelled()

返回任务是否已被取消。如果cancel方法成功取消任务,这个方法会返回True

running()

running()

返回任务是否正在执行。如果任务正在执行,返回True;如果任务尚未开始或者已经完成(包括已取消),返回False

done()

done()

返回任务是否已完成(包括正常完成、异常结束或已取消)。如果任务已完成,返回True,否则返回False

result(timeout=None)

result(timeout=None)

获取任务的返回值。如果任务尚未完成,这个方法会阻塞,直到任务完成或者超时。timeout参数指定了等待的最长时间(单位为秒)。如果任务在等待期间完成,会返回任务的返回值;如果超时,会抛出TimeoutError异常。如果任务执行过程中抛出异常,result方法会重新抛出该异常。

exception(timeout=None)

exception(timeout=None)

获取任务执行过程中抛出的异常(如果有)。如果任务尚未完成,这个方法会阻塞,直到任务完成或者超时。timeout参数与result方法中的含义相同。如果任务正常完成,返回None;如果任务抛出异常,返回该异常对象。

使用Future对象的示例

import concurrent.futures
import time


def simple_task():
    time.sleep(2)
    return 42


executor = concurrent.futures.ThreadPoolExecutor(max_workers = 1)
future = executor.submit(simple_task)

print(f"Is the task running? {future.running()}")
time.sleep(1)
print(f"Is the task running? {future.running()}")

try:
    result = future.result(timeout = 3)
    print(f"Task result: {result}")
except concurrent.futures.TimeoutError:
    print("Task timed out")
except Exception as e:
    print(f"Task generated an exception: {e}")

print(f"Is the task done? {future.done()}")

在这个示例中:

  1. 定义了一个简单的任务simple_task,它睡眠2秒后返回42。
  2. 使用ThreadPoolExecutor提交任务并获取Future对象。
  3. 通过running方法检查任务是否正在执行,在不同时间点调用可以看到任务状态的变化。
  4. 使用result方法获取任务结果,并设置了3秒的超时时间。如果任务在3秒内完成,会打印任务结果;否则会捕获TimeoutError
  5. 最后通过done方法检查任务是否已完成。

线程池与进程池的选择

在实际应用中,选择使用线程池还是进程池取决于任务的性质。

CPU密集型任务

对于CPU密集型任务,由于Python的全局解释器锁(GIL)的存在,多线程并不能充分利用多核CPU的优势。在这种情况下,应该选择进程池。每个进程都有自己独立的Python解释器和内存空间,不受GIL的限制,能够真正地并行执行CPU密集型任务。例如,上述的计算1到1000000乘积的任务就是典型的CPU密集型任务,使用进程池能显著提高执行效率。

I/O密集型任务

对于I/O密集型任务,如网络请求、文件读写等,线程池是更好的选择。因为在I/O操作时,线程会释放GIL,其他线程可以继续执行。而且线程的创建和切换开销比进程小得多,更适合处理大量的I/O任务。例如,同时发起多个网络请求获取数据的场景,使用线程池可以高效地管理这些请求。

混合任务

如果任务中既有CPU密集型部分,又有I/O密集型部分,可以考虑将任务拆分成不同的子任务,分别使用线程池和进程池来执行。或者根据任务中CPU和I/O操作的比例,在两者之间进行权衡。

最佳实践与注意事项

  1. 合理设置并发度:根据任务的性质和系统资源,合理设置max_workers参数。对于I/O密集型任务,可以适当增加并发度;对于CPU密集型任务,并发度不宜过高,一般设置为CPU核心数或略小于CPU核心数。
  2. 异常处理:在获取任务结果时,要使用try - except块捕获可能出现的异常,如任务执行过程中的异常、超时异常等。这样可以确保程序的稳定性,避免因为某个任务的异常导致整个程序崩溃。
  3. 资源清理:无论是线程池还是进程池,都要注意资源的清理。使用with语句可以自动管理资源的关闭和清理,避免手动调用shutdown方法可能出现的遗漏。
  4. 数据共享与同步:在多线程或多进程编程中,要注意数据共享和同步问题。线程之间共享数据可能会导致竞争条件,需要使用锁、信号量等同步机制。而进程之间数据是隔离的,如果需要共享数据,需要使用multiprocessing模块提供的共享内存、队列等机制。

通过深入理解concurrent.futures模块以及线程池和进程池的使用,开发者可以更加高效地编写并发程序,充分利用系统资源,提高程序的性能。无论是处理CPU密集型还是I/O密集型任务,都能通过合理的选择和配置,实现最优的执行效果。