MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Rust线程安全与并发编程基础

2023-06-065.1k 阅读

Rust 线程模型概述

Rust 基于 std::thread 模块提供了多线程编程能力,其线程模型接近操作系统原生线程。在 Rust 中创建线程非常简单,通过 thread::spawn 函数就可以生成一个新线程并执行闭包中的代码。例如:

use std::thread;

fn main() {
    thread::spawn(|| {
        println!("这是新线程中的代码");
    });
    println!("主线程继续执行");
}

在这个例子中,thread::spawn 创建了一个新线程并执行闭包内的打印语句。主线程在新线程启动后继续执行自己的打印语句。不过要注意,主线程默认不会等待新线程执行完毕,如果主线程执行结束,整个程序就会终止,包括新线程也会被终止。为了让主线程等待新线程完成,可以使用 join 方法。

use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        println!("这是新线程中的代码");
    });
    handle.join().unwrap();
    println!("主线程等待新线程执行完毕后继续执行");
}

这里 handle.join() 会阻塞主线程,直到新线程执行完成。unwrap 用于处理可能出现的错误,如果新线程发生恐慌(panic),join 会返回一个 Errunwrap 会将这个 Err 转换为恐慌,终止程序。

线程安全与所有权

所有权转移与线程安全

Rust 的所有权系统在多线程编程中起到关键作用,确保线程安全。当我们在线程间传递数据时,数据的所有权会发生转移。例如:

use std::thread;

fn main() {
    let data = String::from("Hello, Rust");
    let handle = thread::spawn(move || {
        println!("新线程获取数据: {}", data);
    });
    handle.join().unwrap();
}

在这个例子中,move 关键字将 data 的所有权转移到了新线程的闭包中。这确保了只有新线程可以访问 data,避免了数据竞争。如果没有 move 关键字,闭包会尝试借用 data,而主线程在闭包执行期间仍然持有 data 的所有权,这会导致编译错误,因为 Rust 不允许在多线程环境下共享可变引用(可能导致数据竞争)。

共享所有权与 Arc

有时候我们需要在多个线程间共享数据的所有权,这时可以使用 Arc(原子引用计数)类型。ArcRc(引用计数)的线程安全版本。例如:

use std::sync::Arc;
use std::thread;

fn main() {
    let shared_data = Arc::new(String::from("共享数据"));
    let mut handles = vec![];
    for _ in 0..3 {
        let data = Arc::clone(&shared_data);
        let handle = thread::spawn(move || {
            println!("线程内共享数据: {}", data);
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
}

这里通过 Arc::new 创建了一个共享数据,然后在每个新线程中通过 Arc::clone 克隆 Arc,每个克隆的 Arc 指向相同的数据。由于 Arc 是线程安全的,多个线程可以安全地共享这份数据。不过,Arc 本身只是保证引用计数操作的原子性,对于共享数据的修改,还需要额外的同步机制。

同步原语

Mutex:互斥锁

Mutex(互斥锁)是 Rust 中常用的同步原语,用于保护共享数据,确保同一时间只有一个线程可以访问数据。Mutex 提供了 lock 方法来获取锁,如果锁已被其他线程持有,lock 会阻塞当前线程,直到锁可用。例如:

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let shared_data = Arc::new(Mutex::new(0));
    let mut handles = vec![];
    for _ in 0..10 {
        let data = Arc::clone(&shared_data);
        let handle = thread::spawn(move || {
            let mut num = data.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
    println!("最终数据: {}", *shared_data.lock().unwrap());
}

在这个例子中,Arc<Mutex<i32>> 用于在多个线程间共享一个可变的整数。每个线程通过 lock 方法获取锁,修改数据后,锁会自动释放(当 num 离开作用域时)。unwrap 用于处理获取锁失败的情况,在实际应用中,可能需要更优雅的错误处理方式。

RwLock:读写锁

RwLock 也是一种同步原语,它允许多个线程同时进行读操作,但只允许一个线程进行写操作。这在读取操作频繁而写入操作较少的场景下非常有用,可以提高并发性能。例如:

use std::sync::{Arc, RwLock};
use std::thread;

fn main() {
    let shared_data = Arc::new(RwLock::new(String::from("初始数据")));
    let mut handles = vec![];
    for _ in 0..3 {
        let data = Arc::clone(&shared_data);
        let handle = thread::spawn(move || {
            let read_data = data.read().unwrap();
            println!("线程读取数据: {}", read_data);
        });
        handles.push(handle);
    }
    let write_handle = thread::spawn(move || {
        let mut write_data = shared_data.write().unwrap();
        *write_data = String::from("修改后的数据");
    });
    write_handle.join().unwrap();
    for handle in handles {
        handle.join().unwrap();
    }
    println!("最终数据: {}", *shared_data.read().unwrap());
}

这里通过 Arc<RwLock<String>> 共享一个字符串。读操作通过 read 方法获取读锁,允许多个线程同时读取。写操作通过 write 方法获取写锁,此时其他线程的读写操作都会被阻塞。同样,unwrap 用于处理获取锁失败的情况。

条件变量

条件变量基础

条件变量(Condvar)用于线程间的通信和同步,它通常与 Mutex 一起使用。Condvar 允许一个线程等待某个条件满足,而其他线程可以通知等待的线程条件已经满足。例如:

use std::sync::{Arc, Condvar, Mutex};
use std::thread;

fn main() {
    let data = Arc::new((Mutex::new(false), Condvar::new()));
    let data_clone = Arc::clone(&data);
    let handle = thread::spawn(move || {
        let (lock, cvar) = &*data_clone;
        let mut ready = lock.lock().unwrap();
        while!*ready {
            ready = cvar.wait(ready).unwrap();
        }
        println!("线程被唤醒,条件已满足");
    });
    let (lock, cvar) = &*data;
    let mut ready = lock.lock().unwrap();
    *ready = true;
    cvar.notify_one();
    handle.join().unwrap();
}

在这个例子中,Arc<(Mutex<bool>, Condvar)> 用于线程间的同步。新线程通过 wait 方法等待条件变量 cvar,并在等待时释放 Mutex 锁,允许其他线程修改共享数据。主线程修改共享数据后,通过 notify_one 方法通知等待的线程,等待的线程被唤醒后重新获取 Mutex 锁,检查条件是否满足。

复杂条件变量应用

条件变量在更复杂的场景中也非常有用,比如生产者 - 消费者模型。下面是一个简单的生产者 - 消费者模型示例:

use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

fn main() {
    let shared_queue = Arc::new((Mutex::new(vec![]), Condvar::new()));
    let producer_handle = thread::spawn(move || {
        let (queue, cvar) = &*shared_queue;
        for i in 0..10 {
            let mut q = queue.lock().unwrap();
            q.push(i);
            println!("生产者生产: {}", i);
            q = cvar.notify_one().unwrap();
            std::thread::sleep(Duration::from_millis(100));
        }
    });
    let consumer_handle = thread::spawn(move || {
        let (queue, cvar) = &*shared_queue;
        loop {
            let mut q = queue.lock().unwrap();
            while q.is_empty() {
                q = cvar.wait(q).unwrap();
            }
            let item = q.pop().unwrap();
            println!("消费者消费: {}", item);
            std::thread::sleep(Duration::from_millis(200));
        }
    });
    producer_handle.join().unwrap();
    consumer_handle.join().unwrap();
}

在这个生产者 - 消费者模型中,生产者线程不断生成数据并放入共享队列,然后通知消费者线程。消费者线程在队列空时等待,收到通知后检查队列是否有数据,有数据则消费。这里的 while q.is_empty() 循环是为了防止虚假唤醒,确保消费者线程只在队列有数据时才进行消费。

通道(Channel)

基本通道操作

Rust 的通道(std::sync::mpsc)用于线程间的消息传递,它提供了一种安全的方式在不同线程间发送和接收数据。mpsc 代表多生产者单消费者(Multiple Producer, Single Consumer)。例如:

use std::sync::mpsc;
use std::thread;

fn main() {
    let (sender, receiver) = mpsc::channel();
    let sender1 = sender.clone();
    thread::spawn(move || {
        sender1.send(String::from("消息1")).unwrap();
    });
    thread::spawn(move || {
        sender.send(String::from("消息2")).unwrap();
    });
    for _ in 0..2 {
        let received = receiver.recv().unwrap();
        println!("收到消息: {}", received);
    }
}

在这个例子中,mpsc::channel() 创建了一个通道,返回一个发送者(sender)和一个接收者(receiver)。通过 sender.clone() 可以创建多个发送者,不同线程可以通过发送者向通道发送消息,接收者通过 recv 方法接收消息。recv 方法会阻塞,直到有消息可用。

通道的应用场景

通道在实际应用中有很多用途,比如构建分布式系统中的节点间通信。下面是一个简单的模拟示例:

use std::sync::mpsc;
use std::thread;

fn worker(sender: mpsc::Sender<String>) {
    let result = "工作完成".to_string();
    sender.send(result).unwrap();
}

fn main() {
    let (sender, receiver) = mpsc::channel();
    let num_workers = 3;
    for _ in 0..num_workers {
        let sender_clone = sender.clone();
        thread::spawn(move || {
            worker(sender_clone);
        });
    }
    for _ in 0..num_workers {
        let received = receiver.recv().unwrap();
        println!("收到工作结果: {}", received);
    }
}

在这个例子中,worker 函数模拟一个工作线程,完成工作后通过通道将结果发送回主线程。主线程创建多个工作线程,并接收它们的工作结果。通道提供了一种清晰的方式来组织线程间的异步通信。

线程局部存储(TLS)

TLS 概念

线程局部存储(TLS)允许每个线程拥有自己独立的数据副本。在 Rust 中,可以通过 thread_local! 宏来实现 TLS。例如:

thread_local! {
    static LOCAL_DATA: u32 = 0;
}

fn main() {
    let handle = std::thread::spawn(|| {
        LOCAL_DATA.with(|data| {
            *data.borrow_mut() += 1;
            println!("新线程中的局部数据: {}", *data.borrow());
        });
    });
    LOCAL_DATA.with(|data| {
        *data.borrow_mut() += 2;
        println!("主线程中的局部数据: {}", *data.borrow());
    });
    handle.join().unwrap();
}

在这个例子中,thread_local! 定义了一个线程局部变量 LOCAL_DATA。每个线程通过 with 方法访问和修改自己的局部数据副本。主线程和新线程对 LOCAL_DATA 的修改互不影响,因为它们操作的是各自的副本。

TLS 的应用场景

TLS 在很多场景下都很有用,比如每个线程需要维护自己的日志记录器。假设我们有一个简单的日志记录函数:

thread_local! {
    static LOGGER: Vec<String> = Vec::new();
}

fn log_message(message: &str) {
    LOGGER.with(|log| {
        let mut log = log.borrow_mut();
        log.push(message.to_string());
    });
}

fn main() {
    let handle = std::thread::spawn(|| {
        log_message("新线程日志消息");
        LOGGER.with(|log| {
            for msg in log.borrow().iter() {
                println!("新线程日志: {}", msg);
            }
        });
    });
    log_message("主线程日志消息");
    LOGGER.with(|log| {
        for msg in log.borrow().iter() {
            println!("主线程日志: {}", msg);
        }
    });
    handle.join().unwrap();
}

这里通过 TLS 为每个线程创建了独立的日志记录器。每个线程可以独立地记录日志,而不会干扰其他线程的日志记录。

线程安全的设计模式

单例模式与线程安全

在 Rust 中实现线程安全的单例模式可以使用 lazy_static 库结合 MutexRwLock。例如:

use std::sync::{Mutex, RwLock};
use lazy_static::lazy_static;

lazy_static! {
    static ref SINGLETON: Mutex<String> = Mutex::new(String::from("单例实例"));
}

fn main() {
    let handle1 = std::thread::spawn(|| {
        let mut singleton = SINGLETON.lock().unwrap();
        *singleton = String::from("线程1修改后的单例");
    });
    let handle2 = std::thread::spawn(|| {
        let singleton = SINGLETON.lock().unwrap();
        println!("线程2读取单例: {}", singleton);
    });
    handle1.join().unwrap();
    handle2.join().unwrap();
}

这里 lazy_static 确保 SINGLETON 只在第一次使用时初始化,并且由于使用了 Mutex,多个线程可以安全地访问和修改单例实例。如果单例主要用于读取操作,可以使用 RwLock 提高并发性能。

生产者 - 消费者模式的优化

前面提到的生产者 - 消费者模型可以进一步优化,比如使用有界队列来避免生产者过度生产导致内存占用过高。可以使用 std::sync::mpsc::sync_channel 创建有界通道:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (sender, receiver) = mpsc::sync_channel(3);
    let producer_handle = thread::spawn(move || {
        for i in 0..10 {
            match sender.send(i) {
                Ok(_) => println!("生产者生产: {}", i),
                Err(_) => println!("队列已满,无法生产"),
            }
            std::thread::sleep(Duration::from_millis(100));
        }
    });
    let consumer_handle = thread::spawn(move || {
        loop {
            match receiver.recv() {
                Ok(item) => println!("消费者消费: {}", item),
                Err(_) => break,
            }
            std::thread::sleep(Duration::from_millis(200));
        }
    });
    producer_handle.join().unwrap();
    drop(sender);
    consumer_handle.join().unwrap();
}

在这个例子中,mpsc::sync_channel(3) 创建了一个容量为 3 的有界通道。生产者在队列满时无法发送数据,避免了内存无限制增长。通过 drop(sender) 通知消费者不再有数据发送,消费者在 recv 返回 Err 时结束循环。

并发编程中的错误处理

线程恐慌处理

当线程发生恐慌(panic)时,默认情况下会导致整个程序终止。为了更好地处理线程恐慌,可以使用 thread::Builder 设置线程的恐慌策略。例如:

use std::thread;

fn main() {
    let handle = thread::Builder::new()
        .panic_handler(|panic_info| {
            eprintln!("线程发生恐慌: {:?}", panic_info);
        })
        .spawn(|| {
            panic!("故意引发恐慌");
        })
        .unwrap();
    handle.join().unwrap();
    println!("主线程继续执行");
}

这里通过 thread::Builder 设置了自定义的恐慌处理函数,当线程发生恐慌时,会打印恐慌信息,而不会导致整个程序终止。主线程可以继续执行。

通道错误处理

在通道操作中,sendrecv 方法都可能返回错误。例如,send 方法在通道关闭时会返回 Err。正确处理这些错误可以提高程序的健壮性。例如:

use std::sync::mpsc;
use std::thread;

fn main() {
    let (sender, receiver) = mpsc::channel();
    let sender_clone = sender.clone();
    let handle = thread::spawn(move || {
        drop(sender_clone);
    });
    match receiver.recv() {
        Ok(_) => println!("收到消息"),
        Err(_) => println!("通道已关闭,无法接收消息"),
    }
    handle.join().unwrap();
}

在这个例子中,通过 drop(sender_clone) 关闭了发送端,接收端在 recv 时会返回 Err,程序可以根据这个错误进行相应处理,而不是导致未定义行为。

通过深入理解 Rust 的线程安全与并发编程基础,开发者可以编写出高效、健壮且线程安全的并发程序,充分利用现代多核处理器的性能优势。无论是构建高性能的服务器应用,还是开发多线程的桌面软件,这些基础知识都是必不可少的。同时,在实际应用中,还需要根据具体的需求和场景,合理选择同步原语、设计并发模式,并妥善处理错误,以确保程序的稳定性和可靠性。