翼度科技»论坛 编程开发 python 查看内容

celery

7

主题

7

帖子

21

积分

新手上路

Rank: 1

积分
21
1、Celery的概念

Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子:
异步任务:将耗时的操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音频处理等等
定时任务: 每天定时执行爬虫爬取指定内容
还可以使用celery实现简单的分布式爬虫系统等等
Celery 在执行任务时需要通过一个消息中间件(Broker)来接收和发送任务消息,以及存储任务结果, 一般使用rabbitMQ or Redis,
2、Celery的特点


  • 简单,易于使用和维护,有丰富的文档。
  • 高效,支持多线程、多进程、协程模式运行,单个celery进程每分钟可以处理数百万个任务。
  • 灵活,celery中几乎每个部分都可以自定义扩展。
3、Celery的作用

应用解耦,异步处理,流量削锋,消息通讯。
  1. celery通过消息(任务)进行通信,
  2. celery通常使用一个叫Broker(中间人/消息中间件/消息队列/任务队列)来协助clients(任务的发出者/客户端)和worker(任务的处理者/工作进程)进行通信的.
  3. clients发出消息到任务队列中,broker将任务队列中的信息派发给worker来处理。
  4. client ---> 消息 --> Broker(消息队列) -----> 消息 ---> worker(celery运行起来的工作进程)
  5. 消息队列(Message Queue),也叫消息队列中间件,简称消息中间件,它是一个独立运行的程序,表示在消息的传输过程中临时保存消息的容器。
  6. 所谓的消息,是指代在两台计算机或2个应用程序之间传送的数据。消息可以非常简单,例如文本字符串或者数字,也可以是更复杂的json数据或hash数据等。
  7. 所谓的队列,是一种先进先出、后进呼后出的数据结构,python中的list数据类型就可以很方便地用来实现队列结构。
  8. 目前开发中,使用较多的消息队列有RabbitMQ,Kafka,RocketMQ,MetaMQ,ZeroMQ,ActiveMQ等,当然,像redis、mysql、MongoDB,也可以充当消息中间件,但是相对而言,没有上面那么专业和性能稳定。
  9. 并发任务10k以下的,直接使用redis
  10. 并发任务10k以上,1000k以下的,直接使用RabbitMQ
  11. 并发任务1000k以上的,直接使用RocketMQ
复制代码
4、Celery的运行架构

Celery的运行架构由三部分组成,消息队列(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。
  1. 一个celery系统可以包含很多的worker和broker
  2. Celery本身不提供消息队列功能,但是可以很方便地和第三方提供的消息中间件进行集成,包括Redis,RabbitMQ,RocketMQ等
复制代码
5、Celery执行流程图


6、Celery安装
  1. pip install -U celery -i  https://pypi.tuna.tsinghua.edu.cn/simple
复制代码
注意:
Celery不建议在windows系统下使用,Celery在4.0版本以后不再支持windows系统,所以如果要在windows下使用只能安装4.0以前的版本,而且即便是4.0之前的版本,在windows系统下也是不能单独使用的,需要安装gevent、geventlet或eventlet协程模块
7、基本使用

使用celery第一件要做的最为重要的事情是需要先创建一个Celery实例对象,我们一般叫做celery应用对象,或者更简单直接叫做一个app。app应用对象是我们使用celery所有功能的入口,比如启动celery、创建任务,管理任务,执行任务等.
celery框架有2种使用方式

  • 一种是单独一个项目目录,
  • 另一种就是Celery集成到web项目框架中。
8、Celery 使用示例

8.1 Celery作为一个单独项目运行

例如,mycelery代码目录直接放在项目根目录下即可,路径如下:
  1. 服务端项目根目录/
  2. └── mycelery/
  3.     ├── settings.py   # 配置文件
  4.     ├── __init__.py   
  5.     ├── main.py       # 入口程序
  6.     └── sms/          # 异步任务目录,这里拿发送短信来举例,一个类型的任务就一个目录
  7.          └── tasks.py # 任务的文件,文件名必须是tasks.py!!!每一个任务就是一个被装饰的函数,写在任务文件中
复制代码
main.py,代码:
  1. from celery import Celery
  2. # 实例化celery应用,参数一般为项目应用名
  3. app = Celery("blog")
  4. # 通过app实例对象加载配置文件
  5. app.config_from_object("mycelery.settings")
  6. # 注册任务, 自动搜索并加载任务
  7. # 参数必须必须是一个列表,里面的每一个任务都是任务的路径名称
  8. # app.autodiscover_tasks(["任务1","任务2",....])
  9. app.autodiscover_tasks(["mycelery.sms","mycelery.email"])
  10. # 启动Celery的终端命令
  11. # 强烈建议切换目录到项目的根目录下启动celery!!
  12. # celery -A mycelery.main worker --loglevel=info
复制代码
配置文件settings.py,代码:
  1. # 任务队列的链接地址
  2. broker_url = 'redis://127.0.0.1:6379/14'
  3. # 结果队列的链接地址
  4. result_backend = 'redis://127.0.0.1:6379/15'
复制代码
关于配置信息的官方文档:https://docs.celeryproject.org/en/master/userguide/configuration.html
创建任务文件sms/tasks.py,任务文件名必须固定为"tasks.py",并创建任务,代码:
  1. from ..main import app
  2. from utils.tencent_sms import send_message
  3. """
  4. # 如果不添加name参数,启动celery后的任务名称就会很长
  5. [tasks]
  6.   . mycelery.sms.tasks.send_sms1
  7.   . mycelery.sms.tasks.send_sms3
  8.   . send_sms2
  9.   . send_sms4
  10. """
  11. @app.task(name="send_sms1")
  12. def send_sms1():
  13.     """没有任务参数,没有返回结果的异步任务"""
  14.     print('任务:send_sms1执行了。。。')
  15. # s2 = send.sms2.delay(15150404640,4526)
  16. @app.task(name="send_sms2")
  17. def send_sms2(mobile, code):
  18.     """有任务参数,没有返回结果的异步任务"""
  19.     print(f'任务:send_sms2执行了。。。{mobile},code={code}')
  20. # s3 = send.sms3.delay()
  21. # s3.result = 100
  22. @app.task(name="send_sms3")
  23. def send_sms3():
  24.     """没有任务参数,有返回结果的异步任务"""
  25.     print('任务:send_sms3执行了。。。')
  26.     return 100
  27. # s4 = send.sms4.delay(10,15)
  28. # s4.result = 25
  29. @app.task(name="send_sms4")
  30. def send_sms4(x, y):
  31.     """有参数,有结果的异步任务"""
  32.     print('任务:send_sms4执行了...')
  33.     return x + y
  34. @app.task(name='send_sms')
  35. def send_sms(mobile, random_code):
  36.     """发送短信"""
  37.     return send_message(mobile, random_code)
复制代码
接下来,我们运行celery。
  1. cd ~/Desktop/blog/blogapi
  2. # 普通的运行方式[默认多进程,卡终端,按CPU核数+1创建进程数]
  3. # ps aux|grep celery
  4. celery -A mycelery.main worker --loglevel=info
  5. # 启动多工作进程,以守护进程的模式运行[一个工作进程就是4个子进程]
  6. # 注意:pidfile和logfile必须以绝对路径来声明
  7. celery multi start worker -A mycelery.main -E --pidfile="/home/moluo/Desktop/blog/blogapi/logs/worker1.pid" --logfile="/home/moluo/Desktop/blog/blogapi/logs/celery.log" -l info -n worker1
  8. celery multi start worker -A mycelery.main -E --pidfile="/home/moluo/Desktop/blog/blogapi/logs/worker2.pid" --logfile="/home/moluo/Desktop/blog/blogapi/logs/celery.log" -l info -n worker2
  9. # 关闭运行的工作进程
  10. celery multi stop worker -A mycelery.main --pidfile="/home/moluo/Desktop/blog/blogapi/logs/worker1.pid" --logfile="/home/moluo/Desktop/blog/blogapi/logs/celery.log"
  11. celery multi stop worker -A mycelery.main --pidfile="/home/moluo/Desktop/blog/blogapi/logs/worker2.pid" --logfile="/home/moluo/Desktop/blog/blogapi/logs/celery.log"
复制代码
效果如下:

调用上面的异步任务,拿django的shell进行举例:
  1. # 因为celery模块安装在了虚拟环境中,所以要确保进入虚拟环境
  2. conda activate base
  3. cd /home/xuejian/Desktop/blog/blog
  4. python manage.py shell
  5. # 调用celery执行异步任务
  6. from mycelery.sms.tasks import send_sms1,send_sms2,send_sms3,send_sms4
  7. mobile = "13312345656"
  8. code = "666666"
  9. # delay 表示马上按顺序来执行异步任务,在celrey的worker工作进程有空闲的就立刻执行
  10. # 可以通过delay异步调用任务,可以没有参数
  11. ret1 = send_sms1.delay()
  12. # 可以通过delay传递异步任务的参数,可以按位置传递参数,也可以使用命名参数
  13. # ret2 = send_sms.delay(mobile=mobile,code=code)
  14. ret2 = send_sms2.delay(mobile,code)
  15. # apply_async 让任务在后面指定时间后执行,时间单位:秒/s
  16. # 任务名.apply_async(args=(参数1,参数2), countdown=定时时间)
  17. ret4 = send_sms4.apply_async(kwargs={"x":10,"y":20},countdown=30)
  18. # 根据返回结果,不管delay,还是apply_async的返回结果都一样的。
  19. ret4.id      # 返回一个UUID格式的任务唯一标志符,78fb827e-66f0-40fb-a81e-5faa4dbb3505
  20. ret4.status  # 查看当前任务的状态 SUCCESS表示成功! PENDING任务等待
  21. ret4.get()   # 获取任务执行的结果[如果任务函数中没有return,则没有结果,如果结果没有出现则会导致阻塞]
  22. if ret4.status == "SUCCESS":
  23.     print(ret4.get())
复制代码
接下来,我们让celery可以调度第三方框架的代码,这里拿django当成一个第三模块调用进行举例。
在main.py主程序中对django进行导包引入,并设置django的配置文件进行django的初始化。
  1. import os,django
  2. from celery import Celery
  3. # 初始化django
  4. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'blog.settings')
  5. django.setup()
  6. # 初始化celery对象
  7. app = Celery("blog")
  8. # 加载配置
  9. app.config_from_object("mycelery.config")
  10. # 自动注册任务
  11. app.autodiscover_tasks(["mycelery.sms","mycelery.email"])
  12. # 运行celery
  13. # 终端下: celery -A mycelery.main worker -l info
复制代码
在需要使用django配置的任务中,直接加载配置,所以我们把注册的短信发送功能,整合成一个任务函数,mycelery.sms.tasks,代码:
  1. from ..main import app
  2. from ronglianyunapi import send_sms as send_sms_to_user
  3. @app.task(name="send_sms1")
  4. def send_sms1():
  5.     """没有任何参数,没有返回结果的异步任务"""
  6.     print('任务:send_sms1执行了...')
  7. @app.task(name="send_sms2")
  8. def send_sms2(mobile, code):
  9.     """有参数,没有返回结果的异步任务"""
  10.     print(f'任务:send_sms2执行了...mobile={mobile}, code={code}')
  11. @app.task(name="send_sms3")
  12. def send_sms3():
  13.     """没有任何参数,有返回结果的异步任务"""
  14.     print('任务:send_sms3执行了...')
  15.     return 100
  16. @app.task(name="send_sms4")
  17. def send_sms4(x,y):
  18.     """有结果的异步任务"""
  19.     print('任务:send_sms4执行了...')
  20.     return x+y
  21. @app.task(name="send_sms")
  22. def send_sms(tid, mobile, datas):
  23.     """发送短信"""
  24.     print("发送短信")
  25.     return send_sms_to_user(tid, mobile, datas)
复制代码
最终在django的视图里面,我们调用Celery来异步执行任务。
只需要完成2个步骤,分别是导入异步任务调用异步任务。users/views.py,代码:
  1. import random
  2. from django_redis import get_redis_connection
  3. from django.conf import settings
  4. # from ronglianyunapi import send_sms
  5. from mycelery.sms.tasks import send_sms
  6. """
  7. /users/sms/(?P<mobile>1[3-9]\d{9})
  8. """
  9. class SMSAPIView(APIView):
  10.     """
  11.     SMS短信接口视图
  12.     """
  13.     def get(self, request, mobile):
  14.         """发送短信验证码"""
  15.         redis = get_redis_connection("sms_code")
  16.         # 判断手机短信是否处于发送冷却中[60秒只能发送一条]
  17.         interval = redis.ttl(f"interval_{mobile}")  # 通过ttl方法可以获取保存在redis中的变量的剩余有效期
  18.         if interval != -2:
  19.             return Response({"errmsg": f"短信发送过于频繁,请{interval}秒后再次点击获取!", "interval": interval},status=status.HTTP_400_BAD_REQUEST)
  20.         # 基于随机数生成短信验证码
  21.         # code = "%06d" % random.randint(0, 999999)
  22.         code = f"{random.randint(0, 999999):06d}"
  23.         # 获取短信有效期的时间
  24.         time = settings.RONGLIANYUN.get("sms_expire")
  25.         # 短信发送间隔时间
  26.         sms_interval = settings.RONGLIANYUN["sms_interval"]
  27.         # 调用第三方sdk发送短信
  28.         # send_sms(settings.RONGLIANYUN.get("reg_tid"), mobile, datas=(code, time // 60))
  29.         # 异步发送短信
  30.         send_sms.delay(settings.RONGLIANYUN.get("reg_tid"), mobile, datas=(code, time // 60))
  31.         # 记录code到redis中,并以time作为有效期
  32.         # 使用redis提供的管道对象pipeline来优化redis的写入操作[添加/修改/删除]
  33.         pipe = redis.pipeline()
  34.         pipe.multi()  # 开启事务
  35.         pipe.setex(f"sms_{mobile}", time, code)
  36.         pipe.setex(f"interval_{mobile}", sms_interval, "_")
  37.         pipe.execute()  # 提交事务,同时把暂存在pipeline的数据一次性提交给redis
  38.         return Response({"errmsg": "OK"}, status=status.HTTP_200_OK)
复制代码
上面就是使用celery并执行异步任务的第一种方式,适合在一些无法直接集成celery到项目中的场景。
8.2 Celery作为第三方模块集成到项目中

这里还是拿django来举例,目录结构调整如下:
  1. blogapi/           # 服务端项目根目录
  2. └── blogapi/       # 主应用目录
  3.     ├── apps/           # 子应用存储目录  
  4.     ├   └── users/            # django的子应用
  5.     ├       └── tasks.py      # [新增]分散在各个子应用下的异步任务模块
  6.     ├── settings/     # [修改]django的配置文件存储目录[celery的配置信息填写在django配置中即可]
  7.     ├── __init__.py   # [修改]设置当前包目录下允许外界调用celery应用实例对象
  8.     └── celery.py     # [新增]celery入口程序,相当于上一种用法的main.py
复制代码
blog/celery.py,主应用目录下创建cerley入口程序,创建celery对象并加载配置和异步任务,代码:
  1. import os
  2. from celery import Celery
  3. # 必须在实例化celery应用对象之前执行
  4. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'blog.settings')
  5. # 实例化celery应用对象
  6. app = Celery('blog')
  7. # 指定任务的队列名称
  8. app.conf.task_default_queue = 'Celery'
  9. # 也可以把配置写在django的项目配置中
  10. app.config_from_object('django.conf:settings', namespace='CELERY') # 设置django中配置信息以 "CELERY_"开头为celery的配置信息
  11. # 自动根据配置查找django的所有子应用下的tasks任务文件
  12. app.autodiscover_tasks()
复制代码
settings/dev.py,django配置中新增celery相关配置信息,代码:
  1. # Celery异步任务队列框架的配置项[注意:django的配置项必须大写,所以这里的所有配置项必须全部大写]
  2. # 任务队列
  3. CELERY_BROKER_URL = 'redis://:123456@127.0.0.1:6379/14'
  4. # 结果队列
  5. CELERY_RESULT_BACKEND = 'redis://:123456@127.0.0.1:6379/15'
  6. # 时区,与django的时区同步
  7. CELERY_TIMEZONE = TIME_ZONE
  8. # 防止死锁
  9. CELERY_FORCE_EXECV = True
  10. # 设置并发的worker数量
  11. CELERYD_CONCURRENCY = 200
  12. # 设置失败允许重试[这个慎用,如果失败任务无法再次执行成功,会产生指数级别的失败记录]
  13. CELERY_ACKS_LATE = True
  14. # 每个worker工作进程最多执行500个任务被销毁,可以防止内存泄漏,500是举例,根据自己的服务器的性能可以调整数值
  15. CELERYD_MAX_TASKS_PER_CHILD = 500
  16. # 单个任务的最大运行时间,超时会被杀死[慎用,有大文件操作、长时间上传、下载任务时,需要关闭这个选项,或者设置更长时间]
  17. CELERYD_TIME_LIMIT = 10 * 60
  18. # 任务发出后,经过一段时间还未收到acknowledge, 就将任务重新交给其他worker执行
  19. CELERY_DISABLE_RATE_LIMITS = True
  20. # celery的任务结果内容格式
  21. CELERY_ACCEPT_CONTENT = ['json', 'pickle']
  22. # 之前定时任务(定时一次调用),使用了apply_async({}, countdown=30);
  23. # 设置定时任务(定时多次调用)的调用列表,需要单独运行SCHEDULE命令才能让celery执行定时任务:celery -A mycelery.main beat,当然worker还是要启动的
  24. # https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html
  25. from celery.schedules import crontab
  26. CELERY_BEAT_SCHEDULE = {
  27.     "user-add": {  # 定时任务的注册标记符[必须唯一的]
  28.         "task": "add",   # 定时任务的任务名称
  29.         "schedule": 10,  # 定时任务的调用时间,10表示每隔10秒调用一次add任务
  30.         # "schedule": crontab(hour=7, minute=30, day_of_week=1),,  # 定时任务的调用时间,每周一早上7点30分调用一次add任务
  31.     }
  32. }
复制代码
blog/__init__.py,主应用下初始化,代码:
  1. import pymysql
  2. from .celery import app as celery_app
  3. pymysql.install_as_MySQLdb()
  4. __all__ = ['celery_app']
复制代码
users/tasks.py,代码:
  1. from celery import shared_task
  2. from ronglianyunapi import send_sms as sms
  3. # 记录日志:
  4. import logging
  5. logger = logging.getLogger("django")
  6. @shared_task(name="send_sms")
  7. def send_sms(tid, mobile, datas):
  8.     """异步发送短信"""
  9.     try:
  10.         return sms(tid, mobile, datas)
  11.     except Exception as e:
  12.         logger.error(f"手机号:{mobile},发送短信失败错误: {e}")
  13. @shared_task(name="send_sms1")
  14. def send_sms1():
  15.     print("send_sms1执行了!!!")
复制代码
django中的用户发送短信,就可以改成异步发送短信了。
users/views,视图中调用异步发送短信的任务,代码:
  1. from .tasks import send_sms
  2. send_sms.delay(settings.RONGLIANYUN.get("reg_tid"),mobile, datas=(code, time // 60))
复制代码
users/views.py,异步发送信息的完整视图,代码:
  1. import random
  2. from django_redis import get_redis_connection
  3. from django.conf import settings
  4. # from ronglianyunapi import send_sms
  5. # from mycelery.sms.tasks import send_sms
  6. from .tasks import send_sms
  7. """
  8. /users/sms/(?P<mobile>1[3-9]\d{9})
  9. """
  10. class SMSAPIView(APIView):
  11.     """
  12.     SMS短信接口视图
  13.     """
  14.     def get(self, request, mobile):
  15.         """发送短信验证码"""
  16.         redis = get_redis_connection("sms_code")
  17.         # 判断手机短信是否处于发送冷却中[60秒只能发送一条]
  18.         interval = redis.ttl(f"interval_{mobile}")  # 通过ttl方法可以获取保存在redis中的变量的剩余有效期
  19.         if interval != -2:
  20.             return Response({"errmsg": f"短信发送过于频繁,请{interval}秒后再次点击获取!", "interval": interval},status=status.HTTP_400_BAD_REQUEST)
  21.         # 基于随机数生成短信验证码
  22.         # code = "%06d" % random.randint(0, 999999)
  23.         code = f"{random.randint(0, 999999):06d}"
  24.         # 获取短信有效期的时间
  25.         time = settings.RONGLIANYUN.get("sms_expire")
  26.         # 短信发送间隔时间
  27.         sms_interval = settings.RONGLIANYUN["sms_interval"]
  28.         # 调用第三方sdk发送短信
  29.         # send_sms(settings.RONGLIANYUN.get("reg_tid"), mobile, datas=(code, time // 60))
  30.         # 异步发送短信
  31.         send_sms.delay(settings.RONGLIANYUN.get("reg_tid"), mobile, datas=(code, time // 60))
  32.         # 记录code到redis中,并以time作为有效期
  33.         # 使用redis提供的管道对象pipeline来优化redis的写入操作[添加/修改/删除]
  34.         pipe = redis.pipeline()
  35.         pipe.multi()  # 开启事务
  36.         pipe.setex(f"sms_{mobile}", time, code)
  37.         pipe.setex(f"interval_{mobile}", sms_interval, "_")
  38.         pipe.execute()  # 提交事务,同时把暂存在pipeline的数据一次性提交给redis
  39.         return Response({"errmsg": "OK"}, status=status.HTTP_200_OK)
复制代码
终端下先启动celery,在django项目根目录下启动。
  1. cd ~/Desktop/blog/blogapi
  2. # 1. 普通运行模式,关闭终端以后,celery就会停止运行
  3. celery -A blogapi worker  -l INFO
  4. # 2. 启动多worker进程模式,以守护进程的方式运行,不需要在意终端。但是这种运行模型,一旦停止,需要手动启动。
  5. celery multi start worker -A blogapi -E --pidfile="/home/moluo/Desktop/blog/blogapi/logs/worker1.pid" --logfile="/home/moluo/Desktop/blog/blogapi/logs/celery.log" -l info -n worker1
  6. # 3. 启动多worker进程模式
  7. celery multi stop worker -A blogapi --pidfile="/home/moluo/Desktop/blog/blogapi/logs/worker1.pid"
复制代码
还是可以在django终端下调用celery的
  1. $ python manage.py shell
  2. >>> from users.tasks import send_sms1
  3. >>> res = send_sms1.delay()
  4. >>> res = send_sms1.apply_async(countdown=15)
  5. >>> res.id
  6. '893c31ab-e32f-44ee-a321-8b07e9483063'
  7. >>> res.state
  8. 'SUCCESS'
  9. >>> res.result
复制代码
关于celery中异步任务发布的2个方法的参数如下:
  1. 异步任务名.delay(*arg, **kwargs)
  2. 异步任务名.apply_async((arg,), {'kwarg': value}, countdown=60, expires=120)
复制代码
定时任务的调用器启动,可以在运行了worker以后,使用以下命令:
  1. cd ~/Desktop/blog/blogapi
  2. celery -A blogapi beat
复制代码
beat调度器关闭了,则定时任务无法执行,如果worker工作进程关闭了,则celery关闭,保存在消息队列中的任务就会囤积在那里。

celery还有一些高阶用法, 我们后面用到再提。
celery后面还可以使用supervisor进行后台托管运行。还可以针对任务执行的情况和结果,使用flower来进行监控。celery失败任务的重新尝试执行。
supervisor会在celery以外关闭了以后,自动重启celery。

来源:https://www.cnblogs.com/xuejian123/p/17199559.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

举报 回复 使用道具