在MysQL也有聚合功能:聚合函数:avg、max、min等等,聚合函数需要结合group by分组去使用,ES具备MySQL类似的聚合功能,并且还做了更丰富的聚合功能。
聚合可以先学习下面常见的,详细可以查看官网:https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations.html
一、聚合的种类
聚合(aggregations)可以实现对文档数据的统计、分析、运算。聚合常见的有三类:
1)桶(Bucket)聚合:用来对文档做分组,桶聚合类似Group by
- Term Aggregation:按照文档字段值分组
- Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组
2)度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等
- Avg:求平均值
- Max:求最大值
- Min:求最小值
- Stats:同时求max、min、avg、sum等
3)管道(pipeline)聚合:聚合的结果为基础做聚合
参与聚合的字段类型必须是:
- keyword
- 数值
- 日期
- 布尔
二、DSL实现Bucket聚合
2.1.DSL实现Bucket聚合
案例:要统计所有数据中的酒店品牌有几种,此时可以根据酒店品牌的名称做聚合。类型为term类型,DSL语法示例:
GET /hotel/_search
{
"size": 0, //设置为0,结果不包含文档,只包含聚合结果
"aggs": { //定义聚合
"bradAgg": { //给聚合起的名字
"terms": { //聚合的类型,按照品牌值聚合,所以选择terms
"field": "brand", //参与聚合的字段
"size": 20 //希望获取的聚合结果数量
}
}
}
}
注意:
- aggs:aggressions的缩写,aggs里面可以定义好多的聚合的,所以是一个数组,里面可以定义多个
- size=0 是搜索中不包含文档,所以hits是空的
执行后结果如下:
注意:
- doc_count:是文档数量,桶里面有几条文档啊,是倒叙排序,统计出来越多的排名越靠前,这是默认的排序规则,排序可以更改
- 关注的是aggregations
- buckets是桶:聚合是桶的聚合,所以结果有很多桶,这个桶是品牌的桶,品牌一样的放在一个桶里面,所以是一个数组,会有很多桶,里面的size是显示20条,
2.2.Bucket聚合-聚合结果排序
默认情况下,Bucket聚合会统计Bucket内的文档数量,记为_count,并且按照_count降序排序。我们可以修改结果排序方式:
GET /hotel/_search
{
"size": 0,
"aggs": {
"bradAgg": {
"terms": {
"field": "brand",
"order": {
"_count": "asc" //按照_count升序排列
},
"size": 5
}
}
}
}
执行后结果如下:
上面的聚合是对整个索引库的搜索,如果有上数据量特别庞大,例如亿级的数据量,这个聚合对内存的消耗还是非常大的 ,我们能不能限定聚合搜索的范围呢?
2.3.Bucket聚合-限定聚合范围
默认情况下,Bucket聚合是对索引库的所有文档做聚合,我们可以限定要聚合的文档范围,只要添加query条件即可
# 限定聚合范围
GET /hotel/_search
{
"query": {
"range": {
"price": {
"lte": 200 //只对200元以下的文档聚合
}
}
},
"size": 0,
"aggs": {
"bradAgg": {
"terms": {
"field": "brand",
"order": {
"_count": "asc" //按照_count升序排列
},
"size": 5
}
}
}
}
执行后如下:
三、DSL实现Metrics聚合
3.1.DSL实现Metrics聚合
例如,我们要求获取每个品牌的用户评分的min、max、avg等值。我们可以利用stats聚合:
GET /hotel/_search
{
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 10
},
"aggs": {//是brands聚合的子聚合,也就是分组后对每组分别计算
"score_stats": { //聚会的名称,自己定义即可
"stats": {//聚合的类型,这里stats可以计算min、max、avg等
"field": "score" //聚合字段,这里是score
}
}
}
}
}
}
执行后结构如下:
3.2. 按照平均评分做降序排列
上面执行可以获取获取每个品牌的用户评分的min、max、avg等值,那么获取结果后,下面我们按照评分降序排序:
GET /hotel/_search
{
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 10,
"order": {
"score_stats.avg": "desc"//先聚合然后根据品牌的平均评分进行降序排序排序
}
},
"aggs": {
"score_stats": {
"stats": {
"field": "score"//metrics聚合
}
}
}
}
}
}
执行后结果如下:
四、Rest APl实现聚合
4.1.语法说明
以品牌聚合为例,演示下Java的Rest Client使用,先看请求组装:
创建测试代码如下:
@Test
void testAggregation() throws IOException {
//1.准备request
SearchRequest request = new SearchRequest("hotel");
//2.准备DSL
//2.1.设置size
request.source().size(0); ////设置为0,结果不包含文档,只包含聚合结果
//2.2.聚合
request.source().aggregation(AggregationBuilders
.terms("brandAgg") ///聚合的类型,按照品牌值聚合,所以选择terms,名字自定义即可
.field("brand") //参与聚合的字段
.size(10)); //希望获取的聚合结果数量
//3.发出请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4.解析结果
System.out.println(response);
}
执行后结果如下:
4.2.聚合结果解析
语法如下:
代码如下:
@Test
void testAggregation() throws IOException {
//1.准备request
SearchRequest request = new SearchRequest("hotel");
//2.准备DSL
//2.1.设置size
request.source().size(0); ////设置为0,结果不包含文档,只包含聚合结果
//2.2.聚合
request.source().aggregation(AggregationBuilders
.terms("brandAgg") ///聚合的类型,按照品牌值聚合,所以选择terms,名字自定义即可
.field("brand") //参与聚合的字段
.size(10)); //希望获取的聚合结果数量
//3.发出请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4.解析结果
Aggregations aggregations = response.getAggregations();
//4.1.根据聚合的名称获取结果
Terms bradAgg = aggregations.get("brandAgg");
System.out.println(bradAgg);
//4.2.获取buckets 结果是一个集合,里面存储了聚合后的信息
List<? extends Terms.Bucket> buckets = bradAgg.getBuckets();
//4.3.遍历
for (Terms.Bucket bucket : buckets) {
//4.4.获取key
String key = bucket.getKeyAsString();
System.out.println(key);
}
}
4.2.数据聚合-多条件聚合
在Service层中定义方法,实现对品牌、城市、星级的聚合
需求:搜索页面的品牌、城市等信息不应该是在页面写死,而是根据数据库中的信息展示。
上面的城市、星级、品牌要从数据库中查询得出,结果应该是{"city":[上海,北京、深圳],"starName":[四星、五星],"brand":[如家、七天、万豪]}
4.2.1.在IHotelService中定义接口
接口如下:
public interface IHotelService extends IService<Hotel> {
PageResult search(RequestParam param) throws IOException;
Map<String, List<String>> filters();
}
4.2.2.在HotelService中实现接口
在之前的代码的基础上添加如下代码实现上面的接口:
@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {
@Autowired
private RestHighLevelClient client;
@Override
public Map<String, List<String>> filters() {
try {
//1.准备request
SearchRequest request = new SearchRequest("hotel");
//2.准备DSL
//2.1.设置size
request.source().size(0);
//2.2.聚合
buildAggregation(request);
//3.发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4.解析结果
Aggregations aggregations = response.getAggregations();
Map<String, List<String>> result = new HashMap<>();
//有三个解析,所以需要解析3次
//1.解析 品牌
List<String> brandList = getAggByName(aggregations, "brandAgg");
result.put("brand", brandList);
//2.解析 城市
List<String> cityList = getAggByName(aggregations, "cityAgg");
result.put("city", cityList);
//3.解析 星级
List<String> starList = getAggByName(aggregations, "starAgg");
result.put("starName", starList);
return result;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 准备DSL
* @param request
*/
private void buildAggregation(SearchRequest request){
request.source().aggregation(AggregationBuilders
.terms("brandAgg")
.field("brand")
.size(100));
request.source().aggregation(AggregationBuilders
.terms("cityAgg")
.field("city")
.size(100));
request.source().aggregation(AggregationBuilders
.terms("starAgg")
.field("starName")
.size(100));
}
/**
* 解析响应,将结果保存到List集合中
* @param aggregations 获取Aggregations
* @param aggName 聚合的名称
* @return
*/
private List<String> getAggByName(Aggregations aggregations, String aggName){
//4.1.根据聚合的名称获取结果
Terms bradAgg = aggregations.get(aggName);
System.out.println(bradAgg);
//4.2.获取buckets 结果是一个集合,里面存储了聚合后的信息
List<? extends Terms.Bucket> buckets = bradAgg.getBuckets();
//4.3.遍历
List<String> brandList = new ArrayList<>();
for (Terms.Bucket bucket : buckets) {
//4.4.获取key
String key = bucket.getKeyAsString();
brandList.add(key);
}
return brandList;
}
}
4.2.3.对IHotelService中定义接口进行单元测试
编写测试代码:
@SpringBootTest
public class HotelBrandTests {
@Autowired
private IHotelService iHotelService;
@Test
void testHotelBrand() {
Map<String, List<String>> filters = iHotelService.filters();
System.out.println(filters);
}
}
执行后结果如下:
4.3.数据聚合-带过滤条件的聚合
4.3.1.对接前端接口
前端页面会向服务端发起请求,查询品牌、城市、星级等字段的聚合结果:
注意
- filter请求:是查询聚合过滤项的请求
- 可以看到请求参数与之前search时的Request Param完全一致,这是在限定聚合时的文档范围。
发现这个请求和之前list请求携带的参数是一样的,为什么查过滤项的时候也要带条件呢?过滤项查询要通过聚合来实现,聚合一带上条件就来限定聚合的范围,为何要限定范围呢?直接对整个索引库做聚合不行呢?
在搜索是没有加条件,搜索的是索引库的所有数据,对所有数据做聚合得到城市和品牌没有问题,但是当输入内容虹桥,得到的数据一定是跟上海虹桥有关的结果,上海虹桥有关的城市对应的一定是上海,但是对索引库的所有数据做聚合,得到的城市一定包含所有的城市,所以用户一定就很奇怪拉,命名搜索的是上海的还能出现北京的呢?如果再点击北京在结合搜索条件虹桥,能搜到任何东西吗?肯定是不能的北京没有虹桥,所以说不应该对索引库的所有字段做聚合,用户条件是虹桥,就应该对虹桥相关的酒店做聚合,限定聚合的范围,需要加查询条件,查询时用什么条件聚合时也用什么条件,这样就是在酒店的基础上做聚合,这样查询结果就更精确了因此,在查询过滤项时和查询时要用相同的条件
例如:用户搜索“上海”,价格在100~300,二钻、速八、那聚合必须是,在这个搜索条件基础上完成。因此我们需要:
- 编写controller接口,接收该请求
- 修改IHotelService中接口filters,添加RequestParam参数
- 修改实现类中HotelService中的filters()方法的业务,聚合时添加query条件
4.3.2.创建controller
在之前的 HotelController 中添加的方法如下:
@RestController
@RequestMapping("/hotel")
public class HotelController {
@Autowired
private IHotelService iHotelService;
/**
* 搜索
* @param param 通过对象接收参数
* @return PageResult JSON对象
* @throws IOException
*/
@PostMapping("/list")
public PageResult searchList(@RequestBody RequestParam param) throws IOException {
return iHotelService.search(param);
}
/**
* 实现聚合
* @param param
* @return
*/
@PostMapping("/filters")
public Map<String, List<String>> getFilters(@RequestBody RequestParam param){
return iHotelService.filters(param);
}
}
4.3.2.修改在IHotelService中定义接口
添加参数,需要根据前端传递过来的参数进行聚合
public interface IHotelService extends IService<Hotel> {
PageResult search(RequestParam param) throws IOException;
Map<String, List<String>> filters(RequestParam param);
}
4.3.3.修改HotelService中的实现方法
在之前的代码的基础上添加如下代码:
@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {
@Autowired
private RestHighLevelClient client;
@Override
public Map<String, List<String>> filters(RequestParam param) {
try {
//1.准备request
SearchRequest request = new SearchRequest("hotel");
//2.准备DSL
//2.1.query
buildBaiscQuery(param, request);//新添加的构建query条件
//2.2.设置size
request.source().size(0);
//2.3.聚合
buildAggregation(request);
//3.发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4.解析结果
Aggregations aggregations = response.getAggregations();
Map<String, List<String>> result = new HashMap<>();
//有三个解析,所以需要解析3次
//1.解析 品牌
List<String> brandList = getAggByName(aggregations, "brandAgg");
result.put("品牌", brandList);
//2.解析 城市
List<String> cityList = getAggByName(aggregations, "cityAgg");
result.put("城市", cityList);
//3.解析 星级
List<String> starList = getAggByName(aggregations, "starAgg");
result.put("星级", starList);
return result;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 准备DSL
* @param request
*/
private void buildAggregation(SearchRequest request){
request.source().aggregation(AggregationBuilders
.terms("brandAgg")
.field("brand")
.size(100));
request.source().aggregation(AggregationBuilders
.terms("cityAgg")
.field("city")
.size(100));
request.source().aggregation(AggregationBuilders
.terms("starAgg")
.field("starName")
.size(100));
}
/**
* 解析响应,将结果保存到List集合中
* @param aggregations 获取Aggregations
* @param aggName 聚合的名称
* @return
*/
private List<String> getAggByName(Aggregations aggregations, String aggName){
//4.1.根据聚合的名称获取结果
Terms bradAgg = aggregations.get(aggName);
System.out.println(bradAgg);
//4.2.获取buckets 结果是一个集合,里面存储了聚合后的信息
List<? extends Terms.Bucket> buckets = bradAgg.getBuckets();
//4.3.遍历
List<String> brandList = new ArrayList<>();
for (Terms.Bucket bucket : buckets) {
//4.4.获取key
String key = bucket.getKeyAsString();
brandList.add(key);
}
return brandList;
}
/**
* 抽离构建DSL条件的代码封装成方法
* @param param 前端参数对象
* @param request
*/
private void buildBaiscQuery(RequestParam param,SearchRequest request){
//2.1.BooleanQuery
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
//2.2.关键字搜索
String key = param.getKey();
//健壮性判断,如果前端传递的参数为null或者为空字符串,就进行全文检索,else则是按照条件搜索
if (key == null || "".equals(key)) {
boolQuery.must(QueryBuilders.matchAllQuery());
}else {
boolQuery.must(QueryBuilders.matchQuery("all", param.getKey()));
}
//2.3.条件:城市
String city = param.getCity();
if(city != null && !"".equals(city)){
boolQuery.filter(QueryBuilders.termQuery("city",city));
}
//2.4.条件:品牌
String brand = param.getBrand();
if(brand != null && !"".equals(brand)){
boolQuery.filter(QueryBuilders.termQuery("brand",brand));
}
//2.5.条件:星级
String starName = param.getStarName();
if(starName != null && !"".equals(starName)){
boolQuery.filter(QueryBuilders.termQuery("starName",starName));
}
//2.6.条件:价格
Integer maxPrice = param.getMaxPrice();
Integer minPrice = param.getMinPrice();
if(maxPrice != null && minPrice != null){
boolQuery.filter(QueryBuilders.rangeQuery("price").gte(minPrice).lte(maxPrice));
}
//TODO 控制算分,先传入boolQuery
FunctionScoreQueryBuilder functionScoreQuery = QueryBuilders.functionScoreQuery(boolQuery,
new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{
// 一个function score元素
new FunctionScoreQueryBuilder.FilterFunctionBuilder(
//过滤条件
QueryBuilders.termQuery("isAD", true),
//算分函数
ScoreFunctionBuilders.weightFactorFunction(10)
)
});
//添加boolQuery
request.source().query(functionScoreQuery);
}
}
4.3.4.测试
重启项目,然后进行筛选查询,每次添加一个条件,剩余的条件可选性也会发送变化,如下: