Chapter 06

CompletableFuture 异步编程

掌握链式组合、异常处理、并行聚合,用 CompletableFuture 构建高效异步管道

Future 的局限与 CompletableFuture 的改进

Java 5 引入的 Future<T> 是表示异步计算结果的基础接口,但有几个关键局限:

Java 8 引入的 CompletableFuture<T> 同时实现了 Future<T>CompletionStage<T> 接口,解决了上述所有问题,支持函数式风格的链式异步编程。

创建 CompletableFuture

import java.util.concurrent.*;

// 1. 无返回值的异步任务(使用 ForkJoinPool.commonPool())
CompletableFuture<Void> cf1 = CompletableFuture.runAsync(() -> {
    System.out.println("异步执行,无返回值");
});

// 2. 有返回值的异步任务
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
    return "Hello from async";
});

// 3. 指定自定义执行器(推荐,不占用公共 ForkJoinPool)
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<Integer> cf3 = CompletableFuture.supplyAsync(() -> {
    return 42;
}, executor);

// 4. 手动创建并完成(常用于测试或已知结果的场景)
CompletableFuture<String> completed = CompletableFuture.completedFuture("立即完成");
CompletableFuture<String> failed = CompletableFuture.failedFuture(new RuntimeException("失败了"));

// 5. 手动控制完成时机
CompletableFuture<String> manual = new CompletableFuture<>();
new Thread(() -> {
    // 某个时刻主动完成它
    manual.complete("手动完成的结果");
    // 或异常完成:manual.completeExceptionally(new Exception("出错"));
}).start();

转换与链式操作

thenApply:转换结果(同步)

// thenApply 类似 Stream.map(),对结果进行转换,在完成线程中执行
CompletableFuture<Integer> result = CompletableFuture
    .supplyAsync(() -> "Hello")
    .thenApply(String::length)      // "Hello" → 5
    .thenApply(n -> n * n);          // 5 → 25

result.thenAccept(v -> System.out.println("最终结果:" + v)); // 25

// thenApplyAsync:在新的异步线程中执行(默认 ForkJoinPool 或指定 executor)
CompletableFuture<String> async = CompletableFuture
    .supplyAsync(() -> fetchData())
    .thenApplyAsync(data -> processData(data), executor);

thenCompose:扁平化异步链(flatMap)

// thenCompose 用于将两个异步操作顺序连接(避免嵌套 CompletableFuture)
// 类似 Stream.flatMap()

// 错误示例(嵌套):
CompletableFuture<CompletableFuture<User>> nested = CompletableFuture
    .supplyAsync(() -> getUserId())
    .thenApply(id -> getUserAsync(id)); // getUserAsync 返回 CompletableFuture,导致嵌套

// 正确示例(thenCompose 自动解包):
CompletableFuture<User> user = CompletableFuture
    .supplyAsync(() -> getUserId())     // CF<String>
    .thenCompose(id -> getUserAsync(id))   // id → CF<User>,解包为 CF<User>
    .thenCompose(u -> getOrderAsync(u.id)); // u → CF<Order>

// 实战:串行依赖查询
CompletableFuture<String> pipeline =
    CompletableFuture.supplyAsync(() -> queryUserId("alice"))
    .thenCompose(uid -> queryProfile(uid))
    .thenCompose(profile -> buildResponse(profile));

thenCombine:合并两个独立的异步结果

// thenCombine:两个独立任务并行执行,完成后合并结果
CompletableFuture<String> userCF =
    CompletableFuture.supplyAsync(() -> fetchUserInfo("alice"));
CompletableFuture<List<Order>> orderCF =
    CompletableFuture.supplyAsync(() -> fetchOrders("alice"));

// 两个任务并行,等两者都完成后合并
CompletableFuture<UserProfile> profile = userCF.thenCombine(orderCF,
    (user, orders) -> new UserProfile(user, orders)); // 合并函数

// thenAcceptBoth:合并但无返回值
userCF.thenAcceptBoth(orderCF, (user, orders) ->
    System.out.println(user + "有" + orders.size() + "个订单"));

allOf 与 anyOf:批量协调

import java.util.*;
import java.util.stream.*;

List<String> urls = List.of(
    "https://api1.example.com",
    "https://api2.example.com",
    "https://api3.example.com"
);

// 并行发起所有请求
List<CompletableFuture<String>> futures = urls.stream()
    .map(url -> CompletableFuture.supplyAsync(() -> httpGet(url)))
    .collect(Collectors.toList());

// allOf:等待所有任务完成(返回 CF<Void>,需手动收集结果)
CompletableFuture<Void> allDone = CompletableFuture.allOf(
    futures.toArray(new CompletableFuture[0]));

// 完成后收集所有结果
CompletableFuture<List<String>> allResults = allDone.thenApply(v ->
    futures.stream()
        .map(CompletableFuture::join)  // 此时所有已完成,join() 不会阻塞
        .collect(Collectors.toList())
);

List<String> responses = allResults.get(); // 获取所有响应

// anyOf:任意一个完成就继续(竞速模式)
CompletableFuture<Object> fastest = CompletableFuture.anyOf(
    futures.toArray(new CompletableFuture[0]));
String first = (String) fastest.get(); // 最快返回的结果

异常处理

// 方式1:exceptionally —— 发生异常时提供默认值(类似 Optional.orElse)
CompletableFuture<String> cf = CompletableFuture
    .supplyAsync(() -> {
        if (Math.random() > 0.5) throw new RuntimeException("随机失败");
        return "成功";
    })
    .exceptionally(ex -> {
        System.err.println("发生异常:" + ex.getMessage());
        return "默认值"; // 异常时的降级值
    });

// 方式2:handle —— 无论正常还是异常都会执行(可以检查两者)
CompletableFuture<String> cf2 = CompletableFuture
    .supplyAsync(() -> riskyOperation())
    .handle((result, ex) -> {
        if (ex != null) {
            System.err.println("异常:" + ex);
            return "fallback";
        }
        return result.toUpperCase();
    });

// 方式3:whenComplete —— 副作用(日志、指标),不改变结果
CompletableFuture<String> cf3 = CompletableFuture
    .supplyAsync(() -> fetchData())
    .whenComplete((result, ex) -> {
        if (ex != null) metrics.recordError(ex);
        else metrics.recordSuccess();
        // whenComplete 不改变原始 CF 的结果/异常
    });

// 方式4:获取时异常处理
try {
    String result = cf.get(); // 抛出 ExecutionException(包装原始异常)
} catch (ExecutionException e) {
    Throwable cause = e.getCause(); // 解包获取原始异常
}

超时控制(Java 9+)

// Java 9 新增:orTimeout —— 超时则以 TimeoutException 完成
CompletableFuture<String> cf = CompletableFuture
    .supplyAsync(() -> slowOperation())
    .orTimeout(3, TimeUnit.SECONDS); // 3秒超时

// Java 9:completeOnTimeout —— 超时则以默认值完成(不抛异常)
CompletableFuture<String> cfWithDefault = CompletableFuture
    .supplyAsync(() -> slowOperation())
    .completeOnTimeout("默认响应", 3, TimeUnit.SECONDS);

实战:并行聚合电商数据

/**
 * 场景:展示商品详情页,需要并行获取:
 * 1. 商品基本信息
 * 2. 库存信息
 * 3. 评价列表(依赖商品 ID)
 * 4. 推荐商品(可超时降级)
 */
public ProductPage buildProductPage(String productId) throws Exception {
    // 并行发起独立查询
    CompletableFuture<Product> productCF =
        CompletableFuture.supplyAsync(() -> productService.get(productId));

    CompletableFuture<Stock> stockCF =
        CompletableFuture.supplyAsync(() -> stockService.get(productId));

    // 评价依赖商品(串行)
    CompletableFuture<List<Review>> reviewsCF = productCF
        .thenCompose(p -> CompletableFuture.supplyAsync(() ->
            reviewService.getTop10(p.getId())));

    // 推荐商品:3秒超时降级为空列表
    CompletableFuture<List<Product>> recommendCF =
        CompletableFuture.supplyAsync(() -> recommendService.get(productId))
            .completeOnTimeout(List.of(), 3, TimeUnit.SECONDS);

    // 等待所有查询完成
    CompletableFuture.allOf(productCF, stockCF, reviewsCF, recommendCF).get();

    return new ProductPage(
        productCF.join(),
        stockCF.join(),
        reviewsCF.join(),
        recommendCF.join()
    );
}

方法全景表

方法类型描述
supplyAsync(supplier)创建异步执行有返回值任务
runAsync(runnable)创建异步执行无返回值任务
thenApply(fn)转换结果转换,同步执行
thenApplyAsync(fn)转换结果转换,异步执行
thenAccept(consumer)消费消费结果,无返回值
thenRun(runnable)顺序完成后执行,不关心结果
thenCompose(fn)组合串联另一个 CF(flatMap)
thenCombine(cf, fn)组合合并两个 CF 的结果
allOf(cfs...)聚合等待所有 CF 完成
anyOf(cfs...)聚合任意一个 CF 完成
exceptionally(fn)异常异常时提供降级值
handle(fn)异常正常/异常都执行,可转换
whenComplete(fn)异常正常/异常都执行,不改变结果
orTimeout(t, unit)超时超时抛 TimeoutException
completeOnTimeout(v, t, unit)超时超时使用默认值完成
常见陷阱

本章小结