MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

C++多线程编程技术与实例

2021-10-187.1k 阅读

C++ 多线程编程基础概念

在现代计算机系统中,多核处理器已成为主流,这使得程序能够同时执行多个任务,提高整体性能。多线程编程就是利用这种多核特性的有效手段。在 C++ 中,多线程编程是通过 <thread> 头文件来实现的。

线程的创建与启动

创建一个线程很简单,只需要实例化一个 std::thread 对象,并传入一个可调用对象(函数、函数对象或 lambda 表达式)作为线程执行的任务。以下是一个简单的示例:

#include <iostream>
#include <thread>

void thread_function() {
    std::cout << "This is a thread function." << std::endl;
}

int main() {
    std::thread my_thread(thread_function);
    my_thread.join();
    return 0;
}

在上述代码中,我们定义了一个 thread_function 函数,然后在 main 函数中创建了一个 std::thread 对象 my_thread,并将 thread_function 作为参数传递给它,这就启动了一个新的线程。my_thread.join() 语句的作用是等待 my_thread 线程执行完毕,确保主线程不会在子线程之前结束。

传递参数给线程函数

线程函数可以接受参数,这样我们就能让线程执行更灵活的任务。例如:

#include <iostream>
#include <thread>

void print_number(int num) {
    std::cout << "The number is: " << num << std::endl;
}

int main() {
    int number = 42;
    std::thread my_thread(print_number, number);
    my_thread.join();
    return 0;
}

在这个例子中,print_number 函数接受一个 int 类型的参数 num,我们在创建线程时将变量 number 传递给了这个函数。

线程同步

当多个线程同时访问共享资源时,可能会出现数据竞争的问题,导致程序出现未定义行为。为了解决这个问题,我们需要使用线程同步机制。

互斥锁(Mutex)

互斥锁(std::mutex)是一种最基本的线程同步工具,它用于保证在同一时间只有一个线程能够访问共享资源。以下是一个简单的示例,展示如何使用互斥锁来保护共享变量:

#include <iostream>
#include <thread>
#include <mutex>

std::mutex mtx;
int shared_variable = 0;

void increment() {
    for (int i = 0; i < 1000000; ++i) {
        mtx.lock();
        ++shared_variable;
        mtx.unlock();
    }
}

int main() {
    std::thread thread1(increment);
    std::thread thread2(increment);

    thread1.join();
    thread2.join();

    std::cout << "Final value of shared variable: " << shared_variable << std::endl;
    return 0;
}

在上述代码中,我们定义了一个 std::mutex 对象 mtx,并在 increment 函数中,在访问 shared_variable 之前调用 mtx.lock() 锁定互斥锁,访问结束后调用 mtx.unlock() 解锁互斥锁。这样就保证了在同一时间只有一个线程能够修改 shared_variable,避免了数据竞争。

锁的自动管理(std::lock_guard 和 std::unique_lock)

手动调用 lockunlock 函数容易出错,特别是在函数中有多个返回点的情况下。C++ 提供了 std::lock_guardstd::unique_lock 来自动管理锁的生命周期。

std::lock_guard 是一个简单的 RAII(Resource Acquisition Is Initialization) 类,它在构造时锁定互斥锁,在析构时解锁互斥锁。例如:

#include <iostream>
#include <thread>
#include <mutex>

std::mutex mtx;
int shared_variable = 0;

void increment() {
    for (int i = 0; i < 1000000; ++i) {
        std::lock_guard<std::mutex> lock(mtx);
        ++shared_variable;
    }
}

int main() {
    std::thread thread1(increment);
    std::thread thread2(increment);

    thread1.join();
    thread2.join();

    std::cout << "Final value of shared variable: " << shared_variable << std::endl;
    return 0;
}

std::unique_lockstd::lock_guard 更灵活,它可以延迟锁定、在不同作用域间移动,还支持一些额外的功能,如尝试锁定(try_lock)。例如:

#include <iostream>
#include <thread>
#include <mutex>

std::mutex mtx;
int shared_variable = 0;

void increment() {
    std::unique_lock<std::mutex> lock(mtx, std::defer_lock);
    if (lock.try_lock()) {
        for (int i = 0; i < 1000000; ++i) {
            ++shared_variable;
        }
        lock.unlock();
    }
}

int main() {
    std::thread thread1(increment);
    std::thread thread2(increment);

    thread1.join();
    thread2.join();

    std::cout << "Final value of shared variable: " << shared_variable << std::endl;
    return 0;
}

条件变量(std::condition_variable)

条件变量(std::condition_variable)用于线程间的同步,它允许一个线程等待某个条件满足后再继续执行。通常与互斥锁一起使用。以下是一个生产者 - 消费者模型的示例:

#include <iostream>
#include <thread>
#include <mutex>
#include <queue>
#include <condition_variable>
#include <chrono>

std::mutex mtx;
std::queue<int> data_queue;
std::condition_variable data_cond;
bool finished = false;

void producer() {
    for (int i = 0; i < 10; ++i) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        std::unique_lock<std::mutex> lock(mtx);
        data_queue.push(i);
        std::cout << "Produced: " << i << std::endl;
        lock.unlock();
        data_cond.notify_one();
    }
    std::unique_lock<std::mutex> lock(mtx);
    finished = true;
    lock.unlock();
    data_cond.notify_all();
}

void consumer() {
    while (true) {
        std::unique_lock<std::mutex> lock(mtx);
        data_cond.wait(lock, []{ return!data_queue.empty() || finished; });
        if (data_queue.empty() && finished) {
            break;
        }
        int value = data_queue.front();
        data_queue.pop();
        std::cout << "Consumed: " << value << std::endl;
        lock.unlock();
    }
}

int main() {
    std::thread producer_thread(producer);
    std::thread consumer_thread(consumer);

    producer_thread.join();
    consumer_thread.join();

    return 0;
}

在这个示例中,生产者线程向队列中添加数据,并通过 data_cond.notify_one() 通知消费者线程。消费者线程使用 data_cond.wait 等待条件满足(队列中有数据或生产者完成)。data_cond.wait 会自动解锁互斥锁,并在条件满足时重新锁定。

线程安全的数据结构

除了使用锁来保护共享数据,还可以使用线程安全的数据结构,这些数据结构内部已经实现了同步机制,使得多个线程可以安全地访问。

std::atomic

std::atomic 是 C++ 提供的原子类型,它保证了对该类型变量的操作是原子的,即不可分割的,不会被其他线程干扰。例如:

#include <iostream>
#include <thread>
#include <atomic>

std::atomic<int> atomic_variable(0);

void increment() {
    for (int i = 0; i < 1000000; ++i) {
        ++atomic_variable;
    }
}

int main() {
    std::thread thread1(increment);
    std::thread thread2(increment);

    thread1.join();
    thread2.join();

    std::cout << "Final value of atomic variable: " << atomic_variable << std::endl;
    return 0;
}

在这个例子中,std::atomic<int> 类型的变量 atomic_variable 可以在多线程环境下安全地进行自增操作,无需额外的锁。

线程安全的队列

虽然 C++ 标准库没有提供直接的线程安全队列,但我们可以自己实现一个。以下是一个简单的基于互斥锁和条件变量的线程安全队列实现:

#include <iostream>
#include <thread>
#include <mutex>
#include <queue>
#include <condition_variable>

template <typename T>
class ThreadSafeQueue {
public:
    void push(T value) {
        std::unique_lock<std::mutex> lock(mutex_);
        queue_.push(std::move(value));
        lock.unlock();
        cond_.notify_one();
    }

    bool pop(T& value) {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this]{ return!queue_.empty(); });
        if (queue_.empty()) {
            return false;
        }
        value = std::move(queue_.front());
        queue_.pop();
        return true;
    }

private:
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable cond_;
};

ThreadSafeQueue<int> safe_queue;

void producer() {
    for (int i = 0; i < 10; ++i) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        safe_queue.push(i);
        std::cout << "Produced: " << i << std::endl;
    }
}

void consumer() {
    int value;
    while (true) {
        if (safe_queue.pop(value)) {
            std::cout << "Consumed: " << value << std::endl;
        } else {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    }
}

int main() {
    std::thread producer_thread(producer);
    std::thread consumer_thread(consumer);

    producer_thread.join();
    consumer_thread.join();

    return 0;
}

在这个 ThreadSafeQueue 类中,push 方法用于向队列中添加元素,pop 方法用于从队列中取出元素。通过互斥锁和条件变量来保证线程安全。

多线程性能优化

虽然多线程编程可以提高程序的性能,但如果使用不当,反而可能导致性能下降。以下是一些多线程性能优化的建议。

减少锁的粒度

锁的粒度是指被锁保护的代码范围。尽量缩小锁的保护范围,只在访问共享资源时锁定,这样可以减少线程等待锁的时间,提高并发性能。例如,在前面的 increment 函数中,如果 shared_variable 的更新操作可以拆分成多个步骤,我们可以在每个步骤之间解锁互斥锁,如下所示:

#include <iostream>
#include <thread>
#include <mutex>

std::mutex mtx;
int shared_variable = 0;

void increment() {
    for (int i = 0; i < 1000000; ++i) {
        std::unique_lock<std::mutex> lock(mtx);
        int temp = shared_variable;
        lock.unlock();
        temp++;
        std::unique_lock<std::mutex> lock2(mtx);
        shared_variable = temp;
    }
}

int main() {
    std::thread thread1(increment);
    std::thread thread2(increment);

    thread1.join();
    thread2.join();

    std::cout << "Final value of shared variable: " << shared_variable << std::endl;
    return 0;
}

但需要注意的是,这种方法增加了代码的复杂性,并且在某些情况下可能会引入其他问题,如缓存一致性问题,所以需要谨慎使用。

避免不必要的同步

如果某些数据不需要在多个线程之间共享,就没有必要对其进行同步。例如,每个线程都有自己独立的局部变量,这些变量不需要同步。另外,如果共享数据只读,也不需要加锁(前提是没有其他线程对其进行写操作)。

合理分配任务

合理地将任务分配到不同的线程中可以提高性能。一般来说,应该尽量让每个线程的工作量均衡,避免某个线程过于繁忙,而其他线程空闲。例如,在一个计算密集型的程序中,可以将数据分成多个部分,每个线程处理一部分数据。

多线程编程中的常见问题及解决方法

在多线程编程过程中,会遇到一些常见的问题,需要我们掌握相应的解决方法。

死锁

死锁是多线程编程中最常见的问题之一,它发生在两个或多个线程相互等待对方释放锁的情况下。例如:

#include <iostream>
#include <thread>
#include <mutex>

std::mutex mutex1;
std::mutex mutex2;

void thread1_function() {
    mutex1.lock();
    std::cout << "Thread 1 locked mutex1" << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    mutex2.lock();
    std::cout << "Thread 1 locked mutex2" << std::endl;
    mutex2.unlock();
    mutex1.unlock();
}

void thread2_function() {
    mutex2.lock();
    std::cout << "Thread 2 locked mutex2" << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    mutex1.lock();
    std::cout << "Thread 2 locked mutex1" << std::endl;
    mutex1.unlock();
    mutex2.unlock();
}

int main() {
    std::thread thread1(thread1_function);
    std::thread thread2(thread2_function);

    thread1.join();
    thread2.join();

    return 0;
}

在上述代码中,thread1_function 先锁定 mutex1,然后试图锁定 mutex2,而 thread2_function 先锁定 mutex2,然后试图锁定 mutex1,这就导致了死锁。

解决死锁的方法有多种,例如:

  1. 避免嵌套锁:尽量避免在一个线程中同时获取多个锁,如果确实需要,确保所有线程获取锁的顺序一致。
  2. 使用 std::lock:C++ 提供了 std::lock 函数,它可以一次性锁定多个互斥锁,避免死锁。例如:
#include <iostream>
#include <thread>
#include <mutex>

std::mutex mutex1;
std::mutex mutex2;

void thread1_function() {
    std::lock(mutex1, mutex2);
    std::cout << "Thread 1 locked mutex1 and mutex2" << std::endl;
    std::unique_lock<std::mutex> lock1(mutex1, std::adopt_lock);
    std::unique_lock<std::mutex> lock2(mutex2, std::adopt_lock);
    // 执行任务
    lock2.unlock();
    lock1.unlock();
}

void thread2_function() {
    std::lock(mutex1, mutex2);
    std::cout << "Thread 2 locked mutex1 and mutex2" << std::endl;
    std::unique_lock<std::mutex> lock1(mutex1, std::adopt_lock);
    std::unique_lock<std::mutex> lock2(mutex2, std::adopt_lock);
    // 执行任务
    lock2.unlock();
    lock1.unlock();
}

int main() {
    std::thread thread1(thread1_function);
    std::thread thread2(thread2_function);

    thread1.join();
    thread2.join();

    return 0;
}

竞态条件

竞态条件是指多个线程访问共享资源时,由于执行顺序的不确定性而导致结果不可预测。除了使用锁来解决竞态条件,还可以使用 std::atomic 类型或线程安全的数据结构。另外,在设计程序时,尽量减少共享资源的使用,也可以降低竞态条件出现的概率。

多线程编程在实际项目中的应用

多线程编程在很多实际项目中都有广泛的应用,以下是一些常见的场景。

服务器编程

在服务器端编程中,多线程常用于处理多个客户端的并发请求。例如,一个网络服务器可以为每个客户端连接创建一个独立的线程,这样可以同时处理多个客户端的请求,提高服务器的并发处理能力。以下是一个简单的基于多线程的 TCP 服务器示例:

#include <iostream>
#include <thread>
#include <vector>
#include <WinSock2.h>

#pragma comment(lib, "ws2_32.lib")

void handle_client(SOCKET client_socket) {
    char buffer[1024];
    int bytes_received = recv(client_socket, buffer, sizeof(buffer), 0);
    if (bytes_received > 0) {
        buffer[bytes_received] = '\0';
        std::cout << "Received from client: " << buffer << std::endl;
        const char* response = "Message received!";
        send(client_socket, response, strlen(response), 0);
    }
    closesocket(client_socket);
}

int main() {
    WSADATA wsaData;
    if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
        std::cerr << "WSAStartup failed" << std::endl;
        return 1;
    }

    SOCKET listen_socket = socket(AF_INET, SOCK_STREAM, 0);
    if (listen_socket == INVALID_SOCKET) {
        std::cerr << "Socket creation failed" << std::endl;
        WSACleanup();
        return 1;
    }

    sockaddr_in server_addr;
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(12345);

    if (bind(listen_socket, (sockaddr*)&server_addr, sizeof(server_addr)) == SOCKET_ERROR) {
        std::cerr << "Bind failed" << std::endl;
        closesocket(listen_socket);
        WSACleanup();
        return 1;
    }

    if (listen(listen_socket, 5) == SOCKET_ERROR) {
        std::cerr << "Listen failed" << std::endl;
        closesocket(listen_socket);
        WSACleanup();
        return 1;
    }

    std::vector<std::thread> threads;
    while (true) {
        sockaddr_in client_addr;
        int client_addr_len = sizeof(client_addr);
        SOCKET client_socket = accept(listen_socket, (sockaddr*)&client_addr, &client_addr_len);
        if (client_socket != INVALID_SOCKET) {
            threads.emplace_back(handle_client, client_socket);
        }
    }

    for (auto& thread : threads) {
        thread.join();
    }

    closesocket(listen_socket);
    WSACleanup();
    return 0;
}

在这个示例中,服务器监听指定端口,每当有新的客户端连接时,就创建一个新的线程来处理该客户端的请求。

并行计算

在科学计算、数据分析等领域,经常需要处理大量的数据。多线程编程可以将数据分成多个部分,并行地进行计算,从而提高计算速度。例如,计算一个数组元素的总和,可以将数组分成多个子数组,每个线程计算一个子数组的和,最后将所有子数组的和累加起来。

#include <iostream>
#include <thread>
#include <vector>

void sum_subarray(const std::vector<int>& subarray, int& result) {
    for (int num : subarray) {
        result += num;
    }
}

int main() {
    std::vector<int> array = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
    int num_threads = std::thread::hardware_concurrency();
    std::vector<std::thread> threads;
    std::vector<int> partial_sums(num_threads, 0);

    int subarray_size = array.size() / num_threads;
    for (int i = 0; i < num_threads; ++i) {
        int start = i * subarray_size;
        int end = (i == num_threads - 1)? array.size() : (i + 1) * subarray_size;
        std::vector<int> subarray(array.begin() + start, array.begin() + end);
        threads.emplace_back(sum_subarray, std::ref(subarray), std::ref(partial_sums[i]));
    }

    for (auto& thread : threads) {
        thread.join();
    }

    int total_sum = 0;
    for (int sum : partial_sums) {
        total_sum += sum;
    }

    std::cout << "Total sum: " << total_sum << std::endl;
    return 0;
}

在这个例子中,我们根据系统的 CPU 核心数将数组分成多个子数组,每个线程计算一个子数组的和,最后将所有部分和累加得到数组的总和。

总结

C++ 多线程编程是一项强大的技术,它可以充分利用多核处理器的性能,提高程序的并发处理能力和运行效率。通过掌握线程的创建与管理、线程同步机制、线程安全的数据结构以及性能优化技巧,我们能够编写出高效、稳定的多线程程序。在实际项目中,多线程编程广泛应用于服务器编程、并行计算等领域,为解决复杂的计算和并发处理问题提供了有效的手段。然而,多线程编程也带来了一些挑战,如死锁、竞态条件等问题,需要我们在编程过程中谨慎处理。通过不断的实践和学习,我们可以更好地运用多线程技术,开发出更优秀的软件产品。

以上就是关于 C++ 多线程编程技术与实例的详细介绍,希望对读者有所帮助。在实际应用中,还需要根据具体的需求和场景,灵活运用多线程技术,以达到最佳的性能和效果。同时,多线程编程是一个复杂且不断发展的领域,新的技术和方法不断涌现,建议读者持续关注相关的技术动态,不断提升自己的编程能力。