Chapter 03

异步编程深度

深入 async/await 状态机原理,掌握 Task vs ValueTask 选择、Channel 生产者消费者、异步流与精准取消控制。

async/await 工作原理:状态机

async/await 不是多线程!它是编译器的语法糖。当你写一个 async 方法时,C# 编译器将其转换为一个状态机类。每遇到一个 await,状态机会挂起当前执行,释放线程回线程池,等待异步操作完成后恢复执行。整个过程在 I/O 等待期间不占用任何线程。

async Task<string> FetchDataAsync(string url) { var client = new HttpClient(); string result = await client.GetStringAsync(url); return result.ToUpper(); } ↓ 编译器生成状态机(简化) class FetchDataAsync_StateMachine { int _state = 0; // 当前状态 string _url; HttpClient _client; TaskAwaiter<string> _awaiter; void MoveNext() { switch (_state) { case 0: // 初始 _client = new HttpClient(); _awaiter = _client.GetStringAsync(_url).GetAwaiter(); if (!_awaiter.IsCompleted) { _state = 1; _awaiter.OnCompleted(MoveNext); // 注册回调 return; // 释放线程! } goto case 1; case 1: // I/O 完成后恢复 string result = _awaiter.GetResult(); SetResult(result.ToUpper()); break; } } }

SynchronizationContext 与线程上下文

SynchronizationContext
描述"在哪里恢复执行"的抽象。在 ASP.NET Core 中,没有 SynchronizationContext(为 null),await 后会在线程池任意线程上恢复,性能最优。在 WPF/WinForms 中,SynchronizationContext 确保 UI 代码在 UI 线程上恢复。使用 ConfigureAwait(false) 可以明确表示不需要回到原始上下文。
// 库代码:始终加 ConfigureAwait(false),避免死锁,提升性能
public async Task<User> GetUserAsync(int id)
{
    var data = await _db.QueryAsync(id).ConfigureAwait(false);
    return MapToUser(data);
}

// 应用层代码(UI/Controller):不需要 ConfigureAwait(false)
// ASP.NET Core 无 SyncContext,加不加效果相同

Task vs ValueTask:何时选哪个

Task<T> 是一个引用类型对象,每次调用异步方法都会在堆上分配一个 Task 对象。ValueTask<T>(C# 7.0+)是值类型,在方法可以同步完成(缓存命中、已就绪等)的场景下避免堆分配。

场景推荐类型原因
总是需要等待 I/O(HTTP、DB)Task<T>开销相差不大,Task 更简单
频繁调用且常走缓存(热路径)ValueTask<T>同步完成时零分配
实现 IAsyncEnumerable 迭代器ValueTask运行时要求
公共库 APIValueTask<T>调用方可以选择是否 await
// ValueTask 示例:缓存优先,避免热路径分配
private readonly Dictionary<int, User> _cache = [];

public ValueTask<User> GetUserAsync(int id)
{
    // 缓存命中:同步返回,无堆分配
    if (_cache.TryGetValue(id, out var cached))
        return new ValueTask<User>(cached);

    // 缓存未命中:异步加载
    return new ValueTask<User>(LoadFromDbAsync(id));
}

private async Task<User> LoadFromDbAsync(int id)
{
    var user = await _db.FindAsync<User>(id);
    _cache[id] = user;
    return user;
}

并行编程:Parallel 与并发控制

Parallel.ForEachAsync(.NET 6+)

// 并行处理图片列表(限制并发数)
var urls = Enumerable.Range(1, 100).Select(i => $"https://picsum.photos/id/{i}/800");

await Parallel.ForEachAsync(
    urls,
    new ParallelOptions { MaxDegreeOfParallelism = 8 },
    async (url, ct) =>
    {
        var bytes = await _http.GetByteArrayAsync(url, ct);
        await File.WriteAllBytesAsync($"img_{url.Split('/')[^1]}.jpg", bytes, ct);
    }
);

SemaphoreSlim:手动限制并发

// SemaphoreSlim 精确控制并发下载数
var semaphore = new SemaphoreSlim(5, 5);  // 最多 5 个并发

async Task DownloadAsync(string url, CancellationToken ct)
{
    await semaphore.WaitAsync(ct);
    try
    {
        var data = await _http.GetByteArrayAsync(url, ct);
        await SaveAsync(data, ct);
    }
    finally
    {
        semaphore.Release();
    }
}

var tasks = urls.Select(url => DownloadAsync(url, ct));
await Task.WhenAll(tasks);

Channel<T>:生产者消费者模式

System.Threading.Channels.Channel<T> 是 .NET 专为异步生产者消费者设计的数据结构,比 BlockingCollection<T> 更高效,支持完全异步操作。

using System.Threading.Channels;

// 创建有界 Channel(防止生产过快撑爆内存)
var channel = Channel.CreateBounded<WorkItem>(new BoundedChannelOptions(100)
{
    FullMode = BoundedChannelFullMode.Wait  // 满时等待
});

// 生产者
async Task ProducerAsync(ChannelWriter<WorkItem> writer, CancellationToken ct)
{
    try
    {
        await foreach (var item in GetWorkItemsAsync(ct))
            await writer.WriteAsync(item, ct);
    }
    finally
    {
        writer.Complete();  // 通知消费者没有更多数据
    }
}

// 消费者
async Task ConsumerAsync(ChannelReader<WorkItem> reader, CancellationToken ct)
{
    await foreach (var item in reader.ReadAllAsync(ct))
    {
        await ProcessItemAsync(item, ct);
    }
}

// 启动(一个生产者,三个消费者)
var producer = ProducerAsync(channel.Writer, cts.Token);
var consumers = Enumerable.Range(0, 3)
    .Select(_ => ConsumerAsync(channel.Reader, cts.Token));
await Task.WhenAll([producer, ..consumers]);

IAsyncEnumerable<T>:异步流

C# 8 引入了异步流,允许以 await foreach 逐步消费异步产生的序列,非常适合流式数据处理(数据库游标、SSE、大文件逐行读取)。

// 生成异步流(数据库游标分页)
public async IAsyncEnumerable<User> GetAllUsersAsync(
    [EnumeratorCancellation] CancellationToken ct = default)
{
    const int pageSize = 100;
    int page = 0;

    while (true)
    {
        var batch = await _db.Users
            .OrderBy(u => u.Id)
            .Skip(page * pageSize)
            .Take(pageSize)
            .ToListAsync(ct);

        if (batch.Count == 0) yield break;

        foreach (var user in batch)
            yield return user;

        page++;
    }
}

// 消费异步流
await foreach (var user in GetAllUsersAsync(ct))
{
    await ProcessUserAsync(user);
}

// 也可以用 LINQ 操作异步流(System.Linq.Async NuGet 包)
var activeUsers = GetAllUsersAsync(ct)
    .Where(u => u.IsActive)
    .Take(50);

CancellationToken:请求取消与超时

CancellationToken 是 .NET 异步取消的标准模式。在 ASP.NET Core 中,HttpContext.RequestAborted 会在客户端断开连接时自动触发取消,应将其传递给所有异步操作以避免资源浪费。

// 创建超时取消令牌
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
CancellationToken ct = cts.Token;

// 组合多个取消源(任一取消即可)
using var linked = CancellationTokenSource.CreateLinkedTokenSource(
    ct,
    HttpContext.RequestAborted
);

try
{
    var result = await _service.DoWorkAsync(linked.Token);
    return Ok(result);
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)
{
    return StatusCode(408, "请求超时");
}

// 在自定义方法中正确响应取消
async Task LongRunningOperationAsync(CancellationToken ct)
{
    for (int i = 0; i < 1000; i++)
    {
        ct.ThrowIfCancellationRequested();  // 主动检查取消
        await ProcessChunkAsync(i, ct);
    }
}

实战:并发下载 URL,限制并发数

using System.Net.Http;
using System.Threading.Channels;

public class ConcurrentDownloader
{
    private readonly HttpClient _http;
    private readonly int _maxConcurrency;

    public ConcurrentDownloader(HttpClient http, int maxConcurrency = 5)
    {
        _http = http;
        _maxConcurrency = maxConcurrency;
    }

    public async Task<Dictionary<string, byte[]>> DownloadAllAsync(
        IEnumerable<string> urls,
        CancellationToken ct = default)
    {
        var results = new ConcurrentDictionary<string, byte[]>();
        var semaphore = new SemaphoreSlim(_maxConcurrency);

        var tasks = urls.Select(async url =>
        {
            await semaphore.WaitAsync(ct);
            try
            {
                var bytes = await _http.GetByteArrayAsync(url, ct);
                results[url] = bytes;
                Console.WriteLine($"✓ {url} ({bytes.Length:N0} bytes)");
            }
            catch (Exception ex)
            {
                Console.WriteLine($"✗ {url}: {ex.Message}");
            }
            finally
            {
                semaphore.Release();
            }
        });

        await Task.WhenAll(tasks);
        return new Dictionary<string, byte[]>(results);
    }
}

// 使用
var downloader = new ConcurrentDownloader(httpClient, maxConcurrency: 10);
using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2));

var results = await downloader.DownloadAllAsync(urlList, cts.Token);
Console.WriteLine($"下载完成: {results.Count} 个文件");
本章小结 async/await 本质是编译器生成的状态机,await 让线程在 I/O 等待时"归还"给线程池而非阻塞。ValueTask 在热路径上避免分配。Channel<T> 是高性能生产者消费者的标准工具。IAsyncEnumerable 让流式数据处理变得优雅。CancellationToken 应当贯穿所有 I/O 操作,是正确实现超时和取消的唯一标准方式。下一章进入 ASP.NET Core Web API 实战。