定制软件【分布式任务调度】(四)XXL-JOB的任务调度及执行流程

文章目录

1.概述

定制软件从上一篇了解到了XXL-JOB定制软件的注册原理,定制软件在这个基础上就可以聊一聊XXL-JOB定制软件的调度和执行流程了。
XXL-JOB定制软件要完成一次任务调度,定制软件需要发起两次Http通信请求,分别是:

  • 定制软件调度中心调用执行器,定制软件执行定时任务。
  • 定制软件执行器回调调度中心,定制软件上报定时任务执行结果。

定制软件在调度和执行的整个流程中,XXL-JOB定制软件使用了大量的异步操作,定制软件减少调度中心的资源压力,定制软件以此在集中式调度配置定制软件与性能之间找到平衡点。

2.定制软件对调度流程的思考

在研究XXL-JOB定制软件的执行流程之前,定制软件不妨先思考一下,定制软件如果我们自己来实现一定制软件个定时任务的调度,定制软件要从哪些方面去入手?


定制软件有了前面基本博文的基础,定制软件我们现在已经有了一个定制软件调度中心集群,一个执行器,以及一个定制软件定时任务配置。定制软件在不考虑异步执行的情况下,定制软件要实现任务调度就非常简单了。

定制软件我们只需要在调度中心定制软件启动一个线程,不断的去定制软件扫描任务配置表,定制软件判断该任务是否到了触发时间
定制软件如果到了触发时间,定制软件调度中心就使用这个任务对应的定制软件执行器配置,获取到执行器的ip、端口,直接发起Http请求,定制软件等待执行器执行完毕后,响应一个执行结果。这样,定制软件一个最简单定时任务就完成了。

当然,定制软件这样的定时任务只能存在与Demo中,定制软件想要在生产环境中运行,定制软件还需要解决很多问题,定制软件这里例举一些:

  • 定制软件线程中循环扫描任务配定制软件置表是否过于频繁
  • 调度中心集群同时执行一个任务,造成重复调度如何处理
  • 任务是否到了触发时间该如何判断
  • 当前线程既要做扫描,又要做调度,同步请求的阻塞过程影响到了其他任务的调度该如何处理
  • 同步调用执行器,执行器执行任务耗时很长阻塞了调度中心的线程怎么办
  • ……

带着这样的问题,我们一起看看XXL-JOB是如何解决的。

不记得定时任务的配置过程的同学,可以回顾一下

3.调度中心流程

调度中心做的第一件事,就是启动线程不断的扫描定时任务的配置表,我们可以从初始化方法中找到这个线程的初始化过程,在JobScheduleHelper.getInstance().start()中,会启动两个线程,分别是:

  • scheduleThread:这个线程就是用来扫描任务配置表,并判断当前任务是否应该触发。
  • ringThread:大部分的任务触发都在这个线程中,这个线程会从时间轮中去获取数据。

3.1.任务配置扫描

scheduleThread主要是在循环扫描定时任务配置表xxl_job_info,这里会引出第一个问题调度中心集群是如何避免重复调度问题的
其实就是使用了一个锁,当然分布式锁的实现有很多种,XXL-JOB选择的是使用MySQL的排他锁来实现的,其实就是一句简单的SQL:

select * from xxl_job_lock where lock_name = 'schedule_lock' for update;
  • 1

每个调度中心的scheduleThread在扫描任务表之前,都会先执行这个语句,如果前面还有其他的线程在执行扫描的过程,当前线程就会被阻塞,等待锁释放。

第二个问题,如何避免频繁的查询数据库呢?
我们先看一下实现的代码:

这段代码的意思是,如果本次扫描和触发判断的逻辑,消耗时间小于1s,就需要让线程sleep一段时间,这个时间值的计算包含了两个字段值:

  • preReadSuc:是一个boolean值,当查询出的需要做触发判断的 定时任务列表 为空时,值为false,反之,值为true
  • PRE_READ_MS:是一个常量值,表示 5秒。

也就是说,如果查询的定时任务列表有值,则表示程序在正常的处理定时任务,此时就让线程Sleep 0到1秒,如果定时任务列表没有值,则有可能下一秒查询还是没有值,则Sleep 4-5秒,以此来减少查询次数。
至于为什么这里的阈值为5秒,我们可以接着往下看。

定时任务扫描条件
查询定时任务配置的时候,判断是否触发有一个很重要的时间点:当前系统时间
但是我们使用当前系统时间进行精确匹配查询的话,数据是有可能不准确的。比如当前获取到的时间是2022年10月26日18点整,但是由于程序耗时,io耗时等影响,实际进行查询的时候可能是18点过1秒了,那这个18点整的任务就错过了。

一般我们会通过将时间点扩大为一个时间段的方式来处理,例如XXL-JOB的处理方式就是,给当前系统时间加上5秒,然后查询出 触发时间 <=(当前时间+5s) 的数据,得到一个最迟触发时间5秒后 的任务列表。

接下来,就会依次处理这个列表中每一个任务的实际触发时机。

3.2.任务触发时机

XXL-JOB将查询出的任务列表数据分为了三个部分:

  • 已超时5秒以上
  • 已超时但不足5秒
  • 还未到触发时间或恰好到触发时间(通过时间戳来做得判断,恰好到的情况很少)

由于当前时间nowTime已经固定,而每个任务的触发时间可能会不一样,以触发时间来做一个时间轴,就可以用图示直观的表示这三个部分的数据,如下图:

这里的触发时间,指的是xxl_job_info表中trigger_next_time字段值,这个值在创建、更新任务或者在任务触发时更新。具体是通过当前时间CRON表达式来计算出一个具体的时间。


3.2.1.已超时5秒以上

根据调度过期策略的配置,有两种不同的执行流程,默认情况下的调度过期策略是忽略,已经超时5秒以上的任务会被丢弃掉。另外一种策略是立即执行一次,就是字面意思,立即触发一次任务调度。


3.2.2.还未到触发时间

这部分任务会被放入到时间轮中,时间轮中的数据会在ringThread线程中不断的被取出,然后调用trigger方法进行任务触发。

时间轮是一种用于实现定时器、延时调度等功能的算法,广泛的运用与各种中间件中,例如:Netty、Kafka、Dubbo等。如果想详细的了解时间轮算法,可以自行百度一下,而在XXL-JOB中,使用的是HashMap来实现的,具体的做法是:

先获取到triggerNextTime的值,这是一个时间戳,通过下面的算法可以获取到这个时间戳对应的秒数。

// [0,59]int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);
  • 1
  • 2

然后 以 ringSecond 为key,jobId(任务Id)为value,put到HashMap中。

private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>(); private void pushTimeRing(int ringSecond, int jobId){    List<Integer> ringItemData = ringData.get(ringSecond);    if (ringItemData == null) {        ringItemData = new ArrayList<Integer>();        ringData.put(ringSecond, ringItemData);    }    ringItemData.add(jobId);}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

剥开时间轮神秘的面纱,其实实现起来非常的简单,当然XXL-JOB中的时间轮算法只是一种最简单的运用。

3.2.3.超时未超过5秒

处于这个时间段内的任务,会立即触发一次,并更新下次触发时间。
需要注意的是,如果发现下次触发时间满足在未来5秒内会触发,还会将这个任务直接放入到时间轮中,下次触发就不再从数据库获取数据开始执行了,当然,为了避免重复添加这个任务到时间轮中,会再次更新下次触发时间。

3.3.任务触发

多数的任务都是通过ringThread使用时间轮来进行触发的。

先获取到当前时间的秒数,然后从时间轮中取出当前秒前一秒的所有任务,最后循环ringItemData,依次触发其中的每一个任务。

任务的执行是一个相对耗时的操作,对于这种没有紧密的事务关联又相对耗时的操作,一般都会选择使用异步处理,所以trigger方法会把请求扔到triggerPool中处理。
XXL-JOB中的triggerPool有快慢两种线程池fastTriggerPoolslowTriggerPool,主要是做一个线程池的隔离,将执行偏慢的任务放到slowTriggerPool中,避免执行较慢的任务占用过多的字段导致正常的任务也不能快速的调度。
一个任务如果远程调度的时间超过500ms(不是任务执行时间)就可以标记一次慢任务,在10分钟内同一个任务表标记慢10次就会进入到slowTriggerPool中运行了。

线程中的任务会经过保存日志、请求参数封装、路由策略等操作,最终会获取到一个执行器的地址,通过Http调用run方法,执行任务。


至此,调度中心的调度流程就已经结束了,接下来就等待执行器回调,获取任务执行结果。

4.执行器流程

我们在执行器中配置一个定时任务的时候,会在需要定时执行的方法上使用@XxlJob来标记这个方法,官方示例中是这么去标记一个方法的。

注解中的demoJobHandler就是任务处理器的名称。

在服务启动,并且单例的Bean注册完成时,会回调afterSingletonsInstantiated方法,进而调用initJobHandlerMethodRepository方法。

以官方的任务示例为例,这里处理的是任务处理器demoJobHandler与其所在的类(这里使用的是Spring管理,所以是bean)及其标记的方法,将类信息、方法信息生成了一个jobHandler对象来作为value,用demoJobHandler作为key,保存它们之间的映射关系。

4.1.执行器任务调度

任务处理器选择
执行器对象在获取到调度器传入的参数之后,会根据任务处理器名称获取到对应的jobHandler


阻塞策略判断
XXL-JOB提供了三种阻塞策略,分别是:

  • 单机串行:前一个任务还没有执行完毕,就等前一个任务执行完再执行当次的任务
  • 丢弃后续调度:前一个任务没有执行完毕,就终止当次任务。
  • 覆盖之前调度:前一个任务没有执行完毕,就把它停掉,并执行当次任务。

这里需要说明的是,XXL-JOB中为每一个被@XxlJob标记的方法都创建了一个专用的线程jobThread来执行定时任务。这是一个懒加载的线程,即在方法被触发时才会创建。
jobThread中,有两个重要的成员变量:

  • handler:这个就是上面说的jobHandler
  • triggerQueue:触发队列,单机串行的策略会使用到这个队列。

    注:如果当前方法是第一次执行,显然就不会有前一个没有执行完毕的任务,所以第一次被触发的方法不会进入阻塞策略的选择中。

任务执行
通过了阻塞策略判断之后,调度参数会被push到triggerQueue中,jobThread会从这个中获取数据。

然后通过 handler.execute(),通过反射来调用实际的定时任务方法。

注:如果配置了超时时间的话,这里会通过callable来执行 handler.execute(),并在get方法中超时时间,如下:

// 超时则会抛出 TimeoutExceptionfutureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
  • 1
  • 2

4.2.任务回调

最终任务在执行完毕之后,被被push到回调队列callBackQueue中,回调线程会从队列中获取到回调信息,通过callback方法回调调度中心。

调度中心在获取到回到的信息之后,就会更新数据库的任务信息。

5.流程图

至此,一次任务调度和执行的流程就结束了。

6.总结

异步实践
XXL-JOB的调度流程中使用了大量的异步用法,总结起来就是两种:

  • 通过线程池来执行异步操作
  • 通过自旋线程 + 阻塞队列的方式来执行异步操作

源码中对多线程的使用方式是一种非常好的示例,我们完全可以参照这里的源码,在自己的项目里面实现异步调度。

调度流程
XXL-JOB调度流程的思想是比较容易理解的,整个流程看起来很舒服。

  • 获取任务:调度线程不断的扫描任务表,查询出将要执行的任务。
  • 前置处理:对每一个任务都做一次触发时间的计算,能够立即触发的就立即触发,不能立即触发的就放在时间轮中触发,不能触发的就抛弃掉。
  • 触发任务:调度线程不断的从时间轮中获取任务并触发。
  • 异步调度:调度中心将调度与触发做了异步处理,使用触发线程池来做Http调用。
  • 任务执行:执行器为每个任务都分配了一个线程,自己处理自己的任务,任务之间不会互相影响。
  • 任务回调:将执行结果回传到调度中心中,更新任务运行状态。
网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发