翼度科技»论坛 编程开发 python 查看内容

Python中实现定时任务

4

主题

4

帖子

12

积分

新手上路

Rank: 1

积分
12
Python中实现定时任务

在项目中,我们可能遇到有定时任务的需求。

  • 其一:每隔一个时间段就执行任务。
    比如:压测中每隔45分钟调整温箱的温度。
  • 其二:定时执行任务。
    例如每天早上 8 点定时推送早报。
今天,我跟大家分享下 Python 定时任务的实现方法。
固定时间间隔执行任务
  1. import time
  2. import logging
  3. logging.basicConfig(
  4.     level=logging.debug,
  5.     format="%(asctime)s.%(msecs)d | %(threadName)s | %(levelname)s - %(message)s"
  6. )
  7. def task():
  8.     logging.info("Task Start.")
  9.     time.sleep(1)
  10.     logging.info("Task Done.")
复制代码
time.sleep

第一种办法是最简单又最暴力。
那就是在一个死循环中,使用线程睡眠函数 sleep()。
  1. while True:
  2.     task()
  3.     time.sleep(5)
复制代码
上述的方法有几个问题:

  • 阻塞主进程,这个也好解决,可以放到线程中去执行
  • 一次只有一个task,这个也有解决办法,可以多启几个线程
threading.Timer

既然第一种方法暴力,那么有没有比较优雅点的方法?
Python 标准库 threading 中有个 Timer 类。
它会新启动一个线程来执行定时任务,所以它是非阻塞函式。
原理:线程中预置一个finished的事件,通过finished.wait等待固定时间间隔。
超时则执行任务。如果需要取消任务,可以调用Timer.cancel来取消任务。
  1. from threading import Timer
  2. t = Timer(task, 5)
  3. t.start()
复制代码
优点:
不阻塞主进程,task在线程中执行
缺点:
一个Timer只能执行一次task就结束了。
如果想循环,需要改造一下task函数。
  1. from threading import Timer
  2. def repeat_task():
  3.     t = Timer(5, repeat_task)
  4.     # 开始任务的位置决定了是任务之间等待固定间隔时间
  5.     # 还是每个任务的开始等待固定间隔时间
  6.     t.start()
  7.     task()
复制代码
这样可以循环执行,但是仍然只能一个线程一个任务。
Q:如何跳出循环
A:加入一个标识符:

  • sleep可以用一个布尔值来控制
  • Timer可以用一个Event来控制
固定时间点执行任务

以上是用内置的方法中比较简单的实现方式。简单的功能可以实现。
前面执行的都是指定间隔时间的定时任务,那怎么执行指定时间点的任务呢?
上面的方法是可以做到的。有两种思路,

  • 前面我们指定了间隔时间,那指定时间点,就先计算当前时间到指定时间点的间隔时间
  • 不管间隔时间,而是记录任务时间点,然后实时去检查是否到达指定时间点
思路有了,但是对于固定时间点执行任务的场景以及后面更复杂的场景,自己实现可能就变得复杂。
下面再介绍几个进阶的定时任务的实现方式,可以适应更复杂的业务场景。
sched

第三种方式是使用标准库中sched模块。sched 是事件调度器,
它通过 scheduler 类来调度事件,从而达到定时执行任务的效果。
简单示例如下:
  1. import sched
  2. schedule = sched.scheduler()  # 初始化 sched 模块的 scheduler 类
  3. schedule.enter(10, 1, task)  # 增加调度任务
  4. schedule.run()  # 开始调度任务
复制代码
scheduler 提供了两个添加调度任务的函数:

  • enter(delay, priority, action, argument=(), kwargs={})
    该函数可以延迟一定时间执行任务。delay 表示延迟多长时间执行任务,单位是秒。
    priority为优先级,越小优先级越大。两个任务指定相同的延迟时间,优先级大的任务会向被执行。
    action 即需要执行的函数,argument 和 kwargs 分别是函数的位置和关键字参数。
  • scheduler.enterabs(time, priority, action, argument=(), kwargs={})
    添加一项任务,但这个任务会在 time 这时刻执行。因此,time 是绝对时间。其他参数用法与 enter() 中的参数用法是一致。
优点:

  • 执行时间间隔和时间点执行任务
  • 可以添加不同的任务
  • 任务可以设置优先级
缺点:
scheduler 中的每个调度任务只会工作一次,不会无限循环被调用。如果想重复执行同一任务, 需要重复添加调度任务即可。
  1. import sched
  2. import threading
  3. from functools import wraps
  4. s = sched.scheduler()
  5. STOP_FLAG = threading.Event()
  6. def repeat(interval):
  7.     def wrapper(func):
  8.         @wraps(func)
  9.         def inner():
  10.             if not STOP_FLAG.is_set():
  11.                 s.enter(interval, 0, inner)
  12.             func()
  13.         return inner
  14.     return wrapper
  15. @repeat(5)
  16. def new_task():
  17.     return task()
  18. new_task()
  19. t = threading.Thread(target=s.run)
  20. t.start()
复制代码
实现原理

当然我们仅仅学会怎么用还是不够的,不能知其然而不知其所以然。
介绍了那么多方法,那么多库,但是底层的实现逻辑都是差不多的。
一个完整的定时任务系统,有三个部分:

  • 任务队列(task queue)
    根据执行时间和优先级进行排序
    排序sorted(): 任务一多,整个队列重排,性能堪忧
    heapq.pop():堆排序,只获取最高优先级的任务,不重复排序
  • 调度器(scheduler)
  • 执行器(executor)
    从前到后,实现的内容越来越多,控制的精细度也就越来越高,实现的功能也就越来越丰富。
  1. import sched
  2. import time
  3. import threading
  4. from functools import wraps
  5. from task import task
  6. from typing import Callable
  7. STOP_FLAG = threading.Event()
  8. def timeloop(func: Callable, interval: int):
  9.     while True:
  10.         func()
  11.         time.sleep(interval)
  12. def repeat_task(func: Callable, interval: int):
  13.     # 新建任务的前后,决定了是在任务结束后等待时间,还是固定间隔
  14.     if not STOP_FLAG.is_set():
  15.         t = threading.Timer(interval, repeat_task, args=(func, interval))
  16.         t.start()
  17.     func()
  18. def timer_schedule(func: Callable, interval: int):
  19.     repeat_task(func, interval)
  20. s = sched.scheduler()
  21. def repeat(interval):
  22.     def wrapper(func):
  23.         @wraps(func)
  24.         def inner():
  25.             if not STOP_FLAG.is_set():
  26.                 s.enter(interval, 0, inner)
  27.             func()
  28.         return inner
  29.     return wrapper
  30. @repeat(5)
  31. def new_task():
  32.     return task()
  33. def sched_schedule(func: Callable, interval: int):
  34.     func()
  35.     t = threading.Thread(target=s.run)
  36.     t.start()
  37.     return t
  38. def main(schedule):
  39.     schedule_thread = schedule(new_task, 5)
  40.     while True:
  41.         try:
  42.             time.sleep(1)
  43.         except KeyboardInterrupt:
  44.             STOP_FLAG.set()
  45.             schedule_thread.join()
  46.             break
  47. if __name__ == "__main__":
  48.     # main(timeloop)
  49.     # main(timer_schedule)
  50.     main(sched_schedule)
复制代码
来源:https://www.cnblogs.com/hd092336/p/17549357.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

举报 回复 使用道具