MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Python异步HTTP请求处理

2021-02-266.5k 阅读

异步编程基础

在深入探讨Python异步HTTP请求处理之前,我们先来了解一下异步编程的基础概念。

同步与异步

在传统的同步编程模型中,程序按照顺序依次执行各个任务。例如,当一个函数发起一个HTTP请求时,它会等待请求完成(无论是成功获取响应还是超时),然后才能继续执行后续代码。这种方式在处理I/O密集型任务时效率较低,因为大部分时间都花在了等待I/O操作完成上,而CPU处于闲置状态。

而异步编程则允许程序在发起I/O操作后,无需等待操作完成就继续执行其他任务。当I/O操作完成时,程序会得到通知并相应地处理结果。这样可以显著提高程序的效率,特别是在处理大量I/O操作(如HTTP请求)的场景中。

事件循环(Event Loop)

事件循环是异步编程的核心机制。它是一个无限循环,负责不断检查是否有新的事件(如I/O操作完成)需要处理。当有事件发生时,事件循环会将对应的回调函数添加到执行队列中,然后按照一定的顺序执行这些回调函数。

在Python的异步编程中,asyncio库提供了事件循环的实现。我们可以通过asyncio.get_event_loop()获取当前线程的事件循环对象。

import asyncio


async def main():
    print('Hello, async world!')


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

在上述代码中,我们首先定义了一个main异步函数。然后通过asyncio.get_event_loop()获取事件循环对象,并使用run_until_complete方法将main函数作为任务添加到事件循环中执行。最后,在程序结束时关闭事件循环。

协程(Coroutine)

协程是一种特殊的函数,它可以暂停执行并将控制权交回给调用者,然后在适当的时候恢复执行。在Python中,我们可以使用async def关键字定义一个协程函数。

async def my_coroutine():
    await asyncio.sleep(1)
    print('This is a coroutine')


async def main():
    await my_coroutine()


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

在这个例子中,my_coroutine是一个协程函数。它使用await关键字暂停执行1秒钟(模拟一个I/O操作),然后打印出一条消息。main函数也是一个协程,它调用了my_coroutine。注意,在协程函数内部调用其他协程函数时,需要使用await关键字。

任务(Task)

任务是对协程的进一步封装,它可以被事件循环调度执行。通过asyncio.create_task方法,我们可以将一个协程包装成任务并添加到事件循环中。

async def my_coroutine():
    await asyncio.sleep(1)
    print('This is a coroutine')


async def main():
    task = asyncio.create_task(my_coroutine())
    await task


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

在这个例子中,我们通过asyncio.create_taskmy_coroutine包装成一个任务,然后在main函数中使用await等待任务完成。这样,事件循环会调度任务的执行,并且在任务执行期间,main函数的其他部分可以继续执行其他操作。

Python中的异步HTTP请求库

Python有多个优秀的库可以用于处理异步HTTP请求,下面我们来介绍几个常用的库。

aiohttp

aiohttp是Python中一个流行的异步HTTP客户端/服务器框架。它基于asyncio实现,提供了简洁易用的API来处理异步HTTP请求。

  1. 安装aiohttp 可以使用pip安装aiohttp
pip install aiohttp
  1. 简单的GET请求示例
import asyncio
import aiohttp


async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()


async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, 'http://example.com')
        print(html)


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

在这个例子中,我们定义了一个fetch函数,它接收一个aiohttp.ClientSession对象和一个URL作为参数。在fetch函数内部,我们使用session.get发起一个GET请求,并使用await等待响应。然后通过response.text()获取响应的文本内容。在main函数中,我们创建了一个ClientSession对象,并调用fetch函数获取指定URL的网页内容。

  1. POST请求示例
import asyncio
import aiohttp


async def post_data(session, url, data):
    async with session.post(url, data=data) as response:
        return await response.json()


async def main():
    async with aiohttp.ClientSession() as session:
        data = {'key': 'value'}
        result = await post_data(session, 'http://example.com/api', data)
        print(result)


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

这里的post_data函数使用session.post发起一个POST请求,并将数据以字典的形式传递。await response.json()用于将响应解析为JSON格式的数据。

  1. 并发请求 aiohttp很容易实现并发请求。我们可以创建多个任务并同时执行。
import asyncio
import aiohttp


async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()


async def main():
    urls = ['http://example.com', 'http://example.org', 'http://example.net']
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        for result in results:
            print(result)


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

在这个例子中,我们定义了一个URL列表,然后为每个URL创建一个fetch任务。通过asyncio.gather函数,我们可以同时执行这些任务,并等待所有任务完成。asyncio.gather会返回一个包含所有任务结果的列表。

httpx

httpx也是一个功能强大的HTTP请求库,它同时支持同步和异步请求。它的API设计简洁,并且提供了丰富的功能。

  1. 安装httpx 使用pip安装httpx
pip install httpx
  1. 异步GET请求示例
import asyncio
import httpx


async def fetch(url):
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return response.text


async def main():
    html = await fetch('http://example.com')
    print(html)


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

在这个示例中,我们使用httpx.AsyncClient创建一个异步客户端,并使用client.get发起GET请求。await response.text获取响应的文本内容。

  1. POST请求示例
import asyncio
import httpx


async def post_data(url, data):
    async with httpx.AsyncClient() as client:
        response = await client.post(url, json=data)
        return response.json()


async def main():
    data = {'key': 'value'}
    result = await post_data('http://example.com/api', data)
    print(result)


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

这里通过client.post发起POST请求,并将数据以JSON格式传递。await response.json()将响应解析为JSON数据。

  1. 并发请求
import asyncio
import httpx


async def fetch(url):
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return response.text


async def main():
    urls = ['http://example.com', 'http://example.org', 'http://example.net']
    tasks = [fetch(url) for url in urls]
    results = await asyncio.gather(*tasks)
    for result in results:
        print(result)


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

aiohttp类似,我们可以通过创建多个任务并使用asyncio.gather实现并发请求。

处理复杂的异步HTTP请求场景

在实际应用中,我们可能会遇到一些更复杂的异步HTTP请求场景,下面我们来探讨如何处理这些情况。

处理请求超时

在发起HTTP请求时,设置一个合理的超时时间是非常重要的,以防止程序无限期等待响应。

  1. aiohttp中的超时设置
import asyncio
import aiohttp


async def fetch(session, url):
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as response:
            return await response.text()
    except aiohttp.ClientError as e:
        print(f'Error occurred: {e}')


async def main():
    async with aiohttp.ClientSession() as session:
        await fetch(session, 'http://example.com')


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

在这个例子中,我们通过aiohttp.ClientTimeout(total = 5)设置了总的超时时间为5秒。如果请求在5秒内没有完成,会抛出aiohttp.ClientError异常,我们可以在except块中进行相应的处理。

  1. httpx中的超时设置
import asyncio
import httpx


async def fetch(url):
    try:
        async with httpx.AsyncClient(timeout=httpx.Timeout(5)) as client:
            response = await client.get(url)
            return response.text()
    except httpx.RequestError as e:
        print(f'Error occurred: {e}')


async def main():
    await fetch('http://example.com')


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

httpx中,我们通过httpx.Timeout(5)设置超时时间为5秒。同样,如果请求超时,会抛出httpx.RequestError异常。

处理重定向

HTTP请求可能会遇到重定向的情况,我们需要正确处理这些重定向以获取最终的响应。

  1. aiohttp中的重定向处理 aiohttp默认会自动处理重定向。但是,我们也可以自定义重定向行为。
import asyncio
import aiohttp


async def fetch(session, url):
    async with session.get(url, allow_redirects=False) as response:
        if response.status in [301, 302, 303, 307, 308]:
            new_url = response.headers.get('Location')
            async with session.get(new_url) as new_response:
                return await new_response.text()
        else:
            return await response.text()


async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, 'http://example.com/redirect')
        print(html)


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

在这个例子中,我们通过allow_redirects = False禁用了自动重定向。然后手动检查响应状态码,如果是重定向状态码(301、302等),则获取重定向的URL并发起新的请求。

  1. httpx中的重定向处理 httpx同样默认自动处理重定向。我们也可以自定义重定向行为。
import asyncio
import httpx


async def fetch(url):
    async with httpx.AsyncClient(follow_redirects=False) as client:
        response = await client.get(url)
        if response.is_redirect:
            new_url = response.headers['Location']
            new_response = await client.get(new_url)
            return new_response.text()
        else:
            return response.text()


async def main():
    html = await fetch('http://example.com/redirect')
    print(html)


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

这里通过follow_redirects = False禁用自动重定向,然后根据响应的is_redirect属性判断是否为重定向,并手动处理重定向。

处理认证

很多HTTP API需要认证才能访问,常见的认证方式有Basic认证、OAuth等。

  1. aiohttp中的Basic认证
import asyncio
import aiohttp
from aiohttp import BasicAuth


async def fetch(session, url):
    auth = BasicAuth('username', 'password')
    async with session.get(url, auth=auth) as response:
        return await response.text()


async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, 'http://example.com/protected')
        print(html)


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

在这个例子中,我们通过aiohttp.BasicAuth创建Basic认证对象,并在session.get中传递auth参数进行认证。

  1. httpx中的Basic认证
import asyncio
import httpx


async def fetch(url):
    auth = httpx.BasicAuth('username', 'password')
    async with httpx.AsyncClient(auth=auth) as client:
        response = await client.get(url)
        return response.text()


async def main():
    html = await fetch('http://example.com/protected')
    print(html)


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

httpx中,通过httpx.BasicAuth创建认证对象,并在AsyncClient初始化时传递auth参数进行认证。

性能优化与最佳实践

在处理大量异步HTTP请求时,性能优化是非常关键的。以下是一些性能优化的方法和最佳实践。

连接池

使用连接池可以减少每次请求建立新连接的开销。aiohttphttpx都默认实现了连接池。

  1. aiohttp连接池aiohttp中,ClientSession内部维护了一个连接池。当我们创建多个请求时,aiohttp会从连接池中复用连接,而不是每次都创建新的连接。
import asyncio
import aiohttp


async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()


async def main():
    async with aiohttp.ClientSession() as session:
        urls = ['http://example.com', 'http://example.org', 'http://example.net']
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        for result in results:
            print(result)


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

在这个例子中,ClientSession会自动管理连接池,我们无需手动干预。

  1. httpx连接池 httpx同样默认使用连接池。当我们使用AsyncClient时,它会在内部维护连接池。
import asyncio
import httpx


async def fetch(url):
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return response.text()


async def main():
    urls = ['http://example.com', 'http://example.org', 'http://example.net']
    tasks = [fetch(url) for url in urls]
    results = await asyncio.gather(*tasks)
    for result in results:
        print(result)


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

httpxAsyncClient会自动复用连接,提高请求效率。

限制并发数

虽然异步编程可以提高效率,但如果并发数过高,可能会导致系统资源耗尽。我们可以通过限制并发数来避免这种情况。

  1. 使用Semaphore限制并发数(aiohttp)
import asyncio
import aiohttp


async def fetch(session, url, semaphore):
    async with semaphore:
        async with session.get(url) as response:
            return await response.text()


async def main():
    urls = ['http://example.com', 'http://example.org', 'http://example.net']
    semaphore = asyncio.Semaphore(2)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)
        for result in results:
            print(result)


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

在这个例子中,我们通过asyncio.Semaphore(2)创建了一个信号量对象,限制同时执行的任务数为2。在fetch函数中,使用async with semaphore来获取信号量,确保同一时间只有2个任务可以执行。

  1. 使用Limiter限制并发数(httpx)
import asyncio
import httpx


async def fetch(url, limiter):
    async with limiter:
        async with httpx.AsyncClient() as client:
            response = await client.get(url)
            return response.text()


async def main():
    urls = ['http://example.com', 'http://example.org', 'http://example.net']
    limiter = asyncio.Semaphore(2)
    tasks = [fetch(url, limiter) for url in urls]
    results = await asyncio.gather(*tasks)
    for result in results:
        print(result)


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

同样,在httpx中我们也可以通过asyncio.Semaphore来限制并发数。

错误处理与重试

在处理HTTP请求时,可能会遇到各种错误,如网络故障、服务器错误等。合理的错误处理和重试机制可以提高程序的稳定性。

  1. 简单的错误处理与重试(aiohttp)
import asyncio
import aiohttp


async def fetch(session, url, max_retries=3):
    retries = 0
    while retries < max_retries:
        try:
            async with session.get(url) as response:
                return await response.text()
        except aiohttp.ClientError as e:
            print(f'Error occurred: {e}, retrying...')
            retries += 1
            await asyncio.sleep(1)
    print('Max retries reached, giving up.')


async def main():
    async with aiohttp.ClientSession() as session:
        await fetch(session, 'http://example.com')


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

在这个例子中,我们定义了一个fetch函数,它会在遇到aiohttp.ClientError异常时进行重试,最多重试3次。每次重试之间等待1秒钟。

  1. 更复杂的错误处理与重试(httpx)
import asyncio
import httpx


async def fetch(url, max_retries=3):
    retries = 0
    while retries < max_retries:
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(url)
                response.raise_for_status()
                return response.text()
        except (httpx.RequestError, httpx.HTTPStatusError) as e:
            print(f'Error occurred: {e}, retrying...')
            retries += 1
            await asyncio.sleep(1)
    print('Max retries reached, giving up.')


async def main():
    await fetch('http://example.com')


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

httpx中,我们不仅处理了httpx.RequestError,还通过response.raise_for_status()处理了HTTP状态码错误(如404、500等)。同样进行最多3次重试。

与其他异步框架集成

在实际项目中,我们可能需要将异步HTTP请求处理与其他异步框架集成,例如FastAPITornado等。

与FastAPI集成

FastAPI是一个基于Python的快速Web框架,它支持异步编程。我们可以在FastAPI应用中使用aiohttphttpx进行异步HTTP请求。

  1. 使用aiohttp在FastAPI中发起请求
from fastapi import FastAPI
import aiohttp


app = FastAPI()


async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()


@app.get('/')
async def root():
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, 'http://example.com')
        return {'message': html}


在这个例子中,我们在FastAPI的路由函数root中使用aiohttp发起异步HTTP请求,并将响应内容作为JSON数据返回。

  1. 使用httpx在FastAPI中发起请求
from fastapi import FastAPI
import httpx


app = FastAPI()


async def fetch(url):
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return response.text()


@app.get('/')
async def root():
    html = await fetch('http://example.com')
    return {'message': html}


同样,这里使用httpxFastAPI应用中发起异步HTTP请求。

与Tornado集成

Tornado是一个高性能的Python Web框架,它也支持异步编程。我们可以在Tornado应用中使用aiohttphttpx进行异步HTTP请求。

  1. 使用aiohttp在Tornado中发起请求
import tornado.ioloop
import tornado.web
import asyncio
import aiohttp


async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()


class MainHandler(tornado.web.RequestHandler):
    async def get(self):
        async with aiohttp.ClientSession() as session:
            html = await fetch(session, 'http://example.com')
            self.write({'message': html})


def make_app():
    return tornado.web.Application([
        (r"/", MainHandler),
    ])


if __name__ == "__main__":
    app = make_app()
    app.listen(8888)
    tornado.ioloop.IOLoop.current().start()


在这个Tornado应用中,我们在MainHandlerget方法中使用aiohttp发起异步HTTP请求,并将响应内容写入响应。

  1. 使用httpx在Tornado中发起请求
import tornado.ioloop
import tornado.web
import asyncio
import httpx


async def fetch(url):
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return response.text()


class MainHandler(tornado.web.RequestHandler):
    async def get(self):
        html = await fetch('http://example.com')
        self.write({'message': html})


def make_app():
    return tornado.web.Application([
        (r"/", MainHandler),
    ])


if __name__ == "__main__":
    app = make_app()
    app.listen(8888)
    tornado.ioloop.IOLoop.current().start()


这里使用httpxTornado应用中发起异步HTTP请求。

通过以上内容,我们全面地了解了Python中异步HTTP请求处理的相关知识,包括异步编程基础、常用的异步HTTP请求库、复杂场景处理、性能优化以及与其他异步框架的集成。希望这些内容能帮助你在实际项目中高效地处理异步HTTP请求。