Chapter 07

WebSocket 实时通信

从协议原理到实时聊天室——Bun 原生 WebSocket 与生产级实现

WebSocket 协议原理

HTTP 是请求-响应模型:客户端发请求,服务器返回响应,连接关闭。这对于聊天、游戏、股票行情等需要实时双向通信的场景完全不够用。

HTTP 请求-响应(短连接):
客户端 ──GET /data──▶ 服务器
客户端 ◀──Response── 服务器
(连接关闭,下次需重新建立)

WebSocket(长连接,全双工):
客户端 ──HTTP Upgrade──▶ 服务器  ← 一次握手
客户端 ◀────────────────▶ 服务器  ← 持久双向通信
客户端 ──Message──▶ 服务器
客户端 ◀──Push────── 服务器       ← 服务器主动推送

Bun 原生 WebSocket

Bun 内置了基于 uWebSockets 的高性能 WebSocket 服务器,完全不需要 ws 库:

// server.ts — Bun 原生 WebSocket 服务器
type WSData = { userId: string; room: string };

const server = Bun.serve<WSData>({
  port: 3001,

  // HTTP 请求处理(包括 WebSocket 升级)
  async fetch(req, server) {
    const url = new URL(req.url);

    if (url.pathname === '/ws') {
      // 验证 token
      const token = url.searchParams.get('token');
      const userId = await validateToken(token);
      if (!userId) return new Response('Unauthorized', { status: 401 });

      const room = url.searchParams.get('room') ?? 'general';

      // 升级为 WebSocket,传入上下文数据
      const upgraded = server.upgrade(req, {
        data: { userId, room },
      });
      if (!upgraded) return new Response('WebSocket upgrade failed', { status: 400 });
      return undefined; // 升级成功后不返回 Response
    }

    return new Response('Hello HTTP!');
  },

  // WebSocket 事件处理器
  websocket: {
    // 连接建立
    open(ws) {
      const { userId, room } = ws.data;
      console.log(`用户 ${userId} 加入房间 ${room}`);

      // 订阅房间(pub/sub)
      ws.subscribe(room);

      // 广播"用户加入"消息
      server.publish(room, JSON.stringify({
        type: 'system',
        message: `${userId} 加入了房间`,
        timestamp: Date.now(),
      }));
    },

    // 收到消息
    message(ws, rawMessage) {
      const { userId, room } = ws.data;
      try {
        const msg = JSON.parse(String(rawMessage));

        switch (msg.type) {
          case 'chat':
            // 广播消息到房间所有人
            server.publish(room, JSON.stringify({
              type: 'chat',
              userId,
              content: msg.content,
              timestamp: Date.now(),
            }));
            break;

          case 'join_room':
            ws.unsubscribe(room); // 离开当前房间
            ws.data.room = msg.room;
            ws.subscribe(msg.room); // 加入新房间
            break;
        }
      } catch (e) {
        ws.send(JSON.stringify({ type: 'error', message: '消息格式错误' }));
      }
    },

    // 连接关闭
    close(ws, code, reason) {
      const { userId, room } = ws.data;
      ws.unsubscribe(room);
      server.publish(room, JSON.stringify({
        type: 'system',
        message: `${userId} 离开了房间`,
      }));
    },

    // 压缩(大消息时节省带宽)
    perMessageDeflate: true,
  },
});

console.log(`WebSocket 服务运行在 ws://localhost:${server.port}/ws`);

Node.js — 使用 ws 库

npm install ws @types/ws
import { WebSocketServer, WebSocket } from 'ws';
import http from 'node:http';

const server = http.createServer();
const wss = new WebSocketServer({ server });

const rooms = new Map<string, Set<WebSocket>>();

wss.on('connection', (ws, req) => {
  const room = new URL(req.url!, 'http://localhost').searchParams.get('room') ?? 'general';

  if (!rooms.has(room)) rooms.set(room, new Set());
  rooms.get(room)!.add(ws);

  ws.on('message', (data) => {
    // 广播到同一房间
    rooms.get(room)?.forEach(client => {
      if (client !== ws && client.readyState === WebSocket.OPEN) {
        client.send(data);
      }
    });
  });

  ws.on('close', () => rooms.get(room)?.delete(ws));
});

server.listen(3001);

心跳检测与重连

WebSocket 连接可能在无数据传输时被防火墙或负载均衡器切断。需要实现心跳机制:

// 客户端心跳实现(浏览器 JS)
class ReconnectingWebSocket {
  private ws: WebSocket | null = null;
  private heartbeatTimer: ReturnType<typeof setInterval> | null = null;
  private reconnectDelay = 1000;

  connect(url: string) {
    this.ws = new WebSocket(url);

    this.ws.addEventListener('open', () => {
      this.reconnectDelay = 1000; // 重置重连间隔
      this.startHeartbeat();
    });

    this.ws.addEventListener('close', () => {
      this.stopHeartbeat();
      // 指数退避重连:1s → 2s → 4s → ... → 最大 30s
      setTimeout(() => {
        this.reconnectDelay = Math.min(this.reconnectDelay * 2, 30000);
        this.connect(url);
      }, this.reconnectDelay);
    });

    this.ws.addEventListener('message', (event) => {
      const data = JSON.parse(event.data);
      if (data.type === 'pong') return; // 心跳响应,忽略
      // 处理业务消息...
    });
  }

  private startHeartbeat() {
    this.heartbeatTimer = setInterval(() => {
      if (this.ws?.readyState === WebSocket.OPEN) {
        this.ws.send(JSON.stringify({ type: 'ping' }));
      }
    }, 30000); // 每30秒发一次
  }

  private stopHeartbeat() {
    if (this.heartbeatTimer) clearInterval(this.heartbeatTimer);
  }

  send(data: unknown) {
    this.ws?.send(JSON.stringify(data));
  }
}

Server-Sent Events(SSE)

如果只需要服务器单向推送数据(股票价格更新、进度通知、AI 流式输出),SSE 比 WebSocket 更简单:

// Hono SSE — 服务器单向推送
import { streamSSE } from 'hono/streaming';

app.get('/events', (c) => {
  return streamSSE(c, async (stream) => {
    let id = 0;
    while (true) {
      await stream.writeSSE({
        event: 'update',
        data: JSON.stringify({ price: Math.random() * 100 }),
        id: String(id++),
      });
      await stream.sleep(1000);
    }
  });
});

// 客户端(浏览器原生支持)
const es = new EventSource('/events');
es.addEventListener('update', (e) => {
  const data = JSON.parse(e.data);
  console.log('新价格:', data.price);
});

WebSocket vs SSE vs HTTP Polling:WebSocket 适合双向实时通信(聊天、游戏、协同编辑);SSE 适合服务器单向推送(通知、AI 流输出、进度更新),更简单、自动重连;HTTP Long Polling 是最后的保底方案,兼容性最好但效率最低。

多节点部署:Redis 适配器

单节点 WebSocket 没问题,但多节点(水平扩展)时,用户 A 和用户 B 可能连接到不同服务器,直接广播会失败。解决方案是使用 Redis 的 pub/sub:

bun add ioredis
import Redis from 'ioredis';

const pub = new Redis(process.env.REDIS_URL!);
const sub = new Redis(process.env.REDIS_URL!);

// 本地连接的 WebSocket 客户端
const localClients = new Map<string, Set<ServerWebSocket<WSData>>>();

// 订阅 Redis 频道,收到消息转发给本地 WebSocket 客户端
await sub.subscribe('room:general');
sub.on('message', (channel, message) => {
  const room = channel.replace('room:', '');
  localClients.get(room)?.forEach(ws => ws.send(message));
});

// 广播时发布到 Redis(所有节点都会收到)
function broadcastToRoom(room: string, message: string) {
  pub.publish(`room:${room}`, message);
}