实时数据推送:WebSocket在股票行情中的应用
WebSocket 基础原理
WebSocket 协议概述
WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议。与传统的 HTTP 协议不同,HTTP 是一种请求 - 响应模式的协议,客户端发起请求,服务器返回响应,在请求响应完成后连接通常会关闭。而 WebSocket 协议使得客户端和服务器之间可以建立持久连接,双方能够随时主动地向对方发送数据。
WebSocket 协议通过一个握手过程将 HTTP 协议升级为 WebSocket 协议。客户端首先发送一个带有特殊头部的 HTTP 请求,例如:
GET /ws HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec - WebSocket - Key: dGhlIHNhbXBsZSBub25jZQ==
Sec - WebSocket - Version: 13
服务器接收到这个请求后,如果支持 WebSocket 协议,会返回类似如下的响应:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec - WebSocket - Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
这个过程完成后,连接就从 HTTP 协议升级为 WebSocket 协议,双方可以开始进行全双工通信。
WebSocket 的优势
- 实时性:由于 WebSocket 建立的是持久连接,服务器可以随时向客户端推送最新的数据,无需客户端频繁轮询。在股票行情应用中,这意味着客户端能够第一时间获取到股票价格的变化、交易信息等实时数据,大大提高了数据的及时性。
- 减少网络开销:传统的轮询方式需要客户端定时向服务器发送请求获取数据,即使数据没有变化也会产生不必要的网络请求。而 WebSocket 只有在有数据传输时才会使用网络带宽,有效地减少了网络流量和服务器负载。
- 双向通信:WebSocket 支持全双工通信,客户端和服务器可以同时主动地发送数据。在股票行情场景中,客户端不仅可以接收行情数据,还可以向服务器发送一些指令,如订阅特定股票、查询历史行情等,服务器根据客户端的请求返回相应的数据。
股票行情系统中的实时数据需求
数据类型
- 价格数据:包括股票的当前价格、开盘价、最高价、最低价等。这些数据是投资者最关注的信息,它们的实时变化反映了股票的市场价值波动。例如,一只股票的当前价格可能在短时间内频繁变动,投资者需要及时了解这些变化来做出买卖决策。
- 成交量数据:成交量反映了市场对该股票的交易活跃程度。高成交量可能意味着市场对该股票的关注度较高,价格波动可能更加剧烈。实时的成交量数据对于分析股票的市场热度和趋势非常重要。
- 交易信息:如每一笔交易的时间、成交价格、成交量等详细信息。这些数据可以帮助投资者了解市场的交易动态,分析主力资金的流向等。
实时性要求
在股票市场中,每一秒的数据变化都可能影响投资者的决策。对于一些日内交易者或者高频交易者来说,数据的延迟可能导致他们错过最佳的交易时机。因此,股票行情系统对实时数据的推送要求极高,数据延迟应尽可能控制在几十毫秒以内。例如,在一些大型的证券交易平台上,为了保证行情数据的实时性,会投入大量的资源来优化数据传输和处理流程。
WebSocket 在股票行情中的应用架构
客户端
- 连接建立:在股票行情客户端应用中,首先要建立与服务器的 WebSocket 连接。以 JavaScript 为例,可以使用 WebSocket 对象来实现:
const socket = new WebSocket('ws://localhost:8080/stock - quotes');
socket.onopen = function (event) {
console.log('WebSocket connection established');
};
socket.onerror = function (error) {
console.log('WebSocket error:', error);
};
- 数据接收与处理:当连接建立成功后,客户端可以接收服务器推送的股票行情数据。接收到的数据需要根据预先定义的格式进行解析和处理,然后展示在用户界面上。例如,如果服务器推送的是 JSON 格式的数据:
socket.onmessage = function (event) {
const data = JSON.parse(event.data);
const stockName = data.stockName;
const currentPrice = data.currentPrice;
// 在页面上更新股票价格显示
document.getElementById('stock - price -'+ stockName).textContent = currentPrice;
};
- 发送指令:客户端还可以向服务器发送指令,如订阅特定股票的行情。同样以 JavaScript 为例:
const subscribeMessage = {
action:'subscribe',
stockSymbol: 'AAPL'
};
socket.send(JSON.stringify(subscribeMessage));
服务器端
- WebSocket 服务器搭建:在后端开发中,可以使用多种框架来搭建 WebSocket 服务器。以 Node.js 为例,常用的框架有
ws
库。以下是一个简单的使用ws
库搭建 WebSocket 服务器的示例:
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });
wss.on('connection', function connection(ws) {
console.log('A client connected');
ws.on('message', function incoming(message) {
console.log('Received message:', message);
// 处理客户端发送的消息
const data = JSON.parse(message);
if (data.action ==='subscribe') {
// 处理订阅逻辑
}
});
ws.on('close', function close() {
console.log('A client disconnected');
});
});
- 数据获取与推送:服务器需要从数据源获取股票行情数据,数据源可以是证券交易所提供的接口、数据库等。获取到数据后,服务器将数据推送给已连接的客户端。例如,假设服务器从数据库中获取到股票行情数据,然后推送给客户端:
// 模拟从数据库获取股票行情数据
const getStockQuotes = () => {
return {
stockName: 'AAPL',
currentPrice: 150.25,
volume: 100000
};
};
setInterval(() => {
const stockQuotes = getStockQuotes();
wss.clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify(stockQuotes));
}
});
}, 1000);
- 指令处理:服务器接收到客户端发送的指令后,需要根据指令类型进行相应的处理。如处理订阅指令,服务器可能需要维护一个订阅列表,记录每个客户端订阅的股票,然后只向订阅了特定股票的客户端推送相关行情数据。
const subscriptions = {};
ws.on('message', function incoming(message) {
const data = JSON.parse(message);
if (data.action ==='subscribe') {
const stockSymbol = data.stockSymbol;
if (!subscriptions[stockSymbol]) {
subscriptions[stockSymbol] = [];
}
subscriptions[stockSymbol].push(ws);
}
});
// 推送数据时,只推送给订阅了该股票的客户端
const stockQuotes = getStockQuotes();
if (subscriptions[stockQuotes.stockName]) {
subscriptions[stockQuotes.stockName].forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify(stockQuotes));
}
});
}
可靠性与性能优化
连接管理
- 心跳机制:为了确保 WebSocket 连接的可靠性,防止连接因为网络故障、服务器重启等原因意外断开,通常会引入心跳机制。客户端和服务器定期向对方发送心跳消息,以确认连接仍然有效。例如,客户端可以每隔一段时间(如 30 秒)向服务器发送一个心跳消息:
const heartbeatInterval = 30000;
let heartbeatTimer;
function sendHeartbeat() {
socket.send('ping');
heartbeatTimer = setTimeout(sendHeartbeat, heartbeatInterval);
}
socket.onopen = function () {
sendHeartbeat();
};
socket.onclose = function () {
clearTimeout(heartbeatTimer);
};
服务器接收到心跳消息后,需要回复一个响应消息,如 pong
。客户端接收到 pong
消息后,确认连接正常。如果客户端在一定时间内没有收到 pong
消息,则认为连接可能已断开,需要尝试重新连接。
2. 重连机制:当 WebSocket 连接断开时,客户端需要有重连机制来尝试重新建立连接。可以设置一个重试次数和重试间隔时间,每次连接失败后,按照一定的策略增加重试间隔时间。例如:
const maxRetries = 5;
let retryCount = 0;
const initialRetryInterval = 1000;
function reconnect() {
if (retryCount < maxRetries) {
const retryInterval = initialRetryInterval * (2 ** retryCount);
setTimeout(() => {
const socket = new WebSocket('ws://localhost:8080/stock - quotes');
socket.onopen = function () {
console.log('Reconnected successfully');
retryCount = 0;
};
socket.onclose = function () {
retryCount++;
reconnect();
};
}, retryInterval);
}
}
socket.onclose = function () {
reconnect();
};
数据优化
- 数据压缩:在网络传输过程中,为了减少数据量,提高传输效率,可以对股票行情数据进行压缩。常见的压缩算法有 Gzip 等。在服务器端,可以使用相应的库对要推送的数据进行压缩,然后在客户端进行解压缩。以 Node.js 为例,使用
zlib
库进行 Gzip 压缩:
const zlib = require('zlib');
const stockQuotes = getStockQuotes();
const jsonData = JSON.stringify(stockQuotes);
zlib.gzip(jsonData, function (err, buffer) {
if (!err) {
wss.clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(buffer);
}
});
}
});
在客户端,使用 pako
库进行解压缩(假设使用 JavaScript):
import pako from 'pako';
socket.onmessage = function (event) {
const decompressed = pako.inflate(event.data);
const data = JSON.parse(decompressed);
// 处理数据
};
- 数据过滤与聚合:服务器端可以根据客户端的订阅情况对数据进行过滤,只推送客户端需要的股票行情数据。同时,对于一些高频变化的数据,可以进行适当的聚合处理,减少数据推送频率。例如,对于每秒都在变化的股票价格,可以每隔 5 秒进行一次价格平均值的计算,然后推送这个平均值,这样既能满足大多数投资者对价格趋势的关注,又能减少数据量。
安全性考虑
身份验证
- 基于 Token 的认证:在 WebSocket 连接建立之前,客户端需要进行身份验证。一种常见的方式是使用 Token。客户端在发起 WebSocket 连接时,将 Token 作为参数传递给服务器,例如:
const token = 'your - authentication - token';
const socket = new WebSocket('ws://localhost:8080/stock - quotes?token=' + token);
服务器接收到连接请求后,从请求参数中获取 Token,并验证 Token 的有效性。可以通过与认证服务器进行交互,或者在本地缓存中验证 Token。以 Node.js 为例:
const jwt = require('jsonwebtoken');
wss.on('connection', function connection(ws, req) {
const token = req.url.split('=')[1];
try {
const decoded = jwt.verify(token, 'your - secret - key');
console.log('Authenticated user:', decoded.username);
// 连接建立成功
} catch (err) {
console.log('Authentication failed:', err);
ws.close(1008, 'Authentication failed');
}
});
- Cookie - 基于的认证:如果应用程序已经使用了基于 Cookie 的身份验证机制,WebSocket 服务器也可以利用 Cookie 进行认证。服务器可以从 HTTP 请求头中获取 Cookie,并验证用户的身份。例如,在 Node.js 中使用
cookie - parser
库:
const cookieParser = require('cookie - parser');
const app = require('express')();
app.use(cookieParser());
const wss = new WebSocket.Server({ server: app });
wss.on('connection', function connection(ws, req) {
const cookies = req.cookies;
if (cookies['session - cookie']) {
// 验证 session - cookie 的有效性
if (isValidSessionCookie(cookies['session - cookie'])) {
console.log('Authenticated user from cookie');
// 连接建立成功
} else {
console.log('Authentication failed from cookie');
ws.close(1008, 'Authentication failed');
}
} else {
console.log('No session cookie found');
ws.close(1008, 'No session cookie found');
}
});
数据加密
- TLS/SSL 加密:为了保护在 WebSocket 连接上传输的股票行情数据的机密性和完整性,应使用 TLS/SSL 加密。在服务器端配置 TLS/SSL 证书,使 WebSocket 连接通过 HTTPS 协议进行。例如,在 Node.js 中使用
https
模块和ws
库:
const https = require('https');
const fs = require('fs');
const WebSocket = require('ws');
const options = {
key: fs.readFileSync('path/to/key.pem'),
cert: fs.readFileSync('path/to/cert.pem')
};
const server = https.createServer(options);
const wss = new WebSocket.Server({ server });
server.listen(443, function () {
console.log('WebSocket server is running on HTTPS');
});
客户端在连接时,使用 wss://
协议:
const socket = new WebSocket('wss://localhost/stock - quotes');
- 数据层面加密:除了传输层的加密,对于一些敏感的股票行情数据,还可以在数据层面进行加密。例如,使用对称加密算法(如 AES)对数据进行加密后再传输。在服务器端加密数据:
const crypto = require('crypto');
const algorithm = 'aes - 256 - cbc';
const key = crypto.randomBytes(32);
const iv = crypto.randomBytes(16);
const stockQuotes = getStockQuotes();
const jsonData = JSON.stringify(stockQuotes);
const cipher = crypto.createCipheriv(algorithm, key, iv);
let encrypted = cipher.update(jsonData, 'utf8', 'hex');
encrypted += cipher.final('hex');
const dataToSend = {
encryptedData: encrypted,
iv: iv.toString('hex')
};
wss.clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify(dataToSend));
}
});
在客户端解密数据:
const crypto = require('crypto');
const algorithm = 'aes - 256 - cbc';
socket.onmessage = function (event) {
const data = JSON.parse(event.data);
const decipher = crypto.createDecipheriv(algorithm, key, Buffer.from(data.iv, 'hex'));
let decrypted = decipher.update(data.encryptedData, 'hex', 'utf8');
decrypted += decipher.final('utf8');
const stockQuotes = JSON.parse(decrypted);
// 处理数据
};
跨域问题
跨域原理
当客户端在一个域名下发起 WebSocket 连接,而服务器位于另一个域名时,就会出现跨域问题。与 HTTP 跨域类似,浏览器出于安全考虑,会阻止这种跨域的 WebSocket 连接。例如,客户端在 http://client - domain.com
上运行,而 WebSocket 服务器在 http://server - domain.com
上,浏览器会拦截这个连接请求。
解决方案
- 服务器端配置 CORS:在服务器端配置跨域资源共享(CORS),允许特定的源访问 WebSocket 服务。以 Node.js 和
ws
库为例,可以在服务器端设置相应的响应头:
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });
wss.on('connection', function connection(ws, req) {
const origin = req.headers.origin;
if (origin === 'http://client - domain.com') {
// 设置允许跨域的响应头
ws.send('Connection established with CORS');
} else {
ws.close(1008, 'Cross - origin not allowed');
}
});
- 代理服务器:可以使用代理服务器来解决跨域问题。客户端先向本地代理服务器发起 WebSocket 连接,代理服务器再转发请求到实际的 WebSocket 服务器。例如,使用 Nginx 作为代理服务器:
server {
listen 80;
server_name client - domain.com;
location /ws {
proxy_pass http://server - domain.com:8080;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
}
}
这样,客户端通过 ws://client - domain.com/ws
连接,Nginx 会将请求转发到 http://server - domain.com:8080
,从而绕过浏览器的跨域限制。
与其他技术结合
与数据库结合
- 数据存储:股票行情数据不仅需要实时推送,还需要存储以便后续分析和查询。可以将实时获取的股票行情数据存储到数据库中。例如,使用 MySQL 数据库,在 Node.js 中使用
mysql2
库进行数据存储:
const mysql = require('mysql2');
const connection = mysql.createConnection({
host: 'localhost',
user: 'root',
password: 'password',
database:'stock_db'
});
const stockQuotes = getStockQuotes();
const query = 'INSERT INTO stock_quotes (stock_name, current_price, volume) VALUES (?,?,?)';
connection.query(query, [stockQuotes.stockName, stockQuotes.currentPrice, stockQuotes.volume], function (error, results, fields) {
if (error) throw error;
console.log('Data inserted successfully');
});
- 历史数据查询:当客户端需要查询历史股票行情数据时,服务器可以从数据库中检索相应的数据,并通过 WebSocket 推送给客户端。例如,客户端发送查询指令:
const queryMessage = {
action: 'query - history',
stockSymbol: 'AAPL',
startDate: '2023 - 01 - 01',
endDate: '2023 - 01 - 31'
};
socket.send(JSON.stringify(queryMessage));
服务器接收到指令后,从数据库中查询数据并推送:
ws.on('message', function incoming(message) {
const data = JSON.parse(message);
if (data.action === 'query - history') {
const { stockSymbol, startDate, endDate } = data;
const query = 'SELECT * FROM stock_quotes WHERE stock_name =? AND quote_date BETWEEN? AND?';
connection.query(query, [stockSymbol, startDate, endDate], function (error, results, fields) {
if (error) throw error;
ws.send(JSON.stringify(results));
});
}
});
与消息队列结合
- 解耦数据获取与推送:在股票行情系统中,数据获取和推送可能是两个相对独立的过程。可以使用消息队列(如 RabbitMQ)来解耦这两个过程。数据获取模块将获取到的股票行情数据发送到消息队列中,而 WebSocket 服务器从消息队列中消费数据并推送给客户端。例如,在 Node.js 中使用
amqplib
库连接 RabbitMQ:
const amqp = require('amqplib');
async function sendStockQuotesToQueue() {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
const queue ='stock - quotes - queue';
await channel.assertQueue(queue, { durable: false });
const stockQuotes = getStockQuotes();
const message = JSON.stringify(stockQuotes);
channel.sendToQueue(queue, Buffer.from(message));
console.log('Stock quotes sent to queue');
await channel.close();
await connection.close();
}
WebSocket 服务器消费消息:
async function consumeStockQuotesFromQueue() {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
const queue ='stock - quotes - queue';
await channel.assertQueue(queue, { durable: false });
channel.consume(queue, function (msg) {
if (msg) {
const stockQuotes = JSON.parse(msg.content.toString());
wss.clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify(stockQuotes));
}
});
channel.ack(msg);
}
}, { noAck: false });
}
- 提高系统的可靠性和扩展性:消息队列可以作为一个可靠的中间层,即使 WebSocket 服务器暂时出现故障,数据也不会丢失,等服务器恢复后可以继续从消息队列中消费数据。同时,通过增加消息队列的消费者数量,可以很容易地扩展系统的处理能力,以应对大量客户端的连接和数据推送需求。