|
在现代Web开发中,定时任务是不可或缺的一部分,无论是定期数据分析、定时发送邮件、还是系统维护脚本,都需要精准的定时调度。Django作为Python世界中强大的Web框架,其对定时任务的支持自然也是开发者关注的重点。本文将深入探讨Django定时任务解决方案,特别是聚焦于django_apscheduler这一强大扩展库,带您领略其背后的运行原理与实战应用,助您在Django项目中高效驾驭定时任务。
一、Django定时任务组件概览
在Django世界中,实现定时任务主要有以下几种方式:
- 使用while True循环:研发直接简单,不依赖任何其他Python库,但包括调度时间、调度频率、任务管理等在内均无法实现。代码样例如下:
- import time
- # 定时任务1
- def task1():
- print("hello,world")
- # 定时任务2
- def task2():
- print("hello,world")
- if __name__ == '__main__':
- while True:
- task1()
- task1()
- time.sleep(7) # 调度频率:每7秒调度一次
复制代码
- 使用cron jobs:传统方式,通过操作系统级别的cron服务安排定时任务,但与Django应用解耦,配置和管理相对独立。
- 基于Django的自定义管理命令:借助Django自定义管理命令能力,通过编写命令行脚本,再借助cron或任务调度系统执行,灵活性高但集成度较低。
- 集成第三方库(推荐):使用Django三方库,如django_cron、django_schedule,以及我们今天的主角django_apscheduler,这些库提供更紧密的Django集成和更丰富的功能。
二、为何选择django_apscheduler库
相较于其他几种方案,django_apscheduler凭借其与Django的集成度、强大的功能、灵活的配置等特点脱颖而出:
- 无缝集成:作为APScheduler的Django适配器,它直接在Django应用中提供高级定时任务功能,无需额外配置cron作业。
- 高级调度:支持复杂调度逻辑,如周期性任务、一次性任务、cron风格表达式等,满足多样化需求。
- 数据库支持:默认使用Django的数据库作为作业存储,易于管理任务状态和历史记录,尤其适合分布式部署。同时,定时任务的定义、执行状态等都会被持久化到数据库中,应用重启后的状态**可恢复。
- 灵活的执行器:可选ThreadPoolExecutor或ProcessPoolExecutor,控制并发执行,适应不同任务类型。
三、django_apscheduler运行原理探秘
django_apscheduler的核心在于将APScheduler与Django框架紧密结合。其背后的工作机制主要包括:
- 初始化与调度:应用启动后初始化BackgroundScheduler实例,加载并调度所有配置好的定时任务。
- 作业存储:利用Django的ORM框架,将任务信息存储在Django应用数据库中,实现任务的持久化和分布式跨进程共享。
- 执行器与任务执行:当到达任务触发时间,执行器(线程池或进程池)会根据配置执行任务。任务执行结果和状态会被记录,便于追踪和审计。
四、实战演练:用django_apscheduler构建定时任务
接下来,让我们通过一个简单示例,感受django_apscheduler的实战魅力:
- 安装库:首先,通过pip安装django-apscheduler:
- pip install django-apscheduler
复制代码
- 配置定时任务:其次,在settings.py中注册django_apscheduler应用:
- INSTALLED_APPS = [
- 'django.contrib.admin',
- 'django.contrib.auth',
- 'django.contrib.contenttypes',
- 'django.contrib.sessions',
- 'django.contrib.messages',
- 'django.contrib.staticfiles',
- # ....其他APP....
- 'django_apscheduler',
- # ....其他APP....
- ]
- # 时间格式化
- APSCHEDULER_DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S.%f'
- # 任务超时时间(单位:秒)
- APSCHEDULER_RUN_NOW_TIMEOUT = 600
复制代码
- 创建定时任务数据表:执行Django数据迁移管理命令,创建数据表:
数据表的定义如下:- -- SELECT sql FROM sqlite_master WHERE type='table' AND name='django_apscheduler_djangojob';
- CREATE TABLE "django_apscheduler_djangojob"
- (
- "id" varchar(255) NOT NULL PRIMARY KEY,
- "next_run_time" datetime NULL,
- "job_state" BLOB NOT NULL
- );
- -- SELECT sql FROM sqlite_master WHERE type='table' AND name='django_apscheduler_djangojobexecution';
- CREATE TABLE "django_apscheduler_djangojobexecution"
- (
- "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
- "status" varchar(50) NOT NULL,
- "run_time" datetime NOT NULL,
- "duration" decimal NULL,
- "finished" decimal NULL,
- "exception" varchar(1000) NULL,
- "traceback" text NULL,
- "job_id" varchar(255) NOT NULL REFERENCES "django_apscheduler_djangojob" ("id") DEFERRABLE INITIALLY DEFERRED,
- CONSTRAINT "unique_job_executions" UNIQUE ("job_id", "run_time")
- );
复制代码
- 编写任务逻辑:在任一一个Django应用中(比如:该App为task),创建任务模块task_list.py:
- # ./task/task_list.py
- from datetime import datetime
- def print_task():
- print('Scheduler测试任务执行:{}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')))
复制代码
- 注册任务:通过Django自定义管理命令,设置调度规则,启动django_apscheduler调度器:
在taskApp中,创建如下结果目录,配置Django自定义管理命令:- $ tree management/
- management/
- ├── __init__.py
- └── commands
- ├── __init__.py
- └── start_tasks.py
复制代码
- 目录中的2个__init__.py文件是标记文件,内容为空,仅代表目标是一个模块
- 文件./task/management/commands/start_tasks.py代表自定义命令,Django默认会扫描每个App的/management/commands目录,该目录下的所有文件名均可作为命令,如start_tasks.py的管理命令为:python manage.py start_tasks
- # ./task/management/commands/start_tasks.py
- from datetime import datetime
- from apscheduler.executors.pool import ThreadPoolExecutor
- from apscheduler.schedulers.blocking import BlockingScheduler
- from apscheduler.triggers.interval import IntervalTrigger
- from django.conf import settings
- from django.core.management.base import BaseCommand
- from django_apscheduler import util
- from django_apscheduler.jobstores import DjangoJobStore
- from django_apscheduler.models import DjangoJobExecution
- from ...task_list import print_task
- #
- # Django manage.py命令:存储定时任务信息
- #
- class Command(BaseCommand):
- help = '启动定时任务.'
- def handle(self, *args, **options):
- # 调度器
- scheduler = BlockingScheduler(timezone=settings.TIME_ZONE) # 研发阶段使用
- # scheduler = BackgroundScheduler(timezone=settings.TIME_ZONE) # 生产阶段使用
- # 任务存储
- scheduler.add_jobstore(DjangoJobStore(), 'SQLiteJobStore')
- # 配置线程池执行器,限制最大并发数为1,防止并发
- executor = ThreadPoolExecutor(max_workers=1)
- scheduler.executor = executor
- # 注册定义任务
- id_print_task = 'print_task__job'
- print('开始-增加任务({})'.format(id_print_task))
- scheduler.add_job(
- print_task,
- id=id_print_task,
- name=id_print_task,
- max_instances=1,
- replace_existing=True,
- trigger=IntervalTrigger(seconds=15, start_date=datetime.now(), ), # 从当前时间开始,每15秒钟调度一次
- )
- print('完成-增加任务({})'.format(id_print_task))
- # 启动定时任务
- try:
- scheduler.start()
- except KeyboardInterrupt:
- scheduler.shutdown()
复制代码 至此,任务配置完成:每15秒钟调度一次我们自定义任务。
- python manage.py start_tasks
复制代码 由于我们用的调度器是BlockingScheduler,启动之后命令行不退出,在DEV研发阶段,建议采用这个调度器;生成环境,建议采用BackgroundScheduler调度器,通过后台守护进程执行定时任务。
五、总结
django_apscheduler以其高度集成、灵活配置和强大的功能,成为Django项目中定时任务解决方案的优选。它不仅简化了定时任务的实现,还提升了任务管理的便捷性和系统的稳定性。无论你是初次接触定时任务的新手,还是寻求高效解决方案的老手,django_apscheduler都是值得深入了解和掌握的工具。希望本文能为你在Django定时任务的探索之路上点亮一盏明灯。
关注本公众号,我们共同学习进步
来源:https://www.cnblogs.com/obullxl/p/18226576/NTopic2024060101
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作! |
本帖子中包含更多资源
您需要 登录 才可以下载或查看,没有账号?立即注册
x
|