MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Python异步IO与多线程/多进程的协同工作

2021-03-314.3k 阅读

Python 异步 I/O 基础

在深入探讨异步 I/O 与多线程/多进程的协同工作之前,我们先来了解一下 Python 异步 I/O 的基础概念。

Python 的异步 I/O 主要基于 asyncio 库。asyncio 是 Python 3.4 引入的标准库,用于编写异步代码,尤其是在处理 I/O 操作时表现出色。

协程(Coroutine)

协程是异步编程的核心概念之一。在 Python 中,使用 async def 关键字定义协程函数。

import asyncio


async def my_coroutine():
    print('Start of coroutine')
    await asyncio.sleep(1)
    print('End of coroutine')


在上述代码中,my_coroutine 是一个协程函数。await 关键字用于暂停协程的执行,直到被等待的 Future 或另一个协程完成。asyncio.sleep 是一个模拟 I/O 操作的协程,它会暂停当前协程指定的时间。

事件循环(Event Loop)

事件循环是 asyncio 的核心组件。它负责调度和执行协程。在 asyncio 中,可以通过 asyncio.get_event_loop() 获取事件循环对象。

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(my_coroutine())
finally:
    loop.close()


上述代码获取事件循环,并使用 run_until_complete 方法来运行协程,直到协程完成。最后,关闭事件循环。在 Python 3.7 及以上版本,还可以使用更简洁的方式:

asyncio.run(my_coroutine())


asyncio.run 内部会创建事件循环,运行协程,然后关闭事件循环。

并发执行多个协程

在实际应用中,常常需要并发执行多个协程。可以使用 asyncio.gather 函数来实现。

async def coroutine1():
    await asyncio.sleep(1)
    print('Coroutine 1 finished')


async def coroutine2():
    await asyncio.sleep(2)
    print('Coroutine 2 finished')


async def main():
    await asyncio.gather(coroutine1(), coroutine2())


asyncio.run(main())


在上述代码中,asyncio.gather 接受多个协程作为参数,并并发运行它们。main 协程等待所有传入 asyncio.gather 的协程完成。

多线程编程基础

多线程是一种在同一进程内实现并发执行的技术。Python 的 threading 模块提供了多线程编程的支持。

创建和启动线程

import threading


def worker():
    print('Thread started')
    # 模拟一些工作
    import time
    time.sleep(2)
    print('Thread finished')


t = threading.Thread(target=worker)
t.start()


在上述代码中,通过 threading.Thread 创建一个新线程,target 参数指定线程要执行的函数。然后调用 start 方法启动线程。

线程同步

多线程编程中,线程同步是一个重要的问题。当多个线程访问共享资源时,如果没有适当的同步机制,可能会导致数据竞争和不一致的结果。

threading 模块提供了多种同步原语,如锁(Lock)、信号量(Semaphore)等。

import threading


lock = threading.Lock()
shared_variable = 0


def increment():
    global shared_variable
    with lock:
        shared_variable += 1
        print(f'Incremented: {shared_variable}')


threads = []
for _ in range(10):
    t = threading.Thread(target=increment)
    threads.append(t)
    t.start()


for t in threads:
    t.join()


在上述代码中,使用 Lock 来确保每次只有一个线程可以访问和修改 shared_variablewith lock 语句块会自动获取和释放锁。

多进程编程基础

多进程是另一种实现并发的方式,与多线程不同,每个进程都有自己独立的内存空间。Python 的 multiprocessing 模块用于多进程编程。

创建和启动进程

import multiprocessing


def worker():
    print('Process started')
    # 模拟一些工作
    import time
    time.sleep(2)
    print('Process finished')


p = multiprocessing.Process(target=worker)
p.start()


在上述代码中,通过 multiprocessing.Process 创建一个新进程,target 参数指定进程要执行的函数。然后调用 start 方法启动进程。

进程间通信

多进程之间需要进行通信时,可以使用 multiprocessing 提供的队列(Queue)、管道(Pipe)等。

import multiprocessing


def producer(queue):
    for i in range(5):
        queue.put(i)
        print(f'Produced: {i}')


def consumer(queue):
    while True:
        item = queue.get()
        if item is None:
            break
        print(f'Consumed: {item}')


if __name__ == '__main__':
    queue = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=producer, args=(queue,))
    p2 = multiprocessing.Process(target=consumer, args=(queue,))

    p1.start()
    p2.start()

    p1.join()
    queue.put(None)
    p2.join()


在上述代码中,使用 Queue 实现生产者 - 消费者模型。生产者进程将数据放入队列,消费者进程从队列中取出数据。注意,在 Windows 系统上,多进程代码需要放在 if __name__ == '__main__': 块中,以避免一些启动问题。

异步 I/O 与多线程的协同工作

在某些场景下,将异步 I/O 与多线程结合使用可以发挥两者的优势。例如,当异步 I/O 遇到一些无法通过异步方式优化的阻塞操作(如一些 C 语言扩展库的函数调用)时,可以将这些操作放在线程中执行,从而不阻塞事件循环。

在异步 I/O 中使用线程

import asyncio
import threading


def blocking_operation():
    import time
    time.sleep(2)
    return 42


async def main():
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, blocking_operation)
    print(f'Result: {result}')


asyncio.run(main())


在上述代码中,loop.run_in_executor 方法将 blocking_operation 函数提交到线程池中执行。None 参数表示使用默认的线程池。这样,在执行阻塞操作时,事件循环不会被阻塞,其他异步任务可以继续执行。

线程中使用异步 I/O

在某些情况下,可能需要在线程中使用异步 I/O。但是,需要注意的是,asyncio 的事件循环不是线程安全的。为了在线程中使用异步 I/O,需要在每个线程中创建自己的事件循环。

import asyncio
import threading


async def async_task():
    await asyncio.sleep(1)
    print('Async task in thread')


def thread_function():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(async_task())
    finally:
        loop.close()


t = threading.Thread(target=thread_function)
t.start()


在上述代码中,每个线程创建自己的事件循环,并在其中运行异步任务。这样可以避免事件循环的线程安全问题。

异步 I/O 与多进程的协同工作

将异步 I/O 与多进程结合使用也有其适用场景。例如,当需要处理大量计算密集型任务,同时又有 I/O 操作时,可以利用多进程进行并行计算,利用异步 I/O 处理 I/O 操作。

在异步 I/O 中使用进程

import asyncio
import multiprocessing


def cpu_bound_operation(x):
    return x * x


async def main():
    loop = asyncio.get_running_loop()
    results = []
    with multiprocessing.Pool() as pool:
        for i in range(5):
            result = await loop.run_in_executor(pool, cpu_bound_operation, i)
            results.append(result)
    print(f'Results: {results}')


asyncio.run(main())


在上述代码中,loop.run_in_executor 方法将 cpu_bound_operation 函数提交到进程池中执行。这样,计算密集型任务可以在多个进程中并行执行,而 I/O 操作可以通过异步 I/O 进行,从而提高整体性能。

进程中使用异步 I/O

在进程中使用异步 I/O 与在线程中使用类似,每个进程需要创建自己的事件循环。

import asyncio
import multiprocessing


async def async_task():
    await asyncio.sleep(1)
    print('Async task in process')


def process_function():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(async_task())
    finally:
        loop.close()


if __name__ == '__main__':
    p = multiprocessing.Process(target=process_function)
    p.start()
    p.join()


在上述代码中,每个进程创建自己的事件循环,并在其中运行异步任务。

综合应用场景分析

高并发 I/O 与计算混合场景

假设我们有一个网络爬虫应用,需要从多个网站下载页面内容,并对下载的内容进行解析和处理。下载页面是 I/O 密集型操作,适合使用异步 I/O;而解析和处理页面内容可能是计算密集型操作,适合使用多线程或多进程。

import asyncio
import multiprocessing
import aiohttp


async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()


def process_data(data):
    # 模拟数据处理
    return len(data)


async def main():
    urls = ['http://example.com', 'http://example.org', 'http://example.net']
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        htmls = await asyncio.gather(*tasks)

    with multiprocessing.Pool() as pool:
        results = await asyncio.get_running_loop().run_in_executor(pool, lambda: pool.map(process_data, htmls))
    print(f'Results: {results}')


if __name__ == '__main__':
    asyncio.run(main())


在上述代码中,首先使用异步 I/O 和 aiohttp 库并发下载多个网页内容。然后,将下载的内容提交到进程池中进行处理。这样可以充分利用异步 I/O 的高并发优势和多进程的计算并行优势。

服务端应用场景

在一个 Web 服务端应用中,可能需要处理大量的客户端连接,同时还需要进行一些数据库查询、文件读写等 I/O 操作,以及一些业务逻辑计算。

import asyncio
import threading
import sqlite3


async def handle_connection(reader, writer):
    data = await reader.read(1024)
    message = data.decode('utf - 8')
    print(f'Received: {message}')

    def query_database():
        conn = sqlite3.connect('example.db')
        cursor = conn.cursor()
        cursor.execute('SELECT * FROM users WHERE name =?', (message,))
        result = cursor.fetchone()
        conn.close()
        return result

    loop = asyncio.get_running_loop()
    database_result = await loop.run_in_executor(None, query_database)
    response = f'Database result: {database_result}'
    writer.write(response.encode('utf - 8'))
    await writer.drain()
    writer.close()


async def main():
    server = await asyncio.start_server(handle_connection, '127.0.0.1', 8888)
    async with server:
        await server.serve_forever()


if __name__ == '__main__':
    asyncio.run(main())


在上述代码中,使用异步 I/O 处理客户端连接和数据读写。对于数据库查询操作,由于 SQLite 的连接不是线程安全的,将其放在线程池中执行,以避免阻塞事件循环。这样可以有效地处理高并发的客户端连接,同时完成必要的数据库操作。

性能优化与注意事项

资源消耗

无论是多线程、多进程还是异步 I/O,都需要消耗系统资源。多线程和多进程会占用更多的内存,而异步 I/O 在处理大量并发任务时,事件循环的调度也会带来一定的开销。因此,在选择使用哪种方式或它们的组合时,需要根据实际的系统资源情况和任务特点进行权衡。

调试与错误处理

多线程和多进程编程中,由于并发执行的特性,调试和错误处理变得更加复杂。在异步 I/O 中,await 关键字可能会隐藏一些异常,需要使用 try - except 块进行适当的异常处理。在多线程和多进程中,需要注意共享资源的访问控制,以及进程间/线程间通信的错误处理。

线程安全与进程安全

在多线程编程中,需要特别注意线程安全问题,确保对共享资源的访问是线程安全的。在多进程编程中,虽然进程间内存独立,但在进程间通信和共享资源(如文件、数据库连接)时,也需要考虑进程安全。

平台差异

不同的操作系统对多线程和多进程的支持存在差异。例如,在 Windows 系统上,多进程代码需要放在 if __name__ == '__main__': 块中,而在 Unix - like 系统上则不需要。在使用异步 I/O 时,不同操作系统的 I/O 多路复用机制也可能对性能产生影响。

总结

Python 的异步 I/O、多线程和多进程各有其特点和适用场景。通过合理地协同使用它们,可以在不同的应用场景下实现高效的并发编程。在实际项目中,需要根据任务的 I/O 特性、计算特性以及系统资源情况,精心选择和组合这些技术,以达到最佳的性能和可扩展性。同时,要注意资源消耗、调试与错误处理、线程安全与进程安全以及平台差异等问题,确保程序的稳定性和可靠性。