翼度科技»论坛 编程开发 python 查看内容

Python 基于pymongo操作Mongodb学习总结

6

主题

6

帖子

18

积分

新手上路

Rank: 1

积分
18
实践环境

Python 3.6.4
pymongo 4.1.1
pymongo-3.12.3-cp36-cp36m-win_amd64.whl
下载地址:https://pypi.org/simple/pymongo/
代码实践
  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. import datetime
  4. import random
  5. import pymongo
  6. from pymongo import MongoClient
  7. from bson.objectid import ObjectId
  8. # # # # # # # # # 建立连接
  9. # 方式1
  10. # client = MongoClient() # 使用默认主机和端口连接本地Mongodb服务器
  11. # 方式2:
  12. # client = MongoClient("localhost", 27017)  # 也可以手动指定服务器和端口
  13. # 方式3:采用url
  14. user_name = 'tcems'
  15. password = 'Password123456'
  16. uri = f'mongodb://{user_name}:{password}@polartcems-mrs1.dbsit.sfcloud.local:24000,polartcems-mrs2.dbsit.sfcloud.local:24000,polartcems-mrs3.dbsit.sfcloud.local:24000'
  17. client = MongoClient(uri)
  18. # # # # # # # # # 获取数据库信息
  19. databases = client.list_databases()
  20. for database in databases:
  21.     print(database) # 输出字典,形如:{'name': 'custom_db_name', 'sizeOnDisk': 5001216.0, 'empty': False}
  22. # # # # # # # # # 获取数据库
  23. my_test_db = client.tcems
  24. # my_test_db = client['tcems'] # 如果不支持.属性方式访问(比如test-db),可以考虑使用字典方式访问
  25. # # # # # # # # # 获取当前数据库拥有的集合名称列表
  26. collections = my_test_db.list_collection_names()
  27. print(collections) # 输出名称列表,形如['YiLiuTemHumLog', 'test_collection', ...]
  28. # # # # # # # # # 获取集合
  29. collection = my_test_db.test_collection
  30. # collection = my_test_db['test_collection'] # 如果不支持.属性方式访问(比如test-db),可以考虑使用字典方式访问
  31. # # # # # # # # # 索引
  32. # 创建索引
  33. # collection.create_index([("date", pymongo.ASCENDING)])
  34. # collection.create_index([("field_name", pymongo.ASCENDING)], unique=True) # 创建唯一索引
  35. # # # # # # # # #  集合文档操作
  36. # # # # # # # # # 插入文档
  37. # 逐条插入文档
  38. post = {
  39.     'author': 'Mike',
  40.     'visitor_num': random.randint(0,100),
  41.     'text': 'blog post of Mike!',
  42.     'tags': ['mongodb', 'python', 'pymongo'],
  43.     'date': datetime.datetime.now(tz=datetime.timezone.utc)
  44. }
  45. post_id = collection.insert_one(post).inserted_id  # 注意:insert_one函数返回 pymongo.results.InsertOneResult对象
  46. print(post_id, type(post_id)) # 输出形如:65a881fffa04b0dc0e7a74bc <class 'bson.objectid.ObjectId'>    # 注意,获取的insert_id为 bson.objectid.ObjectId类型
  47. try:
  48.     post['_id'] = post_id
  49.     collection.insert_one(post)  # 运行报错: pymongo.errors.DuplicateKeyError: E11000 duplicate key error collection:...
  50. except pymongo.errors.DuplicateKeyError:
  51.     print('id重复')
  52.     # do something
  53. # 批量插入文档
  54. posts = [
  55.     {
  56.         'author': 'Mike',
  57.         'visitor_num': random.randint(0,100),
  58.         'text': 'blog post of Mike!',
  59.         'tags': ['mongodb', 'python', 'pymongo'],
  60.         'date': datetime.datetime.now(tz=datetime.timezone.utc)
  61.     },
  62.    {
  63.         'author': 'Jack',
  64.         'visitor_num': random.randint(0,100),
  65.         'text': 'blog post of Jack!',
  66.         'tags': ['mongodb', 'python', 'pymongo'],
  67.         'date': datetime.datetime.now(tz=datetime.timezone.utc)
  68.     }
  69. ]
  70. res = collection.insert_many(posts) # insert_many函数返回 pymongo.results.InsertManyResult 对象
  71. print(res.inserted_ids) # 获取插入记录的id,形如 [ObjectId('65a9423b782fc7838d729033'), ObjectId('65a9423b782fc7838d729034')]
  72. inserted_ids = res.inserted_ids
  73. # # # # # # # # # 查询文档
  74. # 查询单条文档
  75. res = collection.find_one() # 注意:如果集合中存在记录,则find_one返回字典对象,否则返回None
  76. print(res, type(res))
  77. print(res.get('_id'), str(res.get('_id')))  # 获取插入时自动生成的文档ID
  78. # 查询时指定查询条件
  79. print(collection.find_one({'author': 'Mike'})) # 文档author必须为Mike
  80. print(collection.find_one({'_id': res.get('_id')})) # 按_id查询
  81. print(collection.find_one({'_id': post_id}))  # 注意:_id值类型必须为ObjectId
  82. # 批量查询
  83. # 遍历所有记录
  84. for post in collection.find():
  85.     print(post)   # 此处,post为字典类型
  86.     break
  87. # 只查询满足条件的记录(注意:字典中逗号分隔的多个条件,默认的and关系
  88. for post in collection.find({'visitor_num': {'$gte': 12, '$lte': 18}}): # 查找visitor_num大于等于12小于等于18的文档
  89.     print(post)
  90. for post in collection.find({'author': {'$ne': 'Mike'}}): # 查找 author 不等于 Mike的文档
  91.     print(post)
  92. for post in collection.find({'visitor_num': {'$ne': None}}): # 查找 visitor_num不为null的文档记录
  93.     print(post)
  94. for post in collection.find({'$or':[{'visitor_num':{'$gte':18}},{'author': 'Mike'}]}): # 查找 visitor_num 大于等于18,或者 author 等于 Mike 的文档
  95.     print(post)
  96. for post in collection.find({'visitor_num':{'$in': [58, 90, 41]}}): # 查找 visitor_num 值在数组 [58,90,41]中的文档
  97.     print(post)
  98. for post in collection.find({'author':{'$nin': ['Mike','Jack']}}): # 查找 visitor_num 不在数组 ['Mike','Jack'] 中的文档
  99.     print(post)
  100.   
  101. # 限制返回文档数
  102. for post in collection.find().limit(10): # 仅返回10条文档
  103.     print(post)
  104. # 查询排序
  105. # 单个字段排序
  106. # for post in collection.find().sort('visitor_num'): # 按 visitor_num 升序排序 # 注意:不存在排序字段的文档在有排序字段文档之上
  107. for post in collection.find().sort('visitor_num', pymongo.ASCENDING): # 按 visitor_num 升序排序 pymongo.ASCENDING = 1
  108.     print(post)
  109. # 多字段排序
  110. print('多字段排序1: 按 _id 升序,再按 visitor_num 降序')
  111. for post in collection.find().sort([('_id', 1), ('visitor_num', pymongo.DESCENDING)]):
  112.     print(post)
  113. print('多字段排序2: 按 _id 降序,再按 visitor_num 升序')
  114. for post in collection.find().sort([('_id', -1), ('visitor_num', 1)]):
  115.     print(post)
  116. # 聚合查询
  117. start_time = datetime.datetime(2024, 1, 15, 6, 37, 37, 246000)
  118. end_time = datetime.datetime(2024, 1, 15, 6, 44, 7, 239000)
  119. # 查询当前集合中最大,最小文档ID
  120. for record in collection.aggregate([
  121.     {
  122.         "$group": {
  123.             "_id": None,
  124.             "min_id": {"$min": '$_id'},
  125.             "max_id": {"$max": '$_id'}
  126.         }
  127.     }
  128. ]):
  129.     if record: # record为字典类型
  130.         print(record.get('min_id'))
  131.         print(record.get('max_id'))
  132. # 查询当前集合中指定时间范围内最大,最小文档ID
  133. for record in collection.aggregate([
  134.     {
  135.         '$match': {
  136.             'date': {'$gte': start_time, '$lte': end_time}},
  137.     },
  138.     {
  139.         "$group": {
  140.             "_id": None,
  141.             "min_id": {"$min": '$_id'},
  142.             "max_id": {"$max": '$_id'}
  143.         }
  144.     }
  145. ]):
  146.     if record: # record为字典类型
  147.         print(record.get('min_id'))
  148.         print(record.get('max_id'))
  149. # 聚合管道
  150. # 查询文档,按visitor_num降序排序,限制返回文档数为10--获取visitor_num top 10的记录
  151. for post in collection.find().sort('visitor_num', pymongo.DESCENDING).limit(10):
  152.     print(post)
  153. # 或者
  154. for post in collection.find().limit(10).sort('visitor_num', pymongo.ASCENDING):
  155.     print(post)
  156. # 分页查询
  157. for post in collection.find().skip(5).limit(10): # skip(N) 跳过前N个文档,等价于mysql查询中的offset 根据limit参数值,返回第N+1条及往后文档
  158.     print(post)
  159. # 统计
  160. # 获取文档总数
  161. print(collection.count_documents({})) # 获取文档总数 # 注意:查询条件 {} 不能少,否则会报错
  162. print(collection.count_documents({'author': 'Jack'})) # 获取author值为Jack的文档总数
  163. # 获取最小值
  164. min_visitor_num = collection.find_one(sort=[('visitor_num', pymongo.ASCENDING)]).get('visitor_num') # 获取最小 visitor_num
  165. print('min_visitor_num', min_visitor_num)
  166. # 获取最大值
  167. max_visitor_num = collection.find_one(sort=[('visitor_num', pymongo.DESCENDING)]).get('visitor_num') # 获取最大 visitor_num
  168. print(collection.find_one(sort=[('visitor_num', pymongo.DESCENDING)]))
  169. # # # # # # # # # 更新文档
  170. # 逐条更新
  171. new_content = {
  172.     'author': 'Json',
  173.     'visitor_num': random.randint(0,100),
  174.     'text': 'blog post of Json!',
  175.     'tags': ['mongodb', 'python', 'pymongo'],
  176.     'date': datetime.datetime.now(tz=datetime.timezone.utc)
  177. }
  178. query_condition = {'_id':  ObjectId('65a4d2b165b14a57a38a1504')}
  179. collection.update_one(query_condition, {'$set': new_content})
  180. print(collection.find_one({'_id': ObjectId('65a4d2b165b14a57a38a1504')})) # 查看更新后的文档
  181. # 批量更新
  182. query_condition = {'visitor_num': None}
  183. collection.update_many(query_condition, {'$set': new_content}) # 批量更新visitor_num值为null的的文档内容为 new_content变量值
  184. # # # # # # # # #  删除文档
  185. # 逐条删除文档
  186. res = collection.delete_one({'_id': post_id}) # 删除指定_id等于 post_id变量值的文档 # 注意:delete_one函数返回 pymongo.results.DeleteResult对象
  187. # print(res.deleted_count, res.raw_result)
  188. if res.deleted_count == 1:
  189.     print('删除成功')
  190. # 批量删除
  191. res = collection.delete_many({'_id': {'$in': inserted_ids}}) # delete_many函数返回 pymongo.results.DeleteResult  对象
  192. print(res.deleted_count) # 输出被删除文档数量
  193. # 删除全部文档
  194. collection.delete_many({})
  195. # # # # # # # # # 删除集合
  196. collection.drop()
复制代码
说明:

  • 如果连接用户名和密码包含诸如':', '/', '+' 及'@'保留字符,则使用前应该先进行编码,如下:
    1. from urllib.parse import quote_plus
    2. user_name = quote_plus('@username')
    3. password = quote_plus('test_password+')
    4. host = 'project.example.local'
    5. port = 27017
    6. uri = f'mongodb://{user_name}:{password}@{host}:{port}'
    7. client = MongoClient(uri)
    复制代码
  • 使用URI建立连接时,URI书写格式分这么几种情况:

    • 需要验证密码
      1. uri = 'mongodb://user_name:password@host:port/authentication_database'
      复制代码
      说明:authentication_database :授权数据库,可选配,默认admin,如果不为admin时,必须显示指明,否则会报类似如下错误:
      1. pymongo.errors.OperationFailure: Authentication failed., full error: {'ok': 0.0, 'errmsg': 'Authentication failed.', 'code': 18, 'codeName': 'AuthenticationFailed'}
      复制代码
    • 不需要密码验证
      1. uri = 'mongodb://host:port'
      复制代码
    • 集群模式
      1. uri = 'mongodb://user_name:password@host1:port1,host2:port2,host3:port3,...hostN:portN/authentication_database'
      复制代码

  • MongoDB中的集合和数据库,都是懒惰地创建的——在第一个文档插入其中时创建的。
    备注:笔者实践时发现,无法自动创建数据库和集合,会提示授权认证失败。
  • MongoDB中的数据使用JSON样式的文档表示(和存储)。在PyMongo中,使用字典来表示文档。例如,以下字典可能用于表示博客文章:
    1. import datetime
    2. post = {
    3.     "author": "Mike",
    4.     "text": "My first blog post!",
    5.     "tags": ["mongodb", "python", "pymongo"],
    6.     "date": datetime.datetime.now(tz=datetime.timezone.utc),
    7. }
    复制代码
    注意,文档可以包含本地Python类型(如datetime.datetime实例),这些类型将自动转换为相应的BSON类型或从相应的BSON类型转换。
  • 实践时遇到类似如下错误:
    1. pymongo.errors.ConfigurationError: Server at * reports wire version 5, but this version of PyMongo requires at least 6 (MongoDB 3.6).
    复制代码
    原因分析:
    错误信息提示来看,使用的PyMongo版本与MongoDB服务器的Wire版本不兼容。服务器wire版本为5,而该PyMongo要求至少wire版本6(MongoDB 3.6)
    解决方法:

    • 升级MongoDB服务器:将MongoDB服务器升级到PyMongo所需的版本,即MongoDB 3.6或更高版本。
    • 降级PyMongo版本:如果不能升级MongoDB服务器,可以尝试降级PyMongo版本,以匹配MongoDB服务器版本。
      笔者实践时选择了降低PyMongo版本为 pymongo-3.12.3-cp36-cp36m-win_amd64.whl

参考链接

https://pymongo.readthedocs.io/en/stable/tutorial.html
https://pymongo.readthedocs.io/en/stable/api/pymongo/cursor.html#pymongo.cursor.Cursor.sort
https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.count_documents
https://www.mongodb.com/docs/manual/reference/operator/query/
https://www.mongodb.com/docs/manual/reference/operator/aggregation/
https://www.mongodb.com/docs/manual/reference/operator/aggregation-pipeline/
https://www.mongodb.com/docs/manual/reference/operator/aggregation/match/#mongodb-pipeline-pipe.-match

来源:https://www.cnblogs.com/shouke/p/17993189
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

举报 回复 使用道具