翼度科技»论坛 编程开发 python 查看内容

在 SQLAlchemy 中对数据异步处理的时候,获得关联集合的处理方式

5

主题

5

帖子

15

积分

新手上路

Rank: 1

积分
15
我们在定义SQLAlchemy对象模型的关系的时候,用到了relationship 来标识关系,其中 lazy 的参数有多种不同的加载策略,本篇随笔介绍它们之间的关系,以及在异步处理中的一些代码案例。
1、在 SQLAlchemy 中定义关系

在 SQLAlchemy 中,relationship() 函数用于定义表之间的关系(如 one-to-many、many-to-one、many-to-many 等)。它支持许多参数来控制如何加载和处理关联的数据。以下是一些常用的 relationship() 参数及其说明:
1. lazy


  • 作用: 控制如何加载关联数据。
  • 可选值:

    • 'select': 延迟加载。访问关系属性时,发送一个独立的查询来获取关联数据(默认值)。
    • 'selectin': 使用 IN 查询批量加载关联对象,避免 n+1 查询问题。
    • 'joined': 使用 JOIN 直接在主查询中加载关联数据。
    • 'subquery': 使用子查询来批量加载关联对象。
    • 'immediate': 在加载主对象后,立即加载关联对象。
    • 'dynamic': 仅适用于 one-to-many,返回一个查询对象,可以进一步过滤或操作关联数据。

  • 详细说明
  在SQLAlchemy中,lazy是一个定义ORM关系如何加载的参数,主要用于控制关联关系(如one-to-many、many-to-one等)在访问时的加载方式。
1)lazy='select' (默认)

  • 说明: 这是最常见的方式,使用"延迟加载"策略。当访问关联属性时,SQLAlchemy会发送一条新的SQL查询来加载相关数据。
  • 优点: 避免了不必要的查询,节省资源。
  • 缺点: 当你访问多个关联对象时,可能会导致"n+1查询问题",即每次访问关联数据时都会发出新的SQL查询。
2) lazy='selectin'

  • 说明: 类似于lazy='select',但通过IN语句批量查询相关对象。SQLAlchemy会在一次查询中批量获取多个对象的关联数据,而不是为每个对象单独查询。
  • 优点: 解决了"n+1查询问题",效率高于select。
  • 缺点: 适用于可以通过IN语句高效查询的场景,但如果结果集非常大,可能会影响性能。
3) lazy='joined'

  • 说明: 在主查询时,使用JOIN语句直接加载关联对象。这意味着关联对象在查询时就会被立即加载,不需要额外的查询。
  • 优点: 避免了多个SQL查询,适合在同一查询中需要大量关联数据的场景。
  • 缺点: 如果JOIN的表数据较多,可能会导致查询结果变得复杂且性能下降。
4)lazy='immediate'

  • 说明: 在加载主对象时,立即加载所有关联对象。与select类似,但是在主对象加载后,马上发送查询请求加载关联对象。
  • 优点: 保证在对象加载后立刻有完整的数据。
  • 缺点: 对每个关联的对象仍然会发送单独的查询,可能造成"n+1查询问题"。
5)lazy='subquery'

  • 说明: 使用子查询来加载关联对象。SQLAlchemy会在查询主对象时生成一个子查询,以批量加载相关对象。
  • 优点: 避免了"n+1查询问题",适合处理大型数据集。
  • 缺点: 子查询可能会导致查询效率降低,特别是在复杂的查询场景中。
6)lazy='dynamic'

  • 说明: 仅适用于one-to-many关系,返回一个查询对象,而不是实际的结果集。你可以通过调用查询对象来进一步过滤或操作关联对象。
  • 优点: 非常灵活,可以根据需要随时查询关联对象。
  • 缺点: 不能使用通常的方式访问关联属性,必须通过查询进一步获取数据。
 
2. backref


  • 作用: 定义反向引用,允许从关联表访问当前表。
  • 用法: 通过 backref,可以在关联的表中自动生成一个反向关系,避免手动定义双向关系。
  • 示例:
  1. class Parent(Base):
  2.     __tablename__ = 'parent'
  3.     id = Column(Integer, primary_key=True)
  4.     children = relationship("Child", backref="parent")
  5. class Child(Base):
  6.     __tablename__ = 'child'
  7.     id = Column(Integer, primary_key=True)
  8.     parent_id = Column(Integer, ForeignKey('parent.id'))
复制代码
3. back_populates


  • 作用: 手动定义双向关系时,使用 back_populates 来明确地表示两个表之间的相互关系。
  • 示例:
  1. class Parent(Base):
  2.     __tablename__ = 'parent'
  3.     id = Column(Integer, primary_key=True)
  4.     children = relationship("Child", back_populates="parent")
  5. class Child(Base):
  6.     __tablename__ = 'child'
  7.     id = Column(Integer, primary_key=True)
  8.     parent_id = Column(Integer, ForeignKey('parent.id'))
  9.     parent = relationship("Parent", back_populates="children")
复制代码
4. cascade


  • 作用: 定义级联操作,决定在父对象上进行操作时,是否自动对关联的子对象进行相应操作。
  • 常见值:

    • 'save-update': 当父对象被保存或更新时,子对象也会被保存或更新。
    • 'delete': 当父对象被删除时,子对象也会被删除。
    • 'delete-orphan': 当子对象失去与父对象的关联时,子对象将被删除。
    • 'all': 包含所有级联操作。

  • 示例:
  1. children = relationship("Child", cascade="all, delete-orphan")
复制代码
5. uselist


  • 作用: 控制关联属性是否返回一个列表。适用于 one-to-one 和 one-to-many 关系。
  • 用法:

    • True: 返回一个列表(适用于 one-to-many,默认值)。
    • False: 返回单个对象(适用于 one-to-one)。

  • 示例:
  1. parent = relationship("Parent", uselist=False)  # one-to-one 关系
复制代码
6. order_by


  • 作用: 定义关联对象的排序方式。
  • 示例:
  1. children = relationship("Child", order_by="Child.name")
复制代码
7. foreign_keys


  • 作用: 显式指定哪些列是用于定义关联关系的外键,适用于存在多个外键的场景。
  • 示例:
  1. parent = relationship("Parent", foreign_keys="[Child.parent_id]")
复制代码
8. primaryjoin


  • 作用: 明确定义关联关系的连接条件,通常在 SQLAlchemy 无法自动推断时使用。
  • 示例:
  1. parent = relationship("Parent", primaryjoin="Parent.id == Child.parent_id")
复制代码
9. secondary


  • 作用: 定义多对多(many-to-many)关系时,指定关联的中间表。
  • 示例:
  1. class Association(Base):
  2.     __tablename__ = 'association'
  3.     parent_id = Column(Integer, ForeignKey('parent.id'))
  4.     child_id = Column(Integer, ForeignKey('child.id'))
  5. children = relationship("Child", secondary="association")
复制代码
10. secondaryjoin


  • 作用: 定义 secondary 表中的关联条件,通常用于复杂的多对多关系。
  • 示例:
  1. children = relationship("Child", secondary="association",
  2.                         secondaryjoin="Child.id == Association.child_id")
复制代码
11. viewonly


  • 作用: 定义只读的关系,不允许通过此关系修改数据。
  • 示例:
  1. children = relationship("Child", viewonly=True)
复制代码
12. passive_deletes


  • 作用: 控制删除时的行为。如果设置为 True,SQLAlchemy 不会主动删除关联对象,而是依赖数据库的级联删除。
  • 示例:
  1. children = relationship("Child", passive_deletes=True)
复制代码
这些参数可以根据具体的业务需求和场景进行调整,以优化查询和数据管理策略。
 
 2、用户角色表的关系分析

在实际业务中,机构和用户是多对多的关系的,我们以机构表定义来进行分析它们的关系信息。
如机构表的模型定义大致如下。
  1. class Ou(Base):
  2.     """机构(部门)信息-表模型"""
  3.     __tablename__ = "t_acl_ou"
  4.     id = Column(Integer, primary_key=True, comment="主键", autoincrement=True)
  5.     pid = Column(Integer, ForeignKey("t_acl_ou.id"), comment="父级机构ID", default="-1")
  6.     handno = Column(String, comment="机构编码")
  7.     name = Column(String, comment="机构名称")
  8.     # 定义 parent 关系
  9.     parent = relationship(
  10.         "Ou", remote_side=[id], back_populates="children", lazy="immediate"
  11.     )
  12.     # 定义 children 关系
  13.     children = relationship("Ou", back_populates="parent", lazy="immediate")
  14.     # 定义 users 关系
  15.     users = relationship(
  16.         "User", secondary="t_acl_ou_user", back_populates="ous", lazy="select"
  17.     )
复制代码
我们可以看到其中加载的多对多关系是采用lazy=select的方式的。
当你使用 await session.get(Ou, ou_id) 来获取一个 Ou 对象后,访问其关系属性(如 ou.users)时,可能会遇到异步相关的问题。原因是,SQLAlchemy 的异步会话需要使用 selectinload 或其他异步加载选项来确保在异步环境中正确地加载关联数据。
在默认的 lazy='select' 关系中,加载关系对象会触发一个同步查询,而这与异步会话不兼容,导致错误。为了解决这个问题,你需要确保关系的加载是通过异步的方式进行的。
解决方法:

1. 使用 selectinload 进行预加载
在查询时,显式地通过 selectinload 来加载关联的 users 关系:
  1. from sqlalchemy.orm import selectinload
  2. ou = await session.get(Ou, ou_id, options=[selectinload(Ou.users)])
  3. # 现在你可以访问 ou.users,关系对象已经被异步加载
  4. print(ou.users)
复制代码
 
2. 使用 lazy='selectin' 或其他异步兼容的加载策略
你还可以在定义模型的关联关系时,将 lazy='selectin' 设置为默认的加载方式,这样当访问关联属性时,SQLAlchemy 会自动使用异步兼容的加载机制:
  1. class Ou(Base):
  2.     __tablename__ = 'ou'
  3.     id = Column(Integer, primary_key=True)
  4.     users = relationship("User", lazy='selectin')  # 使用 selectin 异步加载
  5. ou = await session.get(Ou, ou_id)
  6. print(ou.users)  # 关联对象可以正常异步访问
复制代码
总结:


  • 在异步环境中访问关系对象时,如果使用了同步的 lazy='select',会导致异步不兼容问题。
  • 解决方案是通过查询时使用 selectinload 或将关系的 lazy 属性设置为异步兼容的选项,如 selectin。
因此,如果机构和用户的关系信息,我们可以通过selectload关系实现加载,也可以考虑使用中间表的关系进行获取,如下代码所示:获取指定用户的关联的机构列表.
  1.     async def get_ous_by_user(self, db: AsyncSession, user_id: str) -> list[int]:
  2.         """获取指定用户的关联的机构列表"""
  3.         # 方式一,子查询方式
  4.         stmt = select(User).options(selectinload(User.ous)).where(User.id == user_id)
  5.         result = await db.execute(stmt)
  6.         user = result.scalars().first()
  7.         ous = user.ous if user else []
  8.         # 方式二,关联表方式
  9.         # stmt = (
  10.         #     select(Ou)
  11.         #     .join(user_ou, User.id == user_ou.c.user_id)
  12.         #     .where(user_ou.c.user_id == user_id)
  13.         # )
  14.         # result = await db.execute(stmt)
  15.         # ous = result.scalars().all()
  16.         ouids = [ou.id for ou in ous]
  17.         return ouids
复制代码
上面两种方式是等效的,一个是通过orm关系进行获取关系集合,一个是通过中间表的关系检索主表数据集合。
通过中间表,我们也可以很方便的添加角色的关系,如下面是为角色添加用户,也就是在中间表进行处理即可。
  1.     async def add_user(self, db: AsyncSession, role_id: int, user_id: int) -> bool:
  2.         """添加角色-用户关联"""
  3.         stmt = select(user_role).where(
  4.             and_(
  5.                 user_role.c.role_id == role_id,
  6.                 user_role.c.user_id == user_id,
  7.             )
  8.         )
  9.         if not (await db.execute(stmt)).scalars().first():
  10.             await db.execute(
  11.                 user_role.insert().values(role_id=role_id, user_id=user_id)
  12.             )
  13.             await db.commit()
  14.             return True
  15.         return False
复制代码
当然。如果我们不用这种中间表的处理方式,也是可以使用常规多对多关系进行添加处理,不过需要对数据进行多一些检索,也许性能会差一些。
  1.     async def add_user(self, db: AsyncSession, ou_id: int, user_id: int) -> bool:
  2.         """给机构添加用户"""
  3.         # 可以使用下面方式,也可以使用中间表方式处理
  4.         # 先判断用户是否存在
  5.         user = await db.get(User, user_id)
  6.         if not user:
  7.             return False
  8.         # 再判断机构是否存在
  9.         result = await db.execute(
  10.             select(Ou).options(selectinload(Ou.users)).filter_by(id=ou_id)
  11.         )
  12.         # await db.get(Ou, ou_id) #这种方式不能获得users,因为配置为selectin
  13.         # await db.get(Ou, ou_id, options=[selectinload(Ou.users)])  # 这种方式可以获得users
  14.         ou = result.scalars().first()
  15.         if not ou:
  16.             return False
  17.         # 再判断用户是否已经存在于机构中
  18.         if user in ou.users:
  19.             return False
  20.         # 加入机构
  21.         ou.users.append(user)
  22.         await db.commit()
  23.         return True
复制代码
 

来源:https://www.cnblogs.com/wuhuacong/p/18397908
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

举报 回复 使用道具