MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Rust并发编程中的任务调度策略

2023-12-122.7k 阅读

Rust 并发编程基础回顾

在深入探讨 Rust 并发编程中的任务调度策略之前,我们先来简要回顾一下 Rust 并发编程的基础概念。Rust 的并发编程主要依赖于 std::thread 模块和 async/await 语法糖。

线程模型

Rust 的线程模型基于操作系统线程(1:1 线程模型),通过 std::thread::spawn 函数可以创建新线程。例如:

use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        println!("This is a new thread!");
    });
    handle.join().unwrap();
    println!("Back to the main thread.");
}

在上述代码中,thread::spawn 创建了一个新线程,线程执行的闭包打印一条消息。handle.join() 方法会阻塞主线程,直到新线程执行完毕。

async/await 异步编程

Rust 的 async/await 语法提供了一种更高级的异步编程方式,它基于 Future、Executor 和 Task 概念。一个 async 函数返回一个 Futureawait 关键字用于暂停当前 Future 的执行,直到被等待的 Future 完成。例如:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct MyFuture;

impl Future for MyFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        println!("Polling MyFuture");
        Poll::Ready(())
    }
}

async fn async_function() {
    println!("Inside async function");
    let _result = MyFuture.await;
    println!("Finished awaiting MyFuture");
}

fn main() {
    let mut task = async_function();
    loop {
        match task.as_mut().poll(&mut std::task::Context::from_waker(&std::task::noop_waker())) {
            Poll::Ready(_) => break,
            Poll::Pending => println!("Still pending"),
        }
    }
}

在这段代码中,MyFuture 实现了 Future 特质,async_function 是一个异步函数,其中 awaitMyFuturemain 函数通过手动轮询 async_function 返回的 Future 来执行异步任务。

任务调度策略概述

任务调度策略决定了在并发环境中如何分配 CPU 时间给不同的任务。在 Rust 并发编程中,不同的调度策略适用于不同的场景。

抢占式调度

抢占式调度是操作系统常用的调度策略。在这种策略下,操作系统可以在任何时刻暂停当前执行的任务,转而执行其他任务。Rust 的线程模型默认使用抢占式调度,因为底层基于操作系统线程。例如:

use std::thread;
use std::time::Duration;

fn main() {
    let handle1 = thread::spawn(|| {
        for i in 1..=10 {
            println!("Thread 1: {}", i);
            thread::sleep(Duration::from_millis(100));
        }
    });

    let handle2 = thread::spawn(|| {
        for i in 1..=10 {
            println!("Thread 2: {}", i);
            thread::sleep(Duration::from_millis(100));
        }
    });

    handle1.join().unwrap();
    handle2.join().unwrap();
}

在这个例子中,两个线程并发执行,操作系统会根据其内部的调度算法(通常是基于时间片的抢占式调度)在两个线程之间切换,使得它们看起来像是同时执行。

协作式调度

协作式调度依赖于任务主动让出执行权。在 Rust 的 async/await 异步编程模型中,await 关键字就起到了协作式调度的作用。当一个异步任务执行到 await 时,它会暂停执行,并将执行权交回给调度器,调度器可以选择执行其他任务。例如:

use tokio::runtime::Runtime;

async fn task1() {
    println!("Task 1 started");
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    println!("Task 1 resumed");
}

async fn task2() {
    println!("Task 2 started");
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    println!("Task 2 resumed");
}

fn main() {
    let mut rt = Runtime::new().unwrap();
    rt.block_on(async {
        let task1_future = task1();
        let task2_future = task2();
        tokio::join!(task1_future, task2_future);
    });
}

在这个例子中,task1task2 是两个异步任务,当它们执行到 tokio::time::sleepawait 时,会暂停执行,让调度器有机会执行其他任务。tokio::join! 宏用于等待两个任务都完成。

基于线程池的任务调度

线程池是一种常见的任务调度机制,它通过复用一组固定数量的线程来执行多个任务,从而减少线程创建和销毁的开销。

标准库中的线程池

Rust 的标准库并没有直接提供线程池实现,但可以通过第三方库来使用线程池。例如,threadpool 库:

use threadpool::ThreadPool;

fn main() {
    let pool = ThreadPool::new(4).unwrap();
    for i in 1..=10 {
        let i = i;
        pool.execute(move || {
            println!("Task {} is running on a thread from the pool", i);
        });
    }
    drop(pool);
}

在上述代码中,ThreadPool::new(4) 创建了一个包含 4 个线程的线程池。pool.execute 方法将任务提交到线程池,线程池中的线程会依次执行这些任务。drop(pool) 确保所有任务执行完毕后再销毁线程池。

Tokio 的线程池

Tokio 是 Rust 中流行的异步运行时,它包含一个线程池用于执行异步任务。Tokio 的线程池基于 mio 库,提供了高效的 I/O 多路复用。例如:

use tokio::runtime::Runtime;

async fn async_task() {
    println!("Async task is running");
}

fn main() {
    let mut rt = Runtime::new().unwrap();
    rt.block_on(async {
        for _ in 1..=10 {
            rt.spawn(async_task());
        }
    });
}

在这个例子中,Runtime::new() 创建了一个 Tokio 运行时,其中包含一个线程池。rt.spawn 方法将异步任务提交到线程池中执行。

任务优先级调度

在某些场景下,需要根据任务的优先级来调度任务,确保高优先级任务优先执行。

简单优先级调度实现

可以通过自定义数据结构和排序算法来实现简单的优先级调度。例如:

use std::collections::BinaryHeap;

#[derive(Debug, Eq, PartialEq)]
struct Task {
    priority: i32,
    name: String,
}

impl Ord for Task {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        other.priority.cmp(&self.priority)
    }
}

impl PartialOrd for Task {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let mut task_heap = BinaryHeap::new();
    task_heap.push(Task { priority: 3, name: "Task C".to_string() });
    task_heap.push(Task { priority: 1, name: "Task A".to_string() });
    task_heap.push(Task { priority: 2, name: "Task B".to_string() });

    while let Some(task) = task_heap.pop() {
        println!("Executing task: {}, priority: {}", task.name, task.priority);
    }
}

在这个例子中,Task 结构体实现了 OrdPartialOrd 特质,BinaryHeap 用于根据优先级对任务进行排序。task_heap.pop() 方法会按照优先级从高到低取出任务。

基于第三方库的优先级调度

一些第三方库提供了更完善的优先级调度功能。例如,async-task-priority 库可以用于异步任务的优先级调度:

use async_task_priority::{TaskPriority, TaskQueue};

async fn high_priority_task() {
    println!("High priority task is running");
}

async fn low_priority_task() {
    println!("Low priority task is running");
}

fn main() {
    let mut queue = TaskQueue::new();
    queue.push(high_priority_task(), TaskPriority::High);
    queue.push(low_priority_task(), TaskPriority::Low);

    while let Some(task) = queue.pop() {
        tokio::runtime::Runtime::new().unwrap().block_on(task);
    }
}

在这个例子中,TaskQueue 用于管理任务,TaskPriority 定义了任务的优先级。queue.push 方法将任务和其优先级添加到队列中,queue.pop 方法按照优先级从高到低取出任务并执行。

公平调度策略

公平调度策略旨在确保每个任务都能公平地获得执行机会,避免某些任务长时间得不到执行。

轮转调度

轮转调度(Round - Robin)是一种简单的公平调度策略,它将 CPU 时间划分为固定大小的时间片,每个任务轮流执行一个时间片。在 Rust 中,可以通过线程池和时间片管理来模拟轮转调度。例如:

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

struct TaskQueue {
    tasks: Arc<Mutex<Vec<Box<dyn FnMut()>>>>,
    current_task_index: Arc<Mutex<usize>>,
}

impl TaskQueue {
    fn new() -> Self {
        TaskQueue {
            tasks: Arc::new(Mutex::new(Vec::new())),
            current_task_index: Arc::new(Mutex::new(0)),
        }
    }

    fn add_task(&mut self, task: Box<dyn FnMut()>) {
        self.tasks.lock().unwrap().push(task);
    }

    fn execute_round_robin(&mut self) {
        let tasks = Arc::clone(&self.tasks);
        let current_task_index = Arc::clone(&self.current_task_index);
        thread::spawn(move || {
            loop {
                let mut tasks = tasks.lock().unwrap();
                let mut current_index = current_task_index.lock().unwrap();
                if tasks.is_empty() {
                    break;
                }
                let task = tasks.get_mut(*current_index).unwrap();
                (**task)();
                *current_index = (*current_index + 1) % tasks.len();
                thread::sleep(Duration::from_millis(100));
            }
        });
    }
}

fn main() {
    let mut queue = TaskQueue::new();
    queue.add_task(Box::new(|| println!("Task 1")));
    queue.add_task(Box::new(|| println!("Task 2")));
    queue.add_task(Box::new(|| println!("Task 3")));
    queue.execute_round_robin();
    thread::sleep(Duration::from_secs(2));
}

在这个例子中,TaskQueue 结构体管理任务队列和当前任务索引。execute_round_robin 方法创建一个新线程,按照轮转方式执行任务队列中的任务。

公平队列调度

公平队列调度通过维护多个任务队列,每个队列对应不同的优先级或类别,并且在调度时确保每个队列都能得到一定比例的执行时间。例如,我们可以使用 async - fair - queue 库来实现公平队列调度:

use async_fair_queue::FairQueue;

async fn task1() {
    println!("Task 1 is running");
}

async fn task2() {
    println!("Task 2 is running");
}

fn main() {
    let mut queue = FairQueue::new();
    queue.push(task1(), 1);
    queue.push(task2(), 2);

    let mut runtime = tokio::runtime::Runtime::new().unwrap();
    while let Some(task) = queue.pop() {
        runtime.block_on(task);
    }
}

在这个例子中,FairQueue 用于管理任务,push 方法将任务和其权重添加到队列中。权重较高的任务在调度时会有更高的执行机会,但整体上仍然保证公平性。pop 方法按照公平调度策略取出任务并执行。

实时调度策略

实时调度策略用于满足对任务执行时间有严格要求的场景,确保关键任务能够在规定时间内完成。

软实时调度

软实时调度要求任务尽可能在规定时间内完成,但偶尔错过期限不会导致灾难性后果。在 Rust 中,可以通过结合定时器和任务调度来实现软实时调度。例如:

use std::thread;
use std::time::{Duration, Instant};

fn soft_realtime_task() {
    let start = Instant::now();
    println!("Soft real - time task started");
    // 模拟任务执行
    thread::sleep(Duration::from_millis(200));
    let elapsed = start.elapsed();
    if elapsed <= Duration::from_millis(300) {
        println!("Soft real - time task completed within deadline");
    } else {
        println!("Soft real - time task missed deadline");
    }
}

fn main() {
    let handle = thread::spawn(soft_realtime_task);
    handle.join().unwrap();
}

在这个例子中,soft_realtime_task 函数模拟一个软实时任务,通过记录任务开始时间和执行时间,判断是否在规定的 300 毫秒期限内完成。

硬实时调度

硬实时调度要求任务必须在规定时间内完成,否则会导致严重后果。Rust 本身没有直接提供硬实时调度支持,但在一些特定的实时操作系统(RTOS)上,可以结合 Rust 进行硬实时编程。例如,在 Rust - Bare - Metal 项目中,可以通过精确控制中断和任务执行时间来实现硬实时调度。以下是一个简单的示例框架(假设在支持的 RTOS 环境下):

// 假设这是一个 RTOS 相关的模块
extern crate rtos;

use rtos::{Task, TaskPriority, schedule};

fn hard_realtime_task() {
    // 任务逻辑,必须在规定时间内完成
    println!("Hard real - time task is running");
}

fn main() {
    let task = Task::new(hard_realtime_task, TaskPriority::High);
    schedule(task);
}

在这个示例中,rtos 模块是假设的实时操作系统相关模块,Task::new 创建一个硬实时任务,并指定高优先级,schedule 函数用于调度任务执行,确保任务在严格的时间限制内完成。

调度策略的选择与优化

在实际应用中,选择合适的任务调度策略至关重要,并且需要对调度策略进行优化以提高系统性能。

根据应用场景选择调度策略

  1. I/O 密集型应用:对于 I/O 密集型应用,协作式调度(如 async/await)通常更合适,因为任务在等待 I/O 操作时可以主动让出执行权,提高系统整体的并发效率。例如,网络爬虫程序可以使用 async/await 结合 Tokio 运行时来高效地处理多个网络请求。
  2. CPU 密集型应用:CPU 密集型应用可能更适合抢占式调度,通过线程池来充分利用多核 CPU 的性能。例如,科学计算程序可以使用标准库的线程或者基于线程池的库来并行计算。
  3. 实时应用:实时应用需要根据实时要求选择软实时或硬实时调度策略。对于对时间要求不那么苛刻的实时应用,可以使用软实时调度;对于关键任务必须严格按时完成的应用,则需要在支持的 RTOS 环境下实现硬实时调度。

优化调度策略

  1. 减少上下文切换开销:无论是抢占式调度还是协作式调度,上下文切换都有一定的开销。在设计任务时,尽量减少不必要的任务切换,例如将相关的任务合并执行,减少任务粒度。
  2. 合理设置任务优先级:在优先级调度中,合理设置任务优先级非常关键。过高的优先级可能导致低优先级任务饥饿,而过低的优先级可能导致重要任务得不到及时执行。需要根据应用的业务逻辑和性能要求来平衡优先级设置。
  3. 动态调整调度策略:在一些复杂的应用场景中,可能需要根据系统运行状态动态调整调度策略。例如,当系统负载较高时,调整任务的执行时间片或者优先级,以保证系统的整体性能和任务的公平执行。

通过合理选择和优化任务调度策略,Rust 开发者可以充分发挥并发编程的优势,打造高效、稳定的应用程序。无论是简单的多线程任务,还是复杂的异步实时应用,都能通过合适的调度策略实现更好的性能和响应性。