Chapter 08

并发编程:线程与 async/await

Rust 的"无畏并发"——用类型系统在编译期防止数据竞争,让并发编程既安全又高效

线程基础

创建线程

Rust 的"无畏并发"(Fearless Concurrency)并非指并发变得容易,而是指所有权和类型系统会在编译期捕获大多数并发 bug,让你可以放心地编写并发代码。

use std::thread;
use std::time::Duration;

fn main() {
    // 创建新线程
    let handle = thread::spawn(|| {
        for i in 1..=5 {
            println!("子线程: {}", i);
            thread::sleep(Duration::from_millis(50));
        }
    });

    for i in 1..=3 {
        println!("主线程: {}", i);
        thread::sleep(Duration::from_millis(50));
    }

    // 等待子线程完成
    handle.join().unwrap();
    println!("所有线程完成");
}

move 闭包:将所有权转移到线程

use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    // move 关键字:将闭包捕获的变量所有权移入线程
    // 不用 move 会编译错误——Rust 无法保证主线程中的 v 在子线程使用期间仍然有效
    let handle = thread::spawn(move || {
        println!("子线程中的 v: {:?}", v);
    });

    // println!("{:?}", v); // 编译错误!v 已被 move 进子线程
    handle.join().unwrap();
}

线程间共享数据

Arc<Mutex<T>>:多线程共享可变状态

在多线程中安全地共享可变数据,需要两个工具的组合:

Arc<T>
原子引用计数(Atomic Reference Counted)。类似 Rc<T>,但线程安全。允许多个线程持有同一数据的共享所有权。
Mutex<T>
互斥锁。同一时刻只允许一个线程访问内部数据。调用 lock() 获取锁守卫(MutexGuard),守卫离开作用域时自动释放锁。
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // Arc 包裹 Mutex,允许多个线程持有所有权
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);  // 克隆引用计数指针(不复制数据)
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();  // 获取锁
            *num += 1;
        });  // num(MutexGuard)在这里 drop,锁自动释放
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("最终计数: {}", *counter.lock().unwrap());  // 10
}

消息通道(Channel)

除了共享内存,Rust 还支持通过消息传递来通信——Go 语言的名言"不要通过共享内存来通信,要通过通信来共享内存"在 Rust 中同样适用:

use std::sync::mpsc;  // mpsc: multiple producer, single consumer
use std::thread;

fn main() {
    // 创建通道,返回 (发送端, 接收端)
    let (tx, rx) = mpsc::channel();

    // 克隆发送端以支持多生产者
    let tx2 = tx.clone();

    thread::spawn(move || {
        let msgs = vec!["你好", "来自", "线程1"];
        for msg in msgs {
            tx.send(msg).unwrap();
        }
    });

    thread::spawn(move || {
        tx2.send("来自线程2").unwrap();
    });

    // 接收端阻塞等待消息(直到所有发送端被 drop)
    for received in rx {
        println!("收到: {}", received);
    }
    println!("通道已关闭");
}

async/await:异步编程

为什么需要异步?

操作系统线程适合 CPU 密集型任务,但对于 I/O 密集型任务(网络请求、文件读写)来说代价太高:每个线程通常需要 8MB 栈空间,而一个 Web 服务器可能需要处理数万个并发连接。异步编程的核心思想是:当一个任务在等待 I/O 时,不占用线程,而是让出控制权,让其他任务在同一个线程上运行。

async fn 与 await

// Cargo.toml 中添加:
// tokio = { version = "1", features = ["full"] }

use std::time::Duration;

// async fn 返回一个 Future(未来某时会完成的值)
async fn fetch_data(id: u32) -> String {
    // 模拟异步 I/O 等待
    tokio::time::sleep(Duration::from_millis(100)).await;
    format!("数据 #{}", id)
}

// #[tokio::main] 宏设置异步运行时
#[tokio::main]
async fn main() {
    // await 等待 Future 完成,期间让出线程控制权
    let result = fetch_data(1).await;
    println!("{}", result);

    // 并发执行多个 Future(不是顺序执行)
    let (r1, r2, r3) = tokio::join!(
        fetch_data(1),
        fetch_data(2),
        fetch_data(3)
    );
    println!("{} {} {}", r1, r2, r3);
}

异步任务(Task)

#[tokio::main]
async fn main() {
    // spawn 创建独立的异步任务(类似轻量级线程)
    let task1 = tokio::task::spawn(async {
        fetch_data(1).await
    });

    let task2 = tokio::task::spawn(async {
        fetch_data(2).await
    });

    // 等待任务完成(JoinHandle 类似线程的 handle)
    let r1 = task1.await.unwrap();
    let r2 = task2.await.unwrap();
    println!("{} {}", r1, r2);
}

实战:异步 HTTP 请求

[dependencies]
tokio   = { version = "1", features = ["full"] }
reqwest = { version = "0.11", features = ["json"] }
serde   = { version = "1", features = ["derive"] }
use serde::Deserialize;

#[derive(Deserialize, Debug)]
struct Post {
    id: u32,
    title: String,
    body: String,
}

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = reqwest::Client::new();

    // 并发请求多个 API
    let futures: Vec<_> = (1..=5)
        .map(|id| {
            let client = &client;
            async move {
                client
                    .get(format!("https://jsonplaceholder.typicode.com/posts/{}", id))
                    .send().await?
                    .json::<Post>().await
            }
        })
        .collect();

    let posts = futures::future::join_all(futures).await;
    for post in posts {
        match post {
            Ok(p) => println!("[{}] {}", p.id, p.title),
            Err(e) => eprintln!("错误: {}", e),
        }
    }
    Ok(())
}
Send + Sync:并发安全的 Trait

Rust 通过两个 Marker Trait 标记类型的并发安全性:
Send:类型可以安全地移动到另一个线程。原始指针 *mut T 不是 Send。
Sync:类型可以安全地被多个线程同时引用RefCell<T> 不是 Sync。
编译器会自动推导大多数类型是否实现这些 Trait。如果你试图在线程间发送 non-Send 类型,编译器会报错,而不是让数据竞争在运行时发生。