分析步骤:
Step1. ThreadPoolExecutor 的基本用法,编写通用工具类
Step2. 基于面向接口开发,进行通用抽象
Step3. 分析spring事务,将基于注解的声明式事务,改为编程式事务
Step4. 使用 变量表示来决定是否使用统一事务
Step1: ThreadPoolExecutor 简单用法
基本处理代码
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("flow-pool-%d")
.build();
int corePoolSize = 10;
int maximumPoolSize = 10;
long keepAliveTime = 3;
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
ExecutorService executorService = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
namedThreadFactory) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
// 线程处理前置方法
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
// 线程处理后置方法
}
};
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < group; i++) {
int startIndex = i * groupSize;
int endIndex = (i + 1) * groupSize;
if (endIndex > toDoList.size()) {
endIndex = toDoList.size();
}
List<?> items = toDoList.subList(startIndex, endIndex);
futures.add(executorService.submit(new SingleTransactionPoolTask(execution, items, flag)));
}
try {
for (Future<?> future : futures) {
future.get();
}
} catch (Exception e) {
e.printStackTrace();
// 业务操作
} finally {
executorService.shutdown();
}
- 构造方法
名称 | 类型 | 含义 |
---|---|---|
corePoolSize | int | 核心线程池大小 |
maximumPoolSize | int | 最大线程池大小 |
keepAliveTime | long | 线程最大空闲时间 |
unit | TimeUnit | 时间单位 |
workQueue | BlockingQueue<Runnable> | 线程等待队列 |
threadFactory | ThreadFactory | 线程创建工厂 |
- ThreadPoolExecutor 重写方法
方法名 | 作用 |
---|---|
protected void beforeExecute(Thread t, Runnable r) { } | 线程处理前置调用 |
protected void afterExecute(Runnable r, Throwable t) { } | 线程处理后置调用 |
protected void terminated() { } | 线程处理结束之后调用 |
- 线程池调用任务
此处为线程池实际处理方法,
ExecutionService.submit(Runnable task);
/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's {@code get} method will
* return {@code null} upon <em>successful</em> completion.
*
* @param task the task to submit
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
Future<?> submit(Runnable task);
SingleTransactionPoolTask 实现 runnable 接口
public class SingleTransactionPoolTask implements Runnable {
private final ThreadExecution threadExecution;
private final List<?> list;
private final BatchTransactionFlag flag;
public SingleTransactionPoolTask(ThreadExecution threadExecution, List<?> list, BatchTransactionFlag flag) {
this.threadExecution = threadExecution;
this.list = list;
this.flag = flag;
}
@Override
public void run() {
try {
threadExecution.threadExecute(list);
} finally {
flag.getCompleteThreads().incrementAndGet();
}
}
}
- 返回线程调用的 处理方法
主要进行子线程中是否有异常,如果具有异常则应该进行的对应业务处理
try {
for (Future<?> future : futures) {
future.get();
}
} catch (Exception e) {
e.printStackTrace();
// 业务操作
} finally {
executorService.shutdown();
}
Step2: 基于面向接口开发,将业务操作进行多态
ThreadExecution 抽象子任务接口,具体不同业务编写指定的实现类,形成多态。通用工具类统一调用接口
public interface ThreadExecution {
/**
* 处理线程任务
* @param list
*/
void threadExecute(List<?> list);
}
SingleTransactionPoolTask 通用任务实现类,基于 依赖倒置原则 调用 ThreadExecution
public class SingleTransactionPoolTask implements Runnable {
private final ThreadExecution threadExecution;
private final List<?> list;
private final BatchTransactionFlag flag;
public SingleTransactionPoolTask(ThreadExecution threadExecution, List<?> list, BatchTransactionFlag flag) {
this.threadExecution = threadExecution;
this.list = list;
this.flag = flag;
}
@Override
public void run() {
try {
threadExecution.threadExecute(list);
} finally {
flag.getCompleteThreads().incrementAndGet();
}
}
}
实现 ThreadExecution 接口,进行业务多态
BatchStartProcessThreadExecutionImpl
@Slf4j
public class BatchStartProcessThreadExecutionImpl implements ThreadExecution {
private RuntimeService runtimeService;
private List<BatchStartProcessInstanceRsp.ProcessInstanceItem> records;
public BatchStartProcessThreadExecutionImpl(List<BatchStartProcessInstanceRsp.ProcessInstanceItem> records) {
this.records = records;
this.runtimeService = SpringContextUtils.getBean(RuntimeService.class);
}
@Override
public void threadExecute(List list) {
// 省略业务代码
}
}
BatchTaskCompleteTaskWithBatchTransactionThreadExecutionImpl
@Slf4j
public class BatchTaskCompleteTaskWithBatchTransactionThreadExecutionImpl implements ThreadExecution {
private List<BatchCompleteTaskRsp.CompleteTaskItem> result;
private FlowTaskService flowTaskService;
public BatchTaskCompleteTaskWithBatchTransactionThreadExecutionImpl(List<BatchCompleteTaskRsp.CompleteTaskItem> result) {
this.result = result;
this.flowTaskService = SpringContextUtils.getBean(FlowTaskService.class);
}
@Override
public void threadExecute(List list) {
// 省略业务代码
}
}
Step3. 分析spring事务,将基于注解的声明式事务,改为编程式事务
PlatformTransactionManager接口的方法
方法名 | 功能 |
---|---|
TransactionStatus getTransaction(@Nullable TransactionDefinition definition) | 获取当前的事务 |
void commit(TransactionStatus status) | 提交事务 |
void rollback(TransactionStatus status) | 回滚事务 |
所以获取事务的代码则为
// 获取事务
TransactionStatus transactionStatus = transactionManager.getTransaction(TransactionDefinition.withDefaults());
// 提交事务
transactionManager.commit(transactionStatus);
// 回滚事务
transactionManager.rollback(transactionStatus);
DefaultCommonThreadExecutionServiceBean
@Service
public class DefaultCommonThreadExecutionServiceBean implements CommonThreadExecutionService {
@Resource
private DataSourceTransactionManager transactionManager;
@Override
// @Transactional(rollbackFor = Exception.class)
public int executeBatch(ThreadExecution threadExecution, List<?> sequence, List<TransactionStatus> transactionStatusList) {
TransactionStatus transactionStatus = transactionManager.getTransaction(TransactionDefinition.withDefaults());
transactionStatusList.add(transactionStatus);
threadExecution.threadExecute(sequence);
return 0;
}
}
FlowThreadPoolExecutor 代码段
DataSourceTransactionManager transactionManager = SpringContextUtils.getBean(DataSourceTransactionManager.class);
try {
for (Future future : futures) {
future.get();
}
transactionStatusList.forEach(obj -> {
transactionManager.commit(obj);
});
} catch (Exception e) {
e.printStackTrace();
transactionStatusList.forEach(obj -> {
transactionManager.rollback(obj);
});
} finally {
executorService.shutdown();
}
org.springframework.transaction.support.TransactionSynchronizationManager#unbindResource
/**
* Unbind a resource for the given key from the current thread.
* @param key the key to unbind (usually the resource factory)
* @return the previously bound value (usually the active resource object)
* @throws IllegalStateException if there is no value bound to the thread
* @see ResourceTransactionManager#getResourceFactory()
*/
public static Object unbindResource(Object key) throws IllegalStateException {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Object value = doUnbindResource(actualKey);
if (value == null) {
throw new IllegalStateException(
"No value for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
}
return value;
}
org.springframework.transaction.support.TransactionSynchronizationManager
提交事务:org.springframework.jdbc.datasource.DataSourceTransactionManager#doCommit
@Override
protected void doCommit(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Committing JDBC transaction on Connection [" + con + "]");
}
try {
con.commit();
}
catch (SQLException ex) {
throw new TransactionSystemException("Could not commit JDBC transaction", ex);
}
}
抛出异常:org.springframework.transaction.support.AbstractPlatformTransactionManager#cleanupAfterCompletion
/**
* Clean up after completion, clearing synchronization if necessary,
* and invoking doCleanupAfterCompletion.
* @param status object representing the transaction
* @see #doCleanupAfterCompletion
*/
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
status.setCompleted();
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.clear();
}
if (status.isNewTransaction()) {
doCleanupAfterCompletion(status.getTransaction());
}
if (status.getSuspendedResources() != null) {
if (status.isDebug()) {
logger.debug("Resuming suspended transaction after completion of inner transaction");
}
Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
}
}
BatchTransactionFlag
@Getter
public class BatchTransactionFlag {
private final AtomicInteger completeThreads = new AtomicInteger();
private final AtomicInteger successThreads = new AtomicInteger();
private final int groupSize;
private boolean batchTransaction;
private Map<Long, TransactionStatus> longTransactionStatusMap;
private final List<?> toDoList;
public BatchTransactionFlag(int groupSize, boolean batchTransaction, List<?> toDoList) {
this.groupSize = groupSize;
this.batchTransaction = batchTransaction;
this.toDoList = toDoList;
if (batchTransaction) {
longTransactionStatusMap = new ConcurrentHashMap<>();
}
}
}
CommonThreadExecutionService实现
@Slf4j
@Service
public class DefaultCommonThreadExecutionServiceBean implements CommonThreadExecutionService {
@Resource
private DataSourceTransactionManager transactionManager;
@Override
public int executeBatch(ThreadExecution threadExecution, List sequence, Map<Long, TransactionStatus> longTransactionStatusMap, BatchTransactionFlag flag) {
synchronized (flag) {
TransactionStatus transactionStatus = transactionManager.getTransaction(TransactionDefinition.withDefaults());
longTransactionStatusMap.put(Thread.currentThread().getId(), transactionStatus);
try {
threadExecution.threadExecute(sequence);
flag.getSuccessThreads().incrementAndGet();
} finally {
flag.getCompleteThreads().incrementAndGet();
log.info("完成任务:" + Thread.currentThread().getName());
}
}
return 0;
}
}
Step4. 使用 变量表示来决定是否使用统一事务
线程池执行的代码
for (int i = 0; i < group; i++) {
int startIndex = i * groupSize;
int endIndex = (i + 1) * groupSize;
if (endIndex > toDoList.size()) {
endIndex = toDoList.size();
}
List<?> items = toDoList.subList(startIndex, endIndex);
if (batchTransaction) {
futures.add(executorService.submit(new BatchTransactionPoolTask(execution, items, flag.getLongTransactionStatusMap(), flag)));
} else {
futures.add(executorService.submit(new SingleTransactionPoolTask(execution, items, flag)));
}
}
线程池的构建
private static ThreadPoolExecutor createThreadPoolExecutorInstance(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
BatchTransactionFlag flag
) {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("flow-pool-%d")
.build();
String currentUserId = SecurityUtils.getCurrentUserId();
DataSourceTransactionManager transactionManager = SpringContextUtils.getBean(DataSourceTransactionManager.class);
return new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
namedThreadFactory) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
Authentication.setAuthenticatedUserId(currentUserId);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
if (flag.isBatchTransaction()) {
try {
while (flag.getCompleteThreads().get() != flag.getGroupSize()) {
log.info(Thread.currentThread().getName() + " 等待主线程:getGroupSize:" + flag.getGroupSize() + "\tgetCompleteThreads:" + flag.getCompleteThreads().get());
log.info("开启事务个数:" + flag.getLongTransactionStatusMap().size());
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
TransactionStatus status = flag.getLongTransactionStatusMap().get(Thread.currentThread().getId());
if (flag.getSuccessThreads().get() == flag.getCompleteThreads().get()) {
log.info(Thread.currentThread().getName() + ":全部执行成功,提交事务");
transactionManager.commit(status);
} else {
log.info(Thread.currentThread().getName() + ":具有线程执行失败,回滚事务");
transactionManager.rollback(status);
}
}
}
};
}