为什么需要线程池?
每次 new Thread(task).start() 都会创建一个新的操作系统线程(约 1MB 栈空间)并在任务完成后销毁,
这个过程涉及内核调用,开销可观。线程池通过复用已创建的线程来避免频繁的创建/销毁开销,
并通过队列缓冲待执行任务,实现流量控制。
线程复用
线程完成一个任务后不销毁,而是继续取下一个任务执行,避免重复的线程创建/销毁开销。
资源控制
通过限制最大线程数,防止无限创建线程耗尽内存和 CPU;通过任务队列缓冲高峰期的任务。
统一管理
统一监控线程数、任务数、完成任务数等指标;统一设置线程名称、优先级、异常处理器。
Executor 框架体系
Executor 框架继承体系:
Executor(接口)
└── ExecutorService(接口)
├── AbstractExecutorService
│ └── ThreadPoolExecutor ← 核心实现
│ └── ScheduledThreadPoolExecutor
└── ForkJoinPool
CompletionService(接口)
└── ExecutorCompletionService(实现)
Executors(工厂类)
├── newFixedThreadPool()
├── newCachedThreadPool()
├── newSingleThreadExecutor()
├── newScheduledThreadPool()
└── newVirtualThreadPerTaskExecutor() ← Java 21
ThreadPoolExecutor 七大核心参数
new ThreadPoolExecutor(
int corePoolSize, // 1. 核心线程数
int maximumPoolSize, // 2. 最大线程数
long keepAliveTime, // 3. 空闲线程存活时间
TimeUnit unit, // 4. keepAliveTime 的时间单位
BlockingQueue<Runnable> workQueue, // 5. 任务队列
ThreadFactory threadFactory, // 6. 线程工厂
RejectedExecutionHandler handler // 7. 拒绝策略
);
参数详解
corePoolSize
核心线程数。线程池的「标配员工」:即使空闲也不会被回收(除非设置 allowCoreThreadTimeOut=true)。任务到来时,若当前线程数 < corePoolSize,直接创建新线程执行(即使有空闲线程)。
maximumPoolSize
最大线程数。当队列已满、线程数 < maximumPoolSize 时,创建「临时工」线程。当任务量下降后,临时工线程在空闲 keepAliveTime 后被回收。
keepAliveTime + unit
临时工线程空闲存活时间。超过这个时间未被使用的非核心线程会被终止回收。
workQueue
任务等待队列。当线程数达到 corePoolSize 时,新任务进入队列排队。不同队列类型影响线程池行为:
LinkedBlockingQueue(无界,队列先满不了,maxPoolSize 无意义)、
ArrayBlockingQueue(有界,队列满后创建临时工线程)、
SynchronousQueue(无缓冲,直接移交,常配合大 maxPoolSize)。
threadFactory
创建新线程的工厂。可以自定义线程名称(便于调试)、设置为守护线程、设置优先级、添加 UncaughtExceptionHandler。
handler(拒绝策略)
当队列已满且线程数已达 maximumPoolSize 时,如何处理新提交的任务。Java 内置四种策略(见下文)。
任务调度流程
submit(task) 的执行流程:
┌─────────────────────────────────────────────────────┐
│ 当前线程数 < corePoolSize? │
│ YES → 创建新核心线程执行任务(即使有空闲线程) │
└─────────────────────────────┬───────────────────────┘
│ NO
▼
┌─────────────────────────────────────────────────────┐
│ 任务队列未满? │
│ YES → 将任务加入队列,等待空闲线程取执行 │
└─────────────────────────────┬───────────────────────┘
│ NO(队列满了)
▼
┌─────────────────────────────────────────────────────┐
│ 当前线程数 < maximumPoolSize? │
│ YES → 创建临时工线程执行任务 │
└─────────────────────────────┬───────────────────────┘
│ NO(线程数也满了)
▼
执行拒绝策略 RejectedExecutionHandler
四种内置拒绝策略
AbortPolicy(默认)
直接抛出 RejectedExecutionException。调用方需要捕获处理。适合对任务丢失零容忍的场景(可以感知并处理)。
CallerRunsPolicy
由提交任务的调用方线程直接执行该任务。相当于降级到串行执行,会降低提交速度(反压机制),适合不能丢任务且能接受延迟的场景。
DiscardPolicy
静默丢弃新任务,不抛异常不通知。任务可能无声消失,非常危险,慎用。
DiscardOldestPolicy
丢弃队列中最旧(等待最久)的任务,然后重新提交当前任务。同样有丢任务风险。
自定义线程工厂
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
// 自定义线程工厂:为线程命名,便于 jstack 等工具诊断
public class NamedThreadFactory implements ThreadFactory {
private final String prefix;
private final AtomicInteger count = new AtomicInteger(1);
private final boolean daemon;
public NamedThreadFactory(String prefix, boolean daemon) {
this.prefix = prefix;
this.daemon = daemon;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, prefix + "-" + count.getAndIncrement());
t.setDaemon(daemon);
t.setUncaughtExceptionHandler((thread, ex) ->
System.err.println("线程 " + thread.getName() + " 发生未捕获异常:" + ex));
return t;
}
}
// 使用
ExecutorService pool = new ThreadPoolExecutor(
4, 16, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new NamedThreadFactory("order-processor", false),
new ThreadPoolExecutor.CallerRunsPolicy()
);
Executors 工厂方法与陷阱
// FixedThreadPool:固定线程数,无界队列 LinkedBlockingQueue(MAX_INT)
// 陷阱:队列无界,任务堆积可能 OOM
ExecutorService fixed = Executors.newFixedThreadPool(4);
// CachedThreadPool:核心=0,最大=MAX_INT,队列=SynchronousQueue
// 陷阱:无限创建线程,高并发下可能 OOM 或线程耗尽
ExecutorService cached = Executors.newCachedThreadPool();
// SingleThreadExecutor:单线程,无界队列,顺序执行
ExecutorService single = Executors.newSingleThreadExecutor();
// ScheduledThreadPool:定时/周期执行
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);
scheduled.scheduleAtFixedRate(() -> System.out.println("心跳"),
0, 5, TimeUnit.SECONDS); // 每5秒执行一次
// Java 21:虚拟线程执行器(无需管理线程池大小)
ExecutorService vPool = Executors.newVirtualThreadPerTaskExecutor();
阿里巴巴规范:禁止使用 Executors 工厂方法
《阿里巴巴 Java 开发手册》强制要求:不允许使用 Executors 创建线程池,
必须通过 new ThreadPoolExecutor() 手动指定所有参数。
原因:工厂方法使用无界队列(FixedThreadPool)或无界线程数(CachedThreadPool),
在生产环境高负载时容易造成 OOM。手动创建时必须明确队列容量和最大线程数上限。
ForkJoinPool
工作窃取算法(Work Stealing)
ForkJoinPool 是专为分治(Divide and Conquer)任务设计的线程池,
使用工作窃取(Work Stealing)算法。每个工作线程有自己的双端队列(Deque):
- 线程优先从自己队列的头部取任务执行(LIFO 顺序,利用缓存局部性)。
- 当自己队列为空时,从其他线程队列的尾部「窃取」任务(FIFO 顺序,减少竞争)。
- 这样空闲线程不会等待,总能找到工作,CPU 利用率极高。
工作窃取示意:
Worker-0 Deque:[T1, T2, T3, T4] ← 自己从头部取
Worker-1 Deque:[] (空闲)→ 从 Worker-0 尾部窃取 T4
Worker-2 Deque:[T5, T6]
Worker-3 Deque:[] (空闲)→ 从 Worker-2 尾部窃取 T6
RecursiveTask 和 RecursiveAction
import java.util.concurrent.*;
// RecursiveTask<V>:有返回值的分治任务
public class ParallelSum extends RecursiveTask<Long> {
private static final int THRESHOLD = 1000; // 分割阈值
private final long[] array;
private final int start, end;
public ParallelSum(long[] array, int start, int end) {
this.array = array; this.start = start; this.end = end;
}
@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
// 任务足够小,直接计算
long sum = 0;
for (int i = start; i < end; i++) sum += array[i];
return sum;
}
// 任务太大,分成两半
int mid = (start + end) / 2;
ParallelSum left = new ParallelSum(array, start, mid);
ParallelSum right = new ParallelSum(array, mid, end);
left.fork(); // 异步执行左半部分(提交到当前线程队列)
long rightResult = right.compute(); // 同步执行右半部分(当前线程)
long leftResult = left.join(); // 等待左半部分结果
return leftResult + rightResult;
}
}
// 使用
ForkJoinPool pool = ForkJoinPool.commonPool(); // 全局公共 FJP
long[] data = new long[10_000_000];
Long sum = pool.invoke(new ParallelSum(data, 0, data.length));
// Java 8+ Stream.parallel() 内部就使用 ForkJoinPool.commonPool()
long total = Arrays.stream(data).parallel().sum();
线程池的生命周期管理
ExecutorService pool = new ThreadPoolExecutor(/* ... */);
// 优雅关闭:不接受新任务,等待已提交任务完成
pool.shutdown();
// 等待所有任务完成(最多等 60 秒)
try {
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // 超时则强制中断
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("线程池未能正常关闭");
}
} catch (InterruptedException e) {
pool.shutdownNow();
Thread.currentThread().interrupt();
}
// 立即关闭:发送中断信号给所有线程,返回未执行的任务
List<Runnable> unfinished = pool.shutdownNow();
线程池参数调优
CPU 密集型 vs I/O 密集型
CPU 密集型任务
大量计算(如图像处理、加密解密、科学计算),线程大部分时间都在使用 CPU。
推荐核心线程数 = CPU 核数 + 1(+1 是为了防止偶发 I/O 或页缺失导致 CPU 空闲)。
过多线程会导致频繁上下文切换,反而降低性能。
I/O 密集型任务
大量网络请求、数据库查询、文件读写,线程大部分时间在等待 I/O。
推荐核心线程数 = CPU 核数 × 2,或更高(可通过 CPU 利用率目标公式计算)。
Java 21 后 I/O 密集型场景应优先考虑虚拟线程,完全无需关心线程数。
int cpuCores = Runtime.getRuntime().availableProcessors();
// CPU 密集型
ExecutorService cpuPool = new ThreadPoolExecutor(
cpuCores + 1, cpuCores + 1, // core = max(固定线程数)
0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1000),
new NamedThreadFactory("cpu-worker", false),
new ThreadPoolExecutor.CallerRunsPolicy()
);
// I/O 密集型(传统平台线程)
ExecutorService ioPool = new ThreadPoolExecutor(
cpuCores * 2, cpuCores * 4,
60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2000),
new NamedThreadFactory("io-worker", false),
new ThreadPoolExecutor.CallerRunsPolicy()
);
// Java 21+:I/O 密集型直接用虚拟线程(推荐)
ExecutorService virtualPool = Executors.newVirtualThreadPerTaskExecutor();
线程池监控指标
ThreadPoolExecutor tpe = (ThreadPoolExecutor) pool;
System.out.printf("核心线程数: %d%n", tpe.getCorePoolSize());
System.out.printf("最大线程数: %d%n", tpe.getMaximumPoolSize());
System.out.printf("当前线程数: %d%n", tpe.getPoolSize());
System.out.printf("活跃线程数: %d%n", tpe.getActiveCount());
System.out.printf("队列任务数: %d%n", tpe.getQueue().size());
System.out.printf("已完成任务: %d%n", tpe.getCompletedTaskCount());
System.out.printf("历史最大线程: %d%n", tpe.getLargestPoolSize());
线程池最佳实践
核心原则
- 不同业务场景用不同线程池:不要所有任务共用一个线程池,防止互相影响(慢任务占满线程,快任务饿死)。
- 使用有界队列:防止任务无限堆积导致 OOM。
- 合理命名线程:便于 jstack、Arthas 等工具诊断问题。
- 处理未捕获异常:在 ThreadFactory 中设置 UncaughtExceptionHandler,或在任务中 try-catch。
- 优雅关闭:JVM 关闭时调用 shutdown() + awaitTermination(),防止任务丢失。
- Java 21+ I/O 密集型用虚拟线程:无需调优线程数,直接使用 newVirtualThreadPerTaskExecutor()。
本章小结
- ThreadPoolExecutor 七大参数:corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、threadFactory、handler。
- 任务调度流程:线程数 < core → 创建核心线程;队列未满 → 入队;队列满且 < max → 创建临时工;否则执行拒绝策略。
- 四种拒绝策略:AbortPolicy(抛异常)、CallerRunsPolicy(调用方执行)、DiscardPolicy(静默丢弃)、DiscardOldestPolicy(丢最旧)。
- ForkJoinPool 用工作窃取算法实现 CPU 密集型分治任务的并行执行,Java Stream parallel() 底层使用它。
- CPU 密集型推荐线程数 = CPU 核 + 1;I/O 密集型推荐虚拟线程(Java 21+)。