以下内容是通过Python来操作Es的代码操作示例
Demo示例
from elasticsearch import Elasticsearch
# 设置连接Es
es_client = Elasticsearch([{"host": "IP", "port": 9200}])
系统操作
# 查看Es状态
print(es_client.cat.health())
# Output: 1654140714 11:31:54 TestEs yellow 1 1 620 620 0 0 620 0 - 50.0%
# 查看Es有哪些索引
print(es_client.cat.indices())
# Output:
yellow open user_info tJ_1DHJ7TjWdgZz9FSIvkg 1 1 9 1 183.6kb 183.6kb
yellow open test KkmiWv_yQ265u7qbg2Z0FA 5 1 1851 0 3mb 3mb
# 查看Es节点有哪些
print(es_client.cat.nodes())
# Output: 192.168.5.112 56 59 0 0.00 0.01 0.05 mdi * node-1
# 查看Es主节点
print(es_client.cat.master())
# Output: UTKLTopJQv-qC1CUPA2aSg 192.168.5.112 192.168.5.112 node-1
索引操作
# 删除索引
print(es_client.indices.delete(index="test"))
# Output: {'acknowledged': True}
# 创建索引
body = {
"mappings": {
"_default_": {
"properties": {
"id": {
"type": "integer"
},
"name": {
"type": "text"
}
}
}
}
}
# 通过设置mapping来新建索引,当然body不指定的话默认使用的是Es自动推导的类型
print(es_client.indices.create(index="test", body=body))
# Output: {'acknowledged': True, 'shards_acknowledged': True}
# 重建索引
body = {"query": {"match_all": {}}} #遍历原索引,可自定义query
print(helpers.reindex(client=es_client, source_index="test", target_index="new_test", target_client=es_client,
query=body))
# Output: (10, 10)
数据操作
增加操作
# 单条新增
print(es_client.index(index="test",
doc_type="user_info",
id='1',
body={"name": "张三", "age": 35, "isChina": True, "hobby": ["唱歌", "跳舞"]}))
# Output: {'_index': 'test', '_type': 'user_info', '_id': '1', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, 'created': True}
# 批量新增
action = [{
"_index": "test",
"_type": "user_info",
"_id": "1",
"_source": {
'name': "李四",
'age': 35,
'isChina': True,
"hobby": ["唱歌", "跳舞"]
}
}]
bulk_insert_res = helpers.bulk(client=es_client, actions=action, stats_only=True)
print(bulk_insert_res)
# Output: (1, 0)
删除操作
# 单条删除
print(es_client.delete(index='test', doc_type='user_info', id=1))
# Output: {'found': True, '_index': 'test', '_type': 'user_info', '_id': '1', '_version': 3, 'result': 'deleted', '_shards': {'total': 2, 'successful': 1, 'failed': 0}}
# 删除指定条件数据, 批量删除
query = {
"query": {
"term": {"age": 35}
}
}
print(es_client.delete_by_query(index='test', doc_type='user_info', body=query))
# Output: {'took': 0, 'timed_out': False, 'total': 0, 'deleted': 1, 'batches': 0, 'version_conflicts': 0, 'noops': 0, 'retries': {'bulk': 0, 'search': 0}, 'throttled_millis': 0, 'requests_per_second': -1.0, 'throttled_until_millis': 0, 'failures': []}
修改操作
ES不支持更新操作,具体更新操作底层实现的原理是: 删除原来索引的数据,插入新索引的数据。每一次更新,Es的_version字段内容会递增
# 单条更新(注意:这里的更新对象中,需要有doc关键字作为key)
body = {
"doc": {"name": "张三-修改", "age": 35, "isChina": True, "hobby": ["唱歌", "跳舞"]}
}
# print(es_client.update(index='test', doc_type='user_info', id="1",body=body))
# Output: {'_index': 'test', '_type': 'user_info', '_id': '1', '_version': 3, 'result': 'updated', '_shards': {'total': 2, 'successful': 1, 'failed': 0}}
# 批量更新带条件(注意:这里使用的是script更新,老版本的需要将inline换成source)
uBody = {
# 查询条件
'query': {
'term': {
"age": 35
}
},
# 更新内容,第一种更新方式
'script': {
"inline": "ctx._source.name = params.name",
"params": {
"name": "张三-script第一种修改"
},
}
}
print(es_client.update_by_query(index='test', doc_type='user_info', body=uBody))
# Output: {'took': 0, 'timed_out': False, 'total': 0, 'updated': 1, 'deleted': 0, 'batches': 0, 'version_conflicts': 0, 'noops': 0, 'retries': {'bulk': 0, 'search': 0}, 'throttled_millis': 0, 'requests_per_second': -1.0, 'throttled_until_millis': 0, 'failures': []}
# bulk批量更新
actions = [
{
"_op_type": "update", # 操作命令,这里为更新
"index": "test", # 数据的索引
"_id": "1", # 要更新的数据 _id
"doc": {
"name": "bulk更新"
}
}
]
print(helpers.bulk(client=es_client, actions=actions, index="test", doc_type="user_info", raise_on_error=True))
# Output: (1, [])
查询操作
# 返回全部数据
print(es_client.search(index='test', doc_type='user_info'))
# Output: {'took': 0, 'timed_out': False, '_shards': {'total': 5, 'successful': 5, 'failed': 0}, 'hits': {'total': 1, 'max_score': 1.0, 'hits': [{'_index': 'test', '_type': 'user_info', '_id': '1', '_score': 1.0, '_source': {'name': '李四', 'age': 35, 'isChina': True, 'hobby': ['唱歌', '跳舞']}}]}}
# 条件查询(注意:这里变化的就是DSL语句)
queryBody = {
'query': {
'match_all': {}
}
}
print(es_client.search(index='test', doc_type='user_info', body=queryBody))
# Output: {'took': 0, 'timed_out': False, '_shards': {'total': 5, 'successful': 5, 'failed': 0}, 'hits': {'total': 1, 'max_score': 1.0, 'hits': [{'_index': 'test', '_type': 'user_info', '_id': '1', '_score': 1.0, '_source': {'name': '李四', 'age': 35, 'isChina': True, 'hobby': ['唱歌', '跳舞']}}]}}