MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

C++多线程编程中同步机制的实现

2024-02-272.5k 阅读

C++ 多线程编程中同步机制的实现

多线程编程基础与同步机制的重要性

在现代计算机编程中,多线程编程已成为提高程序性能和响应性的重要手段。C++ 作为一种强大的编程语言,自 C++11 起引入了对多线程编程的支持,极大地方便了开发者编写并发程序。

多线程编程允许程序同时执行多个任务,这些任务可以共享程序的资源,如内存空间和文件句柄等。然而,这种资源共享也带来了一系列问题,其中最主要的就是数据竞争(Data Race)。当多个线程同时访问和修改共享数据时,如果没有适当的同步措施,就会导致程序出现未定义行为,结果可能是不可预测的,如程序崩溃、数据损坏等。

同步机制(Synchronization Mechanisms)就是为了解决这些问题而设计的。它们确保在同一时刻只有一个线程能够访问共享资源,或者协调多个线程对共享资源的访问顺序,从而避免数据竞争,保证程序的正确性。

C++ 多线程库简介

C++11 引入的 <thread> 头文件提供了多线程编程的基础支持,包括 std::thread 类用于创建和管理线程。同时,<mutex> 头文件引入了互斥锁(Mutex,即 Mutual Exclusion 的缩写),这是最基本的同步工具。<condition_variable> 头文件提供了条件变量,用于线程间的复杂同步,<future> 头文件则用于处理异步任务的结果。

互斥锁(Mutex)

基本概念

互斥锁是一种二元信号量,它只有两种状态:锁定(locked)和解锁(unlocked)。当一个线程获取(lock)了互斥锁,它就进入了锁定状态,其他线程试图获取该互斥锁时就会被阻塞,直到该线程释放(unlock)互斥锁。

代码示例

#include <iostream>
#include <thread>
#include <mutex>

std::mutex mtx;
int shared_variable = 0;

void increment() {
    for (int i = 0; i < 1000000; ++i) {
        mtx.lock();
        ++shared_variable;
        mtx.unlock();
    }
}

int main() {
    std::thread t1(increment);
    std::thread t2(increment);

    t1.join();
    t2.join();

    std::cout << "Final value of shared_variable: " << shared_variable << std::endl;
    return 0;
}

在上述代码中,std::mutex mtx 定义了一个互斥锁。increment 函数试图对 shared_variable 进行 100 万次的递增操作。在每次操作前,线程通过 mtx.lock() 获取互斥锁,操作完成后通过 mtx.unlock() 释放互斥锁。这样,当一个线程在修改 shared_variable 时,其他线程无法同时进行操作,从而避免了数据竞争。

注意事项

  1. 死锁(Deadlock):如果一个线程获取了互斥锁但没有释放,或者多个线程相互等待对方释放锁,就会发生死锁。例如:
std::mutex mtx1, mtx2;

void thread1() {
    mtx1.lock();
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    mtx2.lock();
    // 执行一些操作
    mtx2.unlock();
    mtx1.unlock();
}

void thread2() {
    mtx2.lock();
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    mtx1.lock();
    // 执行一些操作
    mtx1.unlock();
    mtx2.unlock();
}

在上述代码中,thread1 先获取 mtx1,然后尝试获取 mtx2,而 thread2 先获取 mtx2,然后尝试获取 mtx1。如果 thread1 先获取 mtx1 并睡眠 100 毫秒,thread2 在这期间获取 mtx2,当它们尝试获取对方持有的锁时,就会发生死锁。 2. 性能问题:虽然互斥锁能保证数据的一致性,但过多地使用互斥锁会导致线程频繁地等待,从而降低程序的性能。因此,在设计程序时,应尽量减少锁的粒度和持有锁的时间。

锁的 RAII 封装:std::lock_guard 和 std::unique_lock

std::lock_guard

std::lock_guard 是 C++ 标准库提供的一个基于 RAII(Resource Acquisition Is Initialization)机制的互斥锁封装类。它在构造时自动锁定互斥锁,在析构时自动解锁互斥锁。

代码示例

#include <iostream>
#include <thread>
#include <mutex>

std::mutex mtx;
int shared_variable = 0;

void increment() {
    std::lock_guard<std::mutex> lock(mtx);
    for (int i = 0; i < 1000000; ++i) {
        ++shared_variable;
    }
}

int main() {
    std::thread t1(increment);
    std::thread t2(increment);

    t1.join();
    t2.join();

    std::cout << "Final value of shared_variable: " << shared_variable << std::endl;
    return 0;
}

在上述代码中,std::lock_guard<std::mutex> lock(mtx) 在进入 increment 函数时自动锁定 mtx,当函数结束,lock 被销毁时自动解锁 mtx。这种方式比手动调用 lockunlock 更加安全,因为即使函数在执行过程中抛出异常,lock_guard 的析构函数依然会被调用,从而确保互斥锁被正确释放。

std::unique_lock

std::unique_lock 同样基于 RAII 机制,但它比 std::lock_guard 更加灵活。std::unique_lock 可以延迟锁定、在运行时锁定和解锁,还可以与条件变量一起使用。

代码示例

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

std::mutex mtx;
std::condition_variable cv;
bool ready = false;

void print_id(int id) {
    std::unique_lock<std::mutex> lock(mtx);
    while (!ready) cv.wait(lock);
    std::cout << "thread " << id << '\n';
}

void go() {
    std::unique_lock<std::mutex> lock(mtx);
    ready = true;
    std::cout << "go\n";
    cv.notify_all();
}

int main() {
    std::thread threads[10];
    for (int i = 0; i < 10; ++i)
        threads[i] = std::thread(print_id, i);

    std::cout << "10 threads ready to race...\n";
    go();

    for (auto& th : threads) th.join();

    return 0;
}

在上述代码中,std::unique_lock<std::mutex> lock(mtx) 定义了一个 unique_lockprint_id 函数中的 cv.wait(lock) 表示线程在等待条件变量 cv 被通知,同时会自动释放 mtx 锁,避免死锁。当 go 函数中的 cv.notify_all() 被调用时,等待在 cv 上的所有线程会被唤醒,重新获取 mtx 锁并继续执行。

读写锁(Read - Write Lock)

基本概念

读写锁是一种特殊的同步机制,它区分了读操作和写操作。允许多个线程同时进行读操作,因为读操作不会修改共享数据,不会导致数据竞争。但是,当有一个线程进行写操作时,其他线程无论是读还是写都必须等待,直到写操作完成。

C++ 中的实现

在 C++ 标准库中,<shared_mutex> 头文件提供了读写锁的支持,包括 std::shared_mutexstd::shared_timed_mutexstd::shared_lock 用于获取共享锁(读锁),std::unique_lock 用于获取独占锁(写锁)。

代码示例

#include <iostream>
#include <thread>
#include <shared_mutex>
#include <vector>

std::shared_mutex rw_mutex;
int shared_data = 0;

void read(int id) {
    std::shared_lock<std::shared_mutex> lock(rw_mutex);
    std::cout << "Thread " << id << " reads data: " << shared_data << std::endl;
}

void write(int id, int value) {
    std::unique_lock<std::shared_mutex> lock(rw_mutex);
    shared_data = value;
    std::cout << "Thread " << id << " writes data: " << shared_data << std::endl;
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 5; ++i) {
        if (i % 2 == 0) {
            threads.emplace_back(write, i, i * 10);
        } else {
            threads.emplace_back(read, i);
        }
    }

    for (auto& th : threads) th.join();

    return 0;
}

在上述代码中,read 函数使用 std::shared_lock 获取读锁,允许多个线程同时读取 shared_datawrite 函数使用 std::unique_lock 获取写锁,确保在写操作时没有其他线程可以访问 shared_data

条件变量(Condition Variable)

基本概念

条件变量是一种线程同步机制,它允许线程等待某个条件满足。一个线程可以在条件变量上等待,而另一个线程可以通知这个条件变量,从而唤醒等待的线程。条件变量通常与互斥锁一起使用。

代码示例

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>

std::mutex mtx;
std::condition_variable cv;
std::queue<int> data_queue;
bool finished = false;

void producer() {
    for (int i = 0; i < 10; ++i) {
        std::unique_lock<std::mutex> lock(mtx);
        data_queue.push(i);
        std::cout << "Produced: " << i << std::endl;
        lock.unlock();
        cv.notify_one();
    }

    std::unique_lock<std::mutex> lock(mtx);
    finished = true;
    cv.notify_all();
}

void consumer() {
    while (true) {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [] { return!data_queue.empty() || finished; });
        if (data_queue.empty() && finished) break;
        int value = data_queue.front();
        data_queue.pop();
        std::cout << "Consumed: " << value << std::endl;
    }
}

int main() {
    std::thread producer_thread(producer);
    std::thread consumer_thread(consumer);

    producer_thread.join();
    consumer_thread.join();

    return 0;
}

在上述代码中,producer 函数将数据放入队列 data_queue 并通知条件变量 cvconsumer 函数在条件变量 cv 上等待,当队列不为空或者 finishedtrue 时被唤醒。cv.wait(lock, [] { return!data_queue.empty() || finished; }); 中的第二个参数是一个谓词(Predicate),它确保只有在满足条件时才会唤醒线程,避免了虚假唤醒(Spurious Wakeup)。

信号量(Semaphore)

基本概念

信号量是一个计数器,它可以用来控制同时访问某个资源的线程数量。当一个线程获取信号量时,计数器减 1;当一个线程释放信号量时,计数器加 1。如果计数器为 0,则获取信号量的线程会被阻塞,直到有其他线程释放信号量。

C++ 中的实现

虽然 C++ 标准库没有直接提供信号量的实现,但可以通过 std::mutexstd::condition_variable 来模拟信号量。

代码示例

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

class Semaphore {
public:
    Semaphore(unsigned int count = 0) : count(count) {}

    void acquire() {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [this] { return count > 0; });
        --count;
    }

    void release() {
        std::unique_lock<std::mutex> lock(mtx);
        ++count;
        cv.notify_one();
    }

private:
    std::mutex mtx;
    std::condition_variable cv;
    unsigned int count;
};

Semaphore sem(1);

void critical_section(int id) {
    sem.acquire();
    std::cout << "Thread " << id << " entered critical section." << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Thread " << id << " left critical section." << std::endl;
    sem.release();
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 5; ++i) {
        threads.emplace_back(critical_section, i);
    }

    for (auto& th : threads) th.join();

    return 0;
}

在上述代码中,Semaphore 类模拟了一个信号量。acquire 方法获取信号量,release 方法释放信号量。critical_section 函数在进入临界区前获取信号量,离开临界区时释放信号量,从而控制同时进入临界区的线程数量。

屏障(Barrier)

基本概念

屏障是一种同步机制,它可以让多个线程在某个点上等待,直到所有线程都到达这个点,然后它们再继续执行。这在需要多个线程协同完成某些任务,并且需要在某个阶段进行同步的场景中非常有用。

C++ 中的实现

同样,C++ 标准库没有直接提供屏障的实现,但可以通过 std::mutexstd::condition_variable 和一个计数器来实现。

代码示例

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

class Barrier {
public:
    Barrier(int count) : count(count), arrived(0) {}

    void wait() {
        std::unique_lock<std::mutex> lock(mtx);
        ++arrived;
        if (arrived == count) {
            arrived = 0;
            cv.notify_all();
        } else {
            cv.wait(lock, [this] { return arrived == 0; });
        }
    }

private:
    std::mutex mtx;
    std::condition_variable cv;
    int count;
    int arrived;
};

Barrier barrier(5);

void thread_function(int id) {
    std::cout << "Thread " << id << " is working." << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(id));
    std::cout << "Thread " << id << " is reaching the barrier." << std::endl;
    barrier.wait();
    std::cout << "Thread " << id << " passed the barrier." << std::endl;
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 5; ++i) {
        threads.emplace_back(thread_function, i);
    }

    for (auto& th : threads) th.join();

    return 0;
}

在上述代码中,Barrier 类实现了一个屏障。每个线程在执行 barrier.wait() 时会等待,直到所有 count 个线程都调用了 wait。当所有线程都到达时,屏障被重置,所有线程被唤醒继续执行。

总结

在 C++ 多线程编程中,同步机制是确保程序正确性和性能的关键。从最基本的互斥锁到复杂的条件变量、信号量、屏障等,每种同步机制都有其适用场景。开发者需要根据具体的需求,合理选择和使用这些同步机制,避免数据竞争和死锁等问题,同时优化程序的性能。通过对这些同步机制的深入理解和实践,能够编写出高效、可靠的多线程 C++ 程序。