0
点赞
收藏
分享

微信扫一扫

常用Py3操作Elasticsearch方法

以下内容是通过Python来操作Es的代码操作示例

Demo示例

from elasticsearch import Elasticsearch

# 设置连接Es
es_client = Elasticsearch([{"host": "IP", "port": 9200}])

系统操作

# 查看Es状态
print(es_client.cat.health())
# Output: 1654140714 11:31:54 TestEs yellow 1 1 620 620 0 0 620 0 - 50.0%

# 查看Es有哪些索引
print(es_client.cat.indices())
# Output:
yellow open user_info tJ_1DHJ7TjWdgZz9FSIvkg 1 1      9    1 183.6kb 183.6kb
yellow open test      KkmiWv_yQ265u7qbg2Z0FA 5 1   1851    0     3mb     3mb

# 查看Es节点有哪些
print(es_client.cat.nodes())
# Output: 192.168.5.112 56 59 0 0.00 0.01 0.05 mdi * node-1

# 查看Es主节点
print(es_client.cat.master())
# Output: UTKLTopJQv-qC1CUPA2aSg 192.168.5.112 192.168.5.112 node-1

索引操作


# 删除索引
print(es_client.indices.delete(index="test"))
# Output: {'acknowledged': True}

# 创建索引
body = {
    "mappings": {
        "_default_": {
            "properties": {
                "id": {
                    "type": "integer"
                },
                "name": {
                    "type": "text"
                }
            }
        }
    }
}
# 通过设置mapping来新建索引,当然body不指定的话默认使用的是Es自动推导的类型
print(es_client.indices.create(index="test", body=body))
# Output: {'acknowledged': True, 'shards_acknowledged': True}

# 重建索引
body = {"query": {"match_all": {}}}  #遍历原索引,可自定义query
print(helpers.reindex(client=es_client, source_index="test", target_index="new_test", target_client=es_client,
                      query=body))
# Output: (10, 10)

数据操作

增加操作

# 单条新增
print(es_client.index(index="test",
                      doc_type="user_info",
                      id='1',
                      body={"name": "张三", "age": 35, "isChina": True, "hobby": ["唱歌", "跳舞"]}))
# Output: {'_index': 'test', '_type': 'user_info', '_id': '1', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, 'created': True}

# 批量新增
action = [{
            "_index": "test",
            "_type": "user_info",
            "_id": "1",
            "_source": {
                'name': "李四",
                'age': 35,
                'isChina': True,
                "hobby": ["唱歌", "跳舞"]
            }
        }]
bulk_insert_res = helpers.bulk(client=es_client, actions=action, stats_only=True)
print(bulk_insert_res)
# Output: (1, 0)

删除操作

# 单条删除
print(es_client.delete(index='test', doc_type='user_info', id=1))
# Output: {'found': True, '_index': 'test', '_type': 'user_info', '_id': '1', '_version': 3, 'result': 'deleted', '_shards': {'total': 2, 'successful': 1, 'failed': 0}}

# 删除指定条件数据, 批量删除
query = {
    "query": {
        "term": {"age": 35}
    }
}
print(es_client.delete_by_query(index='test', doc_type='user_info', body=query))
# Output: {'took': 0, 'timed_out': False, 'total': 0, 'deleted': 1, 'batches': 0, 'version_conflicts': 0, 'noops': 0, 'retries': {'bulk': 0, 'search': 0}, 'throttled_millis': 0, 'requests_per_second': -1.0, 'throttled_until_millis': 0, 'failures': []}

修改操作

ES不支持更新操作,具体更新操作底层实现的原理是: 删除原来索引的数据,插入新索引的数据。每一次更新,Es的_version字段内容会递增

# 单条更新(注意:这里的更新对象中,需要有doc关键字作为key)
body = {
    "doc": {"name": "张三-修改", "age": 35, "isChina": True, "hobby": ["唱歌", "跳舞"]}
}
# print(es_client.update(index='test', doc_type='user_info', id="1",body=body))
# Output: {'_index': 'test', '_type': 'user_info', '_id': '1', '_version': 3, 'result': 'updated', '_shards': {'total': 2, 'successful': 1, 'failed': 0}}

# 批量更新带条件(注意:这里使用的是script更新,老版本的需要将inline换成source)
uBody = {
        # 查询条件
        'query': {
            'term': {
                "age": 35
            }
        },
        # 更新内容,第一种更新方式
        'script': {
            "inline": "ctx._source.name = params.name",
            "params": {
                "name": "张三-script第一种修改"
            },
        }
}
print(es_client.update_by_query(index='test', doc_type='user_info', body=uBody))
# Output: {'took': 0, 'timed_out': False, 'total': 0, 'updated': 1, 'deleted': 0, 'batches': 0, 'version_conflicts': 0, 'noops': 0, 'retries': {'bulk': 0, 'search': 0}, 'throttled_millis': 0, 'requests_per_second': -1.0, 'throttled_until_millis': 0, 'failures': []}

# bulk批量更新
actions = [
    {
        "_op_type": "update",  # 操作命令,这里为更新
        "index": "test",  # 数据的索引
        "_id": "1",  # 要更新的数据 _id
        "doc": {
            "name": "bulk更新"
        }
    }
]
print(helpers.bulk(client=es_client, actions=actions, index="test", doc_type="user_info", raise_on_error=True))
# Output: (1, [])

查询操作

# 返回全部数据
print(es_client.search(index='test', doc_type='user_info'))
# Output: {'took': 0, 'timed_out': False, '_shards': {'total': 5, 'successful': 5, 'failed': 0}, 'hits': {'total': 1, 'max_score': 1.0, 'hits': [{'_index': 'test', '_type': 'user_info', '_id': '1', '_score': 1.0, '_source': {'name': '李四', 'age': 35, 'isChina': True, 'hobby': ['唱歌', '跳舞']}}]}}

# 条件查询(注意:这里变化的就是DSL语句)
queryBody = {
    'query': {
        'match_all': {}
    }
}
print(es_client.search(index='test', doc_type='user_info', body=queryBody))
# Output: {'took': 0, 'timed_out': False, '_shards': {'total': 5, 'successful': 5, 'failed': 0}, 'hits': {'total': 1, 'max_score': 1.0, 'hits': [{'_index': 'test', '_type': 'user_info', '_id': '1', '_score': 1.0, '_source': {'name': '李四', 'age': 35, 'isChina': True, 'hobby': ['唱歌', '跳舞']}}]}}
举报

相关推荐

0 条评论