首页 » 互联网 » python准时责任框架APScheduler详解_触发器_功课

python准时责任框架APScheduler详解_触发器_功课

神尊大人 2025-01-05 20:27:03 0

扫一扫用手机浏览

文章目录 [+]

简介

优点

python准时责任框架APScheduler详解_触发器_功课 python准时责任框架APScheduler详解_触发器_功课 互联网

缺陷

python准时责任框架APScheduler详解_触发器_功课 python准时责任框架APScheduler详解_触发器_功课 互联网
(图片来自网络侵删)

Apscheduler

基于Quartz的一个Python定时任务框架,供应了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化作业

支持定时、定期、一次性任务,支持任务持久化及动态添加

配置可选项较多,配置起来较为繁芜,有一定的学习本钱。

Celery

是一个大略,灵巧,可靠的分布式系统,用于处理大量,同时为操作供应掩护此类系统所需的工具, 也可用于任务调度

支持配置定期任务、支持 crontab 模式配置

不支持一次性定时任务,单独为定时任务功能而搭建celery显得过于重量级。

schedule

轻量级,无需配置的作业调度库

轻量级、无需配置、语法大略

壅塞式调用、无法动态添加或删除任务,无任务状态存储

python-crontab

针对系统 Cron 操作 crontab 文件的作业调度库

支持定时、定期任务,能够动态添加任务

不能实现一次性任务需求,没有状态存储,无法跨平台实行

APScheduler观点与组件触发器(trigger)

触发器包含调度逻辑,描述一个任务何时被触发,按日期或按韶光间隔或按 cronjob 表达式三种办法触发。
每个作业都有它自己的触发器,除了初始配置之外,触发器是完备无状态的。

任务存储器(job stores)

任务存储器指定了作业被存放的位置,默认情形下作业保存在内存,也可将作业保存在各种数据库中,当任务被存放在数据库中时,它会被序列化,当被重新加载时会反序列化。
作业存储器充当保存、加载、更新和查找作业的中间商。
在调度器之间不能共享作业存储。

实行器(executors)

实行器是将指定的作业(调用函数)提交到线程池或进程池中运行,当任务完成时,实行器关照调度器触发相应的事宜。

调度器(schedulers)

任务调度器,属于掌握角色,通过它配置作业存储器、实行器和触发器,添加、修正和删除任务。
调度器折衷触发器、作业存储器、实行器的运行,常日只有一个调度程序运行在运用程序中,开拓职员常日不须要直接处理作业存储器、实行器或触发器,配置作业存储器和实行器是通过调度器来完成的。

事情流程

APScheduler快速上手安装

安装非常大略,通过pip install apscheduler即可。

快速体验

# main.py# 导入调度器,此处利用BlockingScheduler壅塞调度器from apscheduler.schedulers.blocking import BlockingScheduler# 导入触发器,此处利用IntervalTrigger特定时间间隔触发from apscheduler.triggers.interval import IntervalTrigger# 导入日志记录器from loguru import logger# 定时任务实行函数def my_task(): logger.info("开始实行任务")if __name__ == '__main__': # 实例化调度器工具 scheduler = BlockingScheduler() # 添加定时任务,指界说务函数和触发器 scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=3)) # 开始实行定时任务 scheduler.start()实行结果

/Users/cuiliang/PycharmProjects/test/venv/bin/python /Users/cuiliang/PycharmProjects/test/crontab.py2022-08-21 22:09:10.063 | INFO | __main__:my_task:7 - 开始实行任务2022-08-21 22:09:13.068 | INFO | __main__:my_task:7 - 开始实行任务2022-08-21 22:09:16.067 | INFO | __main__:my_task:7 - 开始实行任务APScheduler配置详解触发器(triggers)

APScheduler支持的触发器紧张有:

DateTrigger:日期触发器。
日期触发器紧张是在某一日期韶光点上运行任务时调用,是 APScheduler 里面最大略的一种触发器。
以是常日也适用于一次性的任务或作业调度。

# 指界说务在2022年8月23日实行scheduler.add_job(my_task, trigger=DateTrigger(run_date=date(2022, 8, 23), timezone="Asia/Shanghai"))# 指界说务在2022年8月23日8时5分30秒实行scheduler.add_job(my_task, trigger=DateTrigger(run_date=datetime(2022, 8, 23, 8, 8, 30), timezone="Asia/Shanghai"))# 指界说务在2022年8月23日8时5分30秒实行scheduler.add_job(my_task, trigger=DateTrigger(run_date="2022-08-23 08:08:30", timezone="Asia/Shanghai"))IntervalTrigger:间隔触发器。
间隔触发器是在日期触发器根本上扩展了对韶光部分,比如时、分、秒、天、周这几个部分的设定。
是我们用以对重复性任务进行设定或调度的一个常用调度器。
设定了韶光部分之后,从起始日期开始(默认是当前)会按照设定的韶光去实行任务。

interval触发器支持设置如下参数:

参数

含义

类型

weeks

整形

days

一个月中的第几天

整形

hours

小时

整形

minutes

分钟

整形

seconds

整形

start_date

间隔触发的起始韶光

韶光格式字符串

end_date

间隔触发的结束韶光

韶光格式字符串

jitter

触发的韶光偏差

整形

# 指界说务每10分钟实行一次scheduler.add_job(my_task, trigger=IntervalTrigger(minutes=10, timezone="Asia/Shanghai"))# 指界说务在2022年8月22日9时到10时区间内,每10分钟实行一次scheduler.add_job(my_task, trigger=IntervalTrigger(minutes=10, start_date="2022-08-22 09:00:00", end_date="2022-08-22 10:00:00", timezone="Asia/Shanghai"))CronTrigger:cron 表达式触发器。
cron 表达式触发器就等价于我们 Linux 上的 crontab,它紧张用于更繁芜的日期韶光进行设定。

cron触发器支持设置如下参数:

参数

含义

year

4位数字的年份

month

1-12月份

day

1-31日

week

1-53周

day_of_week

一个星期中的第几天( 0-6或者 mon、 tue、 wed、 thu、 fri、 sat、 sun)

hour

0-23小时

minute

0-59分钟

second

0-59秒

start_date

datetime类型或者字符串类型,起始韶光

end_date

datetime类型或者字符串类型,结束韶光

timezone

时区

jitter

任务触发的偏差韶光

也可以用表达式类型,可以用以下办法:

表达式

字段

描述

任何

在每个值都触发

/a

任何

每隔a触发一次

a-b

任何

在a-b区间内任何一个韶光触发

a-b/c

任何

在a-b区间内每隔c触发一次

xth y

day

在x个星期y触发

last x

day

在末了一个星期x触发

last

day

在一个月中的末了一天触发

x,y,z

任何

将上面的表达式进行组合

# 指界说务在1-3月和6-9月,每个月第三个星期5那天的0-4点每2个小时实行一次 scheduler.add_job(my_task, trigger=CronTrigger(month="1-3,6-9", day="3th 5", hour="0-4/2", timezone="Asia/Shanghai")) # 利用crontab表达式,指界说务在每天1-15日每天0点0分实行一次 scheduler.add_job(my_task, trigger=CronTrigger.from_crontab("0 0 1-15 ", timezone="Asia/Shanghai"))调度器(schedulers)

APScheduler 供应了以下几种调度器:

BlockingScheduler:壅塞调度器,当程序中没有任何存在主进程之中运行东西时,就则利用该调度器。

from datetime import datetimeimport osfrom apscheduler.schedulers.blocking import BlockingSchedulerdef tick(): print('Tick! The time is: %s' % datetime.now())if __name__ == '__main__': scheduler = BlockingScheduler() scheduler.add_job(tick, 'interval', seconds=3) print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C')) try: scheduler.start() except (KeyboardInterrupt, SystemExit): passBackgroundScheduler:后台调度器,利用单独的线程实行,在不该用后面任何的调度器且希望在运用程序内部运行时的后台启动时才进行利用,如当前你已经开启了一个 Django 或 Flask 做事。

from datetime import datetimeimport timeimport osfrom apscheduler.schedulers.background import BackgroundSchedulerdef tick(): print('Tick! The time is: %s' % datetime.now())if __name__ == '__main__': scheduler = BackgroundScheduler() scheduler.add_job(tick, 'interval', seconds=3) scheduler.start() print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C')) try: # 仿照主线程持续运行。
while True: time.sleep(2) except (KeyboardInterrupt, SystemExit): scheduler.shutdown()
AsyncIOScheduler:AsyncIO 调度器,如果代码是通过 asyncio 模块进行异步操作,利用该调度器。
GeventScheduler:Gevent 调度器,如果代码是通过 gevent 模块进行协程操作,利用该调度器TornadoScheduler:Tornado 调度器,在 Tornado 框架中利用TwistedScheduler:Twisted 调度器,在基于 Twisted 的框架或运用程序中利用QtScheduler:Qt 调度器,在构建 Qt 运用中进行利用。

常日情形下如果不是和 Web 项目或运用集成共存,那么每每都首选 BlockingScheduler 调度器来进行操作,它会在当提高程中启动相应的线程来进行任务调度与处理;反之,如果是和 Web 项目或运用共存,那么须要选择 BackgroundScheduler 调度器,由于它不会滋扰当前运用的线程或进程状况。

实行器(executors)

APScheduler 供应了以下几种实行器:

ThreadPoolExecutor:默认的线程池实行器。
大部分情形下是可以知足我们需求ProcessPoolExecutor:进程池实行器。
涉及到一些 CPU密集打算的操作,利用此实行器AsyncIOExecutor:asyncio程序实行器,如果代码是通过 asyncio 模块进行异步操作,利用该实行器。
TornadoExecutor:Tornado程序实行器,在 Tornado 框架中利用TwistedExecutor:Twisted程序实行器,在基于 Twisted 的框架或运用程序中利用GeventExecutor:Gevent程序实行器,在Gevent框架中利用任务存储器(job stores)

APScheduler支持的数据库紧张有:

sqlalchemy :关系型的数据库。
这里就紧张是指各种传统的关系型数据库,如 MySQL、PostgreSQL、SQLite 等。

# main.py# 导入调度器,此处利用BlockingScheduler壅塞调度器from apscheduler.schedulers.blocking import BlockingScheduler# 导入触发器,此处利用IntervalTrigger特定时间间隔触发from apscheduler.triggers.interval import IntervalTrigger# 导入任务存储器,此处利用SQLAlchemyJobStorefrom apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore# 导入日志记录器from loguru import logger# 定时任务实行函数def my_task(): logger.info("开始实行任务")if __name__ == '__main__': # 实例化调度器工具 scheduler = BlockingScheduler() # 指定利用MySQL存储任务 url = 'mysql://root:123.com@127.0.0.1:3306/job?charset=utf8' scheduler.add_jobstore(jobstore=SQLAlchemyJobStore(url=url)) # 指界说务每10分钟实行一次 scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=10, timezone="Asia/Shanghai")) # 开始实行定时任务 scheduler.start()

查看数据库表内容

mongodb :非构造化Mongodb数据库。
该类型数据库常常用于对非构造化或版构造化数据的存储或操作,如 JSON。

# main.py# 导入调度器,此处利用BlockingScheduler壅塞调度器from apscheduler.schedulers.blocking import BlockingScheduler# 导入触发器,此处利用IntervalTrigger特定时间间隔触发from apscheduler.triggers.interval import IntervalTrigger# 导入任务存储器,此处利用MongoDBJobStorefrom apscheduler.jobstores.mongodb import MongoDBJobStore# 导入日志记录器from loguru import logger# 导入MongoDB客户端from pymongo import MongoClient# 定时任务实行函数def my_task(): logger.info("开始实行任务")if __name__ == '__main__': # 实例化调度器工具 scheduler = BlockingScheduler() # 指定利用MongoDB存储任务 url = 'mongodb://127.0.0.1:27017/' scheduler.add_jobstore(jobstore=MongoDBJobStore(client=MongoClient(host=url))) # 指界说务每10分钟实行一次 scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=10, timezone="Asia/Shanghai")) # 开始实行定时任务 scheduler.start()

查看数据库表内容

redis :内存数据库。
常日用作数据缓存来利用,当然通过一些主从复制等办法也能实现当中数据的持久化或保存。

# main.py# 导入调度器,此处利用BlockingScheduler壅塞调度器from apscheduler.schedulers.blocking import BlockingScheduler# 导入触发器,此处利用IntervalTrigger特定时间间隔触发from apscheduler.triggers.interval import IntervalTrigger# 导入任务存储器,此处利用RedisJobStorefrom apscheduler.jobstores.redis import RedisJobStore# 导入日志记录器from loguru import logger# 定时任务实行函数def my_task(): logger.info("开始实行任务")if __name__ == '__main__': # 实例化调度器工具 scheduler = BlockingScheduler() # 指定利用redis存储任务 REDIS = { 'host': '127.0.0.1', 'port': '6379', 'db': 0, 'password': '123.com' } scheduler.add_jobstore(jobstore=RedisJobStore(REDIS)) # 指界说务每10分钟实行一次 scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=10, timezone="Asia/Shanghai")) # 开始实行定时任务 scheduler.start()

查询redis数据库内容

常见job属性coalesce=True

比如由于某个缘故原由导致某个任务积攒了很多次没有实行(比如有一个任务是1分钟跑一次,但是系统缘故原由断了5分钟),如果 coalesce=True,那么下次规复运行的时候,会只实行一次,而如果设置 coalesce=False,那么就不会合并,会5次全部实行

max_instances=3

比如一个10分钟的job,指定每分钟运行1次,如果max_instance=3,那么在第3~10分钟上,新的运行实例不会被实行,由于已经有3个实例在运行。

name="my_job"

job的名称

misfire_grace_time=3

这个参数的条件是利用可持续化的jobstore,如果利用默认内存的jobstore,这个参数是没故意义的。
一样平常须要利用misfire_grace_time的场景,便是但是那个持久化jobstore的做事挂掉了,任务须要被调度的时候没有被调度成功,后期持久化的jobstore启动了,这个任务重新被调度了(从jobstore中获取job),misfire_grace_time决定这个任务在错过实行韶光之后还需不须要实行

定时任务完全流程演示选择得当的scheduler

在利用之前我们须要先实例化一个 scheduler 工具,所有的 scheduler 工具都被放在了 apscheduler.schedulers 模块下,根据需求选择BlockingScheduler或者BackgroundScheduler调度器引入即可,此处以最根本的壅塞调度器BlockingScheduler为例:

from apscheduler.schedulers.blocking import BlockingSchedulerscheduler = BlockingScheduler()配置schedulers

对付 scheduler 属性的配置,支持以下的办法灵巧配置:

在创建实例化工具时配置参数在创建实例化之后再配置参数

对付scheduler参数的配置,支持以下的办法灵巧配置:

利用配置字典参数配置将关键字参数通报配置

假设现在有这样一个需求:

组件

模块

需求

调度器(schedulers)

壅塞调度器(BlockingScheduler)

为新任务关闭合并模式同一个任务同一韶光最多只能有3个实例在运行

触发器(triggers)

cron表达式触发器(CronTrigger)

利用cron表达式,每分钟实行一次

实行器(executors)

线程池实行器(ThreadPoolExecutor)

最大10个线程

任务存储器(job stores)

关系型的数据库(sqlalchemy )

将结果保存到MySQL数据库

接下来分别利用属性+参数的四种组合演示如何配置schedulers

在创建实例化工具时配置参数+将关键字参数通报配置

# main.py# 导入线程池实行器from apscheduler.executors.pool import ThreadPoolExecutor# 导入sqlalchemy,利用MySQL数据库存储from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore# 导入壅塞调度器from apscheduler.schedulers.blocking import BlockingScheduler# 导入cron表达式触发器from apscheduler.triggers.cron import CronTrigger# 导入日志模块from loguru import logger# 导入天生随机数模块import random# 创建定时任务实行函数def my_task(number): logger.info("开始实行任务,传入的随机数为%s" % number)# 作业存储器配置 利用MySQL数据库存储job_stores = { 'default': { 'type': 'sqlalchemy', 'url': 'mysql://root:123.com@127.0.0.1:3306/job?charset=utf8' }}# 实行器配置 利用线程池实行器,最大10个线程executors = { 'default': ThreadPoolExecutor(10),}# Job干系配置,更多选项拜会官方文档job_defaults = { 'coalesce': False, # 设置这个目的是,比如由于某个缘故原由导致某个任务积攒了很多次没有实行(比如有一个任务是1分钟跑一次,但是系统缘故原由断了5分钟), # 如果 coalesce=True,那么下次规复运行的时候,会只实行一次,而如果设置 coalesce=False,那么就不会合并,会5次全部实行 'max_instances': 3 # 同一个任务同一韶光最多只能有3个实例在运行。
# 比如一个10分钟的job,指定每分钟运行1次,如果max_instance=3,那么在第3~10分钟上,新的运行实例不会被实行,由于已经有3个实例在运行。
}# 实例化调度器scheduler = BlockingScheduler( jobstores=job_stores, executors=executors, job_defaults=job_defaults, timezone='Asia/Shanghai' # 指定时区)if __name__ == '__main__': number = random.randint(0, 9) scheduler.add_job(my_task, trigger=CronTrigger.from_crontab(" "), args=[number]) try: scheduler.start() except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")
在创建实例化工具时配置参数+利用配置字典参数配置

# main.py# 导入线程池实行器from apscheduler.executors.pool import ThreadPoolExecutor# 导入sqlalchemy,利用MySQL数据库存储from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore# 导入壅塞调度器from apscheduler.schedulers.blocking import BlockingScheduler# 导入cron表达式触发器from apscheduler.triggers.cron import CronTrigger# 导入日志模块from loguru import logger# 导入天生随机数模块import random# 创建定时任务实行函数def my_task(number): logger.info("开始实行任务,传入的随机数为%s" % number)config = { # 作业存储器配置 利用MySQL数据库存储 'apscheduler.jobstores.default': { 'type': 'sqlalchemy', 'url': 'mysql://root:123.com@127.0.0.1:3306/job?charset=utf8' }, # 实行器配置 利用线程池实行器,最大10个线程 'apscheduler.executors.default': { 'class': 'apscheduler.executors.pool:ThreadPoolExecutor', 'max_workers': '10' }, # Job配置,为新任务关闭合并模式 'apscheduler.job_defaults.coalesce': 'false', # Job配置,同一个任务同一韶光最多只能有3个实例在运行 'apscheduler.job_defaults.max_instances': '3', # Job配置,指定时区 'apscheduler.timezone': 'Asia/Shanghai',}# 实例化调度器scheduler = BlockingScheduler(config)if __name__ == '__main__': # 天生随机数传入定时任务函数 number = random.randint(0, 9) # 注册定时任务job,实行频率为每分钟实行一次 scheduler.add_job(my_task, trigger=CronTrigger.from_crontab(" "), args=[number]) try: # 开始实行定时任务调度器 scheduler.start() except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")在创建实例化之后再配置参数+将关键字参数通报配置

# main.py# 导入线程池实行器from apscheduler.executors.pool import ThreadPoolExecutor# 导入sqlalchemy,利用MySQL数据库存储from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore# 导入壅塞调度器from apscheduler.schedulers.blocking import BlockingScheduler# 导入cron表达式触发器from apscheduler.triggers.cron import CronTrigger# 导入日志模块from loguru import logger# 导入天生随机数模块import random# 创建定时任务实行函数def my_task(number): logger.info("开始实行任务,传入的随机数为%s" % number)# 实例化调度器scheduler = BlockingScheduler()# 作业存储器配置 利用MySQL数据库存储url = 'mysql://root:123.com@127.0.0.1:3306/job?charset=utf8'# executors = { 'default': ThreadPoolExecutor(10),}# Job干系配置,更多选项拜会官方文档job_defaults = { 'coalesce': False, # 设置这个目的是,比如由于某个缘故原由导致某个任务积攒了很多次没有实行(比如有一个任务是1分钟跑一次,但是系统缘故原由断了5分钟), # 如果 coalesce=True,那么下次规复运行的时候,会只实行一次,而如果设置 coalesce=False,那么就不会合并,会5次全部实行 'max_instances': 3 # 同一个任务同一韶光最多只能有3个实例在运行。
# 比如一个10分钟的job,指定每分钟运行1次,如果max_instance=3,那么在第3~10分钟上,新的运行实例不会被实行,由于已经有3个实例在运行。
}# 调度器工具配置参数scheduler.configure( job_defaults=job_defaults, timezone='Asia/Shanghai')# 添加任务存储器参数scheduler.add_jobstore(jobstore=SQLAlchemyJobStore(url=url))# 添加实行器参数,利用线程池实行器,最大10个线程scheduler.add_executor(executor=ThreadPoolExecutor(max_workers=10))if __name__ == '__main__': # 天生随机数传入定时任务函数 number = random.randint(0, 9) # 注册定时任务job,实行频率为每分钟实行一次 scheduler.add_job(my_task, trigger=CronTrigger.from_crontab(" "), args=[number]) try: # 开始实行定时任务调度器 scheduler.start() except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")
在创建实例化之后再配置参数+利用配置字典参数配置

# main.py# 导入线程池实行器from apscheduler.executors.pool import ThreadPoolExecutor# 导入sqlalchemy,利用MySQL数据库存储from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore# 导入壅塞调度器from apscheduler.schedulers.blocking import BlockingScheduler# 导入cron表达式触发器from apscheduler.triggers.cron import CronTrigger# 导入日志模块from loguru import logger# 导入天生随机数模块import random# 创建定时任务实行函数def my_task(number): logger.info("开始实行任务,传入的随机数为%s" % number)# 实例化调度器scheduler = BlockingScheduler()config = { # 作业存储器配置 利用MySQL数据库存储 'apscheduler.jobstores.default': { 'type': 'sqlalchemy', 'url': 'mysql://root:123.com@127.0.0.1:3306/job?charset=utf8' }, # 实行器配置 利用线程池实行器,最大10个线程 'apscheduler.executors.default': { 'class': 'apscheduler.executors.pool:ThreadPoolExecutor', 'max_workers': '10' }, # Job配置,为新任务关闭合并模式 'apscheduler.job_defaults.coalesce': 'false', # Job配置,同一个任务同一韶光最多只能有3个实例在运行 'apscheduler.job_defaults.max_instances': '3', # Job配置,指定时区 'apscheduler.timezone': 'Asia/Shanghai',}# 调度器工具配置参数scheduler.configure(config)if __name__ == '__main__': # 天生随机数传入定时任务函数 number = random.randint(0, 9) # 注册定时任务job,实行频率为每分钟实行一次 scheduler.add_job(my_task, trigger=CronTrigger.from_crontab(" "), args=[number]) try: # 开始实行定时任务调度器 scheduler.start() except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")添加调度任务

创建 scheduler 工具之后,我们须要调用add_job()方法或是装饰器scheduled_job方法来将我们须要实行的函数进行注册。

以上面的demo代码为例,利用add_job()方法,指定实行任务函数,触发器既可添加调度任务。

运行调度任务

调用 start() 方法之后调度器就会开始实行,并在掌握台上看到对应的结果

2022-08-22 07:53:00.019 | INFO | __main__:my_task:11 - 开始实行任务,天生随机数为7查看数据库存储记录

任务常用操作添加任务

add_job()因此传参的形式指定对应的函数名这种方法是最常用的,推举利用此方法。
此方法会返回一个apscheduler.job.Job实例,这样就可以在运行时,修正或删除任务。

scheduled_job() 因此装饰器的形式直接对我们要实行的函数进行润色,这种方法最方便,但缺陷便是运行时,不能修公理务。

通过add_job()添加任务在上面的配置schedulers时已经演示,此处不再赘述。
通过装饰器scheduled_job添加任务

# main.py# 导入调度器,此处利用BlockingScheduler壅塞调度器from apscheduler.schedulers.blocking import BlockingScheduler# 导入触发器,此处利用IntervalTrigger特定时间间隔触发from apscheduler.triggers.interval import IntervalTrigger# 导入日志记录器from loguru import logger# 实例化调度器工具scheduler = BlockingScheduler()# 定时任务实行函数@scheduler.scheduled_job(trigger="interval", args=(1,), seconds=3)def my_task(number): logger.info("开始实行任务,传入的参数是%s" % number)if __name__ == '__main__': try: # 开始实行定时任务调度器 scheduler.start() except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")

主要提醒:

如果添加 job 时,scheduler 尚未运行,job 会被临时地进行排列,直到 scheduler 启动之后,它的首次运行韶光才会被确切地皮算出来。
如果在程序初始化时,是从数据库读取任务的,那么必须为每个任务定义一个明确的ID,并且使replace_existing=True,否则每次重动身序,你都会得到一份新的任务拷贝,也就意味着任务的状态不会保存。
内置任务储存器中,只有MemoryJobStore不会序列化任务;内置实行器中,只有ProcessPoolExecutor会序列化任务。
如果想要急速运行任务,可以在添加任务时省略trigger参数获取任务列表

可以利用get_jobs方法来得到机器上可处理的作业调度列表。
方法会返回一个Job实例的列表,如果你仅仅对特定的 job store 中的 job 感兴趣,可以将 job store 的别名作为第二个参数。

也可以利用print_jobs()来格式化输出作业列表以及它们的触发器和下一次的运行韶光。

# main.py# 导入调度器,此处利用BackgroundScheduler壅塞调度器from apscheduler.schedulers.background import BackgroundScheduler# 导入触发器,此处利用IntervalTrigger特定时间间隔触发from apscheduler.triggers.interval import IntervalTrigger# 导入日志记录器from loguru import loggerimport time# 定时任务实行函数def my_task(): logger.info("实行任务")if __name__ == '__main__': try: # 实例化调度器工具 scheduler = BackgroundScheduler() # 添加定时任务,指界说务函数和触发器 my_job = scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=1)) # 开始实行定时任务调度器 scheduler.start() scheduler.print_jobs() print(scheduler.get_jobs()) except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")# 运行结果Jobstore default: my_task (trigger: interval[0:00:01], next run at: 2022-08-28 21:36:24 CST)[<Job (id=2e1b64f0422e4f2a9163ad2b7e634bd5 name=my_task)>]删除任务

当从 scheduler 中移除一个 job 时,它会从关联的 job store 中被移除,不再被实行。
如果想从调度器移除一个任务,那么你就要从相应的任务储存器中移除它,这样才算移除了。
有两种办法:

调用remove_job(),参数为:任务ID,任务储存器名称。
通过润色器添加的任务,利用此方法删除任务。

# 导入调度器,此处利用BlockingScheduler壅塞调度器from apscheduler.schedulers.background import BackgroundScheduler# 导入触发器,此处利用IntervalTrigger特定时间间隔触发from apscheduler.triggers.interval import IntervalTrigger# 导入日志记录器from loguru import loggerimport time# 实例化调度器工具scheduler = BackgroundScheduler()# 定时任务实行函数@scheduler.scheduled_job(trigger="interval", seconds=1, id="my_task")def my_task(): logger.info("开始实行任务")if __name__ == '__main__': try: # 开始实行定时任务调度器 logger.error("开始定时任务") scheduler.start() time.sleep(3) logger.error("删除定时任务") scheduler.remove_job(job_id="my_task") except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")在通过add_job()创建的任务实例上调用remove()方法

# main.py# 导入调度器,此处利用BackgroundScheduler壅塞调度器from apscheduler.schedulers.background import BackgroundScheduler# 导入触发器,此处利用IntervalTrigger特定时间间隔触发from apscheduler.triggers.interval import IntervalTrigger# 导入日志记录器from loguru import loggerimport time# 定时任务实行函数def my_task(): logger.info("开始实行任务")if __name__ == '__main__': try: # 实例化调度器工具 scheduler = BackgroundScheduler() # 添加定时任务,指界说务函数和触发器 my_job = scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=1)) # 开始实行定时任务调度器 logger.error("开始定时任务") scheduler.start() time.sleep(3) logger.error("删除定时任务") my_job.remove() except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")实行结果(任务开始实行后,过了3秒钟删除定时任务,期间定时任务每秒实行一次,共实行3次)

2022-08-28 21:25:26.458 | ERROR | __main__:<module>:24 - 开始定时任务2022-08-28 21:25:27.458 | INFO | __main__:my_task:17 - 开始实行任务2022-08-28 21:25:28.461 | INFO | __main__:my_task:17 - 开始实行任务2022-08-28 21:25:29.461 | INFO | __main__:my_task:17 - 开始实行任务2022-08-28 21:25:29.463 | ERROR | __main__:<module>:27 - 删除定时任务

把稳点:如果利用BlockingScheduler调度器的话,在其start之后的任何操作都不会去实行。
因此想要修正删除任务,必须利用BackgroundScheduler。

停息/规复任务

通过Job实例或者 scheduler 本身你可以轻易地停息和规复 job 。
当一个 job 被停息,它的下一次运行韶光将会被清空,同时不再打算之后的运行韶光,直到这个 job 被规复。

对付利用add_job添加的任务,可以利用pause()方法停息任务,利用resume()方法规复任务

# main.py# 导入调度器,此处利用BackgroundScheduler壅塞调度器from apscheduler.schedulers.background import BackgroundScheduler# 导入触发器,此处利用IntervalTrigger特定时间间隔触发from apscheduler.triggers.interval import IntervalTrigger# 导入日志记录器from loguru import loggerimport time# 定时任务实行函数def my_task(): logger.info("实行任务")if __name__ == '__main__': try: # 实例化调度器工具 scheduler = BackgroundScheduler() # 添加定时任务,指界说务函数和触发器 my_job = scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=1)) # 开始实行定时任务调度器 logger.error("开始定时任务") scheduler.start() time.sleep(3) logger.error("停息定时任务") my_job.pause() time.sleep(3) logger.error("规复定时任务") my_job.resume() time.sleep(3) except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")通过润色器添加的任务,可以利用pause_job()方法停息任务,利用resume_job()方法规复任务

# 导入调度器,此处利用BlockingScheduler壅塞调度器from apscheduler.schedulers.background import BackgroundScheduler# 导入触发器,此处利用IntervalTrigger特定时间间隔触发from apscheduler.triggers.interval import IntervalTrigger# 导入日志记录器from loguru import loggerimport time# 实例化调度器工具scheduler = BackgroundScheduler()# 定时任务实行函数@scheduler.scheduled_job(trigger="interval", seconds=1, id="my_task")def my_task(): logger.info("实行任务")if __name__ == '__main__': try: logger.error("开始定时任务") # 开始实行定时任务调度器 scheduler.start() time.sleep(3) logger.error("停息定时任务") scheduler.pause_job(job_id="my_task") time.sleep(3) logger.error("规复定时任务") scheduler.resume_job(job_id="my_task") time.sleep(3) except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")实行结果(开始实行定时任务后,每秒实行一次,实行了3次,然后停息定时任务,过了3秒后规复定时任务,实行3秒后进程结束)

2022-08-28 21:27:59.955 | ERROR | __main__:<module>:24 - 开始定时任务2022-08-28 21:28:00.959 | INFO | __main__:my_task:17 - 实行任务2022-08-28 21:28:01.959 | INFO | __main__:my_task:17 - 实行任务2022-08-28 21:28:02.958 | INFO | __main__:my_task:17 - 实行任务2022-08-28 21:28:02.960 | ERROR | __main__:<module>:27 - 停息定时任务2022-08-28 21:28:05.965 | ERROR | __main__:<module>:30 - 规复定时任务2022-08-28 21:28:06.959 | INFO | __main__:my_task:17 - 实行任务2022-08-28 21:28:07.957 | INFO | __main__:my_task:17 - 实行任务2022-08-28 21:28:08.959 | INFO | __main__:my_task:17 - 实行任务修公理务属性

apscheduler支持修正job 的属性,例如max_instances,coalesce等属性信息。

对付利用add_job添加的任务,可以利用modify()方法修公理务属性

# main.py# 导入调度器,此处利用BackgroundScheduler壅塞调度器from apscheduler.schedulers.background import BackgroundScheduler# 导入触发器,此处利用IntervalTrigger特定时间间隔触发from apscheduler.triggers.interval import IntervalTrigger# 导入日志记录器from loguru import loggerimport time# 定时任务实行函数def my_task(): logger.info("实行task任务")if __name__ == '__main__': try: # 实例化调度器工具 scheduler = BackgroundScheduler() # 添加定时任务,指界说务函数和触发器 my_job = scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=1)) logger.error("开始定时任务") # 开始实行定时任务调度器 scheduler.start() time.sleep(3) logger.error("修正定时任务属性") my_job.modify(max_instances=3, name='new task') time.sleep(3) except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")通过润色器添加的任务,可以利用modify_job()方法修公理务属性

# 导入调度器,此处利用BlockingScheduler壅塞调度器from apscheduler.schedulers.background import BackgroundScheduler# 导入触发器,此处利用IntervalTrigger特定时间间隔触发from apscheduler.triggers.interval import IntervalTrigger# 导入日志记录器from loguru import loggerimport time# 实例化调度器工具scheduler = BackgroundScheduler()# 定时任务实行函数@scheduler.scheduled_job(trigger="interval", seconds=1, id="my_task")def my_task(): logger.info("实行任务")if __name__ == '__main__': try: logger.error("开始定时任务") # 开始实行定时任务调度器 scheduler.start() time.sleep(3) logger.error("修正定时任务属性") scheduler.modify_job(job_id="my_task") time.sleep(3) except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")实行结果

2022-08-28 21:48:07.075 | ERROR | __main__:<module>:22 - 开始定时任务2022-08-28 21:48:08.078 | INFO | __main__:my_task:13 - 实行task任务2022-08-28 21:48:09.078 | INFO | __main__:my_task:13 - 实行task任务2022-08-28 21:48:10.076 | INFO | __main__:my_task:13 - 实行task任务2022-08-28 21:48:10.080 | ERROR | __main__:<module>:26 - 修正定时任务属性2022-08-28 21:48:11.078 | INFO | __main__:my_task:13 - 实行task任务2022-08-28 21:48:12.078 | INFO | __main__:my_task:13 - 实行task任务2022-08-28 21:48:13.077 | INFO | __main__:my_task:13 - 实行task任务修公理务触发器

如果你想重新调度一个 job (这意味着要修正其 trigger),你可以利用apscheduler.job.Job.reschedule()或reschedule_job()方法。
这些方法都会为 job 构建新的 trigger ,然后根据新的 trigger 重新打算其下一次的运行韶光:

对付利用add_job添加的任务,可以利用reschedule()方法修公理务触发器

# main.py# 导入调度器,此处利用BackgroundScheduler壅塞调度器from apscheduler.schedulers.background import BackgroundScheduler# 导入触发器,此处利用IntervalTrigger特定时间间隔触发from apscheduler.triggers.interval import IntervalTrigger# 导入日志记录器from loguru import loggerimport time# 定时任务实行函数def my_task(): logger.info("实行task任务")if __name__ == '__main__': try: # 实例化调度器工具 scheduler = BackgroundScheduler() # 添加定时任务,指界说务函数和触发器 my_job = scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=1)) logger.error("开始定时任务") # 开始实行定时任务调度器 scheduler.start() time.sleep(3) logger.error("修正定时任务触发器") my_job.reschedule(trigger=IntervalTrigger(seconds=2)) time.sleep(3) except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")通过润色器添加的任务,可以利用reschedule_job()方法修公理务触发器

# 导入调度器,此处利用BlockingScheduler壅塞调度器from apscheduler.schedulers.background import BackgroundScheduler# 导入触发器,此处利用IntervalTrigger特定时间间隔触发from apscheduler.triggers.interval import IntervalTrigger# 导入日志记录器from loguru import loggerimport time# 实例化调度器工具scheduler = BackgroundScheduler()# 定时任务实行函数@scheduler.scheduled_job(trigger="interval", seconds=1, id="my_task")def my_task(): logger.info("实行任务")if __name__ == '__main__': try: logger.error("开始定时任务") # 开始实行定时任务调度器 scheduler.start() time.sleep(3) logger.error("修正定时任务属性") scheduler.reschedule_job(job_id="my_task") time.sleep(3) except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")实行结果(开始定时任务后,3秒钟韶光,每1秒实行一次任务,共实行3次。
然后修正触发器为每2秒实行一次,在3秒韶光内实行了1次任务)

2022-08-28 22:14:03.958 | ERROR | __main__:<module>:22 - 开始定时任务2022-08-28 22:14:04.960 | INFO | __main__:my_task:13 - 实行task任务2022-08-28 22:14:05.960 | INFO | __main__:my_task:13 - 实行task任务2022-08-28 22:14:06.959 | INFO | __main__:my_task:13 - 实行task任务2022-08-28 22:14:06.959 | ERROR | __main__:<module>:26 - 修正定时任务触发器2022-08-28 22:14:08.963 | INFO | __main__:my_task:13 - 实行task任务调度器常用操作终止调度器

默认情形,会终止任务存储器以及实行器,然后等待所有目前实行的job完成后(自动终止)

如果利用wait=False,不会等待任何运行中的任务完成,直接终止

利用默认情形,等待任务完成后终止

# main.py# 导入调度器,此处利用BackgroundScheduler壅塞调度器from apscheduler.schedulers.background import BackgroundScheduler# 导入触发器,此处利用IntervalTrigger特定时间间隔触发from apscheduler.triggers.interval import IntervalTrigger# 导入日志记录器from loguru import loggerimport time# 定时任务实行函数def my_task(): logger.info("开始实行task任务") time.sleep(2) logger.info("task任务实行完成")if __name__ == '__main__': try: # 实例化调度器工具 scheduler = BackgroundScheduler() # 添加定时任务,指界说务函数和触发器 my_job = scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=3)) logger.error("开始定时任务") # 开始实行定时任务调度器 scheduler.start() time.sleep(6) logger.error("终止调度器") scheduler.shutdown() logger.error(scheduler.get_jobs()) except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")实行结果

2022-08-28 22:41:46.859 | ERROR | __main__:<module>:24 - 开始定时任务2022-08-28 22:41:49.864 | INFO | __main__:my_task:13 - 开始实行task任务2022-08-28 22:41:51.865 | INFO | __main__:my_task:15 - task任务实行完成2022-08-28 22:41:52.860 | INFO | __main__:my_task:13 - 开始实行task任务2022-08-28 22:41:52.861 | ERROR | __main__:<module>:28 - 终止调度器2022-08-28 22:41:54.863 | INFO | __main__:my_task:15 - task任务实行完成2022-08-28 22:41:54.864 | ERROR | __main__:<module>:30 - []利用wait=False参数直接终止

# main.py# 导入调度器,此处利用BackgroundScheduler壅塞调度器from apscheduler.schedulers.background import BackgroundScheduler# 导入触发器,此处利用IntervalTrigger特定时间间隔触发from apscheduler.triggers.interval import IntervalTrigger# 导入日志记录器from loguru import loggerimport time# 定时任务实行函数def my_task(): logger.info("开始实行task任务") time.sleep(2) logger.info("task任务实行完成")if __name__ == '__main__': try: # 实例化调度器工具 scheduler = BackgroundScheduler() # 添加定时任务,指界说务函数和触发器 my_job = scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=3)) logger.error("开始定时任务") # 开始实行定时任务调度器 scheduler.start() time.sleep(6) logger.error("终止调度器") scheduler.shutdown(wait=False) logger.error(scheduler.get_jobs()) except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")实行结果

2022-08-28 22:43:40.258 | ERROR | __main__:<module>:24 - 开始定时任务2022-08-28 22:43:43.259 | INFO | __main__:my_task:13 - 开始实行task任务2022-08-28 22:43:45.261 | INFO | __main__:my_task:15 - task任务实行完成2022-08-28 22:43:46.260 | INFO | __main__:my_task:13 - 开始实行task任务2022-08-28 22:43:46.260 | ERROR | __main__:<module>:28 - 终止调度器2022-08-28 22:43:46.260 | ERROR | __main__:<module>:30 - []2022-08-28 22:43:48.263 | INFO | __main__:my_task:15 - task任务实行完成调度器事宜监听

可以为 scheduler 绑定事宜监听器(event listen)。
Scheduler 事宜在某些情形下会被触发,而且它可能携带有关特定事宜的细节信息。
为add_listener()函数供应适当的掩码参数(mask argument)或者是将不同的常数组合到一起,可以监听特定类型的事宜。
可调用的listener可以通过event object作为参数而被调用。

事宜

对应列举值

描述

归属类

EVENT_SCHEDULER_STARTED

1

调度程序启动

SchedulerEvent

EVENT_SCHEDULER_SHUTDOWN

2

调度程序关闭

SchedulerEvent

EVENT_SCHEDULER_PAUSED

4

调度程序中任务处理停息

SchedulerEvent

EVENT_SCHEDULER_RESUMED

8

调度程序中任务处理规复

SchedulerEvent

EVENT_EXECUTOR_ADDED

16

将实行器添加到调度程序中

SchedulerEvent

EVENT_EXECUTOR_REMOVED

32

实行器从调度程序中删除

SchedulerEvent

EVENT_JOBSTORE_ADDED

64

将任务存储添加到调度程序中

SchedulerEvent

EVENT_JOBSTORE_REMOVED

128

任务存储从调度程序中删除

SchedulerEvent

EVENT_ALL_JOBS_REMOVED

256

所有任务从所有任务存储中删除或从一个特定的任务存储中删除

SchedulerEvent

EVENT_JOB_ADDED

512

任务添加到任务存储中

JobEvent

EVENT_JOB_REMOVED

1024

从任务存储中删除了任务

JobEvent

EVENT_JOB_MODIFIED

2048

从调度程序外部修正了任务

JobEvent

EVENT_JOB_EXECUTED

4096

任务被成功实行

JobExecutionEvent

EVENT_JOB_ERROR

8192

任务在实行期间引发非常

JobExecutionEvent

EVENT_JOB_MISSED

16384

错过了任务实行

JobExecutionEvent

EVENT_JOB_SUBMITTED

32768

任务已经提交到实行器中实行

JobSubmissionEvent

EVENT_JOB_MAX_INSTANCES

65536

任务由于达到最大并发实行时,触发的事宜

JobSubmissionEvent

EVENT_ALL

包含以上的所有事宜

示例代码

# main.py# 导入调度器,此处利用BackgroundScheduler壅塞调度器from apscheduler.schedulers.background import BackgroundScheduler# 导入触发器,此处利用IntervalTrigger特定时间间隔触发from apscheduler.triggers.interval import IntervalTrigger# 导入事宜类from apscheduler.events import EVENT_ALL# 导入日志记录器from loguru import loggerimport time# 定时任务实行函数def my_task(): logger.info("实行task任务")# 事宜监听函数def my_listener(event): match event.code: case 4096: logger.info("任务被成功实行") case 32768: logger.info("任务已经提交到实行器中实行") case _: logger.info(event.code)if __name__ == '__main__': try: # 实例化调度器工具 scheduler = BackgroundScheduler() # 添加定时任务,指界说务函数和触发器 my_job = scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=2)) logger.error("开始定时任务") # 开始实行定时任务调度器 scheduler.start() scheduler.add_listener(my_listener, mask=EVENT_ALL) time.sleep(4) except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")实行实行结果

2022-08-28 22:59:58.734 | ERROR | __main__:<module>:34 - 开始定时任务2022-08-28 23:00:00.739 | INFO | __main__:my_task:14 - 实行task任务2022-08-28 23:00:00.739 | INFO | __main__:my_listener:21 - 任务被成功实行2022-08-28 23:00:00.740 | INFO | __main__:my_listener:23 - 任务已经提交到实行器中实行2022-08-28 23:00:02.739 | INFO | __main__:my_listener:23 - 任务已经提交到实行器中实行2022-08-28 23:00:02.740 | INFO | __main__:my_task:14 - 实行task任务2022-08-28 23:00:02.740 | INFO | __main__:my_listener:21 - 任务被成功实行故障排查

如果 scheduler 没有如预期般正常运行,可以考试测验将apscheduler的 logger 的日志级别提升到DEBUG等级。

示例代码

# main.py# 导入调度器,此处利用BackgroundScheduler壅塞调度器from apscheduler.schedulers.background import BackgroundScheduler# 导入触发器,此处利用IntervalTrigger特定时间间隔触发from apscheduler.triggers.interval import IntervalTrigger# 导入事宜类from apscheduler.events import EVENT_ALL# 导入日志记录器from loguru import loggerimport timeimport logginglogging.basicConfig()logging.getLogger('apscheduler').setLevel(logging.DEBUG)# 定时任务实行函数def my_task(): logger.info("实行task任务")if __name__ == '__main__': try: # 实例化调度器工具 scheduler = BackgroundScheduler() # 添加定时任务,指界说务函数和触发器 my_job = scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=2)) logger.error("开始定时任务") # 开始实行定时任务调度器 scheduler.start() time.sleep(3) except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")实行结果

INFO:apscheduler.scheduler:Adding job tentatively -- it will be properly scheduled when the scheduler starts2022-08-28 23:11:43.561 | ERROR | __main__:<module>:28 - 开始定时任务INFO:apscheduler.scheduler:Added job "my_task" to job store "default"INFO:apscheduler.scheduler:Scheduler startedDEBUG:apscheduler.scheduler:Looking for jobs to runDEBUG:apscheduler.scheduler:Next wakeup is due at 2022-08-28 23:11:45.561102+08:00 (in 1.998446 seconds)DEBUG:apscheduler.scheduler:Looking for jobs to runINFO:apscheduler.executors.default:Running job "my_task (trigger: interval[0:00:02], next run at: 2022-08-28 23:11:45 CST)" (scheduled at 2022-08-28 23:11:45.561102+08:00)2022-08-28 23:11:45.564 | INFO | __main__:my_task:19 - 实行task任务INFO:apscheduler.executors.default:Job "my_task (trigger: interval[0:00:02], next run at: 2022-08-28 23:11:45 CST)" executed successfullyDEBUG:apscheduler.scheduler:Next wakeup is due at 2022-08-28 23:11:47.561102+08:00 (in 1.996868 seconds)APScheduler方法总结掌握调度程序

方法

描述

非常

configure

对给定的调度程序重新设置配置

SchedulerAlreadyRunningError

–如果调度程序已经在运行

start

启动已配置的实行程序和任务存储,并开始处理操持的任务。

SchedulerAlreadyRunningError

–如果调度程序已经在运行 RuntimeError

–如果在禁用线程的uWSGI下运行

shutdown

关闭调度程序及其实行程序和任务存储。

SchedulerNotRunningError

–如果尚未启动调度程序

pause

停息调度程序中的任务处理。

resume

在调度程序中规复任务处理。

wakeup

关照调度程序可能有任务须要实行。

掌握实行器

方法

描述

非常

add_executor

将实行程序添加到此调度程序。

ValueError

–如果已有给定别名的实行程序

remove_executor

从此调度程序中删除具有给定别名的实行程序。

掌握任务存储

方法

描述

非常

add_jobstore

将任务存储添加到此调度程序。

ValueError

–如果已经存在给定别名的任务存储

remove_jobstore

从此调度程序中通过给定别名删除任务存储。

掌握事宜侦听器

方法

描述

add_listener

添加调度程序事宜的侦听器。

remove_listener

删除以前添加的事宜侦听器。

掌握任务

Job 的配置参数和前面 Job 里先容的差不多, BaseScheduler 只是将 Job 的接口重新封装了但是也实现了很多任务是如何添加到任务存储,任务是如何分配给实行器等等实现,以是在 Job 那一部分提到,官方是不肯望由用户自己实例化 Job

方法

描述

非常

add_job

将给定的任务添加到任务列表中,如果调度程序已经在运行,则将其唤醒。

scheduled_job

利用 scheduled_job

装饰器来动态装饰 Job

的实际函数

modify_job

修正单个任务的属性。

reschedule_job

为任务布局一个新触发器,并更新其下一个运行韶光。

pause_job

使给界说务在明确规复之前不实行。

resume_job

规复给界说务的操持,如果操持已完成,则将其删除。

get_jobs

从特定的任务存储或所有任务中返回挂起的任务

get_job

返回与给定 job_id

匹配的 Job

remove_job

删除任务,使其无法再运行。

JobLookupError

– 如果没有找到任务

remove_all_jobs

从指定的任务存储中删除所有任务,或者如果没有给出任何任务,则删除所有任务存储。

print_jobs

打印出当前操持在所有任务存储库或仅特界说务存储库上的所有任务的文本列表。

django-apscheduler

如果你正在开拓的Web项目须要实现定时任务的功能,得益于 APScheduler支持多样的调度器,我们可以很随意马虎的将APScheduler和我们的DRF项目结合到一起。

功能简介

在这里强烈推举利用django-apscheduler库,比拟APScheduler,他添加了以下功能:

任务查看功能:将任务数据可以持久化保存的Django数据库,可以直接通过admin页面查看当前的任务列表和任务均匀实行韶光、下次任务实行韶光等信息。

实行历史记录功能:查看当前所有定时任务的实行历史和状态码等信息。

任务管理功能:可以选择删除定时任务和通过管理页面手动触发任务

安装模块

pip install django-apscheduler

配置settings配置

# 注册appINSTALLED_APPS = ( # ... "django_apscheduler",)# apscheduler全局配置APSCHEDULER_DATETIME_FORMAT = "N j, Y, f:s a" # Django admin中显示带秒的韶光APSCHEDULER_RUN_NOW_TIMEOUT = 25 # admin手动触发的作业最大运行韶光迁移数据库

python manage.py migrate

查看数据库表构造

django_apscheduler_djangojob:用于存放任务列表

django_apscheduler_djangojobexecution:用于存放任务实行历史

利用

由于在生产环境常日会利用uwsgi启动多个进程运行做事,会导致每个事情进程都有自己独立的定时任务,终极会使得定时任务多次重复实行,因此,在Django中利用apscheduler时,推举利用自定义命令,在一个单独的专用进程中实行单个定时任务。

添加自定义命令到项目中

创建一个名为public的APP并注册,然后在public目录里面创建commands的python文件夹,末了在commands文件夹下创建crontab.py文件,文件目录构造如下所示:

我们利用BlockingScheduler后台调度器,并利用Django ORM作为任务存储器,并添加了一个my_job的定时任务和一个清理过期记录的定时任务,自定义命令crontab内容如下:

from apscheduler.schedulers.blocking import BlockingSchedulerfrom apscheduler.triggers.interval import IntervalTriggerfrom django.core.management.base import BaseCommandfrom django_apscheduler import utilfrom apscheduler.triggers.cron import CronTriggerfrom django_apscheduler.jobstores import DjangoJobStorefrom django_apscheduler.models import DjangoJobExecutionfrom django.conf import settingsfrom loguru import loggerdef my_job(): logger.info('定时任务开始实行') logger.info('定时任务实行完毕')@util.close_old_connectionsdef delete_old_job_executions(max_age=604_800): DjangoJobExecution.objects.delete_old_job_executions(max_age)class Command(BaseCommand): help = "Runs APScheduler." def handle(self, args, options): scheduler = BlockingScheduler(timezone=settings.TIME_ZONE) scheduler.add_jobstore(DjangoJobStore(), "default") scheduler.add_job( my_job, trigger=IntervalTrigger(seconds=1, timezone=settings.TIME_ZONE), # Every 10 seconds id="my_job", # The `id` assigned to each job MUST be unique max_instances=5, replace_existing=True, misfire_grace_time=60 ) logger.info("添加my_job任务成功") scheduler.add_job( delete_old_job_executions, trigger=CronTrigger( day_of_week="mon", hour="00", minute="00" ), # Midnight on Monday, before start of the next work week. id="delete_old_job_executions", max_instances=1, replace_existing=True ) logger.info("添加delete_old_job_executions任务成功") try: logger.info("scheduler开始实行...") scheduler.start() except KeyboardInterrupt: logger.info("scheduler停滞实行...") scheduler.shutdown() logger.info("Scheduler成功停滞!")

运行自定义命令并查当作果

(venv) ➜ drf_apscheduler git:(master) ✗ python manage.py crontab 2022-08-29 08:56:47.506 | INFO | public.management.commands.crontab:handle:37 - 添加my_job任务成功2022-08-29 08:56:47.508 | INFO | public.management.commands.crontab:handle:48 - 添加delete_old_job_executions任务成功2022-08-29 08:56:47.508 | INFO | public.management.commands.crontab:handle:51 - scheduler开始实行...2022-08-29 08:56:48.512 | INFO | public.management.commands.crontab:my_job:13 - 定时任务开始实行2022-08-29 08:56:48.513 | INFO | public.management.commands.crontab:my_job:14 - 定时任务实行完毕

查看admin管理页任务详情

django-apscheduler功能扩展

虽然利用django-apscheduler的admin实现了任务历史记录查看、手动实行、删除等操作,但是在实际前后端分离开拓过程中,须要供应干系的API接口供前端调用,因此还须要在django-apscheduler的根本上做二次开拓,扩展干系功能。

源码剖析

模型剖析(venv/lib/python3.10/site-packages/django_apscheduler/models.py):

可以看到一共有两个模型,一个是DjangoJob,用于存放定时任务id和下次运行韶光。
另一个是DjangoJobExecutionManager,用户存放定时任务实行历史记录。

admin剖析(venv/lib/python3.10/site-packages/django_apscheduler/admin.py):

紧张关注run_selected_jobsh函数,他实现了手动实行选定的任务的功能。

关键函数剖析(venv/lib/python3.10/site-packages/django_apscheduler/jobstores.py):

紧张关注DjangoJobStore这个类。
DjangoJobStore实现了任务存储器利用Django数据库的功能,同时还封装了一些任务的修正、删除、查询等方法

列出所有作业get_all_jobs()查找作业lookup_job(job_id)删除作业remove_job(job_id)删除所有作业remove_all_jobs()更新作业update_job(job)新增作业add_job(job_state)项目开拓

对django-apscheduler源码剖析后可创造,瞄准时任务系统的需求都可以利用django-apscheduler来实现,梳理一下各个功能模块的开拓思路:

获取定时任务实行历史记录:定义一个模型序列化器和只读模型视图集,关联DjangoJobExecutionManager模型即可。
获取定时任务列表:定义一个模型序列化器和模型视图集,关联DjangoJob模型即可。
删除任务:重写模型视图集的perform_destroy方法,利用DjangoJobStore类的remove_job方法即可。
停息任务:先利用DjangoJobStore类的lookup_job方法找到对应的任务实例,然后将next_run_time属性设置为None,末了利用update_job方法更新实例即可。
规复任务:利用DjangoJobStore类的lookup_job方法找到对应的任务实例,并利用__getstate__将任务属性转为字典,然后利用job_add方法,重新将任务信息重新添加到Django数据库,添加时将replace_existing设置为True。
修公理务触发器:利用DjangoJobStore类的lookup_job方法找到对应的任务实例,然后将trigger属性设置为新的触发器,末了利用update_job方法更新实例即可。
手动实行任务:与规复任务类似,差异在于任务存储器利用内存存储即可。

路由配置(public/urls.py)

from rest_framework import routersfrom public import viewsfrom django.urls import pathapp_name = "public"urlpatterns = [ # 定时作业停息/规复 path('job_pause/<str:job_id>/', views.JobPauseAPIView.as_view(), name='job_pause'), # 更改定时作业触发器 path('job_triggers/<str:job_id>/', views.JobTriggersAPIView.as_view(), name='job_triggers'), # 立即实行一次定时作业 path('job_run/<str:job_id>/', views.JobRunAPIView.as_view(), name='job_triggers')]router.register('user', views.UserDemoModelViewSet, 'user')# 获取定时任务实行历史记录router.register('job_history', views.JobHistoryReadOnlyModelViewSet, 'job_history')# 获取定时任务列表router.register('job', views.JobModelViewSet, 'job')urlpatterns += router.urls

视图配置(public/views.py)

from django.shortcuts import renderfrom django_apscheduler.models import DjangoJobExecution, DjangoJobfrom rest_framework import viewsets, statusfrom rest_framework.response import Responsefrom rest_framework.views import APIViewfrom public.serializers import DjangoJobExecutionSerializer, \ DjangoJobSerializerfrom public.utils import MyPageNumberfrom apscheduler.triggers.cron import CronTriggerfrom apscheduler.schedulers.background import BackgroundSchedulerfrom apscheduler.job import Job as AppSchedulerJobfrom django_apscheduler.jobstores import DjangoJobStore, DjangoMemoryJobStoredjango_job_store = DjangoJobStore()class JobHistoryReadOnlyModelViewSet(viewsets.ReadOnlyModelViewSet): """ 定时作业实行历史 """ queryset = DjangoJobExecution.objects.all() serializer_class = DjangoJobExecutionSerializer pagination_class = MyPageNumberclass JobModelViewSet(viewsets.ModelViewSet): """ 定时作业列表 """ queryset = DjangoJob.objects.all() serializer_class = DjangoJobSerializer # 重写删除方法 def perform_destroy(self, instance): django_job_store.remove_job(instance.id)class JobPauseAPIView(APIView): """ 定时作业停息/规复 """ @staticmethod def post(request, job_id): action = request.data.get('action') job: AppSchedulerJob = django_job_store.lookup_job(job_id) if action == 'pause': job.next_run_time = None django_job_store.update_job(job) result = {'id': job_id, 'message': '任务停息成功!'} else: job_state = job.__getstate__() del job_state['next_run_time'] scheduler = BackgroundScheduler() scheduler.add_jobstore(django_job_store) scheduler.add_job(replace_existing=True, job_state) scheduler.start() result = {'id': job_id, 'message': '任务规复成功!'} return Response(result, status=status.HTTP_200_OK)class JobTriggersAPIView(APIView): """ 更改定时作业触发器 """ @staticmethod def post(request, job_id): crontab_exp = request.data.get('crontab_exp') job: AppSchedulerJob = django_job_store.lookup_job(job_id) job.trigger = CronTrigger.from_crontab(crontab_exp) django_job_store.update_job(job) result = {'id': job_id, 'message': '修正定时任务触发器成功!'} return Response(result, status=status.HTTP_200_OK)class JobRunAPIView(APIView): """ 立即手动实行一次定时作业 """ @staticmethod def post(request, job_id): scheduler = BackgroundScheduler() scheduler.add_jobstore(DjangoMemoryJobStore()) scheduler.start() job: AppSchedulerJob = django_job_store.lookup_job(job_id) job_state = job.__getstate__() del job_state['next_run_time'] del job_state['version'] del job_state['executor'] job_state['trigger'] = None scheduler.add_job(replace_existing=True, job_state) result = {'id': job_id, 'message': '定时任务手动实行成功!'} return Response(result, status=status.HTTP_200_OK)

序列化器配置(public/serializers.py)

import timefrom datetime import datetimefrom django_apscheduler.models import DjangoJobExecution, DjangoJobfrom rest_framework import serializersclass DjangoJobExecutionSerializer(serializers.ModelSerializer): """ 定时作业实行历史列化器 """ class Meta: model = DjangoJobExecution fields = "__all__"class DjangoJobSerializer(serializers.ModelSerializer): """ 定时作业列表列化器 """ class Meta: model = DjangoJob exclude = ['job_state']项目地址

github:https://github.com/cuiliang0302/drf_template

gitee:https://gitee.com/cuiliang0302/drf_template

API接口文档:https://www.apifox.cn/apidoc/shared-34bb4a27-bf7b-432d-9d51-0a767a259e6e 访问密码 : 4UoQc75S

从0开始写API接口开拓思路

如果想从头开始写API接口实现apscheduler定时任务的CRUD,也是很随意马虎的,在此供应一个开拓思路给大家。

和Django-apscheduler一样,我们首先创建一个模型,用于存放定时任务id,状态,实行韶光,关联的函数等信息。

然后定义一个scheduler类,在初始化时选择添加默认的调度器、任务存储器、实行器等参数。

末了通过引入我们创建好的 scheduler 工具之后就可以直接用来做增编削查的操作:

增:利用 add_job() 方法,其紧张的参数是要运行的函数(或方法)、触发器以及触发器参数等删:利用 delete_job() 方法,我们须要传入一个对应任务的 id 参数,用以能够查找到对应的任务改:利用 reschedule_job() 方法,这里也须要一个对应任务的 id 参数,以及须要重新修正的触发器及其参数查:利用 get_jobs() 和 get_job() 两个方法,前者是直接获取到当前调度的所有任务,返回的是一个包含了 APScheduler.job.Job 工具的列表,而后者是通过 id 参数来查找对应的任务工具;这里我通过底层源码利用 getstate() 来获取到任务的干系信息,这些信息我们通过事先设定好的 Job 工具来对其进行序列化,末了将信息从接口中返回。
常见问题为什么scheduler不实行job ?

导致这种情形的缘故原由很多,最常见的两种情形是:

scheduler 在 uWSGI 的事情进程中运行,但是(uWSGI)并没有启用多线程运行了BackgroundScheduler但是已经实行到了脚本的末端进程已经退出运行。

示例代码:

from apscheduler.schedulers.background import BackgroundScheduler def myjob(): print('hello') scheduler = BackgroundScheduler() scheduler.start() scheduler.add_job(myjob, 'cron', hour=0)

可见,以上脚本在运行完add_job()之后就直接退出了,因此 scheduler 根本没有机会去运行其调度好的 job 。

如何在 uWSGI 中利用 APScheduler

方案一:uWSGI 利用了一些技巧来禁用掉 GIL 锁,但多线程的利用对付 APScheduler 的操作来说至关主要。
为了修复这个问题,你须要利用--enalbe-threads选项来重新启用 GIL 。

方案二:在一个单独的专用进程中实行单个定时任务。

如何在一个或多个事情进程中共享独立的 job store

目前版本是不支持的,但是未来apscheduler4操持会支持这个功能,详情参考文档:https://github.com/agronholm/apscheduler/issues/465

在两个或更多的进程中共享一个持久化的 job store 会导致 scheduler 的行为不正常:如重复实行或作业丢失,等等。
这是由于 APScheduler 目前没有任何进程间同步和旗子暗记量机制,因此当一个 job 被添加、修正或从 scheduler 中移除时 scheduler 无法得到关照。

变通方案:在专用的进程中来运行 scheduler,然后通过一些远程访问的路子 —— 如 RPyC、gRPC 或一个 HTTP 做事器 —— 来将其连接起来。
在源码仓库中包含了一个利用 RPyC 的示例。

如何在 web 运用中利用 APScheduler

如果你想在 Django 中运行,可以考虑django_apscheduler,推举利用自定义命令,在一个单独的专用进程中实行单个定时任务。

如果你想在 Flask 中利用 APScheduler ,这里也有一个非官方的插件Flask-APScheduler。

参考文档

date触发器:https://apscheduler.readthedocs.io/en/stable/modules/triggers/date.html#module-apscheduler.triggers.date

interval触发器:https://apscheduler.readthedocs.io/en/stable/modules/triggers/interval.html#module-apscheduler.triggers.interval

crontab触发器:https://apscheduler.readthedocs.io/en/stable/modules/triggers/cron.html#module-apscheduler.triggers.cron

job配置项:https://apscheduler.readthedocs.io/en/stable/modules/job.html#module-apscheduler.job

apscheduler方法示例:https://apscheduler.readthedocs.io/en/stable/py-modindex.html

django-apscheduler地址:https://github.com/jcass77/django-apscheduler

查看更多

崔亮的博客-专注devops自动化运维,传播精良it运维技能文章。
更多原创运维开拓干系文章,欢迎访问www.cuiliangblog.cn

标签:

相关文章

芯片行业的商业模式_芯片_家当

之前讲芯片的发展历史以及海内半导体家当链的情形。本日详细讲一下,芯片家当的两种商业模式,这样会对家当链有更加详细深入的理解;顺便也...

互联网 2025-01-16 阅读0 评论0

光源新品---LED灯丝灯_灯丝_基板

LED灯丝是将无背镀的蓝光LED灯珠固晶在蓝宝石、透明陶瓷、荧光晶体、玻璃、金属等做成的基条上,再通过金线串联,以及涂覆荧光粉而制...

互联网 2025-01-16 阅读0 评论0