翼度科技»论坛 编程开发 python 查看内容

Python史上最全种类数据库操作方法,你能想到的数据库类型都在里面!甚至还

8

主题

8

帖子

24

积分

新手上路

Rank: 1

积分
24
本文将详细探讨如何在Python中连接全种类数据库以及实现相应的CRUD(创建,读取,更新,删除)操作。我们将逐一解析连接MySQL,SQL Server,Oracle,PostgreSQL,MongoDB,SQLite,DB2,Redis,Cassandra,Microsoft Access,ElasticSearch,Neo4j,InfluxDB,Snowflake,Amazon DynamoDB,Microsoft Azure CosMos DB数据库的方法,并演示相应的CRUD操作。
MySQL

连接数据库

Python可以使用mysql-connector-python库连接MySQL数据库:
  1. import mysql.connector
  2. conn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')
  3. print("Opened MySQL database successfully")
  4. conn.close()
复制代码
CRUD操作

接下来,我们将展示在MySQL中如何进行基本的CRUD操作。
创建(Create)
  1. conn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')
  2. cursor = conn.cursor()
  3. cursor.execute("CREATE TABLE Employees (ID INT PRIMARY KEY NOT NULL, NAME TEXT NOT NULL, AGE INT, ADDRESS CHAR(50), SALARY REAL)")
  4. print("Table created successfully")
  5. conn.close()
复制代码
读取(Retrieve)
  1. conn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')
  2. cursor = conn.cursor()
  3. cursor.execute("SELECT id, name, address, salary from Employees")
  4. rows = cursor.fetchall()
  5. for row in rows:
  6.     print("ID = ", row[0])
  7.     print("NAME = ", row[1])
  8.     print("ADDRESS = ", row[2])
  9.     print("SALARY = ", row[3])
  10. conn.close()
复制代码
更新(Update)
  1. conn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')
  2. cursor = conn.cursor()
  3. cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
  4. conn.commit()
  5. print("Total number of rows updated :", cursor.rowcount)
  6. conn.close()
复制代码
删除(Delete)
  1. conn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')
  2. cursor = conn.cursor()
  3. cursor.execute("DELETE from Employees where ID = 1")
  4. conn.commit()
  5. print("Total number of rows deleted :", cursor.rowcount)
  6. conn.close()
复制代码
SQL Server

连接数据库

Python可以使用pyodbc库连接SQL Server数据库:
  1. import pyodbc
  2. conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password')
  3. print("Opened SQL Server database successfully")
  4. conn.close()
复制代码
CRUD操作

接下来,我们将展示在SQL Server中如何进行基本的CRUD操作。
创建(Create)
  1. conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password')
  2. cursor = conn.cursor()
  3. cursor.execute("CREATE TABLE Employees (ID INT PRIMARY KEY NOT NULL, NAME VARCHAR(20) NOT NULL, AGE INT, ADDRESS CHAR(50), SALARY REAL)")
  4. conn.commit()
  5. print("Table created successfully")
  6. conn.close()
复制代码
读取(Retrieve)
  1. conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password')
  2. cursor = conn.cursor()
  3. cursor.execute("SELECT id, name, address, salary from Employees")
  4. rows = cursor.fetchall()
  5. for row in rows:
  6.     print("ID = ", row[0])
  7.     print("NAME = ", row[1])
  8.     print("ADDRESS = ", row[2])
  9.     print("SALARY = ", row[3])
  10. conn.close()
复制代码
更新(Update)
  1. conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password')
  2. cursor = conn.cursor()
  3. cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
  4. conn.commit()
  5. print("Total number of rows updated :", cursor.rowcount)
  6. conn.close()
复制代码
删除(Delete)
  1. conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password')
  2. cursor = conn.cursor()
  3. cursor.execute("DELETE from Employees where ID = 1")
  4. conn.commit()
  5. print("Total number of rows deleted :", cursor.rowcount)
  6. conn.close()
复制代码
Oracle

连接数据库

Python可以使用cx_Oracle库连接Oracle数据库:
  1. import cx_Oracle
  2. dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database')
  3. conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)
  4. print("Opened Oracle database successfully")
  5. conn.close()
复制代码
CRUD操作

接下来,我们将展示在Oracle中如何进行基本的CRUD操作。
创建(Create)
  1. dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database')
  2. conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)
  3. cursor = conn.cursor()
  4. cursor.execute("CREATE TABLE Employees (ID NUMBER(10) NOT NULL PRIMARY KEY, NAME VARCHAR2(20) NOT NULL, AGE NUMBER(3), ADDRESS CHAR(50), SALARY NUMBER(10, 2))")
  5. conn.commit()
  6. print("Table created successfully")
  7. conn.close()
复制代码
读取(Retrieve)
  1. dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database')
  2. conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)
  3. cursor = conn.cursor()
  4. cursor.execute("SELECT id, name, address, salary from Employees")
  5. rows = cursor.fetchall()
  6. for row in rows:
  7.     print("ID = ", row[0])
  8.     print("NAME = ", row[1])
  9.     print("ADDRESS = ", row[2])
  10.     print("SALARY = ", row[3])
  11. conn.close()
复制代码
更新(Update)
  1. dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database')
  2. conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)
  3. cursor = conn.cursor()
  4. cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
  5. conn.commit()
  6. print("Total number of rows updated :", cursor.rowcount)
  7. conn.close()
复制代码
删除(Delete)
  1. dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database')
  2. conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)
  3. cursor = conn.cursor()
  4. cursor.execute("DELETE from Employees where ID = 1")
  5. conn.commit()
  6. print("Total number of rows deleted :", cursor.rowcount)
  7. conn.close()
复制代码
PostgreSQL

连接数据库

Python可以使用psycopg2库连接PostgreSQL数据库:
  1. import psycopg2
  2. conn = psycopg2.connect(database="my_database", user="username", password="password", host="127.0.0.1", port="5432")
  3. print("Opened PostgreSQL database successfully")
  4. conn.close()
复制代码
CRUD操作

接下来,我们将展示在PostgreSQL中如何进行基本的CRUD操作。
创建(Create)
  1. conn = psycopg2.connect(database="my_database", user="username", password="password", host="127.0.0.1", port="5432")
  2. cursor = conn.cursor()
  3. cursor.execute('''CREATE TABLE Employees
  4.       (ID INT PRIMARY KEY     NOT NULL,
  5.       NAME           TEXT    NOT NULL,
  6.       AGE            INT     NOT NULL,
  7.       ADDRESS        CHAR(50),
  8.       SALARY         REAL);''')
  9. conn.commit()
  10. print("Table created successfully")
  11. conn.close()
复制代码
读取(Retrieve)
  1. conn = psycopg2.connect(database="my_database", user="username", password="password", host="127.0.0.1", port="5432")
  2. cursor = conn.cursor()
  3. cursor.execute("SELECT id, name, address, salary from Employees")
  4. rows = cursor.fetchall()
  5. for row in rows:
  6.     print("ID = ", row[0])
  7.     print("NAME = ", row[1])
  8.     print("ADDRESS = ", row[2])
  9.     print("SALARY = ", row[3])
  10. conn.close()
复制代码
更新(Update)
  1. conn = psycopg2.connect(database="my_database", user="username", password="password", host="127.0.0.1", port="5432")
  2. cursor = conn.cursor()
  3. cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
  4. conn.commit()
  5. print("Total number of rows updated :", cursor.rowcount)
  6. conn.close()
复制代码
删除(Delete)
  1. conn = psycopg2.connect(database="my_database", user="username", password="password", host="127.0.0.1", port="5432")
  2. cursor = conn.cursor()
  3. cursor.execute("DELETE from Employees where ID = 1")
  4. conn.commit()
  5. print("Total number of rows deleted :", cursor.rowcount)
  6. conn.close()
复制代码
MongoDB

连接数据库

Python可以使用pymongo库连接MongoDB数据库:
  1. from pymongo import MongoClient
  2. client = MongoClient("mongodb://localhost:27017/")
  3. db = client["my_database"]
  4. print("Opened MongoDB database successfully")
  5. client.close()
复制代码
CRUD操作

接下来,我们将展示在MongoDB中如何进行基本的CRUD操作。
创建(Create)

在MongoDB中,文档的创建操作通常包含在插入操作中:
  1. client = MongoClient("mongodb://localhost:27017/")
  2. db = client["my_database"]
  3. employees = db["Employees"]
  4. employee = {"id": "1", "name": "John", "age": "30", "address": "New York", "salary": "1000.00"}
  5. employees.insert_one(employee)
  6. print("Document inserted successfully")
  7. client.close()
复制代码
读取(Retrieve)
  1. client = MongoClient("mongodb://localhost:27017/")
  2. db = client["my_database"]
  3. employees = db["Employees"]
  4. cursor = employees.find()
  5. for document in cursor:
  6.     print(document)
  7. client.close()
复制代码
更新(Update)
  1. client = MongoClient("mongodb://localhost:27017/")
  2. db = client["my_database"]
  3. employees = db["Employees"]
  4. query = { "id": "1" }
  5. new_values = { "$set": { "salary": "25000.00" } }
  6. employees.update_one(query, new_values)
  7. print("Document updated successfully")
  8. client.close()
复制代码
删除(Delete)
  1. client = MongoClient("mongodb://localhost:27017/")
  2. db = client["my_database"]
  3. employees = db["Employees"]
  4. query = { "id": "1" }
  5. employees.delete_one(query)
  6. print("Document deleted successfully")
  7. client.close()
复制代码
SQLite

连接数据库

Python使用sqlite3库连接SQLite数据库:
  1. import sqlite3
  2. conn = sqlite3.connect('my_database.db')
  3. print("Opened SQLite database successfully")
  4. conn.close()
复制代码
CRUD操作

接下来,我们将展示在SQLite中如何进行基本的CRUD操作。
创建(Create)
  1. conn = sqlite3.connect('my_database.db')
  2. cursor = conn.cursor()
  3. cursor.execute('''CREATE TABLE Employees
  4.       (ID INT PRIMARY KEY     NOT NULL,
  5.       NAME           TEXT    NOT NULL,
  6.       AGE            INT     NOT NULL,
  7.       ADDRESS        CHAR(50),
  8.       SALARY         REAL);''')
  9. conn.commit()
  10. print("Table created successfully")
  11. conn.close()
复制代码
读取(Retrieve)
  1. conn = sqlite3.connect('my_database.db')
  2. cursor = conn.cursor()
  3. cursor.execute("SELECT id, name, address, salary from Employees")
  4. rows = cursor.fetchall()
  5. for row in rows:
  6.     print("ID = ", row[0])
  7.     print("NAME = ", row[1])
  8.     print("ADDRESS = ", row[2])
  9.     print("SALARY = ", row[3])
  10. conn.close()
复制代码
更新(Update)
  1. conn = sqlite3.connect('my_database.db')
  2. cursor = conn.cursor()
  3. cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
  4. conn.commit()
  5. print("Total number of rows updated :", cursor.rowcount)
  6. conn.close()
复制代码
删除(Delete)
  1. conn = sqlite3.connect('my_database.db')
  2. cursor = conn.cursor()
  3. cursor.execute("DELETE from Employees where ID = 1")
  4. conn.commit()
  5. print("Total number of rows deleted :", cursor.rowcount)
  6. conn.close()
复制代码
DB2

连接数据库

Python可以使用ibm_db库连接DB2数据库:
  1. import ibm_db
  2. dsn = (
  3.     "DRIVER={{IBM DB2 ODBC DRIVER}};"
  4.     "DATABASE=my_database;"
  5.     "HOSTNAME=127.0.0.1;"
  6.     "PORT=50000;"
  7.     "PROTOCOL=TCPIP;"
  8.     "UID=username;"
  9.     "PWD=password;"
  10. )
  11. conn = ibm_db.connect(dsn, "", "")
  12. print("Opened DB2 database successfully")
  13. ibm_db.close(conn)
复制代码
CRUD操作

接下来,我们将展示在DB2中如何进行基本的CRUD操作。
创建(Create)
  1. conn = ibm_db.connect(dsn, "", "")
  2. sql = '''CREATE TABLE Employees
  3.       (ID INT PRIMARY KEY     NOT NULL,
  4.       NAME           VARCHAR(20)    NOT NULL,
  5.       AGE            INT     NOT NULL,
  6.       ADDRESS        CHAR(50),
  7.       SALARY         DECIMAL(9, 2));'''
  8. stmt = ibm_db.exec_immediate(conn, sql)
  9. print("Table created successfully")
  10. ibm_db.close(conn)
复制代码
读取(Retrieve)
  1. conn = ibm_db.connect(dsn, "", "")
  2. sql = "SELECT id, name, address, salary from Employees"
  3. stmt = ibm_db.exec_immediate(conn, sql)
  4. while ibm_db.fetch_row(stmt):
  5.     print("ID = ", ibm_db.result(stmt, "ID"))
  6.     print("NAME = ", ibm_db.result(stmt, "NAME"))
  7.     print("ADDRESS = ", ibm_db.result(stmt, "ADDRESS"))
  8.     print("SALARY = ", ibm_db.result(stmt, "SALARY"))
  9. ibm_db.close(conn)
复制代码
更新(Update)
  1. conn = ibm_db.connect(dsn, "", "")
  2. sql = "UPDATE Employees set SALARY = 25000.00 where ID = 1"
  3. stmt = ibm_db.exec_immediate(conn, sql)
  4. ibm_db.commit(conn)
  5. print("Total number of rows updated :", ibm_db.num_rows(stmt))
  6. ibm_db.close(conn)
复制代码
删除(Delete)
  1. conn = ibm_db.connect(dsn, "", "")
  2. sql = "DELETE from Employees where ID = 1"
  3. stmt = ibm_db.exec_immediate(conn, sql)
  4. ibm_db.commit(conn)
  5. print("Total number of rows deleted :", ibm_db.num_rows(stmt))
  6. ibm_db.close(conn)
复制代码
Microsoft Access

连接数据库

Python可以使用pyodbc库连接Microsoft Access数据库:
  1. import pyodbc
  2. conn_str = (
  3.     r'DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};'
  4.     r'DBQ=path_to_your_access_file.accdb;'
  5. )
  6. conn = pyodbc.connect(conn_str)
  7. print("Opened Access database successfully")
  8. conn.close()
复制代码
CRUD操作

接下来,我们将展示在Access中如何进行基本的CRUD操作。
创建(Create)
  1. conn = pyodbc.connect(conn_str)
  2. cursor = conn.cursor()
  3. cursor.execute('''CREATE TABLE Employees
  4.       (ID INT PRIMARY KEY     NOT NULL,
  5.       NAME           TEXT    NOT NULL,
  6.       AGE            INT     NOT NULL,
  7.       ADDRESS        CHAR(50),
  8.       SALARY         DECIMAL(9, 2));''')
  9. conn.commit()
  10. print("Table created successfully")
  11. conn.close()
复制代码
读取(Retrieve)
  1. conn = pyodbc.connect(conn_str)
  2. cursor = conn.cursor()
  3. cursor.execute("SELECT id, name, address, salary from Employees")
  4. rows = cursor.fetchall()
  5. for row in rows:
  6.     print("ID = ", row[0])
  7.     print("NAME = ", row[1])
  8.     print("ADDRESS = ", row[2])
  9.     print("SALARY = ", row[3])
  10. conn.close()
复制代码
更新(Update)
  1. conn = pyodbc.connect(conn_str)
  2. cursor = conn.cursor()
  3. cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
  4. conn.commit()
  5. print("Total number of rows updated :", cursor.rowcount)
  6. conn.close()
复制代码
删除(Delete)
  1. conn = pyodbc.connect(conn_str)
  2. cursor = conn.cursor()
  3. cursor.execute("DELETE from Employees where ID = 1")
  4. conn.commit()
  5. print("Total number of rows deleted :", cursor.rowcount)
  6. conn.close()
复制代码
Cassandra

连接数据库

Python可以使用cassandra-driver库连接Cassandra数据库:
  1. from cassandra.cluster import Cluster
  2. cluster = Cluster(['127.0.0.1'])
  3. session = cluster.connect('my_keyspace')
  4. print("Opened Cassandra database successfully")
  5. cluster.shutdown()
复制代码
CRUD操作

接下来,我们将展示在Cassandra中如何进行基本的CRUD操作。
创建(Create)
  1. cluster = Cluster(['127.0.0.1'])
  2. session = cluster.connect('my_keyspace')
  3. session.execute("""
  4.     CREATE TABLE Employees (
  5.         id int PRIMARY KEY,
  6.         name text,
  7.         age int,
  8.         address text,
  9.         salary decimal
  10.     )
  11. """)
  12. print("Table created successfully")
  13. cluster.shutdown()
复制代码
读取(Retrieve)
  1. cluster = Cluster(['127.0.0.1'])
  2. session = cluster.connect('my_keyspace')
  3. rows = session.execute('SELECT id, name, address, salary FROM Employees')
  4. for row in rows:
  5.     print("ID = ", row.id)
  6.     print("NAME = ", row.name)
  7.     print("ADDRESS = ", row.address)
  8.     print("SALARY = ", row.salary)
  9. cluster.shutdown()
复制代码
更新(Update)
  1. cluster = Cluster(['127.0.0.1'])
  2. session = cluster.connect('my_keyspace')
  3. session.execute("UPDATE Employees SET salary = 25000.00 WHERE id = 1")
  4. print("Row updated successfully")
  5. cluster.shutdown()
复制代码
删除(Delete)
  1. cluster = Cluster(['127.0.0.1'])
  2. session = cluster.connect('my_keyspace')
  3. session.execute("DELETE FROM Employees WHERE id = 1")
  4. print("Row deleted successfully")
  5. cluster.shutdown()
复制代码
Redis

连接数据库

Python可以使用redis-py库连接Redis数据库:
  1. import redis
  2. r = redis.Redis(host='localhost', port=6379, db=0)
  3. print("Opened Redis database successfully")
复制代码
CRUD操作

接下来,我们将展示在Redis中如何进行基本的CRUD操作。
创建(Create)
  1. r = redis.Redis(host='localhost', port=6379, db=0)
  2. r.set('employee:1:name', 'John')
  3. r.set('employee:1:age', '30')
  4. r.set('employee:1:address', 'New York')
  5. r.set('employee:1:salary', '1000.00')
  6. print("Keys created successfully")
复制代码
读取(Retrieve)
  1. r = redis.Redis(host='localhost', port=6379, db=0)
  2. print("NAME = ", r.get('employee:1:name').decode('utf-8'))
  3. print("AGE = ", r.get('employee:1:age').decode('utf-8'))
  4. print("ADDRESS = ", r.get('employee:1:address').decode('utf-8'))
  5. print("SALARY = ", r.get('employee:1:salary').decode('utf-8'))
复制代码
更新(Update)
  1. r = redis.Redis(host='localhost', port=6379, db=0)
  2. r.set('employee:1:salary', '25000.00')
  3. print("Key updated successfully")
复制代码
删除(Delete)
  1. r = redis.Redis(host='localhost', port=6379, db=0)
  2. r.delete('employee:1:name', 'employee:1:age', 'employee:1:address', 'employee:1:salary')
  3. print("Keys deleted successfully")
复制代码
ElasticSearch

连接数据库

Python可以使用elasticsearch库连接ElasticSearch数据库:
  1. from elasticsearch import Elasticsearch
  2. es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
  3. print("Opened ElasticSearch database successfully")
复制代码
CRUD操作

接下来,我们将展示在ElasticSearch中如何进行基本的CRUD操作。
创建(Create)
  1. es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
  2. employee = {
  3.     'name': 'John',
  4.     'age': 30,
  5.     'address': 'New York',
  6.     'salary': 1000.00
  7. }
  8. res = es.index(index='employees', doc_type='employee', id=1, body=employee)
  9. print("Document created successfully")
复制代码
读取(Retrieve)
  1. es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
  2. res = es.get(index='employees', doc_type='employee', id=1)
  3. print("Document details:")
  4. for field, details in res['_source'].items():
  5.     print(f"{field.upper()} = ", details)
复制代码
更新(Update)
  1. es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
  2. res = es.update(index='employees', doc_type='employee', id=1, body={
  3.     'doc': {
  4.         'salary': 25000.00
  5.     }
  6. })
  7. print("Document updated successfully")
复制代码
删除(Delete)
  1. es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
  2. res = es.delete(index='employees', doc_type='employee', id=1)
  3. print("Document deleted successfully")
复制代码
Neo4j

连接数据库

Python可以使用neo4j库连接Neo4j数据库:
  1. from neo4j import GraphDatabase
  2. driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
  3. print("Opened Neo4j database successfully")
  4. driver.close()
复制代码
CRUD操作

接下来,我们将展示在Neo4j中如何进行基本的CRUD操作。
创建(Create)
  1. driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
  2. with driver.session() as session:
  3.     session.run("CREATE (:Employee {id: 1, name: 'John', age: 30, address: 'New York', salary: 1000.00})")
  4. print("Node created successfully")
  5. driver.close()
复制代码
读取(Retrieve)
  1. driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
  2. with driver.session() as session:
  3.     result = session.run("MATCH (n:Employee) WHERE n.id = 1 RETURN n")
  4.     for record in result:
  5.         print("ID = ", record["n"]["id"])
  6.         print("NAME = ", record["n"]["name"])
  7.         print("ADDRESS = ", record["n"]["address"])
  8.         print("SALARY = ", record["n"]["salary"])
  9. driver.close()
复制代码
更新(Update)
  1. driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
  2. with driver.session() as session:
  3.     session.run("MATCH (n:Employee) WHERE n.id = 1 SET n.salary = 25000.00")
  4. print("Node updated successfully")
  5. driver.close()
复制代码
删除(Delete)
  1. driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
  2. with driver.session() as session:
  3.     session.run("MATCH (n:Employee) WHERE n.id = 1 DETACH DELETE n")
  4. print("Node deleted successfully")
  5. driver.close()
复制代码
InfluxDB

连接数据库

Python可以使用InfluxDB-Python库连接InfluxDB数据库:
  1. from influxdb import InfluxDBClient
  2. client = InfluxDBClient(host='localhost', port=8086)
  3. print("Opened InfluxDB database successfully")
  4. client.close()
复制代码
CRUD操作

接下来,我们将展示在InfluxDB中如何进行基本的CRUD操作。
创建(Create)
  1. client = InfluxDBClient(host='localhost', port=8086)
  2. json_body = [
  3.     {
  4.         "measurement": "employees",
  5.         "tags": {
  6.             "id": "1"
  7.         },
  8.         "fields": {
  9.             "name": "John",
  10.             "age": 30,
  11.             "address": "New York",
  12.             "salary": 1000.00
  13.         }
  14.     }
  15. ]
  16. client.write_points(json_body)
  17. print("Point created successfully")
  18. client.close()
复制代码
读取(Retrieve)
  1. client = InfluxDBClient(host='localhost', port=8086)
  2. result = client.query('SELECT "name", "age", "address", "salary" FROM "employees"')
  3. for point in result.get_points():
  4.     print("ID = ", point['id'])
  5.     print("NAME = ", point['name'])
  6.     print("AGE = ", point['age'])
  7.     print("ADDRESS = ", point['address'])
  8.     print("SALARY = ", point['salary'])
  9. client.close()
复制代码
更新(Update)

InfluxDB的数据模型和其他数据库不同,它没有更新操作。但是你可以通过写入一个相同的数据点(即具有相同的时间戳和标签)并改变字段值,实现类似更新操作的效果。
删除(Delete)

同样,InfluxDB也没有提供删除单个数据点的操作。然而,你可以删除整个系列(即表)或者删除某个时间段的数据。
  1. client = InfluxDBClient(host='localhost', port=8086)
  2. # 删除整个系列
  3. client.query('DROP SERIES FROM "employees"')
  4. # 删除某个时间段的数据
  5. # client.query('DELETE FROM "employees" WHERE time < now() - 1d')
  6. print("Series deleted successfully")
  7. client.close()
复制代码
Snowflake

连接数据库

Python可以使用snowflake-connector-python库连接Snowflake数据库:
  1. from snowflake.connector import connect
  2. con = connect(
  3.     user='username',
  4.     password='password',
  5.     account='account_url',
  6.     warehouse='warehouse',
  7.     database='database',
  8.     schema='schema'
  9. )
  10. print("Opened Snowflake database successfully")
  11. con.close()
复制代码
CRUD操作

接下来,我们将展示在Snowflake中如何进行基本的CRUD操作。
创建(Create)
  1. con = connect(
  2.     user='username',
  3.     password='password',
  4.     account='account_url',
  5.     warehouse='warehouse',
  6.     database='database',
  7.     schema='schema'
  8. )
  9. cur = con.cursor()
  10. cur.execute("""
  11. CREATE TABLE EMPLOYEES (
  12.     ID INT,
  13.     NAME STRING,
  14.     AGE INT,
  15.     ADDRESS STRING,
  16.     SALARY FLOAT
  17. )
  18. """)
  19. cur.execute("""
  20. INSERT INTO EMPLOYEES (ID, NAME, AGE, ADDRESS, SALARY) VALUES
  21. (1, 'John', 30, 'New York', 1000.00)
  22. """)
  23. print("Table created and row inserted successfully")
  24. con.close()
复制代码
读取(Retrieve)
  1. con = connect(
  2.     user='username',
  3.     password='password',
  4.     account='account_url',
  5.     warehouse='warehouse',
  6.     database='database',
  7.     schema='schema'
  8. )
  9. cur = con.cursor()
  10. cur.execute("SELECT * FROM EMPLOYEES WHERE ID = 1")
  11. rows = cur.fetchall()
  12. for row in rows:
  13.     print("ID = ", row[0])
  14.     print("NAME = ", row[1])
  15.     print("AGE = ", row[2])
  16.     print("ADDRESS = ", row[3])
  17.     print("SALARY = ", row[4])
  18. con.close()
复制代码
更新(Update)
  1. con = connect(
  2.     user='username',
  3.     password='password',
  4.     account='account_url',
  5.     warehouse='warehouse',
  6.     database='database',
  7.     schema='schema'
  8. )
  9. cur = con.cursor()
  10. cur.execute("UPDATE EMPLOYEES SET SALARY = 25000.00 WHERE ID = 1")
  11. print("Row updated successfully")
  12. con.close()
复制代码
删除(Delete)
  1. con = connect(
  2.     user='username',
  3.     password='password',
  4.     account='account_url',
  5.     warehouse='warehouse',
  6.     database='database',
  7.     schema='schema'
  8. )
  9. cur = con.cursor()
  10. cur.execute("DELETE FROM EMPLOYEES WHERE ID = 1")
  11. print("Row deleted successfully")
  12. con.close()
复制代码
Amazon DynamoDB

连接数据库

Python可以使用boto3库连接Amazon DynamoDB:
  1. import boto3
  2. dynamodb = boto3.resource('dynamodb', region_name='us-west-2',
  3.                           aws_access_key_id='Your AWS Access Key',
  4.                           aws_secret_access_key='Your AWS Secret Key')
  5. print("Opened DynamoDB successfully")
复制代码
CRUD操作

接下来,我们将展示在DynamoDB中如何进行基本的CRUD操作。
创建(Create)
  1. table = dynamodb.create_table(
  2.     TableName='Employees',
  3.     KeySchema=[
  4.         {
  5.             'AttributeName': 'id',
  6.             'KeyType': 'HASH'
  7.         },
  8.     ],
  9.     AttributeDefinitions=[
  10.         {
  11.             'AttributeName': 'id',
  12.             'AttributeType': 'N'
  13.         },
  14.     ],
  15.     ProvisionedThroughput={
  16.         'ReadCapacityUnits': 5,
  17.         'WriteCapacityUnits': 5
  18.     }
  19. )
  20. table.put_item(
  21.    Item={
  22.         'id': 1,
  23.         'name': 'John',
  24.         'age': 30,
  25.         'address': 'New York',
  26.         'salary': 1000.00
  27.     }
  28. )
  29. print("Table created and item inserted successfully")
复制代码
读取(Retrieve)
  1. table = dynamodb.Table('Employees')
  2. response = table.get_item(
  3.    Key={
  4.         'id': 1,
  5.     }
  6. )
  7. item = response['Item']
  8. print(item)
复制代码
更新(Update)
  1. table = dynamodb.Table('Employees')
  2. table.update_item(
  3.     Key={
  4.         'id': 1,
  5.     },
  6.     UpdateExpression='SET salary = :val1',
  7.     ExpressionAttributeValues={
  8.         ':val1': 25000.00
  9.     }
  10. )
  11. print("Item updated successfully")
复制代码
删除(Delete)
  1. table = dynamodb.Table('Employees')
  2. table.delete_item(
  3.     Key={
  4.         'id': 1,
  5.     }
  6. )
  7. print("Item deleted successfully")
复制代码
Microsoft Azure CosMos DB

连接数据库

Python可以使用azure-cosmos库连接Microsoft Azure CosMos DB:
  1. from azure.cosmos import CosmosClient, PartitionKey, exceptions
  2. url = 'Cosmos DB Account URL'
  3. key = 'Cosmos DB Account Key'
  4. client = CosmosClient(url, credential=key)
  5. database_name = 'testDB'
  6. database = client.get_database_client(database_name)
  7. container_name = 'Employees'
  8. container = database.get_container_client(container_name)
  9. print("Opened CosMos DB successfully")
复制代码
CRUD操作

接下来,我们将展示在CosMos DB中如何进行基本的CRUD操作。
创建(Create)
  1. database = client.create_database_if_not_exists(id=database_name)
  2. container = database.create_container_if_not_exists(
  3.     id=container_name,
  4.     partition_key=PartitionKey(path="/id"),
  5.     offer_throughput=400
  6. )
  7. container.upsert_item({
  8.     'id': '1',
  9.     'name': 'John',
  10.     'age': 30,
  11.     'address': 'New York',
  12.     'salary': 1000.00
  13. })
  14. print("Container created and item upserted successfully")
复制代码
读取(Retrieve)
  1. for item in container.read_all_items():
  2.     print(item)
复制代码
更新(Update)
  1. for item in container.read_all_items():
  2.     if item['id'] == '1':
  3.         item['salary'] = 25000.00
  4.         container.upsert_item(item)
  5.         
  6. print("Item updated successfully")
复制代码
删除(Delete)
  1. for item in container.read_all_items():
  2.     if item['id'] == '1':
  3.         container.delete_item(item, partition_key='1')
  4.         
  5. print("Item deleted successfully")
复制代码
如有帮助,请多关注
个人微信公众号:【Python全视角】
TeahLead_KrisChang,10+年的互联网和人工智能从业经验,10年+技术和业务团队管理经验,同济软件工程本科,复旦工程管理硕士,阿里云认证云服务资深架构师,上亿营收AI产品业务负责人。

来源:https://www.cnblogs.com/xfuture/p/17528203.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

举报 回复 使用道具