通过函数式接口实现模板模式线程池
主class
package demo;
import java.util.*;
import java.util.concurrent.*;
/**
* @date: 2021/9/30 9:58
* @version: V1.0
* @Description:
* <p>
* 批处理的工具类
* </p>
*/
public class BatchTemplate {
/**
* 定义最大的执行线程数量
*/
private static final Integer MAX_THREAD = 8;
/**
*
* @param sourceList 执行的数据列表
* @param splitCount 拆分的数量
* @param executor 执行的方法
* @param combiner 对返回值的拼接处理
* @param maxThread 最大线程数
* @param <T> 请求参数
* @param <V> 返回值
* @return
*/
public static <T,V> WhhRestResult executeWithSplitList(List<T> sourceList,Integer splitCount,
BatchCallExecutor<List<T>,V> executor,ResultCombiner<V> combiner,Integer maxThread) {
//拆分
List<List<T>> splitGroup = ListSplitUtils.splitList(sourceList,splitCount);
return executeWithoutException(splitGroup,executor,combiner,maxThread);
}
/**
* 不含异常的处理方法
* @param req
* @param executor
* @param combiner
* @param maxThread
* @param <T>
* @param <V>
* @return
*/
public static <T,V> WhhRestResult executeWithoutException(Collection<T> req, BatchCallExecutor<T,V> executor, ResultCombiner<V> combiner, Integer maxThread){
return execute(req,executor,combiner,null,maxThread);
}
/**
* 简单的无需处理返回值和异常
* @param req
* @param executor
* @param <T>
* @return
*/
public static <T> WhhRestResult execute(Collection<T> req, BatchExecutor<T> executor) {
BatchCallExecutor<T,Boolean> batchCallExecutor = e->{
executor.execute(e);
return true;
};
return execute(req, batchCallExecutor, aBoolean -> {});
}
/**
* 需要对异常进行处理
* @param req
* @param executor
* @param handler
* @param <T>
* @return
*/
public static <T> WhhRestResult execute(Collection<T> req, BatchExecutor<T> executor, BatchErrorHandler<T> handler) {
BatchCallExecutor<T,Boolean> batchCallExecutor = e->{
executor.execute(e);
return true;
};
return execute(req, batchCallExecutor, aBoolean -> {},handler,MAX_THREAD);
}
/**
* 无需处理异常
* @param req
* @param executor
* @param combiner
* @param <T>
* @param <V>
* @return
*/
public static <T,V> WhhRestResult execute(Collection<T> req, BatchCallExecutor<T,V> executor,ResultCombiner<V> combiner){
return execute(req,executor,combiner,null);
}
/**
* 无需指定最大线程数
* @param req
* @param executor
* @param combiner
* @param handler
* @param <T>
* @param <V>
* @return
*/
public static <T,V> WhhRestResult execute(Collection<T> req, BatchCallExecutor<T,V> executor,ResultCombiner<V> combiner,BatchErrorHandler<T> handler) {
return execute(req,executor,combiner,handler,MAX_THREAD);
}
/**
* 批量执行的核心方法,如果部分成功,统一返回WhhRestResult[200]
* 允许用户自定义处理异常和结果集,无并发问题
* @param req 请求的参数集合,集合中每个元素给定一个线程执行
* @param executor 处理元素的执行器
* @param combiner 元素返回结果的合并执行器
* @param handler 处理异常的执行器,将请求参数作为回调参数
* @param maxThread 用户指定的最大执行线程数
* @param <T> 请求参数类型
* @param <V> 返回参数类型
* @return
*/
public static <T,V> WhhRestResult execute(
Collection<T> req,
BatchCallExecutor<T,V> executor,
ResultCombiner<V> combiner,
BatchErrorHandler<T> handler,
Integer maxThread
) {
//如果要执行的列表为空,直接返回
if(CollectionUtils.isEmpty(req)){
return WhhRestResult.ok().message(null);
}
//如果执行的数量为1,则不需要开启多线程
if(req.size()==1){
T t = req.iterator().next();
try{
V v = executor.execute(t);
combiner.combine(v);
}catch (Exception ex){
//如果定义了异常处理,则执行自定义异常处理
if(handler!=null){
log.error("批量执行单条时出错:原因: ",ex);
handler.handError(t,ex);
return WhhRestResult.server_error().message(ex.getMessage());
}else{
log.error("批量执行时出错:",ex);
throw new RuntimeException(ex.getMessage());
}
}
return WhhRestResult.ok();
}
//定义最大的线程数
//不论用户定义多大,自己要定一个上线
Integer maxNum = req.size()+1>maxThread?maxThread:req.size()+1;
if(maxNum>MAX_THREAD){
maxNum = MAX_THREAD;
}
//新开一个定长的线程池
ExecutorService executorService = Executors.newFixedThreadPool(maxNum);
List<Callable> tasks = new ArrayList<Callable>();
List<T> reqList = new ArrayList<>();
//保证线程同步发挥数据
for (T t : req) {
tasks.add((Callable<V>) () -> executor.execute(t));
reqList.add(t);
}
List<Future<V>> resultList = new ArrayList();
for (Callable task : tasks) {
Future future = executorService.submit(task);
resultList.add(future);
}
//执行过后关闭线程池
executorService.shutdown();
//拼接错误信息返回给用户
StringBuilder sb = new StringBuilder("错误信息");
//记录是否成功
Boolean isSuccess = true;
//记录是否部分成功
Boolean partSuccess = false;
Set<String> errorMsg = new HashSet<>();
for (int i = 0; i < resultList.size(); i++) {
Future<V> fs = resultList.get(i);
try {
V result = fs.get();
combiner.combine(result);
partSuccess= true;
} catch (InterruptedException e) {
isSuccess = false;
if(handler!=null){
handler.handError(reqList.get(i),e);
}else{
throw new RuntimeException(e.getMessage());
}
} catch (ExecutionException e) {
isSuccess = false;
if(handler!=null){
if(!errorMsg.contains(e.getMessage())){
sb.append(":"+e.getMessage());
errorMsg.add(e.getMessage());
}
handler.handError(reqList.get(i),e);
}else{
//业务异常记录日志
log.error("批量执行时出错:",e);
throw new RuntimeException(e.getMessage());
}
}
}
WhhRestResult result;
//如果没有部分成功,则返回异常
if(!partSuccess){
result = WhhRestResult.server_error().message(sb.toString());
}else{
//如果部分成功,返回状态码都是200
result = WhhRestResult.ok();
if(!isSuccess){
result.setMessage(sb.toString());
}else{
result.setMessage(null);
}
}
return result;
}
}
参与的一些函数式接口
public interface ResultCombiner<V> {
/**
* 处理返回值
* @param v 返回值
*/
void combine(V v);
}
public interface BatchCallExecutor<T, V> {
V execute(T req);
}
public interface BatchExecutor<T> {
/**
*
* @param req 请求参数
*/
void execute(T req);
}
public interface BatchErrorHandler<T> {
/**
*
* @param req 请求参数
* @param ex 异常
*/
void handError(T req,Exception ex);
}
List分组类
public class ListSplitUtils {
public static <T> List<List<T>> splitList(List<T> list, int len) {
if (list == null || list.size() == 0 || len < 1) {
return null;
}
List<List<T>> result = new ArrayList<>();
int size = list.size();
int count = (size + len - 1) / len;
for (int i = 0; i < count; i++) {
List<T> subList = list.subList(i * len, ((i + 1) * len > size ? size : len * (i + 1)));
result.add(subList);
}
return result;
}
}