MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Linux C语言线程池模型的任务优先级设定

2023-10-035.3k 阅读

线程池模型基础概述

线程池概念

在 Linux C 语言开发中,线程池是一种多线程处理形式,它预先创建一定数量的线程并将其放入池中。这些线程处于等待任务的状态,当有新任务到来时,线程池分配一个线程来处理该任务。任务完成后,线程并不会被销毁,而是返回线程池中等待下一个任务。这种机制避免了频繁创建和销毁线程带来的开销,提高了系统资源的利用率和程序的性能。

线程池工作原理

线程池的工作过程大致如下:首先,初始化线程池,创建一定数量的线程并将它们加入线程池中。然后,任务被提交到任务队列中。线程池中的线程不断从任务队列中取出任务并执行。当任务队列为空时,线程会进入等待状态,直到有新任务加入。当线程池需要销毁时,会通知所有线程停止工作,等待所有线程执行完当前任务后,销毁线程池。

任务优先级设定的需求与意义

为何需要任务优先级

在实际应用场景中,不同的任务对系统资源的需求和执行的紧急程度可能不同。例如,在一个网络服务器程序中,处理实时通信的任务可能比处理日志记录的任务更加紧急。如果所有任务都按照先来先服务的原则执行,可能会导致重要任务得不到及时处理,影响系统的整体性能和响应速度。因此,为任务设定优先级可以使线程池根据任务的重要性和紧急程度合理分配资源,优先处理关键任务。

优先级设定带来的优势

  1. 提高系统响应性:高优先级任务能够优先得到处理,减少其等待时间,使系统能够更快地对关键事件做出响应。例如,在一个监控系统中,处理报警信息的任务优先级较高,及时处理这些任务可以让管理人员第一时间了解到异常情况。
  2. 优化资源分配:根据任务优先级合理分配线程资源,避免低优先级任务占用过多资源,导致高优先级任务无法执行。这有助于提高系统资源的整体利用率,使得各个任务都能在合适的时间得到处理。
  3. 提升用户体验:对于交互式应用程序,用户输入相关的任务通常具有较高优先级。优先处理这些任务可以让用户感受到程序的流畅性和响应性,提升用户体验。

实现任务优先级设定的方法

基于任务队列的优先级实现

  1. 优先级队列数据结构 在实现任务优先级时,我们可以使用优先级队列作为任务队列。优先级队列是一种特殊的队列,其中每个元素都有一个优先级。在队列中,高优先级的元素排在低优先级元素之前。常见的优先级队列实现方式有堆(如二叉堆)和平衡二叉搜索树(如红黑树)。在 Linux C 语言中,我们可以使用堆来实现优先级队列,因为堆的插入和删除操作时间复杂度较低,适合频繁的任务添加和取出操作。

  2. 堆的基本操作

    • 插入操作:当有新任务加入时,将任务插入到堆的末尾,然后通过向上调整堆的操作,将新插入的任务调整到合适的位置,以维护堆的性质(对于最大堆,父节点的值大于子节点的值;对于最小堆,父节点的值小于子节点的值)。假设我们使用最大堆来实现任务优先级队列,任务的优先级作为堆节点的值。下面是插入操作的代码示例:
// 定义任务结构体
typedef struct Task {
    void (*func)(void*);
    void *arg;
    int priority;
} Task;

// 定义堆结构体
typedef struct Heap {
    Task *tasks;
    int capacity;
    int size;
} Heap;

// 初始化堆
Heap* createHeap(int capacity) {
    Heap *heap = (Heap*)malloc(sizeof(Heap));
    heap->tasks = (Task*)malloc(capacity * sizeof(Task));
    heap->capacity = capacity;
    heap->size = 0;
    return heap;
}

// 向上调整堆
void heapifyUp(Heap *heap, int index) {
    int parent = (index - 1) / 2;
    while (index > 0 && heap->tasks[parent].priority < heap->tasks[index].priority) {
        Task temp = heap->tasks[parent];
        heap->tasks[parent] = heap->tasks[index];
        heap->tasks[index] = temp;
        index = parent;
        parent = (index - 1) / 2;
    }
}

// 插入任务到堆
void insertTask(Heap *heap, Task task) {
    if (heap->size == heap->capacity) {
        // 堆已满,可考虑扩展堆
        return;
    }
    heap->tasks[heap->size] = task;
    heapifyUp(heap, heap->size);
    heap->size++;
}
- **删除操作**:当线程从任务队列中取出任务时,需要删除堆顶元素(即优先级最高的任务)。删除堆顶元素后,将堆的最后一个元素移动到堆顶,然后通过向下调整堆的操作,重新维护堆的性质。以下是删除操作的代码示例:
// 向下调整堆
void heapifyDown(Heap *heap, int index) {
    int left = 2 * index + 1;
    int right = 2 * index + 2;
    int largest = index;
    if (left < heap->size && heap->tasks[left].priority > heap->tasks[largest].priority) {
        largest = left;
    }
    if (right < heap->size && heap->tasks[right].priority > heap->tasks[largest].priority) {
        largest = right;
    }
    if (largest != index) {
        Task temp = heap->tasks[index];
        heap->tasks[index] = heap->tasks[largest];
        heap->tasks[largest] = temp;
        heapifyDown(heap, largest);
    }
}

// 删除堆顶任务
Task removeTask(Heap *heap) {
    if (heap->size == 0) {
        // 堆为空
        Task emptyTask = {NULL, NULL, -1};
        return emptyTask;
    }
    Task task = heap->tasks[0];
    heap->tasks[0] = heap->tasks[heap->size - 1];
    heap->size--;
    heapifyDown(heap, 0);
    return task;
}

基于线程调度策略的优先级实现

  1. Linux 线程调度策略 在 Linux 系统中,线程的调度策略有多种,包括 SCHED_OTHER(普通调度策略)、SCHED_FIFO(先进先出调度策略)和 SCHED_RR(时间片轮转调度策略)。我们可以利用这些调度策略来实现任务的优先级。对于高优先级任务,可以将执行该任务的线程设置为较高的调度优先级,使用 SCHED_FIFO 或 SCHED_RR 调度策略,使线程能够优先获得 CPU 资源。

  2. 设置线程调度优先级示例

#include <pthread.h>
#include <sched.h>
#include <stdio.h>

void* taskFunction(void* arg) {
    // 获取当前线程句柄
    pthread_t self = pthread_self();
    struct sched_param param;
    // 获取当前线程的调度参数
    pthread_getschedparam(self, NULL, &param);
    // 设置较高的调度优先级
    param.sched_priority = sched_get_priority_max(SCHED_FIFO);
    // 设置线程调度策略和参数
    pthread_setschedparam(self, SCHED_FIFO, &param);
    // 任务执行代码
    printf("Task is running with priority %d\n", param.sched_priority);
    return NULL;
}

int main() {
    pthread_t thread;
    // 创建线程
    pthread_create(&thread, NULL, taskFunction, NULL);
    // 等待线程结束
    pthread_join(thread, NULL);
    return 0;
}

在上述代码中,通过 pthread_getschedparam 获取当前线程的调度参数,然后设置较高的调度优先级 sched_get_priority_max(SCHED_FIFO),并使用 pthread_setschedparam 设置线程的调度策略为 SCHED_FIFO。这样,该线程在执行任务时就会具有较高的优先级,优先获得 CPU 资源。

完整的线程池模型中任务优先级设定实现

线程池结构体定义

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

// 定义任务结构体
typedef struct Task {
    void (*func)(void*);
    void *arg;
    int priority;
} Task;

// 定义堆结构体
typedef struct Heap {
    Task *tasks;
    int capacity;
    int size;
} Heap;

// 线程池结构体
typedef struct ThreadPool {
    Heap *taskQueue;
    pthread_t *threads;
    int numThreads;
    int stop;
    pthread_mutex_t mutex;
    pthread_cond_t cond;
} ThreadPool;

在上述代码中,Task 结构体定义了任务的函数指针 func、参数 arg 和优先级 priorityHeap 结构体用于实现任务优先级队列,ThreadPool 结构体包含任务队列 taskQueue、线程数组 threads、线程数量 numThreads、停止标志 stop,以及用于线程同步的互斥锁 mutex 和条件变量 cond

线程池初始化

// 初始化堆
Heap* createHeap(int capacity) {
    Heap *heap = (Heap*)malloc(sizeof(Heap));
    heap->tasks = (Task*)malloc(capacity * sizeof(Task));
    heap->capacity = capacity;
    heap->size = 0;
    return heap;
}

// 向上调整堆
void heapifyUp(Heap *heap, int index) {
    int parent = (index - 1) / 2;
    while (index > 0 && heap->tasks[parent].priority < heap->tasks[index].priority) {
        Task temp = heap->tasks[parent];
        heap->tasks[parent] = heap->tasks[index];
        heap->tasks[index] = temp;
        index = parent;
        parent = (index - 1) / 2;
    }
}

// 插入任务到堆
void insertTask(Heap *heap, Task task) {
    if (heap->size == heap->capacity) {
        // 堆已满,可考虑扩展堆
        return;
    }
    heap->tasks[heap->size] = task;
    heapifyUp(heap, heap->size);
    heap->size++;
}

// 向下调整堆
void heapifyDown(Heap *heap, int index) {
    int left = 2 * index + 1;
    int right = 2 * index + 2;
    int largest = index;
    if (left < heap->size && heap->tasks[left].priority > heap->tasks[largest].priority) {
        largest = left;
    }
    if (right < heap->size && heap->tasks[right].priority > heap->tasks[largest].priority) {
        largest = right;
    }
    if (largest != index) {
        Task temp = heap->tasks[index];
        heap->tasks[index] = heap->tasks[largest];
        heap->tasks[largest] = temp;
        heapifyDown(heap, largest);
    }
}

// 删除堆顶任务
Task removeTask(Heap *heap) {
    if (heap->size == 0) {
        // 堆为空
        Task emptyTask = {NULL, NULL, -1};
        return emptyTask;
    }
    Task task = heap->tasks[0];
    heap->tasks[0] = heap->tasks[heap->size - 1];
    heap->size--;
    heapifyDown(heap, 0);
    return task;
}

// 初始化线程池
ThreadPool* createThreadPool(int numThreads, int queueCapacity) {
    ThreadPool *pool = (ThreadPool*)malloc(sizeof(ThreadPool));
    pool->taskQueue = createHeap(queueCapacity);
    pool->threads = (pthread_t*)malloc(numThreads * sizeof(pthread_t));
    pool->numThreads = numThreads;
    pool->stop = 0;
    pthread_mutex_init(&pool->mutex, NULL);
    pthread_cond_init(&pool->cond, NULL);

    for (int i = 0; i < numThreads; i++) {
        pthread_create(&pool->threads[i], NULL, workerThread, pool);
    }
    return pool;
}

createThreadPool 函数中,首先初始化任务队列(即堆)和线程数组,设置线程数量和停止标志。然后初始化互斥锁和条件变量,并创建指定数量的线程,每个线程执行 workerThread 函数。

线程工作函数

void* workerThread(void* arg) {
    ThreadPool *pool = (ThreadPool*)arg;
    while (1) {
        Task task;
        pthread_mutex_lock(&pool->mutex);
        while (pool->taskQueue->size == 0 &&!pool->stop) {
            pthread_cond_wait(&pool->cond, &pool->mutex);
        }
        if (pool->stop) {
            pthread_mutex_unlock(&pool->mutex);
            pthread_exit(NULL);
        }
        task = removeTask(pool->taskQueue);
        pthread_mutex_unlock(&pool->mutex);

        // 执行任务
        if (task.func != NULL) {
            task.func(task.arg);
        }
    }
    return NULL;
}

workerThread 函数是线程池中的线程执行的函数。线程首先获取互斥锁,然后检查任务队列是否为空且线程池是否停止。如果任务队列为空且线程池未停止,则线程进入等待状态,直到有新任务加入或线程池停止。当有任务时,线程从任务队列中取出任务,释放互斥锁并执行任务。

提交任务到线程池

// 提交任务到线程池
void submitTask(ThreadPool *pool, void (*func)(void*), void *arg, int priority) {
    Task task = {func, arg, priority};
    pthread_mutex_lock(&pool->mutex);
    insertTask(pool->taskQueue, task);
    pthread_cond_signal(&pool->cond);
    pthread_mutex_unlock(&pool->mutex);
}

submitTask 函数用于将任务提交到线程池。首先创建任务结构体,然后获取互斥锁,将任务插入任务队列(堆),并通过条件变量通知等待的线程有新任务到来,最后释放互斥锁。

销毁线程池

// 销毁线程池
void destroyThreadPool(ThreadPool *pool) {
    pthread_mutex_lock(&pool->mutex);
    pool->stop = 1;
    pthread_cond_broadcast(&pool->cond);
    pthread_mutex_unlock(&pool->mutex);

    for (int i = 0; i < pool->numThreads; i++) {
        pthread_join(pool->threads[i], NULL);
    }

    free(pool->threads);
    free(pool->taskQueue->tasks);
    free(pool->taskQueue);
    pthread_mutex_destroy(&pool->mutex);
    pthread_cond_destroy(&pool->cond);
    free(pool);
}

destroyThreadPool 函数用于销毁线程池。首先设置停止标志,并通过条件变量通知所有线程停止工作。然后等待所有线程执行完当前任务后,释放线程数组、任务队列以及互斥锁和条件变量占用的资源,最后释放线程池结构体。

示例任务函数

// 示例任务函数
void exampleTask(void* arg) {
    int num = *(int*)arg;
    printf("Task with priority %d is running, number is %d\n", *((int*)arg + 1), num);
}

exampleTask 是一个示例任务函数,它打印出任务的优先级和传入的参数。

主函数测试

int main() {
    ThreadPool *pool = createThreadPool(3, 10);

    int num1 = 10;
    int priority1 = 3;
    int *args1 = (int*)malloc(2 * sizeof(int));
    args1[0] = num1;
    args1[1] = priority1;
    submitTask(pool, exampleTask, args1, priority1);

    int num2 = 20;
    int priority2 = 2;
    int *args2 = (int*)malloc(2 * sizeof(int));
    args2[0] = num2;
    args2[1] = priority2;
    submitTask(pool, exampleTask, args2, priority2);

    int num3 = 30;
    int priority3 = 1;
    int *args3 = (int*)malloc(2 * sizeof(int));
    args3[0] = num3;
    args3[1] = priority3;
    submitTask(pool, exampleTask, args3, priority3);

    sleep(2);
    destroyThreadPool(pool);
    return 0;
}

main 函数中,首先创建一个线程池,包含 3 个线程,任务队列容量为 10。然后提交 3 个任务,每个任务具有不同的优先级。最后,等待 2 秒后销毁线程池。

通过上述完整的代码实现,我们在 Linux C 语言线程池模型中成功实现了任务优先级设定,使线程池能够根据任务的优先级合理分配资源,优先处理重要任务。这种机制在实际应用中能够显著提高系统的性能和响应速度。在实际开发中,可以根据具体需求进一步优化和扩展该线程池模型,例如动态调整线程数量、优化任务队列的性能等,以适应不同场景的要求。同时,需要注意在多线程编程中,线程同步和资源管理的重要性,避免出现死锁和数据竞争等问题。