MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Python中的并发编程基础

2022-10-127.5k 阅读

什么是并发编程

在计算机编程领域,并发编程是一种允许程序同时执行多个任务的技术。想象一下,你正在使用的浏览器,它可以同时加载网页、播放视频、检查新邮件通知等。这些任务看似同时进行,这背后就涉及到并发编程的概念。

从操作系统的角度来看,CPU 的时间被划分为多个时间片,每个任务被分配一个时间片。在一个时间片内,CPU 执行该任务,时间片结束后,CPU 切换到另一个任务,给用户造成一种多个任务同时执行的错觉。

在 Python 中,并发编程为我们提供了更高效地利用计算机资源,提升程序性能的能力。特别是在处理 I/O 密集型任务(如网络请求、文件读写)时,并发编程可以让程序在等待 I/O 操作完成的时间里,去执行其他任务,大大提高了程序的整体效率。

Python 中的并发编程模型

Python 提供了多种并发编程模型,主要包括多线程、多进程和异步 I/O。每种模型都有其适用场景和优缺点。

多线程

线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。在 Python 中,多线程编程主要通过 threading 模块来实现。

创建线程

下面是一个简单的示例,展示如何创建并启动一个线程:

import threading


def print_numbers():
    for i in range(1, 6):
        print(f"线程 1: {i}")


def print_letters():
    for letter in 'abcde':
        print(f"线程 2: {letter}")


if __name__ == '__main__':
    t1 = threading.Thread(target=print_numbers)
    t2 = threading.Thread(target=print_letters)

    t1.start()
    t2.start()

    t1.join()
    t2.join()

在这个例子中,我们创建了两个线程 t1t2,分别执行 print_numbersprint_letters 函数。start() 方法用于启动线程,join() 方法用于等待线程执行完毕。

线程同步

当多个线程同时访问和修改共享资源时,可能会出现数据竞争和不一致的问题。为了解决这些问题,我们需要使用线程同步机制。

锁(Lock):锁是一种最基本的同步工具。当一个线程获取到锁后,其他线程就无法获取,直到该线程释放锁。

import threading

lock = threading.Lock()
counter = 0


def increment():
    global counter
    for _ in range(1000000):
        lock.acquire()
        try:
            counter += 1
        finally:
            lock.release()


if __name__ == '__main__':
    t1 = threading.Thread(target=increment)
    t2 = threading.Thread(target=increment)

    t1.start()
    t2.start()

    t1.join()
    t2.join()

    print(f"最终计数器的值: {counter}")

在这个例子中,我们使用 Lock 来确保在对 counter 进行修改时,不会有其他线程同时操作,避免了数据竞争问题。

条件变量(Condition):条件变量用于线程之间的复杂同步。它允许一个线程等待某个条件满足后再继续执行。

import threading


def consumer(cond):
    with cond:
        print('消费者等待条件')
        cond.wait()
        print('消费者得到条件')


def producer(cond):
    with cond:
        print('生产者设置条件')
        cond.notify()


if __name__ == '__main__':
    condition = threading.Condition()
    c1 = threading.Thread(name='c1', target=consumer, args=(condition,))
    p1 = threading.Thread(name='p1', target=producer, args=(condition,))

    c1.start()
    p1.start()

    c1.join()
    p1.join()

在这个例子中,消费者线程 c1 等待条件变量 condition,生产者线程 p1 调用 notify() 方法来通知消费者线程条件已满足。

GIL(全局解释器锁)

Python 的多线程有一个重要的限制,即全局解释器锁(Global Interpreter Lock,GIL)。GIL 是 CPython 解释器中的一个机制,它确保在任何时刻,只有一个线程可以执行 Python 字节码。这意味着在多核 CPU 上,Python 的多线程并不能真正利用多核优势来提升计算密集型任务的性能。

例如,在下面这个计算密集型的任务中,多线程并不会比单线程快:

import threading
import time


def compute():
    result = 0
    for i in range(100000000):
        result += i
    return result


if __name__ == '__main__':
    start_time = time.time()
    t1 = threading.Thread(target=compute)
    t2 = threading.Thread(target=compute)

    t1.start()
    t2.start()

    t1.join()
    t2.join()

    end_time = time.time()
    print(f"多线程执行时间: {end_time - start_time} 秒")

    start_time = time.time()
    compute()
    compute()
    end_time = time.time()
    print(f"单线程执行时间: {end_time - start_time} 秒")

多进程

与线程不同,进程是一个独立的执行环境,有自己独立的内存空间。Python 中的多进程编程主要通过 multiprocessing 模块来实现。

创建进程

下面是一个简单的创建进程的示例:

import multiprocessing


def print_numbers():
    for i in range(1, 6):
        print(f"进程 1: {i}")


def print_letters():
    for letter in 'abcde':
        print(f"进程 2: {letter}")


if __name__ == '__main__':
    p1 = multiprocessing.Process(target=print_numbers)
    p2 = multiprocessing.Process(target=print_letters)

    p1.start()
    p2.start()

    p1.join()
    p2.join()

在这个例子中,我们使用 multiprocessing.Process 创建了两个进程 p1p2,分别执行 print_numbersprint_letters 函数。

进程间通信

由于进程有独立的内存空间,进程间通信(IPC)就显得尤为重要。multiprocessing 模块提供了多种 IPC 方式,如队列(Queue)和管道(Pipe)。

队列(Queue):队列是一种常用的进程间通信方式,它可以在不同进程之间传递数据。

import multiprocessing


def producer(queue):
    for i in range(5):
        queue.put(i)
        print(f"生产者放入: {i}")


def consumer(queue):
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"消费者取出: {item}")


if __name__ == '__main__':
    q = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=producer, args=(q,))
    p2 = multiprocessing.Process(target=consumer, args=(q,))

    p1.start()
    p2.start()

    p1.join()
    q.put(None)
    p2.join()

在这个例子中,生产者进程 p1 将数据放入队列 q,消费者进程 p2 从队列中取出数据。

管道(Pipe):管道也是一种进程间通信的方式,它可以创建一个双向的通信通道。

import multiprocessing


def sender(conn):
    conn.send('你好,接收者!')
    conn.close()


def receiver(conn):
    msg = conn.recv()
    print(f"接收到: {msg}")
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=sender, args=(child_conn,))
    p2 = multiprocessing.Process(target=receiver, args=(parent_conn,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

在这个例子中,通过 multiprocessing.Pipe 创建了一个管道,sender 进程通过管道发送消息,receiver 进程从管道接收消息。

多进程的优势与劣势

多进程的优势在于它可以真正利用多核 CPU 的优势,提升计算密集型任务的性能。由于每个进程有独立的内存空间,不存在 GIL 的限制。然而,多进程也有一些劣势,比如进程的创建和销毁开销较大,进程间通信相对复杂,占用的系统资源较多等。

异步 I/O

异步 I/O 是一种在不阻塞主线程的情况下执行 I/O 操作的编程方式。在 Python 中,异步编程主要通过 asyncio 库来实现。

协程(Coroutine)

协程是一种轻量级的线程,它可以在函数执行过程中暂停和恢复。在 Python 中,使用 asyncawait 关键字来定义和使用协程。

import asyncio


async def print_numbers():
    for i in range(1, 6):
        print(f"协程 1: {i}")
        await asyncio.sleep(1)


async def print_letters():
    for letter in 'abcde':
        print(f"协程 2: {letter}")
        await asyncio.sleep(1)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    tasks = [print_numbers(), print_letters()]
    loop.run_until_complete(asyncio.gather(*tasks))
    loop.close()

在这个例子中,我们定义了两个协程 print_numbersprint_letters,通过 asyncio.gather 将它们并发执行。await 关键字用于暂停协程,等待 asyncio.sleep 这个异步操作完成。

事件循环(Event Loop)

事件循环是 asyncio 的核心。它负责管理和调度协程的执行。当一个协程执行到 await 语句时,事件循环会暂停该协程,去执行其他可运行的协程,直到 await 的操作完成,再恢复该协程的执行。

import asyncio


async def task1():
    print('任务 1 开始')
    await asyncio.sleep(2)
    print('任务 1 结束')


async def task2():
    print('任务 2 开始')
    await asyncio.sleep(1)
    print('任务 2 结束')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(asyncio.gather(task1(), task2()))
    finally:
        loop.close()

在这个例子中,事件循环先调度 task1task2 开始执行,当 task1 执行到 await asyncio.sleep(2) 时,事件循环暂停 task1,去执行 task2task2 执行完 await asyncio.sleep(1) 后完成,然后事件循环再回到 task1 继续执行,直到 task1 完成。

异步 I/O 的优势

异步 I/O 非常适合处理 I/O 密集型任务,如网络请求、文件读写等。由于它不需要创建大量的线程或进程,资源开销小,并且可以在单线程内实现并发效果,大大提高了程序的性能和效率。

并发编程的选择与应用场景

多线程 vs 多进程

  1. 计算密集型任务:对于计算密集型任务,多进程更适合,因为它可以利用多核 CPU 的优势,避免 GIL 的限制。例如,进行复杂的数学运算、图像渲染等任务时,使用多进程可以显著提升性能。
  2. I/O 密集型任务:对于 I/O 密集型任务,多线程在一定程度上也能提升效率,因为线程的创建和切换开销相对较小。但如果 I/O 操作非常频繁且耗时较长,异步 I/O 会是更好的选择,它可以在单线程内实现高效的并发 I/O 操作。

异步 I/O 的应用场景

  1. 网络编程:在网络编程中,如编写服务器程序,经常需要处理大量的客户端连接和网络请求。异步 I/O 可以让服务器在等待网络数据时,继续处理其他请求,大大提高了服务器的并发处理能力。例如,使用 aiohttp 库可以轻松构建高性能的异步 Web 服务器。
  2. 文件读写:在进行大量文件读写操作时,异步 I/O 也能提升效率。比如,在处理大数据集的文件读取和写入时,使用 asyncio 结合异步文件操作库,可以避免阻塞主线程,提高程序的整体性能。

并发编程的注意事项

  1. 资源竞争:无论是多线程还是多进程,都需要注意资源竞争问题。在多线程中,通过锁等同步机制来解决;在多进程中,要注意对共享资源的访问控制,避免数据不一致。
  2. 异常处理:在并发编程中,异常处理更加复杂。一个线程或进程中的异常可能会影响整个程序的运行。因此,需要在每个线程或进程中进行适当的异常捕获和处理,确保程序的稳定性。
  3. 性能调优:并发编程并不总是能提升性能,有时可能会因为线程或进程的创建、切换开销以及资源竞争等问题导致性能下降。需要根据具体的任务类型和系统环境进行性能测试和调优,选择最合适的并发模型。

通过对 Python 中并发编程基础的学习,我们了解了多线程、多进程和异步 I/O 这三种主要的并发编程模型及其应用场景。在实际开发中,根据任务的特点和需求,合理选择并发编程模型,可以显著提升程序的性能和效率,为用户提供更好的体验。