MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Rust原子操作中的比较与交换策略

2022-05-032.7k 阅读

Rust 原子操作概述

在并发编程领域,原子操作是构建线程安全数据结构和实现高效同步机制的基石。Rust 作为一门致力于在保证性能的同时提供内存安全和并发安全的编程语言,为原子操作提供了强大的支持。

原子类型位于 std::sync::atomic 模块中。这些类型提供了对底层原子操作的抽象,允许开发者在多线程环境中进行安全的读写操作。常见的原子类型包括 AtomicBoolAtomicI32AtomicUsize 等,它们分别对应不同的数据类型,并提供了一系列原子方法。

例如,使用 AtomicI32 进行简单的原子加法操作:

use std::sync::atomic::{AtomicI32, Ordering};

fn main() {
    let num = AtomicI32::new(0);
    num.fetch_add(1, Ordering::SeqCst);
    assert_eq!(num.load(Ordering::SeqCst), 1);
}

在上述代码中,fetch_add 方法以原子方式将指定的值加到 AtomicI32 实例上,并返回旧值。load 方法则以原子方式加载当前值。Ordering 参数用于指定内存顺序,这在原子操作中起着至关重要的作用,后续会详细介绍。

比较与交换(Compare and Swap,CAS)基础

比较与交换(CAS)是一种实现原子操作的核心策略。它的基本思想是:在对共享变量进行更新时,首先检查当前值是否与预期值相等,如果相等,则将其更新为新值;如果不相等,则不进行更新操作,并返回当前值。这种操作是原子的,即在多线程环境下,不会出现部分更新的情况。

在 Rust 中,Atomic 类型提供了 compare_exchangecompare_exchange_weak 方法来实现 CAS 操作。这两个方法的主要区别在于它们对 CAS 操作失败的处理方式,后续会展开讨论。

Rust 中的 compare_exchange 方法

compare_exchange 方法的签名如下:

fn compare_exchange(
    &self,
    expected: T,
    new: T,
    success: Ordering,
    failure: Ordering
) -> Result<T, T>
  • expected:预期值,即我们期望当前原子变量所具有的值。
  • new:新值,当原子变量的值与预期值相等时,将被设置为该值。
  • success:当比较和交换操作成功时使用的内存顺序。
  • failure:当比较和交换操作失败时使用的内存顺序。

该方法返回一个 ResultOk 变体包含旧值(即成功更新前的原子变量的值),Err 变体也包含旧值(表示更新失败时原子变量的当前值)。

以下是一个简单的示例,展示如何使用 compare_exchange 实现一个简单的自旋锁:

use std::sync::atomic::{AtomicBool, Ordering};

struct SpinLock {
    locked: AtomicBool,
}

impl SpinLock {
    fn new() -> Self {
        SpinLock {
            locked: AtomicBool::new(false),
        }
    }

    fn lock(&self) {
        while self.locked.compare_exchange(
            false,
            true,
            Ordering::Acquire,
            Ordering::Relaxed
        ).is_err() {
            // 自旋等待
        }
    }

    fn unlock(&self) {
        assert!(self.locked.compare_exchange(
            true,
            false,
            Ordering::Release,
            Ordering::Relaxed
        ).is_ok());
    }
}

fn main() {
    let lock = SpinLock::new();
    lock.lock();
    // 临界区代码
    lock.unlock();
}

在上述自旋锁的实现中,lock 方法通过 compare_exchange 不断尝试将 lockedfalse 设为 true。如果当前 lockedfalse(即锁未被持有),则成功获取锁并返回;否则,进入自旋等待。unlock 方法则通过 compare_exchangelockedtrue 设为 false,释放锁。

内存顺序在 compare_exchange 中的作用

内存顺序(Ordering)是原子操作中的一个关键概念,它决定了原子操作与其他内存操作之间的顺序关系。在 compare_exchange 方法中,successfailure 两个参数分别指定了操作成功和失败时的内存顺序。

  • Ordering::SeqCst(顺序一致性):这是最严格的内存顺序。使用 SeqCst 时,所有线程都以相同的顺序观察到所有 SeqCst 原子操作。这保证了全局的顺序一致性,但通常也是性能开销最大的选项。
  • Ordering::Acquire:当一个线程以 Acquire 顺序读取一个原子变量时,在这个读取操作之前的所有内存操作都对后续的读取操作可见。在 compare_exchange 成功时使用 Acquire 顺序,可以确保在获取锁(例如上述自旋锁的例子)之后,临界区内的代码能够看到之前对共享数据的所有修改。
  • Ordering::Release:与 Acquire 相对,当一个线程以 Release 顺序写入一个原子变量时,所有在这个写入操作之前的内存操作都对后续以 AcquireSeqCst 顺序读取该变量的线程可见。在 compare_exchange 成功释放锁(如自旋锁的 unlock 方法)时使用 Release 顺序,可以确保临界区内的修改对其他获取锁的线程可见。
  • Ordering::Relaxed:这是最宽松的内存顺序。Relaxed 原子操作仅保证自身的原子性,不提供任何内存顺序的保证。在 compare_exchange 失败时使用 Relaxed 顺序,因为此时我们只关心操作是否失败,而不关心与其他内存操作的顺序关系。

compare_exchange_weak 方法

compare_exchange_weak 方法与 compare_exchange 方法类似,但它在实现上可能会更高效,尤其是在多处理器系统上。其签名如下:

fn compare_exchange_weak(
    &self,
    expected: T,
    new: T,
    success: Ordering,
    failure: Ordering
) -> Result<T, T>

compare_exchange 的主要区别在于,compare_exchange_weak 可能会在预期值与当前值相等的情况下仍然失败,即所谓的“伪失败”。这是因为在某些硬件平台上,实现无锁数据结构时,CAS 操作可能会受到其他因素的干扰,导致偶尔的错误失败。

虽然 compare_exchange_weak 存在伪失败的情况,但在循环中使用时,它可以提供更好的性能。以下是一个使用 compare_exchange_weak 实现的简单计数器示例:

use std::sync::atomic::{AtomicI32, Ordering};

fn main() {
    let counter = AtomicI32::new(0);
    let mut expected = counter.load(Ordering::Relaxed);
    loop {
        let new = expected + 1;
        match counter.compare_exchange_weak(
            expected,
            new,
            Ordering::SeqCst,
            Ordering::Relaxed
        ) {
            Ok(_) => break,
            Err(val) => expected = val,
        }
    }
    assert_eq!(counter.load(Ordering::SeqCst), 1);
}

在上述代码中,通过循环不断尝试使用 compare_exchange_weak 更新计数器。如果操作失败(无论是因为预期值不相等还是伪失败),则更新 expected 为当前值并继续尝试,直到成功更新为止。

选择 compare_exchange 还是 compare_exchange_weak

在选择使用 compare_exchange 还是 compare_exchange_weak 时,需要考虑具体的应用场景和性能需求。

  • 性能敏感场景:如果应用程序对性能非常敏感,并且可以接受偶尔的伪失败,那么 compare_exchange_weak 是一个更好的选择。在多处理器系统上,它通常可以提供更高的吞吐量,因为它的实现可能利用了硬件的一些特性,减少了不必要的同步开销。例如,在实现高性能的无锁数据结构时,compare_exchange_weak 经常被使用。
  • 正确性优先场景:如果应用程序对正确性要求极高,不允许任何意外的失败情况(除了预期值不相等导致的失败),那么应该使用 compare_exchange。例如,在实现关键的同步原语(如互斥锁)时,为了确保绝对的正确性,compare_exchange 更为合适。

基于 CAS 实现更复杂的数据结构

利用比较与交换操作,可以实现各种复杂的无锁数据结构,如无锁链表、无锁队列等。以无锁链表为例,以下是一个简单的实现框架:

use std::sync::atomic::{AtomicPtr, Ordering};
use std::ptr;

struct Node {
    value: i32,
    next: AtomicPtr<Node>,
}

impl Node {
    fn new(value: i32) -> *mut Node {
        Box::into_raw(Box::new(Node {
            value,
            next: AtomicPtr::new(ptr::null_mut()),
        }))
    }
}

struct LockFreeList {
    head: AtomicPtr<Node>,
}

impl LockFreeList {
    fn new() -> Self {
        LockFreeList {
            head: AtomicPtr::new(ptr::null_mut()),
        }
    }

    fn push(&self, value: i32) {
        let new_node = Node::new(value);
        loop {
            let old_head = self.head.load(Ordering::Relaxed);
            let mut new_next = old_head;
            let result = self.head.compare_exchange_weak(
                old_head,
                new_node,
                Ordering::Release,
                Ordering::Relaxed
            );
            if result.is_ok() {
                (*new_node).next.store(new_next, Ordering::Relaxed);
                break;
            }
        }
    }

    fn pop(&self) -> Option<i32> {
        loop {
            let old_head = self.head.load(Ordering::Relaxed);
            if old_head.is_null() {
                return None;
            }
            let new_head = (*old_head).next.load(Ordering::Relaxed);
            let result = self.head.compare_exchange_weak(
                old_head,
                new_head,
                Ordering::Acquire,
                Ordering::Relaxed
            );
            if result.is_ok() {
                let value = (*old_head).value;
                Box::from_raw(old_head);
                return Some(value);
            }
        }
    }
}

fn main() {
    let list = LockFreeList::new();
    list.push(1);
    list.push(2);
    assert_eq!(list.pop(), Some(2));
    assert_eq!(list.pop(), Some(1));
    assert_eq!(list.pop(), None);
}

在上述无锁链表的实现中,push 方法通过 compare_exchange_weak 不断尝试将新节点插入到链表头部。pop 方法则通过 compare_exchange_weak 尝试删除链表头部节点,并返回其值。这种实现利用了 CAS 操作的原子性,确保在多线程环境下链表操作的正确性。

CAS 操作的局限性与替代方案

虽然比较与交换操作在并发编程中非常强大,但它也存在一些局限性。

  • ABA 问题:ABA 问题是 CAS 操作中常见的问题。当一个值从 A 变为 B,再变回 A 时,CAS 操作可能会误判为没有发生变化。例如,在无锁链表中,如果一个节点被删除并重新插入,其内存地址可能不变,但节点的状态可能已经发生了变化。为了解决 ABA 问题,可以使用版本号(如 AtomicU64 结合 AtomicPtr 实现带版本号的指针)或者其他标记机制来跟踪值的变化。
  • 性能瓶颈:在高竞争环境下,大量的 CAS 操作失败可能会导致性能瓶颈。因为失败的 CAS 操作会导致线程不断重试,消耗大量的 CPU 资源。在这种情况下,可以考虑使用传统的锁机制(如互斥锁、读写锁等),或者更高级的并发控制策略,如乐观锁与悲观锁的结合使用。

总结与展望

Rust 的原子操作中的比较与交换策略为并发编程提供了强大而灵活的工具。通过 compare_exchangecompare_exchange_weak 方法,开发者可以实现各种线程安全的数据结构和同步机制。在使用这些方法时,合理选择内存顺序和根据场景选择合适的 CAS 方法是关键。

随着硬件技术的不断发展,并发编程的需求也在不断增加。Rust 作为一门注重并发安全和性能的语言,其原子操作的支持也将不断完善。未来,我们可以期待 Rust 在原子操作领域提供更多的优化和创新,以满足日益复杂的并发编程场景的需求。同时,开发者在使用原子操作时,也需要不断深入理解其底层原理,以编写出高效、安全的并发代码。