为什么需要实时通信?
传统 HTTP 是一问一答的模型:客户端发请求,服务端返回响应,连接随即关闭。这对于查询用户信息、提交表单等场景足够用,但对于聊天室、在线协作、股价行情、游戏状态同步等场景,"客户端轮询"(每秒发一次 GET)的方案既浪费带宽,又引入不必要的延迟。
HTTP/1.1 时代出现了两种更高效的实时推送方式:WebSocket 和 SSE(Server-Sent Events)。它们解决的问题不同,适合不同场景,理解二者的区别是正确选型的前提。
| 特性 | WebSocket | SSE(Server-Sent Events) | HTTP 轮询 |
|---|---|---|---|
| 通信方向 | 双向(全双工) | 单向(服务端→客户端) | 单向(客户端发起) |
| 协议 | ws:// / wss://(独立协议) | HTTP/1.1 长连接 | 标准 HTTP |
| 消息格式 | 任意二进制/文本帧 | text/event-stream(纯文本) | 任意 HTTP 响应 |
| 浏览器支持 | 全面支持 | 全面支持(IE 除外) | 全面支持 |
| 自动重连 | 需手动实现 | 浏览器 EventSource 自动重连 | 客户端定时发起 |
| 负载均衡 | 需粘性会话(Sticky Session) | 需粘性会话 | 无状态,简单 |
| 典型场景 | 聊天、游戏、协同编辑 | 通知推送、进度更新、AI 流式输出 | 低频状态查询 |
核心概念词典
text/event-stream,服务端可以持续向该连接写入格式化的事件行(data: ...),客户端的 EventSource API 会自动解析并分发事件。SSE 不需要特殊握手,标准 HTTP 负载均衡完全可用,且浏览器原生支持断线重连。send(_:)(发送文本/二进制消息)、onText(_:)(注册文本消息处理器)、onBinary(_:)(注册二进制处理器)、close()(主动关闭连接)等方法。该对象本身不是 Sendable,在 Swift 6 下需要通过 Actor 封装才能安全跨任务传递。@Sendable,意味着闭包捕获的所有外部状态必须是 Sendable 类型。这防止了路由闭包在并发执行时意外共享非线程安全的状态。如需在路由闭包中访问共享状态(如在线用户列表),必须通过 actor 或其他并发安全机制。await actor.broadcast(...) 进行,编译器保证同一时刻只有一个 Task 修改连接列表,彻底消除竞争条件。onText 回调包装成 AsyncStream<String>,然后用 for await msg in stream 顺序处理每条消息,使代码结构更线性、更易于测试。对于 SSE,AsyncStream 可以将任意异步数据源转换为持续写入 HTTP 响应体的字节流。Vapor 中的 WebSocket
基本 echo 服务
Vapor 提供 app.webSocket(_:onUpgrade:) 方法注册 WebSocket 路由。路由升级成功后,回调接收到 ws 对象,随后可注册消息处理器。下面从最简单的 echo 服务开始理解整体结构:
// routes.swift — Swift 6 WebSocket echo 服务
import Vapor
func routes(_ app: Application) throws {
// WebSocket 路由:ws://localhost:8080/echo
// 闭包是 @Sendable,捕获的外部状态必须是 Sendable
app.webSocket("echo") { req, ws async in
// 注册文本消息处理器:收到什么就回什么
ws.onText { ws, text async in
try? await ws.send(text)
}
// 注册二进制消息处理器
ws.onBinary { ws, buffer async in
try? await ws.send(raw: buffer, opcode: .binary)
}
// 等待连接关闭(onClose 是 EventLoopFuture,用 .get() 桥接到 async)
try? await ws.onClose.get()
print("Client disconnected with code: \(ws.closeCode?.description ?? "none")")
}
}
@Sendable 闭包内,不能直接捕获非 Sendable 的引用类型。WebSocket 对象本身不是 Sendable,如果需要在多个 Task 中操作同一个 ws 对象(如广播),必须通过 Actor 中转。不要试图用 @unchecked Sendable 绕过检查——这只是告诉编译器"相信我",并不解决实际的线程安全问题,只是把错误从编译期推迟到运行期。
多客户端聊天室:Actor 连接注册表
真实的聊天室需要维护所有在线连接的列表,当某个客户端发消息时,广播给其他所有人。这是 Swift 6 Actor 的经典使用场景。Actor 保证同一时刻只有一个 Task 能修改连接字典,从根本上消除并发写的数据竞争:
// ChatRoom.swift — Actor 管理连接池
import Vapor
/// 每个连接的唯一标识(UUID 是值类型,自动 Sendable)
typealias ClientID = UUID
/// Actor:内部状态串行访问,并发安全
actor ChatRoom {
private var connections: [ClientID: WebSocket] = [:]
/// 新客户端连接时注册
func connect(id: ClientID, ws: WebSocket) {
connections[id] = ws
print("[Room] 用户 \(id.uuidString.prefix(8)) 连接,当前在线:\(connections.count)")
}
/// 客户端断开时注销
func disconnect(id: ClientID) {
connections.removeValue(forKey: id)
print("[Room] 用户 \(id.uuidString.prefix(8)) 断开,当前在线:\(connections.count)")
}
/// 广播消息给所有人(含发送者)
func broadcast(text: String) async {
for (_, ws) in connections {
guard !ws.isClosed else { continue }
try? await ws.send(text)
}
}
/// 广播给所有人,排除发送者
func broadcast(text: String, excluding senderID: ClientID) async {
for (id, ws) in connections where id != senderID {
guard !ws.isClosed else { continue }
try? await ws.send(text)
}
}
/// 当前在线人数(只读访问,也需要 await)
var count: Int { connections.count }
}
// Application+ChatRoom.swift — 将 ChatRoom 注册为应用级单例服务
import Vapor
extension Application {
// StorageKey 标记该服务在 Application 存储中的唯一键
private struct ChatRoomKey: StorageKey {
typealias Value = ChatRoom
}
var chatRoom: ChatRoom {
get {
if let existing = storage[ChatRoomKey.self] { return existing }
let room = ChatRoom()
storage[ChatRoomKey.self] = room
return room
}
}
}
// routes.swift — 聊天室 WebSocket 路由(Swift 6)
func routes(_ app: Application) throws {
app.webSocket("chat") { req, ws async in
let clientID = ClientID() // 为本次连接分配唯一 ID
let room = req.application.chatRoom // Actor 是 Sendable,可以在闭包中安全捕获
// 1. 注册连接
await room.connect(id: clientID, ws: ws)
// 2. 收到消息 → 广播给其他人
ws.onText { ws, text async in
let nick = clientID.uuidString.prefix(6)
await room.broadcast(text: "[\(nick)]: \(text)", excluding: clientID)
}
// 3. 等待连接关闭,然后注销
try? await ws.onClose.get()
await room.disconnect(id: clientID)
}
}
Ping / Pong 心跳保活
WebSocket 连接长时间无数据时,中间代理(如 nginx、AWS ALB)可能将其视为空闲连接而主动断开。服务端定期发送 Ping 帧,客户端响应 Pong,可以保持连接活跃。在 Swift 6 中,心跳 Task 要处理好生命周期——连接关闭时必须取消心跳任务,否则会泄漏:
// 心跳保活 — 每 30 秒发一次 ping,连接关闭时取消
app.webSocket("live") { req, ws async in
// 启动心跳后台 Task
let pingTask = Task {
while !Task.isCancelled && !ws.isClosed {
try? await Task.sleep(for: .seconds(30))
if !ws.isClosed {
try? await ws.sendPing()
}
}
}
// 处理业务消息
ws.onText { ws, text async in
try? await ws.send("Echo: \(text)")
}
// 等待连接关闭,然后取消心跳 Task 避免泄漏
try? await ws.onClose.get()
pingTask.cancel()
}
Server-Sent Events(SSE)
SSE 协议格式详解
SSE 是普通的 HTTP 响应,但 Content-Type 必须是 text/event-stream,响应体是持续写入的文本行。每个事件由固定字段组成,字段之间用换行分隔,事件之间用空行分隔:
id: 42
event: priceUpdate
data: {"symbol":"AAPL","price":185.3}
data: 这是一个只含 data 的简单消息(event 字段省略时,客户端触发 onmessage)
: 注释行(冒号开头),浏览器忽略,常用来发送心跳保活 keepalive
retry: 3000
data: 告知浏览器断线后等待 3000ms 再重连
id 字段让浏览器在断线重连时通过 Last-Event-ID 请求头告知服务端已收到的最后事件 ID,服务端可以从该 ID 之后继续推送,实现断点续传。这是 SSE 相比 WebSocket 的独特优势之一——重连逻辑完全由浏览器内置,无需任何客户端代码。
Vapor 实现 SSE:股价推送示例
Vapor 没有专门的 SSE 高级 API,但可以用 AsyncStream 将任意异步数据源转换为 HTTP 流式响应体。这种方案与 Swift 6 的异步模型天然契合:
// routes.swift — SSE 股价推送(Swift 6 + AsyncStream)
import Vapor
func routes(_ app: Application) throws {
// GET /prices/stream → 返回 SSE 流
app.get("prices", "stream") { req async throws -> Response in
// makeStream() 返回 (AsyncStream, Continuation) 对
// Continuation 是 Sendable,可以安全跨 Task 传递
let (stream, continuation) = AsyncStream<String>.makeStream()
// 后台生产者 Task:每秒推送一条价格
Task {
var eventID = 0
let symbols = ["AAPL", "GOOG", "TSLA", "MSFT"]
while !Task.isCancelled {
try? await Task.sleep(for: .seconds(1))
eventID += 1
let symbol = symbols[eventID % symbols.count]
let price = Double.random(in: 100..<300)
// 严格遵守 SSE 格式:字段: 值\n,事件结束空行
let event = """
id: \(eventID)
event: priceUpdate
data: {"symbol":"\(symbol)","price":\(String(format:"%.2f",price))}
"""
continuation.yield(event)
}
continuation.finish()
}
// 将 String 流映射为 ByteBuffer 流,构造 Response.Body
let body = Response.Body(asyncSequence: stream.map { ByteBuffer(string: $0) })
var headers = HTTPHeaders()
headers[.contentType] = "text/event-stream; charset=utf-8"
headers[.cacheControl] = "no-cache"
headers[.connection] = "keep-alive"
headers["X-Accel-Buffering"] = "no" // 关闭 nginx 缓冲,确保实时推送
return Response(status: .ok, headers: headers, body: body)
}
}
浏览器端 EventSource 消费 SSE
// 前端 JavaScript — 订阅 SSE 价格流
const source = new EventSource("/prices/stream");
// 监听自定义事件(对应服务端 event: priceUpdate)
source.addEventListener("priceUpdate", (e) => {
const { symbol, price } = JSON.parse(e.data);
document.getElementById(`price-\${symbol}`).textContent = price.toFixed(2);
});
// 监听通用消息(没有 event 字段的推送)
source.onmessage = (e) => console.log("msg:", e.data);
// 连接断开后,EventSource 自动按 retry 间隔重连(无需任何额外代码)
source.onerror = (e) => {
if (source.readyState === EventSource.CLOSED) {
console.log("连接已关闭");
}
// readyState === CONNECTING 时,浏览器正在自动重连
};
// 主动关闭
// source.close();
: keepalive\n\n。浏览器会忽略注释行,不触发任何事件回调,对应用逻辑没有影响。
实战:AI 流式输出(SSE + LLM)
大语言模型(LLM)的 Streaming API 是 SSE 最典型的现代用例——服务端在 LLM 逐 token 生成内容时,实时推送给前端,用户看到"打字机"效果,而不是等待整个回复生成完再一次性显示。
// AIStreamHandler.swift — 将 LLM 流式响应转发给浏览器(Swift 6)
import Vapor
struct ChatRequest: Content {
let message: String
}
app.post("ai", "stream") { req async throws -> Response in
let chat = try req.content.decode(ChatRequest.self)
let (stream, continuation) = AsyncStream<String>.makeStream()
Task {
do {
// 调用 LLM 客户端,llmStream 是 AsyncSequence<String>
// 实际使用时替换为 OpenAI、Anthropic 等 SDK
let llmStream = try await LLMClient.streamCompletion(prompt: chat.message)
for try await token in llmStream {
// 每个 token 作为 SSE data 推送
continuation.yield("data: \(token)\n\n")
}
// OpenAI 兼容的结束标记
continuation.yield("data: [DONE]\n\n")
} catch {
let errMsg = error.localizedDescription
.replacingOccurrences(of: "\n", with: " ")
continuation.yield("data: {\"error\":\"\(errMsg)\"}\n\n")
}
continuation.finish()
}
var headers = HTTPHeaders()
headers[.contentType] = "text/event-stream; charset=utf-8"
headers[.cacheControl] = "no-cache"
return Response(
status: .ok,
headers: headers,
body: .init(asyncSequence: stream.map { ByteBuffer(string: $0) })
)
}
WebSocket 连接认证
WebSocket 握手发生在 HTTP 层,因此可以在握手建立后立即验证 JWT 令牌或 Session。由于浏览器的 WebSocket API 不支持自定义请求头,Token 通常通过 URL Query 参数传递:
// 带 JWT 认证的 WebSocket — 握手后即时验证,无效则关闭连接
app.webSocket("chat") { req, ws async in
// 从 Query 参数提取 token:ws://host/chat?token=xxx
guard let token = req.query[String.self, at: "token"] else {
try? await ws.close(code: .policyViolation)
return
}
do {
let payload = try req.jwt.verify(token, as: UserPayload.self)
let room = req.application.chatRoom
let clientID = ClientID()
await room.connect(id: clientID, ws: ws)
ws.onText { ws, text async in
await room.broadcast(
text: "[\(payload.username)]: \(text)",
excluding: clientID
)
}
try? await ws.onClose.get()
await room.disconnect(id: clientID)
} catch {
// Token 验证失败:关闭连接并拒绝握手
try? await ws.close(code: .policyViolation)
}
}
AsyncStream + Response.Body 可以干净地将任何异步数据源转换为 HTTP 流式响应,与 Swift 6 的异步序列模型无缝衔接。下一章将聚焦如何为这些功能编写可靠的自动化测试。