Chapter 08

结构化并发

StructuredTaskScope、ShutdownOnFailure/Success、Scoped Values——Java 并发编程的新范式

什么是结构化并发?

结构化并发(Structured Concurrency)是一种编程范式,其核心思想来自「结构化编程」: 就像 if/for/while 控制流有明确的入口和出口一样,并发任务的生命周期也应该有明确的作用域—— 所有子任务必须在其父作用域退出之前完成或取消

传统的 ExecutorService 提交任务后,任务的生命周期与提交代码完全解耦,难以管理:

传统方式的问题

  • 任务生命周期与代码作用域脱钩
  • 一个子任务失败,其他任务继续运行(资源泄漏)
  • 取消传播复杂(需手动 Future.cancel())
  • 错误处理分散(难以在一处统一处理)
  • 调试困难(任务在哪个线程、何时结束?)

结构化并发的优势

  • 任务生命周期受作用域约束
  • 一个子任务失败 → 其他子任务自动取消
  • 取消自动传播(父取消 → 子取消)
  • 错误在作用域退出时统一处理
  • 可观测性更好(任务属于明确的父)

API 版本历史

Java 19(JEP 428)
结构化并发首次作为孵化特性(Incubator)引入,包含 StructuredTaskScope 基础 API。
Java 21(JEP 453)
升级为预览特性(Preview),API 基本稳定,需要 --enable-preview 编译运行。
Java 23/24(JEP 499/505)
继续作为预览特性迭代,API 有细化调整。预计 Java 25 正式发布。

StructuredTaskScope 基础

基本用法

import java.util.concurrent.*;
import java.util.concurrent.StructuredTaskScope.*;

// 需要 Java 21+ 预览特性:--enable-preview
public Response handle() throws InterruptedException, ExecutionException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {

        // 提交子任务(在虚拟线程中执行)
        Subtask<User>    userTask   = scope.fork(() -> findUser());
        Subtask<Order>   orderTask  = scope.fork(() -> fetchOrder());
        Subtask<String>  configTask = scope.fork(() -> getConfig());

        scope.join();           // 等待所有子任务完成(或任意失败)
        scope.throwIfFailed(); // 如有异常则抛出

        // 所有任务成功完成,安全获取结果
        return new Response(
            userTask.get(),
            orderTask.get(),
            configTask.get()
        );
    }
    // try-with-resources 退出时,scope 自动关闭
    // 若此时还有未完成的子任务,会先取消它们
}

ShutdownOnFailure:任意失败则取消所有

ShutdownOnFailure 策略实现了「全有或全无」语义: 任意一个子任务失败,scope 立即取消所有其他正在运行的子任务,然后 throwIfFailed() 重新抛出原始异常。

// 场景:并行查询多个微服务,任一失败则整体失败
public DashboardData buildDashboard(String userId)
        throws InterruptedException, ExecutionException {

    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        // 三个服务并行调用
        var profileTask = scope.fork(() -> profileService.get(userId));
        var statsTask   = scope.fork(() -> statsService.get(userId));
        var newsTask    = scope.fork(() -> newsService.getLatest());

        scope.join().throwIfFailed(); // 等待并检查(链式调用)

        return new DashboardData(
            profileTask.get(), // Subtask.get() 直接返回结果(已保证成功)
            statsTask.get(),
            newsTask.get()
        );
    }
    // 若 statsService 抛异常:
    // 1. scope 立即发出取消信号给 profileTask 和 newsTask
    // 2. throwIfFailed() 重新抛出 statsService 的原始异常
    // 3. 调用方收到明确的异常,不会拿到半成品数据
}

与传统方式对比

传统 ExecutorService 方式(任务失败时难以清理): Future<User> f1 = executor.submit(() -> findUser()); Future<Order> f2 = executor.submit(() -> fetchOrder()); // f1 失败了... f2 继续在后台运行,无人管理 // 需要手动 f2.cancel(true),但 cancel 是尽力而为的 // 若 f2 正在做数据库操作,cancel 可能无效 结构化并发方式(取消自动传播): scope.fork(() -> findUser()); // 子任务 A scope.fork(() -> fetchOrder()); // 子任务 B scope.join(); // 等待 // A 失败 → B 自动收到中断信号 → B 干净退出 // scope 退出前保证所有子任务已完成/取消

ShutdownOnSuccess:竞速模式

ShutdownOnSuccess 策略实现竞速(Race)语义: 任意一个子任务成功完成,scope 立即取消其他所有任务,返回第一个成功的结果。 类似 CompletableFuture.anyOf(),但取消传播更彻底。

// 场景:从多个镜像同时下载,取最快的那个
public String fetchFromFastest(List<String> mirrors)
        throws InterruptedException, ExecutionException {

    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
        for (String mirror : mirrors) {
            scope.fork(() -> download(mirror));
        }
        scope.join(); // 等待第一个成功(其他自动取消)
        return scope.result(); // 获取最快的结果
    }
    // 若所有镜像都失败,result() 抛出异常
}

// 场景:双写验证——新旧系统同时计算,取最先返回的结果验证一致性
String shadowTest() throws Exception {
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
        scope.fork(() -> legacyService.compute());
        scope.fork(() -> newService.compute());
        scope.join();
        return scope.result();
    }
}

自定义 StructuredTaskScope

可以继承 StructuredTaskScope 实现自定义策略,例如:收集所有成功结果(忽略失败), 或在超过一半成功时继续。

// 自定义:收集所有成功结果,忽略失败的任务
class CollectSuccessScope<T> extends StructuredTaskScope<T> {
    private final List<T> results = new CopyOnWriteArrayList<>();

    @Override
    protected void handleComplete(Subtask<? extends T> subtask) {
        if (subtask.state() == Subtask.State.SUCCESS) {
            results.add(subtask.get());
        }
        // 失败任务被忽略,不影响其他任务
    }

    public List<T> results() { return Collections.unmodifiableList(results); }
}

// 使用
try (var scope = new CollectSuccessScope<String>()) {
    urls.forEach(url -> scope.fork(() -> httpGet(url)));
    scope.join();
    List<String> successes = scope.results(); // 成功的响应列表
}

Scoped Values(作用域值)

ScopedValue(JEP 487,Java 23 预览)是 ThreadLocal 的现代替代品, 专为虚拟线程和结构化并发设计。

ThreadLocal vs ScopedValue

ThreadLocal
线程的私有变量,生命周期与线程相同。可以随时修改(set/get/remove)。在线程池中需要手动 remove() 防止污染。继承性通过 InheritableThreadLocal 实现(但每次 fork 都要拷贝,开销大)。
ScopedValue
不可变绑定(immutable binding),只在特定作用域(代码块)内有效,超出作用域自动清除。子线程自动继承父线程的绑定(无需拷贝)。与结构化并发完美配合,无内存泄漏风险。
import java.lang.ScopedValue;

// 声明 ScopedValue(类似 ThreadLocal 的声明)
public class RequestContext {
    public static final ScopedValue<String> USER_ID =
        ScopedValue.newInstance();
    public static final ScopedValue<String> REQUEST_ID =
        ScopedValue.newInstance();
}

// 使用:ScopedValue.where().run() 创建绑定作用域
ScopedValue.where(RequestContext.USER_ID, "alice")
           .where(RequestContext.REQUEST_ID, "req-001")
           .run(() -> {
               System.out.println("用户:" + RequestContext.USER_ID.get()); // "alice"
               processRequest(); // 整个调用链都能读到 USER_ID
           });
// 作用域退出,绑定自动清除,无需手动 remove()

// 与结构化并发结合:子任务自动继承绑定
ScopedValue.where(RequestContext.USER_ID, "alice")
           .run(() -> {
               try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
                   scope.fork(() -> {
                       // 子虚拟线程自动继承 USER_ID = "alice"
                       String uid = RequestContext.USER_ID.get(); // "alice"
                       return processUser(uid);
                   });
                   scope.join().throwIfFailed();
               }
           });

ScopedValue.callWhere():有返回值的作用域

// callWhere 类似 run,但可以返回值
String result = ScopedValue.callWhere(
    RequestContext.USER_ID, "alice",
    () -> fetchUserData() // 返回结果
);

// 重绑定(rebinding):在嵌套作用域中覆盖值
ScopedValue.where(RequestContext.USER_ID, "alice").run(() -> {
    System.out.println(RequestContext.USER_ID.get()); // "alice"

    ScopedValue.where(RequestContext.USER_ID, "admin").run(() -> {
        System.out.println(RequestContext.USER_ID.get()); // "admin"(内层覆盖)
    });

    System.out.println(RequestContext.USER_ID.get()); // "alice"(外层恢复)
});

结构化并发完整实战

/**
 * 场景:用户购买流程
 * 1. 验证用户(查询数据库)
 * 2. 检查库存(调用库存服务)
 * 3. 计算价格(调用定价服务,可能超时)
 * 任何步骤失败 → 取消所有,抛出异常
 */
public PurchaseResult purchase(String userId, String productId)
        throws InterruptedException, ExecutionException {

    return ScopedValue
        .where(RequestContext.USER_ID, userId)
        .call(() -> {
            try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {

                var userTask = scope.fork(() ->
                    userRepo.findById(userId)
                        .orElseThrow(() -> new UserNotFoundException(userId)));

                var stockTask = scope.fork(() ->
                    inventoryService.checkStock(productId));

                var priceTask = scope.fork(() ->
                    pricingService.calculate(userId, productId));

                scope.join(Duration.ofSeconds(5)) // 整体超时5秒
                     .throwIfFailed();

                User   user  = userTask.get();
                Stock  stock = stockTask.get();
                double price = priceTask.get();

                if (!stock.isAvailable()) throw new OutOfStockException(productId);

                return orderService.create(user, productId, price);
            }
        });
}
启用预览特性

结构化并发和 Scoped Values 在 Java 21/23 中是预览特性,需要编译和运行时参数:

本章小结