未分类 · 2024-03-31 0

springboot 默认线程池原理

一、使用

1.maven 依赖

<!-- Inherit build configuration and managed dependency versions from the Spring Boot 2.7.2 parent POM. -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.2</version>
    <!-- Empty relativePath: look the parent up in the repository instead of ../pom.xml. -->
    <relativePath/>
</parent>

<dependencies>
    <!-- Core starter. No <version> is given here; presumably it is supplied by the parent POM. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
</dependencies>

2.yml

# Settings under "spring.task.execution", the prefix bound by TaskExecutionProperties.
spring:
  task:
    execution:
      pool:
        # Core pool size; replaces the default of 8 declared in TaskExecutionProperties.Pool.
        core-size: 5
        # Maximum pool size, handed to the executor builder via maxPoolSize(...).
        # NOTE(review): extra threads beyond core-size are only created once the work queue
        # is full, so this presumably has no effect unless queue-capacity is bounded - confirm.
        max-size: 16

3.java

// Inject by concrete type: the auto-configured bean is declared as ThreadPoolTaskExecutor.
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor;

// Inject through Spring's TaskExecutor abstraction.
// NOTE(review): presumably resolves to the same auto-configured bean - confirm.
@Autowired
private TaskExecutor taskExecutor;

// Inject through the JDK Executor interface.
// NOTE(review): by-type injection assumes no other Executor bean is defined - confirm.
@Autowired
private Executor executor;

二、原理

1.加载 TaskExecutionAutoConfiguration

spring-boot-autoconfigure-2.7.2.jar 的 META-INF/spring/ 目录下有文件
org.springframework.boot.autoconfigure.AutoConfiguration.imports。

Spring Boot 启动时会读取该文件中列出的自动配置类,其中包含下面这一行,因此会加载 TaskExecutionAutoConfiguration:

org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration

2.加载 ThreadPoolTaskExecutor

自动配置类

// TaskExecutionAutoConfiguration.java
/**
 * Auto-configuration that contributes a {@link TaskExecutorBuilder} and a lazily created
 * {@link ThreadPoolTaskExecutor}, both driven by {@link TaskExecutionProperties}.
 */
@ConditionalOnClass(ThreadPoolTaskExecutor.class)
@AutoConfiguration
@EnableConfigurationProperties(TaskExecutionProperties.class)
public class TaskExecutionAutoConfiguration {

    /** Bean name under which the application task executor is registered. */
    public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor";

    /**
     * Creates a builder pre-populated from the bound configuration properties.
     * Only registered when no other bean of this type exists.
     *
     * @param properties the bound {@code spring.task.execution} properties
     * @param taskExecutorCustomizers customizers applied to the builder in their defined order
     * @param taskDecorator optional decorator; used only if exactly one candidate is available
     * @return the configured builder
     */
    @Bean
    @ConditionalOnMissingBean
    public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties,
            ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers,
            ObjectProvider<TaskDecorator> taskDecorator) {
        TaskExecutionProperties.Pool poolSettings = properties.getPool();
        Shutdown shutdownSettings = properties.getShutdown();
        // Each builder call returns the builder to continue with, so the calls are chained
        // in the same order as the individual property assignments.
        return new TaskExecutorBuilder()
                .queueCapacity(poolSettings.getQueueCapacity())
                .corePoolSize(poolSettings.getCoreSize())
                .maxPoolSize(poolSettings.getMaxSize())
                .allowCoreThreadTimeOut(poolSettings.isAllowCoreThreadTimeout())
                .keepAlive(poolSettings.getKeepAlive())
                .awaitTermination(shutdownSettings.isAwaitTermination())
                .awaitTerminationPeriod(shutdownSettings.getAwaitTerminationPeriod())
                .threadNamePrefix(properties.getThreadNamePrefix())
                .customizers(taskExecutorCustomizers.orderedStream()::iterator)
                .taskDecorator(taskDecorator.getIfUnique());
    }

    /**
     * Builds the default executor from the builder. The bean is lazy, is registered under two
     * names, and backs off as soon as any {@link Executor} bean is already defined.
     *
     * @param builder the task executor builder
     * @return the thread pool task executor
     */
    @Lazy
    @Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
            AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
    @ConditionalOnMissingBean(Executor.class)
    public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
        return builder.build();
    }

}

配置类

/**
 * Configuration properties bound from the "spring.task.execution" prefix.
 * The "..." markers below stand for members left out of this excerpt.
 */
@ConfigurationProperties("spring.task.execution")
public class TaskExecutionProperties {

    // Pool-related settings; the auto-configuration reads them through getPool().
    private final Pool pool = new Pool();

    // Prefix for thread names; defaults to "task-".
    private String threadNamePrefix = "task-";

    ...

    // Holder for the thread pool settings.
    public static class Pool {
        ...
        // Core pool size; defaults to 8 when not overridden by configuration.
        private int coreSize = 8;
        ...
    }
}
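// ThreadPoolTaskExecutor.java(其中 executor、rejectedExecutionHandler 字段以及 afterPropertiesSet()、initialize() 定义在其父类 ExecutorConfigurationSupport 中)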

// The underlying executor; assigned by initialize() from the result of initializeExecutor(...).
private ExecutorService executor;

// Handler for tasks the pool cannot accept. Defaults to AbortPolicy, which per the JDK
// rejects a task by throwing RejectedExecutionException.
private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();

/**
 * Delegates to {@link #initialize()}, so the executor is set up as soon as this
 * callback is invoked.
 */
@Override
public void afterPropertiesSet() {
    this.initialize();
}

/**
 * Sets up the underlying executor. The "..." stands for code left out of this excerpt.
 */
public void initialize() {
    ...
    // Let initializeExecutor(...) create the ExecutorService from the configured thread
    // factory and rejection handler, and keep the result in this.executor.
    this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
}

/**
 * Creates the {@link ThreadPoolExecutor} backing this task executor.
 *
 * <p>The work queue comes from {@link #createQueue(int)}. When a task decorator is set,
 * the pool is an anonymous subclass whose {@code execute} decorates every submitted task
 * and records the decorated-to-original mapping in {@code decoratedTaskMap}.
 *
 * @param threadFactory the factory used to create worker threads
 * @param rejectedExecutionHandler the handler for tasks the pool cannot accept
 * @return the newly created executor, also stored in {@code this.threadPoolExecutor}
 */
@Override
protected ExecutorService initializeExecutor(
        ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

    final BlockingQueue<Runnable> workQueue = createQueue(this.queueCapacity);
    final ThreadPoolExecutor pool;

    if (this.taskDecorator == null) {
        // No decorator configured: a plain pool is sufficient.
        pool = new ThreadPoolExecutor(
                this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                workQueue, threadFactory, rejectedExecutionHandler);
    }
    else {
        pool = new ThreadPoolExecutor(
                this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                workQueue, threadFactory, rejectedExecutionHandler) {
            @Override
            public void execute(Runnable command) {
                Runnable wrapped = taskDecorator.decorate(command);
                // Remember the original task only when the decorator returned a different object.
                if (wrapped != command) {
                    decoratedTaskMap.put(wrapped, command);
                }
                super.execute(wrapped);
            }
        };
    }

    // Optional pool behaviors, applied only when the corresponding flag is enabled.
    if (this.allowCoreThreadTimeOut) {
        pool.allowCoreThreadTimeOut(true);
    }
    if (this.prestartAllCoreThreads) {
        pool.prestartAllCoreThreads();
    }

    this.threadPoolExecutor = pool;
    return pool;
}

/**
 * Chooses the work queue for the pool based on the requested capacity.
 *
 * @param queueCapacity the desired capacity; a positive value yields a bounded
 *        {@link LinkedBlockingQueue}, anything else a {@link SynchronousQueue}
 * @return the queue to hand to the thread pool
 */
protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
    // Zero or negative capacity: no buffering queue, use a SynchronousQueue instead.
    if (queueCapacity <= 0) {
        return new SynchronousQueue<>();
    }
    return new LinkedBlockingQueue<>(queueCapacity);
}