|
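This article walks through connecting to a range of databases from Python and performing the corresponding CRUD (create, read, update, delete) operations. It covers MySQL, SQL Server, Oracle, PostgreSQL, MongoDB, SQLite, DB2, Redis, Cassandra, Microsoft Access, Elasticsearch, Neo4j, InfluxDB, Snowflake, Amazon DynamoDB, and Microsoft Azure Cosmos DB, showing for each one how to connect and how to run the CRUD operations.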
MySQL
Connecting to the database
Python can connect to MySQL with the mysql-connector-python library:
- import mysql.connector
- conn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')
- print("Opened MySQL database successfully")
- conn.close()
CRUD operations
The following snippets show the basic CRUD operations in MySQL.
Create
- conn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')
- cursor = conn.cursor()
- cursor.execute("CREATE TABLE Employees (ID INT PRIMARY KEY NOT NULL, NAME TEXT NOT NULL, AGE INT, ADDRESS CHAR(50), SALARY REAL)")
- print("Table created successfully")
- conn.close()
Read (Retrieve)
- conn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')
- cursor = conn.cursor()
- cursor.execute("SELECT id, name, address, salary from Employees")
- rows = cursor.fetchall()
- for row in rows:
-     print("ID = ", row[0])
-     print("NAME = ", row[1])
-     print("ADDRESS = ", row[2])
-     print("SALARY = ", row[3])
- conn.close()
Update
- conn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')
- cursor = conn.cursor()
- cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
- conn.commit()
- print("Total number of rows updated :", cursor.rowcount)
- conn.close()
Delete
- conn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')
- cursor = conn.cursor()
- cursor.execute("DELETE from Employees where ID = 1")
- conn.commit()
- print("Total number of rows deleted :", cursor.rowcount)
- conn.close()
SQL Server
Connecting to the database
Python can connect to SQL Server with the pyodbc library:
- import pyodbc
- conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password')
- print("Opened SQL Server database successfully")
- conn.close()
CRUD operations
The following snippets show the basic CRUD operations in SQL Server.
Create
- conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password')
- cursor = conn.cursor()
- cursor.execute("CREATE TABLE Employees (ID INT PRIMARY KEY NOT NULL, NAME VARCHAR(20) NOT NULL, AGE INT, ADDRESS CHAR(50), SALARY REAL)")
- conn.commit()
- print("Table created successfully")
- conn.close()
Read (Retrieve)
- conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password')
- cursor = conn.cursor()
- cursor.execute("SELECT id, name, address, salary from Employees")
- rows = cursor.fetchall()
- for row in rows:
-     print("ID = ", row[0])
-     print("NAME = ", row[1])
-     print("ADDRESS = ", row[2])
-     print("SALARY = ", row[3])
- conn.close()
Update
- conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password')
- cursor = conn.cursor()
- cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
- conn.commit()
- print("Total number of rows updated :", cursor.rowcount)
- conn.close()
Delete
- conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password')
- cursor = conn.cursor()
- cursor.execute("DELETE from Employees where ID = 1")
- conn.commit()
- print("Total number of rows deleted :", cursor.rowcount)
- conn.close()
Oracle
Connecting to the database
Python can connect to Oracle with the cx_Oracle library:
- import cx_Oracle
- dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database')
- conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)
- print("Opened Oracle database successfully")
- conn.close()
CRUD operations
The following snippets show the basic CRUD operations in Oracle.
Create
- dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database')
- conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)
- cursor = conn.cursor()
- cursor.execute("CREATE TABLE Employees (ID NUMBER(10) NOT NULL PRIMARY KEY, NAME VARCHAR2(20) NOT NULL, AGE NUMBER(3), ADDRESS CHAR(50), SALARY NUMBER(10, 2))")
- conn.commit()
- print("Table created successfully")
- conn.close()
Read (Retrieve)
- dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database')
- conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)
- cursor = conn.cursor()
- cursor.execute("SELECT id, name, address, salary from Employees")
- rows = cursor.fetchall()
- for row in rows:
-     print("ID = ", row[0])
-     print("NAME = ", row[1])
-     print("ADDRESS = ", row[2])
-     print("SALARY = ", row[3])
- conn.close()
Update
- dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database')
- conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)
- cursor = conn.cursor()
- cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
- conn.commit()
- print("Total number of rows updated :", cursor.rowcount)
- conn.close()
Delete
- dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database')
- conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)
- cursor = conn.cursor()
- cursor.execute("DELETE from Employees where ID = 1")
- conn.commit()
- print("Total number of rows deleted :", cursor.rowcount)
- conn.close()
PostgreSQL
Connecting to the database
Python can connect to PostgreSQL with the psycopg2 library:
- import psycopg2
- conn = psycopg2.connect(database="my_database", user="username", password="password", host="127.0.0.1", port="5432")
- print("Opened PostgreSQL database successfully")
- conn.close()
CRUD operations
The following snippets show the basic CRUD operations in PostgreSQL.
Create
- conn = psycopg2.connect(database="my_database", user="username", password="password", host="127.0.0.1", port="5432")
- cursor = conn.cursor()
- cursor.execute('''CREATE TABLE Employees
- (ID INT PRIMARY KEY NOT NULL,
- NAME TEXT NOT NULL,
- AGE INT NOT NULL,
- ADDRESS CHAR(50),
- SALARY REAL);''')
- conn.commit()
- print("Table created successfully")
- conn.close()
Read (Retrieve)
- conn = psycopg2.connect(database="my_database", user="username", password="password", host="127.0.0.1", port="5432")
- cursor = conn.cursor()
- cursor.execute("SELECT id, name, address, salary from Employees")
- rows = cursor.fetchall()
- for row in rows:
-     print("ID = ", row[0])
-     print("NAME = ", row[1])
-     print("ADDRESS = ", row[2])
-     print("SALARY = ", row[3])
- conn.close()
Update
- conn = psycopg2.connect(database="my_database", user="username", password="password", host="127.0.0.1", port="5432")
- cursor = conn.cursor()
- cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
- conn.commit()
- print("Total number of rows updated :", cursor.rowcount)
- conn.close()
Delete
- conn = psycopg2.connect(database="my_database", user="username", password="password", host="127.0.0.1", port="5432")
- cursor = conn.cursor()
- cursor.execute("DELETE from Employees where ID = 1")
- conn.commit()
- print("Total number of rows deleted :", cursor.rowcount)
- conn.close()
MongoDB
Connecting to the database
Python can connect to MongoDB with the pymongo library:
- from pymongo import MongoClient
- client = MongoClient("mongodb://localhost:27017/")
- db = client["my_database"]
- print("Opened MongoDB database successfully")
- client.close()
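CRUD operations
The following snippets show the basic CRUD operations in MongoDB.
Create
In MongoDB, a document is created by inserting it; the collection is created implicitly on the first insert:
- client = MongoClient("mongodb://localhost:27017/")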
- db = client["my_database"]
- employees = db["Employees"]
- employee = {"id": "1", "name": "John", "age": "30", "address": "New York", "salary": "1000.00"}
- employees.insert_one(employee)
- print("Document inserted successfully")
- client.close()
Read (Retrieve)
- client = MongoClient("mongodb://localhost:27017/")
- db = client["my_database"]
- employees = db["Employees"]
- cursor = employees.find()
- for document in cursor:
-     print(document)
- client.close()
Update
- client = MongoClient("mongodb://localhost:27017/")
- db = client["my_database"]
- employees = db["Employees"]
- query = { "id": "1" }
- new_values = { "$set": { "salary": "25000.00" } }
- employees.update_one(query, new_values)
- print("Document updated successfully")
- client.close()
Delete
- client = MongoClient("mongodb://localhost:27017/")
- db = client["my_database"]
- employees = db["Employees"]
- query = { "id": "1" }
- employees.delete_one(query)
- print("Document deleted successfully")
- client.close()
SQLite
Connecting to the database
Python connects to SQLite with the sqlite3 module from the standard library:
- import sqlite3
- conn = sqlite3.connect('my_database.db')
- print("Opened SQLite database successfully")
- conn.close()
CRUD operations
The following snippets show the basic CRUD operations in SQLite.
Create
- conn = sqlite3.connect('my_database.db')
- cursor = conn.cursor()
- cursor.execute('''CREATE TABLE Employees
- (ID INT PRIMARY KEY NOT NULL,
- NAME TEXT NOT NULL,
- AGE INT NOT NULL,
- ADDRESS CHAR(50),
- SALARY REAL);''')
- conn.commit()
- print("Table created successfully")
- conn.close()
Read (Retrieve)
- conn = sqlite3.connect('my_database.db')
- cursor = conn.cursor()
- cursor.execute("SELECT id, name, address, salary from Employees")
- rows = cursor.fetchall()
- for row in rows:
-     print("ID = ", row[0])
-     print("NAME = ", row[1])
-     print("ADDRESS = ", row[2])
-     print("SALARY = ", row[3])
- conn.close()
Update
- conn = sqlite3.connect('my_database.db')
- cursor = conn.cursor()
- cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
- conn.commit()
- print("Total number of rows updated :", cursor.rowcount)
- conn.close()
Delete
- conn = sqlite3.connect('my_database.db')
- cursor = conn.cursor()
- cursor.execute("DELETE from Employees where ID = 1")
- conn.commit()
- print("Total number of rows deleted :", cursor.rowcount)
- conn.close()
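The SQLite steps above can also be run end to end on a single connection. The following is a minimal sketch, not the author's code: the function name `run_crud_demo`, the in-memory database path, and the sample row (John, 30, New York, 1000.00) are assumptions made for illustration, and an INSERT is added because the Create step above only creates the table. It also uses `?` placeholders so that values are bound by the driver instead of being written into the SQL string.

```python
import sqlite3
from contextlib import closing


def run_crud_demo(db_path=":memory:"):
    """Run create, insert, update, read, and delete on one connection."""
    # closing() makes sure conn.close() runs even if a statement raises
    with closing(sqlite3.connect(db_path)) as conn:
        conn.execute(
            "CREATE TABLE Employees ("
            "ID INT PRIMARY KEY NOT NULL, NAME TEXT NOT NULL, "
            "AGE INT NOT NULL, ADDRESS CHAR(50), SALARY REAL)"
        )
        # "?" placeholders bind the values instead of formatting them into SQL
        conn.execute(
            "INSERT INTO Employees (ID, NAME, AGE, ADDRESS, SALARY) "
            "VALUES (?, ?, ?, ?, ?)",
            (1, "John", 30, "New York", 1000.00),
        )
        conn.commit()

        updated = conn.execute(
            "UPDATE Employees SET SALARY = ? WHERE ID = ?", (25000.00, 1)
        ).rowcount
        conn.commit()

        row = conn.execute(
            "SELECT ID, NAME, ADDRESS, SALARY FROM Employees WHERE ID = ?", (1,)
        ).fetchone()

        deleted = conn.execute(
            "DELETE FROM Employees WHERE ID = ?", (1,)
        ).rowcount
        conn.commit()

        remaining = conn.execute("SELECT COUNT(*) FROM Employees").fetchone()[0]
    return row, updated, deleted, remaining


if __name__ == "__main__":
    print(run_crud_demo())
```

Passing a file path such as 'my_database.db' instead of the default keeps the data on disk, as in the snippets above.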
DB2
Connecting to the database
Python can connect to DB2 with the ibm_db library:
- import ibm_db
- dsn = (
- "DRIVER={IBM DB2 ODBC DRIVER};"
- "DATABASE=my_database;"
- "HOSTNAME=127.0.0.1;"
- "PORT=50000;"
- "PROTOCOL=TCPIP;"
- "UID=username;"
- "PWD=password;"
- )
- conn = ibm_db.connect(dsn, "", "")
- print("Opened DB2 database successfully")
- ibm_db.close(conn)
CRUD operations
The following snippets show the basic CRUD operations in DB2. They reuse the dsn string defined above.
Create
- conn = ibm_db.connect(dsn, "", "")
- sql = '''CREATE TABLE Employees
- (ID INT PRIMARY KEY NOT NULL,
- NAME VARCHAR(20) NOT NULL,
- AGE INT NOT NULL,
- ADDRESS CHAR(50),
- SALARY DECIMAL(9, 2))'''
- stmt = ibm_db.exec_immediate(conn, sql)
- print("Table created successfully")
- ibm_db.close(conn)
Read (Retrieve)
- conn = ibm_db.connect(dsn, "", "")
- sql = "SELECT id, name, address, salary from Employees"
- stmt = ibm_db.exec_immediate(conn, sql)
- while ibm_db.fetch_row(stmt):
-     print("ID = ", ibm_db.result(stmt, "ID"))
-     print("NAME = ", ibm_db.result(stmt, "NAME"))
-     print("ADDRESS = ", ibm_db.result(stmt, "ADDRESS"))
-     print("SALARY = ", ibm_db.result(stmt, "SALARY"))
- ibm_db.close(conn)
Update
- conn = ibm_db.connect(dsn, "", "")
- sql = "UPDATE Employees set SALARY = 25000.00 where ID = 1"
- stmt = ibm_db.exec_immediate(conn, sql)
- ibm_db.commit(conn)
- print("Total number of rows updated :", ibm_db.num_rows(stmt))
- ibm_db.close(conn)
Delete
- conn = ibm_db.connect(dsn, "", "")
- sql = "DELETE from Employees where ID = 1"
- stmt = ibm_db.exec_immediate(conn, sql)
- ibm_db.commit(conn)
- print("Total number of rows deleted :", ibm_db.num_rows(stmt))
- ibm_db.close(conn)
Microsoft Access
Connecting to the database
Python can connect to Microsoft Access with the pyodbc library:
- import pyodbc
- conn_str = (
- r'DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};'
- r'DBQ=path_to_your_access_file.accdb;'
- )
- conn = pyodbc.connect(conn_str)
- print("Opened Access database successfully")
- conn.close()
CRUD operations
The following snippets show the basic CRUD operations in Access. They reuse the conn_str defined above.
Create
- conn = pyodbc.connect(conn_str)
- cursor = conn.cursor()
- cursor.execute('''CREATE TABLE Employees
- (ID INT PRIMARY KEY NOT NULL,
- NAME TEXT NOT NULL,
- AGE INT NOT NULL,
- ADDRESS CHAR(50),
- SALARY DECIMAL(9, 2));''')
- conn.commit()
- print("Table created successfully")
- conn.close()
Read (Retrieve)
- conn = pyodbc.connect(conn_str)
- cursor = conn.cursor()
- cursor.execute("SELECT id, name, address, salary from Employees")
- rows = cursor.fetchall()
- for row in rows:
-     print("ID = ", row[0])
-     print("NAME = ", row[1])
-     print("ADDRESS = ", row[2])
-     print("SALARY = ", row[3])
- conn.close()
Update
- conn = pyodbc.connect(conn_str)
- cursor = conn.cursor()
- cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
- conn.commit()
- print("Total number of rows updated :", cursor.rowcount)
- conn.close()
Delete
- conn = pyodbc.connect(conn_str)
- cursor = conn.cursor()
- cursor.execute("DELETE from Employees where ID = 1")
- conn.commit()
- print("Total number of rows deleted :", cursor.rowcount)
- conn.close()
Cassandra
Connecting to the database
Python can connect to Cassandra with the cassandra-driver library:
- from cassandra.cluster import Cluster
- cluster = Cluster(['127.0.0.1'])
- session = cluster.connect('my_keyspace')
- print("Opened Cassandra database successfully")
- cluster.shutdown()
CRUD operations
The following snippets show the basic CRUD operations in Cassandra.
Create
- cluster = Cluster(['127.0.0.1'])
- session = cluster.connect('my_keyspace')
- session.execute("""
- CREATE TABLE Employees (
- id int PRIMARY KEY,
- name text,
- age int,
- address text,
- salary decimal
- )
- """)
- print("Table created successfully")
- cluster.shutdown()
Read (Retrieve)
- cluster = Cluster(['127.0.0.1'])
- session = cluster.connect('my_keyspace')
- rows = session.execute('SELECT id, name, address, salary FROM Employees')
- for row in rows:
-     print("ID = ", row.id)
-     print("NAME = ", row.name)
-     print("ADDRESS = ", row.address)
-     print("SALARY = ", row.salary)
- cluster.shutdown()
Update
- cluster = Cluster(['127.0.0.1'])
- session = cluster.connect('my_keyspace')
- session.execute("UPDATE Employees SET salary = 25000.00 WHERE id = 1")
- print("Row updated successfully")
- cluster.shutdown()
Delete
- cluster = Cluster(['127.0.0.1'])
- session = cluster.connect('my_keyspace')
- session.execute("DELETE FROM Employees WHERE id = 1")
- print("Row deleted successfully")
- cluster.shutdown()
Redis
Connecting to the database
Python can connect to Redis with the redis-py library:
- import redis
- r = redis.Redis(host='localhost', port=6379, db=0)
- print("Opened Redis database successfully")
CRUD operations
The following snippets show the basic CRUD operations in Redis.
Create
- r = redis.Redis(host='localhost', port=6379, db=0)
- r.set('employee:1:name', 'John')
- r.set('employee:1:age', '30')
- r.set('employee:1:address', 'New York')
- r.set('employee:1:salary', '1000.00')
- print("Keys created successfully")
Read (Retrieve)
- r = redis.Redis(host='localhost', port=6379, db=0)
- print("NAME = ", r.get('employee:1:name').decode('utf-8'))
- print("AGE = ", r.get('employee:1:age').decode('utf-8'))
- print("ADDRESS = ", r.get('employee:1:address').decode('utf-8'))
- print("SALARY = ", r.get('employee:1:salary').decode('utf-8'))
Update
- r = redis.Redis(host='localhost', port=6379, db=0)
- r.set('employee:1:salary', '25000.00')
- print("Key updated successfully")
Delete
- r = redis.Redis(host='localhost', port=6379, db=0)
- r.delete('employee:1:name', 'employee:1:age', 'employee:1:address', 'employee:1:salary')
- print("Keys deleted successfully")
Elasticsearch
Connecting to the database
Python can connect to Elasticsearch with the elasticsearch library:
- from elasticsearch import Elasticsearch
- es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
- print("Opened ElasticSearch database successfully")
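CRUD operations
The following snippets show the basic CRUD operations in Elasticsearch. They pass the doc_type parameter, which applies to Elasticsearch 6.x and earlier; it was deprecated in 7.x and removed in 8.x.
Create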
- es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
- employee = {
- 'name': 'John',
- 'age': 30,
- 'address': 'New York',
- 'salary': 1000.00
- }
- res = es.index(index='employees', doc_type='employee', id=1, body=employee)
- print("Document created successfully")
Read (Retrieve)
- es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
- res = es.get(index='employees', doc_type='employee', id=1)
- print("Document details:")
- for field, details in res['_source'].items():
-     print(f"{field.upper()} = ", details)
Update
- es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
- res = es.update(index='employees', doc_type='employee', id=1, body={
- 'doc': {
- 'salary': 25000.00
- }
- })
- print("Document updated successfully")
Delete
- es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
- res = es.delete(index='employees', doc_type='employee', id=1)
- print("Document deleted successfully")
Neo4j
Connecting to the database
Python can connect to Neo4j with the neo4j library:
- from neo4j import GraphDatabase
- driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
- print("Opened Neo4j database successfully")
- driver.close()
CRUD operations
The following snippets show the basic CRUD operations in Neo4j.
Create
- driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
- with driver.session() as session:
-     session.run("CREATE (:Employee {id: 1, name: 'John', age: 30, address: 'New York', salary: 1000.00})")
- print("Node created successfully")
- driver.close()
Read (Retrieve)
- driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
- with driver.session() as session:
-     result = session.run("MATCH (n:Employee) WHERE n.id = 1 RETURN n")
-     for record in result:
-         print("ID = ", record["n"]["id"])
-         print("NAME = ", record["n"]["name"])
-         print("ADDRESS = ", record["n"]["address"])
-         print("SALARY = ", record["n"]["salary"])
- driver.close()
Update
- driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
- with driver.session() as session:
-     session.run("MATCH (n:Employee) WHERE n.id = 1 SET n.salary = 25000.00")
- print("Node updated successfully")
- driver.close()
Delete
- driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
- with driver.session() as session:
-     session.run("MATCH (n:Employee) WHERE n.id = 1 DETACH DELETE n")
- print("Node deleted successfully")
- driver.close()
InfluxDB
Connecting to the database
Python can connect to InfluxDB 1.x with the InfluxDB-Python library (the influxdb package):
- from influxdb import InfluxDBClient
- client = InfluxDBClient(host='localhost', port=8086)
- print("Opened InfluxDB database successfully")
- client.close()
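CRUD operations
The following snippets show the basic CRUD operations in InfluxDB.
Create
- # write_points needs a target database; 'my_database' is assumed to exist already
- client = InfluxDBClient(host='localhost', port=8086, database='my_database')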
- json_body = [
- {
- "measurement": "employees",
- "tags": {
- "id": "1"
- },
- "fields": {
- "name": "John",
- "age": 30,
- "address": "New York",
- "salary": 1000.00
- }
- }
- ]
- client.write_points(json_body)
- print("Point created successfully")
- client.close()
Read (Retrieve)
- client = InfluxDBClient(host='localhost', port=8086, database='my_database')
- # "id" is a tag, so it has to be selected explicitly to appear in each point
- result = client.query('SELECT "id", "name", "age", "address", "salary" FROM "employees"')
- for point in result.get_points():
-     print("ID = ", point['id'])
-     print("NAME = ", point['name'])
-     print("AGE = ", point['age'])
-     print("ADDRESS = ", point['address'])
-     print("SALARY = ", point['salary'])
- client.close()
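Update
InfluxDB's data model differs from that of the other databases here: it has no update operation. You can get the same effect by writing a point with the same measurement, tag set, and timestamp but different field values, which overwrites the existing point.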
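The overwrite approach described above can be sketched as follows. This is an illustration under stated assumptions, not the author's code: the helper names `build_point` and `overwrite_salary`, the database name 'my_database', and the fixed timestamp are all hypothetical. The key point is that both writes share the same measurement, tag set, and time, so the second write replaces the salary field of the first.

```python
def build_point(employee_id, salary, timestamp):
    """Build one point for the 'employees' measurement (hypothetical helper)."""
    return {
        "measurement": "employees",
        "tags": {"id": str(employee_id)},
        "time": timestamp,
        "fields": {"salary": float(salary)},
    }


def overwrite_salary(client, employee_id, new_salary, timestamp):
    """Write a point whose measurement, tags, and time match an existing one."""
    point = build_point(employee_id, new_salary, timestamp)
    client.write_points([point])
    return point


if __name__ == "__main__":
    # Needs the influxdb package and a reachable InfluxDB 1.x server
    from influxdb import InfluxDBClient

    client = InfluxDBClient(host="localhost", port=8086, database="my_database")
    ts = "2024-01-01T00:00:00Z"  # assumed timestamp, reused for both writes
    client.write_points([build_point(1, 1000.00, ts)])
    overwrite_salary(client, 1, 25000.00, ts)
    client.close()
```

Because the match is on the exact timestamp, the original write has to carry an explicit time that the later write can repeat; points written with a server-assigned time are hard to target this way.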
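Delete
Likewise, InfluxDB has no dedicated operation for deleting a single data point. You can, however, drop every series in a measurement (roughly, a table) or delete the data in a time range.
- client = InfluxDBClient(host='localhost', port=8086, database='my_database')
- # Drop every series in the measurement
- client.query('DROP SERIES FROM "employees"')
- # Delete the data in a time range
- # client.query('DELETE FROM "employees" WHERE time < now() - 1d')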
- print("Series deleted successfully")
- client.close()
Snowflake
Connecting to the database
Python can connect to Snowflake with the snowflake-connector-python library:
- from snowflake.connector import connect
- con = connect(
- user='username',
- password='password',
- account='account_url',
- warehouse='warehouse',
- database='database',
- schema='schema'
- )
- print("Opened Snowflake database successfully")
- con.close()
CRUD operations
The following snippets show the basic CRUD operations in Snowflake.
Create
- con = connect(
- user='username',
- password='password',
- account='account_url',
- warehouse='warehouse',
- database='database',
- schema='schema'
- )
- cur = con.cursor()
- cur.execute("""
- CREATE TABLE EMPLOYEES (
- ID INT,
- NAME STRING,
- AGE INT,
- ADDRESS STRING,
- SALARY FLOAT
- )
- """)
- cur.execute("""
- INSERT INTO EMPLOYEES (ID, NAME, AGE, ADDRESS, SALARY) VALUES
- (1, 'John', 30, 'New York', 1000.00)
- """)
- print("Table created and row inserted successfully")
- con.close()
Read (Retrieve)
- con = connect(
- user='username',
- password='password',
- account='account_url',
- warehouse='warehouse',
- database='database',
- schema='schema'
- )
- cur = con.cursor()
- cur.execute("SELECT * FROM EMPLOYEES WHERE ID = 1")
- rows = cur.fetchall()
- for row in rows:
-     print("ID = ", row[0])
-     print("NAME = ", row[1])
-     print("AGE = ", row[2])
-     print("ADDRESS = ", row[3])
-     print("SALARY = ", row[4])
- con.close()
Update
- con = connect(
- user='username',
- password='password',
- account='account_url',
- warehouse='warehouse',
- database='database',
- schema='schema'
- )
- cur = con.cursor()
- cur.execute("UPDATE EMPLOYEES SET SALARY = 25000.00 WHERE ID = 1")
- print("Row updated successfully")
- con.close()
Delete
- con = connect(
- user='username',
- password='password',
- account='account_url',
- warehouse='warehouse',
- database='database',
- schema='schema'
- )
- cur = con.cursor()
- cur.execute("DELETE FROM EMPLOYEES WHERE ID = 1")
- print("Row deleted successfully")
- con.close()
Amazon DynamoDB
Connecting to the database
Python can connect to Amazon DynamoDB with the boto3 library:
- import boto3
- dynamodb = boto3.resource('dynamodb', region_name='us-west-2',
- aws_access_key_id='Your AWS Access Key',
- aws_secret_access_key='Your AWS Secret Key')
- print("Opened DynamoDB successfully")
CRUD operations
The following snippets show the basic CRUD operations in DynamoDB. They reuse the dynamodb resource created above.
Create
- table = dynamodb.create_table(
- TableName='Employees',
- KeySchema=[
- {
- 'AttributeName': 'id',
- 'KeyType': 'HASH'
- },
- ],
- AttributeDefinitions=[
- {
- 'AttributeName': 'id',
- 'AttributeType': 'N'
- },
- ],
- ProvisionedThroughput={
- 'ReadCapacityUnits': 5,
- 'WriteCapacityUnits': 5
- }
- )
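- # create_table returns before the table is ready, so wait until it exists
- table.wait_until_exists()
- # The boto3 resource API rejects Python floats; use Decimal for fractional numbers
- from decimal import Decimal
- table.put_item(
- Item={
- 'id': 1,
- 'name': 'John',
- 'age': 30,
- 'address': 'New York',
- 'salary': Decimal('1000.00')
- }
- )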
- print("Table created and item inserted successfully")
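The boto3 resource API does not accept Python float values for number attributes and expects `decimal.Decimal` instead, which matters for values such as the salary in the item above. The following is a small sketch of one way to convert an item before writing it; the helper name `to_dynamodb_numbers` is hypothetical and not part of boto3.

```python
from decimal import Decimal


def to_dynamodb_numbers(value):
    """Recursively replace floats with Decimal so boto3 can serialize them."""
    if isinstance(value, float):
        # Convert via str() so 1000.0 becomes Decimal('1000.0')
        # rather than the float's full binary expansion
        return Decimal(str(value))
    if isinstance(value, dict):
        return {key: to_dynamodb_numbers(item) for key, item in value.items()}
    if isinstance(value, list):
        return [to_dynamodb_numbers(item) for item in value]
    # Strings, ints, and other types pass through unchanged
    return value


employee = {
    "id": 1,
    "name": "John",
    "age": 30,
    "address": "New York",
    "salary": 1000.00,
}
item = to_dynamodb_numbers(employee)
print(item["salary"])
```

The converted dictionary can then be passed as `table.put_item(Item=item)`.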
Read (Retrieve)
- table = dynamodb.Table('Employees')
- response = table.get_item(
- Key={
- 'id': 1,
- }
- )
- item = response['Item']
- print(item)
Update
- from decimal import Decimal
- table = dynamodb.Table('Employees')
- table.update_item(
- Key={
- 'id': 1,
- },
- UpdateExpression='SET salary = :val1',
- ExpressionAttributeValues={
- # Decimal, because the boto3 resource API rejects Python floats
- ':val1': Decimal('25000.00')
- }
- )
- print("Item updated successfully")
Delete
- table = dynamodb.Table('Employees')
- table.delete_item(
- Key={
- 'id': 1,
- }
- )
- print("Item deleted successfully")
Microsoft Azure Cosmos DB
Connecting to the database
Python can connect to Microsoft Azure Cosmos DB with the azure-cosmos library:
- from azure.cosmos import CosmosClient, PartitionKey, exceptions
- url = 'Cosmos DB Account URL'
- key = 'Cosmos DB Account Key'
- client = CosmosClient(url, credential=key)
- database_name = 'testDB'
- database = client.get_database_client(database_name)
- container_name = 'Employees'
- container = database.get_container_client(container_name)
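- print("Opened Cosmos DB successfully")
CRUD operations
The following snippets show the basic CRUD operations in Cosmos DB. They reuse the client, database_name, and container_name defined above.
Create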
- database = client.create_database_if_not_exists(id=database_name)
- container = database.create_container_if_not_exists(
- id=container_name,
- partition_key=PartitionKey(path="/id"),
- offer_throughput=400
- )
- container.upsert_item({
- 'id': '1',
- 'name': 'John',
- 'age': 30,
- 'address': 'New York',
- 'salary': 1000.00
- })
- print("Container created and item upserted successfully")
Read (Retrieve)
- for item in container.read_all_items():
-     print(item)
Update
- for item in container.read_all_items():
-     if item['id'] == '1':
-         item['salary'] = 25000.00
-         container.upsert_item(item)
-
- print("Item updated successfully")
Delete
- for item in container.read_all_items():
-     if item['id'] == '1':
-         container.delete_item(item, partition_key='1')
-
- print("Item deleted successfully")
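Author: TeahLead_KrisChang. More than 10 years of experience in the internet and artificial intelligence industries and more than 10 years managing technical and business teams; bachelor's degree in software engineering from Tongji University and master's degree in engineering management from Fudan University; Alibaba Cloud certified senior cloud services architect; head of an AI product business with revenue above 100 million.
Source: https://www.cnblogs.com/xfuture/p/17528203.html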
|
|