MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

C++多线程编程中互斥锁的应用

2021-08-191.1k 阅读

C++ 多线程编程基础

线程的概念

在现代操作系统中,线程是程序执行流的最小单元。一个进程可以包含多个线程,这些线程共享进程的资源,如内存空间、文件描述符等。与进程相比,线程的创建和销毁开销更小,上下文切换也更高效。这使得多线程编程成为提高程序性能,特别是在多核处理器环境下,充分利用硬件资源的有效手段。

在 C++ 中,从 C++11 标准开始,引入了对多线程编程的支持,这大大简化了 C++ 程序员编写多线程程序的难度。通过 <thread> 头文件,我们可以创建、管理和控制线程。例如,下面是一个简单的创建线程并等待其完成的代码示例:

#include <iostream>
#include <thread>

void threadFunction() {
    std::cout << "This is a thread." << std::endl;
}

int main() {
    std::thread myThread(threadFunction);
    std::cout << "Main thread is running." << std::endl;
    myThread.join();
    std::cout << "Thread has finished." << std::endl;
    return 0;
}

在上述代码中,std::thread myThread(threadFunction); 创建了一个新线程,该线程会执行 threadFunction 函数。myThread.join(); 语句使得主线程等待 myThread 线程执行完毕。

共享资源与竞态条件

当多个线程同时访问和修改共享资源时,就可能会出现竞态条件(Race Condition)。竞态条件是指程序在多线程环境下运行时,由于线程执行顺序的不确定性,导致程序产生非预期的结果。

例如,考虑一个简单的计数器:

#include <iostream>
#include <thread>
#include <vector>

int counter = 0;

void increment() {
    for (int i = 0; i < 10000; ++i) {
        ++counter;
    }
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 10; ++i) {
        threads.emplace_back(increment);
    }

    for (auto& thread : threads) {
        thread.join();
    }

    std::cout << "Final counter value: " << counter << std::endl;
    return 0;
}

理论上,10 个线程每个线程对计数器递增 10000 次,最终计数器的值应该是 100000。然而,由于竞态条件,实际运行结果往往小于 100000。这是因为 ++counter 操作不是原子的,它包含了读取、递增和写入三个步骤。在多线程环境下,当一个线程读取 counter 的值后,还未完成递增和写入操作时,另一个线程可能也读取了相同的值,导致两次递增操作只增加了 1 而不是 2。

互斥锁的基本概念

什么是互斥锁

互斥锁(Mutex,即 Mutual Exclusion 的缩写)是一种同步原语,用于保护共享资源,确保在同一时间只有一个线程能够访问该资源。其原理类似于一把锁,线程在访问共享资源前必须先获取锁,访问完毕后释放锁。其他线程在锁被占用时,必须等待锁被释放才能获取锁并访问共享资源。

在 C++ 中,<mutex> 头文件提供了互斥锁的相关实现。其中,std::mutex 是最基本的互斥锁类型。

互斥锁的工作原理

互斥锁通常基于操作系统提供的底层同步机制实现,例如信号量等。当一个线程调用 lock() 方法获取互斥锁时,如果锁当前处于未锁定状态,线程将成功获取锁并将其状态设置为锁定,然后可以安全地访问共享资源。如果锁当前已被其他线程锁定,调用 lock() 的线程将被阻塞,直到锁被释放。当线程完成对共享资源的访问后,调用 unlock() 方法释放锁,此时等待获取锁的线程中的一个将有机会获取锁并继续执行。

例如,我们可以修改前面的计数器示例,使用互斥锁来避免竞态条件:

#include <iostream>
#include <thread>
#include <vector>
#include <mutex>

int counter = 0;
std::mutex counterMutex;

void increment() {
    for (int i = 0; i < 10000; ++i) {
        counterMutex.lock();
        ++counter;
        counterMutex.unlock();
    }
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 10; ++i) {
        threads.emplace_back(increment);
    }

    for (auto& thread : threads) {
        thread.join();
    }

    std::cout << "Final counter value: " << counter << std::endl;
    return 0;
}

在这个示例中,counterMutex.lock(); 确保在任何时刻只有一个线程能够执行 ++counter 操作,counterMutex.unlock(); 释放锁,使得其他线程有机会获取锁并递增计数器。这样,无论线程执行顺序如何,最终计数器的值都将是 100000。

C++ 中互斥锁的类型

std::mutex

std::mutex 是 C++ 中最基本的互斥锁类型,提供了 lock()unlock() 方法用于获取和释放锁。如前面的计数器示例中使用的就是 std::mutex。它适用于大多数简单的同步场景。但是,std::mutex 有一些局限性,例如,如果在获取锁后线程异常终止,锁将永远不会被释放,这会导致死锁。

std::recursive_mutex

std::recursive_mutex 允许同一个线程多次获取锁,而不会造成死锁。这在一些递归函数或嵌套调用需要锁定同一互斥锁的场景中非常有用。例如:

#include <iostream>
#include <mutex>

std::recursive_mutex recursiveMutex;

void recursiveFunction(int level) {
    recursiveMutex.lock();
    std::cout << "Entering recursiveFunction at level " << level << std::endl;
    if (level > 0) {
        recursiveFunction(level - 1);
    }
    std::cout << "Exiting recursiveFunction at level " << level << std::endl;
    recursiveMutex.unlock();
}

int main() {
    recursiveFunction(3);
    return 0;
}

在上述代码中,recursiveFunction 是一个递归函数,每次调用都会获取 recursiveMutex 锁。如果使用 std::mutex,当函数递归调用第二次获取锁时,就会造成死锁,因为 std::mutex 不允许同一个线程多次获取锁。而 std::recursive_mutex 则可以避免这种情况,它内部维护了一个计数器,每次获取锁时计数器加一,每次释放锁时计数器减一,只有当计数器为 0 时,锁才真正被释放。

std::timed_mutex

std::timed_mutex 是一种带有超时功能的互斥锁。它除了提供 lock()unlock() 方法外,还提供了 try_lock()try_lock_for() 以及 try_lock_until() 方法。try_lock() 尝试获取锁,如果锁不可用,立即返回,不会阻塞线程。try_lock_for()try_lock_until() 则允许线程在一定时间内尝试获取锁。例如:

#include <iostream>
#include <thread>
#include <chrono>
#include <mutex>

std::timed_mutex timedMutex;

void threadFunction() {
    if (timedMutex.try_lock_for(std::chrono::seconds(2))) {
        std::cout << "Thread acquired the lock." << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(3));
        timedMutex.unlock();
    } else {
        std::cout << "Thread could not acquire the lock within 2 seconds." << std::endl;
    }
}

int main() {
    std::thread myThread(threadFunction);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    if (timedMutex.try_lock()) {
        std::cout << "Main thread acquired the lock." << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(4));
        timedMutex.unlock();
    } else {
        std::cout << "Main thread could not acquire the lock." << std::endl;
    }
    myThread.join();
    return 0;
}

在这个示例中,threadFunction 使用 try_lock_for(std::chrono::seconds(2)) 尝试在 2 秒内获取锁。如果在 2 秒内成功获取锁,线程将执行相应操作并在 3 秒后释放锁;如果未能获取锁,将输出提示信息。主线程先睡眠 1 秒,然后尝试获取锁,如果获取成功则执行操作并在 4 秒后释放锁。

std::recursive_timed_mutex

std::recursive_timed_mutex 结合了 std::recursive_mutexstd::timed_mutex 的特性,既允许同一个线程多次获取锁,又支持超时获取锁的功能。例如,在一个递归函数需要在一定时间内获取锁的场景中,就可以使用 std::recursive_timed_mutex

互斥锁的使用技巧与注意事项

RAII 与互斥锁

资源获取即初始化(RAII,Resource Acquisition Is Initialization)是 C++ 中一种重要的编程技巧,用于自动管理资源的生命周期。在互斥锁的使用中,RAII 同样非常有用。std::lock_guardstd::unique_lock 就是基于 RAII 机制的互斥锁管理类。

std::lock_guard 是一个简单的 RAII 封装类,它在构造函数中获取互斥锁,在析构函数中释放互斥锁。例如,我们可以将前面的计数器示例改写为使用 std::lock_guard

#include <iostream>
#include <thread>
#include <vector>
#include <mutex>

int counter = 0;
std::mutex counterMutex;

void increment() {
    for (int i = 0; i < 10000; ++i) {
        std::lock_guard<std::mutex> lock(counterMutex);
        ++counter;
    }
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 10; ++i) {
        threads.emplace_back(increment);
    }

    for (auto& thread : threads) {
        thread.join();
    }

    std::cout << "Final counter value: " << counter << std::endl;
    return 0;
}

在这个示例中,std::lock_guard<std::mutex> lock(counterMutex); 语句在创建 lock 对象时自动调用 counterMutex.lock() 获取锁,当 lock 对象超出作用域被销毁时,自动调用 counterMutex.unlock() 释放锁。这样可以确保即使在 ++counter 操作过程中抛出异常,锁也能被正确释放,避免死锁。

std::unique_lockstd::lock_guard 更灵活,它同样基于 RAII 机制,但提供了更多的功能,如延迟锁定、解锁和重新锁定等。例如:

#include <iostream>
#include <mutex>
#include <thread>

std::mutex mtx;

void print_id(int id) {
    std::unique_lock<std::mutex> lock(mtx, std::defer_lock);
    if (id % 2 == 0) {
        lock.lock();
        std::cout << "thread " << id << " locked the mutex." << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(1));
        lock.unlock();
        std::cout << "thread " << id << " unlocked the mutex." << std::endl;
    } else {
        if (lock.try_lock()) {
            std::cout << "thread " << id << " locked the mutex." << std::endl;
            std::this_thread::sleep_for(std::chrono::seconds(1));
            lock.unlock();
            std::cout << "thread " << id << " unlocked the mutex." << std::endl;
        } else {
            std::cout << "thread " << id << " could not lock the mutex." << std::endl;
        }
    }
}

int main() {
    std::thread threads[10];
    for (int i = 0; i < 10; ++i) {
        threads[i] = std::thread(print_id, i);
    }

    for (auto& th : threads) {
        th.join();
    }

    return 0;
}

在上述代码中,std::unique_lock<std::mutex> lock(mtx, std::defer_lock); 使用 std::defer_lock 标志初始化 lock 对象,此时并不会立即获取锁。然后根据线程 ID 的奇偶性,偶数 ID 的线程调用 lock.lock() 来获取锁,奇数 ID 的线程调用 lock.try_lock() 尝试获取锁。这种灵活性使得 std::unique_lock 在更复杂的同步场景中非常有用。

死锁问题及避免

死锁是多线程编程中一个严重的问题,当两个或多个线程相互等待对方释放锁,从而导致所有线程都无法继续执行时,就会发生死锁。例如:

#include <iostream>
#include <thread>
#include <mutex>

std::mutex mutex1;
std::mutex mutex2;

void thread1Function() {
    mutex1.lock();
    std::cout << "Thread 1 locked mutex1." << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(1));
    mutex2.lock();
    std::cout << "Thread 1 locked mutex2." << std::endl;
    mutex2.unlock();
    mutex1.unlock();
}

void thread2Function() {
    mutex2.lock();
    std::cout << "Thread 2 locked mutex2." << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(1));
    mutex1.lock();
    std::cout << "Thread 2 locked mutex1." << std::endl;
    mutex1.unlock();
    mutex2.unlock();
}

int main() {
    std::thread thread1(thread1Function);
    std::thread thread2(thread2Function);

    thread1.join();
    thread2.join();

    return 0;
}

在这个示例中,thread1Function 先获取 mutex1,然后尝试获取 mutex2,而 thread2Function 先获取 mutex2,然后尝试获取 mutex1。如果 thread1 先获取了 mutex1thread2 先获取了 mutex2,那么两个线程将相互等待对方释放锁,从而导致死锁。

为了避免死锁,可以采取以下几种方法:

  1. 按照固定顺序获取锁:所有线程都按照相同的顺序获取多个锁。例如,在上述示例中,如果两个线程都先获取 mutex1,再获取 mutex2,就不会发生死锁。
  2. 使用 std::lock 一次性获取多个锁:C++ 提供了 std::lock 函数,它可以一次性获取多个互斥锁,并且保证不会发生死锁。例如:
#include <iostream>
#include <thread>
#include <mutex>

std::mutex mutex1;
std::mutex mutex2;

void thread1Function() {
    std::lock(mutex1, mutex2);
    std::cout << "Thread 1 locked mutex1 and mutex2." << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(1));
    mutex2.unlock();
    mutex1.unlock();
}

void thread2Function() {
    std::lock(mutex1, mutex2);
    std::cout << "Thread 2 locked mutex1 and mutex2." << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(1));
    mutex2.unlock();
    mutex1.unlock();
}

int main() {
    std::thread thread1(thread1Function);
    std::thread thread2(thread2Function);

    thread1.join();
    thread2.join();

    return 0;
}

std::lock 会尝试以一种不会导致死锁的方式获取所有指定的互斥锁。如果无法获取所有锁,它会释放已经获取的锁,并等待直到可以获取所有锁。 3. 使用超时机制:如前面提到的 std::timed_mutex,通过设置获取锁的超时时间,如果在规定时间内无法获取锁,可以采取其他措施,如放弃操作或重试,从而避免无限期等待导致的死锁。

性能考虑

虽然互斥锁是解决多线程同步问题的有效工具,但过度使用或不合理使用互斥锁可能会导致性能下降。因为线程获取和释放锁的操作会带来一定的开销,而且当一个线程持有锁时,其他线程必须等待,这可能会降低并行度。

为了提高性能,可以考虑以下几点:

  1. 缩小锁的保护范围:尽量只在访问共享资源的关键代码段使用锁,减少锁的持有时间。例如,在前面的计数器示例中,如果 ++counter 操作是唯一需要保护的共享资源访问,那么锁的作用域应该只包含这一行代码。
  2. 减少锁的竞争:如果可能,尽量将共享资源进行划分,使用多个互斥锁分别保护不同的部分,从而减少线程之间对同一把锁的竞争。例如,在一个包含多个数据结构的程序中,可以为每个数据结构分配一个单独的互斥锁。
  3. 使用无锁数据结构:在某些场景下,无锁数据结构可以提供更好的性能。无锁数据结构通过使用原子操作和其他技术,允许多个线程在不使用锁的情况下安全地访问和修改数据。例如,std::atomic 类型可以用于实现简单的无锁计数器等。

互斥锁在实际项目中的应用场景

多线程数据库访问

在数据库应用程序中,多个线程可能需要同时访问数据库进行读写操作。为了保证数据的一致性和完整性,需要使用互斥锁来保护数据库连接和相关操作。例如,在一个多线程的 Web 应用中,多个用户请求可能会同时访问数据库进行查询或更新操作。可以为每个数据库连接对象关联一个互斥锁,当线程需要使用该连接时,先获取互斥锁,操作完成后释放锁。这样可以避免多个线程同时对数据库进行操作导致的数据冲突。

多线程文件操作

在多线程环境下进行文件操作时,也需要使用互斥锁来确保文件的正确读写。例如,在一个日志记录系统中,多个线程可能需要同时向同一个日志文件写入日志信息。如果不进行同步,可能会导致日志内容混乱。通过使用互斥锁,每次只有一个线程能够写入文件,从而保证日志的完整性和正确性。

共享内存管理

在一些高性能计算或分布式系统中,会使用共享内存来提高数据共享和传输的效率。多个线程可能需要同时访问共享内存区域,这时就需要互斥锁来保护共享内存的访问。例如,在一个基于共享内存的分布式缓存系统中,多个线程可能会同时读取或更新缓存数据,通过互斥锁可以确保缓存数据的一致性。

并发数据结构实现

在实现并发数据结构(如并发队列、并发哈希表等)时,互斥锁是常用的同步手段。例如,在实现一个并发队列时,可以使用互斥锁来保护队列的插入和删除操作,确保在多线程环境下队列的正确性。下面是一个简单的并发队列实现示例:

#include <iostream>
#include <queue>
#include <mutex>
#include <thread>

template <typename T>
class ConcurrentQueue {
private:
    std::queue<T> queue_;
    std::mutex mutex_;

public:
    void push(T value) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push(value);
    }

    bool pop(T& value) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty()) {
            return false;
        }
        value = queue_.front();
        queue_.pop();
        return true;
    }
};

ConcurrentQueue<int> queue;

void producer() {
    for (int i = 0; i < 10; ++i) {
        queue.push(i);
        std::cout << "Produced: " << i << std::endl;
    }
}

void consumer() {
    int value;
    while (true) {
        if (queue.pop(value)) {
            std::cout << "Consumed: " << value << std::endl;
        } else {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    }
}

int main() {
    std::thread producerThread(producer);
    std::thread consumerThread(consumer);

    producerThread.join();
    std::this_thread::sleep_for(std::chrono::seconds(1));
    consumerThread.join();

    return 0;
}

在这个示例中,ConcurrentQueue 类使用 std::mutex 来保护队列的 pushpop 操作,确保在多线程环境下队列的正确使用。producer 线程向队列中插入数据,consumer 线程从队列中取出数据。

总结

互斥锁是 C++ 多线程编程中不可或缺的同步工具,它通过提供一种简单而有效的方式来保护共享资源,避免竞态条件和数据冲突。在使用互斥锁时,需要了解不同类型互斥锁的特点和适用场景,掌握 RAII 机制以及如何避免死锁和提高性能。通过合理使用互斥锁,可以编写出高效、可靠的多线程程序,充分发挥多核处理器的性能优势。在实际项目中,互斥锁广泛应用于数据库访问、文件操作、共享内存管理以及并发数据结构实现等多个方面。随着多线程编程需求的不断增加,深入理解和熟练运用互斥锁对于 C++ 开发者来说至关重要。

希望通过本文的介绍和示例,读者能够对 C++ 多线程编程中互斥锁的应用有更深入的理解,并在实际开发中灵活运用互斥锁解决多线程同步问题。