MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Linux C语言线程池模型的线程空闲处理

2021-07-077.9k 阅读

线程池概述

在多线程编程中,线程池是一种常用的技术,它通过预先创建一定数量的线程,并将这些线程放入一个池中进行管理。当有任务需要处理时,从线程池中取出一个空闲线程来执行任务,任务完成后线程并不销毁,而是返回线程池等待下一个任务。这种方式避免了频繁创建和销毁线程带来的开销,提高了系统的性能和资源利用率。

在 Linux C 语言环境下实现线程池,涉及到多线程编程相关的知识,包括 pthread 库的使用。一个基本的线程池模型通常包含以下几个部分:任务队列,用于存放待处理的任务;线程数组,存放线程池中各个线程;条件变量和互斥锁,用于线程间的同步与通信。

Linux C 语言线程池的基本实现

下面是一个简单的 Linux C 语言线程池基本实现示例代码,以便更好地理解线程池的结构和工作原理。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

// 定义任务结构体
typedef struct task {
    void (*func)(void*);
    void* arg;
    struct task* next;
} task_t;

// 定义线程池结构体
typedef struct thread_pool {
    task_t* head;
    task_t* tail;
    pthread_t* threads;
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    int max_threads;
    int stop;
} thread_pool_t;

// 创建新任务
task_t* create_task(void (*func)(void*), void* arg) {
    task_t* new_task = (task_t*)malloc(sizeof(task_t));
    new_task->func = func;
    new_task->arg = arg;
    new_task->next = NULL;
    return new_task;
}

// 创建线程池
thread_pool_t* create_thread_pool(int max_threads) {
    thread_pool_t* pool = (thread_pool_t*)malloc(sizeof(thread_pool_t));
    pool->head = NULL;
    pool->tail = NULL;
    pool->max_threads = max_threads;
    pool->stop = 0;
    pool->threads = (pthread_t*)malloc(max_threads * sizeof(pthread_t));

    pthread_mutex_init(&pool->mutex, NULL);
    pthread_cond_init(&pool->cond, NULL);

    for (int i = 0; i < max_threads; i++) {
        pthread_create(&pool->threads[i], NULL, (void* (*)(void*))worker, (void*)pool);
    }

    return pool;
}

// 工作线程函数
void* worker(void* arg) {
    thread_pool_t* pool = (thread_pool_t*)arg;
    task_t* task;

    while (1) {
        pthread_mutex_lock(&pool->mutex);

        // 等待任务到来或者线程池停止
        while (pool->head == NULL &&!pool->stop) {
            pthread_cond_wait(&pool->cond, &pool->mutex);
        }

        if (pool->stop && pool->head == NULL) {
            pthread_mutex_unlock(&pool->mutex);
            pthread_exit(NULL);
        }

        task = pool->head;
        pool->head = task->next;
        if (pool->head == NULL) {
            pool->tail = NULL;
        }
        pthread_mutex_unlock(&pool->mutex);

        // 执行任务
        task->func(task->arg);
        free(task);
    }
}

// 添加任务到线程池
void add_task(thread_pool_t* pool, void (*func)(void*), void* arg) {
    task_t* new_task = create_task(func, arg);

    pthread_mutex_lock(&pool->mutex);
    if (pool->tail == NULL) {
        pool->head = pool->tail = new_task;
    } else {
        pool->tail->next = new_task;
        pool->tail = new_task;
    }
    pthread_cond_signal(&pool->cond);
    pthread_mutex_unlock(&pool->mutex);
}

// 销毁线程池
void destroy_thread_pool(thread_pool_t* pool) {
    pthread_mutex_lock(&pool->mutex);
    pool->stop = 1;
    pthread_mutex_unlock(&pool->mutex);

    pthread_cond_broadcast(&pool->cond);

    for (int i = 0; i < pool->max_threads; i++) {
        pthread_join(pool->threads[i], NULL);
    }

    task_t* current = pool->head;
    task_t* next;
    while (current != NULL) {
        next = current->next;
        free(current);
        current = next;
    }

    free(pool->threads);
    pthread_mutex_destroy(&pool->mutex);
    pthread_cond_destroy(&pool->cond);
    free(pool);
}

// 示例任务函数
void example_task(void* arg) {
    int num = *((int*)arg);
    printf("Task %d is running in thread %ld\n", num, pthread_self());
}

int main() {
    thread_pool_t* pool = create_thread_pool(3);

    int nums[] = {1, 2, 3, 4, 5};
    for (int i = 0; i < 5; i++) {
        add_task(pool, example_task, &nums[i]);
    }

    sleep(2);
    destroy_thread_pool(pool);
    return 0;
}

在上述代码中,create_task 函数用于创建一个新的任务结构体。create_thread_pool 函数初始化线程池,创建指定数量的线程并启动它们。worker 函数是每个线程执行的函数,它从任务队列中取出任务并执行。add_task 函数用于将新任务添加到任务队列中。destroy_thread_pool 函数用于销毁线程池,等待所有线程完成任务并清理资源。

线程空闲处理的重要性

在实际应用中,线程空闲处理是线程池实现的一个关键部分。当任务队列中没有任务时,线程会进入等待状态,这时候如何高效地管理这些空闲线程就显得尤为重要。如果处理不当,可能会导致以下问题:

  1. 资源浪费:线程虽然处于等待状态,但依然占用系统资源,如内存等。如果长时间没有任务,过多的空闲线程会造成资源的浪费。
  2. 响应延迟:当新任务到来时,如果线程处于不合理的等待状态,可能导致任务不能及时被处理,从而增加系统的响应延迟。

因此,合理的线程空闲处理机制能够提高线程池的性能和资源利用率,使得系统在不同负载情况下都能高效运行。

线程空闲处理策略

简单等待策略

在前面的基本线程池实现代码中,我们已经看到了一种简单的线程空闲处理方式,即使用 pthread_cond_wait 函数让线程在任务队列空时进入等待状态,直到有新任务到来(通过 pthread_cond_signal 或者 pthread_cond_broadcast 唤醒)。这种方式的优点是实现简单,开销较小。然而,它也存在一些缺点,例如,如果长时间没有任务,线程会一直占用资源处于等待状态。

// 工作线程函数中的简单等待
while (1) {
    pthread_mutex_lock(&pool->mutex);

    // 等待任务到来或者线程池停止
    while (pool->head == NULL &&!pool->stop) {
        pthread_cond_wait(&pool->cond, &pool->mutex);
    }

    if (pool->stop && pool->head == NULL) {
        pthread_mutex_unlock(&pool->mutex);
        pthread_exit(NULL);
    }

    task = pool->head;
    pool->head = task->next;
    if (pool->head == NULL) {
        pool->tail = NULL;
    }
    pthread_mutex_unlock(&pool->mutex);

    // 执行任务
    task->func(task->arg);
    free(task);
}

动态调整线程数量策略

为了避免资源浪费,可以采用动态调整线程数量的策略。当任务队列长时间为空时,减少线程池中线程的数量;当任务队列中有较多任务时,增加线程数量。实现这种策略需要额外的机制来监控任务队列的状态以及线程的空闲时间。

  1. 监控任务队列状态:可以通过记录任务队列的长度以及任务到来的频率来判断系统的负载情况。例如,如果任务队列在一段时间内一直为空,说明系统负载较低,可以考虑减少线程数量。
  2. 监控线程空闲时间:为每个线程记录其空闲时间,当某个线程的空闲时间超过一定阈值时,可以将其销毁。

下面是在基本线程池代码基础上添加动态调整线程数量功能的示例代码:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <time.h>

// 定义任务结构体
typedef struct task {
    void (*func)(void*);
    void* arg;
    struct task* next;
} task_t;

// 定义线程池结构体
typedef struct thread_pool {
    task_t* head;
    task_t* tail;
    pthread_t* threads;
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    int max_threads;
    int min_threads;
    int stop;
    int idle_threads;
    time_t last_task_time;
} thread_pool_t;

// 创建新任务
task_t* create_task(void (*func)(void*), void* arg) {
    task_t* new_task = (task_t*)malloc(sizeof(task_t));
    new_task->func = func;
    new_task->arg = arg;
    new_task->next = NULL;
    return new_task;
}

// 创建线程池
thread_pool_t* create_thread_pool(int max_threads, int min_threads) {
    thread_pool_t* pool = (thread_pool_t*)malloc(sizeof(thread_pool_t));
    pool->head = NULL;
    pool->tail = NULL;
    pool->max_threads = max_threads;
    pool->min_threads = min_threads;
    pool->stop = 0;
    pool->idle_threads = 0;
    pool->last_task_time = time(NULL);

    pool->threads = (pthread_t*)malloc(max_threads * sizeof(pthread_t));

    pthread_mutex_init(&pool->mutex, NULL);
    pthread_cond_init(&pool->cond, NULL);

    for (int i = 0; i < min_threads; i++) {
        pthread_create(&pool->threads[i], NULL, (void* (*)(void*))worker, (void*)pool);
    }

    return pool;
}

// 工作线程函数
void* worker(void* arg) {
    thread_pool_t* pool = (thread_pool_t*)arg;
    task_t* task;
    time_t idle_start_time = time(NULL);

    while (1) {
        pthread_mutex_lock(&pool->mutex);

        // 等待任务到来或者线程池停止
        while (pool->head == NULL &&!pool->stop) {
            pool->idle_threads++;
            pthread_cond_wait(&pool->cond, &pool->mutex);
            pool->idle_threads--;
            if (pool->stop && pool->head == NULL) {
                pthread_mutex_unlock(&pool->mutex);
                pthread_exit(NULL);
            }
            idle_start_time = time(NULL);
        }

        if (pool->stop && pool->head == NULL) {
            pthread_mutex_unlock(&pool->mutex);
            pthread_exit(NULL);
        }

        task = pool->head;
        pool->head = task->next;
        if (pool->head == NULL) {
            pool->tail = NULL;
        }
        pool->last_task_time = time(NULL);
        pthread_mutex_unlock(&pool->mutex);

        // 执行任务
        task->func(task->arg);
        free(task);

        // 动态调整线程数量
        pthread_mutex_lock(&pool->mutex);
        if (difftime(time(NULL), pool->last_task_time) > 10 && pool->idle_threads > pool->min_threads) {
            pthread_mutex_unlock(&pool->mutex);
            pthread_exit(NULL);
        }
        pthread_mutex_unlock(&pool->mutex);
    }
}

// 添加任务到线程池
void add_task(thread_pool_t* pool, void (*func)(void*), void* arg) {
    task_t* new_task = create_task(func, arg);

    pthread_mutex_lock(&pool->mutex);
    if (pool->tail == NULL) {
        pool->head = pool->tail = new_task;
    } else {
        pool->tail->next = new_task;
        pool->tail = new_task;
    }
    // 如果当前空闲线程不够且总线程数未达到最大值,创建新线程
    if (pool->idle_threads == 0 && pool->max_threads > pool->min_threads) {
        int new_thread_index = pool->min_threads + pool->idle_threads;
        if (new_thread_index < pool->max_threads) {
            pthread_create(&pool->threads[new_thread_index], NULL, (void* (*)(void*))worker, (void*)pool);
        }
    }
    pthread_cond_signal(&pool->cond);
    pthread_mutex_unlock(&pool->mutex);
}

// 销毁线程池
void destroy_thread_pool(thread_pool_t* pool) {
    pthread_mutex_lock(&pool->mutex);
    pool->stop = 1;
    pthread_mutex_unlock(&pool->mutex);

    pthread_cond_broadcast(&pool->cond);

    for (int i = 0; i < pool->max_threads; i++) {
        pthread_join(pool->threads[i], NULL);
    }

    task_t* current = pool->head;
    task_t* next;
    while (current != NULL) {
        next = current->next;
        free(current);
        current = next;
    }

    free(pool->threads);
    pthread_mutex_destroy(&pool->mutex);
    pthread_cond_destroy(&pool->cond);
    free(pool);
}

// 示例任务函数
void example_task(void* arg) {
    int num = *((int*)arg);
    printf("Task %d is running in thread %ld\n", num, pthread_self());
}

int main() {
    thread_pool_t* pool = create_thread_pool(5, 2);

    int nums[] = {1, 2, 3, 4, 5};
    for (int i = 0; i < 5; i++) {
        add_task(pool, example_task, &nums[i]);
    }

    sleep(5);
    destroy_thread_pool(pool);
    return 0;
}

在上述代码中,我们在 thread_pool 结构体中添加了 min_threadsidle_threadslast_task_time 字段。min_threads 表示线程池最少保留的线程数,idle_threads 用于记录当前空闲线程的数量,last_task_time 记录最后一个任务到来的时间。在 worker 函数中,我们记录每个线程的空闲开始时间,并在任务执行后检查是否需要根据空闲时间和任务到来时间来调整线程数量。在 add_task 函数中,如果当前空闲线程不足且总线程数未达到最大值,会创建新的线程。

执行其他低优先级任务策略

除了上述两种策略外,还可以让空闲线程执行一些低优先级的任务。例如,系统中有一些后台任务,如日志清理、缓存数据整理等,这些任务对时间要求不高,可以在空闲线程时执行。

实现这种策略需要在任务队列中区分不同优先级的任务。我们可以修改任务结构体,添加一个优先级字段。

// 定义任务结构体,增加优先级字段
typedef struct task {
    void (*func)(void*);
    void* arg;
    struct task* next;
    int priority;
} task_t;

在添加任务时,根据任务的性质设置优先级。在 worker 函数中,当任务队列中没有高优先级任务时,线程可以检查是否有低优先级任务并执行。

// 工作线程函数
void* worker(void* arg) {
    thread_pool_t* pool = (thread_pool_t*)arg;
    task_t* task;

    while (1) {
        pthread_mutex_lock(&pool->mutex);

        // 优先等待高优先级任务
        while (pool->head == NULL || (pool->head->priority > 0 && pool->idle_threads > 0) &&!pool->stop) {
            pthread_cond_wait(&pool->cond, &pool->mutex);
        }

        if (pool->stop && pool->head == NULL) {
            pthread_mutex_unlock(&pool->mutex);
            pthread_exit(NULL);
        }

        task = pool->head;
        pool->head = task->next;
        if (pool->head == NULL) {
            pool->tail = NULL;
        }
        pthread_mutex_unlock(&pool->mutex);

        // 执行任务
        task->func(task->arg);
        free(task);

        // 如果空闲,尝试执行低优先级任务
        pthread_mutex_lock(&pool->mutex);
        task_t* low_priority_task = pool->head;
        while (low_priority_task!= NULL && low_priority_task->priority > 0) {
            low_priority_task = low_priority_task->next;
        }
        if (low_priority_task!= NULL) {
            task = low_priority_task;
            pool->head = task->next;
            if (pool->head == NULL) {
                pool->tail = NULL;
            }
            pthread_mutex_unlock(&pool->mutex);
            task->func(task->arg);
            free(task);
        } else {
            pthread_mutex_unlock(&pool->mutex);
        }
    }
}

在上述代码中,worker 函数首先优先等待高优先级任务(这里假设优先级为 0 表示高优先级)。当没有高优先级任务且线程空闲时,会尝试从任务队列中查找低优先级任务并执行。

总结与注意事项

通过上述几种线程空闲处理策略的介绍,我们可以看到在 Linux C 语言线程池模型中,合理处理线程空闲状态对于提高系统性能和资源利用率至关重要。不同的策略适用于不同的应用场景,例如简单等待策略适用于任务较为频繁且对资源消耗不太敏感的场景;动态调整线程数量策略适用于对资源利用率要求较高,负载变化较大的场景;执行其他低优先级任务策略适用于系统中有一些后台低优先级任务需要处理的场景。

在实际应用中,还需要注意以下几点:

  1. 线程安全:在多线程环境下,对共享资源的访问必须保证线程安全。无论是任务队列的操作,还是线程数量的动态调整等,都需要通过互斥锁、条件变量等机制来保证数据的一致性和正确性。
  2. 性能测试与调优:不同的线程空闲处理策略对系统性能的影响不同,需要根据实际应用场景进行性能测试和调优。例如,动态调整线程数量策略中的线程数量调整阈值、空闲时间阈值等参数,需要根据系统的负载特点进行合理设置。
  3. 错误处理:在创建线程、互斥锁、条件变量等操作时,要注意检查返回值,处理可能出现的错误。例如,pthread_create 函数可能因为系统资源不足等原因创建线程失败,需要进行相应的错误处理。

通过合理选择和实现线程空闲处理策略,并注意多线程编程中的各种细节,能够构建出高效、稳定的 Linux C 语言线程池模型,满足不同应用场景的需求。