什么是结构化并发?
结构化并发(Structured Concurrency)是一种编程范式,其核心思想来自「结构化编程」:
就像 if/for/while 控制流有明确的入口和出口一样,并发任务的生命周期也应该有明确的作用域——
所有子任务必须在其父作用域退出之前完成或取消。
传统的 ExecutorService 提交任务后,任务的生命周期与提交代码完全解耦,难以管理:
传统方式的问题
- 任务生命周期与代码作用域脱钩
- 一个子任务失败,其他任务继续运行(资源泄漏)
- 取消传播复杂(需手动 Future.cancel())
- 错误处理分散(难以在一处统一处理)
- 调试困难(任务在哪个线程、何时结束?)
结构化并发的优势
- 任务生命周期受作用域约束
- 一个子任务失败 → 其他子任务自动取消
- 取消自动传播(父取消 → 子取消)
- 错误在作用域退出时统一处理
- 可观测性更好(任务属于明确的父)
API 版本历史
Java 19(JEP 428)
结构化并发首次作为孵化特性(Incubator)引入,包含 StructuredTaskScope 基础 API。
Java 21(JEP 453)
升级为预览特性(Preview),API 基本稳定,需要 --enable-preview 编译运行。
Java 23/24(JEP 499/505)
继续作为预览特性迭代,API 有细化调整。预计 Java 25 正式发布。
StructuredTaskScope 基础
基本用法
import java.util.concurrent.*;
import java.util.concurrent.StructuredTaskScope.*;
// 需要 Java 21+ 预览特性:--enable-preview
public Response handle() throws InterruptedException, ExecutionException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// 提交子任务(在虚拟线程中执行)
Subtask<User> userTask = scope.fork(() -> findUser());
Subtask<Order> orderTask = scope.fork(() -> fetchOrder());
Subtask<String> configTask = scope.fork(() -> getConfig());
scope.join(); // 等待所有子任务完成(或任意失败)
scope.throwIfFailed(); // 如有异常则抛出
// 所有任务成功完成,安全获取结果
return new Response(
userTask.get(),
orderTask.get(),
configTask.get()
);
}
// try-with-resources 退出时,scope 自动关闭
// 若此时还有未完成的子任务,会先取消它们
}
ShutdownOnFailure:任意失败则取消所有
ShutdownOnFailure 策略实现了「全有或全无」语义:
任意一个子任务失败,scope 立即取消所有其他正在运行的子任务,然后 throwIfFailed()
重新抛出原始异常。
// 场景:并行查询多个微服务,任一失败则整体失败
public DashboardData buildDashboard(String userId)
throws InterruptedException, ExecutionException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// 三个服务并行调用
var profileTask = scope.fork(() -> profileService.get(userId));
var statsTask = scope.fork(() -> statsService.get(userId));
var newsTask = scope.fork(() -> newsService.getLatest());
scope.join().throwIfFailed(); // 等待并检查(链式调用)
return new DashboardData(
profileTask.get(), // Subtask.get() 直接返回结果(已保证成功)
statsTask.get(),
newsTask.get()
);
}
// 若 statsService 抛异常:
// 1. scope 立即发出取消信号给 profileTask 和 newsTask
// 2. throwIfFailed() 重新抛出 statsService 的原始异常
// 3. 调用方收到明确的异常,不会拿到半成品数据
}
与传统方式对比
传统 ExecutorService 方式(任务失败时难以清理):
Future<User> f1 = executor.submit(() -> findUser());
Future<Order> f2 = executor.submit(() -> fetchOrder());
// f1 失败了... f2 继续在后台运行,无人管理
// 需要手动 f2.cancel(true),但 cancel 是尽力而为的
// 若 f2 正在做数据库操作,cancel 可能无效
结构化并发方式(取消自动传播):
scope.fork(() -> findUser()); // 子任务 A
scope.fork(() -> fetchOrder()); // 子任务 B
scope.join(); // 等待
// A 失败 → B 自动收到中断信号 → B 干净退出
// scope 退出前保证所有子任务已完成/取消
ShutdownOnSuccess:竞速模式
ShutdownOnSuccess 策略实现竞速(Race)语义:
任意一个子任务成功完成,scope 立即取消其他所有任务,返回第一个成功的结果。
类似 CompletableFuture.anyOf(),但取消传播更彻底。
// 场景:从多个镜像同时下载,取最快的那个
public String fetchFromFastest(List<String> mirrors)
throws InterruptedException, ExecutionException {
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
for (String mirror : mirrors) {
scope.fork(() -> download(mirror));
}
scope.join(); // 等待第一个成功(其他自动取消)
return scope.result(); // 获取最快的结果
}
// 若所有镜像都失败,result() 抛出异常
}
// 场景:双写验证——新旧系统同时计算,取最先返回的结果验证一致性
String shadowTest() throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
scope.fork(() -> legacyService.compute());
scope.fork(() -> newService.compute());
scope.join();
return scope.result();
}
}
自定义 StructuredTaskScope
可以继承 StructuredTaskScope 实现自定义策略,例如:收集所有成功结果(忽略失败),
或在超过一半成功时继续。
// 自定义:收集所有成功结果,忽略失败的任务
class CollectSuccessScope<T> extends StructuredTaskScope<T> {
private final List<T> results = new CopyOnWriteArrayList<>();
@Override
protected void handleComplete(Subtask<? extends T> subtask) {
if (subtask.state() == Subtask.State.SUCCESS) {
results.add(subtask.get());
}
// 失败任务被忽略,不影响其他任务
}
public List<T> results() { return Collections.unmodifiableList(results); }
}
// 使用
try (var scope = new CollectSuccessScope<String>()) {
urls.forEach(url -> scope.fork(() -> httpGet(url)));
scope.join();
List<String> successes = scope.results(); // 成功的响应列表
}
Scoped Values(作用域值)
ScopedValue(JEP 487,Java 23 预览)是 ThreadLocal 的现代替代品,
专为虚拟线程和结构化并发设计。
ThreadLocal vs ScopedValue
ThreadLocal
线程的私有变量,生命周期与线程相同。可以随时修改(set/get/remove)。在线程池中需要手动 remove() 防止污染。继承性通过 InheritableThreadLocal 实现(但每次 fork 都要拷贝,开销大)。
ScopedValue
不可变绑定(immutable binding),只在特定作用域(代码块)内有效,超出作用域自动清除。子线程自动继承父线程的绑定(无需拷贝)。与结构化并发完美配合,无内存泄漏风险。
import java.lang.ScopedValue;
// 声明 ScopedValue(类似 ThreadLocal 的声明)
public class RequestContext {
public static final ScopedValue<String> USER_ID =
ScopedValue.newInstance();
public static final ScopedValue<String> REQUEST_ID =
ScopedValue.newInstance();
}
// 使用:ScopedValue.where().run() 创建绑定作用域
ScopedValue.where(RequestContext.USER_ID, "alice")
.where(RequestContext.REQUEST_ID, "req-001")
.run(() -> {
System.out.println("用户:" + RequestContext.USER_ID.get()); // "alice"
processRequest(); // 整个调用链都能读到 USER_ID
});
// 作用域退出,绑定自动清除,无需手动 remove()
// 与结构化并发结合:子任务自动继承绑定
ScopedValue.where(RequestContext.USER_ID, "alice")
.run(() -> {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
scope.fork(() -> {
// 子虚拟线程自动继承 USER_ID = "alice"
String uid = RequestContext.USER_ID.get(); // "alice"
return processUser(uid);
});
scope.join().throwIfFailed();
}
});
ScopedValue.callWhere():有返回值的作用域
// callWhere 类似 run,但可以返回值
String result = ScopedValue.callWhere(
RequestContext.USER_ID, "alice",
() -> fetchUserData() // 返回结果
);
// 重绑定(rebinding):在嵌套作用域中覆盖值
ScopedValue.where(RequestContext.USER_ID, "alice").run(() -> {
System.out.println(RequestContext.USER_ID.get()); // "alice"
ScopedValue.where(RequestContext.USER_ID, "admin").run(() -> {
System.out.println(RequestContext.USER_ID.get()); // "admin"(内层覆盖)
});
System.out.println(RequestContext.USER_ID.get()); // "alice"(外层恢复)
});
结构化并发完整实战
/**
* 场景:用户购买流程
* 1. 验证用户(查询数据库)
* 2. 检查库存(调用库存服务)
* 3. 计算价格(调用定价服务,可能超时)
* 任何步骤失败 → 取消所有,抛出异常
*/
public PurchaseResult purchase(String userId, String productId)
throws InterruptedException, ExecutionException {
return ScopedValue
.where(RequestContext.USER_ID, userId)
.call(() -> {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var userTask = scope.fork(() ->
userRepo.findById(userId)
.orElseThrow(() -> new UserNotFoundException(userId)));
var stockTask = scope.fork(() ->
inventoryService.checkStock(productId));
var priceTask = scope.fork(() ->
pricingService.calculate(userId, productId));
scope.join(Duration.ofSeconds(5)) // 整体超时5秒
.throwIfFailed();
User user = userTask.get();
Stock stock = stockTask.get();
double price = priceTask.get();
if (!stock.isAvailable()) throw new OutOfStockException(productId);
return orderService.create(user, productId, price);
}
});
}
启用预览特性
结构化并发和 Scoped Values 在 Java 21/23 中是预览特性,需要编译和运行时参数:
- 编译:
javac --enable-preview --release 21 *.java - 运行:
java --enable-preview MyApp - Maven:在 pom.xml 中 maven-compiler-plugin 添加
<compilerArgs><arg>--enable-preview</arg></compilerArgs>
本章小结
- 结构化并发确保子任务的生命周期受父作用域约束,任意失败可自动取消兄弟任务,简化错误处理。
- ShutdownOnFailure:任意子任务失败 → 取消所有 + 抛出异常(「全有或全无」)。
- ShutdownOnSuccess:任意子任务成功 → 取消其他 + 返回结果(竞速模式)。
- 可继承
StructuredTaskScope实现自定义策略(如收集所有成功结果)。 - ScopedValue 是 ThreadLocal 的现代替代,不可变绑定,子任务自动继承,作用域退出自动清除,与结构化并发完美配合。
- 目前(Java 21/23)仍是预览特性,预计 Java 25 转正。