MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Python异步编程基础

2023-11-113.2k 阅读

Python异步编程基础

什么是异步编程

在传统的编程模式中,程序按照顺序依次执行每一行代码,只有当前面的操作完成后,才会继续执行后面的代码。这种模式在处理 I/O 操作(如网络请求、文件读写等)时会有一个明显的问题:I/O 操作通常比较耗时,在等待 I/O 操作完成的这段时间里,程序处于阻塞状态,CPU 无法执行其他任务,这就造成了资源的浪费。

而异步编程则是一种允许程序在等待 I/O 操作完成的同时,继续执行其他任务的编程模式。它通过将 I/O 操作与 CPU 计算操作分离,提高了程序的整体效率和响应性。在 Python 中,异步编程主要通过 asyncio 库来实现。

异步编程的优势

  1. 提高程序效率:在处理大量 I/O 操作时,异步编程能让 CPU 在等待 I/O 完成的间隙执行其他任务,避免 CPU 资源的闲置,从而大大提高程序的整体执行效率。例如,一个需要同时发起多个网络请求的程序,如果使用同步方式,每个请求都要等待前一个请求完成才能发起,而使用异步方式可以同时发起所有请求,在等待请求响应的过程中执行其他代码。
  2. 增强响应性:对于一些交互式应用程序(如 Web 服务器、聊天程序等),异步编程可以确保程序在处理耗时操作时,依然能够快速响应用户的输入。比如,Web 服务器在处理一个长时间运行的数据库查询时,异步编程可以让它继续处理其他用户的请求,而不会让用户一直等待。
  3. 节省资源:在传统的多线程或多进程编程中,每个线程或进程都需要占用一定的系统资源(如内存、文件描述符等)。而异步编程通过在单线程内实现异步操作,避免了大量线程或进程带来的资源开销,特别适合处理高并发场景。

协程(Coroutine)

协程是 Python 异步编程的核心概念之一。它是一种轻量级的线程,与普通函数不同,协程可以暂停执行并将控制权交回给调用者,然后在适当的时候恢复执行。在 Python 3.5 及以上版本中,通过 asyncawait 关键字来定义和使用协程。

定义协程函数

import asyncio


async def my_coroutine():
    print('开始执行协程')
    await asyncio.sleep(1)
    print('协程执行完毕')


在上述代码中,my_coroutine 是一个协程函数,通过 async 关键字定义。函数内部使用 await 关键字暂停协程的执行,并等待 asyncio.sleep(1) 这个异步操作完成。asyncio.sleep 模拟了一个耗时 1 秒的 I/O 操作。

运行协程

要运行协程,需要将其放入事件循环(Event Loop)中。事件循环是 asyncio 库的核心,它负责管理和调度异步任务的执行。

import asyncio


async def my_coroutine():
    print('开始执行协程')
    await asyncio.sleep(1)
    print('协程执行完毕')


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(my_coroutine())
finally:
    loop.close()


在这段代码中,首先通过 asyncio.get_event_loop() 获取事件循环对象 loop,然后使用 loop.run_until_complete() 方法将协程 my_coroutine() 放入事件循环中运行,直到协程执行完毕。最后,通过 loop.close() 关闭事件循环。

从 Python 3.7 开始,还可以使用更简洁的方式来运行协程:

import asyncio


async def my_coroutine():
    print('开始执行协程')
    await asyncio.sleep(1)
    print('协程执行完毕')


asyncio.run(my_coroutine())


asyncio.run() 方法会自动创建事件循环,运行协程,并在协程执行完毕后关闭事件循环。

异步函数与普通函数的区别

  1. 调用方式:普通函数调用时会立即执行函数体中的代码,直到函数返回。而异步函数(协程函数)调用时,并不会立即执行函数体,而是返回一个协程对象。
def normal_function():
    print('普通函数执行')


async def async_function():
    print('异步函数执行')


# 调用普通函数
normal_function()
# 调用异步函数,返回协程对象
coroutine = async_function()
print(coroutine)


  1. 执行过程:普通函数的执行是连续的,一旦开始就会一直执行到结束。而异步函数在遇到 await 关键字时,会暂停执行,将控制权交回给事件循环,事件循环可以调度其他异步任务执行。当 await 后面的异步操作完成后,协程会从暂停的地方继续执行。
import asyncio


async def async_function():
    print('异步函数开始')
    await asyncio.sleep(2)
    print('异步函数继续')


async def main():
    task1 = asyncio.create_task(async_function())
    print('在 task1 暂停时执行其他代码')
    await task1


asyncio.run(main())


在上述代码中,async_function 在执行到 await asyncio.sleep(2) 时会暂停 2 秒,在这 2 秒内,main 函数中的 print('在 task1 暂停时执行其他代码') 会被执行。

  1. 返回值:普通函数返回一个具体的结果值。而异步函数返回的协程对象需要通过事件循环来驱动执行,最终获取其返回值。如果异步函数中有 return 语句,return 的值会作为 await 该协程的结果。
import asyncio


async def async_function():
    await asyncio.sleep(1)
    return '异步函数的返回值'


async def main():
    result = await async_function()
    print(result)


asyncio.run(main())


并发与并行

在理解异步编程时,容易混淆并发(Concurrency)和并行(Parallelism)这两个概念。

并发

并发指的是在一段时间内,系统可以处理多个任务,但这些任务不一定是同时执行的。在单线程环境下,通过事件循环和异步操作(如协程),程序可以在不同任务之间快速切换,从宏观上看起来好像是同时在处理多个任务。例如,一个 Web 服务器在单线程中使用异步编程,可以同时处理多个客户端的请求,通过在 I/O 操作(如读取请求数据、发送响应数据)时暂停协程,切换到其他请求处理协程,实现并发处理。

并行

并行则是指在同一时刻,系统可以真正同时执行多个任务。这通常需要多核 CPU 的支持,每个任务可以在不同的 CPU 核心上同时运行。例如,使用多进程编程,每个进程可以在不同的 CPU 核心上并行执行计算密集型任务,从而提高整体的计算速度。

总结来说,并发侧重于任务的交替处理,而并行侧重于任务的同时执行。异步编程是实现并发的一种有效方式,而并行通常通过多线程或多进程来实现。

异步 I/O 操作

在异步编程中,I/O 操作是主要的应用场景。Python 的 asyncio 库提供了丰富的异步 I/O 支持,包括网络 I/O(如 TCP、UDP 套接字操作)和文件 I/O。

异步网络 I/O

以下是一个简单的异步 TCP 服务器示例:

import asyncio


async def handle_connection(reader, writer):
    data = await reader.read(1024)
    message = data.decode('utf - 8')
    addr = writer.get_extra_info('peername')
    print(f"收到来自 {addr} 的消息: {message}")

    response = f"你好,你发送的消息是: {message}"
    writer.write(response.encode('utf - 8'))
    await writer.drain()

    print("关闭连接")
    writer.close()


async def main():
    server = await asyncio.start_server(handle_connection, '127.0.0.1', 8888)

    addr = server.sockets[0].getsockname()
    print(f"在 {addr} 启动服务器")

    async with server:
        await server.serve_forever()


asyncio.run(main())


在这个示例中,asyncio.start_server 创建了一个 TCP 服务器,handle_connection 协程处理每个客户端连接。reader.readwriter.write 以及 writer.drain 都是异步操作,使用 await 关键字等待操作完成。

异步文件 I/O

Python 3.6 引入了 aiofiles 库来支持异步文件 I/O 操作。以下是一个简单的示例:

import aiofiles


async def read_file():
    async with aiofiles.open('example.txt', 'r') as f:
        content = await f.read()
        print(f"文件内容: {content}")


async def write_file():
    async with aiofiles.open('example.txt', 'w') as f:
        await f.write('这是异步写入的内容')


async def main():
    await write_file()
    await read_file()


asyncio.run(main())


在上述代码中,aiofiles.open 以异步方式打开文件,f.readf.write 都是异步操作,需要使用 await 等待操作完成。

任务(Task)

asyncio 中,任务(Task)是对协程的进一步封装,它可以被事件循环调度执行。通过 asyncio.create_task() 方法可以将一个协程包装成任务并加入到事件循环中。

创建和运行任务

import asyncio


async def task_function():
    await asyncio.sleep(2)
    print('任务执行完毕')


async def main():
    task = asyncio.create_task(task_function())
    print('任务已创建,继续执行其他代码')
    await task


asyncio.run(main())


main 函数中,首先使用 asyncio.create_task(task_function()) 创建了一个任务 task,此时任务并不会立即执行,而是被加入到事件循环的任务队列中。print('任务已创建,继续执行其他代码') 会立即执行,然后通过 await task 等待任务 task 执行完毕。

多个任务并发执行

可以创建多个任务并同时运行,事件循环会在这些任务之间进行调度。

import asyncio


async def task_function(task_number):
    await asyncio.sleep(task_number)
    print(f'任务 {task_number} 执行完毕')


async def main():
    tasks = []
    for i in range(1, 4):
        task = asyncio.create_task(task_function(i))
        tasks.append(task)

    for task in tasks:
        await task


asyncio.run(main())


在这个示例中,创建了三个任务,每个任务的 asyncio.sleep 时间不同。通过 await task 依次等待每个任务完成,事件循环会在不同任务的等待时间内调度其他任务执行,从而实现并发效果。

等待多个任务完成

在实际应用中,经常需要等待多个任务全部完成后再进行下一步操作。asyncio 提供了 asyncio.gather() 方法来实现这一功能。

使用 asyncio.gather

import asyncio


async def task_function(task_number):
    await asyncio.sleep(task_number)
    print(f'任务 {task_number} 执行完毕')
    return f'任务 {task_number} 的结果'


async def main():
    tasks = []
    for i in range(1, 4):
        task = asyncio.create_task(task_function(i))
        tasks.append(task)

    results = await asyncio.gather(*tasks)
    print('所有任务执行完毕,结果如下:')
    for result in results:
        print(result)


asyncio.run(main())


在上述代码中,asyncio.gather(*tasks) 会等待所有任务完成,并返回一个包含所有任务返回值的列表。*tasks 是将任务列表展开作为 asyncio.gather 的参数。

处理任务异常

当任务中发生异常时,asyncio.gather 会将异常传递出来。可以通过捕获异常来处理。

import asyncio


async def task_function(task_number):
    if task_number == 2:
        raise ValueError('任务 2 发生错误')
    await asyncio.sleep(task_number)
    print(f'任务 {task_number} 执行完毕')
    return f'任务 {task_number} 的结果'


async def main():
    tasks = []
    for i in range(1, 4):
        task = asyncio.create_task(task_function(i))
        tasks.append(task)

    try:
        results = await asyncio.gather(*tasks)
    except ValueError as e:
        print(f'捕获到异常: {e}')


asyncio.run(main())


在这个示例中,当 task_number 为 2 时,任务会抛出 ValueError 异常。asyncio.gather 会捕获这个异常并传递给 await 处,通过 try - except 块可以捕获并处理异常。

异步迭代器和异步生成器

异步迭代器

异步迭代器是指实现了 __aiter____anext__ 方法的对象,__aiter__ 方法返回异步迭代器自身,__anext__ 方法用于返回下一个异步迭代的值,并且在没有更多值时引发 StopAsyncIteration 异常。

以下是一个简单的异步迭代器示例:

import asyncio


class AsyncIterator:
    def __init__(self, max_num):
        self.max_num = max_num
        self.current = 0

    async def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current >= self.max_num:
            raise StopAsyncIteration
        self.current += 1
        await asyncio.sleep(1)
        return self.current


async def main():
    async_iter = AsyncIterator(3)
    async for num in async_iter:
        print(num)


asyncio.run(main())


在上述代码中,AsyncIterator 类实现了异步迭代器接口。async for 语句用于异步迭代这个异步迭代器,每次迭代会等待 asyncio.sleep(1) 模拟的异步操作完成后获取下一个值。

异步生成器

异步生成器是使用 async def 定义且包含 yield 语句的函数,它返回一个异步生成器对象,该对象也是一个异步迭代器。

import asyncio


async def async_generator():
    for i in range(1, 4):
        await asyncio.sleep(1)
        yield i


async def main():
    async for num in async_generator():
        print(num)


asyncio.run(main())


在这个示例中,async_generator 是一个异步生成器函数,通过 async for 可以异步迭代它生成的值,每次迭代等待 asyncio.sleep(1) 模拟的异步操作完成。

异步上下文管理器

异步上下文管理器用于管理异步资源的生命周期,它实现了 __aenter____aexit__ 方法。在进入异步上下文时,__aenter__ 方法被调用,离开上下文时,__aexit__ 方法被调用。

使用异步上下文管理器

import asyncio


class AsyncResource:
    async def __aenter__(self):
        print('进入异步上下文,获取资源')
        await asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print('离开异步上下文,释放资源')
        await asyncio.sleep(1)

    async def do_something(self):
        print('在异步上下文中执行操作')
        await asyncio.sleep(1)


async def main():
    async with AsyncResource() as resource:
        await resource.do_something()


asyncio.run(main())


在上述代码中,AsyncResource 类实现了异步上下文管理器接口。async with 语句用于进入和离开异步上下文,在上下文中可以调用 resource.do_something() 方法执行具体操作。

总结异步编程注意事项

  1. 避免阻塞操作:在异步代码中,要确保不包含长时间阻塞的同步操作,否则会破坏异步的优势。例如,不要在协程中使用同步的 I/O 操作函数。
  2. 异常处理:合理处理异步任务中的异常,通过 try - except 块捕获 asyncio.gather 等方法传递的异常,避免异常导致程序崩溃。
  3. 资源管理:使用异步上下文管理器来管理异步资源,确保资源在使用完毕后正确释放,避免资源泄漏。
  4. 性能调优:虽然异步编程可以提高效率,但在处理大量并发任务时,也需要注意性能调优。例如,合理设置异步操作的并发数量,避免过多的任务导致系统资源耗尽。

通过深入理解和掌握上述 Python 异步编程的基础知识,开发者可以编写出高效、响应性强的异步应用程序,特别是在处理 I/O 密集型任务和高并发场景时,异步编程能够发挥出巨大的优势。