C++多线程编程中同步机制的实现
C++ 多线程编程中同步机制的实现
多线程编程基础与同步机制的重要性
在现代计算机编程中,多线程编程已成为提高程序性能和响应性的重要手段。C++ 作为一种强大的编程语言,自 C++11 起引入了对多线程编程的支持,极大地方便了开发者编写并发程序。
多线程编程允许程序同时执行多个任务,这些任务可以共享程序的资源,如内存空间和文件句柄等。然而,这种资源共享也带来了一系列问题,其中最主要的就是数据竞争(Data Race)。当多个线程同时访问和修改共享数据时,如果没有适当的同步措施,就会导致程序出现未定义行为,结果可能是不可预测的,如程序崩溃、数据损坏等。
同步机制(Synchronization Mechanisms)就是为了解决这些问题而设计的。它们确保在同一时刻只有一个线程能够访问共享资源,或者协调多个线程对共享资源的访问顺序,从而避免数据竞争,保证程序的正确性。
C++ 多线程库简介
C++11 引入的 <thread>
头文件提供了多线程编程的基础支持,包括 std::thread
类用于创建和管理线程。同时,<mutex>
头文件引入了互斥锁(Mutex,即 Mutual Exclusion 的缩写),这是最基本的同步工具。<condition_variable>
头文件提供了条件变量,用于线程间的复杂同步,<future>
头文件则用于处理异步任务的结果。
互斥锁(Mutex)
基本概念
互斥锁是一种二元信号量,它只有两种状态:锁定(locked)和解锁(unlocked)。当一个线程获取(lock)了互斥锁,它就进入了锁定状态,其他线程试图获取该互斥锁时就会被阻塞,直到该线程释放(unlock)互斥锁。
代码示例
#include <iostream>
#include <thread>
#include <mutex>
std::mutex mtx;
int shared_variable = 0;
void increment() {
for (int i = 0; i < 1000000; ++i) {
mtx.lock();
++shared_variable;
mtx.unlock();
}
}
int main() {
std::thread t1(increment);
std::thread t2(increment);
t1.join();
t2.join();
std::cout << "Final value of shared_variable: " << shared_variable << std::endl;
return 0;
}
在上述代码中,std::mutex mtx
定义了一个互斥锁。increment
函数试图对 shared_variable
进行 100 万次的递增操作。在每次操作前,线程通过 mtx.lock()
获取互斥锁,操作完成后通过 mtx.unlock()
释放互斥锁。这样,当一个线程在修改 shared_variable
时,其他线程无法同时进行操作,从而避免了数据竞争。
注意事项
- 死锁(Deadlock):如果一个线程获取了互斥锁但没有释放,或者多个线程相互等待对方释放锁,就会发生死锁。例如:
std::mutex mtx1, mtx2;
void thread1() {
mtx1.lock();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
mtx2.lock();
// 执行一些操作
mtx2.unlock();
mtx1.unlock();
}
void thread2() {
mtx2.lock();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
mtx1.lock();
// 执行一些操作
mtx1.unlock();
mtx2.unlock();
}
在上述代码中,thread1
先获取 mtx1
,然后尝试获取 mtx2
,而 thread2
先获取 mtx2
,然后尝试获取 mtx1
。如果 thread1
先获取 mtx1
并睡眠 100 毫秒,thread2
在这期间获取 mtx2
,当它们尝试获取对方持有的锁时,就会发生死锁。
2. 性能问题:虽然互斥锁能保证数据的一致性,但过多地使用互斥锁会导致线程频繁地等待,从而降低程序的性能。因此,在设计程序时,应尽量减少锁的粒度和持有锁的时间。
锁的 RAII 封装:std::lock_guard 和 std::unique_lock
std::lock_guard
std::lock_guard
是 C++ 标准库提供的一个基于 RAII(Resource Acquisition Is Initialization)机制的互斥锁封装类。它在构造时自动锁定互斥锁,在析构时自动解锁互斥锁。
代码示例
#include <iostream>
#include <thread>
#include <mutex>
std::mutex mtx;
int shared_variable = 0;
void increment() {
std::lock_guard<std::mutex> lock(mtx);
for (int i = 0; i < 1000000; ++i) {
++shared_variable;
}
}
int main() {
std::thread t1(increment);
std::thread t2(increment);
t1.join();
t2.join();
std::cout << "Final value of shared_variable: " << shared_variable << std::endl;
return 0;
}
在上述代码中,std::lock_guard<std::mutex> lock(mtx)
在进入 increment
函数时自动锁定 mtx
,当函数结束,lock
被销毁时自动解锁 mtx
。这种方式比手动调用 lock
和 unlock
更加安全,因为即使函数在执行过程中抛出异常,lock_guard
的析构函数依然会被调用,从而确保互斥锁被正确释放。
std::unique_lock
std::unique_lock
同样基于 RAII 机制,但它比 std::lock_guard
更加灵活。std::unique_lock
可以延迟锁定、在运行时锁定和解锁,还可以与条件变量一起使用。
代码示例
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
std::mutex mtx;
std::condition_variable cv;
bool ready = false;
void print_id(int id) {
std::unique_lock<std::mutex> lock(mtx);
while (!ready) cv.wait(lock);
std::cout << "thread " << id << '\n';
}
void go() {
std::unique_lock<std::mutex> lock(mtx);
ready = true;
std::cout << "go\n";
cv.notify_all();
}
int main() {
std::thread threads[10];
for (int i = 0; i < 10; ++i)
threads[i] = std::thread(print_id, i);
std::cout << "10 threads ready to race...\n";
go();
for (auto& th : threads) th.join();
return 0;
}
在上述代码中,std::unique_lock<std::mutex> lock(mtx)
定义了一个 unique_lock
。print_id
函数中的 cv.wait(lock)
表示线程在等待条件变量 cv
被通知,同时会自动释放 mtx
锁,避免死锁。当 go
函数中的 cv.notify_all()
被调用时,等待在 cv
上的所有线程会被唤醒,重新获取 mtx
锁并继续执行。
读写锁(Read - Write Lock)
基本概念
读写锁是一种特殊的同步机制,它区分了读操作和写操作。允许多个线程同时进行读操作,因为读操作不会修改共享数据,不会导致数据竞争。但是,当有一个线程进行写操作时,其他线程无论是读还是写都必须等待,直到写操作完成。
C++ 中的实现
在 C++ 标准库中,<shared_mutex>
头文件提供了读写锁的支持,包括 std::shared_mutex
和 std::shared_timed_mutex
。std::shared_lock
用于获取共享锁(读锁),std::unique_lock
用于获取独占锁(写锁)。
代码示例
#include <iostream>
#include <thread>
#include <shared_mutex>
#include <vector>
std::shared_mutex rw_mutex;
int shared_data = 0;
void read(int id) {
std::shared_lock<std::shared_mutex> lock(rw_mutex);
std::cout << "Thread " << id << " reads data: " << shared_data << std::endl;
}
void write(int id, int value) {
std::unique_lock<std::shared_mutex> lock(rw_mutex);
shared_data = value;
std::cout << "Thread " << id << " writes data: " << shared_data << std::endl;
}
int main() {
std::vector<std::thread> threads;
for (int i = 0; i < 5; ++i) {
if (i % 2 == 0) {
threads.emplace_back(write, i, i * 10);
} else {
threads.emplace_back(read, i);
}
}
for (auto& th : threads) th.join();
return 0;
}
在上述代码中,read
函数使用 std::shared_lock
获取读锁,允许多个线程同时读取 shared_data
。write
函数使用 std::unique_lock
获取写锁,确保在写操作时没有其他线程可以访问 shared_data
。
条件变量(Condition Variable)
基本概念
条件变量是一种线程同步机制,它允许线程等待某个条件满足。一个线程可以在条件变量上等待,而另一个线程可以通知这个条件变量,从而唤醒等待的线程。条件变量通常与互斥锁一起使用。
代码示例
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
std::mutex mtx;
std::condition_variable cv;
std::queue<int> data_queue;
bool finished = false;
void producer() {
for (int i = 0; i < 10; ++i) {
std::unique_lock<std::mutex> lock(mtx);
data_queue.push(i);
std::cout << "Produced: " << i << std::endl;
lock.unlock();
cv.notify_one();
}
std::unique_lock<std::mutex> lock(mtx);
finished = true;
cv.notify_all();
}
void consumer() {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [] { return!data_queue.empty() || finished; });
if (data_queue.empty() && finished) break;
int value = data_queue.front();
data_queue.pop();
std::cout << "Consumed: " << value << std::endl;
}
}
int main() {
std::thread producer_thread(producer);
std::thread consumer_thread(consumer);
producer_thread.join();
consumer_thread.join();
return 0;
}
在上述代码中,producer
函数将数据放入队列 data_queue
并通知条件变量 cv
。consumer
函数在条件变量 cv
上等待,当队列不为空或者 finished
为 true
时被唤醒。cv.wait(lock, [] { return!data_queue.empty() || finished; });
中的第二个参数是一个谓词(Predicate),它确保只有在满足条件时才会唤醒线程,避免了虚假唤醒(Spurious Wakeup)。
信号量(Semaphore)
基本概念
信号量是一个计数器,它可以用来控制同时访问某个资源的线程数量。当一个线程获取信号量时,计数器减 1;当一个线程释放信号量时,计数器加 1。如果计数器为 0,则获取信号量的线程会被阻塞,直到有其他线程释放信号量。
C++ 中的实现
虽然 C++ 标准库没有直接提供信号量的实现,但可以通过 std::mutex
和 std::condition_variable
来模拟信号量。
代码示例
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
class Semaphore {
public:
Semaphore(unsigned int count = 0) : count(count) {}
void acquire() {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [this] { return count > 0; });
--count;
}
void release() {
std::unique_lock<std::mutex> lock(mtx);
++count;
cv.notify_one();
}
private:
std::mutex mtx;
std::condition_variable cv;
unsigned int count;
};
Semaphore sem(1);
void critical_section(int id) {
sem.acquire();
std::cout << "Thread " << id << " entered critical section." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "Thread " << id << " left critical section." << std::endl;
sem.release();
}
int main() {
std::vector<std::thread> threads;
for (int i = 0; i < 5; ++i) {
threads.emplace_back(critical_section, i);
}
for (auto& th : threads) th.join();
return 0;
}
在上述代码中,Semaphore
类模拟了一个信号量。acquire
方法获取信号量,release
方法释放信号量。critical_section
函数在进入临界区前获取信号量,离开临界区时释放信号量,从而控制同时进入临界区的线程数量。
屏障(Barrier)
基本概念
屏障是一种同步机制,它可以让多个线程在某个点上等待,直到所有线程都到达这个点,然后它们再继续执行。这在需要多个线程协同完成某些任务,并且需要在某个阶段进行同步的场景中非常有用。
C++ 中的实现
同样,C++ 标准库没有直接提供屏障的实现,但可以通过 std::mutex
、std::condition_variable
和一个计数器来实现。
代码示例
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
class Barrier {
public:
Barrier(int count) : count(count), arrived(0) {}
void wait() {
std::unique_lock<std::mutex> lock(mtx);
++arrived;
if (arrived == count) {
arrived = 0;
cv.notify_all();
} else {
cv.wait(lock, [this] { return arrived == 0; });
}
}
private:
std::mutex mtx;
std::condition_variable cv;
int count;
int arrived;
};
Barrier barrier(5);
void thread_function(int id) {
std::cout << "Thread " << id << " is working." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(id));
std::cout << "Thread " << id << " is reaching the barrier." << std::endl;
barrier.wait();
std::cout << "Thread " << id << " passed the barrier." << std::endl;
}
int main() {
std::vector<std::thread> threads;
for (int i = 0; i < 5; ++i) {
threads.emplace_back(thread_function, i);
}
for (auto& th : threads) th.join();
return 0;
}
在上述代码中,Barrier
类实现了一个屏障。每个线程在执行 barrier.wait()
时会等待,直到所有 count
个线程都调用了 wait
。当所有线程都到达时,屏障被重置,所有线程被唤醒继续执行。
总结
在 C++ 多线程编程中,同步机制是确保程序正确性和性能的关键。从最基本的互斥锁到复杂的条件变量、信号量、屏障等,每种同步机制都有其适用场景。开发者需要根据具体的需求,合理选择和使用这些同步机制,避免数据竞争和死锁等问题,同时优化程序的性能。通过对这些同步机制的深入理解和实践,能够编写出高效、可靠的多线程 C++ 程序。