翼度科技»论坛 编程开发 python 查看内容

在 SQLAlchemy 中实现数据处理的时候,实现表自引用、多对多、联合查询,有

6

主题

6

帖子

18

积分

新手上路

Rank: 1

积分
18
有时候,我们在使用SQLAlchemy操作某些表的时候,需要使用外键关系来实现一对多或者多对多的关系引用,以及对多表的联合查询,有序列的uuid值或者自增id值,字符串的分拆等常见处理操作。
1、在 SQLAlchemy 中定义具有嵌套 children 关系的表

要在 SQLAlchemy 中定义具有嵌套 children 关系的表,如表中包含 id 和 pid 字段,可以使用 relationship 和 ForeignKey 来建立父子关系。
首先,你需要定义一个模型类,其中包含 id 和 pid 字段。id 是主键,pid 是指向父记录的外键。然后,你使用 relationship 来建立父子关系。
  1. from sqlalchemy import Column, Integer, String, ForeignKey
  2. from sqlalchemy.orm import relationship, declarative_base
  3. from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
  4. from sqlalchemy.orm import sessionmaker
  5. Base = declarative_base()
  6. class DictTypeInfo(Base):
  7.     __tablename__ = 'dict_type_info'
  8.    
  9.     id = Column(Integer, primary_key=True, index=True)
  10.     name = Column(String, index=True)
  11.     code = Column(String)
  12.     remark = Column(String)
  13.     seq = Column(Integer)
  14.     pid = Column(Integer, <strong>ForeignKey('dict_type_info.id')</strong>)  # 外键指向父节点的 id
  15.     # 定义 parent 关系
  16.     <strong>parent</strong> = <strong>relationship</strong>("DictTypeInfo", remote_side=[id], back_populates="children")
  17.     # 定义 children 关系
  18.     <strong>children</strong> = <strong>relationship</strong>("DictTypeInfo", back_populates="parent")
复制代码
例子使用代码如下所示。
  1. # 创建异步引擎和会话
  2. DATABASE_URL = "mysql+asyncmy://username:password@localhost/mydatabase"
  3. engine = create_async_engine(DATABASE_URL, echo=True)
  4. AsyncSessionLocal = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)
  5. async def init_db():
  6.     async with engine.begin() as conn:
  7.         await conn.run_sync(Base.metadata.create_all)
  8. # 示例:如何插入数据并进行查询
  9. async def example_usage():
  10.     async with AsyncSessionLocal() as session:
  11.         async with session.begin():
  12.             # 插入数据
  13.             parent_node = DictTypeInfo(name="Parent", code="P001", remark="Parent Node", seq=1)
  14.             child_node1 = DictTypeInfo(name="Child1", code="C001", remark="First Child", seq=1, <strong>parent=</strong><strong>parent_node</strong>)
  15.             child_node2 = DictTypeInfo(name="Child2", code="C002", remark="Second Child", seq=2, <strong>parent=</strong><strong>parent_node</strong>)
  16.             session.add(parent_node)
  17.             session.add(child_node1)
  18.             session.add(child_node2)
  19.             
  20.         # 查询数据
  21.         async with session.begin():
  22.             result = await session.execute(
  23.                 "SELECT * FROM dict_type_info WHERE pid IS NULL"
  24.             )
  25.             parent_nodes = result.scalars().all()
  26.             for node in parent_nodes:
  27.                 print(f"Parent Node: {node.name}, <strong>Children</strong>: {<strong>[child.name for child in node.children]</strong>}")
复制代码
代码说明


  • 定义模型类 (DictTypeInfo):

    • id: 主键。
    • pid: 外键,指向同一个表的 id,表示父节点。
    • parent: 父关系,通过 remote_side 设定本模型的外键指向自身的主键。
    • children: 子关系,back_populates 用于双向关系的映射。

  • 创建异步引擎和会话:

    • 使用 create_async_engine 和 AsyncSession 创建数据库引擎和会话,以支持异步操作。

  • 插入和查询数据:

    • 插入数据示例展示了如何创建父节点和子节点,并将子节点关联到父节点。
    • 查询数据示例展示了如何查询所有父节点以及它们的子节点。

注意事项


  • remote_side: 在 relationship 中,remote_side 是指定哪些字段是远程的一方(即子节点关系的目标)。
  • 确保在模型中定义了正确的外键约束。在你提供的模型中,pid 列需要指向同一表中的 id 列。确保 ForeignKey 设置正确。
  • 异步操作: 使用 AsyncSession 和 asyncio 进行异步数据库操作。
  • 创建表: 在初始化数据库时,确保表结构是正确的。
要使用 selectinload 加载某个 pid 下的对象及其子列表,可以通过 SQLAlchemy 的 selectinload 来优化加载子关系。selectinload 可以减少 SQL 查询的数量,特别是在加载具有层次结构的数据时。
  1. async def get_tree(pid: int):
  2.     async with AsyncSessionLocal() as session:
  3.         # 通过 selectinload 加载所有子节点
  4.         stmt = select(DictTypeInfo).filter(DictTypeInfo.pid == pid).options(<strong>selectinload(DictTypeInfo.children)</strong>)
  5.         result = await session.execute(stmt)
  6.         nodes = result.scalars().all()
  7.         
  8.         return nodes
复制代码
这样,调用 get_tree 函数获取指定 pid 的节点及其子节点,代码如下。
  1. async def example_usage():
  2.     nodes = <strong>a<strong>w</strong>ait get_tree(pid=1)
  3.     </strong>for node in nodes:
  4.         print(f"Node: {node.name}, Children: {[child.name for child in node.children]}")
复制代码
selectinload: selectinload 可以减少 N+1 查询问题,它通过一条额外的查询来加载相关对象。这适合用于层次结构数据的加载。通过这种方式,你可以使用 SQLAlchemy 的 selectinload 来高效地加载具有父子关系的对象,并优化数据库查询性能。
 
同样,我们在 SQLAlchemy 中实现多对多关系也是类似的处理方式。
 在 SQLAlchemy 中,实现多对多关系通常需要创建一个关联表(association table),该表将存储两个相关联表的外键,从而实现多对多关系。以下是一个实现多对多关系的详细步骤。
1) 定义多对多关系的关联表

首先,需要定义一个关联表,该表包含两个外键,分别指向两端的主表。这通常使用 Table 对象来实现。
  1. from sqlalchemy import Table, Column, Integer, ForeignKey
  2. from sqlalchemy.ext.declarative import declarative_base
  3. Base = declarative_base()
  4. <strong>association_table</strong> = Table('association', Base.metadata,
  5.     Column('left_id', Integer, ForeignKey('left_table.id')),
  6.     Column('right_id', Integer, ForeignKey('right_table.id'))
  7. )
复制代码
在这个例子中,association_table 是一个包含两个外键的中间表:left_id 和 right_id 分别指向 left_table 和 right_table 的主键。
2)定义两端的模型并添加关系

在两端的模型中,使用 relationship 来定义多对多关系,并指定 secondary 参数为关联表。
  1. from sqlalchemy.orm import relationship
  2. class<strong> LeftModel</strong>(Base):
  3.     __tablename__ = 'left_table'
  4.     id = Column(Integer, primary_key=True)
  5.     name = Column(String(50))
  6.     rights = <strong>relationship</strong>("RightModel", secondary=association_table, back_populates="lefts")
  7. class<strong> RightModel</strong>(Base):
  8.     __tablename__ = 'right_table'
  9.     id = Column(Integer, primary_key=True)
  10.     name = Column(String(50))
  11.     lefts = <strong>relationship</strong>("LeftModel", secondary=association_table, back_populates="rights")
复制代码

  • rights 是 LeftModel 中定义的关系属性,它将连接到 RightModel。
  • lefts 是 RightModel 中定义的关系属性,它将连接到 LeftModel。
  • secondary=association_table 告诉 SQLAlchemy 使用 association_table 作为连接表。
  • back_populates 用于双向关系的对称引用。
3)创建数据库并插入数据

下面的代码展示了如何创建数据库、插入数据并查询多对多关系。
  1. from sqlalchemy import create_engine
  2. from sqlalchemy.orm import sessionmaker
  3. # 创建数据库引擎
  4. engine = create_engine('sqlite:///example.db')
  5. Base.metadata.create_all(engine)
  6. Session = sessionmaker(bind=engine)
  7. session = Session()
  8. # 创建模型实例
  9. left1 = LeftModel(name="Left 1")
  10. right1 = RightModel(name="Right 1")
  11. right2 = RightModel(name="Right 2")
  12. # 设置多对多关系
  13. left1.rights = [right1, right2]
  14. # 添加到会话并提交
  15. session.add(left1)
  16. session.commit()
  17. # 查询并打印关系
  18. for right in left1.rights:
  19.     print(right.name)  # 输出: Right 1, Right 2
  20. for left in right1.lefts:
  21.     print(left.name)  # 输出: Left 1
复制代码
你可以像操作普通列表一样来处理这些关系,例如添加、删除关联等:
  1. # 添加关系
  2. left1.rights.append(RightModel(name="Right 3"))
  3. session.commit()
  4. # 删除关系
  5. left1.rights.remove(right2)
  6. session.commit()
复制代码
 通过这些步骤,你可以在 SQLAlchemy 中实现和操作多对多关系。
 
2、在 SQLAlchemy 中联合多个表进行记录关联查询

例如,在我的框架中,字典大类和字典项目是不同的表进管理的,因此如果需要根据大类名称进行字典项目的查询,那么就需要联合两个表进行处理。
具体操作如下:创建一个查询,将 DictDataInfo 表与 DictTypeInfo 表联接(通过 DictType_ID 和 Id 列)
  1. from sqlalchemy.future import select
  2. from sqlalchemy.orm import aliased
  3. from sqlalchemy.ext.asyncio import AsyncSession
  4. from sqlalchemy.ext.asyncio import create_async_engine
  5. from sqlalchemy.orm import sessionmaker
  6. # 假设你的数据库模型是 DictDataInfo 和 DictTypeInfo
  7. # 需要提前定义好这两个模型类
  8. DATABASE_URL = "mysql+asyncmy://username:password@localhost/mydatabase"
  9. engine = create_async_engine(DATABASE_URL, echo=True)
  10. AsyncSessionLocal = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)
  11. async def<strong> get_dict_data(dict_type_name: str):</strong>
  12.     async with AsyncSessionLocal() as session:
  13.         # 创建别名
  14.         DictData = aliased(DictDataInfo)
  15.         DictType = aliased(DictTypeInfo)
  16.         # 联合查询并根据条件过滤
  17.         stmt = (
  18.             select(DictData)
  19.             .join(DictType, DictData.DictType_ID == DictType.id)
  20.             .filter(DictType.name == dict_type_name)
  21.         )
  22.         result = await session.execute(stmt)
  23.         dict_data = result.scalars().all()
  24.         return dict_data
  25. # 示例用法
  26. import asyncio
  27. async def example_usage():
  28.     dict_type_name = "some_type_name"
  29.     dict_data = await <strong>get_dict_data</strong>(dict_type_name)
  30.     for data in dict_data:
  31.         print(data)
复制代码
代码说明


  • aliased: 使用 aliased 创建表的别名,这样可以方便地在查询中引用这些表。
  • join: 使用 join 进行表连接。这里 DictDataInfo 表的 DictType_ID 列与 DictTypeInfo 表的 id 列连接。
  • filter: 使用 filter 来添加条件筛选,筛选出 DictTypeInfo 表中 name 列等于 dict_type_name 的记录。
  • select: 使用 select 语句来选择 DictDataInfo 表中的记录,这对应于 Select(d => d)。
  • 异步操作: 由于使用的是 SQLAlchemy 的异步模式,所有数据库操作都在 async with 和 await 语句中进行,以确保异步执行。
如果我们需要将获得的数据进行对象转换,我们可以使用下面的处理代码实现。
  1. # 定义 CListItem 类
  2. class<strong> CListItem</strong>:
  3.     def __init__(self, name, value):
  4.         self.name = name
  5.         self.value = value
  6. # 定义示例列表和转换操作
  7. def convert_list_items(list_items):
  8.     dict_list = []
  9.     if list_items:  # 确保 list_items 不是 None
  10.         for info in list_items.Items:
  11.             <strong>dict_list.append</strong>(CListItem(info.Name, info.Value))
  12.     return dict_list
复制代码
 
3、使用sqlalchemy插入数据的时候,如何判断为非自增类型的时候,id赋值一个有序列的uuid值

有时候,我们的数据表主键是用字符串的,这种适用于很广的用途,比较容易在插入的时候就确定好id键的值,从而可以处理相关的内容。
但是,有时候我们可以让后端进行确定一个有序的ID值,那么使用SQLAlchemy 我们应该如何实现?
首先,确保你已经导入了 uuid 库,这是用于生成 UUID 的 Python 标准库。
有序 UUID 通常是基于时间的 UUID。你可以使用 uuid.uuid1() 来生成基于时间的 UUID。
  1. def generate_sequential_uuid():
  2.     return uuid.uuid1()  # 基于时间生成有序UUID
复制代码
在定义 SQLAlchemy 模型时,可以将 id 字段设置为使用该函数生成的 UUID。通常在模型中通过 default 参数设置默认值。
  1. from sqlalchemy import Column, String
  2. from sqlalchemy.ext.declarative import declarative_base
  3. Base = declarative_base()
  4. class MyModel(Base):
  5.     __tablename__ = 'my_table'
  6.     id = Column(String(36), primary_key=True, default=generate_sequential_uuid, nullable=False)
  7.     # 其他字段...
复制代码
在插入新数据时,如果 id 字段为空,它将自动使用 generate_sequential_uuid 函数生成一个基于时间的 UUID。
这样就可以确保在插入数据时,非自增类型的 id 字段会被赋值为一个有序列的 UUID 值。
对于自增的整型 id,SQLAlchemy 提供了自动处理机制。你只需要在模型中将 id 字段定义为 Integer 类型,并设置 primary_key=True,SQLAlchemy 就会自动为该字段设置自增属性。
  1. from sqlalchemy import Column, Integer, String
  2. from sqlalchemy.ext.declarative import declarative_base
  3. Base = declarative_base()
  4. class MyModel(Base):
  5.     __tablename__ = 'my_table'
  6.     id = Column(Integer, primary_key=True, autoincrement=True)
  7.     name = Column(String(50))
  8.     # 其他字段..
复制代码
默认情况下,SQLAlchemy 会使用数据库的原生自增机制(如 MySQL 的 AUTO_INCREMENT 或 PostgreSQL 的 SERIAL)。如果你需要使用自定义的自增策略,可以通过设置 Sequence 来实现(适用于支持 Sequence 的数据库,如 PostgreSQL)。
  1. from sqlalchemy import Sequence
  2. class MyModel(Base):
  3.     __tablename__ = 'my_table'
  4.     id = Column(Integer, Sequence('my_sequence'), primary_key=True)
  5.     name = Column(String(50))
复制代码
在上述代码中,Sequence('my_sequence') 定义了一个序列,SQLAlchemy 将使用该序列生成自增的 id 值。
通过这些步骤,你可以轻松处理整型自增 id 字段,SQLAlchemy 会自动为每个新记录分配唯一的自增 id。
 
4、在插入记录的时候,对字符串的数据处理

在批量插入数据字典的时候,我希望根据用户输入内容(多行数据)进行转化,把每行的数据分拆进行判断,如果符合条件的进行处理插入。
在 Python 中,可以使用字符串的 splitlines() 方法来实现相同的功能。
  1. # 假设 Data 和 input.Seq 是从输入中获取的
  2. Data = "example\nline1\nline2\n"  # 示例数据
  3. input_seq = "123"  # 示例序列字符串
  4. # 将 Data 按行拆分,并移除空行
  5. array_items = [line for line in Data.splitlines() if line]
  6. # 初始化变量
  7. int_seq = -1
  8. seq_length = 3
  9. str_seq = input_seq
  10. # 尝试将 str_seq 转换为整数
  11. if str_seq.isdigit():
  12.     int_seq = int(str_seq)
  13.     seq_length = len(str_seq)
  14. # 打印结果
  15. print(f"Array Items: {array_items}")
  16. print(f"int_seq: {int_seq}")
  17. print(f"seq_length: {seq_length}")
复制代码

  • Python 的 splitlines() 方法将字符串按行分割,同时自动处理各种换行符(包括 \n 和 \r\n)。
  • 列表推导式 [line for line in Data.splitlines() if line] 移除了空行,类似于 C# 中的 StringSplitOptions.RemoveEmptyEntries。
  • 使用 str_seq.isdigit() 检查 str_seq 是否全部由数字组成,这类似于 C# 的 int.TryParse。
在 Python 中,可以使用 re.split() 函数来按照正则表达式分割字符串。以下是对应的 Python 代码:
  1. import re
  2. # 假设 info 是一个包含 Name 和 Value 属性的对象
  3. class Info:
  4.     def __init__(self):
  5.         self.Name = ""
  6.         self.Value = ""
  7. info = Info()
  8. # dictData 是输入的字符串
  9. dict_data = "example_name example_value"
  10. # 使用正则表达式按照空白字符分割字符串
  11. array = re.split(r'\s+', dict_data)
  12. # 赋值给 info 对象的属性
  13. info.Name = array[0]
  14. info.Value = array[1] if len(array) > 1 else array[0]
  15. # 打印结果
  16. print(f"Name: {info.Name}")
  17. print(f"Value: {info.Value}")
复制代码
使用 re.split() 函数根据空白字符(包括空格、制表符等)分割字符串 dict_data。r'\s+' 是一个正则表达式,表示一个或多个空白字符。
如果你需要根据多个分隔符来分割字符串,同样可以使用正则表达式(re 模块)的 re.split() 方法。
  1. str_item = " 1,2,3;4;5/6/7、8、9;10 "
  2. import re
  3. result = re.split(r"[;,|/,;、]+", str_item.strip())
  4. print(result)
复制代码
结果输出:['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']
解释:


  • re.split(r'[;,|/,;、]', text) 中的 r'[;,|/,;、]' 是一个正则表达式模式:
    [] 表示字符类,表示匹配字符类中的任意一个字符。
    ;,|/,;、 分别表示分号、逗号,竖线,中文逗号,中文分号,和空格,这些字符都将作为分隔符。
使用正则表达式可以灵活处理多个分隔符,适用于更复杂的分割需求。
 

来源:https://www.cnblogs.com/wuhuacong/p/18380616
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

举报 回复 使用道具