0
点赞
收藏
分享

微信扫一扫

数据聚合

在MysQL也有聚合功能:聚合函数:avg、max、min等等,聚合函数需要结合group by分组去使用,ES具备MySQL类似的聚合功能,并且还做了更丰富的聚合功能。

聚合可以先学习下面常见的,详细可以查看官网:https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations.html

一、聚合的种类

聚合(aggregations)可以实现对文档数据的统计、分析、运算。聚合常见的有三类:

1)桶(Bucket)聚合:用来对文档做分组,桶聚合类似Group by

  • Term Aggregation:按照文档字段值分组
  • Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组

2)度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等

  • Avg:求平均值
  • Max:求最大值
  • Min:求最小值
  • Stats:同时求max、min、avg、sum等

3)管道(pipeline)聚合:聚合的结果为基础做聚合

参与聚合的字段类型必须是:

  • keyword
  • 数值
  • 日期
  • 布尔

二、DSL实现Bucket聚合

2.1.DSL实现Bucket聚合

案例:要统计所有数据中的酒店品牌有几种,此时可以根据酒店品牌的名称做聚合。类型为term类型,DSL语法示例:

GET /hotel/_search
{
  "size": 0, //设置为0,结果不包含文档,只包含聚合结果
  "aggs": { //定义聚合
    "bradAgg": { //给聚合起的名字
      "terms": { //聚合的类型,按照品牌值聚合,所以选择terms
        "field": "brand", //参与聚合的字段
        "size": 20 //希望获取的聚合结果数量
      }
    }
  }
}

注意:

  • aggs:aggressions的缩写,aggs里面可以定义好多的聚合的,所以是一个数组,里面可以定义多个 
  • size=0  是搜索中不包含文档,所以hits是空的

执行后结果如下:

数据聚合_数据

注意:

  • doc_count:是文档数量,桶里面有几条文档啊,是倒叙排序,统计出来越多的排名越靠前,这是默认的排序规则,排序可以更改
  • 关注的是aggregations
  • buckets是桶:聚合是桶的聚合,所以结果有很多桶,这个桶是品牌的桶,品牌一样的放在一个桶里面,所以是一个数组,会有很多桶,里面的size是显示20条,

2.2.Bucket聚合-聚合结果排序

默认情况下,Bucket聚合会统计Bucket内的文档数量,记为_count,并且按照_count降序排序。我们可以修改结果排序方式:

GET /hotel/_search
{
  "size": 0,
  "aggs": {
    "bradAgg": {
      "terms": {
        "field": "brand",
        "order": {
          "_count": "asc" //按照_count升序排列
        }, 
        "size": 5
      }
    }
  }
}

执行后结果如下:

数据聚合_数据_02

上面的聚合是对整个索引库的搜索,如果有上数据量特别庞大,例如亿级的数据量,这个聚合对内存的消耗还是非常大的 ,我们能不能限定聚合搜索的范围呢?

2.3.Bucket聚合-限定聚合范围

默认情况下,Bucket聚合是对索引库的所有文档做聚合,我们可以限定要聚合的文档范围,只要添加query条件即可

# 限定聚合范围
GET /hotel/_search
{
  "query": {
    "range": {
      "price": {
        "lte": 200 //只对200元以下的文档聚合
      }
    }
  },
  "size": 0,
  "aggs": {
    "bradAgg": {
      "terms": {
        "field": "brand",
        "order": {
          "_count": "asc" //按照_count升序排列
        }, 
        "size": 5
      }
    }
  }
}

执行后如下:

数据聚合_List_03

三、DSL实现Metrics聚合

3.1.DSL实现Metrics聚合

例如,我们要求获取每个品牌的用户评分的min、max、avg等值。我们可以利用stats聚合:

GET /hotel/_search
{
  "size": 0,
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 10
      },
      "aggs": {//是brands聚合的子聚合,也就是分组后对每组分别计算
        "score_stats": { //聚会的名称,自己定义即可
          "stats": {//聚合的类型,这里stats可以计算min、max、avg等
            "field": "score" //聚合字段,这里是score
          }
        }
      }
    }
  }
}

执行后结构如下:

数据聚合_数据_04

3.2. 按照平均评分做降序排列

上面执行可以获取获取每个品牌的用户评分的min、max、avg等值,那么获取结果后,下面我们按照评分降序排序:

GET /hotel/_search
{
  "size": 0,
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 10,
        "order": {
          "score_stats.avg": "desc"//先聚合然后根据品牌的平均评分进行降序排序排序
        }
      },
      "aggs": {
        "score_stats": {
          "stats": {
            "field": "score"//metrics聚合
          }
        }
      }
    }
  }
}

执行后结果如下:

数据聚合_数据_05

四、Rest APl实现聚合

4.1.语法说明

以品牌聚合为例,演示下Java的Rest Client使用,先看请求组装:

数据聚合_数据_06

创建测试代码如下:

@Test
    void testAggregation() throws IOException {
        //1.准备request
        SearchRequest request = new SearchRequest("hotel");

        //2.准备DSL
        //2.1.设置size
        request.source().size(0); ////设置为0,结果不包含文档,只包含聚合结果
        //2.2.聚合
        request.source().aggregation(AggregationBuilders
                .terms("brandAgg") ///聚合的类型,按照品牌值聚合,所以选择terms,名字自定义即可
                .field("brand") //参与聚合的字段
                .size(10)); //希望获取的聚合结果数量

        //3.发出请求
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);

        //4.解析结果
        System.out.println(response);
    }

执行后结果如下:

数据聚合_数据_07

4.2.聚合结果解析

语法如下:

数据聚合_搜索_08

代码如下:

@Test
    void testAggregation() throws IOException {
        //1.准备request
        SearchRequest request = new SearchRequest("hotel");

        //2.准备DSL
        //2.1.设置size
        request.source().size(0); ////设置为0,结果不包含文档,只包含聚合结果
        //2.2.聚合
        request.source().aggregation(AggregationBuilders
                .terms("brandAgg") ///聚合的类型,按照品牌值聚合,所以选择terms,名字自定义即可
                .field("brand") //参与聚合的字段
                .size(10)); //希望获取的聚合结果数量

        //3.发出请求
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);

        //4.解析结果
        Aggregations aggregations = response.getAggregations();
        //4.1.根据聚合的名称获取结果
        Terms bradAgg = aggregations.get("brandAgg");
        System.out.println(bradAgg);
        //4.2.获取buckets 结果是一个集合,里面存储了聚合后的信息
        List<? extends Terms.Bucket> buckets = bradAgg.getBuckets();

        //4.3.遍历
        for (Terms.Bucket bucket : buckets) {
            //4.4.获取key
            String key = bucket.getKeyAsString();
            System.out.println(key);
        }
    }

4.2.数据聚合-多条件聚合

在Service层中定义方法,实现对品牌、城市、星级的聚合

需求:搜索页面的品牌、城市等信息不应该是在页面写死,而是根据数据库中的信息展示。

数据聚合_数据_09

上面的城市、星级、品牌要从数据库中查询得出,结果应该是{"city":[上海,北京、深圳],"starName":[四星、五星],"brand":[如家、七天、万豪]}

4.2.1.在IHotelService中定义接口

接口如下:

public interface IHotelService extends IService<Hotel> {
    PageResult search(RequestParam param) throws IOException;

    Map<String, List<String>> filters();
}

4.2.2.在HotelService中实现接口

在之前的代码的基础上添加如下代码实现上面的接口:

@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {
    @Autowired
    private RestHighLevelClient client;

    @Override
    public Map<String, List<String>> filters() {
        try {
            //1.准备request
            SearchRequest request = new SearchRequest("hotel");

            //2.准备DSL
            //2.1.设置size
            request.source().size(0);
            //2.2.聚合
            buildAggregation(request);

            //3.发送请求
            SearchResponse response = client.search(request, RequestOptions.DEFAULT);

            //4.解析结果
            Aggregations aggregations = response.getAggregations();

            Map<String, List<String>> result = new HashMap<>();
            //有三个解析,所以需要解析3次
            //1.解析 品牌
            List<String> brandList = getAggByName(aggregations, "brandAgg");
            result.put("brand", brandList);
            //2.解析 城市
            List<String> cityList = getAggByName(aggregations, "cityAgg");
            result.put("city", cityList);
            //3.解析 星级
            List<String> starList = getAggByName(aggregations, "starAgg");
            result.put("starName", starList);

            return result;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 准备DSL
     * @param request
     */
    private void buildAggregation(SearchRequest request){
        request.source().aggregation(AggregationBuilders
                .terms("brandAgg")
                .field("brand")
                .size(100));

        request.source().aggregation(AggregationBuilders
                .terms("cityAgg")
                .field("city")
                .size(100));

        request.source().aggregation(AggregationBuilders
                .terms("starAgg")
                .field("starName")
                .size(100));
    }

    /**
     * 解析响应,将结果保存到List集合中
     * @param aggregations 获取Aggregations
     * @param aggName 聚合的名称
     * @return
     */
    private List<String> getAggByName(Aggregations aggregations, String aggName){

        //4.1.根据聚合的名称获取结果
        Terms bradAgg = aggregations.get(aggName);
        System.out.println(bradAgg);
        //4.2.获取buckets 结果是一个集合,里面存储了聚合后的信息
        List<? extends Terms.Bucket> buckets = bradAgg.getBuckets();

        //4.3.遍历
        List<String> brandList = new ArrayList<>();
        for (Terms.Bucket bucket : buckets) {
            //4.4.获取key
            String key = bucket.getKeyAsString();
            brandList.add(key);
        }

        return brandList;
    }
}

4.2.3.对IHotelService中定义接口进行单元测试

编写测试代码:

@SpringBootTest
public class HotelBrandTests {
    @Autowired
    private IHotelService iHotelService;

    @Test
    void testHotelBrand() {
        Map<String, List<String>> filters = iHotelService.filters();
        System.out.println(filters);
    }
}

执行后结果如下:

数据聚合_List_10

4.3.数据聚合-带过滤条件的聚合

4.3.1.对接前端接口

前端页面会向服务端发起请求,查询品牌、城市、星级等字段的聚合结果:

数据聚合_搜索_11

注意

  • filter请求:是查询聚合过滤项的请求 
  • 可以看到请求参数与之前search时的Request Param完全一致,这是在限定聚合时的文档范围。

发现这个请求和之前list请求携带的参数是一样的,为什么查过滤项的时候也要带条件呢?过滤项查询要通过聚合来实现,聚合一带上条件就来限定聚合的范围,为何要限定范围呢?直接对整个索引库做聚合不行呢?

在搜索是没有加条件,搜索的是索引库的所有数据,对所有数据做聚合得到城市和品牌没有问题,但是当输入内容虹桥,得到的数据一定是跟上海虹桥有关的结果,上海虹桥有关的城市对应的一定是上海,但是对索引库的所有数据做聚合,得到的城市一定包含所有的城市,所以用户一定就很奇怪拉,命名搜索的是上海的还能出现北京的呢?如果再点击北京在结合搜索条件虹桥,能搜到任何东西吗?肯定是不能的北京没有虹桥,所以说不应该对索引库的所有字段做聚合,用户条件是虹桥,就应该对虹桥相关的酒店做聚合,限定聚合的范围,需要加查询条件,查询时用什么条件聚合时也用什么条件,这样就是在酒店的基础上做聚合,这样查询结果就更精确了因此,在查询过滤项时和查询时要用相同的条件

例如:用户搜索“上海”,价格在100~300,二钻、速八、那聚合必须是,在这个搜索条件基础上完成。因此我们需要:

  1. 编写controller接口,接收该请求
  2. 修改IHotelService中接口filters,添加RequestParam参数
  3. 修改实现类中HotelService中的filters()方法的业务,聚合时添加query条件

4.3.2.创建controller

在之前的 HotelController 中添加的方法如下:

@RestController
@RequestMapping("/hotel")
public class HotelController {

    @Autowired
    private IHotelService iHotelService;

    /**
     * 搜索
     * @param param 通过对象接收参数
     * @return PageResult JSON对象
     * @throws IOException
     */
    @PostMapping("/list")
    public PageResult searchList(@RequestBody RequestParam param) throws IOException {
        return iHotelService.search(param);
    }

    /**
     * 实现聚合
     * @param param
     * @return
     */
    @PostMapping("/filters")
    public Map<String, List<String>> getFilters(@RequestBody RequestParam param){
        return iHotelService.filters(param);
    }
}

4.3.2.修改在IHotelService中定义接口

添加参数,需要根据前端传递过来的参数进行聚合

public interface IHotelService extends IService<Hotel> {
    PageResult search(RequestParam param) throws IOException;

    Map<String, List<String>> filters(RequestParam param);
}

4.3.3.修改HotelService中的实现方法

在之前的代码的基础上添加如下代码:

@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {
    @Autowired
    private RestHighLevelClient client;

    @Override
    public Map<String, List<String>> filters(RequestParam param) {
        try {
            //1.准备request
            SearchRequest request = new SearchRequest("hotel");

            //2.准备DSL
            //2.1.query
            buildBaiscQuery(param, request);//新添加的构建query条件
            //2.2.设置size
            request.source().size(0);
            //2.3.聚合
            buildAggregation(request);

            //3.发送请求
            SearchResponse response = client.search(request, RequestOptions.DEFAULT);

            //4.解析结果
            Aggregations aggregations = response.getAggregations();

            Map<String, List<String>> result = new HashMap<>();
            //有三个解析,所以需要解析3次
            //1.解析 品牌
            List<String> brandList = getAggByName(aggregations, "brandAgg");
            result.put("品牌", brandList);
            //2.解析 城市
            List<String> cityList = getAggByName(aggregations, "cityAgg");
            result.put("城市", cityList);
            //3.解析 星级
            List<String> starList = getAggByName(aggregations, "starAgg");
            result.put("星级", starList);

            return result;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 准备DSL
     * @param request
     */
    private void buildAggregation(SearchRequest request){
        request.source().aggregation(AggregationBuilders
                .terms("brandAgg")
                .field("brand")
                .size(100));

        request.source().aggregation(AggregationBuilders
                .terms("cityAgg")
                .field("city")
                .size(100));

        request.source().aggregation(AggregationBuilders
                .terms("starAgg")
                .field("starName")
                .size(100));
    }

    /**
     * 解析响应,将结果保存到List集合中
     * @param aggregations 获取Aggregations
     * @param aggName 聚合的名称
     * @return
     */
    private List<String> getAggByName(Aggregations aggregations, String aggName){

        //4.1.根据聚合的名称获取结果
        Terms bradAgg = aggregations.get(aggName);
        System.out.println(bradAgg);
        //4.2.获取buckets 结果是一个集合,里面存储了聚合后的信息
        List<? extends Terms.Bucket> buckets = bradAgg.getBuckets();

        //4.3.遍历
        List<String> brandList = new ArrayList<>();
        for (Terms.Bucket bucket : buckets) {
            //4.4.获取key
            String key = bucket.getKeyAsString();
            brandList.add(key);
        }

        return brandList;
    }

    /**
     * 抽离构建DSL条件的代码封装成方法
     * @param param 前端参数对象
     * @param request
     */
    private void buildBaiscQuery(RequestParam param,SearchRequest request){
        //2.1.BooleanQuery
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();

        //2.2.关键字搜索
        String key = param.getKey();
        //健壮性判断,如果前端传递的参数为null或者为空字符串,就进行全文检索,else则是按照条件搜索
        if (key == null || "".equals(key)) {
            boolQuery.must(QueryBuilders.matchAllQuery());
        }else {
            boolQuery.must(QueryBuilders.matchQuery("all", param.getKey()));
        }

        //2.3.条件:城市
        String city = param.getCity();
        if(city != null && !"".equals(city)){
            boolQuery.filter(QueryBuilders.termQuery("city",city));
        }

        //2.4.条件:品牌
        String brand = param.getBrand();
        if(brand != null && !"".equals(brand)){
            boolQuery.filter(QueryBuilders.termQuery("brand",brand));
        }

        //2.5.条件:星级
        String starName = param.getStarName();
        if(starName != null && !"".equals(starName)){
            boolQuery.filter(QueryBuilders.termQuery("starName",starName));
        }

        //2.6.条件:价格
        Integer maxPrice = param.getMaxPrice();
        Integer minPrice = param.getMinPrice();
        if(maxPrice != null && minPrice != null){
            boolQuery.filter(QueryBuilders.rangeQuery("price").gte(minPrice).lte(maxPrice));
        }

        //TODO 控制算分,先传入boolQuery
        FunctionScoreQueryBuilder functionScoreQuery = QueryBuilders.functionScoreQuery(boolQuery,
                new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{
                        // 一个function score元素
                        new FunctionScoreQueryBuilder.FilterFunctionBuilder(
                                //过滤条件
                                QueryBuilders.termQuery("isAD", true),
                                //算分函数
                                ScoreFunctionBuilders.weightFactorFunction(10)
                        )
                });

        //添加boolQuery
        request.source().query(functionScoreQuery);
    }
}

4.3.4.测试

重启项目,然后进行筛选查询,每次添加一个条件,剩余的条件可选性也会发送变化,如下:

数据聚合_List_12



举报

相关推荐

0 条评论