ElasticSearch的JAVA客户端操作

一、java操作ES有两种客户端:

1、TransportClient:轻量级的Client,使用Netty线程池,Socket连接到ES集群。本身不加入到集群,只作为请求的处理。

2、Node Client:客户端节点本身也是ES节点,加入到集群,和其他ElasticSearch节点一样。频繁的开启和关闭这类Node Clients会在集群中产生“噪音”。

 

二、TransportClient的基本使用

1、创建Client

public ElasticSearchService(String ipAddress, int port) {

        client = new TransportClient()

                .addTransportAddress(new InetSocketTransportAddress(ipAddress,

                        port));

 

    }

 

2、创建/删除Index和Type信息

// 创建索引

    public void createIndex() {

        client.admin().indices().create(new CreateIndexRequest(IndexName))

                .actionGet();

    }

 

    // 清除所有索引

    public void deleteIndex() {

        IndicesExistsResponse indicesExistsResponse = client.admin().indices()

                .exists(new IndicesExistsRequest(new String[] { IndexName }))

                .actionGet();

        if (indicesExistsResponse.isExists()) {

            client.admin().indices().delete(new DeleteIndexRequest(IndexName))

                    .actionGet();

        }

    }

    

    // 删除Index下的某个Type

    public void deleteType(){

        client.prepareDelete().setIndex(IndexName).setType(TypeName).execute().actionGet();

    }

 

    // 定义索引的映射类型

    public void defineIndexTypeMapping() {

        try {

            XContentBuilder mapBuilder = XContentFactory.jsonBuilder();

            mapBuilder.startObject()

            .startObject(TypeName)

                .startObject("properties")

                    .startObject(IDFieldName).field("type", "long").field("store", "yes").endObject()

                    .startObject(SeqNumFieldName).field("type", "long").field("store", "yes").endObject()

                    .startObject(IMSIFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()

                    .startObject(IMEIFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()

                    .startObject(DeviceIDFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()

                    .startObject(OwnAreaFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()

                    .startObject(TeleOperFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()

                    .startObject(TimeFieldName).field("type", "date").field("store", "yes").endObject()

                .endObject()

            .endObject()

            .endObject();

 

            PutMappingRequest putMappingRequest = Requests

                    .putMappingRequest(IndexName).type(TypeName)

                    .source(mapBuilder);

            client.admin().indices().putMapping(putMappingRequest).actionGet();

        } catch (IOException e) {

            log.error(e.toString());

        }

 

    }

这里自定义了某个Type的索引映射(Mapping),默认ES会自动处理数据类型的映射:针对整型映射为long,浮点数为double,字符串映射为string,时间为date,true或false为boolean。

 

注意:针对字符串,ES默认会做“analyzed”处理,即先做分词、去掉stop words等处理再index。如果你需要把一个字符串做为整体被索引到,需要把这个字段这样设置:field("index", "not_analyzed")。

 

详情参考:https://www.elastic.co/guide/en/elasticsearch/guide/current/mapping-intro.html

 

 

3、索引数据

// 批量索引数据

    public void indexHotSpotDataList(List<Hotspotdata> dataList) {

        if (dataList != null) {

            int size = dataList.size();

            if (size > 0) {

                BulkRequestBuilder bulkRequest = client.prepareBulk();

                for (int i = 0; i < size; ++i) {

                    Hotspotdata data = dataList.get(i);

                    String jsonSource = getIndexDataFromHotspotData(data);

                    if (jsonSource != null) {

                        bulkRequest.add(client

                                .prepareIndex(IndexName, TypeName,

                                        data.getId().toString())

                                .setRefresh(true).setSource(jsonSource));

                    }

                }

 

                BulkResponse bulkResponse = bulkRequest.execute().actionGet();

                if (bulkResponse.hasFailures()) {

                    Iterator<BulkItemResponse> iter = bulkResponse.iterator();

                    while (iter.hasNext()) {

                        BulkItemResponse itemResponse = iter.next();

                        if (itemResponse.isFailed()) {

                            log.error(itemResponse.getFailureMessage());

                        }

                    }

                }

            }

        }

    }

 

    // 索引数据

    public boolean indexHotspotData(Hotspotdata data) {

        String jsonSource = getIndexDataFromHotspotData(data);

        if (jsonSource != null) {

            IndexRequestBuilder requestBuilder = client.prepareIndex(IndexName,

                    TypeName).setRefresh(true);

            requestBuilder.setSource(jsonSource)

                    .execute().actionGet();

            return true;

        }

 

        return false;

    }

 

    // 得到索引字符串

    public String getIndexDataFromHotspotData(Hotspotdata data) {

        String jsonString = null;

        if (data != null) {

            try {

                XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();

                jsonBuilder.startObject().field(IDFieldName, data.getId())

                        .field(SeqNumFieldName, data.getSeqNum())

                        .field(IMSIFieldName, data.getImsi())

                        .field(IMEIFieldName, data.getImei())

                        .field(DeviceIDFieldName, data.getDeviceID())

                        .field(OwnAreaFieldName, data.getOwnArea())

                        .field(TeleOperFieldName, data.getTeleOper())

                        .field(TimeFieldName, data.getCollectTime())

                        .endObject();

                jsonString = jsonBuilder.string();

            } catch (IOException e) {

                log.equals(e);

            }

        }

 

        return jsonString;

 

    }

ES支持批量和单个数据索引。

 

4、查询获取数据

// 获取少量数据100个

    private List<Integer> getSearchData(QueryBuilder queryBuilder) {

        List<Integer> ids = new ArrayList<>();

        SearchResponse searchResponse = client.prepareSearch(IndexName)

                .setTypes(TypeName).setQuery(queryBuilder).setSize(100)

                .execute().actionGet();

        SearchHits searchHits = searchResponse.getHits();

        for (SearchHit searchHit : searchHits) {

            Integer id = (Integer) searchHit.getSource().get("id");

            ids.add(id);

        }

        return ids;

    }

 

    // 获取大量数据

    private List<Integer> getSearchDataByScrolls(QueryBuilder queryBuilder) {

        List<Integer> ids = new ArrayList<>();

        // 一次获取100000数据

        SearchResponse scrollResp = client.prepareSearch(IndexName)

                .setSearchType(SearchType.SCAN).setScroll(new TimeValue(60000))

                .setQuery(queryBuilder).setSize(100000).execute().actionGet();

        while (true) {

            for (SearchHit searchHit : scrollResp.getHits().getHits()) {

                Integer id = (Integer) searchHit.getSource().get(IDFieldName);

                ids.add(id);

            }

            scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())

                    .setScroll(new TimeValue(600000)).execute().actionGet();

            if (scrollResp.getHits().getHits().length == 0) {

                break;

            }

        }

 

        return ids;

 

    }

这里的QueryBuilder是一个查询条件,ES支持分页查询获取数据,也可以一次性获取大量数据,需要使用Scroll Search。

5、聚合(Aggregation Facet)查询 

// 得到某段时间内设备列表上每个设备的数据分布情况<设备ID,数量>

    public Map<String, String> getDeviceDistributedInfo(String startTime,

            String endTime, List<String> deviceList) {

 

        Map<String, String> resultsMap = new HashMap<>();

 

        QueryBuilder deviceQueryBuilder = getDeviceQueryBuilder(deviceList);

        QueryBuilder rangeBuilder = getDateRangeQueryBuilder(startTime, endTime);

        QueryBuilder queryBuilder = QueryBuilders.boolQuery()

                .must(deviceQueryBuilder).must(rangeBuilder);

 

        TermsBuilder termsBuilder = AggregationBuilders.terms("DeviceIDAgg").size(Integer.MAX_VALUE)

                .field(DeviceIDFieldName);

        SearchResponse searchResponse = client.prepareSearch(IndexName)

                .setQuery(queryBuilder).addAggregation(termsBuilder)

                .execute().actionGet();

        Terms terms = searchResponse.getAggregations().get("DeviceIDAgg");

        if (terms != null) {

            for (Terms.Bucket entry : terms.getBuckets()) {

                resultsMap.put(entry.getKey(),

                        String.valueOf(entry.getDocCount()));

            }

        }

        return resultsMap;

 

    }

Aggregation查询可以查询类似统计分析这样的功能:如某个月的数据分布情况,某类数据的最大、最小、总和、平均值等。

详情参考:https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-aggs.html

 

 

本文参考:http://www.cnblogs.com/luxiaoxun/p/4869509.html

ElasticSearch
您的回应...

相关话题

查看全部

也许你感兴趣

换一批

热门标签

更多