Java Hutool TriggerContainer介绍
一.什么是TriggerContainer
TriggerContainer是啥,自从Hutool从5.7.24版本后,增加了一个新的类。它就是TriggerContainer。
为啥把它叫做触发容器, 它的存在到底能解决哪些问题?相比其他容器它有哪些优势呢?
请不要着急,这还要从上周我遇到的那个需求说起,那可真不是一个美好的回忆。
二.业务介绍
为了讨论一些纯粹的技术,请容许我隐瞒一些业务细节,我将用最简单的话将清楚上周到底发生了什么。
那天,风和日丽,我接受到需求,需要写一个程序将一个文件中的手机序列号清单(IMEI),
通过调用A平台的接口进行校验。
将满足校验的清单结果调用B平台的的上报接口进行上报。
三.编码实现
这很容易,不到半个小时,我就编写完成了大致的代码
public void test2() throws FileNotFoundException {
//读取文件
final List<String> allList = FileUtil.readLines(new FileInputStream("imeiList.txt"));
//使用HuTool工具进行分批
final List<List<String>> batchList = ListUtil.split(allList, 10000);
for (List<String> list : batchList) {
//调用A平台进行校验,得到满足校验的结果
final List<String> res = callAPlatform(list);
//再进行分批
final List<List<String>> reportData = ListUtil.split(res, 5000);
for (List<String> reportDatum : reportData) {
//将过滤后的结果上报B平台
reportBPlatform(reportDatum);
}
}
}
四.王婆卖瓜
因为A平台的校验接口最多上传1万个IMEI,而B平台的上传接口一次性最多上传5000个IMEI
我使用Hutool工具类进行分批,将分批的结果调用A平台和B平台
我对这段代码非常满意,等待着上线使用。而这,却是…
五.噩梦的开始(一)
首先是需求方和我说,这次要处理的文件,文件的数量大概在4亿条左右,你的程序需要避免数据量过大引发的一些问题。
这可把我吓了一跳。如果数据量过亿的话,可不能使用这句话
//读取文件
final List<String> allList = FileUtil.readLines(new FileInputStream("imeiList.txt"));
把4亿个长度为32的字符串,一口气导入内存,势必会引发OOM异常。(imei是md5加密的,所以是32位长度)
那就需要分批加载数据到内存中,代码类似于
public void test3() throws IOException {
//读取文件
BufferedReader bufferedReader = new BufferedReader(new FileReader("imeiList.txt"));
String str = null;
while ((str = bufferedReader.readLine()) != null) {
//doSomething
}
}
代码不够简洁,Hutool提供了BufferedReader的封装工具LineIter,可以按行遍历文件,而不把数据全量加载到内存中
@Test
public void test4() throws IOException {
//读取文件
LineIter lineIter = new LineIter(new FileReader("imeiList.txt") );
for (String s : lineIter) {
//doSomething
}
}
但是这样,就不能使用原来的方法进行分组了。
//读取文件
LineIter lineIter = new LineIter(new FileReader("imeiList.txt") );
//使用HuTool工具进行分批(这里不能这么用,编译报错)
final List<List<String>> batchList = ListUtil.split(lineIter, 10000);
不过,好在Hutool灰常给力,在5.7.10版本,提供了一个迭代器的分批工具,输入一个迭代器,返回一个升维的迭代器。你说巧不巧,LineIter正好是一个迭代器。这样代码就变成了如下:
@Test
public void test5() throws IOException {
//读取文件
LineIter lineIter = new LineIter(new FileReader("imeiList.txt") );
//最关键的一句,调用分组迭起器
final PartitionIter<String> lists = new PartitionIter<>(lineIter, 10000);
for (List<String> list : lists) {
//调用A平台进行校验,得到满足校验的结果
final List<String> res = callAPlatform(list);
//再进行分批
final List<List<String>> reportData = ListUtil.split(res, 5000);
for (List<String> reportDatum : reportData) {
//将过滤后的结果上报B平台
reportBPlatform(reportDatum);
}
}
}
这样,无论文件是4亿,还是40亿,套了这层迭代器,每次进行内存的数据,最多不超过10000条数据。堆中使用过的数据,最终会被垃圾回收器回收。
真是太妙了!我出色的完成了任务,奶丝!
六.噩梦的延续(二)
但是很明显,需要检验的数据量上升,会带来新的问题。因为需要检测的数据量 4亿条,如果使用单线程,需要调用4万次。每次校验接口返回在200-400毫秒的时间,那么调用A接口所需要的时间高达3.33小时
这个时间是不可接受的,如果必须要控制时间在30分钟以内,需要改造成多线程的方式进行调用
我首先做了一次改造
@Test
public void test6() throws IOException {
//创建线程执行池
final ExecutorService executorService = Executors.newFixedThreadPool(16);
//读取文件
LineIter lineIter = new LineIter(new FileReader("imeiList.txt") );
final PartitionIter<String> lists = new PartitionIter<>(lineIter, 10000);
for (List<String> list : lists) {
//将任务提交到线程池中
executorService.execute(()->{
//调用A平台进行校验,得到满足校验的结果
final List<String> res = callAPlatform(list);
//再进行分批
final List<List<String>> reportData = ListUtil.split(res, 5000);
for (List<String> reportDatum : reportData) {
//将过滤后的结果上报B平台
reportBPlatform(reportDatum);
}
});
}
}
七.漏洞排查
这个改造有两个不起眼的漏洞
1.主线程可能提前结束
在取出最后一次数据时,可能主线程会先提前结束导致最后一个任务没能成功上报。因此需要避免主线程提前结束。
需要在主线程的末尾加上关闭线程池和判断是否结束的代码。这样可以确保所有任务完成后,主程序才会结束。
executorService.shutdown();
while (!executorService.awaitTermination(10, TimeUnit.MILLISECONDS)) {
System.out.println("线程还没有关闭");
}
System.out.println("线程已经关闭");
2.内存有被占满的风险
因为原本单线程情况下,只会有当前这一批10000条数据在内存中是根可达的。
但是多线程下,读取文件与处理数据是不会相互阻塞的。
那么LineIter就会不断的把数据加载到内存中,提交给executorService。
有一定的概率会发生OutOfMemoryError:GC overhead limit exceeded。当线程池的核心线程数越高,发生概率越大。
这是因为没有控制文件读取行迭代器的读取速度,大量的内存占用导致CPU有绝大部份的时间在gc。
我最终决定采用ArrayBlockingQueue来进行进行速度控制
@Test
public void test7() throws IOException, InterruptedException {
//阻塞队列,控制文件读取迭代器不要迭代太快
ArrayBlockingQueue<Integer> data = new ArrayBlockingQueue<>(48);
//创建线程执行池
final ExecutorService executorService = Executors.newFixedThreadPool(16);
//读取文件
LineIter lineIter = new LineIter(new FileReader("imeiList.txt"));
final PartitionIter<String> lists = new PartitionIter<>(lineIter, 10000);
for (List<String> list : lists) {
try {
//data里每有一个元素,代表内存中有10000万条数据没有被处理。
//如果data达到48个,再往里面添加1一个就会阻塞,主线程无法继续读取文件迭代器,达到控制读取速度的目的
data.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//将任务提交到线程池中
executorService.execute(() -> {
//调用A平台进行校验,得到满足校验的结果
try {
final List<String> res = callAPlatform(list);
//再进行分批
final List<List<String>> reportData = ListUtil.split(res, 5000);
for (List<String> reportDatum : reportData) {
//将过滤后的结果上报B平台
reportBPlatform(reportDatum);
}
//上报完毕后释放1条记录,之前的阻塞可以解除,相当于通知主线程可以继续读取迭代器
data.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
//避免主线程提前结束
executorService.shutdown();
while (!executorService.awaitTermination(10, TimeUnit.MILLISECONDS)) {
System.out.println("线程还没有关闭");
}
System.out.println("线程已经关闭");
}
七.噩梦的延续(三)
通过使用ArrayBlockingQueue,控制了主线程读取文件的速度,避免造成内存OOM。我出色的完成了任务。
接下来,我就遇到了新的问题。
我得到消息,B平台的上报接口,是一个付费接口。
而且,它不是按IMEI个数来收费。是按照接口调用次数来收费。每调用1000次收2块钱。
我的乖乖,这也就意味着,这个接口,我不能随随便便就调用一下。要精打细算,每凑满500条数据,再调一下。
像我现在的写法,1000条数据,假如有700条有效,那么会造成500条调一下。200条调一下。愣是有300条打了水漂。
这个问题该怎么解决呢?
八.消除噩梦
一种思路,需要一个全局的容器
- 每次调完A接口得到的有效结果,满500先调1次上报接口,不满500的部分先存起来。
- 等其他线程也来存的时候,凑满了500条,再取出来调一次上报接口。
- 必须要支持并发存取,提高效率
- 两个线程之间不能等待太久,不然这些暂存的数据还没有释放掉,GC不清理,内存很快就会占满的
- 最后一批不满500的也要取出来调一次上报接口
如果自己手写不难,就是有点麻烦,而且需要注意细节的地方有点多。包括如下几点:
-
1.频繁存取(增删)容器里的数据,而不需要进行随机访问,需要使用哪种数据结构?
-
2.存与取在并发过程中必须要互斥,避免多线程操作这个容器产生并发问题
-
3.取出来以后需要调用上报接口,有100-200毫秒的接口调用时长,是否可以避免与存的线程互斥?
-
4.需要确保最后一次能正常消费
九.在现实中回忆
在古代,有这样一种容器,水不断的往里倒,达到一定程度,容器就会倾倒,自动把水倒出来,再自动恢复原状。循环往复。
十.抽象
如果把它进行一种抽象,那它就是一种具备阈值的容器,一般情况下,往里面放数据,它不会发生任何变化。但是当里面的数据量达到阈值,就会触发事件,消费掉里面的数据,再恢复为空的原状。
而这就是今天的主角,触发容器TriggerContainer。
-
创建这个容器,需要先定义触发的事件,并且指定阈值。
-
这样就可以直接往这个容器里放元素,而不用关心什么时候需要触发这个容器。
-
天然线程安全,不用担心并发的问题
-
因为它具有水满则覆的特性,所以,容器里的量不会超过阈值。再大的量,也可以顶得住。
-
该容器提供一个关闭功能,关闭后,可以把最后一批数据进行消费
1.创建容器
//创建容器
public TriggerContainer createTriggerContainer(){
return new TriggerContainer<>(this::reportBPlatform, 500);
}
//模拟B平台的上报接口
private void reportBPlatform(List<String> list) {
ThreadUtil.sleep(RandomUtil.randomInt(100, 200));
}
2.关闭容器
tc.close();
最终代码改造为如下所示:
public void test8() throws IOException, InterruptedException {
//创建触发容器,定义触发事件与触发阈值
final TriggerContainer<String> tc = new TriggerContainer<>(this::reportBPlatform, 500);
//控制文件读取迭代器不要迭代太快
ArrayBlockingQueue<Integer> data = new ArrayBlockingQueue<>(48);
//创建线程执行池
final ExecutorService executorService = Executors.newFixedThreadPool(16);
//读取文件
LineIter lineIter = new LineIter(new FileReader("imeiList.txt"));
final PartitionIter<String> lists = new PartitionIter<>(lineIter, 10000);
for (List<String> list : lists) {
try {
data.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//将任务提交到线程池中
executorService.execute(() -> {
//调用A平台进行校验,得到满足校验的结果
try {
final List<String> res = callAPlatform(list);
//这里不用再分批了,直接丢进去,多次满会多次触发的
tc.addAll(res);
data.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
while (!executorService.awaitTermination(10, TimeUnit.MILLISECONDS)) {
System.out.println("线程还没有关闭");
}
System.out.println("线程已经关闭");
try {
//记得关闭容器,消费最后一次数据
tc.close();
} catch (Exception e) {
e.printStackTrace();
}
}