定制小程序开发Flask-定时任务

目录


pip install Flask-APScheduler

 -APScheduler: 

Flask-APScheduler 是基于 python 第三方库 apscheduler 做的集成, 定制小程序开发所以官网上只有一些简定制小程序开发单的使用案例,定制小程序开发详细的配置还是要看 apscheduler的文档。

apscheduler: 

  引用:

一、简单使用

__init__.py:

  1. from flask import Flask
  2. app = Flask(__name__)
  3. app.config.from_object('config.config')
  4. # 初始化db
  5. db = SQLAlchemy()
  6. db.init_app(app)
  7. # 定制小程序开发初始化定时任务
  8. from models.taskSchedule import scheduler
  9. scheduler.init_app(app)
  10. scheduler.start()
  11. # 定制小程序开发修改调度器执行组件冗定制小程序开发余日志级别
  12. logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING)

taskSchedule.py:

  1. from flask_apscheduler import APScheduler
  2. scheduler = APScheduler()
  3. # interval example, 间隔执行, 每30秒执行一次
  4. @scheduler.task('interval', id='do_job_1', seconds=30, misfire_grace_time=900)
  5. def job1():
  6. print('Job 1 executed')
  7. # cron examples, 每分钟执行一次
  8. @scheduler.task('cron', id='do_job_2', minute='*')
  9. def job2():
  10. print('Job 2 executed')
  11. # 每周执行一次
  12. @scheduler.task('cron', id='do_job_3', week='*', day_of_week='sun')
  13. def job3():
  14. print('Job 3 executed')

也可以在程勋运行后添加定时任务:

  1. scheduler.start()
  2. scheduler.add_job(**args)

二、apscheduler 

apscheduler 四个组件:

  • triggers: 任务触发器组件,提供任务触发方式
  • job stores: 任务商店组件,提供任务保存方式
  • executors: 任务调度组件,提供任务调度方式
  • schedulers: 任务调度组件,提供任务工作方式

triggers

支持三种任务触发方式

  • date:

固定日期触发器,任务只运行一次,运行完毕自动清除;若错过指定运行时间,任务不会被创建

| 参数 | 说明 |
| :——————————– | :——————- |
| run_date (datetime 或 str) | 作业的运行日期或时间 |
| timezone (datetime.tzinfo 或 str) | 指定时区 |

例如# 在 2019-4-24 00:00:01 时刻运行一次 start_system 方法
.add_job(start_system, 'date', run_date='2019-4-24 00:00:01', args=['text'])

 

  • interval:

时间间隔触发器,每个一定时间间隔执行一次。

| 参数 | 说明 |
| —————————- | ———- |
| weeks (int) | 间隔几周 |
| days (int) | 间隔几天 |
| hours (int) | 间隔几小时 |
| minutes (int) | 间隔几分钟 |
| seconds (int) | 间隔多少秒 |
| start_date (datetime 或 str) | 开始日期 |
| end_date (datetime 或 str) | 结束日期 |

# 在 2019-4-24 00:00:00 - 2019-4-24 08:00:00 之间, 每隔两小时执行一次 alarm_job 方法scheduler .add_job(alarm_job, 'interval', hours=2, start_date='2019-4-24 00:00:00' , end_date='2019-4-24 08:00:00')

  • cron:

cron风格的任务触发

参数说明
year (int 或 str)表示四位数的年份 (2019)
month(int\str)月 (范围1-12)
day(int\str)日 (范围1-31)
week(int\str)周 (范围1-53)
day_of_week (int\str)表示一周中的第几天,既可以用0-6表示也可以用其英语缩写表示
hour (int\str)表示取值范围为0-23时
minute (int\str)表示取值范围为0-59分
second (int\str)表示取值范围为0-59秒
start_date (datetime\str)表示开始时间
end_date (datetime\str)表示结束时间
timezone (datetime.tzinfo\str)表示时区取值

(int|str) 表示参数既可以是int类型,也可以是str类型
(datetime | str) 表示参数既可以是datetime类型,也可以是str类型

例如:表示每5秒执行该程序一次,相当于interval 间隔调度中seconds = 5

sched.add_job(my_job, 'cron',second = '*/5')

job stores

支持四种任务存储方式

  • memory:默认配置任务存在内存中
  • mongdb:支持文档数据库存储
  • sqlalchemy:支持关系数据库存储
  • redis:支持键值对数据库存储

schedulers

调度器主要分三种,一种独立运行的,一种是后台运行的,最后一种是配合其它程序使用

  • BlockingScheduler: 当这个调度器是你应用中 唯一要运行 的东西时使用
  • BackgroundScheduler: 当 不运行其它框架 的时候使用,并使你的任务在 后台运行
  • AsyncIOScheduler: 当你的程序是 异步IO模型 的时候使用
  • GeventScheduler: 和 gevent 框架配套使用
  • TornadoScheduler: 和 tornado 框架配套使用
  • TwistedScheduler: 和 Twisted 框架配套使用
  • QtScheduler: 开发 qt 应用的时候使用

Flask-APScheduler 中默认使用的就是 BackgroundScheduler:

scheduler.py:

  1. #.....
  2. class APScheduler(object):
  3. """Provides a scheduler integrated to Flask."""
  4. def __init__(self, scheduler=None, app=None):
  5. self._scheduler = scheduler or BackgroundScheduler()
  6. self._host_name = socket.gethostname().lower()
  7. self._authentication_callback = None
  8. # .....

使用上下文

如果正在使用 Flask-SQLAlchemy 并在定时任务中执行数据库操作,需要提供 Flask 应用程序上下文:

  1. from flask_apscheduler import APScheduler
  2. scheduler = APScheduler()
  3. @scheduler.task(
  4. "interval",
  5. id="update_news_graph_job",
  6. minutes = 16
  7. )
  8. def update_news_graph():
  9. # 提供flask上下文对象
  10. with scheduler.app.app_context():
  11. result = db.session.execute(query_sql)
当然, scheduler 需要在flask中注册:
  1. scheduler.init_app(app)
  2. scheduler.start()

官网关于上下文的解释:

日志设置

如果定时任务执行间隔几秒钟, 调度程序的日志会很多,可以设置调度程序日志级别或完全禁用:

  1. #设置调度程序的日志级别, 原本级别为info
  2. scheduler.start()
  3. scheduler.add_job(every_minute, trigger='cron', second=0, id='every_minute')
  4. logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING)
  5. #或者禁用调度程序日志
  6. logging.getLogger('apscheduler.executors.default').propagate = False

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发