MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

C++ 异步多路复用

2022-06-297.5k 阅读

C++ 异步多路复用概述

在现代计算机编程中,高效处理多个并发任务是提升程序性能的关键。C++ 异步多路复用技术为此提供了有力的支持。异步多路复用允许程序在单线程或多线程环境下,同时监控多个文件描述符(例如套接字、管道等)的状态变化,而无需为每个文件描述符创建单独的线程,从而大大提高了资源利用率。

多路复用基本概念

多路复用的核心思想是通过一个机制,让程序能够同时处理多个 I/O 事件。传统的 I/O 模型中,程序往往是阻塞式的,即当进行 I/O 操作(如读取套接字数据)时,线程会被阻塞,直到操作完成。这在需要处理多个 I/O 源时效率极低。多路复用技术则打破了这种局限,它可以同时监听多个 I/O 源,当其中任何一个 I/O 源有数据可读或可写时,多路复用机制会通知程序,程序再对相应的 I/O 源进行操作。

异步操作概念

异步操作是与同步操作相对的概念。在同步操作中,程序执行到 I/O 操作时,会等待操作完成后才继续执行后续代码。而异步操作则允许程序在发起 I/O 操作后,无需等待操作完成,继续执行其他代码。当 I/O 操作完成时,通过回调函数、事件通知等机制告知程序。异步操作结合多路复用,使得程序能够在单线程或少量线程内高效处理大量并发 I/O 任务。

C++ 中常用的异步多路复用机制

select 函数

select 是最古老的多路复用函数之一,它在 <sys/select.h> 头文件中定义(在 Windows 系统上,winsock2.h 中也有类似功能的函数)。select 函数允许程序监控多个文件描述符的可读、可写或异常状态。

#include <iostream>
#include <sys/select.h>
#include <unistd.h>
#include <fcntl.h>

int main() {
    fd_set read_fds;
    FD_ZERO(&read_fds);
    int fd = open("test.txt", O_RDONLY);
    if (fd == -1) {
        perror("open");
        return 1;
    }
    FD_SET(fd, &read_fds);

    struct timeval timeout;
    timeout.tv_sec = 5;
    timeout.tv_usec = 0;

    int activity = select(fd + 1, &read_fds, NULL, NULL, &timeout);
    if (activity == -1) {
        perror("select");
    } else if (activity) {
        char buffer[1024];
        ssize_t bytes_read = read(fd, buffer, sizeof(buffer));
        if (bytes_read == -1) {
            perror("read");
        } else {
            buffer[bytes_read] = '\0';
            std::cout << "Read data: " << buffer << std::endl;
        }
    } else {
        std::cout << "Timeout occurred" << std::endl;
    }
    close(fd);
    return 0;
}

在上述代码中,首先使用 FD_ZERO 初始化一个文件描述符集合 read_fds,然后使用 FD_SET 将需要监控的文件描述符(这里是打开的文件 test.txt)添加到集合中。select 函数的第一个参数是需要监控的文件描述符中的最大值加 1,后三个参数分别是可读、可写和异常的文件描述符集合,这里我们只关注可读,所以后两个参数设为 NULLtimeout 结构体设置了 select 等待的超时时间。如果 select 返回值为 -1,表示发生错误;如果返回值大于 0,表示有文件描述符就绪;如果返回值为 0,表示超时。

select 函数的优点是跨平台性较好,几乎所有操作系统都支持。然而,它也存在一些缺点。首先,select 所能监控的文件描述符数量有限,通常在 1024 个左右。其次,每次调用 select 时,都需要将文件描述符集合从用户空间复制到内核空间,并且每次返回后都需要重新设置文件描述符集合,这在一定程度上增加了开销。

poll 函数

poll 函数定义在 <poll.h> 头文件中,它也是一种多路复用机制。与 select 不同,poll 使用一个 pollfd 结构体数组来表示需要监控的文件描述符集合。

#include <iostream>
#include <poll.h>
#include <unistd.h>
#include <fcntl.h>

int main() {
    int fd = open("test.txt", O_RDONLY);
    if (fd == -1) {
        perror("open");
        return 1;
    }

    struct pollfd fds[1];
    fds[0].fd = fd;
    fds[0].events = POLLIN;

    int poll_result = poll(fds, 1, 5000);
    if (poll_result == -1) {
        perror("poll");
    } else if (poll_result > 0) {
        if (fds[0].revents & POLLIN) {
            char buffer[1024];
            ssize_t bytes_read = read(fd, buffer, sizeof(buffer));
            if (bytes_read == -1) {
                perror("read");
            } else {
                buffer[bytes_read] = '\0';
                std::cout << "Read data: " << buffer << std::endl;
            }
        }
    } else {
        std::cout << "Timeout occurred" << std::endl;
    }
    close(fd);
    return 0;
}

在上述代码中,首先定义了一个 pollfd 结构体数组 fds,并将需要监控的文件描述符 fd 及其事件(这里是 POLLIN,表示可读事件)设置到数组中。poll 函数的第一个参数是 pollfd 结构体数组,第二个参数是数组元素的个数,第三个参数是超时时间(单位为毫秒)。如果 poll 返回值为 -1,表示发生错误;如果返回值大于 0,表示有文件描述符就绪;如果返回值为 0,表示超时。

poll 相对于 select 的优点是没有文件描述符数量的限制,并且每次调用 poll 时不需要重新设置文件描述符集合,因为它是基于结构体数组的方式。但是,poll 仍然需要将结构体数组从用户空间复制到内核空间,随着监控的文件描述符数量增多,开销也会增大。

epoll 机制

epoll 是 Linux 特有的高性能多路复用机制,定义在 <sys/epoll.h> 头文件中。epoll 采用事件驱动的方式,通过 epoll_create 创建一个 epoll 实例,通过 epoll_ctl 来添加、修改或删除需要监控的文件描述符及其事件,通过 epoll_wait 来等待事件发生。

#include <iostream>
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>

const int MAX_EVENTS = 10;

int main() {
    int epoll_fd = epoll_create1(0);
    if (epoll_fd == -1) {
        perror("epoll_create1");
        return 1;
    }

    int fd = open("test.txt", O_RDONLY);
    if (fd == -1) {
        perror("open");
        close(epoll_fd);
        return 1;
    }

    struct epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN;

    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
        perror("epoll_ctl: add");
        close(fd);
        close(epoll_fd);
        return 1;
    }

    struct epoll_event events[MAX_EVENTS];
    int num_events = epoll_wait(epoll_fd, events, MAX_EVENTS, 5000);
    if (num_events == -1) {
        perror("epoll_wait");
    } else {
        for (int i = 0; i < num_events; ++i) {
            if (events[i].events & EPOLLIN) {
                int ready_fd = events[i].data.fd;
                char buffer[1024];
                ssize_t bytes_read = read(ready_fd, buffer, sizeof(buffer));
                if (bytes_read == -1) {
                    perror("read");
                } else {
                    buffer[bytes_read] = '\0';
                    std::cout << "Read data: " << buffer << std::endl;
                }
            }
        }
    }
    close(fd);
    close(epoll_fd);
    return 0;
}

在上述代码中,首先使用 epoll_create1 创建一个 epoll 实例,参数 0 表示使用默认属性。然后打开文件 test.txt,并将其添加到 epoll 实例中,设置监控事件为 EPOLLIN(可读事件)。接着使用 epoll_wait 等待事件发生,epoll_wait 的第一个参数是 epoll 实例的文件描述符,第二个参数是用于存储发生事件的 epoll_event 结构体数组,第三个参数是数组的大小,第四个参数是超时时间(单位为毫秒)。如果 epoll_wait 返回值为 -1,表示发生错误;如果返回值大于 0,表示有事件发生,通过遍历 events 数组可以获取就绪的文件描述符及其事件。

epoll 的优点非常显著。它在内核中维护一个事件表,不需要每次都将文件描述符集合从用户空间复制到内核空间,大大减少了开销。并且 epoll 采用事件驱动的方式,只有当监控的文件描述符有事件发生时才会通知程序,而不像 selectpoll 那样需要遍历所有监控的文件描述符。这使得 epoll 在处理大量并发连接时性能远优于 selectpoll

C++ 异步多路复用在网络编程中的应用

基于 epoll 的简单 TCP 服务器

在网络编程中,异步多路复用技术常用于实现高性能的服务器。以下是一个基于 epoll 的简单 TCP 服务器示例。

#include <iostream>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <cstring>

const int MAX_EVENTS = 10;
const int BUFFER_SIZE = 1024;

void set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) {
        perror("fcntl get flags");
        return;
    }
    flags |= O_NONBLOCK;
    if (fcntl(fd, F_SETFL, flags) == -1) {
        perror("fcntl set non - blocking");
    }
}

int main() {
    int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (listen_fd == -1) {
        perror("socket");
        return 1;
    }

    sockaddr_in server_addr;
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(8888);
    server_addr.sin_addr.s_addr = INADDR_ANY;

    if (bind(listen_fd, (sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
        perror("bind");
        close(listen_fd);
        return 1;
    }

    if (listen(listen_fd, 5) == -1) {
        perror("listen");
        close(listen_fd);
        return 1;
    }

    int epoll_fd = epoll_create1(0);
    if (epoll_fd == -1) {
        perror("epoll_create1");
        close(listen_fd);
        return 1;
    }

    struct epoll_event event;
    event.data.fd = listen_fd;
    event.events = EPOLLIN;

    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &event) == -1) {
        perror("epoll_ctl: add listen fd");
        close(listen_fd);
        close(epoll_fd);
        return 1;
    }

    struct epoll_event events[MAX_EVENTS];
    while (true) {
        int num_events = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        if (num_events == -1) {
            perror("epoll_wait");
            break;
        }

        for (int i = 0; i < num_events; ++i) {
            if (events[i].data.fd == listen_fd) {
                sockaddr_in client_addr;
                socklen_t client_addr_len = sizeof(client_addr);
                int client_fd = accept(listen_fd, (sockaddr*)&client_addr, &client_addr_len);
                if (client_fd == -1) {
                    perror("accept");
                    continue;
                }
                set_nonblocking(client_fd);

                event.data.fd = client_fd;
                event.events = EPOLLIN | EPOLLET;
                if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &event) == -1) {
                    perror("epoll_ctl: add client fd");
                    close(client_fd);
                }
            } else {
                int client_fd = events[i].data.fd;
                char buffer[BUFFER_SIZE];
                ssize_t bytes_read = recv(client_fd, buffer, sizeof(buffer), 0);
                if (bytes_read == -1) {
                    if (errno == EAGAIN || errno == EWOULDBLOCK) {
                        continue;
                    }
                    perror("recv");
                    epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, nullptr);
                    close(client_fd);
                } else if (bytes_read == 0) {
                    epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, nullptr);
                    close(client_fd);
                } else {
                    buffer[bytes_read] = '\0';
                    std::cout << "Received from client: " << buffer << std::endl;
                    std::string response = "Message received: " + std::string(buffer);
                    send(client_fd, response.c_str(), response.size(), 0);
                }
            }
        }
    }
    close(listen_fd);
    close(epoll_fd);
    return 0;
}

在上述代码中,首先创建一个 TCP 套接字并绑定到本地地址和端口 8888,然后开始监听连接。接着创建一个 epoll 实例,并将监听套接字添加到 epoll 实例中,设置监听可读事件。在 epoll_wait 的循环中,当有事件发生时,如果是监听套接字的可读事件,表示有新的客户端连接,调用 accept 接受连接,并将新的客户端套接字设置为非阻塞模式,然后将其添加到 epoll 实例中,设置监听可读事件(这里使用了边缘触发模式 EPOLLET)。如果是客户端套接字的可读事件,读取客户端发送的数据,并回显一条包含接收到数据的响应消息。如果读取时发生错误且错误不是 EAGAINEWOULDBLOCK,或者读取到的数据长度为 0(表示客户端关闭连接),则从 epoll 实例中删除该客户端套接字并关闭。

异步多路复用与线程池结合

在实际应用中,为了进一步提高性能,可以将异步多路复用与线程池结合。线程池可以预先创建一定数量的线程,当有任务(如处理客户端请求)时,将任务分配给线程池中的线程,避免了频繁创建和销毁线程的开销。以下是一个简单的线程池与 epoll 结合的示例框架。

#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <sys/epoll.h>

const int MAX_EVENTS = 10;
const int BUFFER_SIZE = 1024;

class ThreadPool {
public:
    ThreadPool(size_t num_threads) {
        for (size_t i = 0; i < num_threads; ++i) {
            threads.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock, [this] { return this->stop ||!this->tasks.empty(); });
                        if (this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }
                    task();
                }
            });
        }
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread& thread : threads) {
            thread.join();
        }
    }

    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
        using return_type = typename std::result_of<F(Args...)>::type;
        auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            if (stop) throw std::runtime_error("enqueue on stopped ThreadPool");
            tasks.emplace([task]() { (*task)(); });
        }
        condition.notify_one();
        return res;
    }

private:
    std::vector<std::thread> threads;
    std::queue<std::function<void()>> tasks;
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop = false;
};

void handle_client(int client_fd) {
    char buffer[BUFFER_SIZE];
    ssize_t bytes_read = recv(client_fd, buffer, sizeof(buffer), 0);
    if (bytes_read == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            return;
        }
        perror("recv");
    } else if (bytes_read == 0) {
        close(client_fd);
    } else {
        buffer[bytes_read] = '\0';
        std::cout << "Received from client: " << buffer << std::endl;
        std::string response = "Message received: " + std::string(buffer);
        send(client_fd, response.c_str(), response.size(), 0);
    }
}

int main() {
    ThreadPool pool(std::thread::hardware_concurrency());

    int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (listen_fd == -1) {
        perror("socket");
        return 1;
    }

    sockaddr_in server_addr;
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(8888);
    server_addr.sin_addr.s_addr = INADDR_ANY;

    if (bind(listen_fd, (sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
        perror("bind");
        close(listen_fd);
        return 1;
    }

    if (listen(listen_fd, 5) == -1) {
        perror("listen");
        close(listen_fd);
        return 1;
    }

    int epoll_fd = epoll_create1(0);
    if (epoll_fd == -1) {
        perror("epoll_create1");
        close(listen_fd);
        return 1;
    }

    struct epoll_event event;
    event.data.fd = listen_fd;
    event.events = EPOLLIN;

    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &event) == -1) {
        perror("epoll_ctl: add listen fd");
        close(listen_fd);
        close(epoll_fd);
        return 1;
    }

    struct epoll_event events[MAX_EVENTS];
    while (true) {
        int num_events = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        if (num_events == -1) {
            perror("epoll_wait");
            break;
        }

        for (int i = 0; i < num_events; ++i) {
            if (events[i].data.fd == listen_fd) {
                sockaddr_in client_addr;
                socklen_t client_addr_len = sizeof(client_addr);
                int client_fd = accept(listen_fd, (sockaddr*)&client_addr, &client_addr_len);
                if (client_fd == -1) {
                    perror("accept");
                    continue;
                }
                pool.enqueue([client_fd] { handle_client(client_fd); });
            }
        }
    }
    close(listen_fd);
    close(epoll_fd);
    return 0;
}

在上述代码中,ThreadPool 类实现了一个简单的线程池。ThreadPool 的构造函数创建一定数量的线程,这些线程在后台运行,等待任务。enqueue 方法用于将任务添加到任务队列中,并通知一个线程来执行该任务。handle_client 函数用于处理客户端的请求,从客户端读取数据并回显响应。在 main 函数中,创建了一个线程池,并结合 epoll 实现了一个 TCP 服务器。当有新的客户端连接时,将处理客户端请求的任务添加到线程池中,由线程池中的线程来处理,从而实现了异步处理客户端请求,提高了服务器的并发处理能力。

总结异步多路复用的优势与应用场景

优势

  1. 高效资源利用:异步多路复用避免了为每个 I/O 源创建单独线程的开销,通过单线程或少量线程监控多个文件描述符,大大提高了资源利用率。例如在高并发的网络服务器中,使用 epoll 可以在单线程内处理大量客户端连接,减少了线程上下文切换的开销。
  2. 高性能:像 epoll 这样的机制采用事件驱动,只有当文件描述符有事件发生时才通知程序,相比 selectpoll 遍历所有监控文件描述符的方式,性能有显著提升。在处理大量并发连接时,epoll 的优势尤为明显。
  3. 灵活性:异步多路复用可以与其他编程模型(如线程池、回调函数等)结合使用,根据不同的应用场景进行灵活配置。例如在需要大量计算的场景下,可以结合线程池,将 I/O 操作与计算任务分离,提高整体性能。

应用场景

  1. 网络服务器:无论是 HTTP 服务器、TCP 服务器还是 UDP 服务器,异步多路复用都是实现高性能的关键技术。它可以同时处理大量客户端连接,提高服务器的并发处理能力。例如 Nginx 等高性能 Web 服务器就广泛使用了 epoll 机制。
  2. 分布式系统:在分布式系统中,节点之间需要进行大量的网络通信。异步多路复用可以高效处理这些通信,确保系统的稳定性和高性能。例如分布式数据库的节点间通信、分布式缓存系统等都可以应用异步多路复用技术。
  3. 实时应用:对于一些对实时性要求较高的应用,如实时监控系统、游戏服务器等,异步多路复用能够及时响应 I/O 事件,保证系统的实时性。例如游戏服务器需要实时处理大量玩家的网络请求,异步多路复用可以帮助服务器高效处理这些请求,提供流畅的游戏体验。

通过深入理解和应用 C++ 异步多路复用技术,开发者可以构建出高效、高性能的应用程序,满足现代计算机编程对于并发处理的需求。无论是在网络编程、分布式系统还是实时应用等领域,异步多路复用都有着广泛的应用前景。