MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

C 语言同步多路复用

2024-10-154.8k 阅读

C 语言同步多路复用基础概念

多路复用的定义

在计算机编程领域,多路复用(Multiplexing)是一种能够在单一通信信道上同时传输多个信号或数据流的技术。其目的在于通过高效利用有限的资源,提高系统的整体性能和效率。在 C 语言编程环境中,同步多路复用主要是指在一个程序中同时处理多个输入/输出(I/O)源,而不会让程序因为等待某一个 I/O 操作完成而阻塞其他操作。

同步与异步的区别

  1. 同步操作:同步 I/O 操作意味着程序执行到 I/O 操作时,会等待该操作完成后才继续执行后续代码。例如,当程序调用 read 函数从文件或套接字读取数据时,如果数据尚未准备好,程序将被阻塞,处于等待状态,直到数据可读。这就像是你在餐厅点菜,服务员会一直等你点完所有菜品后才离开去下单。

  2. 异步操作:而异步 I/O 操作则允许程序在发起 I/O 操作后,继续执行其他代码,而不需要等待操作完成。当 I/O 操作完成时,系统会通过某种方式通知程序(如信号、回调函数等)。这好比你在餐厅扫码点餐,点完后你可以继续做其他事情,当餐品准备好后服务员会通知你。

在同步多路复用中,虽然整体是同步处理多个 I/O 源,但它能在等待一个 I/O 操作时,检查其他 I/O 源是否有可操作的状态,从而提高了程序的并发性,而不是像传统同步 I/O 那样单一阻塞等待。

常用的同步多路复用机制

select 函数

  1. 原理select 函数是 C 语言中实现同步多路复用的经典方法。它通过监视一组文件描述符(fd_set),等待其中任何一个文件描述符变为可读、可写或出现异常。select 函数会阻塞调用进程,直到有文件描述符满足指定的条件,或者超时时间到达。

  2. 函数原型

#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
- `nfds`:需要检查的文件描述符集中最大文件描述符的值加 1。
- `readfds`:指向要检查可读性的文件描述符集的指针。
- `writefds`:指向要检查可写性的文件描述符集的指针。
- `exceptfds`:指向要检查异常情况的文件描述符集的指针。
- `timeout`:指向一个 `struct timeval` 结构体的指针,用于设置 `select` 等待的最长时间。如果设为 `NULL`,则 `select` 会一直阻塞,直到有文件描述符满足条件。

3. 示例代码

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <sys/select.h>

#define PORT 8080
#define MAX_CLIENTS 10

int main() {
    int server_fd, new_socket, valread;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);
    char buffer[1024] = {0};
    fd_set read_fds;
    fd_set tmp_fds;
    int activity, i, val;

    // 创建套接字
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }

    // 设置套接字选项
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
        perror("setsockopt");
        exit(EXIT_FAILURE);
    }

    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);

    // 绑定套接字
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }

    // 监听套接字
    if (listen(server_fd, MAX_CLIENTS) < 0) {
        perror("listen");
        exit(EXIT_FAILURE);
    }

    // 初始化文件描述符集
    FD_ZERO(&read_fds);
    FD_ZERO(&tmp_fds);
    FD_SET(server_fd, &read_fds);

    while (1) {
        // 备份文件描述符集
        tmp_fds = read_fds;

        // 等待文件描述符状态变化
        activity = select(server_fd + 1, &tmp_fds, NULL, NULL, NULL);

        if ((activity < 0) && (errno!= EINTR)) {
            printf("select error");
        } else if (activity > 0) {
            if (FD_ISSET(server_fd, &tmp_fds)) {
                // 有新连接
                if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t *)&addrlen)) < 0) {
                    perror("accept");
                    exit(EXIT_FAILURE);
                }

                // 将新连接的套接字添加到文件描述符集
                FD_SET(new_socket, &read_fds);
                printf("New connection, socket fd is %d, ip is : %s, port : %d \n", new_socket, inet_ntoa(address.sin_addr), ntohs(address.sin_port));
            }

            for (i = 0; i <= server_fd; i++) {
                if (FD_ISSET(i, &tmp_fds)) {
                    if (i!= server_fd) {
                        // 处理客户端数据
                        valread = read(i, buffer, 1024);
                        if (valread == 0) {
                            // 客户端关闭连接
                            getpeername(i, (struct sockaddr *)&address, (socklen_t *)&addrlen);
                            printf("Host disconnected, ip %s, port %d \n", inet_ntoa(address.sin_addr), ntohs(address.sin_port));
                            close(i);
                            FD_CLR(i, &read_fds);
                        } else {
                            buffer[valread] = '\0';
                            printf("Message from client %d : %s \n", i, buffer);
                        }
                    }
                }
            }
        }
    }
    return 0;
}

poll 函数

  1. 原理poll 函数也是一种同步多路复用机制,它通过一个 pollfd 结构体数组来监视一组文件描述符的状态变化。与 select 不同的是,poll 没有最大文件描述符数量的限制(在实际应用中,虽然理论上没有限制,但受系统资源等因素影响,数量也不能无限大),并且它的实现方式在某些情况下效率更高。

  2. 函数原型

#include <poll.h>

int poll(struct pollfd *fds, nfds_t nfds, int timeout);
- `fds`:指向一个 `struct pollfd` 结构体数组的指针,每个 `pollfd` 结构体包含要监视的文件描述符、要监视的事件以及返回的事件。
- `nfds`:`fds` 数组中的元素数量。
- `timeout`:等待事件发生的超时时间,单位为毫秒。如果设为 -1,则 `poll` 会一直阻塞,直到有事件发生;如果设为 0,则 `poll` 不会阻塞,立即返回。

3. pollfd 结构体

struct pollfd {
    int fd;         /* 文件描述符 */
    short events;   /* 要监视的事件 */
    short revents;  /* 返回的事件 */
};
- `events` 可以设置为各种事件掩码,如 `POLLIN`(可读事件)、`POLLOUT`(可写事件)、`POLLERR`(错误事件)等。
- `revents` 是函数返回时设置的,用于指示实际发生的事件。

4. 示例代码

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <poll.h>

#define PORT 8080
#define MAX_CLIENTS 10

int main() {
    int server_fd, new_socket, valread;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);
    char buffer[1024] = {0};
    struct pollfd fds[MAX_CLIENTS + 1];
    int nfds = 1;
    int activity, i, val;

    // 创建套接字
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }

    // 设置套接字选项
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
        perror("setsockopt");
        exit(EXIT_FAILURE);
    }

    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);

    // 绑定套接字
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }

    // 监听套接字
    if (listen(server_fd, MAX_CLIENTS) < 0) {
        perror("listen");
        exit(EXIT_FAILURE);
    }

    // 初始化 pollfd 数组
    fds[0].fd = server_fd;
    fds[0].events = POLLIN;

    while (1) {
        // 等待事件发生
        activity = poll(fds, nfds, -1);

        if (activity < 0) {
            printf("poll error");
        } else if (activity > 0) {
            if (fds[0].revents & POLLIN) {
                // 有新连接
                if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t *)&addrlen)) < 0) {
                    perror("accept");
                    exit(EXIT_FAILURE);
                }

                // 将新连接的套接字添加到 pollfd 数组
                fds[nfds].fd = new_socket;
                fds[nfds].events = POLLIN;
                nfds++;
                printf("New connection, socket fd is %d, ip is : %s, port : %d \n", new_socket, inet_ntoa(address.sin_addr), ntohs(address.sin_port));
            }

            for (i = 1; i < nfds; i++) {
                if (fds[i].revents & POLLIN) {
                    // 处理客户端数据
                    valread = read(fds[i].fd, buffer, 1024);
                    if (valread == 0) {
                        // 客户端关闭连接
                        getpeername(fds[i].fd, (struct sockaddr *)&address, (socklen_t *)&addrlen);
                        printf("Host disconnected, ip %s, port %d \n", inet_ntoa(address.sin_addr), ntohs(address.sin_port));
                        close(fds[i].fd);
                        for (int j = i; j < nfds - 1; j++) {
                            fds[j] = fds[j + 1];
                        }
                        nfds--;
                    } else {
                        buffer[valread] = '\0';
                        printf("Message from client %d : %s \n", fds[i].fd, buffer);
                    }
                }
            }
        }
    }
    return 0;
}

epoll 函数(仅适用于 Linux 系统)

  1. 原理epoll 是 Linux 内核为处理大量并发连接而优化的多路复用机制。它采用事件驱动的方式,通过一个文件描述符(epoll 实例)来管理一组需要监视的文件描述符。epoll 可以高效地处理大量的 I/O 事件,特别适用于高并发服务器场景。

  2. 相关函数

    • epoll_create:创建一个 epoll 实例,返回一个 epoll 文件描述符。
#include <sys/epoll.h>

int epoll_create(int size);

size 参数在 Linux 2.6.8 之前是有意义的,用于指定 epoll 实例能处理的最大事件数,但之后该参数被忽略,仅作占位符使用,通常设置为一个大于 0 的值。

- **epoll_ctl**:用于控制 `epoll` 实例,添加、修改或删除要监视的文件描述符及其事件。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epfdepoll_create 返回的 epoll 文件描述符;op 表示操作类型,如 EPOLL_CTL_ADD(添加)、EPOLL_CTL_MOD(修改)、EPOLL_CTL_DEL(删除);fd 是要操作的文件描述符;event 是一个指向 epoll_event 结构体的指针,用于指定要监视的事件和关联的数据。

- **epoll_wait**:等待 `epoll` 实例上的事件发生,返回发生事件的文件描述符数量。
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

epfdepoll 文件描述符;events 是一个 epoll_event 结构体数组,用于存储发生的事件;maxeventsevents 数组的大小;timeout 是等待的超时时间,单位为毫秒,设为 -1 表示一直阻塞,设为 0 表示立即返回。

  1. epoll_event 结构体
struct epoll_event {
    uint32_t events;      /* 事件掩码 */
    epoll_data_t data;    /* 关联的数据 */
};

typedef union epoll_data {
    void *ptr;
    int fd;
    uint32_t u32;
    uint64_t u64;
} epoll_data_t;

events 可以设置为各种事件掩码,如 EPOLLIN(可读事件)、EPOLLOUT(可写事件)、EPOLLERR(错误事件)等。data 联合体可以用于关联文件描述符或自定义数据。

  1. 示例代码
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>

#define PORT 8080
#define MAX_EVENTS 10

int main() {
    int server_fd, new_socket, valread;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);
    char buffer[1024] = {0};
    int epoll_fd, nfds, i;
    struct epoll_event event;
    struct epoll_event events[MAX_EVENTS];

    // 创建套接字
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }

    // 设置套接字选项
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
        perror("setsockopt");
        exit(EXIT_FAILURE);
    }

    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);

    // 绑定套接字
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }

    // 监听套接字
    if (listen(server_fd, MAX_EVENTS) < 0) {
        perror("listen");
        exit(EXIT_FAILURE);
    }

    // 创建 epoll 实例
    epoll_fd = epoll_create1(0);
    if (epoll_fd == -1) {
        perror("epoll_create1");
        exit(EXIT_FAILURE);
    }

    // 将服务器套接字添加到 epoll 实例
    event.data.fd = server_fd;
    event.events = EPOLLIN;
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &event) == -1) {
        perror("epoll_ctl: server_fd");
        exit(EXIT_FAILURE);
    }

    while (1) {
        // 等待事件发生
        nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        if (nfds == -1) {
            perror("epoll_wait");
            exit(EXIT_FAILURE);
        }

        for (i = 0; i < nfds; i++) {
            if (events[i].data.fd == server_fd) {
                // 有新连接
                if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t *)&addrlen)) == -1) {
                    perror("accept");
                    continue;
                }

                // 将新连接的套接字添加到 epoll 实例
                event.data.fd = new_socket;
                event.events = EPOLLIN;
                if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_socket, &event) == -1) {
                    perror("epoll_ctl: new_socket");
                    close(new_socket);
                }
            } else {
                // 处理客户端数据
                new_socket = events[i].data.fd;
                valread = read(new_socket, buffer, 1024);
                if (valread == 0) {
                    // 客户端关闭连接
                    if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, new_socket, NULL) == -1) {
                        perror("epoll_ctl: del");
                    }
                    close(new_socket);
                } else {
                    buffer[valread] = '\0';
                    printf("Message from client %d : %s \n", new_socket, buffer);
                }
            }
        }
    }
    return 0;
}

同步多路复用机制的性能比较与选择

性能比较

  1. selectselect 函数的优点是跨平台兼容性好,几乎在所有操作系统上都有实现。然而,它存在一些性能瓶颈。首先,select 对文件描述符的数量有限制,通常为 1024 个(在不同系统上可能有所不同)。其次,每次调用 select 时,都需要将文件描述符集从用户空间复制到内核空间,并且返回时还需要从内核空间复制回用户空间,这在处理大量文件描述符时会带来较大的开销。此外,select 使用线性扫描的方式检查文件描述符集,时间复杂度为 O(n),随着文件描述符数量的增加,性能会显著下降。

  2. pollpoll 函数解决了 select 中文件描述符数量的限制问题,理论上可以处理任意数量的文件描述符。它同样需要在每次调用时将数据从用户空间复制到内核空间,但由于其数据结构的设计,在某些情况下性能比 select 略好。poll 也是采用线性扫描的方式检查事件,时间复杂度同样为 O(n),在处理大量文件描述符时性能提升有限。

  3. epollepoll 是 Linux 系统下专为高并发设计的多路复用机制,性能优势明显。它采用事件驱动的方式,在内核中维护一个事件表,当有事件发生时,内核直接将事件添加到这个表中,epoll_wait 函数只需从这个表中获取事件,无需像 selectpoll 那样进行线性扫描,时间复杂度为 O(1)。此外,epoll 采用共享内存的方式,避免了频繁地在用户空间和内核空间之间复制数据,大大提高了效率。在处理大量并发连接时,epoll 的性能远远优于 selectpoll

选择建议

  1. 跨平台需求:如果你的程序需要在多种操作系统上运行,并且对文件描述符数量要求不高(通常小于 1024),那么 select 是一个不错的选择,因为它具有良好的跨平台兼容性。

  2. 性能要求一般:如果对性能有一定要求,但不需要处理极其大量的并发连接,并且希望在类 Unix 系统上有更好的表现,poll 可以作为 select 的替代方案,它在一定程度上解决了文件描述符数量限制的问题,并且性能略好于 select

  3. 高并发场景(Linux 系统):当你的程序运行在 Linux 系统上,并且需要处理大量的并发连接(如高性能网络服务器),epoll 无疑是最佳选择。它的高性能和事件驱动的设计能够有效地处理大量并发 I/O 操作,大大提高系统的整体性能和响应能力。

在实际应用中,还需要根据具体的业务需求、系统资源等因素综合考虑选择合适的同步多路复用机制,以达到最优的性能和效率。同时,对于多路复用机制的使用,还需要注意错误处理、资源管理等方面,确保程序的稳定性和可靠性。

同步多路复用在实际项目中的应用场景

网络服务器开发

  1. HTTP 服务器:在构建 HTTP 服务器时,需要同时处理多个客户端的连接请求。使用同步多路复用技术,如 epoll,可以高效地管理这些连接。服务器可以监听多个客户端套接字的可读事件,当有数据可读时,读取客户端发送的 HTTP 请求,并处理请求,然后将响应数据写回客户端。通过这种方式,服务器可以在不阻塞的情况下处理大量并发的 HTTP 请求,提高服务器的并发处理能力和响应速度。

  2. 即时通讯服务器:即时通讯服务器需要实时处理多个用户的消息收发。利用同步多路复用机制,服务器可以同时监听多个用户连接的套接字,当有新消息到达时,及时读取并转发给目标用户。例如,在一个多人聊天的场景中,服务器通过多路复用技术可以高效地管理所有用户的连接,确保消息能够及时准确地传递,为用户提供流畅的即时通讯体验。

文件 I/O 管理

  1. 日志系统:在一些大型应用程序的日志系统中,可能需要同时处理多个日志文件的写入操作。通过同步多路复用技术,可以监视多个文件描述符的可写状态。当某个日志文件缓冲区已满,需要写入磁盘时,程序可以检查对应的文件描述符是否可写。如果可写,则将缓冲区数据写入文件。这样可以提高日志写入的效率,避免因为等待某个文件的 I/O 操作而阻塞其他日志文件的处理。

  2. 数据采集与存储:在数据采集系统中,可能需要从多个数据源采集数据,并将数据存储到不同的文件或数据库中。使用同步多路复用技术,可以同时监听多个数据源的输入事件(如串口数据到达、网络数据接收等),以及存储目标的可写事件。当有数据从数据源到达时,及时读取并存储到相应的目标位置,实现高效的数据采集与存储过程。

设备驱动开发

  1. 串口通信:在与多个串口设备进行通信的应用中,同步多路复用可以用于监视多个串口设备的文件描述符。当某个串口有数据可读时,程序可以及时读取数据并进行处理。例如,在工业自动化控制领域,一个系统可能需要同时与多个串口设备(如传感器、执行器等)进行通信,通过同步多路复用技术,可以高效地管理这些串口连接,确保数据的及时传输和处理。

  2. 网络设备驱动:对于网络设备驱动开发,同步多路复用可以帮助驱动程序同时处理多个网络连接的 I/O 操作。例如,在网卡驱动中,可能需要同时接收和发送多个网络数据包。通过监视网络套接字的可读和可写事件,驱动程序可以在不阻塞的情况下高效地处理这些网络 I/O 操作,提高网络设备的性能和吞吐量。

在实际项目中,同步多路复用技术的应用能够显著提高系统的并发处理能力和资源利用率,适用于各种需要同时处理多个 I/O 源的场景。然而,在使用过程中,需要深入理解不同多路复用机制的特点和适用场景,结合项目的具体需求进行合理选择和优化,以确保系统的高效稳定运行。同时,还需要注意在代码实现中对错误处理、资源管理等方面进行细致的设计,避免出现内存泄漏、文件描述符泄漏等问题,提高程序的健壮性。