WebSocket 协议原理
HTTP 是请求-响应模型:客户端发请求,服务器返回响应,连接关闭。这对于聊天、游戏、股票行情等需要实时双向通信的场景完全不够用。
- WebSocket 基于 TCP 的全双工通信协议(RFC 6455)。通过 HTTP 升级握手建立连接后,双方可以随时互发数据帧,连接持久保持,无需轮询。
-
握手升级
客户端发送含
Upgrade: websocket和Connection: Upgrade头的 HTTP 请求,服务器返回101 Switching Protocols,之后协议从 HTTP 切换到 WebSocket。 - 数据帧(Frame) WebSocket 传输的基本单位。分为:文本帧(UTF-8 字符串)、二进制帧、ping/pong 帧(心跳)、关闭帧。
- pub/sub 模式 发布/订阅模式。客户端订阅某个"主题",当有消息发布到该主题时,所有订阅者都收到消息。是实现聊天室、通知的核心模式。
HTTP 请求-响应(短连接):
客户端 ──GET /data──▶ 服务器
客户端 ◀──Response── 服务器
(连接关闭,下次需重新建立)
WebSocket(长连接,全双工):
客户端 ──HTTP Upgrade──▶ 服务器 ← 一次握手
客户端 ◀────────────────▶ 服务器 ← 持久双向通信
客户端 ──Message──▶ 服务器
客户端 ◀──Push────── 服务器 ← 服务器主动推送
Bun 原生 WebSocket
Bun 内置了基于 uWebSockets 的高性能 WebSocket 服务器,完全不需要 ws 库:
// server.ts — Bun 原生 WebSocket 服务器
type WSData = { userId: string; room: string };
const server = Bun.serve<WSData>({
port: 3001,
// HTTP 请求处理(包括 WebSocket 升级)
async fetch(req, server) {
const url = new URL(req.url);
if (url.pathname === '/ws') {
// 验证 token
const token = url.searchParams.get('token');
const userId = await validateToken(token);
if (!userId) return new Response('Unauthorized', { status: 401 });
const room = url.searchParams.get('room') ?? 'general';
// 升级为 WebSocket,传入上下文数据
const upgraded = server.upgrade(req, {
data: { userId, room },
});
if (!upgraded) return new Response('WebSocket upgrade failed', { status: 400 });
return undefined; // 升级成功后不返回 Response
}
return new Response('Hello HTTP!');
},
// WebSocket 事件处理器
websocket: {
// 连接建立
open(ws) {
const { userId, room } = ws.data;
console.log(`用户 ${userId} 加入房间 ${room}`);
// 订阅房间(pub/sub)
ws.subscribe(room);
// 广播"用户加入"消息
server.publish(room, JSON.stringify({
type: 'system',
message: `${userId} 加入了房间`,
timestamp: Date.now(),
}));
},
// 收到消息
message(ws, rawMessage) {
const { userId, room } = ws.data;
try {
const msg = JSON.parse(String(rawMessage));
switch (msg.type) {
case 'chat':
// 广播消息到房间所有人
server.publish(room, JSON.stringify({
type: 'chat',
userId,
content: msg.content,
timestamp: Date.now(),
}));
break;
case 'join_room':
ws.unsubscribe(room); // 离开当前房间
ws.data.room = msg.room;
ws.subscribe(msg.room); // 加入新房间
break;
}
} catch (e) {
ws.send(JSON.stringify({ type: 'error', message: '消息格式错误' }));
}
},
// 连接关闭
close(ws, code, reason) {
const { userId, room } = ws.data;
ws.unsubscribe(room);
server.publish(room, JSON.stringify({
type: 'system',
message: `${userId} 离开了房间`,
}));
},
// 压缩(大消息时节省带宽)
perMessageDeflate: true,
},
});
console.log(`WebSocket 服务运行在 ws://localhost:${server.port}/ws`);
Node.js — 使用 ws 库
npm install ws @types/ws
import { WebSocketServer, WebSocket } from 'ws';
import http from 'node:http';
const server = http.createServer();
const wss = new WebSocketServer({ server });
const rooms = new Map<string, Set<WebSocket>>();
wss.on('connection', (ws, req) => {
const room = new URL(req.url!, 'http://localhost').searchParams.get('room') ?? 'general';
if (!rooms.has(room)) rooms.set(room, new Set());
rooms.get(room)!.add(ws);
ws.on('message', (data) => {
// 广播到同一房间
rooms.get(room)?.forEach(client => {
if (client !== ws && client.readyState === WebSocket.OPEN) {
client.send(data);
}
});
});
ws.on('close', () => rooms.get(room)?.delete(ws));
});
server.listen(3001);
心跳检测与重连
WebSocket 连接可能在无数据传输时被防火墙或负载均衡器切断。需要实现心跳机制:
// 客户端心跳实现(浏览器 JS)
class ReconnectingWebSocket {
private ws: WebSocket | null = null;
private heartbeatTimer: ReturnType<typeof setInterval> | null = null;
private reconnectDelay = 1000;
connect(url: string) {
this.ws = new WebSocket(url);
this.ws.addEventListener('open', () => {
this.reconnectDelay = 1000; // 重置重连间隔
this.startHeartbeat();
});
this.ws.addEventListener('close', () => {
this.stopHeartbeat();
// 指数退避重连:1s → 2s → 4s → ... → 最大 30s
setTimeout(() => {
this.reconnectDelay = Math.min(this.reconnectDelay * 2, 30000);
this.connect(url);
}, this.reconnectDelay);
});
this.ws.addEventListener('message', (event) => {
const data = JSON.parse(event.data);
if (data.type === 'pong') return; // 心跳响应,忽略
// 处理业务消息...
});
}
private startHeartbeat() {
this.heartbeatTimer = setInterval(() => {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({ type: 'ping' }));
}
}, 30000); // 每30秒发一次
}
private stopHeartbeat() {
if (this.heartbeatTimer) clearInterval(this.heartbeatTimer);
}
send(data: unknown) {
this.ws?.send(JSON.stringify(data));
}
}
Server-Sent Events(SSE)
如果只需要服务器单向推送数据(股票价格更新、进度通知、AI 流式输出),SSE 比 WebSocket 更简单:
// Hono SSE — 服务器单向推送
import { streamSSE } from 'hono/streaming';
app.get('/events', (c) => {
return streamSSE(c, async (stream) => {
let id = 0;
while (true) {
await stream.writeSSE({
event: 'update',
data: JSON.stringify({ price: Math.random() * 100 }),
id: String(id++),
});
await stream.sleep(1000);
}
});
});
// 客户端(浏览器原生支持)
const es = new EventSource('/events');
es.addEventListener('update', (e) => {
const data = JSON.parse(e.data);
console.log('新价格:', data.price);
});
WebSocket vs SSE vs HTTP Polling:WebSocket 适合双向实时通信(聊天、游戏、协同编辑);SSE 适合服务器单向推送(通知、AI 流输出、进度更新),更简单、自动重连;HTTP Long Polling 是最后的保底方案,兼容性最好但效率最低。
多节点部署:Redis 适配器
单节点 WebSocket 没问题,但多节点(水平扩展)时,用户 A 和用户 B 可能连接到不同服务器,直接广播会失败。解决方案是使用 Redis 的 pub/sub:
bun add ioredis
import Redis from 'ioredis';
const pub = new Redis(process.env.REDIS_URL!);
const sub = new Redis(process.env.REDIS_URL!);
// 本地连接的 WebSocket 客户端
const localClients = new Map<string, Set<ServerWebSocket<WSData>>>();
// 订阅 Redis 频道,收到消息转发给本地 WebSocket 客户端
await sub.subscribe('room:general');
sub.on('message', (channel, message) => {
const room = channel.replace('room:', '');
localClients.get(room)?.forEach(ws => ws.send(message));
});
// 广播时发布到 Redis(所有节点都会收到)
function broadcastToRoom(room: string, message: string) {
pub.publish(`room:${room}`, message);
}