删除索引库
可以看到id为1的文档不见了
这里要修改下配置文件
slave1,slave2也做同样的操作,在这里就不多赘述了。
这个时候记得要重启elasticsearch才能生效,怎么重启这里就不多说了
运行程序
这个函数(upsert)的意思是:如果文档存在就更新,不存在就创建
第一次执行下来
第二次执行(因为文档已经存在了,所以就把里面的内容更新)
这个是批量操作,来获取多条索引
添加两个删除一个
1 public void test13() throws IOException, InterruptedException,
2 ExecutionException {
3
4 BulkProcessor bulkProcessor = BulkProcessor.builder(
5 client,
6 new BulkProcessor.Listener() {
7
8 public void beforeBulk(long executionId, BulkRequest request) {
9 // TODO Auto-generated method stub
10 System.out.println(request.numberOfActions());
11 }
12
13 public void afterBulk(long executionId, BulkRequest request,
14 Throwable failure) {
15 // TODO Auto-generated method stub
16 System.out.println(failure.getMessage());
17 }
18
19 public void afterBulk(long executionId, BulkRequest request,
20 BulkResponse response) {
21 // TODO Auto-generated method stub
22 System.out.println(response.hasFailures());
23 }
24 })
25 .setBulkActions(1000) // 每个批次的最大数量
26 .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))// 每个批次的最大字节数
27 .setFlushInterval(TimeValue.timeValueSeconds(5))// 每批提交时间间隔
28 .setConcurrentRequests(1) //设置多少个并发处理线程
29 //可以允许用户自定义当一个或者多个bulk请求失败后,该执行如何操作
30 .setBackoffPolicy(
31 BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
32 .build();
33 String json = "{" +
34 "\"user\":\"kimchy\"," +
35 "\"postDate\":\"2013-01-30\"," +
36 "\"message\":\"trying out Elasticsearch\"" +
37 "}";
38
39 for (int i = 0; i < 1000; i++) {
40 bulkProcessor.add(new IndexRequest("djt6", "user").source(json));
41 }
42 //阻塞至所有的请求线程处理完毕后,断开连接资源
43 bulkProcessor.awaitClose(3, TimeUnit.MINUTES);
44 client.close();
45 }
46 /**
47 * SearchType使用方式
48 * @throws Exception
49 */
50 @Test
51 public void test14() throws Exception {
52 SearchResponse response = client.prepareSearch("djt")
53 .setTypes("user")
54 //.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
55 .setSearchType(SearchType.QUERY_AND_FETCH)
56 .execute()
57 .actionGet();
58 SearchHits hits = response.getHits();
59 System.out.println(hits.getTotalHits());
60 }
61 }
这个是批量插入
这里有1000个,我就不数了
参考代码ESTestDocumentAPI.java
1 package com.dajiangtai.djt_spider.elasticsearch;
2
3 import java.io.IOException;
4 import java.net.InetAddress;
5 import java.net.UnknownHostException;
6 import java.util.Date;
7 import java.util.HashMap;
8 import java.util.Iterator;
9 import java.util.List;
10 import java.util.Map;
11 import java.util.concurrent.ExecutionException;
12 import java.util.concurrent.TimeUnit;
13 import static org.elasticsearch.node.NodeBuilder.*;
14 import static org.elasticsearch.common.xcontent.XContentFactory.*;
15 import org.elasticsearch.action.bulk.BackoffPolicy;
16 import org.elasticsearch.action.bulk.BulkProcessor;
17 import org.elasticsearch.common.unit.ByteSizeUnit;
18 import org.elasticsearch.common.unit.ByteSizeValue;
19 import org.elasticsearch.common.unit.TimeValue;
20 import org.codehaus.jackson.map.ObjectMapper;
21 import org.elasticsearch.action.bulk.BulkItemResponse;
22 import org.elasticsearch.action.bulk.BulkRequest;
23 import org.elasticsearch.action.bulk.BulkRequestBuilder;
24 import org.elasticsearch.action.bulk.BulkResponse;
25 import org.elasticsearch.action.delete.DeleteRequestBuilder;
26 import org.elasticsearch.action.delete.DeleteResponse;
27 import org.elasticsearch.action.get.GetResponse;
28 import org.elasticsearch.action.get.MultiGetItemResponse;
29 import org.elasticsearch.action.get.MultiGetResponse;
30 import org.elasticsearch.action.index.IndexRequest;
31 import org.elasticsearch.action.index.IndexRequestBuilder;
32 import org.elasticsearch.action.index.IndexResponse;
33 import org.elasticsearch.action.search.SearchResponse;
34 import org.elasticsearch.action.search.SearchType;
35 import org.elasticsearch.action.update.UpdateRequest;
36 import org.elasticsearch.client.Client;
37 import org.elasticsearch.client.transport.TransportClient;
38 import org.elasticsearch.cluster.node.DiscoveryNode;
39 import org.elasticsearch.common.settings.Settings;
40 import org.elasticsearch.common.transport.InetSocketTransportAddress;
41 import org.elasticsearch.index.query.QueryBuilders;
42 import org.elasticsearch.node.Node;
43 import org.elasticsearch.script.Script;
44 import org.elasticsearch.script.ScriptService;
45 import org.elasticsearch.search.SearchHits;
46 import org.junit.Before;
47 import org.junit.Test;
48
49 /**
50 * Document API 操作
51 *
52 * @author 大讲台
53 *
54 */
55 public class ESTestDocumentAPI {
56 private TransportClient client;
57
58 @Before
59 public void test0() throws UnknownHostException {
60
61 // 开启client.transport.sniff功能,探测集群所有节点
62 Settings settings = Settings.settingsBuilder()
63 .put("cluster.name", "escluster")
64 .put("client.transport.sniff", true).build();
65 // on startup
66 // 获取TransportClient
67 client = TransportClient
68 .builder()
69 .settings(settings)
70 .build()
71 .addTransportAddress(
72 new InetSocketTransportAddress(InetAddress
73 .getByName("master"), 9300))
74 .addTransportAddress(
75 new InetSocketTransportAddress(InetAddress
76 .getByName("slave1"), 9300))
77 .addTransportAddress(
78 new InetSocketTransportAddress(InetAddress
79 .getByName("slave2"), 9300));
80 }
81
82 /**
83 * 创建索引:use ElasticSearch helpers
84 *
85 * @throws IOException
86 */
87 @Test
88 public void test1() throws IOException {
89 IndexResponse response = client
90 .prepareIndex("twitter", "tweet", "1")
91 .setSource(
92 jsonBuilder().startObject().field("user", "kimchy")
93 .field("postDate", new Date())
94 .field("message", "trying out Elasticsearch")
95 .endObject()).get();
96 System.out.println(response.getId());
97 client.close();
98 }
99
100 /**
101 * 创建索引:do it yourself
102 *
103 * @throws IOException
104 */
105 @Test
106 public void test2() throws IOException {
107 String json = "{" + "\"user\":\"kimchy\","
108 + "\"postDate\":\"2013-01-30\","
109 + "\"message\":\"trying out Elasticsearch\"" + "}";
110 IndexResponse response = client.prepareIndex("twitter", "tweet")
111 .setSource(json).get();
112 System.out.println(response.getId());
113 client.close();
114 }
115
116 /**
117 * 创建索引:use map
118 *
119 * @throws IOException
120 */
121 @Test
122 public void test3() throws IOException {
123 Map<String, Object> json = new HashMap<String, Object>();
124 json.put("user", "kimchy");
125 json.put("postDate", new Date());
126 json.put("message", "trying out Elasticsearch");
127
128 IndexResponse response = client.prepareIndex("twitter", "tweet")
129 .setSource(json).get();
130 System.out.println(response.getId());
131 client.close();
132 }
133
134 /**
135 * 创建索引:serialize your beans
136 *
137 * @throws IOException
138 */
139 @Test
140 public void test4() throws IOException {
141 User user = new User();
142 user.setUser("kimchy");
143 user.setPostDate(new Date());
144 user.setMessage("trying out Elasticsearch");
145
146 // instance a json mapper
147 ObjectMapper mapper = new ObjectMapper(); // create once, reuse
148
149 // generate json
150 byte[] json = mapper.writeValueAsBytes(user);
151
152 IndexResponse response = client.prepareIndex("twitter", "tweet")
153 .setSource(json).get();
154 System.out.println(response.getId());
155 client.close();
156 }
157
158 /**
159 * 查询索引:get
160 *
161 * @throws IOException
162 */
163 @Test
164 public void test5() throws IOException {
165 GetResponse response = client.prepareGet("twitter", "tweet", "1").get();
166 System.out.println(response.getSourceAsString());
167
168 client.close();
169 }
170
171 /**
172 * 删除索引:delete
173 *
174 * @throws IOException
175 */
176 @Test
177 public void test6() throws IOException {
178 client.prepareDelete("twitter", "tweet", "1").get();
179 client.close();
180 }
181
182 /**
183 * 更新索引:Update API-UpdateRequest
184 *
185 * @throws IOException
186 * @throws ExecutionException
187 * @throws InterruptedException
188 */
189 @Test
190 public void test7() throws IOException, InterruptedException,
191 ExecutionException {
192 UpdateRequest updateRequest = new UpdateRequest();
193 updateRequest.index("twitter");
194 updateRequest.type("tweet");
195 updateRequest.id("AVyi3OORot7zkId708s8");
196 updateRequest.doc(jsonBuilder().startObject().field("gender", "male")
197 .endObject());
198 client.update(updateRequest).get();
199 System.out.println(updateRequest.version());
200 client.close();
201 }
202
203 /**
204 * 更新索引:Update API-prepareUpdate()-doc
205 *
206 * @throws IOException
207 * @throws ExecutionException
208 * @throws InterruptedException
209 */
210 @Test
211 public void test8() throws IOException, InterruptedException,
212 ExecutionException {
213 client.prepareUpdate("twitter", "tweet", "AVyikSKIot7zkId708s6")
214 .setDoc(jsonBuilder().startObject().field("gender", "female")
215 .endObject()).get();
216 client.close();
217 }
218
219 /**
220 * 更新索引:Update API-prepareUpdate()-script
221 * 需要开启:script.engine.groovy.inline.update: on
222 *
223 * @throws IOException
224 * @throws ExecutionException
225 * @throws InterruptedException
226 */
227 @Test
228 public void test9() throws IOException, InterruptedException,
229 ExecutionException {
230 client.prepareUpdate("twitter", "tweet", "AVyi4oZfot7zkId708s-")
231 .setScript(
232 new Script("ctx._source.gender = \"female\"",
233 ScriptService.ScriptType.INLINE, null, null))
234 .get();
235 client.close();
236 }
237
238 /**
239 * 更新索引:Update API-UpdateRequest-upsert
240 *
241 * @throws IOException
242 * @throws ExecutionException
243 * @throws InterruptedException
244 */
245 @Test
246 public void test10() throws IOException, InterruptedException,
247 ExecutionException {
248 IndexRequest indexRequest = new IndexRequest("twitter", "tweet", "1")
249 .source(jsonBuilder()
250 .startObject()
251 .field("name", "Joe Smith")
252 .field("gender", "male")
253 .endObject());
254 UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1")
255 .doc(jsonBuilder()
256 .startObject()
257 .field("gender", "female")
258 .endObject()).upsert(indexRequest);
259 client.update(updateRequest).get();
260 client.close();
261 }
262
263 /**
264 * 批量查询索引:Multi Get API
265 *
266 * @throws IOException
267 * @throws ExecutionException
268 * @throws InterruptedException
269 */
270 @Test
271 public void test11() throws IOException, InterruptedException,
272 ExecutionException {
273 MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
274 .add("twitter", "tweet", "1")
275 .add("twitter", "tweet", "AVyi4oZfot7zkId708s-", "AVyi3OORot7zkId708s8", "AVyikSKIot7zkId708s6")
276 .add("djt2", "user", "1")
277 .get();
278
279 for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
280 GetResponse response = itemResponse.getResponse();
281 if (response.isExists()) {
282 String json = response.getSourceAsString();
283 System.out.println(json);
284 }
285 }
286 client.close();
287 }
288
289 /**
290 * 批量操作索引:Bulk API
291 *
292 * @throws IOException
293 * @throws ExecutionException
294 * @throws InterruptedException
295 */
296 @Test
297 public void test12() throws IOException, InterruptedException,
298 ExecutionException {
299 BulkRequestBuilder bulkRequest = client.prepareBulk();
300
301 // either use client#prepare, or use Requests# to directly build index/delete requests
302 bulkRequest.add(client.prepareIndex("twitter", "tweet", "3")
303 .setSource(jsonBuilder()
304 .startObject()
305 .field("user", "kimchy")
306 .field("postDate", new Date())
307 .field("message", "trying out Elasticsearch")
308 .endObject()
309 )
310 );
311
312 bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
313 .setSource(jsonBuilder()
314 .startObject()
315 .field("user", "kimchy")
316 .field("postDate", new Date())
317 .field("message", "another post")
318 .endObject()
319 )
320 );
321 DeleteRequestBuilder prepareDelete = client.prepareDelete("twitter", "tweet", "AVyikSKIot7zkId708s6");
322 bulkRequest.add(prepareDelete);
323
324
325 BulkResponse bulkResponse = bulkRequest.get();
326 //批量操作:其中一个操作失败不影响其他操作成功执行
327 if (bulkResponse.hasFailures()) {
328 // process failures by iterating through each bulk response item
329 BulkItemResponse[] items = bulkResponse.getItems();
330 for (BulkItemResponse bulkItemResponse : items) {
331 System.out.println(bulkItemResponse.getFailureMessage());
332 }
333 }else{
334 System.out.println("bulk process success!");
335 }
336 client.close();
337 }
338
339 /**
340 * 批量操作索引:Using Bulk Processor
341 * 优化:先关闭副本,再添加副本,提升效率
342 * @throws IOException
343 * @throws ExecutionException
344 * @throws InterruptedException
345 */
346 @Test
347 public void test13() throws IOException, InterruptedException,
348 ExecutionException {
349
350 BulkProcessor bulkProcessor = BulkProcessor.builder(
351 client,
352 new BulkProcessor.Listener() {
353
354 public void beforeBulk(long executionId, BulkRequest request) {
355 // TODO Auto-generated method stub
356 System.out.println(request.numberOfActions());
357 }
358
359 public void afterBulk(long executionId, BulkRequest request,
360 Throwable failure) {
361 // TODO Auto-generated method stub
362 System.out.println(failure.getMessage());
363 }
364
365 public void afterBulk(long executionId, BulkRequest request,
366 BulkResponse response) {
367 // TODO Auto-generated method stub
368 System.out.println(response.hasFailures());
369 }
370 })
371 .setBulkActions(1000) // 每个批次的最大数量
372 .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))// 每个批次的最大字节数
373 .setFlushInterval(TimeValue.timeValueSeconds(5))// 每批提交时间间隔
374 .setConcurrentRequests(1) //设置多少个并发处理线程
375 //可以允许用户自定义当一个或者多个bulk请求失败后,该执行如何操作
376 .setBackoffPolicy(
377 BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
378 .build();
379 String json = "{" +
380 "\"user\":\"kimchy\"," +
381 "\"postDate\":\"2013-01-30\"," +
382 "\"message\":\"trying out Elasticsearch\"" +
383 "}";
384
385 for (int i = 0; i < 1000; i++) {
386 bulkProcessor.add(new IndexRequest("djt6", "user").source(json));
387 }
388 //阻塞至所有的请求线程处理完毕后,断开连接资源
389 bulkProcessor.awaitClose(3, TimeUnit.MINUTES);
390 client.close();
391 }
392 /**
393 * SearchType使用方式
394 * @throws Exception
395 */
396 @Test
397 public void test14() throws Exception {
398 SearchResponse response = client.prepareSearch("djt")
399 .setTypes("user")
400 //.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
401 .setSearchType(SearchType.QUERY_AND_FETCH)
402 .execute()
403 .actionGet();
404 SearchHits hits = response.getHits();
405 System.out.println(hits.getTotalHits());
406 }
407 }