0
点赞
收藏
分享

微信扫一扫

多线程工具

眸晓 2023-10-13 阅读 36

多线程工具

工具类

public class SupplyAsyncUtils {



    /**
     * 多线程上传冠心
     *
     * @param futures 异步执行的数据
     *                 staram <T>
     * @return
     */
    public static synchronized void supplyAsyncUploadingGX(List<CompletableFuture<?>> futures) {
        /* 等待所有返回 */
        CompletableFuture<Void> result = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        try {
            /* 获取返回数据结果 */
            result.join();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 获取所有字段为null的属性名
     *
     * @param source 要复制的源对象
     * @return
     */
    public static String[] getNullPropertyNames(Object source) {
        final BeanWrapper src = new BeanWrapperImpl(source);
        java.beans.PropertyDescriptor[] pds = src.getPropertyDescriptors();

        Set<String> emptyNames = new HashSet<>();
        for (java.beans.PropertyDescriptor pd : pds) {
            Object srcValue = src.getPropertyValue(pd.getName());
            if (srcValue == null) emptyNames.add(pd.getName());
        }
        String[] result = new String[emptyNames.size()];
        return emptyNames.toArray(result);
    }
}


线程池配置

@Slf4j
@EnableAsync
@Configuration
public class ExecutorConfig {


    private static ExecutorConfig executorConfig;
    //核心线程数
    private static int CORE_POOL_SIZE = 10;
    //最大线程数
    private static int MAX_POOL_SIZE = 20;
    //队列大小
    private static int QUEUE_CAPACITY = 1000;

    /**
     * 核心线程数
     *
     * @param corePoolSize
     * @return
     */
    public ExecutorConfig setCorePoolSize(Integer corePoolSize) {
        CORE_POOL_SIZE = corePoolSize;
        return this.getExecutorConfig();
    }

    /**
     * 最大线程数
     *
     * @param queueCapacity
     * @return
     */
    public ExecutorConfig setMaxPoolSize(Integer queueCapacity) {
        MAX_POOL_SIZE = queueCapacity;
        return this.getExecutorConfig();
    }

    /**
     * 队列大小
     *
     * @param queueCapacity
     * @return
     */
    public ExecutorConfig setQueueCapacity(Integer queueCapacity) {
        QUEUE_CAPACITY = queueCapacity;
        return this.getExecutorConfig();
    }

    public static ExecutorConfig getExecutorConfig() {
        if (null == executorConfig) {
            synchronized (ExecutorConfig.class) {
                if (null == executorConfig) {
                    return executorConfig = SpringUtil.getBean(ExecutorConfig.class);
                }
            }
        }
        return executorConfig;
    }

    @Bean
    public Executor asyncServiceExecutor() {
        log.info("start asyncServiceExecutor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(CORE_POOL_SIZE);
        //配置最大线程数
        executor.setMaxPoolSize(MAX_POOL_SIZE);
        //配置队列大小
        executor.setQueueCapacity(QUEUE_CAPACITY);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("async-service-");
        //等待所有任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        // 拒绝策略
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CallerRunsPolicy():交由调用方线程运行,比如 main 线程;如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行
        // AbortPolicy():该策略是线程池的默认策略,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。
        // DiscardPolicy():如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常
        // DiscardOldestPolicy():丢弃队列中最老的任务,队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        executor.initialize();
        return executor;

    }

}

调用 (有返回)


 		//数据汇总返回的 map
        HashMap<String, Object> collectMap = new HashMap<>();
            List<CompletableFuture<?>> futures = new ArrayList<>();
            futures.add(CompletableFuture.supplyAsync(() -> {
                try {
                    /**
                     * 需要异步处理的方法
                     */
                    for (AgedExaminationBeanEnum agedExaminationBeanEnum : AgedExaminationBeanEnum.values()) {
                        IAgedExaminationReport agedExaminationBean = strategyBean.getAgedExaminationBean(agedExaminationBeanEnum);
                        //调用查询各个大项的结果数据 (注意,除了数组以及对象外 其他基本类型都需要为 String 方便处理)
                        HashMap<String, Object> examinationReportMap = agedExaminationBean.examinationReport(agedExaminationReportParam);
                        //每次调用将返回值放入 collectMap 
                        if (null != examinationReportMap) {
                            collectMap.putAll(examinationReportMap);
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    log.error("FileController-agedExaminationReport异常:{}", e.getMessage());
                }
                return null;
            }, ExecutorConfig.getExecutorConfig().asyncServiceExecutor()));
            //任务提交
            SupplyAsyncUtils.supplyAsyncUploadingGX(futures);
举报

相关推荐

0 条评论