0
点赞
收藏
分享

微信扫一扫

TaskExecutor和ExecutorService的简单整合



文章目录

  • 一、遇到问题
  • 二、达成目的
  • 三、开始调研
  • 1、使用@Async获取线程池流程
  • 2、查看中间件线程池工具类
  • 3、观察AsyncConfigurer接口
  • 4、查看TaskExecutorAdapter源码
  • 四、复盘总结




一、遇到问题

自己负责的项目想通过引入一个中间件,达到在业务场景维度的全链路日志监控。当接入相关中间件后,发现如果在业务中开启了多线程,链路日志就会缺失。当然,该中间件官方给出了解决办法,那就是使用中间件提供的工具类对线程池包装一下(TrackTraceHelper.wrappedExecutorService(executorService))就可以解决。当然这个问题确实解决了,但我们还存在以下几个问题:

问题一:项目原本使用的线程池是Spring的ThreadPoolTaskExecutor,在其setTaskDecorator方法中有进行corp租户信息的传递。功能是否还生效?

问题二:项目有使用@Aync注解开启异步任务。异步注解能否获取到包装后的线程池?


二、达成目的

查看该中间件的源码,发现corp租户信息上下的传递在其底层已经实现,所以这算不是一个问题。在使用@Aync开启异步任务的时候,我们是可以指定线程池的名称的,所以问题二也不是问题。

虽然问题解决了,但是每次使用异步注解都需要去显式的指定名称,感觉这种做法并不友好。有没有一种办法能够在开发者无感知的情况下,实现@Async开启异步线程是、ThreadPoolTaskExecutor线程池的统一,并且这个线程池还是被中间件工具类包装后的,达到既要又要。


三、开始调研

查看Spring源码中异步任务相关的源码,从@EnableAsync注解往下点就完事

1、使用@Async获取线程池流程



TaskExecutor和ExecutorService的简单整合_ExecutorService


根据源码可得出,使用@Async后,异步任务使用线程池流程为:

  1. 我们给显式给@Async指定了线程池
  2. 首先判定有没有实现AsyncConfigurer接口的线程池;
  3. 其次判定有没有TaskExecutor(比如Spring中的ThreadPoolTaskExecutor)类型的线程池Bean;
  4. 再然后就是判定有没有Executor类型的线程池Bean;
  5. 最后,如果都没有,那就使用兜底逻辑,Spring自己创建一个SimpleAsyncTaskExecutor线程池。

在我们已知的情况下,方案1(不友好)和5(无法被中间件工具类包装)肯定也不行,所以只剩下方案2、3、4

2、查看中间件线程池工具类

进入中间件线程池工具类源码,我们不难发现中间件包装的线程池类型,是一个ExecutorService,并且内部有直接调用submit方法。所以方案4,使用Executor类型的线程池肯定会报错。方案3中的TaskExecutor是隶属于Spring的线程池体系,它和Java中的线程池相交的点,只有顶层接口Executor,所以方案3也不行。只剩方案2了。



TaskExecutor和ExecutorService的简单整合_spring_02



3、观察AsyncConfigurer接口

我发现getAsyncExecutor方法返回的线程池是一个Executor类型的,这必然也不满足我们的要求,因为我们想要的类型是ExecutorService,所以方案2也不行。

可我还想试试看,万一还有希望呢?既然这里需要返回的类型是Executor,我要是返回一个它的子类ExecutorService,编译肯定是没问题的,并且理论上子类比父类更强大,用起来也不会有问题。但是为了保险,还是继续跟下源码。



TaskExecutor和ExecutorService的简单整合_ExecutorService_03



4、查看TaskExecutorAdapter源码

继续向下跟Spring异步任务线程池调用的源码,这里不得不称赞Spring源码的兼容性



TaskExecutor和ExecutorService的简单整合_线程池_04


最终我们会进入到TaskExecutorAdapter内部,这里以submit方法为例:



TaskExecutor和ExecutorService的简单整合_spring_05


使用线程池调用submit方法时:

  1. 判断当前线程池是否为ExecutorService类型,如果是则直接强转后调用submit方法
  2. 如果是其它类型,则将任务封装为一个FutureTask,然后调用Executor的execute方法,最后将返回值get出来返回
    即,方案2的做法是可行的。最终线程池的配置代码如下所示:

@Configuration
@ConfigurationProperties(prefix = "spring.task.execution")
public class SpringTaskConfig  implements AsyncConfigurer {
    private final TaskExecutionProperties.Pool pool = new TaskExecutionProperties.Pool();

    /**
     * 使用中间件包装后的线程池
     */
    @Bean
    @Override
    public ExecutorService getAsyncExecutor() {
        ExecutorService executorService = new ThreadPoolExecutor(
                pool.getCoreSize(),
                pool.getMaxSize(),
                pool.getKeepAlive().getSeconds(), TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(pool.getQueueCapacity()),
                new ThreadPoolExecutor.CallerRunsPolicy());
        return TrackTraceHelper.wrappedExecutorService(executorService);
    }
}


四、复盘总结

回头再理一下方案2为什么能成功。

其实Spring自始自终都没有使用Java中ExecutorService这个线程池,而是使用Executor线程池,并且自定义了一个TaskExecutor线程池来替代ExecutorService。AbstractExecutorService中相关的功能也都分别转交给了TaskExecutor的各个子类。

而今天我们的做法,相当于将ExecutorService以Executor子类,但是非TaskExecutor子类的身份进行传递,原本这种做法是有问题,巧在Spring在真正执行的时候,有一个兼容的逻辑判定,而正是这个兼容逻辑的存在完成了我们的目的。

至此,我们就可以在开发成员无感知的情况下,实现线程池的统一了。


举报

相关推荐

0 条评论