0
点赞
收藏
分享

微信扫一扫

Spring-boot 引入、使用和测试线程池及异步调用使用事项

前言

同步调用

同步调用就是整个处理过程顺序执行,当各个过程都执行完毕,并返回结果。

异步调用

异步调用则是只是发送了调用的指令,调用者无需等待被调用的方法完全执行完毕;而是继续执行下面的流程。

同步调用一个明显的缺点就是,假设一个业务由A、B、C三部分完成,其中B任务是一个耗时长的任务,如需要打开一个文件,往文件中写入数据,保存日志,A任务是获取用户提交的数据并解包预处理,C任务是数据处理完毕返回给前端操作人员。如果是同步调用那么只能A->B-C顺序执行,从用户提交数据到用户得到响应,可能是一个漫长的过程,对用户来说十分不友好。仔细分析一下B任务,它相当于记录一次日志,对A和C的业务其实并没有影响,但它去占据了大部分时间。这时其实可以将B任务设置成异步调用即可,让A->C执行返回给前端用户,B可以单独的线程去执行,B执行的时间长短对用户来说是无感的。这就是异步的好处!

在Java中,一般在处理类似的场景之时,都是基于创建独立的线程去完成相应的异步调用逻辑,通过主线程和不同的线程之间的执行流程,从而在启动独立的线程之后,主线程继续执行而不会产生停滞等待的情况。

四种常用的线程池

  1. Executors.newCacheThreadPool():可缓存线程池
  2. Executors.newFixedThreadPool(int n):创建一个可重用固定个数的线程池,以共享的无界队列方式来运行这些线程。
  3. Executors.newScheduledThreadPool(int n):创建一个定长线程池,支持定时及周期性任务执行
  4. Executors.newSingleThreadExecutor():创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

spring-boot引入线程不需要再添加额外的jar

Spring-boot主启动类添加注解

在spring-boot的主启动类上加添加注解@EnableAsync

@SpringBootApplication(scanBasePackages = {"com.nfsqd.junwei"})
@EnableScheduling   //启动定时任务
@EnableTransactionManagement // 开启事务
@Slf4j
@EnableAsync
public class MainLuncherApp {
public static void main(String[] args) {
        SpringApplication.run(MainLuncherApp.class, args);
    }
}

Spring-boot 新建自定义线程池

直接定义线程池

我们的需要新建一个Configuration 类来配置我们的线程池参数。

package com.nfsqd.junwei.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * 线程池配置类
 * @author junwei
 * @date 2022/03/20 02:02
 */
@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {

  /**
   *   默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,
   *    当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
   *  当队列满了,就继续创建线程,当线程数量大于等于maxPoolSize后,开始使用拒绝策略拒绝
   */

    /**
     * 核心线程数(默认线程数)
     */
    private static final int corePoolSize = 5;
    /**
     * 最大线程数
     */
    private static final int maxPoolSize = 30;
    /**
     * 允许线程空闲时间(单位:默认为秒)
     */
    private static final int keepAliveTime = 30;
    /**
     * 缓冲队列大小
     */
    private static final int queueCapacity = 10000;
    /**
     * 线程池名前缀
     */
    private static final String threadNamePrefix = "jw-aysnc-service-";

    @Bean("asyncTaskExecutor") // bean的名称,默认为首字母小写的方法名
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setKeepAliveSeconds(keepAliveTime);
        executor.setThreadNamePrefix(threadNamePrefix);

        // 线程池对拒绝任务的处理策略
        // CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 初始化
        executor.initialize();
        return executor;
    }
}

其中我们的静态参数也可以放在配置文件中,我这边使用的 yml 配置文件:

#线程池配置参数
task:
  pool:
    corePoolSize: 5 #设置核心线程数
    maxPoolSize: 20  #设置最大线程数
    keepAliveSeconds: 300 #设置线程活跃时间(秒)
    queueCapacity: 50 #设置队列容量

上边的这里需要用注解来引用对象方式实现:

@ConfigurationProperties(prefix = "task.pool")

其他定义线程池的方法

还有一种定义线程池的方法,是实现org.springframework.scheduling.annotation.AsyncConfigurer 或者继承org.springframework.scheduling.annotation.AsyncConfigurerSupport,通过重写Executor getAsyncExecutor()来完成。另外还有一个好处是可以控制多线程中的异常处理。参考我另一篇博文

注解解释

  • @Configuration用于定义配置类,被注解的类内部包含有一个或多个被@Bean注解的方法,这些方法将会被AnnotationConfigApplicationContextAnnotationConfigWebApplicationContext类进行扫描,并用于构建bean定义,初始化Spring容器。
  • @EnableAsync开始对异步任务的支持
  • @Async 定义一个线程任务
  • @Bean 产生一个Bean对象,然后这个Bean对象交给Spring管理 (如上:@Bean("taskExecutor"))

@Async介绍

在Spring中,基于@Async标注的方法,称之为异步方法;这些方法将在执行的时候,将会在独立的线程中被执行,调用者无需等待它的完成,即可继续其他的操作。

从异常信息JedisConnectionException: Could not get a resource from the pool来看,
我们很容易的可以想到,在应用关闭的时候异步任务还在执行,由于Redis连接池先销毁了,导致异步任务中要访问Redis的操作就报了上面的错。
所以,我们得出结论,上面的实现方式在应用关闭的时候是不优雅的,那么我们要怎么做呢?如下设置:
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);

线程使用

引用线程池

    @Autowired
    @Qualifier(value = "asyncTaskExecutor")//注意跟前面定义执行器名称要一致
    private ThreadPoolTaskExecutor poolTaskExecutor;

异步方法定义

//根据组织做数据更新以及新增删除
   for (OrgContrast orgContrast : orgContrastList) {
       poolTaskExecutor.execute(() -> {
           userPackageData(token, orgContrast);
       });
   }

方法添加指定线程名

  // 为userPackageData方法添加@Async
  @Async("asyncTaskExecutor")
  public void userPackageData(String token, OrgContrast orgContrast) {
    //业务代码实现
    // XXXX
  }

具体使用

创建任务

@Component
@Slf4j
public class AsyncTask {
    //asyncTaskExecutor即配置线程池的方法名,此处如果不写自定义线程池的方法名,会使用默认的线程池
    // 一、有返回值的异步调用方法定义举例
    // 以上示例可以发现,返回的数据类型为Future类型,其为一个接口。
    // 具体的结果类型为AsyncResult,这个是需要注意的地方。
    @Async("asyncTaskExecutor")
    public Future<String> syncMethod(Parmater p1) throws InterruptedException{
        //do some thing 业务代码实现
        return new AsyncResult<>("success");
    }

	// 二、这个方法是无返回值的void方法
	// 使用的方式非常简单,一个标注即可解决所有的问题。
	@Async("asyncTaskExecutor")  //标注使用  
	public void asyncMethodWithVoidReturnType() {  
	    System.out.println("Execute method asynchronously. "  
	      + Thread.currentThread().getName());  
	} 
}

调用测试

//这里是用测试类来调用
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = MainLuncherApp.class,
        webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Slf4j
public class SpringbootApplicationTest {
    @Resource
    private AsyncTask task;
    @Test
    public void AsyncTaskTest(){
        try {
            Parameter p = new Parameter();
            //异步服务
            // 这里测试是调用有返回值的异步方法,因为是异步的,测试其返回值需要不停的用while循环判断是否能获取到返回值
            Future<String> task1 = task.syncMethod(p1);
            log.info("AsyncTaskTest syncMethodexecuting...");
            while(true) {//这里使用了循环判断,等待获取结果信息  
                if(task1.isDone()) {//判断是否执行完毕  
                    log.info("Task1 result: {}", task1.get());
                    break;
                }
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        log.info("AsyncTaskTest main done");
    }
}

分析: 这些获取异步方法的结果信息,是通过不停的检查Future的状态来获取当前的异步方法是否执行完毕来实现的。

基于@Async调用中的异常处理机制

在异步方法中,如果出现异常,对于调用者caller而言,是无法感知的。
如果确实需要进行异常处理,则按照如下方法来进行处理:

  1. 自定义实现AsyncTaskExecutor的任务执行器,在这里定义处理具体异常的逻辑和方式。
  2. 配置由自定义的TaskExecutor替代内置的任务执行器

可以自己实现AsyncConfigurer接口处理异常。

@Configuration  
    @EnableAsync  
    public class SpringAsyncConfigurer implements AsyncConfigurer {  
           
        @Override  
        public Executor getAsyncExecutor() {  
            return new ThreadPoolTaskExecutor();  
        }  
      
        @Override  
        public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {  
            return new CustomAsyncExceptionHandler();  
        }  
      
    }

异常处理类:

public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {  
       
        @Override  
        public void handleUncaughtException(Throwable throwable, Method method, Object... obj) {  
            System.out.println("Exception message - " + throwable.getMessage());  
            System.out.println("Method name - " + method.getName());  
            for (Object param : obj) {  
                System.out.println("Parameter value - " + param);  
            }  
        }  
           
    }

@Async调用中的事务处理机制

@Async标注的方法,同时也适用了@Transactional进行了标注;在其调用数据库操作之时,将无法产生事务管理的控制,原因就在于其是基于异步处理的操作。

那该如何给这些操作添加事务管理呢?可以将需要事务管理操作的方法放置到异步方法内部,在内部被调用的方法上添加@Transactional.

例如: 方法A,使用了@Async/@Transactional来标注,但是无法产生事务控制的目的。
方法B,使用了@Async来标注, B中调用了C、D,C/D分别使用@Transactional做了标注,则可实现事务控制的目的。

@Async 异步调用的使用注意事项

需要注意的问题一:异步方法的定义位置问题

最好将异步调用的方法单独放在一个@Component类中,或者说不要将异步调用方法写在@Controller中,否则将无法进行调用,因为SpringBoot使用@Transaction需要经过事务拦截器,只有通过了该事务拦截器的方法才能被加入Spring的事务管理器中,而在同一个类中的一个方法调用另一个方法只会经过一次事务拦截器,所以如果是后面的方法使用了事务注解将不会生效,在这里异步调用也是同样的道理

需要注意的问题二:异步方法的事务调用问题

@Async注解的方法上再使用@Transaction注解是无效的,在@Async注解的方法中调用Service层的事务方法是有效的

需要注意的问题三:异步方法必须是实例的

因为静态方法不能被override重写,因为@Async异步方法的实现原理是通过注入一个代理类到Bean中,该代理类集成这个Bean并且需要重写这个异步方法,所以需要是实例方法


参考文章:

  1. https://blog.csdn.net/lh87270202/article/details/79898409
  2. https://www.jianshu.com/p/4388f46675c0
  3. Spring @Async之三:Spring @Async使用方法总结
  4. https://zhuanlan.zhihu.com/p/190208804
  5. https://blog.csdn.net/GeeLoong/article/details/98043786
举报

相关推荐

0 条评论