MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

IO多路复用技术在高性能服务器开发中的应用

2021-05-271.4k 阅读

1. 理解 I/O 操作

在计算机系统中,I/O(Input/Output)操作涉及到数据在外部设备(如磁盘、网络接口等)和内存之间的传输。例如,从网络套接字读取数据或者向磁盘写入文件等操作都属于 I/O 操作。传统的 I/O 模型,如同步阻塞 I/O(Blocking I/O),在进行 I/O 操作时,应用程序会被阻塞,直到操作完成。这意味着在等待 I/O 操作的过程中,程序无法执行其他任务,严重影响了程序的性能和资源利用率。

1.1 同步阻塞 I/O 示例

下面是一个简单的同步阻塞 I/O 的 Python 代码示例,使用 socket 模块创建一个 TCP 服务器:

import socket

server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(('127.0.0.1', 8888))
server_socket.listen(1)

print('Server is listening on port 8888...')

while True:
    client_socket, client_address = server_socket.accept()
    print(f'Connected by {client_address}')

    data = client_socket.recv(1024)
    print(f'Received: {data.decode()}')

    response = 'Message received successfully!'
    client_socket.send(response.encode())

    client_socket.close()

在这个示例中,server_socket.accept()client_socket.recv() 方法都是阻塞的。当没有客户端连接或者没有数据可读时,程序会一直等待,这期间无法处理其他客户端的请求。

2. I/O 多路复用技术概述

I/O 多路复用技术允许应用程序在一个线程中同时监视多个 I/O 流,当其中任何一个 I/O 流准备好进行读写操作时,通知应用程序进行相应处理。这样可以显著提高应用程序的性能和资源利用率,特别是在处理大量并发连接的情况下。常见的 I/O 多路复用技术有 select、poll 和 epoll。

2.1 select

select 是最早出现的 I/O 多路复用技术,它通过一个 select 函数来监视多个文件描述符(如套接字)的状态变化。select 函数会阻塞等待,直到有一个或多个文件描述符准备好进行 I/O 操作,或者超时。

2.1.1 select 函数原型

在 Unix/Linux 系统中,select 函数的原型如下:

#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
  • nfds:所有文件描述符中的最大值加 1。
  • readfdswritefdsexceptfds:分别是可读、可写和异常事件的文件描述符集合。
  • timeout:指定等待的超时时间,如果为 NULL,则一直阻塞。
2.1.2 select 示例代码(C 语言)
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/select.h>

#define PORT 8888
#define MAX_CLIENTS 10

int main() {
    int server_fd, new_socket, activity, valread;
    int max_sd;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);
    char buffer[1024] = {0};

    fd_set read_fds;
    fd_set tmp_fds;

    FD_ZERO(&read_fds);
    FD_ZERO(&tmp_fds);

    server_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (server_fd < 0) {
        perror("Socket creation error");
        exit(EXIT_FAILURE);
    }

    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
        perror("Setsockopt error");
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);

    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("Bind failed");
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    if (listen(server_fd, MAX_CLIENTS) < 0) {
        perror("Listen failed");
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    FD_SET(server_fd, &read_fds);
    max_sd = server_fd;

    while (1) {
        tmp_fds = read_fds;

        activity = select(max_sd + 1, &tmp_fds, NULL, NULL, NULL);
        if ((activity < 0) && (errno != EINTR)) {
            printf("Select error\n");
        } else if (activity > 0) {
            if (FD_ISSET(server_fd, &tmp_fds)) {
                new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t *)&addrlen);
                if (new_socket < 0) {
                    perror("Accept error");
                    continue;
                }

                FD_SET(new_socket, &read_fds);
                if (new_socket > max_sd) {
                    max_sd = new_socket;
                }

                printf("New connection, socket fd is %d, ip is : %s, port : %d\n", new_socket, inet_ntoa(address.sin_addr), ntohs(address.sin_port));
            }

            for (int i = 0; i <= max_sd; i++) {
                if (FD_ISSET(i, &tmp_fds)) {
                    if (i != server_fd) {
                        valread = read(i, buffer, 1024);
                        if (valread == 0) {
                            getpeername(i, (struct sockaddr *)&address, (socklen_t *)&addrlen);
                            printf("Host disconnected, ip %s, port %d\n", inet_ntoa(address.sin_addr), ntohs(address.sin_port));
                            close(i);
                            FD_CLR(i, &read_fds);
                        } else {
                            buffer[valread] = '\0';
                            printf("Message received from socket fd %d is %s\n", i, buffer);
                            send(i, buffer, strlen(buffer), 0);
                        }
                    }
                }
            }
        }
    }

    close(server_fd);
    return 0;
}

在这个示例中,select 函数监视 server_fd 和所有已连接客户端的套接字。当有新客户端连接或者已有客户端发送数据时,select 函数返回,程序可以处理相应的事件。然而,select 有一些局限性,比如它支持的文件描述符数量有限(通常为 1024),并且每次调用 select 时都需要将所有文件描述符集合从用户空间复制到内核空间,效率较低。

2.2 poll

poll 是对 select 的改进,它使用一个 pollfd 结构体数组来管理文件描述符及其事件,而不是像 select 那样使用固定大小的文件描述符集合。poll 函数同样会阻塞等待,直到有文件描述符准备好进行 I/O 操作或者超时。

2.2.1 poll 函数原型

在 Unix/Linux 系统中,poll 函数的原型如下:

#include <poll.h>

int poll(struct pollfd *fds, nfds_t nfds, int timeout);
  • fds:一个指向 pollfd 结构体数组的指针,每个 pollfd 结构体包含一个文件描述符、要监视的事件和实际发生的事件。
  • nfds:数组中 pollfd 结构体的数量。
  • timeout:指定等待的超时时间,单位为毫秒,如果为 -1,则一直阻塞。
2.2.2 poll 示例代码(C 语言)
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <poll.h>

#define PORT 8888
#define MAX_CLIENTS 10

int main() {
    int server_fd, new_socket, activity, valread;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);
    char buffer[1024] = {0};

    struct pollfd fds[MAX_CLIENTS + 1];
    int nfds = 1;

    server_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (server_fd < 0) {
        perror("Socket creation error");
        exit(EXIT_FAILURE);
    }

    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
        perror("Setsockopt error");
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);

    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("Bind failed");
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    if (listen(server_fd, MAX_CLIENTS) < 0) {
        perror("Listen failed");
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    fds[0].fd = server_fd;
    fds[0].events = POLLIN;

    while (1) {
        activity = poll(fds, nfds, -1);
        if (activity < 0) {
            perror("Poll error");
        } else if (activity > 0) {
            if (fds[0].revents & POLLIN) {
                new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t *)&addrlen);
                if (new_socket < 0) {
                    perror("Accept error");
                    continue;
                }

                printf("New connection, socket fd is %d, ip is : %s, port : %d\n", new_socket, inet_ntoa(address.sin_addr), ntohs(address.sin_port));

                fds[nfds].fd = new_socket;
                fds[nfds].events = POLLIN;
                nfds++;
            }

            for (int i = 1; i < nfds; i++) {
                if (fds[i].revents & POLLIN) {
                    valread = read(fds[i].fd, buffer, 1024);
                    if (valread == 0) {
                        getpeername(fds[i].fd, (struct sockaddr *)&address, (socklen_t *)&addrlen);
                        printf("Host disconnected, ip %s, port %d\n", inet_ntoa(address.sin_addr), ntohs(address.sin_port));
                        close(fds[i].fd);
                        for (int j = i; j < nfds - 1; j++) {
                            fds[j] = fds[j + 1];
                        }
                        nfds--;
                        i--;
                    } else {
                        buffer[valread] = '\0';
                        printf("Message received from socket fd %d is %s\n", fds[i].fd, buffer);
                        send(fds[i].fd, buffer, strlen(buffer), 0);
                    }
                }
            }
        }
    }

    close(server_fd);
    return 0;
}

select 相比,poll 没有文件描述符数量的限制,并且每次调用 poll 时只需要传递需要监视的文件描述符,减少了数据复制的开销。但是,在处理大量文件描述符时,poll 的效率仍然会随着文件描述符数量的增加而降低,因为它需要线性遍历所有的文件描述符来检查事件。

2.3 epoll

epoll 是 Linux 特有的 I/O 多路复用技术,它在处理大量并发连接时表现出更高的性能。epoll 采用了一种事件驱动的方式,通过 epoll_create 创建一个 epoll 实例,通过 epoll_ctl 向 epoll 实例中添加、修改或删除要监视的文件描述符及其事件,通过 epoll_wait 等待事件发生。

2.3.1 epoll 相关函数原型
#include <sys/epoll.h>

// 创建一个 epoll 实例
int epoll_create(int size);

// 控制 epoll 实例
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

// 等待事件发生
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
  • epoll_createsize 参数在 Linux 2.6.8 之后被忽略,但仍需提供一个大于 0 的值。
  • epoll_ctlepfd 是 epoll 实例的文件描述符,op 可以是 EPOLL_CTL_ADD(添加)、EPOLL_CTL_MOD(修改)或 EPOLL_CTL_DEL(删除),fd 是要操作的文件描述符,event 是一个指向 epoll_event 结构体的指针,用于指定要监视的事件。
  • epoll_waitepfd 是 epoll 实例的文件描述符,events 是一个 epoll_event 结构体数组,用于存放发生的事件,maxeventsevents 数组的大小,timeout 是等待的超时时间,单位为毫秒,如果为 -1,则一直阻塞。
2.3.2 epoll 示例代码(C 语言)
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/epoll.h>

#define PORT 8888
#define MAX_EVENTS 10

int main() {
    int server_fd, new_socket, epoll_fd;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);
    char buffer[1024] = {0};

    struct epoll_event event;
    struct epoll_event *events;

    server_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (server_fd < 0) {
        perror("Socket creation error");
        exit(EXIT_FAILURE);
    }

    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
        perror("Setsockopt error");
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);

    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("Bind failed");
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    if (listen(server_fd, MAX_EVENTS) < 0) {
        perror("Listen failed");
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    epoll_fd = epoll_create(10);
    if (epoll_fd < 0) {
        perror("Epoll creation error");
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    event.data.fd = server_fd;
    event.events = EPOLLIN;
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &event) < 0) {
        perror("Epoll_ctl add error");
        close(server_fd);
        close(epoll_fd);
        exit(EXIT_FAILURE);
    }

    events = calloc(MAX_EVENTS, sizeof(event));

    while (1) {
        int n = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        if (n < 0) {
            perror("Epoll_wait error");
            break;
        } else if (n > 0) {
            for (int i = 0; i < n; i++) {
                if (events[i].data.fd == server_fd) {
                    new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t *)&addrlen);
                    if (new_socket < 0) {
                        perror("Accept error");
                        continue;
                    }

                    printf("New connection, socket fd is %d, ip is : %s, port : %d\n", new_socket, inet_ntoa(address.sin_addr), ntohs(address.sin_port));

                    event.data.fd = new_socket;
                    event.events = EPOLLIN;
                    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_socket, &event) < 0) {
                        perror("Epoll_ctl add new socket error");
                        close(new_socket);
                    }
                } else {
                    int client_fd = events[i].data.fd;
                    int valread = read(client_fd, buffer, 1024);
                    if (valread == 0) {
                        getpeername(client_fd, (struct sockaddr *)&address, (socklen_t *)&addrlen);
                        printf("Host disconnected, ip %s, port %d\n", inet_ntoa(address.sin_addr), ntohs(address.sin_port));
                        close(client_fd);
                        if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, NULL) < 0) {
                            perror("Epoll_ctl del error");
                        }
                    } else {
                        buffer[valread] = '\0';
                        printf("Message received from socket fd %d is %s\n", client_fd, buffer);
                        send(client_fd, buffer, strlen(buffer), 0);
                    }
                }
            }
        }
    }

    free(events);
    close(epoll_fd);
    close(server_fd);
    return 0;
}

epoll 的优势在于它采用了基于事件通知的机制,只有当被监视的文件描述符上有事件发生时,epoll_wait 才会返回,并且只返回发生事件的文件描述符,避免了线性遍历所有文件描述符的开销。这使得 epoll 在处理大量并发连接时具有很高的效率,非常适合高性能服务器开发。

3. I/O 多路复用技术在高性能服务器开发中的应用场景

3.1 网络服务器

在网络服务器开发中,如 Web 服务器、游戏服务器等,需要同时处理大量的客户端连接。使用 I/O 多路复用技术可以有效地管理这些连接,提高服务器的并发处理能力。例如,一个 Web 服务器可能同时接收来自多个客户端的 HTTP 请求,通过 I/O 多路复用技术可以在一个线程中监视所有客户端套接字的状态,当有请求到达时,及时处理请求,而不需要为每个客户端创建一个单独的线程或进程,从而节省系统资源。

3.2 实时数据处理

在实时数据处理系统中,如金融交易系统、物联网数据采集系统等,需要实时处理来自多个数据源的数据。I/O 多路复用技术可以用于监视多个数据输入源(如网络套接字、串口等),当有新数据到达时,立即进行处理,保证数据的实时性。例如,在金融交易系统中,需要实时接收来自不同交易市场的行情数据,通过 I/O 多路复用技术可以高效地管理这些数据输入通道,及时处理行情数据,为交易决策提供支持。

3.3 分布式系统

在分布式系统中,各个节点之间需要进行频繁的通信和数据交换。I/O 多路复用技术可以用于管理节点之间的网络连接,确保数据的高效传输和处理。例如,在一个分布式文件系统中,客户端节点需要与多个存储节点进行数据读写操作,通过 I/O 多路复用技术可以同时监视这些连接,提高系统的整体性能和响应速度。

4. 选择合适的 I/O 多路复用技术

在实际开发中,选择合适的 I/O 多路复用技术需要考虑多个因素,如应用场景、操作系统平台、性能要求等。

4.1 应用场景

如果应用程序只需要处理少量的并发连接,并且对性能要求不是特别高,那么 select 或 poll 可能就足够了。例如,一些简单的本地服务器应用,只需要与少量客户端进行通信,使用 select 或 poll 可以简单实现功能,并且代码相对简洁。然而,如果应用程序需要处理大量的并发连接,如大型 Web 服务器或游戏服务器,epoll 则是更好的选择,因为它在高并发场景下具有更高的性能。

4.2 操作系统平台

select 和 poll 在大多数 Unix/Linux 系统以及 Windows 系统上都有支持,具有较好的跨平台性。而 epoll 是 Linux 特有的技术,如果应用程序需要在多个操作系统平台上运行,并且对跨平台性有较高要求,可能需要考虑使用 select 或 poll。但如果应用程序只在 Linux 平台上运行,那么 epoll 可以充分发挥其性能优势。

4.3 性能要求

如果应用程序对性能要求极高,特别是在处理大量并发连接时,epoll 由于其基于事件驱动的机制和高效的事件通知方式,能够显著提高系统的性能。而 select 和 poll 在处理大量文件描述符时,性能会随着文件描述符数量的增加而下降。因此,对于对性能敏感的高性能服务器开发,epoll 通常是首选。

5. 总结与展望

I/O 多路复用技术是高性能服务器开发中的关键技术之一,通过允许应用程序在一个线程中同时监视多个 I/O 流,有效地提高了系统的性能和资源利用率。select、poll 和 epoll 是常见的 I/O 多路复用技术,它们各有优缺点,在不同的应用场景下有不同的适用性。在实际开发中,需要根据具体的需求和系统环境选择合适的 I/O 多路复用技术,以实现高效、稳定的服务器应用。随着硬件技术的不断发展和应用场景的日益复杂,I/O 多路复用技术也将不断演进和优化,为高性能服务器开发提供更强大的支持。未来,我们可以期待看到更多针对特定应用场景优化的 I/O 多路复用技术的出现,进一步推动服务器端技术的发展。同时,结合其他技术如异步 I/O、线程池等,能够构建更加高效、可扩展的服务器架构,满足不断增长的业务需求。