目录

springboot-多线程使用CompletableFuture实现

# 说明

这里讲的是使用CompletableFuture的一些记录和问题解决。以及多线程之间的传参问题

# 参考资料

【多线程】优雅使用线程池结合CompletableFuture实现异步编排 (opens new window)

玩转CompletableFuture线程异步编排,看这一篇就够了 (opens new window)

# 问题

多线程的调用中不调用自定义线程池,输出的线程名称都不对。

# 解决

asyncExecutor是自定义的线程Bean名,注意保持名字一致

@Resource
private Executor asyncExecutor;

CompletableFuture.supplyAsync(() -> {
  log.info("操作");
},asyncExecutor); //asyncExecutor需要指定使用线程池,否则会默认调用tomcat的线程池
1
2
3
4
5
6

# 线程传参

众所周知,多线程中默认是无法互相传参的,包括在线程中也无法获取Request等参数。 但是很多拦截等方法中又需要从header取值,如用户token。所以这里采用alibaba开源的TransmittableThreadLocal (opens new window)(简称TTL)

# 使用

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>transmittable-thread-local</artifactId>
    <version>2.14.1</version>
</dependency>
1
2
3
4
5
@Slf4j
public class RequestHandler {

	private static final TransmittableThreadLocal<Map<String, Object>> THREAD_LOCAL = new TransmittableThreadLocal<>();

	/**
	 * 设置ttl数据
	 *
	 * @param key   key
	 * @param value value
	 */
	public static void set(String key, Object value) {
		Map<String, Object> map = getLocalMap();
		map.put(key, value == null ? StrUtil.EMPTY : value);
	}

	public static Map<String, Object> getLocalMap() {
		Map<String, Object> map = THREAD_LOCAL.get();
		if (map == null) {
			map = new ConcurrentHashMap<>();
			THREAD_LOCAL.set(map);
		}
		return map;
	}

	public static String get(String key) {
		Map<String, Object> map = getLocalMap();
		return MapUtil.getStr(map, key);
	}

	public static <T> T get(String key, Class<T> clazz) {
		Map<String, Object> map = getLocalMap();
		return MapUtil.get(map, key, clazz);
	}

	public static void remove() {
		RequestHandler.getLocalMap();
		THREAD_LOCAL.remove();
		RequestHandler.getLocalMap();
	}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* 注意是OncePerRequestFilter
*/
@Slf4j
@Configuration
@RequiredArgsConstructor
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.SERVLET)
public class WebHeaderHolderInterceptor extends OncePerRequestFilter {

	@Override
	public void doFilterInternal(HttpServletRequest request, @NotNull HttpServletResponse response, FilterChain chain) throws IOException, ServletException {
		try {
			String tokenName = "token";
			// 将请求头数据封装到线程变量中
			//注意我这里没做任何的判断,校验,请根据业务需求自己做判断
			RequestHandler.set(tokenName,request.getHeader(tokenName));

			chain.doFilter(request, response);
		} finally {
			RequestHandler.remove();
		}
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

注意这里的线程池也需要包装一下


@Slf4j
@Configuration
@EnableAsync
public class ThreadConfig {

	/**
	*这个Bean 是自定义的从yml取值的,可以自己修改或者直接写死
	*/
	@Resource
	private AsyncProperties asyncProperties;
	
	@Value("${spring.application.name:default}")
	private String APPLICATION_NAME;

	/**
	 * ThreadPoolTaskExecutor的处理流程
	 * 当池子大小小于corePoolSize,就新建线程,并处理请求
	 * 当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去workQueue中取任务并处理
	 * 当workQueue放不下任务时,就新建线程入池,并处理请求,如果池子大小撑到了maximumPoolSize,就用RejectedExecutionHandler来做拒绝处理
	 * 当池子的线程数大于corePoolSize时,多余的线程会等待keepAliveTime长时间,如果无请求可处理就自行销毁
	 *
	 * @return 线程配置
	 */
	@Bean
	public ThreadPoolTaskExecutor executor() {
		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
		//核心线程数
		executor.setCorePoolSize(asyncProperties.getCorePoolSize());
		//允许的最大线程数
		executor.setMaxPoolSize(asyncProperties.getMaxPoolSize());
		//活跃时间
		executor.setKeepAliveSeconds(asyncProperties.getKeepAliveSeconds());
		//任务队列容量(阻塞队列)
		executor.setQueueCapacity(asyncProperties.getQueueCapacity());
		//等待所有任务结束后再关闭线程池
		executor.setWaitForTasksToCompleteOnShutdown(asyncProperties.isWaitForTasksToCompleteOnShutdown());
		//等待时间:强制停止的时间(秒)
		executor.setAwaitTerminationSeconds(asyncProperties.getAwaitTerminationSeconds());
		executor.setThreadNamePrefix(APPLICATION_NAME + "-");
		executor.initialize();
		return executor;
	}

	/**
	 * TTl包装后的线程执行器
	 * <p>
	 * Primary 优先使用该Bean
	 */
	@Bean("asyncExecutor")
	@Primary
	public Executor asyncExecutor(ThreadPoolTaskExecutor executor) {
		return TtlExecutors.getTtlExecutor(executor);
	}

	/**
	 * TTl包装后的线程执行服务
	 */
	@Bean
	public ExecutorService asyncExecutorService(ThreadPoolTaskExecutor executor) {
		return TtlExecutors.getTtlExecutorService(executor.getThreadPoolExecutor());
	}

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
上次更新: 2024-01-03, 13:22:13
最近更新
01
2023年度总结
01-03
02
MongoDB的简单的常用语法
12-11
03
cetnos7通过nfs共享磁盘文件
11-24
更多文章>