MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

实时数据推送:WebSocket在股票行情中的应用

2023-03-281.6k 阅读

WebSocket 基础原理

WebSocket 协议概述

WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议。与传统的 HTTP 协议不同,HTTP 是一种请求 - 响应模式的协议,客户端发起请求,服务器返回响应,在请求响应完成后连接通常会关闭。而 WebSocket 协议使得客户端和服务器之间可以建立持久连接,双方能够随时主动地向对方发送数据。

WebSocket 协议通过一个握手过程将 HTTP 协议升级为 WebSocket 协议。客户端首先发送一个带有特殊头部的 HTTP 请求,例如:

GET /ws HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec - WebSocket - Key: dGhlIHNhbXBsZSBub25jZQ==
Sec - WebSocket - Version: 13

服务器接收到这个请求后,如果支持 WebSocket 协议,会返回类似如下的响应:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec - WebSocket - Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=

这个过程完成后,连接就从 HTTP 协议升级为 WebSocket 协议,双方可以开始进行全双工通信。

WebSocket 的优势

  1. 实时性:由于 WebSocket 建立的是持久连接,服务器可以随时向客户端推送最新的数据,无需客户端频繁轮询。在股票行情应用中,这意味着客户端能够第一时间获取到股票价格的变化、交易信息等实时数据,大大提高了数据的及时性。
  2. 减少网络开销:传统的轮询方式需要客户端定时向服务器发送请求获取数据,即使数据没有变化也会产生不必要的网络请求。而 WebSocket 只有在有数据传输时才会使用网络带宽,有效地减少了网络流量和服务器负载。
  3. 双向通信:WebSocket 支持全双工通信,客户端和服务器可以同时主动地发送数据。在股票行情场景中,客户端不仅可以接收行情数据,还可以向服务器发送一些指令,如订阅特定股票、查询历史行情等,服务器根据客户端的请求返回相应的数据。

股票行情系统中的实时数据需求

数据类型

  1. 价格数据:包括股票的当前价格、开盘价、最高价、最低价等。这些数据是投资者最关注的信息,它们的实时变化反映了股票的市场价值波动。例如,一只股票的当前价格可能在短时间内频繁变动,投资者需要及时了解这些变化来做出买卖决策。
  2. 成交量数据:成交量反映了市场对该股票的交易活跃程度。高成交量可能意味着市场对该股票的关注度较高,价格波动可能更加剧烈。实时的成交量数据对于分析股票的市场热度和趋势非常重要。
  3. 交易信息:如每一笔交易的时间、成交价格、成交量等详细信息。这些数据可以帮助投资者了解市场的交易动态,分析主力资金的流向等。

实时性要求

在股票市场中,每一秒的数据变化都可能影响投资者的决策。对于一些日内交易者或者高频交易者来说,数据的延迟可能导致他们错过最佳的交易时机。因此,股票行情系统对实时数据的推送要求极高,数据延迟应尽可能控制在几十毫秒以内。例如,在一些大型的证券交易平台上,为了保证行情数据的实时性,会投入大量的资源来优化数据传输和处理流程。

WebSocket 在股票行情中的应用架构

客户端

  1. 连接建立:在股票行情客户端应用中,首先要建立与服务器的 WebSocket 连接。以 JavaScript 为例,可以使用 WebSocket 对象来实现:
const socket = new WebSocket('ws://localhost:8080/stock - quotes');
socket.onopen = function (event) {
    console.log('WebSocket connection established');
};
socket.onerror = function (error) {
    console.log('WebSocket error:', error);
};
  1. 数据接收与处理:当连接建立成功后,客户端可以接收服务器推送的股票行情数据。接收到的数据需要根据预先定义的格式进行解析和处理,然后展示在用户界面上。例如,如果服务器推送的是 JSON 格式的数据:
socket.onmessage = function (event) {
    const data = JSON.parse(event.data);
    const stockName = data.stockName;
    const currentPrice = data.currentPrice;
    // 在页面上更新股票价格显示
    document.getElementById('stock - price -'+ stockName).textContent = currentPrice;
};
  1. 发送指令:客户端还可以向服务器发送指令,如订阅特定股票的行情。同样以 JavaScript 为例:
const subscribeMessage = {
    action:'subscribe',
    stockSymbol: 'AAPL'
};
socket.send(JSON.stringify(subscribeMessage));

服务器端

  1. WebSocket 服务器搭建:在后端开发中,可以使用多种框架来搭建 WebSocket 服务器。以 Node.js 为例,常用的框架有 ws 库。以下是一个简单的使用 ws 库搭建 WebSocket 服务器的示例:
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });
wss.on('connection', function connection(ws) {
    console.log('A client connected');
    ws.on('message', function incoming(message) {
        console.log('Received message:', message);
        // 处理客户端发送的消息
        const data = JSON.parse(message);
        if (data.action ==='subscribe') {
            // 处理订阅逻辑
        }
    });
    ws.on('close', function close() {
        console.log('A client disconnected');
    });
});
  1. 数据获取与推送:服务器需要从数据源获取股票行情数据,数据源可以是证券交易所提供的接口、数据库等。获取到数据后,服务器将数据推送给已连接的客户端。例如,假设服务器从数据库中获取到股票行情数据,然后推送给客户端:
// 模拟从数据库获取股票行情数据
const getStockQuotes = () => {
    return {
        stockName: 'AAPL',
        currentPrice: 150.25,
        volume: 100000
    };
};
setInterval(() => {
    const stockQuotes = getStockQuotes();
    wss.clients.forEach((client) => {
        if (client.readyState === WebSocket.OPEN) {
            client.send(JSON.stringify(stockQuotes));
        }
    });
}, 1000);
  1. 指令处理:服务器接收到客户端发送的指令后,需要根据指令类型进行相应的处理。如处理订阅指令,服务器可能需要维护一个订阅列表,记录每个客户端订阅的股票,然后只向订阅了特定股票的客户端推送相关行情数据。
const subscriptions = {};
ws.on('message', function incoming(message) {
    const data = JSON.parse(message);
    if (data.action ==='subscribe') {
        const stockSymbol = data.stockSymbol;
        if (!subscriptions[stockSymbol]) {
            subscriptions[stockSymbol] = [];
        }
        subscriptions[stockSymbol].push(ws);
    }
});
// 推送数据时,只推送给订阅了该股票的客户端
const stockQuotes = getStockQuotes();
if (subscriptions[stockQuotes.stockName]) {
    subscriptions[stockQuotes.stockName].forEach((client) => {
        if (client.readyState === WebSocket.OPEN) {
            client.send(JSON.stringify(stockQuotes));
        }
    });
}

可靠性与性能优化

连接管理

  1. 心跳机制:为了确保 WebSocket 连接的可靠性,防止连接因为网络故障、服务器重启等原因意外断开,通常会引入心跳机制。客户端和服务器定期向对方发送心跳消息,以确认连接仍然有效。例如,客户端可以每隔一段时间(如 30 秒)向服务器发送一个心跳消息:
const heartbeatInterval = 30000;
let heartbeatTimer;
function sendHeartbeat() {
    socket.send('ping');
    heartbeatTimer = setTimeout(sendHeartbeat, heartbeatInterval);
}
socket.onopen = function () {
    sendHeartbeat();
};
socket.onclose = function () {
    clearTimeout(heartbeatTimer);
};

服务器接收到心跳消息后,需要回复一个响应消息,如 pong。客户端接收到 pong 消息后,确认连接正常。如果客户端在一定时间内没有收到 pong 消息,则认为连接可能已断开,需要尝试重新连接。 2. 重连机制:当 WebSocket 连接断开时,客户端需要有重连机制来尝试重新建立连接。可以设置一个重试次数和重试间隔时间,每次连接失败后,按照一定的策略增加重试间隔时间。例如:

const maxRetries = 5;
let retryCount = 0;
const initialRetryInterval = 1000;
function reconnect() {
    if (retryCount < maxRetries) {
        const retryInterval = initialRetryInterval * (2 ** retryCount);
        setTimeout(() => {
            const socket = new WebSocket('ws://localhost:8080/stock - quotes');
            socket.onopen = function () {
                console.log('Reconnected successfully');
                retryCount = 0;
            };
            socket.onclose = function () {
                retryCount++;
                reconnect();
            };
        }, retryInterval);
    }
}
socket.onclose = function () {
    reconnect();
};

数据优化

  1. 数据压缩:在网络传输过程中,为了减少数据量,提高传输效率,可以对股票行情数据进行压缩。常见的压缩算法有 Gzip 等。在服务器端,可以使用相应的库对要推送的数据进行压缩,然后在客户端进行解压缩。以 Node.js 为例,使用 zlib 库进行 Gzip 压缩:
const zlib = require('zlib');
const stockQuotes = getStockQuotes();
const jsonData = JSON.stringify(stockQuotes);
zlib.gzip(jsonData, function (err, buffer) {
    if (!err) {
        wss.clients.forEach((client) => {
            if (client.readyState === WebSocket.OPEN) {
                client.send(buffer);
            }
        });
    }
});

在客户端,使用 pako 库进行解压缩(假设使用 JavaScript):

import pako from 'pako';
socket.onmessage = function (event) {
    const decompressed = pako.inflate(event.data);
    const data = JSON.parse(decompressed);
    // 处理数据
};
  1. 数据过滤与聚合:服务器端可以根据客户端的订阅情况对数据进行过滤,只推送客户端需要的股票行情数据。同时,对于一些高频变化的数据,可以进行适当的聚合处理,减少数据推送频率。例如,对于每秒都在变化的股票价格,可以每隔 5 秒进行一次价格平均值的计算,然后推送这个平均值,这样既能满足大多数投资者对价格趋势的关注,又能减少数据量。

安全性考虑

身份验证

  1. 基于 Token 的认证:在 WebSocket 连接建立之前,客户端需要进行身份验证。一种常见的方式是使用 Token。客户端在发起 WebSocket 连接时,将 Token 作为参数传递给服务器,例如:
const token = 'your - authentication - token';
const socket = new WebSocket('ws://localhost:8080/stock - quotes?token=' + token);

服务器接收到连接请求后,从请求参数中获取 Token,并验证 Token 的有效性。可以通过与认证服务器进行交互,或者在本地缓存中验证 Token。以 Node.js 为例:

const jwt = require('jsonwebtoken');
wss.on('connection', function connection(ws, req) {
    const token = req.url.split('=')[1];
    try {
        const decoded = jwt.verify(token, 'your - secret - key');
        console.log('Authenticated user:', decoded.username);
        // 连接建立成功
    } catch (err) {
        console.log('Authentication failed:', err);
        ws.close(1008, 'Authentication failed');
    }
});
  1. Cookie - 基于的认证:如果应用程序已经使用了基于 Cookie 的身份验证机制,WebSocket 服务器也可以利用 Cookie 进行认证。服务器可以从 HTTP 请求头中获取 Cookie,并验证用户的身份。例如,在 Node.js 中使用 cookie - parser 库:
const cookieParser = require('cookie - parser');
const app = require('express')();
app.use(cookieParser());
const wss = new WebSocket.Server({ server: app });
wss.on('connection', function connection(ws, req) {
    const cookies = req.cookies;
    if (cookies['session - cookie']) {
        // 验证 session - cookie 的有效性
        if (isValidSessionCookie(cookies['session - cookie'])) {
            console.log('Authenticated user from cookie');
            // 连接建立成功
        } else {
            console.log('Authentication failed from cookie');
            ws.close(1008, 'Authentication failed');
        }
    } else {
        console.log('No session cookie found');
        ws.close(1008, 'No session cookie found');
    }
});

数据加密

  1. TLS/SSL 加密:为了保护在 WebSocket 连接上传输的股票行情数据的机密性和完整性,应使用 TLS/SSL 加密。在服务器端配置 TLS/SSL 证书,使 WebSocket 连接通过 HTTPS 协议进行。例如,在 Node.js 中使用 https 模块和 ws 库:
const https = require('https');
const fs = require('fs');
const WebSocket = require('ws');
const options = {
    key: fs.readFileSync('path/to/key.pem'),
    cert: fs.readFileSync('path/to/cert.pem')
};
const server = https.createServer(options);
const wss = new WebSocket.Server({ server });
server.listen(443, function () {
    console.log('WebSocket server is running on HTTPS');
});

客户端在连接时,使用 wss:// 协议:

const socket = new WebSocket('wss://localhost/stock - quotes');
  1. 数据层面加密:除了传输层的加密,对于一些敏感的股票行情数据,还可以在数据层面进行加密。例如,使用对称加密算法(如 AES)对数据进行加密后再传输。在服务器端加密数据:
const crypto = require('crypto');
const algorithm = 'aes - 256 - cbc';
const key = crypto.randomBytes(32);
const iv = crypto.randomBytes(16);
const stockQuotes = getStockQuotes();
const jsonData = JSON.stringify(stockQuotes);
const cipher = crypto.createCipheriv(algorithm, key, iv);
let encrypted = cipher.update(jsonData, 'utf8', 'hex');
encrypted += cipher.final('hex');
const dataToSend = {
    encryptedData: encrypted,
    iv: iv.toString('hex')
};
wss.clients.forEach((client) => {
    if (client.readyState === WebSocket.OPEN) {
        client.send(JSON.stringify(dataToSend));
    }
});

在客户端解密数据:

const crypto = require('crypto');
const algorithm = 'aes - 256 - cbc';
socket.onmessage = function (event) {
    const data = JSON.parse(event.data);
    const decipher = crypto.createDecipheriv(algorithm, key, Buffer.from(data.iv, 'hex'));
    let decrypted = decipher.update(data.encryptedData, 'hex', 'utf8');
    decrypted += decipher.final('utf8');
    const stockQuotes = JSON.parse(decrypted);
    // 处理数据
};

跨域问题

跨域原理

当客户端在一个域名下发起 WebSocket 连接,而服务器位于另一个域名时,就会出现跨域问题。与 HTTP 跨域类似,浏览器出于安全考虑,会阻止这种跨域的 WebSocket 连接。例如,客户端在 http://client - domain.com 上运行,而 WebSocket 服务器在 http://server - domain.com 上,浏览器会拦截这个连接请求。

解决方案

  1. 服务器端配置 CORS:在服务器端配置跨域资源共享(CORS),允许特定的源访问 WebSocket 服务。以 Node.js 和 ws 库为例,可以在服务器端设置相应的响应头:
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });
wss.on('connection', function connection(ws, req) {
    const origin = req.headers.origin;
    if (origin === 'http://client - domain.com') {
        // 设置允许跨域的响应头
        ws.send('Connection established with CORS');
    } else {
        ws.close(1008, 'Cross - origin not allowed');
    }
});
  1. 代理服务器:可以使用代理服务器来解决跨域问题。客户端先向本地代理服务器发起 WebSocket 连接,代理服务器再转发请求到实际的 WebSocket 服务器。例如,使用 Nginx 作为代理服务器:
server {
    listen 80;
    server_name client - domain.com;
    location /ws {
        proxy_pass http://server - domain.com:8080;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
    }
}

这样,客户端通过 ws://client - domain.com/ws 连接,Nginx 会将请求转发到 http://server - domain.com:8080,从而绕过浏览器的跨域限制。

与其他技术结合

与数据库结合

  1. 数据存储:股票行情数据不仅需要实时推送,还需要存储以便后续分析和查询。可以将实时获取的股票行情数据存储到数据库中。例如,使用 MySQL 数据库,在 Node.js 中使用 mysql2 库进行数据存储:
const mysql = require('mysql2');
const connection = mysql.createConnection({
    host: 'localhost',
    user: 'root',
    password: 'password',
    database:'stock_db'
});
const stockQuotes = getStockQuotes();
const query = 'INSERT INTO stock_quotes (stock_name, current_price, volume) VALUES (?,?,?)';
connection.query(query, [stockQuotes.stockName, stockQuotes.currentPrice, stockQuotes.volume], function (error, results, fields) {
    if (error) throw error;
    console.log('Data inserted successfully');
});
  1. 历史数据查询:当客户端需要查询历史股票行情数据时,服务器可以从数据库中检索相应的数据,并通过 WebSocket 推送给客户端。例如,客户端发送查询指令:
const queryMessage = {
    action: 'query - history',
    stockSymbol: 'AAPL',
    startDate: '2023 - 01 - 01',
    endDate: '2023 - 01 - 31'
};
socket.send(JSON.stringify(queryMessage));

服务器接收到指令后,从数据库中查询数据并推送:

ws.on('message', function incoming(message) {
    const data = JSON.parse(message);
    if (data.action === 'query - history') {
        const { stockSymbol, startDate, endDate } = data;
        const query = 'SELECT * FROM stock_quotes WHERE stock_name =? AND quote_date BETWEEN? AND?';
        connection.query(query, [stockSymbol, startDate, endDate], function (error, results, fields) {
            if (error) throw error;
            ws.send(JSON.stringify(results));
        });
    }
});

与消息队列结合

  1. 解耦数据获取与推送:在股票行情系统中,数据获取和推送可能是两个相对独立的过程。可以使用消息队列(如 RabbitMQ)来解耦这两个过程。数据获取模块将获取到的股票行情数据发送到消息队列中,而 WebSocket 服务器从消息队列中消费数据并推送给客户端。例如,在 Node.js 中使用 amqplib 库连接 RabbitMQ:
const amqp = require('amqplib');
async function sendStockQuotesToQueue() {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();
    const queue ='stock - quotes - queue';
    await channel.assertQueue(queue, { durable: false });
    const stockQuotes = getStockQuotes();
    const message = JSON.stringify(stockQuotes);
    channel.sendToQueue(queue, Buffer.from(message));
    console.log('Stock quotes sent to queue');
    await channel.close();
    await connection.close();
}

WebSocket 服务器消费消息:

async function consumeStockQuotesFromQueue() {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();
    const queue ='stock - quotes - queue';
    await channel.assertQueue(queue, { durable: false });
    channel.consume(queue, function (msg) {
        if (msg) {
            const stockQuotes = JSON.parse(msg.content.toString());
            wss.clients.forEach((client) => {
                if (client.readyState === WebSocket.OPEN) {
                    client.send(JSON.stringify(stockQuotes));
                }
            });
            channel.ack(msg);
        }
    }, { noAck: false });
}
  1. 提高系统的可靠性和扩展性:消息队列可以作为一个可靠的中间层,即使 WebSocket 服务器暂时出现故障,数据也不会丢失,等服务器恢复后可以继续从消息队列中消费数据。同时,通过增加消息队列的消费者数量,可以很容易地扩展系统的处理能力,以应对大量客户端的连接和数据推送需求。