MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Python中的协程与异步编程

2024-06-174.1k 阅读

Python中的协程与异步编程

协程基础概念

在深入探讨Python的协程与异步编程之前,我们先来理解协程的基本概念。协程,从本质上来说,是一种用户态的轻量级线程,也被称为微线程。与操作系统提供的线程不同,协程的调度完全由用户程序控制,这使得它在一些场景下具有更高的灵活性和效率。

传统的线程模型,线程的切换是由操作系统内核完成的,这种切换需要陷入内核态,涉及到上下文的保存和恢复等操作,开销较大。而协程的切换发生在用户态,不需要操作系统的介入,因此切换成本更低。

以一个简单的生产者 - 消费者模型为例,在传统的线程模型中,生产者和消费者线程通过共享队列进行数据传递,线程之间的切换由操作系统调度。而使用协程时,生产者和消费者可以是两个协程,它们之间的协作由用户程序通过手动切换协程来实现。

Python中的协程实现方式

在Python中,协程的实现经历了多个阶段的发展。早期通过generator(生成器)来实现协程的部分功能,后来引入了asyncio库,提供了更完善的异步编程框架,并且Python 3.5及以后引入了asyncawait关键字,让异步代码的编写更加简洁和直观。

  1. 使用生成器实现协程 Python的生成器是一种可迭代对象,它可以暂停和恢复执行。通过yield语句,生成器可以暂停执行并返回一个值,下次调用next()方法时,生成器会从暂停的地方继续执行。

    def simple_coroutine():
        print('开始执行')
        x = yield
        print(f'接收到的值: {x}')
    
    
    coro = simple_coroutine()
    next(coro)  # 启动协程,执行到yield语句暂停
    coro.send(42)  # 向协程发送值42,协程恢复执行
    

    在上述代码中,simple_coroutine是一个生成器函数,它定义了一个简单的协程。next(coro)启动协程,使其执行到yield语句暂停。coro.send(42)向协程发送值42,协程恢复执行,并打印接收到的值。

  2. asyncio库与异步函数 asyncio是Python用于编写异步代码的标准库。在asyncio中,异步函数使用async def定义,这种函数会返回一个coroutine对象。

    import asyncio
    
    
    async def async_function():
        print('异步函数开始')
        await asyncio.sleep(1)
        print('异步函数结束')
    
    
    async def main():
        await async_function()
    
    
    if __name__ == '__main__':
        asyncio.run(main())
    

    在上述代码中,async_function是一个异步函数,其中await asyncio.sleep(1)表示暂停当前协程,等待1秒钟,然后继续执行。main函数也是一个异步函数,它调用了async_functionasyncio.run(main())用于运行异步函数main,它会创建一个事件循环,并在事件循环中执行main函数。

  3. async和await关键字 async关键字用于定义异步函数,await关键字只能在异步函数内部使用,它用于暂停当前协程,等待一个可等待对象(如另一个协程、Future对象或Task对象)完成,并返回其结果。

    async def fetch_data():
        await asyncio.sleep(2)
        return {'data': '示例数据'}
    
    
    async def process_data():
        result = await fetch_data()
        print(f'处理数据: {result}')
    
    
    async def main():
        await process_data()
    
    
    if __name__ == '__main__':
        asyncio.run(main())
    

    在这段代码中,fetch_data是一个异步函数,它模拟了一个耗时操作(通过await asyncio.sleep(2)),并返回数据。process_data函数等待fetch_data完成,并处理其返回的数据。main函数用于协调这些操作,asyncio.run(main())启动整个异步流程。

异步编程的优势

  1. 提高程序的并发性能 在I/O密集型任务中,程序大部分时间都在等待I/O操作完成,如网络请求、文件读写等。传统的多线程或多进程方式虽然可以实现并发,但线程和进程的创建、切换开销较大。而异步编程通过协程的方式,在等待I/O操作时,协程可以暂停执行,将控制权交给其他协程,从而充分利用CPU资源,提高程序的整体并发性能。 例如,在一个需要同时处理多个网络请求的程序中,使用异步编程可以在等待一个请求响应的同时,处理其他请求,而不需要为每个请求创建一个新的线程。
  2. 简化代码逻辑 相比于使用回调函数来处理异步操作,asyncawait的语法使得异步代码看起来更像同步代码,逻辑更加清晰。回调函数可能会导致“回调地狱”,即多层嵌套的回调函数使得代码难以阅读和维护。而异步函数通过await关键字可以顺序地编写异步操作,提高了代码的可读性和可维护性。 例如,下面是使用回调函数和异步函数处理多个异步操作的对比。 使用回调函数
    def step1(callback):
        # 模拟一些异步操作
        result1 = '步骤1结果'
        callback(result1)
    
    
    def step2(result1, callback):
        # 模拟一些异步操作
        result2 = result1 + ' - 步骤2结果'
        callback(result2)
    
    
    def step3(result2):
        print(f'最终结果: {result2}')
    
    
    def main():
        step1(lambda res1: step2(res1, step3))
    
    
    if __name__ == '__main__':
        main()
    
    使用异步函数
    async def step1():
        # 模拟一些异步操作
        await asyncio.sleep(1)
        return '步骤1结果'
    
    
    async def step2(result1):
        # 模拟一些异步操作
        await asyncio.sleep(1)
        return result1 + ' - 步骤2结果'
    
    
    async def step3(result2):
        print(f'最终结果: {result2}')
    
    
    async def main():
        result1 = await step1()
        result2 = await step2(result1)
        await step3(result2)
    
    
    if __name__ == '__main__':
        asyncio.run(main())
    
    可以看到,异步函数的代码结构更清晰,更符合人类的阅读习惯。

异步编程中的事件循环

事件循环是异步编程的核心概念之一。在asyncio中,事件循环负责管理和调度协程的执行。它不断地循环,检查是否有可执行的协程、I/O事件等,并执行相应的操作。

  1. 事件循环的创建和运行 在Python中,可以通过asyncio.get_running_loop()获取当前正在运行的事件循环,如果没有则会引发RuntimeError。也可以使用asyncio.new_event_loop()创建一个新的事件循环。通常使用asyncio.run()来运行异步函数,asyncio.run()内部会创建一个事件循环,运行异步函数,然后关闭事件循环。

    import asyncio
    
    
    async def simple_task():
        print('简单任务开始')
        await asyncio.sleep(1)
        print('简单任务结束')
    
    
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(simple_task())
    finally:
        loop.close()
    

    在上述代码中,通过asyncio.new_event_loop()创建了一个新的事件循环loop,然后使用loop.run_until_complete(simple_task())在这个事件循环中运行simple_task协程。最后,在finally块中关闭事件循环。

  2. 事件循环中的任务和Future对象 Taskasyncio中用于管理协程的对象,它继承自FutureFuture表示一个异步操作的最终结果,它可以处于未完成、完成(成功或失败)等状态。

    import asyncio
    
    
    async def task_function():
        await asyncio.sleep(2)
        return '任务结果'
    
    
    async def main():
        task = asyncio.create_task(task_function())
        result = await task
        print(f'获取到的结果: {result}')
    
    
    if __name__ == '__main__':
        asyncio.run(main())
    

    在这段代码中,asyncio.create_task(task_function())创建了一个Task对象,将task_function协程包装成一个任务并安排在事件循环中执行。await task等待任务完成并获取其返回结果。

异步I/O操作

  1. 异步网络I/O 在Python中,aiohttp库是一个常用的用于异步HTTP请求的库。它基于asyncio实现,能够高效地处理大量的网络请求。

    import aiohttp
    import asyncio
    
    
    async def fetch(session, url):
        async with session.get(url) as response:
            return await response.json()
    
    
    async def main():
        async with aiohttp.ClientSession() as session:
            tasks = []
            urls = ['https://example.com', 'https://another - example.com']
            for url in urls:
                task = asyncio.create_task(fetch(session, url))
                tasks.append(task)
            results = await asyncio.gather(*tasks)
            for result in results:
                print(result)
    
    
    if __name__ == '__main__':
        asyncio.run(main())
    

    在上述代码中,fetch函数使用aiohttpsession.get方法发送异步HTTP GET请求,并返回响应的JSON数据。main函数创建了多个任务,每个任务对应一个URL的请求,通过asyncio.gather等待所有任务完成,并处理结果。

  2. 异步文件I/O Python 3.6及以后引入了aiofiles库,用于实现异步文件读写操作。

    import aiofiles
    
    
    async def read_file():
        async with aiofiles.open('example.txt', 'r') as f:
            content = await f.read()
            print(f'文件内容: {content}')
    
    
    async def write_file():
        async with aiofiles.open('example.txt', 'w') as f:
            await f.write('这是示例内容')
    
    
    async def main():
        await write_file()
        await read_file()
    
    
    if __name__ == '__main__':
        asyncio.run(main())
    

    在这段代码中,write_file函数异步地向文件中写入内容,read_file函数异步地从文件中读取内容。main函数协调这两个异步操作。

协程的并发与并行

  1. 并发与并行的区别 并发是指在同一时间段内,多个任务交替执行,但在同一时刻,只有一个任务在执行。而并行是指在同一时刻,有多个任务同时执行,通常需要多个CPU核心或多个处理器。 在异步编程中,协程实现的是并发,而不是并行。多个协程在一个线程内通过事件循环进行切换执行,虽然看起来像是同时执行多个任务,但实际上在同一时刻只有一个协程在执行。

  2. 利用多进程实现并行的异步编程 如果需要在异步编程中实现并行,可以结合asynciomultiprocessing库。每个进程可以有自己的事件循环,从而实现真正的并行异步操作。

    import asyncio
    import multiprocessing
    
    
    async def async_task():
        await asyncio.sleep(2)
        return '异步任务结果'
    
    
    def run_async_in_process():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        result = loop.run_until_complete(async_task())
        print(f'进程内结果: {result}')
    
    
    if __name__ == '__main__':
        processes = []
        for _ in range(3):
            p = multiprocessing.Process(target = run_async_in_process)
            processes.append(p)
            p.start()
        for p in processes:
            p.join()
    

    在上述代码中,run_async_in_process函数在每个进程内创建一个事件循环并运行异步任务。通过multiprocessing.Process创建多个进程,从而实现并行的异步操作。

异步编程中的错误处理

  1. 捕获异步函数中的异常 在异步函数中,可以使用try - except语句来捕获异常。

    import asyncio
    
    
    async def error_prone_task():
        raise ValueError('这是一个示例异常')
    
    
    async def main():
        try:
            await error_prone_task()
        except ValueError as e:
            print(f'捕获到异常: {e}')
    
    
    if __name__ == '__main__':
        asyncio.run(main())
    

    在这段代码中,error_prone_task函数抛出一个ValueError异常,main函数通过try - except语句捕获并处理这个异常。

  2. 处理任务中的异常 当使用asyncio.create_task创建任务时,如果任务内部发生异常,需要正确处理。可以通过Task对象的add_done_callback方法来处理任务完成时的异常。

    import asyncio
    
    
    async def error_task():
        await asyncio.sleep(1)
        raise TypeError('任务中的类型错误')
    
    
    def handle_task_result(task):
        if task.exception():
            print(f'任务中捕获到异常: {task.exception()}')
    
    
    async def main():
        task = asyncio.create_task(error_task())
        task.add_done_callback(handle_task_result)
        await task
    
    
    if __name__ == '__main__':
        asyncio.run(main())
    

    在上述代码中,error_task函数抛出一个TypeError异常。handle_task_result函数作为add_done_callback的回调函数,用于检查任务是否有异常并处理。

总结与最佳实践

  1. 总结 协程与异步编程是Python中强大的编程范式,特别适用于I/O密集型任务。通过asyncio库、asyncawait关键字,Python提供了简洁而高效的异步编程方式。事件循环、任务和Future对象等概念是异步编程的核心,理解并掌握它们对于编写复杂的异步应用至关重要。

  2. 最佳实践

    • 合理使用异步库:根据具体需求选择合适的异步库,如aiohttp用于网络请求,aiofiles用于文件操作等。
    • 优化事件循环:避免在事件循环中执行长时间的同步操作,以免阻塞其他协程的执行。
    • 清晰的错误处理:在异步代码中,确保有完善的错误处理机制,及时捕获和处理异常,以提高程序的稳定性。
    • 性能测试与调优:在实际应用中,对异步代码进行性能测试,根据测试结果进行调优,以充分发挥异步编程的优势。

希望通过本文的介绍,读者能够深入理解Python中的协程与异步编程,并在实际项目中灵活运用这一强大的编程技术。