MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

同步原语在多线程环境中的应用实例

2024-06-207.0k 阅读

同步原语概述

在多线程编程的复杂环境中,同步原语扮演着至关重要的角色。它们是用于协调多个线程之间对共享资源的访问,确保程序正确性和稳定性的关键工具。同步原语本质上是一种操作系统提供的机制,通过对线程的执行顺序和资源访问进行控制,避免出现诸如竞态条件(Race Condition)、死锁(Deadlock)等多线程编程中常见的问题。

常见同步原语类型

  1. 互斥锁(Mutex):互斥锁,全称为“相互排斥锁”,是最基本的同步原语之一。它的核心功能是在同一时间只允许一个线程进入临界区(访问共享资源的代码段),其他线程必须等待,直到持有锁的线程释放锁。可以将互斥锁看作是一把钥匙,每次只有一个线程能拿到这把钥匙进入临界区,当线程离开临界区时,它将钥匙放回,其他线程才有机会获取。

  2. 信号量(Semaphore):信号量是一个整型变量,它可以允许一定数量的线程同时进入临界区。与互斥锁不同,互斥锁只允许一个线程访问临界区,而信号量可以通过设定初始值来控制同时访问临界区的线程数量。例如,初始值为 3 的信号量,意味着最多可以有 3 个线程同时进入临界区。

  3. 条件变量(Condition Variable):条件变量通常与互斥锁配合使用,用于线程之间的复杂同步。它允许线程等待某个特定条件满足后再继续执行。比如,在生产者 - 消费者模型中,消费者线程可能需要等待生产者线程生产出数据后才能继续消费,这时就可以使用条件变量。

  4. 读写锁(Read - Write Lock):读写锁区分了读操作和写操作。允许多个线程同时进行读操作,因为读操作不会修改共享资源,所以不会产生数据不一致问题。但是,写操作必须是独占的,即当有一个线程进行写操作时,其他线程不能进行读或写操作。这保证了数据在写操作时的一致性。

互斥锁在多线程环境中的应用实例

简单的计数器示例

下面通过一个简单的计数器示例来展示互斥锁的应用。假设有多个线程需要对一个共享的计数器进行累加操作,如果不进行同步控制,就会出现竞态条件,导致最终结果错误。

#include <stdio.h>
#include <pthread.h>

// 共享计数器
int counter = 0;
// 定义互斥锁
pthread_mutex_t mutex;

// 线程执行函数
void* increment(void* arg) {
    for (int i = 0; i < 1000000; i++) {
        // 加锁
        pthread_mutex_lock(&mutex);
        counter++;
        // 解锁
        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}

int main() {
    // 初始化互斥锁
    pthread_mutex_init(&mutex, NULL);

    pthread_t threads[10];
    for (int i = 0; i < 10; i++) {
        // 创建 10 个线程
        pthread_create(&threads[i], NULL, increment, NULL);
    }

    for (int i = 0; i < 10; i++) {
        // 等待所有线程执行完毕
        pthread_join(threads[i], NULL);
    }

    // 销毁互斥锁
    pthread_mutex_destroy(&mutex);

    printf("Final counter value: %d\n", counter);
    return 0;
}

在上述代码中,我们定义了一个共享计数器 counter 和一个互斥锁 mutex。在 increment 函数中,每次对 counter 进行累加操作前,先通过 pthread_mutex_lock 函数获取互斥锁,操作完成后通过 pthread_mutex_unlock 函数释放互斥锁。这样就保证了在同一时间只有一个线程能够修改 counter,避免了竞态条件。

复杂数据结构的保护

在实际应用中,我们经常需要保护更复杂的数据结构,比如链表。假设有一个链表,多个线程可能同时对其进行插入和删除操作,如果不进行同步控制,链表可能会出现数据损坏。

#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>

// 定义链表节点
typedef struct Node {
    int data;
    struct Node* next;
} Node;

// 链表头节点
Node* head = NULL;
// 定义互斥锁
pthread_mutex_t mutex;

// 创建新节点
Node* createNode(int data) {
    Node* newNode = (Node*)malloc(sizeof(Node));
    newNode->data = data;
    newNode->next = NULL;
    return newNode;
}

// 插入节点到链表头部
void insertNode(int data) {
    pthread_mutex_lock(&mutex);
    Node* newNode = createNode(data);
    newNode->next = head;
    head = newNode;
    pthread_mutex_unlock(&mutex);
}

// 删除链表头部节点
void deleteNode() {
    pthread_mutex_lock(&mutex);
    if (head != NULL) {
        Node* temp = head;
        head = head->next;
        free(temp);
    }
    pthread_mutex_unlock(&mutex);
}

// 线程执行函数
void* insertThread(void* arg) {
    for (int i = 0; i < 1000; i++) {
        insertNode(i);
    }
    return NULL;
}

void* deleteThread(void* arg) {
    for (int i = 0; i < 500; i++) {
        deleteNode();
    }
    return NULL;
}

int main() {
    // 初始化互斥锁
    pthread_mutex_init(&mutex, NULL);

    pthread_t insertThreads[5];
    pthread_t deleteThreads[5];

    for (int i = 0; i < 5; i++) {
        // 创建插入线程
        pthread_create(&insertThreads[i], NULL, insertThread, NULL);
        // 创建删除线程
        pthread_create(&deleteThreads[i], NULL, deleteThread, NULL);
    }

    for (int i = 0; i < 5; i++) {
        // 等待插入线程执行完毕
        pthread_join(insertThreads[i], NULL);
        // 等待删除线程执行完毕
        pthread_join(deleteThreads[i], NULL);
    }

    // 销毁互斥锁
    pthread_mutex_destroy(&mutex);

    // 打印链表剩余节点
    Node* current = head;
    while (current != NULL) {
        printf("%d -> ", current->data);
        current = current->next;
    }
    printf("NULL\n");

    return 0;
}

在这个链表操作的示例中,无论是插入节点还是删除节点的操作,都通过互斥锁进行保护。这样就确保了在多线程环境下,链表的操作是安全的,不会出现数据损坏的情况。

信号量在多线程环境中的应用实例

有限资源访问控制

假设我们有一个系统,其中有 5 个打印机,多个线程可能需要使用打印机进行打印任务。由于打印机数量有限,我们可以使用信号量来控制同时使用打印机的线程数量。

#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>

// 定义信号量,初始值为 5,表示有 5 个可用打印机
sem_t printerSemaphore;

// 线程执行函数
void* printTask(void* arg) {
    int taskId = *((int*)arg);
    // 获取信号量
    sem_wait(&printerSemaphore);
    printf("Task %d is using a printer.\n", taskId);
    // 模拟打印任务执行时间
    sleep(1);
    printf("Task %d has finished printing.\n", taskId);
    // 释放信号量
    sem_post(&printerSemaphore);
    return NULL;
}

int main() {
    // 初始化信号量
    sem_init(&printerSemaphore, 0, 5);

    pthread_t threads[10];
    int taskIds[10];
    for (int i = 0; i < 10; i++) {
        taskIds[i] = i;
        // 创建 10 个线程
        pthread_create(&threads[i], NULL, printTask, &taskIds[i]);
    }

    for (int i = 0; i < 10; i++) {
        // 等待所有线程执行完毕
        pthread_join(threads[i], NULL);
    }

    // 销毁信号量
    sem_destroy(&printerSemaphore);

    return 0;
}

在上述代码中,我们定义了一个信号量 printerSemaphore,初始值为 5。每个线程在执行打印任务前,通过 sem_wait 函数获取信号量,如果此时信号量的值大于 0,则获取成功,信号量的值减 1,线程可以使用打印机;如果信号量的值为 0,则线程会阻塞等待,直到有其他线程释放信号量。当线程完成打印任务后,通过 sem_post 函数释放信号量,信号量的值加 1,其他等待的线程就有机会获取信号量并使用打印机。

生产者 - 消费者模型中的缓冲区控制

在生产者 - 消费者模型中,我们通常有一个缓冲区来存储生产者生产的数据,消费者从缓冲区中取出数据。为了控制缓冲区的使用,我们可以使用信号量。

#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>

#define BUFFER_SIZE 5

// 缓冲区
int buffer[BUFFER_SIZE];
// 缓冲区索引
int in = 0;
int out = 0;

// 定义信号量,用于控制缓冲区的空闲槽位
sem_t empty;
// 定义信号量,用于控制缓冲区中的数据项
sem_t full;

// 生产者线程执行函数
void* producer(void* arg) {
    int item = 1;
    while (1) {
        // 等待缓冲区有空闲槽位
        sem_wait(&empty);
        buffer[in] = item;
        printf("Produced: %d\n", item);
        in = (in + 1) % BUFFER_SIZE;
        item++;
        // 通知缓冲区有新的数据项
        sem_post(&full);
    }
    return NULL;
}

// 消费者线程执行函数
void* consumer(void* arg) {
    while (1) {
        // 等待缓冲区有数据项
        sem_wait(&full);
        int item = buffer[out];
        printf("Consumed: %d\n", item);
        out = (out + 1) % BUFFER_SIZE;
        // 通知缓冲区有空闲槽位
        sem_post(&empty);
    }
    return NULL;
}

int main() {
    // 初始化空闲槽位信号量,初始值为缓冲区大小
    sem_init(&empty, 0, BUFFER_SIZE);
    // 初始化数据项信号量,初始值为 0
    sem_init(&full, 0, 0);

    pthread_t producerThread, consumerThread;

    // 创建生产者线程
    pthread_create(&producerThread, NULL, producer, NULL);
    // 创建消费者线程
    pthread_create(&consumerThread, NULL, consumer, NULL);

    // 等待生产者线程和消费者线程结束(这里实际上不会结束,因为是无限循环)
    pthread_join(producerThread, NULL);
    pthread_join(consumerThread, NULL);

    // 销毁信号量
    sem_destroy(&empty);
    sem_destroy(&full);

    return 0;
}

在这个生产者 - 消费者模型的示例中,我们使用了两个信号量:empty 用于表示缓冲区中的空闲槽位数量,full 用于表示缓冲区中的数据项数量。生产者线程在生产数据前先获取 empty 信号量,确保缓冲区有空闲槽位,生产完数据后释放 full 信号量,表示缓冲区有新的数据项。消费者线程在消费数据前先获取 full 信号量,确保缓冲区有数据项,消费完数据后释放 empty 信号量,表示缓冲区有空闲槽位。这样通过信号量的协调,实现了生产者和消费者之间对缓冲区的正确访问。

条件变量在多线程环境中的应用实例

基于条件变量的复杂同步

回到生产者 - 消费者模型,假设我们有一个需求:当缓冲区为空时,消费者线程进入等待状态,直到生产者线程生产出数据并通知消费者线程。这时候就可以使用条件变量来实现这种复杂的同步。

#include <stdio.h>
#include <pthread.h>

#define BUFFER_SIZE 5

// 缓冲区
int buffer[BUFFER_SIZE];
// 缓冲区索引
int in = 0;
int out = 0;
// 缓冲区是否有数据的标志
int hasData = 0;

// 定义互斥锁
pthread_mutex_t mutex;
// 定义条件变量
pthread_cond_t cond;

// 生产者线程执行函数
void* producer(void* arg) {
    int item = 1;
    while (1) {
        // 加锁
        pthread_mutex_lock(&mutex);
        buffer[in] = item;
        printf("Produced: %d\n", item);
        in = (in + 1) % BUFFER_SIZE;
        item++;
        hasData = 1;
        // 通知等待在条件变量上的线程
        pthread_cond_signal(&cond);
        // 解锁
        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}

// 消费者线程执行函数
void* consumer(void* arg) {
    while (1) {
        // 加锁
        pthread_mutex_lock(&mutex);
        while (!hasData) {
            // 等待条件变量,同时释放互斥锁,当被唤醒时重新获取互斥锁
            pthread_cond_wait(&cond, &mutex);
        }
        int item = buffer[out];
        printf("Consumed: %d\n", item);
        out = (out + 1) % BUFFER_SIZE;
        if (in == out) {
            hasData = 0;
        }
        // 解锁
        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}

int main() {
    // 初始化互斥锁
    pthread_mutex_init(&mutex, NULL);
    // 初始化条件变量
    pthread_cond_init(&cond, NULL);

    pthread_t producerThread, consumerThread;

    // 创建生产者线程
    pthread_create(&producerThread, NULL, producer, NULL);
    // 创建消费者线程
    pthread_create(&consumerThread, NULL, consumer, NULL);

    // 等待生产者线程和消费者线程结束(这里实际上不会结束,因为是无限循环)
    pthread_join(producerThread, NULL);
    pthread_join(consumerThread, NULL);

    // 销毁互斥锁
    pthread_mutex_destroy(&mutex);
    // 销毁条件变量
    pthread_cond_destroy(&cond);

    return 0;
}

在上述代码中,我们定义了一个互斥锁 mutex 和一个条件变量 cond。在生产者线程中,当生产出数据后,通过 pthread_cond_signal 函数通知等待在条件变量上的消费者线程。在消费者线程中,当发现缓冲区没有数据时,通过 pthread_cond_wait 函数等待条件变量,同时会自动释放互斥锁,当被唤醒时会重新获取互斥锁。这样就实现了基于条件变量的复杂同步机制。

线程协作完成任务

假设有一个任务,需要多个线程协作完成。例如,主线程需要等待所有子线程完成一些初始化操作后才能继续执行后续任务。我们可以使用条件变量来实现这种线程间的协作。

#include <stdio.h>
#include <pthread.h>

#define THREAD_COUNT 5

// 定义互斥锁
pthread_mutex_t mutex;
// 定义条件变量
pthread_cond_t cond;
// 子线程完成标志
int allThreadsReady = 0;

// 子线程执行函数
void* worker(void* arg) {
    int threadId = *((int*)arg);
    printf("Thread %d is initializing...\n", threadId);
    // 模拟初始化操作
    sleep(1);
    printf("Thread %d has finished initializing.\n", threadId);

    // 加锁
    pthread_mutex_lock(&mutex);
    if (--THREAD_COUNT == 0) {
        allThreadsReady = 1;
        // 通知主线程
        pthread_cond_signal(&cond);
    }
    // 解锁
    pthread_mutex_unlock(&mutex);

    return NULL;
}

int main() {
    // 初始化互斥锁
    pthread_mutex_init(&mutex, NULL);
    // 初始化条件变量
    pthread_cond_init(&cond, NULL);

    pthread_t threads[THREAD_COUNT];
    int threadIds[THREAD_COUNT];
    for (int i = 0; i < THREAD_COUNT; i++) {
        threadIds[i] = i;
        // 创建子线程
        pthread_create(&threads[i], NULL, worker, &threadIds[i]);
    }

    // 加锁
    pthread_mutex_lock(&mutex);
    while (!allThreadsReady) {
        // 等待条件变量,同时释放互斥锁,当被唤醒时重新获取互斥锁
        pthread_cond_wait(&cond, &mutex);
    }
    printf("All threads are ready. Main thread can continue.\n");
    // 解锁
    pthread_mutex_unlock(&mutex);

    // 销毁互斥锁
    pthread_mutex_destroy(&mutex);
    // 销毁条件变量
    pthread_cond_destroy(&cond);

    return 0;
}

在这个示例中,每个子线程在完成初始化操作后,通过互斥锁和条件变量来通知主线程所有子线程都已准备好。主线程在获取到所有子线程准备好的通知后,继续执行后续任务。这展示了条件变量在实现线程协作方面的强大功能。

读写锁在多线程环境中的应用实例

数据库查询与更新操作

假设我们有一个简单的数据库系统,其中有多个线程可能进行查询操作(读操作)和更新操作(写操作)。为了提高并发性能,我们可以使用读写锁来区分读操作和写操作。

#include <stdio.h>
#include <pthread.h>

// 模拟数据库数据
int databaseData = 0;

// 定义读写锁
pthread_rwlock_t rwlock;

// 读线程执行函数
void* reader(void* arg) {
    int threadId = *((int*)arg);
    // 获取读锁
    pthread_rwlock_rdlock(&rwlock);
    printf("Thread %d is reading data: %d\n", threadId, databaseData);
    // 模拟读操作时间
    sleep(1);
    // 释放读锁
    pthread_rwlock_unlock(&rwlock);
    return NULL;
}

// 写线程执行函数
void* writer(void* arg) {
    int threadId = *((int*)arg);
    // 获取写锁
    pthread_rwlock_wrlock(&rwlock);
    databaseData++;
    printf("Thread %d is writing data: %d\n", threadId, databaseData);
    // 模拟写操作时间
    sleep(1);
    // 释放写锁
    pthread_rwlock_unlock(&rwlock);
    return NULL;
}

int main() {
    // 初始化读写锁
    pthread_rwlock_init(&rwlock, NULL);

    pthread_t readers[10];
    pthread_t writers[5];
    int readerIds[10];
    int writerIds[5];

    for (int i = 0; i < 10; i++) {
        readerIds[i] = i;
        // 创建读线程
        pthread_create(&readers[i], NULL, reader, &readerIds[i]);
    }

    for (int i = 0; i < 5; i++) {
        writerIds[i] = i;
        // 创建写线程
        pthread_create(&writers[i], NULL, writer, &writerIds[i]);
    }

    for (int i = 0; i < 10; i++) {
        // 等待读线程执行完毕
        pthread_join(readers[i], NULL);
    }

    for (int i = 0; i < 5; i++) {
        // 等待写线程执行完毕
        pthread_join(writers[i], NULL);
    }

    // 销毁读写锁
    pthread_rwlock_destroy(&rwlock);

    return 0;
}

在上述代码中,我们定义了一个读写锁 rwlock。读线程通过 pthread_rwlock_rdlock 函数获取读锁,允许多个读线程同时读取数据;写线程通过 pthread_rwlock_wrlock 函数获取写锁,在写操作时独占资源,其他读线程和写线程都不能访问,确保了数据的一致性。

文件读写操作

假设有一个日志文件,多个线程可能对其进行读取操作(例如查看历史日志)和写入操作(例如记录新的日志)。同样可以使用读写锁来提高并发性能。

#include <stdio.h>
#include <pthread.h>
#include <fcntl.h>
#include <unistd.h>

// 日志文件描述符
int logFile;

// 定义读写锁
pthread_rwlock_t rwlock;

// 读线程执行函数
void* readLog(void* arg) {
    int threadId = *((int*)arg);
    // 获取读锁
    pthread_rwlock_rdlock(&rwlock);
    char buffer[1024];
    lseek(logFile, 0, SEEK_SET);
    ssize_t bytesRead = read(logFile, buffer, sizeof(buffer));
    if (bytesRead > 0) {
        buffer[bytesRead] = '\0';
        printf("Thread %d is reading log:\n%s\n", threadId, buffer);
    }
    // 释放读锁
    pthread_rwlock_unlock(&rwlock);
    return NULL;
}

// 写线程执行函数
void* writeLog(void* arg) {
    int threadId = *((int*)arg);
    // 获取写锁
    pthread_rwlock_wrlock(&rwlock);
    char logMessage[100];
    sprintf(logMessage, "Log message from thread %d\n", threadId);
    write(logFile, logMessage, strlen(logMessage));
    // 释放写锁
    pthread_rwlock_unlock(&rwlock);
    return NULL;
}

int main() {
    // 打开日志文件
    logFile = open("log.txt", O_CREAT | O_RDWR, 0666);
    if (logFile < 0) {
        perror("Open log file failed");
        return 1;
    }

    // 初始化读写锁
    pthread_rwlock_init(&rwlock, NULL);

    pthread_t readers[10];
    pthread_t writers[5];
    int readerIds[10];
    int writerIds[5];

    for (int i = 0; i < 10; i++) {
        readerIds[i] = i;
        // 创建读线程
        pthread_create(&readers[i], NULL, readLog, &readerIds[i]);
    }

    for (int i = 0; i < 5; i++) {
        writerIds[i] = i;
        // 创建写线程
        pthread_create(&writers[i], NULL, writeLog, &writerIds[i]);
    }

    for (int i = 0; i < 10; i++) {
        // 等待读线程执行完毕
        pthread_join(readers[i], NULL);
    }

    for (int i = 0; i < 5; i++) {
        // 等待写线程执行完毕
        pthread_join(writers[i], NULL);
    }

    // 关闭日志文件
    close(logFile);
    // 销毁读写锁
    pthread_rwlock_destroy(&rwlock);

    return 0;
}

在这个文件读写操作的示例中,通过读写锁控制对日志文件的访问。读线程可以并发读取文件内容,而写线程在写入时独占文件,保证了文件数据的一致性和读写操作的高效性。

通过以上各种同步原语在不同应用场景下的实例,我们可以看到同步原语在多线程编程中对于保证程序正确性和提高并发性能的重要性。在实际的多线程项目开发中,需要根据具体的需求和场景选择合适的同步原语,并合理地使用它们,以构建出高效、稳定的多线程应用程序。同时,还需要注意避免死锁、饥饿等问题,这需要对同步原语的使用有深入的理解和丰富的实践经验。