0
点赞
收藏
分享

微信扫一扫

java 自定义队列 队列达到指定的容量执行方法 或者指定时间内未到达指定容量也执行

DYBOY 2022-08-12 阅读 85

// 线程数

private static final int numberOfConcurrentThreads = 10;

// 当指定时间队列中的数据未达到eventStorageListMaxCapacity的值时, 按该时间实行 min为单位

private static final Integer notReachedIntervals = 5;

// 队列最大容量

private static final Integer storageListMaxCapacity = 10000;

// listener执行次数 计数器

private static AtomicInteger executeCount = new AtomicInteger();

// 事件集合

private static List<EventObject> eventStorageList = Collections.synchronizedList(new ArrayList<>());

public static void add(EventObject eventObject) {

eventStorageList.add(eventObject);

}

/**

  • 举例: 监听存储时间的list 若大于10000执行保存操作 或者到了5分钟还没大于10000也执行保存操作

*/

@PostConstruct

public void initEventStorageListener() {

ScheduledExecutorService scheduled = Executors.newSingleThreadScheduledExecutor();

scheduled.scheduleWithFixedDelay(() -> {

if (eventStorageList.size() > storageListMaxCapacity) {

List<EventObject> temporary = new ArrayList<>(eventStorageList);

eventStorageList.removeAll(temporary);

saveEvent(temporary);

} else {

// 一分钟执行一次 计数器加一

int cr = executeCount.incrementAndGet();

if (cr > notReachedIntervals) {

if (!eventStorageList.isEmpty()) {

List<EventObject> temporary = new ArrayList<>(eventStorageList);

eventStorageList.removeAll(temporary);

saveEvent(temporary);

// 初始计数器

executeCount = new AtomicInteger();

}

}

}

}, 1, 1, TimeUnit.MINUTES);

}

private static Executor executor = Executors.newFixedThreadPool(numberOfConcurrentThreads);

private void saveEvent(List<EventObject> temporary) {

List<List<EventObject>> lists = ListUtils.cut(temporary, (int) Math.ceil((double) temporary.size() / numberOfConcurrentThreads));

for (List<EventObject> list : lists) {

try {

executor.execute(() -> {

// 业务代码

// do something

});

} catch (Exception e) {

log.error("{}, {}", Thread.currentThread().getName(), e.getMessage());

eventStorageList.addAll(list);

}

}

}

}

集合切割工具类

public class ListUtils {

/**

  • list分割 list<list<T>>

  • @param L 按多少量切割

*/

public static <T> List<List<T>> cut(List<T> list, final int L) {

if (L <= 0) {

return Collections.singletonList(list);

}

List<List<T>> parts = new ArrayList<>();

final int N = list.size();

for (int i = 0; i < N; i += L) {

parts.add(new ArrayList<>(

list.subLi

java 自定义队列 队列达到指定的容量执行方法 或者指定时间内未到达指定容量也执行

st(i, Math.min(N, i + L)))

);

举报

相关推荐

0 条评论