MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

C++并发编程与多线程管理

2022-02-232.6k 阅读

C++并发编程基础

1. 线程的基本概念

在现代计算机系统中,线程是程序执行的最小单位。一个进程可以包含多个线程,这些线程共享进程的资源,如内存空间、文件描述符等。并发编程就是利用多个线程同时执行不同任务,以提高程序的执行效率和响应能力。

在C++ 11之前,C++标准库中并没有对线程提供直接支持,开发者需要依赖操作系统特定的API,如POSIX线程(pthread)或Windows线程。C++ 11引入了<thread>头文件,为线程编程提供了标准的支持,使得C++程序员可以编写跨平台的并发程序。

2. 创建和启动线程

要在C++中创建一个线程,需要包含<thread>头文件,并使用std::thread类。下面是一个简单的示例:

#include <iostream>
#include <thread>

void hello() {
    std::cout << "Hello from thread " << std::this_thread::get_id() << std::endl;
}

int main() {
    std::thread t(hello);
    std::cout << "Main thread: " << std::this_thread::get_id() << std::endl;
    t.join();
    return 0;
}

在上述代码中:

  • 首先定义了一个函数hello,它将在新线程中执行。
  • main函数中,使用std::thread t(hello)创建并启动了一个新线程,该线程将执行hello函数。
  • std::this_thread::get_id()用于获取当前线程的标识符。在hello函数中,它输出新线程的ID,在main函数中,输出主线程的ID。
  • t.join()用于等待线程t完成执行。如果不调用join,主线程可能在新线程完成之前就结束,导致程序异常退出。

3. 传递参数给线程函数

std::thread的构造函数可以接受额外的参数,这些参数将被传递给线程函数。例如:

#include <iostream>
#include <thread>

void print_number(int num) {
    std::cout << "Printing number: " << num << " from thread " << std::this_thread::get_id() << std::endl;
}

int main() {
    int number = 42;
    std::thread t(print_number, number);
    t.join();
    return 0;
}

这里,print_number函数接受一个整数参数num。在main函数中,创建线程时将变量number作为参数传递给print_number函数。

4. 分离线程

除了join方法,std::thread还提供了detach方法。detach会使线程在后台运行,与主线程分离,主线程不再等待该线程完成。例如:

#include <iostream>
#include <thread>
#include <chrono>

void background_task() {
    std::this_thread::sleep_for(std::chrono::seconds(3));
    std::cout << "Background task completed" << std::endl;
}

int main() {
    std::thread t(background_task);
    t.detach();
    std::cout << "Main thread continues without waiting" << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(5));
    return 0;
}

在这个例子中,background_task函数会睡眠3秒后输出一条消息。主线程创建并分离了这个线程后,继续执行自己的任务,5秒后退出。由于线程已经分离,主线程不会等待background_task完成。

5. 线程局部存储

有时,我们希望每个线程都有自己独立的变量副本,这就需要用到线程局部存储(Thread - Local Storage,TLS)。在C++中,可以使用thread_local关键字来声明线程局部变量。例如:

#include <iostream>
#include <thread>

thread_local int thread_local_variable = 0;

void increment_variable() {
    ++thread_local_variable;
    std::cout << "Thread " << std::this_thread::get_id() << " has value: " << thread_local_variable << std::endl;
}

int main() {
    std::thread t1(increment_variable);
    std::thread t2(increment_variable);

    t1.join();
    t2.join();

    return 0;
}

在上述代码中,thread_local_variable是一个线程局部变量。每个线程对它进行递增操作时,都操作的是自己的副本,不会相互影响。因此,两个线程输出的值都是1。

线程同步

1. 竞争条件与临界区

当多个线程同时访问和修改共享资源时,就可能出现竞争条件(Race Condition)。竞争条件会导致程序产生不可预测的结果。例如:

#include <iostream>
#include <thread>

int shared_variable = 0;

void increment_shared_variable() {
    for (int i = 0; i < 1000000; ++i) {
        ++shared_variable;
    }
}

int main() {
    std::thread t1(increment_shared_variable);
    std::thread t2(increment_shared_variable);

    t1.join();
    t2.join();

    std::cout << "Expected value: 2000000, Actual value: " << shared_variable << std::endl;
    return 0;
}

在这个例子中,两个线程同时对shared_variable进行递增操作。由于++shared_variable不是原子操作,它实际上包含读取、递增和写入三个步骤,不同线程的这些操作可能交错执行,导致最终结果小于预期的2000000。

临界区(Critical Section)是指程序中访问共享资源的代码段,在同一时间内只允许一个线程进入临界区,以避免竞争条件。

2. 互斥锁(Mutex)

互斥锁(Mutex,即Mutual Exclusion的缩写)是一种最基本的线程同步工具,用于保护临界区。C++标准库提供了std::mutex类。例如:

#include <iostream>
#include <thread>
#include <mutex>

int shared_variable = 0;
std::mutex mtx;

void increment_shared_variable() {
    for (int i = 0; i < 1000000; ++i) {
        mtx.lock();
        ++shared_variable;
        mtx.unlock();
    }
}

int main() {
    std::thread t1(increment_shared_variable);
    std::thread t2(increment_shared_variable);

    t1.join();
    t2.join();

    std::cout << "Expected value: 2000000, Actual value: " << shared_variable << std::endl;
    return 0;
}

在这个代码中,std::mutex mtx定义了一个互斥锁。在访问shared_variable之前,调用mtx.lock()锁定互斥锁,其他线程就无法进入临界区。操作完成后,调用mtx.unlock()释放互斥锁,允许其他线程进入。这样就避免了竞争条件,最终shared_variable的值为2000000。

3. 锁的RAII封装

手动调用lockunlock容易出错,例如在临界区发生异常时可能忘记调用unlock。C++标准库提供了std::lock_guardstd::unique_lock来自动管理锁的生命周期,基于RAII(Resource Acquisition Is Initialization)原则。

std::lock_guard是一个简单的RAII锁封装,构造时自动锁定互斥锁,析构时自动解锁。例如:

#include <iostream>
#include <thread>
#include <mutex>

int shared_variable = 0;
std::mutex mtx;

void increment_shared_variable() {
    for (int i = 0; i < 1000000; ++i) {
        std::lock_guard<std::mutex> lock(mtx);
        ++shared_variable;
    }
}

int main() {
    std::thread t1(increment_shared_variable);
    std::thread t2(increment_shared_variable);

    t1.join();
    t2.join();

    std::cout << "Expected value: 2000000, Actual value: " << shared_variable << std::endl;
    return 0;
}

std::unique_lock更加灵活,它可以延迟锁定、锁定多个互斥锁、在运行时解锁和重新锁定等。例如:

#include <iostream>
#include <thread>
#include <mutex>

std::mutex mtx1, mtx2;

void thread_function() {
    std::unique_lock<std::mutex> lock1(mtx1, std::defer_lock);
    std::unique_lock<std::mutex> lock2(mtx2, std::defer_lock);

    std::lock(lock1, lock2);

    // 临界区
    std::cout << "Both mutexes are locked" << std::endl;

    // 离开作用域时,lock1和lock2会自动解锁
}

int main() {
    std::thread t(thread_function);
    t.join();
    return 0;
}

在这个例子中,std::unique_lock使用std::defer_lock延迟锁定,然后通过std::lock同时锁定mtx1mtx2,避免了死锁。

4. 条件变量(Condition Variable)

条件变量用于线程间的同步,当某个条件满足时,通知等待的线程。C++标准库提供了std::condition_variablestd::condition_variable_anystd::condition_variable只能与std::mutex一起使用,而std::condition_variable_any可以与任何满足特定要求的锁类型一起使用。

下面是一个使用std::condition_variable的生产者 - 消费者模型的示例:

#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <chrono>

std::queue<int> data_queue;
std::mutex mtx;
std::condition_variable cond;
bool finished = false;

void producer() {
    for (int i = 0; i < 10; ++i) {
        std::unique_lock<std::mutex> lock(mtx);
        data_queue.push(i);
        std::cout << "Produced: " << i << std::endl;
        lock.unlock();
        cond.notify_one();
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    std::unique_lock<std::mutex> lock(mtx);
    finished = true;
    lock.unlock();
    cond.notify_all();
}

void consumer() {
    while (true) {
        std::unique_lock<std::mutex> lock(mtx);
        cond.wait(lock, [] { return!data_queue.empty() || finished; });
        if (data_queue.empty() && finished) {
            break;
        }
        int value = data_queue.front();
        data_queue.pop();
        std::cout << "Consumed: " << value << std::endl;
        lock.unlock();
    }
}

int main() {
    std::thread producer_thread(producer);
    std::thread consumer_thread(consumer);

    producer_thread.join();
    consumer_thread.join();

    return 0;
}

在这个例子中:

  • 生产者线程不断生成数据并放入队列,然后通知消费者线程。
  • 消费者线程使用cond.wait等待条件变量,当队列不为空或生产结束时被唤醒。cond.wait接受一个锁和一个谓词,只有当谓词为真时才会返回。
  • 最后,生产者线程设置finishedtrue并通知所有等待的线程,消费者线程在处理完所有数据后退出。

5. 信号量(Semaphore)

信号量是一种更通用的同步工具,它可以控制同时访问某个资源的线程数量。C++标准库中没有直接提供信号量类,但可以通过std::mutexstd::condition_variable来实现。下面是一个简单的二元信号量实现:

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

class BinarySemaphore {
public:
    BinarySemaphore(bool initial_state = false) : m_count(initial_state? 1 : 0) {}

    void wait() {
        std::unique_lock<std::mutex> lock(m_mtx);
        m_cond.wait(lock, [this] { return m_count > 0; });
        --m_count;
    }

    void signal() {
        std::unique_lock<std::mutex> lock(m_mtx);
        ++m_count;
        m_cond.notify_one();
    }

private:
    int m_count;
    std::mutex m_mtx;
    std::condition_variable m_cond;
};

BinarySemaphore semaphore(false);

void thread_function() {
    std::cout << "Thread waiting for semaphore" << std::endl;
    semaphore.wait();
    std::cout << "Thread got semaphore" << std::endl;
    // 模拟一些工作
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout << "Thread releasing semaphore" << std::endl;
    semaphore.signal();
}

int main() {
    std::thread t(thread_function);
    std::this_thread::sleep_for(std::chrono::seconds(3));
    std::cout << "Main thread signaling semaphore" << std::endl;
    semaphore.signal();
    t.join();
    return 0;
}

在这个代码中,BinarySemaphore类实现了一个二元信号量。wait方法等待信号量可用,signal方法释放信号量。

原子操作

1. 原子类型与原子操作的概念

原子操作是指不会被线程调度机制打断的操作,要么全部执行,要么完全不执行。在C++中,<atomic>头文件提供了一系列原子类型和原子操作,用于避免竞争条件,特别是对于简单数据类型的操作。

原子类型是一种特殊的数据类型,对其操作是原子的。例如std::atomic<int>,对std::atomic<int>类型的变量进行读取、写入、递增等操作都是原子的,不需要额外的锁。

2. 原子类型的使用

#include <iostream>
#include <thread>
#include <atomic>

std::atomic<int> shared_variable(0);

void increment_shared_variable() {
    for (int i = 0; i < 1000000; ++i) {
        ++shared_variable;
    }
}

int main() {
    std::thread t1(increment_shared_variable);
    std::thread t2(increment_shared_variable);

    t1.join();
    t2.join();

    std::cout << "Expected value: 2000000, Actual value: " << shared_variable << std::endl;
    return 0;
}

在这个例子中,shared_variablestd::atomic<int>类型。对它的递增操作是原子的,因此不需要使用互斥锁,两个线程同时递增也能得到正确的结果。

3. 原子操作的内存模型

原子操作不仅保证操作的原子性,还涉及内存模型的问题。C++提供了不同的内存序(Memory Order)来控制原子操作的可见性和顺序。常见的内存序有:

  • std::memory_order_relaxed:最宽松的内存序,只保证原子操作本身的原子性,不保证任何内存可见性和顺序。
  • std::memory_order_releasestd::memory_order_acquirestd::memory_order_release用于写操作,确保在释放前的所有写操作对其他线程可见。std::memory_order_acquire用于读操作,确保在获取后的所有读操作都能看到之前的写操作。
  • std::memory_order_seq_cst:顺序一致性内存序,是最严格的内存序,保证所有线程看到的原子操作顺序是一致的。

例如,使用std::memory_order_releasestd::memory_order_acquire

#include <iostream>
#include <thread>
#include <atomic>

std::atomic<bool> ready(false);
std::atomic<int> data(0);

void producer() {
    data.store(42, std::memory_order_release);
    ready.store(true, std::memory_order_release);
}

void consumer() {
    while (!ready.load(std::memory_order_acquire));
    std::cout << "Data: " << data.load(std::memory_order_acquire) << std::endl;
}

int main() {
    std::thread t1(producer);
    std::thread t2(consumer);

    t1.join();
    t2.join();

    return 0;
}

在这个例子中,生产者线程先存储数据,然后设置readytrue,使用std::memory_order_release。消费者线程在readytrue时读取数据,使用std::memory_order_acquire,确保能看到生产者线程存储的数据。

多线程管理的高级话题

1. 线程池

线程池是一种多线程处理模式,它预先创建一组线程,并将任务分配给这些线程执行。这样可以避免频繁创建和销毁线程的开销,提高程序性能。下面是一个简单的线程池实现:

#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>

class ThreadPool {
public:
    ThreadPool(size_t num_threads) {
        for (size_t i = 0; i < num_threads; ++i) {
            threads.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock, [this] {
                            return this->stop ||!this->tasks.empty();
                        });
                        if (this->stop && this->tasks.empty()) {
                            return;
                        }
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }
                    task();
                }
            });
        }
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread &thread : threads) {
            thread.join();
        }
    }

    template<class F, class... Args>
    auto enqueue(F &&f, Args &&...args) -> std::future<typename std::result_of<F(Args...)>::type> {
        using return_type = typename std::result_of<F(Args...)>::type;
        auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            if (stop) {
                throw std::runtime_error("enqueue on stopped ThreadPool");
            }
            tasks.emplace([task]() { (*task)(); });
        }
        condition.notify_one();
        return res;
    }

private:
    std::vector<std::thread> threads;
    std::queue<std::function<void()>> tasks;
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop = false;
};

// 示例任务
int factorial(int n) {
    int result = 1;
    for (int i = 1; i <= n; ++i) {
        result *= i;
    }
    return result;
}

int main() {
    ThreadPool pool(4);
    std::vector<std::future<int>> results;
    for (int i = 1; i <= 8; ++i) {
        results.emplace_back(pool.enqueue(factorial, i));
    }
    for (auto &result : results) {
        std::cout << result.get() << std::endl;
    }
    return 0;
}

在这个线程池实现中:

  • 构造函数创建指定数量的线程,这些线程在后台等待任务。
  • enqueue方法将任务添加到任务队列,并通知一个等待的线程。
  • 析构函数停止线程池,等待所有线程完成任务。

2. 死锁检测与避免

死锁是指两个或多个线程相互等待对方释放资源,导致程序无法继续执行的情况。避免死锁的方法有:

  • 破坏死锁的四个必要条件
    • 互斥条件:尽量减少对资源的独占使用,例如使用读写锁代替互斥锁,允许多个线程同时读。
    • 占有并等待条件:一次性获取所有需要的资源,而不是先获取部分资源再等待其他资源。
    • 不可剥夺条件:允许资源被剥夺,例如当一个线程等待资源超时,可以释放它已占有的资源。
    • 循环等待条件:对资源进行排序,线程按照顺序获取资源,避免循环等待。
  • 使用死锁检测算法:在程序运行时检测死锁的发生,并采取相应的措施,如终止其中一个线程。虽然C++标准库没有直接提供死锁检测功能,但可以使用一些第三方工具,如helgrind(Valgrind工具集的一部分)来检测死锁。

3. 并发数据结构

为了提高并发编程的效率,专门设计了一些并发数据结构,如无锁队列、无锁栈等。这些数据结构使用原子操作和更复杂的算法来实现线程安全,避免了锁的开销。

例如,无锁队列的一种简单实现:

#include <iostream>
#include <memory>
#include <atomic>

template<typename T>
class LockFreeQueue {
private:
    struct Node {
        T data;
        std::unique_ptr<Node> next;
        Node(const T &value) : data(value), next(nullptr) {}
    };

    std::atomic<Node*> head;
    std::atomic<Node*> tail;

public:
    LockFreeQueue() {
        auto sentinel = std::make_unique<Node>(T());
        head.store(sentinel.get());
        tail.store(sentinel.get());
    }

    ~LockFreeQueue() {
        while (head.load() != nullptr) {
            auto old_head = head.load();
            head.store(old_head->next.get());
        }
    }

    bool push(const T &value) {
        auto new_node = std::make_unique<Node>(value);
        Node *old_tail = tail.load();
        while (true) {
            Node *next = old_tail->next.load();
            if (next == nullptr) {
                if (tail.compare_exchange_weak(old_tail, new_node.get())) {
                    old_tail->next = std::move(new_node);
                    return true;
                }
            } else {
                tail.compare_exchange_weak(old_tail, next);
            }
        }
    }

    bool pop(T &value) {
        Node *old_head = head.load();
        while (true) {
            Node *tail_node = tail.load();
            Node *next = old_head->next.load();
            if (old_head == tail_node) {
                if (next == nullptr) {
                    return false;
                }
                tail.compare_exchange_weak(tail_node, next);
            } else {
                value = next->data;
                if (head.compare_exchange_weak(old_head, next)) {
                    return true;
                }
            }
        }
    }
};

int main() {
    LockFreeQueue<int> queue;
    queue.push(10);
    int value;
    if (queue.pop(value)) {
        std::cout << "Popped value: " << value << std::endl;
    }
    return 0;
}

在这个无锁队列实现中,使用std::atomiccompare_exchange_weak来实现线程安全的入队和出队操作,避免了使用锁带来的开销。

4. 并发编程的性能优化

  • 减少锁的粒度:尽量缩小临界区的范围,只在必要时锁定资源,例如将大的临界区分成多个小的临界区。
  • 使用无锁数据结构:对于高并发场景,无锁数据结构可以提高性能,因为它们避免了锁竞争。
  • 合理分配任务:根据任务的性质和CPU核心数,合理分配任务到不同线程,避免线程之间的负载不均衡。
  • 缓存友好:注意数据的访问模式,尽量让线程访问的数据在CPU缓存中,减少内存访问的开销。例如,将经常访问的数据结构设计得更紧凑,减少缓存行的争用。