MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

深入浅出Python多线程、多进程和并发编程

2021-06-167.4k 阅读

一、Python 多线程编程

1.1 线程基础概念

在计算机科学中,线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一个进程可以包含多个线程,这些线程共享进程的资源,例如内存空间、文件描述符等。

在传统的单线程程序中,程序按照顺序依次执行各个任务,只有前一个任务完成后,下一个任务才能开始。这种方式在处理一些耗时操作(如网络请求、文件读写等)时,会导致程序长时间等待,从而降低了整体的运行效率。而多线程编程则允许程序同时执行多个任务,每个任务由一个线程负责,这样可以显著提高程序的执行效率,特别是在处理 I/O 密集型任务时。

1.2 Python 的 threading 模块

Python 提供了 threading 模块来支持多线程编程。下面通过一个简单的示例来展示如何使用 threading 模块创建和启动线程。

import threading


def print_numbers():
    for i in range(1, 11):
        print(f"Thread 1: {i}")


def print_letters():
    for letter in 'abcdefghij':
        print(f"Thread 2: {letter}")


if __name__ == '__main__':
    thread1 = threading.Thread(target=print_numbers)
    thread2 = threading.Thread(target=print_letters)

    thread1.start()
    thread2.start()

    thread1.join()
    thread2.join()

在上述代码中,我们定义了两个函数 print_numbersprint_letters,分别用于打印数字和字母。然后,通过 threading.Thread 创建了两个线程 thread1thread2,并将对应的函数作为目标传递给线程。接着,使用 start() 方法启动线程,最后使用 join() 方法等待线程执行完毕。

1.3 线程同步

当多个线程同时访问共享资源时,可能会出现数据竞争的问题。例如,多个线程同时对一个共享变量进行修改,可能会导致数据不一致。为了解决这个问题,我们需要使用线程同步机制。

1.3.1 锁(Lock) 锁是一种最基本的线程同步工具。当一个线程获取到锁后,其他线程就无法获取该锁,直到该线程释放锁。下面是一个使用锁来避免数据竞争的示例。

import threading


class Counter:
    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()

    def increment(self):
        with self.lock:
            self.value += 1


def worker(counter):
    for _ in range(10000):
        counter.increment()


if __name__ == '__main__':
    counter = Counter()
    threads = []
    for _ in range(10):
        thread = threading.Thread(target=worker, args=(counter,))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

    print(f"Final counter value: {counter.value}")

在这个示例中,Counter 类包含一个共享变量 value 和一个锁 lock。在 increment 方法中,通过 with self.lock 语句获取锁,确保在修改 value 时不会有其他线程同时访问。

1.3.2 信号量(Semaphore) 信号量是一个计数器,它允许一定数量的线程同时访问共享资源。例如,假设有一个数据库连接池,最多允许 10 个线程同时使用连接,就可以使用信号量来控制。

import threading
import time


semaphore = threading.Semaphore(3)


def access_resource():
    with semaphore:
        print(f"{threading.current_thread().name} has access to the resource")
        time.sleep(2)
        print(f"{threading.current_thread().name} has released the resource")


if __name__ == '__main__':
    threads = []
    for _ in range(5):
        thread = threading.Thread(target=access_resource)
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

在上述代码中,Semaphore(3) 表示最多允许 3 个线程同时访问资源。每个线程在访问资源前先获取信号量,访问结束后释放信号量。

1.3.3 事件(Event) 事件是一种简单的线程同步机制,它允许一个线程通知其他线程某个事件已经发生。

import threading
import time


event = threading.Event()


def waiter():
    print(f"{threading.current_thread().name} is waiting for the event")
    event.wait()
    print(f"{threading.current_thread().name} has received the event")


def signaler():
    time.sleep(3)
    print(f"{threading.current_thread().name} is signaling the event")
    event.set()


if __name__ == '__main__':
    waiter_thread = threading.Thread(target=waiter)
    signaler_thread = threading.Thread(target=signaler)

    waiter_thread.start()
    signaler_thread.start()

    waiter_thread.join()
    signaler_thread.join()

在这个示例中,waiter 线程通过 event.wait() 等待事件发生,signaler 线程在 3 秒后通过 event.set() 发送事件通知。

二、Python 多进程编程

2.1 进程基础概念

进程是计算机中程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位。与线程不同,每个进程都有自己独立的内存空间、文件描述符等资源。多进程编程常用于处理 CPU 密集型任务,因为每个进程可以充分利用多核 CPU 的优势。

2.2 Python 的 multiprocessing 模块

Python 的 multiprocessing 模块提供了一个方便的接口来创建和管理多进程。以下是一个简单的多进程示例。

import multiprocessing


def square(x):
    return x * x


if __name__ == '__main__':
    numbers = [1, 2, 3, 4, 5]
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.map(square, numbers)
    print(results)

在上述代码中,我们定义了一个 square 函数,用于计算数字的平方。然后,通过 multiprocessing.Pool 创建了一个包含 3 个进程的进程池,并使用 pool.map 方法将 square 函数应用到 numbers 列表的每个元素上。

2.3 进程间通信

由于每个进程都有独立的内存空间,进程间通信(IPC)就显得尤为重要。multiprocessing 模块提供了多种 IPC 方式,如队列(Queue)和管道(Pipe)。

2.3.1 队列(Queue) 队列是一种常用的 IPC 方式,它可以在不同进程之间安全地传递数据。

import multiprocessing


def producer(queue):
    for i in range(5):
        queue.put(i)
        print(f"Produced: {i}")


def consumer(queue):
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"Consumed: {item}")


if __name__ == '__main__':
    queue = multiprocessing.Queue()
    producer_process = multiprocessing.Process(target=producer, args=(queue,))
    consumer_process = multiprocessing.Process(target=consumer, args=(queue,))

    producer_process.start()
    consumer_process.start()

    producer_process.join()
    queue.put(None)
    consumer_process.join()

在这个示例中,producer 进程将数据放入队列,consumer 进程从队列中取出数据。通过在队列中放入 None 来通知 consumer 进程结束。

2.3.2 管道(Pipe) 管道是另一种 IPC 方式,它提供了一个双向的通信通道。

import multiprocessing


def send_data(pipe):
    conn, _ = pipe
    data = {'message': 'Hello from process 1'}
    conn.send(data)
    conn.close()


def receive_data(pipe):
    _, conn = pipe
    data = conn.recv()
    print(f"Received: {data}")
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()
    sender_process = multiprocessing.Process(target=send_data, args=((parent_conn, child_conn),))
    receiver_process = multiprocessing.Process(target=receive_data, args=((parent_conn, child_conn),))

    sender_process.start()
    receiver_process.start()

    sender_process.join()
    receiver_process.join()

在上述代码中,send_data 进程通过管道发送数据,receive_data 进程从管道接收数据。

三、并发编程

3.1 并发概念

并发是指在一段时间内,多个任务看起来像是同时执行。在单 CPU 系统中,实际上这些任务是在 CPU 上快速切换执行的,而在多 CPU 系统中,多个任务可以真正地同时执行。并发编程主要用于处理 I/O 密集型任务,通过在等待 I/O 操作完成时切换到其他任务,提高系统的整体利用率。

3.2 Python 的 asyncio 库

asyncio 是 Python 用于编写异步代码的标准库。它基于事件循环,通过协程(coroutine)来实现异步编程。

3.2.1 协程基础 协程是一种特殊的函数,可以在执行过程中暂停和恢复。在 Python 中,通过 async def 定义协程函数。

import asyncio


async def print_numbers():
    for i in range(1, 11):
        print(f"Coroutine 1: {i}")
        await asyncio.sleep(1)


async def print_letters():
    for letter in 'abcdefghij':
        print(f"Coroutine 2: {letter}")
        await asyncio.sleep(1)


async def main():
    task1 = asyncio.create_task(print_numbers())
    task2 = asyncio.create_task(print_letters())
    await task1
    await task2


if __name__ == '__main__':
    asyncio.run(main())

在上述代码中,print_numbersprint_letters 是两个协程函数。通过 asyncio.create_task 创建任务,并使用 await 等待任务完成。asyncio.run 用于运行主协程。

3.2.2 异步 I/O 操作 asyncio 特别适用于处理异步 I/O 操作,如网络请求和文件读写。以下是一个使用 aiohttp 库进行异步网络请求的示例。

import asyncio
import aiohttp


async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()


async def main():
    urls = [
        'http://example.com',
        'http://example.org',
        'http://example.net'
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        for result in results:
            print(result)


if __name__ == '__main__':
    asyncio.run(main())

在这个示例中,fetch 函数使用 aiohttp 进行异步 HTTP 请求。通过 asyncio.gather 同时运行多个请求,并等待所有请求完成。

3.3 线程、进程和协程的比较

  1. 资源消耗:线程共享进程资源,资源消耗相对较小;进程有独立的资源,资源消耗较大;协程是在单线程内实现的,资源消耗最小。
  2. 适用场景:线程适用于 I/O 密集型任务,但在 CPU 密集型任务中可能会因为 GIL(全局解释器锁)而受限;进程适用于 CPU 密集型任务,能充分利用多核 CPU;协程适用于高度 I/O 密集型任务,通过异步切换提高效率。
  3. 编程复杂度:线程编程需要处理线程同步问题,复杂度较高;进程编程除了要处理进程间通信,还需要考虑资源管理,复杂度更高;协程编程相对简单,通过异步和事件循环实现并发,代码逻辑较为清晰。

四、实际应用案例

4.1 网络爬虫

在网络爬虫中,我们需要同时请求多个网页并解析数据,这是典型的 I/O 密集型任务。使用多线程或异步编程可以显著提高爬虫的效率。

import asyncio
import aiohttp
from bs4 import BeautifulSoup


async def fetch(session, url):
    async with session.get(url) as response:
        html = await response.text()
        soup = BeautifulSoup(html, 'html.parser')
        title = soup.title.string
        return title


async def main():
    urls = [
        'http://example.com',
        'http://example.org',
        'http://example.net'
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        for result in results:
            print(result)


if __name__ == '__main__':
    asyncio.run(main())

在这个示例中,通过 asyncioaiohttp 实现了一个简单的异步网络爬虫,同时获取多个网页的标题。

4.2 数据分析

在数据分析中,有时需要对大量数据进行计算,这是 CPU 密集型任务。可以使用多进程来利用多核 CPU 加速计算。

import multiprocessing
import numpy as np


def calculate_mean(data_chunk):
    return np.mean(data_chunk)


if __name__ == '__main__':
    data = np.random.rand(1000000)
    num_processes = multiprocessing.cpu_count()
    chunk_size = len(data) // num_processes
    chunks = [data[i * chunk_size: (i + 1) * chunk_size] for i in range(num_processes)]

    with multiprocessing.Pool(processes=num_processes) as pool:
        means = pool.map(calculate_mean, chunks)

    overall_mean = np.mean(means)
    print(f"Overall mean: {overall_mean}")

在这个示例中,将大数据集分成多个小块,使用多进程分别计算每个小块的均值,最后汇总得到总体均值,提高了计算效率。

五、总结多线程、多进程和并发编程的注意事项

  1. 资源管理:无论是多线程、多进程还是并发编程,都需要注意资源的合理使用。在多线程中,要避免过度创建线程导致资源耗尽;在多进程中,要注意进程间通信和资源分配的开销;在并发编程中,要合理管理异步任务的数量,避免过多任务导致系统负载过高。
  2. 调试难度:随着代码复杂度的增加,多线程、多进程和并发编程的调试难度也会增大。特别是在处理线程同步、进程间通信和异步任务调度时,可能会出现各种难以排查的问题。可以使用调试工具和日志记录来辅助调试。
  3. 兼容性:在不同的操作系统和 Python 版本中,多线程、多进程和并发编程的表现可能会有所不同。例如,在 Windows 系统上创建进程的方式与 Unix 系统略有差异。在编写代码时,要考虑兼容性问题,尽量使用跨平台的编程方式。

通过深入理解 Python 的多线程、多进程和并发编程,我们可以根据不同的任务特点选择合适的编程模型,从而编写出高效、稳定的后端应用程序。无论是处理 I/O 密集型任务还是 CPU 密集型任务,都能找到最优的解决方案。同时,在实际应用中,要注意资源管理、调试和兼容性等问题,确保程序的质量和可靠性。