MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

非阻塞Socket编程中的数据读取与写入策略

2022-04-043.5k 阅读

非阻塞Socket编程基础

在传统的阻塞式Socket编程中,当执行读取(read)或写入(write)操作时,线程会被阻塞,直到操作完成。这意味着在数据准备好之前,程序无法执行其他任务,对于需要同时处理多个连接或者在I/O操作时还要处理其他逻辑的应用程序来说,这种方式效率较低。

非阻塞Socket编程则不同,当进行读取或写入操作时,如果数据尚未准备好,系统调用不会阻塞线程,而是立即返回一个错误码,程序可以继续执行其他任务,稍后再尝试I/O操作。这样就实现了在单个线程中同时处理多个I/O操作,提高了程序的并发处理能力。

在UNIX/Linux系统中,可以通过fcntl函数将Socket设置为非阻塞模式。例如:

#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>

int sockfd = socket(AF_INET, SOCK_STREAM, 0);
int flags = fcntl(sockfd, F_GETFL, 0);
fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);

在Windows系统中,可以使用ioctlsocket函数来设置非阻塞模式:

#include <winsock2.h>

SOCKET sockfd = socket(AF_INET, SOCK_STREAM, 0);
u_long iMode = 1;
ioctlsocket(sockfd, FIONBIO, &iMode);

非阻塞Socket的数据读取策略

1. 单次读取策略

在非阻塞模式下,调用read函数读取数据时,如果没有数据可读,函数会立即返回-1,并且errno会被设置为EAGAINEWOULDBLOCK(不同系统可能略有差异)。最简单的读取策略就是在每次循环中尝试读取一定量的数据。

以下是一个简单的C语言示例:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <errno.h>

#define BUFFER_SIZE 1024

int main() {
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("socket creation failed");
        exit(EXIT_FAILURE);
    }

    int flags = fcntl(sockfd, F_GETFL, 0);
    fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    memset(&servaddr, 0, sizeof(servaddr));

    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(8080);
    servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");

    if (connect(sockfd, (const struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
        if (errno != EINPROGRESS) {
            perror("connect failed");
            close(sockfd);
            exit(EXIT_FAILURE);
        }
    }

    char buffer[BUFFER_SIZE];
    ssize_t bytes_read;
    while (1) {
        bytes_read = read(sockfd, buffer, sizeof(buffer));
        if (bytes_read < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                // 没有数据可读,继续做其他事情
                continue;
            } else {
                perror("read error");
                break;
            }
        } else if (bytes_read == 0) {
            // 对端关闭连接
            printf("Connection closed by peer\n");
            break;
        } else {
            buffer[bytes_read] = '\0';
            printf("Received: %s\n", buffer);
        }
    }

    close(sockfd);
    return 0;
}

在这个示例中,每次循环尝试读取数据,如果返回-1errnoEAGAINEWOULDBLOCK,则说明没有数据可读,继续循环做其他事情。

2. 循环读取策略

由于一次read调用可能无法读取完所有数据(例如接收缓冲区中有大量数据),所以通常需要循环读取,直到read返回0(表示对端关闭连接)或者-1errno不是EAGAINEWOULDBLOCK

以下是一个改进后的C语言示例:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <errno.h>

#define BUFFER_SIZE 1024

int main() {
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("socket creation failed");
        exit(EXIT_FAILURE);
    }

    int flags = fcntl(sockfd, F_GETFL, 0);
    fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    memset(&servaddr, 0, sizeof(servaddr));

    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(8080);
    servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");

    if (connect(sockfd, (const struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
        if (errno != EINPROGRESS) {
            perror("connect failed");
            close(sockfd);
            exit(EXIT_FAILURE);
        }
    }

    char buffer[BUFFER_SIZE];
    ssize_t total_bytes_read = 0;
    ssize_t bytes_read;
    while (1) {
        bytes_read = read(sockfd, buffer + total_bytes_read, sizeof(buffer) - total_bytes_read);
        if (bytes_read < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                // 没有数据可读,处理其他逻辑
                usleep(100000); // 短暂休眠,避免过度占用CPU
                continue;
            } else {
                perror("read error");
                break;
            }
        } else if (bytes_read == 0) {
            // 对端关闭连接
            buffer[total_bytes_read] = '\0';
            printf("Received: %s\n", buffer);
            break;
        } else {
            total_bytes_read += bytes_read;
        }
    }

    close(sockfd);
    return 0;
}

在这个示例中,通过total_bytes_read记录已经读取的字节数,每次read从上次读取结束的位置继续读取,直到对端关闭连接或者发生错误。

3. 使用事件驱动机制优化读取

单纯的循环读取虽然能保证数据读取完整,但会占用较多CPU资源,因为在没有数据可读时也在不断尝试读取。可以结合事件驱动机制,如selectpollepoll(在Linux系统中)来优化读取操作。

epoll为例,以下是一个简单的示例:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <sys/epoll.h>

#define BUFFER_SIZE 1024
#define EPOLL_SIZE 10

int main() {
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("socket creation failed");
        exit(EXIT_FAILURE);
    }

    int flags = fcntl(sockfd, F_GETFL, 0);
    fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    memset(&servaddr, 0, sizeof(servaddr));

    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(8080);
    servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");

    if (connect(sockfd, (const struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
        if (errno != EINPROGRESS) {
            perror("connect failed");
            close(sockfd);
            exit(EXIT_FAILURE);
        }
    }

    int epollfd = epoll_create1(0);
    if (epollfd < 0) {
        perror("epoll_create1 failed");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    struct epoll_event event;
    event.data.fd = sockfd;
    event.events = EPOLLIN | EPOLLET; // 使用边缘触发模式
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &event) < 0) {
        perror("epoll_ctl add failed");
        close(sockfd);
        close(epollfd);
        exit(EXIT_FAILURE);
    }

    struct epoll_event events[EPOLL_SIZE];
    char buffer[BUFFER_SIZE];
    while (1) {
        int num_events = epoll_wait(epollfd, events, EPOLL_SIZE, -1);
        if (num_events < 0) {
            perror("epoll_wait error");
            break;
        }

        for (int i = 0; i < num_events; i++) {
            if (events[i].data.fd == sockfd) {
                ssize_t bytes_read;
                while ((bytes_read = read(sockfd, buffer, sizeof(buffer))) > 0) {
                    buffer[bytes_read] = '\0';
                    printf("Received: %s\n", buffer);
                }
                if (bytes_read < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
                    perror("read error");
                    break;
                }
            }
        }
    }

    close(sockfd);
    close(epollfd);
    return 0;
}

在这个示例中,epoll_wait函数会阻塞,直到有事件发生(这里是Socket可读事件)。当事件发生时,通过循环读取确保数据读取完整。

非阻塞Socket的数据写入策略

1. 单次写入策略

与读取类似,在非阻塞模式下调用write函数写入数据时,如果写入缓冲区已满,write会立即返回-1,并且errno会被设置为EAGAINEWOULDBLOCK。最简单的写入策略就是在每次循环中尝试写入一定量的数据。

以下是一个简单的C语言示例:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <errno.h>

#define BUFFER_SIZE 1024

int main() {
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("socket creation failed");
        exit(EXIT_FAILURE);
    }

    int flags = fcntl(sockfd, F_GETFL, 0);
    fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    memset(&servaddr, 0, sizeof(servaddr));

    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(8080);
    servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");

    if (connect(sockfd, (const struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
        if (errno != EINPROGRESS) {
            perror("connect failed");
            close(sockfd);
            exit(EXIT_FAILURE);
        }
    }

    char *message = "Hello, Server!";
    ssize_t bytes_written;
    while (1) {
        bytes_written = write(sockfd, message, strlen(message));
        if (bytes_written < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                // 写入缓冲区满,继续做其他事情
                continue;
            } else {
                perror("write error");
                break;
            }
        } else {
            printf("Message sent successfully\n");
            break;
        }
    }

    close(sockfd);
    return 0;
}

在这个示例中,每次循环尝试写入数据,如果返回-1errnoEAGAINEWOULDBLOCK,则说明写入缓冲区满,继续循环做其他事情。

2. 循环写入策略

与读取一样,一次write调用可能无法将所有数据写入,所以通常需要循环写入,直到所有数据都被成功写入或者发生错误。

以下是一个改进后的C语言示例:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <errno.h>

#define BUFFER_SIZE 1024

int main() {
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("socket creation failed");
        exit(EXIT_FAILURE);
    }

    int flags = fcntl(sockfd, F_GETFL, 0);
    fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    memset(&servaddr, 0, sizeof(servaddr));

    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(8080);
    servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");

    if (connect(sockfd, (const struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
        if (errno != EINPROGRESS) {
            perror("connect failed");
            close(sockfd);
            exit(EXIT_FAILURE);
        }
    }

    char *message = "Hello, Server! This is a longer message.";
    size_t total_bytes_to_write = strlen(message);
    size_t bytes_written_so_far = 0;
    ssize_t bytes_written;
    while (bytes_written_so_far < total_bytes_to_write) {
        bytes_written = write(sockfd, message + bytes_written_so_far, total_bytes_to_write - bytes_written_so_far);
        if (bytes_written < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                // 写入缓冲区满,处理其他逻辑
                usleep(100000); // 短暂休眠,避免过度占用CPU
                continue;
            } else {
                perror("write error");
                break;
            }
        } else {
            bytes_written_so_far += bytes_written;
        }
    }

    if (bytes_written_so_far == total_bytes_to_write) {
        printf("Message sent successfully\n");
    }

    close(sockfd);
    return 0;
}

在这个示例中,通过bytes_written_so_far记录已经写入的字节数,每次write从上次写入结束的位置继续写入,直到所有数据都被成功写入或者发生错误。

3. 使用事件驱动机制优化写入

同样,可以结合事件驱动机制如epoll来优化写入操作。在epoll中,可以监听EPOLLOUT事件,表示Socket可写。

以下是一个使用epoll优化写入的示例:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <sys/epoll.h>

#define BUFFER_SIZE 1024
#define EPOLL_SIZE 10

int main() {
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("socket creation failed");
        exit(EXIT_FAILURE);
    }

    int flags = fcntl(sockfd, F_GETFL, 0);
    fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    memset(&servaddr, 0, sizeof(servaddr));

    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(8080);
    servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");

    if (connect(sockfd, (const struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
        if (errno != EINPROGRESS) {
            perror("connect failed");
            close(sockfd);
            exit(EXIT_FAILURE);
        }
    }

    int epollfd = epoll_create1(0);
    if (epollfd < 0) {
        perror("epoll_create1 failed");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    struct epoll_event event;
    event.data.fd = sockfd;
    event.events = EPOLLOUT | EPOLLET; // 使用边缘触发模式
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &event) < 0) {
        perror("epoll_ctl add failed");
        close(sockfd);
        close(epollfd);
        exit(EXIT_FAILURE);
    }

    struct epoll_event events[EPOLL_SIZE];
    char *message = "Hello, Server! This is a message to be sent using epoll.";
    size_t total_bytes_to_write = strlen(message);
    size_t bytes_written_so_far = 0;
    while (1) {
        int num_events = epoll_wait(epollfd, events, EPOLL_SIZE, -1);
        if (num_events < 0) {
            perror("epoll_wait error");
            break;
        }

        for (int i = 0; i < num_events; i++) {
            if (events[i].data.fd == sockfd) {
                ssize_t bytes_written;
                while ((bytes_written = write(sockfd, message + bytes_written_so_far, total_bytes_to_write - bytes_written_so_far)) > 0) {
                    bytes_written_so_far += bytes_written;
                }
                if (bytes_written < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
                    perror("write error");
                    break;
                }
                if (bytes_written_so_far == total_bytes_to_write) {
                    printf("Message sent successfully\n");
                    // 这里可以移除对EPOLLOUT的监听,避免不必要的唤醒
                    event.events = EPOLLIN | EPOLLET;
                    if (epoll_ctl(epollfd, EPOLL_CTL_MOD, sockfd, &event) < 0) {
                        perror("epoll_ctl mod failed");
                        close(sockfd);
                        close(epollfd);
                        exit(EXIT_FAILURE);
                    }
                }
            }
        }
    }

    close(sockfd);
    close(epollfd);
    return 0;
}

在这个示例中,当epoll_wait监听到EPOLLOUT事件时,通过循环写入确保所有数据都被成功写入。当数据全部写入后,可以修改epoll监听事件,避免不必要的唤醒。

实际应用中的注意事项

  1. 缓冲区管理:在非阻塞Socket编程中,合理管理缓冲区非常重要。无论是读取还是写入,都需要考虑缓冲区的大小以及数据的完整性。例如,在读取数据时,如果缓冲区过小,可能需要多次读取才能获取完整的数据;在写入数据时,如果缓冲区已满,需要等待缓冲区有空间后再继续写入。
  2. 错误处理:非阻塞Socket的系统调用可能会返回各种错误码,需要仔细处理这些错误。例如,除了常见的EAGAINEWOULDBLOCK错误外,还可能会遇到连接断开(如ECONNRESET)等错误,需要根据不同的错误类型采取相应的处理措施。
  3. 性能优化:虽然非阻塞Socket编程提高了并发处理能力,但如果处理不当,也可能导致性能问题。例如,在使用循环读取或写入时,避免过度占用CPU资源,可以适当加入休眠时间。同时,合理选择事件驱动机制(如selectpollepoll)也能对性能产生较大影响。
  4. 跨平台兼容性:不同操作系统对非阻塞Socket的支持和实现略有差异,在编写跨平台应用程序时,需要注意兼容性问题。例如,Windows系统使用ioctlsocket设置非阻塞模式,而UNIX/Linux系统使用fcntl函数。同时,不同系统的错误码定义和事件驱动机制也可能有所不同。

总结

非阻塞Socket编程为后端开发提供了一种高效处理并发I/O的方式。通过合理的数据读取与写入策略,结合事件驱动机制,可以充分发挥非阻塞Socket的优势,提高应用程序的性能和并发处理能力。在实际应用中,需要注意缓冲区管理、错误处理、性能优化以及跨平台兼容性等问题,以确保程序的稳定性和高效性。掌握非阻塞Socket编程技术,对于开发高性能的网络应用程序,如Web服务器、即时通讯应用等,具有重要意义。

以上就是关于非阻塞Socket编程中的数据读取与写入策略的详细介绍,希望对您有所帮助。在实际开发中,您可以根据具体的应用场景和需求,选择合适的策略和技术,打造出更加优秀的网络应用程序。