MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Python多进程编程实战与性能优化

2022-11-012.4k 阅读

多进程编程基础

在Python中,multiprocessing 模块提供了一个强大且易于使用的多进程编程接口。它允许我们充分利用多核CPU的优势,提高程序的执行效率。与多线程不同,多进程中的每个进程都有自己独立的内存空间,这避免了多线程编程中常见的全局变量共享带来的问题,如数据竞争和死锁。

简单的多进程示例

import multiprocessing


def worker(num):
    print('Worker process id for {0}: {1}'.format(num, multiprocessing.current_process().pid))


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()
    for j in jobs:
        j.join()


在上述代码中:

  1. 我们首先导入了 multiprocessing 模块。
  2. 定义了一个 worker 函数,这个函数在每个进程中执行,它简单地打印出当前进程的ID。
  3. if __name__ == '__main__': 块中:
    • 我们创建了一个空列表 jobs 来存储进程对象。
    • 通过循环创建了5个进程,每个进程都以 worker 函数为目标,并传入不同的参数 i
    • 将每个进程添加到 jobs 列表中,并调用 start 方法启动进程。
    • 最后,通过循环调用每个进程的 join 方法,等待所有进程完成。

这里需要注意 if __name__ == '__main__': 这个条件块。在Windows系统以及某些Unix系统上,Python的多进程模块在启动新进程时会重新导入主模块。如果没有这个条件块,新进程会递归地再次创建进程,导致无限循环。

进程间通信

在实际应用中,进程间常常需要交换数据。multiprocessing 模块提供了几种机制来实现进程间通信(IPC),包括队列(Queue)、管道(Pipe)等。

使用队列进行进程间通信

import multiprocessing


def producer(queue):
    for i in range(5):
        queue.put(i)
        print('Produced: {0}'.format(i))


def consumer(queue):
    while True:
        item = queue.get()
        if item is None:
            break
        print('Consumed: {0}'.format(item))


if __name__ == '__main__':
    q = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=producer, args=(q,))
    p2 = multiprocessing.Process(target=consumer, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    q.put(None)
    p2.join()


  1. 生产者 - 消费者模型
    • 我们定义了 producer 函数,它在循环中向队列 queue 中放入数据,并打印出生产的信息。
    • consumer 函数从队列中获取数据。这里使用了一个无限循环,当获取到 None 时,跳出循环,表示数据处理完毕。
    • 在主程序中,创建了一个队列 q,以及一个生产者进程 p1 和一个消费者进程 p2
    • 启动两个进程后,生产者进程向队列放入数据,消费者进程从队列取出数据并处理。最后,生产者进程结束后,向队列中放入 None 作为结束信号,消费者进程获取到 None 后结束。

使用管道进行进程间通信

import multiprocessing


def sender(pipe):
    conn, _ = pipe
    for i in range(5):
        conn.send(i)
        print('Sent: {0}'.format(i))
    conn.close()


def receiver(pipe):
    _, conn = pipe
    while True:
        try:
            item = conn.recv()
            print('Received: {0}'.format(item))
        except EOFError:
            break
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=sender, args=((parent_conn, child_conn),))
    p2 = multiprocessing.Process(target=receiver, args=((parent_conn, child_conn),))
    p1.start()
    p2.start()
    p1.join()
    p2.join()


  1. 管道原理
    • multiprocessing.Pipe 创建了一个双向管道,返回两个连接对象 parent_connchild_conn
    • sender 函数使用其中一个连接(conn)向管道中发送数据。
    • receiver 函数使用另一个连接从管道中接收数据。try - except 块用于捕获 EOFError,当管道的另一端关闭时,recv 方法会引发这个错误,此时表示数据接收完毕。

共享数据

虽然多进程的每个进程有独立的内存空间,但有时我们也需要在进程间共享数据。multiprocessing 模块提供了 ValueArray 来实现基本数据类型和数组的共享。

使用Value共享单个值

import multiprocessing


def increment(value):
    with value.get_lock():
        value.value += 1


if __name__ == '__main__':
    num = multiprocessing.Value('i', 0)
    processes = []
    for _ in range(10):
        p = multiprocessing.Process(target=increment, args=(num,))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print('Final value: {0}'.format(num.value))


  1. Value的使用
    • 我们创建了一个 multiprocessing.Value 对象 num,初始值为0,类型为 'i' 即整数。
    • increment 函数用于增加这个共享值。with value.get_lock(): 语句确保在修改共享值时的线程安全,因为多个进程可能同时尝试修改这个值。
    • 通过创建10个进程来调用 increment 函数,最后打印出共享值的最终结果。

使用Array共享数组

import multiprocessing


def square_array(arr):
    for i in range(len(arr)):
        arr[i] = arr[i] * arr[i]


if __name__ == '__main__':
    numbers = multiprocessing.Array('i', [1, 2, 3, 4, 5])
    p = multiprocessing.Process(target=square_array, args=(numbers,))
    p.start()
    p.join()
    print('Array after squaring: {0}'.format(list(numbers)))


  1. Array的使用
    • 创建了一个 multiprocessing.Array 对象 numbers,初始值为 [1, 2, 3, 4, 5],类型为 'i' 即整数。
    • square_array 函数将数组中的每个元素平方。
    • 创建一个进程来执行 square_array 函数,最后打印出修改后的数组。

性能优化

在进行多进程编程时,性能优化至关重要。以下是一些优化的方法和注意事项。

减少进程启动开销

进程的启动和销毁是有开销的。频繁地创建和销毁进程会降低程序的整体性能。一种优化方法是使用进程池(multiprocessing.Pool)。

import multiprocessing


def square(x):
    return x * x


if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(square, range(10))
    print(results)


  1. 进程池的优势
    • multiprocessing.Pool 创建了一个固定大小的进程池。在上述代码中,我们创建了一个包含4个进程的进程池。
    • pool.map 方法将 square 函数应用到 range(10) 的每个元素上。进程池中的进程会复用,而不是每次都创建新的进程,这样大大减少了进程启动和销毁的开销。

合理分配任务

合理分配任务到不同的进程可以提高整体性能。例如,如果任务之间存在依赖关系,需要仔细设计任务的分配方式,避免进程之间长时间等待。

假设我们有一个任务链,任务B依赖于任务A的结果。

import multiprocessing


def task_a():
    return 5


def task_b(result_a):
    return result_a * 2


if __name__ == '__main__':
    with multiprocessing.Pool(processes=2) as pool:
        result_a = pool.apply_async(task_a)
        result_b = pool.apply_async(task_b, args=(result_a.get(),))
        print(result_b.get())


  1. 任务依赖处理
    • 首先使用 pool.apply_async 异步调用 task_a,这个方法不会阻塞主进程,而是返回一个 AsyncResult 对象。
    • 然后,我们在获取 task_a 的结果后,再异步调用 task_b,传入 task_a 的结果作为参数。通过这种方式,我们在多进程环境中处理了任务之间的依赖关系。

避免不必要的共享数据

虽然共享数据在某些情况下是必要的,但共享数据会带来同步开销,如锁的使用。尽量减少共享数据的使用可以提高性能。如果可能,尽量让每个进程处理独立的数据,然后在最后阶段合并结果。

例如,我们要对一个大列表中的每个元素进行处理并求和。

import multiprocessing


def process_chunk(chunk):
    return sum([i * i for i in chunk])


if __name__ == '__main__':
    data = list(range(10000))
    chunk_size = len(data) // 4
    chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(process_chunk, chunks)
    total = sum(results)
    print('Total: {0}'.format(total))


  1. 减少共享数据示例
    • 我们将大列表 data 分成4个小块 chunks
    • 每个进程处理一个小块数据,通过 process_chunk 函数计算小块数据中元素平方的和。
    • 最后,在主进程中合并各个进程的结果,得到最终的总和。这种方式避免了在进程间共享整个数据列表,减少了同步开销。

优化通信开销

进程间通信也有一定的开销,特别是在频繁通信的情况下。尽量减少进程间通信的频率,批量传输数据而不是单个数据。

例如,在生产者 - 消费者模型中,如果每次只传输一个数据,开销会比较大。我们可以改为批量传输。

import multiprocessing


def producer(queue):
    batch = [i for i in range(10)]
    queue.put(batch)
    print('Produced a batch')


def consumer(queue):
    batch = queue.get()
    for item in batch:
        print('Consumed: {0}'.format(item))


if __name__ == '__main__':
    q = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=producer, args=(q,))
    p2 = multiprocessing.Process(target=consumer, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()


  1. 批量通信优化
    • producer 函数将一个包含10个元素的列表作为一个批次放入队列。
    • consumer 函数从队列获取这个批次的数据,并逐个处理。这样相比每次只传输一个数据,减少了通信次数,从而降低了通信开销。

处理复杂任务

在实际开发中,我们可能会遇到更复杂的任务,需要综合运用多进程编程的各种技巧。

多阶段任务处理

假设我们有一个图像识别任务,分为三个阶段:图像读取、特征提取和分类。

import multiprocessing
import cv2
import numpy as np


# 模拟特征提取
def extract_features(image):
    # 这里简单使用边缘检测作为特征提取
    edges = cv2.Canny(image, 100, 200)
    return edges.flatten()


# 模拟分类
def classify(features):
    # 简单的分类,根据特征总和判断
    total = np.sum(features)
    if total > 10000:
        return 'Class A'
    else:
        return 'Class B'


def process_image(file_path):
    image = cv2.imread(file_path, cv2.IMREAD_GRAYSCALE)
    features = extract_features(image)
    result = classify(features)
    return result


if __name__ == '__main__':
    image_files = ['image1.jpg', 'image2.jpg', 'image3.jpg']
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.map(process_image, image_files)
    for i, result in enumerate(results):
        print('Result for image {0}: {1}'.format(i + 1, result))


  1. 多阶段任务实现
    • 我们定义了 extract_features 函数用于特征提取,classify 函数用于分类,process_image 函数将图像读取、特征提取和分类整合在一起。
    • 在主程序中,我们有一个图像文件列表 image_files,通过进程池 pool.map 并行处理每个图像文件,最后打印出每个图像的分类结果。

动态任务分配

在某些情况下,任务的执行时间可能不同,静态分配任务可能导致某些进程空闲,而某些进程繁忙。我们可以使用 multiprocessing.Poolimap_unordered 方法实现动态任务分配。

import multiprocessing
import time


def complex_task(num):
    time.sleep(num)
    return num * num


if __name__ == '__main__':
    tasks = [3, 1, 4, 2]
    with multiprocessing.Pool(processes=2) as pool:
        results = list(pool.imap_unordered(complex_task, tasks))
    print(results)


  1. 动态任务分配原理
    • complex_task 函数模拟了一个复杂任务,它的执行时间根据传入的参数 num 而定,这里使用 time.sleep 模拟任务执行时间。
    • 我们有一个任务列表 tasks,其中每个任务的执行时间不同。
    • pool.imap_unordered 方法会动态地将任务分配给空闲的进程,而不是按照任务列表的顺序分配。这样可以提高整体效率,因为进程不会因为等待长任务而空闲。

多进程与网络编程结合

在网络编程中,多进程可以用于处理并发连接。例如,我们可以为每个客户端连接创建一个独立的进程来处理请求。

简单的多进程TCP服务器

import multiprocessing
import socket


def handle_connection(conn, addr):
    print('Handling connection from {0}'.format(addr))
    while True:
        data = conn.recv(1024)
        if not data:
            break
        conn.sendall(b'You sent: ' + data)
    conn.close()


if __name__ == '__main__':
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.bind(('127.0.0.1', 8888))
    server_socket.listen(5)
    print('Server listening on port 8888')
    while True:
        conn, addr = server_socket.accept()
        p = multiprocessing.Process(target=handle_connection, args=(conn, addr))
        p.start()


  1. 多进程TCP服务器原理
    • 我们创建了一个TCP服务器套接字,并绑定到本地地址 127.0.0.1 的8888端口,监听最多5个连接。
    • 当有客户端连接时,server_socket.accept() 会返回一个新的连接对象 conn 和客户端地址 addr
    • 为每个连接创建一个新的进程,执行 handle_connection 函数。handle_connection 函数接收客户端发送的数据,并回显 You sent: 加上客户端发送的数据,直到客户端关闭连接。

通过以上的介绍,我们对Python多进程编程有了较为深入的了解,从基础的进程创建、进程间通信、共享数据,到性能优化以及在复杂任务和网络编程中的应用。在实际开发中,根据具体的需求和场景,合理运用多进程编程技术,可以显著提高程序的执行效率和性能。