线程基础
创建线程
Rust 的"无畏并发"(Fearless Concurrency)并非指并发变得容易,而是指所有权和类型系统会在编译期捕获大多数并发 bug,让你可以放心地编写并发代码。
use std::thread;
use std::time::Duration;
fn main() {
// 创建新线程
let handle = thread::spawn(|| {
for i in 1..=5 {
println!("子线程: {}", i);
thread::sleep(Duration::from_millis(50));
}
});
for i in 1..=3 {
println!("主线程: {}", i);
thread::sleep(Duration::from_millis(50));
}
// 等待子线程完成
handle.join().unwrap();
println!("所有线程完成");
}
move 闭包:将所有权转移到线程
use std::thread;
fn main() {
let v = vec![1, 2, 3];
// move 关键字:将闭包捕获的变量所有权移入线程
// 不用 move 会编译错误——Rust 无法保证主线程中的 v 在子线程使用期间仍然有效
let handle = thread::spawn(move || {
println!("子线程中的 v: {:?}", v);
});
// println!("{:?}", v); // 编译错误!v 已被 move 进子线程
handle.join().unwrap();
}
线程间共享数据
Arc<Mutex<T>>:多线程共享可变状态
在多线程中安全地共享可变数据,需要两个工具的组合:
Arc<T>
原子引用计数(Atomic Reference Counted)。类似 Rc<T>,但线程安全。允许多个线程持有同一数据的共享所有权。
Mutex<T>
互斥锁。同一时刻只允许一个线程访问内部数据。调用 lock() 获取锁守卫(MutexGuard),守卫离开作用域时自动释放锁。
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
// Arc 包裹 Mutex,允许多个线程持有所有权
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter); // 克隆引用计数指针(不复制数据)
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap(); // 获取锁
*num += 1;
}); // num(MutexGuard)在这里 drop,锁自动释放
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("最终计数: {}", *counter.lock().unwrap()); // 10
}
消息通道(Channel)
除了共享内存,Rust 还支持通过消息传递来通信——Go 语言的名言"不要通过共享内存来通信,要通过通信来共享内存"在 Rust 中同样适用:
use std::sync::mpsc; // mpsc: multiple producer, single consumer
use std::thread;
fn main() {
// 创建通道,返回 (发送端, 接收端)
let (tx, rx) = mpsc::channel();
// 克隆发送端以支持多生产者
let tx2 = tx.clone();
thread::spawn(move || {
let msgs = vec!["你好", "来自", "线程1"];
for msg in msgs {
tx.send(msg).unwrap();
}
});
thread::spawn(move || {
tx2.send("来自线程2").unwrap();
});
// 接收端阻塞等待消息(直到所有发送端被 drop)
for received in rx {
println!("收到: {}", received);
}
println!("通道已关闭");
}
async/await:异步编程
为什么需要异步?
操作系统线程适合 CPU 密集型任务,但对于 I/O 密集型任务(网络请求、文件读写)来说代价太高:每个线程通常需要 8MB 栈空间,而一个 Web 服务器可能需要处理数万个并发连接。异步编程的核心思想是:当一个任务在等待 I/O 时,不占用线程,而是让出控制权,让其他任务在同一个线程上运行。
async fn 与 await
// Cargo.toml 中添加:
// tokio = { version = "1", features = ["full"] }
use std::time::Duration;
// async fn 返回一个 Future(未来某时会完成的值)
async fn fetch_data(id: u32) -> String {
// 模拟异步 I/O 等待
tokio::time::sleep(Duration::from_millis(100)).await;
format!("数据 #{}", id)
}
// #[tokio::main] 宏设置异步运行时
#[tokio::main]
async fn main() {
// await 等待 Future 完成,期间让出线程控制权
let result = fetch_data(1).await;
println!("{}", result);
// 并发执行多个 Future(不是顺序执行)
let (r1, r2, r3) = tokio::join!(
fetch_data(1),
fetch_data(2),
fetch_data(3)
);
println!("{} {} {}", r1, r2, r3);
}
异步任务(Task)
#[tokio::main]
async fn main() {
// spawn 创建独立的异步任务(类似轻量级线程)
let task1 = tokio::task::spawn(async {
fetch_data(1).await
});
let task2 = tokio::task::spawn(async {
fetch_data(2).await
});
// 等待任务完成(JoinHandle 类似线程的 handle)
let r1 = task1.await.unwrap();
let r2 = task2.await.unwrap();
println!("{} {}", r1, r2);
}
实战:异步 HTTP 请求
[dependencies]
tokio = { version = "1", features = ["full"] }
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1", features = ["derive"] }
use serde::Deserialize;
#[derive(Deserialize, Debug)]
struct Post {
id: u32,
title: String,
body: String,
}
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
let client = reqwest::Client::new();
// 并发请求多个 API
let futures: Vec<_> = (1..=5)
.map(|id| {
let client = &client;
async move {
client
.get(format!("https://jsonplaceholder.typicode.com/posts/{}", id))
.send().await?
.json::<Post>().await
}
})
.collect();
let posts = futures::future::join_all(futures).await;
for post in posts {
match post {
Ok(p) => println!("[{}] {}", p.id, p.title),
Err(e) => eprintln!("错误: {}", e),
}
}
Ok(())
}
Send + Sync:并发安全的 Trait
Rust 通过两个 Marker Trait 标记类型的并发安全性:
Send:类型可以安全地移动到另一个线程。原始指针 *mut T 不是 Send。
Sync:类型可以安全地被多个线程同时引用。RefCell<T> 不是 Sync。
编译器会自动推导大多数类型是否实现这些 Trait。如果你试图在线程间发送 non-Send 类型,编译器会报错,而不是让数据竞争在运行时发生。