MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

IO多路复用技术在处理大量并发连接时的策略

2021-05-074.3k 阅读

1. 理解 IO 多路复用技术基础

在后端开发的网络编程中,当面临大量并发连接时,传统的每个连接对应一个进程或线程的模型会因为资源消耗过大而变得不切实际。这时候,IO 多路复用技术就应运而生。

IO 多路复用,简单来说,就是通过一种机制,一个进程可以监视多个文件描述符(fd),一旦某个描述符就绪(一般是读就绪或者写就绪),内核能够通知程序进行相应的读写操作。这样,就避免了在每个连接上阻塞等待 I/O 操作完成,大大提高了系统资源的利用率。

在 Linux 系统中,常用的 IO 多路复用技术有 select、poll 和 epoll。

1.1 select 机制

select 是最早的多路复用机制,它通过设置一组文件描述符集合,然后调用 select 函数,等待其中一个或多个文件描述符变为就绪状态。

#include <sys/select.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>

#define PORT 8080
#define BACKLOG 5
#define BUFFER_SIZE 1024

int main() {
    int sockfd, new_sockfd;
    struct sockaddr_in servaddr, cliaddr;
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("socket creation failed");
        exit(EXIT_FAILURE);
    }

    memset(&servaddr, 0, sizeof(servaddr));
    memset(&cliaddr, 0, sizeof(cliaddr));

    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = INADDR_ANY;
    servaddr.sin_port = htons(PORT);

    if (bind(sockfd, (const struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
        perror("bind failed");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    if (listen(sockfd, BACKLOG) < 0) {
        perror("listen failed");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    fd_set read_fds;
    FD_ZERO(&read_fds);
    FD_SET(sockfd, &read_fds);
    int max_fd = sockfd;

    char buffer[BUFFER_SIZE] = {0};
    while (1) {
        fd_set tmp_fds = read_fds;
        int activity = select(max_fd + 1, &tmp_fds, NULL, NULL, NULL);
        if (activity < 0) {
            perror("select error");
            break;
        } else if (activity > 0) {
            if (FD_ISSET(sockfd, &tmp_fds)) {
                socklen_t len = sizeof(cliaddr);
                new_sockfd = accept(sockfd, (struct sockaddr *)&cliaddr, &len);
                if (new_sockfd < 0) {
                    perror("accept failed");
                    continue;
                }
                FD_SET(new_sockfd, &read_fds);
                if (new_sockfd > max_fd) {
                    max_fd = new_sockfd;
                }
            }
            for (int i = sockfd + 1; i <= max_fd; i++) {
                if (FD_ISSET(i, &tmp_fds)) {
                    int valread = read(i, buffer, BUFFER_SIZE);
                    if (valread == 0) {
                        close(i);
                        FD_CLR(i, &read_fds);
                    } else {
                        buffer[valread] = '\0';
                        printf("Message from client: %s\n", buffer);
                        send(i, buffer, strlen(buffer), 0);
                    }
                }
            }
        }
    }
    close(sockfd);
    return 0;
}

select 的缺点也很明显,首先它能监视的文件描述符数量是有限的,在 Linux 系统中,这个限制通常是 1024。其次,select 采用轮询的方式检查文件描述符是否就绪,随着文件描述符数量的增加,性能会急剧下降。

1.2 poll 机制

poll 与 select 类似,但它解决了 select 中文件描述符数量受限的问题。poll 使用一个 pollfd 结构体数组来表示需要监视的文件描述符集合。

#include <poll.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define PORT 8080
#define BACKLOG 5
#define BUFFER_SIZE 1024
#define MAX_CLIENTS 1024

int main() {
    int sockfd, new_sockfd;
    struct sockaddr_in servaddr, cliaddr;
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("socket creation failed");
        exit(EXIT_FAILURE);
    }

    memset(&servaddr, 0, sizeof(servaddr));
    memset(&cliaddr, 0, sizeof(cliaddr));

    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = INADDR_ANY;
    servaddr.sin_port = htons(PORT);

    if (bind(sockfd, (const struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
        perror("bind failed");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    if (listen(sockfd, BACKLOG) < 0) {
        perror("listen failed");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    struct pollfd fds[MAX_CLIENTS + 1];
    fds[0].fd = sockfd;
    fds[0].events = POLLIN;
    int nfds = 1;

    char buffer[BUFFER_SIZE] = {0};
    while (1) {
        int activity = poll(fds, nfds, -1);
        if (activity < 0) {
            perror("poll error");
            break;
        } else if (activity > 0) {
            if (fds[0].revents & POLLIN) {
                socklen_t len = sizeof(cliaddr);
                new_sockfd = accept(sockfd, (struct sockaddr *)&cliaddr, &len);
                if (new_sockfd < 0) {
                    perror("accept failed");
                    continue;
                }
                fds[nfds].fd = new_sockfd;
                fds[nfds].events = POLLIN;
                nfds++;
            }
            for (int i = 1; i < nfds; i++) {
                if (fds[i].revents & POLLIN) {
                    int valread = read(fds[i].fd, buffer, BUFFER_SIZE);
                    if (valread == 0) {
                        close(fds[i].fd);
                        for (int j = i; j < nfds - 1; j++) {
                            fds[j] = fds[j + 1];
                        }
                        nfds--;
                        i--;
                    } else {
                        buffer[valread] = '\0';
                        printf("Message from client: %s\n", buffer);
                        send(fds[i].fd, buffer, strlen(buffer), 0);
                    }
                }
            }
        }
    }
    close(sockfd);
    return 0;
}

poll 虽然解决了文件描述符数量的限制问题,但它同样采用轮询的方式检查文件描述符状态,在处理大量并发连接时,性能依然不佳。

1.3 epoll 机制

epoll 是 Linux 内核为处理大量并发连接而设计的高性能 IO 多路复用机制。它采用事件驱动的方式,只有在文件描述符真正就绪时才会通知应用程序。

epoll 有两种工作模式:LT(水平触发)和 ET(边缘触发)。

  • LT(水平触发)模式:当 epoll 检测到某文件描述符事件就绪并通知应用程序时,应用程序可以不立即处理该事件。下次调用 epoll_wait 时,会再次通知应用程序该事件。
  • ET(边缘触发)模式:当文件描述符状态从非就绪变为就绪时,epoll 只通知一次。如果应用程序没有及时处理,之后将不会再收到通知,直到该文件描述符状态再次发生变化。
#include <sys/epoll.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define PORT 8080
#define BACKLOG 5
#define BUFFER_SIZE 1024
#define MAX_EVENTS 10

int main() {
    int sockfd, new_sockfd;
    struct sockaddr_in servaddr, cliaddr;
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("socket creation failed");
        exit(EXIT_FAILURE);
    }

    memset(&servaddr, 0, sizeof(servaddr));
    memset(&cliaddr, 0, sizeof(cliaddr));

    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = INADDR_ANY;
    servaddr.sin_port = htons(PORT);

    if (bind(sockfd, (const struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
        perror("bind failed");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    if (listen(sockfd, BACKLOG) < 0) {
        perror("listen failed");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    int epollfd = epoll_create1(0);
    if (epollfd == -1) {
        perror("epoll_create1");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    struct epoll_event event;
    event.data.fd = sockfd;
    event.events = EPOLLIN;
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &event) == -1) {
        perror("epoll_ctl: listen_sock");
        close(sockfd);
        close(epollfd);
        exit(EXIT_FAILURE);
    }

    struct epoll_event events[MAX_EVENTS];
    char buffer[BUFFER_SIZE] = {0};
    while (1) {
        int n = epoll_wait(epollfd, events, MAX_EVENTS, -1);
        if (n == -1) {
            perror("epoll_wait");
            break;
        }
        for (int i = 0; i < n; i++) {
            if (events[i].data.fd == sockfd) {
                socklen_t len = sizeof(cliaddr);
                new_sockfd = accept(sockfd, (struct sockaddr *)&cliaddr, &len);
                if (new_sockfd == -1) {
                    perror("accept");
                    continue;
                }
                event.data.fd = new_sockfd;
                event.events = EPOLLIN | EPOLLET;
                if (epoll_ctl(epollfd, EPOLL_CTL_ADD, new_sockfd, &event) == -1) {
                    perror("epoll_ctl: conn_sock");
                    close(new_sockfd);
                }
            } else {
                int clientfd = events[i].data.fd;
                int valread = read(clientfd, buffer, BUFFER_SIZE);
                if (valread == 0) {
                    if (epoll_ctl(epollfd, EPOLL_CTL_DEL, clientfd, NULL) == -1) {
                        perror("epoll_ctl: del");
                    }
                    close(clientfd);
                } else {
                    buffer[valread] = '\0';
                    printf("Message from client: %s\n", buffer);
                    send(clientfd, buffer, strlen(buffer), 0);
                }
            }
        }
    }
    close(sockfd);
    close(epollfd);
    return 0;
}

epoll 的优点在于其高效性,它通过内核与用户空间共享内存的方式,减少了数据拷贝的开销。同时,epoll 采用红黑树来管理文件描述符,大大提高了查找效率。

2. 基于 IO 多路复用的并发连接处理策略

2.1 连接管理策略

在使用 IO 多路复用技术处理大量并发连接时,连接的管理至关重要。

连接的建立:在服务器端,通过监听套接字接受客户端的连接请求。以 epoll 为例,当监听套接字收到连接请求时,调用 accept 函数接受连接,并将新的套接字添加到 epoll 实例中进行监视。

if (events[i].data.fd == sockfd) {
    socklen_t len = sizeof(cliaddr);
    new_sockfd = accept(sockfd, (struct sockaddr *)&cliaddr, &len);
    if (new_sockfd == -1) {
        perror("accept");
        continue;
    }
    event.data.fd = new_sockfd;
    event.events = EPOLLIN | EPOLLET;
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, new_sockfd, &event) == -1) {
        perror("epoll_ctl: conn_sock");
        close(new_sockfd);
    }
}

连接的关闭:当客户端关闭连接或者出现异常时,服务器需要及时关闭对应的套接字,并从 IO 多路复用机制中移除该套接字。在 epoll 中,使用 epoll_ctl 函数的 EPOLL_CTL_DEL 操作来移除套接字。

if (valread == 0) {
    if (epoll_ctl(epollfd, EPOLL_CTL_DEL, clientfd, NULL) == -1) {
        perror("epoll_ctl: del");
    }
    close(clientfd);
}

2.2 数据读写策略

读数据:当 IO 多路复用机制通知某个套接字可读时,应用程序需要及时读取数据。在 ET 模式下,由于只通知一次,需要确保一次性将数据读完。可以使用循环读取的方式,直到 read 函数返回值小于请求读取的字节数或者返回 -1(表示错误)。

while ((n = read(clientfd, buffer + read_bytes, BUFFER_SIZE - read_bytes)) > 0) {
    read_bytes += n;
}
if (n == -1 && errno != EAGAIN) {
    // 处理错误
}

写数据:写数据时同样需要注意。在处理大量并发连接时,可能会出现套接字写缓冲区已满的情况,此时 write 函数会返回 -1 并设置 errnoEAGAINEWOULDBLOCK。应用程序应该将未写完的数据保存起来,等待下次可写时继续写入。

ssize_t write_bytes = 0;
while (write_bytes < data_length) {
    ssize_t n = write(clientfd, data + write_bytes, data_length - write_bytes);
    if (n == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            // 保存未写完的数据
            break;
        } else {
            // 处理错误
        }
    } else {
        write_bytes += n;
    }
}

2.3 性能优化策略

减少系统调用开销:系统调用的开销相对较大,在处理大量并发连接时,应尽量减少不必要的系统调用。例如,在 epoll 中,可以批量处理就绪的文件描述符,而不是每次只处理一个。

int n = epoll_wait(epollfd, events, MAX_EVENTS, -1);
for (int i = 0; i < n; i++) {
    // 处理事件
}

合理设置缓冲区大小:适当调整套接字的接收和发送缓冲区大小,可以提高数据传输的效率。可以使用 setsockopt 函数来设置缓冲区大小。

int sendbuf = 65536;
setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &sendbuf, sizeof(sendbuf));
int recvbuf = 65536;
setsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &recvbuf, sizeof(recvbuf));

使用内存池:在处理大量并发连接时,频繁的内存分配和释放会导致性能下降。可以使用内存池技术,预先分配一定数量的内存块,当需要时直接从内存池中获取,使用完毕后再归还到内存池。

3. 不同场景下的 IO 多路复用选择

3.1 低并发场景

在低并发场景下,连接数量较少,对性能的要求相对不那么高。此时,select 或 poll 可能是比较合适的选择。虽然它们采用轮询方式检查文件描述符状态,但由于连接数量少,轮询带来的性能损耗并不明显。而且,select 和 poll 的实现相对简单,代码编写难度较低。

3.2 高并发且连接活跃度高的场景

对于高并发且连接活跃度高的场景,epoll 的 LT 模式是一个不错的选择。在 LT 模式下,只要文件描述符处于就绪状态,epoll 就会不断通知应用程序,这对于频繁读写数据的连接非常友好。应用程序可以在合适的时机处理数据,而不用担心错过事件通知。

3.3 高并发且连接活跃度低的场景

在高并发且连接活跃度低的场景中,epoll 的 ET 模式更具优势。由于 ET 模式只在文件描述符状态发生变化时通知一次,对于那些长时间处于非活动状态的连接,可以减少不必要的通知,从而提高系统的整体性能。但在 ET 模式下,应用程序需要更加小心地处理数据读写,确保不会错过事件。

4. 实际案例分析

假设我们正在开发一个简单的即时通讯服务器,需要处理大量用户的并发连接。

需求分析

  • 支持大量用户同时在线。
  • 能够实时接收和发送消息。
  • 保证系统的稳定性和性能。

技术选型: 基于上述需求,我们选择 epoll 作为 IO 多路复用技术,采用 ET 模式来处理连接。因为即时通讯服务器通常会有大量的并发连接,且连接活跃度相对较低(用户并非时刻都在发送消息)。

实现思路

  1. 服务器初始化:创建监听套接字,绑定端口并开始监听。同时创建 epoll 实例,并将监听套接字添加到 epoll 中。
  2. 连接处理:当有新连接到来时,接受连接并将新的套接字以 ET 模式添加到 epoll 中。
  3. 消息处理:当 epoll 通知某个套接字可读时,读取消息并进行处理,例如转发给其他在线用户。
  4. 连接关闭:当客户端关闭连接或者出现异常时,及时关闭套接字并从 epoll 中移除。
#include <sys/epoll.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <list>

#define PORT 8080
#define BACKLOG 5
#define BUFFER_SIZE 1024
#define MAX_EVENTS 10

std::list<int> client_fds;

void broadcast_message(const char* message, int sender_fd) {
    for (int fd : client_fds) {
        if (fd != sender_fd) {
            send(fd, message, strlen(message), 0);
        }
    }
}

int main() {
    int sockfd, new_sockfd;
    struct sockaddr_in servaddr, cliaddr;
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("socket creation failed");
        exit(EXIT_FAILURE);
    }

    memset(&servaddr, 0, sizeof(servaddr));
    memset(&cliaddr, 0, sizeof(cliaddr));

    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = INADDR_ANY;
    servaddr.sin_port = htons(PORT);

    if (bind(sockfd, (const struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
        perror("bind failed");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    if (listen(sockfd, BACKLOG) < 0) {
        perror("listen failed");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    int epollfd = epoll_create1(0);
    if (epollfd == -1) {
        perror("epoll_create1");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    struct epoll_event event;
    event.data.fd = sockfd;
    event.events = EPOLLIN;
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &event) == -1) {
        perror("epoll_ctl: listen_sock");
        close(sockfd);
        close(epollfd);
        exit(EXIT_FAILURE);
    }

    struct epoll_event events[MAX_EVENTS];
    char buffer[BUFFER_SIZE] = {0};
    while (1) {
        int n = epoll_wait(epollfd, events, MAX_EVENTS, -1);
        if (n == -1) {
            perror("epoll_wait");
            break;
        }
        for (int i = 0; i < n; i++) {
            if (events[i].data.fd == sockfd) {
                socklen_t len = sizeof(cliaddr);
                new_sockfd = accept(sockfd, (struct sockaddr *)&cliaddr, &len);
                if (new_sockfd == -1) {
                    perror("accept");
                    continue;
                }
                event.data.fd = new_sockfd;
                event.events = EPOLLIN | EPOLLET;
                if (epoll_ctl(epollfd, EPOLL_CTL_ADD, new_sockfd, &event) == -1) {
                    perror("epoll_ctl: conn_sock");
                    close(new_sockfd);
                }
                client_fds.push_back(new_sockfd);
            } else {
                int clientfd = events[i].data.fd;
                int valread = read(clientfd, buffer, BUFFER_SIZE);
                if (valread == 0) {
                    if (epoll_ctl(epollfd, EPOLL_CTL_DEL, clientfd, NULL) == -1) {
                        perror("epoll_ctl: del");
                    }
                    close(clientfd);
                    client_fds.remove(clientfd);
                } else {
                    buffer[valread] = '\0';
                    printf("Message from client: %s\n", buffer);
                    broadcast_message(buffer, clientfd);
                }
            }
        }
    }
    close(sockfd);
    close(epollfd);
    return 0;
}

通过上述代码示例和分析,我们可以看到如何在实际项目中运用 IO 多路复用技术来处理大量并发连接,满足业务需求并提高系统性能。

5. 总结与展望

IO 多路复用技术是后端开发网络编程中处理大量并发连接的关键技术。select、poll 和 epoll 各有特点,在不同的场景下发挥着重要作用。通过合理选择 IO 多路复用机制,并结合连接管理、数据读写和性能优化等策略,我们能够开发出高效、稳定的网络应用程序。

随着网络技术的不断发展,对并发处理能力的要求也越来越高。未来,IO 多路复用技术可能会进一步优化,以适应更加复杂和高负载的网络环境。例如,可能会出现更高效的事件通知机制,或者更好地与硬件特性相结合,进一步提升系统性能。作为开发者,我们需要不断学习和探索,以跟上技术发展的步伐,为用户提供更好的网络服务。