0
点赞
收藏
分享

微信扫一扫

SpringBoot @Async异步注解

芷兮离离 2022-04-20 阅读 97
java

1.SpringBoot @Async 注解的使用方式

         在启动类中加上@EnableAsync ,在异步类方法上使用@Async 

注解生效原理

		Spring 底层会用 Aop 去检测到有@Async 注解的方法,然后创建一个代理方法进行异步操作,
	在Spring 中 会根据两个条件决定不使用 SimpleAsyncTaskExecutor(无界队列,一个请求就创建一个线程),
	(1) @Async(value) 指定特定线程池
  (2) SpringFactor 中没有线程池bean对象,Springboot项目 默认会创建ThreadPoolTaskExecutor,会默认使用这个线程池,ThreadPoolTaskExecutor(拒绝策略会丢弃任务)

在这里插入图片描述

在这里插入图片描述

使用注意事项

		1.@Async 需配合配置异步线程池使用<task:executor id = “ asyncExecutor ” pool-size = “ 1 0 0 - 1 0 0 0 0 ” queue-capacity = “ 1 0 ” />
  •   2.@Async 修饰的方法, 被异步调用, 如果方法中有错误, 并不会抛出到控制台.你所看到的现象是, 程序不知道在哪一步不执行了, 而控制台没有打印出任何错误, 好像一切正常.这给排查问题带来一些干扰.所以最好是加上try, catch代码块, 在catch中将异常打印到控制台.
    

自定义默认线程池

package com.bestj.syncio.config;

import java.lang.reflect.Method;
import java.util.concurrent.Executor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * 自定义线程池配置----------@Async默认线程池复写
 * @date 2021年2月21日14:28:26
 * @author Lvshiyang
 */
@Configuration
@Slf4j
public class ThreadPoolConfiguration implements AsyncConfigurer {
    @Value("${pool.core-size:4}")
    private int corePoolSize;

    @Value("${pool.max-size:8}")
    private int maxPoolSize;

    @Value("${pool.queue-capacity:5}")
    private int queueCapacity;

    @Value("${pool.keep-alive:60}")
    private int keepAliveSeconds;

    @Value("${thread-name-prefix:myDefinl}")
    private String threadNamePrefix;

    //设置@Async的默认线程池
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setCorePoolSize(corePoolSize);//核心线程池数
        pool.setMaxPoolSize(maxPoolSize); // 最大线程数
        pool.setQueueCapacity(queueCapacity);//队列容量,当核心线程数达到最大时,新任务会放在队列中排队等待执行
        pool.setKeepAliveSeconds(keepAliveSeconds);//线程空闲时间
        pool.setAllowCoreThreadTimeOut(false);//核心线程会一直存活,即使没有任务需要执行。(默认false)时,核心线程会超时关闭
        pool.setThreadNamePrefix(threadNamePrefix);//线程前缀名称
        //默认:AbortPolicy 丢弃任务,抛运行时异常
        //CallerRunsPolicy由调用线程处理该任务
        pool.setRejectedExecutionHandler(new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy());

        return pool;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new MyAsyncExceptionHandler();
    }

    /**
     * 自定义异常处理类
     * 被 @Async 修饰的方法在独立线程调用,不能被@ControllerAdvice全局异常处理器捕获,所以需要自己设置异常处理
     */

}

@Slf4j
class MyAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {

    @Override
    public void handleUncaughtException(Throwable throwable, Method method, Object... objects) {
        log.info("Exception message - " + throwable.getMessage());
        log.info("Method name - " + method.getName());
        for (Object param: objects) {
            log.info("Parameter value - " + param);
        }
        log.error("handleUncaughtException method:" + method.getName(), throwable);
    }

}

多线程池定义及使用

package com.bestj.syncio.config;

import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * @author shf         自定义多线程池
 * @date 2022-03-14 11:46
 */
@Configuration
public class ThreadPoolConfig {
    /**
     * 核心线程数 默认的核心线程数为1
     */
    private static final int CORE_POOL_SIZE = 2;
    /**
     * 最大线程数 默认的最大线程数是Integer.MAX_VALUE
     */
    private static final int MAX_POOL_SIZE = 20;
    /**
     * 缓冲队列数 默认的缓冲队列数是Integer.MAX_VALUE
     */
    private static final int QUEUE_CAPACITY = 50;
    /**
     * 允许线程空闲时间 默认的线程空闲时间为60秒
     */
    private static final int KEEP_ALIVE_SECONDS = 30;

    @Bean("destroyResource")
    public AsyncTaskExecutor destroyImGroupTaskExecutor() {
        return getAsyncTaskExecutor("del-resource-td-", MAX_POOL_SIZE, QUEUE_CAPACITY);
    }

    @Bean("statisticsData")
    public AsyncTaskExecutor statisticsDataExecutor() {
        return getAsyncTaskExecutor("save-data-td-", MAX_POOL_SIZE, QUEUE_CAPACITY);
    }

    @Bean("commonTaskExecutor")
    public AsyncTaskExecutor get() {
        return getAsyncTaskExecutor("common-ex-td-", MAX_POOL_SIZE, QUEUE_CAPACITY);
    }

    private AsyncTaskExecutor getAsyncTaskExecutor(String threadNamePrefix
        , int MAX_POOL_SIZE, int QUEUE_CAPACITY) {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(CORE_POOL_SIZE);
        taskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
        taskExecutor.setQueueCapacity(QUEUE_CAPACITY);
        taskExecutor.setKeepAliveSeconds(KEEP_ALIVE_SECONDS);
        taskExecutor.setThreadNamePrefix(threadNamePrefix);
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return taskExecutor;
    }
}


 @Async("statisticsData")
    public Future<Integer> test1(Integer i) throws InterruptedException {
        Thread.sleep(100);
        System.out.println("@Async 执行: " + i +"线程名称"+ Thread.currentThread().getName());
        return new AsyncResult(i);
    }
    
//调用要与异步方法不在一个类
@GetMapping("/service2")
    public void service2(){
        long l = System.currentTimeMillis();
        System.out.println("service2 执行----->");
        List<Future> result = new ArrayList<>();
        try {
            for (int i = 0; i < 300; i++) {
                Future<Integer> integerFuture = asyncExecutorTest.test1(i);
                result.add(integerFuture);
            }
            for (Future future : result) {
                System.out.println(future.get());
            }
        } catch (InterruptedException | ExecutionException e) {
            System.out.println("service2执行出错");
        }
        System.out.println("service2 结束----->" + (System.currentTimeMillis() - l));
    }
举报

相关推荐

0 条评论