C++并发编程与多线程管理
C++并发编程基础
1. 线程的基本概念
在现代计算机系统中,线程是程序执行的最小单位。一个进程可以包含多个线程,这些线程共享进程的资源,如内存空间、文件描述符等。并发编程就是利用多个线程同时执行不同任务,以提高程序的执行效率和响应能力。
在C++ 11之前,C++标准库中并没有对线程提供直接支持,开发者需要依赖操作系统特定的API,如POSIX线程(pthread)或Windows线程。C++ 11引入了<thread>
头文件,为线程编程提供了标准的支持,使得C++程序员可以编写跨平台的并发程序。
2. 创建和启动线程
要在C++中创建一个线程,需要包含<thread>
头文件,并使用std::thread
类。下面是一个简单的示例:
#include <iostream>
#include <thread>
void hello() {
std::cout << "Hello from thread " << std::this_thread::get_id() << std::endl;
}
int main() {
std::thread t(hello);
std::cout << "Main thread: " << std::this_thread::get_id() << std::endl;
t.join();
return 0;
}
在上述代码中:
- 首先定义了一个函数
hello
,它将在新线程中执行。 - 在
main
函数中,使用std::thread t(hello)
创建并启动了一个新线程,该线程将执行hello
函数。 std::this_thread::get_id()
用于获取当前线程的标识符。在hello
函数中,它输出新线程的ID,在main
函数中,输出主线程的ID。t.join()
用于等待线程t
完成执行。如果不调用join
,主线程可能在新线程完成之前就结束,导致程序异常退出。
3. 传递参数给线程函数
std::thread
的构造函数可以接受额外的参数,这些参数将被传递给线程函数。例如:
#include <iostream>
#include <thread>
void print_number(int num) {
std::cout << "Printing number: " << num << " from thread " << std::this_thread::get_id() << std::endl;
}
int main() {
int number = 42;
std::thread t(print_number, number);
t.join();
return 0;
}
这里,print_number
函数接受一个整数参数num
。在main
函数中,创建线程时将变量number
作为参数传递给print_number
函数。
4. 分离线程
除了join
方法,std::thread
还提供了detach
方法。detach
会使线程在后台运行,与主线程分离,主线程不再等待该线程完成。例如:
#include <iostream>
#include <thread>
#include <chrono>
void background_task() {
std::this_thread::sleep_for(std::chrono::seconds(3));
std::cout << "Background task completed" << std::endl;
}
int main() {
std::thread t(background_task);
t.detach();
std::cout << "Main thread continues without waiting" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(5));
return 0;
}
在这个例子中,background_task
函数会睡眠3秒后输出一条消息。主线程创建并分离了这个线程后,继续执行自己的任务,5秒后退出。由于线程已经分离,主线程不会等待background_task
完成。
5. 线程局部存储
有时,我们希望每个线程都有自己独立的变量副本,这就需要用到线程局部存储(Thread - Local Storage,TLS)。在C++中,可以使用thread_local
关键字来声明线程局部变量。例如:
#include <iostream>
#include <thread>
thread_local int thread_local_variable = 0;
void increment_variable() {
++thread_local_variable;
std::cout << "Thread " << std::this_thread::get_id() << " has value: " << thread_local_variable << std::endl;
}
int main() {
std::thread t1(increment_variable);
std::thread t2(increment_variable);
t1.join();
t2.join();
return 0;
}
在上述代码中,thread_local_variable
是一个线程局部变量。每个线程对它进行递增操作时,都操作的是自己的副本,不会相互影响。因此,两个线程输出的值都是1。
线程同步
1. 竞争条件与临界区
当多个线程同时访问和修改共享资源时,就可能出现竞争条件(Race Condition)。竞争条件会导致程序产生不可预测的结果。例如:
#include <iostream>
#include <thread>
int shared_variable = 0;
void increment_shared_variable() {
for (int i = 0; i < 1000000; ++i) {
++shared_variable;
}
}
int main() {
std::thread t1(increment_shared_variable);
std::thread t2(increment_shared_variable);
t1.join();
t2.join();
std::cout << "Expected value: 2000000, Actual value: " << shared_variable << std::endl;
return 0;
}
在这个例子中,两个线程同时对shared_variable
进行递增操作。由于++shared_variable
不是原子操作,它实际上包含读取、递增和写入三个步骤,不同线程的这些操作可能交错执行,导致最终结果小于预期的2000000。
临界区(Critical Section)是指程序中访问共享资源的代码段,在同一时间内只允许一个线程进入临界区,以避免竞争条件。
2. 互斥锁(Mutex)
互斥锁(Mutex,即Mutual Exclusion的缩写)是一种最基本的线程同步工具,用于保护临界区。C++标准库提供了std::mutex
类。例如:
#include <iostream>
#include <thread>
#include <mutex>
int shared_variable = 0;
std::mutex mtx;
void increment_shared_variable() {
for (int i = 0; i < 1000000; ++i) {
mtx.lock();
++shared_variable;
mtx.unlock();
}
}
int main() {
std::thread t1(increment_shared_variable);
std::thread t2(increment_shared_variable);
t1.join();
t2.join();
std::cout << "Expected value: 2000000, Actual value: " << shared_variable << std::endl;
return 0;
}
在这个代码中,std::mutex mtx
定义了一个互斥锁。在访问shared_variable
之前,调用mtx.lock()
锁定互斥锁,其他线程就无法进入临界区。操作完成后,调用mtx.unlock()
释放互斥锁,允许其他线程进入。这样就避免了竞争条件,最终shared_variable
的值为2000000。
3. 锁的RAII封装
手动调用lock
和unlock
容易出错,例如在临界区发生异常时可能忘记调用unlock
。C++标准库提供了std::lock_guard
和std::unique_lock
来自动管理锁的生命周期,基于RAII(Resource Acquisition Is Initialization)原则。
std::lock_guard
是一个简单的RAII锁封装,构造时自动锁定互斥锁,析构时自动解锁。例如:
#include <iostream>
#include <thread>
#include <mutex>
int shared_variable = 0;
std::mutex mtx;
void increment_shared_variable() {
for (int i = 0; i < 1000000; ++i) {
std::lock_guard<std::mutex> lock(mtx);
++shared_variable;
}
}
int main() {
std::thread t1(increment_shared_variable);
std::thread t2(increment_shared_variable);
t1.join();
t2.join();
std::cout << "Expected value: 2000000, Actual value: " << shared_variable << std::endl;
return 0;
}
std::unique_lock
更加灵活,它可以延迟锁定、锁定多个互斥锁、在运行时解锁和重新锁定等。例如:
#include <iostream>
#include <thread>
#include <mutex>
std::mutex mtx1, mtx2;
void thread_function() {
std::unique_lock<std::mutex> lock1(mtx1, std::defer_lock);
std::unique_lock<std::mutex> lock2(mtx2, std::defer_lock);
std::lock(lock1, lock2);
// 临界区
std::cout << "Both mutexes are locked" << std::endl;
// 离开作用域时,lock1和lock2会自动解锁
}
int main() {
std::thread t(thread_function);
t.join();
return 0;
}
在这个例子中,std::unique_lock
使用std::defer_lock
延迟锁定,然后通过std::lock
同时锁定mtx1
和mtx2
,避免了死锁。
4. 条件变量(Condition Variable)
条件变量用于线程间的同步,当某个条件满足时,通知等待的线程。C++标准库提供了std::condition_variable
和std::condition_variable_any
。std::condition_variable
只能与std::mutex
一起使用,而std::condition_variable_any
可以与任何满足特定要求的锁类型一起使用。
下面是一个使用std::condition_variable
的生产者 - 消费者模型的示例:
#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <chrono>
std::queue<int> data_queue;
std::mutex mtx;
std::condition_variable cond;
bool finished = false;
void producer() {
for (int i = 0; i < 10; ++i) {
std::unique_lock<std::mutex> lock(mtx);
data_queue.push(i);
std::cout << "Produced: " << i << std::endl;
lock.unlock();
cond.notify_one();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
std::unique_lock<std::mutex> lock(mtx);
finished = true;
lock.unlock();
cond.notify_all();
}
void consumer() {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
cond.wait(lock, [] { return!data_queue.empty() || finished; });
if (data_queue.empty() && finished) {
break;
}
int value = data_queue.front();
data_queue.pop();
std::cout << "Consumed: " << value << std::endl;
lock.unlock();
}
}
int main() {
std::thread producer_thread(producer);
std::thread consumer_thread(consumer);
producer_thread.join();
consumer_thread.join();
return 0;
}
在这个例子中:
- 生产者线程不断生成数据并放入队列,然后通知消费者线程。
- 消费者线程使用
cond.wait
等待条件变量,当队列不为空或生产结束时被唤醒。cond.wait
接受一个锁和一个谓词,只有当谓词为真时才会返回。 - 最后,生产者线程设置
finished
为true
并通知所有等待的线程,消费者线程在处理完所有数据后退出。
5. 信号量(Semaphore)
信号量是一种更通用的同步工具,它可以控制同时访问某个资源的线程数量。C++标准库中没有直接提供信号量类,但可以通过std::mutex
和std::condition_variable
来实现。下面是一个简单的二元信号量实现:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
class BinarySemaphore {
public:
BinarySemaphore(bool initial_state = false) : m_count(initial_state? 1 : 0) {}
void wait() {
std::unique_lock<std::mutex> lock(m_mtx);
m_cond.wait(lock, [this] { return m_count > 0; });
--m_count;
}
void signal() {
std::unique_lock<std::mutex> lock(m_mtx);
++m_count;
m_cond.notify_one();
}
private:
int m_count;
std::mutex m_mtx;
std::condition_variable m_cond;
};
BinarySemaphore semaphore(false);
void thread_function() {
std::cout << "Thread waiting for semaphore" << std::endl;
semaphore.wait();
std::cout << "Thread got semaphore" << std::endl;
// 模拟一些工作
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "Thread releasing semaphore" << std::endl;
semaphore.signal();
}
int main() {
std::thread t(thread_function);
std::this_thread::sleep_for(std::chrono::seconds(3));
std::cout << "Main thread signaling semaphore" << std::endl;
semaphore.signal();
t.join();
return 0;
}
在这个代码中,BinarySemaphore
类实现了一个二元信号量。wait
方法等待信号量可用,signal
方法释放信号量。
原子操作
1. 原子类型与原子操作的概念
原子操作是指不会被线程调度机制打断的操作,要么全部执行,要么完全不执行。在C++中,<atomic>
头文件提供了一系列原子类型和原子操作,用于避免竞争条件,特别是对于简单数据类型的操作。
原子类型是一种特殊的数据类型,对其操作是原子的。例如std::atomic<int>
,对std::atomic<int>
类型的变量进行读取、写入、递增等操作都是原子的,不需要额外的锁。
2. 原子类型的使用
#include <iostream>
#include <thread>
#include <atomic>
std::atomic<int> shared_variable(0);
void increment_shared_variable() {
for (int i = 0; i < 1000000; ++i) {
++shared_variable;
}
}
int main() {
std::thread t1(increment_shared_variable);
std::thread t2(increment_shared_variable);
t1.join();
t2.join();
std::cout << "Expected value: 2000000, Actual value: " << shared_variable << std::endl;
return 0;
}
在这个例子中,shared_variable
是std::atomic<int>
类型。对它的递增操作是原子的,因此不需要使用互斥锁,两个线程同时递增也能得到正确的结果。
3. 原子操作的内存模型
原子操作不仅保证操作的原子性,还涉及内存模型的问题。C++提供了不同的内存序(Memory Order)来控制原子操作的可见性和顺序。常见的内存序有:
std::memory_order_relaxed
:最宽松的内存序,只保证原子操作本身的原子性,不保证任何内存可见性和顺序。std::memory_order_release
和std::memory_order_acquire
:std::memory_order_release
用于写操作,确保在释放前的所有写操作对其他线程可见。std::memory_order_acquire
用于读操作,确保在获取后的所有读操作都能看到之前的写操作。std::memory_order_seq_cst
:顺序一致性内存序,是最严格的内存序,保证所有线程看到的原子操作顺序是一致的。
例如,使用std::memory_order_release
和std::memory_order_acquire
:
#include <iostream>
#include <thread>
#include <atomic>
std::atomic<bool> ready(false);
std::atomic<int> data(0);
void producer() {
data.store(42, std::memory_order_release);
ready.store(true, std::memory_order_release);
}
void consumer() {
while (!ready.load(std::memory_order_acquire));
std::cout << "Data: " << data.load(std::memory_order_acquire) << std::endl;
}
int main() {
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();
return 0;
}
在这个例子中,生产者线程先存储数据,然后设置ready
为true
,使用std::memory_order_release
。消费者线程在ready
为true
时读取数据,使用std::memory_order_acquire
,确保能看到生产者线程存储的数据。
多线程管理的高级话题
1. 线程池
线程池是一种多线程处理模式,它预先创建一组线程,并将任务分配给这些线程执行。这样可以避免频繁创建和销毁线程的开销,提高程序性能。下面是一个简单的线程池实现:
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>
class ThreadPool {
public:
ThreadPool(size_t num_threads) {
for (size_t i = 0; i < num_threads; ++i) {
threads.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock, [this] {
return this->stop ||!this->tasks.empty();
});
if (this->stop && this->tasks.empty()) {
return;
}
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
});
}
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for (std::thread &thread : threads) {
thread.join();
}
}
template<class F, class... Args>
auto enqueue(F &&f, Args &&...args) -> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
if (stop) {
throw std::runtime_error("enqueue on stopped ThreadPool");
}
tasks.emplace([task]() { (*task)(); });
}
condition.notify_one();
return res;
}
private:
std::vector<std::thread> threads;
std::queue<std::function<void()>> tasks;
std::mutex queue_mutex;
std::condition_variable condition;
bool stop = false;
};
// 示例任务
int factorial(int n) {
int result = 1;
for (int i = 1; i <= n; ++i) {
result *= i;
}
return result;
}
int main() {
ThreadPool pool(4);
std::vector<std::future<int>> results;
for (int i = 1; i <= 8; ++i) {
results.emplace_back(pool.enqueue(factorial, i));
}
for (auto &result : results) {
std::cout << result.get() << std::endl;
}
return 0;
}
在这个线程池实现中:
- 构造函数创建指定数量的线程,这些线程在后台等待任务。
enqueue
方法将任务添加到任务队列,并通知一个等待的线程。- 析构函数停止线程池,等待所有线程完成任务。
2. 死锁检测与避免
死锁是指两个或多个线程相互等待对方释放资源,导致程序无法继续执行的情况。避免死锁的方法有:
- 破坏死锁的四个必要条件:
- 互斥条件:尽量减少对资源的独占使用,例如使用读写锁代替互斥锁,允许多个线程同时读。
- 占有并等待条件:一次性获取所有需要的资源,而不是先获取部分资源再等待其他资源。
- 不可剥夺条件:允许资源被剥夺,例如当一个线程等待资源超时,可以释放它已占有的资源。
- 循环等待条件:对资源进行排序,线程按照顺序获取资源,避免循环等待。
- 使用死锁检测算法:在程序运行时检测死锁的发生,并采取相应的措施,如终止其中一个线程。虽然C++标准库没有直接提供死锁检测功能,但可以使用一些第三方工具,如
helgrind
(Valgrind工具集的一部分)来检测死锁。
3. 并发数据结构
为了提高并发编程的效率,专门设计了一些并发数据结构,如无锁队列、无锁栈等。这些数据结构使用原子操作和更复杂的算法来实现线程安全,避免了锁的开销。
例如,无锁队列的一种简单实现:
#include <iostream>
#include <memory>
#include <atomic>
template<typename T>
class LockFreeQueue {
private:
struct Node {
T data;
std::unique_ptr<Node> next;
Node(const T &value) : data(value), next(nullptr) {}
};
std::atomic<Node*> head;
std::atomic<Node*> tail;
public:
LockFreeQueue() {
auto sentinel = std::make_unique<Node>(T());
head.store(sentinel.get());
tail.store(sentinel.get());
}
~LockFreeQueue() {
while (head.load() != nullptr) {
auto old_head = head.load();
head.store(old_head->next.get());
}
}
bool push(const T &value) {
auto new_node = std::make_unique<Node>(value);
Node *old_tail = tail.load();
while (true) {
Node *next = old_tail->next.load();
if (next == nullptr) {
if (tail.compare_exchange_weak(old_tail, new_node.get())) {
old_tail->next = std::move(new_node);
return true;
}
} else {
tail.compare_exchange_weak(old_tail, next);
}
}
}
bool pop(T &value) {
Node *old_head = head.load();
while (true) {
Node *tail_node = tail.load();
Node *next = old_head->next.load();
if (old_head == tail_node) {
if (next == nullptr) {
return false;
}
tail.compare_exchange_weak(tail_node, next);
} else {
value = next->data;
if (head.compare_exchange_weak(old_head, next)) {
return true;
}
}
}
}
};
int main() {
LockFreeQueue<int> queue;
queue.push(10);
int value;
if (queue.pop(value)) {
std::cout << "Popped value: " << value << std::endl;
}
return 0;
}
在这个无锁队列实现中,使用std::atomic
和compare_exchange_weak
来实现线程安全的入队和出队操作,避免了使用锁带来的开销。
4. 并发编程的性能优化
- 减少锁的粒度:尽量缩小临界区的范围,只在必要时锁定资源,例如将大的临界区分成多个小的临界区。
- 使用无锁数据结构:对于高并发场景,无锁数据结构可以提高性能,因为它们避免了锁竞争。
- 合理分配任务:根据任务的性质和CPU核心数,合理分配任务到不同线程,避免线程之间的负载不均衡。
- 缓存友好:注意数据的访问模式,尽量让线程访问的数据在CPU缓存中,减少内存访问的开销。例如,将经常访问的数据结构设计得更紧凑,减少缓存行的争用。