Future 的局限与 CompletableFuture 的改进
Java 5 引入的 Future<T> 是表示异步计算结果的基础接口,但有几个关键局限:
- 阻塞的 get():获取结果只能调用 get(),会阻塞当前线程,无法实现真正的非阻塞编程。
- 无回调机制:不能在任务完成后自动执行回调,必须主动 get() 轮询或阻塞。
- 无法链式组合:无法将多个 Future 组合为依赖关系(A 完成后执行 B,B 完成后执行 C)。
- 无法手动完成:无法从外部主动触发 Future 的完成。
Java 8 引入的 CompletableFuture<T> 同时实现了 Future<T>
和 CompletionStage<T> 接口,解决了上述所有问题,支持函数式风格的链式异步编程。
创建 CompletableFuture
import java.util.concurrent.*;
// 1. 无返回值的异步任务(使用 ForkJoinPool.commonPool())
CompletableFuture<Void> cf1 = CompletableFuture.runAsync(() -> {
System.out.println("异步执行,无返回值");
});
// 2. 有返回值的异步任务
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
return "Hello from async";
});
// 3. 指定自定义执行器(推荐,不占用公共 ForkJoinPool)
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<Integer> cf3 = CompletableFuture.supplyAsync(() -> {
return 42;
}, executor);
// 4. 手动创建并完成(常用于测试或已知结果的场景)
CompletableFuture<String> completed = CompletableFuture.completedFuture("立即完成");
CompletableFuture<String> failed = CompletableFuture.failedFuture(new RuntimeException("失败了"));
// 5. 手动控制完成时机
CompletableFuture<String> manual = new CompletableFuture<>();
new Thread(() -> {
// 某个时刻主动完成它
manual.complete("手动完成的结果");
// 或异常完成:manual.completeExceptionally(new Exception("出错"));
}).start();
转换与链式操作
thenApply:转换结果(同步)
// thenApply 类似 Stream.map(),对结果进行转换,在完成线程中执行
CompletableFuture<Integer> result = CompletableFuture
.supplyAsync(() -> "Hello")
.thenApply(String::length) // "Hello" → 5
.thenApply(n -> n * n); // 5 → 25
result.thenAccept(v -> System.out.println("最终结果:" + v)); // 25
// thenApplyAsync:在新的异步线程中执行(默认 ForkJoinPool 或指定 executor)
CompletableFuture<String> async = CompletableFuture
.supplyAsync(() -> fetchData())
.thenApplyAsync(data -> processData(data), executor);
thenCompose:扁平化异步链(flatMap)
// thenCompose 用于将两个异步操作顺序连接(避免嵌套 CompletableFuture)
// 类似 Stream.flatMap()
// 错误示例(嵌套):
CompletableFuture<CompletableFuture<User>> nested = CompletableFuture
.supplyAsync(() -> getUserId())
.thenApply(id -> getUserAsync(id)); // getUserAsync 返回 CompletableFuture,导致嵌套
// 正确示例(thenCompose 自动解包):
CompletableFuture<User> user = CompletableFuture
.supplyAsync(() -> getUserId()) // CF<String>
.thenCompose(id -> getUserAsync(id)) // id → CF<User>,解包为 CF<User>
.thenCompose(u -> getOrderAsync(u.id)); // u → CF<Order>
// 实战:串行依赖查询
CompletableFuture<String> pipeline =
CompletableFuture.supplyAsync(() -> queryUserId("alice"))
.thenCompose(uid -> queryProfile(uid))
.thenCompose(profile -> buildResponse(profile));
thenCombine:合并两个独立的异步结果
// thenCombine:两个独立任务并行执行,完成后合并结果
CompletableFuture<String> userCF =
CompletableFuture.supplyAsync(() -> fetchUserInfo("alice"));
CompletableFuture<List<Order>> orderCF =
CompletableFuture.supplyAsync(() -> fetchOrders("alice"));
// 两个任务并行,等两者都完成后合并
CompletableFuture<UserProfile> profile = userCF.thenCombine(orderCF,
(user, orders) -> new UserProfile(user, orders)); // 合并函数
// thenAcceptBoth:合并但无返回值
userCF.thenAcceptBoth(orderCF, (user, orders) ->
System.out.println(user + "有" + orders.size() + "个订单"));
allOf 与 anyOf:批量协调
import java.util.*;
import java.util.stream.*;
List<String> urls = List.of(
"https://api1.example.com",
"https://api2.example.com",
"https://api3.example.com"
);
// 并行发起所有请求
List<CompletableFuture<String>> futures = urls.stream()
.map(url -> CompletableFuture.supplyAsync(() -> httpGet(url)))
.collect(Collectors.toList());
// allOf:等待所有任务完成(返回 CF<Void>,需手动收集结果)
CompletableFuture<Void> allDone = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0]));
// 完成后收集所有结果
CompletableFuture<List<String>> allResults = allDone.thenApply(v ->
futures.stream()
.map(CompletableFuture::join) // 此时所有已完成,join() 不会阻塞
.collect(Collectors.toList())
);
List<String> responses = allResults.get(); // 获取所有响应
// anyOf:任意一个完成就继续(竞速模式)
CompletableFuture<Object> fastest = CompletableFuture.anyOf(
futures.toArray(new CompletableFuture[0]));
String first = (String) fastest.get(); // 最快返回的结果
异常处理
// 方式1:exceptionally —— 发生异常时提供默认值(类似 Optional.orElse)
CompletableFuture<String> cf = CompletableFuture
.supplyAsync(() -> {
if (Math.random() > 0.5) throw new RuntimeException("随机失败");
return "成功";
})
.exceptionally(ex -> {
System.err.println("发生异常:" + ex.getMessage());
return "默认值"; // 异常时的降级值
});
// 方式2:handle —— 无论正常还是异常都会执行(可以检查两者)
CompletableFuture<String> cf2 = CompletableFuture
.supplyAsync(() -> riskyOperation())
.handle((result, ex) -> {
if (ex != null) {
System.err.println("异常:" + ex);
return "fallback";
}
return result.toUpperCase();
});
// 方式3:whenComplete —— 副作用(日志、指标),不改变结果
CompletableFuture<String> cf3 = CompletableFuture
.supplyAsync(() -> fetchData())
.whenComplete((result, ex) -> {
if (ex != null) metrics.recordError(ex);
else metrics.recordSuccess();
// whenComplete 不改变原始 CF 的结果/异常
});
// 方式4:获取时异常处理
try {
String result = cf.get(); // 抛出 ExecutionException(包装原始异常)
} catch (ExecutionException e) {
Throwable cause = e.getCause(); // 解包获取原始异常
}
超时控制(Java 9+)
// Java 9 新增:orTimeout —— 超时则以 TimeoutException 完成
CompletableFuture<String> cf = CompletableFuture
.supplyAsync(() -> slowOperation())
.orTimeout(3, TimeUnit.SECONDS); // 3秒超时
// Java 9:completeOnTimeout —— 超时则以默认值完成(不抛异常)
CompletableFuture<String> cfWithDefault = CompletableFuture
.supplyAsync(() -> slowOperation())
.completeOnTimeout("默认响应", 3, TimeUnit.SECONDS);
实战:并行聚合电商数据
/**
* 场景:展示商品详情页,需要并行获取:
* 1. 商品基本信息
* 2. 库存信息
* 3. 评价列表(依赖商品 ID)
* 4. 推荐商品(可超时降级)
*/
public ProductPage buildProductPage(String productId) throws Exception {
// 并行发起独立查询
CompletableFuture<Product> productCF =
CompletableFuture.supplyAsync(() -> productService.get(productId));
CompletableFuture<Stock> stockCF =
CompletableFuture.supplyAsync(() -> stockService.get(productId));
// 评价依赖商品(串行)
CompletableFuture<List<Review>> reviewsCF = productCF
.thenCompose(p -> CompletableFuture.supplyAsync(() ->
reviewService.getTop10(p.getId())));
// 推荐商品:3秒超时降级为空列表
CompletableFuture<List<Product>> recommendCF =
CompletableFuture.supplyAsync(() -> recommendService.get(productId))
.completeOnTimeout(List.of(), 3, TimeUnit.SECONDS);
// 等待所有查询完成
CompletableFuture.allOf(productCF, stockCF, reviewsCF, recommendCF).get();
return new ProductPage(
productCF.join(),
stockCF.join(),
reviewsCF.join(),
recommendCF.join()
);
}
方法全景表
| 方法 | 类型 | 描述 |
|---|---|---|
| supplyAsync(supplier) | 创建 | 异步执行有返回值任务 |
| runAsync(runnable) | 创建 | 异步执行无返回值任务 |
| thenApply(fn) | 转换 | 结果转换,同步执行 |
| thenApplyAsync(fn) | 转换 | 结果转换,异步执行 |
| thenAccept(consumer) | 消费 | 消费结果,无返回值 |
| thenRun(runnable) | 顺序 | 完成后执行,不关心结果 |
| thenCompose(fn) | 组合 | 串联另一个 CF(flatMap) |
| thenCombine(cf, fn) | 组合 | 合并两个 CF 的结果 |
| allOf(cfs...) | 聚合 | 等待所有 CF 完成 |
| anyOf(cfs...) | 聚合 | 任意一个 CF 完成 |
| exceptionally(fn) | 异常 | 异常时提供降级值 |
| handle(fn) | 异常 | 正常/异常都执行,可转换 |
| whenComplete(fn) | 异常 | 正常/异常都执行,不改变结果 |
| orTimeout(t, unit) | 超时 | 超时抛 TimeoutException |
| completeOnTimeout(v, t, unit) | 超时 | 超时使用默认值完成 |
常见陷阱
- 在主线程 get() 阻塞:CompletableFuture 的 get() 仍然是阻塞的,在高性能场景应避免,改用回调链(thenApply/thenAccept)。
- 默认使用 ForkJoinPool.commonPool():不指定 executor 时使用公共池,可能被其他任务(如 parallel stream)占满。生产环境应指定独立的 executor。
- 异常被吞掉:不加 exceptionally/handle 时,异常会被封装在 CF 中,只有 get() 时才抛出 ExecutionException。务必处理异常。
- join() vs get():
join()抛出非受检异常(CompletionException),get()抛出受检的 ExecutionException,在 Lambda 中使用 join() 更方便。
本章小结
- CompletableFuture 弥补了 Future 不支持回调和链式组合的缺陷,提供了丰富的函数式 API。
- thenApply 转换结果,thenCompose 串联异步操作(避免嵌套),thenCombine 合并两个独立结果。
- allOf 等待所有任务完成,anyOf 等待任意一个完成(竞速)。
- 异常处理:exceptionally 提供降级,handle 统一处理,whenComplete 做副作用。
- Java 9+ 的 orTimeout/completeOnTimeout 优雅处理超时降级。