MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

非阻塞I/O模型下的Socket连接管理与维护

2021-07-132.2k 阅读

非阻塞I/O模型基础

在传统的阻塞I/O模型中,当执行I/O操作(如读取或写入Socket)时,程序会一直等待操作完成,期间无法执行其他任务。这在处理大量并发连接时会严重影响效率。而非阻塞I/O模型则不同,当执行I/O操作时,系统调用会立即返回。如果操作尚未就绪,系统调用会返回一个错误(如EWOULDBLOCKEAGAIN),程序可以继续执行其他任务,稍后再尝试操作。

以UNIX系统为例,在创建Socket时,可以通过fcntl函数将其设置为非阻塞模式:

#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>

int sockfd = socket(AF_INET, SOCK_STREAM, 0);
int flags = fcntl(sockfd, F_GETFL, 0);
fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);

在这个示例中,首先通过socket函数创建了一个TCP套接字。然后使用fcntl函数获取当前套接字的标志位,再通过fcntl函数将O_NONBLOCK标志位添加进去,从而将套接字设置为非阻塞模式。

Socket连接管理

连接建立

在非阻塞模式下建立Socket连接,connect系统调用会立即返回。如果连接尚未建立完成,它会返回一个错误(通常是EINPROGRESS)。应用程序需要通过其他方式(如selectpollepoll)来检测连接是否成功。

以下是一个简单的C语言示例:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <fcntl.h>
#include <errno.h>

#define PORT 8080
#define IP "127.0.0.1"

int main() {
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("Socket creation failed");
        exit(EXIT_FAILURE);
    }

    int flags = fcntl(sockfd, F_GETFL, 0);
    fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(PORT);
    servaddr.sin_addr.s_addr = inet_addr(IP);

    int ret = connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
    if (ret < 0 && errno != EINPROGRESS) {
        perror("Connect failed");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    fd_set write_fds;
    FD_ZERO(&write_fds);
    FD_SET(sockfd, &write_fds);

    struct timeval timeout;
    timeout.tv_sec = 5;
    timeout.tv_usec = 0;

    ret = select(sockfd + 1, NULL, &write_fds, NULL, &timeout);
    if (ret < 0) {
        perror("Select error");
        close(sockfd);
        exit(EXIT_FAILURE);
    } else if (ret == 0) {
        printf("Connection timed out\n");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    int error;
    socklen_t len = sizeof(error);
    if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
        perror("Getsockopt error");
        close(sockfd);
        exit(EXIT_FAILURE);
    }
    if (error != 0) {
        errno = error;
        perror("Connect error");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    printf("Connected successfully\n");
    close(sockfd);
    return 0;
}

在这个示例中,首先创建了一个非阻塞的Socket,然后调用connect尝试连接服务器。由于是非阻塞模式,connect可能不会立即完成连接。接着使用select函数等待套接字可写,这意味着连接已经建立或出现错误。通过getsockopt获取套接字的错误状态,如果没有错误则表示连接成功。

连接监听

在服务器端,监听新连接也需要在非阻塞模式下进行管理。通过将监听Socket设置为非阻塞,accept系统调用会立即返回,即使当前没有新的连接到达。

以下是一个简单的非阻塞监听示例:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <fcntl.h>

#define PORT 8080
#define BACKLOG 10

int main() {
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd < 0) {
        perror("Socket creation failed");
        exit(EXIT_FAILURE);
    }

    int flags = fcntl(listenfd, F_GETFL, 0);
    fcntl(listenfd, F_SETFL, flags | O_NONBLOCK);

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(PORT);
    servaddr.sin_addr.s_addr = INADDR_ANY;

    if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
        perror("Bind failed");
        close(listenfd);
        exit(EXIT_FAILURE);
    }

    if (listen(listenfd, BACKLOG) < 0) {
        perror("Listen failed");
        close(listenfd);
        exit(EXIT_FAILURE);
    }

    while (1) {
        int connfd = accept(listenfd, NULL, NULL);
        if (connfd < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                // 没有新连接,继续执行其他任务
                continue;
            } else {
                perror("Accept error");
                break;
            }
        }

        // 处理新连接
        printf("New connection accepted: %d\n", connfd);
        close(connfd);
    }

    close(listenfd);
    return 0;
}

在这个示例中,创建了一个非阻塞的监听Socket,并绑定到指定端口。在while循环中,调用accept接受新连接。如果accept返回错误且错误码为EAGAINEWOULDBLOCK,表示当前没有新连接,程序可以继续执行其他任务。否则,处理新连接。

数据读写

非阻塞读

在非阻塞模式下读取数据时,read系统调用会立即返回。如果当前没有数据可读,它会返回EWOULDBLOCKEAGAIN错误。应用程序需要在合适的时机再次尝试读取。

以下是一个简单的非阻塞读示例:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <fcntl.h>

#define BUFFER_SIZE 1024

int main() {
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("Socket creation failed");
        exit(EXIT_FAILURE);
    }

    int flags = fcntl(sockfd, F_GETFL, 0);
    fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);

    // 假设已经连接到服务器

    char buffer[BUFFER_SIZE];
    ssize_t bytes_read;
    while (1) {
        bytes_read = read(sockfd, buffer, sizeof(buffer));
        if (bytes_read < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                // 没有数据可读,继续执行其他任务
                continue;
            } else {
                perror("Read error");
                break;
            }
        } else if (bytes_read == 0) {
            // 对方关闭了连接
            printf("Connection closed by peer\n");
            break;
        } else {
            buffer[bytes_read] = '\0';
            printf("Received: %s\n", buffer);
        }
    }

    close(sockfd);
    return 0;
}

在这个示例中,使用read函数从Socket读取数据。如果返回错误且错误码为EAGAINEWOULDBLOCK,表示当前没有数据可读,程序继续执行其他任务。如果返回0,表示对方关闭了连接。否则,处理接收到的数据。

非阻塞写

非阻塞写与非阻塞读类似,write系统调用会立即返回。如果当前Socket缓冲区已满,它会返回EWOULDBLOCKEAGAIN错误。应用程序需要再次尝试写入。

以下是一个简单的非阻塞写示例:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <fcntl.h>

#define MESSAGE "Hello, server!"

int main() {
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("Socket creation failed");
        exit(EXIT_FAILURE);
    }

    int flags = fcntl(sockfd, F_GETFL, 0);
    fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);

    // 假设已经连接到服务器

    ssize_t bytes_written;
    size_t total_bytes = strlen(MESSAGE);
    size_t bytes_to_write = total_bytes;
    const char *ptr = MESSAGE;

    while (bytes_to_write > 0) {
        bytes_written = write(sockfd, ptr, bytes_to_write);
        if (bytes_written < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                // 缓冲区已满,稍后再试
                continue;
            } else {
                perror("Write error");
                break;
            }
        } else {
            bytes_to_write -= bytes_written;
            ptr += bytes_written;
        }
    }

    if (bytes_to_write == 0) {
        printf("Message sent successfully\n");
    }

    close(sockfd);
    return 0;
}

在这个示例中,尝试将消息写入Socket。如果write返回错误且错误码为EAGAINEWOULDBLOCK,表示缓冲区已满,程序稍后再试。否则,更新已写入的字节数和指针,继续写入剩余数据。

连接维护

心跳机制

在长时间的Socket连接中,为了确保连接的有效性,通常会引入心跳机制。心跳机制通过定期发送和接收特定的心跳消息来检测连接是否仍然存活。

以下是一个简单的心跳示例(以Python为例):

import socket
import time

HEARTBEAT_INTERVAL = 5  # 心跳间隔时间,单位:秒
HEARTBEAT_MESSAGE = b'HEARTBEAT'

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.settimeout(10)
sock.connect(('127.0.0.1', 8080))

last_heartbeat_time = time.time()
while True:
    try:
        if time.time() - last_heartbeat_time >= HEARTBEAT_INTERVAL:
            sock.sendall(HEARTBEAT_MESSAGE)
            last_heartbeat_time = time.time()
            print('Sent heartbeat')

        data = sock.recv(1024)
        if not data:
            print('Connection closed by peer')
            break
        elif data == HEARTBEAT_MESSAGE:
            print('Received heartbeat response')
        else:
            print('Received data:', data.decode('utf - 8'))
    except socket.timeout:
        print('Heartbeat timeout')
        break
    except socket.error as e:
        print('Socket error:', e)
        break

sock.close()

在这个示例中,客户端每隔HEARTBEAT_INTERVAL秒发送一次心跳消息HEARTBEAT_MESSAGE。如果接收到心跳响应,打印相应信息。如果出现超时或其他Socket错误,关闭连接。

错误处理与重连

在Socket连接过程中,可能会遇到各种错误,如网络中断、连接超时等。对于这些错误,应用程序需要进行适当的处理,例如尝试重连。

以下是一个简单的C语言重连示例:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <fcntl.h>
#include <errno.h>

#define PORT 8080
#define IP "127.0.0.1"
#define RECONNECT_INTERVAL 5  // 重连间隔时间,单位:秒

int main() {
    int sockfd;
    while (1) {
        sockfd = socket(AF_INET, SOCK_STREAM, 0);
        if (sockfd < 0) {
            perror("Socket creation failed");
            sleep(RECONNECT_INTERVAL);
            continue;
        }

        int flags = fcntl(sockfd, F_GETFL, 0);
        fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);

        struct sockaddr_in servaddr;
        memset(&servaddr, 0, sizeof(servaddr));
        servaddr.sin_family = AF_INET;
        servaddr.sin_port = htons(PORT);
        servaddr.sin_addr.s_addr = inet_addr(IP);

        int ret = connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
        if (ret < 0) {
            if (errno != EINPROGRESS) {
                perror("Connect failed");
                close(sockfd);
                sleep(RECONNECT_INTERVAL);
                continue;
            }
        } else {
            printf("Connected successfully\n");
            // 进行数据读写等操作
            close(sockfd);
            break;
        }
    }

    return 0;
}

在这个示例中,如果创建Socket或连接失败,程序会等待RECONNECT_INTERVAL秒后再次尝试,直到连接成功。

高级话题:多路复用

select

select是一种多路复用技术,它允许应用程序同时监视多个文件描述符(如Socket)的状态变化。select函数接受三个文件描述符集合(读集合、写集合和异常集合),以及一个超时时间。

以下是一个简单的select示例:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <fcntl.h>
#include <sys/select.h>

#define PORT 8080
#define IP "127.0.0.1"
#define BUFFER_SIZE 1024

int main() {
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("Socket creation failed");
        exit(EXIT_FAILURE);
    }

    int flags = fcntl(sockfd, F_GETFL, 0);
    fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(PORT);
    servaddr.sin_addr.s_addr = inet_addr(IP);

    int ret = connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
    if (ret < 0 && errno != EINPROGRESS) {
        perror("Connect failed");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    fd_set read_fds;
    FD_ZERO(&read_fds);
    FD_SET(sockfd, &read_fds);

    struct timeval timeout;
    timeout.tv_sec = 10;
    timeout.tv_usec = 0;

    char buffer[BUFFER_SIZE];
    while (1) {
        fd_set tmp_fds = read_fds;
        ret = select(sockfd + 1, &tmp_fds, NULL, NULL, &timeout);
        if (ret < 0) {
            perror("Select error");
            break;
        } else if (ret == 0) {
            printf("Select timeout\n");
            continue;
        } else {
            if (FD_ISSET(sockfd, &tmp_fds)) {
                ssize_t bytes_read = read(sockfd, buffer, sizeof(buffer));
                if (bytes_read < 0) {
                    if (errno != EAGAIN && errno != EWOULDBLOCK) {
                        perror("Read error");
                        break;
                    }
                } else if (bytes_read == 0) {
                    printf("Connection closed by peer\n");
                    break;
                } else {
                    buffer[bytes_read] = '\0';
                    printf("Received: %s\n", buffer);
                }
            }
        }
    }

    close(sockfd);
    return 0;
}

在这个示例中,使用select监视Socket的可读状态。如果select返回,表示Socket有数据可读,进行读取操作。

poll

pollselect类似,但它使用pollfd结构体数组来表示需要监视的文件描述符及其事件。poll没有文件描述符数量的限制(相比selectFD_SETSIZE限制)。

以下是一个简单的poll示例:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <fcntl.h>
#include <poll.h>

#define PORT 8080
#define IP "127.0.0.1"
#define BUFFER_SIZE 1024

int main() {
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("Socket creation failed");
        exit(EXIT_FAILURE);
    }

    int flags = fcntl(sockfd, F_GETFL, 0);
    fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(PORT);
    servaddr.sin_addr.s_addr = inet_addr(IP);

    int ret = connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
    if (ret < 0 && errno != EINPROGRESS) {
        perror("Connect failed");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    struct pollfd fds[1];
    fds[0].fd = sockfd;
    fds[0].events = POLLIN;

    char buffer[BUFFER_SIZE];
    while (1) {
        ret = poll(fds, 1, 10000);
        if (ret < 0) {
            perror("Poll error");
            break;
        } else if (ret == 0) {
            printf("Poll timeout\n");
            continue;
        } else {
            if (fds[0].revents & POLLIN) {
                ssize_t bytes_read = read(sockfd, buffer, sizeof(buffer));
                if (bytes_read < 0) {
                    if (errno != EAGAIN && errno != EWOULDBLOCK) {
                        perror("Read error");
                        break;
                    }
                } else if (bytes_read == 0) {
                    printf("Connection closed by peer\n");
                    break;
                } else {
                    buffer[bytes_read] = '\0';
                    printf("Received: %s\n", buffer);
                }
            }
        }
    }

    close(sockfd);
    return 0;
}

在这个示例中,使用poll结构体数组来监视Socket的可读事件。如果poll返回,表示Socket有可读事件,进行读取操作。

epoll

epoll是Linux特有的多路复用技术,它在处理大量并发连接时具有更高的效率。epoll使用红黑树来管理文件描述符,并且通过回调机制来通知应用程序有事件发生。

以下是一个简单的epoll示例:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <fcntl.h>
#include <sys/epoll.h>

#define PORT 8080
#define IP "127.0.0.1"
#define MAX_EVENTS 10
#define BUFFER_SIZE 1024

int main() {
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("Socket creation failed");
        exit(EXIT_FAILURE);
    }

    int flags = fcntl(sockfd, F_GETFL, 0);
    fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(PORT);
    servaddr.sin_addr.s_addr = inet_addr(IP);

    int ret = connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
    if (ret < 0 && errno != EINPROGRESS) {
        perror("Connect failed");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    int epollfd = epoll_create1(0);
    if (epollfd < 0) {
        perror("Epoll create failed");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    struct epoll_event event;
    event.data.fd = sockfd;
    event.events = EPOLLIN;

    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &event) < 0) {
        perror("Epoll ctl add failed");
        close(sockfd);
        close(epollfd);
        exit(EXIT_FAILURE);
    }

    struct epoll_event events[MAX_EVENTS];
    char buffer[BUFFER_SIZE];
    while (1) {
        int num_events = epoll_wait(epollfd, events, MAX_EVENTS, -1);
        if (num_events < 0) {
            perror("Epoll wait error");
            break;
        }

        for (int i = 0; i < num_events; ++i) {
            if (events[i].data.fd == sockfd && (events[i].events & EPOLLIN)) {
                ssize_t bytes_read = read(sockfd, buffer, sizeof(buffer));
                if (bytes_read < 0) {
                    if (errno != EAGAIN && errno != EWOULDBLOCK) {
                        perror("Read error");
                        break;
                    }
                } else if (bytes_read == 0) {
                    printf("Connection closed by peer\n");
                    break;
                } else {
                    buffer[bytes_read] = '\0';
                    printf("Received: %s\n", buffer);
                }
            }
        }
    }

    close(sockfd);
    close(epollfd);
    return 0;
}

在这个示例中,首先创建了一个epoll实例,然后将Socket添加到epoll实例中,监视其可读事件。通过epoll_wait等待事件发生,如果有可读事件,进行读取操作。

通过以上对非阻塞I/O模型下Socket连接管理与维护的详细介绍,包括连接建立、监听、数据读写、连接维护以及多路复用技术,希望读者对这一领域有更深入的理解和掌握,能够在实际开发中高效地处理大量并发Socket连接。