Table of Contents
- I. Installing Elasticsearch
  - 1. Prerequisites
  - 2. Install Java 11
  - 3. Download Elasticsearch
  - 4. Create a user
  - 5. Grant permissions
  - 6. Adjust system limits
  - 7. Virtual memory setting
  - 8. Start
  - 9. Verify
- II. Installing Kibana
  - 1. Download Kibana
  - 2. Install
  - 3. Edit the config
  - 4. Start
  - 5. Verify
  - 6. Test
- III. Basic Usage
  - 1. Get basic info
  - 2. Cluster health
  - 3. List indices
  - 4. Create an index
  - 5. Delete an index
  - 6. Document IDs
  - 7. Insert a document
  - 8. Get a document
  - 9. Book CRUD
- IV. Advanced Usage
  - 1. Response fields explained
  - 2. The _source field
  - 3. Force create
  - 4. Scripted updates
  - 5. Search an index
  - 6. The _version field
  - 7. Retries
  - 8. Batch get with mget
  - 9. bulk
- V. Java API
  - 1. pom
  - 2. yaml
  - 3. config
  - 4. get
  - 5. add
  - 6. update
  - 7. delete
  - 8. bulk
- VI. Analyzers
  - 1. Hot-reloading dictionaries from MySQL
  - 2. Verifying the ik analyzer
I. Installing Elasticsearch
1. Prerequisites
Things to watch out for:
- Don't give the JVM too little memory, or startup will fail
- The JDK version must match: Elasticsearch 7 needs Java 11
- Elasticsearch cannot be started as the root user
2. Install Java 11
#check the current Java version
java -version
#download and install
yum install java-11-openjdk.x86_64 -y
#locate the installation
ls -rl $(which java)
#edit the environment variables
vim /etc/profile
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-11.0.19.0.7-1.0.1.al8.x86_64
export PATH=$PATH:$JAVA_HOME/bin
#apply the changes
source /etc/profile
3. Download Elasticsearch
Download link
#extract into /usr/local/
tar -zxvf elasticsearch-7.12.0-linux-x86_64.tar.gz -C /usr/local/
#enter the extracted directory
cd /usr/local/elasticsearch-7.12.0
#create the data directory (the logs directory already exists)
mkdir data
Edit the config:
vim config/elasticsearch.yml
cluster.name: my-application #cluster name
node.name: node-1 #node name
path.data: /usr/local/elasticsearch-7.12.0/data
path.logs: /usr/local/elasticsearch-7.12.0/logs
#bind address; 0.0.0.0 makes the node reachable from any host
network.host: 0.0.0.0
http.port: 9200 #port
#list the initial master-eligible node names; this is the node.name set above. Single-node setup here, so one entry is enough
cluster.initial_master_nodes: ["node-1"]
4. Create a user
#create the user
useradd elasticsearch
#set its password (elastic-search1)
passwd elasticsearch
5. Grant permissions
# hand the install directory over to the new elasticsearch user
chown -R elasticsearch:elasticsearch /usr/local/elasticsearch-7.12.0
6. Adjust system limits
- Raise the limit on how many files a process may open. By default Linux caps an application at 65535 open files, but Elasticsearch needs at least 65536.
- Raise the limit on how many threads a user's processes may spawn. By default Linux lets root-owned processes spawn any number of threads but caps other users at 1024, while Elasticsearch needs at least 4096 threads in reserve. Since 5.x, Elasticsearch refuses to start as root, so it has to run as another user, which is why this limit matters.
- Older Linux kernels allocate 128K of memory per thread; 4.x kernels allocate more. With 1 GB of RAM a VM can only open 3000+ threads, so give the VM at least 1.5 GB of memory. (A quick way to check the current limits follows this list.)
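You can sanity-check the limits currently in effect with the standard ulimit shell built-in, run as the user that will start Elasticsearch:
#max open files for this user
ulimit -n
#max user processes/threads
ulimit -u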
#edit the limits
vim /etc/security/limits.conf
# append these at the end of the file
elasticsearch soft nofile 65536
elasticsearch hard nofile 65536
elasticsearch soft nproc 4096
elasticsearch hard nproc 4096
7. Virtual memory setting
vim /etc/sysctl.conf
vm.max_map_count=262144
#apply the setting
/sbin/sysctl -p
8. Start
#switch to the elasticsearch user
su elasticsearch
#enter the install directory
cd /usr/local/elasticsearch-7.12.0
#start in the foreground
./bin/elasticsearch
#or run as a daemon
./bin/elasticsearch -d
9. Verify
#check the listening ports
netstat -ntlp | grep -E '9200|5601'
#access from the server itself
curl 127.0.0.1:9200
#access from a browser
http://47.119.160.231:9200
#check the cluster health
http://47.119.160.231:9200/_cluster/health
The status field: Green means all shards are available; Yellow means all primary shards are available; Red means some primary shards are unavailable and the cluster is not usable.
{
"name": "node-1",
"cluster_name": "my-application",
"cluster_uuid": "jFwKZO8cT12BdPtU63m-ew",
"version": {
"number": "7.12.0",
"build_flavor": "default",
"build_type": "tar",
"build_hash": "78722783c38caa25a70982b5b042074cde5d3b3a",
"build_date": "2021-03-18T06:17:15.410153305Z",
"build_snapshot": false,
"lucene_version": "8.8.0",
"minimum_wire_compatibility_version": "6.8.0",
"minimum_index_compatibility_version": "6.0.0-beta1"
},
"tagline": "You Know, for Search"
}
II. Installing Kibana
1. Download Kibana
Download link
Pick a version that matches your Elasticsearch version as closely as possible; this post uses 7.12.0, Linux build.
2. Install
#install directory
cd /usr/local
#extract
tar -zxvf kibana-7.12.0-linux-x86_64.tar.gz
#rename
mv kibana-7.12.0-linux-x86_64 kibana
3. Edit the config
Edit kibana.yml:
cd /usr/local/kibana
vim config/kibana.yml
server.port: 5601 #Kibana port
server.host: "0.0.0.0" #reachable from any host; a specific IP also works
elasticsearch.hosts: "http://<ES-server-public-IP>:9200" #where to reach Elasticsearch
kibana.index: ".kibana"
#as root, hand the directory to the elasticsearch user
chown -R elasticsearch:elasticsearch /usr/local/kibana
4. Start
#switch user
su elasticsearch
#start in the foreground
cd /usr/local/kibana/bin
./kibana
#start in the background
nohup sh /usr/local/kibana/bin/kibana &
5. Verify
#check the port
netstat -ntlp | grep 5601
#browser
http://47.119.160.231:5601/
6. Test
Run a few requests in the Kibana Dev Tools console; everything below is walked through in detail in the following sections:
GET _search
{
"query": {
"match_all": {}
}
}
GET /
GET /_cluster/health
GET /_cat/health?v
GET /_cat/indices?v
PUT /demo_index?pretty
DELETE /demo_index?pretty
PUT /book
PUT /book/_doc/1
{
"id":1,
"title":"这是一文章",
"content":"xxxxx",
"comment":"备注信息",
"mobile":"13344556677"
}
PUT /book/_doc/2
{
"id":1,
"title":"这是一11文章",
"content":"xxxxx",
"comment":"备注信息",
"mobile":"13344556677"
}
GET /book/_doc/1
POST /book/_doc/1/_update
{
"doc": {
"title": "这是一333文章"
}
}
POST /book/_update/1
{
"doc": {
"title": "这是一3333333444555文章"
}
}
DELETE /book/_doc/1
POST /book/_doc/
{
"id":1,
"title":"这是一11文章",
"content":"xxxxx",
"comment":"备注信息",
"mobile":"13344556677"
}
GET /book/_doc/1
GET /book/_doc/1?_source_includes=id,title
PUT /read_index/_doc/1/_create
{
"id":1,
"title":"这是一11文章",
"content":"xxxxx",
"comment":"备注信息",
"mobile":"13344556677"
}
#insert a document
PUT /test_index/_doc/6
{
"num": 0
}
#run an update script
POST /test_index/_doc/6/_update
{
"script": "ctx._source.num+=1"
}
#fetch the document
GET /test_index/_doc/6
GET /test_index/_search
III. Basic Usage
1. Get basic info
GET /
{
"name": "node-1",
"cluster_name": "my-application",
"cluster_uuid": "gmjBLVYEQFO6ZR2bveXFig",
"version": {
"number": "7.12.0",
"build_flavor": "default",
"build_type": "tar",
"build_hash": "78722783c38caa25a70982b5b042074cde5d3b3a",
"build_date": "2021-03-18T06:17:15.410153305Z",
"build_snapshot": false,
"lucene_version": "8.8.0",
"minimum_wire_compatibility_version": "6.8.0",
"minimum_index_compatibility_version": "6.0.0-beta1"
},
"tagline": "You Know, for Search"
}
2. Cluster health
GET /_cluster/health
The status field: Green means all shards are available; Yellow means all primary shards are available; Red means some primary shards are unavailable and the cluster is not usable.
{
"cluster_name": "my-application",
"status": "yellow",
"timed_out": false,
"number_of_nodes": 1,
"number_of_data_nodes": 1,
"active_primary_shards": 8,
"active_shards": 8,
"relocating_shards": 0,
"initializing_shards": 0,
"unassigned_shards": 1,
"delayed_unassigned_shards": 0,
"number_of_pending_tasks": 0,
"number_of_in_flight_fetch": 0,
"task_max_waiting_in_queue_millis": 0,
"active_shards_percent_as_number": 88.88888888888889
}
GET /_cat/health?v
- green: every index's primary shards and replica shards are all active
- yellow: every index's primary shards are active, but some replica shards are not active and therefore unavailable
- red: not every index's primary shards are active; some indices have lost data
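For reference, GET /_cat/health?v prints a one-line table along these lines (illustrative values for this single-node setup):
epoch      timestamp cluster        status node.total node.data shards pri relo init unassign pending_tasks max_task_wait_time active_shards_percent
1683014460 08:01:00  my-application yellow          1         1      8   8    0    0        1             0                  -                 88.9%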
3. List indices
GET /_cat/indices?v
4. Create an index
PUT /demo_index?pretty
{
"acknowledged": true,
"shards_acknowledged": true,
"index": "demo_index"
}
5. Delete an index
DELETE /demo_index?pretty
{
"acknowledged": true
}
6. Document IDs
Explicit ID:
PUT /book/_doc/2
{
"id":1,
"title":"这是一11文章",
"content":"xxxxx",
"comment":"备注信息",
"mobile":"13344556677"
}
Auto-generated ID:
POST /book/_doc/
{
"id":1,
"title":"这是一11文章",
"content":"xxxxx",
"comment":"备注信息",
"mobile":"13344556677"
}
Auto-generated IDs are 20 characters long, URL-safe, base64-encoded GUIDs, and distributed nodes can generate them concurrently without collisions.
{
"_index": "book",
"_type": "_doc",
"_id": "Qk9rAIgBMxAD_MReKtAQ",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 8,
"_primary_term": 1
}
7. Insert a document
PUT /blog_index/_doc/2
{
"id":1,
"title":"这是一篇文章",
"content":"xxxxx",
"comment":"备注信息",
"mobile":"13344556677"
}
{
"_index": "blog_index",
"_type": "_doc",
"_id": "2",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 1,
"_primary_term": 1
}
8. Get a document
GET /blog_index/_doc/1
{
"_index": "blog_index",
"_type": "_doc",
"_id": "1",
"_version": 1,
"_seq_no": 0,
"_primary_term": 1,
"found": true,
"_source": {
"id": 1,
"title": "这是一篇文章",
"content": "xxxxx",
"comment": "备注信息",
"mobile": "13344556677"
}
}
9. Book CRUD
Create the book index:
PUT /book
Insert a document:
PUT /book/_doc/1
{
"id":1,
"title":"这是一篇文章",
"content":"xxxxx",
"comment":"备注信息",
"mobile":"13344556677"
}
Fetch the document:
GET /book/_doc/1
Full replacement with PUT
Under the hood, the old document's content is not removed right away; it is only marked deleted, and the cluster purges such documents when the time is right.
PUT /book/_doc/2
{
"id":1,
"title":"这是一11文章",
"content":"xxxxx",
"comment":"备注信息",
"mobile":"13344556677"
}
Partial update. Internally Elasticsearch:
- fetches the old document
- merges the incoming fields into the old document (in memory)
- marks the old document as deleted
- indexes a new document
POST /book/_doc/1/_update
{
"doc": {
"title": "这是一333文章"
}
}
noop:
- running the same update repeatedly returns "result": "noop"
- short for no operation: nothing changed, so nothing is written
The same partial update via the 7.x _update endpoint:
POST /book/_update/1
{
"doc": {
"title": "这是一3333333444555文章"
}
}
Delete the document:
DELETE /book/_doc/1
IV. Advanced Usage
1. Response fields explained
- _version: the document's version number
- deletes are lazy: the document is only marked deleted and purged later
- _id: either supplied explicitly or auto-generated
{
"_index": "book",
"_type": "_doc",
"_id": "1",
"_version": 6,
"_seq_no": 7,
"_primary_term": 1,
"found": true,
"_source": {
"id": 1,
"title": "这是一文章",
"content": "xxxxx",
"comment": "备注信息",
"mobile": "13344556677"
}
}
2. The _source field
Meaning: all the fields and values supplied when the document was indexed; a GET returns them verbatim in the _source field.
GET /book/_doc/1
Returning only selected fields:
GET /book/_doc/1?_source_includes=id,title
{
"_index" : "book",
"_type" : "_doc",
"_id" : "1",
"_version" : 6,
"_seq_no" : 7,
"_primary_term" : 1,
"found" : true,
"_source" : {
"id" : 1,
"title" : "这是一文章"
}
}
3. Force create
To avoid overwriting existing data, use force create on insert; it fails instead of replacing an existing document.
PUT /index/_doc/1/_create
PUT /read_index/_doc/1/_create
{
"id":1,
"title":"这是一11文章",
"content":"xxxxx",
"comment":"备注信息",
"mobile":"13344556677"
}
{
"_index": "read_index",
"_type": "_doc",
"_id": "1",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 0,
"_primary_term": 1
}
4. Scripted updates
#insert a document
PUT /test_index/_doc/6
{
"num": 0
}
#run an update script
POST /test_index/_doc/6/_update
{
"script": "ctx._source.num+=1"
}
#fetch the document
GET /test_index/_doc/6
{
"_index": "test_index",
"_type": "_doc",
"_id": "6",
"_version": 2,
"_seq_no": 1,
"_primary_term": 1,
"found": true,
"_source": {
"num": 1
}
}
5. Search an index
GET /test_index/_search
{
"took": 339,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 1,
"relation": "eq"
},
"max_score": 1.0,
"hits": [
{
"_index": "test_index",
"_type": "_doc",
"_id": "6",
"_score": 1.0,
"_source": {
"num": 1
}
}
]
}
}
6. The _version field
Deletes are asynchronous: the document is only marked deleted, then removed later by the lazy-delete policy.
Internal primary-to-replica replication is multi-threaded and asynchronous, and it relies on optimistic locking, also based on the version number:
- if thread 1 arrives before thread 2, the replica's data ends up correct
- if thread 2 arrives first and thread 1 arrives later:
  - the replica shard first applies thread 2's write, say test3 with version=3
  - when thread 1's request reaches the replica shard, its version=1 is recognized as stale and the request is discarded
Notes:
- by default, concurrency control uses the version number Elasticsearch maintains itself
- with ?version=N&version_type=external, the application supplies its own version number, and the write succeeds only if the supplied version is greater than the stored one (or the document does not yet exist)
PUT /test_index/_doc/4?version=2&version_type=external
{
"test_field": "itcast1"
}
7. Retries
Specify how many times to retry when a version conflict occurs:
POST /test_index/_doc/5/_update?retry_on_conflict=3
{
"doc": {
"test_field": "itcast1"
}
}
Combined with an external version:
POST /test_index/_doc/5/_update?retry_on_conflict=3&version=22&version_type=external
{
"doc": {
"test_field": "itcast1"
}
}
8. Batch get with mget
A single get looks like GET /test_index/_doc/1; fetching many IDs one document per request costs too many network round trips, so mget batches them into a single request.
GET /_mget
{
"docs" : [
{
"_index" : "test_index",
"_type" : "_doc",
"_id" : 1
},
{
"_index" : "test_index",
"_type" : "_doc",
"_id" : 7
}
]
}
Response:
{
"docs": [
{
"_index": "test_index",
"_type": "_doc",
"_id": "2",
"_version": 6,
"_seq_no": 12,
"_primary_term": 1,
"found": true,
"_source": {
"test_field": "test12333123321321"
}
},
{
"_index": "test_index",
"_type": "_doc",
"_id": "3",
"_version": 6,
"_seq_no": 18,
"_primary_term": 1,
"found": true,
"_source": {
"test_field": "test3213"
}
}
]
}
Elasticsearch 7 deprecation warnings suggest dropping the type, so the same request without _type:
GET /_mget
{
"docs" : [
{
"_index" : "test_index",
"_id" : 2
},
{
"_index" : "test_index",
"_id" : 3
}
]
}
Batch get within a single index:
GET /test_index/_mget
{
"docs" : [
{
"_id" : 2
},
{
"_id" : 3
}
]
}
A third form, written as a search:
POST /test_index/_search
{
"query": {
"ids" : {
"values" : ["1", "7"]
}
}
}
9. bulk
A bulk request packs a series of document create/index/update/delete operations into a single request, reducing the number of network round trips.
#syntax
POST /_bulk
{"action": {"metadata"}}
{"data"}
Example:
#the request below creates doc 8, updates doc 3, and deletes doc 5
POST /_bulk
{ "create": { "_index": "test_index","_id": "8"}}
{ "test_field": "test8" }
{ "update": { "_index": "test_index","_id": "3"} }
{ "doc": {"test_field": "bulk test"} }
{ "delete": { "_index": "test_index","_id": "5" }}
Summary:
- Operations:
  - delete: deletes a document; needs only one JSON line
  - create: force create, equivalent to PUT /index/type/id/_create
  - index: a plain PUT; creates the document or fully replaces it
  - update: performs a partial update
- Format: each JSON object must sit on a single line with no internal line breaks, and adjacent JSON objects must be separated by newlines.
- Isolation: operations do not affect one another; a failed operation just reports its error in its own item of the response, as the sample response below shows.
- In practice: don't make a single bulk request too large, or it all piles up in memory and performance drops; a few thousand operations totaling a few MB per request is about right.
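For reference, the bulk response carries one item per operation, in order; a response to the example above has roughly this shape (illustrative values), with any failure reported inside its own item:
{
  "took": 30,
  "errors": false,
  "items": [
    { "create": { "_index": "test_index", "_id": "8", "result": "created", "status": 201 } },
    { "update": { "_index": "test_index", "_id": "3", "result": "updated", "status": 200 } },
    { "delete": { "_index": "test_index", "_id": "5", "result": "deleted", "status": 200 } }
  ]
}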
V. Java API
1. pom
<dependencies>
    <!-- es client -->
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <!-- low: the low-level client, closer to the wire; high: a higher-level wrapper, which is enough here -->
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.3.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>7.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <version>2.0.6.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <version>2.0.6.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.10</version>
    </dependency>
</dependencies>
2. yaml
spring:
  application:
    name: search-service
kwan:
  elasticsearch:
    hostlist: 47.119.160.231:9200 #separate multiple nodes with commas
3. config
@Configuration
public class ElasticsearchConfig {

    @Value("${kwan.elasticsearch.hostlist}")
    private String hostlist;

    @Bean(destroyMethod = "close")
    public RestHighLevelClient restHighLevelClient() {
        String[] split = hostlist.split(",");
        HttpHost[] httpHostsArray = new HttpHost[split.length];
        for (int i = 0; i < split.length; i++) {
            String item = split[i];
            httpHostsArray[i] = new HttpHost(item.split(":")[0], Integer.parseInt(item.split(":")[1]), "http");
        }
        return new RestHighLevelClient(RestClient.builder(httpHostsArray));
    }
}
4. get
@Slf4j
@SpringBootTest(classes = SearchApplication.class)
@RunWith(SpringRunner.class)
public class TestDocument_01_get {

    @Autowired
    RestHighLevelClient client;

    @Test
    public void testGet() throws IOException {
        // build the request
        GetRequest getRequest = new GetRequest("test_post", "1");
        // optional parameters: only return selected _source fields
        String[] includes = new String[]{"id", "comment"};
        String[] excludes = Strings.EMPTY_ARRAY;
        FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
        getRequest.fetchSourceContext(fetchSourceContext);
        // synchronous query
        GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
        // read the result
        if (getResponse.isExists()) {
            log.info(getResponse.getId());
            log.info(String.valueOf(getResponse.getVersion()));
            log.info(getResponse.getSourceAsString());                // source as a String
            log.info(String.valueOf(getResponse.getSourceAsBytes())); // source as bytes
            log.info(String.valueOf(getResponse.getSourceAsMap()));   // source as a Map
        } else {
            log.info("document not found");
        }
    }
}
The asynchronous variant uses a listener instead of blocking:
@Test
public void testGetAsync() {
    // build the request
    GetRequest getRequest = new GetRequest("test_post", "1");
    // optional parameters
    String[] includes = new String[]{"id", "title"};
    String[] excludes = Strings.EMPTY_ARRAY;
    FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
    getRequest.fetchSourceContext(fetchSourceContext);
    // set up the listener
    ActionListener<GetResponse> listener = new ActionListener<GetResponse>() {
        // on success
        public void onResponse(GetResponse getResponse) {
            log.info(getResponse.getId());
            log.info(String.valueOf(getResponse.getVersion()));
            log.info(getResponse.getSourceAsString());
        }

        // on failure
        public void onFailure(Exception e) {
            e.printStackTrace();
            log.info("failed to fetch the document");
        }
    };
    // asynchronous query
    client.getAsync(getRequest, RequestOptions.DEFAULT, listener);
    // keep the test alive so the async callback has time to fire
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
5. add
@Test
public void testAddByJsonString() throws IOException {
    // build the request
    IndexRequest request = new IndexRequest("test_post");
    request.id("5");
    // document body as a JSON string
    String jsonString = "{\n" +
            " \"user\":\"tomas\",\n" +
            " \"postDate\":\"2019-07-18\",\n" +
            " \"message\":\"trying out es1\"\n" +
            "}";
    request.source(jsonString, XContentType.JSON);
    // synchronous execution
    IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
    // read the result
    log.info(indexResponse.getIndex());
    log.info(indexResponse.getId());
    log.info(String.valueOf(indexResponse.getResult()));
    if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
        DocWriteResponse.Result result = indexResponse.getResult();
        log.info("CREATED" + result);
    } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
        DocWriteResponse.Result result = indexResponse.getResult();
        log.info("UPDATED" + result);
    } else {
        log.info("some other operation");
    }
    // read the shard info
    ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
    if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
        log.info("fewer shards succeeded than the total!");
    }
    if (shardInfo.getFailed() > 0) {
        for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
            String reason = failure.reason(); // the reason for each failure
            log.info(reason);
        }
    }
}
The document body can also be built from a Map:
@Test
public void testAddByMap() throws IOException {
    // build the request
    IndexRequest request = new IndexRequest("test_post");
    request.id("6");
    // document body as a Map
    Map<String, Object> jsonMap = new HashMap<String, Object>();
    jsonMap.put("user", "tomas");
    jsonMap.put("postDate", "2019-07-18");
    jsonMap.put("message", "trying out es1");
    request.source(jsonMap);
    // synchronous execution
    IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
    // read the result
    log.info(indexResponse.getIndex());
    log.info(indexResponse.getId());
    log.info(String.valueOf(indexResponse.getResult()));
    if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
        DocWriteResponse.Result result = indexResponse.getResult();
        log.info("CREATED" + result);
    } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
        DocWriteResponse.Result result = indexResponse.getResult();
        log.info("UPDATED" + result);
    }
    ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
    if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
        log.info("fewer shards succeeded than the total!");
    }
    if (shardInfo.getFailed() > 0) {
        for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
            String reason = failure.reason(); // the reason for each failure
            log.info(reason);
        }
    }
}
Or via an XContentBuilder:
@Test
public void testAddByXContentBuilder() throws IOException {
    // build the request
    IndexRequest request = new IndexRequest("test_post");
    request.id("7");
    // document body built with XContentBuilder
    XContentBuilder builder = XContentFactory.jsonBuilder();
    builder.startObject();
    {
        builder.field("user", "tomas");
        builder.field("message", "trying out es1");
        builder.timeField("postDate", "2019-07-18");
    }
    builder.endObject();
    request.source(builder);
    // synchronous execution
    IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
    // read the result
    log.info(indexResponse.getIndex());
    log.info(indexResponse.getId());
    log.info(String.valueOf(indexResponse.getResult()));
    if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
        DocWriteResponse.Result result = indexResponse.getResult();
        log.info("CREATED" + result);
    } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
        DocWriteResponse.Result result = indexResponse.getResult();
        log.info("UPDATED" + result);
    }
    ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
    if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
        log.info("fewer shards succeeded than the total!");
    }
    if (shardInfo.getFailed() > 0) {
        for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
            String reason = failure.reason(); // the reason for each failure
            log.info(reason);
        }
    }
}
Or as inline key-value pairs:
@Test
public void testAddByKeyValuePairs() throws IOException {
    // build the request
    IndexRequest request = new IndexRequest("test_post");
    request.id("9");
    // document body as alternating key-value pairs
    request.source("user", "tomas",
            "message", "trying out es1",
            "postDate", "2019-07-18");
    // synchronous execution
    IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
    // read the result
    log.info(indexResponse.getIndex());
    log.info(indexResponse.getId());
    log.info(String.valueOf(indexResponse.getResult()));
    if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
        DocWriteResponse.Result result = indexResponse.getResult();
        log.info("CREATED" + result);
    } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
        DocWriteResponse.Result result = indexResponse.getResult();
        log.info("UPDATED" + result);
    }
    ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
    if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
        log.info("fewer shards succeeded than the total!");
    }
    if (shardInfo.getFailed() > 0) {
        for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
            String reason = failure.reason(); // the reason for each failure
            log.info(reason);
        }
    }
}
Timeouts and a manually maintained version can be set on the request:
@Test
public void testAddWithTimeoutAndVersion() throws IOException {
    // build the request
    IndexRequest request = new IndexRequest("test_post");
    request.id("10");
    // document body as a JSON string
    String jsonString = "{\n" +
            " \"user\":\"tomas\",\n" +
            " \"postDate\":\"2019-07-18\",\n" +
            " \"message\":\"trying out es1\"\n" +
            "}";
    request.source(jsonString, XContentType.JSON);
    // set a timeout (two equivalent forms; the second overrides the first)
    request.timeout("1s");
    request.timeout(TimeValue.timeValueSeconds(1));
    // maintain the version number manually (external versioning)
    request.version(4);
    request.versionType(VersionType.EXTERNAL);
    // synchronous execution
    IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
    // read the result
    log.info(indexResponse.getIndex());
    log.info(indexResponse.getId());
    log.info(String.valueOf(indexResponse.getResult()));
    if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
        DocWriteResponse.Result result = indexResponse.getResult();
        log.info("CREATED" + result);
    } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
        DocWriteResponse.Result result = indexResponse.getResult();
        log.info("UPDATED" + result);
    }
    ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
    if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
        log.info("fewer shards succeeded than the total!");
    }
    if (shardInfo.getFailed() > 0) {
        for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
            String reason = failure.reason(); // the reason for each failure
            log.info(reason);
        }
    }
}
6. update
@Test
public void testUpdate() throws IOException {
    // build the request
    UpdateRequest request = new UpdateRequest("test_post", "5");
    Map<String, Object> jsonMap = new HashMap<>();
    jsonMap.put("user", "tomas Lee");
    request.doc(jsonMap);
    request.timeout("1s");
    request.retryOnConflict(3); // retries on version conflict
    // synchronous execution
    UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
    // read the result
    updateResponse.getId();
    updateResponse.getIndex();
    // branch on the result type
    if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
        DocWriteResponse.Result result = updateResponse.getResult();
        log.info("CREATED:" + result);
    } else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
        DocWriteResponse.Result result = updateResponse.getResult();
        log.info("UPDATED:" + result);
    } else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
        DocWriteResponse.Result result = updateResponse.getResult();
        log.info("DELETED:" + result);
    } else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
        // no operation: nothing changed
        DocWriteResponse.Result result = updateResponse.getResult();
        log.info("NOOP:" + result);
    }
}
7. delete
@Test
public void testDelete() throws IOException {
    // build the request
    DeleteRequest request = new DeleteRequest("test_post", "3");
    // execute
    DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
    // read the result
    deleteResponse.getId();
    deleteResponse.getIndex();
    DocWriteResponse.Result result = deleteResponse.getResult();
    log.info(result.toString());
}
8. bulk
@Test
public void testBulk() throws IOException {
    // build the request: two index ops, one update, one delete
    BulkRequest request = new BulkRequest();
    request.add(new IndexRequest("post").id("1").source(XContentType.JSON, "field", "1"));
    request.add(new IndexRequest("post").id("2").source(XContentType.JSON, "field", "2"));
    request.add(new UpdateRequest("post", "1").doc(XContentType.JSON, "field", "3"));
    request.add(new DeleteRequest("post").id("2"));
    // execute
    BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
    // read the per-item results
    for (BulkItemResponse itemResponse : bulkResponse) {
        DocWriteResponse response = itemResponse.getResponse();
        switch (itemResponse.getOpType()) {
            case INDEX:
                IndexResponse indexResponse = (IndexResponse) response;
                log.info("INDEX:" + indexResponse.getResult());
                break;
            case CREATE:
                IndexResponse createResponse = (IndexResponse) response;
                log.info("CREATE:" + createResponse.getResult());
                break;
            case UPDATE:
                UpdateResponse updateResponse = (UpdateResponse) response;
                log.info("UPDATE:" + updateResponse.getResult());
                break;
            case DELETE:
                DeleteResponse deleteResponse = (DeleteResponse) response;
                log.info("DELETE:" + deleteResponse.getResult());
                break;
        }
    }
}
VI. Analyzers
1. Hot-reloading dictionaries from MySQL
1. Download the source
https://github.com/medcl/elasticsearch-analysis-ik/releases
The ik analyzer is a standard Java Maven project; import it straight into IDEA and you can browse the source.
2. Modify the source (a sketch of the reload thread follows this list)
- org.wltea.analyzer.dic.Dictionary, around line 160: in the Dictionary singleton's initialization, create our custom thread and start it
- org.wltea.analyzer.dic.HotDictReloadThread: simply an endless loop that keeps calling Dictionary.getSingleton().reLoadMainDict() to reload the dictionaries
- Dictionary, around line 399: this.loadMySQLExtDict(); loads the extension dictionary from MySQL
- Dictionary, around line 609: this.loadMySQLStopwordDict(); loads the stopwords from MySQL
- jdbc-reload.properties under config: the MySQL settings file
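A minimal sketch of that reload thread, assuming the package layout of the ik source (the sleep interval is an assumption; pace it however your setup needs):
// Hypothetical sketch; Dictionary.getSingleton() and reLoadMainDict() come from the ik source.
package org.wltea.analyzer.dic;

public class HotDictReloadThread implements Runnable {

    @Override
    public void run() {
        // endless loop: keep asking the Dictionary singleton to reload its dictionaries
        while (true) {
            Dictionary.getSingleton().reLoadMainDict();
            try {
                Thread.sleep(60000L); // assumed pacing between reloads
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
In the Dictionary initialization mentioned above, it would be started with something like new Thread(new HotDictReloadThread()).start();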
3. Build with mvn package
target\releases\elasticsearch-analysis-ik-7.3.0.zip
4. Unzip the ik package
Put the MySQL driver jar into the ik plugin directory
5. Edit the jdbc settings; a sample file is sketched below
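For reference, jdbc-reload.properties holds entries along these lines; the key names and SQL are illustrative and must match whatever your modified loadMySQLExtDict/loadMySQLStopwordDict code actually reads:
jdbc.url=jdbc:mysql://localhost:3306/es?serverTimezone=UTC
jdbc.user=root
jdbc.password=root
# SQL returning the hot words / stopwords, one per row
jdbc.reload.sql=select word from hot_words
jdbc.reload.stopword.sql=select stopword as word from hot_stopwords
# reload interval in milliseconds
jdbc.reload.interval=1000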
6. Restart Elasticsearch
Watch the logs: they will show what our code prints, e.g. which config was loaded and which words and stopwords were pulled in
7. Add words and stopwords to the dictionary tables in MySQL
8. Run an analysis to verify the hot reload took effect
GET /_analyze
{
"analyzer": "ik_smart",
"text": "喊麦"
}
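Once 喊麦 has been added to the MySQL word table and a reload has run, ik_smart should return it as a single token; the expected shape (illustrative offsets) is:
{
  "tokens": [
    {
      "token": "喊麦",
      "start_offset": 0,
      "end_offset": 2,
      "type": "CN_WORD",
      "position": 0
    }
  ]
}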
2. Verifying the ik analyzer
The standard analyzer:
GET /_analyze
{
"analyzer": "standard",
"text": "中华人民共和国人民大会堂"
}
{
"tokens": [
{
"token": "中",
"start_offset": 0,
"end_offset": 1,
"type": "<IDEOGRAPHIC>",
"position": 0
},
{
"token": "华",
"start_offset": 1,
"end_offset": 2,
"type": "<IDEOGRAPHIC>",
"position": 1
},
{
"token": "人",
"start_offset": 2,
"end_offset": 3,
"type": "<IDEOGRAPHIC>",
"position": 2
},
{
"token": "民",
"start_offset": 3,
"end_offset": 4,
"type": "<IDEOGRAPHIC>",
"position": 3
},
{
"token": "共",
"start_offset": 4,
"end_offset": 5,
"type": "<IDEOGRAPHIC>",
"position": 4
},
{
"token": "和",
"start_offset": 5,
"end_offset": 6,
"type": "<IDEOGRAPHIC>",
"position": 5
},
{
"token": "国",
"start_offset": 6,
"end_offset": 7,
"type": "<IDEOGRAPHIC>",
"position": 6
},
{
"token": "人",
"start_offset": 7,
"end_offset": 8,
"type": "<IDEOGRAPHIC>",
"position": 7
},
{
"token": "民",
"start_offset": 8,
"end_offset": 9,
"type": "<IDEOGRAPHIC>",
"position": 8
},
{
"token": "大",
"start_offset": 9,
"end_offset": 10,
"type": "<IDEOGRAPHIC>",
"position": 9
},
{
"token": "会",
"start_offset": 10,
"end_offset": 11,
"type": "<IDEOGRAPHIC>",
"position": 10
},
{
"token": "堂",
"start_offset": 11,
"end_offset": 12,
"type": "<IDEOGRAPHIC>",
"position": 11
}
]
}
ik_max_word:
GET /_analyze
{
"analyzer": "ik_max_word",
"text": "中华人民共和国人民大会堂"
}
{
"tokens": [
{
"token": "中华人民共和国",
"start_offset": 0,
"end_offset": 7,
"type": "CN_WORD",
"position": 0
},
{
"token": "中华人民",
"start_offset": 0,
"end_offset": 4,
"type": "CN_WORD",
"position": 1
},
{
"token": "中华",
"start_offset": 0,
"end_offset": 2,
"type": "CN_WORD",
"position": 2
},
{
"token": "华人",
"start_offset": 1,
"end_offset": 3,
"type": "CN_WORD",
"position": 3
},
{
"token": "人民共和国",
"start_offset": 2,
"end_offset": 7,
"type": "CN_WORD",
"position": 4
},
{
"token": "人民",
"start_offset": 2,
"end_offset": 4,
"type": "CN_WORD",
"position": 5
},
{
"token": "共和国",
"start_offset": 4,
"end_offset": 7,
"type": "CN_WORD",
"position": 6
},
{
"token": "共和",
"start_offset": 4,
"end_offset": 6,
"type": "CN_WORD",
"position": 7
},
{
"token": "国人",
"start_offset": 6,
"end_offset": 8,
"type": "CN_WORD",
"position": 8
},
{
"token": "人民大会堂",
"start_offset": 7,
"end_offset": 12,
"type": "CN_WORD",
"position": 9
},
{
"token": "人民大会",
"start_offset": 7,
"end_offset": 11,
"type": "CN_WORD",
"position": 10
},
{
"token": "人民",
"start_offset": 7,
"end_offset": 9,
"type": "CN_WORD",
"position": 11
},
{
"token": "大会堂",
"start_offset": 9,
"end_offset": 12,
"type": "CN_WORD",
"position": 12
},
{
"token": "大会",
"start_offset": 9,
"end_offset": 11,
"type": "CN_WORD",
"position": 13
},
{
"token": "会堂",
"start_offset": 10,
"end_offset": 12,
"type": "CN_WORD",
"position": 14
}
]
}
ik_smart:
GET /_analyze
{
"analyzer": "ik_smart",
"text": "中华人民共和国人民大会堂"
}
{
"tokens": [
{
"token": "中华人民共和国",
"start_offset": 0,
"end_offset": 7,
"type": "CN_WORD",
"position": 0
},
{
"token": "人民大会堂",
"start_offset": 7,
"end_offset": 12,
"type": "CN_WORD",
"position": 1
}
]
}
If this post was useful, please give it a like 👍🏻.
❤️❤️❤️ My knowledge has its limits; if you spot any mistakes, critiques and corrections in the comments are very welcome! 😄😄😄
💘💘💘 If this article helped you, a like and a bookmark would mean a lot. Thank you! 👍 👍 👍
🔥🔥🔥 Stay Hungry, Stay Foolish. The road is long, but keep walking and we will get there. Let's keep at it together! 🌙🌙🌙