这篇文章我们来聊聊 Java 线程池的使用姿势。主要包含 ThreadPoolExecutor 各种参数的实际效果以及 Spring Boot 中线程池的使用,是一篇非常简单的文章。
虽然 Java 生态在慢慢过渡到虚拟线程,但是这个过程应该还需要好几年,我们还是要好好配置线程池,它往往是生产环境最容易出问题的地方。
ThreadPoolExecutor 参数回顾
先快速回顾一下 ThreadPoolExecutor 的核心参数,这是一切的基础。
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 非核心线程的空闲存活时间
TimeUnit unit,
BlockingQueue<Runnable> workQueue, // 任务等待队列
ThreadFactory threadFactory, // 线程创建工厂
RejectedExecutionHandler handler // 拒绝策略
)
任务提交时,线程池的行为逻辑如下:
提交任务
→ 当前线程数 < corePoolSize → 创建新线程执行
→ 当前线程数 >= corePoolSize → 尝试入队
→ 入队成功 → 等待线程来取任务
→ 入队失败(队列已满)
→ 当前线程数 < maximumPoolSize → 创建新线程执行
→ 当前线程数 = maximumPoolSize → 执行拒绝策略
这个流程非常重要,后面理解 Spring 线程池的坑也依赖于此。
我工作这么多年,面试过非常多的候选人,在这个简单的问题上,可能有 50% 的候选人都说不对这个过程。
CachedThreadPool 参数及生效过程
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(
0, // corePoolSize = 0
Integer.MAX_VALUE, // maximumPoolSize = 几乎无限
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>()
);
}
关键在于两点:corePoolSize = 0 和 SynchronousQueue。
SynchronousQueue 是一个没有容量的队列,它不存储元素,每一次入队操作必须等到有线程来取,否则入队失败。也就是说,只要有任务进来,workQueue.offer() 必然失败(除非恰好此时有空闲线程在等待接收),然后线程池就会去创建新线程。
我们来模拟一下任务不断提交时的线程变化:
初始状态:线程池为空,线程数 = 0
提交第 1 个任务:
- 当前线程数 (0) < corePoolSize (0)?不满足
- 尝试入队:SynchronousQueue.offer() 失败(没有线程在等待接收)
- 当前线程数 (0) < maximumPoolSize (Integer.MAX_VALUE)?满足
- 创建第 1 个线程,执行任务
提交第 2 个任务(假设第 1 个任务还没结束):
- 当前线程数 (1) < corePoolSize (0)?不满足
- 尝试入队:SynchronousQueue.offer() 失败(第 1 个线程正在忙,没空接收)
- 当前线程数 (1) < Integer.MAX_VALUE?满足
- 创建第 2 个线程,执行任务
提交第 3 个任务(假设第 1 个任务已完成,第 1 个线程空闲):
- 尝试入队:SynchronousQueue.offer() 成功!(第 1 个线程在 getTask() 里阻塞等待,立即接收了这个任务)
- 不需要创建新线程,复用第 1 个线程
线程回收:任务执行完毕后,线程进入等待状态,调用 workQueue.poll(60s, …) 等待新任务。60 秒内没有新任务到来,poll 超时返回 null,线程被回收。
小结:CachedThreadPool 适合任务量不稳定、任务执行时间较短的场景。它能复用刚刚空闲的线程,也能在高峰期快速扩展。空闲 60 秒后线程自动回收,不会长期占用资源。但如果任务提交速度远超处理速度,线程数会暴增,可能导致 OOM,生产环境慎用无界的 CachedThreadPool。
FixedThreadPool 参数及生效过程
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(
nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>() // 无界队列
);
}
核心特点:corePoolSize = maximumPoolSize = n,使用 无界 LinkedBlockingQueue。
线程变化过程(以 nThreads = 3 为例):
提交第 1、2、3 个任务(线程数从 0 增长):
- 每提交一个任务,当前线程数 < corePoolSize (3),直接创建新线程
- 第 1 个任务 → 创建线程 1;第 2 个 → 创建线程 2;第 3 个 → 创建线程 3
提交第 4、5、6 ... 个任务(队列积压):
- 当前线程数 (3) 已等于 corePoolSize (3),不再创建新线程
- 任务进入 LinkedBlockingQueue 排队
- 由于队列无界,无论提交多少任务都不会触发拒绝策略,也不会创建更多线程(maximumPoolSize 的设置在这里完全没有意义)
线程回收:由于 keepAliveTime = 0,且 corePoolSize = maximumPoolSize,线程池始终保持 n 个线程活着,不会回收(除非调用 shutdown)。
小结:FixedThreadPool 适合需要限制并发数量、任务量可能比较大的场景。但要注意无界队列带来的内存风险——如果消费速度跟不上,队列会无限增长。生产环境建议给队列设置一个合理的上限,并配置拒绝策略。
拒绝策略
ThreadPoolExecutor 内置了四种拒绝策略,选择合适的拒绝策略和线程池参数同样重要:
- AbortPolicy 默认策略,抛出 RejectedExecutionException,让调用方自己处理异常
- CallerRunsPolicy,由调用方线程直接执行,能达到背压的效果,降低提交速度
- DiscardPolicy,默默丢弃任务,不报错
- DiscardOldestPolicy,丢弃队列最旧的任务,然后尝试重新提交当前任务
生产环境中,CallerRunsPolicy 是一个相对安全的选择:当线程池饱和时,提交任务的线程自己来执行,自然降低了任务提交速度,避免了无限积压。
Spring 中的线程池
Spring 提供了自己的线程池抽象和实现,在 Spring Boot 项目中,很多时候你甚至不需要自己定义线程池就能使用 @Async。但这里有不少坑,我们一一来说。
TaskExecutor 接口
Spring 提供了 TaskExecutor 接口,它继承自 JDK 的 java.util.concurrent.Executor:
package org.springframework.core.task;
import java.util.concurrent.Executor;
@FunctionalInterface
public interface TaskExecutor extends Executor {
@Override
void execute(Runnable task);
}
接口本身非常简单。Spring 提供了多个实现,我们最常用的是 ThreadPoolTaskExecutor(org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor)。
ThreadPoolTaskExecutor
ThreadPoolTaskExecutor 是对 JDK ThreadPoolExecutor 的包装,提供了更友好的 Spring 风格属性配置:
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(200);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("app-async-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(30);
executor.initialize();
return executor;
}
下面介绍它与原生 ThreadPoolExecutor 的关键区别:queueCapacity 的坑
ThreadPoolTaskExecutor 根据 queueCapacity 的值决定底层使用哪种队列:
protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
if (queueCapacity > 0) {
return new LinkedBlockingQueue<>(queueCapacity);
} else {
return new SynchronousQueue<>();
}
}
也就是说,如果 queueCapacity > 0,使用有界的 LinkedBlockingQueue,否则使用 SynchronousQueue。
ThreadPoolTaskExecutor 的默认参数是:
- corePoolSize=1
- maxPoolSize=Integer.MAX_VALUE
- queueCapacity=Integer.MAX_VALUE
看起来 maxPoolSize 设置得很大,但实际上:由于队列容量也是 Integer.MAX_VALUE,所以队列永远不会满,线程数到达 corePoolSize 后就不再增长了。默认配置下,ThreadPoolTaskExecutor 等同于一个单线程 + 无限队列的执行器,maxPoolSize 形同虚设。
这是 Spring 线程池中最常见的配置错误:
// 错误示例:以为配置了 50 个线程,实际最多只有 10 个线程在工作
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(50); // 没用!因为队列是无界的
// queueCapacity 没设置,默认 Integer.MAX_VALUE
// ------
// 正确示例:queueCapacity 有限,队列满后才扩展到 maxPoolSize
executor.setCorePoolSize(10);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(200); // 队列装满 200 个任务后,才开始创建线程到 50
扩容顺序:前 10 个并发任务直接由核心线程处理 → 超出的任务进入队列(最多等 200 个)→ 队列满后才扩容至最多 50 个线程 → 50 个线程全忙且队列满 → 执行拒绝策略。
如果你希望行为类似 CachedThreadPool(任务来了立即分配线程),设置 queueCapacity=0:
executor.setCorePoolSize(0);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(0); // 使用 SynchronousQueue,立即移交线程
executor.setKeepAliveSeconds(60);
Spring Boot 自动配置:为什么不定义就能用?
Spring Boot 通过 TaskExecutionAutoConfiguration 为我们自动配置了一个 ThreadPoolTaskExecutor。它生效的条件是:当前容器中没有任何 Executor 类型的 Bean。
@ConditionalOnClass({ThreadPoolTaskExecutor.class})
@AutoConfiguration
@EnableConfigurationProperties({TaskExecutionProperties.class})
public class TaskExecutionAutoConfiguration {
public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor";
@Bean
@ConditionalOnMissingBean
public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties, ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers, ObjectProvider<TaskDecorator> taskDecorator) {
TaskExecutionProperties.Pool pool = properties.getPool();
TaskExecutorBuilder builder = new TaskExecutorBuilder();
builder = builder.queueCapacity(pool.getQueueCapacity());
builder = builder.corePoolSize(pool.getCoreSize()); // 默认是 8
builder = builder.maxPoolSize(pool.getMaxSize());
builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
builder = builder.keepAlive(pool.getKeepAlive());
TaskExecutionProperties.Shutdown shutdown = properties.getShutdown();
builder = builder.awaitTermination(shutdown.isAwaitTermination());
builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
Stream var10001 = taskExecutorCustomizers.orderedStream();
var10001.getClass();
builder = builder.customizers(var10001::iterator);
builder = builder.taskDecorator((TaskDecorator)taskDecorator.getIfUnique());
return builder;
}
@Lazy
@Bean(
name = {"applicationTaskExecutor", "taskExecutor"}
)
@ConditionalOnMissingBean({Executor.class})
public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
return builder.build();
}
}
自动配置的线程池 Bean 名称是 applicationTaskExecutor(同时注册别名 taskExecutor),这让 @Async 注解可以自动找到它。
Spring Boot 自动配置的默认参数(通过 spring.task.execution.* 控制):
spring.task.execution.pool.core-size=8
spring.task.execution.pool.max-size=2147483647 # Integer.MAX_VALUE
spring.task.execution.pool.queue-capacity=2147483647 # Integer.MAX_VALUE
spring.task.execution.pool.keep-alive=60s
spring.task.execution.pool.allow-core-thread-timeout=true
spring.task.execution.thread-name-prefix=task-
spring.task.execution.shutdown.await-termination=true
spring.task.execution.shutdown.await-termination-period=30s
注意:Spring Boot 自动配置的 core-size 默认是 8,和直接 new ThreadPoolTaskExecutor() 的默认 corePoolSize=1 不同。但 queueCapacity 同样是无界的,所以同样存在 maxPoolSize 无效的问题:在绝大多数 Spring Boot 项目里,即使不覆盖配置,线程数最多也只有 8 个。
一个重要的陷阱:@ConditionalOnMissingBean(Executor.class) 检查的是 Executor 接口(JDK 的),而不是 Spring 的 TaskExecutor。这意味着如果你的代码里定义了一个 ThreadPoolExecutor(JDK 的)或者 ScheduledExecutorService 的 Bean,Spring Boot 的自动配置就失效了,也就不会有默认的 ThreadPoolTaskExecutor 可用。
自定义配置示例:
# application.properties
spring.task.execution.pool.core-size=10
spring.task.execution.pool.max-size=50
spring.task.execution.pool.queue-capacity=200
spring.task.execution.thread-name-prefix=myapp-async-
spring.task.execution.shutdown.await-termination=true
spring.task.execution.shutdown.await-termination-period=30s
如果需要多个不同用途的线程池,可以手动定义 Bean 并通过 @Async(“beanName”) 指定:
@Bean("ioTaskExecutor")
public ThreadPoolTaskExecutor ioTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20); // I/O 密集,核心线程多一些
executor.setMaxPoolSize(100);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("io-task-");
executor.initialize();
return executor;
}
@Bean("cpuTaskExecutor")
public ThreadPoolTaskExecutor cpuTaskExecutor() {
int cpuCount = Runtime.getRuntime().availableProcessors();
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(cpuCount); // CPU 密集,线程数 = CPU 核数
executor.setMaxPoolSize(cpuCount * 2);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("cpu-task-");
executor.initialize();
return executor;
}
@EnableAsync 与 @Async
在配置类或主类上加 @EnableAsync,Spring 会通过 AOP 代理拦截带有 @Async 注解的方法,将方法的实际执行提交到线程池,调用者立即返回:
@SpringBootApplication
@EnableAsync
public class MyApplication {
public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args);
}
}
@Service
public class NotificationService {
@Async
public void sendEmail(String address) {
// 在线程池中异步执行,调用者立即返回
doSendEmail(address);
}
@Async
public CompletableFuture<String> fetchRemoteData(String url) {
// 异步执行,通过 CompletableFuture 返回结果
String data = doFetch(url);
return CompletableFuture.completedFuture(data);
}
// 使用指定的线程池
@Async("ioTaskExecutor")
public void handleIoTask() {
// ...
}
}
需要注意的是,返回值只能是 void、Future 或 CompletableFuture
当 @Async 没有指定线程池名称时,Spring 按以下顺序查找:
- AsyncConfigurer.getAsyncExecutor() 返回的 Executor
- 容器中唯一的 TaskExecutor Bean
- 容器中名为 “taskExecutor” 的 Bean(Spring Boot 自动配置的 “applicationTaskExecutor” 就有此别名)
- 降级使用 SimpleAsyncTaskExecutor(每次新建线程,无复用,生产环境不应依赖此降级)
如果想统一自定义异步执行器和异常处理,可以实现 AsyncConfigurer:
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(200);
executor.setThreadNamePrefix("async-");
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
// @Async void 方法抛出的异常,调用者无法感知,需要在这里处理
return (ex, method, params) ->
log.error("Async method {} threw exception: {}", method.getName(), ex.getMessage(), ex);
}
}
TaskScheduler:定时任务专用线程池
说完 TaskExecutor,顺便提一下 TaskScheduler。有些同学会把定时任务用的 TaskScheduler 和异步执行用的 TaskExecutor 混为一谈,这里简单区分一下。
TaskScheduler 是 Spring 专门用于定时任务的接口,支持延迟执行、固定频率、固定延迟和 Cron 表达式,和 @Scheduled 注解配合使用:
@Component
public class ReportTask {
// 固定延迟:上次执行完成后 5 秒再执行
@Scheduled(fixedDelay = 5, timeUnit = TimeUnit.SECONDS)
public void generateReport() { ... }
// 固定频率:每 10 秒触发一次(不等上次是否完成)
@Scheduled(fixedRate = 10, timeUnit = TimeUnit.SECONDS)
public void syncData() { ... }
// Cron 表达式
@Scheduled(cron = "0 0 2 * * ?") // 每天凌晨 2 点
public void archiveData() { ... }
}
ThreadPoolTaskScheduler 是最常用的 TaskScheduler 实现,底层封装 ScheduledExecutorService。
一个重要的坑:ThreadPoolTaskScheduler 的默认线程数是 1。这意味着所有的 @Scheduled 任务共享同一个线程串行执行。如果一个定时任务执行时间较长,会影响其他所有定时任务的准时触发。
@Bean
public ThreadPoolTaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(5); // 根据你的定时任务数量来配置
scheduler.setThreadNamePrefix("scheduled-");
scheduler.setWaitForTasksToCompleteOnShutdown(true);
scheduler.setAwaitTerminationSeconds(30);
return scheduler;
}
Spring Boot 也提供了配置属性:
spring.task.scheduling.pool.size=5
spring.task.scheduling.thread-name-prefix=scheduling-
spring.task.scheduling.shutdown.await-termination=true
spring.task.scheduling.shutdown.await-termination-period=30s
TaskExecutor 和 TaskScheduler 的区别一句话总结:@Async 用 TaskExecutor,@Scheduled 用 TaskScheduler,两者相互独立,不要混用。
但是有一个问题就是 TaskScheduler 的默认实现 ThreadPoolTaskScheduler,它其实也实现了 TaskExecutor,所以在一些配置复杂的场景中,它可能会被当作 TaskExecutor 来使用。我们要避免 @Async 底层实际使用 ThreadPoolTaskScheduler,导致出现预期之外的情况。
总结
- CachedThreadPool 适合短时任务,线程按需创建、60 秒空闲后回收;FixedThreadPool 保持固定线程数,超出的任务进无界队列排队。生产环境中,建议手动配置 ThreadPoolExecutor,设置有界队列和合理的拒绝策略,而不是直接使用 Executors 的工厂方法。
- Spring ThreadPoolTaskExecutor 的最大坑:queueCapacity 默认是 Integer.MAX_VALUE,导致 maxPoolSize 形同虚设。生产环境一定要显式设置有限的 queueCapacity,才能让 maxPoolSize 真正生效。
- Spring Boot 自动配置:容器中没有任何 Executor 类型的 Bean 时,Spring Boot 自动配置一个 applicationTaskExecutor(别名 taskExecutor),默认 corePoolSize=8,可通过 spring.task.execution.* 属性覆盖,@Async 注解可以自动找到它。
- TaskScheduler vs TaskExecutor:@Async 配合 TaskExecutor,@Scheduled 配合 TaskScheduler,两者独立,但是要确保不会出现混用的情况;ThreadPoolTaskScheduler 默认线程数为 1,多个定时任务时需要增加 poolSize。
(全文完)
0 条评论