翼度科技»论坛 编程开发 python 查看内容

Python连接es笔记三之es更新操作

5

主题

5

帖子

15

积分

新手上路

Rank: 1

积分
15
本文首发于公众号:Hunter后端
原文链接:Python连接es笔记三之es更新操作
这一篇笔记介绍如何使用 Python 对数据进行更新操作。
对于 es 的更新的操作,不用到 Search() 方法,而是直接使用 es 的连接加上相应的函数来操作,本篇笔记目录如下:

  • 获取连接
  • update()
  • update_by_query()
  • 批量更新
  • UpdateByQuery()
1、获取连接

如果使用的是之前的全局创建连接的方式:
  1. from elasticsearch_dsl import connections
  2. connections.configure(
  3.     default={"hosts": "localhost:9200"},
  4. )
复制代码
我们可以根据别名获取相应的连接:
  1. conn = connections.connections.get_connection("default")
复制代码
或者我们直接使用 elasticsearch.Elasticsearch 模块来重新建立一个连接:
  1. from elasticsearch import Elasticsearch
  2. conn = Elasticsearch(hosts="localhost:9200")
复制代码
前面介绍过,我们安装 elasticsearch_dsl 依赖的时候,会自动为我们安装上相应的 elasticsearch 模块,我们这里直接使用即可。
然后通过 conn 连接可以直接对数据进行更新,可用的方法有 update(),update_by_query() 以及一个批量的 bulk() 方法。
2、update()

update() 函数一般只用于指定 id 的更新操作,如果我们知道一条数据的 id,我们可以直接使用 update()。
比如对于 exam 这个 index 下 id=18 的数据,我们想要更新它的 name 字段和 address 字段分别为 王五和湖南省,我们可以如下操作:
  1. conn.update(
  2.     index="exam",
  3.     id=18,
  4.     body={
  5.         "doc": {
  6.             "name": "王五2",
  7.             "address": "湖南省",
  8.         }
  9.     }
  10. )
复制代码
在上面的操作中,index 为指定的索引,id 参数为我们需要更新的 id,body 内 doc 下的字段即为我们要更新的数据。
3、update_by_query()

update_by_query() 函数不局限于 id 的查询更新,我们可以更新任意符合条件的数据,以下是一个简单的示例:
  1. conn.update_by_query(
  2.     index="exam",
  3.     body={
  4.         "query": {
  5.             "term": {"name":  "张三丰"}
  6.         },
  7.         "script": {
  8.             "source": "ctx._source.address = params.address",
  9.             "params": {
  10.                 "address": "新地址",
  11.             }
  12.         }
  13.     }
  14. )
复制代码
在这里,index 参数还是指向对应的索引,body 内包含了需要更新查询的条件,这里都在 query 参数内,需要更新的数据在 script 下,通过脚本的形式来操作更新。
这里注意下,我这里用到的是 7.6.0 版本,所以 script 下使用的 source,更低一点版本用的字段可能是 inline,这里使用对应版本的参数即可。
在 script.source 中,内容为 ctx._source.address = params.address,意思是将符合条件数据的 address 字段内容更新为 params 的 address 的数据。
如果想要更改其他字段内容,注意前面 ctx._source 为固定写法,只需要更改后面的字段名即可。
在 script.params 中,我们则可以定义各种对应的字段及其内容。
更新多个字段

如果我们想同时更新多个字段,比如说符合条件的数据将 address 改为 新地址,将 age 字段改为 28,我们则需要将多个条件在 script.source 中使用分号 ; 连接起来,示例如下:
  1. conn.update_by_query(
  2.     index="exam",
  3.     body={
  4.         "query": {
  5.             "term": {"name":  "新张三丰2"}
  6.         },
  7.         "script": {
  8.             "source": "ctx._source.address = params.address; ctx._source.age = params.age",
  9.             "params": {
  10.                 "address": "新地址3",
  11.                 "age": "28"
  12.             }
  13.         }
  14.     }
  15. )
复制代码
虽然这里更新多个字段需要使用分号连接,但是在实际的代码中我们不用这么写死,比如说我们需要更改三个字段,为 ["address", "name", "age"],我们如下操作:
  1. field_list = ["address", "name", "age"]
  2. source_list = [f"ctx._source.{key}=params.{key}" for key in field_list]
  3. params = {
  4.     "address": "新地址3",
  5.     "age": "28",
  6.     "name": "new name"
  7. }
  8. conn.update_by_query(
  9.     index="exam",
  10.     body={
  11.         "query": {
  12.             "term": {"name":  "新张三丰3"}
  13.         },
  14.         "script": {
  15.             "source": ";".join(source_list),
  16.             "params": params
  17.         }
  18.     }
  19. )
复制代码
4、批量更新

如果我们想批量更新一批数据,这批数据各个字段的值都不一致,自定义的程度很大,使用 update_by_query() 函数已经不现实了,怎么办?
好解决,我们可以使用 helpers.bulk() 批量更新方法。
首先引入这个模块:
  1. from elasticsearch import helpers
复制代码
假设我们系统里现在有 id 为 21,23,24 的几条数据,还是在 exam 这个索引下,我们来构造几条需要更新的数据来操作:
  1. action_1 = {
  2.     "_op_type": "update",
  3.     "_index": "exam",
  4.     "_id": 21,
  5.     "doc": {"age": 19, "name": "令狐冲", "address": "华山派"},
  6. }
  7. action_2 = {
  8.     "_op_type": "update",
  9.     "_index": "exam",
  10.     "_id": 23,
  11.     "doc": {"age": 20, "name": "杨过", "address": "终南山"},
  12. }
  13. action_3 = {
  14.     "_op_type": "update",
  15.     "_index": "exam",
  16.     "_id": 24,
  17.     "doc": {"age": 21, "name": "张无忌", "address": "武当"},
  18. }
  19. action_list = [action_1, action_2, action_3]
  20. helpers.bulk(conn, actions=action_list)
复制代码
对于每一条需要更新的数据,有这几个参数:
_op_type:如果是更新操作,其值则是 update
_index:表示需要更新的数据所在的索引,这里是 exam
_id:表示这条需要更新的数据的 id
doc:是一个 dict 数据,其下包含了需要更新的字段及其对应的值
至此,一条需要更新的数据的结构就构造完毕了。
然后对于 helpers.bulk() 函数,接收的第一个参数为 es 连接,actions 参数是一个列表,其内容就是我们前面构造的数据的集合。
然后执行这个操作就可以发现 es 中对应的值已经更改了。
5、UpdateByQuery()

UpdateByQuery() 函数来源于 elasticsearch_dsl 模块,它的使用和 Search() 方法差不多,都是通过 using 和 index 参数来获取 es 连接和索引:
  1. from elasticsearch_dsl import connections
  2. from elasticsearch_dsl import UpdateByQuery
  3. from elasticsearch_dsl import Q as ES_Q
  4. connections.configure(
  5.     default={"hosts": "localhost:9200"},
  6. )
  7. ubq = UpdateByQuery(using="default", index="exam")
复制代码
使用这个方法更新数据的具体语法和 update_by_query 差不多,都是通过 script 的方式来操作,以下是一个简单示例:
  1. ubq = UpdateByQuery(using="default", index="exam")
  2. q1 = ES_Q("term", name="郭靖")
  3. ubq = ubq.query(q1)
  4. ubq = ubq.script(
  5.     source="ctx._source.address=params.address",
  6.     params={
  7.         "address": "襄阳城"
  8.     }
  9. )
  10. ubq.execute()
复制代码
与 Search() 函数一样,都需要通过 execute() 函数来向 es 提交数据。
如果想获取更多后端相关文章,可扫码关注阅读:


来源:https://www.cnblogs.com/hunterxiong/p/17444265.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

举报 回复 使用道具