Chapter 07

Swift 并发模型

从 EventLoopFuture 到 async/await,理解 Swift Concurrency 的设计哲学,用 Actor 消除数据竞争。

两个时代的并发:ELF vs async/await

Vapor 4 发布于 Swift Concurrency(async/await、Actor)成熟之前,因此其早期 API 大量使用 EventLoopFuture(简称 ELF)。随着 Swift 5.5 引入原生并发,Vapor 4 也逐步提供了 async/await 版本的 API,两者可以共存。

理解两者的区别,不只是语法偏好,更是运行时模型的差异:ELF 是基于回调的 Future/Promise 模式,链式调用(flatMapmap)在编译期难以发现所有问题;async/await 则将异步代码写成同步风格,借助 Swift 的结构化并发(Structured Concurrency)和编译器检查,大大降低了犯错的概率。

特性EventLoopFuture (ELF)async/await
语法风格链式回调,嵌套可能深顺序线性,与同步代码相似
错误处理.flatMapError标准 try/catch
编译器支持较少检查强类型检查,Sendable 保障
调试栈帧难以追踪保留完整调用栈
Vapor 4 支持全面(老 API)推荐(新 API)
性能极高极高(编译为协程)

核心概念词典

EventLoop
NIO 的单线程事件循环,每个 EventLoop 绑定一个 OS 线程,不断轮询 I/O 事件。Vapor 启动时创建 CPU 核心数个 EventLoop(通常 4-8 个),所有请求分配到某个 EventLoop 上处理,I/O 期间不阻塞线程。
EventLoopFuture<T>
代表一个尚未完成的异步计算,最终会产生类型 T 的值或错误。通过 .map(转换成功值)、.flatMap(链接另一个 Future)、.whenComplete(注册回调)等方法组合。是 Vapor 旧版 API 的基础类型。
async/await
Swift 5.5 引入的原生异步语法。async 函数可以暂停执行(挂起),await 等待异步结果恢复。挂起期间线程不被阻塞,Swift 运行时会调度其他工作。本质上是对 EventLoopFuture 的语法糖,但由编译器和运行时协作实现。
Actor
Swift 的引用类型,内部状态保证串行访问(同一时刻只有一个 Task 执行其内部代码),从根本上消除数据竞争。访问 Actor 的方法必须 await,编译器强制此规则。Vapor 限流器等共享状态的组件是 Actor 的典型使用场景。
Sendable
Swift 协议,标记一个类型可以安全地在并发域(Task、Actor)之间传递。值类型(struct、enum)通常自动符合 Sendable;引用类型(class)需要手动保证线程安全后才能声明为 Sendable。是 Swift Concurrency 类型安全的基础之一。
Task / TaskGroup
Task 是并发执行的独立工作单元,类似轻量级线程。Task.detached 创建不继承上下文的独立任务;withTaskGroup 创建可结构化管理的任务组,支持并行执行多个异步操作并收集结果。

ELF 与 async/await 对比示例

下面用同一个"查询用户及其文章"的场景,分别展示 ELF 和 async/await 两种写法,直观感受可读性差异:

// ── ELF 写法(Vapor 旧式 API)──────────────────────────
app.get("users", ":id", "posts") { req -> EventLoopFuture<[PostResponse]> in
    let userID = try req.parameters.require("id", as: UUID.self)

    return User.find(userID, on: req.db)          // EventLoopFuture<User?>
        .unwrap(or: Abort(.notFound))             // 解包 Optional
        .flatMap { user in                         // 链接下一个 Future
            user.$posts.query(on: req.db)
                .filter(\.$published == true)
                .all()
        }
        .map { posts in                            // 转换结果
            posts.map { PostResponse(from: $0) }
        }
}

// ── async/await 写法(推荐)──────────────────────────────
app.get("users", ":id", "posts") { req async throws -> [PostResponse] in
    let userID = try req.parameters.require("id", as: UUID.self)

    guard let user = try await User.find(userID, on: req.db) else {
        throw Abort(.notFound)
    }

    let posts = try await user.$posts.query(on: req.db)
        .filter(\.$published == true)
        .all()

    return posts.map { PostResponse(from: $0) }
}

并发执行多个异步操作

async/await 的顺序写法默认是串行的——每个 await 等前一个完成再继续。如果几个操作相互独立,用 async letwithTaskGroup 可以并行执行,减少总耗时:

// 串行(慢):两次数据库查询依次执行,总耗时 = A + B
let user  = try await User.find(userID, on: db)
let posts = try await Post.query(on: db).filter(\.$published==true).all()

// 并行(快):两个查询同时发起,总耗时 = max(A, B)
async let userFetch  = User.find(userID, on: db)
async let postsFetch = Post.query(on: db).filter(\.$published==true).all()
let (user, posts) = try await (userFetch, postsFetch)

// TaskGroup:并行处理动态数量的任务
let enrichedPosts = try await withThrowingTaskGroup(of: EnrichedPost.self) { group in
    for post in posts {
        group.addTask {
            let comments = try await post.$comments.query(on: db).count()
            return EnrichedPost(post: post, commentCount: comments)
        }
    }
    var results: [EnrichedPost] = []
    for try await result in group {
        results.append(result)
    }
    return results
}

多请求并发处理模型

Vapor 多请求并发处理(4 核 CPU = 4 个 EventLoop): 请求队列:req1 req2 req3 req4 req5 req6 ... │ │ │ │ │ │ ▼ ▼ ▼ ▼ ▼ ▼ EL-0 ── [req1] ────── await DB ────────── [req5] ── │ │ 挂起,不阻塞线程 │ EL-1 ── [req2] ─── await DB ─────────────[req6] ── │ │ EL-2 ── [req3] ──────── await DB ──────────────── EL-3 ── [req4] ──────────── await DB ──────────── 每个 EventLoop 线程: - 不阻塞,遇到 await 立即处理其他请求 - 4 个线程轻松支撑数千并发连接 - 与"每请求一线程"模型(需要数千线程)形成对比
混用 ELF 和 async/await 时的注意事项 当你在 async 上下文中需要调用返回 EventLoopFuture 的老 API 时,使用 .get() 方法将 ELF 转换为 async:let result = try await someELF.get()。反向转换(async → ELF)使用 req.eventLoop.performWithTask { try await asyncFunc() }。不要跨 EventLoop 传递 Future,使用 .hop(to: eventLoop) 正确切换。

Actor 模型:从根本上消除数据竞争

传统多线程编程中,共享可变状态(shared mutable state)是 Bug 的主要来源:两个线程同时读写同一变量,结果不可预测。Actor 是 Swift 对这个问题的根本解决方案——编译器而非程序员来保证串行访问。

Actor 的核心保证:同一时刻只有一个 Task 可以执行 Actor 的可变方法。其他 Task 必须 await 等待,编译器在编译期强制此规则,违反者直接报错。

// ── 错误示例:用 class 存储共享状态(数据竞争风险)────────────
final class BadCounter {
    var count = 0
    // 两个并发 Task 同时调用 increment() → count 值不可预测
    func increment() { count += 1 }
}

// ── 正确做法:用 actor 保证串行访问 ──────────────────────────
actor RequestCounter {
    private var counts: [String: Int] = [:]   // 按路由统计请求数
    private var total = 0

    // 所有方法均在 Actor 内部串行执行,无数据竞争
    func record(path: String) {
        counts[path, default: 0] += 1
        total += 1
    }

    // nonisolated:不访问可变状态,不需要 await
    nonisolated var description: String { "RequestCounter" }

    func snapshot() -> (counts: [String: Int], total: Int) {
        (counts, total)
    }
}

// 使用:Actor 方法必须 await
let counter = RequestCounter()

app.get("stats") { req async throws -> Response in
    await counter.record(path: req.url.path)   // await 必须,编译器强制
    let snap = await counter.snapshot()
    return try req.response(.ok, content: snap.counts)
}

Sendable 协议:跨并发边界的类型安全

Sendable 是 Swift 6 strict concurrency 的核心协议。它标记一个类型可以在不同的并发域(不同 Task、不同 Actor)之间安全传递,不会引发数据竞争。Swift 6 模式下,编译器会对所有跨并发边界传递的类型进行检查。

自动符合 Sendable 的类型
值类型(struct、enum)在其所有存储属性也是 Sendable 时自动符合。基本类型(Int、String、Bool、Date、UUID 等)和不可变引用(let)均符合。Swift 的 Array、Dictionary、Optional 在元素/值是 Sendable 时也符合。
需要手动处理的类型
class(引用类型)不自动符合 Sendable。需要手动保证线程安全后才能标记为 @unchecked Sendable(使用 NSLock 等),或将其改为 Actor(自动符合 Sendable),或改为 struct。
@Sendable 函数
闭包默认不是 Sendable 的。传递给 Task { }withTaskGroup 的闭包需要标注 @Sendable,确保闭包内捕获的所有值都是 Sendable 类型,防止捕获非线程安全的对象。
// Vapor 路由中常见的 Sendable 场景

// ✅ struct 自动符合 Sendable(所有属性均为 Sendable)
struct UserDTO: Content, Sendable {
    let id:    UUID
    let name:  String
    let email: String
}

// ✅ Actor 自动符合 Sendable
actor UserCache {
    private var cache: [UUID: UserDTO] = [:]

    func get(_ id: UUID) -> UserDTO? { cache[id] }
    func set(_ dto: UserDTO) { cache[dto.id] = dto }
}

// ⚠️ class 需要 @unchecked Sendable + 手动锁保护
final class LegacyConfig: @unchecked Sendable {
    private let lock = NSLock()
    private var _value: String = ""

    var value: String {
        get { lock.withLock { _value } }
        set { lock.withLock { _value = newValue } }
    }
}
Swift 6 严格并发检查(strict concurrency) Swift 6 将所有并发安全检查从警告升级为编译错误。迁移到 Swift 6 的 Vapor 项目常见编译错误:(1) 在 Task 闭包中捕获非 Sendable 的 class 实例;(2) 从 Actor 外访问 Actor 的可变属性而没有 await;(3) 将非 Sendable 类型传递给 @Sendable 函数参数。逐步迁移策略:先在 Package.swift 中添加 .swiftLanguageVersions([.v5]) 保持兼容,再逐模块开启 swiftSettings: [.enableUpcomingFeature("StrictConcurrency")] 修复问题。

ELF 到 async/await 迁移指南

如果你在维护使用旧版 ELF API 的 Vapor 项目,以下迁移模式会经常用到:

// ── 模式 1:ELF 转 async(最常见)────────────────────────────
// 旧:返回 EventLoopFuture
func findUser(_ id: UUID, db: Database) -> EventLoopFuture<User?> {
    User.find(id, on: db)
}

// 新:async 写法,调用处加 try await
func findUser(_ id: UUID, db: Database) async throws -> User? {
    try await User.find(id, on: db)
}

// ── 模式 2:在 async 上下文中调用 ELF API ─────────────────────
func hybridHandler(req: Request) async throws -> String {
    // .get() 将 ELF 转为 async 并 await
    let result = try await someLegacyELFFunc().get()
    return result
}

// ── 模式 3:ELF.flatMap 链 → async 顺序调用 ──────────────────
// 旧写法:.flatMap 嵌套
return fetchUser(id)
    .flatMap { user in fetchPosts(for: user) }
    .map { posts in posts.map { PostDTO(from: $0) } }

// 新写法:async 线性代码
let user  = try await fetchUser(id)
let posts = try await fetchPosts(for: user)
return posts.map { PostDTO(from: $0) }
不要在 EventLoop 线程上调用阻塞操作 Vapor 使用少量线程(等于 CPU 核心数)驱动 EventLoop。如果在路由处理函数中调用阻塞 API(Thread.sleep、同步文件读写、同步网络请求),会阻塞整个 EventLoop 线程,导致该线程上的所有其他请求挂起。阻塞操作必须使用 req.application.threadPool.runIfActiveNIOThreadPool 移到独立线程:
let data = try await req.application.threadPool.runIfActive(eventLoop: req.eventLoop) { try Data(contentsOf: url) }.get()

实践:并发安全的缓存服务

将本章知识综合运用,实现一个并发安全的内存缓存,在 Vapor 的 Application 生命周期中共享:

// Sources/App/Services/MemoryCacheService.swift
import Vapor

// Actor 保证线程安全,Sendable 允许跨并发域传递
actor MemoryCacheService: Sendable {

    private struct Entry {
        let value:     any Sendable
        let expiresAt: Date?
    }

    private var store: [String: Entry] = [:]

    // 设置缓存,可选 TTL(秒)
    func set(_ key: String, value: some Sendable, ttl: TimeInterval? = nil) {
        let expiresAt = ttl.map { Date().addingTimeInterval($0) }
        store[key] = Entry(value: value, expiresAt: expiresAt)
    }

    // 获取缓存(过期自动删除)
    func get<T: Sendable>(_ key: String, as: T.Type = T.self) -> T? {
        guard let entry = store[key] else { return nil }
        if let exp = entry.expiresAt, Date() > exp {
            store.removeValue(forKey: key)
            return nil
        }
        return entry.value as? T
    }

    func delete(_ key: String) { store.removeValue(forKey: key) }
    func flush() { store.removeAll() }
}

// 挂载到 Application,全局复用同一实例
extension Application {
    private struct CacheKey: StorageKey {
        typealias Value = MemoryCacheService
    }

    var cache: MemoryCacheService {
        get {
            if let existing = storage[CacheKey.self] { return existing }
            let cache = MemoryCacheService()
            storage[CacheKey.self] = cache
            return cache
        }
    }
}

// 在路由中使用缓存(注意:Actor 方法必须 await)
app.get("users", ":id") { req async throws -> UserDTO in
    let id = try req.parameters.require("id", as: UUID.self)
    let cacheKey = "user:\(id)"

    // 先查缓存
    if let cached: UserDTO = await req.application.cache.get(cacheKey) {
        return cached
    }

    // 缓存未命中,查数据库
    guard let user = try await User.find(id, on: req.db) else {
        throw Abort(.notFound)
    }
    let dto = UserDTO(from: user)

    // 写入缓存,TTL 5分钟
    await req.application.cache.set(cacheKey, value: dto, ttl: 300)
    return dto
}
本章小结 Swift Concurrency 的 async/await 相比 EventLoopFuture 显著提升代码可读性——线性代码比链式回调更易理解和调试;async letTaskGroup 让独立操作并行执行;Actor 从语言层面消除数据竞争,编译器强制 await 访问,是 Swift 并发安全的核心机制;Sendable 协议是跨并发域类型安全的保障,Swift 6 中由警告升级为编译错误。迁移老 ELF 代码:用 .get() 在 async 上下文中 await ELF,阻塞操作使用 threadPool 移到独立线程。