Linux C语言非阻塞I/O与多路复用
一、I/O 模型基础概念
在深入探讨 Linux C 语言中的非阻塞 I/O 与多路复用之前,我们先来了解一些基本的 I/O 模型概念。
1.1 阻塞 I/O 模型
阻塞 I/O 是最基本的 I/O 模型。在这种模型下,当应用程序调用一个 I/O 函数(如 read
或 write
)时,进程会被挂起,直到 I/O 操作完成。例如,当调用 read
从套接字读取数据时,如果此时套接字缓冲区中没有数据,进程会一直等待,直到有数据可读,然后才会继续执行后续代码。以下是一个简单的阻塞 I/O 的代码示例:
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
int main() {
int fd = open("test.txt", O_RDONLY);
if (fd == -1) {
perror("open");
return 1;
}
char buffer[1024];
ssize_t bytes_read = read(fd, buffer, sizeof(buffer));
if (bytes_read == -1) {
perror("read");
close(fd);
return 1;
}
buffer[bytes_read] = '\0';
printf("Read data: %s\n", buffer);
close(fd);
return 0;
}
在上述代码中,read
操作会阻塞进程,直到从文件 test.txt
中读取到数据或者发生错误。这种模型简单直观,但在处理多个 I/O 操作时效率较低,因为一个 I/O 操作未完成,其他 I/O 操作也无法进行。
1.2 非阻塞 I/O 模型
非阻塞 I/O 与阻塞 I/O 不同,当应用程序调用 I/O 函数时,无论操作是否能立即完成,函数都会立即返回。如果操作不能立即完成,函数会返回一个错误码(通常是 EAGAIN
或 EWOULDBLOCK
),表示操作需要稍后重试。这允许进程在等待 I/O 操作完成的同时,继续执行其他任务。要将一个文件描述符设置为非阻塞模式,可以使用 fcntl
函数,示例如下:
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
int main() {
int fd = open("test.txt", O_RDONLY);
if (fd == -1) {
perror("open");
return 1;
}
// 设置为非阻塞模式
int flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
char buffer[1024];
ssize_t bytes_read = read(fd, buffer, sizeof(buffer));
if (bytes_read == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
printf("No data available yet, will retry later.\n");
} else {
perror("read");
}
} else {
buffer[bytes_read] = '\0';
printf("Read data: %s\n", buffer);
}
close(fd);
return 0;
}
在上述代码中,通过 fcntl
函数将文件描述符 fd
设置为非阻塞模式。之后调用 read
时,如果没有数据可读,函数会立即返回 -1
,并且 errno
会被设置为 EAGAIN
或 EWOULDBLOCK
,进程可以根据这个错误码决定是否稍后重试。
二、多路复用技术
虽然非阻塞 I/O 允许进程在 I/O 操作未完成时继续执行其他任务,但如果需要同时处理多个 I/O 操作,就需要频繁地轮询每个文件描述符,检查是否有数据可读或可写,这会消耗大量的 CPU 资源。多路复用技术就是为了解决这个问题而出现的。多路复用允许一个进程监视多个文件描述符,当其中任何一个文件描述符准备好进行 I/O 操作时,通知进程进行相应处理。
2.1 select 函数
select
是最早的多路复用函数,它允许进程监视一组文件描述符,等待其中一个或多个文件描述符变为可读、可写或有异常事件发生。select
函数的原型如下:
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
nfds
:需要监视的文件描述符集合中最大文件描述符的值加 1。readfds
:指向一个fd_set
结构体的指针,用于检查可读性的文件描述符集合。writefds
:指向一个fd_set
结构体的指针,用于检查可写性的文件描述符集合。exceptfds
:指向一个fd_set
结构体的指针,用于检查异常情况的文件描述符集合。timeout
:指定等待的时间,如果为NULL
,则select
会一直阻塞,直到有文件描述符准备好;如果timeout
中的时间为 0,则select
不阻塞,立即返回。
fd_set
是一个文件描述符集合的数据类型,可以使用一些宏来操作它,例如:
FD_ZERO(fd_set *fdset); // 清空文件描述符集合
FD_SET(int fd, fd_set *fdset); // 将指定的文件描述符添加到集合中
FD_CLR(int fd, fd_set *fdset); // 将指定的文件描述符从集合中移除
FD_ISSET(int fd, fd_set *fdset); // 检查指定的文件描述符是否在集合中
下面是一个使用 select
实现同时监视标准输入和一个套接字的示例:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>
#include <sys/types.h>
#include <sys/time.h>
#define PORT 8080
#define BUFFER_SIZE 1024
int main() {
int sockfd;
struct sockaddr_in servaddr, cliaddr;
sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (sockfd < 0) {
perror("socket creation failed");
exit(EXIT_FAILURE);
}
memset(&servaddr, 0, sizeof(servaddr));
memset(&cliaddr, 0, sizeof(cliaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = INADDR_ANY;
servaddr.sin_port = htons(PORT);
if (bind(sockfd, (const struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
perror("bind failed");
close(sockfd);
exit(EXIT_FAILURE);
}
fd_set read_fds;
FD_ZERO(&read_fds);
FD_SET(STDIN_FILENO, &read_fds);
FD_SET(sockfd, &read_fds);
int max_fd = (STDIN_FILENO > sockfd)? STDIN_FILENO : sockfd;
max_fd++;
struct timeval timeout;
timeout.tv_sec = 5;
timeout.tv_usec = 0;
while (1) {
fd_set tmp_fds = read_fds;
int activity = select(max_fd, &tmp_fds, NULL, NULL, &timeout);
if (activity < 0) {
perror("select error");
break;
} else if (activity == 0) {
printf("Timeout occurred, no activity.\n");
} else {
if (FD_ISSET(STDIN_FILENO, &tmp_fds)) {
char buffer[BUFFER_SIZE];
fgets(buffer, sizeof(buffer), stdin);
printf("Read from stdin: %s", buffer);
}
if (FD_ISSET(sockfd, &tmp_fds)) {
char buffer[BUFFER_SIZE];
socklen_t len = sizeof(cliaddr);
int n = recvfrom(sockfd, (char *)buffer, BUFFER_SIZE, MSG_WAITALL, (struct sockaddr *)&cliaddr, &len);
buffer[n] = '\0';
printf("Received from client: %s\n", buffer);
}
}
}
close(sockfd);
return 0;
}
在上述代码中,select
函数同时监视标准输入(STDIN_FILENO
)和一个 UDP 套接字。如果标准输入有数据可读或者套接字有数据到达,select
会返回,然后通过 FD_ISSET
宏检查是哪个文件描述符准备好,进而进行相应的处理。
2.2 poll 函数
poll
函数与 select
类似,但在某些方面有所改进。poll
函数的原型如下:
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
fds
:一个指向struct pollfd
数组的指针,struct pollfd
结构体定义如下:
struct pollfd {
int fd; /* 文件描述符 */
short events; /* 等待的事件 */
short revents; /* 实际发生的事件 */
};
nfds
:fds
数组中的元素个数。timeout
:等待的时间(毫秒),如果为-1
,则poll
会一直阻塞,直到有文件描述符准备好;如果为 0,则poll
不阻塞,立即返回。
events
字段可以设置为一些预定义的常量,如 POLLIN
(可读)、POLLOUT
(可写)、POLLERR
(错误)等,用于指定需要监视的事件。revents
字段会在 poll
返回时,由内核填充实际发生的事件。
以下是一个使用 poll
实现类似功能的示例:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>
#include <poll.h>
#define PORT 8080
#define BUFFER_SIZE 1024
int main() {
int sockfd;
struct sockaddr_in servaddr, cliaddr;
sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (sockfd < 0) {
perror("socket creation failed");
exit(EXIT_FAILURE);
}
memset(&servaddr, 0, sizeof(servaddr));
memset(&cliaddr, 0, sizeof(cliaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = INADDR_ANY;
servaddr.sin_port = htons(PORT);
if (bind(sockfd, (const struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
perror("bind failed");
close(sockfd);
exit(EXIT_FAILURE);
}
struct pollfd fds[2];
fds[0].fd = STDIN_FILENO;
fds[0].events = POLLIN;
fds[1].fd = sockfd;
fds[1].events = POLLIN;
int timeout = 5000; // 5 seconds
while (1) {
int activity = poll(fds, 2, timeout);
if (activity < 0) {
perror("poll error");
break;
} else if (activity == 0) {
printf("Timeout occurred, no activity.\n");
} else {
if (fds[0].revents & POLLIN) {
char buffer[BUFFER_SIZE];
fgets(buffer, sizeof(buffer), stdin);
printf("Read from stdin: %s", buffer);
}
if (fds[1].revents & POLLIN) {
char buffer[BUFFER_SIZE];
socklen_t len = sizeof(cliaddr);
int n = recvfrom(sockfd, (char *)buffer, BUFFER_SIZE, MSG_WAITALL, (struct sockaddr *)&cliaddr, &len);
buffer[n] = '\0';
printf("Received from client: %s\n", buffer);
}
}
}
close(sockfd);
return 0;
}
在这个示例中,通过 poll
函数同时监视标准输入和 UDP 套接字。poll
函数返回后,通过检查 revents
字段来确定哪些文件描述符发生了相应的事件,并进行处理。
2.3 epoll 函数
epoll
是 Linux 特有的多路复用机制,它在处理大量文件描述符时比 select
和 poll
更高效。epoll
有两种工作模式:水平触发(Level Triggered,LT)和边缘触发(Edge Triggered,ET)。
epoll
使用三个函数来完成操作:epoll_create
、epoll_ctl
和 epoll_wait
。
epoll_create
函数用于创建一个 epoll
实例,返回一个文件描述符,原型如下:
int epoll_create(int size);
size
参数在 Linux 2.6.8 之后已被忽略,但仍需提供一个大于 0 的值。
epoll_ctl
函数用于控制 epoll
实例,向其中添加、修改或删除文件描述符及其感兴趣的事件,原型如下:
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epfd
:epoll_create
返回的epoll
实例的文件描述符。op
:操作类型,可以是EPOLL_CTL_ADD
(添加文件描述符)、EPOLL_CTL_MOD
(修改文件描述符的事件)或EPOLL_CTL_DEL
(删除文件描述符)。fd
:要操作的文件描述符。event
:指向一个struct epoll_event
结构体的指针,用于指定感兴趣的事件和关联的数据,struct epoll_event
定义如下:
struct epoll_event {
uint32_t events; /* 感兴趣的事件 */
epoll_data_t data; /* 关联的数据 */
};
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
events
字段可以设置为一些预定义的常量,如 EPOLLIN
(可读)、EPOLLOUT
(可写)、EPOLLERR
(错误)等,还可以与 EPOLLET
(边缘触发模式)等标志位组合。
epoll_wait
函数用于等待 epoll
实例所监视的文件描述符上有事件发生,原型如下:
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
epfd
:epoll_create
返回的epoll
实例的文件描述符。events
:一个指向struct epoll_event
数组的指针,用于存储发生事件的文件描述符及其事件信息。maxevents
:events
数组的大小。timeout
:等待的时间(毫秒),如果为-1
,则epoll_wait
会一直阻塞,直到有文件描述符准备好;如果为 0,则epoll_wait
不阻塞,立即返回。
以下是一个使用 epoll
实现的简单示例,监视标准输入和一个 TCP 套接字:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>
#include <sys/epoll.h>
#define PORT 8080
#define BUFFER_SIZE 1024
#define MAX_EVENTS 10
int main() {
int sockfd, epollfd;
struct sockaddr_in servaddr;
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
perror("socket creation failed");
exit(EXIT_FAILURE);
}
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = INADDR_ANY;
servaddr.sin_port = htons(PORT);
if (bind(sockfd, (const struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
perror("bind failed");
close(sockfd);
exit(EXIT_FAILURE);
}
if (listen(sockfd, 5) < 0) {
perror("listen failed");
close(sockfd);
exit(EXIT_FAILURE);
}
epollfd = epoll_create1(0);
if (epollfd == -1) {
perror("epoll_create1");
close(sockfd);
exit(EXIT_FAILURE);
}
struct epoll_event event;
event.data.fd = STDIN_FILENO;
event.events = EPOLLIN;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, STDIN_FILENO, &event) == -1) {
perror("epoll_ctl: STDIN_FILENO");
close(epollfd);
close(sockfd);
exit(EXIT_FAILURE);
}
event.data.fd = sockfd;
event.events = EPOLLIN;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &event) == -1) {
perror("epoll_ctl: sockfd");
close(epollfd);
close(sockfd);
exit(EXIT_FAILURE);
}
struct epoll_event events[MAX_EVENTS];
int num_events;
while (1) {
num_events = epoll_wait(epollfd, events, MAX_EVENTS, -1);
if (num_events == -1) {
perror("epoll_wait");
break;
}
for (int i = 0; i < num_events; i++) {
if (events[i].data.fd == STDIN_FILENO) {
char buffer[BUFFER_SIZE];
fgets(buffer, sizeof(buffer), stdin);
printf("Read from stdin: %s", buffer);
} else if (events[i].data.fd == sockfd) {
int connfd = accept(sockfd, NULL, NULL);
if (connfd == -1) {
perror("accept");
continue;
}
char buffer[BUFFER_SIZE];
ssize_t bytes_read = recv(connfd, buffer, sizeof(buffer), 0);
if (bytes_read > 0) {
buffer[bytes_read] = '\0';
printf("Received from client: %s\n", buffer);
} else if (bytes_read == -1) {
perror("recv");
}
close(connfd);
}
}
}
close(epollfd);
close(sockfd);
return 0;
}
在这个示例中,首先通过 epoll_create1
创建一个 epoll
实例,然后使用 epoll_ctl
将标准输入和 TCP 套接字添加到 epoll
实例中,并指定感兴趣的事件为可读。epoll_wait
函数会阻塞等待事件发生,当有事件发生时,通过遍历 events
数组来处理相应的文件描述符上的事件。
三、非阻塞 I/O 与多路复用的结合应用
在实际应用中,通常会将非阻塞 I/O 与多路复用技术结合使用。以 epoll
为例,当使用边缘触发模式(ET)时,由于 ET 模式下文件描述符只有在状态发生变化时才会被通知,所以通常需要将文件描述符设置为非阻塞模式,以避免在处理事件时阻塞其他事件的处理。
假设我们有一个简单的网络服务器,需要同时处理多个客户端连接,并且要高效地处理 I/O 操作,可以这样实现:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>
#include <sys/epoll.h>
#include <fcntl.h>
#define PORT 8080
#define BUFFER_SIZE 1024
#define MAX_EVENTS 10
void set_nonblocking(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
int main() {
int sockfd, epollfd;
struct sockaddr_in servaddr;
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
perror("socket creation failed");
exit(EXIT_FAILURE);
}
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = INADDR_ANY;
servaddr.sin_port = htons(PORT);
if (bind(sockfd, (const struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
perror("bind failed");
close(sockfd);
exit(EXIT_FAILURE);
}
if (listen(sockfd, 5) < 0) {
perror("listen failed");
close(sockfd);
exit(EXIT_FAILURE);
}
set_nonblocking(sockfd);
epollfd = epoll_create1(0);
if (epollfd == -1) {
perror("epoll_create1");
close(sockfd);
exit(EXIT_FAILURE);
}
struct epoll_event event;
event.data.fd = sockfd;
event.events = EPOLLIN | EPOLLET;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &event) == -1) {
perror("epoll_ctl: sockfd");
close(epollfd);
close(sockfd);
exit(EXIT_FAILURE);
}
struct epoll_event events[MAX_EVENTS];
int num_events;
while (1) {
num_events = epoll_wait(epollfd, events, MAX_EVENTS, -1);
if (num_events == -1) {
perror("epoll_wait");
break;
}
for (int i = 0; i < num_events; i++) {
if (events[i].data.fd == sockfd) {
while (1) {
int connfd = accept(sockfd, NULL, NULL);
if (connfd == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break;
} else {
perror("accept");
break;
}
}
set_nonblocking(connfd);
event.data.fd = connfd;
event.events = EPOLLIN | EPOLLET;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, connfd, &event) == -1) {
perror("epoll_ctl: connfd");
close(connfd);
}
}
} else {
int client_fd = events[i].data.fd;
char buffer[BUFFER_SIZE];
ssize_t bytes_read = recv(client_fd, buffer, sizeof(buffer), 0);
if (bytes_read > 0) {
buffer[bytes_read] = '\0';
printf("Received from client %d: %s\n", client_fd, buffer);
// 回显数据
send(client_fd, buffer, bytes_read, 0);
} else if (bytes_read == 0) {
printf("Client %d disconnected.\n", client_fd);
epoll_ctl(epollfd, EPOLL_CTL_DEL, client_fd, NULL);
close(client_fd);
} else if (bytes_read == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
continue;
} else {
perror("recv");
epoll_ctl(epollfd, EPOLL_CTL_DEL, client_fd, NULL);
close(client_fd);
}
}
}
}
}
close(epollfd);
close(sockfd);
return 0;
}
在上述代码中,首先将监听套接字 sockfd
设置为非阻塞模式,并添加到 epoll
实例中,使用边缘触发模式监听可读事件。当有新的客户端连接时,在一个循环中不断调用 accept
接受连接,同样将新的连接套接字设置为非阻塞模式,并添加到 epoll
实例中。在处理客户端数据时,如果 recv
返回 EAGAIN
或 EWOULDBLOCK
,表示当前没有数据可读,继续处理其他事件。这样可以高效地处理多个客户端的 I/O 操作,避免阻塞。
四、性能对比与适用场景
4.1 性能对比
- select:
select
有一些局限性。它能监视的文件描述符数量受限于FD_SETSIZE
(通常为 1024),并且每次调用select
时都需要将文件描述符集合从用户空间复制到内核空间,返回时又要从内核空间复制回用户空间,随着文件描述符数量的增加,这种复制操作的开销会变得很大。此外,select
使用轮询的方式检查文件描述符,时间复杂度为 O(n),当文件描述符数量较多时,性能会明显下降。 - poll:
poll
改进了select
中文件描述符数量的限制,它通过struct pollfd
数组来管理文件描述符,理论上可以监视的文件描述符数量只受限于系统资源。但poll
同样需要在每次调用时将struct pollfd
数组从用户空间复制到内核空间,返回时再复制回来,并且也是采用轮询方式,时间复杂度同样为 O(n),在处理大量文件描述符时性能也不理想。 - epoll:
epoll
在性能上有显著提升。它在内核中维护一个事件表,通过epoll_ctl
函数将文件描述符添加到事件表中,这样避免了每次调用时的大量数据复制。epoll
使用回调机制,当文件描述符状态发生变化时,内核会将其添加到就绪列表中,epoll_wait
只需要检查就绪列表,时间复杂度为 O(1)。特别是在处理大量文件描述符时,epoll
的性能优势更加明显。
4.2 适用场景
- select:适用于小规模的应用程序,文件描述符数量较少且对性能要求不是特别高的场景。由于其简单易用,在一些简单的测试程序或小型项目中仍可能会被使用。
- poll:
poll
相对于select
有一定的改进,适用于中等规模的应用程序,对文件描述符数量有一定要求,但性能要求不是极致的场景。它在一些传统的网络应用开发中仍有使用。 - epoll:适用于大规模的网络应用程序,尤其是需要处理大量并发连接的场景,如高性能的网络服务器。
epoll
的高效性能使其成为这类应用的首选多路复用机制。
通过对 Linux C 语言中非阻塞 I/O 与多路复用技术的深入了解,包括阻塞与非阻塞 I/O 模型的原理、select
、poll
和 epoll
等多路复用函数的使用,以及它们的性能对比和适用场景,开发者可以根据具体的应用需求选择合适的技术,开发出高效、稳定的网络应用程序。在实际开发中,还需要结合具体的业务逻辑和系统资源等因素,进行综合考虑和优化。