业务背景:
我们对外提供了一个rest接口给第三方业务进行调用,但是由于第三方框架限制,导致会发送大量相似无效请求,例如:接口入参json包含两个字段,createBy和receiverList,完整的入参json示例如下:
{
"createBy": "aa",
"receiverList": [
"bb",
"cc"
]
}
实际第三方业务会进行多次调用接口,每次传递的数据可能如下:
{
"createBy": "aa",
"receiverList": [
"bb"
]
}
或者
{
"createBy": "aa",
"receiverList": [
"cc"
]
}
或者
{
"createBy": "bb",
"receiverList": [
"cc"
]
}
或者
{
"createBy": "aa",
"receiverList": [
"bb",
"cc"
]
}
所有需要对第三方业务传递过来的数据进行缓冲合并处理,减轻真正的后台服务的压力。
代码实现
package com.demo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Description: 请求合并管理类
*/
@Slf4j
@Component
public class RequestMerger {
// 线程池核心线程数
private final int corePoolSize = 200;
// 任务执行超时时间,单位:毫秒
private final int timeout = 5 * 60 * 1000;
// 队列,队列长度为Integer.MAX_VALUE
private final LinkedBlockingQueue<String> requestQueue = new LinkedBlockingQueue<>();
// 定时器,所有任务共用线程池
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(corePoolSize,
new CustomizableThreadFactory("schedule-executor-"));
// 是否关闭标志
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
/**
* 构造函数,用于初始化请求合并器。
*
* @param batchSize 每次合并的最大请求数量。
* @param delayMillis 合并请求的周期间隔,单位为毫秒。
*/
public RequestMerger(int batchSize, long delayMillis) {
// 启动定时器,定期合并请求,延迟delayMillis后开始,之后每隔delayMillis执行一次
scheduler.scheduleAtFixedRate(() -> {
if (!isShutdown.get()) {
List<String> batch = new ArrayList<>(batchSize);
int drainedCount = requestQueue.drainTo(batch, batchSize);
log.info("==>scheduler,drainedCount:{},nowQueueCount:{}", drainedCount, requestQueue.size());
if (!batch.isEmpty()) {
// 异步执行任务,防止业务执行时间过长导致业务整体延迟过大
scheduler.submit(() -> {
sendRequestBatch(batch);
});
}
}
}, delayMillis, delayMillis, TimeUnit.MILLISECONDS);
}
/**
* 发送请求批次的方法。
*
* @param batch 请求批次。
*/
private void sendRequestBatch(List<String> batch) {
Future<?> future = scheduler.submit(() -> {
try {
// 在这里实现你的请求发送逻辑
// 可以使用HTTP客户端库(如Apache HttpClient或OkHttp)来发送请求
// ...
System.out.println("Sending batch of " + batch.size() + " requests");
} catch (Exception e) {
// 异常处理逻辑
System.err.println("Error sending requests: " + e.getMessage());
}
});
// 尝试获取任务结果,如果超过超时时间则抛出TimeoutException异常,进行取消任务
try {
// 超时时间,单位:毫秒
future.get(timeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException | ExecutionException e) {
// 超时或执行异常时取消任务
future.cancel(true);
} catch (Exception e) {
log.error("==>任务执行异常", e);
// 任务执行异常
future.cancel(true);
}
}
/**
* 在对象销毁前执行的关闭操作。
* 该方法从请求队列中拉取所有未处理的请求,并将它们批量发送。
* 无参数和返回值。
*/
@PreDestroy
public void shutdown() {
isShutdown.set(true);
List<String> batch = new ArrayList<>();
// 获取请求队列中的剩余所有请求
int drainedCount = requestQueue.drainTo(batch);
log.info("==>shutdown,drainedCount:{},nowQueueCount:{}", drainedCount, requestQueue.size());
// 批量发送收集到的剩余请求
sendRequestBatch(batch);
// 关闭定时执行器
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
log.error("Scheduler did not terminate gracefully within 60 seconds, force shutting down.");
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
log.warn("Interrupted during scheduler termination, force shutting down.");
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
/**
* 向请求队列中添加一个请求。如果服务未关闭,则直接添加到请求队列中;
* 如果服务已关闭,则将该请求作为一批请求发送。
*
* @param request 要添加的请求字符串。
*/
public void addRequest(String request) throws InterruptedException {
// 检查服务是否已关闭
if (!isShutdown.get()) {
// 未关闭,直接添加到请求队列
requestQueue.put(request);
} else {
// 已关闭,将当前请求作为一批发送
List<String> batch = new ArrayList<>();
batch.add(request);
sendRequestBatch(batch);
}
}
}
参考资料
https://gitee.com/huangjuncong/mumux-framework/tree/master/merge-request/src/main/java/com/mumux/concurrent
注意:此代码容易导致数据丢失。例如:调用add方法将10个元素放入队列,但是真正获取到9个元素。
造成原因:FlushThread#add()中使用offer方法将数据放入队列,如果此时队列已满,返回值为false,实际数据未进入队列,需要额外对数据进行处理。
修改建议:调大队列长度,并且将offer方法改为put方法,保证数据最终进入队列。