基于 libevent 的 WebSocket 服务器开发
一、WebSocket 协议基础
1.1 WebSocket 是什么
WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议。它使得客户端和服务器之间能够进行实时、双向的通信,打破了传统 HTTP 协议的单向请求 - 响应模式的限制。在传统的 Web 开发中,HTTP 协议用于客户端向服务器发送请求,服务器返回响应。但这种模式对于实时性要求高的应用,如聊天应用、实时游戏、股票行情推送等并不适用,因为客户端需要不断地轮询服务器获取最新数据,这会带来额外的网络开销。
WebSocket 协议在 Web 浏览器和服务器之间建立了持久连接,允许双方随时发送消息。它的出现极大地改善了实时 Web 应用的开发体验,提高了数据传输效率和实时性。
1.2 WebSocket 握手过程
WebSocket 连接的建立基于 HTTP 协议的握手过程。客户端首先发送一个 HTTP 请求,其中包含特殊的头部信息来表明要发起一个 WebSocket 连接:
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec - WebSocket - Key: dGhlIHNhbXBsZSBub25jZQ==
Origin: http://example.com
Sec - WebSocket - Version: 13
在这个请求中:
Upgrade: websocket
和Connection: Upgrade
头字段表明客户端希望将协议升级到 WebSocket。Sec - WebSocket - Key
是一个 Base64 编码的随机字符串,用于服务器验证请求的合法性。Sec - WebSocket - Version
指明客户端支持的 WebSocket 协议版本。
服务器接收到这个请求后,如果支持 WebSocket 协议,会返回一个 HTTP 响应:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec - WebSocket - Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
其中 Sec - WebSocket - Accept
字段的值是通过将客户端发送的 Sec - WebSocket - Key
加上一个固定字符串 258EAFA5 - E914 - 47DA - 95CA - C5AB0DC85B11
,然后进行 SHA - 1 哈希计算,最后再进行 Base64 编码得到的。如果客户端验证 Sec - WebSocket - Accept
的值正确,就完成了握手过程,后续的数据传输将基于 WebSocket 协议进行。
二、libevent 库简介
2.1 libevent 是什么
libevent 是一个轻量级的开源事件通知库,它提供了一个跨平台的机制来处理事件驱动的 I/O。它支持多种事件多路复用机制,如 epoll(Linux)、kqueue(FreeBSD、Mac OS X)、select(跨平台但性能相对较低)等,开发者可以根据不同的平台选择最合适的机制。
libevent 的设计目标是让开发者能够方便地编写高性能、可扩展性强的网络应用程序。它隐藏了底层事件多路复用机制的细节,提供了统一的 API 来处理事件,包括文件描述符(如 socket)的可读、可写事件,定时事件等。
2.2 libevent 的基本结构和 API
libevent 主要包含以下几个核心概念:
- 事件基(event_base):事件基是 libevent 的核心数据结构,它管理着所有的事件和事件多路复用机制。一个应用程序通常只有一个事件基,通过
event_base_new()
函数创建:
struct event_base *base = event_base_new();
if (!base) {
// 创建失败处理
return 1;
}
- 事件(event):事件表示对某个文件描述符或其他事件源的关注。例如,我们可以创建一个事件来关注某个 socket 的可读事件:
struct event *ev;
ev = event_new(base, sockfd, EV_READ | EV_PERSIST, callback_function, (void *)arg);
if (!ev) {
// 创建失败处理
event_base_free(base);
return 1;
}
这里 EV_READ
表示关注可读事件,EV_PERSIST
表示事件触发后不会自动删除,callback_function
是事件触发时调用的回调函数,arg
是传递给回调函数的参数。
- 事件添加和删除:创建事件后,需要将其添加到事件基中才能生效,通过
event_add()
函数实现:
if (event_add(ev, NULL) == -1) {
// 添加失败处理
event_free(ev);
event_base_free(base);
return 1;
}
当不再需要某个事件时,可以通过 event_free()
函数删除:
event_free(ev);
- 事件循环:事件基通过事件循环来检测事件的发生并调用相应的回调函数。通过
event_base_dispatch()
函数启动事件循环:
event_base_dispatch(base);
这个函数会阻塞当前线程,直到所有事件都被处理完或者调用 event_base_loopbreak()
等函数中断循环。
三、基于 libevent 的 WebSocket 服务器开发
3.1 开发环境准备
在开始开发之前,确保你已经安装了 libevent 库。在大多数 Linux 系统上,可以通过包管理器安装,例如在 Ubuntu 上:
sudo apt - get install libevent - dev
对于其他操作系统,你可以从 libevent 的官方网站(https://libevent.org/)下载源代码并手动编译安装。
3.2 服务器框架搭建
首先,我们需要创建一个基于 libevent 的服务器框架,用于监听新的 TCP 连接。这部分代码类似于普通的基于 libevent 的 TCP 服务器:
#include <event2/event.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#define PORT 8080
// 处理新连接的回调函数
void accept_connection(int fd, short event, void *arg) {
struct event_base *base = (struct event_base *)arg;
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
int client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_addr_len);
if (client_fd == -1) {
perror("accept");
return;
}
printf("Accepted new connection from %s:%d\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));
// 这里可以开始处理 WebSocket 握手等操作
}
int main() {
struct event_base *base;
struct event *listen_event;
int listen_fd;
struct sockaddr_in server_addr;
base = event_base_new();
if (!base) {
perror("event_base_new");
return 1;
}
listen_fd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_fd == -1) {
perror("socket");
event_base_free(base);
return 1;
}
int reuse = 1;
if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1) {
perror("setsockopt");
close(listen_fd);
event_base_free(base);
return 1;
}
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(PORT);
server_addr.sin_addr.s_addr = INADDR_ANY;
if (bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {
perror("bind");
close(listen_fd);
event_base_free(base);
return 1;
}
if (listen(listen_fd, 10) == -1) {
perror("listen");
close(listen_fd);
event_base_free(base);
return 1;
}
listen_event = event_new(base, listen_fd, EV_READ | EV_PERSIST, accept_connection, (void *)base);
if (!listen_event) {
perror("event_new");
close(listen_fd);
event_base_free(base);
return 1;
}
if (event_add(listen_event, NULL) == -1) {
perror("event_add");
event_free(listen_event);
close(listen_fd);
event_base_free(base);
return 1;
}
event_base_dispatch(base);
event_free(listen_event);
close(listen_fd);
event_base_free(base);
return 0;
}
在这个代码中:
- 我们创建了一个
event_base
用于管理事件。 - 创建了一个监听 socket 并绑定到指定端口。
- 使用
event_new
创建了一个事件来监听监听 socket 的可读事件,当有新连接到来时,会调用accept_connection
回调函数。
3.3 WebSocket 握手处理
在 accept_connection
回调函数中,我们开始处理 WebSocket 握手。WebSocket 握手需要解析客户端发送的 HTTP 请求头,验证 Sec - WebSocket - Key
等字段,并构造正确的响应头。
#include <openssl/sha.h>
#include <base64.h> // 假设已经有 base64 编码/解码的实现
// 处理 WebSocket 握手
int handle_websocket_handshake(int client_fd) {
char buffer[1024];
ssize_t read_bytes = recv(client_fd, buffer, sizeof(buffer) - 1, 0);
if (read_bytes <= 0) {
perror("recv");
return -1;
}
buffer[read_bytes] = '\0';
char *key_start = strstr(buffer, "Sec - WebSocket - Key: ");
if (!key_start) {
// 没有找到 Sec - WebSocket - Key 字段,握手失败
return -1;
}
key_start += strlen("Sec - WebSocket - Key: ");
char key[128];
sscanf(key_start, "%127s", key);
char magic_string[] = "258EAFA5 - E914 - 47DA - 95CA - C5AB0DC85B11";
char combined[256];
snprintf(combined, sizeof(combined), "%s%s", key, magic_string);
unsigned char hash_result[SHA_DIGEST_LENGTH];
SHA1((unsigned char *)combined, strlen(combined), hash_result);
char accept_value[256];
base64_encode(hash_result, SHA_DIGEST_LENGTH, accept_value);
const char *response = "HTTP/1.1 101 Switching Protocols\r\n"
"Upgrade: websocket\r\n"
"Connection: Upgrade\r\n"
"Sec - WebSocket - Accept: %s\r\n\r\n";
char response_buffer[1024];
snprintf(response_buffer, sizeof(response_buffer), response, accept_value);
ssize_t write_bytes = send(client_fd, response_buffer, strlen(response_buffer), 0);
if (write_bytes != strlen(response_buffer)) {
perror("send");
return -1;
}
return 0;
}
在这个函数中:
- 首先从客户端接收 HTTP 请求数据。
- 查找
Sec - WebSocket - Key
字段并提取其值。 - 将
Sec - WebSocket - Key
与固定的魔法字符串拼接后进行 SHA - 1 哈希计算。 - 对哈希结果进行 Base64 编码得到
Sec - WebSocket - Accept
的值。 - 构造并发送 WebSocket 握手响应。
修改 accept_connection
回调函数以调用 handle_websocket_handshake
:
void accept_connection(int fd, short event, void *arg) {
struct event_base *base = (struct event_base *)arg;
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
int client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_addr_len);
if (client_fd == -1) {
perror("accept");
return;
}
printf("Accepted new connection from %s:%d\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));
if (handle_websocket_handshake(client_fd) == 0) {
// 握手成功,可以开始处理 WebSocket 消息
} else {
close(client_fd);
}
}
3.4 WebSocket 消息处理
WebSocket 消息采用帧格式进行传输。每个帧由头部和数据部分组成。头部包含了帧的类型(如文本帧、二进制帧等)、掩码位、数据长度等信息。
下面是处理 WebSocket 消息接收和发送的代码示例。我们先定义一些辅助函数来解析和构造 WebSocket 帧:
// 解析 WebSocket 帧头部
int parse_websocket_frame_header(char *buffer, ssize_t length, int *is_text, int *payload_length, char *mask_key) {
if (length < 2) {
return -1;
}
int fin = (buffer[0] & 0x80) >> 7;
int opcode = buffer[0] & 0x0F;
*is_text = (opcode == 0x1);
int has_mask = (buffer[1] & 0x80) >> 7;
*payload_length = buffer[1] & 0x7F;
if (*payload_length == 126) {
if (length < 4) {
return -1;
}
*payload_length = (buffer[2] << 8) | buffer[3];
} else if (*payload_length == 127) {
if (length < 10) {
return -1;
}
*payload_length = 0;
for (int i = 2; i < 10; i++) {
*payload_length = (*payload_length << 8) | buffer[i];
}
}
if (has_mask) {
if (length < (*payload_length + 4)) {
return -1;
}
memcpy(mask_key, buffer + 2, 4);
}
return 0;
}
// 解掩码 WebSocket 数据
void unmask_websocket_data(char *data, int length, char *mask_key) {
for (int i = 0; i < length; i++) {
data[i] ^= mask_key[i % 4];
}
}
// 构造 WebSocket 帧
void construct_websocket_frame(char *buffer, int is_text, int payload_length, char *payload) {
buffer[0] = (is_text? 0x1 : 0x2) | 0x80;
if (payload_length < 126) {
buffer[1] = payload_length;
} else if (payload_length < 65536) {
buffer[1] = 126;
buffer[2] = (payload_length >> 8) & 0xFF;
buffer[3] = payload_length & 0xFF;
} else {
buffer[1] = 127;
for (int i = 0; i < 8; i++) {
buffer[2 + i] = (payload_length >> (56 - i * 8)) & 0xFF;
}
}
memcpy(buffer + (payload_length < 126? 2 : (payload_length < 65536? 4 : 10)), payload, payload_length);
}
然后在 accept_connection
中添加处理 WebSocket 消息接收和发送的逻辑:
void accept_connection(int fd, short event, void *arg) {
struct event_base *base = (struct event_base *)arg;
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
int client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_addr_len);
if (client_fd == -1) {
perror("accept");
return;
}
printf("Accepted new connection from %s:%d\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));
if (handle_websocket_handshake(client_fd) == 0) {
char buffer[1024];
ssize_t read_bytes = recv(client_fd, buffer, sizeof(buffer), 0);
if (read_bytes > 0) {
int is_text, payload_length;
char mask_key[4];
if (parse_websocket_frame_header(buffer, read_bytes, &is_text, &payload_length, mask_key) == 0) {
char *payload = buffer + (payload_length < 126? 2 : (payload_length < 65536? 4 : 10));
unmask_websocket_data(payload, payload_length, mask_key);
payload[payload_length] = '\0';
printf("Received WebSocket message: %s\n", payload);
// 构造响应消息
char response_payload[] = "Message received by server";
char response_buffer[1024];
construct_websocket_frame(response_buffer, 1, strlen(response_payload), response_payload);
ssize_t write_bytes = send(client_fd, response_buffer, strlen(response_payload) + (strlen(response_payload) < 126? 2 : (strlen(response_payload) < 65536? 4 : 10)), 0);
if (write_bytes != strlen(response_payload) + (strlen(response_payload) < 126? 2 : (strlen(response_payload) < 65536? 4 : 10))) {
perror("send");
}
}
}
} else {
close(client_fd);
}
}
在这个代码中:
parse_websocket_frame_header
函数用于解析 WebSocket 帧头部,获取帧类型、数据长度和掩码密钥。unmask_websocket_data
函数用于对接收的数据进行解掩码操作。construct_websocket_frame
函数用于构造 WebSocket 响应帧。- 在
accept_connection
中,接收 WebSocket 帧,解析并处理后构造响应帧发送回客户端。
3.5 完整代码示例
下面是一个完整的基于 libevent 的 WebSocket 服务器代码示例,包含了前面提到的所有功能:
#include <event2/event.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <openssl/sha.h>
#include <base64.h> // 假设已经有 base64 编码/解码的实现
#define PORT 8080
// 处理新连接的回调函数
void accept_connection(int fd, short event, void *arg);
// 处理 WebSocket 握手
int handle_websocket_handshake(int client_fd);
// 解析 WebSocket 帧头部
int parse_websocket_frame_header(char *buffer, ssize_t length, int *is_text, int *payload_length, char *mask_key);
// 解掩码 WebSocket 数据
void unmask_websocket_data(char *data, int length, char *mask_key);
// 构造 WebSocket 帧
void construct_websocket_frame(char *buffer, int is_text, int payload_length, char *payload);
int main() {
struct event_base *base;
struct event *listen_event;
int listen_fd;
struct sockaddr_in server_addr;
base = event_base_new();
if (!base) {
perror("event_base_new");
return 1;
}
listen_fd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_fd == -1) {
perror("socket");
event_base_free(base);
return 1;
}
int reuse = 1;
if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1) {
perror("setsockopt");
close(listen_fd);
event_base_free(base);
return 1;
}
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(PORT);
server_addr.sin_addr.s_addr = INADDR_ANY;
if (bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {
perror("bind");
close(listen_fd);
event_base_free(base);
return 1;
}
if (listen(listen_fd, 10) == -1) {
perror("listen");
close(listen_fd);
event_base_free(base);
return 1;
}
listen_event = event_new(base, listen_fd, EV_READ | EV_PERSIST, accept_connection, (void *)base);
if (!listen_event) {
perror("event_new");
close(listen_fd);
event_base_free(base);
return 1;
}
if (event_add(listen_event, NULL) == -1) {
perror("event_add");
event_free(listen_event);
close(listen_fd);
event_base_free(base);
return 1;
}
event_base_dispatch(base);
event_free(listen_event);
close(listen_fd);
event_base_free(base);
return 0;
}
void accept_connection(int fd, short event, void *arg) {
struct event_base *base = (struct event_base *)arg;
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
int client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_addr_len);
if (client_fd == -1) {
perror("accept");
return;
}
printf("Accepted new connection from %s:%d\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));
if (handle_websocket_handshake(client_fd) == 0) {
char buffer[1024];
ssize_t read_bytes = recv(client_fd, buffer, sizeof(buffer), 0);
if (read_bytes > 0) {
int is_text, payload_length;
char mask_key[4];
if (parse_websocket_frame_header(buffer, read_bytes, &is_text, &payload_length, mask_key) == 0) {
char *payload = buffer + (payload_length < 126? 2 : (payload_length < 65536? 4 : 10));
unmask_websocket_data(payload, payload_length, mask_key);
payload[payload_length] = '\0';
printf("Received WebSocket message: %s\n", payload);
// 构造响应消息
char response_payload[] = "Message received by server";
char response_buffer[1024];
construct_websocket_frame(response_buffer, 1, strlen(response_payload), response_payload);
ssize_t write_bytes = send(client_fd, response_buffer, strlen(response_payload) + (strlen(response_payload) < 126? 2 : (strlen(response_payload) < 65536? 4 : 10)), 0);
if (write_bytes != strlen(response_payload) + (strlen(response_payload) < 126? 2 : (strlen(response_payload) < 65536? 4 : 10))) {
perror("send");
}
}
}
} else {
close(client_fd);
}
}
int handle_websocket_handshake(int client_fd) {
char buffer[1024];
ssize_t read_bytes = recv(client_fd, buffer, sizeof(buffer) - 1, 0);
if (read_bytes <= 0) {
perror("recv");
return -1;
}
buffer[read_bytes] = '\0';
char *key_start = strstr(buffer, "Sec - WebSocket - Key: ");
if (!key_start) {
// 没有找到 Sec - WebSocket - Key 字段,握手失败
return -1;
}
key_start += strlen("Sec - WebSocket - Key: ");
char key[128];
sscanf(key_start, "%127s", key);
char magic_string[] = "258EAFA5 - E914 - 47DA - 95CA - C5AB0DC85B11";
char combined[256];
snprintf(combined, sizeof(combined), "%s%s", key, magic_string);
unsigned char hash_result[SHA_DIGEST_LENGTH];
SHA1((unsigned char *)combined, strlen(combined), hash_result);
char accept_value[256];
base64_encode(hash_result, SHA_DIGEST_LENGTH, accept_value);
const char *response = "HTTP/1.1 101 Switching Protocols\r\n"
"Upgrade: websocket\r\n"
"Connection: Upgrade\r\n"
"Sec - WebSocket - Accept: %s\r\n\r\n";
char response_buffer[1024];
snprintf(response_buffer, sizeof(response_buffer), response, accept_value);
ssize_t write_bytes = send(client_fd, response_buffer, strlen(response_buffer), 0);
if (write_bytes != strlen(response_buffer)) {
perror("send");
return -1;
}
return 0;
}
int parse_websocket_frame_header(char *buffer, ssize_t length, int *is_text, int *payload_length, char *mask_key) {
if (length < 2) {
return -1;
}
int fin = (buffer[0] & 0x80) >> 7;
int opcode = buffer[0] & 0x0F;
*is_text = (opcode == 0x1);
int has_mask = (buffer[1] & 0x80) >> 7;
*payload_length = buffer[1] & 0x7F;
if (*payload_length == 126) {
if (length < 4) {
return -1;
}
*payload_length = (buffer[2] << 8) | buffer[3];
} else if (*payload_length == 127) {
if (length < 10) {
return -1;
}
*payload_length = 0;
for (int i = 2; i < 10; i++) {
*payload_length = (*payload_length << 8) | buffer[i];
}
}
if (has_mask) {
if (length < (*payload_length + 4)) {
return -1;
}
memcpy(mask_key, buffer + 2, 4);
}
return 0;
}
void unmask_websocket_data(char *data, int length, char *mask_key) {
for (int i = 0; i < length; i++) {
data[i] ^= mask_key[i % 4];
}
}
void construct_websocket_frame(char *buffer, int is_text, int payload_length, char *payload) {
buffer[0] = (is_text? 0x1 : 0x2) | 0x80;
if (payload_length < 126) {
buffer[1] = payload_length;
} else if (payload_length < 65536) {
buffer[1] = 126;
buffer[2] = (payload_length >> 8) & 0xFF;
buffer[3] = payload_length & 0xFF;
} else {
buffer[1] = 127;
for (int i = 0; i < 8; i++) {
buffer[2 + i] = (payload_length >> (56 - i * 8)) & 0xFF;
}
}
memcpy(buffer + (payload_length < 126? 2 : (payload_length < 65536? 4 : 10)), payload, payload_length);
}
这个完整的代码实现了一个基本的基于 libevent 的 WebSocket 服务器,能够处理新连接、WebSocket 握手以及简单的消息接收和响应。
四、优化与扩展
4.1 连接管理优化
在实际应用中,服务器可能需要处理大量的 WebSocket 连接。为了提高性能和资源利用率,可以使用连接池来管理连接。连接池可以预先创建一定数量的连接对象,当有新的 WebSocket 连接请求时,从连接池中获取一个空闲连接,而不是每次都创建新的连接。这样可以减少系统调用开销和资源分配开销。
另外,对于长时间没有活动的连接,可以设置心跳机制来检测连接的有效性。如果某个连接在一定时间内没有发送或接收任何数据,服务器可以主动关闭该连接,以释放资源。
4.2 消息处理优化
对于大量的 WebSocket 消息,需要优化消息的处理流程。可以采用多线程或多进程的方式来并行处理消息。例如,使用线程池来处理接收到的 WebSocket 消息,每个线程负责处理一部分消息,这样可以提高整体的处理效率。
同时,对于消息的序列化和反序列化,可以采用更高效的算法和数据格式。例如,使用 Protocol Buffers 或 MessagePack 等二进制序列化格式代替 JSON 或 XML,以减少数据传输量和解析时间。
4.3 安全性扩展
WebSocket 服务器面临着多种安全威胁,如 DDoS 攻击、恶意消息注入等。为了增强安全性,可以采取以下措施:
- 认证和授权:在 WebSocket 握手阶段或后续的消息交互中,对客户端进行认证,确保只有合法的客户端能够连接和发送消息。可以使用用户名密码、Token 等方式进行认证。认证通过后,根据用户的权限进行授权,限制用户能够执行的操作。
- 输入验证:对客户端发送的消息进行严格的输入验证,防止恶意消息注入。例如,检查消息的长度、格式、内容是否符合预期,过滤掉非法字符和敏感信息。
- 加密传输:使用 SSL/TLS 对 WebSocket 连接进行加密,确保数据在传输过程中的保密性和完整性。可以通过 OpenSSL 等库来实现 SSL/TLS 加密。
通过以上优化和扩展措施,可以使基于 libevent 的 WebSocket 服务器更加健壮、高效和安全,适用于各种复杂的实际应用场景。