MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Python并发编程:从理论到实践

2021-09-091.6k 阅读

1. 并发编程基础概念

1.1 什么是并发

在计算机编程领域,并发指的是在同一时间段内,多个任务似乎在同时执行。需要注意的是,并发并不等同于并行。并行是指在同一时刻,有多个任务在不同的处理器核心或不同的计算机上真正地同时执行。而并发通常是通过在单个处理器核心上快速切换任务,给人一种多个任务同时执行的错觉。

例如,在一个操作系统中,可能同时运行着浏览器、音乐播放器和文本编辑器等多个应用程序。操作系统通过调度算法,在这些应用程序之间快速切换 CPU 的使用权,使得用户感觉这些应用程序是在同时运行的,这就是并发的体现。

1.2 为什么需要并发编程

  1. 提高资源利用率:在许多应用场景中,计算机的资源(如 CPU、内存、网络带宽等)并不会被一个任务完全占用。例如,当一个任务在等待网络响应或者磁盘 I/O 操作完成时,CPU 处于闲置状态。通过并发编程,可以在等待这些操作完成的间隙,执行其他任务,从而提高 CPU 和其他资源的利用率。
  2. 提升程序响应性:对于交互式应用程序,如桌面应用或 Web 应用,用户期望应用程序能够及时响应用户的操作。如果应用程序中的任务是顺序执行的,当遇到一个耗时较长的任务(如文件读取或网络请求)时,整个应用程序会处于无响应状态,直到该任务完成。而并发编程可以将这些耗时任务放到后台执行,让应用程序的界面保持响应,提升用户体验。
  3. 加速任务处理:在某些情况下,将一个大任务分解为多个并发执行的小任务,可以加快整个任务的处理速度。例如,在数据分析中,对一个大数据集进行处理时,可以将数据集分成多个部分,分别由不同的并发任务进行处理,最后将结果合并,这样往往比顺序处理整个数据集要快。

2. Python 并发编程的实现方式

2.1 多线程编程

2.1.1 线程基础概念

线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一个进程可以包含多个线程,这些线程共享进程的资源,如内存空间、文件描述符等。

在 Python 中,通过 threading 模块来支持多线程编程。下面是一个简单的示例代码,展示了如何创建和启动一个线程:

import threading


def print_number():
    for i in range(10):
        print(f"Thread {threading.current_thread().name} prints {i}")


# 创建线程对象
thread = threading.Thread(target=print_number)

# 启动线程
thread.start()

# 等待线程结束
thread.join()

在上述代码中,首先定义了一个函数 print_number,这个函数就是线程要执行的任务。然后通过 threading.Thread 类创建了一个线程对象,并将 print_number 函数作为参数传递给 target。接着调用 start 方法启动线程,最后调用 join 方法等待线程执行完毕。

2.1.2 线程同步问题

由于多个线程共享进程的资源,当多个线程同时访问和修改共享资源时,就可能会出现数据竞争和不一致的问题。例如,假设有两个线程同时对一个共享变量进行加 1 操作,如果没有适当的同步机制,可能会导致最终结果不正确。

下面通过一个示例来说明这个问题:

import threading

counter = 0


def increment():
    global counter
    for _ in range(1000000):
        counter = counter + 1


threads = []
for _ in range(2):
    thread = threading.Thread(target = increment)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print(f"Final counter value: {counter}")

在这个例子中,定义了一个共享变量 counter,并创建了两个线程,每个线程对 counter 进行 1000000 次加 1 操作。理想情况下,最终 counter 的值应该是 2000000,但实际运行结果可能会小于这个值,因为两个线程同时访问和修改 counter 时发生了数据竞争。

为了解决这个问题,Python 提供了多种线程同步机制,如锁(Lock)、信号量(Semaphore)、条件变量(Condition)等。下面使用锁来修正上述代码:

import threading

counter = 0
lock = threading.Lock()


def increment():
    global counter
    for _ in range(1000000):
        lock.acquire()
        try:
            counter = counter + 1
        finally:
            lock.release()


threads = []
for _ in range(2):
    thread = threading.Thread(target = increment)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print(f"Final counter value: {counter}")

在这个修正后的代码中,通过 lock.acquire() 获取锁,确保在同一时间只有一个线程可以访问 counter,操作完成后通过 lock.release() 释放锁。这样就避免了数据竞争问题,最终 counter 的值会是 2000000。

2.2 多进程编程

2.2.1 进程基础概念

进程是计算机中程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位。与线程不同,每个进程都有自己独立的内存空间和系统资源,进程之间的通信相对复杂。

在 Python 中,通过 multiprocessing 模块来支持多进程编程。以下是一个简单的多进程示例代码:

import multiprocessing


def print_number():
    for i in range(10):
        print(f"Process {multiprocessing.current_process().name} prints {i}")


if __name__ == '__main__':
    process = multiprocessing.Process(target = print_number)
    process.start()
    process.join()

在上述代码中,首先定义了 print_number 函数作为进程要执行的任务。然后通过 multiprocessing.Process 类创建了一个进程对象,并将 print_number 函数传递给 target。注意,在使用 multiprocessing 模块时,需要将相关代码放在 if __name__ == '__main__': 块中,这是为了避免在某些操作系统(如 Windows)上出现进程启动的问题。接着调用 start 方法启动进程,最后调用 join 方法等待进程结束。

2.2.2 进程间通信

由于进程之间有独立的内存空间,所以进程间通信(IPC)需要特殊的机制。multiprocessing 模块提供了多种 IPC 方式,如队列(Queue)、管道(Pipe)等。

下面以队列为例,展示进程间如何进行通信:

import multiprocessing


def producer(queue):
    for i in range(5):
        queue.put(i)
        print(f"Produced {i}")


def consumer(queue):
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"Consumed {item}")


if __name__ == '__main__':
    queue = multiprocessing.Queue()

    producer_process = multiprocessing.Process(target = producer, args=(queue,))
    consumer_process = multiprocessing.Process(target = consumer, args=(queue,))

    producer_process.start()
    consumer_process.start()

    producer_process.join()
    queue.put(None)  # 发送结束信号
    consumer_process.join()

在这个示例中,producer 进程向队列中放入数据,consumer 进程从队列中取出数据。当 producer 进程完成生产任务后,向队列中放入一个 None 作为结束信号,consumer 进程在接收到 None 后结束循环。

2.3 异步编程

2.3.1 异步编程基础概念

异步编程是一种允许程序在执行 I/O 操作等耗时任务时,不阻塞主线程,而是继续执行其他代码的编程模式。在 Python 中,异步编程主要通过 asyncio 库来实现。

asyncio 基于事件循环(Event Loop)机制,事件循环会不断地检查是否有新的事件(如 I/O 操作完成)发生,并将相应的任务添加到执行队列中。异步函数(使用 async def 定义)在遇到 await 关键字时,会暂停执行,将控制权交回给事件循环,事件循环可以接着执行其他任务,当 await 后面的操作完成后,异步函数再从暂停的地方继续执行。

2.3.2 异步编程示例

下面是一个简单的异步编程示例,模拟了两个异步任务:

import asyncio


async def task1():
    print("Task 1 started")
    await asyncio.sleep(2)
    print("Task 1 finished")


async def task2():
    print("Task 2 started")
    await asyncio.sleep(1)
    print("Task 2 finished")


async def main():
    task_list = [task1(), task2()]
    await asyncio.gather(*task_list)


if __name__ == '__main__':
    asyncio.run(main())

在上述代码中,首先定义了两个异步函数 task1task2,它们分别模拟了一个耗时 2 秒和 1 秒的任务。然后在 main 函数中,将这两个异步任务添加到一个列表中,并使用 asyncio.gather 函数来并发执行这些任务。最后通过 asyncio.run 函数来运行 main 函数,启动整个异步任务流程。运行结果会先打印 Task 1 startedTask 2 started,然后等待 1 秒打印 Task 2 finished,再等待 1 秒打印 Task 1 finished,整个过程不会阻塞主线程。

3. 选择合适的并发编程方式

3.1 CPU 密集型任务与 I/O 密集型任务

  1. CPU 密集型任务:这类任务主要消耗 CPU 资源,如复杂的数学计算、数据加密等。对于 CPU 密集型任务,多进程编程通常是更好的选择,因为 Python 的全局解释器锁(GIL)会限制多线程在 CPU 密集型任务中的并行执行能力。每个进程有自己独立的 Python 解释器实例和 GIL,因此可以充分利用多核 CPU 的优势。
  2. I/O 密集型任务:这类任务主要消耗 I/O 资源,如文件读取、网络请求等。在等待 I/O 操作完成的过程中,CPU 处于闲置状态。对于 I/O 密集型任务,多线程或异步编程更为合适。多线程可以在等待 I/O 时切换到其他线程执行,而异步编程则通过事件循环更加高效地管理 I/O 操作,避免阻塞主线程。

3.2 资源消耗与性能权衡

  1. 多线程:多线程由于共享进程资源,创建和销毁线程的开销相对较小。但是,由于 GIL 的存在,在 CPU 密集型任务中性能提升有限。而且线程同步机制(如锁)的使用不当可能会导致死锁等问题。
  2. 多进程:多进程每个进程有独立的资源,不存在 GIL 限制,在 CPU 密集型任务中可以充分利用多核 CPU。但进程的创建和销毁开销较大,进程间通信也相对复杂,需要额外的资源和代码来实现。
  3. 异步编程:异步编程在 I/O 密集型任务中表现出色,能够高效地利用系统资源,提升程序的响应性。然而,异步编程的代码结构和逻辑相对复杂,调试也更加困难,需要对 asyncio 库有深入的理解。

3.3 应用场景分析

  1. Web 服务器:Web 服务器通常处理大量的 I/O 密集型任务,如接收和发送网络数据。在这种场景下,异步编程(如使用 aiohttp 库)可以极大地提高服务器的并发处理能力,提升性能和响应速度。
  2. 数据处理与分析:如果数据处理任务是 CPU 密集型的,如大规模矩阵运算,多进程编程可以利用多核 CPU 加速处理。如果任务包含大量的 I/O 操作,如从数据库读取数据或写入文件,多线程或异步编程可能更合适。
  3. 分布式系统:在分布式系统中,不同的节点可能运行在不同的物理机器上,此时进程间通信和资源管理变得更加重要。多进程编程可以更好地适应这种分布式环境,通过合适的进程间通信机制(如消息队列)实现节点间的协作。

4. 并发编程中的常见问题与解决方案

4.1 死锁问题

4.1.1 死锁的定义与原因

死锁是指两个或多个进程(或线程)在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。死锁通常由以下四个必要条件引起:

  1. 互斥条件:资源在同一时间只能被一个进程(或线程)使用。
  2. 占有并等待条件:进程(或线程)已经占有了一些资源,并在等待获取其他进程(或线程)占用的资源。
  3. 不可剥夺条件:资源一旦被进程(或线程)占有,除非进程(或线程)主动释放,否则不能被其他进程(或线程)剥夺。
  4. 循环等待条件:存在一个进程(或线程)的循环链,链中的每个进程(或线程)都在等待下一个进程(或线程)占用的资源。

4.1.2 死锁的解决方案

  1. 破坏死锁的必要条件
    • 破坏互斥条件:在某些情况下,可以通过允许资源共享来避免死锁。例如,对于一些只读资源,可以允许多个进程(或线程)同时访问。
    • 破坏占有并等待条件:可以要求进程(或线程)在启动时一次性申请所有需要的资源,而不是逐步申请。这样可以避免进程(或线程)在占有部分资源的情况下等待其他资源。
    • 破坏不可剥夺条件:当一个进程(或线程)占有资源但又申请其他资源失败时,可以剥夺其已占有的资源,分配给其他进程(或线程)。
    • 破坏循环等待条件:可以对资源进行排序,要求进程(或线程)按照一定的顺序申请资源,避免形成循环等待。
  2. 死锁检测与恢复:通过定期检测系统中是否存在死锁,如果检测到死锁,则选择一个或多个进程(或线程)进行回滚或终止,以解除死锁。在 Python 中,可以通过一些工具(如 threading.enumerate() 函数查看线程状态)来辅助检测死锁。

4.2 资源竞争与数据一致性问题

4.2.1 资源竞争与数据一致性问题的表现

如前面提到的多线程对共享变量的操作示例,当多个线程同时访问和修改共享资源时,就会出现资源竞争,导致数据不一致的问题。除了共享变量,其他共享资源(如文件、数据库连接等)也可能出现类似问题。

4.2.2 解决方案

  1. 使用同步机制:如锁、信号量、条件变量等。这些同步机制可以确保在同一时间只有一个线程可以访问共享资源,从而保证数据的一致性。
  2. 使用线程安全的数据结构:Python 提供了一些线程安全的数据结构,如 queue.Queue。这些数据结构内部已经实现了同步机制,使用它们可以避免手动处理同步问题。
  3. 采用不可变数据结构:在某些情况下,使用不可变数据结构可以避免数据一致性问题。因为不可变数据结构一旦创建就不能被修改,多个线程可以安全地共享它们。

4.3 并发编程的调试与性能优化

4.3.1 调试并发程序

  1. 打印日志:在关键代码位置添加打印语句,输出线程或进程的状态、变量值等信息,以便跟踪程序的执行流程。
  2. 使用调试工具:Python 提供了 pdb 调试器,在并发程序中可以通过设置断点来暂停程序执行,查看变量状态。此外,一些集成开发环境(IDE)如 PyCharm 也提供了强大的调试功能,支持对并发程序的调试。
  3. 检测死锁和资源竞争:使用工具或自定义代码来检测死锁和资源竞争问题。例如,通过记录锁的获取和释放顺序,检测是否存在死锁的可能性。

4.3.2 性能优化

  1. 减少锁的使用范围:尽量缩短锁的持有时间,只在必要的代码段使用锁,以减少线程等待时间。
  2. 优化任务分配:根据任务的性质(CPU 密集型或 I/O 密集型)合理分配到不同的线程或进程中,充分利用系统资源。
  3. 使用异步编程优化 I/O 操作:对于 I/O 密集型任务,采用异步编程可以显著提升性能,减少线程或进程的创建和切换开销。

5. 实际应用案例

5.1 Web 爬虫

Web 爬虫需要从多个网页中获取数据,这涉及大量的网络 I/O 操作。使用并发编程可以提高爬虫的效率。以下是一个使用异步编程实现的简单 Web 爬虫示例,使用 aiohttp 库进行异步 HTTP 请求:

import asyncio
import aiohttp


async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()


async def main():
    urls = [
        'http://example.com',
        'http://example.org',
        'http://example.net'
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        for result in results:
            print(result)


if __name__ == '__main__':
    asyncio.run(main())

在这个示例中,fetch 函数使用 aiohttp 库发送异步 HTTP 请求并获取网页内容。main 函数创建了多个 fetch 任务,并使用 asyncio.gather 并发执行这些任务,从而提高了爬虫的效率。

5.2 分布式计算

假设要进行一个分布式的矩阵乘法计算。可以使用多进程编程,将矩阵分割成多个子矩阵,分配到不同的进程中进行计算,最后合并结果。以下是一个简化的示例代码:

import multiprocessing


def multiply_submatrix(submatrix_a, submatrix_b, result_queue):
    sub_result = []
    for i in range(len(submatrix_a)):
        row = []
        for j in range(len(submatrix_b[0])):
            value = 0
            for k in range(len(submatrix_b)):
                value += submatrix_a[i][k] * submatrix_b[k][j]
            row.append(value)
        sub_result.append(row)
    result_queue.put(sub_result)


if __name__ == '__main__':
    matrix_a = [[1, 2], [3, 4]]
    matrix_b = [[5, 6], [7, 8]]

    num_processes = 2
    submatrix_size = len(matrix_a) // num_processes

    result_queue = multiprocessing.Queue()
    processes = []

    for i in range(num_processes):
        submatrix_a = matrix_a[i * submatrix_size:(i + 1) * submatrix_size]
        process = multiprocessing.Process(target = multiply_submatrix, args=(submatrix_a, matrix_b, result_queue))
        processes.append(process)
        process.start()

    final_result = []
    for _ in range(num_processes):
        sub_result = result_queue.get()
        final_result.extend(sub_result)

    for process in processes:
        process.join()

    print(final_result)

在这个示例中,将矩阵 matrix_a 分割成多个子矩阵,每个子矩阵由一个进程进行与矩阵 matrix_b 的乘法运算,最后将各个子结果合并得到最终的矩阵乘法结果。通过这种方式,可以利用多进程的优势加速分布式计算任务。

5.3 实时数据处理

在实时数据处理场景中,如处理传感器实时上传的数据,需要快速响应并处理大量的数据。可以使用多线程或异步编程来实现。以下是一个使用多线程处理实时数据的示例,假设数据从传感器以一定频率获取:

import threading
import time


class SensorDataProcessor:
    def __init__(self):
        self.data = []
        self.lock = threading.Lock()

    def receive_data(self, new_data):
        self.lock.acquire()
        try:
            self.data.append(new_data)
        finally:
            self.lock.release()

    def process_data(self):
        while True:
            self.lock.acquire()
            try:
                if self.data:
                    data_to_process = self.data.pop(0)
                    # 模拟数据处理
                    print(f"Processing data: {data_to_process}")
            finally:
                self.lock.release()
            time.sleep(0.1)


if __name__ == '__main__':
    processor = SensorDataProcessor()

    receive_thread = threading.Thread(target = lambda: [processor.receive_data(i) for i in range(10)])
    process_thread = threading.Thread(target = processor.process_data)

    receive_thread.start()
    process_thread.start()

    receive_thread.join()
    process_thread.join()

在这个示例中,SensorDataProcessor 类模拟了一个实时数据处理系统。receive_data 方法用于接收传感器数据,process_data 方法用于处理数据。通过两个线程,一个负责接收数据,一个负责处理数据,实现了实时数据的并发处理。同时使用锁来保证数据的一致性。