Chapter 08

并发编程:线程与 async/await

Rust 的"无畏并发"——用类型系统在编译期防止数据竞争,让并发编程既安全又高效

线程基础

创建线程

Rust 的"无畏并发"(Fearless Concurrency)并非指并发变得容易,而是指所有权和类型系统会在编译期捕获大多数并发 bug,让你可以放心地编写并发代码。

use std::thread;
use std::time::Duration;

fn main() {
    // 创建新线程
    let handle = thread::spawn(|| {
        for i in 1..=5 {
            println!("子线程: {}", i);
            thread::sleep(Duration::from_millis(50));
        }
    });

    for i in 1..=3 {
        println!("主线程: {}", i);
        thread::sleep(Duration::from_millis(50));
    }

    // 等待子线程完成
    handle.join().unwrap();
    println!("所有线程完成");
}

move 闭包:将所有权转移到线程

use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    // move 关键字:将闭包捕获的变量所有权移入线程
    // 不用 move 会编译错误——Rust 无法保证主线程中的 v 在子线程使用期间仍然有效
    let handle = thread::spawn(move || {
        println!("子线程中的 v: {:?}", v);
    });

    // println!("{:?}", v); // 编译错误!v 已被 move 进子线程
    handle.join().unwrap();
}

线程间共享数据

Arc<Mutex<T>>:多线程共享可变状态

在多线程中安全地共享可变数据,需要两个工具的组合:

Arc<T>
原子引用计数(Atomic Reference Counted)。类似 Rc<T>,但线程安全。允许多个线程持有同一数据的共享所有权。
Mutex<T>
互斥锁。同一时刻只允许一个线程访问内部数据。调用 lock() 获取锁守卫(MutexGuard),守卫离开作用域时自动释放锁。
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // Arc 包裹 Mutex,允许多个线程持有所有权
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);  // 克隆引用计数指针(不复制数据)
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();  // 获取锁
            *num += 1;
        });  // num(MutexGuard)在这里 drop,锁自动释放
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("最终计数: {}", *counter.lock().unwrap());  // 10
}

消息通道(Channel)

除了共享内存,Rust 还支持通过消息传递来通信——Go 语言的名言"不要通过共享内存来通信,要通过通信来共享内存"在 Rust 中同样适用:

use std::sync::mpsc;  // mpsc: multiple producer, single consumer
use std::thread;

fn main() {
    // 创建通道,返回 (发送端, 接收端)
    let (tx, rx) = mpsc::channel();

    // 克隆发送端以支持多生产者
    let tx2 = tx.clone();

    thread::spawn(move || {
        let msgs = vec!["你好", "来自", "线程1"];
        for msg in msgs {
            tx.send(msg).unwrap();
        }
    });

    thread::spawn(move || {
        tx2.send("来自线程2").unwrap();
    });

    // 接收端阻塞等待消息(直到所有发送端被 drop)
    for received in rx {
        println!("收到: {}", received);
    }
    println!("通道已关闭");
}

async/await:异步编程

为什么需要异步?

操作系统线程适合 CPU 密集型任务,但对于 I/O 密集型任务(网络请求、文件读写)来说代价太高:每个线程通常需要 8MB 栈空间,而一个 Web 服务器可能需要处理数万个并发连接。异步编程的核心思想是:当一个任务在等待 I/O 时,不占用线程,而是让出控制权,让其他任务在同一个线程上运行。

async fn 与 await

// Cargo.toml 中添加:
// tokio = { version = "1", features = ["full"] }

use std::time::Duration;

// async fn 返回一个 Future(未来某时会完成的值)
async fn fetch_data(id: u32) -> String {
    // 模拟异步 I/O 等待
    tokio::time::sleep(Duration::from_millis(100)).await;
    format!("数据 #{}", id)
}

// #[tokio::main] 宏设置异步运行时
#[tokio::main]
async fn main() {
    // await 等待 Future 完成,期间让出线程控制权
    let result = fetch_data(1).await;
    println!("{}", result);

    // 并发执行多个 Future(不是顺序执行)
    let (r1, r2, r3) = tokio::join!(
        fetch_data(1),
        fetch_data(2),
        fetch_data(3)
    );
    println!("{} {} {}", r1, r2, r3);
}

异步任务(Task)

#[tokio::main]
async fn main() {
    // spawn 创建独立的异步任务(类似轻量级线程)
    let task1 = tokio::task::spawn(async {
        fetch_data(1).await
    });

    let task2 = tokio::task::spawn(async {
        fetch_data(2).await
    });

    // 等待任务完成(JoinHandle 类似线程的 handle)
    let r1 = task1.await.unwrap();
    let r2 = task2.await.unwrap();
    println!("{} {}", r1, r2);
}

实战:异步 HTTP 请求

[dependencies]
tokio   = { version = "1", features = ["full"] }
reqwest = { version = "0.11", features = ["json"] }
serde   = { version = "1", features = ["derive"] }
use serde::Deserialize;

#[derive(Deserialize, Debug)]
struct Post {
    id: u32,
    title: String,
    body: String,
}

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = reqwest::Client::new();

    // 并发请求多个 API
    let futures: Vec<_> = (1..=5)
        .map(|id| {
            let client = &client;
            async move {
                client
                    .get(format!("https://jsonplaceholder.typicode.com/posts/{}", id))
                    .send().await?
                    .json::<Post>().await
            }
        })
        .collect();

    let posts = futures::future::join_all(futures).await;
    for post in posts {
        match post {
            Ok(p) => println!("[{}] {}", p.id, p.title),
            Err(e) => eprintln!("错误: {}", e),
        }
    }
    Ok(())
}
Send + Sync:并发安全的 Trait

Rust 通过两个 Marker Trait 标记类型的并发安全性:
Send:类型可以安全地移动到另一个线程。原始指针 *mut T 不是 Send。
Sync:类型可以安全地被多个线程同时引用RefCell<T> 不是 Sync。
编译器会自动推导大多数类型是否实现这些 Trait。如果你试图在线程间发送 non-Send 类型,编译器会报错,而不是让数据竞争在运行时发生。

Future:异步的底层机制

理解 async/await 的真正运作方式,需要了解 Future trait——它是 Rust 异步编程的核心抽象:

Future trait
async fn 返回的是一个实现了 Future 的匿名类型。Future 是一个状态机,通过 poll() 方法被驱动执行。Poll::Pending 表示尚未完成,Poll::Ready(val) 表示完成并返回值。
Waker
Future 返回 Pending 时,必须保存 Waker。当底层 I/O 就绪时(如 epoll 事件),通过 Waker 通知运行时重新 poll 该 Future。这是"零成本异步"的关键——没有轮询,只有事件驱动的唤醒。
Tokio 运行时
Tokio 是 Rust 最流行的异步运行时,提供事件循环、I/O 驱动、定时器、任务调度器。它在多个线程上运行一个工作窃取(work-stealing)调度器,将大量异步任务(Task)分配到少量系统线程上执行。
// Future trait 的简化定义
pub trait Future {
    type Output;
    // poll 由运行时调用,cx 提供 Waker
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

// async fn 会被编译器展开为实现 Future 的状态机
// 类似这样(伪代码):
async fn fetch_data(id: u32) -> String {
    tokio::time::sleep(Duration::from_millis(100)).await;
    format!("data-{}", id)
}

// 等价于编译器生成的状态机(概念示意):
// enum FetchData {
//     Start { id: u32 },
//     WaitingSleep { id: u32, sleep_future: SleepFuture },
//     Done,
// }
// impl Future for FetchData { ... }

async/await 的常见陷阱

use std::sync::Mutex;
use tokio::time::{sleep, Duration};

// 错误:在 await 点持有 Mutex 锁(非 Send)
async fn bad_example(mutex: &Mutex<Vec<i32>>) {
    let guard = mutex.lock().unwrap();
    sleep(Duration::from_millis(100)).await;  // 编译错误!MutexGuard 不是 Send
    guard.push(1);
}

// 正确:在 await 之前释放锁
async fn good_example(mutex: &Mutex<Vec<i32>>) {
    {
        let mut guard = mutex.lock().unwrap();
        guard.push(1);
    }  // guard 在这里 drop,锁释放
    sleep(Duration::from_millis(100)).await;  // OK
}

// 或者使用 Tokio 的异步 Mutex(可以跨 await 持有)
use tokio::sync::Mutex as AsyncMutex;

async fn with_async_mutex(mutex: &AsyncMutex<Vec<i32>>) {
    let mut guard = mutex.lock().await;  // 异步获锁
    guard.push(1);
    sleep(Duration::from_millis(100)).await;  // OK,AsyncMutexGuard 是 Send
}

阻塞操作不能在异步上下文中直接调用:在 async fn 中调用 std::thread::sleep()std::fs::read_to_string() 等阻塞函数会阻塞整个 Tokio 工作线程,导致该线程上的其他任务无法执行。应使用 tokio::time::sleep()tokio::fs::read_to_string() 等异步版本。如果必须调用阻塞代码,使用 tokio::task::spawn_blocking(|| { /* 阻塞操作 */ }).await 将其移入专用的阻塞线程池。

Tokio 1.x 实战:并发限制与超时

use tokio::time::{timeout, Duration};
use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    // 超时控制:5 秒内未完成则返回 Err(Elapsed)
    let result = timeout(
        Duration::from_secs(5),
        fetch_data(1)
    ).await;

    match result {
        Ok(data) => println!("获取成功: {}", data),
        Err(_)   => println!("超时!"),
    }

    // 并发限制:同时最多 3 个请求(buffer_unordered)
    let results: Vec<_> = stream::iter(1..=10)
        .map(|id| async move { fetch_data(id).await })
        .buffer_unordered(3)  // 并发度 3
        .collect()
        .await;

    println!("完成 {} 个请求", results.len());
}
本章小结

Rust 的并发模型通过类型系统在编译期保障线程安全,是"无畏并发"的真正含义:
OS 线程thread::spawn + move 将数据所有权转移到新线程;Arc<Mutex<T>> 安全地跨线程共享可变状态;mpsc 通道实现消息传递并发(符合 Go 的"通过通信共享内存"哲学)。
async/await 原理:async fn 返回实现 Future 的状态机;运行时通过 poll + Waker 机制驱动 Future;await 让出控制权而不阻塞线程,使单线程可处理万级并发 I/O。
Send + Sync:编译器自动推导,non-Send 类型不能 move 到其他线程,non-Sync 类型不能被多线程共享引用。这两个约束在编译期完全排除数据竞争。
跨 await 的锁:标准库 Mutex 不能跨 await 持有(非 Send);需要异步 Mutex 时使用 tokio::sync::Mutex。
阻塞与异步混用:阻塞操作必须用 spawn_blocking 隔离到阻塞线程池,防止饿死 Tokio 工作线程。