Chapter 08

WebSocket 与 SSE

打破请求-响应的单次对话模式,用 WebSocket 实现双向实时通信,用 SSE 推送服务端事件流——并在 Swift 6 Actor 并发安全模型下管理连接状态。

为什么需要实时通信?

传统 HTTP 是一问一答的模型:客户端发请求,服务端返回响应,连接随即关闭。这对于查询用户信息、提交表单等场景足够用,但对于聊天室、在线协作、股价行情、游戏状态同步等场景,"客户端轮询"(每秒发一次 GET)的方案既浪费带宽,又引入不必要的延迟。

HTTP/1.1 时代出现了两种更高效的实时推送方式:WebSocket 和 SSE(Server-Sent Events)。它们解决的问题不同,适合不同场景,理解二者的区别是正确选型的前提。

特性WebSocketSSE(Server-Sent Events)HTTP 轮询
通信方向双向(全双工)单向(服务端→客户端)单向(客户端发起)
协议ws:// / wss://(独立协议)HTTP/1.1 长连接标准 HTTP
消息格式任意二进制/文本帧text/event-stream(纯文本)任意 HTTP 响应
浏览器支持全面支持全面支持(IE 除外)全面支持
自动重连需手动实现浏览器 EventSource 自动重连客户端定时发起
负载均衡需粘性会话(Sticky Session)需粘性会话无状态,简单
典型场景聊天、游戏、协同编辑通知推送、进度更新、AI 流式输出低频状态查询

核心概念词典

WebSocket
基于 TCP 的全双工通信协议。通过一次 HTTP Upgrade 握手将普通 HTTP 连接升级为持久 WebSocket 连接,之后客户端和服务端可以随时互相发送消息帧,无需每次建立新连接。SwiftNIO 内建对 WebSocket 帧的解析和序列化支持,Vapor 在此之上提供了更高级的 API。
SSE (Server-Sent Events)
利用 HTTP 长连接(Keep-Alive)实现的服务端推送机制。响应 Content-Type 为 text/event-stream,服务端可以持续向该连接写入格式化的事件行(data: ...),客户端的 EventSource API 会自动解析并分发事件。SSE 不需要特殊握手,标准 HTTP 负载均衡完全可用,且浏览器原生支持断线重连。
WebSocket 对象
Vapor 中代表单个 WebSocket 连接的对象。提供 send(_:)(发送文本/二进制消息)、onText(_:)(注册文本消息处理器)、onBinary(_:)(注册二进制处理器)、close()(主动关闭连接)等方法。该对象本身不是 Sendable,在 Swift 6 下需要通过 Actor 封装才能安全跨任务传递。
@Sendable 路由闭包
Swift 6 / Vapor 4.99.x 中,路由处理闭包被标记为 @Sendable,意味着闭包捕获的所有外部状态必须是 Sendable 类型。这防止了路由闭包在并发执行时意外共享非线程安全的状态。如需在路由闭包中访问共享状态(如在线用户列表),必须通过 actor 或其他并发安全机制。
Actor 连接注册表
Swift 的 Actor 类型保证其内部状态的串行访问,天然适合管理 WebSocket 连接池。把所有已连接的 WebSocket 对象维护在一个 Actor 中,任何广播操作都通过 await actor.broadcast(...) 进行,编译器保证同一时刻只有一个 Task 修改连接列表,彻底消除竞争条件。
AsyncStream / AsyncThrowingStream
Swift 标准库中用于将基于回调的事件源转换为异步序列的工具。可以把 WebSocket 的 onText 回调包装成 AsyncStream<String>,然后用 for await msg in stream 顺序处理每条消息,使代码结构更线性、更易于测试。对于 SSE,AsyncStream 可以将任意异步数据源转换为持续写入 HTTP 响应体的字节流。
粘性会话(Sticky Session)
负载均衡的一种配置策略:同一客户端的请求总是路由到同一台后端服务器。WebSocket 和 SSE 连接是有状态的长连接,一旦建立就必须由同一台服务器维持,因此在多实例部署时必须配置粘性会话,否则负载均衡器可能把同一个连接的消息路由到不同服务器实例,导致连接断开或消息丢失。

Vapor 中的 WebSocket

基本 echo 服务

Vapor 提供 app.webSocket(_:onUpgrade:) 方法注册 WebSocket 路由。路由升级成功后,回调接收到 ws 对象,随后可注册消息处理器。下面从最简单的 echo 服务开始理解整体结构:

// routes.swift — Swift 6 WebSocket echo 服务
import Vapor

func routes(_ app: Application) throws {

    // WebSocket 路由:ws://localhost:8080/echo
    // 闭包是 @Sendable,捕获的外部状态必须是 Sendable
    app.webSocket("echo") { req, ws async in

        // 注册文本消息处理器:收到什么就回什么
        ws.onText { ws, text async in
            try? await ws.send(text)
        }

        // 注册二进制消息处理器
        ws.onBinary { ws, buffer async in
            try? await ws.send(raw: buffer, opcode: .binary)
        }

        // 等待连接关闭(onClose 是 EventLoopFuture,用 .get() 桥接到 async)
        try? await ws.onClose.get()
        print("Client disconnected with code: \(ws.closeCode?.description ?? "none")")
    }
}
Swift 6 闭包捕获限制 在 Swift 6 的 @Sendable 闭包内,不能直接捕获非 Sendable 的引用类型。WebSocket 对象本身不是 Sendable,如果需要在多个 Task 中操作同一个 ws 对象(如广播),必须通过 Actor 中转。不要试图用 @unchecked Sendable 绕过检查——这只是告诉编译器"相信我",并不解决实际的线程安全问题,只是把错误从编译期推迟到运行期。

多客户端聊天室:Actor 连接注册表

真实的聊天室需要维护所有在线连接的列表,当某个客户端发消息时,广播给其他所有人。这是 Swift 6 Actor 的经典使用场景。Actor 保证同一时刻只有一个 Task 能修改连接字典,从根本上消除并发写的数据竞争:

// ChatRoom.swift — Actor 管理连接池
import Vapor

/// 每个连接的唯一标识(UUID 是值类型,自动 Sendable)
typealias ClientID = UUID

/// Actor:内部状态串行访问,并发安全
actor ChatRoom {
    private var connections: [ClientID: WebSocket] = [:]

    /// 新客户端连接时注册
    func connect(id: ClientID, ws: WebSocket) {
        connections[id] = ws
        print("[Room] 用户 \(id.uuidString.prefix(8)) 连接,当前在线:\(connections.count)")
    }

    /// 客户端断开时注销
    func disconnect(id: ClientID) {
        connections.removeValue(forKey: id)
        print("[Room] 用户 \(id.uuidString.prefix(8)) 断开,当前在线:\(connections.count)")
    }

    /// 广播消息给所有人(含发送者)
    func broadcast(text: String) async {
        for (_, ws) in connections {
            guard !ws.isClosed else { continue }
            try? await ws.send(text)
        }
    }

    /// 广播给所有人,排除发送者
    func broadcast(text: String, excluding senderID: ClientID) async {
        for (id, ws) in connections where id != senderID {
            guard !ws.isClosed else { continue }
            try? await ws.send(text)
        }
    }

    /// 当前在线人数(只读访问,也需要 await)
    var count: Int { connections.count }
}
// Application+ChatRoom.swift — 将 ChatRoom 注册为应用级单例服务
import Vapor

extension Application {
    // StorageKey 标记该服务在 Application 存储中的唯一键
    private struct ChatRoomKey: StorageKey {
        typealias Value = ChatRoom
    }

    var chatRoom: ChatRoom {
        get {
            if let existing = storage[ChatRoomKey.self] { return existing }
            let room = ChatRoom()
            storage[ChatRoomKey.self] = room
            return room
        }
    }
}

// routes.swift — 聊天室 WebSocket 路由(Swift 6)
func routes(_ app: Application) throws {

    app.webSocket("chat") { req, ws async in
        let clientID = ClientID()            // 为本次连接分配唯一 ID
        let room = req.application.chatRoom   // Actor 是 Sendable,可以在闭包中安全捕获

        // 1. 注册连接
        await room.connect(id: clientID, ws: ws)

        // 2. 收到消息 → 广播给其他人
        ws.onText { ws, text async in
            let nick = clientID.uuidString.prefix(6)
            await room.broadcast(text: "[\(nick)]: \(text)", excluding: clientID)
        }

        // 3. 等待连接关闭,然后注销
        try? await ws.onClose.get()
        await room.disconnect(id: clientID)
    }
}
WebSocket 聊天室架构(Swift 6 Actor 模型): Client A ──ws──┐ Client B ──ws──┤ ┌──────────────────────────────────────┐ Client C ──ws──┼───►│ ChatRoom (actor) │ Client D ──ws──┘ │ connections: [UUID: WebSocket] │ │ │ │ connect(id:ws:) → 写入字典 │ │ disconnect(id:) → 移除字典 │ │ broadcast(text:) → 遍历全部发送 │ │ broadcast(excluding:)→ 遍历排除发送 │ └──────────────────────────────────────┘ │ │ Actor 保证:同一时刻只有 │ 一个 Task 修改 connections ▼ 编译器强制:跨 Actor 边界调用必须 await → 彻底消除字典读写的数据竞争

Ping / Pong 心跳保活

WebSocket 连接长时间无数据时,中间代理(如 nginx、AWS ALB)可能将其视为空闲连接而主动断开。服务端定期发送 Ping 帧,客户端响应 Pong,可以保持连接活跃。在 Swift 6 中,心跳 Task 要处理好生命周期——连接关闭时必须取消心跳任务,否则会泄漏:

// 心跳保活 — 每 30 秒发一次 ping,连接关闭时取消
app.webSocket("live") { req, ws async in

    // 启动心跳后台 Task
    let pingTask = Task {
        while !Task.isCancelled && !ws.isClosed {
            try? await Task.sleep(for: .seconds(30))
            if !ws.isClosed {
                try? await ws.sendPing()
            }
        }
    }

    // 处理业务消息
    ws.onText { ws, text async in
        try? await ws.send("Echo: \(text)")
    }

    // 等待连接关闭,然后取消心跳 Task 避免泄漏
    try? await ws.onClose.get()
    pingTask.cancel()
}

Server-Sent Events(SSE)

SSE 协议格式详解

SSE 是普通的 HTTP 响应,但 Content-Type 必须是 text/event-stream,响应体是持续写入的文本行。每个事件由固定字段组成,字段之间用换行分隔,事件之间用空行分隔:

id: 42
event: priceUpdate
data: {"symbol":"AAPL","price":185.3}

data: 这是一个只含 data 的简单消息(event 字段省略时,客户端触发 onmessage)

: 注释行(冒号开头),浏览器忽略,常用来发送心跳保活 keepalive

retry: 3000
data: 告知浏览器断线后等待 3000ms 再重连

id 字段让浏览器在断线重连时通过 Last-Event-ID 请求头告知服务端已收到的最后事件 ID,服务端可以从该 ID 之后继续推送,实现断点续传。这是 SSE 相比 WebSocket 的独特优势之一——重连逻辑完全由浏览器内置,无需任何客户端代码。

Vapor 实现 SSE:股价推送示例

Vapor 没有专门的 SSE 高级 API,但可以用 AsyncStream 将任意异步数据源转换为 HTTP 流式响应体。这种方案与 Swift 6 的异步模型天然契合:

// routes.swift — SSE 股价推送(Swift 6 + AsyncStream)
import Vapor

func routes(_ app: Application) throws {

    // GET /prices/stream → 返回 SSE 流
    app.get("prices", "stream") { req async throws -> Response in

        // makeStream() 返回 (AsyncStream, Continuation) 对
        // Continuation 是 Sendable,可以安全跨 Task 传递
        let (stream, continuation) = AsyncStream<String>.makeStream()

        // 后台生产者 Task:每秒推送一条价格
        Task {
            var eventID = 0
            let symbols = ["AAPL", "GOOG", "TSLA", "MSFT"]

            while !Task.isCancelled {
                try? await Task.sleep(for: .seconds(1))
                eventID += 1
                let symbol = symbols[eventID % symbols.count]
                let price  = Double.random(in: 100..<300)

                // 严格遵守 SSE 格式:字段: 值\n,事件结束空行
                let event = """
                    id: \(eventID)
                    event: priceUpdate
                    data: {"symbol":"\(symbol)","price":\(String(format:"%.2f",price))}

                    """
                continuation.yield(event)
            }
            continuation.finish()
        }

        // 将 String 流映射为 ByteBuffer 流,构造 Response.Body
        let body = Response.Body(asyncSequence: stream.map { ByteBuffer(string: $0) })

        var headers = HTTPHeaders()
        headers[.contentType]      = "text/event-stream; charset=utf-8"
        headers[.cacheControl]     = "no-cache"
        headers[.connection]       = "keep-alive"
        headers["X-Accel-Buffering"] = "no"  // 关闭 nginx 缓冲,确保实时推送

        return Response(status: .ok, headers: headers, body: body)
    }
}

浏览器端 EventSource 消费 SSE

// 前端 JavaScript — 订阅 SSE 价格流
const source = new EventSource("/prices/stream");

// 监听自定义事件(对应服务端 event: priceUpdate)
source.addEventListener("priceUpdate", (e) => {
    const { symbol, price } = JSON.parse(e.data);
    document.getElementById(`price-\${symbol}`).textContent = price.toFixed(2);
});

// 监听通用消息(没有 event 字段的推送)
source.onmessage = (e) => console.log("msg:", e.data);

// 连接断开后,EventSource 自动按 retry 间隔重连(无需任何额外代码)
source.onerror = (e) => {
    if (source.readyState === EventSource.CLOSED) {
        console.log("连接已关闭");
    }
    // readyState === CONNECTING 时,浏览器正在自动重连
};

// 主动关闭
// source.close();
SSE 心跳保活注释行 某些反向代理(如 nginx、CloudFlare)会在无数据传输一定时间后关闭长连接。可以每 15 秒在 SSE 流中写入一个注释行来保持连接活跃:: keepalive\n\n。浏览器会忽略注释行,不触发任何事件回调,对应用逻辑没有影响。

实战:AI 流式输出(SSE + LLM)

大语言模型(LLM)的 Streaming API 是 SSE 最典型的现代用例——服务端在 LLM 逐 token 生成内容时,实时推送给前端,用户看到"打字机"效果,而不是等待整个回复生成完再一次性显示。

// AIStreamHandler.swift — 将 LLM 流式响应转发给浏览器(Swift 6)
import Vapor

struct ChatRequest: Content {
    let message: String
}

app.post("ai", "stream") { req async throws -> Response in
    let chat = try req.content.decode(ChatRequest.self)

    let (stream, continuation) = AsyncStream<String>.makeStream()

    Task {
        do {
            // 调用 LLM 客户端,llmStream 是 AsyncSequence<String>
            // 实际使用时替换为 OpenAI、Anthropic 等 SDK
            let llmStream = try await LLMClient.streamCompletion(prompt: chat.message)
            for try await token in llmStream {
                // 每个 token 作为 SSE data 推送
                continuation.yield("data: \(token)\n\n")
            }
            // OpenAI 兼容的结束标记
            continuation.yield("data: [DONE]\n\n")
        } catch {
            let errMsg = error.localizedDescription
                .replacingOccurrences(of: "\n", with: " ")
            continuation.yield("data: {\"error\":\"\(errMsg)\"}\n\n")
        }
        continuation.finish()
    }

    var headers = HTTPHeaders()
    headers[.contentType]  = "text/event-stream; charset=utf-8"
    headers[.cacheControl] = "no-cache"

    return Response(
        status: .ok,
        headers: headers,
        body: .init(asyncSequence: stream.map { ByteBuffer(string: $0) })
    )
}

WebSocket 连接认证

WebSocket 握手发生在 HTTP 层,因此可以在握手建立后立即验证 JWT 令牌或 Session。由于浏览器的 WebSocket API 不支持自定义请求头,Token 通常通过 URL Query 参数传递:

// 带 JWT 认证的 WebSocket — 握手后即时验证,无效则关闭连接
app.webSocket("chat") { req, ws async in
    // 从 Query 参数提取 token:ws://host/chat?token=xxx
    guard let token = req.query[String.self, at: "token"] else {
        try? await ws.close(code: .policyViolation)
        return
    }

    do {
        let payload = try req.jwt.verify(token, as: UserPayload.self)
        let room = req.application.chatRoom

        let clientID = ClientID()
        await room.connect(id: clientID, ws: ws)

        ws.onText { ws, text async in
            await room.broadcast(
                text: "[\(payload.username)]: \(text)",
                excluding: clientID
            )
        }

        try? await ws.onClose.get()
        await room.disconnect(id: clientID)

    } catch {
        // Token 验证失败:关闭连接并拒绝握手
        try? await ws.close(code: .policyViolation)
    }
}
WebSocket Token 安全注意 URL Query 参数中的 Token 会出现在服务端日志、浏览器历史和 HTTP 中间件的访问日志中,存在泄漏风险。生产环境中,更安全的方案是:连接建立后立即发送一条包含 Token 的文本消息作为"握手包",服务端在收到前不响应任何业务消息;或者改用 Cookie 认证(在首次升级请求中携带 HttpOnly Cookie)。
本章小结 WebSocket 适合双向实时通信(聊天、游戏、协同编辑),SSE 适合服务端单向推送(通知、进度、AI 流式输出)。在 Swift 6 并发模型下,用 Actor 管理 WebSocket 连接池是保证并发安全的标准做法——Actor 内部状态串行访问,让广播操作不会出现字典竞争。SSE 借助 AsyncStream + Response.Body 可以干净地将任何异步数据源转换为 HTTP 流式响应,与 Swift 6 的异步序列模型无缝衔接。下一章将聚焦如何为这些功能编写可靠的自动化测试。