翼度科技»论坛 编程开发 python 查看内容

python 协程 自定义互斥锁

6

主题

6

帖子

18

积分

新手上路

Rank: 1

积分
18
最近在用python的一款异步web框架sanic搭建web服务,遇到一个需要加特定锁的场景:同一用户并发处理订单时需要排队处理,但不同用户不需要排队。
如果仅仅使用async with asyncio.Lock()的话。会使所有请求都排队处理。
  1. 1 import asyncio
  2. 2 import datetime
  3. 3
  4. 4 lock = asyncio.Lock()
  5. 5
  6. 6
  7. 7 async def place_order(user_id, order_id):
  8. 8     async with lock:
  9. 9         # 模拟下单处理
  10. 10         print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f')} Processing order {order_id} for user {user_id}")
  11. 11         await asyncio.sleep(1)  # 假设处理需要 1 秒
  12. 12         print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f')} Order {order_id} for user {user_id} is done")
  13. 13
  14. 14
  15. 15 # 定义一个测试函数
  16. 16 async def test():
  17. 17     # 创建四个任务,模拟两个 UserID 的并发请求
  18. 18     tasks = [
  19. 19         asyncio.create_task(place_order(1, 101)),
  20. 20         asyncio.create_task(place_order(1, 102)),
  21. 21         asyncio.create_task(place_order(2, 201)),
  22. 22         asyncio.create_task(place_order(2, 202)),
  23. 23     ]
  24. 24     # 等待所有任务完成
  25. 25     await asyncio.gather(*tasks)
  26. 26
  27. 27
  28. 28 if __name__ == '__main__':
  29. 29     # 运行测试函数
  30. 30     asyncio.run(test())
复制代码

这显然不是想要的结果,第二种方案是定义一个字典,key使用user_id,value为asyncio.Lock(),每次执行前从字典里面获取lock,相同的user_id将会使用同一个lock,那就实现了功能。
  1. import asyncio
  2. import datetime
  3. locks = {}
  4. async def place_order(user_id, order_id):
  5.     if user_id not in locks:
  6.         locks[user_id] = asyncio.Lock()
  7.     async with locks[user_id]:
  8.         # 模拟下单处理
  9.         print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f')} Processing order {order_id} for user {user_id}")
  10.         await asyncio.sleep(1)  # 假设处理需要 1 秒
  11.         print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f')} Order {order_id} for user {user_id} is done")
  12. # 定义一个测试函数
  13. async def test():
  14.     # 创建四个任务,模拟两个 UserID 的并发请求
  15.     tasks = [
  16.         asyncio.create_task(place_order(1, 101)),
  17.         asyncio.create_task(place_order(1, 102)),
  18.         asyncio.create_task(place_order(2, 201)),
  19.         asyncio.create_task(place_order(2, 202)),
  20.     ]
  21.     # 等待所有任务完成
  22.     await asyncio.gather(*tasks)
  23. if __name__ == '__main__':
  24.     # 运行测试函数
  25.     asyncio.run(test())
复制代码

但是这个方案会有缺点是,user_id执行完成之后没有释放资源,当请求的user_id变多之后,势必会造成占用过多的资源。继续改进方案,将locks的value加一个计数器,当获取lock时计数器加1,使用完之后计数器-1,当计数器变为小于等于0时,释放locks对应的key。最后将这个功能封装为一个类方便其他地方调用。
[code]import asynciomutex_locks = {}class MutexObj:    def __init__(self):        self.lock = asyncio.Lock()        self.count = 0class Mutex:    def __init__(self, key: str):        if key not in mutex_locks:            mutex_locks[key] = MutexObj()        self.__mutex_obj = mutex_locks[key]        self.__key = key    def lock(self):        """        获取锁        :return:        """        self.__mutex_obj.count += 1        return self.__mutex_obj.lock    def release(self):        """        释放锁        :return:        """        self.__mutex_obj.count -= 1        if self.__mutex_obj.count

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

举报 回复 使用道具