两个时代的并发:ELF vs async/await
Vapor 4 发布于 Swift Concurrency(async/await、Actor)成熟之前,因此其早期 API 大量使用 EventLoopFuture(简称 ELF)。随着 Swift 5.5 引入原生并发,Vapor 4 也逐步提供了 async/await 版本的 API,两者可以共存。
理解两者的区别,不只是语法偏好,更是运行时模型的差异:ELF 是基于回调的 Future/Promise 模式,链式调用(flatMap、map)在编译期难以发现所有问题;async/await 则将异步代码写成同步风格,借助 Swift 的结构化并发(Structured Concurrency)和编译器检查,大大降低了犯错的概率。
| 特性 | EventLoopFuture (ELF) | async/await |
|---|---|---|
| 语法风格 | 链式回调,嵌套可能深 | 顺序线性,与同步代码相似 |
| 错误处理 | .flatMapError 链 | 标准 try/catch |
| 编译器支持 | 较少检查 | 强类型检查,Sendable 保障 |
| 调试栈帧 | 难以追踪 | 保留完整调用栈 |
| Vapor 4 支持 | 全面(老 API) | 推荐(新 API) |
| 性能 | 极高 | 极高(编译为协程) |
核心概念词典
.map(转换成功值)、.flatMap(链接另一个 Future)、.whenComplete(注册回调)等方法组合。是 Vapor 旧版 API 的基础类型。async 函数可以暂停执行(挂起),await 等待异步结果恢复。挂起期间线程不被阻塞,Swift 运行时会调度其他工作。本质上是对 EventLoopFuture 的语法糖,但由编译器和运行时协作实现。await,编译器强制此规则。Vapor 限流器等共享状态的组件是 Actor 的典型使用场景。Task 是并发执行的独立工作单元,类似轻量级线程。Task.detached 创建不继承上下文的独立任务;withTaskGroup 创建可结构化管理的任务组,支持并行执行多个异步操作并收集结果。ELF 与 async/await 对比示例
下面用同一个"查询用户及其文章"的场景,分别展示 ELF 和 async/await 两种写法,直观感受可读性差异:
// ── ELF 写法(Vapor 旧式 API)──────────────────────────
app.get("users", ":id", "posts") { req -> EventLoopFuture<[PostResponse]> in
let userID = try req.parameters.require("id", as: UUID.self)
return User.find(userID, on: req.db) // EventLoopFuture<User?>
.unwrap(or: Abort(.notFound)) // 解包 Optional
.flatMap { user in // 链接下一个 Future
user.$posts.query(on: req.db)
.filter(\.$published == true)
.all()
}
.map { posts in // 转换结果
posts.map { PostResponse(from: $0) }
}
}
// ── async/await 写法(推荐)──────────────────────────────
app.get("users", ":id", "posts") { req async throws -> [PostResponse] in
let userID = try req.parameters.require("id", as: UUID.self)
guard let user = try await User.find(userID, on: req.db) else {
throw Abort(.notFound)
}
let posts = try await user.$posts.query(on: req.db)
.filter(\.$published == true)
.all()
return posts.map { PostResponse(from: $0) }
}
并发执行多个异步操作
async/await 的顺序写法默认是串行的——每个 await 等前一个完成再继续。如果几个操作相互独立,用 async let 或 withTaskGroup 可以并行执行,减少总耗时:
// 串行(慢):两次数据库查询依次执行,总耗时 = A + B
let user = try await User.find(userID, on: db)
let posts = try await Post.query(on: db).filter(\.$published==true).all()
// 并行(快):两个查询同时发起,总耗时 = max(A, B)
async let userFetch = User.find(userID, on: db)
async let postsFetch = Post.query(on: db).filter(\.$published==true).all()
let (user, posts) = try await (userFetch, postsFetch)
// TaskGroup:并行处理动态数量的任务
let enrichedPosts = try await withThrowingTaskGroup(of: EnrichedPost.self) { group in
for post in posts {
group.addTask {
let comments = try await post.$comments.query(on: db).count()
return EnrichedPost(post: post, commentCount: comments)
}
}
var results: [EnrichedPost] = []
for try await result in group {
results.append(result)
}
return results
}
多请求并发处理模型
EventLoopFuture 的老 API 时,使用 .get() 方法将 ELF 转换为 async:let result = try await someELF.get()。反向转换(async → ELF)使用 req.eventLoop.performWithTask { try await asyncFunc() }。不要跨 EventLoop 传递 Future,使用 .hop(to: eventLoop) 正确切换。
Actor 模型:从根本上消除数据竞争
传统多线程编程中,共享可变状态(shared mutable state)是 Bug 的主要来源:两个线程同时读写同一变量,结果不可预测。Actor 是 Swift 对这个问题的根本解决方案——编译器而非程序员来保证串行访问。
Actor 的核心保证:同一时刻只有一个 Task 可以执行 Actor 的可变方法。其他 Task 必须 await 等待,编译器在编译期强制此规则,违反者直接报错。
// ── 错误示例:用 class 存储共享状态(数据竞争风险)────────────
final class BadCounter {
var count = 0
// 两个并发 Task 同时调用 increment() → count 值不可预测
func increment() { count += 1 }
}
// ── 正确做法:用 actor 保证串行访问 ──────────────────────────
actor RequestCounter {
private var counts: [String: Int] = [:] // 按路由统计请求数
private var total = 0
// 所有方法均在 Actor 内部串行执行,无数据竞争
func record(path: String) {
counts[path, default: 0] += 1
total += 1
}
// nonisolated:不访问可变状态,不需要 await
nonisolated var description: String { "RequestCounter" }
func snapshot() -> (counts: [String: Int], total: Int) {
(counts, total)
}
}
// 使用:Actor 方法必须 await
let counter = RequestCounter()
app.get("stats") { req async throws -> Response in
await counter.record(path: req.url.path) // await 必须,编译器强制
let snap = await counter.snapshot()
return try req.response(.ok, content: snap.counts)
}
Sendable 协议:跨并发边界的类型安全
Sendable 是 Swift 6 strict concurrency 的核心协议。它标记一个类型可以在不同的并发域(不同 Task、不同 Actor)之间安全传递,不会引发数据竞争。Swift 6 模式下,编译器会对所有跨并发边界传递的类型进行检查。
@unchecked Sendable(使用 NSLock 等),或将其改为 Actor(自动符合 Sendable),或改为 struct。Task { } 和 withTaskGroup 的闭包需要标注 @Sendable,确保闭包内捕获的所有值都是 Sendable 类型,防止捕获非线程安全的对象。// Vapor 路由中常见的 Sendable 场景
// ✅ struct 自动符合 Sendable(所有属性均为 Sendable)
struct UserDTO: Content, Sendable {
let id: UUID
let name: String
let email: String
}
// ✅ Actor 自动符合 Sendable
actor UserCache {
private var cache: [UUID: UserDTO] = [:]
func get(_ id: UUID) -> UserDTO? { cache[id] }
func set(_ dto: UserDTO) { cache[dto.id] = dto }
}
// ⚠️ class 需要 @unchecked Sendable + 手动锁保护
final class LegacyConfig: @unchecked Sendable {
private let lock = NSLock()
private var _value: String = ""
var value: String {
get { lock.withLock { _value } }
set { lock.withLock { _value = newValue } }
}
}
.swiftLanguageVersions([.v5]) 保持兼容,再逐模块开启 swiftSettings: [.enableUpcomingFeature("StrictConcurrency")] 修复问题。
ELF 到 async/await 迁移指南
如果你在维护使用旧版 ELF API 的 Vapor 项目,以下迁移模式会经常用到:
// ── 模式 1:ELF 转 async(最常见)────────────────────────────
// 旧:返回 EventLoopFuture
func findUser(_ id: UUID, db: Database) -> EventLoopFuture<User?> {
User.find(id, on: db)
}
// 新:async 写法,调用处加 try await
func findUser(_ id: UUID, db: Database) async throws -> User? {
try await User.find(id, on: db)
}
// ── 模式 2:在 async 上下文中调用 ELF API ─────────────────────
func hybridHandler(req: Request) async throws -> String {
// .get() 将 ELF 转为 async 并 await
let result = try await someLegacyELFFunc().get()
return result
}
// ── 模式 3:ELF.flatMap 链 → async 顺序调用 ──────────────────
// 旧写法:.flatMap 嵌套
return fetchUser(id)
.flatMap { user in fetchPosts(for: user) }
.map { posts in posts.map { PostDTO(from: $0) } }
// 新写法:async 线性代码
let user = try await fetchUser(id)
let posts = try await fetchPosts(for: user)
return posts.map { PostDTO(from: $0) }
req.application.threadPool.runIfActive 或 NIOThreadPool 移到独立线程:let data = try await req.application.threadPool.runIfActive(eventLoop: req.eventLoop) { try Data(contentsOf: url) }.get()
实践:并发安全的缓存服务
将本章知识综合运用,实现一个并发安全的内存缓存,在 Vapor 的 Application 生命周期中共享:
// Sources/App/Services/MemoryCacheService.swift
import Vapor
// Actor 保证线程安全,Sendable 允许跨并发域传递
actor MemoryCacheService: Sendable {
private struct Entry {
let value: any Sendable
let expiresAt: Date?
}
private var store: [String: Entry] = [:]
// 设置缓存,可选 TTL(秒)
func set(_ key: String, value: some Sendable, ttl: TimeInterval? = nil) {
let expiresAt = ttl.map { Date().addingTimeInterval($0) }
store[key] = Entry(value: value, expiresAt: expiresAt)
}
// 获取缓存(过期自动删除)
func get<T: Sendable>(_ key: String, as: T.Type = T.self) -> T? {
guard let entry = store[key] else { return nil }
if let exp = entry.expiresAt, Date() > exp {
store.removeValue(forKey: key)
return nil
}
return entry.value as? T
}
func delete(_ key: String) { store.removeValue(forKey: key) }
func flush() { store.removeAll() }
}
// 挂载到 Application,全局复用同一实例
extension Application {
private struct CacheKey: StorageKey {
typealias Value = MemoryCacheService
}
var cache: MemoryCacheService {
get {
if let existing = storage[CacheKey.self] { return existing }
let cache = MemoryCacheService()
storage[CacheKey.self] = cache
return cache
}
}
}
// 在路由中使用缓存(注意:Actor 方法必须 await)
app.get("users", ":id") { req async throws -> UserDTO in
let id = try req.parameters.require("id", as: UUID.self)
let cacheKey = "user:\(id)"
// 先查缓存
if let cached: UserDTO = await req.application.cache.get(cacheKey) {
return cached
}
// 缓存未命中,查数据库
guard let user = try await User.find(id, on: req.db) else {
throw Abort(.notFound)
}
let dto = UserDTO(from: user)
// 写入缓存,TTL 5分钟
await req.application.cache.set(cacheKey, value: dto, ttl: 300)
return dto
}
async let 和 TaskGroup 让独立操作并行执行;Actor 从语言层面消除数据竞争,编译器强制 await 访问,是 Swift 并发安全的核心机制;Sendable 协议是跨并发域类型安全的保障,Swift 6 中由警告升级为编译错误。迁移老 ELF 代码:用 .get() 在 async 上下文中 await ELF,阻塞操作使用 threadPool 移到独立线程。