SpringBoot配置线程池

项目中经常会用到多线程去执行任务,为了不每次都使用Thread去创建线程,这很麻烦。我们可以在项目初始化时,就配置好要用到的线程池,这样需要用的时候直接调用即可。

首先,我们知道线程池有几个核心参数:

  1. corePoolSize 核心线程数,即线程池中始终保持存活的线程数量,即使它们处于空闲状态,除非设置了 allowCoreThreadTimeOut(true)
  2. maximumPoolSize 最大线程数量,当任务队列已满且核心线程都在忙碌时,线程池会创建新的线程,直到线程数量达到这个最大值。
  3. keepAliveTime 当线程池中的线程数量超过核心线程数时,多余的空闲线程的存活时间。
  4. workQueue 任务队列,用于存储等待执行的任务。常用的任务队列有 LinkedBlockingQueueArrayBlockingQueueSynchronousQueue 等。
  5. threadFactory 用于创建新线程的工厂,你可以自定义线程工厂,为线程设置名称、优先级等属性。

除此之外,还有队列大小、线程前缀等属性,方便我们使用。

在配置文件写上我们需要的参数,用于初始化配置。当然,如果想要配置多个线程池,也可以多写几个配置。

1
2
3
4
5
6
7
8
# 核心线程数
primary.thread.corePoolSize=5
# 最大线程数
primary.thread.maxPoolSize=10
# 线程存活时间
primary.thread.keepAliveSeconds=60
# 队列最大容量
primary.thread.queueCapacity=100

读取配置文件

1
2
3
4
5
6
7
8
@Component
@ConfigurationProperties(prefix = "primary.thread")
public class PrimaryAsyncConstants {
private Integer corePoolSize = 5;
private Integer maxPoolSize = 10;
private Integer keepAliveSeconds = 60;
private Integer queueCapacity = 100;
}

创建线程池配置类,在这里将用到的线程池管理起来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Configuration
public class ExecutorConfig {

private final PrimaryAsyncConstants primaryAsyncConstants;


@Autowired
public ExecutorConfig(PrimaryAsyncConstants primaryAsyncConstants) {
this.primaryAsyncConstants = primaryAsyncConstants;
}


/**
* 主业务线程池
*
* @return asyncServiceExecutor
*/
@Bean("asyncServiceExecutor")
public Executor asyncServiceExecutor() {
return initExecutor(primaryAsyncConstants, "PrimaryExecutor-");
}

/**
* 告警规则线程池-单线程池,按顺序执行
* 将告警逻辑从MQ中拆分,与MQ并行
*
* @return pointRuleAlarmSingleExecutor
*/
@Bean("alarmSingleExecutor")
public Executor alarmSingleExecutor() {
primaryAsyncConstants.setCorePoolSize(1); //手动定义核心线程数
primaryAsyncConstants.setMaxPoolSize(1); //手动定义最大线程数
return initExecutor(primaryAsyncConstants, "AlarmSingleExecutor-");
}

// 初始化线程池配置方法
public ThreadPoolTaskExecutor initExecutor(PrimaryAsyncConstants constants, String prefix) {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(constants.getCorePoolSize());
executor.setMaxPoolSize(constants.getMaxPoolSize());
executor.setKeepAliveSeconds(constants.getKeepAliveSeconds());
executor.setQueueCapacity(constants.getQueueCapacity());
executor.setThreadNamePrefix(prefix);
//拒绝策略:主线程执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}

使用ThreadPoolTaskExecutor定义好的线程池,会在第一次提交任务(submit)时初始化,创建一个LinkedBlockingQueue队列,队列大小就是传入的参数。而我们写代码使用时,直接通过@Autowired导入就可以了。

1
2
3
4
5
6
7
8
9
10
@Autowired
@Qualifier("xxxThreadPool")
private Executor dingThreadPool;

//执行逻辑
public void sendMessage(){
dingThreadPool.execute(() -> {
//业务代码
});
}

多线程示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
 
@Autowired
@Qualifier("xxxThreadPool")
private Executor executor;

public void calculateStats() {
CountDownLatch downLatch = new CountDownLatch(4);
List<String> data1 = new ArrayList<>();
executor.execute(() -> {
//执行逻辑 1

data1.addAll();
downLatch.countDown();
});

List<String> data2 = new ArrayList<>();
executor.execute(() -> {
//执行逻辑 2

data1.addAll();
downLatch.countDown();
});

Map<String, String> map1= new HashMap<>();
Map<String, String> map2= new HashMap<>();
executor.execute(() -> {
//执行逻辑 3
map1.putAll();
downLatch.countDown();
});
executor.execute(() -> {
//执行逻辑 4
map2.putAll();
downLatch.countDown();
});
try {
downLatch.await();
// 主线程继续
System.out.println(data1);
System.out.println(data2);
System.out.println(map1);
System.out.println(map2);
} catch (InterruptedException e) {
LoggerUtil.error("服务器错误", e);
}
}