MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Python multiprocessing模块详解

2023-05-172.5k 阅读

1. 引言

在Python编程中,multiprocessing模块是一个功能强大的工具,用于在Python中实现多进程编程。随着计算机硬件的发展,多核处理器已经成为主流,利用多进程技术可以充分发挥多核处理器的优势,提高程序的执行效率,尤其是对于那些计算密集型的任务。multiprocessing模块提供了一个简单而直观的接口,使得开发者可以轻松地创建和管理多个进程,同时还提供了一系列同步和通信机制,以确保多个进程之间能够协同工作。

2. 进程基础概念

在深入了解multiprocessing模块之前,我们先来回顾一些进程的基本概念。

2.1 什么是进程

进程是计算机中程序的一次执行实例,是系统进行资源分配和调度的基本单位。每个进程都有自己独立的地址空间、内存、数据栈以及其他用于跟踪执行的辅助数据。操作系统负责在多个进程之间进行调度和切换,使得它们看起来像是在同时运行。

2.2 进程与线程的区别

虽然进程和线程都可以实现并发执行,但它们之间存在一些关键区别:

  • 资源分配:进程拥有独立的资源,包括内存空间、文件描述符等;而线程共享进程的资源,如内存、文件句柄等。
  • 上下文切换开销:进程上下文切换开销较大,因为需要切换整个地址空间;而线程上下文切换开销相对较小,因为只需要切换寄存器和栈。
  • 编程复杂度:多进程编程相对简单,因为进程之间相互独立,不存在共享数据带来的同步问题;多线程编程则需要更小心地处理共享数据,以避免数据竞争和死锁等问题。

3. Python multiprocessing模块入门

3.1 创建简单进程

multiprocessing模块中,最基本的操作就是创建一个新的进程。我们可以通过继承multiprocessing.Process类或者直接使用Process类的构造函数来创建进程。

通过继承Process类创建进程

import multiprocessing


class MyProcess(multiprocessing.Process):
    def run(self):
        print('子进程运行中,pid = {}'.format(multiprocessing.current_process().pid))


if __name__ == '__main__':
    p = MyProcess()
    p.start()
    print('父进程运行中,pid = {}'.format(multiprocessing.current_process().pid))
    p.join()

在上述代码中,我们定义了一个继承自multiprocessing.Process的类MyProcess,并重写了run方法,该方法中的代码将在子进程中执行。在if __name__ == '__main__':块中,我们创建了MyProcess的实例p,调用start方法启动子进程,然后调用join方法等待子进程结束。

使用Process类的构造函数创建进程

import multiprocessing


def worker():
    print('子进程运行中,pid = {}'.format(multiprocessing.current_process().pid))


if __name__ == '__main__':
    p = multiprocessing.Process(target=worker)
    p.start()
    print('父进程运行中,pid = {}'.format(multiprocessing.current_process().pid))
    p.join()

这里我们直接使用multiprocessing.Process的构造函数,通过target参数指定子进程要执行的函数worker

3.2 进程间通信

在多进程编程中,进程之间通常需要进行数据交换和通信。multiprocessing模块提供了几种不同的方式来实现进程间通信。

使用Queue进行进程间通信Queue是一个线程和进程安全的队列,可用于在多个进程之间传递数据。

import multiprocessing


def producer(queue):
    for i in range(5):
        queue.put(i)
        print('生产者放入数据: {}'.format(i))


def consumer(queue):
    while True:
        data = queue.get()
        if data is None:
            break
        print('消费者取出数据: {}'.format(data))


if __name__ == '__main__':
    q = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=producer, args=(q,))
    p2 = multiprocessing.Process(target=consumer, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    q.put(None)
    p2.join()

在上述代码中,producer函数向Queue中放入数据,consumer函数从Queue中取出数据。注意,为了让消费者能够结束循环,我们在生产者结束后向队列中放入一个None作为结束信号。

使用Pipe进行进程间通信Pipe用于创建一个管道,两端连接两个进程,可用于双向通信。

import multiprocessing


def sender(pipe):
    conn, _ = pipe
    conn.send('Hello from sender')
    conn.close()


def receiver(pipe):
    _, conn = pipe
    data = conn.recv()
    print('接收者收到数据: {}'.format(data))
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=sender, args=((parent_conn, child_conn),))
    p2 = multiprocessing.Process(target=receiver, args=((parent_conn, child_conn),))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

这里通过multiprocessing.Pipe()创建了一个管道,sender函数通过管道的一端发送数据,receiver函数通过另一端接收数据。

4. 进程同步

在多进程环境中,当多个进程同时访问共享资源时,可能会导致数据不一致等问题。为了解决这些问题,multiprocessing模块提供了多种同步机制。

4.1 使用Lock进行进程同步

Lock(锁)是一种简单的同步原语,用于确保同一时间只有一个进程可以访问共享资源。

import multiprocessing
import time


def worker(lock, num):
    lock.acquire()
    try:
        print('进程 {} 获取到锁'.format(num))
        time.sleep(1)
        print('进程 {} 释放锁'.format(num))
    finally:
        lock.release()


if __name__ == '__main__':
    lock = multiprocessing.Lock()
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker, args=(lock, i))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()

在上述代码中,每个进程在访问共享资源(这里通过打印模拟)之前,先获取锁,访问结束后释放锁。这样可以避免多个进程同时访问共享资源导致的数据混乱。

4.2 使用Semaphore进行进程同步

Semaphore(信号量)是一个计数器,用于控制同时访问共享资源的进程数量。

import multiprocessing
import time


def worker(semaphore, num):
    semaphore.acquire()
    try:
        print('进程 {} 获取到信号量'.format(num))
        time.sleep(1)
        print('进程 {} 释放信号量'.format(num))
    finally:
        semaphore.release()


if __name__ == '__main__':
    semaphore = multiprocessing.Semaphore(2)
    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(semaphore, i))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()

这里我们创建了一个Semaphore对象,初始值为2,表示最多允许两个进程同时访问共享资源。每个进程在访问前获取信号量,访问结束后释放信号量。

4.3 使用Condition进行进程同步

Condition(条件变量)用于在某些条件满足时通知进程。它结合了LockSemaphore的功能。

import multiprocessing
import time


def consumer(condition, queue):
    with condition:
        while True:
            condition.wait()
            if not queue:
                break
            item = queue.pop(0)
            print('消费者消费: {}'.format(item))


def producer(condition, queue):
    with condition:
        for i in range(5):
            queue.append(i)
            print('生产者生产: {}'.format(i))
            condition.notify()
            time.sleep(1)


if __name__ == '__main__':
    condition = multiprocessing.Condition()
    queue = []
    p1 = multiprocessing.Process(target=producer, args=(condition, queue))
    p2 = multiprocessing.Process(target=consumer, args=(condition, queue))
    p2.start()
    p1.start()
    p1.join()
    with condition:
        condition.notify()
    p2.join()

在上述代码中,consumer进程在condition.wait()处等待,直到producer进程调用condition.notify()通知它有新的数据可用。

5. 进程池

在实际应用中,创建和销毁大量进程会带来较大的开销。multiprocessing模块提供了Pool类,用于管理一个进程池,复用进程资源。

5.1 简单使用Pool

import multiprocessing


def worker(x):
    return x * x


if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(worker, range(10))
    print(results)

在上述代码中,我们创建了一个包含4个进程的进程池。pool.map方法将worker函数应用到range(10)的每个元素上,并返回结果列表。map方法会自动分配任务给进程池中的进程,并收集结果。

5.2 使用Pool的异步方法

Pool还提供了异步执行任务的方法,如apply_async

import multiprocessing


def worker(x):
    return x * x


def print_result(result):
    print('结果: {}'.format(result))


if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as pool:
        for i in range(10):
            pool.apply_async(worker, args=(i,), callback=print_result)
        pool.close()
        pool.join()

这里我们使用apply_async方法异步地提交任务到进程池,callback参数指定了任务完成后要调用的函数print_result,用于处理任务的结果。在提交完所有任务后,我们调用pool.close()禁止再向进程池提交新任务,然后调用pool.join()等待所有任务完成。

6. 高级话题

6.1 共享内存

在某些情况下,进程之间需要共享数据以提高效率。multiprocessing模块提供了ValueArray类来实现共享内存。

import multiprocessing


def increment_shared_value(shared_value):
    with shared_value.get_lock():
        shared_value.value += 1


if __name__ == '__main__':
    shared_value = multiprocessing.Value('i', 0)
    processes = []
    for _ in range(10):
        p = multiprocessing.Process(target=increment_shared_value, args=(shared_value,))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print('共享值: {}'.format(shared_value.value))

在上述代码中,我们使用multiprocessing.Value创建了一个共享的整数值。注意,为了避免数据竞争,我们通过get_lock方法获取锁,在修改共享值时使用锁进行同步。

6.2 进程间的信号处理

在多进程编程中,信号处理也是一个重要的话题。multiprocessing模块允许我们在进程中处理信号。

import multiprocessing
import signal
import time


def signal_handler(sig, frame):
    print('收到信号 {}, 进程 {} 即将退出'.format(sig, multiprocessing.current_process().pid))
    exit(0)


def worker():
    signal.signal(signal.SIGINT, signal_handler)
    while True:
        print('子进程运行中...')
        time.sleep(1)


if __name__ == '__main__':
    p = multiprocessing.Process(target=worker)
    p.start()
    try:
        while True:
            print('父进程运行中...')
            time.sleep(1)
    except KeyboardInterrupt:
        p.terminate()
        p.join()

在上述代码中,子进程通过signal.signal注册了一个信号处理函数signal_handler,用于处理SIGINT信号(通常由用户按下Ctrl+C产生)。父进程在捕获到KeyboardInterrupt时,通过terminate方法终止子进程。

7. 注意事项与常见问题

7.1 Windows系统下的注意事项

在Windows系统中,multiprocessing模块的实现方式与Unix系统有所不同。由于Windows没有fork系统调用,multiprocessing模块使用了一种称为“spawn”的方法来启动新进程。这意味着在Windows上,所有要在子进程中执行的代码必须放在if __name__ == '__main__':块中,否则可能会导致错误。

7.2 资源泄漏问题

在使用多进程时,如果不正确地管理进程资源,可能会导致资源泄漏。例如,忘记调用join方法等待进程结束,或者在进程异常退出时没有正确清理资源。为了避免资源泄漏,一定要确保每个启动的进程都有对应的join操作,并且在进程出现异常时进行适当的资源清理。

7.3 性能调优

虽然多进程可以提高程序的执行效率,但并不是进程越多越好。过多的进程会导致上下文切换开销增大,反而降低性能。在实际应用中,需要根据任务的特性和硬件资源情况,合理调整进程数量,以达到最佳性能。同时,也要注意进程间通信和同步的开销,尽量减少不必要的通信和同步操作。

通过深入理解和运用multiprocessing模块的各种功能,我们可以编写出高效、稳定的多进程Python程序,充分发挥多核处理器的性能优势。无论是处理计算密集型任务还是需要并发处理的任务,multiprocessing模块都为我们提供了强大的工具和灵活的解决方案。在实际开发中,根据具体需求选择合适的进程创建、通信、同步和资源管理方式,是编写优秀多进程程序的关键。