0
点赞
收藏
分享

微信扫一扫

使用线程池查询ES千万级数据索引遇到的问题

雅典娜的棒槌 2022-03-30 阅读 48

使用场景:

公司接到一个需求,需要查询ES索引A中所有数据,并根据查询到的数据中的某个字段再去查询另外一个索引B,整合并获取最终需要的数据,再生成excel、上传oss等等。其中索引A和索引B中都存储了千万条数据,之前的同事是用单线程写的,查询索引A使用的是limit、from深层分页,最终数据生成大概需要…不知道需要多久,可能一个月也生成不出来,后来这个需求就落在了我这里。

在做这个需求之前我从未使用过ES,对线程池也是一知半解。我想到了使用线程池会提高处理速度,经过了一番研究,终于将处理速度从4分钟处理一千条提升到了一分钟处理6000条,代码如下:

(最耗时的步骤其实就是查询索引A的千万条数据,这里我就把这一步的代码贴出来吧)


int i = 0;
//查询出索引A的数量
int count = esService.queryNum("索引A的名称");
while (true) {
//            如果线程的数量没有超 并且查询出的数据量不够 继续执行(这一步也思考了很久,因为不知道怎么控制是否让新的任务进入线程池,如果不加条件,那么任务就会一股脑的往线程池里送,没一会儿就报错了。MAXIMUMPOOLSIZE是最大线程池数量
    if (threadPool.getActiveCount() < MAXIMUMPOOLSIZE && totalCount < count) {
    //线程池里的任务如果想获取到外部的数据,需要用final定义
        final int n = i;
        i ++;
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
            	int limit = 1000;
                long queryStart = System.currentTimeMillis();
                List<String> dataSetEsQueryList = esService.queryData(yuliaoIndex, n * limit, limit);
                long queryEnd = System.currentTimeMillis();
                logger.info("查询一千条语料成功,耗时:" + (queryEnd - queryStart) / 1000 + "s");
            }
        });
    }
}

queryData方法:

public List<String> queryData(String dataSetEsIndex, int from, int to) {
        SearchRequest searchRequest = new SearchRequest();
        searchRequest.indices(dataSetEsIndex);
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        //根据ID进行排序
        sourceBuilder.sort("_id");
        sourceBuilder.from(from);
        sourceBuilder.size(to);
        //之前查询的是索引的全部字段,但是我只需要一个字段,所以这里做了控制
        sourceBuilder.fetchSource(new String[]{"query"}, null);
        searchRequest.source(sourceBuilder);
        List<String> dataSetQueryEsList = new ArrayList<>();
        try {
            SearchResponse rp = client.search(searchRequest, RequestOptions.DEFAULT);
            if (rp != null) {
                SearchHits hits = rp.getHits();
                if (hits != null) {
                    for (SearchHit hit : hits.getHits()) {
                        String source = hit.getSourceAsString();
                        DataSetEsTwo index = GsonUtil.GSON_FORMAT_DATE.fromJson(source,
                                new TypeToken<DataSetEsTwo>() {
                                }.getType());
                        index.setId(hit.getId());
                        dataSetQueryEsList.add(index.getQuery());
                    }
                }
            }
        } catch (IOException e) {
            logger.error("query ES is error " + e.getMessage(),e);
        }
        return dataSetQueryEsList;
    }

处理结果:

https://data-water.oss-cn-beijing.aliyuncs.com/img/content/0bfd420f7f43deac73e192817ba186fb/48f48246-0aa8-4319-bdcb-55c7c5885f3a.png

大概就是这样,像上面所说的,成功的将处理速度从4分钟处理一千条提升到了一分钟处理6000条,本以为大功告成,但是!!!问题来了,这种情况是索引里只有7000条数据,因为我要查的索引有千万条数据,我就试了下当索引中有千万条数据的时候,看下处理时长是不是按比例增长的,我以为是按比例增长的,晚上调了接口让他跑着我就安心睡觉去了,早上去看之前还开开心心的想,看看这下处理了多少条数据,然后!我就发现了一件令人头秃的事情。

https://data-water.oss-cn-beijing.aliyuncs.com/img/content/0bfd420f7f43deac73e192817ba186fb/2132ee5d-d45c-4807-8e76-234df94f9678.png

https://data-water.oss-cn-beijing.aliyuncs.com/img/content/0bfd420f7f43deac73e192817ba186fb/a65f7d4f-f559-4630-bd74-2c7bcf94e093.png

如图所示,越往后查询耗时越长,之前只有7000条数据的时候,查询一千条需要4s左右,可是当索引数据量很多的时候,这个耗时…无法接受。一个晚上才处理了一万多条数据,我百思不得其解。最开始没有定位到是ES查询的问题,以为是处理的时候比较耗时,后来终于发现是查询ES浪费了很多时间,但是我心想,查询不就是这么查么,分页查啊。就去网上搜了一下,发现了问题所在。

(便于理解,以下内容是从这里copy过来的:https://blog.csdn.net/weixin_30872671/article/details/97804001)

假设我们的ES有三个节点,当分页查询请求过来时,如果落到node1节点,那么node1节点将会向node2和node3发送同样的查询请求,每个节点将topN的文档返回(这里只返回文档的id以及打分排序的字段,减少数据传输),node1会对三个节点的所有文档(3*N个)进行排序,取topN后再根据文档的id到对应的节点上查询整个文档数据,最后返回客户端。

而对于分页查询,比如from=10000,szie=10000,其实每个节点需要查询from+size=20000条数据,排序之后截取后10000条数据。当我们进行深度分页,比如查询第十页数据时,每个节点需要查询10*size=10W条数据,这个太恐怖了。而且默认情况下,当from+size大于10000时,查询会抛出一个异常,ES2.0后有一个max_result_window属性的设置,默认值是10000,也就是from+size的最大限度。当然你可以修改这个值作为临时的应对策略,不过治标不治本,产品也只会变本加厉!

意思就是在使用fromES索引中数据量越大(超过10000的情况下),查询速度越慢,查询速度几乎是成倍成倍成倍增长,看我上面的图可以感受到了。那怎么办呢,还好ES为我们提供了另外一种查询方式,也就是神奇的scroll查询!

scroll查询也叫游标查询或滚动查询,具体的介绍可以看一下官方文档:https://www.elastic.co/guide/en/elasticsearch/reference/6.5/search-request-search-after.html

接着我又进行了一顿改造,改造后的代码如下:

            String queryEnd = "false";
            long startTime = System.currentTimeMillis();
            //        1. 创建查询对象
            SearchRequest searchRequest = new SearchRequest("索引名称");//指定索引
            searchRequest.scroll(TimeValue.timeValueMinutes(1L));//指定存在内存的时长为1分钟
            //    2. 封装查询条件
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.sort("id", SortOrder.DESC); //按照哪个字段进行排序
            searchSourceBuilder.size(2);    //一次查询多少条
            searchSourceBuilder.fetchSource(new String[]{"query"}, null);   //只查询哪些字段或不查询哪些字段
            searchSourceBuilder.query(QueryBuilders.matchAllQuery());
            searchRequest.source(searchSourceBuilder);
            //        3.执行查询
            // client执行
            HttpHost httpHost = new HttpHost("ip", "端口号(int类型)", "http");
            RestClientBuilder restClientBuilder = RestClient.builder(httpHost);
            //也可以多个结点
            //RestClientBuilder restClientBuilder = RestClient.builder(
            //    new HttpHost("ip", "端口号(int类型)", "http"),
            //        new HttpHost("ip", "端口号(int类型)", "http"),
            //        new HttpHost("ip", "端口号(int类型)", "http"));
            RestHighLevelClient restHighLevelClient = new RestHighLevelClient(restClientBuilder);

            SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            String scrollId = searchResponse.getScrollId();

//        4.获取数据
            SearchHit[] hits = searchResponse.getHits().getHits();
            totalCount = totalCount + hits.length;
            for(SearchHit searchHit : hits){
                String source = searchHit.getSourceAsString();
                DataSetEsTwo index2 = GsonUtil.GSON_FORMAT_DATE.fromJson(source,
                        new TypeToken<DataSetEsTwo>() {
                        }.getType());
                //index2就是我要的数据
                index2.setId(searchHit.getId());
            }
            //获取全部的下一页
            while (true) {
//                当查不出数据后就不再往下执行 这里做判断是因为走到这里的时候可能有的线程还没执行完
//                  所以需要确保所有的线程都执行结束了,这样数据才是对的
                if ("true".equals(queryEnd)) {
                    if (threadPool.getActiveCount() == 0) {
                        break;
                    }
                }
                SearchHit[] hits1 = null;
                try {
                    //创建SearchScrollRequest对象
                    SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
                    searchScrollRequest.scroll(TimeValue.timeValueMinutes(3L));
                    SearchResponse scroll = restHighLevelClient.scroll(searchScrollRequest, RequestOptions.DEFAULT);
                    hits1 = scroll.getHits().getHits();
                } catch (Exception e) {
                    logger.error("第一次查询数据失败:" + e.getMessage());
                }

//                线程池处理获取的结果
                //如果当前线程池的数量是满的 那就等待 直到空出一个线程
                //这个是一样的道理 不可以让任务一股脑的进入线程池
                while (threadPool.getActiveCount() >= MAXIMUMPOOLSIZE) {
                    try {
                        Thread.sleep(100);
                    } catch (Exception e) {
                        logger.error("休眠失败...");
                    }
                }

                if (hits1 != null && hits1.length > 0) {
                    //走到下面的肯定是有线程空位的
                    final SearchHit[] hits1Fin = hits1;
                    threadPool.execute(new Runnable() {
                        @SneakyThrows
                        @Override
                        public void run() {
                            //                            线程池处理查询出的结果
                            for (SearchHit searchHit : hits1Fin) {
                                try {
                                    String source = searchHit.getSourceAsString();
                                    DataSetEsTwo index2 = GsonUtil.GSON_FORMAT_DATE.fromJson(source,
                                            new TypeToken<DataSetEsTwo>() {
                                            }.getType());
                                    //index2就是我要的数据
                                    index2.setId(searchHit.getId());
                                } catch (Exception e) {
                                    logger.error("线程执行错误:" +e.getMessage());
                                }
                            }
                        }
                    });
                } else {
                    logger.info("------------语料查询结束--------------");
                    queryEnd = "true";
                }
            }
            //删除ScrollId
            try {
                ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
                clearScrollRequest.addScrollId(scrollId);
                ClearScrollResponse clearScrollResponse = restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
            } catch (Exception e) {
                logger.error("ScrollId删除失败:" + e.getMessage());
            }
            long endTime = System.currentTimeMillis();
            logger.info("数据查询运行时间:" + (endTime - startTime) / 1000 / 60 + "min");

优化后的代码分钟大概可以处理3000条数据,无论索引里有多少条数据,处理时间都是等比例增长的,完美结束!

举报

相关推荐

0 条评论