MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

C++ 多线程编程实践指南

2023-04-293.8k 阅读

C++ 多线程编程基础

多线程概念

在现代计算机编程中,多线程编程是一项关键技术。传统的单线程程序按照顺序依次执行代码,在执行一个任务时,其他任务必须等待。而多线程允许程序同时执行多个任务,这些任务可以共享程序的资源,如内存空间、文件描述符等。

多线程带来了诸多优势。例如,在一个图形化应用程序中,主线程负责处理用户界面的更新,而另一个线程可以在后台执行数据的加载或计算任务,这样用户不会因为长时间的等待而觉得程序卡顿。同时,多线程在多核处理器上能够充分利用硬件资源,提高程序的运行效率。

然而,多线程编程也引入了一些挑战。由于多个线程共享资源,可能会出现资源竞争的情况,比如两个线程同时尝试修改同一个变量的值,这可能导致数据不一致或程序崩溃。因此,在多线程编程中,必须谨慎处理资源共享和线程同步的问题。

C++ 多线程库

C++11 标准引入了 <thread> 头文件,为 C++ 语言提供了原生的多线程支持。这个库使得在 C++ 中编写多线程程序变得更加容易和直观。

  1. 创建线程 使用 std::thread 类来创建一个新线程。下面是一个简单的示例,展示如何创建一个线程并让它执行一个函数:
#include <iostream>
#include <thread>

void hello() {
    std::cout << "Hello from thread " << std::this_thread::get_id() << std::endl;
}

int main() {
    std::thread t(hello);
    std::cout << "Main thread " << std::this_thread::get_id() << " is running" << std::endl;
    t.join();
    return 0;
}

在上述代码中,std::thread t(hello); 创建了一个新线程 t,它将执行 hello 函数。std::this_thread::get_id() 函数用于获取当前线程的唯一标识符。t.join() 语句用于等待线程 t 执行完毕,确保主线程不会在子线程之前退出。

  1. 传递参数 可以向线程函数传递参数。例如:
#include <iostream>
#include <thread>

void print_number(int num) {
    std::cout << "Thread " << std::this_thread::get_id() << " prints " << num << std::endl;
}

int main() {
    int number = 42;
    std::thread t(print_number, number);
    t.join();
    return 0;
}

这里,print_number 函数接受一个整数参数 num,在创建线程时,将变量 number 作为参数传递给了该函数。

  1. 线程的分离 除了 join 之外,还可以使用 detach 方法将线程分离。分离后的线程将在后台独立运行,主线程不再等待它结束。例如:
#include <iostream>
#include <thread>
#include <chrono>

void background_task() {
    std::this_thread::sleep_for(std::chrono::seconds(3));
    std::cout << "Background task finished" << std::endl;
}

int main() {
    std::thread t(background_task);
    t.detach();
    std::cout << "Main thread continues without waiting" << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(5));
    return 0;
}

在这个例子中,t.detach() 将线程 t 分离,主线程继续执行自己的任务,而分离的线程 t 在后台休眠 3 秒后输出信息。

线程同步

互斥锁

  1. 互斥锁的概念 当多个线程访问共享资源时,为了避免数据竞争,需要使用同步机制。互斥锁(Mutex,即 Mutual Exclusion 的缩写)是最基本的同步工具之一。它就像一把锁,一次只允许一个线程进入临界区(访问共享资源的代码段)。

  2. 使用 std::mutex 在 C++ 中,std::mutex 类提供了互斥锁的功能。以下是一个简单的示例,展示了如何使用互斥锁来保护共享资源:

#include <iostream>
#include <thread>
#include <mutex>

std::mutex mtx;
int shared_variable = 0;

void increment() {
    for (int i = 0; i < 1000000; ++i) {
        mtx.lock();
        ++shared_variable;
        mtx.unlock();
    }
}

int main() {
    std::thread t1(increment);
    std::thread t2(increment);

    t1.join();
    t2.join();

    std::cout << "Final value of shared variable: " << shared_variable << std::endl;
    return 0;
}

在上述代码中,std::mutex mtx; 定义了一个互斥锁 mtx。在 increment 函数中,mtx.lock() 用于锁定互斥锁,这样其他线程就无法进入临界区,直到 mtx.unlock() 解锁。如果不使用互斥锁,两个线程同时访问和修改 shared_variable,可能会导致最终结果不正确。

锁的 RAII 封装

  1. RAII 概念 资源获取即初始化(RAII,Resource Acquisition Is Initialization)是 C++ 中一种重要的编程技巧,用于自动管理资源的生命周期。在多线程编程中,使用 RAII 来管理锁可以避免手动锁定和解锁可能出现的错误,如忘记解锁导致死锁。

  2. std::lock_guardstd::unique_lock

    • std::lock_guard:它是一个基于 RAII 的互斥锁包装器。当 std::lock_guard 对象被创建时,它会自动锁定关联的互斥锁,当对象被销毁时(例如函数结束),它会自动解锁互斥锁。以下是使用 std::lock_guard 重写上述示例的代码:
#include <iostream>
#include <thread>
#include <mutex>

std::mutex mtx;
int shared_variable = 0;

void increment() {
    for (int i = 0; i < 1000000; ++i) {
        std::lock_guard<std::mutex> lock(mtx);
        ++shared_variable;
    }
}

int main() {
    std::thread t1(increment);
    std::thread t2(increment);

    t1.join();
    t2.join();

    std::cout << "Final value of shared variable: " << shared_variable << std::endl;
    return 0;
}
  • std::unique_lockstd::unique_lockstd::lock_guard 更灵活。它同样基于 RAII 管理锁,但提供了更多的功能,如延迟锁定、解锁和重新锁定等。例如:
#include <iostream>
#include <thread>
#include <mutex>

std::mutex mtx;
int shared_variable = 0;

void increment() {
    std::unique_lock<std::mutex> lock(mtx, std::defer_lock);
    // 延迟锁定,此时互斥锁未锁定
    for (int i = 0; i < 1000000; ++i) {
        lock.lock();
        ++shared_variable;
        lock.unlock();
    }
}

int main() {
    std::thread t1(increment);
    std::thread t2(increment);

    t1.join();
    t2.join();

    std::cout << "Final value of shared variable: " << shared_variable << std::endl;
    return 0;
}

在这个例子中,std::unique_lock<std::mutex> lock(mtx, std::defer_lock); 创建了一个 std::unique_lock 对象,但并没有立即锁定互斥锁,而是在需要时通过 lock.lock() 手动锁定,之后可以通过 lock.unlock() 手动解锁。

条件变量

  1. 条件变量的作用 条件变量(std::condition_variable)用于线程间的同步,它允许线程等待某个条件满足后再继续执行。这在多个线程需要协作完成任务时非常有用。

  2. 使用 std::condition_variable 下面是一个生产者 - 消费者模型的示例,展示了条件变量的使用:

#include <iostream>
#include <thread>
#include <mutex>
#include <queue>
#include <condition_variable>

std::mutex mtx;
std::queue<int> data_queue;
std::condition_variable data_cond;

void producer() {
    for (int i = 0; i < 10; ++i) {
        std::unique_lock<std::mutex> lock(mtx);
        data_queue.push(i);
        std::cout << "Produced: " << i << std::endl;
        lock.unlock();
        data_cond.notify_one();
    }
}

void consumer() {
    while (true) {
        std::unique_lock<std::mutex> lock(mtx);
        data_cond.wait(lock, [] { return!data_queue.empty(); });
        int value = data_queue.front();
        data_queue.pop();
        std::cout << "Consumed: " << value << std::endl;
        if (value == 9) break;
    }
}

int main() {
    std::thread producer_thread(producer);
    std::thread consumer_thread(consumer);

    producer_thread.join();
    consumer_thread.join();

    return 0;
}

在这个示例中,生产者线程将数据放入队列 data_queue 中,并通过 data_cond.notify_one() 通知等待的消费者线程。消费者线程在 data_cond.wait(lock, [] { return!data_queue.empty(); }); 处等待,直到条件 !data_queue.empty() 满足(即队列中有数据),才会继续执行并从队列中取出数据。

原子操作

原子类型

  1. 原子类型的定义 原子类型是 C++ 提供的一种特殊类型,其操作不会被线程调度机制打断。也就是说,对原子类型的操作是不可分割的,要么全部完成,要么根本没有开始。这避免了多线程环境下的数据竞争问题,不需要额外的锁机制。

  2. 使用原子类型 C++ 提供了 <atomic> 头文件来支持原子类型。例如,std::atomic<int> 是一个原子整数类型。以下是一个使用原子类型的简单示例:

#include <iostream>
#include <thread>
#include <atomic>

std::atomic<int> atomic_variable(0);

void increment() {
    for (int i = 0; i < 1000000; ++i) {
        ++atomic_variable;
    }
}

int main() {
    std::thread t1(increment);
    std::thread t2(increment);

    t1.join();
    t2.join();

    std::cout << "Final value of atomic variable: " << atomic_variable << std::endl;
    return 0;
}

在这个例子中,std::atomic<int> atomic_variable(0); 定义了一个原子整数变量 atomic_variable。对 atomic_variable 的自增操作 ++atomic_variable; 是原子的,即使在多线程环境下也不会出现数据竞争问题,无需使用互斥锁。

原子操作的内存模型

  1. 内存模型的概念 在多线程编程中,内存模型描述了线程如何与共享内存进行交互。不同的内存模型会影响原子操作的行为和可见性。

  2. C++ 的内存模型 C++ 提供了几种内存顺序选项,如 std::memory_order_seq_cst(顺序一致性)、std::memory_order_relaxed(宽松内存顺序)等。

    • std::memory_order_seq_cst:这是默认的内存顺序,它保证所有线程看到的所有原子操作都具有相同的顺序,这提供了最强的同步保证,但也可能带来一定的性能开销。例如:
#include <iostream>
#include <thread>
#include <atomic>

std::atomic<bool> ready(false);
std::atomic<int> data(0);

void producer() {
    data.store(42, std::memory_order_seq_cst);
    ready.store(true, std::memory_order_seq_cst);
}

void consumer() {
    while (!ready.load(std::memory_order_seq_cst));
    std::cout << "Data: " << data.load(std::memory_order_seq_cst) << std::endl;
}

int main() {
    std::thread t1(producer);
    std::thread t2(consumer);

    t1.join();
    t2.join();

    return 0;
}

在这个示例中,std::memory_order_seq_cst 确保了 producer 线程先存储数据到 data,然后设置 readytrueconsumer 线程在 readytrue 后读取 data,保证了操作的顺序性和可见性。

  • std::memory_order_relaxed:宽松内存顺序允许编译器和处理器进行更多的优化,但同步保证较弱。例如:
#include <iostream>
#include <thread>
#include <atomic>

std::atomic<int> counter(0);

void increment() {
    for (int i = 0; i < 1000000; ++i) {
        counter.fetch_add(1, std::memory_order_relaxed);
    }
}

int main() {
    std::thread t1(increment);
    std::thread t2(increment);

    t1.join();
    t2.join();

    std::cout << "Final value of counter: " << counter << std::endl;
    return 0;
}

这里 std::memory_order_relaxed 用于 fetch_add 操作,它只保证 counter 的自增操作是原子的,但不保证其他线程对 counter 操作的顺序和可见性。在一些情况下,这种宽松的内存顺序可能会提高性能,但需要谨慎使用,因为可能会导致难以调试的问题。

线程池

线程池的概念

线程池是一种多线程编程中的设计模式,它维护着一组预先创建的线程,这些线程可以重复使用来执行提交的任务。线程池避免了频繁创建和销毁线程带来的开销,提高了程序的性能和资源利用率。

实现一个简单的线程池

以下是一个简单的线程池实现示例:

#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>

class ThreadPool {
public:
    ThreadPool(size_t num_threads) {
        for (size_t i = 0; i < num_threads; ++i) {
            threads.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock, [this] {
                            return this->stop ||!this->tasks.empty();
                        });
                        if (this->stop && this->tasks.empty()) return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }
                    task();
                }
            });
        }
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread &thread : threads) {
            thread.join();
        }
    }

    template<class F, class... Args>
    auto enqueue(F &&f, Args &&...args) -> std::future<typename std::result_of<F(Args...)>::type> {
        using return_type = typename std::result_of<F(Args...)>::type;
        auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            if (stop) throw std::runtime_error("enqueue on stopped ThreadPool");
            tasks.emplace([task]() { (*task)(); });
        }
        condition.notify_one();
        return res;
    }

private:
    std::vector<std::thread> threads;
    std::queue<std::function<void()>> tasks;
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop = false;
};

// 示例任务函数
int factorial(int n) {
    int result = 1;
    for (int i = 1; i <= n; ++i) {
        result *= i;
    }
    return result;
}

int main() {
    ThreadPool pool(4);
    std::vector<std::future<int>> results;
    for (int i = 1; i <= 8; ++i) {
        results.emplace_back(pool.enqueue(factorial, i));
    }
    for (auto &result : results) {
        std::cout << result.get() << std::endl;
    }
    return 0;
}

在这个线程池实现中:

  1. 构造函数:创建指定数量的线程,这些线程在后台等待任务。
  2. 析构函数:停止线程池并等待所有线程完成任务。
  3. enqueue 函数:将任务添加到任务队列中,并通知一个等待的线程来执行该任务。
  4. 示例任务函数 factorial:计算阶乘。在 main 函数中,将多个计算阶乘的任务提交到线程池,然后获取并输出结果。

多线程编程中的性能调优

线程数量的优化

  1. 合适线程数量的重要性 在多线程编程中,线程数量的选择对性能有很大影响。如果线程数量过少,可能无法充分利用多核处理器的资源;而线程数量过多,则会增加线程上下文切换的开销,降低性能。

  2. 确定合适线程数量的方法 可以根据处理器核心数来大致确定线程数量。例如,在一个具有 N 个核心的处理器上,可以创建 NN + 1 个线程。对于 CPU 密集型任务,线程数量接近核心数通常是比较合适的;对于 I/O 密集型任务,由于线程在等待 I/O 操作时会释放 CPU 资源,可以适当增加线程数量。

减少锁的竞争

  1. 锁竞争的影响 锁竞争是多线程性能的一个重要瓶颈。当多个线程频繁地竞争同一个锁时,会导致线程等待,降低整体的执行效率。

  2. 减少锁竞争的策略

    • 缩小临界区:尽量减少在锁保护下的代码段长度,只将必须保护的共享资源访问部分放在临界区内。
    • 使用细粒度锁:将大的共享资源分割成多个小的部分,每个部分使用单独的锁。这样不同线程可以同时访问不同部分的资源,减少锁竞争。例如,在一个存储大量数据的容器中,可以为每个数据块使用一个单独的锁。

缓存友好性

  1. 缓存对性能的影响 现代处理器都有高速缓存(Cache),如果多线程程序能够充分利用缓存,将大大提高性能。当线程访问的数据在缓存中时,访问速度会非常快;而如果数据不在缓存中,需要从主内存中读取,速度会慢很多。

  2. 提高缓存友好性的方法

    • 数据局部性:尽量让线程访问的数据在内存中连续存储,这样可以提高缓存命中率。例如,在遍历数组时,顺序访问比随机访问更有利于缓存利用。
    • 线程亲和性:某些操作系统允许将线程绑定到特定的处理器核心上,这样线程可以在一段时间内持续使用该核心的缓存,提高缓存的利用率。在 Linux 系统中,可以使用 sched_setaffinity 函数来设置线程亲和性。

通过以上对多线程编程基础、线程同步、原子操作、线程池以及性能调优的介绍,希望读者能够对 C++ 多线程编程有更深入的理解和实践能力,编写出高效、稳定的多线程程序。在实际应用中,还需要根据具体的需求和场景,灵活运用这些技术和方法。