目录
XxlJob定制软件开发是目前最流行的定时任务中间件,对比quartz,定制软件开发代码的侵入明显少了很多,定制软件开发不需要每次在代码里配置job, 而XxlJobd的admin server定制软件开发组件提供了可视化ui, 对job定制软件开发和执行器能够从前端页定制软件开发面配置管理,简单易用,定制软件开发目前已经接入几百家互定制软件开发联网公司使用,XxlJob定制软件开发的强大任务调度能力为定制软件开发广大开发者和企业所认可,那XxlJob定制软件开发是怎么工作的?
Tip: 总字数22922字,定制软件开发阅读全文大概会花您20定制软件开发分钟喝茶时间~
XxlJob定制软件开发最新依赖版本: 2.3.0 定制软件开发和源码地址:
- <dependency>
- <groupId>com.xuxueli</groupId>
- <artifactId>xxl-job-core</artifactId>
- <version>2.3.0</version>
- </dependency>
XxlJob主要包含2定制软件开发个核心模块: xxl-job-admin 和xxl-job-core。
- xxl-job-admin 定制软件开发提供可视化的ui定制软件开发页面管理执行器、Job定制软件开发以及查看日志等功能, 默认登录地址为: localhost:8080/xxl-job-admin, 用户名为: admin, 密码为: 123456。
- xxl-job-executor 中基于netty实现一个embedServer, 与admin server是一个独立的server ,处理任务调度请求,包含了Job的核心调度实现。
最新版本使用@XxlJob注解标记Job, 同时支持生命周期Job任务。
XxlJob的Executor组件是Job调度的核心实现,配合admin Server 完成周期调度。
一、XxlJob 的Executor
XxlJob提供了2个任务执行器,简称Executor, XxlJob通过Executor来管理所有Job的生命周期,包括Job的初始化、启动和销毁等工作,目前的2个主要子类为XxlJobSimpleExecutor和XxlSpringExecutor。
- XxlJobSimpleExecutor 提供不依赖Spring框架的实现方式。也就是说我不用Spring框架,使用纯Java代码也能使用XxlJob。
- XxlSpringExecutor 提供基于Spring框架的实现方式。
XxlJobSimpleExecutor和XxlSpringExecutor都继承了XxlJobExecutor, XxlJobExecutor提供注册Job、初始化Server等功能、核心方法 registJobHandler、initEmbedServer。
注入Job的方式有2种: 基于Spring的Bean 和 纯Java(不使用Spring框架)两种。
1. 使用Spring框架注入
覆盖XxlJobSpringExecutor, 使用@Value注解读取application.properties里的配置。
- package com.xxl.job.executor.core.config;
-
- import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- /**
- * xxl-job config
- *
- * @author xuxueli 2017-04-28
- */
- @Configuration
- public class XxlJobConfig {
- private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
-
- @Value("${xxl.job.admin.addresses}")
- private String adminAddresses;
-
- @Value("${xxl.job.accessToken}")
- private String accessToken;
-
- @Value("${xxl.job.executor.appname}")
- private String appname;
-
- @Value("${xxl.job.executor.address}")
- private String address;
-
- @Value("${xxl.job.executor.ip}")
- private String ip;
-
- @Value("${xxl.job.executor.port}")
- private int port;
-
- @Value("${xxl.job.executor.logpath}")
- private String logPath;
-
- @Value("${xxl.job.executor.logretentiondays}")
- private int logRetentionDays;
-
-
- @Bean
- public XxlJobSpringExecutor xxlJobExecutor() {
- logger.info(">>>>>>>>>>> xxl-job config init.");
- XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
- xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
- xxlJobSpringExecutor.setAppname(appname);
- xxlJobSpringExecutor.setAddress(address);
- xxlJobSpringExecutor.setIp(ip);
- xxlJobSpringExecutor.setPort(port);
- xxlJobSpringExecutor.setAccessToken(accessToken);
- xxlJobSpringExecutor.setLogPath(logPath);
- xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
-
- return xxlJobSpringExecutor;
- }
-
- /**
- * 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
- *
- * 1、引入依赖:
- * <dependency>
- * <groupId>org.springframework.cloud</groupId>
- * <artifactId>spring-cloud-commons</artifactId>
- * <version>${version}</version>
- * </dependency>
- *
- * 2、配置文件,或者容器启动变量
- * spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
- *
- * 3、获取IP
- * String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
- */
-
-
- }
配置application.properties文件:
- ### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
- xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
-
- ### xxl-job, access token
- xxl.job.accessToken=
-
- ### xxl-job executor appname
- xxl.job.executor.appname=xxl-job-executor-sample
- ### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
- xxl.job.executor.address=
- ### xxl-job executor server-info
- xxl.job.executor.ip=
- xxl.job.executor.port=9998
- ### xxl-job executor log-path
- xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
- ### xxl-job executor log-retention-days
- xxl.job.executor.logretentiondays=30
创建一个Bean类SampleXxlJob, 每一个被@XxlJob标记方法都是一个Job,使用@XxlJob注解标记方法即可。
- package com.xxl.job.executor.service.jobhandler;
-
- import com.xxl.job.core.context.XxlJobHelper;
- import com.xxl.job.core.handler.annotation.XxlJob;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.stereotype.Component;
-
- import java.io.BufferedInputStream;
- import java.io.BufferedReader;
- import java.io.DataOutputStream;
- import java.io.InputStreamReader;
- import java.net.HttpURLConnection;
- import java.net.URL;
- import java.util.Arrays;
- import java.util.concurrent.TimeUnit;
-
- /**
- * XxlJob开发示例(Bean模式)
- *
- * 开发步骤:
- * 1、任务开发:在Spring Bean实例中,开发Job方法;
- * 2、注解配置:为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值。
- * 3、执行日志:需要通过 "XxlJobHelper.log" 打印执行日志;
- * 4、任务结果:默认任务结果为 "成功" 状态,不需要主动设置;如有诉求,比如设置任务结果为失败,可以通过 "XxlJobHelper.handleFail/handleSuccess" 自主设置任务结果;
- *
- * @author xuxueli 2019-12-11 21:52:51
- */
- @Component
- public class SampleXxlJob {
- private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);
-
-
- /**
- * 1、简单任务示例(Bean模式)
- */
- @XxlJob("demoJobHandler")
- public void demoJobHandler() throws Exception {
- XxlJobHelper.log("XXL-JOB, Hello World.");
-
- for (int i = 0; i < 5; i++) {
- XxlJobHelper.log("beat at:" + i);
- TimeUnit.SECONDS.sleep(2);
- }
- // default success
- }
-
-
- /**
- * 2、分片广播任务
- */
- @XxlJob("shardingJobHandler")
- public void shardingJobHandler() throws Exception {
-
- // 分片参数
- int shardIndex = XxlJobHelper.getShardIndex();
- int shardTotal = XxlJobHelper.getShardTotal();
-
- XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
-
- // 业务逻辑
- for (int i = 0; i < shardTotal; i++) {
- if (i == shardIndex) {
- XxlJobHelper.log("第 {} 片, 命中分片开始处理", i);
- } else {
- XxlJobHelper.log("第 {} 片, 忽略", i);
- }
- }
-
- }
-
-
- /**
- * 3、命令行任务
- */
- @XxlJob("commandJobHandler")
- public void commandJobHandler() throws Exception {
- String command = XxlJobHelper.getJobParam();
- int exitValue = -1;
-
- BufferedReader bufferedReader = null;
- try {
- // command process
- ProcessBuilder processBuilder = new ProcessBuilder();
- processBuilder.command(command);
- processBuilder.redirectErrorStream(true);
-
- Process process = processBuilder.start();
- //Process process = Runtime.getRuntime().exec(command);
-
- BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream());
- bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream));
-
- // command log
- String line;
- while ((line = bufferedReader.readLine()) != null) {
- XxlJobHelper.log(line);
- }
-
- // command exit
- process.waitFor();
- exitValue = process.exitValue();
- } catch (Exception e) {
- XxlJobHelper.log(e);
- } finally {
- if (bufferedReader != null) {
- bufferedReader.close();
- }
- }
-
- if (exitValue == 0) {
- // default success
- } else {
- XxlJobHelper.handleFail("command exit value("+exitValue+") is failed");
- }
-
- }
-
-
- /**
- * 4、跨平台Http任务
- * 参数示例:
- * "url: http://www.baidu.com\" +
- * "method: get\" +
- * "data: content\";
- */
- @XxlJob("httpJobHandler")
- public void httpJobHandler() throws Exception {
-
- // param parse
- String param = XxlJobHelper.getJobParam();
- if (param==null || param.trim().length()==0) {
- XxlJobHelper.log("param["+ param +"] invalid.");
-
- XxlJobHelper.handleFail();
- return;
- }
-
- String[] httpParams = param.split("\");
- String url = null;
- String method = null;
- String data = null;
- for (String httpParam: httpParams) {
- if (httpParam.startsWith("url:")) {
- url = httpParam.substring(httpParam.indexOf("url:") + 4).trim();
- }
- if (httpParam.startsWith("method:")) {
- method = httpParam.substring(httpParam.indexOf("method:") + 7).trim().toUpperCase();
- }
- if (httpParam.startsWith("data:")) {
- data = httpParam.substring(httpParam.indexOf("data:") + 5).trim();
- }
- }
-
- // param valid
- if (url==null || url.trim().length()==0) {
- XxlJobHelper.log("url["+ url +"] invalid.");
-
- XxlJobHelper.handleFail();
- return;
- }
- if (method==null || !Arrays.asList("GET", "POST").contains(method)) {
- XxlJobHelper.log("method["+ method +"] invalid.");
-
- XxlJobHelper.handleFail();
- return;
- }
- boolean isPostMethod = method.equals("POST");
-
- // request
- HttpURLConnection connection = null;
- BufferedReader bufferedReader = null;
- try {
- // connection
- URL realUrl = new URL(url);
- connection = (HttpURLConnection) realUrl.openConnection();
-
- // connection setting
- connection.setRequestMethod(method);
- connection.setDoOutput(isPostMethod);
- connection.setDoInput(true);
- connection.setUseCaches(false);
- connection.setReadTimeout(5 * 1000);
- connection.setConnectTimeout(3 * 1000);
- connection.setRequestProperty("connection", "Keep-Alive");
- connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
- connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");
-
- // do connection
- connection.connect();
-
- // data
- if (isPostMethod && data!=null && data.trim().length()>0) {
- DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
- dataOutputStream.write(data.getBytes("UTF-8"));
- dataOutputStream.flush();
- dataOutputStream.close();
- }
-
- // valid StatusCode
- int statusCode = connection.getResponseCode();
- if (statusCode != 200) {
- throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid.");
- }
-
- // result
- bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
- StringBuilder result = new StringBuilder();
- String line;
- while ((line = bufferedReader.readLine()) != null) {
- result.append(line);
- }
- String responseMsg = result.toString();
-
- XxlJobHelper.log(responseMsg);
-
- return;
- } catch (Exception e) {
- XxlJobHelper.log(e);
-
- XxlJobHelper.handleFail();
- return;
- } finally {
- try {
- if (bufferedReader != null) {
- bufferedReader.close();
- }
- if (connection != null) {
- connection.disconnect();
- }
- } catch (Exception e2) {
- XxlJobHelper.log(e2);
- }
- }
-
- }
-
- /**
- * 5、生命周期任务示例:任务初始化与销毁时,支持自定义相关逻辑;
- */
- @XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy")
- public void demoJobHandler2() throws Exception {
- XxlJobHelper.log("XXL-JOB, Hello World.");
- }
- public void init(){
- logger.info("init");
- }
- public void destroy(){
- logger.info("destroy");
- }
-
-
- }
2. 不使用框架注入
不使用Spring框架也能实现Bean的注入,使用类加载器调用getResourceAsStream方法读取到Properties对象实例里,然后初始化XxlJobExecutor的子类, 在init的时候将所有的声明@XxlJob的类作为bean设置在List<Object> beans里。
- package com.bing.sh.job.config;
-
- import com.bing.sh.job.executor.SimpleExecutor;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.context.annotation.Configuration;
-
- import java.io.IOException;
- import java.io.InputStreamReader;
- import java.util.List;
- import java.util.Properties;
-
-
- @Configuration
- public class FrameLessXxlJobConfig {
-
- private Logger logger = LoggerFactory.getLogger(FrameLessXxlJobConfig.class);
-
-
- // singleTon
- private static final FrameLessXxlJobConfig instance = new FrameLessXxlJobConfig();
-
- public static FrameLessXxlJobConfig getInstance() {
- return instance;
- }
-
-
- public SimpleExecutor initXxlJobExecutor(String appName, List<Object> beanLists) {
- Properties xxlJobProp = loadProperties("xxl-job-executor.properties");
- // init executor
- SimpleExecutor xxlJobExecutor = new SimpleExecutor();
- xxlJobExecutor.setAdminAddresses(xxlJobProp.getProperty("xxl.job.admin.addresses"));
- xxlJobExecutor.setAccessToken(xxlJobProp.getProperty("xxl.job.accessToken"));
- xxlJobExecutor.setAppname(appName);
- xxlJobExecutor.setAddress(xxlJobProp.getProperty("xxl.job.executor.address"));
- xxlJobExecutor.setIp(xxlJobProp.getProperty("xxl.job.executor.ip"));
- xxlJobExecutor.setPort(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.port")));
- xxlJobExecutor.setLogPath(xxlJobProp.getProperty("xxl.job.executor.logpath"));
- xxlJobExecutor.setLogRetentionDays(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.logretentiondays")));
-
- xxlJobExecutor.setXxlJobBeanLists(beanLists);
- try {
- xxlJobExecutor.start();
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- }
- return xxlJobExecutor;
- }
-
- public Properties loadProperties(String fileName) {
- InputStreamReader isr = null;
- try {
- ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
- isr = new InputStreamReader(classLoader.getResourceAsStream(fileName), "utf-8");
- if (isr != null) {
- Properties prop = new Properties();
- prop.load(isr);
- return prop;
- }
- } catch (IOException e) {
- logger.error("load propeties {} error");
- }
- return null;
-
- }
-
-
- }
推荐采用第二种方式注入,分布式环境下我们可以使用第二种方式注入,将executor打成jar包,然后在微服务里扫描所有包含@XxlJob的bean, 每个依赖的服务只需要配置自己服务的appName即可。
xxl.job.executor.appname=xxl-job-user-service
当然也可以采用Springboot的形式注入,只是在配置时,我们需要在每个服务里注入xxlJob的admin url和executor的所有相关信息。
3. 使用jar包的形式集成executor
新创建一个base-service project , 将executor的公共的配置放入到base-service里, 执行器的端口设置为:9998。
- ### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
- xxl.job.admin.addresses=http://127.0.0.1:8000/xxl-job-admin
-
- ### xxl-job, access token
- xxl.job.accessToken=
-
- ### xxl-job executor appname
- #xxl.job.executor.appname=xxl-job-executor-sample
- ### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
- xxl.job.executor.address=
- ### xxl-job executor server-info
- xxl.job.executor.ip=
- xxl.job.executor.port=9998
- ### xxl-job executor log-path
- xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
- ### xxl-job executor log-retention-days
- xxl.job.executor.logretentiondays=30
打包发布到本地仓库和私服,添加pom.xml配置:
-
- <!--将本地jar发布到私服-->
- <distributionManagement>
- <repository>
- <id>maven-releases</id>
- <url>http://192.168.31.129:30081/repository/maven-releases/</url>
- </repository>
-
- <snapshotRepository>
- <id>maven-snapshots</id>
- <name>Internal Snapshots</name>
- <url>http://192.168.31.129:30081/repository/snapshots/</url>
- </snapshotRepository>
-
- </distributionManagement>
执行命令:
mvn clean install package deploy
在自己的服务里添加base-service依赖:
- <dependency>
- <groupId>com.bing.sh</groupId>
- <artifactId>base-service</artifactId>
- <version>0.0.1-release</version>
- </dependency>
在user-service里的application.properties文件里配置appName:
- # xxlJob
- xxl.job.executor.appname=xxl-job-user-service
注入appName和所有的bean。
- package com.bingbing.sh.config;
-
- import com.bing.sh.job.config.FrameLessXxlJobConfig;
- import com.bingbing.sh.job.UserJobHandler;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- import java.util.Arrays;
-
- @Configuration
- public class XxlJobConfig {
-
-
- @Value("${xxl.job.executor.appname}")
- private String appName;
-
-
- @Bean
- public void initJobExecutor() {
- FrameLessXxlJobConfig frameLessXxlJobConfig = new FrameLessXxlJobConfig();
- frameLessXxlJobConfig.initXxlJobExecutor(appName, Arrays.asList(new UserJobHandler()));
- }
-
- }
二、XxlJob 核心工作原理
1. 注册JobHandler
Job处理器是XxlJob中调度的单位,也是最终调用目标的任务的载体,所有的Job处理器注册在了一个ConcurrentHashMap里, 在XxlJobExecutor类里,其中map的key 为@XxlJob(value=''')的value值, map的value 一个IJobHandler接口的实例实现。
- private static ConcurrentMap<String, IJobHandler> jobHandlerRepository
- = new ConcurrentHashMap<String, IJobHandler>();
registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
IJobHandler有3个实现,分别为GlueJobHandler、MethodJobHandler和ScriptJobHandler。
handler名称 | 描述 |
GlueJobHandler | 提供GLUE任务的处理器。 |
MethodJobHandler | 提供常规的Bean模式方法Job处理器。 |
ScriptJobHandler | 提供脚本处理器。 |
其中MethodJobHandler能基本满足我们日常的开发需求。
最新版本支持生命周期模式,提供init和destroy的存放方法,MethodHandler包含3个Method属性: executeMethod 、initMethod和destroyMethod,用法:
实例化一个MethodJobHandler,然后根据XxlJob注解里的定义的init、destory和value值找到对应的method对象,封装到MethodJobHandler里。
2. 注册JobThread
JobThread是运行job的一个线程,可以看做执行Job线程载体,存放在XxlJobExecutor类里 的JobThreadRepository,它也是一个concurrentHashMap。
- private static ConcurrentMap<Integer, JobThread> jobThreadRepository
- = new ConcurrentHashMap();
注册JobThread方法, 每次注册时会将jobId和Jobhandler作为参数实例化一个JobThread。
- public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
- JobThread newJobThread = new JobThread(jobId, handler);
- // 启动线程
- newJobThread.start();
- logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});
-
- JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, map's put method return the old value!!!
- if (oldJobThread != null) {
- oldJobThread.toStop(removeOldReason);
- oldJobThread.interrupt();
- }
-
- return newJobThread;
- }
直接调用newJobThread.start()启动JobThread线程,如果该job已经存在于jobThreadRepository里,那么停掉旧线程,这样能始终保证只有一个线程为Job服务,避免有些情况下会出现任务重复执行,发生定时错乱问题。
可以通过postman调用一个http请求去kill掉该Job,查看XxlJob会在任务执行的时候,重新创建一个新的线程去替代旧线程。
localhost:9998/kill 是executor提供的一个http请求,参数为{"jobId":2}。
调用结果:
观察executor的控制台:
21:23:23.916 logback [Thread-14] INFO com.xxl.job.core.thread.JobThread - >>>>>>>>>>> xxl-job JobThread stoped, hashCode:Thread[Thread-14,10,main]
21:23:24.014 logback [xxl-rpc, EmbedServer bizThreadPool-1270369654] INFO c.x.job.core.executor.XxlJobExecutor - >>>>>>>>>>> xxl-job regist JobThread success, jobId:2, :com.xxl.job.core.handler.impl.MethodJobHandler@2d99d5a5[class com.bingbing.sh.job.UserJobHandler#initUserHandler]
也可以跟踪代码发现创建了一个新的线程去替代旧线程。
3. JobThread---- 真正执行Job的地方
JobThread是一个自定义的线程,也是正在调用@XxlJob标记方法的地方,执行的机制是通过反射,调用的形式是通过启动JobThread线程, 在run()方法里通过handler来执行execute()方法,达到最终调用目标方法的目的。
看下面一个Job例子,在JobThread是如何执行的呢?
- @XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy")
- public void demoJobHandler2() throws Exception {
- XxlJobHelper.log("XXL-JOB, Hello World.");
- }
- public void init(){
- logger.info("init");
- }
- public void destroy(){
- logger.info("destroy");
- }
在run方法里会首先从triggerQueue里poll一个triggerParam, triggerParam 是启动job的一组参数集,在admin 页面 启动任务时将初始化triggerParam, 下一节会提到triggerParam。
根据调试,默认的getExecutorTimeout() 的值为0,因此直接执行handler.execute() 方法, MethodJobHandler的execute方法如下:
- public void execute() throws Exception {
- Class<?>[] paramTypes = this.method.getParameterTypes();
- if (paramTypes.length > 0) {
- this.method.invoke(this.target);
- } else {
- this.method.invoke(this.target);
- }
-
- }
我们在这里看到了最终执行Job的地方是JobThread类里的handler.execute()、handler.init()和handler.destory()方法。
让我们接着看XxlJob是如何触发执行任务的,简单讲是怎么触发JobThread的启动,是怎么实现在admin页面通过手动的控制任务的启动与终止Job的?
3. 执行一次任务
在控制台上执行一次任务 ,点击执行:
核心思想: 执行一次时直接触发任务,发送Http请求 /run 给executor,netty server 接收到请求后,执行run()方法----executorBiz.run(triggerParam), 最终进入JobThread,执行任务。
接着进入到JobTriggerPoolHelper的addTrigger()方法,这里使用了线程池去执行trigger动作。
- public void addTrigger(final int jobId,
- final TriggerTypeEnum triggerType,
- final int failRetryCount,
- final String executorShardingParam,
- final String executorParam,
- final String addressList) {
-
- // choose thread pool
- ThreadPoolExecutor triggerPool_ = fastTriggerPool;
- AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
- if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min
- triggerPool_ = slowTriggerPool;
- }
-
- // trigger
- triggerPool_.execute(new Runnable() {
- @Override
- public void run() {
-
- long start = System.currentTimeMillis();
-
- try {
- // do trigger
- XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- } finally {
-
- // check timeout-count-map
- long minTim_now = System.currentTimeMillis()/60000;
- if (minTim != minTim_now) {
- minTim = minTim_now;
- jobTimeoutCountMap.clear();
- }
-
- // incr timeout-count-map
- long cost = System.currentTimeMillis()-start;
- if (cost > 500) { // ob-timeout threshold 500ms
- AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
- if (timeoutCount != null) {
- timeoutCount.incrementAndGet();
- }
- }
- }
- }
- });
- }
接着进入到XxlJobTrigger类里的processTrigger方法,看processTrigger主要做了哪几件事?
1) init trigger-param, 创建一个TriggerParam实例。
2) 获取executor的address, 是从xxl_job_group表里读取出来的一个address,该address可自动注册也可在admin后台手动录入。
3) 将TriggerParam 和 address 组合,执行 runExecutor(triggerParam,address)方法。
- ReturnT<String> triggerResult = null;
- if (address != null) {
- triggerResult = runExecutor(triggerParam, address);
- } else {
- triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
- }
4) 调用 ExecutorBiz 接口的run方法, 实现类为ExecutorBizImpl
- try {
- ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
- runResult = executorBiz.run(triggerParam);
- } catch (Exception e) {
- logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
- runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
- }
5) 进入到 run() 方法, 执行jobThread 的实例化, 如果有JobId对应了旧的Thread,那么需要用新线程去替换。
- // replace thread (new or exists invalid)
- if (jobThread == null) {
- jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
- }
-
- // push data to queue
- ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
进入到registJobThread, 启动JobThread。
JobThread 启动成功,意味着JobId对应的目标方法会被调度到。
4. 启动任务
启动任务与执行一次的触发方式不同,执行一次直接会调用触发器,到executor的run()方法里执行JobThread, 而启动的任务则需要借助JobScheduleHelper来调度执行。
同时将xxl_job_info表里的任务status 字段置为1,为后续定时任务判断job的状态为启动:
XxlJobScheduler
XxlJobScheduler是admin server 初始化的一个bean, 在spring 生命周期中的InitializingBean的afterPropertiesSet() 方法里初始化, 在Spring 容器启动的时会执行afterPropertiesSet() 方法。
- public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
-
- private static XxlJobAdminConfig adminConfig = null;
- public static XxlJobAdminConfig getAdminConfig() {
- return adminConfig;
- }
-
-
- // ---------------------- XxlJobScheduler ----------------------
-
- private XxlJobScheduler xxlJobScheduler;
-
- @Override
- public void afterPropertiesSet() throws Exception {
- adminConfig = this;
-
- xxlJobScheduler = new XxlJobScheduler();
- xxlJobScheduler.init();
- }
-
- @Override
- public void destroy() throws Exception {
-
- xxlJobScheduler.destroy();
- }
-
- ...
-
- }
其中XxlJobScheduler的init()方法初始化了一个JobScheduleHelper 帮助定时触发在admin页面配置的Job。
- public void init() throws Exception {
- // init i18n
- initI18n();
-
- // admin trigger pool start
- JobTriggerPoolHelper.toStart();
-
- // admin registry monitor run
- JobRegistryHelper.getInstance().start();
-
- // admin fail-monitor run
- JobFailMonitorHelper.getInstance().start();
-
- // admin lose-monitor run ( depend on JobTriggerPoolHelper )
- JobCompleteHelper.getInstance().start();
-
- // admin log report start
- JobLogReportHelper.getInstance().start();
-
- // start-schedule ( depend on JobTriggerPoolHelper )
- JobScheduleHelper.getInstance().start();
-
- logger.info(">>>>>>>>> init xxl-job admin success.");
- }
进入到JobScheduleHelper的start() 方法, start()方法初始化了2个线程:
1) scheduleThread, 读取xxl_job_info的status为1的所有任务并通过pushTimeRing(int ringSecond, int jobId)方法将 JobId和下次执行时间放入到时间轮里,同时根据cron表达式刷新下次执行时间。
注: ringData是通过时间戳的取余计算出来的,以一分钟为刻度,每一秒可以作为一个key, 如果有相同的key,那么计算出来的值会放在Map的value,即List<Integer>里。
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
2) ringThread, 轮询时间轮,取出JobId和下次执行时间,触发Trigger。
进入scheduleThread的run方法里,执行查询xxl_job_info表 status为1的记录:
进入到pushTimeRing, TimeRing 是一个时间轮。
TimeRing 用来存放触发时间和JobId的组合。
TimeRing
JobScheduleHelper的start()方法里scheduleThread 将任务放到时间轮里,ringThread的daemon线程处理时间轮里的任务,时间轮需要一个线程去轮询执行,类似于kafka的时间轮机制,也就是遍历ringItemData , 然后挨个去触发Trigger。
存放任务
ringData是一个map, key 为任务的时间戳,JobId为任务id, 如果相同时间内有多个任务,那么用List<Integer>存放任务Id列表。
- private void pushTimeRing(int ringSecond, int jobId){
- // push async ring
- List<Integer> ringItemData = ringData.get(ringSecond);
- if (ringItemData == null) {
- ringItemData = new ArrayList<Integer>();
- ringData.put(ringSecond, ringItemData);
- }
- ringItemData.add(jobId);
-
- logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) );
- }
取出任务
根据当前时间取出ringData里的任务id列表,然后轮询任务id列表,轮询执行trigger。
- // ring thread
- ringThread = new Thread(new Runnable() {
- @Override
- public void run() {
-
- while (!ringThreadToStop) {
-
- // align second
- try {
- TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
- } catch (InterruptedException e) {
- if (!ringThreadToStop) {
- logger.error(e.getMessage(), e);
- }
- }
-
- try {
- // second data
- List<Integer> ringItemData = new ArrayList<>();
- int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
- for (int i = 0; i < 2; i++) {
- List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
- if (tmpData != null) {
- ringItemData.addAll(tmpData);
- }
- }
-
- // ring trigger
- logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
- if (ringItemData.size() > 0) {
- // do trigger
- for (int jobId: ringItemData) {
- // do trigger
- JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
- }
- // clear
- ringItemData.clear();
- }
- } catch (Exception e) {
- if (!ringThreadToStop) {
- logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
- }
- }
- }
- logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
- }
- });
- ringThread.setDaemon(true);
- ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
- ringThread.start();
最终进入到ExecutorBizImpl的run()方法
和上述执行一次的逻辑一样会进入到XxlJobExecutor.registJobThread(int jobId, IJobHandler handler, String removeOldReason)方法,JobThread启动,调用目标方法,核心流程结束。