MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

poll机制在不同操作系统中的实现与优化

2024-05-045.6k 阅读

一、poll机制概述

(一)基本概念

在网络编程中,poll机制是一种多路复用 I/O 模型,它允许应用程序同时监视多个文件描述符的状态变化,比如可读、可写或异常。通过 poll,程序可以高效地处理多个并发的 I/O 操作,而无需为每个文件描述符创建单独的线程或进程。

poll 机制的核心数据结构是 pollfd 结构体数组,每个 pollfd 结构体代表一个需要监视的文件描述符及其相关事件。

(二)与其他I/O多路复用模型的对比

  1. select:select 是最早的多路复用模型,它通过一个 fd_set 集合来管理文件描述符,其最大的限制是文件描述符数量有限(通常为1024),并且每次调用 select 时都需要将整个 fd_set 从用户空间复制到内核空间,效率较低。
  2. epoll:epoll 是 Linux 特有的高性能多路复用模型,它采用事件驱动的方式,在内核中维护一个红黑树来管理文件描述符,并且通过一个链表来保存就绪的文件描述符,避免了大量文件描述符的无效扫描,效率比 poll 更高,尤其适用于处理大量并发连接。

二、poll机制在Linux操作系统中的实现

(一)相关数据结构

在 Linux 中,poll 机制主要涉及 pollfd 结构体,定义如下:

struct pollfd {
    int fd;         /* 文件描述符 */
    short events;   /* 等待的事件 */
    short revents;  /* 实际发生的事件 */
};
  1. fd:需要监视的文件描述符,可以是套接字、管道、设备文件等。
  2. events:指定要监视的事件类型,常见的事件有 POLLIN(数据可读)、POLLOUT(数据可写)、POLLERR(错误)等。
  3. revents:由内核填充,用于返回实际发生的事件。

(二)系统调用

Linux 提供了 poll 系统调用,原型如下:

int poll(struct pollfd *fds, nfds_t nfds, int timeout);
  1. fds:指向 pollfd 结构体数组的指针,该数组包含了所有需要监视的文件描述符及其相关事件。
  2. nfdsfds 数组中的元素个数。
  3. timeout:指定等待的超时时间(单位为毫秒)。如果设置为 -1,则表示无限期等待;如果设置为 0,则表示立即返回,不等待任何事件发生。

(三)实现原理

  1. 内核空间:当应用程序调用 poll 系统调用时,内核会遍历 pollfd 数组,为每个文件描述符注册对应的事件回调函数。这些回调函数会在文件描述符状态发生变化时被触发,将对应的事件记录到 revents 字段中。
  2. 用户空间poll 系统调用返回后,应用程序可以检查 revents 字段,判断哪些文件描述符上发生了感兴趣的事件,并进行相应的处理。

(四)代码示例

以下是一个简单的使用 poll 机制进行网络编程的示例代码,实现一个简单的服务器,监听多个客户端连接:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <poll.h>

#define PORT 8080
#define MAX_CLIENTS 10

int main() {
    int server_fd, new_socket, valread;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);
    struct pollfd fds[MAX_CLIENTS + 1];
    char buffer[1024] = {0};

    // 创建套接字
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }

    // 设置套接字选项
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
        perror("setsockopt");
        exit(EXIT_FAILURE);
    }

    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);

    // 绑定套接字
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }

    // 监听套接字
    if (listen(server_fd, 3) < 0) {
        perror("listen");
        exit(EXIT_FAILURE);
    }

    // 初始化 pollfd 数组
    fds[0].fd = server_fd;
    fds[0].events = POLLIN;
    int nfds = 1;

    printf("Server is listening on port %d...\n", PORT);

    while (1) {
        int activity = poll(fds, nfds, -1);

        if (activity < 0) {
            perror("poll error");
            break;
        } else if (activity > 0) {
            if (fds[0].revents & POLLIN) {
                // 有新的客户端连接
                if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t *)&addrlen)) < 0) {
                    perror("accept");
                    continue;
                }

                printf("New client connected: %d\n", new_socket);

                // 将新的客户端套接字添加到 pollfd 数组
                fds[nfds].fd = new_socket;
                fds[nfds].events = POLLIN;
                nfds++;
            }

            // 处理客户端数据
            for (int i = 1; i < nfds; i++) {
                if (fds[i].revents & POLLIN) {
                    valread = read(fds[i].fd, buffer, 1024);
                    buffer[valread] = '\0';
                    printf("Received from client %d: %s\n", fds[i].fd, buffer);

                    // 回显数据给客户端
                    send(fds[i].fd, buffer, strlen(buffer), 0);
                }
            }
        }
    }

    close(server_fd);
    return 0;
}

三、poll机制在Windows操作系统中的实现

(一)相关数据结构

在 Windows 操作系统中,与 poll 类似的功能是通过 WSAAsyncSelectWSAEventSelect 实现的。这里我们主要介绍 WSAEventSelect,它涉及到 WSAEVENT 事件对象和 WSANETWORKEVENTS 结构体。

WSAEVENT 是一个事件对象句柄,定义如下:

typedef HANDLE WSAEVENT;

WSANETWORKEVENTS 结构体用于接收网络事件,定义如下:

typedef struct _WSANETWORKEVENTS {
    long lNetworkEvents;
    WSABUF NetworkEvents;
} WSANETWORKEVENTS, FAR * LPWSANETWORKEVENTS;
  1. lNetworkEvents:表示发生的网络事件类型,如 FD_READ(可读事件)、FD_WRITE(可写事件)等。
  2. NetworkEvents:包含与事件相关的数据。

(二)函数调用

  1. WSAEventSelect:用于注册网络事件,原型如下:
int WSAEventSelect(
    SOCKET s,
    WSAEVENT hEventObject,
    long lNetworkEvents
);
- **s**:需要监视的套接字。
- **hEventObject**:事件对象句柄。
- **lNetworkEvents**:指定要监视的网络事件类型。

2. WSAWaitForMultipleEvents:用于等待事件发生,原型如下:

DWORD WSAWaitForMultipleEvents(
    DWORD cEvents,
    const WSAEVENT *lphEvents,
    BOOL fWaitAll,
    DWORD dwTimeout,
    BOOL fAlertable
);
- **cEvents**:事件对象数组中的事件数量。
- **lphEvents**:指向事件对象数组的指针。
- **fWaitAll**:如果为 `TRUE`,则等待所有事件都发生;如果为 `FALSE`,则只要有一个事件发生就返回。
- **dwTimeout**:等待的超时时间(单位为毫秒)。
- **fAlertable**:如果为 `TRUE`,则在等待过程中可以被异步过程调用(APC)唤醒。

3. WSAEnumNetworkEvents:用于获取发生的网络事件,原型如下:

int WSAEnumNetworkEvents(
    SOCKET s,
    WSAEVENT hEventObject,
    LPWSANETWORKEVENTS lpNetworkEvents
);
- **s**:套接字。
- **hEventObject**:事件对象句柄。
- **lpNetworkEvents**:指向 `WSANETWORKEVENTS` 结构体的指针,用于接收网络事件信息。

(三)实现原理

  1. 事件注册:应用程序通过 WSAEventSelect 函数为套接字注册感兴趣的网络事件,并将事件与一个 WSAEVENT 事件对象关联。
  2. 事件等待:使用 WSAWaitForMultipleEvents 函数等待事件发生,该函数会阻塞线程,直到有事件发生或超时。
  3. 事件处理:当事件发生后,通过 WSAEnumNetworkEvents 函数获取具体发生的网络事件,并进行相应的处理。

(四)代码示例

以下是一个简单的使用 WSAEventSelect 实现的服务器示例代码:

#include <winsock2.h>
#include <windows.h>
#include <stdio.h>

#define DEFAULT_PORT 8080
#define MAX_CLIENTS 10

#pragma comment(lib, "ws2_32.lib")

int main() {
    WSADATA wsaData;
    SOCKET listenSocket, clientSocket;
    sockaddr_in serverAddr, clientAddr;
    int clientAddrLen = sizeof(clientAddr);
    WSAEVENT eventArray[MAX_CLIENTS + 1];
    WSANETWORKEVENTS networkEvents;

    // 初始化 Winsock
    if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
        printf("WSAStartup failed: %d\n", WSAGetLastError());
        return 1;
    }

    // 创建监听套接字
    listenSocket = socket(AF_INET, SOCK_STREAM, 0);
    if (listenSocket == INVALID_SOCKET) {
        printf("Socket creation failed: %d\n", WSAGetLastError());
        WSACleanup();
        return 1;
    }

    // 设置服务器地址
    serverAddr.sin_family = AF_INET;
    serverAddr.sin_port = htons(DEFAULT_PORT);
    serverAddr.sin_addr.s_addr = INADDR_ANY;

    // 绑定套接字
    if (bind(listenSocket, (sockaddr *)&serverAddr, sizeof(serverAddr)) == SOCKET_ERROR) {
        printf("Bind failed: %d\n", WSAGetLastError());
        closesocket(listenSocket);
        WSACleanup();
        return 1;
    }

    // 监听套接字
    if (listen(listenSocket, 5) == SOCKET_ERROR) {
        printf("Listen failed: %d\n", WSAGetLastError());
        closesocket(listenSocket);
        WSACleanup();
        return 1;
    }

    printf("Server is listening on port %d...\n", DEFAULT_PORT);

    // 初始化事件数组
    eventArray[0] = WSACreateEvent();
    if (eventArray[0] == WSA_INVALID_EVENT) {
        printf("WSACreateEvent failed: %d\n", WSAGetLastError());
        closesocket(listenSocket);
        WSACleanup();
        return 1;
    }

    if (WSAEventSelect(listenSocket, eventArray[0], FD_ACCEPT) == SOCKET_ERROR) {
        printf("WSAEventSelect failed: %d\n", WSAGetLastError());
        WSACloseEvent(eventArray[0]);
        closesocket(listenSocket);
        WSACleanup();
        return 1;
    }

    int nfds = 1;

    while (1) {
        // 等待事件发生
        DWORD eventIndex = WSAWaitForMultipleEvents(nfds, eventArray, FALSE, WSA_INFINITE, FALSE);
        if (eventIndex == WSA_WAIT_FAILED) {
            printf("WSAWaitForMultipleEvents failed: %d\n", WSAGetLastError());
            break;
        }

        int index = eventIndex - WSA_WAIT_EVENT_0;
        if (index == 0) {
            // 有新的客户端连接
            clientSocket = accept(listenSocket, (sockaddr *)&clientAddr, &clientAddrLen);
            if (clientSocket == INVALID_SOCKET) {
                printf("Accept failed: %d\n", WSAGetLastError());
                continue;
            }

            printf("New client connected: %d\n", clientSocket);

            // 为新客户端创建事件对象并注册事件
            eventArray[nfds] = WSACreateEvent();
            if (eventArray[nfds] == WSA_INVALID_EVENT) {
                printf("WSACreateEvent failed for client: %d\n", WSAGetLastError());
                closesocket(clientSocket);
                continue;
            }

            if (WSAEventSelect(clientSocket, eventArray[nfds], FD_READ | FD_WRITE) == SOCKET_ERROR) {
                printf("WSAEventSelect failed for client: %d\n", WSAGetLastError());
                WSACloseEvent(eventArray[nfds]);
                closesocket(clientSocket);
                continue;
            }

            nfds++;
        } else {
            // 处理客户端事件
            SOCKET client = (SOCKET)WSARecvEvent(eventArray[index]);
            if (WSAEnumNetworkEvents(client, eventArray[index], &networkEvents) == SOCKET_ERROR) {
                printf("WSAEnumNetworkEvents failed: %d\n", WSAGetLastError());
                continue;
            }

            if (networkEvents.lNetworkEvents & FD_READ) {
                char buffer[1024] = {0};
                int bytesRead = recv(client, buffer, sizeof(buffer), 0);
                if (bytesRead > 0) {
                    buffer[bytesRead] = '\0';
                    printf("Received from client %d: %s\n", client, buffer);

                    // 回显数据给客户端
                    send(client, buffer, strlen(buffer), 0);
                }
            }
        }
    }

    // 清理资源
    for (int i = 0; i < nfds; i++) {
        WSACloseEvent(eventArray[i]);
    }
    closesocket(listenSocket);
    WSACleanup();

    return 0;
}

四、poll机制的优化

(一)优化策略

  1. 减少文件描述符数量:尽量减少需要监视的文件描述符数量,避免不必要的资源消耗。可以通过合理设计应用程序架构,将一些不需要实时监视的文件描述符在需要时再进行添加。
  2. 优化事件处理逻辑:在事件发生后,尽快处理相关事件,避免在事件处理过程中进行长时间的阻塞操作,以免影响其他事件的处理。
  3. 合理设置超时时间:根据应用程序的需求,合理设置 poll 的超时时间。如果设置过长,可能导致应用程序在某些情况下响应不及时;如果设置过短,可能会频繁地进行无效的轮询。

(二)性能测试与分析

  1. 测试工具:可以使用一些性能测试工具,如 iperfab 等,对基于 poll 机制的网络应用程序进行性能测试。通过模拟大量并发连接,测量应用程序的吞吐量、延迟等性能指标。
  2. 分析方法:通过分析性能测试结果,找出性能瓶颈所在。例如,如果发现 poll 调用的返回时间过长,可以检查是否存在大量无效的文件描述符,或者事件处理逻辑是否过于复杂。

(三)示例优化代码

以下是对前面 Linux 示例代码的一些简单优化,比如在处理客户端数据时,采用非阻塞 I/O 方式,以避免长时间阻塞:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <poll.h>
#include <fcntl.h>

#define PORT 8080
#define MAX_CLIENTS 10

int main() {
    int server_fd, new_socket, valread;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);
    struct pollfd fds[MAX_CLIENTS + 1];
    char buffer[1024] = {0};

    // 创建套接字
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }

    // 设置套接字选项
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
        perror("setsockopt");
        exit(EXIT_FAILURE);
    }

    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);

    // 绑定套接字
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }

    // 监听套接字
    if (listen(server_fd, 3) < 0) {
        perror("listen");
        exit(EXIT_FAILURE);
    }

    // 设置服务器套接字为非阻塞
    int flags = fcntl(server_fd, F_GETFL, 0);
    fcntl(server_fd, F_SETFL, flags | O_NONBLOCK);

    // 初始化 pollfd 数组
    fds[0].fd = server_fd;
    fds[0].events = POLLIN;
    int nfds = 1;

    printf("Server is listening on port %d...\n", PORT);

    while (1) {
        int activity = poll(fds, nfds, -1);

        if (activity < 0) {
            perror("poll error");
            break;
        } else if (activity > 0) {
            if (fds[0].revents & POLLIN) {
                // 有新的客户端连接
                while ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t *)&addrlen)) >= 0) {
                    printf("New client connected: %d\n", new_socket);

                    // 设置客户端套接字为非阻塞
                    flags = fcntl(new_socket, F_GETFL, 0);
                    fcntl(new_socket, F_SETFL, flags | O_NONBLOCK);

                    // 将新的客户端套接字添加到 pollfd 数组
                    fds[nfds].fd = new_socket;
                    fds[nfds].events = POLLIN;
                    nfds++;
                }

                if (new_socket < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
                    perror("accept");
                }
            }

            // 处理客户端数据
            for (int i = 1; i < nfds; i++) {
                if (fds[i].revents & POLLIN) {
                    while ((valread = read(fds[i].fd, buffer, 1024)) > 0) {
                        buffer[valread] = '\0';
                        printf("Received from client %d: %s\n", fds[i].fd, buffer);

                        // 回显数据给客户端
                        send(fds[i].fd, buffer, strlen(buffer), 0);
                    }

                    if (valread < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
                        perror("read");
                        // 处理错误,例如关闭连接
                        close(fds[i].fd);
                        // 将该套接字从 pollfd 数组中移除
                        for (int j = i; j < nfds - 1; j++) {
                            fds[j] = fds[j + 1];
                        }
                        nfds--;
                        i--;
                    }
                }
            }
        }
    }

    close(server_fd);
    return 0;
}

通过上述优化,在处理大量并发连接时,应用程序能够更高效地利用系统资源,提高整体性能。在实际应用中,还需要根据具体的业务需求和系统环境,进一步调整优化策略,以达到最佳的性能表现。