MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Python异步编程及其使用场景

2021-09-213.7k 阅读

Python异步编程基础

同步编程的局限性

在传统的同步编程模型中,程序按照顺序依次执行各项任务。例如,当一个函数进行I/O操作(如读取文件、网络请求等)时,程序会被阻塞,等待该操作完成后才继续执行后续代码。考虑以下简单的Python代码示例,它模拟从文件中读取数据并进行处理:

import time


def read_file():
    with open('example.txt', 'r') as f:
        data = f.read()
        time.sleep(2)  # 模拟I/O延迟
        return data


def process_data(data):
    result = sum(int(num) for num in data.split())
    return result


start_time = time.time()
file_data = read_file()
processed_result = process_data(file_data)
print(f"Processed result: {processed_result}")
print(f"Total time taken: {time.time() - start_time} seconds")

在上述代码中,read_file函数中的time.sleep(2)模拟了I/O操作的延迟。在这2秒的延迟期间,程序无法执行其他任务,整个主线程处于阻塞状态。如果有多个类似的I/O操作,程序的执行效率将会显著降低,尤其是在处理大量并发请求时,这种阻塞式的编程方式会导致资源浪费和响应缓慢。

异步编程的概念

异步编程旨在解决同步编程中的阻塞问题,使程序在等待I/O操作完成的同时,能够继续执行其他任务。在Python中,异步编程主要依赖于asyncio库。asyncio是Python用于编写异步代码的标准库,它提供了基于事件循环(Event Loop)、协程(Coroutine)和任务(Task)的异步编程模型。

事件循环

事件循环是asyncio的核心概念之一。它是一个无限循环,负责不断地检查注册的任务,并在有任务准备好执行时执行它们。简单来说,事件循环会遍历所有已注册的任务,当某个任务处于可执行状态(例如,I/O操作完成)时,事件循环会调度该任务继续执行。

在Python中,可以通过asyncio.get_event_loop()获取当前线程的事件循环。以下是一个简单的示例,展示如何获取并运行事件循环:

import asyncio


async def simple_coroutine():
    print("Coroutine started")
    await asyncio.sleep(1)
    print("Coroutine resumed after sleep")


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(simple_coroutine())
finally:
    loop.close()

在上述代码中,asyncio.get_event_loop()获取事件循环,loop.run_until_complete(simple_coroutine())将协程simple_coroutine添加到事件循环中并运行,直到该协程执行完毕。finally块中的loop.close()用于关闭事件循环,释放资源。

协程

协程是一种特殊的函数,它可以暂停执行并将控制权交回给调用者,稍后再从暂停的地方继续执行。在Python中,定义协程函数需要使用async def关键字。例如:

async def my_coroutine():
    print("Coroutine execution started")
    await asyncio.sleep(2)
    print("Coroutine execution resumed after sleep")

在协程函数内部,可以使用await关键字暂停协程的执行,等待一个可等待对象(如另一个协程或asyncio.sleep这样的I/O模拟操作)完成。当await的对象完成后,协程会从暂停的地方继续执行。

任务

任务(Task)是对协程的进一步封装,它将协程包装成一个可以由事件循环调度执行的对象。通过asyncio.create_task()可以创建一个任务,例如:

import asyncio


async def task_function():
    print("Task started")
    await asyncio.sleep(1)
    print("Task completed")


async def main():
    task = asyncio.create_task(task_function())
    print("Main function continues while task is running")
    await task
    print("Task has been awaited, main function can proceed")


asyncio.run(main())

在上述代码中,asyncio.create_task(task_function())创建了一个任务task,此时main函数会继续执行,而task_function会在事件循环的调度下异步执行。await task用于等待任务task完成,确保main函数在task_function执行完毕后再继续执行后续代码。

异步I/O操作

文件I/O的异步化

在传统的同步编程中,文件I/O操作是阻塞的。而在异步编程中,可以使用aiofiles库来实现异步文件I/O。aiofiles提供了与标准open函数类似的接口,但以异步方式执行文件操作。以下是一个使用aiofiles读取文件的示例:

import asyncio
import aiofiles


async def read_file_async():
    async with aiofiles.open('example.txt', 'r') as f:
        data = await f.read()
        return data


async def process_data_async(data):
    result = sum(int(num) for num in data.split())
    return result


async def main():
    start_time = time.time()
    file_data = await read_file_async()
    processed_result = await process_data_async(file_data)
    print(f"Processed result: {processed_result}")
    print(f"Total time taken: {time.time() - start_time} seconds")


asyncio.run(main())

在上述代码中,async with aiofiles.open('example.txt', 'r') as f:以异步方式打开文件,await f.read()异步读取文件内容。这样,在等待文件读取完成的过程中,事件循环可以调度其他任务执行,提高了程序的执行效率。

网络I/O的异步化

对于网络I/O操作,asyncio提供了asyncio.open_connection()等函数来实现异步网络连接。例如,以下是一个简单的异步TCP客户端示例:

import asyncio


async def tcp_client():
    reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
    message = 'Hello, Server!'
    writer.write(message.encode())
    await writer.drain()
    data = await reader.read(1024)
    print(f"Received: {data.decode()}")
    writer.close()
    await writer.wait_closed()


asyncio.run(tcp_client())

在上述代码中,await asyncio.open_connection('127.0.0.1', 8888)异步建立TCP连接,await writer.drain()确保数据发送完毕,await reader.read(1024)异步读取服务器返回的数据。通过这种方式,在网络I/O操作过程中,程序不会被阻塞,提高了并发处理能力。

异步编程中的错误处理

协程内的错误处理

在协程内部,可以使用try - except块来捕获异常。例如:

import asyncio


async def error_prone_coroutine():
    try:
        await asyncio.sleep(1)
        result = 1 / 0
    except ZeroDivisionError as e:
        print(f"Caught exception: {e}")


asyncio.run(error_prone_coroutine())

在上述代码中,error_prone_coroutine协程中模拟了一个除零错误。通过try - except块,捕获了ZeroDivisionError异常,并进行了相应的处理。

任务中的错误处理

当在任务中发生异常时,如果没有在任务内部处理,异常会传递到等待该任务的地方。例如:

import asyncio


async def error_task():
    await asyncio.sleep(1)
    raise ValueError("Task encountered an error")


async def main():
    task = asyncio.create_task(error_task())
    try:
        await task
    except ValueError as e:
        print(f"Caught task exception: {e}")


asyncio.run(main())

在上述代码中,error_task任务抛出了一个ValueError异常。在main函数中,通过try - except块捕获了这个异常,确保程序不会因为任务中的异常而崩溃。

Python异步编程的使用场景

网络爬虫

在网络爬虫应用中,需要频繁地发送HTTP请求并等待响应。传统的同步爬虫在等待响应时会阻塞主线程,导致效率低下。而异步爬虫可以在等待响应的同时,继续发送其他请求,大大提高了爬取效率。以下是一个简单的异步爬虫示例,使用aiohttp库发送HTTP请求:

import asyncio
import aiohttp


async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()


async def main():
    urls = [
        'http://example.com',
        'http://example.org',
        'http://example.net'
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        for result in results:
            print(result)


asyncio.run(main())

在上述代码中,fetch函数使用aiohttp异步发送HTTP GET请求并获取响应内容。main函数中创建了多个fetch任务,并通过asyncio.gather并发执行这些任务,提高了爬虫的效率。

实时数据处理

在实时数据处理场景中,例如处理来自传感器的连续数据流或实时更新的网络数据,异步编程可以确保在处理新数据的同时,不会阻塞其他任务的执行。假设我们有一个模拟的传感器数据接收和处理程序:

import asyncio


async def receive_sensor_data():
    while True:
        data = await simulate_sensor_reading()
        await process_sensor_data(data)


async def simulate_sensor_reading():
    await asyncio.sleep(1)
    return "Sensor data"


async def process_sensor_data(data):
    print(f"Processing data: {data}")


asyncio.run(receive_sensor_data())

在上述代码中,receive_sensor_data函数不断地异步接收传感器数据(通过simulate_sensor_reading模拟),并将数据传递给process_sensor_data进行处理。在等待新数据到来的过程中,事件循环可以调度其他任务执行,实现了实时数据的高效处理。

异步微服务架构

在微服务架构中,各个微服务之间可能需要进行频繁的通信和数据交互。异步编程可以优化这些通信过程,减少等待时间,提高系统的整体性能。例如,一个简单的异步微服务示例,其中一个微服务向另一个微服务发送请求并等待响应:

import asyncio


async def microservice_client():
    reader, writer = await asyncio.open_connection('127.0.0.1', 9999)
    request = 'Get data'
    writer.write(request.encode())
    await writer.drain()
    response = await reader.read(1024)
    print(f"Received from microservice: {response.decode()}")
    writer.close()
    await writer.wait_closed()


async def microservice_server():
    server = await asyncio.start_server(handle_connection, '127.0.0.1', 9999)
    async with server:
        await server.serve_forever()


async def handle_connection(reader, writer):
    data = await reader.read(1024)
    request = data.decode()
    response = f"Response to {request}"
    writer.write(response.encode())
    await writer.drain()
    writer.close()
    await writer.wait_closed()


loop = asyncio.get_event_loop()
try:
    loop.create_task(microservice_server())
    loop.run_until_complete(microservice_client())
finally:
    loop.close()

在上述代码中,microservice_clientmicroservice_server发送请求并等待响应。microservice_server监听指定端口,处理客户端请求并返回响应。通过异步编程,在等待响应的过程中,微服务可以继续处理其他任务,提高了系统的并发处理能力。

游戏开发中的异步应用

在游戏开发中,异步编程可以用于处理一些耗时操作,如加载游戏资源、网络通信(例如多人在线游戏中的玩家数据同步)等,而不会阻塞游戏的主循环,确保游戏的流畅运行。例如,以下是一个简单的游戏资源加载示例:

import asyncio
import time


async def load_resource(resource_name):
    print(f"Starting to load {resource_name}")
    await asyncio.sleep(2)  # 模拟资源加载延迟
    print(f"{resource_name} loaded successfully")
    return f"{resource_name} data"


async def game_loop():
    start_time = time.time()
    resource_tasks = [load_resource('texture'), load_resource('model')]
    loaded_resources = await asyncio.gather(*resource_tasks)
    print(f"All resources loaded in {time.time() - start_time} seconds")
    # 继续游戏逻辑,使用加载的资源
    for resource in loaded_resources:
        print(f"Using {resource} in game")


asyncio.run(game_loop())

在上述代码中,load_resource函数模拟了游戏资源的加载过程,game_loop函数通过asyncio.gather并发加载多个资源。在资源加载的过程中,游戏的主循环(虽然这里简化为game_loop函数)不会被阻塞,可以继续执行其他游戏相关的逻辑,提高了游戏的用户体验。

异步编程与多线程、多进程的比较

多线程

多线程编程通过在一个进程内创建多个线程来实现并发执行。每个线程都有自己的执行上下文,可以独立执行任务。在Python中,threading模块用于多线程编程。例如:

import threading
import time


def thread_function():
    print("Thread started")
    time.sleep(2)
    print("Thread completed")


start_time = time.time()
thread = threading.Thread(target = thread_function)
thread.start()
print("Main thread continues while thread is running")
thread.join()
print(f"Total time taken: {time.time() - start_time} seconds")

多线程适用于I/O密集型任务,因为线程在等待I/O操作完成时会释放GIL(全局解释器锁),允许其他线程执行。然而,对于CPU密集型任务,由于GIL的存在,多线程并不能充分利用多核CPU的优势,反而可能因为线程切换带来额外的开销。

多进程

多进程编程通过创建多个独立的进程来实现并行执行。每个进程都有自己独立的内存空间和Python解释器实例。在Python中,multiprocessing模块用于多进程编程。例如:

import multiprocessing
import time


def process_function():
    print("Process started")
    time.sleep(2)
    print("Process completed")


start_time = time.time()
process = multiprocessing.Process(target = process_function)
process.start()
print("Main process continues while process is running")
process.join()
print(f"Total time taken: {time.time() - start_time} seconds")

多进程适用于CPU密集型任务,因为每个进程可以独立使用一个CPU核心,充分利用多核CPU的优势。但是,多进程之间的通信和数据共享相对复杂,并且创建和销毁进程的开销较大。

异步编程与多线程、多进程的对比

  • 资源消耗:异步编程基于单线程,通过事件循环和协程实现并发,资源消耗相对较小,适合处理大量并发任务。多线程虽然在一个进程内,但线程的创建和维护也需要一定的资源。多进程由于每个进程都有独立的内存空间和解释器实例,资源消耗最大。
  • CPU利用率:异步编程主要适用于I/O密集型任务,对于CPU密集型任务无法充分利用多核CPU。多线程在I/O密集型任务中有较好的表现,但在CPU密集型任务中受GIL限制。多进程适用于CPU密集型任务,能充分利用多核CPU。
  • 编程复杂度:异步编程相对来说编程模型较为简单,通过asyncawait关键字来管理异步操作。多线程编程需要处理线程同步问题,如锁、信号量等,编程复杂度较高。多进程编程除了要处理进程间通信和同步问题外,由于进程间内存独立,数据共享也较为复杂,编程复杂度最高。

综上所述,在选择使用异步编程、多线程还是多进程时,需要根据具体的任务类型(I/O密集型还是CPU密集型)、资源需求和编程复杂度等因素进行综合考虑。

高级异步编程技巧

异步生成器

异步生成器是一种特殊的生成器,它可以异步地生成值。在Python中,可以使用async def定义异步生成器函数,并使用yield关键字生成值。例如:

import asyncio


async def async_generator():
    for i in range(5):
        await asyncio.sleep(1)
        yield i


async def main():
    async for value in async_generator():
        print(f"Received value: {value}")


asyncio.run(main())

在上述代码中,async_generator是一个异步生成器函数,async for用于异步迭代生成器的值。每次迭代时,await asyncio.sleep(1)模拟了一个异步操作,在等待的过程中,事件循环可以调度其他任务执行。

异步上下文管理器

异步上下文管理器用于管理异步资源的获取和释放。通过async def定义异步上下文管理器函数,并使用async with语句来使用它。例如:

import asyncio


class AsyncResource:
    async def __aenter__(self):
        print("Entering async context")
        await asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Exiting async context")
        await asyncio.sleep(1)
        if exc_type:
            print(f"Exception handled: {exc_type}")
        return True


async def main():
    async with AsyncResource() as resource:
        print("Inside async context")


asyncio.run(main())

在上述代码中,AsyncResource类定义了一个异步上下文管理器。__aenter__方法在进入async with块时被调用,__aexit__方法在离开async with块时被调用,用于处理资源的获取和释放,以及异常处理。

异步队列

异步队列(asyncio.Queue)用于在异步任务之间安全地传递数据。它是线程安全的,并且可以异步地进行数据的放入和取出操作。例如:

import asyncio


async def producer(queue):
    for i in range(5):
        await queue.put(i)
        print(f"Produced {i}")
        await asyncio.sleep(1)


async def consumer(queue):
    while True:
        value = await queue.get()
        print(f"Consumed {value}")
        queue.task_done()


async def main():
    queue = asyncio.Queue()
    producer_task = asyncio.create_task(producer(queue))
    consumer_task = asyncio.create_task(consumer(queue))
    await producer_task
    await queue.join()
    consumer_task.cancel()


asyncio.run(main())

在上述代码中,producer任务将数据放入异步队列queueconsumer任务从队列中取出数据并处理。queue.putqueue.get都是异步操作,确保在数据传递过程中不会阻塞事件循环。queue.join用于等待队列中的所有任务完成,consumer_task.cancel用于在生产者任务完成后取消消费者任务。

性能优化与调优

减少I/O等待时间

在异步编程中,减少I/O等待时间是提高性能的关键。可以通过优化网络请求、合理设置文件缓冲区等方式来减少I/O操作的延迟。例如,在网络请求中,可以使用连接池来复用已有的网络连接,减少建立新连接的开销。在aiohttp中,可以通过aiohttp.TCPConnector来设置连接池:

import asyncio
import aiohttp


async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()


async def main():
    urls = [
        'http://example.com',
        'http://example.org',
        'http://example.net'
    ]
    connector = aiohttp.TCPConnector(limit = 10)
    async with aiohttp.ClientSession(connector = connector) as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        for result in results:
            print(result)


asyncio.run(main())

在上述代码中,aiohttp.TCPConnector(limit = 10)设置了连接池的最大连接数为10,通过复用连接,减少了网络I/O的等待时间。

合理调度任务

合理调度任务可以充分利用事件循环的资源,提高程序的执行效率。避免在协程中进行长时间的同步操作,尽量将所有可能阻塞的操作异步化。同时,可以根据任务的优先级来调度任务,例如,对于实时性要求较高的任务,可以优先执行。在asyncio中,可以通过asyncio.ensure_future并结合自定义的任务优先级队列来实现任务的优先级调度。以下是一个简单的示例:

import asyncio
import heapq


class PriorityQueue:
    def __init__(self):
        self.queue = []

    def put(self, priority, task):
        heapq.heappush(self.queue, (priority, task))

    def get(self):
        return heapq.heappop(self.queue)[1]


async def high_priority_task():
    print("High priority task started")
    await asyncio.sleep(1)
    print("High priority task completed")


async def low_priority_task():
    print("Low priority task started")
    await asyncio.sleep(2)
    print("Low priority task completed")


async def main():
    pq = PriorityQueue()
    pq.put(1, asyncio.create_task(high_priority_task()))
    pq.put(2, asyncio.create_task(low_priority_task()))
    tasks = []
    while pq.queue:
        task = pq.get()
        tasks.append(task)
    await asyncio.gather(*tasks)


asyncio.run(main())

在上述代码中,PriorityQueue类实现了一个简单的任务优先级队列。high_priority_tasklow_priority_task分别代表高优先级和低优先级任务,通过pq.put将任务按照优先级放入队列,然后通过asyncio.gather执行这些任务,确保高优先级任务优先执行。

监控与分析

使用工具对异步程序进行监控和分析可以帮助发现性能瓶颈。例如,cProfile模块可以用于分析Python程序的性能,包括异步程序。以下是一个使用cProfile分析异步程序的示例:

import asyncio
import cProfile


async def async_function():
    await asyncio.sleep(1)


async def main():
    tasks = [async_function() for _ in range(10)]
    await asyncio.gather(*tasks)


cProfile.run('asyncio.run(main())')

在上述代码中,cProfile.run('asyncio.run(main())')main函数进行性能分析,输出的结果可以帮助开发者了解程序中各个函数的执行时间、调用次数等信息,从而找出性能瓶颈并进行优化。

通过以上对Python异步编程的深入探讨,包括基础概念、使用场景、与其他并发模型的比较以及高级技巧和性能优化等方面,希望读者能够对Python异步编程有更全面和深入的理解,并能在实际项目中灵活运用异步编程技术,提高程序的性能和并发处理能力。