MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

C++多线程编程中互斥与同步的区别

2023-01-188.0k 阅读

一、多线程编程基础概念

1.1 线程与进程

在深入探讨 C++ 多线程编程中互斥与同步的区别之前,我们先来回顾一下线程和进程的基本概念。

进程是程序在操作系统中的一次执行实例,它有自己独立的内存空间、系统资源,如文件描述符、信号处理等。每个进程在运行时都有自己独立的地址空间,进程之间相互隔离,数据不共享(通过特定的进程间通信机制除外)。

而线程则是进程中的一个执行单元,一个进程可以包含多个线程。线程共享进程的资源,如内存空间、文件描述符等,它们在同一地址空间内运行。线程相比进程更轻量级,创建和销毁的开销相对较小,线程间的切换也更为迅速。

1.2 多线程编程的意义

多线程编程在现代软件开发中具有重要意义。它可以充分利用多核处理器的性能,提高程序的执行效率。例如,在一个图形渲染程序中,可以让一个线程负责渲染图形,另一个线程负责处理用户输入,这样可以使程序在渲染图形的同时及时响应用户操作,提升用户体验。

在服务器端编程中,多线程可以同时处理多个客户端的请求,提高服务器的并发处理能力。比如一个 Web 服务器,通过多线程可以同时处理多个用户的网页请求,避免单个请求处理时间过长导致其他请求等待。

二、C++ 多线程编程简介

2.1 C++ 线程库

C++11 引入了标准线程库 <thread>,使得在 C++ 中进行多线程编程变得更加容易和标准化。<thread> 库提供了 std::thread 类来创建和管理线程。

以下是一个简单的创建线程的示例代码:

#include <iostream>
#include <thread>

void threadFunction() {
    std::cout << "This is a thread function." << std::endl;
}

int main() {
    std::thread myThread(threadFunction);
    std::cout << "Main thread is running." << std::endl;
    myThread.join();
    return 0;
}

在上述代码中,我们定义了一个 threadFunction 函数,然后在 main 函数中通过 std::thread myThread(threadFunction); 创建了一个新线程,并通过 myThread.join(); 等待新线程执行完毕。

2.2 线程安全问题

当多个线程同时访问和修改共享资源时,就会出现线程安全问题。例如,假设有两个线程同时对一个共享的计数器变量进行加 1 操作,如果没有适当的保护机制,可能会导致数据竞争,最终得到的结果可能不是预期的。

以下是一个简单的线程安全问题示例:

#include <iostream>
#include <thread>
#include <vector>

int sharedCounter = 0;

void incrementCounter() {
    for (int i = 0; i < 10000; ++i) {
        sharedCounter++;
    }
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 10; ++i) {
        threads.emplace_back(incrementCounter);
    }

    for (auto& thread : threads) {
        thread.join();
    }

    std::cout << "Final counter value: " << sharedCounter << std::endl;
    return 0;
}

在这个示例中,我们期望 sharedCounter 最终的值是 100000(10 个线程,每个线程加 10000 次),但由于线程竞争,实际得到的值可能小于 100000。这就是典型的线程安全问题,为了解决这类问题,我们需要使用互斥和同步机制。

三、互斥(Mutex)

3.1 互斥的概念

互斥(Mutual Exclusion,缩写为 Mutex)是一种用于控制多个线程对共享资源访问的机制。其核心思想是通过一个锁来保证在同一时刻只有一个线程能够访问共享资源。当一个线程获取到锁时,其他线程就必须等待,直到该线程释放锁。

3.2 C++ 中的互斥类型

3.2.1 std::mutex

std::mutex 是 C++ 标准库中最基本的互斥类型。它提供了 lockunlock 方法来获取和释放锁。以下是使用 std::mutex 解决前面计数器线程安全问题的示例:

#include <iostream>
#include <thread>
#include <vector>
#include <mutex>

int sharedCounter = 0;
std::mutex counterMutex;

void incrementCounter() {
    counterMutex.lock();
    for (int i = 0; i < 10000; ++i) {
        sharedCounter++;
    }
    counterMutex.unlock();
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 10; ++i) {
        threads.emplace_back(incrementCounter);
    }

    for (auto& thread : threads) {
        thread.join();
    }

    std::cout << "Final counter value: " << sharedCounter << std::endl;
    return 0;
}

在上述代码中,我们在 incrementCounter 函数中通过 counterMutex.lock() 获取锁,在操作完 sharedCounter 后通过 counterMutex.unlock() 释放锁,这样就保证了同一时刻只有一个线程能够访问 sharedCounter,从而解决了线程安全问题。

3.2.2 std::recursive_mutex

std::recursive_mutex 允许同一个线程多次获取锁,而不会产生死锁。这在一些递归函数或者需要多次锁定同一互斥锁的场景中非常有用。例如:

#include <iostream>
#include <thread>
#include <mutex>

std::recursive_mutex recursiveMutex;

void recursiveFunction(int count) {
    recursiveMutex.lock();
    if (count > 0) {
        std::cout << "Recursion level: " << count << std::endl;
        recursiveFunction(count - 1);
    }
    recursiveMutex.unlock();
}

int main() {
    std::thread myThread(recursiveFunction, 5);
    myThread.join();
    return 0;
}

在这个示例中,recursiveFunction 是一个递归函数,每次递归调用都会获取 recursiveMutex 的锁。如果使用普通的 std::mutex,第二次获取锁时会导致死锁,而 std::recursive_mutex 可以避免这种情况。

3.2.3 std::timed_mutex

std::timed_mutex 提供了带超时的锁获取操作。它除了 lockunlock 方法外,还提供了 try_locktry_lock_for 等方法。try_lock 尝试获取锁,如果锁不可用立即返回,而 try_lock_for 则可以设置一个等待时间,如果在指定时间内获取到锁则返回 true,否则返回 false。以下是一个使用 std::timed_mutex 的示例:

#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>

std::timed_mutex timedMutex;

void threadWithTimedLock() {
    if (timedMutex.try_lock_for(std::chrono::seconds(2))) {
        std::cout << "Thread acquired the lock." << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(3));
        timedMutex.unlock();
    } else {
        std::cout << "Thread could not acquire the lock within 2 seconds." << std::endl;
    }
}

int main() {
    std::thread myThread(threadWithTimedLock);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    timedMutex.lock();
    std::cout << "Main thread acquired the lock." << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(4));
    timedMutex.unlock();
    myThread.join();
    return 0;
}

在这个示例中,threadWithTimedLock 线程尝试在 2 秒内获取锁,而主线程先睡眠 1 秒后获取锁并保持 4 秒。由于主线程先获取锁且保持时间较长,threadWithTimedLock 线程在 2 秒内无法获取锁,从而输出相应提示信息。

3.2.4 std::recursive_timed_mutex

std::recursive_timed_mutex 结合了 std::recursive_mutexstd::timed_mutex 的特性,既允许同一个线程多次获取锁,又提供了带超时的锁获取操作。

3.3 锁的管理与 RAII

手动调用 lockunlock 方法容易出错,例如在 lock 之后忘记调用 unlock 可能会导致死锁。为了更安全地管理锁,C++ 标准库提供了基于 RAII(Resource Acquisition Is Initialization)的锁管理类。

3.3.1 std::lock_guard

std::lock_guard 是一个简单的 RAII 锁管理类。它在构造函数中获取锁,在析构函数中释放锁。以下是使用 std::lock_guard 重写前面计数器示例的代码:

#include <iostream>
#include <thread>
#include <vector>
#include <mutex>

int sharedCounter = 0;
std::mutex counterMutex;

void incrementCounter() {
    std::lock_guard<std::mutex> lock(counterMutex);
    for (int i = 0; i < 10000; ++i) {
        sharedCounter++;
    }
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 10; ++i) {
        threads.emplace_back(incrementCounter);
    }

    for (auto& thread : threads) {
        thread.join();
    }

    std::cout << "Final counter value: " << sharedCounter << std::endl;
    return 0;
}

在这个示例中,std::lock_guard<std::mutex> lock(counterMutex);incrementCounter 函数开始时获取锁,当函数结束时,lock 对象析构,自动释放锁,这样就避免了手动调用 unlock 可能出现的错误。

3.3.2 std::unique_lock

std::unique_lockstd::lock_guard 更灵活。它不仅提供了 RAII 风格的锁管理,还支持延迟锁定、锁的所有权转移等操作。例如,std::unique_lock 可以在构造时不立即获取锁,而是通过 lock 方法手动获取,并且可以通过 try_lock 等方法进行带条件的锁获取。以下是一个示例:

#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>

std::mutex myMutex;

void threadFunction() {
    std::unique_lock<std::mutex> lock(myMutex, std::defer_lock);
    if (lock.try_lock_for(std::chrono::seconds(2))) {
        std::cout << "Thread acquired the lock." << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(3));
    } else {
        std::cout << "Thread could not acquire the lock within 2 seconds." << std::endl;
    }
}

int main() {
    std::thread myThread(threadFunction);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::unique_lock<std::mutex> mainLock(myMutex);
    std::cout << "Main thread acquired the lock." << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(4));
    mainLock.unlock();
    myThread.join();
    return 0;
}

在这个示例中,std::unique_lock<std::mutex> lock(myMutex, std::defer_lock); 构造了一个 std::unique_lock 对象但不立即获取锁,然后通过 lock.try_lock_for(std::chrono::seconds(2)); 尝试在 2 秒内获取锁,展示了 std::unique_lock 的灵活性。

四、同步(Synchronization)

4.1 同步的概念

同步是一个更广泛的概念,它不仅仅是控制对共享资源的访问,还包括协调线程之间的执行顺序和事件通知等。同步机制确保线程按照预期的顺序执行,以避免数据不一致或其他错误。

4.2 条件变量(std::condition_variable)

4.2.1 基本概念

std::condition_variable 是 C++ 标准库提供的一种同步工具,用于线程间的条件通知。它通常与 std::mutex 一起使用,允许一个线程在某个条件满足时唤醒其他等待的线程。

4.2.2 示例代码

以下是一个生产者 - 消费者模型的示例,展示了 std::condition_variable 的使用:

#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <chrono>

std::queue<int> dataQueue;
std::mutex queueMutex;
std::condition_variable dataAvailable;
bool finishedProducing = false;

void producer() {
    for (int i = 0; i < 10; ++i) {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        std::unique_lock<std::mutex> lock(queueMutex);
        dataQueue.push(i);
        std::cout << "Produced: " << i << std::endl;
        lock.unlock();
        dataAvailable.notify_one();
    }
    std::unique_lock<std::mutex> lock(queueMutex);
    finishedProducing = true;
    lock.unlock();
    dataAvailable.notify_all();
}

void consumer() {
    while (true) {
        std::unique_lock<std::mutex> lock(queueMutex);
        dataAvailable.wait(lock, [] { return!dataQueue.empty() || finishedProducing; });
        if (dataQueue.empty() && finishedProducing) {
            break;
        }
        int value = dataQueue.front();
        dataQueue.pop();
        std::cout << "Consumed: " << value << std::endl;
        lock.unlock();
    }
}

int main() {
    std::thread producerThread(producer);
    std::thread consumerThread(consumer);
    producerThread.join();
    consumerThread.join();
    return 0;
}

在这个示例中,生产者线程将数据放入队列,并通过 dataAvailable.notify_one() 通知消费者线程。消费者线程通过 dataAvailable.wait(lock, [] { return!dataQueue.empty() || finishedProducing; }); 等待数据可用或生产结束的条件。当条件满足时,消费者线程获取数据并处理。

4.3 信号量(Semaphore)

4.3.1 基本概念

信号量是一种计数型的同步工具,它通过一个计数器来控制对资源的访问。当线程获取信号量时,如果计数器大于 0,则计数器减 1,线程可以继续执行;如果计数器为 0,则线程等待,直到计数器大于 0。当线程释放信号量时,计数器加 1。

4.3.2 C++ 中的实现

虽然 C++ 标准库没有直接提供信号量类型,但可以通过 std::mutexstd::condition_variable 来实现一个简单的信号量。以下是一个简单的信号量实现示例:

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

class Semaphore {
public:
    Semaphore(int count = 0) : count(count) {}

    void acquire() {
        std::unique_lock<std::mutex> lock(mutex);
        condition.wait(lock, [this] { return count > 0; });
        --count;
    }

    void release() {
        std::unique_lock<std::mutex> lock(mutex);
        ++count;
        condition.notify_one();
    }

private:
    int count;
    std::mutex mutex;
    std::condition_variable condition;
};

Semaphore semaphore(1);

void threadFunction() {
    semaphore.acquire();
    std::cout << "Thread acquired the semaphore." << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout << "Thread releasing the semaphore." << std::endl;
    semaphore.release();
}

int main() {
    std::thread myThread(threadFunction);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Main thread trying to acquire the semaphore." << std::endl;
    semaphore.acquire();
    std::cout << "Main thread acquired the semaphore." << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(3));
    std::cout << "Main thread releasing the semaphore." << std::endl;
    semaphore.release();
    myThread.join();
    return 0;
}

在这个示例中,Semaphore 类实现了一个简单的信号量。acquire 方法用于获取信号量,release 方法用于释放信号量。通过 std::mutexstd::condition_variable 来实现信号量的计数和等待机制。

4.4 屏障(Barrier)

4.4.1 基本概念

屏障是一种同步机制,它允许一组线程在某个点上等待,直到所有线程都到达该点,然后所有线程再继续执行。这在需要多个线程完成某些任务后再进行下一步操作的场景中非常有用。

4.4.2 示例代码

以下是一个简单的屏障实现示例:

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

class Barrier {
public:
    Barrier(int numThreads) : numThreads(numThreads), count(0) {}

    void wait() {
        std::unique_lock<std::mutex> lock(mutex);
        ++count;
        if (count == numThreads) {
            count = 0;
            condition.notify_all();
        } else {
            condition.wait(lock, [this] { return count == 0; });
        }
    }

private:
    int numThreads;
    int count;
    std::mutex mutex;
    std::condition_variable condition;
};

Barrier barrier(3);

void threadFunction(int id) {
    std::cout << "Thread " << id << " is doing some work." << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(id));
    std::cout << "Thread " << id << " is waiting at the barrier." << std::endl;
    barrier.wait();
    std::cout << "Thread " << id << " passed the barrier." << std::endl;
}

int main() {
    std::thread thread1(threadFunction, 1);
    std::thread thread2(threadFunction, 2);
    std::thread thread3(threadFunction, 3);
    thread1.join();
    thread2.join();
    thread3.join();
    return 0;
}

在这个示例中,Barrier 类实现了一个简单的屏障。每个线程在执行到 barrier.wait() 时会等待,直到所有线程都调用了 wait 方法,然后所有线程同时继续执行。

五、互斥与同步的区别

5.1 目的不同

互斥的主要目的是保护共享资源,确保在同一时刻只有一个线程能够访问共享资源,防止数据竞争。它侧重于解决多线程对共享数据访问的冲突问题。

而同步的目的更广泛,除了控制对共享资源的访问外,还包括协调线程之间的执行顺序,确保线程按照预期的逻辑顺序执行,以及实现线程间的事件通知等。同步机制用于解决线程之间的协作问题,使多个线程能够有序地完成复杂的任务。

5.2 应用场景不同

互斥通常应用于简单的共享资源保护场景,例如多个线程对一个全局变量的读写操作。在这种情况下,只需要保证同一时刻只有一个线程能够访问该变量,使用互斥锁就可以满足需求。

同步则适用于更复杂的场景,如生产者 - 消费者模型、多线程计算中的数据依赖场景等。在生产者 - 消费者模型中,不仅需要保护共享队列(可以使用互斥锁),还需要通过条件变量等同步机制来通知消费者有新数据可用,以及协调生产者和消费者的执行节奏。

5.3 实现方式不同

互斥主要通过锁机制来实现,如 std::mutex 及其派生类。线程通过获取锁来访问共享资源,获取锁失败则等待,直到锁被释放。

同步则涉及多种机制,除了锁之外,还包括条件变量、信号量、屏障等。这些机制通过不同的方式来协调线程的执行,例如条件变量通过等待和通知机制,信号量通过计数机制,屏障通过等待所有线程到达某个点的机制来实现线程间的同步。

5.4 复杂度不同

互斥的实现相对简单,主要是围绕锁的获取和释放操作。使用互斥锁时,只需要在访问共享资源前后进行锁的操作即可。

同步机制通常更为复杂,需要考虑更多的因素,如条件的判断、事件的通知、线程间的协调等。例如在使用条件变量时,不仅要正确地使用互斥锁来保护共享数据,还要准确地设置等待条件和进行通知操作,否则可能会出现死锁或数据不一致等问题。

5.5 对性能的影响不同

互斥锁在竞争激烈时可能会导致性能瓶颈,因为每次只有一个线程能够获取锁,其他线程需要等待。频繁的锁获取和释放操作也会带来一定的开销。

同步机制虽然也会带来开销,但在合理设计的情况下,可以更好地利用多线程的并行性。例如在生产者 - 消费者模型中,通过条件变量的合理使用,可以避免消费者线程不必要的等待,提高整体的执行效率。

六、总结与建议

在 C++ 多线程编程中,互斥和同步都是非常重要的概念。互斥是同步的基础,用于保护共享资源,而同步则涵盖了更广泛的线程协作场景。

在实际编程中,需要根据具体的需求选择合适的互斥或同步机制。对于简单的共享资源保护,使用互斥锁通常就足够了;而对于复杂的线程协作场景,如生产者 - 消费者模型、多线程数据处理等,需要综合运用多种同步机制来实现高效、正确的多线程程序。

同时,要注意避免死锁、数据竞争等问题。合理地设计锁的粒度和使用顺序,以及正确地使用同步机制的等待和通知条件,是编写健壮多线程程序的关键。

通过深入理解互斥与同步的区别,并在实践中灵活运用它们,开发者可以编写出性能优良、线程安全的 C++ 多线程程序,充分发挥多核处理器的优势,提升软件的性能和用户体验。