翼度科技»论坛 编程开发 python 查看内容

Python elasticsearch-py类库基础用法

5

主题

5

帖子

15

积分

新手上路

Rank: 1

积分
15
实践环境

https://pypi.org/project/elasticsearch/
pip install elasticsearch==7.6.0
离线安装包及依赖包下载地址:
https://files.pythonhosted.org/packages/f5/71/45d36a8df68f3ebb098d6861b2c017f3d094538c0fb98fa61d4dc43e69b9/urllib3-1.26.2-py2.py3-none-any.whl#sha256=d8ff90d979214d7b4f8ce956e80f4028fc6860e4431f731ea4a8c08f23f99473
https://files.pythonhosted.org/packages/98/98/c2ff18671db109c9f10ed27f5ef610ae05b73bd876664139cf95bd1429aa/certifi-2023.7.22.tar.gz#sha256=539cc1d13202e33ca466e88b2807e29f4c13049d6d87031a3c110744495cb082
https://files.pythonhosted.org/packages/1f/12/7919c5d8b9c497f9180db15ea8ead6499812ea8264a6ae18766d93c59fe5/dataclasses-0.8.tar.gz#sha256=8479067f342acf957dc82ec415d355ab5edb7e7646b90dc6e2fd1d96ad084c97
https://files.pythonhosted.org/packages/6b/db/d934d605258d38bd470c83d535c3a73c3d01e4ad357ecb4336300fbb8e88/elastic-transport-8.4.1.tar.gz#sha256=e5548997113c5d9566c9a1a51ed67bce50a4871bc0e44b692166461279e4167e
https://files.pythonhosted.org/packages/94/1a/2369fc9264c655c20908053b59fae7f65ddc47f123d89b533a724ae1d19d/elasticsearch-7.6.0.tar.gz
Python3.6.2
Elasticsearch服务 7.6
注意:elasticsearch-py类库版本必须和Elasticsearch服务器版本保持对应,否则会有兼容性问题,具体如下,
  1. # Elasticsearch 7.x
  2. elasticsearch>=7.0.0,<8.0.0
  3. # Elasticsearch 6.x
  4. elasticsearch>=6.0.0,<7.0.0
  5. # Elasticsearch 5.x
  6. elasticsearch>=5.0.0,<6.0.0
  7. # Elasticsearch 2.x
  8. elasticsearch>=2.0.0,<3.0.0
复制代码
注意:笔者实践时,发现运行以下语句,会报错
  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. from elasticsearch import Elasticsearch
  4. from elasticsearch import helpers
  5. from datetime import datetime
  6. def search_data_by_scroll(es, index, body):
  7.     '''翻页查询'''
  8.     request_timeout = 600
  9.     scroll = '20m' # 用于配置scroll有效期--滚动搜索上下文保留时间,默认不能超过 1d (24 小时). 可使用search.max_keep_alive集群配置修改该值
  10.     query_result = es.search(index=index,
  11.                                   scroll=scroll,
  12.                                   body=body,
  13.                                   request_timeout=request_timeout)
  14.     scroll_id = query_result['_scroll_id']
  15.     while len(query_result['hits']['hits']) > 0:
  16.         yield query_result['hits']['hits']
  17.         query_result = es.scroll(scroll_id=scroll_id,
  18.                                       scroll=scroll,
  19.                                       request_timeout=request_timeout)
  20.         scroll_id = query_result['_scroll_id']
  21. if __name__ == '__main__':
  22.     # ES连接相关配置信息
  23.     hosts = [ # ES 集群服务器信息
  24.         {"host": "10.153.1.4", "port": 9200},
  25.         {"host": "10.153.1.5", "port": 9200},
  26.         {"host": "10.153.1.6", "port": 9200}
  27.     ]
  28.     username = 'myusername'  #  ES用户账号 可选配置, 如果无需访问验证, 则配置为空字符串 ''
  29.     password = 'mypassword'  # ES用户密码 可选配置,如果无需访问验证, 则配置为空字符串 ''
  30.     es = Elasticsearch(hosts=hosts,
  31.                        http_auth=(username, password),
  32.                        # 配置连接前进行探测
  33.                        sniff_on_connection_fail = True,  # 节点无响应时刷新节点
  34.                        sniffer_timeout = 60  # 设置超时时间,单位 秒
  35.                        )
  36.     ############### 增 ###############
  37.     #### 创建索引
  38.     #创建索引,如果索引已存在,则报错
  39.     res = es.indices.create(index='my-test-index')
  40.     print(res) # 输出:{'acknowledged': True, 'shards_acknowledged': True, 'index': 'my-test-index'}
  41.     # 创建索引,如果索引已存在,则忽略400错误, 形如 elasticsearch.exceptions.RequestError: RequestError(400, 'resource_already_exists_exception', 'index [my-test-index/rOk1sVW9R46GZq9o7ACVxQ] already exists')
  42.     res = es.indices.create(index='my-test-index', ignore=400)
  43.     print(res) # 输出索引已存在相关错误信息
  44.     # 定义mapping body
  45.     index_mappings = {
  46.         'mappings': { # key固定为 mappings
  47.             'properties': {
  48.                 'name': {
  49.                     'type': 'keyword'
  50.                 },
  51.                 'age': {
  52.                     'type': 'integer'
  53.                 },
  54.                 'tags': {
  55.                     'type': 'text'
  56.                 },
  57.                 "timestamp": {
  58.                     "type": "date"
  59.                 }
  60.             }
  61.         },
  62.         'settings': {
  63.             'index': {
  64.                 'number_of_shards': '3',
  65.                 'number_of_replicas': '0'
  66.             }
  67.         }
  68.     }
  69.     # 创建索引的同时,设置mapping
  70.     es.indices.create(index='my-test-index', body=index_mappings, ignore=400)
  71.     #### 为索引创建别名
  72.     # 为单个索引创建别名
  73.     print(es.indices.put_alias(index='my-test-index', name='my-test-index-alias')) # 输出:{'acknowledged': True}
  74.     res = es.indices.create(index='my-test-index2', ignore=400)
  75.     print(res)
  76.     # 为多个索引创建同一个别名,联合查询时用
  77.     print(es.indices.put_alias(index=['my-test-index', 'my-test-index2'], name='test-index-alias')) # 输出:{'acknowledged': True}
  78.     #### 插入数据
  79.     # 插入单条数据
  80.     data = {
  81.         'name': '张三',
  82.         'age': 18,
  83.         'tags': '勤奋',
  84.         'timestamp': datetime.now()
  85.     }
  86.     res = es.index(index='my-test-index', body=data)
  87.     print(res) # 输出:{'_index': 'my-test-index', '_type': '_doc', '_id': 'ELlNE4sBRHfq82dAAhMz', '_version': 1, 'result': 'created', '_shards': {'total': 1, 'successful': 1, 'failed': 0}, '_seq_no': 0, '_primary_term': 1}
  88.     # 插入记录时,指定记录ID
  89.     data = {
  90.         'name': '晓晓',
  91.         'age': 19,
  92.         'tags': '尊师',
  93.         'timestamp': datetime.now()
  94.     }
  95.     res = es.index(index='my-test-index', id=1, body=data)
  96.     print(res) # 输出:{'_index': 'my-test-index', '_type': '_doc', '_id': '1', '_version': 1, 'result': 'created', '_shards': {'total': 1, 'successful': 1, 'failed': 0}, '_seq_no': 2, '_primary_term': 1}
  97.     # 插入记录时,指定文档类型
  98.     # res = es.index(index='mytest-index', doc_type='person', id=1, body=data) # 注意:如果索引不存在,会自动创建索引
  99.     # print(res) # {'_index': 'mytest-index', '_type': 'person', '_id': '1', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 2, 'failed': 0}, '_seq_no': 0, '_primary_term': 1}
  100.     # 批量插入数据
  101.     insert_records = []
  102.     index_name = 'my-test-index'
  103.     data1 = {
  104.         '_index': index_name,
  105.         'name': '李四',
  106.         'age': 20,
  107.         'tags': '和善',
  108.         'timestamp': datetime.now()
  109.     }
  110.     data2 = {
  111.         '_index': index_name,
  112.         'name': '王五',
  113.         'age': 19,
  114.         'tags': '好学',
  115.         'timestamp': datetime.now()
  116.     }
  117.     insert_records.append(data1)
  118.     insert_records.append(data2)
  119.     res = helpers.bulk(client=es, actions=insert_records)
  120.     print(res) # 输出:(2, [])
  121.     ############### 改 ###############
  122.     #### 更新记录
  123.     # 使用 index 方法 # 注意:使用index实现更新时,body数据必须包含记录的全部字段,否则对应记录,未包含的字段将会被删除
  124.     data = {
  125.         'name': '晓晓',
  126.         'age': 23,
  127.         'timestamp': datetime.now()
  128.     }
  129.     res = es.index(index='mytest-index', id='1', body=data) # id为es中记录的_id值
  130.     print(res) # 输出:{'_index': 'mytest-index', '_type': '_doc', '_id': '1', '_version': 2, 'result': 'updated', '_shards': {'total': 2, 'successful': 2, 'failed': 0}, '_seq_no': 1, '_primary_term': 1}
  131.     # 使用 update 方法
  132.     body = {
  133.         'doc': {
  134.             'tags': '尊师重教',
  135.         }
  136.     }
  137.     res = es.update(index='mytest-index', id='1', body=body)
  138.     print(res) # 输出:{'_index': 'mytest-index', '_type': '_doc', '_id': '1', '_version': 3, 'result': 'updated', '_shards': {'total': 2, 'successful': 2, 'failed': 0}, '_seq_no': 2, '_primary_term': 1}
  139.     ############### 查 ###############
  140.     # 查看全部索引的信息
  141.     index_info = es.indices.get('*')
  142.     # print(index_info) # 输出数据格式同 查看某个索引的信息 调用输出
  143.     # 查看某个索引的信息
  144.     index_info = es.indices.get('mytest-index')
  145.     print(index_info) # 输出:{'mytest-index': {'aliases': {}, 'mappings': {'properties': {'age': {'type': 'long'}, 'name': {'type': 'text', 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}}, 'tags': {'type': 'text', 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}}, 'timestamp': {'type': 'date'}}}, 'settings': {'index': {'search': {'slowlog': {'threshold': {'query': {'warn': '2s', 'info': '1s'}}}}, 'number_of_shards': '1', 'provided_name': 'mytest-index', 'creation_date': '1696837120143', 'number_of_replicas': '1', 'uuid': 'EzxFtEyGQBKPUCu9usa8XA', 'version': {'created': '7060099'}}}}}
  146.     # 查看多个指定索引的信息
  147.     print(es.indices.get(index=['my-test-index', 'mytest-index']))
  148.     # 查询索引相关信息
  149.     body = '''
  150.         {
  151.            "query": {
  152.               "match_all": {}
  153.            }
  154.         }'''
  155.     body = body.strip()
  156.     res = es.search(index=index_name,
  157.                     doc_type='_doc', # 可以配置为None
  158.                     request_timeout=120, # 设置查询超时时间120秒,es默认的查询超时时间是10s
  159.                     body=body) # 注意:这里的body也可以是json,不一定要用字符串
  160.     # print(res)
  161.     # 查看索引是否存在
  162.     print(es.indices.exists('my-test-index'))  # 输出:True
  163.     # 获取指定索引中指定ID的记录信息
  164.     res = es.get(index='mytest-index', id=1)
  165.     print(res) # 输出:{'_index': 'mytest-index', '_type': '_doc', '_id': '1', '_version': 3, '_seq_no': 2, '_primary_term': 1, 'found': True, '_source': {'name': '晓晓', 'age': 23, 'timestamp': '2023-10-09T15:49:53.068134', 'tags': '尊师重教'}}
  166.     # # 获取所有索引中指定ID的记录信息
  167.     # res = es.get(index="*", id="1") # 报错了
  168.     # 查询记录数
  169.     doc_count_info = es.count(index=index_name)
  170.     print(doc_count_info) # 输出:{'count': 4, '_shards': {'total': 3, 'successful': 3, 'skipped': 0, 'failed': 0}}
  171.     # 查询记录信息
  172.     res = es.search(index=index_name,
  173.               request_timeout=120, # 设置查询超时时间120秒,es默认的查询超时时间是10s
  174.               body=body)
  175.     # print(res) # 反回一个字典,同界面查询结果
  176.     # 查询指定索引,指定文档类型,指定ID记录是否存在
  177.     print(es.exists(index='mytest-index', doc_type='doc', id='1')) # 输出:False
  178.     # 查询指定索引中,指定ID记录是否存在
  179.     print(es.exists(index='mytest-index', id='1')) # 输出:True
  180.     # 翻页查询记录信息
  181.     for data in search_data_by_scroll(es, 'index_presale_orderinfo', body):
  182.         print(data) # 输出数据格式为列表, 形如 [{...},{...}]
  183.     # 获取当前集群的基本信息
  184.     print(es.info())
  185.     # 获取集群健康信息
  186.     print(es.cluster.health())
  187.     # 获取整个集群的综合状态信息。
  188.     print(es.cluster.state())
  189.     # 返回群集的当前节点的信息
  190.     print(es.cluster.stats())
  191.     # 获取索引mapping
  192.     print(es.indices.get_mapping(index='my-test-index'))
  193.     # 获取索引设置
  194.     # 获取单个索引的设置
  195.     print(es.indices.get_settings(index='my-test-index'))
  196.     # 获取多个索引的设置
  197.     print(es.indices.get_settings(index=['my-test-index', 'my-test-index2']))
  198.     # 获取所有索引的设置
  199.     print(es.indices.get_settings(index='*'))
  200.     # 获取索引别名
  201.     # 获取单个索引的别名
  202.     print(es.indices.get_alias(index='my-test-index'))
  203.     # 获取多个索引的别名
  204.     print(es.indices.get_alias(index=['my-test-index', 'my-test-index2']))
  205.     # 获取任务列表
  206.     print(es.tasks.list(detailed=True, timeout='60s'))
  207.        
  208.        
  209.     ############### 删 ###############
  210.     # 删除文档记录
  211.     # 按id删除
  212.     res = es.delete(index='mytest-index', id='1')
  213.     print(res) # 输出:{'_index': 'mytest-index', '_type': '_doc', '_id': '1', '_version': 4, 'result': 'deleted', '_shards': {'total': 2, 'successful': 2, 'failed': 0}, '_seq_no': 3, '_primary_term': 1}
  214.     # 按条件删除
  215.     body = {
  216.         'query': {
  217.             'match': {
  218.                 'name': '张三'
  219.             }
  220.         }
  221.     }
  222.     res = es.delete_by_query(index=index_name, body=body, ignore=[400, 404])
  223.     print(res) # 输出:{'took': 25, 'timed_out': False, 'total': 1, 'deleted': 1, 'batches': 1, 'version_conflicts': 0, 'noops': 0, 'retries': {'bulk': 0, 'search': 0}, 'throttled_millis': 0, 'requests_per_second': -1.0, 'throttled_until_millis': 0, 'failures': []}
  224.     # 删除索引别名
  225.     # 删除单个索引的别名
  226.     print(es.indices.delete_alias(index='my-test-index', name='my-test-index-alias')) # 输出:{'acknowledged': True}
  227.     # 删除多个索引的指定别名,如果别名不存在,则忽略错误
  228.     # print(es.indices.delete_alias(index=['my-test-index', 'my-test-index2'], name=['test-index-alias']))
  229.     # 删除多个索引的所有别名
  230.     print(es.indices.delete_alias(index=['my-test-index', 'my-test-index2'], name='_all')) # 如果存在索引别名,则会报错
  231.     # 删除索引
  232.     print(es.indices.delete(index_name)) # 输出:{'acknowledged': True}
  233.     # 删除索引,忽略400,404错误,索引不存在时,会报404错误
  234.     print(es.indices.delete(index_name, ignore = [400, 404]))
复制代码
其它说明:
  1. res = es.get(index="*", id="1") # 报错:elasticsearch.exceptions.AuthorizationException: AuthorizationException(403, 'security_exception', 'action [indices:data/read/get] is unauthorized for user [elastic]')
复制代码
scroll 时间单位描述:
  1. es.scroll(scroll_id=scroll_id, scroll='10m', request_timeout=20)
复制代码
参考链接

https://pypi.org/project/elasticsearch/7.6.0/#description
https://elasticsearch-py.readthedocs.io/en/v8.10.0/
https://elasticsearch-py.readthedocs.io/en/v8.10.0/api.html
http://runxinzhi.com/bubu99-p-13580687.html

来源:https://www.cnblogs.com/shouke/p/17993185
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

举报 回复 使用道具