// Number of worker threads; also the divisor saveEvent uses to size the chunks of a batch.
private static final int numberOfConcurrentThreads = 10;
// Threshold, counted in listener runs (the listener runs once per minute), for flushing a
// buffer that has not grown past storageListMaxCapacity.
private static final Integer notReachedIntervals = 5;
// Size threshold of the buffer: the listener flushes once the buffer holds more events than this.
private static final Integer storageListMaxCapacity = 10000;
// Counts listener runs that did not hit the size threshold; reset when a time-based flush happens.
private static AtomicInteger executeCount = new AtomicInteger();
// Shared buffer of pending events, wrapped in a synchronized list.
private static List<EventObject> eventStorageList = Collections.synchronizedList(new ArrayList<>());
/**
 * Appends an event to the shared pending-event buffer ({@code eventStorageList}).
 *
 * @param eventObject the event to buffer
 */
public static void add(EventObject eventObject) {
eventStorageList.add(eventObject);
}
/**
 * Starts the background task that flushes the buffered events.
 *
 * <p>Example: the task inspects the buffer once a minute. When the buffer holds more than
 * {@code storageListMaxCapacity} (10000) events they are saved right away; otherwise a
 * non-empty buffer is saved once {@code notReachedIntervals} (5) minutes have passed since
 * the last flush.
 */
@PostConstruct
public void initEventStorageListener() {
    ScheduledExecutorService scheduled = Executors.newSingleThreadScheduledExecutor();
    scheduled.scheduleWithFixedDelay(() -> {
        try {
            if (eventStorageList.size() > storageListMaxCapacity) {
                flushBufferedEvents();
                return;
            }
            // The task runs once a minute, so the counter equals the minutes waited so far.
            // ">=" makes the time-based flush happen on the 5th minute rather than the 6th.
            int minutesWaited = executeCount.incrementAndGet();
            if (minutesWaited >= notReachedIntervals && !eventStorageList.isEmpty()) {
                flushBufferedEvents();
            }
        } catch (Exception e) {
            // An exception escaping a periodic task suppresses all of its later executions,
            // so it is logged here and the schedule keeps running.
            log.error("Event storage listener run failed on {}", Thread.currentThread().getName(), e);
        }
    }, 1, 1, TimeUnit.MINUTES);
}

/** Atomically drains the buffer, restarts the interval counter and saves the drained batch. */
private void flushBufferedEvents() {
    List<EventObject> drained;
    // Copy and clear while holding the synchronized list's own lock: events added concurrently
    // stay in the buffer for the next flush, and the quadratic removeAll scan is avoided.
    synchronized (eventStorageList) {
        drained = new ArrayList<>(eventStorageList);
        eventStorageList.clear();
    }
    // Reset the shared counter in place instead of replacing the AtomicInteger instance.
    executeCount.set(0);
    saveEvent(drained);
}
// Fixed-size pool of numberOfConcurrentThreads threads; saveEvent submits one task per chunk to it.
private static Executor executor = Executors.newFixedThreadPool(numberOfConcurrentThreads);
/**
 * Splits a drained batch into chunks of {@code ceil(size / numberOfConcurrentThreads)} events
 * and submits one task per chunk to the worker pool.
 *
 * <p>A chunk whose task cannot be submitted, or whose processing throws, is logged and put
 * back into {@code eventStorageList} so that a later flush picks it up again.
 *
 * @param temporary the events taken out of the buffer that should be saved
 */
private void saveEvent(List<EventObject> temporary) {
    if (temporary.isEmpty()) {
        // Nothing to save; avoid submitting a task for an empty chunk.
        return;
    }
    int chunkSize = (int) Math.ceil((double) temporary.size() / numberOfConcurrentThreads);
    List<List<EventObject>> lists = ListUtils.cut(temporary, chunkSize);
    for (List<EventObject> list : lists) {
        try {
            executor.execute(() -> {
                // Failures must be caught inside the task: it runs on a pool thread, so a
                // try/catch around execute() never sees exceptions thrown by this code.
                try {
                    // business code
                    // do something
                } catch (Exception e) {
                    requeueFailedChunk(list, e);
                }
            });
        } catch (Exception e) {
            // Submission itself failed (for example the pool rejected the task).
            requeueFailedChunk(list, e);
        }
    }
}

/** Logs a failed chunk (with stack trace) and returns its events to the buffer for a retry. */
private void requeueFailedChunk(List<EventObject> chunk, Exception cause) {
    log.error("{}, {}", Thread.currentThread().getName(), cause.getMessage(), cause);
    eventStorageList.addAll(chunk);
}
}
// 集合切割工具类 (utility class for splitting a list into chunks)
public class ListUtils {
/**
-
list分割 list<list<T>>
- @param L 按多少量切割
*/
public static <T> List<List<T>> cut(List<T> list, final int L) {
if (L <= 0) {
return Collections.singletonList(list);
}
List<List<T>> parts = new ArrayList<>();
final int N = list.size();
for (int i = 0; i < N; i += L) {
parts.add(new ArrayList<>(
list.subLi
st(i, Math.min(N, i + L)))
);