这篇文章我们来聊聊 Java 线程池的使用姿势。主要包含 ThreadPoolExecutor 各种参数的实际效果以及 Spring Boot 中线程池的使用,是一篇非常简单的文章。

虽然 Java 生态在慢慢过渡到虚拟线程,但是这个过程应该还需要好几年,我们还是要好好配置线程池,它往往是生产环境最容易出问题的地方。

ThreadPoolExecutor 参数回顾

先快速回顾一下 ThreadPoolExecutor 的核心参数,这是一切的基础。

public ThreadPoolExecutor(
    int corePoolSize,          // 核心线程数
    int maximumPoolSize,       // 最大线程数
    long keepAliveTime,        // 非核心线程的空闲存活时间
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,         // 任务等待队列
    ThreadFactory threadFactory,               // 线程创建工厂
    RejectedExecutionHandler handler           // 拒绝策略
)

任务提交时,线程池的行为逻辑如下:

提交任务
  → 当前线程数 < corePoolSize  → 创建新线程执行
  → 当前线程数 >= corePoolSize → 尝试入队
      → 入队成功               → 等待线程来取任务
      → 入队失败(队列已满)
          → 当前线程数 < maximumPoolSize → 创建新线程执行
          → 当前线程数 = maximumPoolSize → 执行拒绝策略

这个流程非常重要,后面理解 Spring 线程池的坑也依赖于此。

我工作这么多年,面试过非常多的候选人,在这个简单的问题上,可能有 50% 的候选人都说不对这个过程。

CachedThreadPool 参数及生效过程

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(
        0,                    // corePoolSize = 0
        Integer.MAX_VALUE,    // maximumPoolSize = 几乎无限
        60L, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>()
    );
}

关键在于两点:corePoolSize = 0SynchronousQueue

SynchronousQueue 是一个没有容量的队列,它不存储元素,每一次入队操作必须等到有线程来取,否则入队失败。也就是说,只要有任务进来,workQueue.offer() 必然失败(除非恰好此时有空闲线程在等待接收),然后线程池就会去创建新线程。

我们来模拟一下任务不断提交时的线程变化:

初始状态:线程池为空,线程数 = 0

提交第 1 个任务

- 当前线程数 (0) < corePoolSize (0)?不满足
- 尝试入队:SynchronousQueue.offer() 失败(没有线程在等待接收)
- 当前线程数 (0) < maximumPoolSize (Integer.MAX_VALUE)?满足
- 创建第 1 个线程,执行任务

提交第 2 个任务(假设第 1 个任务还没结束):

- 当前线程数 (1) < corePoolSize (0)?不满足
- 尝试入队:SynchronousQueue.offer() 失败(第 1 个线程正在忙,没空接收)
- 当前线程数 (1) < Integer.MAX_VALUE?满足
- 创建第 2 个线程,执行任务

提交第 3 个任务(假设第 1 个任务已完成,第 1 个线程空闲):

- 尝试入队:SynchronousQueue.offer() 成功!(第 1 个线程在 getTask() 里阻塞等待,立即接收了这个任务)
- 不需要创建新线程,复用第 1 个线程

线程回收:任务执行完毕后,线程进入等待状态,调用 workQueue.poll(60s, …) 等待新任务。60 秒内没有新任务到来,poll 超时返回 null,线程被回收。

小结:CachedThreadPool 适合任务量不稳定、任务执行时间较短的场景。它能复用刚刚空闲的线程,也能在高峰期快速扩展。空闲 60 秒后线程自动回收,不会长期占用资源。但如果任务提交速度远超处理速度,线程数会暴增,可能导致 OOM,生产环境慎用无界的 CachedThreadPool。

FixedThreadPool 参数及生效过程

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(
        nThreads, nThreads,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>()  // 无界队列
    );
}

核心特点:corePoolSize = maximumPoolSize = n,使用 无界 LinkedBlockingQueue

线程变化过程(以 nThreads = 3 为例):

提交第 1、2、3 个任务(线程数从 0 增长):

- 每提交一个任务,当前线程数 < corePoolSize (3),直接创建新线程
- 第 1 个任务 → 创建线程 1;第 2 个 → 创建线程 2;第 3 个 → 创建线程 3

提交第 4、5、6 ... 个任务(队列积压):

- 当前线程数 (3) 已等于 corePoolSize (3),不再创建新线程
- 任务进入 LinkedBlockingQueue 排队
- 由于队列无界,无论提交多少任务都不会触发拒绝策略,也不会创建更多线程(maximumPoolSize 的设置在这里完全没有意义)

线程回收:由于 keepAliveTime = 0,且 corePoolSize = maximumPoolSize,线程池始终保持 n 个线程活着,不会回收(除非调用 shutdown)。

小结:FixedThreadPool 适合需要限制并发数量、任务量可能比较大的场景。但要注意无界队列带来的内存风险——如果消费速度跟不上,队列会无限增长。生产环境建议给队列设置一个合理的上限,并配置拒绝策略。

拒绝策略

ThreadPoolExecutor 内置了四种拒绝策略,选择合适的拒绝策略和线程池参数同样重要:

  1. AbortPolicy 默认策略,抛出 RejectedExecutionException,让调用方自己处理异常
  2. CallerRunsPolicy,由调用方线程直接执行,能达到背压的效果,降低提交速度
  3. DiscardPolicy,默默丢弃任务,不报错
  4. DiscardOldestPolicy,丢弃队列最旧的任务,然后尝试重新提交当前任务

生产环境中,CallerRunsPolicy 是一个相对安全的选择:当线程池饱和时,提交任务的线程自己来执行,自然降低了任务提交速度,避免了无限积压。

Spring 中的线程池

Spring 提供了自己的线程池抽象和实现,在 Spring Boot 项目中,很多时候你甚至不需要自己定义线程池就能使用 @Async。但这里有不少坑,我们一一来说。

TaskExecutor 接口

Spring 提供了 TaskExecutor 接口,它继承自 JDK 的 java.util.concurrent.Executor:

package org.springframework.core.task;

import java.util.concurrent.Executor;

@FunctionalInterface
public interface TaskExecutor extends Executor {
  
	@Override
	void execute(Runnable task);
}

接口本身非常简单。Spring 提供了多个实现,我们最常用的是 ThreadPoolTaskExecutor(org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor)。

ThreadPoolTaskExecutor

ThreadPoolTaskExecutor 是对 JDK ThreadPoolExecutor 的包装,提供了更友好的 Spring 风格属性配置:

@Bean
public ThreadPoolTaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(10);
    executor.setMaxPoolSize(50);
    executor.setQueueCapacity(200);
    executor.setKeepAliveSeconds(60);
    executor.setThreadNamePrefix("app-async-");
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.setWaitForTasksToCompleteOnShutdown(true);
    executor.setAwaitTerminationSeconds(30);
    executor.initialize();
    return executor;
}

下面介绍它与原生 ThreadPoolExecutor 的关键区别:queueCapacity 的坑

ThreadPoolTaskExecutor 根据 queueCapacity 的值决定底层使用哪种队列:

protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
  if (queueCapacity > 0) {
    return new LinkedBlockingQueue<>(queueCapacity);
  } else {
    return new SynchronousQueue<>();
  }
}

也就是说,如果 queueCapacity > 0,使用有界的 LinkedBlockingQueue,否则使用 SynchronousQueue。

ThreadPoolTaskExecutor 的默认参数是:

  • corePoolSize=1
  • maxPoolSize=Integer.MAX_VALUE
  • queueCapacity=Integer.MAX_VALUE

看起来 maxPoolSize 设置得很大,但实际上:由于队列容量也是 Integer.MAX_VALUE,所以队列永远不会满,线程数到达 corePoolSize 后就不再增长了。默认配置下,ThreadPoolTaskExecutor 等同于一个单线程 + 无限队列的执行器,maxPoolSize 形同虚设。

这是 Spring 线程池中最常见的配置错误:

// 错误示例:以为配置了 50 个线程,实际最多只有 10 个线程在工作
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(50);  // 没用!因为队列是无界的
// queueCapacity 没设置,默认 Integer.MAX_VALUE

// ------
// 正确示例:queueCapacity 有限,队列满后才扩展到 maxPoolSize
executor.setCorePoolSize(10);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(200);  // 队列装满 200 个任务后,才开始创建线程到 50

扩容顺序:前 10 个并发任务直接由核心线程处理 → 超出的任务进入队列(最多等 200 个)→ 队列满后才扩容至最多 50 个线程 → 50 个线程全忙且队列满 → 执行拒绝策略。

如果你希望行为类似 CachedThreadPool(任务来了立即分配线程),设置 queueCapacity=0:

executor.setCorePoolSize(0);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(0);  // 使用 SynchronousQueue,立即移交线程
executor.setKeepAliveSeconds(60);

Spring Boot 自动配置:为什么不定义就能用?

Spring Boot 通过 TaskExecutionAutoConfiguration 为我们自动配置了一个 ThreadPoolTaskExecutor。它生效的条件是:当前容器中没有任何 Executor 类型的 Bean。

@ConditionalOnClass({ThreadPoolTaskExecutor.class})
@AutoConfiguration
@EnableConfigurationProperties({TaskExecutionProperties.class})
public class TaskExecutionAutoConfiguration {
    public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor";

    @Bean
    @ConditionalOnMissingBean
    public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties, ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers, ObjectProvider<TaskDecorator> taskDecorator) {
        TaskExecutionProperties.Pool pool = properties.getPool();
        TaskExecutorBuilder builder = new TaskExecutorBuilder();
        builder = builder.queueCapacity(pool.getQueueCapacity());
        builder = builder.corePoolSize(pool.getCoreSize());  // 默认是 8
        builder = builder.maxPoolSize(pool.getMaxSize());
        builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
        builder = builder.keepAlive(pool.getKeepAlive());
        TaskExecutionProperties.Shutdown shutdown = properties.getShutdown();
        builder = builder.awaitTermination(shutdown.isAwaitTermination());
        builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
        builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
        Stream var10001 = taskExecutorCustomizers.orderedStream();
        var10001.getClass();
        builder = builder.customizers(var10001::iterator);
        builder = builder.taskDecorator((TaskDecorator)taskDecorator.getIfUnique());
        return builder;
    }

    @Lazy
    @Bean(
        name = {"applicationTaskExecutor", "taskExecutor"}
    )
    @ConditionalOnMissingBean({Executor.class})
    public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
        return builder.build();
    }
}

自动配置的线程池 Bean 名称是 applicationTaskExecutor(同时注册别名 taskExecutor),这让 @Async 注解可以自动找到它。

Spring Boot 自动配置的默认参数(通过 spring.task.execution.* 控制):

spring.task.execution.pool.core-size=8
spring.task.execution.pool.max-size=2147483647   # Integer.MAX_VALUE
spring.task.execution.pool.queue-capacity=2147483647  # Integer.MAX_VALUE
spring.task.execution.pool.keep-alive=60s
spring.task.execution.pool.allow-core-thread-timeout=true
spring.task.execution.thread-name-prefix=task-
spring.task.execution.shutdown.await-termination=true
spring.task.execution.shutdown.await-termination-period=30s

注意:Spring Boot 自动配置的 core-size 默认是 8,和直接 new ThreadPoolTaskExecutor() 的默认 corePoolSize=1 不同。但 queueCapacity 同样是无界的,所以同样存在 maxPoolSize 无效的问题:在绝大多数 Spring Boot 项目里,即使不覆盖配置,线程数最多也只有 8 个。

一个重要的陷阱:@ConditionalOnMissingBean(Executor.class) 检查的是 Executor 接口(JDK 的),而不是 Spring 的 TaskExecutor。这意味着如果你的代码里定义了一个 ThreadPoolExecutor(JDK 的)或者 ScheduledExecutorService 的 Bean,Spring Boot 的自动配置就失效了,也就不会有默认的 ThreadPoolTaskExecutor 可用。

自定义配置示例:

# application.properties
spring.task.execution.pool.core-size=10
spring.task.execution.pool.max-size=50
spring.task.execution.pool.queue-capacity=200
spring.task.execution.thread-name-prefix=myapp-async-
spring.task.execution.shutdown.await-termination=true
spring.task.execution.shutdown.await-termination-period=30s

如果需要多个不同用途的线程池,可以手动定义 Bean 并通过 @Async(“beanName”) 指定:

@Bean("ioTaskExecutor")
public ThreadPoolTaskExecutor ioTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(20);   // I/O 密集,核心线程多一些
    executor.setMaxPoolSize(100);
    executor.setQueueCapacity(500);
    executor.setThreadNamePrefix("io-task-");
    executor.initialize();
    return executor;
}

@Bean("cpuTaskExecutor")
public ThreadPoolTaskExecutor cpuTaskExecutor() {
    int cpuCount = Runtime.getRuntime().availableProcessors();
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(cpuCount);     // CPU 密集,线程数 = CPU 核数
    executor.setMaxPoolSize(cpuCount * 2);
    executor.setQueueCapacity(100);
    executor.setThreadNamePrefix("cpu-task-");
    executor.initialize();
    return executor;
}

@EnableAsync 与 @Async

在配置类或主类上加 @EnableAsync,Spring 会通过 AOP 代理拦截带有 @Async 注解的方法,将方法的实际执行提交到线程池,调用者立即返回:

@SpringBootApplication
@EnableAsync
public class MyApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }
}
@Service
public class NotificationService {

    @Async
    public void sendEmail(String address) {
        // 在线程池中异步执行,调用者立即返回
        doSendEmail(address);
    }

    @Async
    public CompletableFuture<String> fetchRemoteData(String url) {
        // 异步执行,通过 CompletableFuture 返回结果
        String data = doFetch(url);
        return CompletableFuture.completedFuture(data);
    }

    // 使用指定的线程池
    @Async("ioTaskExecutor")
    public void handleIoTask() {
        // ...
    }
}

需要注意的是,返回值只能是 void、Future 或 CompletableFuture

当 @Async 没有指定线程池名称时,Spring 按以下顺序查找:

  1. AsyncConfigurer.getAsyncExecutor() 返回的 Executor
  2. 容器中唯一的 TaskExecutor Bean
  3. 容器中名为 “taskExecutor” 的 Bean(Spring Boot 自动配置的 “applicationTaskExecutor” 就有此别名)
  4. 降级使用 SimpleAsyncTaskExecutor(每次新建线程,无复用,生产环境不应依赖此降级)

如果想统一自定义异步执行器和异常处理,可以实现 AsyncConfigurer:

@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(200);
        executor.setThreadNamePrefix("async-");
        executor.initialize();
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        // @Async void 方法抛出的异常,调用者无法感知,需要在这里处理
        return (ex, method, params) ->
            log.error("Async method {} threw exception: {}", method.getName(), ex.getMessage(), ex);
    }
}

TaskScheduler:定时任务专用线程池

说完 TaskExecutor,顺便提一下 TaskScheduler。有些同学会把定时任务用的 TaskScheduler 和异步执行用的 TaskExecutor 混为一谈,这里简单区分一下。

TaskScheduler 是 Spring 专门用于定时任务的接口,支持延迟执行、固定频率、固定延迟和 Cron 表达式,和 @Scheduled 注解配合使用:

@Component
public class ReportTask {

    // 固定延迟:上次执行完成后 5 秒再执行
    @Scheduled(fixedDelay = 5, timeUnit = TimeUnit.SECONDS)
    public void generateReport() { ... }

    // 固定频率:每 10 秒触发一次(不等上次是否完成)
    @Scheduled(fixedRate = 10, timeUnit = TimeUnit.SECONDS)
    public void syncData() { ... }

    // Cron 表达式
    @Scheduled(cron = "0 0 2 * * ?")  // 每天凌晨 2 点
    public void archiveData() { ... }
}

ThreadPoolTaskScheduler 是最常用的 TaskScheduler 实现,底层封装 ScheduledExecutorService。

一个重要的坑:ThreadPoolTaskScheduler 的默认线程数是 1。这意味着所有的 @Scheduled 任务共享同一个线程串行执行。如果一个定时任务执行时间较长,会影响其他所有定时任务的准时触发。

@Bean
public ThreadPoolTaskScheduler taskScheduler() {
    ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
    scheduler.setPoolSize(5);  // 根据你的定时任务数量来配置
    scheduler.setThreadNamePrefix("scheduled-");
    scheduler.setWaitForTasksToCompleteOnShutdown(true);
    scheduler.setAwaitTerminationSeconds(30);
    return scheduler;
}

Spring Boot 也提供了配置属性:

spring.task.scheduling.pool.size=5
spring.task.scheduling.thread-name-prefix=scheduling-
spring.task.scheduling.shutdown.await-termination=true
spring.task.scheduling.shutdown.await-termination-period=30s

TaskExecutor 和 TaskScheduler 的区别一句话总结:@Async 用 TaskExecutor,@Scheduled 用 TaskScheduler,两者相互独立,不要混用。

但是有一个问题就是 TaskScheduler 的默认实现 ThreadPoolTaskScheduler,它其实也实现了 TaskExecutor,所以在一些配置复杂的场景中,它可能会被当作 TaskExecutor 来使用。我们要避免 @Async 底层实际使用 ThreadPoolTaskScheduler,导致出现预期之外的情况。

总结

  1. CachedThreadPool 适合短时任务,线程按需创建、60 秒空闲后回收;FixedThreadPool 保持固定线程数,超出的任务进无界队列排队。生产环境中,建议手动配置 ThreadPoolExecutor,设置有界队列和合理的拒绝策略,而不是直接使用 Executors 的工厂方法。
  2. Spring ThreadPoolTaskExecutor 的最大坑:queueCapacity 默认是 Integer.MAX_VALUE,导致 maxPoolSize 形同虚设。生产环境一定要显式设置有限的 queueCapacity,才能让 maxPoolSize 真正生效。
  3. Spring Boot 自动配置:容器中没有任何 Executor 类型的 Bean 时,Spring Boot 自动配置一个 applicationTaskExecutor(别名 taskExecutor),默认 corePoolSize=8,可通过 spring.task.execution.* 属性覆盖,@Async 注解可以自动找到它。
  4. TaskScheduler vs TaskExecutor:@Async 配合 TaskExecutor,@Scheduled 配合 TaskScheduler,两者独立,但是要确保不会出现混用的情况;ThreadPoolTaskScheduler 默认线程数为 1,多个定时任务时需要增加 poolSize。

(全文完)