Chapter 05

线程池与 Executor 框架

ThreadPoolExecutor 七大参数、ForkJoinPool 工作窃取、线程池调优最佳实践

为什么需要线程池?

每次 new Thread(task).start() 都会创建一个新的操作系统线程(约 1MB 栈空间)并在任务完成后销毁, 这个过程涉及内核调用,开销可观。线程池通过复用已创建的线程来避免频繁的创建/销毁开销, 并通过队列缓冲待执行任务,实现流量控制。

线程复用
线程完成一个任务后不销毁,而是继续取下一个任务执行,避免重复的线程创建/销毁开销。
资源控制
通过限制最大线程数,防止无限创建线程耗尽内存和 CPU;通过任务队列缓冲高峰期的任务。
统一管理
统一监控线程数、任务数、完成任务数等指标;统一设置线程名称、优先级、异常处理器。

Executor 框架体系

Executor 框架继承体系: Executor(接口) └── ExecutorService(接口) ├── AbstractExecutorService │ └── ThreadPoolExecutor ← 核心实现 │ └── ScheduledThreadPoolExecutor └── ForkJoinPool CompletionService(接口) └── ExecutorCompletionService(实现) Executors(工厂类) ├── newFixedThreadPool() ├── newCachedThreadPool() ├── newSingleThreadExecutor() ├── newScheduledThreadPool() └── newVirtualThreadPerTaskExecutor() ← Java 21

ThreadPoolExecutor 七大核心参数

new ThreadPoolExecutor(
    int    corePoolSize,    // 1. 核心线程数
    int    maximumPoolSize, // 2. 最大线程数
    long   keepAliveTime,   // 3. 空闲线程存活时间
    TimeUnit unit,          // 4. keepAliveTime 的时间单位
    BlockingQueue<Runnable> workQueue, // 5. 任务队列
    ThreadFactory threadFactory,       // 6. 线程工厂
    RejectedExecutionHandler handler    // 7. 拒绝策略
);

参数详解

corePoolSize
核心线程数。线程池的「标配员工」:即使空闲也不会被回收(除非设置 allowCoreThreadTimeOut=true)。任务到来时,若当前线程数 < corePoolSize,直接创建新线程执行(即使有空闲线程)。
maximumPoolSize
最大线程数。当队列已满、线程数 < maximumPoolSize 时,创建「临时工」线程。当任务量下降后,临时工线程在空闲 keepAliveTime 后被回收。
keepAliveTime + unit
临时工线程空闲存活时间。超过这个时间未被使用的非核心线程会被终止回收。
workQueue
任务等待队列。当线程数达到 corePoolSize 时,新任务进入队列排队。不同队列类型影响线程池行为: LinkedBlockingQueue(无界,队列先满不了,maxPoolSize 无意义)、 ArrayBlockingQueue(有界,队列满后创建临时工线程)、 SynchronousQueue(无缓冲,直接移交,常配合大 maxPoolSize)。
threadFactory
创建新线程的工厂。可以自定义线程名称(便于调试)、设置为守护线程、设置优先级、添加 UncaughtExceptionHandler。
handler(拒绝策略)
当队列已满且线程数已达 maximumPoolSize 时,如何处理新提交的任务。Java 内置四种策略(见下文)。

任务调度流程

submit(task) 的执行流程: ┌─────────────────────────────────────────────────────┐ │ 当前线程数 < corePoolSize? │ │ YES → 创建新核心线程执行任务(即使有空闲线程) │ └─────────────────────────────┬───────────────────────┘ │ NO ▼ ┌─────────────────────────────────────────────────────┐ │ 任务队列未满? │ │ YES → 将任务加入队列,等待空闲线程取执行 │ └─────────────────────────────┬───────────────────────┘ │ NO(队列满了) ▼ ┌─────────────────────────────────────────────────────┐ │ 当前线程数 < maximumPoolSize? │ │ YES → 创建临时工线程执行任务 │ └─────────────────────────────┬───────────────────────┘ │ NO(线程数也满了) ▼ 执行拒绝策略 RejectedExecutionHandler

四种内置拒绝策略

AbortPolicy(默认)
直接抛出 RejectedExecutionException。调用方需要捕获处理。适合对任务丢失零容忍的场景(可以感知并处理)。
CallerRunsPolicy
由提交任务的调用方线程直接执行该任务。相当于降级到串行执行,会降低提交速度(反压机制),适合不能丢任务且能接受延迟的场景。
DiscardPolicy
静默丢弃新任务,不抛异常不通知。任务可能无声消失,非常危险,慎用。
DiscardOldestPolicy
丢弃队列中最旧(等待最久)的任务,然后重新提交当前任务。同样有丢任务风险。

自定义线程工厂

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

// 自定义线程工厂:为线程命名,便于 jstack 等工具诊断
public class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger count = new AtomicInteger(1);
    private final boolean daemon;

    public NamedThreadFactory(String prefix, boolean daemon) {
        this.prefix = prefix;
        this.daemon = daemon;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + "-" + count.getAndIncrement());
        t.setDaemon(daemon);
        t.setUncaughtExceptionHandler((thread, ex) ->
            System.err.println("线程 " + thread.getName() + " 发生未捕获异常:" + ex));
        return t;
    }
}

// 使用
ExecutorService pool = new ThreadPoolExecutor(
    4, 16, 60, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(1000),
    new NamedThreadFactory("order-processor", false),
    new ThreadPoolExecutor.CallerRunsPolicy()
);

Executors 工厂方法与陷阱

// FixedThreadPool:固定线程数,无界队列 LinkedBlockingQueue(MAX_INT)
// 陷阱:队列无界,任务堆积可能 OOM
ExecutorService fixed = Executors.newFixedThreadPool(4);

// CachedThreadPool:核心=0,最大=MAX_INT,队列=SynchronousQueue
// 陷阱:无限创建线程,高并发下可能 OOM 或线程耗尽
ExecutorService cached = Executors.newCachedThreadPool();

// SingleThreadExecutor:单线程,无界队列,顺序执行
ExecutorService single = Executors.newSingleThreadExecutor();

// ScheduledThreadPool:定时/周期执行
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);
scheduled.scheduleAtFixedRate(() -> System.out.println("心跳"),
    0, 5, TimeUnit.SECONDS); // 每5秒执行一次

// Java 21:虚拟线程执行器(无需管理线程池大小)
ExecutorService vPool = Executors.newVirtualThreadPerTaskExecutor();
阿里巴巴规范:禁止使用 Executors 工厂方法

《阿里巴巴 Java 开发手册》强制要求:不允许使用 Executors 创建线程池, 必须通过 new ThreadPoolExecutor() 手动指定所有参数。 原因:工厂方法使用无界队列(FixedThreadPool)或无界线程数(CachedThreadPool), 在生产环境高负载时容易造成 OOM。手动创建时必须明确队列容量和最大线程数上限。

ForkJoinPool

工作窃取算法(Work Stealing)

ForkJoinPool 是专为分治(Divide and Conquer)任务设计的线程池, 使用工作窃取(Work Stealing)算法。每个工作线程有自己的双端队列(Deque):

工作窃取示意: Worker-0 Deque:[T1, T2, T3, T4] ← 自己从头部取 Worker-1 Deque:[] (空闲)→ 从 Worker-0 尾部窃取 T4 Worker-2 Deque:[T5, T6] Worker-3 Deque:[] (空闲)→ 从 Worker-2 尾部窃取 T6

RecursiveTask 和 RecursiveAction

import java.util.concurrent.*;

// RecursiveTask<V>:有返回值的分治任务
public class ParallelSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1000; // 分割阈值
    private final long[] array;
    private final int start, end;

    public ParallelSum(long[] array, int start, int end) {
        this.array = array; this.start = start; this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            // 任务足够小,直接计算
            long sum = 0;
            for (int i = start; i < end; i++) sum += array[i];
            return sum;
        }
        // 任务太大,分成两半
        int mid = (start + end) / 2;
        ParallelSum left  = new ParallelSum(array, start, mid);
        ParallelSum right = new ParallelSum(array, mid, end);
        left.fork();         // 异步执行左半部分(提交到当前线程队列)
        long rightResult = right.compute(); // 同步执行右半部分(当前线程)
        long leftResult  = left.join();  // 等待左半部分结果
        return leftResult + rightResult;
    }
}

// 使用
ForkJoinPool pool = ForkJoinPool.commonPool(); // 全局公共 FJP
long[] data = new long[10_000_000];
Long sum = pool.invoke(new ParallelSum(data, 0, data.length));

// Java 8+ Stream.parallel() 内部就使用 ForkJoinPool.commonPool()
long total = Arrays.stream(data).parallel().sum();

线程池的生命周期管理

ExecutorService pool = new ThreadPoolExecutor(/* ... */);

// 优雅关闭:不接受新任务,等待已提交任务完成
pool.shutdown();

// 等待所有任务完成(最多等 60 秒)
try {
    if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
        pool.shutdownNow(); // 超时则强制中断
        if (!pool.awaitTermination(60, TimeUnit.SECONDS))
            System.err.println("线程池未能正常关闭");
    }
} catch (InterruptedException e) {
    pool.shutdownNow();
    Thread.currentThread().interrupt();
}

// 立即关闭:发送中断信号给所有线程,返回未执行的任务
List<Runnable> unfinished = pool.shutdownNow();

线程池参数调优

CPU 密集型 vs I/O 密集型

CPU 密集型任务
大量计算(如图像处理、加密解密、科学计算),线程大部分时间都在使用 CPU。 推荐核心线程数 = CPU 核数 + 1(+1 是为了防止偶发 I/O 或页缺失导致 CPU 空闲)。 过多线程会导致频繁上下文切换,反而降低性能。
I/O 密集型任务
大量网络请求、数据库查询、文件读写,线程大部分时间在等待 I/O。 推荐核心线程数 = CPU 核数 × 2,或更高(可通过 CPU 利用率目标公式计算)。 Java 21 后 I/O 密集型场景应优先考虑虚拟线程,完全无需关心线程数。
int cpuCores = Runtime.getRuntime().availableProcessors();

// CPU 密集型
ExecutorService cpuPool = new ThreadPoolExecutor(
    cpuCores + 1, cpuCores + 1,  // core = max(固定线程数)
    0, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<>(1000),
    new NamedThreadFactory("cpu-worker", false),
    new ThreadPoolExecutor.CallerRunsPolicy()
);

// I/O 密集型(传统平台线程)
ExecutorService ioPool = new ThreadPoolExecutor(
    cpuCores * 2, cpuCores * 4,
    60, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(2000),
    new NamedThreadFactory("io-worker", false),
    new ThreadPoolExecutor.CallerRunsPolicy()
);

// Java 21+:I/O 密集型直接用虚拟线程(推荐)
ExecutorService virtualPool = Executors.newVirtualThreadPerTaskExecutor();

线程池监控指标

ThreadPoolExecutor tpe = (ThreadPoolExecutor) pool;

System.out.printf("核心线程数: %d%n",  tpe.getCorePoolSize());
System.out.printf("最大线程数: %d%n",  tpe.getMaximumPoolSize());
System.out.printf("当前线程数: %d%n",  tpe.getPoolSize());
System.out.printf("活跃线程数: %d%n",  tpe.getActiveCount());
System.out.printf("队列任务数: %d%n",  tpe.getQueue().size());
System.out.printf("已完成任务: %d%n",  tpe.getCompletedTaskCount());
System.out.printf("历史最大线程: %d%n", tpe.getLargestPoolSize());

线程池最佳实践

核心原则

本章小结