冠辉机械 发表于 2024-7-26 00:58:44

python 协程 自定义互斥锁

最近在用python的一款异步web框架sanic搭建web服务,遇到一个需要加特定锁的场景:同一用户并发处理订单时需要排队处理,但不同用户不需要排队。
如果仅仅使用async with asyncio.Lock()的话。会使所有请求都排队处理。
1 import asyncio
2 import datetime
3
4 lock = asyncio.Lock()
5
6
7 async def place_order(user_id, order_id):
8   async with lock:
9         # 模拟下单处理
10         print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f')} Processing order {order_id} for user {user_id}")
11         await asyncio.sleep(1)# 假设处理需要 1 秒
12         print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f')} Order {order_id} for user {user_id} is done")
13
14
15 # 定义一个测试函数
16 async def test():
17   # 创建四个任务,模拟两个 UserID 的并发请求
18   tasks = [
19         asyncio.create_task(place_order(1, 101)),
20         asyncio.create_task(place_order(1, 102)),
21         asyncio.create_task(place_order(2, 201)),
22         asyncio.create_task(place_order(2, 202)),
23   ]
24   # 等待所有任务完成
25   await asyncio.gather(*tasks)
26
27
28 if __name__ == '__main__':
29   # 运行测试函数
30   asyncio.run(test())
这显然不是想要的结果,第二种方案是定义一个字典,key使用user_id,value为asyncio.Lock(),每次执行前从字典里面获取lock,相同的user_id将会使用同一个lock,那就实现了功能。
import asyncio
import datetime

locks = {}


async def place_order(user_id, order_id):
    if user_id not in locks:
      locks = asyncio.Lock()
    async with locks:
      # 模拟下单处理
      print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f')} Processing order {order_id} for user {user_id}")
      await asyncio.sleep(1)# 假设处理需要 1 秒
      print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f')} Order {order_id} for user {user_id} is done")


# 定义一个测试函数
async def test():
    # 创建四个任务,模拟两个 UserID 的并发请求
    tasks = [
      asyncio.create_task(place_order(1, 101)),
      asyncio.create_task(place_order(1, 102)),
      asyncio.create_task(place_order(2, 201)),
      asyncio.create_task(place_order(2, 202)),
    ]
    # 等待所有任务完成
    await asyncio.gather(*tasks)


if __name__ == '__main__':
    # 运行测试函数
    asyncio.run(test())
但是这个方案会有缺点是,user_id执行完成之后没有释放资源,当请求的user_id变多之后,势必会造成占用过多的资源。继续改进方案,将locks的value加一个计数器,当获取lock时计数器加1,使用完之后计数器-1,当计数器变为小于等于0时,释放locks对应的key。最后将这个功能封装为一个类方便其他地方调用。
import asynciomutex_locks = {}class MutexObj:    def __init__(self):      self.lock = asyncio.Lock()      self.count = 0class Mutex:    def __init__(self, key: str):      if key not in mutex_locks:            mutex_locks = MutexObj()      self.__mutex_obj = mutex_locks      self.__key = key    def lock(self):      """      获取锁      :return:      """      self.__mutex_obj.count += 1      return self.__mutex_obj.lock    def release(self):      """      释放锁      :return:      """      self.__mutex_obj.count -= 1      if self.__mutex_obj.count
页: [1]
查看完整版本: python 协程 自定义互斥锁