MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Linux C语言非阻塞I/O的并发处理

2022-04-107.7k 阅读

一、Linux I/O 模型概述

在深入探讨非阻塞 I/O 的并发处理之前,我们先来了解一下 Linux 系统中常见的 I/O 模型。在 Linux 环境下,主要有以下几种 I/O 模型:

  1. 阻塞 I/O(Blocking I/O):这是最基本的 I/O 模型。当应用程序执行一个 I/O 操作(如读或写)时,进程会被阻塞,直到 I/O 操作完成。例如,调用 read 函数读取文件描述符时,如果数据尚未准备好,进程会一直等待,直到数据可读并被读取,在此期间进程不能执行其他任务。
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>

#define BUFFER_SIZE 1024

int main() {
    int fd = open("test.txt", O_RDONLY);
    if (fd == -1) {
        perror("open");
        exit(EXIT_FAILURE);
    }

    char buffer[BUFFER_SIZE];
    ssize_t bytesRead = read(fd, buffer, BUFFER_SIZE);
    if (bytesRead == -1) {
        perror("read");
        close(fd);
        exit(EXIT_FAILURE);
    }

    printf("Read %zd bytes: %.*s\n", bytesRead, (int)bytesRead, buffer);
    close(fd);
    return 0;
}

在上述代码中,read 操作会阻塞进程,直到数据从文件中读取完毕。如果文件为空或者读取过程中出现问题,进程将一直处于等待状态。

  1. 非阻塞 I/O(Non - blocking I/O):与阻塞 I/O 不同,非阻塞 I/O 在执行 I/O 操作时,如果数据尚未准备好,系统调用不会阻塞进程,而是立即返回一个错误(通常是 EAGAINEWOULDBLOCK)。这样,进程可以继续执行其他任务,而不是一直等待 I/O 操作完成。通过不断轮询检查 I/O 操作是否完成,应用程序可以实现并发处理多个 I/O 操作。
  2. I/O 多路复用(I/O Multiplexing):这是一种可以同时监视多个文件描述符状态的技术。通过使用诸如 selectpollepoll 等系统调用,应用程序可以在一个进程中同时处理多个 I/O 流。这些系统调用会阻塞在等待文件描述符状态变化上,但在等待过程中可以同时检查多个文件描述符,当有任何一个文件描述符就绪时,系统调用返回,应用程序可以处理相应的 I/O 操作。
  3. 信号驱动 I/O(Signal - driven I/O):在这种模型中,应用程序通过 sigaction 函数注册一个信号处理函数,当 I/O 操作准备好时,内核会发送一个信号给进程,进程的信号处理函数会被调用,从而处理 I/O 操作。这种方式允许进程在等待 I/O 操作完成时继续执行其他任务,而不需要像非阻塞 I/O 那样不断轮询。
  4. 异步 I/O(Asynchronous I/O):异步 I/O 允许应用程序在发起 I/O 操作后继续执行其他任务,而 I/O 操作由内核在后台完成。当 I/O 操作完成时,内核会通知应用程序。Linux 提供了 aio_readaio_write 等函数来支持异步 I/O。

二、非阻塞 I/O 基础

(一)设置文件描述符为非阻塞模式

在 Linux 中,可以通过 fcntl 函数来设置文件描述符为非阻塞模式。fcntl 函数用于操作文件描述符的各种属性。以下是设置文件描述符为非阻塞模式的代码示例:

#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>

int main() {
    int fd = open("test.txt", O_RDONLY);
    if (fd == -1) {
        perror("open");
        exit(EXIT_FAILURE);
    }

    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) {
        perror("fcntl F_GETFL");
        close(fd);
        exit(EXIT_FAILURE);
    }

    flags |= O_NONBLOCK;
    if (fcntl(fd, F_SETFL, flags) == -1) {
        perror("fcntl F_SETFL");
        close(fd);
        exit(EXIT_FAILURE);
    }

    // 这里开始可以进行非阻塞 I/O 操作
    close(fd);
    return 0;
}

在上述代码中,首先通过 fcntl 函数的 F_GETFL 命令获取文件描述符 fd 的当前标志。然后,将 O_NONBLOCK 标志添加到这些标志中,并使用 F_SETFL 命令将修改后的标志设置回文件描述符。这样,fd 就被设置为非阻塞模式了。

(二)非阻塞 I/O 操作

当文件描述符处于非阻塞模式时,readwrite 等 I/O 操作的行为会发生变化。如果数据尚未准备好,read 操作将立即返回 -1,并且 errno 被设置为 EAGAINEWOULDBLOCK。以下是一个简单的非阻塞读操作示例:

#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>

#define BUFFER_SIZE 1024

int main() {
    int fd = open("test.txt", O_RDONLY | O_NONBLOCK);
    if (fd == -1) {
        perror("open");
        exit(EXIT_FAILURE);
    }

    char buffer[BUFFER_SIZE];
    ssize_t bytesRead;
    do {
        bytesRead = read(fd, buffer, BUFFER_SIZE);
        if (bytesRead == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                // 数据尚未准备好,继续执行其他任务
                printf("Data not ready, continue other tasks...\n");
                // 这里可以添加其他任务的代码
                continue;
            } else {
                perror("read");
                close(fd);
                exit(EXIT_FAILURE);
            }
        } else if (bytesRead > 0) {
            printf("Read %zd bytes: %.*s\n", bytesRead, (int)bytesRead, buffer);
        }
    } while (bytesRead > 0);

    close(fd);
    return 0;
}

在这个示例中,通过 do - while 循环不断尝试读取数据。当 read 返回 -1errnoEAGAINEWOULDBLOCK 时,说明数据尚未准备好,程序可以继续执行其他任务,然后再次尝试读取。当 read 返回大于 0 的值时,说明成功读取到数据,将其打印出来。

三、非阻塞 I/O 的并发处理

(一)轮询方式实现并发非阻塞 I/O

一种简单的实现并发非阻塞 I/O 的方式是通过轮询多个文件描述符。假设有多个套接字(socket)需要进行非阻塞读操作,可以使用一个循环不断检查每个套接字是否有数据可读。以下是一个简化的示例,假设有两个套接字 sockfd1sockfd2

#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>

#define BUFFER_SIZE 1024

void set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) {
        perror("fcntl F_GETFL");
        exit(EXIT_FAILURE);
    }
    flags |= O_NONBLOCK;
    if (fcntl(fd, F_SETFL, flags) == -1) {
        perror("fcntl F_SETFL");
        exit(EXIT_FAILURE);
    }
}

int main() {
    // 创建并初始化 sockfd1
    int sockfd1 = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd1 == -1) {
        perror("socket1");
        exit(EXIT_FAILURE);
    }
    struct sockaddr_in servaddr1;
    servaddr1.sin_family = AF_INET;
    servaddr1.sin_port = htons(8080);
    servaddr1.sin_addr.s_addr = inet_addr("127.0.0.1");
    if (connect(sockfd1, (struct sockaddr *)&servaddr1, sizeof(servaddr1)) == -1) {
        perror("connect1");
        close(sockfd1);
        exit(EXIT_FAILURE);
    }
    set_nonblocking(sockfd1);

    // 创建并初始化 sockfd2
    int sockfd2 = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd2 == -1) {
        perror("socket2");
        close(sockfd1);
        exit(EXIT_FAILURE);
    }
    struct sockaddr_in servaddr2;
    servaddr2.sin_family = AF_INET;
    servaddr2.sin_port = htons(8081);
    servaddr2.sin_addr.s_addr = inet_addr("127.0.0.1");
    if (connect(sockfd2, (struct sockaddr *)&servaddr2, sizeof(servaddr2)) == -1) {
        perror("connect2");
        close(sockfd1);
        close(sockfd2);
        exit(EXIT_FAILURE);
    }
    set_nonblocking(sockfd2);

    char buffer[BUFFER_SIZE];
    while (1) {
        ssize_t bytesRead;
        // 检查 sockfd1
        bytesRead = read(sockfd1, buffer, BUFFER_SIZE);
        if (bytesRead == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                // 数据尚未准备好
            } else {
                perror("read sockfd1");
                close(sockfd1);
                close(sockfd2);
                exit(EXIT_FAILURE);
            }
        } else if (bytesRead > 0) {
            printf("Read from sockfd1: %.*s\n", (int)bytesRead, buffer);
        }

        // 检查 sockfd2
        bytesRead = read(sockfd2, buffer, BUFFER_SIZE);
        if (bytesRead == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                // 数据尚未准备好
            } else {
                perror("read sockfd2");
                close(sockfd1);
                close(sockfd2);
                exit(EXIT_FAILURE);
            }
        } else if (bytesRead > 0) {
            printf("Read from sockfd2: %.*s\n", (int)bytesRead, buffer);
        }

        // 这里可以添加其他任务的代码
    }

    close(sockfd1);
    close(sockfd2);
    return 0;
}

在上述代码中,首先创建并初始化两个套接字 sockfd1sockfd2,并将它们设置为非阻塞模式。然后,在一个无限循环中,通过 read 函数轮询检查每个套接字是否有数据可读。如果有数据可读,则打印出来;如果数据尚未准备好,根据 errno 判断是否为 EAGAINEWOULDBLOCK,若是则继续循环检查其他套接字或执行其他任务。

(二)结合 I/O 多路复用实现并发非阻塞 I/O

虽然轮询方式可以实现并发非阻塞 I/O,但这种方式效率较低,尤其是当需要处理大量文件描述符时。I/O 多路复用技术可以更高效地解决这个问题。下面以 epoll 为例,展示如何结合非阻塞 I/O 实现并发处理。

  1. epoll 简介epoll 是 Linux 内核提供的一种高效的 I/O 多路复用机制,相比传统的 selectpoll,它具有更好的性能,特别是在处理大量文件描述符时。epoll 使用一个文件描述符 epfd 来管理多个被监控的文件描述符,通过 epoll_ctl 函数来添加、修改或删除被监控的文件描述符,通过 epoll_wait 函数来等待文件描述符状态的变化。

  2. 代码示例

#include <sys/epoll.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define MAX_EVENTS 10
#define BUFFER_SIZE 1024

void set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) {
        perror("fcntl F_GETFL");
        exit(EXIT_FAILURE);
    }
    flags |= O_NONBLOCK;
    if (fcntl(fd, F_SETFL, flags) == -1) {
        perror("fcntl F_SETFL");
        exit(EXIT_FAILURE);
    }
}

int main() {
    int epfd = epoll_create1(0);
    if (epfd == -1) {
        perror("epoll_create1");
        exit(EXIT_FAILURE);
    }

    // 创建并初始化 sockfd1
    int sockfd1 = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd1 == -1) {
        perror("socket1");
        close(epfd);
        exit(EXIT_FAILURE);
    }
    struct sockaddr_in servaddr1;
    servaddr1.sin_family = AF_INET;
    servaddr1.sin_port = htons(8080);
    servaddr1.sin_addr.s_addr = inet_addr("127.0.0.1");
    if (connect(sockfd1, (struct sockaddr *)&servaddr1, sizeof(servaddr1)) == -1) {
        perror("connect1");
        close(sockfd1);
        close(epfd);
        exit(EXIT_FAILURE);
    }
    set_nonblocking(sockfd1);

    struct epoll_event event;
    event.data.fd = sockfd1;
    event.events = EPOLLIN | EPOLLET;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd1, &event) == -1) {
        perror("epoll_ctl ADD sockfd1");
        close(sockfd1);
        close(epfd);
        exit(EXIT_FAILURE);
    }

    // 创建并初始化 sockfd2
    int sockfd2 = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd2 == -1) {
        perror("socket2");
        close(sockfd1);
        close(epfd);
        exit(EXIT_FAILURE);
    }
    struct sockaddr_in servaddr2;
    servaddr2.sin_family = AF_INET;
    servaddr2.sin_port = htons(8081);
    servaddr2.sin_addr.s_addr = inet_addr("127.0.0.1");
    if (connect(sockfd2, (struct sockaddr *)&servaddr2, sizeof(servaddr2)) == -1) {
        perror("connect2");
        close(sockfd1);
        close(sockfd2);
        close(epfd);
        exit(EXIT_FAILURE);
    }
    set_nonblocking(sockfd2);

    event.data.fd = sockfd2;
    event.events = EPOLLIN | EPOLLET;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd2, &event) == -1) {
        perror("epoll_ctl ADD sockfd2");
        close(sockfd1);
        close(sockfd2);
        close(epfd);
        exit(EXIT_FAILURE);
    }

    struct epoll_event events[MAX_EVENTS];
    char buffer[BUFFER_SIZE];
    while (1) {
        int numEvents = epoll_wait(epfd, events, MAX_EVENTS, -1);
        if (numEvents == -1) {
            perror("epoll_wait");
            close(sockfd1);
            close(sockfd2);
            close(epfd);
            exit(EXIT_FAILURE);
        }

        for (int i = 0; i < numEvents; ++i) {
            int fd = events[i].data.fd;
            ssize_t bytesRead;
            do {
                bytesRead = read(fd, buffer, BUFFER_SIZE);
                if (bytesRead == -1) {
                    if (errno == EAGAIN || errno == EWOULDBLOCK) {
                        break;
                    } else {
                        perror("read");
                        close(sockfd1);
                        close(sockfd2);
                        close(epfd);
                        exit(EXIT_FAILURE);
                    }
                } else if (bytesRead > 0) {
                    buffer[bytesRead] = '\0';
                    printf("Read from fd %d: %s\n", fd, buffer);
                }
            } while (bytesRead > 0);
        }
    }

    close(sockfd1);
    close(sockfd2);
    close(epfd);
    return 0;
}

在上述代码中,首先通过 epoll_create1 创建一个 epoll 实例,得到 epfd。然后创建两个套接字 sockfd1sockfd2,将它们设置为非阻塞模式,并通过 epoll_ctl 将这两个套接字添加到 epoll 的监控列表中,设置感兴趣的事件为 EPOLLIN(可读事件)和 EPOLLET(边缘触发模式)。在主循环中,通过 epoll_wait 等待文件描述符状态变化。当有事件发生时,遍历 epoll_wait 返回的事件列表,对每个就绪的文件描述符进行非阻塞读操作。

四、非阻塞 I/O 并发处理的应用场景

  1. 网络服务器:在网络服务器开发中,非阻塞 I/O 的并发处理技术被广泛应用。例如,一个高性能的 Web 服务器需要同时处理大量客户端的连接和请求。通过将套接字设置为非阻塞模式,并结合 I/O 多路复用(如 epoll),服务器可以在一个进程或线程中高效地处理多个客户端的请求,而不会因为某个客户端的 I/O 操作未完成而阻塞其他客户端的处理。这大大提高了服务器的并发处理能力和响应速度。
  2. 实时数据采集系统:在实时数据采集系统中,可能需要同时从多个传感器采集数据。每个传感器的数据采集可以看作是一个 I/O 操作。通过将与传感器通信的设备文件描述符设置为非阻塞模式,并使用 I/O 多路复用技术,系统可以在一个进程中高效地并发处理多个传感器的数据采集,确保及时获取各个传感器的最新数据。
  3. 分布式系统:在分布式系统中,节点之间需要进行大量的网络通信。非阻塞 I/O 的并发处理可以使节点在等待网络 I/O 操作完成的同时,继续执行其他任务,如处理本地数据、进行计算等。这有助于提高分布式系统的整体性能和效率,确保各个节点之间的通信能够高效进行。

五、非阻塞 I/O 并发处理的注意事项

  1. 资源管理:在使用非阻塞 I/O 进行并发处理时,需要注意合理管理资源。例如,打开的文件描述符、分配的内存等资源在使用完毕后必须及时关闭和释放,否则可能会导致资源泄漏,最终耗尽系统资源。在代码示例中,每次打开文件描述符或套接字后,都需要在适当的时候通过 close 函数关闭。
  2. 错误处理:非阻塞 I/O 操作可能会因为各种原因返回错误。除了常见的 EAGAINEWOULDBLOCK 错误外,还可能出现其他错误,如 EBADF(无效的文件描述符)、EFAULT(内存访问错误)等。应用程序必须妥善处理这些错误,根据不同的错误类型采取相应的措施,如关闭文件描述符、重新初始化连接等,以确保程序的稳定性和可靠性。
  3. 性能优化:虽然非阻塞 I/O 和 I/O 多路复用技术可以提高并发处理能力,但在实际应用中,仍需要注意性能优化。例如,合理设置 epoll 的参数,避免不必要的系统调用和内存拷贝。在处理大量文件描述符时,需要根据系统资源和业务需求,合理调整监控的文件描述符数量和等待时间等参数,以达到最佳的性能表现。
  4. 线程安全:如果在多线程环境下使用非阻塞 I/O 进行并发处理,需要特别注意线程安全问题。例如,共享的文件描述符、缓冲区等资源在多个线程中访问时,可能会出现数据竞争和不一致的问题。可以通过使用互斥锁、信号量等同步机制来保证线程安全,确保不同线程对共享资源的访问是有序和正确的。

通过深入理解和合理应用 Linux C 语言中的非阻塞 I/O 并发处理技术,开发者可以开发出高性能、高并发的应用程序,满足各种复杂的业务需求。无论是网络服务器、实时数据采集系统还是分布式系统等领域,这些技术都有着广泛的应用前景。在实际开发中,需要根据具体的业务场景和需求,灵活选择合适的 I/O 模型和并发处理方式,并注意资源管理、错误处理、性能优化和线程安全等方面的问题,以开发出稳定、高效的软件系统。