MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

网络编程中的非阻塞IO与多路复用技术

2021-01-203.6k 阅读

网络编程基础概念回顾

在深入探讨非阻塞 I/O 与多路复用技术之前,我们先来回顾一些网络编程的基础概念。

1. I/O 操作的本质

在计算机系统中,I/O(输入/输出)操作涉及到数据在不同设备之间的传输,比如从磁盘读取数据到内存,或者将内存中的数据发送到网络。对于网络编程而言,I/O 操作主要是指在网络套接字(Socket)上进行数据的读取和写入。

一个典型的网络应用程序需要与远程服务器建立连接,发送请求并接收响应。这个过程涉及到在套接字上进行多次 I/O 操作。例如,使用 Python 的 socket 模块创建一个简单的 TCP 客户端:

import socket

# 创建一个 TCP 套接字
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 连接到服务器
client_socket.connect(('127.0.0.1', 8080))
# 发送数据
client_socket.sendall(b'Hello, Server!')
# 接收数据
data = client_socket.recv(1024)
print('Received:', data.decode())
# 关闭套接字
client_socket.close()

上述代码展示了基本的网络 I/O 操作流程:创建套接字、连接服务器、发送数据、接收数据以及关闭套接字。

2. 阻塞与非阻塞的概念

在传统的 I/O 模型中,I/O 操作通常是阻塞的。当一个线程执行一个阻塞的 I/O 操作时,它会被挂起,直到该操作完成。例如,在上面的代码中,当执行 recv 方法时,如果没有数据到达,线程将一直等待,期间无法执行其他任务。

这种阻塞行为在某些场景下会带来性能问题。想象一下,一个服务器需要同时处理多个客户端连接,如果每个连接的 I/O 操作都是阻塞的,那么服务器需要为每个客户端创建一个单独的线程或进程来处理,这会消耗大量的系统资源。

为了解决这个问题,非阻塞 I/O 模型应运而生。在非阻塞 I/O 中,I/O 操作不会阻塞线程。当执行一个非阻塞的 I/O 操作时,如果操作不能立即完成,系统会立即返回一个错误码(例如 EWOULDBLOCK),线程可以继续执行其他任务,然后在合适的时候再次尝试该 I/O 操作。

非阻塞 I/O 详解

1. 非阻塞 I/O 的原理

非阻塞 I/O 允许应用程序在发起 I/O 操作后,不必等待操作完成就继续执行后续代码。操作系统会为每个套接字维护一个状态,当数据准备好时,操作系统会通知应用程序可以进行 I/O 操作。

以 Linux 系统为例,通过 fcntl 函数可以将套接字设置为非阻塞模式:

#include <fcntl.h>
#include <unistd.h>
#include <sys/socket.h>
#include <stdio.h>

int main() {
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    // 获取当前套接字的文件描述符标志
    int flags = fcntl(sockfd, F_GETFL, 0);
    // 设置为非阻塞模式
    fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);
    // 后续代码...
    close(sockfd);
    return 0;
}

在上述 C 语言代码中,首先获取套接字的当前文件描述符标志,然后通过 fcntl 函数将其设置为非阻塞模式。

2. 非阻塞 I/O 的优势与挑战

优势

  • 提高资源利用率:非阻塞 I/O 避免了线程在 I/O 操作上的阻塞等待,使得单个线程可以同时处理多个 I/O 操作,从而提高了系统资源的利用率。
  • 支持高并发:在处理大量并发连接时,无需为每个连接创建单独的线程或进程,减少了上下文切换开销,更适合构建高并发的网络应用程序。

挑战

  • 复杂的编程模型:由于 I/O 操作不会立即完成,应用程序需要不断地轮询检查 I/O 操作的状态,这使得代码逻辑变得复杂,增加了编程的难度。
  • 资源浪费:如果轮询频率过高,会消耗大量的 CPU 资源;而轮询频率过低,又可能导致数据处理不及时。

3. 非阻塞 I/O 的代码示例(Python)

下面是一个使用 Python 和非阻塞 I/O 的简单示例,展示如何在单线程中处理多个套接字:

import socket
import selectors

# 创建一个默认的选择器对象
sel = selectors.DefaultSelector()


def accept(sock, mask):
    conn, addr = sock.accept()  # 接受新连接
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)  # 设置为非阻塞模式
    sel.register(conn, selectors.EVENT_READ, read)


def read(conn, mask):
    data = conn.recv(1024)  # 读取数据
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # 回显数据
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()


# 创建一个 TCP 套接字
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('127.0.0.1', 8080))
sock.listen(100)
sock.setblocking(False)  # 设置为非阻塞模式
sel.register(sock, selectors.EVENT_READ, accept)

try:
    while True:
        events = sel.select()  # 等待事件发生
        for key, mask in events:
            callback = key.data
            callback(key.fileobj, mask)
except KeyboardInterrupt:
    print('caught keyboard interrupt, exiting')
finally:
    sel.close()

在这个示例中,首先创建了一个选择器对象 sel。然后定义了 acceptread 两个回调函数,分别用于处理新连接的接受和数据的读取。通过 sel.register 方法将套接字注册到选择器中,并指定相应的回调函数。在主循环中,通过 sel.select 等待事件发生,当有事件发生时,调用相应的回调函数进行处理。

多路复用技术

1. 多路复用的概念

多路复用技术是一种允许一个进程同时处理多个 I/O 流的技术。它的核心思想是通过一个系统调用,一次性监听多个文件描述符(例如套接字)的状态变化,当其中任何一个文件描述符准备好进行 I/O 操作时,系统调用返回,应用程序可以对这些准备好的文件描述符进行相应的 I/O 操作。

多路复用技术主要有三种实现方式:selectpollepoll(在 Linux 系统下),它们的基本原理类似,但在性能和使用方式上有所差异。

2. select 模型

原理select 函数通过监听一组文件描述符(包括标准输入、套接字等),当其中任何一个文件描述符准备好进行 I/O 操作(例如可读、可写或有异常)时,函数返回。select 函数的原型如下:

#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

int select(int nfds, fd_set *readfds, fd_set *writefds,
           fd_set *exceptfds, struct timeval *timeout);

其中,nfds 是需要检查的文件描述符的最大值加 1;readfdswritefdsexceptfds 分别是需要监听读、写和异常事件的文件描述符集合;timeout 是一个可选的超时时间。

示例代码(C 语言)

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/select.h>

#define PORT 8080
#define MAX_CLIENTS 100

int main() {
    int server_fd, new_socket, valread;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);
    char buffer[1024] = {0};

    // 创建套接字
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }

    // 设置套接字选项
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
        perror("setsockopt");
        exit(EXIT_FAILURE);
    }

    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);

    // 绑定套接字
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }

    // 监听连接
    if (listen(server_fd, MAX_CLIENTS) < 0) {
        perror("listen");
        exit(EXIT_FAILURE);
    }

    fd_set read_fds;
    fd_set tmp_fds;
    FD_ZERO(&read_fds);
    FD_ZERO(&tmp_fds);

    // 将服务器套接字添加到读集合中
    FD_SET(server_fd, &read_fds);
    int activity, i, val;

    while (1) {
        // 备份读集合
        tmp_fds = read_fds;
        activity = select(server_fd + 1, &tmp_fds, NULL, NULL, NULL);

        if ((activity < 0) && (errno!= EINTR)) {
            printf("select error");
        } else if (activity) {
            if (FD_ISSET(server_fd, &tmp_fds)) {
                if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t *)&addrlen)) < 0) {
                    perror("accept");
                    continue;
                }

                // 将新连接的套接字添加到读集合中
                FD_SET(new_socket, &read_fds);
                printf("New connection, socket fd is %d, ip is : %s, port : %d\n", new_socket, inet_ntoa(address.sin_addr), ntohs(address.sin_port));
            }

            for (i = 0; i <= server_fd; i++) {
                if (FD_ISSET(i, &tmp_fds)) {
                    if ((valread = read(i, buffer, 1024)) == 0) {
                        // 客户端关闭连接
                        getpeername(i, (struct sockaddr *)&address, (socklen_t *)&addrlen);
                        printf("Host disconnected, ip %s, port %d \n", inet_ntoa(address.sin_addr), ntohs(address.sin_port));
                        close(i);
                        FD_CLR(i, &read_fds);
                    } else {
                        buffer[valread] = '\0';
                        send(i, buffer, strlen(buffer), 0);
                    }
                }
            }
        }
    }
    return 0;
}

局限性

  • 文件描述符数量限制select 通常有文件描述符数量的限制(例如在一些系统中默认是 1024),这在处理大量并发连接时会成为瓶颈。
  • 性能问题select 需要遍历所有注册的文件描述符来检查哪些是准备好的,随着文件描述符数量的增加,性能会显著下降。

3. poll 模型

原理poll 函数与 select 类似,但它使用一个 pollfd 结构体数组来表示需要监听的文件描述符及其事件,而不是像 select 那样使用固定大小的文件描述符集合。poll 函数的原型如下:

#include <poll.h>

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

其中,fds 是一个 pollfd 结构体数组,nfds 是数组中元素的数量,timeout 是超时时间。

示例代码(C 语言)

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <poll.h>

#define PORT 8080
#define MAX_CLIENTS 100

int main() {
    int server_fd, new_socket, valread;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);
    char buffer[1024] = {0};

    // 创建套接字
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }

    // 设置套接字选项
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
        perror("setsockopt");
        exit(EXIT_FAILURE);
    }

    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);

    // 绑定套接字
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }

    // 监听连接
    if (listen(server_fd, MAX_CLIENTS) < 0) {
        perror("listen");
        exit(EXIT_FAILURE);
    }

    struct pollfd fds[MAX_CLIENTS + 1];
    fds[0].fd = server_fd;
    fds[0].events = POLLIN;
    int nfds = 1;
    int activity, i, val;

    while (1) {
        activity = poll(fds, nfds, -1);

        if (activity < 0) {
            perror("poll error");
        } else if (activity) {
            if (fds[0].revents & POLLIN) {
                if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t *)&addrlen)) < 0) {
                    perror("accept");
                    continue;
                }

                fds[nfds].fd = new_socket;
                fds[nfds].events = POLLIN;
                nfds++;
                printf("New connection, socket fd is %d, ip is : %s, port : %d\n", new_socket, inet_ntoa(address.sin_addr), ntohs(address.sin_port));
            }

            for (i = 1; i < nfds; i++) {
                if (fds[i].revents & POLLIN) {
                    if ((valread = read(fds[i].fd, buffer, 1024)) == 0) {
                        // 客户端关闭连接
                        getpeername(fds[i].fd, (struct sockaddr *)&address, (socklen_t *)&addrlen);
                        printf("Host disconnected, ip %s, port %d \n", inet_ntoa(address.sin_addr), ntohs(address.sin_port));
                        close(fds[i].fd);
                        for (int j = i; j < nfds - 1; j++) {
                            fds[j] = fds[j + 1];
                        }
                        nfds--;
                    } else {
                        buffer[valread] = '\0';
                        send(fds[i].fd, buffer, strlen(buffer), 0);
                    }
                }
            }
        }
    }
    return 0;
}

优势与不足

  • 优势poll 没有文件描述符数量的限制,因为它使用数组来存储文件描述符,理论上可以处理大量的文件描述符。
  • 不足poll 同样需要遍历所有注册的文件描述符来检查哪些是准备好的,在处理大量文件描述符时性能仍然不够理想。

4. epoll 模型(Linux 特有)

原理epoll 是 Linux 内核为处理大批量文件描述符而作了改进的多路复用 I/O 接口。epoll 使用一个文件描述符 epollfd 来管理所有需要监听的文件描述符,通过 epoll_ctl 函数来添加、修改或删除要监听的文件描述符及其事件,通过 epoll_wait 函数等待事件发生。

epoll 有两种工作模式:水平触发(Level Triggered, LT)和边缘触发(Edge Triggered, ET)。

  • 水平触发:只要文件描述符对应的缓冲区还有数据可读(或可写),epoll_wait 就会一直通知应用程序。
  • 边缘触发:只有当文件描述符对应的缓冲区状态发生变化(例如从无数据到有数据)时,epoll_wait 才会通知应用程序。边缘触发模式通常可以减少不必要的系统调用,提高性能,但编程难度相对较高。

示例代码(C 语言,水平触发)

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/epoll.h>

#define PORT 8080
#define MAX_CLIENTS 100
#define EVENTS 10

int main() {
    int server_fd, new_socket, valread;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);
    char buffer[1024] = {0};

    // 创建套接字
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }

    // 设置套接字选项
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
        perror("setsockopt");
        exit(EXIT_FAILURE);
    }

    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);

    // 绑定套接字
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }

    // 监听连接
    if (listen(server_fd, MAX_CLIENTS) < 0) {
        perror("listen");
        exit(EXIT_FAILURE);
    }

    int epollfd = epoll_create1(0);
    if (epollfd == -1) {
        perror("epoll_create1");
        exit(EXIT_FAILURE);
    }

    struct epoll_event event;
    event.data.fd = server_fd;
    event.events = EPOLLIN;
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, server_fd, &event) == -1) {
        perror("epoll_ctl: server_fd");
        exit(EXIT_FAILURE);
    }

    struct epoll_event events[EVENTS];
    int nfds, i;

    while (1) {
        nfds = epoll_wait(epollfd, events, EVENTS, -1);
        if (nfds == -1) {
            perror("epoll_wait");
            exit(EXIT_FAILURE);
        }

        for (i = 0; i < nfds; i++) {
            if (events[i].data.fd == server_fd) {
                if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t *)&addrlen)) == -1) {
                    perror("accept");
                    continue;
                }

                event.data.fd = new_socket;
                event.events = EPOLLIN;
                if (epoll_ctl(epollfd, EPOLL_CTL_ADD, new_socket, &event) == -1) {
                    perror("epoll_ctl: new_socket");
                    close(new_socket);
                }
            } else {
                valread = read(events[i].data.fd, buffer, 1024);
                if (valread == 0) {
                    // 客户端关闭连接
                    if (epoll_ctl(epollfd, EPOLL_CTL_DEL, events[i].data.fd, NULL) == -1) {
                        perror("epoll_ctl: del");
                    }
                    close(events[i].data.fd);
                } else {
                    buffer[valread] = '\0';
                    send(events[i].data.fd, buffer, strlen(buffer), 0);
                }
            }
        }
    }
    close(server_fd);
    close(epollfd);
    return 0;
}

优势

  • 高效的事件通知epoll 使用回调机制,只有当文件描述符状态发生变化时才会通知应用程序,而不是像 selectpoll 那样需要遍历所有文件描述符,大大提高了性能。
  • 支持大量文件描述符epoll 没有文件描述符数量的限制,非常适合处理高并发的网络应用。

非阻塞 I/O 与多路复用的结合

在实际应用中,非阻塞 I/O 与多路复用技术通常结合使用。多路复用技术(如 epoll)负责监听多个文件描述符的状态变化,当某个文件描述符准备好进行 I/O 操作时,应用程序使用非阻塞 I/O 来执行具体的 I/O 操作,这样可以充分发挥两者的优势,实现高效的高并发网络编程。

例如,在上面的 epoll 示例代码中,虽然没有显式地将套接字设置为非阻塞模式,但在实际的高性能应用中,通常会将套接字设置为非阻塞模式,以避免在读取或写入数据时阻塞线程,从而进一步提高系统的并发处理能力。

应用场景分析

1. 高并发 Web 服务器

在高并发的 Web 服务器场景中,需要同时处理大量的客户端连接。使用非阻塞 I/O 和多路复用技术,可以在单线程或少量线程内处理大量的连接,减少线程上下文切换开销,提高服务器的性能和吞吐量。例如,Nginx 就是一个基于非阻塞 I/O 和多路复用技术(如 epoll)实现的高性能 Web 服务器,能够高效地处理大量并发请求。

2. 实时通信系统

对于实时通信系统,如即时通讯(IM)应用或在线游戏服务器,需要实时处理大量客户端的消息收发。非阻塞 I/O 和多路复用技术可以确保服务器能够及时响应客户端的请求,实现实时通信的功能。通过使用多路复用技术监听多个客户端套接字的状态变化,当有数据到达时,使用非阻塞 I/O 快速读取和处理数据,保证系统的实时性和高并发处理能力。

3. 网络爬虫

网络爬虫需要同时下载多个网页,这涉及到大量的网络 I/O 操作。使用非阻塞 I/O 和多路复用技术,可以在一个线程中同时处理多个网络请求,提高爬虫的抓取效率。通过多路复用技术监听多个套接字的状态,当某个套接字准备好读取数据(即网页下载完成)时,使用非阻塞 I/O 读取网页内容,然后继续发起新的下载请求,从而实现高效的网页抓取。