MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Linux C语言线程池模型的任务队列管理

2022-06-224.3k 阅读

线程池与任务队列概述

线程池概念

在Linux环境下使用C语言进行多线程编程时,线程池是一种非常重要的技术手段。线程池本质上是一个线程的集合,这些线程在程序启动时被创建并等待执行任务。与每次有任务就创建新线程的方式不同,线程池中的线程可以被重复利用。这大大减少了线程创建和销毁的开销,提高了系统资源的利用率。

例如,在一个高并发的网络服务器应用中,如果每来一个新的客户端连接就创建一个新线程去处理,当连接数较多时,频繁的线程创建和销毁会消耗大量的系统资源,导致系统性能下降。而线程池则可以预先创建一定数量的线程,当有任务时,从线程池中取出一个空闲线程来处理,处理完后该线程又回到线程池中等待下一个任务。

任务队列的作用

任务队列是线程池的关键组成部分。它用于存储等待执行的任务。当有新任务到来时,会被添加到任务队列中。线程池中的线程则不断从任务队列中取出任务并执行。任务队列起到了一个缓冲的作用,将任务的产生和任务的执行解耦开来。

以一个简单的文件处理程序为例,假设有多个文件需要进行某种格式转换的任务。这些任务可以被依次添加到任务队列中,线程池中的线程则逐个从任务队列中取出文件处理任务进行执行。这样,即使任务产生的速度较快,任务队列也能暂时存储这些任务,避免任务丢失,同时线程池中的线程按照一定的节奏从队列中取出任务执行,保证系统的稳定运行。

任务队列数据结构设计

简单链表结构

一种常见的任务队列数据结构是简单链表。在C语言中,我们可以这样定义链表节点:

typedef struct Task {
    void (*func)(void*);
    void *arg;
    struct Task *next;
} Task;

其中func是一个函数指针,指向具体要执行的任务函数,arg是传递给该任务函数的参数,next指针指向下一个任务节点。

任务队列可以定义为:

typedef struct TaskQueue {
    Task *head;
    Task *tail;
} TaskQueue;

head指针指向任务队列的头部,tail指针指向任务队列的尾部。

向任务队列中添加任务的函数可以如下实现:

void addTask(TaskQueue *queue, void (*func)(void*), void *arg) {
    Task *newTask = (Task *)malloc(sizeof(Task));
    newTask->func = func;
    newTask->arg = arg;
    newTask->next = NULL;

    if (queue->tail == NULL) {
        queue->head = newTask;
        queue->tail = newTask;
    } else {
        queue->tail->next = newTask;
        queue->tail = newTask;
    }
}

从任务队列中取出任务的函数:

Task* getTask(TaskQueue *queue) {
    if (queue->head == NULL) {
        return NULL;
    }
    Task *task = queue->head;
    queue->head = queue->head->next;
    if (queue->head == NULL) {
        queue->tail = NULL;
    }
    return task;
}

双向链表结构

双向链表在某些情况下比简单链表更有优势,特别是在需要频繁删除中间节点或者需要双向遍历任务队列的场景。双向链表节点定义如下:

typedef struct Task {
    void (*func)(void*);
    void *arg;
    struct Task *prev;
    struct Task *next;
} Task;

任务队列定义为:

typedef struct TaskQueue {
    Task *head;
    Task *tail;
} TaskQueue;

添加任务函数:

void addTask(TaskQueue *queue, void (*func)(void*), void *arg) {
    Task *newTask = (Task *)malloc(sizeof(Task));
    newTask->func = func;
    newTask->arg = arg;
    newTask->prev = NULL;
    newTask->next = NULL;

    if (queue->tail == NULL) {
        queue->head = newTask;
        queue->tail = newTask;
    } else {
        newTask->prev = queue->tail;
        queue->tail->next = newTask;
        queue->tail = newTask;
    }
}

删除任务函数(假设已知要删除的任务节点):

void removeTask(TaskQueue *queue, Task *task) {
    if (task == queue->head) {
        queue->head = task->next;
        if (queue->head == NULL) {
            queue->tail = NULL;
        } else {
            queue->head->prev = NULL;
        }
    } else if (task == queue->tail) {
        queue->tail = task->prev;
        queue->tail->next = NULL;
    } else {
        task->prev->next = task->next;
        task->next->prev = task->prev;
    }
    free(task);
}

环形队列结构

环形队列在任务队列管理中也有应用,尤其是在对任务队列长度有固定限制且需要高效利用内存空间的场景。环形队列通常使用数组来实现。假设我们定义一个最大长度为MAX_TASKS的环形队列:

#define MAX_TASKS 100

typedef struct Task {
    void (*func)(void*);
    void *arg;
} Task;

typedef struct TaskQueue {
    Task tasks[MAX_TASKS];
    int head;
    int tail;
    int count;
} TaskQueue;

head表示队列头部位置,tail表示队列尾部位置,count表示队列中当前任务的数量。

添加任务函数:

int addTask(TaskQueue *queue, void (*func)(void*), void *arg) {
    if (queue->count == MAX_TASKS) {
        return -1; // 队列已满
    }
    queue->tasks[queue->tail].func = func;
    queue->tasks[queue->tail].arg = arg;
    queue->tail = (queue->tail + 1) % MAX_TASKS;
    queue->count++;
    return 0;
}

取出任务函数:

Task* getTask(TaskQueue *queue) {
    if (queue->count == 0) {
        return NULL;
    }
    Task *task = &(queue->tasks[queue->head]);
    queue->head = (queue->head + 1) % MAX_TASKS;
    queue->count--;
    return task;
}

任务队列的线程安全问题

竞态条件分析

在多线程环境下,任务队列的操作很容易出现竞态条件。例如,当多个线程同时尝试向任务队列中添加任务或者从任务队列中取出任务时,如果没有适当的同步机制,可能会导致数据不一致。

假设我们有一个简单链表结构的任务队列,两个线程同时执行addTask函数。线程A执行到newTask->next = queue->tail->next;语句,还未执行queue->tail->next = newTask;时,线程B也开始执行addTask函数。线程B同样执行到newTask->next = queue->tail->next;语句,此时queue->tail->next的值是线程A执行前的值,而不是线程A修改后的值。这就导致任务队列的链表结构出现错误,可能会丢失任务或者产生其他未定义行为。

互斥锁的使用

为了解决竞态条件问题,我们可以使用互斥锁(Mutex)。在POSIX线程库中,互斥锁相关的操作函数有pthread_mutex_initpthread_mutex_lockpthread_mutex_unlockpthread_mutex_destroy

首先,我们需要在任务队列结构体中添加一个互斥锁成员:

typedef struct TaskQueue {
    Task *head;
    Task *tail;
    pthread_mutex_t mutex;
} TaskQueue;

在初始化任务队列时,要初始化互斥锁:

void initTaskQueue(TaskQueue *queue) {
    queue->head = NULL;
    queue->tail = NULL;
    pthread_mutex_init(&queue->mutex, NULL);
}

在添加任务和取出任务函数中,需要在操作任务队列前后加锁和解锁:

void addTask(TaskQueue *queue, void (*func)(void*), void *arg) {
    pthread_mutex_lock(&queue->mutex);
    Task *newTask = (Task *)malloc(sizeof(Task));
    newTask->func = func;
    newTask->arg = arg;
    newTask->next = NULL;

    if (queue->tail == NULL) {
        queue->head = newTask;
        queue->tail = newTask;
    } else {
        queue->tail->next = newTask;
        queue->tail = newTask;
    }
    pthread_mutex_unlock(&queue->mutex);
}

Task* getTask(TaskQueue *queue) {
    pthread_mutex_lock(&queue->mutex);
    if (queue->head == NULL) {
        pthread_mutex_unlock(&queue->mutex);
        return NULL;
    }
    Task *task = queue->head;
    queue->head = queue->head->next;
    if (queue->head == NULL) {
        queue->tail = NULL;
    }
    pthread_mutex_unlock(&queue->mutex);
    return task;
}

读写锁的应用

当任务队列的读操作(如取出任务)远多于写操作(如添加任务)时,使用读写锁(Read - Write Lock)可以提高性能。读写锁允许多个线程同时进行读操作,但只允许一个线程进行写操作。

在POSIX线程库中,读写锁相关的函数有pthread_rwlock_initpthread_rwlock_rdlockpthread_rwlock_wrlockpthread_rwlock_unlockpthread_rwlock_destroy

我们修改任务队列结构体,添加读写锁成员:

typedef struct TaskQueue {
    Task *head;
    Task *tail;
    pthread_rwlock_t rwlock;
} TaskQueue;

初始化任务队列时初始化读写锁:

void initTaskQueue(TaskQueue *queue) {
    queue->head = NULL;
    queue->tail = NULL;
    pthread_rwlock_init(&queue->rwlock, NULL);
}

添加任务函数(写操作):

void addTask(TaskQueue *queue, void (*func)(void*), void *arg) {
    pthread_rwlock_wrlock(&queue->rwlock);
    Task *newTask = (Task *)malloc(sizeof(Task));
    newTask->func = func;
    newTask->arg = arg;
    newTask->next = NULL;

    if (queue->tail == NULL) {
        queue->head = newTask;
        queue->tail = newTask;
    } else {
        queue->tail->next = newTask;
        queue->tail = newTask;
    }
    pthread_rwlock_unlock(&queue->rwlock);
}

取出任务函数(读操作):

Task* getTask(TaskQueue *queue) {
    pthread_rwlock_rdlock(&queue->rwlock);
    if (queue->head == NULL) {
        pthread_rwlock_unlock(&queue->rwlock);
        return NULL;
    }
    Task *task = queue->head;
    queue->head = queue->head->next;
    if (queue->head == NULL) {
        queue->tail = NULL;
    }
    pthread_rwlock_unlock(&queue->rwlock);
    return task;
}

任务队列的条件变量应用

条件变量的作用

在任务队列管理中,条件变量(Condition Variable)用于线程间的同步。当任务队列中没有任务时,线程池中的线程可以等待在条件变量上,直到有新任务被添加到任务队列中。这样可以避免线程在没有任务时进行无效的循环检查,从而节省CPU资源。

条件变量的使用示例

我们在任务队列结构体中添加条件变量成员:

typedef struct TaskQueue {
    Task *head;
    Task *tail;
    pthread_mutex_t mutex;
    pthread_cond_t cond;
} TaskQueue;

初始化任务队列时初始化条件变量和互斥锁:

void initTaskQueue(TaskQueue *queue) {
    queue->head = NULL;
    queue->tail = NULL;
    pthread_mutex_init(&queue->mutex, NULL);
    pthread_cond_init(&queue->cond, NULL);
}

添加任务函数:

void addTask(TaskQueue *queue, void (*func)(void*), void *arg) {
    pthread_mutex_lock(&queue->mutex);
    Task *newTask = (Task *)malloc(sizeof(Task));
    newTask->func = func;
    newTask->arg = arg;
    newTask->next = NULL;

    if (queue->tail == NULL) {
        queue->head = newTask;
        queue->tail = newTask;
    } else {
        queue->tail->next = newTask;
        queue->tail = newTask;
    }
    pthread_cond_signal(&queue->cond);
    pthread_mutex_unlock(&queue->mutex);
}

线程从任务队列中取出任务函数:

Task* getTask(TaskQueue *queue) {
    pthread_mutex_lock(&queue->mutex);
    while (queue->head == NULL) {
        pthread_cond_wait(&queue->cond, &queue->mutex);
    }
    Task *task = queue->head;
    queue->head = queue->head->next;
    if (queue->head == NULL) {
        queue->tail = NULL;
    }
    pthread_mutex_unlock(&queue->mutex);
    return task;
}

在上述代码中,pthread_cond_wait函数会使线程等待在条件变量cond上,并释放互斥锁mutex。当有新任务添加到任务队列中时,pthread_cond_signal函数会唤醒等待在条件变量上的一个线程,被唤醒的线程会重新获取互斥锁,然后继续执行从任务队列中取出任务的操作。

任务队列的动态调整

任务队列长度的监控

为了保证线程池和任务队列的高效运行,我们需要监控任务队列的长度。以简单链表结构的任务队列为例,我们可以添加一个函数来获取当前任务队列的长度:

int getTaskQueueLength(TaskQueue *queue) {
    pthread_mutex_lock(&queue->mutex);
    int length = 0;
    Task *current = queue->head;
    while (current != NULL) {
        length++;
        current = current->next;
    }
    pthread_mutex_unlock(&queue->mutex);
    return length;
}

根据任务队列长度调整线程池

当任务队列长度超过一定阈值时,我们可以考虑增加线程池中的线程数量,以加快任务的处理速度。例如:

#define MAX_QUEUE_LENGTH 50
#define MAX_THREADS 100

void adjustThreadPool(TaskQueue *queue, pthread_t *threads, int *threadCount) {
    int length = getTaskQueueLength(queue);
    if (length > MAX_QUEUE_LENGTH && *threadCount < MAX_THREADS) {
        pthread_t newThread;
        if (pthread_create(&newThread, NULL, workerThread, (void *)queue) == 0) {
            threads[*threadCount] = newThread;
            (*threadCount)++;
        }
    }
}

上述代码中,当任务队列长度超过MAX_QUEUE_LENGTH且当前线程池中的线程数量小于MAX_THREADS时,会创建一个新线程并添加到线程池中。

当任务队列长度长时间处于较低水平且线程池中的线程数量较多时,我们可以考虑减少线程池中的线程数量,以节省系统资源:

#define MIN_QUEUE_LENGTH 10

void adjustThreadPool(TaskQueue *queue, pthread_t *threads, int *threadCount) {
    int length = getTaskQueueLength(queue);
    if (length < MIN_QUEUE_LENGTH && *threadCount > 1) {
        // 选择一个线程让其退出
        pthread_cancel(threads[*threadCount - 1]);
        (*threadCount)--;
    }
}

在实际应用中,还需要更复杂的逻辑来处理线程的优雅退出,比如使用pthread_setcancelstatepthread_setcanceltype函数来设置线程的取消状态和类型,确保线程在合适的时机安全退出。

完整线程池与任务队列示例代码

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

// 定义任务结构体
typedef struct Task {
    void (*func)(void*);
    void *arg;
    struct Task *next;
} Task;

// 定义任务队列结构体
typedef struct TaskQueue {
    Task *head;
    Task *tail;
    pthread_mutex_t mutex;
    pthread_cond_t cond;
} TaskQueue;

// 定义线程池结构体
typedef struct ThreadPool {
    pthread_t *threads;
    TaskQueue queue;
    int threadCount;
    int isShutdown;
} ThreadPool;

// 初始化任务队列
void initTaskQueue(TaskQueue *queue) {
    queue->head = NULL;
    queue->tail = NULL;
    pthread_mutex_init(&queue->mutex, NULL);
    pthread_cond_init(&queue->cond, NULL);
}

// 添加任务到任务队列
void addTask(TaskQueue *queue, void (*func)(void*), void *arg) {
    pthread_mutex_lock(&queue->mutex);
    Task *newTask = (Task *)malloc(sizeof(Task));
    newTask->func = func;
    newTask->arg = arg;
    newTask->next = NULL;

    if (queue->tail == NULL) {
        queue->head = newTask;
        queue->tail = newTask;
    } else {
        queue->tail->next = newTask;
        queue->tail = newTask;
    }
    pthread_cond_signal(&queue->cond);
    pthread_mutex_unlock(&queue->mutex);
}

// 从任务队列中取出任务
Task* getTask(TaskQueue *queue) {
    pthread_mutex_lock(&queue->mutex);
    while (queue->head == NULL) {
        pthread_cond_wait(&queue->cond, &queue->mutex);
    }
    Task *task = queue->head;
    queue->head = queue->head->next;
    if (queue->head == NULL) {
        queue->tail = NULL;
    }
    pthread_mutex_unlock(&queue->mutex);
    return task;
}

// 线程执行的函数
void* workerThread(void *arg) {
    ThreadPool *pool = (ThreadPool *)arg;
    TaskQueue *queue = &pool->queue;
    while (1) {
        Task *task = getTask(queue);
        if (task == NULL && pool->isShutdown) {
            break;
        }
        task->func(task->arg);
        free(task);
    }
    return NULL;
}

// 初始化线程池
void initThreadPool(ThreadPool *pool, int threadCount) {
    pool->threads = (pthread_t *)malloc(threadCount * sizeof(pthread_t));
    pool->threadCount = threadCount;
    pool->isShutdown = 0;
    initTaskQueue(&pool->queue);
    for (int i = 0; i < threadCount; i++) {
        pthread_create(&pool->threads[i], NULL, workerThread, (void *)pool);
    }
}

// 销毁线程池
void destroyThreadPool(ThreadPool *pool) {
    pool->isShutdown = 1;
    pthread_mutex_lock(&pool->queue.mutex);
    pthread_cond_broadcast(&pool->queue.cond);
    pthread_mutex_unlock(&pool->queue.mutex);
    for (int i = 0; i < pool->threadCount; i++) {
        pthread_join(pool->threads[i], NULL);
    }
    free(pool->threads);
    Task *current = pool->queue.head;
    Task *next;
    while (current != NULL) {
        next = current->next;
        free(current);
        current = next;
    }
    pthread_mutex_destroy(&pool->queue.mutex);
    pthread_cond_destroy(&pool->queue.cond);
}

// 示例任务函数
void exampleTask(void *arg) {
    int num = *((int *)arg);
    printf("Task %d is running\n", num);
}

int main() {
    ThreadPool pool;
    initThreadPool(&pool, 5);

    int tasks[] = {1, 2, 3, 4, 5};
    for (int i = 0; i < 5; i++) {
        addTask(&pool.queue, exampleTask, &tasks[i]);
    }

    sleep(2);
    destroyThreadPool(&pool);
    return 0;
}

上述代码实现了一个简单的线程池和任务队列模型。通过initThreadPool函数初始化线程池,addTask函数向任务队列中添加任务,workerThread函数作为线程执行的函数从任务队列中取出任务并执行,destroyThreadPool函数用于销毁线程池和释放相关资源。在main函数中,创建了一个包含5个线程的线程池,并向任务队列中添加了5个示例任务。

通过以上对Linux C语言线程池模型中任务队列管理的详细介绍,包括任务队列的数据结构设计、线程安全问题处理、条件变量应用、动态调整以及完整示例代码,希望读者能够对任务队列管理有更深入的理解,并能够在实际项目中灵活应用这些技术。