MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Linux C语言非阻塞I/O与多路复用

2024-05-226.2k 阅读

一、I/O 模型基础概念

在深入探讨 Linux C 语言中的非阻塞 I/O 与多路复用之前,我们先来了解一些基本的 I/O 模型概念。

1.1 阻塞 I/O 模型

阻塞 I/O 是最基本的 I/O 模型。在这种模型下,当应用程序调用一个 I/O 函数(如 readwrite)时,进程会被挂起,直到 I/O 操作完成。例如,当调用 read 从套接字读取数据时,如果此时套接字缓冲区中没有数据,进程会一直等待,直到有数据可读,然后才会继续执行后续代码。以下是一个简单的阻塞 I/O 的代码示例:

#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>

int main() {
    int fd = open("test.txt", O_RDONLY);
    if (fd == -1) {
        perror("open");
        return 1;
    }

    char buffer[1024];
    ssize_t bytes_read = read(fd, buffer, sizeof(buffer));
    if (bytes_read == -1) {
        perror("read");
        close(fd);
        return 1;
    }

    buffer[bytes_read] = '\0';
    printf("Read data: %s\n", buffer);
    close(fd);
    return 0;
}

在上述代码中,read 操作会阻塞进程,直到从文件 test.txt 中读取到数据或者发生错误。这种模型简单直观,但在处理多个 I/O 操作时效率较低,因为一个 I/O 操作未完成,其他 I/O 操作也无法进行。

1.2 非阻塞 I/O 模型

非阻塞 I/O 与阻塞 I/O 不同,当应用程序调用 I/O 函数时,无论操作是否能立即完成,函数都会立即返回。如果操作不能立即完成,函数会返回一个错误码(通常是 EAGAINEWOULDBLOCK),表示操作需要稍后重试。这允许进程在等待 I/O 操作完成的同时,继续执行其他任务。要将一个文件描述符设置为非阻塞模式,可以使用 fcntl 函数,示例如下:

#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>

int main() {
    int fd = open("test.txt", O_RDONLY);
    if (fd == -1) {
        perror("open");
        return 1;
    }

    // 设置为非阻塞模式
    int flags = fcntl(fd, F_GETFL, 0);
    fcntl(fd, F_SETFL, flags | O_NONBLOCK);

    char buffer[1024];
    ssize_t bytes_read = read(fd, buffer, sizeof(buffer));
    if (bytes_read == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            printf("No data available yet, will retry later.\n");
        } else {
            perror("read");
        }
    } else {
        buffer[bytes_read] = '\0';
        printf("Read data: %s\n", buffer);
    }
    close(fd);
    return 0;
}

在上述代码中,通过 fcntl 函数将文件描述符 fd 设置为非阻塞模式。之后调用 read 时,如果没有数据可读,函数会立即返回 -1,并且 errno 会被设置为 EAGAINEWOULDBLOCK,进程可以根据这个错误码决定是否稍后重试。

二、多路复用技术

虽然非阻塞 I/O 允许进程在 I/O 操作未完成时继续执行其他任务,但如果需要同时处理多个 I/O 操作,就需要频繁地轮询每个文件描述符,检查是否有数据可读或可写,这会消耗大量的 CPU 资源。多路复用技术就是为了解决这个问题而出现的。多路复用允许一个进程监视多个文件描述符,当其中任何一个文件描述符准备好进行 I/O 操作时,通知进程进行相应处理。

2.1 select 函数

select 是最早的多路复用函数,它允许进程监视一组文件描述符,等待其中一个或多个文件描述符变为可读、可写或有异常事件发生。select 函数的原型如下:

int select(int nfds, fd_set *readfds, fd_set *writefds,
           fd_set *exceptfds, struct timeval *timeout);
  • nfds:需要监视的文件描述符集合中最大文件描述符的值加 1。
  • readfds:指向一个 fd_set 结构体的指针,用于检查可读性的文件描述符集合。
  • writefds:指向一个 fd_set 结构体的指针,用于检查可写性的文件描述符集合。
  • exceptfds:指向一个 fd_set 结构体的指针,用于检查异常情况的文件描述符集合。
  • timeout:指定等待的时间,如果为 NULL,则 select 会一直阻塞,直到有文件描述符准备好;如果 timeout 中的时间为 0,则 select 不阻塞,立即返回。

fd_set 是一个文件描述符集合的数据类型,可以使用一些宏来操作它,例如:

FD_ZERO(fd_set *fdset);  // 清空文件描述符集合
FD_SET(int fd, fd_set *fdset);  // 将指定的文件描述符添加到集合中
FD_CLR(int fd, fd_set *fdset);  // 将指定的文件描述符从集合中移除
FD_ISSET(int fd, fd_set *fdset);  // 检查指定的文件描述符是否在集合中

下面是一个使用 select 实现同时监视标准输入和一个套接字的示例:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>
#include <sys/types.h>
#include <sys/time.h>

#define PORT 8080
#define BUFFER_SIZE 1024

int main() {
    int sockfd;
    struct sockaddr_in servaddr, cliaddr;

    sockfd = socket(AF_INET, SOCK_DGRAM, 0);
    if (sockfd < 0) {
        perror("socket creation failed");
        exit(EXIT_FAILURE);
    }

    memset(&servaddr, 0, sizeof(servaddr));
    memset(&cliaddr, 0, sizeof(cliaddr));

    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = INADDR_ANY;
    servaddr.sin_port = htons(PORT);

    if (bind(sockfd, (const struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
        perror("bind failed");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    fd_set read_fds;
    FD_ZERO(&read_fds);
    FD_SET(STDIN_FILENO, &read_fds);
    FD_SET(sockfd, &read_fds);

    int max_fd = (STDIN_FILENO > sockfd)? STDIN_FILENO : sockfd;
    max_fd++;

    struct timeval timeout;
    timeout.tv_sec = 5;
    timeout.tv_usec = 0;

    while (1) {
        fd_set tmp_fds = read_fds;
        int activity = select(max_fd, &tmp_fds, NULL, NULL, &timeout);

        if (activity < 0) {
            perror("select error");
            break;
        } else if (activity == 0) {
            printf("Timeout occurred, no activity.\n");
        } else {
            if (FD_ISSET(STDIN_FILENO, &tmp_fds)) {
                char buffer[BUFFER_SIZE];
                fgets(buffer, sizeof(buffer), stdin);
                printf("Read from stdin: %s", buffer);
            }
            if (FD_ISSET(sockfd, &tmp_fds)) {
                char buffer[BUFFER_SIZE];
                socklen_t len = sizeof(cliaddr);
                int n = recvfrom(sockfd, (char *)buffer, BUFFER_SIZE, MSG_WAITALL, (struct sockaddr *)&cliaddr, &len);
                buffer[n] = '\0';
                printf("Received from client: %s\n", buffer);
            }
        }
    }

    close(sockfd);
    return 0;
}

在上述代码中,select 函数同时监视标准输入(STDIN_FILENO)和一个 UDP 套接字。如果标准输入有数据可读或者套接字有数据到达,select 会返回,然后通过 FD_ISSET 宏检查是哪个文件描述符准备好,进而进行相应的处理。

2.2 poll 函数

poll 函数与 select 类似,但在某些方面有所改进。poll 函数的原型如下:

int poll(struct pollfd *fds, nfds_t nfds, int timeout);
  • fds:一个指向 struct pollfd 数组的指针,struct pollfd 结构体定义如下:
struct pollfd {
    int fd;         /* 文件描述符 */
    short events;   /* 等待的事件 */
    short revents;  /* 实际发生的事件 */
};
  • nfdsfds 数组中的元素个数。
  • timeout:等待的时间(毫秒),如果为 -1,则 poll 会一直阻塞,直到有文件描述符准备好;如果为 0,则 poll 不阻塞,立即返回。

events 字段可以设置为一些预定义的常量,如 POLLIN(可读)、POLLOUT(可写)、POLLERR(错误)等,用于指定需要监视的事件。revents 字段会在 poll 返回时,由内核填充实际发生的事件。

以下是一个使用 poll 实现类似功能的示例:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>
#include <poll.h>

#define PORT 8080
#define BUFFER_SIZE 1024

int main() {
    int sockfd;
    struct sockaddr_in servaddr, cliaddr;

    sockfd = socket(AF_INET, SOCK_DGRAM, 0);
    if (sockfd < 0) {
        perror("socket creation failed");
        exit(EXIT_FAILURE);
    }

    memset(&servaddr, 0, sizeof(servaddr));
    memset(&cliaddr, 0, sizeof(cliaddr));

    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = INADDR_ANY;
    servaddr.sin_port = htons(PORT);

    if (bind(sockfd, (const struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
        perror("bind failed");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    struct pollfd fds[2];
    fds[0].fd = STDIN_FILENO;
    fds[0].events = POLLIN;
    fds[1].fd = sockfd;
    fds[1].events = POLLIN;

    int timeout = 5000;  // 5 seconds

    while (1) {
        int activity = poll(fds, 2, timeout);

        if (activity < 0) {
            perror("poll error");
            break;
        } else if (activity == 0) {
            printf("Timeout occurred, no activity.\n");
        } else {
            if (fds[0].revents & POLLIN) {
                char buffer[BUFFER_SIZE];
                fgets(buffer, sizeof(buffer), stdin);
                printf("Read from stdin: %s", buffer);
            }
            if (fds[1].revents & POLLIN) {
                char buffer[BUFFER_SIZE];
                socklen_t len = sizeof(cliaddr);
                int n = recvfrom(sockfd, (char *)buffer, BUFFER_SIZE, MSG_WAITALL, (struct sockaddr *)&cliaddr, &len);
                buffer[n] = '\0';
                printf("Received from client: %s\n", buffer);
            }
        }
    }

    close(sockfd);
    return 0;
}

在这个示例中,通过 poll 函数同时监视标准输入和 UDP 套接字。poll 函数返回后,通过检查 revents 字段来确定哪些文件描述符发生了相应的事件,并进行处理。

2.3 epoll 函数

epoll 是 Linux 特有的多路复用机制,它在处理大量文件描述符时比 selectpoll 更高效。epoll 有两种工作模式:水平触发(Level Triggered,LT)和边缘触发(Edge Triggered,ET)。

epoll 使用三个函数来完成操作:epoll_createepoll_ctlepoll_wait

epoll_create 函数用于创建一个 epoll 实例,返回一个文件描述符,原型如下:

int epoll_create(int size);

size 参数在 Linux 2.6.8 之后已被忽略,但仍需提供一个大于 0 的值。

epoll_ctl 函数用于控制 epoll 实例,向其中添加、修改或删除文件描述符及其感兴趣的事件,原型如下:

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
  • epfdepoll_create 返回的 epoll 实例的文件描述符。
  • op:操作类型,可以是 EPOLL_CTL_ADD(添加文件描述符)、EPOLL_CTL_MOD(修改文件描述符的事件)或 EPOLL_CTL_DEL(删除文件描述符)。
  • fd:要操作的文件描述符。
  • event:指向一个 struct epoll_event 结构体的指针,用于指定感兴趣的事件和关联的数据,struct epoll_event 定义如下:
struct epoll_event {
    uint32_t events;      /* 感兴趣的事件 */
    epoll_data_t data;    /* 关联的数据 */
};

typedef union epoll_data {
    void *ptr;
    int fd;
    uint32_t u32;
    uint64_t u64;
} epoll_data_t;

events 字段可以设置为一些预定义的常量,如 EPOLLIN(可读)、EPOLLOUT(可写)、EPOLLERR(错误)等,还可以与 EPOLLET(边缘触发模式)等标志位组合。

epoll_wait 函数用于等待 epoll 实例所监视的文件描述符上有事件发生,原型如下:

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
  • epfdepoll_create 返回的 epoll 实例的文件描述符。
  • events:一个指向 struct epoll_event 数组的指针,用于存储发生事件的文件描述符及其事件信息。
  • maxeventsevents 数组的大小。
  • timeout:等待的时间(毫秒),如果为 -1,则 epoll_wait 会一直阻塞,直到有文件描述符准备好;如果为 0,则 epoll_wait 不阻塞,立即返回。

以下是一个使用 epoll 实现的简单示例,监视标准输入和一个 TCP 套接字:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>
#include <sys/epoll.h>

#define PORT 8080
#define BUFFER_SIZE 1024
#define MAX_EVENTS 10

int main() {
    int sockfd, epollfd;
    struct sockaddr_in servaddr;

    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("socket creation failed");
        exit(EXIT_FAILURE);
    }

    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = INADDR_ANY;
    servaddr.sin_port = htons(PORT);

    if (bind(sockfd, (const struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
        perror("bind failed");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    if (listen(sockfd, 5) < 0) {
        perror("listen failed");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    epollfd = epoll_create1(0);
    if (epollfd == -1) {
        perror("epoll_create1");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    struct epoll_event event;
    event.data.fd = STDIN_FILENO;
    event.events = EPOLLIN;
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, STDIN_FILENO, &event) == -1) {
        perror("epoll_ctl: STDIN_FILENO");
        close(epollfd);
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    event.data.fd = sockfd;
    event.events = EPOLLIN;
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &event) == -1) {
        perror("epoll_ctl: sockfd");
        close(epollfd);
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    struct epoll_event events[MAX_EVENTS];
    int num_events;

    while (1) {
        num_events = epoll_wait(epollfd, events, MAX_EVENTS, -1);
        if (num_events == -1) {
            perror("epoll_wait");
            break;
        }

        for (int i = 0; i < num_events; i++) {
            if (events[i].data.fd == STDIN_FILENO) {
                char buffer[BUFFER_SIZE];
                fgets(buffer, sizeof(buffer), stdin);
                printf("Read from stdin: %s", buffer);
            } else if (events[i].data.fd == sockfd) {
                int connfd = accept(sockfd, NULL, NULL);
                if (connfd == -1) {
                    perror("accept");
                    continue;
                }

                char buffer[BUFFER_SIZE];
                ssize_t bytes_read = recv(connfd, buffer, sizeof(buffer), 0);
                if (bytes_read > 0) {
                    buffer[bytes_read] = '\0';
                    printf("Received from client: %s\n", buffer);
                } else if (bytes_read == -1) {
                    perror("recv");
                }
                close(connfd);
            }
        }
    }

    close(epollfd);
    close(sockfd);
    return 0;
}

在这个示例中,首先通过 epoll_create1 创建一个 epoll 实例,然后使用 epoll_ctl 将标准输入和 TCP 套接字添加到 epoll 实例中,并指定感兴趣的事件为可读。epoll_wait 函数会阻塞等待事件发生,当有事件发生时,通过遍历 events 数组来处理相应的文件描述符上的事件。

三、非阻塞 I/O 与多路复用的结合应用

在实际应用中,通常会将非阻塞 I/O 与多路复用技术结合使用。以 epoll 为例,当使用边缘触发模式(ET)时,由于 ET 模式下文件描述符只有在状态发生变化时才会被通知,所以通常需要将文件描述符设置为非阻塞模式,以避免在处理事件时阻塞其他事件的处理。

假设我们有一个简单的网络服务器,需要同时处理多个客户端连接,并且要高效地处理 I/O 操作,可以这样实现:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>
#include <sys/epoll.h>
#include <fcntl.h>

#define PORT 8080
#define BUFFER_SIZE 1024
#define MAX_EVENTS 10

void set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

int main() {
    int sockfd, epollfd;
    struct sockaddr_in servaddr;

    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("socket creation failed");
        exit(EXIT_FAILURE);
    }

    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = INADDR_ANY;
    servaddr.sin_port = htons(PORT);

    if (bind(sockfd, (const struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
        perror("bind failed");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    if (listen(sockfd, 5) < 0) {
        perror("listen failed");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    set_nonblocking(sockfd);

    epollfd = epoll_create1(0);
    if (epollfd == -1) {
        perror("epoll_create1");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    struct epoll_event event;
    event.data.fd = sockfd;
    event.events = EPOLLIN | EPOLLET;
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &event) == -1) {
        perror("epoll_ctl: sockfd");
        close(epollfd);
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    struct epoll_event events[MAX_EVENTS];
    int num_events;

    while (1) {
        num_events = epoll_wait(epollfd, events, MAX_EVENTS, -1);
        if (num_events == -1) {
            perror("epoll_wait");
            break;
        }

        for (int i = 0; i < num_events; i++) {
            if (events[i].data.fd == sockfd) {
                while (1) {
                    int connfd = accept(sockfd, NULL, NULL);
                    if (connfd == -1) {
                        if (errno == EAGAIN || errno == EWOULDBLOCK) {
                            break;
                        } else {
                            perror("accept");
                            break;
                        }
                    }

                    set_nonblocking(connfd);

                    event.data.fd = connfd;
                    event.events = EPOLLIN | EPOLLET;
                    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, connfd, &event) == -1) {
                        perror("epoll_ctl: connfd");
                        close(connfd);
                    }
                }
            } else {
                int client_fd = events[i].data.fd;
                char buffer[BUFFER_SIZE];
                ssize_t bytes_read = recv(client_fd, buffer, sizeof(buffer), 0);
                if (bytes_read > 0) {
                    buffer[bytes_read] = '\0';
                    printf("Received from client %d: %s\n", client_fd, buffer);

                    // 回显数据
                    send(client_fd, buffer, bytes_read, 0);
                } else if (bytes_read == 0) {
                    printf("Client %d disconnected.\n", client_fd);
                    epoll_ctl(epollfd, EPOLL_CTL_DEL, client_fd, NULL);
                    close(client_fd);
                } else if (bytes_read == -1) {
                    if (errno == EAGAIN || errno == EWOULDBLOCK) {
                        continue;
                    } else {
                        perror("recv");
                        epoll_ctl(epollfd, EPOLL_CTL_DEL, client_fd, NULL);
                        close(client_fd);
                    }
                }
            }
        }
    }

    close(epollfd);
    close(sockfd);
    return 0;
}

在上述代码中,首先将监听套接字 sockfd 设置为非阻塞模式,并添加到 epoll 实例中,使用边缘触发模式监听可读事件。当有新的客户端连接时,在一个循环中不断调用 accept 接受连接,同样将新的连接套接字设置为非阻塞模式,并添加到 epoll 实例中。在处理客户端数据时,如果 recv 返回 EAGAINEWOULDBLOCK,表示当前没有数据可读,继续处理其他事件。这样可以高效地处理多个客户端的 I/O 操作,避免阻塞。

四、性能对比与适用场景

4.1 性能对比

  • selectselect 有一些局限性。它能监视的文件描述符数量受限于 FD_SETSIZE(通常为 1024),并且每次调用 select 时都需要将文件描述符集合从用户空间复制到内核空间,返回时又要从内核空间复制回用户空间,随着文件描述符数量的增加,这种复制操作的开销会变得很大。此外,select 使用轮询的方式检查文件描述符,时间复杂度为 O(n),当文件描述符数量较多时,性能会明显下降。
  • pollpoll 改进了 select 中文件描述符数量的限制,它通过 struct pollfd 数组来管理文件描述符,理论上可以监视的文件描述符数量只受限于系统资源。但 poll 同样需要在每次调用时将 struct pollfd 数组从用户空间复制到内核空间,返回时再复制回来,并且也是采用轮询方式,时间复杂度同样为 O(n),在处理大量文件描述符时性能也不理想。
  • epollepoll 在性能上有显著提升。它在内核中维护一个事件表,通过 epoll_ctl 函数将文件描述符添加到事件表中,这样避免了每次调用时的大量数据复制。epoll 使用回调机制,当文件描述符状态发生变化时,内核会将其添加到就绪列表中,epoll_wait 只需要检查就绪列表,时间复杂度为 O(1)。特别是在处理大量文件描述符时,epoll 的性能优势更加明显。

4.2 适用场景

  • select:适用于小规模的应用程序,文件描述符数量较少且对性能要求不是特别高的场景。由于其简单易用,在一些简单的测试程序或小型项目中仍可能会被使用。
  • pollpoll 相对于 select 有一定的改进,适用于中等规模的应用程序,对文件描述符数量有一定要求,但性能要求不是极致的场景。它在一些传统的网络应用开发中仍有使用。
  • epoll:适用于大规模的网络应用程序,尤其是需要处理大量并发连接的场景,如高性能的网络服务器。epoll 的高效性能使其成为这类应用的首选多路复用机制。

通过对 Linux C 语言中非阻塞 I/O 与多路复用技术的深入了解,包括阻塞与非阻塞 I/O 模型的原理、selectpollepoll 等多路复用函数的使用,以及它们的性能对比和适用场景,开发者可以根据具体的应用需求选择合适的技术,开发出高效、稳定的网络应用程序。在实际开发中,还需要结合具体的业务逻辑和系统资源等因素,进行综合考虑和优化。