MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Rust 进度报告的原子方案

2022-05-156.9k 阅读

Rust 中的原子操作概述

在多线程编程领域,原子操作是一种不可分割的操作,在执行过程中不会被其他线程干扰。Rust 通过 std::sync::atomic 模块提供了对原子类型和操作的支持,这对于编写高效且线程安全的代码至关重要。

原子类型保证了在多线程环境下对其值的修改和读取是原子性的。例如,AtomicI32 类型用于对 32 位整数进行原子操作。

基本原子类型

  1. AtomicBool:用于布尔值的原子操作。它提供了 store 方法来设置值,load 方法来获取值,并且这些操作都是原子的。
use std::sync::atomic::{AtomicBool, Ordering};

fn main() {
    let flag = AtomicBool::new(false);
    flag.store(true, Ordering::SeqCst);
    assert!(flag.load(Ordering::SeqCst));
}

在上述代码中,首先创建了一个 AtomicBool 实例 flag 并初始化为 false。然后使用 store 方法将其设置为 true,最后通过 load 方法获取值并进行断言。Ordering::SeqCst 是一种内存序,它保证了操作在所有线程间的顺序一致性。

  1. AtomicI*AtomicU*:分别用于有符号和无符号整数的原子操作。以 AtomicI32 为例,除了 storeload 方法外,还提供了 fetch_addfetch_sub 等算术操作。
use std::sync::atomic::{AtomicI32, Ordering};

fn main() {
    let counter = AtomicI32::new(0);
    let result = counter.fetch_add(1, Ordering::Relaxed);
    assert_eq!(result, 0);
    assert_eq!(counter.load(Ordering::Relaxed), 1);
}

这里创建了一个 AtomicI32 类型的 counter 并初始化为 0。调用 fetch_add 方法将其值加 1,该方法返回操作前的值,所以 result 为 0,而此时 counter 的值变为 1。

内存序

内存序(Memory Ordering)在原子操作中起着关键作用。它定义了原子操作与其他内存访问操作之间的顺序关系。Rust 提供了几种不同的内存序:

  1. SeqCst(顺序一致性):这是最严格的内存序。所有线程对原子操作的顺序是一致的,就好像所有线程的操作按照一个全局的顺序依次执行。这保证了最强的一致性,但通常也是性能开销最大的。
  2. AcquireReleaseRelease 内存序用于在存储操作时标记该操作对后续线程可见。Acquire 内存序用于在加载操作时确保在此之前的所有内存访问都已完成。
use std::sync::atomic::{AtomicI32, Ordering};
use std::thread;

fn main() {
    let data = AtomicI32::new(0);
    let ready = AtomicBool::new(false);

    let handle = thread::spawn(move || {
        data.store(42, Ordering::Release);
        ready.store(true, Ordering::Release);
    });

    while!ready.load(Ordering::Acquire) {
        thread::yield_now();
    }
    assert_eq!(data.load(Ordering::Acquire), 42);

    handle.join().unwrap();
}

在这个例子中,子线程先使用 Release 内存序存储数据到 dataready。主线程在 ready 变为 true 之前不断调用 yield_now 让出 CPU 时间片。一旦 readytrue,主线程以 Acquire 内存序加载 data,确保在读取 data 之前,子线程对 data 的存储操作已经完成。

  1. Relaxed:这是最宽松的内存序。它只保证原子操作本身的原子性,不保证与其他内存访问操作的顺序关系。适用于一些不需要严格顺序保证的场景,性能开销最小。

原子引用计数

Rust 的 std::sync::Arc 类型实现了原子引用计数。它允许多个线程安全地共享一个对象,并且引用计数的增减是原子操作。

use std::sync::Arc;
use std::thread;

fn main() {
    let shared_data = Arc::new(String::from("Hello, Rust!"));
    let handles = (0..10).map(|_| {
        let data = Arc::clone(&shared_data);
        thread::spawn(move || {
            println!("{}", data);
        })
    }).collect::<Vec<_>>();

    for handle in handles {
        handle.join().unwrap();
    }
}

在这个例子中,Arc::new 创建了一个指向 String 的共享指针 shared_data。通过 Arc::clone 方法在不同线程间共享这个指针,每个线程都可以安全地访问 shared_data 指向的数据。

原子操作在进度报告中的应用

假设我们正在编写一个多线程应用程序,需要对任务的进度进行跟踪和报告。可以使用原子类型来实现这一功能。

  1. 简单的任务进度跟踪
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

fn main() {
    let total_tasks = 100;
    let completed_tasks = AtomicUsize::new(0);

    let handles = (0..10).map(|_| {
        let tasks = &completed_tasks;
        thread::spawn(move || {
            for _ in 0..(total_tasks / 10) {
                tasks.fetch_add(1, Ordering::Relaxed);
            }
        })
    }).collect::<Vec<_>>();

    for handle in handles {
        handle.join().unwrap();
    }

    assert_eq!(completed_tasks.load(Ordering::Relaxed), total_tasks);
}

在这个示例中,创建了一个 AtomicUsize 类型的 completed_tasks 来跟踪已完成的任务数量。10 个线程并行执行任务,每个线程负责完成 total_tasks / 10 个任务,通过 fetch_add 方法原子地增加已完成任务的计数。

  1. 更复杂的进度报告 我们可以进一步扩展进度报告功能,添加任务类型、预计完成时间等信息。
use std::sync::atomic::{AtomicUsize, Ordering};
use std::collections::HashMap;
use std::thread;
use std::time::Duration;

struct TaskProgress {
    total: usize,
    completed: AtomicUsize,
    estimated_time: Duration,
}

impl TaskProgress {
    fn new(total: usize, estimated_time: Duration) -> Self {
        TaskProgress {
            total,
            completed: AtomicUsize::new(0),
            estimated_time,
        }
    }

    fn update(&self) {
        self.completed.fetch_add(1, Ordering::Relaxed);
    }

    fn progress(&self) -> f64 {
        self.completed.load(Ordering::Relaxed) as f64 / self.total as f64
    }
}

fn main() {
    let task_types = HashMap::from([
        ("type1".to_string(), TaskProgress::new(50, Duration::from_secs(10))),
        ("type2".to_string(), TaskProgress::new(30, Duration::from_secs(5))),
        ("type3".to_string(), TaskProgress::new(20, Duration::from_secs(3))),
    ]);

    let handles = task_types.keys().map(|task_type| {
        let tasks = task_types.get(task_type).unwrap();
        thread::spawn(move || {
            for _ in 0..tasks.total {
                tasks.update();
                thread::sleep(Duration::from_millis(100));
            }
        })
    }).collect::<Vec<_>>();

    for handle in handles {
        handle.join().unwrap();
    }

    for (task_type, progress) in &task_types {
        println!("Task type: {}, Progress: {:.2}%", task_type, progress.progress() * 100.0);
    }
}

在这个代码中,定义了 TaskProgress 结构体来跟踪每种任务类型的进度。每个任务类型有一个 total 表示总任务数,completed 用于原子地记录已完成的任务数,estimated_time 表示预计完成时间。update 方法用于更新完成的任务数,progress 方法用于计算当前进度。通过 HashMap 来管理不同类型的任务进度,并启动多个线程来模拟任务执行。最后打印出每种任务类型的进度。

原子操作的性能考量

虽然原子操作提供了线程安全,但不同的内存序和原子类型的操作在性能上有显著差异。

  1. 宽松内存序的性能优势Relaxed 内存序由于不需要保证操作顺序,性能通常比 SeqCst 要好。在一些对数据一致性要求不高的场景,如简单的计数器,使用 Relaxed 内存序可以获得更好的性能。
  2. 原子类型的选择:不同的原子类型在内存占用和操作性能上也有所不同。例如,AtomicU8AtomicI32 占用的内存少,在只需要处理小整数且性能敏感的场景下,选择 AtomicU8 可能更合适。

与其他并发原语的结合使用

  1. Mutex 和原子操作:有时候,需要结合 Mutex 和原子类型来实现更复杂的线程安全逻辑。例如,在需要对复杂数据结构进行线程安全访问,同时又需要对某些简单状态进行原子更新时。
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

fn main() {
    let data = Arc::new(Mutex::new(Vec::new()));
    let is_done = Arc::new(AtomicBool::new(false));

    let data_clone = Arc::clone(&data);
    let is_done_clone = Arc::clone(&is_done);
    let handle = thread::spawn(move || {
        let mut inner = data_clone.lock().unwrap();
        inner.push(42);
        is_done_clone.store(true, Ordering::Release);
    });

    while!is_done.load(Ordering::Acquire) {
        thread::yield_now();
    }

    let inner = data.lock().unwrap();
    assert_eq!(inner[0], 42);

    handle.join().unwrap();
}

在这个例子中,Mutex 用于保护 Vec 的安全访问,而 AtomicBool 用于标记任务是否完成。子线程在修改 Vec 后,通过 AtomicBool 标记任务完成,主线程等待任务完成后再访问 Vec

  1. 条件变量和原子操作:条件变量(Condvar)可以与原子类型结合,实现更高效的线程间同步。
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Condvar;
use std::thread;

fn main() {
    let data = Arc::new((Mutex::new(0), Condvar::new()));
    let is_ready = Arc::new(AtomicBool::new(false));

    let data_clone = Arc::clone(&data);
    let is_ready_clone = Arc::clone(&is_ready);
    let handle = thread::spawn(move || {
        let (lock, cvar) = &*data_clone;
        let mut value = lock.lock().unwrap();
        *value = 42;
        is_ready_clone.store(true, Ordering::Release);
        cvar.notify_one();
    });

    let (lock, cvar) = &*data;
    let mut value = lock.lock().unwrap();
    while!is_ready.load(Ordering::Acquire) {
        value = cvar.wait(value).unwrap();
    }
    assert_eq!(*value, 42);

    handle.join().unwrap();
}

这里,AtomicBool 用于标记数据是否准备好,条件变量 Condvar 用于在数据准备好时通知等待的线程。子线程在更新数据后,设置 AtomicBool 并通过 Condvar 通知主线程,主线程在等待数据准备好的过程中可以进入睡眠状态,提高了效率。

原子操作的错误处理

在 Rust 的原子操作中,通常不会像文件操作或网络操作那样返回 Result 类型来处理错误。因为原子操作本身的失败概率极低,并且在现代硬件上,原子操作通常是由硬件指令直接支持的。

然而,如果在使用原子操作时结合了其他可能失败的操作,那么就需要适当的错误处理。例如,在原子操作前进行资源分配时,如果资源分配失败,就需要处理这个错误情况。

use std::sync::atomic::{AtomicI32, Ordering};
use std::fs::File;
use std::io::Write;

fn main() {
    let counter = AtomicI32::new(0);
    let file_result = File::create("output.txt");
    match file_result {
        Ok(mut file) => {
            counter.fetch_add(1, Ordering::Relaxed);
            let write_result = file.write_all(b"Some data");
            match write_result {
                Ok(_) => {}
                Err(e) => {
                    eprintln!("Write error: {}", e);
                }
            }
        }
        Err(e) => {
            eprintln!("File creation error: {}", e);
        }
    }
}

在这个例子中,首先尝试创建文件,如果文件创建成功,才对原子计数器进行操作并写入数据。如果文件创建或写入数据过程中出现错误,进行相应的错误处理。

原子操作的高级应用

  1. 无锁数据结构:利用原子操作可以实现无锁数据结构,避免了锁带来的性能开销和死锁问题。例如,无锁队列可以通过原子指针操作来实现。
use std::sync::atomic::{AtomicPtr, Ordering};
use std::mem;

struct Node<T> {
    data: T,
    next: AtomicPtr<Node<T>>,
}

impl<T> Node<T> {
    fn new(data: T) -> *mut Node<T> {
        Box::into_raw(Box::new(Node {
            data,
            next: AtomicPtr::new(std::ptr::null_mut()),
        }))
    }
}

struct LockFreeQueue<T> {
    head: AtomicPtr<Node<T>>,
    tail: AtomicPtr<Node<T>>,
}

impl<T> LockFreeQueue<T> {
    fn new() -> Self {
        let sentinel = Node::new(T::default());
        LockFreeQueue {
            head: AtomicPtr::new(sentinel),
            tail: AtomicPtr::new(sentinel),
        }
    }

    fn enqueue(&self, data: T) {
        let new_node = Node::new(data);
        loop {
            let tail = self.tail.load(Ordering::Acquire);
            let next = unsafe { (*tail).next.load(Ordering::Acquire) };
            if tail == self.tail.load(Ordering::Acquire) {
                if next.is_null() {
                    if unsafe { (*tail).next.compare_and_swap(
                        std::ptr::null_mut(),
                        new_node,
                        Ordering::Release
                    ) }.is_null() {
                        break;
                    }
                } else {
                    self.tail.compare_and_swap(
                        tail,
                        next,
                        Ordering::Release
                    );
                }
            }
        }
        self.tail.compare_and_swap(
            self.tail.load(Ordering::Acquire),
            new_node,
            Ordering::Release
        );
    }

    fn dequeue(&self) -> Option<T> {
        loop {
            let head = self.head.load(Ordering::Acquire);
            let tail = self.tail.load(Ordering::Acquire);
            let next = unsafe { (*head).next.load(Ordering::Acquire) };
            if head == self.head.load(Ordering::Acquire) {
                if head == tail {
                    if next.is_null() {
                        return None;
                    }
                    self.tail.compare_and_swap(
                        tail,
                        next,
                        Ordering::Release
                    );
                } else {
                    let data = unsafe { (*next).data.clone() };
                    if self.head.compare_and_swap(
                        head,
                        next,
                        Ordering::Release
                    ) == head {
                        unsafe {
                            mem::drop(Box::from_raw(head));
                        }
                        return Some(data);
                    }
                }
            }
        }
    }
}

fn main() {
    let queue = LockFreeQueue::<i32>::new();
    queue.enqueue(10);
    queue.enqueue(20);
    assert_eq!(queue.dequeue(), Some(10));
    assert_eq!(queue.dequeue(), Some(20));
    assert_eq!(queue.dequeue(), None);
}

这个无锁队列通过 AtomicPtr 来管理节点的指针,利用 compare_and_swap 原子操作来实现线程安全的入队和出队操作。

  1. 原子操作与缓存一致性:在多处理器系统中,原子操作与缓存一致性密切相关。不同处理器核心可能有自己的缓存,原子操作需要确保数据在各个缓存间的一致性。Rust 的原子操作通过内存序来帮助开发者控制这种一致性。例如,使用 SeqCst 内存序可以确保所有处理器核心看到的原子操作顺序是一致的,从而保证数据的强一致性。在编写高性能、多线程的 Rust 程序时,理解和正确使用内存序对于充分利用硬件特性、提高程序性能至关重要。

总结原子操作在 Rust 中的应用

原子操作在 Rust 的多线程编程中是一个强大的工具。通过合理使用原子类型和内存序,可以实现高效、线程安全的代码。无论是简单的计数器,还是复杂的无锁数据结构,原子操作都能发挥重要作用。在实际应用中,需要根据具体的需求和性能要求,选择合适的原子类型和内存序,同时要注意与其他并发原语的结合使用,以构建健壮的多线程应用程序。并且,在涉及到原子操作与其他可能失败的操作结合时,要进行适当的错误处理。随着硬件技术的不断发展,原子操作的性能和功能也将不断提升,Rust 开发者需要持续关注并利用这些特性来优化自己的代码。