MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Linux C语言线程池多线程服务器模型实践

2024-12-277.1k 阅读

1. 线程池简介

线程池是一种多线程处理形式,它维护着一个线程队列,这些线程可以被重复使用来执行任务。在服务器开发场景中,每当有新的客户端连接或者新的任务到来时,如果为每个任务都创建一个新的线程,开销是非常大的。创建线程需要分配内存、初始化栈空间、进行上下文切换等操作,这些操作会消耗大量的系统资源。而线程池则通过预先创建一定数量的线程,并将任务分配给这些线程执行,避免了频繁创建和销毁线程带来的开销,提高了系统的性能和响应速度。

2. 多线程服务器模型概述

在传统的单线程服务器模型中,服务器一次只能处理一个客户端请求,在处理当前请求完成之前,无法响应其他客户端的请求,这在高并发场景下性能极低。多线程服务器模型则通过为每个客户端请求分配一个线程来处理,从而实现并发处理多个客户端请求。然而,正如前面提到的,如果为每个请求都创建新线程,资源消耗过大。线程池多线程服务器模型结合了线程池和多线程的优点,使用线程池中的线程来处理客户端请求,既实现了并发处理,又避免了频繁创建线程的开销。

3. 线程池的实现

3.1 数据结构定义

首先,我们需要定义一些数据结构来实现线程池。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

// 任务结构体
typedef struct task {
    void (*func)(void *);
    void *arg;
    struct task *next;
} task_t;

// 线程池结构体
typedef struct threadpool {
    task_t *head;
    task_t *tail;
    pthread_t *threads;
    pthread_mutex_t lock;
    pthread_cond_t cond;
    int thread_count;
    int shutdown;
} threadpool_t;

在上述代码中,task_t 结构体用于表示一个任务,它包含一个函数指针 func 指向要执行的任务函数,arg 是传递给任务函数的参数,next 用于将任务链接成一个链表。threadpool_t 结构体则定义了线程池,包含任务链表的头指针 head 和尾指针 tail,一个数组 threads 用于存储线程 ID,互斥锁 lock 用于保护任务链表的访问,条件变量 cond 用于线程间的同步,thread_count 表示线程池中的线程数量,shutdown 用于标记线程池是否关闭。

3.2 线程池初始化

接下来实现线程池的初始化函数。

threadpool_t *threadpool_create(int thread_count) {
    threadpool_t *pool = (threadpool_t *)malloc(sizeof(threadpool_t));
    if (pool == NULL) {
        return NULL;
    }
    pool->head = NULL;
    pool->tail = NULL;
    pool->threads = (pthread_t *)malloc(thread_count * sizeof(pthread_t));
    if (pool->threads == NULL) {
        free(pool);
        return NULL;
    }
    pool->thread_count = thread_count;
    pool->shutdown = 0;
    if (pthread_mutex_init(&pool->lock, NULL) != 0) {
        free(pool->threads);
        free(pool);
        return NULL;
    }
    if (pthread_cond_init(&pool->cond, NULL) != 0) {
        pthread_mutex_destroy(&pool->lock);
        free(pool->threads);
        free(pool);
        return NULL;
    }
    for (int i = 0; i < thread_count; i++) {
        if (pthread_create(&pool->threads[i], NULL, (void *(*)(void *))threadpool_worker, (void *)pool) != 0) {
            pthread_mutex_destroy(&pool->lock);
            pthread_cond_destroy(&pool->cond);
            free(pool->threads);
            free(pool);
            return NULL;
        }
    }
    return pool;
}

threadpool_create 函数首先为线程池结构体分配内存,然后初始化任务链表指针、线程数组、互斥锁和条件变量。接着,它通过 pthread_create 函数创建指定数量的线程,并将线程的执行函数指定为 threadpool_worker(后续会实现)。如果在任何一步出现错误,函数会释放已分配的资源并返回 NULL

3.3 任务添加

int threadpool_add_task(threadpool_t *pool, void (*func)(void *), void *arg) {
    task_t *new_task = (task_t *)malloc(sizeof(task_t));
    if (new_task == NULL) {
        return -1;
    }
    new_task->func = func;
    new_task->arg = arg;
    new_task->next = NULL;
    pthread_mutex_lock(&pool->lock);
    if (pool->shutdown) {
        pthread_mutex_unlock(&pool->lock);
        free(new_task);
        return -1;
    }
    if (pool->tail == NULL) {
        pool->head = new_task;
        pool->tail = new_task;
    } else {
        pool->tail->next = new_task;
        pool->tail = new_task;
    }
    pthread_cond_signal(&pool->cond);
    pthread_mutex_unlock(&pool->lock);
    return 0;
}

threadpool_add_task 函数用于向线程池添加任务。它首先为新任务分配内存并初始化任务结构体。然后获取互斥锁,检查线程池是否已经关闭,如果已关闭则释放新任务内存并返回错误。如果线程池未关闭,则将新任务添加到任务链表的尾部,并通过条件变量 cond 唤醒一个等待的线程来执行该任务。最后释放互斥锁。

3.4 线程工作函数

void *threadpool_worker(void *arg) {
    threadpool_t *pool = (threadpool_t *)arg;
    task_t *task;
    while (1) {
        pthread_mutex_lock(&pool->lock);
        while (pool->head == NULL &&!pool->shutdown) {
            pthread_cond_wait(&pool->cond, &pool->lock);
        }
        if (pool->shutdown && pool->head == NULL) {
            pthread_mutex_unlock(&pool->lock);
            pthread_exit(NULL);
        }
        task = pool->head;
        pool->head = task->next;
        if (pool->head == NULL) {
            pool->tail = NULL;
        }
        pthread_mutex_unlock(&pool->lock);
        (*(task->func))(task->arg);
        free(task);
    }
    return NULL;
}

threadpool_worker 函数是线程池中每个线程的执行函数。线程首先获取互斥锁,然后在任务链表为空且线程池未关闭的情况下,通过 pthread_cond_wait 函数等待条件变量 cond 被唤醒。当有任务到来或者线程池关闭时,线程被唤醒。如果线程池关闭且任务链表为空,则释放互斥锁并退出线程。否则,从任务链表头部取出一个任务,释放互斥锁,执行任务函数,最后释放任务结构体的内存。

3.5 线程池销毁

void threadpool_destroy(threadpool_t *pool) {
    pthread_mutex_lock(&pool->lock);
    pool->shutdown = 1;
    pthread_cond_broadcast(&pool->cond);
    pthread_mutex_unlock(&pool->lock);
    for (int i = 0; i < pool->thread_count; i++) {
        pthread_join(pool->threads[i], NULL);
    }
    pthread_mutex_destroy(&pool->lock);
    pthread_cond_destroy(&pool->cond);
    task_t *cur = pool->head;
    task_t *next;
    while (cur != NULL) {
        next = cur->next;
        free(cur);
        cur = next;
    }
    free(pool->threads);
    free(pool);
}

threadpool_destroy 函数用于销毁线程池。首先获取互斥锁,设置 shutdown 标志为 1 表示线程池要关闭,并通过 pthread_cond_broadcast 唤醒所有等待的线程。然后释放互斥锁,通过 pthread_join 等待所有线程执行完毕。接着销毁互斥锁和条件变量,释放任务链表中的所有任务内存,最后释放线程数组和线程池结构体的内存。

4. 基于线程池的多线程服务器模型实现

4.1 服务器初始化

#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>

// 服务器初始化函数
int server_init(int port) {
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("socket creation failed");
        return -1;
    }
    struct sockaddr_in servaddr;
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = INADDR_ANY;
    servaddr.sin_port = htons(port);
    if (bind(sockfd, (const struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
        perror("bind failed");
        close(sockfd);
        return -1;
    }
    if (listen(sockfd, 10) < 0) {
        perror("listen failed");
        close(sockfd);
        return -1;
    }
    return sockfd;
}

server_init 函数用于初始化服务器套接字。它首先调用 socket 函数创建一个 TCP 套接字,然后设置服务器地址结构体 servaddr,包括地址族、IP 地址和端口号。接着使用 bind 函数将套接字绑定到指定的地址和端口,最后通过 listen 函数使套接字进入监听状态,等待客户端连接。如果任何一步操作失败,函数会关闭套接字并返回 -1。

4.2 客户端处理函数

void handle_client(void *arg) {
    int clientfd = *((int *)arg);
    char buffer[1024];
    ssize_t bytes_read = read(clientfd, buffer, sizeof(buffer) - 1);
    if (bytes_read > 0) {
        buffer[bytes_read] = '\0';
        printf("Received from client: %s\n", buffer);
        const char *response = "Message received successfully";
        write(clientfd, response, strlen(response));
    }
    close(clientfd);
}

handle_client 函数是处理客户端请求的函数,它作为任务函数被线程池中的线程调用。函数首先从客户端套接字 clientfd 读取数据,将读取到的数据存储在 buffer 中并添加字符串结束符。然后打印接收到的消息,并向客户端发送响应消息。最后关闭客户端套接字。

4.3 主函数

int main() {
    int port = 8080;
    int sockfd = server_init(port);
    if (sockfd < 0) {
        return 1;
    }
    threadpool_t *pool = threadpool_create(5);
    if (pool == NULL) {
        close(sockfd);
        return 1;
    }
    while (1) {
        struct sockaddr_in cliaddr;
        socklen_t len = sizeof(cliaddr);
        int clientfd = accept(sockfd, (struct sockaddr *)&cliaddr, &len);
        if (clientfd < 0) {
            perror("accept failed");
            continue;
        }
        int *clientfd_ptr = (int *)malloc(sizeof(int));
        *clientfd_ptr = clientfd;
        if (threadpool_add_task(pool, handle_client, clientfd_ptr) != 0) {
            free(clientfd_ptr);
            close(clientfd);
        }
    }
    threadpool_destroy(pool);
    close(sockfd);
    return 0;
}

main 函数中,首先初始化服务器套接字并创建一个包含 5 个线程的线程池。然后进入一个无限循环,通过 accept 函数等待客户端连接。当有客户端连接时,为客户端套接字分配内存并将其指针作为参数传递给 threadpool_add_task 函数,将处理客户端请求的任务添加到线程池中。如果添加任务失败,则释放内存并关闭客户端套接字。最后,当程序结束时,销毁线程池并关闭服务器套接字。

5. 线程池多线程服务器模型的优化

5.1 动态调整线程数量

在上述实现中,线程池的线程数量是固定的。在实际应用中,可以根据系统负载动态调整线程数量。例如,可以通过监控任务队列的长度来决定是否需要增加或减少线程。如果任务队列长度持续增长且超过一定阈值,可以创建新的线程来处理任务;如果任务队列长度持续较短且空闲线程较多,可以销毁一些线程以节省资源。

5.2 任务优先级

可以为任务添加优先级属性,在任务链表中按照优先级排序。这样,高优先级的任务可以优先被线程处理,提高系统对重要任务的响应速度。在 task_t 结构体中可以添加一个优先级字段,在 threadpool_add_task 函数中按照优先级将任务插入到合适的位置,并且在 threadpool_worker 函数中从链表头部取出任务时,始终取优先级最高的任务。

5.3 错误处理优化

在上述代码中,虽然对一些常见的错误进行了处理,但可以进一步完善错误处理机制。例如,在 pthread_createpthread_mutex_init 等函数调用失败时,可以记录详细的错误信息,便于调试和定位问题。同时,在处理客户端连接和数据读写时,也可以对更多的错误情况进行处理,如网络中断、客户端异常关闭等。

6. 线程池多线程服务器模型的注意事项

6.1 线程安全

在多线程环境下,共享资源的访问必须保证线程安全。在我们的线程池实现中,通过互斥锁 lock 来保护任务链表的访问,确保多个线程不会同时修改任务链表。同时,在条件变量的使用中,也要注意与互斥锁的配合,避免出现死锁等问题。在实际应用中,对于其他可能被多个线程共享的数据结构,同样要采取合适的线程同步机制来保证数据的一致性和完整性。

6.2 资源管理

合理管理资源是非常重要的。在线程池销毁时,要确保所有的线程都正确退出,所有的任务都被处理完毕,并且所有分配的内存都被释放。在处理客户端连接时,要及时关闭不再使用的套接字,避免资源泄漏。同时,对于动态分配的内存,如任务结构体、客户端套接字指针等,要在合适的时机进行释放,防止内存泄漏。

6.3 性能调优

虽然线程池多线程服务器模型能够提高系统性能,但在实际应用中,还需要根据具体的业务场景进行性能调优。例如,选择合适的线程数量,线程数量过多可能导致上下文切换开销增大,线程数量过少则无法充分利用系统资源。另外,优化任务处理函数的性能,减少任务执行时间,也可以提高整个服务器的性能。

7. 总结与展望

通过以上详细的介绍和代码示例,我们实现了一个基于 Linux C 语言的线程池多线程服务器模型。这种模型结合了线程池和多线程的优点,在高并发场景下能够有效地提高服务器的性能和响应速度。然而,在实际应用中,还需要根据具体的业务需求和系统环境进行进一步的优化和调整。未来,随着硬件技术的不断发展和软件应用场景的日益复杂,多线程编程技术将继续发挥重要作用,我们需要不断探索和学习新的技术和方法,以提升系统的性能和可靠性。例如,随着多核处理器的广泛应用,如何更好地利用多核资源,进一步提高多线程程序的并行度,将是一个值得深入研究的方向。同时,在分布式系统中,如何将线程池多线程模型与分布式架构相结合,实现高效的分布式计算和服务,也是一个具有挑战性的课题。总之,不断优化和创新多线程编程技术,将为我们构建更强大、更高效的软件系统提供有力的支持。