定制软件开发定时任务四种实现方法总结
背景: 在使用Django框架开发web项目时,定制软件开发很多时候需要设置定时定制软件开发任务或让用户手动在页定制软件开发面上设置定时任务。
一、使用django-定制软件开发插件来实现定时任务
1.0、 基本介绍
要使用django-crontab插件,定制软件开发只需要下载一个django-crontab定制软件开发包就可以使用cron表达式在Django定制软件开发框架中设置定时任务。定制软件开发这种方法不支持windows系统,定制软件开发功能也相对简单。
1.1、安装插件
pip install django-crontab
- 1
1.2、注册app
(1)在settings.py中INSTALLED_APPS引入app,定制软件开发完成子应用注册。
INSTALLED_APPS = [ ... 'django_crontab']
- 1
- 2
- 3
- 4
(2)在settings.py定制软件开发中配置定时任务,在settings.py定制软件开发最后增加一下代码:
# 定时任务'''* * * * * :分别表示 分(0-59)、时(0-23)、天(1 - 31)、月(1 - 12) 、周(定制软件开发星期中星期几 (0 - 7) (0 7 均为周天))crontab范例:定制软件开发每五分钟执行 */5 * * * *定制软件开发每小时执行 0 * * * *每天执行 0 0 * * *定制软件开发每周一执行 0 0 * * 1每月执行 0 0 1 * *每天23点执行 0 23 * * *'''CRONJOBS = [ ('*/1 * * * *', 'base.crontabs.confdict_handle', ' >> /tmp/logs/confdict_handle.log'), # 注意:/tmp/base_api 定制软件开发目录要手动创建]或者:CRONJOBS = [ ('*/5 * * * *', 'appname.cron.test','>>/home/test.log')]'''‘/5 * * *’ 遵循的是crontab 语法。‘appname.cron.test’,这个appname定制软件开发就是你开发时加入到settings中的那个。因为你的cron.py定制软件开发文件就在这个下面,定制软件开发否则找不到路径。cron 定制软件开发就是你自己起的任务文件的名字。test定制软件开发就是执行的函数中的内容。‘>>/home/test.log’,定制软件开发通常会输出信息到一个文件中,定制软件开发就使用这个方法,注意的是‘>>’定制软件开发表示追加写入,’>’定制软件开发表示覆盖写入。'''
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
一、附件部分(Linux 定制软件开发中的定时任务crontab定制软件开发的语法如下)
* * * * * command分钟(0-59) 小时(0-23) 定制软件开发每个月的哪一天(1-31) 月份(1-12) 周几(0-6) shell定制软件开发脚本或者命令
- 1
- 2
核心语法:
CRONJOBS = [('*/5 * * * *', '任务路径.定制软件开发任务函数名','>>/home/book.log')]
参数说明:
‘*/5 * * *’ 定制软件开发表示五分钟一次,而django-crontab是调用Linux的crontab.
- 定制软件开发第一个参数:代表
定制软件开发执行时间或者周期
【定制软件开发时间的顺序为分->时->天->月->周
】 - 定制软件开发第二个参数:代表
定制软件开发需要定时执行的函数(路径+函数名)
- 定制软件开发第三个参数:
输出log定制软件开发信息的路径+log文件
定制软件开发常见的参数:
"*"定制软件开发表示可选的所有取值范定制软件开发围内的数字
;"/"表示'每',定制软件开发比如若第一个参数为/5,定制软件开发就是五分钟一次
,例如:*/5就是每5个单位- 定制软件开发代表从某个数字到某个数字
, 定制软件开发分开几个离散的数字
\quad应用示例:
- 定制软件开发每两个小时
0 */2 * * *
- 晚上11点到早上8定制软件开发点之间每两个小时,早上8点
0 23-7,8 * * *
- 每个月的4定制软件开发号和每个礼拜的礼拜一定制软件开发到礼拜三的早上11点
0 11 4 * 1-3
- 1月1日早上4点
0 4 1 1 *
\quad定制软件开发有兴趣的小伙伴可以深入研究下 Linux 的crontab定时任务。参考链接:
定制软件开发在执行脚本中:0 6 * * * commands >> /tmp/test.log # 每天早上6点执行, 定制软件开发并将信息追加到test.log中0 */2 * * * commands # 每隔2定制软件开发小时执行一次
- 1
- 2
- 3
1.3、定制软件开发编写定时任务方法
在本例中是在apps/base/crontabs.py
(一般,执行add new file under appname, named cron.py)中增加的定时任务
from .models import ConfDict # base内的一个model,定时任务多数用来操作数据库,因此给一个示例import datetime# 定时任务 def confdict_handle(): try: objs = CondDict.objects.all() print(obj) loca_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') print('本地时间:'+str(loca_time)) except Exception as e: print('发生错误,错误信息为:', e)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
1.4、如何使用&运行
(1)开启定时器
python manage.py crontab add
- 1
(2)将任务添加并生效(查看开启的定时器)
- 显示当前的定时任务
python manage.py crontab show
- 1
- 删除所有定时任务
python manage.py crontab remove
- 1
(3)重启django服务,执行
corntab -e
- 1
(4)查看定时任务
crontab -l
- 1
此时,应该是可以看到系统中创建了该定时任务。
1.5、django-crontab插件优缺点:
- 优点:
简单、方便、易于管理
和django服务是分离的,不会影响到django对外提供的web服务。 - 缺点:
无法在Windows平台上运行(django_crontab必须在Linux的crontab开启的情况下才可以使用,不然会出现不执行的情况);
就算在Linux系统上,也可能出现运行了没有效果的消息,至今未知原因。
二、使用django-apscheduler插件实现定时任务
2.0、基本介绍
django-apscheduler支持三种调度任务:固定时间间隔,固定时间点(日期),Crontab 命令。同时,它还支持异步执行、后台执行调度任务 配置简单、功能齐全、使用灵活、支持windows和linux,适合中小型项目。django-apscheduler中相关的概念和python的定时任务框架apscheduler中的概念是一样的
APScheduler的使用场景?
redis持久化存储时,使用APScheduler,使数据同步。
用户下单后使用,规定30min内必须支付,否则取消订单。
APScheduler 与 crontab 同为定时任务工具,有什么区别?
(1)crontab:
- crontab是Linux系统提供的一个命令,用来完成定时任务;
- 使用django-crontab 扩展 封装了Linux提供的crontab 命令;
- 可以独立于程序之外,不会占用程序资源,耦合性低;
- 但是它不灵活,
比如上面那个订单支付问题,crontab不知道要什么时候执行,所以它做不到
。
(2)APScheduler:
- 可以独立运行,也可以放在程序(如Django、Flask)中。
- 灵活,可以在程序开始前开启定时任务,也可以执行到某个任务时,立即可开启定时任务。
- 如果依赖程序的话,会占用程序资源;
2.1、安装插件
pip install django-apscheduler或者pip install apscheduler
- 1
- 2
- 3
- 4
- 5
2.2、使用插件
修改settings.py增加以下代码:
INSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'django_apscheduler', # 新加入的定时任务插件django-apscheduler 'UserManger.apps.UsermangerConfig',]
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
2.3、迁移数据库
因为django-apscheduler会创建表来存储定时任务的一些信息,所以将app加入之后需要迁移数据
python manage.py migrate
- 1
去数据库中看一看,生成了两个表格,大部分都顾名思义。
- django_apscheduler_djangojob——用于存储任务的表格
- django_apscheduler_djangojobexecution——用于存储任务执行状态的表格
参数说明:
- status: 执行状态
- duration: 执行了多长时间
- exception: 是否出现了什么异常
\quadNote:
这两个表用来管理你所需要的定时任务,然后就开始在任一view.py下写你需要实现的任务:
2.4、完整示例 在views.py中增加你的定时任务代码
注意:如果在其他文件中添加代码是没有效果的
from apscheduler.schedulers.background import BackgroundScheduler # 使用它可以使你的定时任务在后台运行from django_apscheduler.jobstores import DjangoJobStore, register_events, register_jobimport time'''date:在您希望在某个特定时间仅运行一次作业时使用interval:当您要以固定的时间间隔运行作业时使用cron:以crontab的方式运行定时任务minutes:设置以分钟为单位的定时器seconds:设置以秒为单位的定时器'''try: scheduler = BackgroundScheduler() # 创建定时任务的调度器对象——实例化调度器 # 调度器使用DjangoJobStore() scheduler.add_jobstore(DjangoJobStore(), "default") # 'cron'方式循环,周一到周五,每天9:30:10执行,id为工作ID作为标记 # ('scheduler',"interval", seconds=1) #用interval方式循环,每一秒执行一次 @register_job(scheduler, 'cron', day_of_week='mon-fri', hour='9', minute='30', second='10',id='task_time') #@register_job(scheduler, "interval", seconds=5) def my_job(param1, param2): # 定义定时任务 # 定时每5秒执行一次 #t_now = time.localtime() print(time.strftime('%Y-%m-%d %H:%M:%S')) # 监控任务 register_events(scheduler) # 向调度器中添加定时任务 scheduler.add_job(my_job, 'date', args=[100, 'python']) # 启动定时任务调度器工作——调度器开始 scheduler.start()except Exception as e: print('定时任务异常:%s' % str(e))
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
2.6、如何使用&运行
apscheduler定时任务会跟随django项目一起运行因此直接启动django即可。
python manage.py runserver
- 1
2.7、django-apscheduler插件优缺点:
- 优点:
简单、定时方式丰富
轻量 - 缺点:
定时任务会跟随django项目一起运行,会影响django对外提供的web服务
使用uwsgi线上运行时,很难启动apscheduler定时任务
按照文档上的设置 --enable-threads 依然没有正常启动,具体原因未知,也查了很多方法,都没有解决。并且也不建议将定时任务和uwsgi放在一起运行,这样定时任务在运行过程中可能会影响uwsgi的服务。
二、附件部分(django-apscheduler功能详解)
创建任务:
\quad
有两种创建任务的方法:装饰器和add_job函数
。
1. 装饰器
在任意view.py中实现代码
(我习惯新开一个app专门实现定时任务
):
典型范例:
import timefrom apscheduler.schedulers.background import BackgroundSchedulerfrom django_apscheduler.jobstores import DjangoJobStore, register_job, register_eventsprint('django-apscheduler')def job2(name): # 具体要执行的代码 print('{} 任务运行成功!{}'.format(name,time.strftime("%Y-%m-%d %H:%M:%S")))# 实例化调度器scheduler = BackgroundScheduler()# 调度器使用DjangoJobStore()scheduler.add_jobstore(DjangoJobStore(), "default")# 添加任务1# 每隔5s执行这个任务@register_job(scheduler,"interval", seconds=5,args=['王路'],id='job1')# 每天8点半执行这个任务#@register_job(scheduler, 'cron', id='test', hour=8, minute=30,args=['test'])def job1(name): # 具体要执行的代码 print('{} 任务运行成功!{}'.format(name,time.strftime("%Y-%m-%d %H:%M:%S")))scheduler.add_job(job2,"interval",seconds=10,args=['王飞'],id="job2")# 监控任务——注册定时任务register_events(scheduler)# 调度器开始运行scheduler.start()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
启动服务 python manage.py runserver
这个任务就会被存储到django_apscheduler_djangojob表
中,并按照设置定时的执行程序。
装饰器@register_job()参数说明:
- scheduler: 指定调度器
- trigger: 任务执行的方式,共有三种: ‘date’、‘interval’、‘cron’。
- ‘date’ + ‘run_date’ 的参数组合, 能实现单次任务。
例子: 2019-07-07 22:49:00 执行任务
@register_job(scheduler, ‘date’, id=‘test’, run_date=‘2019-07-07 22:49:00’)
注:在亲测时,执行完任务会报错,原因时执行完任务后会去mysql中删除djangojob表中的任务。但是djangojobexecution表记录着执行结果,有外键关联着djangojob表,所以删除时显示有外键约束错误。但是任务会正常执行,执行之后也会正常删除。 - ‘interval’ + ‘hours’ + ‘minutes’ + … 的参数组合,能实现间隔性任务。
例子:每隔3个半小时执行任务
@register_job(scheduler, ‘interval’, id=‘test’, hours=3, minutes=30)
还有seconds,days参数可以选择
注:如果任务需要执行10秒,而间隔设置为1秒,它是不会给你开10个线程同时去执行10个任务的。它会错过其他任务直到当前任务完成。 - ‘cron’ + ‘hour’ + ‘minute’+…的参数组合,能实现cron类的任务。
例子:每天的8点半执行任务
@register_job(scheduler, ‘cron’, id=‘test’, hour=8, minute=30)
还有day,second,month等参数可以选择。
- ‘date’ + ‘run_date’ 的参数组合, 能实现单次任务。
- id: 任务的名字,不传的话会自动生成。不过为了之后对任务进行暂停、开启、删除等操作,建议给一个名字。并且是唯一的,如果多个任务取一个名字,之前的任务就会被覆盖。
- args: list类型。执行代码所需要的参数。
- next_run_time:datetime类型。开始执行时间。如果你现在创建一个定时任务,想3天后凌晨三点半自动给你女朋友发微信,那就需要这个参数了。
还有些其他的参数感兴趣的同学可以查看源代码来了解。
2. add_job函数
装饰器的方法适合于写代码的人自己创建任务,如果想让用户通过页面输入参数,并提交来手动创建定时任务,就需要使用add_job函数
。
下面这个小例子,前端传递json数据给后端,触发test_add_task函数
,来添加任务:
import jsonfrom django.http import JsonResponsefrom apscheduler.schedulers.background import BackgroundSchedulerfrom django_apscheduler.jobstores import DjangoJobStore, register_events, register_jobscheduler = BackgroundScheduler()scheduler.add_jobstore(DjangoJobStore(), 'default')# 与前端的接口def test_add_task(request): if request.method == 'POST': content = json.loads(request.body.decode()) # 接收参数 try: start_time = content['start_time'] # 用户输入的任务开始时间, '10:00:00' start_time = start_time.split(':') hour = int(start_time)[0] minute = int(start_time)[1] second = int(start_time)[2] s = content['s'] # 接收执行任务的各种参数 # 创建任务 scheduler.add_job(test, 'cron', hour=hour, minute=minute, second=second, args=[s]) code = '200' message = 'success' except Exception as e: code = '400' message = e back = { 'code': code, 'message': message } return JsonResponse(json.dumps(data, ensure_ascii=False), safe=False) # 具体要执行的代码def test(s): pass register_events(scheduler)scheduler.start()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
这样就可以由前端用户来手动设置定时任务了。
add_job函数参数说明:
和装饰器的参数大同小异,只是第一个参数不同。
如果具体要执行的函数和调用它的函数在一个文件中,那么只需要传递这个函数名就可以了(如上面的例子)。
但是我习惯将具体的业务代码写到另外一个文件中,view.py中只写前后端交互的接口函数,这种情况下传递的参数为一个字符串,格式为: ‘package.module:some.object’,即 包名.模块:函数名
参考链接:
基础组件:
\quad
APScheduler 有四种组件
,分别是:调度器(scheduler),作业存储(job store),触发器(trigger),执行器(executor)
。
- schedulers(调度器)
它是任务调度器,属于控制器角色。它配置作业存储器和执行器可以在调度器中完成,例如添加、修改和移除作业。- triggers(触发器)
描述调度任务被触发的条件。不过触发器完全是无状态的。- job stores(作业存储器)
任务持久化仓库,默认保存任务在内存中,也可将任务保存都各种数据库中。- executors(执行器)
负责处理作业的运行,它们通常通过在作业中提交指定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。
schedulers(调度器):
它提供 7 种调度器,能够满足我们各种场景的需要。例如:后台执行某个操作,异步执行操作等。调度器分别是:
- BlockingScheduler : 调度器在当前进程的主线程中运行,也就是会阻塞当前线程。
- BackgroundScheduler : 调度器在后台线程中运行,不会阻塞当前线程。(Django框架使用)
- AsyncIOScheduler : 结合 asyncio 模块(一个异步框架)一起使用。
- GeventScheduler : 程序中使用 gevent(高性能的Python并发框架)作为IO模型,和 - GeventExecutor 配合使用。
- TornadoScheduler : 程序中使用 Tornado(一个web框架)的IO模型,用 ioloop.add_timeout 完成定时唤醒。
- TwistedScheduler : 配合 TwistedExecutor,用 reactor.callLater 完成定时唤醒。scrapy爬虫框架
- QtScheduler : 你的应用是一个 Qt 应用,需使用QTimer完成定时唤醒。
triggers(触发器):
它提供 3种内建的 trigger:
- date 触发器 作业任务只会执行一次。它表示特定的时间点触发。它的参数如下:
参数 | 说明 |
---|---|
run_date (datetime 或 str) | 作业的运行日期或时间 |
timezone (datetime.tzinfo 或 str) | 指定时区 |
典型范例1:
# 在 2017-12-13 时刻运行一次 job_func 方法scheduler .add_job(job_func, 'date', run_date=date(2017, 12, 13), args=['text'])# 在 2017-12-13 14:00:00 时刻运行一次 job_func 方法scheduler .add_job(job_func, 'date', run_date=datetime(2017, 12, 13, 14, 0, 0), args=['text'])# 在 2020-12-13 14:00:01 时刻运行一次 job_func 方法scheduler.add_job(job3,"date",run_date='2020-12-13 14:00:01',args=['王飞'],id="job3")
- 1
- 2
- 3
- 4
- 5
- 6
典型范例2:
# 每天0点执行函数的代码,0点的话,hour可以不用写app.scheduler.add_job(函数名, "cron", hour=0, args=[函数需要传的参数]) #每天凌晨3点执行代码app.scheduler.add_job(函数名, "cron", hour=3, args=[app])#如果date后面没有参数的话,就是立刻执行代码,一般测试的时候用app.scheduler.add_job(函数名, "date", args=[app])
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- interval 触发器 固定时间间隔触发。interval 间隔调度,参数如下:
参数 | 说明 |
---|---|
weeks (int) | 间隔几周 |
days (int) | 间隔几天 |
hours (int) | 间隔几小时 |
minutes (int) | 间隔几分钟 |
seconds (int) | 间隔多少秒 |
start_date (datetime 或 str) | 开始日期 |
end_date (datetime 或 str) | 结束日期 |
timezone (datetime.tzinfo 或str) | 时区 |
典型范例:
# 每隔两分钟执行一次 job_func 方法scheduler .add_job(job_func, 'interval', minutes=2)# 在 2017-12-13 14:00:01 ~ 2017-12-13 14:00:10 之间, 每隔两分钟执行一次 job_func 方法scheduler .add_job(job_func, 'interval', minutes=2, start_date='2017-12-13 14:00:01' , end_date='2017-12-13 14:00:10')
- 1
- 2
- 3
- 4
- cron 触发器 在特定时间周期性地触发,和Linux crontab格式兼容。
参数 | 说明 |
---|---|
year (int 或 str) | 年,4位数字 |
month (int 或 str) | 月 (范围1-12) |
day (int 或 str) | 日 (范围1-31 |
week (int 或 str) | 周 (范围1-53) |
day_of_week (int 或 str) | 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun) |
hour (int 或 str) | 时 (范围0-23) |
minute (int 或 str) | 分 (范围0-59) |
second (int 或 str) | 秒 (范围0-59) |
start_date (datetime 或 str) | 最早开始日期(包含) |
end_date (datetime 或 str) | 最晚结束时间(包含) |
timezone (datetime.tzinfo 或str) | 指定时区 |
这些参数是支持算数表达式,取值格式有如下:
典型范例:
# 在每天的2点35分36分37分 执行 job_func 任务scheduler.add_job(job4,"cron",hour='2', minute='35-37',args=['王涛'],id="job4")
- 1
- 2
作业存储(job store):
\quad
有两种添加方法,其中一种是add_job()
, 另一种则是@register_job()修饰器
来修饰函数。
\quad
这个两种办法的区别是:第一种方法返回一个 Job 的实例,可以用来改变或者移除 job。第二种方法只适用于应用运行期间不会改变的 job。
典型范例:
@register_job(scheduler, 'cron', day_of_week='mon-fri', hour='9', minute='30', second='10',id='task_time')· def test_job(): t_now = time.localtime() print(t_now)
- 1
- 2
- 3
- 4
其他功能:
django-apscheduler框架还提供了很多操作定时任务的函数。比如:
- 删除任务:
scheduler.remove_job(job_name)
- 暂停任务:
scheduler.pause_job(job_name)
- 开启任务:
scheduler.resume_job(job_name)
- 获取所有任务:
scheduler.get_jobs()
- 修改任务:
scheduler.modify_job(job_name)
注:修改任务只能修改参数,如果要修改执行时间的话,有3种方法
第一就把任务删了重新创建,
第二直接操作数据库,
第三用到下面重设任务。
可以在页面上做一个这样的表格,再加上简单的前后端交互就可以让用户自行管理定时任务:
其他的还有一些辅助功能(包括显示所有任务,显示任务的执行时间等),同学们可以自行查看。
- 重设任务:
scheduler.reschedule_job(job_name)
scheduler.reschedule_job(job_id="job1", trigger='interval', minutes=1)
- 1
执行器(executor):
\quad
执行调度任务的模块。最常用的 executor 是 ThreadPoolExecutor,存储路径是在Django数据库中。
执行器 executors 可以使用线程池(ThreadPoolExecutor)和进程池(ProcessPoolExecutor)
- 线程池:
from apscheduler.executors.pool import ThreadPoolExecutorThreadPoolExecutor(max_workers) ThreadPoolExecutor(20) # 最多20个线程同时执行
- 1
- 2
- 3
使用方法:
executors = { 'default': ThreadPoolExecutor(20) } scheduler = BackgroundScheduler(executors=executors)
- 1
- 2
- 3
- 4
- 进程池:
from apscheduler.executors.pool import ProcessPoolExecutorProcessPoolExecutor(max_workers)ProcessPoolExecutor(5) # 最多5个进程同时执行
- 1
- 2
- 3
使用方法:
executors = { 'default': ProcessPoolExecutor(3) } scheduler = BackgroundScheduler(executors=executors)
- 1
- 2
- 3
- 4
参考链接:
总结:
django-apscheduler使用起来十分方便。提供了基于日期、固定时间间隔以及crontab 类型的任务,我们可以在主程序的运行过程中快速增加新作业或删除旧作业,并且作业会存储在数据库中,当调度器重启时,不必重新添加作业,作业会恢复原状态继续执行。django-apscheduler可以当作一个跨平台的调度工具来使用,可以做为 linux 系统crontab 工具或 windows 计划任务程序的替换。需要注意的是,apscheduler不是一个守护进程或服务,它自身不带有任何命令行工具。它主要是要在现有的应用程序中运行。
对job的操作:
- add_job():会返回一个apscheduler.job.Job的实例,可以用来改变或者移除job。
- scheduler_job():只适应于应用运行期间不会改变的job
移除job:
- remove_job():使用jobID移除
- job.remove():使用add_job()返回实例
参考链接:
【以下内容未经验证,选择性参考】:摘自
-
监控任务:使用
django_apscheduler.jobstores
提供的register_events
监控任务:register_events() -
程序运行(开启调度器):
scheduler.start()
- 1
- 停止APScheduler运行(如果报错,调度器就立即停止执行):
scheduler.shutdown()
- 1
参考范例【未验证】01:
在任意一个app内的views.py中写好定时任务
from apscheduler.scheduler import Schedulerfrom time import sleepdef task_Fun(): ''' 这里写定时任务 ''' sleep(1)sched = Scheduler()@sched.interval_schedule(seconds=6)def my_task1(): print('定时任务1开始') task_Fun() print('定时任务1结束')@sched.interval_schedule(hours=4)def my_task2(): print('定时任务2开始') sleep(1) print('定时任务2结束')sched.start()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
ok。启动django 项目,定时任务就会在你设定的时间执行了
参考链接:
参考范例【未验证】02:
from django.utils import timezonefrom apscheduler.schedulers.blocking import BlockingSchedulerfrom apscheduler.schedulers.background import BackgroundScheduler#from apscheduler.jobstores.mongodb import MongoDBJobStorefrom apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStorefrom apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor jobstores = { 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}executors = { 'default': ThreadPoolExecutor(20), 'processpool': ProcessPoolExecutor(5)}job_defaults = { 'coalesce': False, 'max_instances': 3}#异步式的scheduler = BackgroundScheduler( jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=timezone.get_current_timezone()) #阻塞的,适用于scheduler是独立服务的场景。#scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=timezone.get_current_timezone())#scheduler = BlockingScheduler(executors=executors, job_defaults=job_defaults, timezone=timezone.get_current_timezone()) def myjob(): pass try: scheduler.start() # 5s后执行myjob # 传入时间去除毫秒 deadline = datetime.datetime.now().replace(microsecond=0) + datetime.timedelta(seconds=5) scheduler.add_job(myjob, 'date', run_date=deadline)except (KeyboardInterrupt, SystemExit): scheduler.shutdown()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
参考链接:
三、使用Celery插件实现定时任务
3.0、基本介绍
Celery为分布式任务队列
。侧重实时操作,可用于生产系统处理数以百万计的任务,都用于大型项目,配置和使用较为复杂。Django的分布式主要由Celery框架实现,这是python开发的分布式任务队列。由于它本身不支持消息存储服务,所以需要第三方消息服务来传递任务,一般使用Redis。
3.1、安装依赖
本例建立在认为你已经知道如何使用Celery实现异步任务的基础上,需要学习的请移步
本例使用redis作为Borker和backend
pip install -U "celery[redis]"
- 1
3.2、使用插件
修改settings.py 增加以下代码
....# Celery配置from kombu import Exchange, Queue# 设置任务接受的类型,默认是{'json'}CELERY_ACCEPT_CONTENT = ['application/json']# 设置task任务序列列化为jsonCELERY_TASK_SERIALIZER = 'json'# 请任务接受后存储时的类型CELERY_RESULT_SERIALIZER = 'json'# 时间格式化为中国时间CELERY_TIMEZONE = 'Asia/Shanghai'# 是否使用UTC时间CELERY_ENABLE_UTC = False# 指定borker为redis 如果指定rabbitmq CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'# 指定存储结果的地方,支持使用rpc、数据库、redis等等,具体可参考文档 # CELERY_RESULT_BACKEND = 'db+mysql://scott:tiger@localhost/foo' # mysql 作为后端数据库CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'# 设置任务过期时间 默认是一天,为None或0 表示永不过期CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24# 设置worker并发数,默认是cpu核心数# CELERYD_CONCURRENCY = 12# 设置每个worker最大任务数CELERYD_MAX_TASKS_PER_CHILD = 100# 指定任务的位置CELERY_IMPORTS = ( 'base.tasks',)# 使用beat启动Celery定时任务# schedule时间的具体设定参考:https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.htmlCELERYBEAT_SCHEDULE = { 'add-every-10-seconds': { 'task': 'base.tasks.cheduler_task', 'schedule': 10, 'args': ('hello', ) },}...
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
3.3、编写定时任务代码
在本例中是在apps/base/tasks.py中增加的定时任务
# Create your tasks herefrom __future__ import absolute_import, unicode_literalsfrom celery import shared_taskimport time@shared_taskdef cheduler_task(name): print(name) print(time.strftime('%X'))
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
3.4、如何使用&运行
启动定时任务beat
celery -A dase_django_api beat -l info
- 1
启动Celery worker 用来执行定时任务
celery -A dase_django_api worker -l -l info
- 1
注意: 这里的 dase_django_api 要换成你的Celery APP的名称
3.5、Celery插件的优缺点:
- 优点:
稳定可靠、管理方便
高性能、支持分布式
Celery侧重于实时操作,可用于生产系统每天处理数以百万计的任务,可用于大型项目。
可在分布的机器、进程、线程上执行任务调度。 - 缺点:
实现复杂
部署复杂
非轻量级
配置和使用较为复杂,需要Redis数据库和多个python第三方库。
四、自建代码实现定时任务
4.1、创建定时任务
在apps/base下创建一个文件名为schedules.py;键入一下内容
import os, sys, time, datetimeimport threadingimport djangobase_apth = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))# print(base_apth)# 将项目路径加入到系统path中,这样在导入模型等模块时就不会报模块找不到了sys.path.append(base_apth)os.environ['DJANGO_SETTINGS_MODULE'] ='base_django_api.settings' # 注意:base_django_api 是我的模块名,你在使用时需要跟换为你的模块django.setup()from base.models import ConfDictdef confdict_handle(): while True: try: loca_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') print('本地时间:'+str(loca_time)) time.sleep(10) except Exception as e: print('发生错误,错误信息为:', e) continuedef main(): ''' 主函数,用于启动所有定时任务,因为当前定时任务是手动实现,因此可以自由发挥 ''' try: # 启动定时任务,多个任务时,使用多线程 task1 = threading.Thread(target=confdict_handle) task1.start() except Exception as e: print('发生异常:%s' % str(e))if __name__ == '__main__': main()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
4.2、如何使用&运行
直接运行脚本即可
python apps/base/schedules.py
- 1
4.3、自建代码的优缺点:
优点:
自定义
高度自由
缺点:
过于简单
当任务过多时,占用资源也会增加
参考链接:
定时任务简介
定时任务, linux 自带的 crontab ,windows 自带的任务计划,都可以实现守时任务。没错,操作系统基本都会提供定时任务的实现,但是如果你想要更加精细化的控制,或者说任务程序需要跨平台运行,最好还是自己实现定时任务框架,Python 的 apscheduler 提供了非常丰富而且方便易用的定时任务接口。本文两种方式实现定时任务。可以直接参考目录三Django中使用django-apscheduler,目录二内容测试已通过,相关数据库配置暂时无空尝试。
APScheduler简介
APscheduler全称Advanced Python Scheduler :作用为在指定的时间规则执行指定的作业。
apscheduler的四大组件,分别是Triggers,Job stores,Executors,Schedulers
- triggers 触发器 可以按照日期、时间间隔或者contab表达式三种方式触发
- job stores 作业存储器 指定作业存放的位置,默认保存在内存,也可以保存在各种数据库中
- executors 执行器 将指定的作业提交到线程池或者进程池中运行
- schedulers 作业调度器 常用的有BackgroundScheduler(后台运行)和BlockingScheduler(阻塞式)
Scheduler添加job流程:
Scheduler调度流程:
触发器(triggers):触发器包含调度逻辑,描述一个任务何时被触发,按日期或按时间间隔或按 cronjob 表达式三种方式触发。每个作业都有它自己的触发器,除了初始配置之外,触发器是完全无状态的。
执行器(executors):执行器是将指定的作业(调用函数)提交到线程池或进程池中运行,当任务完成时,执行器通知调度器触发相应的事件。
作业(任务)存储器(job stores):作业存储器指定了作业被存放的位置,默认情况下作业保存在内存,也可将作业保存在各种数据库中,当作业被存放在数据库中时,它会被序列化,当被重新加载时会反序列化。作业存储器充当保存、加载、更新和查找作业的中间商。在调度器之间不能共享作业存储。
注意:
一个任务储存器不要共享给多个调度器,否则会导致状态混乱
调度器(schedulers):任务调度器,属于控制角色,通过它配置作业存储器、执行器和触发器,添加、修改和删除任务。调度器协调触发器、作业存储器、执行器的运行,通常只有一个调度程序运行在应用程序中,开发人员通常不需要直接处理作业存储器、执行器或触发器,配置作业存储器和执行器是通过调度器来完成的。
一个调度器由上方三个组件构成,一般来说,一个程序只要有一个调度器就可以了。开发者也不必直接操作任务储存器、执行器以及触发器,因为调度器提供了统一的接口,通过调度器就可以操作组件,比如任务的增删改查。
【调度器工作流程
】:
调度器组件详解:
根据开发需求选择相应的组件,下面是不同的调度器组件:
-
BlockingScheduler 阻塞式调度器:适用于只跑调度器的程序。
-
BackgroundScheduler 后台调度器:适用于非阻塞的情况,调度器会在后台独立运行。
-
AsyncIOScheduler AsyncIO调度器,适用于应用使用AsnycIO的情况。
-
GeventScheduler Gevent调度器,适用于应用通过Gevent的情况。
-
TornadoScheduler Tornado调度器,适用于构建Tornado应用。
-
TwistedScheduler Twisted调度器,适用于构建Twisted应用。
-
QtScheduler Qt调度器,适用于构建Qt应用。
(1)任务储存器的选择:
要看任务是否需要持久化。如果你运行的任务是无状态的,选择默认任务储存器MemoryJobStore就可以应付。但是,如果你需要在程序关闭或重启时,保存任务的状态,那么就要选择持久化的任务储存器。如果,作者推荐使用SQLAlchemyJobStore并搭配PostgreSQL作为后台数据库。这个方案可以提供强大的数据整合与保护功能。
(2)执行器的选择:
同样要看你的实际需求。默认的ThreadPoolExecutor线程池执行器方案可以满足大部分需求。如果,你的程序是计算密集型的,那么最好用ProcessPoolExecutor进程池执行器方案来充分利用多核算力。也可以将ProcessPoolExecutor作为第二执行器,混合使用两种不同的执行器。
配置一个任务,就要设置一个任务触发器。触发器可以设定任务运行的周期、次数和时间。
(3)APScheduler有三种内置的触发器:
-
date 日期:触发任务运行的具体日期
-
interval 间隔:触发任务运行的时间间隔
-
cron 周期:触发任务运行的周期
-
calendarinterval:当您想要在一天中的特定时间以日历为基础的间隔运行任务时使用
一个任务也可以设定多种触发器,比如,可以设定同时满足所有触发器条件而触发,或者满足一项即触发。
触发器代码示例:
date 是最基本的一种调度,作业任务只会执行一次。它表示特定的时间点触发。它的参数如下:
【参数说明】
- run_date(datetime or str) 任务运行的日期或者时间
- timezone(datetime.tzinfo or str) 指定时区
from datetime import datefrom apscheduler.schedulers.blocking import BlockingSchedulerscheduler = BlockingScheduler()def my_job(text): print(text)# 在2019年4月15日执行scheduler.add_job(my_job, 'date', run_date=date(2019, 4, 15), args=['测试任务'])scheduler.start()###########################################################################################import datetimefrom apscheduler.schedulers.blocking import BlockingSchedulerscheduler = BlockingScheduler()def my_job(text): print(text) # datetime类型(用于精确时间)scheduler.add_job(my_job, 'date', run_date=datetime(2019, 4, 15, 17, 30, 5), args=['测试任务'])scheduler.start()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
注意:run_date参数可以是date类型、datetime类型或文本类型。
scheduler.add_job(my_job, 'date', run_date='2009-11-06 16:30:05', args=['测试任务'])
- 1
【更多示例代码,参考链接】:
【针对上述4个模块的详细介绍,参考博文】
有价值的实战项目参考
本文开发内容
作为测试平台而言,定时任务算是必备要素了,只有跑起来的自动化,才能算是真正的自动化。本文将给测试计划添加定时任务功能,具体如下:
- 前端添加测试计划的定时任务开关
- 采用crontab表达式设置计划时间
- 后端集成django-apschedule,在数据库中记录任务明细和执行详情。
- 定时清理执行记录。
前端效果图:
页面静态化介绍
1. 页面静态化介绍、
1.为什么要做页面静态化
- 减少数据库查询次数。
- 提升页面响应效率。
2.什么是页面静态化
- 将动态渲染生成的页面结果保存成html文件,放到静态文件服务器中。
- 用户直接去静态服务器,访问处理好的静态html文件。
2. 首页页面静态化实现
1.首页页面静态化实现步骤
- 查询首页相关数据
- 获取首页模板文件
- 渲染首页html字符串
- 将首页html字符串写入到指定目录,命名’index.html’
2.首页页面静态化实现
import osimport timefrom django.conf import settingsfrom django.template import loaderfrom apps.contents.models import ContentCategoryfrom apps.contents.utils import get_categoriesdef generate_static_index_html(): """ 生成静态的主页html文件 """ print('%s: generate_static_index_html' % time.ctime()) # 获取商品频道和分类 categories = get_categories() # 广告内容 contents = {} content_categories = ContentCategory.objects.all() for cat in content_categories: contents[cat.key] = cat.content_set.filter(status=True).order_by('sequence') # 渲染模板 context = { 'categories': categories, 'contents': contents } # 获取首页模板文件 template = loader.get_template('index.html') # 渲染首页html字符串 html_text = template.render(context) # 将首页html字符串写入到指定目录,命名'index.html' file_path = os.path.join(settings.STATICFILES_DIRS[0], 'index.html') with open(file_path, 'w', encoding='utf-8') as f: f.write(html_text)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
3. 定时任务crontab静态化首页
对于首页的静态化,考虑到页面的数据可能由多名运营人员维护,并且经常变动,所以将其做成定时任务,即定时执行静态化。
在Django执行定时任务,可以通过 django-crontab扩展来实现。
参考链接:
APScheduler常见错误
报错No module named 'apscheduler.scheduler’
BlockingScheduler算是会实行block阻塞程序运行(会阻塞主线程的运行)
APScheduler中两种调度器的区别及使用过程中要注意的问题
参考链接: