springboot-多线程使用CompletableFuture实现
# 说明
这里讲的是使用CompletableFuture的一些记录和问题解决。以及多线程之间的传参问题
# 参考资料
【多线程】优雅使用线程池结合CompletableFuture实现异步编排 (opens new window)
玩转CompletableFuture线程异步编排,看这一篇就够了 (opens new window)
# 问题
多线程的调用中不调用自定义线程池,输出的线程名称都不对。
# 解决
asyncExecutor是自定义的线程Bean名,注意保持名字一致
@Resource
private Executor asyncExecutor;
CompletableFuture.supplyAsync(() -> {
log.info("操作");
},asyncExecutor); //asyncExecutor需要指定使用线程池,否则会默认调用tomcat的线程池
1
2
3
4
5
6
2
3
4
5
6
# 线程传参
众所周知,多线程中默认是无法互相传参的,包括在线程中也无法获取Request等参数。 但是很多拦截等方法中又需要从header取值,如用户token。所以这里采用alibaba开源的TransmittableThreadLocal (opens new window)(简称TTL)
# 使用
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
<version>2.14.1</version>
</dependency>
1
2
3
4
5
2
3
4
5
@Slf4j
public class RequestHandler {
private static final TransmittableThreadLocal<Map<String, Object>> THREAD_LOCAL = new TransmittableThreadLocal<>();
/**
* 设置ttl数据
*
* @param key key
* @param value value
*/
public static void set(String key, Object value) {
Map<String, Object> map = getLocalMap();
map.put(key, value == null ? StrUtil.EMPTY : value);
}
public static Map<String, Object> getLocalMap() {
Map<String, Object> map = THREAD_LOCAL.get();
if (map == null) {
map = new ConcurrentHashMap<>();
THREAD_LOCAL.set(map);
}
return map;
}
public static String get(String key) {
Map<String, Object> map = getLocalMap();
return MapUtil.getStr(map, key);
}
public static <T> T get(String key, Class<T> clazz) {
Map<String, Object> map = getLocalMap();
return MapUtil.get(map, key, clazz);
}
public static void remove() {
RequestHandler.getLocalMap();
THREAD_LOCAL.remove();
RequestHandler.getLocalMap();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* 注意是OncePerRequestFilter
*/
@Slf4j
@Configuration
@RequiredArgsConstructor
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.SERVLET)
public class WebHeaderHolderInterceptor extends OncePerRequestFilter {
@Override
public void doFilterInternal(HttpServletRequest request, @NotNull HttpServletResponse response, FilterChain chain) throws IOException, ServletException {
try {
String tokenName = "token";
// 将请求头数据封装到线程变量中
//注意我这里没做任何的判断,校验,请根据业务需求自己做判断
RequestHandler.set(tokenName,request.getHeader(tokenName));
chain.doFilter(request, response);
} finally {
RequestHandler.remove();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
注意这里的线程池也需要包装一下
@Slf4j
@Configuration
@EnableAsync
public class ThreadConfig {
/**
*这个Bean 是自定义的从yml取值的,可以自己修改或者直接写死
*/
@Resource
private AsyncProperties asyncProperties;
@Value("${spring.application.name:default}")
private String APPLICATION_NAME;
/**
* ThreadPoolTaskExecutor的处理流程
* 当池子大小小于corePoolSize,就新建线程,并处理请求
* 当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去workQueue中取任务并处理
* 当workQueue放不下任务时,就新建线程入池,并处理请求,如果池子大小撑到了maximumPoolSize,就用RejectedExecutionHandler来做拒绝处理
* 当池子的线程数大于corePoolSize时,多余的线程会等待keepAliveTime长时间,如果无请求可处理就自行销毁
*
* @return 线程配置
*/
@Bean
public ThreadPoolTaskExecutor executor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心线程数
executor.setCorePoolSize(asyncProperties.getCorePoolSize());
//允许的最大线程数
executor.setMaxPoolSize(asyncProperties.getMaxPoolSize());
//活跃时间
executor.setKeepAliveSeconds(asyncProperties.getKeepAliveSeconds());
//任务队列容量(阻塞队列)
executor.setQueueCapacity(asyncProperties.getQueueCapacity());
//等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(asyncProperties.isWaitForTasksToCompleteOnShutdown());
//等待时间:强制停止的时间(秒)
executor.setAwaitTerminationSeconds(asyncProperties.getAwaitTerminationSeconds());
executor.setThreadNamePrefix(APPLICATION_NAME + "-");
executor.initialize();
return executor;
}
/**
* TTl包装后的线程执行器
* <p>
* Primary 优先使用该Bean
*/
@Bean("asyncExecutor")
@Primary
public Executor asyncExecutor(ThreadPoolTaskExecutor executor) {
return TtlExecutors.getTtlExecutor(executor);
}
/**
* TTl包装后的线程执行服务
*/
@Bean
public ExecutorService asyncExecutorService(ThreadPoolTaskExecutor executor) {
return TtlExecutors.getTtlExecutorService(executor.getThreadPoolExecutor());
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
编辑 (opens new window)
上次更新: 2024-11-06, 19:27:10