0
点赞
收藏
分享

微信扫一扫

Elasticsearch——java api构建聚合搜索和ES SQL功能

版本

elasticsearch-rest-high-level-client 与 Elasticsearch 服务端的版本不一致时存在兼容性风险,请确保客户端依赖的版本与所连接的 Elasticsearch 版本一致,否则可能出现无法预计的错误。

es配置

maven依赖

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.3.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.3.0</version>
</dependency>

配置application.properties

spring.application.name=service-search

#多个节点用逗号隔开
elasticsearch.hostlist=127.0.0.1:9200

创建配置类ElasticsearchConfig

/**
 * Spring configuration that exposes a {@link RestHighLevelClient} built from the
 * comma-separated node list in the {@code elasticsearch.hostlist} property.
 */
@Configuration
public class ElasticsearchConfig {

    /** Port used for a host-list entry that does not specify one. */
    private static final int DEFAULT_PORT = 9200;

    /** Comma-separated {@code host[:port]} entries, e.g. {@code 127.0.0.1:9200,127.0.0.2:9200}. */
    @Value("${elasticsearch.hostlist}")
    private String hostlist;

    /**
     * Creates the high-level REST client for every node listed in {@code elasticsearch.hostlist}.
     * The bean's {@code close} method is registered as its destroy method.
     *
     * @return a client connected over plain HTTP to the configured nodes
     * @throws IllegalArgumentException if an entry is empty or its port is not a number
     */
    @Bean(destroyMethod = "close")
    public RestHighLevelClient restHighLevelClient(){
        String[] nodes = hostlist.split(",");
        HttpHost[] httpHosts = new HttpHost[nodes.length];
        for (int i = 0; i < nodes.length; i++) {
            httpHosts[i] = toHttpHost(nodes[i]);
        }
        return new RestHighLevelClient(RestClient.builder(httpHosts));
    }

    /**
     * Converts a single {@code host[:port]} entry into an {@link HttpHost} using the http scheme.
     * Surrounding whitespace is ignored so that lists such as {@code "a:9200, b:9200"} work.
     */
    private static HttpHost toHttpHost(String node) {
        String[] parts = node.trim().split(":");
        String host = parts[0].trim();
        if (host.isEmpty()) {
            throw new IllegalArgumentException(
                    "Invalid entry '" + node + "' in elasticsearch.hostlist: host name is empty");
        }
        // Fall back to the default port instead of failing with an index error when none is given.
        int port = parts.length > 1 ? Integer.parseInt(parts[1].trim()) : DEFAULT_PORT;
        return new HttpHost(host, port, "http");
    }
}

搜索聚合测试代码

1、按颜色分组,计算每个颜色的销售数量

GET /tvs/_search
{
  "size": 0,
  "query": {
    "match_all": {}
  },
  "aggs": {
    "group_by_color": {
      "terms": {
        "field": "color"
      }
    }
  }
} 
/**
 * Integration tests that run aggregation searches against the {@code tvs} index
 * through the injected {@link RestHighLevelClient}.
 */
@SpringBootTest(classes = SearchApplication.class)
@RunWith(SpringRunner.class)
@Slf4j
public class TestAggs {

    @Autowired
    private RestHighLevelClient client;

    /**
     * Groups the documents by {@code color} and prints the key and document count of each bucket.
     *
     * @throws IOException if the search request fails
     */
    @Test
    public void testAggs() throws IOException {
        // 1. Build the request: no hits, match everything, one terms aggregation on "color".
        TermsAggregationBuilder byColor = AggregationBuilders.terms("group_by_color").field("color");
        SearchSourceBuilder source = new SearchSourceBuilder()
                .size(0)
                .query(QueryBuilders.matchAllQuery())
                .aggregation(byColor);
        // Attach the request body to the request.
        SearchRequest request = new SearchRequest("tvs").source(source);

        // 2. Execute the search.
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);

        // 3. Read the buckets of the terms aggregation and print them.
        Terms colorTerms = response.getAggregations().get("group_by_color");
        for (Terms.Bucket colorBucket : colorTerms.getBuckets()) {
            System.out.println("key: " + colorBucket.getKeyAsString());
            System.out.println("docCount: " + colorBucket.getDocCount());
            System.out.println("-------------------------------");
        }
    }
}

2、按颜色分组,计算每个颜色的销售数量、且每个颜色卖出的平均价格

GET /tvs/_search
{
  "size": 0,
  "query": {
    "match_all": {}
  },
  "aggs": {
    "group_by_color": {
      "terms": {
        "field": "color"
      },
      "aggs": {
        "avg_price": {
          "avg": {
            "field": "price"
          }
        }
      }
    }
  }
}
/**
 * Groups the documents by {@code color} and prints, for each bucket, its key,
 * document count and the average of the {@code price} field.
 *
 * @throws IOException if the search request fails
 */
@Test
public void testAggsAndAvg() throws IOException {
	// 1. Build the request: terms aggregation on "color" with an avg sub-aggregation on "price".
	AvgAggregationBuilder avgPrice = AggregationBuilders.avg("avg_price").field("price");
	TermsAggregationBuilder byColor = AggregationBuilders.terms("group_by_color")
			.field("color")
			.subAggregation(avgPrice);
	SearchSourceBuilder source = new SearchSourceBuilder()
			.size(0)
			.query(QueryBuilders.matchAllQuery())
			.aggregation(byColor);
	// Attach the request body to the request.
	SearchRequest request = new SearchRequest("tvs").source(source);

	// 2. Execute the search.
	SearchResponse response = client.search(request, RequestOptions.DEFAULT);

	// 3. Walk the color buckets and print each bucket's metrics.
	Terms colorTerms = response.getAggregations().get("group_by_color");
	for (Terms.Bucket colorBucket : colorTerms.getBuckets()) {
		System.out.println("key: " + colorBucket.getKeyAsString());
		System.out.println("docCount: " + colorBucket.getDocCount());
		Avg average = colorBucket.getAggregations().get("avg_price");
		System.out.println("avg_price: " + average.getValue());
		System.out.println("-------------------------------");
	}
}

3、按颜色分组,计算每个颜色的销售数量、以及每个颜色卖出的平均值,最大值,最小值,总和。

GET /tvs/_search
{
  "size": 0,
  "aggs": {
    "group_by_color": {
      "terms": {
        "field": "color"
      },
      "aggs": {
        "avg_price": {
          "avg": {
            "field": "price"
          }
        },
        "min_price":{
          "min": {
            "field": "price"
          }
        },
        "max_price":{
          "max": {
            "field": "price"
          }
        },
        "sum_price":{
          "sum": {
            "field": "price"
          }
        }
      }
    }
  }
}
/**
 * Groups the documents by {@code color} and prints, for each bucket, its key,
 * document count and the avg / max / min / sum of the {@code price} field.
 *
 * @throws IOException if the search request fails
 */
@Test
public void testAggsAndMore() throws IOException {
	// 1. Build the request: terms aggregation on "color" carrying four metric sub-aggregations.
	TermsAggregationBuilder byColor = AggregationBuilders.terms("group_by_color")
			.field("color")
			.subAggregation(AggregationBuilders.avg("avg_price").field("price"))
			.subAggregation(AggregationBuilders.min("min_price").field("price"))
			.subAggregation(AggregationBuilders.max("max_price").field("price"))
			.subAggregation(AggregationBuilders.sum("sum_price").field("price"));
	SearchSourceBuilder source = new SearchSourceBuilder()
			.size(0)
			.query(QueryBuilders.matchAllQuery())
			.aggregation(byColor);
	// Attach the request body to the request.
	SearchRequest request = new SearchRequest("tvs").source(source);

	// 2. Execute the search.
	SearchResponse response = client.search(request, RequestOptions.DEFAULT);

	// 3. Walk the color buckets and print each bucket's metrics.
	Terms colorTerms = response.getAggregations().get("group_by_color");
	for (Terms.Bucket colorBucket : colorTerms.getBuckets()) {
		System.out.println("key: " + colorBucket.getKeyAsString());
		System.out.println("docCount: " + colorBucket.getDocCount());

		Aggregations metrics = colorBucket.getAggregations();
		Avg average = metrics.get("avg_price");
		System.out.println("avg_price: " + average.getValue());
		Max maximum = metrics.get("max_price");
		System.out.println("max_price: " + maximum.getValue());
		Min minimum = metrics.get("min_price");
		System.out.println("min_price: " + minimum.getValue());
		Sum total = metrics.get("sum_price");
		System.out.println("sum_price: " + total.getValue());
		System.out.println("-------------------------------");
	}
}

4、按售价每2000价格划分范围,算出每个区间的销售总额 histogram

GET /tvs/_search
{
  "size": 0,
  "aggs": {
    "by_histogram": {
      "histogram": {
        "field": "price",
        "interval": 2000
      },
      "aggs": {
        "sum_price": {
          "sum": {
            "field": "price"
          }
        }
      }
    }
  }
} 
/**
 * Buckets the documents by {@code price} in intervals of 2000 and prints, for each
 * bucket, its key, document count and the sum of the {@code price} field.
 *
 * @throws IOException if the search request fails
 */
@Test
public void testAggsAndHistogram() throws IOException {
	// 1. Build the request: histogram on "price" (interval 2000) with a sum sub-aggregation.
	HistogramAggregationBuilder byPriceRange = AggregationBuilders.histogram("by_histogram")
			.field("price")
			.interval(2000)
			.subAggregation(AggregationBuilders.sum("sum_price").field("price"));
	SearchSourceBuilder source = new SearchSourceBuilder()
			.size(0)
			.query(QueryBuilders.matchAllQuery())
			.aggregation(byPriceRange);
	// Attach the request body to the request.
	SearchRequest request = new SearchRequest("tvs").source(source);

	// 2. Execute the search.
	SearchResponse response = client.search(request, RequestOptions.DEFAULT);

	// 3. Walk the histogram buckets and print each bucket's total.
	Histogram priceHistogram = response.getAggregations().get("by_histogram");
	for (Histogram.Bucket rangeBucket : priceHistogram.getBuckets()) {
		System.out.println("key: " + rangeBucket.getKeyAsString());
		System.out.println("docCount: " + rangeBucket.getDocCount());
		Sum total = rangeBucket.getAggregations().get("sum_price");
		System.out.println("sum_price: " + total.getValue());
		System.out.println("-------------------------------");
	}
}

5、计算每个季度的销售总额

GET /tvs/_search
{
  "size": 0,
  "aggs": {
    "date_sales": {
      "date_histogram": {
        "field": "sold_date",
        "calendar_interval": "quarter",
        "format": "yyyy-MM-dd",
        "min_doc_count": 0,
        "extended_bounds": {
          "min": "2019-01-01",
          "max": "2020-12-31"
        }
      },
      "aggs": {
        "sum_price": {
          "sum": {
            "field": "price"
          }
        }
      }
    }
  }
}
/**
 * Buckets the documents by quarter of {@code sold_date} (bounds 2019-01-01 to 2020-12-31,
 * empty buckets included) and prints, for each bucket, its key, document count and the
 * sum of the {@code price} field.
 *
 * @throws IOException if the search request fails
 */
@Test
public void testAggsAndDateHistogram() throws IOException {
	// 1. Build the request: quarterly date histogram on "sold_date" with a sum sub-aggregation.
	DateHistogramAggregationBuilder byQuarter = AggregationBuilders.dateHistogram("date_sales");
	byQuarter.field("sold_date");
	byQuarter.calendarInterval(DateHistogramInterval.QUARTER);
	byQuarter.format("yyyy-MM-dd");
	// min_doc_count 0 plus extended bounds, as in the console request shown above this test.
	byQuarter.minDocCount(0);
	byQuarter.extendedBounds(new ExtendedBounds("2019-01-01", "2020-12-31"));
	byQuarter.subAggregation(AggregationBuilders.sum("sum_price").field("price"));

	SearchSourceBuilder source = new SearchSourceBuilder()
			.size(0)
			.query(QueryBuilders.matchAllQuery())
			.aggregation(byQuarter);
	// Attach the request body to the request.
	SearchRequest request = new SearchRequest("tvs").source(source);

	// 2. Execute the search.
	SearchResponse response = client.search(request, RequestOptions.DEFAULT);

	// 3. Walk the quarterly buckets and print each bucket's total.
	ParsedDateHistogram quarterlySales = response.getAggregations().get("date_sales");
	for (Histogram.Bucket quarterBucket : quarterlySales.getBuckets()) {
		System.out.println("key: " + quarterBucket.getKeyAsString());
		System.out.println("docCount: " + quarterBucket.getDocCount());
		Sum total = quarterBucket.getAggregations().get("sum_price");
		System.out.println("sum_price: " + total.getValue());
		System.out.println("-------------------------------");
	}
}

Elasticsearch 7 sql新功能

在 ES 里面执行 SQL 语句,有三种方式,第一种是 RESTful 方式,第二种是 SQL-CLI 命令行工具,第三种是通过 JDBC 来连接 ES,执行的 SQL 语句其实都一样,我们先以 RESTful 方式来说明用法。

RESTful下调用SQL

快速入门

POST /_sql?format=txt
{
  "query":"select * from tvs"
}
POST /_sql?format=txt
{
  "query":"select color,avg(price),min(price),max(price),sum(price) from tvs group by color"
}

SQL-CLI 命令行工具

进入 elasticsearch 安装目录下的 bin 目录,执行 elasticsearch-sql-cli 启动 SQL 命令行工具

然后在 SQL 命令行中即可执行 SQL 语句

es sql与其他DSL结合

POST /_sql?format=txt
{
  "query":"select * from tvs",
  "filter":{
    "range":{
      "price":{
        "gte" : 1200,
        "lte" : 2000
      }
    }
  }
}

java代码实现sql功能

1、在kibana中开启白金版试用(ES SQL 的 JDBC 驱动需要白金版许可才能使用)

2、导入相关依赖

<dependency>
    <groupId>org.elasticsearch.plugin</groupId>
    <artifactId>x-pack-sql-jdbc</artifactId>
    <version>7.3.0</version>
</dependency>

<repositories><!-- 如果jar包下载不下来,就需要在pom文件中配置一下仓库 -->
    <repository>
        <id>elastic.co</id>
        <url>https://artifacts.elastic.co/maven</url>
    </repository>
</repositories>  

3、相关测试代码

/**
 * Minimal example of querying Elasticsearch through the Elasticsearch SQL JDBC driver.
 */
public class TestESJdbc {

    /**
     * Connects to the node at 127.0.0.1:9200, runs {@code select * from tvs} and prints
     * the first four columns of every row, followed by a separator line.
     *
     * @param args unused
     * @throws SQLException if the connection cannot be opened or the query fails
     */
    public static void main(String[] args) throws SQLException {
        // try-with-resources closes the result set, statement and connection (in that order)
        // even when the query or the iteration throws.
        try (Connection connection = DriverManager.getConnection("jdbc:es://http://127.0.0.1:9200");
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery("select * from tvs")) {
            while (resultSet.next()) {
                System.out.println(resultSet.getString(1));
                System.out.println(resultSet.getString(2));
                System.out.println(resultSet.getString(3));
                System.out.println(resultSet.getString(4));
                System.out.println("--------");
            }
        }
    }
}
举报

相关推荐

0 条评论