MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Rust async/await异步代码编写

2022-01-116.1k 阅读

Rust 异步编程基础

在深入探讨 Rust 的 async/await 语法之前,我们先来回顾一下异步编程的基本概念。异步编程是一种允许程序在等待某些操作(如 I/O 操作、网络请求等)完成时,不阻塞主线程,而是可以继续执行其他任务的编程模型。这种模型在现代应用开发中至关重要,尤其是在处理高并发和 I/O 密集型任务时。

在 Rust 中,异步代码主要通过 Future 特征来表示。Future 代表一个可能尚未完成的计算,它定义了一个 poll 方法,该方法会尝试推进 Future 的执行。如果 Future 还未完成,poll 方法会返回 Poll::Pending,告诉调用者稍后再试。当 Future 完成时,poll 方法会返回 Poll::Ready,并携带计算结果。

async 函数

Rust 的 async 关键字用于定义异步函数。异步函数返回一个实现了 Future 特征的类型。例如:

async fn async_function() -> i32 {
    42
}

在这个例子中,async_function 是一个异步函数,它返回一个 i32 类型的值。虽然函数体看起来很简单,但实际上它返回的是一个实现了 Future 特征的类型。

当调用 async 函数时,并不会立即执行函数体中的代码。相反,它会返回一个 Future,这个 Future 可以被 poll 以推进其执行。例如:

use std::future::Future;

async fn async_function() -> i32 {
    42
}

fn main() {
    let future = async_function();
    assert!(future.is_pending());
}

在这个例子中,我们调用 async_function 并获取其返回的 Future。此时,Future 处于 Pending 状态,因为函数体尚未执行。

await 关键字

await 关键字用于暂停一个 async 函数的执行,直到其等待的 Future 完成。例如:

async fn inner_function() -> i32 {
    42
}

async fn outer_function() -> i32 {
    let result = inner_function().await;
    result + 1
}

outer_function 中,await 关键字暂停了函数的执行,直到 inner_function 返回的 Future 完成。一旦 inner_functionFuture 完成,await 表达式会返回 inner_function 的返回值,然后 outer_function 可以继续执行并返回最终结果。

异步任务执行

要实际执行异步任务,我们需要一个执行器(executor)。执行器负责调度和执行 Future。在 Rust 中,常用的执行器有 tokioasync - std

使用 tokio 执行异步任务

tokio 是一个基于 Rust 的异步运行时,它提供了一个强大的执行器来运行异步任务。首先,我们需要在 Cargo.toml 文件中添加 tokio 依赖:

[dependencies]
tokio = { version = "1", features = ["full"] }

然后,我们可以使用 tokio::runtime::Runtime 来运行异步任务:

use tokio::runtime::Runtime;

async fn async_function() -> i32 {
    42
}

fn main() {
    let mut runtime = Runtime::new().unwrap();
    let result = runtime.block_on(async_function());
    assert_eq!(result, 42);
}

在这个例子中,我们创建了一个 Runtime 实例,并使用 block_on 方法来运行异步函数 async_functionblock_on 方法会阻塞当前线程,直到 async_function 返回的 Future 完成。

使用 async - std 执行异步任务

async - std 也是一个流行的异步运行时。在 Cargo.toml 文件中添加依赖:

[dependencies]
async - std = "1"

使用 async - std 运行异步任务的示例如下:

use async_std::task;

async fn async_function() -> i32 {
    42
}

fn main() {
    let result = task::block_on(async_function());
    assert_eq!(result, 42);
}

这里,我们使用 async_std::task::block_on 来运行异步函数 async_function,同样会阻塞当前线程直到 Future 完成。

异步 I/O 操作

异步编程在处理 I/O 操作时非常有用。Rust 的标准库提供了一些异步 I/O 相关的类型和方法。例如,std::fs::File 类型有异步版本 tokio::fs::File(在 tokio 库中)。

异步文件读取

下面是一个使用 tokio 进行异步文件读取的示例:

use std::io::Read;
use tokio::fs::File;

async fn read_file() -> Result<String, std::io::Error> {
    let mut file = File::open("example.txt").await?;
    let mut contents = String::new();
    file.read_to_string(&mut contents).await?;
    Ok(contents)
}

在这个例子中,我们使用 tokio::fs::Fileopen 方法异步打开一个文件,然后使用 read_to_string 方法异步读取文件内容。

异步网络请求

在网络编程中,异步操作同样重要。reqwest 是一个流行的 Rust 网络请求库,它支持异步请求。首先,在 Cargo.toml 中添加依赖:

[dependencies]
reqwest = { version = "0.11", features = ["blocking", "async - rustls"] }

下面是一个异步发送 HTTP GET 请求的示例:

use reqwest;

async fn fetch_data() -> Result<String, reqwest::Error> {
    let response = reqwest::get("https://example.com").await?;
    let body = response.text().await?;
    Ok(body)
}

在这个例子中,我们使用 reqwest::get 方法异步发送一个 HTTP GET 请求,并使用 text 方法异步获取响应体。

异步流

异步流(async stream)是一种特殊的异步迭代器,它允许我们异步生成一系列值。在 Rust 中,异步流由 Stream 特征定义。

创建异步流

我们可以使用 tokio::stream::iter 来创建一个异步流。例如:

use tokio::stream::StreamExt;

async fn print_stream() {
    let stream = tokio::stream::iter(vec![1, 2, 3]);
    while let Some(value) = stream.next().await {
        println!("Value: {}", value);
    }
}

在这个例子中,我们创建了一个包含 123 的异步流,并使用 while let 循环异步迭代这个流,打印出每个值。

异步流的操作

异步流支持许多操作,如 mapfilter 等,类似于普通的迭代器。例如:

use tokio::stream::StreamExt;

async fn process_stream() {
    let stream = tokio::stream::iter(vec![1, 2, 3]);
    let processed_stream = stream
      .filter(|&x| async { x % 2 == 0 })
      .map(|x| x * 2);
    while let Some(value) = processed_stream.next().await {
        println!("Processed Value: {}", value);
    }
}

在这个例子中,我们首先使用 filter 方法过滤出偶数,然后使用 map 方法将每个值乘以 2

异步错误处理

在异步编程中,错误处理同样重要。与同步代码类似,我们可以使用 Result 类型来处理异步函数中的错误。

async 函数中的错误返回

当异步函数遇到错误时,可以返回 Result 类型。例如:

use std::io::Read;
use tokio::fs::File;

async fn read_file() -> Result<String, std::io::Error> {
    let mut file = File::open("nonexistent.txt").await?;
    let mut contents = String::new();
    file.read_to_string(&mut contents).await?;
    Ok(contents)
}

在这个例子中,如果文件打开失败或读取失败,await 表达式会返回错误,整个异步函数也会返回错误。

处理异步错误

当调用可能返回错误的异步函数时,我们需要处理这些错误。例如:

async fn main() {
    match read_file().await {
        Ok(contents) => println!("File contents: {}", contents),
        Err(e) => println!("Error: {}", e),
    }
}

在这个例子中,我们使用 match 语句来处理 read_file 函数返回的 Result

异步闭包

异步闭包是一种特殊的闭包,它可以包含异步代码。异步闭包的语法与普通闭包类似,只是在参数列表之前加上 async 关键字。

定义异步闭包

例如:

let async_closure = async |x: i32| -> i32 {
    x * 2
};

在这个例子中,我们定义了一个异步闭包 async_closure,它接受一个 i32 类型的参数,并返回该参数的两倍。

使用异步闭包

我们可以像使用普通闭包一样使用异步闭包。例如:

use tokio::runtime::Runtime;

let async_closure = async |x: i32| -> i32 {
    x * 2
};

fn main() {
    let mut runtime = Runtime::new().unwrap();
    let result = runtime.block_on(async_closure(42));
    assert_eq!(result, 84);
}

在这个例子中,我们使用 tokio 的执行器来运行异步闭包,并得到结果。

异步互斥锁

在多线程异步编程中,我们可能需要保护共享资源。异步互斥锁(async mutex)可以帮助我们实现这一点。在 tokio 库中,提供了 tokio::sync::Mutex 类型。

使用异步互斥锁

例如:

use tokio::sync::Mutex;

async fn access_shared_resource() {
    let shared_resource = Mutex::new(0);
    {
        let mut lock = shared_resource.lock().await;
        *lock += 1;
    }
    let lock = shared_resource.lock().await;
    assert_eq!(*lock, 1);
}

在这个例子中,我们使用 tokio::sync::Mutex 来保护一个共享的整数资源。lock 方法返回一个 Future,我们使用 await 来获取锁。在持有锁的期间,我们可以安全地访问和修改共享资源。

异步通道

异步通道(async channel)是一种在异步任务之间传递数据的方式。在 tokio 库中,有 mpsc(multiple - producer, single - consumer)和 oneshot 两种类型的异步通道。

mpsc 异步通道

mpsc 通道允许多个生产者向单个消费者发送数据。例如:

use tokio::sync::mpsc;

async fn producer(sender: mpsc::Sender<i32>) {
    for i in 0..10 {
        sender.send(i).await.unwrap();
    }
}

async fn consumer(receiver: mpsc::Receiver<i32>) {
    while let Some(value) = receiver.recv().await {
        println!("Received: {}", value);
    }
}

async fn main() {
    let (sender, receiver) = mpsc::channel(10);
    let producer_task = tokio::spawn(producer(sender));
    let consumer_task = tokio::spawn(consumer(receiver));
    producer_task.await.unwrap();
    consumer_task.await.unwrap();
}

在这个例子中,我们创建了一个 mpsc 通道,一个生产者任务向通道发送 09 的整数,一个消费者任务从通道接收并打印这些整数。

oneshot 异步通道

oneshot 通道只允许发送一个值,并且只有一个接收者。例如:

use tokio::sync::oneshot;

async fn sender(sender: oneshot::Sender<i32>) {
    sender.send(42).unwrap();
}

async fn receiver(receiver: oneshot::Receiver<i32>) {
    let result = receiver.await.unwrap();
    assert_eq!(result, 42);
}

async fn main() {
    let (sender, receiver) = oneshot::channel();
    let sender_task = tokio::spawn(sender(sender));
    let receiver_task = tokio::spawn(receiver(receiver));
    sender_task.await.unwrap();
    receiver_task.await.unwrap();
}

在这个例子中,我们创建了一个 oneshot 通道,发送者任务向通道发送值 42,接收者任务从通道接收并验证这个值。

异步代码的性能优化

在编写异步代码时,性能优化是一个重要的考虑因素。以下是一些优化异步代码性能的方法:

减少不必要的 await

避免在 async 函数中进行过多不必要的 await。每次 await 都会暂停函数的执行,导致上下文切换。例如,如果一些计算可以在不等待外部操作的情况下完成,尽量在 await 之前完成这些计算。

合理使用异步任务数量

在使用执行器运行多个异步任务时,要合理控制任务的数量。过多的任务可能会导致资源竞争和上下文切换开销增大。可以根据系统资源和任务特性来调整任务数量。

使用合适的异步库

不同的异步库在性能上可能会有差异。在选择异步库时,要根据项目的需求和性能测试结果来选择。例如,tokioasync - std 在不同场景下可能各有优劣。

异步代码与同步代码的交互

在实际项目中,异步代码和同步代码可能需要相互调用。这就需要一些方法来实现两者之间的交互。

从同步代码调用异步代码

我们可以使用执行器在同步代码中运行异步代码。例如,使用 tokioRuntime::block_on 方法:

use tokio::runtime::Runtime;

async fn async_function() -> i32 {
    42
}

fn main() {
    let mut runtime = Runtime::new().unwrap();
    let result = runtime.block_on(async_function());
    assert_eq!(result, 42);
}

在这个例子中,我们在同步的 main 函数中使用 Runtime::block_on 来运行异步函数 async_function

从异步代码调用同步代码

在异步代码中调用同步代码相对简单,直接调用即可。但是需要注意的是,如果同步代码执行时间较长,可能会阻塞异步执行器的线程,影响整体的异步性能。例如:

fn sync_function() -> i32 {
    42
}

async fn async_function() {
    let result = sync_function();
    println!("Sync result: {}", result);
}

在这个例子中,异步函数 async_function 中直接调用了同步函数 sync_function

深入理解 async/await 的实现原理

Rust 的 async/await 语法糖背后涉及到一系列复杂的机制,包括 FuturePollGenerator 等概念。

FuturePoll

Future 特征定义了异步计算的抽象。它的 poll 方法由执行器调用,用于推进 Future 的执行。当 poll 方法返回 Poll::Pending 时,执行器会在适当的时候再次调用 poll。当返回 Poll::Ready 时,Future 完成,携带计算结果。

Generator

async 函数实际上被编译成了 GeneratorGenerator 是一种可以暂停和恢复执行的函数。async 函数中的 await 表达式会暂停 Generator 的执行,直到其等待的 Future 完成。当 Future 完成时,Generator 会恢复执行。

状态机转换

async 函数的执行过程可以看作是一个状态机的转换。每次遇到 await 时,状态机会暂停,保存当前的执行状态。当 awaitFuture 完成时,状态机从保存的状态恢复并继续执行。

总结

Rust 的 async/await 语法为异步编程提供了一种简洁、高效的方式。通过深入理解异步编程的基本概念、async 函数、await 关键字、异步任务执行、异步 I/O 操作、异步流、错误处理、异步闭包、异步互斥锁、异步通道、性能优化以及异步与同步代码的交互等方面,开发者可以编写出高质量、高性能的异步 Rust 代码。同时,了解 async/await 的实现原理也有助于我们更好地优化和调试异步代码。在实际项目中,根据具体需求选择合适的异步库和执行器,合理设计异步架构,能够充分发挥异步编程的优势,提升应用的性能和响应能力。