0
点赞
收藏
分享

微信扫一扫

Java Hutool TriggerContainer介绍

秦瑟读书 2022-03-10 阅读 49

Java Hutool TriggerContainer介绍

一.什么是TriggerContainer

TriggerContainer是啥,自从Hutool从5.7.24版本后,增加了一个新的类。它就是TriggerContainer。

为啥把它叫做触发容器, 它的存在到底能解决哪些问题?相比其他容器它有哪些优势呢?

请不要着急,这还要从上周我遇到的那个需求说起,那可真不是一个美好的回忆。

二.业务介绍

为了讨论一些纯粹的技术,请容许我隐瞒一些业务细节,我将用最简单的话将清楚上周到底发生了什么。

那天,风和日丽,我接受到需求,需要写一个程序将一个文件中的手机序列号清单(IMEI),

通过调用A平台的接口进行校验。

将满足校验的清单结果调用B平台的的上报接口进行上报。

三.编码实现

这很容易,不到半个小时,我就编写完成了大致的代码

    public void test2() throws FileNotFoundException {
        //读取文件
        final List<String> allList = FileUtil.readLines(new FileInputStream("imeiList.txt"));
        //使用HuTool工具进行分批
        final List<List<String>> batchList = ListUtil.split(allList, 10000);
        for (List<String> list : batchList) {
            //调用A平台进行校验,得到满足校验的结果
            final List<String> res = callAPlatform(list);
            //再进行分批
            final List<List<String>> reportData = ListUtil.split(res, 5000);
            for (List<String> reportDatum : reportData) {
                //将过滤后的结果上报B平台
                reportBPlatform(reportDatum);
            }
        }
    }

四.王婆卖瓜

因为A平台的校验接口最多上传1万个IMEI,而B平台的上传接口一次性最多上传5000个IMEI

我使用Hutool工具类进行分批,将分批的结果调用A平台和B平台

我对这段代码非常满意,等待着上线使用。而这,却是…

五.噩梦的开始(一)

首先是需求方和我说,这次要处理的文件,文件的数量大概在4亿条左右,你的程序需要避免数据量过大引发的一些问题。

这可把我吓了一跳。如果数据量过亿的话,可不能使用这句话

//读取文件
final List<String> allList = FileUtil.readLines(new FileInputStream("imeiList.txt"));

把4亿个长度为32的字符串,一口气导入内存,势必会引发OOM异常。(imei是md5加密的,所以是32位长度)

那就需要分批加载数据到内存中,代码类似于

public void test3() throws IOException {
  //读取文件
  BufferedReader bufferedReader = new BufferedReader(new FileReader("imeiList.txt"));
  String str = null;
  while ((str = bufferedReader.readLine()) != null) {
    //doSomething           
  }
}

代码不够简洁,Hutool提供了BufferedReader的封装工具LineIter,可以按行遍历文件,而不把数据全量加载到内存中

@Test
public void test4() throws IOException {
  //读取文件
  LineIter lineIter = new LineIter(new FileReader("imeiList.txt") );
  for (String s : lineIter) {
    //doSomething

  }
}

但是这样,就不能使用原来的方法进行分组了。

//读取文件
LineIter lineIter = new LineIter(new FileReader("imeiList.txt") );
//使用HuTool工具进行分批(这里不能这么用,编译报错)
final List<List<String>> batchList = ListUtil.split(lineIter, 10000);

不过,好在Hutool灰常给力,在5.7.10版本,提供了一个迭代器的分批工具,输入一个迭代器,返回一个升维的迭代器。你说巧不巧,LineIter正好是一个迭代器。这样代码就变成了如下:

    @Test
    public void test5() throws IOException {
        //读取文件
        LineIter lineIter = new LineIter(new FileReader("imeiList.txt") );
       //最关键的一句,调用分组迭起器
        final PartitionIter<String> lists = new PartitionIter<>(lineIter, 10000);
        for (List<String> list : lists) {
            //调用A平台进行校验,得到满足校验的结果
            final List<String> res = callAPlatform(list);
            //再进行分批
            final List<List<String>> reportData = ListUtil.split(res, 5000);
            for (List<String> reportDatum : reportData) {
                //将过滤后的结果上报B平台
                reportBPlatform(reportDatum);
            }
        }
    }

这样,无论文件是4亿,还是40亿,套了这层迭代器,每次进行内存的数据,最多不超过10000条数据。堆中使用过的数据,最终会被垃圾回收器回收。

真是太妙了!我出色的完成了任务,奶丝!

六.噩梦的延续(二)

但是很明显,需要检验的数据量上升,会带来新的问题。因为需要检测的数据量 4亿条,如果使用单线程,需要调用4万次。每次校验接口返回在200-400毫秒的时间,那么调用A接口所需要的时间高达3.33小时
这个时间是不可接受的,如果必须要控制时间在30分钟以内,需要改造成多线程的方式进行调用

我首先做了一次改造

    @Test
    public void test6() throws IOException {
        //创建线程执行池
        final ExecutorService executorService = Executors.newFixedThreadPool(16);
        //读取文件
        LineIter lineIter = new LineIter(new FileReader("imeiList.txt") );
        final PartitionIter<String> lists = new PartitionIter<>(lineIter, 10000);
        for (List<String> list : lists) {
            //将任务提交到线程池中
            executorService.execute(()->{
                //调用A平台进行校验,得到满足校验的结果
                final List<String> res = callAPlatform(list);
                //再进行分批
                final List<List<String>> reportData = ListUtil.split(res, 5000);
                for (List<String> reportDatum : reportData) {
                    //将过滤后的结果上报B平台
                    reportBPlatform(reportDatum);
                }
            });
        }
    }

七.漏洞排查

这个改造有两个不起眼的漏洞

1.主线程可能提前结束

在取出最后一次数据时,可能主线程会先提前结束导致最后一个任务没能成功上报。因此需要避免主线程提前结束。

需要在主线程的末尾加上关闭线程池和判断是否结束的代码。这样可以确保所有任务完成后,主程序才会结束。

		executorService.shutdown();
		while (!executorService.awaitTermination(10, TimeUnit.MILLISECONDS)) {
			System.out.println("线程还没有关闭");
		}
		System.out.println("线程已经关闭");
2.内存有被占满的风险

因为原本单线程情况下,只会有当前这一批10000条数据在内存中是根可达的。

但是多线程下,读取文件与处理数据是不会相互阻塞的。

那么LineIter就会不断的把数据加载到内存中,提交给executorService。

有一定的概率会发生OutOfMemoryError:GC overhead limit exceeded。当线程池的核心线程数越高,发生概率越大。

这是因为没有控制文件读取行迭代器的读取速度,大量的内存占用导致CPU有绝大部份的时间在gc。

我最终决定采用ArrayBlockingQueue来进行进行速度控制

	@Test
	public void test7() throws IOException, InterruptedException {
		//阻塞队列,控制文件读取迭代器不要迭代太快
		ArrayBlockingQueue<Integer> data = new ArrayBlockingQueue<>(48);
		//创建线程执行池
		final ExecutorService executorService = Executors.newFixedThreadPool(16);
		//读取文件
		LineIter lineIter = new LineIter(new FileReader("imeiList.txt"));
		final PartitionIter<String> lists = new PartitionIter<>(lineIter, 10000);
		for (List<String> list : lists) {
			try {
        //data里每有一个元素,代表内存中有10000万条数据没有被处理。
        //如果data达到48个,再往里面添加1一个就会阻塞,主线程无法继续读取文件迭代器,达到控制读取速度的目的
				data.put(1);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			//将任务提交到线程池中
			executorService.execute(() -> {
				//调用A平台进行校验,得到满足校验的结果
				try {
					final List<String> res = callAPlatform(list);
					//再进行分批
					final List<List<String>> reportData = ListUtil.split(res, 5000);
					for (List<String> reportDatum : reportData) {
						//将过滤后的结果上报B平台
						reportBPlatform(reportDatum);
					}
          //上报完毕后释放1条记录,之前的阻塞可以解除,相当于通知主线程可以继续读取迭代器
					data.take();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			});
		}
    //避免主线程提前结束
		executorService.shutdown();
		while (!executorService.awaitTermination(10, TimeUnit.MILLISECONDS)) {
			System.out.println("线程还没有关闭");
		}
		System.out.println("线程已经关闭");
	}

七.噩梦的延续(三)

通过使用ArrayBlockingQueue,控制了主线程读取文件的速度,避免造成内存OOM。我出色的完成了任务。

接下来,我就遇到了新的问题。

我得到消息,B平台的上报接口,是一个付费接口。

而且,它不是按IMEI个数来收费。是按照接口调用次数来收费。每调用1000次收2块钱。

我的乖乖,这也就意味着,这个接口,我不能随随便便就调用一下。要精打细算,每凑满500条数据,再调一下。

像我现在的写法,1000条数据,假如有700条有效,那么会造成500条调一下。200条调一下。愣是有300条打了水漂。

这个问题该怎么解决呢?

八.消除噩梦

一种思路,需要一个全局的容器

  • 每次调完A接口得到的有效结果,满500先调1次上报接口,不满500的部分先存起来。
  • 等其他线程也来存的时候,凑满了500条,再取出来调一次上报接口。
  • 必须要支持并发存取,提高效率
  • 两个线程之间不能等待太久,不然这些暂存的数据还没有释放掉,GC不清理,内存很快就会占满的
  • 最后一批不满500的也要取出来调一次上报接口

如果自己手写不难,就是有点麻烦,而且需要注意细节的地方有点多。包括如下几点:

  • 1.频繁存取(增删)容器里的数据,而不需要进行随机访问,需要使用哪种数据结构?

  • 2.存与取在并发过程中必须要互斥,避免多线程操作这个容器产生并发问题

  • 3.取出来以后需要调用上报接口,有100-200毫秒的接口调用时长,是否可以避免与存的线程互斥?

  • 4.需要确保最后一次能正常消费

九.在现实中回忆

在古代,有这样一种容器,水不断的往里倒,达到一定程度,容器就会倾倒,自动把水倒出来,再自动恢复原状。循环往复。

十.抽象

如果把它进行一种抽象,那它就是一种具备阈值的容器,一般情况下,往里面放数据,它不会发生任何变化。但是当里面的数据量达到阈值,就会触发事件,消费掉里面的数据,再恢复为空的原状。

而这就是今天的主角,触发容器TriggerContainer。

  • 创建这个容器,需要先定义触发的事件,并且指定阈值。

  • 这样就可以直接往这个容器里放元素,而不用关心什么时候需要触发这个容器。

  • 天然线程安全,不用担心并发的问题

  • 因为它具有水满则覆的特性,所以,容器里的量不会超过阈值。再大的量,也可以顶得住。

  • 该容器提供一个关闭功能,关闭后,可以把最后一批数据进行消费

1.创建容器

//创建容器	
public TriggerContainer createTriggerContainer(){
		return new TriggerContainer<>(this::reportBPlatform, 500);
	}
  //模拟B平台的上报接口
	private void reportBPlatform(List<String> list) {
		ThreadUtil.sleep(RandomUtil.randomInt(100, 200));
	}

2.关闭容器

tc.close();

最终代码改造为如下所示:

	public void test8() throws IOException, InterruptedException {
		//创建触发容器,定义触发事件与触发阈值
		final TriggerContainer<String> tc = new TriggerContainer<>(this::reportBPlatform, 500);
		//控制文件读取迭代器不要迭代太快
		ArrayBlockingQueue<Integer> data = new ArrayBlockingQueue<>(48);
		//创建线程执行池
		final ExecutorService executorService = Executors.newFixedThreadPool(16);
		//读取文件
		LineIter lineIter = new LineIter(new FileReader("imeiList.txt"));
		final PartitionIter<String> lists = new PartitionIter<>(lineIter, 10000);
		for (List<String> list : lists) {
			try {
				data.put(1);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			//将任务提交到线程池中
			executorService.execute(() -> {
				//调用A平台进行校验,得到满足校验的结果
				try {
					final List<String> res = callAPlatform(list);
					//这里不用再分批了,直接丢进去,多次满会多次触发的
					tc.addAll(res);
					data.take();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			});
		}
		executorService.shutdown();
		while (!executorService.awaitTermination(10, TimeUnit.MILLISECONDS)) {
			System.out.println("线程还没有关闭");
		}
		System.out.println("线程已经关闭");
		try {
      //记得关闭容器,消费最后一次数据
			tc.close();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

举报

相关推荐

0 条评论