MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

IO多路复用技术原理与实现详解

2023-04-017.2k 阅读

一、IO 多路复用技术概述

在深入探讨 IO 多路复用技术之前,我们先来理解一下基本的 I/O 概念。在计算机系统中,I/O(Input/Output)操作负责在外部设备(如磁盘、网络接口等)与内存之间传输数据。例如,从文件中读取数据或者通过网络发送数据。

在传统的网络编程中,处理多个 I/O 操作通常有两种常见方式:多进程/多线程模型和阻塞 I/O 模型。在多进程/多线程模型中,每一个 I/O 操作对应一个进程或者线程,这种方式虽然简单直观,但开销较大,因为进程或线程的创建、销毁以及上下文切换都需要消耗大量的系统资源。而阻塞 I/O 模型则是在进行 I/O 操作时,进程会被阻塞,直到操作完成。这种方式在处理多个 I/O 操作时效率低下,因为一个 I/O 操作的阻塞会导致整个进程无法处理其他任务。

IO 多路复用技术则提供了一种更高效的解决方案。它允许一个进程同时监控多个文件描述符(file descriptor,在 Unix 系统中,一切皆文件,所以网络套接字、文件等都可以用文件描述符来表示)的 I/O 状态变化,当其中任何一个文件描述符准备好进行 I/O 操作时,进程就可以被通知并进行相应的处理。这样,通过单个进程就可以高效地管理多个 I/O 流,大大提高了系统资源的利用率。

二、IO 多路复用技术原理

IO 多路复用技术的核心原理是通过一种机制(系统调用),让进程可以在多个文件描述符上等待事件的发生,而不是阻塞在单个 I/O 操作上。当有一个或多个文件描述符就绪(可以进行读或写操作)时,系统调用返回,进程就可以对这些就绪的文件描述符进行相应的 I/O 操作。

在 Unix/Linux 系统中,常见的实现 IO 多路复用的系统调用有 select、poll 和 epoll。下面我们分别深入探讨它们的原理。

2.1 select 原理

select 系统调用的原型如下:

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

其中,nfds 是需要监控的最大文件描述符值加 1;readfdswritefdsexceptfds 分别是用来监控可读、可写和异常事件的文件描述符集合;timeout 是一个可选的超时时间。

select 工作原理是基于轮询机制。内核会遍历所有传入的文件描述符集合,检查每个文件描述符是否就绪。如果有任何一个文件描述符就绪,select 就会返回。返回后,应用程序需要再次遍历这些文件描述符集合,以确定哪些文件描述符真正就绪。

这种轮询机制存在一些缺点。首先,它支持的文件描述符数量有限,通常在 1024 个左右。其次,每次调用 select 时,都需要将文件描述符集合从用户空间拷贝到内核空间,并且返回后还需要从内核空间拷贝回用户空间,这增加了系统开销。另外,由于是轮询所有文件描述符,当文件描述符数量较多时,性能会明显下降。

2.2 poll 原理

poll 系统调用的原型如下:

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

pollfd 结构体定义如下:

struct pollfd {
    int fd;         /* 文件描述符 */
    short events;   /* 等待的事件 */
    short revents;  /* 发生的事件 */
};

fds 是一个指向 pollfd 结构体数组的指针,nfds 表示数组中元素的个数,timeout 同样是超时时间。

poll 与 select 类似,也是基于轮询机制。不过,poll 解决了 select 中文件描述符数量受限的问题,它通过一个链表来管理文件描述符,理论上可以支持无限个文件描述符。然而,poll 仍然存在与 select 相同的一些问题,如每次调用都需要在用户空间和内核空间之间拷贝数据,并且轮询的效率随着文件描述符数量的增加而降低。

2.3 epoll 原理

epoll 是 Linux 特有的 IO 多路复用机制,它在 2.6 内核版本之后引入,相比 select 和 poll 有了很大的性能提升。epoll 有三个主要的系统调用:epoll_createepoll_ctlepoll_wait

epoll_create 用于创建一个 epoll 实例,返回一个 epoll 文件描述符:

int epoll_create(int size);

size 参数在 Linux 2.6.8 之后已经被忽略,但仍然需要提供一个大于 0 的值。

epoll_ctl 用于控制 epoll 实例,添加、修改或删除要监控的文件描述符:

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

其中,epfd 是 epoll 文件描述符,op 可以是 EPOLL_CTL_ADD(添加)、EPOLL_CTL_MOD(修改)或 EPOLL_CTL_DEL(删除),fd 是要操作的文件描述符,event 是一个指向 epoll_event 结构体的指针,用于指定要监控的事件。

epoll_event 结构体定义如下:

struct epoll_event {
    uint32_t events;      /* 事件 */
    epoll_data_t data;    /* 用户数据 */
};
typedef union epoll_data {
    void *ptr;
    int fd;
    uint32_t u32;
    uint64_t u64;
} epoll_data_t;

epoll_wait 用于等待事件的发生:

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

epfd 是 epoll 文件描述符,events 是一个 epoll_event 结构体数组,用于存储发生的事件,maxevents 表示 events 数组的大小,timeout 是超时时间。

epoll 的工作原理基于事件驱动。它在内核中维护了一个红黑树来管理要监控的文件描述符,这样添加、删除和查找文件描述符的时间复杂度都是 O(logn),效率很高。当有文件描述符就绪时,内核会将该事件添加到一个就绪链表中。epoll_wait 调用时,只需要检查这个就绪链表,而不需要像 select 和 poll 那样轮询所有文件描述符,大大提高了效率。另外,epoll 在用户空间和内核空间之间传递数据时,采用了 mmap 技术,减少了数据拷贝的开销。

三、IO 多路复用技术实现示例

下面我们通过代码示例来展示如何使用 select、poll 和 epoll 实现简单的网络服务器,以接受多个客户端的连接并处理数据。

3.1 使用 select 实现网络服务器

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <fcntl.h>
#include <sys/time.h>

#define PORT 8080
#define MAX_CLIENTS 1024

int main() {
    int server_fd, new_socket, valread;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);
    char buffer[1024] = {0};
    fd_set read_fds;
    fd_set tmp_fds;
    int activity, i, val;

    // 创建套接字
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }

    // 设置套接字选项
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
        perror("setsockopt");
        exit(EXIT_FAILURE);
    }

    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);

    // 绑定套接字
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }

    // 监听套接字
    if (listen(server_fd, 3) < 0) {
        perror("listen");
        exit(EXIT_FAILURE);
    }

    // 初始化文件描述符集合
    FD_ZERO(&read_fds);
    FD_ZERO(&tmp_fds);
    FD_SET(server_fd, &read_fds);

    printf("Server started, listening on port %d...\n", PORT);

    while (1) {
        tmp_fds = read_fds;

        // 等待事件发生
        activity = select(FD_SETSIZE, &tmp_fds, NULL, NULL, NULL);

        if ((activity < 0) && (errno!= EINTR)) {
            printf("select error");
        } else if (activity > 0) {
            if (FD_ISSET(server_fd, &tmp_fds)) {
                if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t *)&addrlen)) < 0) {
                    perror("accept");
                    exit(EXIT_FAILURE);
                }

                printf("New connection, socket fd is %d, ip is : %s, port : %d\n", new_socket, inet_ntoa(address.sin_addr), ntohs(address.sin_port));

                // 将新连接的套接字添加到文件描述符集合中
                for (i = 0; i < MAX_CLIENTS; i++) {
                    if (i == 0) {
                        FD_SET(new_socket, &read_fds);
                        break;
                    }
                }
            }

            for (i = 0; i < MAX_CLIENTS; i++) {
                val = 0;
                if (FD_ISSET(i, &tmp_fds)) {
                    if ((valread = read(i, buffer, 1024)) == 0) {
                        getpeername(i, (struct sockaddr *)&address, (socklen_t *)&addrlen);
                        printf("Host disconnected, ip %s, port %d \n", inet_ntoa(address.sin_addr), ntohs(address.sin_port));
                        close(i);
                        FD_CLR(i, &read_fds);
                    } else {
                        buffer[valread] = '\0';
                        send(i, buffer, strlen(buffer), 0);
                    }
                }
            }
        }
    }

    return 0;
}

3.2 使用 poll 实现网络服务器

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <poll.h>

#define PORT 8080
#define MAX_CLIENTS 1024

int main() {
    int server_fd, new_socket, valread;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);
    char buffer[1024] = {0};
    struct pollfd fds[MAX_CLIENTS + 1];
    int activity, i, val;

    // 创建套接字
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }

    // 设置套接字选项
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
        perror("setsockopt");
        exit(EXIT_FAILURE);
    }

    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);

    // 绑定套接字
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }

    // 监听套接字
    if (listen(server_fd, 3) < 0) {
        perror("listen");
        exit(EXIT_FAILURE);
    }

    // 初始化 pollfd 数组
    for (i = 0; i <= MAX_CLIENTS; i++) {
        fds[i].fd = -1;
        fds[i].events = 0;
        fds[i].revents = 0;
    }

    fds[0].fd = server_fd;
    fds[0].events = POLLIN;

    printf("Server started, listening on port %d...\n", PORT);

    while (1) {
        // 等待事件发生
        activity = poll(fds, MAX_CLIENTS + 1, -1);

        if (activity < 0) {
            printf("poll error");
        } else if (activity > 0) {
            if (fds[0].revents & POLLIN) {
                if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t *)&addrlen)) < 0) {
                    perror("accept");
                    exit(EXIT_FAILURE);
                }

                printf("New connection, socket fd is %d, ip is : %s, port : %d\n", new_socket, inet_ntoa(address.sin_addr), ntohs(address.sin_port));

                // 将新连接的套接字添加到 pollfd 数组中
                for (i = 1; i <= MAX_CLIENTS; i++) {
                    if (fds[i].fd < 0) {
                        fds[i].fd = new_socket;
                        fds[i].events = POLLIN;
                        break;
                    }
                }
            }

            for (i = 1; i <= MAX_CLIENTS; i++) {
                if (fds[i].fd < 0) continue;
                if (fds[i].revents & (POLLIN | POLLERR)) {
                    if ((valread = read(fds[i].fd, buffer, 1024)) == 0) {
                        getpeername(fds[i].fd, (struct sockaddr *)&address, (socklen_t *)&addrlen);
                        printf("Host disconnected, ip %s, port %d \n", inet_ntoa(address.sin_addr), ntohs(address.sin_port));
                        close(fds[i].fd);
                        fds[i].fd = -1;
                    } else {
                        buffer[valread] = '\0';
                        send(fds[i].fd, buffer, strlen(buffer), 0);
                    }
                }
            }
        }
    }

    return 0;
}

3.3 使用 epoll 实现网络服务器

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/epoll.h>

#define PORT 8080
#define MAX_EVENTS 10

int main() {
    int server_fd, new_socket, valread;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);
    char buffer[1024] = {0};
    int epoll_fd, event_count;
    struct epoll_event events[MAX_EVENTS];

    // 创建套接字
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }

    // 设置套接字选项
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
        perror("setsockopt");
        exit(EXIT_FAILURE);
    }

    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);

    // 绑定套接字
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }

    // 监听套接字
    if (listen(server_fd, 3) < 0) {
        perror("listen");
        exit(EXIT_FAILURE);
    }

    // 创建 epoll 实例
    epoll_fd = epoll_create1(0);
    if (epoll_fd == -1) {
        perror("epoll_create1");
        exit(EXIT_FAILURE);
    }

    struct epoll_event event;
    event.data.fd = server_fd;
    event.events = EPOLLIN;
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &event) == -1) {
        perror("epoll_ctl: server_fd");
        exit(EXIT_FAILURE);
    }

    printf("Server started, listening on port %d...\n", PORT);

    while (1) {
        // 等待事件发生
        event_count = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        if (event_count == -1) {
            perror("epoll_wait");
            exit(EXIT_FAILURE);
        }

        for (int i = 0; i < event_count; i++) {
            if (events[i].data.fd == server_fd) {
                if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t *)&addrlen)) == -1) {
                    perror("accept");
                    continue;
                }

                printf("New connection, socket fd is %d, ip is : %s, port : %d\n", new_socket, inet_ntoa(address.sin_addr), ntohs(address.sin_port));

                event.data.fd = new_socket;
                event.events = EPOLLIN | EPOLLET;
                if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_socket, &event) == -1) {
                    perror("epoll_ctl: new_socket");
                }
            } else {
                int client_fd = events[i].data.fd;
                valread = read(client_fd, buffer, 1024);
                if (valread == -1) {
                    if (errno == EAGAIN || errno == EWOULDBLOCK) {
                        continue;
                    } else {
                        perror("read");
                        close(client_fd);
                    }
                } else if (valread == 0) {
                    printf("Client disconnected\n");
                    close(client_fd);
                } else {
                    buffer[valread] = '\0';
                    send(client_fd, buffer, strlen(buffer), 0);
                }
            }
        }
    }

    close(server_fd);
    close(epoll_fd);
    return 0;
}

四、IO 多路复用技术应用场景

IO 多路复用技术在很多场景下都有广泛应用。

  1. 网络服务器:如上述示例所示,网络服务器需要同时处理多个客户端的连接和数据传输。通过 IO 多路复用技术,服务器可以高效地管理大量的客户端连接,提高系统的并发处理能力。常见的 Web 服务器、邮件服务器等都可以利用 IO 多路复用技术来提升性能。

  2. 文件服务器:文件服务器需要同时处理多个用户的文件读写请求。IO 多路复用技术可以让服务器在多个文件描述符(对应不同的文件操作)上等待事件发生,从而及时响应各个用户的请求。

  3. 实时应用:在一些实时应用中,如实时监控系统、多媒体流处理等,需要及时处理多个数据源的输入。IO 多路复用技术可以帮助应用程序高效地监听多个数据通道,确保数据的实时处理。

  4. 分布式系统:在分布式系统中,节点之间需要进行大量的网络通信。IO 多路复用技术可以用于管理节点之间的连接,提高分布式系统的通信效率和可靠性。

五、IO 多路复用技术性能对比

我们已经了解了 select、poll 和 epoll 的原理和实现,下面对它们的性能进行一个简单的对比。

  1. 文件描述符数量限制:select 通常支持的文件描述符数量有限,一般在 1024 个左右;poll 通过链表管理文件描述符,理论上没有数量限制;epoll 同样没有文件描述符数量的限制,并且在内核中使用红黑树来管理,效率更高。

  2. 数据拷贝开销:select 和 poll 在每次调用时都需要将文件描述符集合在用户空间和内核空间之间进行拷贝,随着文件描述符数量的增加,这种拷贝开销会变得很大。而 epoll 采用了 mmap 技术,减少了数据拷贝的次数,降低了开销。

  3. 事件通知机制:select 和 poll 采用轮询机制,每次都需要遍历所有文件描述符来检查哪些就绪,当文件描述符数量较多时,性能会急剧下降。epoll 采用事件驱动机制,内核只需要将就绪的文件描述符添加到就绪链表中,epoll_wait 调用时只需要检查这个链表,大大提高了效率。

综上所述,在处理大量文件描述符和高并发场景下,epoll 具有明显的性能优势,是 Linux 系统中推荐使用的 IO 多路复用方式。

六、总结

IO 多路复用技术是后端开发网络编程中的一项重要技术,它通过允许一个进程同时监控多个文件描述符的 I/O 状态变化,有效地提高了系统资源的利用率和并发处理能力。在 Unix/Linux 系统中,select、poll 和 epoll 是常见的实现方式,它们各有特点,但 epoll 在处理高并发场景下表现更为出色。通过掌握这些技术,开发者可以编写更高效、更健壮的网络应用程序,满足不同场景下的需求。在实际应用中,需要根据具体的业务需求和系统环境选择合适的 IO 多路复用方式,以达到最佳的性能表现。希望本文的内容能帮助读者深入理解 IO 多路复用技术的原理与实现,并在实际开发中灵活运用。

以上内容详细介绍了 IO 多路复用技术的原理、实现示例、应用场景以及性能对比,希望对您有所帮助。如果您还有其他问题,欢迎继续提问。