1.SpringBoot @Async 注解的使用方式
在启动类中加上@EnableAsync ,在异步类方法上使用@Async
注解生效原理
Spring 底层会用 Aop 去检测到有@Async 注解的方法,然后创建一个代理方法进行异步操作,
在Spring 中 会根据两个条件决定不使用 SimpleAsyncTaskExecutor(无界队列,一个请求就创建一个线程),
(1) @Async(value) 指定特定线程池
(2) SpringFactor 中没有线程池bean对象,Springboot项目 默认会创建ThreadPoolTaskExecutor,会默认使用这个线程池,ThreadPoolTaskExecutor(拒绝策略会丢弃任务)
使用注意事项
1.@Async 需配合配置异步线程池使用<task:executor id = “ asyncExecutor ” pool-size = “ 1 0 0 - 1 0 0 0 0 ” queue-capacity = “ 1 0 ” />
-
2.@Async 修饰的方法, 被异步调用, 如果方法中有错误, 并不会抛出到控制台.你所看到的现象是, 程序不知道在哪一步不执行了, 而控制台没有打印出任何错误, 好像一切正常.这给排查问题带来一些干扰.所以最好是加上try, catch代码块, 在catch中将异常打印到控制台.
自定义默认线程池
package com.bestj.syncio.config;
import java.lang.reflect.Method;
import java.util.concurrent.Executor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
* 自定义线程池配置----------@Async默认线程池复写
* @date 2021年2月21日14:28:26
* @author Lvshiyang
*/
@Configuration
@Slf4j
public class ThreadPoolConfiguration implements AsyncConfigurer {
@Value("${pool.core-size:4}")
private int corePoolSize;
@Value("${pool.max-size:8}")
private int maxPoolSize;
@Value("${pool.queue-capacity:5}")
private int queueCapacity;
@Value("${pool.keep-alive:60}")
private int keepAliveSeconds;
@Value("${thread-name-prefix:myDefinl}")
private String threadNamePrefix;
//设置@Async的默认线程池
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
pool.setCorePoolSize(corePoolSize);//核心线程池数
pool.setMaxPoolSize(maxPoolSize); // 最大线程数
pool.setQueueCapacity(queueCapacity);//队列容量,当核心线程数达到最大时,新任务会放在队列中排队等待执行
pool.setKeepAliveSeconds(keepAliveSeconds);//线程空闲时间
pool.setAllowCoreThreadTimeOut(false);//核心线程会一直存活,即使没有任务需要执行。(默认false)时,核心线程会超时关闭
pool.setThreadNamePrefix(threadNamePrefix);//线程前缀名称
//默认:AbortPolicy 丢弃任务,抛运行时异常
//CallerRunsPolicy由调用线程处理该任务
pool.setRejectedExecutionHandler(new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy());
return pool;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new MyAsyncExceptionHandler();
}
/**
* 自定义异常处理类
* 被 @Async 修饰的方法在独立线程调用,不能被@ControllerAdvice全局异常处理器捕获,所以需要自己设置异常处理
*/
}
@Slf4j
class MyAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable throwable, Method method, Object... objects) {
log.info("Exception message - " + throwable.getMessage());
log.info("Method name - " + method.getName());
for (Object param: objects) {
log.info("Parameter value - " + param);
}
log.error("handleUncaughtException method:" + method.getName(), throwable);
}
}
多线程池定义及使用
package com.bestj.syncio.config;
import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
* @author shf 自定义多线程池
* @date 2022-03-14 11:46
*/
@Configuration
public class ThreadPoolConfig {
/**
* 核心线程数 默认的核心线程数为1
*/
private static final int CORE_POOL_SIZE = 2;
/**
* 最大线程数 默认的最大线程数是Integer.MAX_VALUE
*/
private static final int MAX_POOL_SIZE = 20;
/**
* 缓冲队列数 默认的缓冲队列数是Integer.MAX_VALUE
*/
private static final int QUEUE_CAPACITY = 50;
/**
* 允许线程空闲时间 默认的线程空闲时间为60秒
*/
private static final int KEEP_ALIVE_SECONDS = 30;
@Bean("destroyResource")
public AsyncTaskExecutor destroyImGroupTaskExecutor() {
return getAsyncTaskExecutor("del-resource-td-", MAX_POOL_SIZE, QUEUE_CAPACITY);
}
@Bean("statisticsData")
public AsyncTaskExecutor statisticsDataExecutor() {
return getAsyncTaskExecutor("save-data-td-", MAX_POOL_SIZE, QUEUE_CAPACITY);
}
@Bean("commonTaskExecutor")
public AsyncTaskExecutor get() {
return getAsyncTaskExecutor("common-ex-td-", MAX_POOL_SIZE, QUEUE_CAPACITY);
}
private AsyncTaskExecutor getAsyncTaskExecutor(String threadNamePrefix
, int MAX_POOL_SIZE, int QUEUE_CAPACITY) {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(CORE_POOL_SIZE);
taskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
taskExecutor.setQueueCapacity(QUEUE_CAPACITY);
taskExecutor.setKeepAliveSeconds(KEEP_ALIVE_SECONDS);
taskExecutor.setThreadNamePrefix(threadNamePrefix);
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return taskExecutor;
}
}
@Async("statisticsData")
public Future<Integer> test1(Integer i) throws InterruptedException {
Thread.sleep(100);
System.out.println("@Async 执行: " + i +"线程名称"+ Thread.currentThread().getName());
return new AsyncResult(i);
}
//调用要与异步方法不在一个类
@GetMapping("/service2")
public void service2(){
long l = System.currentTimeMillis();
System.out.println("service2 执行----->");
List<Future> result = new ArrayList<>();
try {
for (int i = 0; i < 300; i++) {
Future<Integer> integerFuture = asyncExecutorTest.test1(i);
result.add(integerFuture);
}
for (Future future : result) {
System.out.println(future.get());
}
} catch (InterruptedException | ExecutionException e) {
System.out.println("service2执行出错");
}
System.out.println("service2 结束----->" + (System.currentTimeMillis() - l));
}