xxljob分布式定时任务xxljob用法及核心调度源码详解(代码片段)

Dream_it_possible! Dream_it_possible!     2023-01-06     303

关键词:

  

目录

一、XxlJob 的Executor

1. 使用Spring框架注入

2. 不使用框架注入

3. 使用jar包的形式集成executor

二、XxlJob的核心工作原理

1. 注册JobHandler

2.  注册JobThread

3. JobThread---- 真正执行Job的地方

3. 执行一次任务

4. 启动任务

XxlJobScheduler

TimeRing


      XxlJob是目前最流行的分布式定时任务中间件,对比quartz,代码的侵入明显少了很多,不需要每次在代码里配置job, 而XxlJobd的admin server组件提供了可视化ui, 对job和执行器能够从前端页面配置管理,简单易用,目前已经接入几百家互联网公司使用,XxlJob的强大任务调度能力为广大开发者和企业所认可,那XxlJob是怎么工作的?  

        Tip: 总字数22922字,阅读全文大概会花您20分钟喝茶时间~

        XxlJob最新依赖版本: 2.3.0 和源码地址:

 <dependency>
            <groupId>com.xuxueli</groupId>
            <artifactId>xxl-job-core</artifactId>
            <version>2.3.0</version>
</dependency>

GitHub - xuxueli/xxl-job: A distributed task scheduling framework.(分布式任务调度平台XXL-JOB)

        XxlJob主要包含2个核心模块:  xxl-job-admin 和xxl-job-core。

         

  •  xxl-job-admin 提供可视化的ui页面管理执行器、Job以及查看日志等功能, 默认登录地址为: localhost:8080/xxl-job-admin, 用户名为: admin, 密码为: 123456。
  •  xxl-job-executor 中基于netty实现一个embedServer, 与admin server是一个独立的server ,处理任务调度请求,包含了Job的核心调度实现。

        最新版本使用@XxlJob注解标记Job,  同时支持生命周期Job任务。

        XxlJob的Executor组件是Job调度的核心实现,配合admin  Server 完成周期调度。

一、XxlJob 的Executor

        XxlJob提供了2个任务执行器,简称Executor, XxlJob通过Executor来管理所有Job的生命周期,包括Job的初始化、启动和销毁等工作,目前的2个主要子类为XxlJobSimpleExecutor和XxlSpringExecutor。

  • XxlJobSimpleExecutor 提供不依赖Spring框架的实现方式。也就是说我不用Spring框架,使用纯Java代码也能使用XxlJob。
  • XxlSpringExecutor 提供基于Spring框架的实现方式。

        XxlJobSimpleExecutor和XxlSpringExecutor都继承了XxlJobExecutor, XxlJobExecutor提供注册Job、初始化Server等功能、核心方法 registJobHandler、initEmbedServer。

               

        注入Job的方式有2种: 基于Spring的Bean 和 纯Java(不使用Spring框架)两种。

1. 使用Spring框架注入

        覆盖XxlJobSpringExecutor, 使用@Value注解读取application.properties里的配置。

package com.xxl.job.executor.core.config;

import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * xxl-job config
 *
 * @author xuxueli 2017-04-28
 */
@Configuration
public class XxlJobConfig 
    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

    @Value("$xxl.job.admin.addresses")
    private String adminAddresses;

    @Value("$xxl.job.accessToken")
    private String accessToken;

    @Value("$xxl.job.executor.appname")
    private String appname;

    @Value("$xxl.job.executor.address")
    private String address;

    @Value("$xxl.job.executor.ip")
    private String ip;

    @Value("$xxl.job.executor.port")
    private int port;

    @Value("$xxl.job.executor.logpath")
    private String logPath;

    @Value("$xxl.job.executor.logretentiondays")
    private int logRetentionDays;


    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() 
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    

    /**
     * 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
     *
     *      1、引入依赖:
     *          <dependency>
     *             <groupId>org.springframework.cloud</groupId>
     *             <artifactId>spring-cloud-commons</artifactId>
     *             <version>$version</version>
     *         </dependency>
     *
     *      2、配置文件,或者容器启动变量
     *          spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
     *
     *      3、获取IP
     *          String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
     */


配置application.properties文件: 

### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin

### xxl-job, access token
xxl.job.accessToken=

### xxl-job executor appname
xxl.job.executor.appname=xxl-job-executor-sample
### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
xxl.job.executor.address=
### xxl-job executor server-info
xxl.job.executor.ip=
xxl.job.executor.port=9998
### xxl-job executor log-path
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### xxl-job executor log-retention-days
xxl.job.executor.logretentiondays=30

创建一个Bean类SampleXxlJob, 每一个被@XxlJob标记方法都是一个Job,使用@XxlJob注解标记方法即可。

package com.xxl.job.executor.service.jobhandler;

import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

/**
 * XxlJob开发示例(Bean模式)
 *
 * 开发步骤:
 *      1、任务开发:在Spring Bean实例中,开发Job方法;
 *      2、注解配置:为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值。
 *      3、执行日志:需要通过 "XxlJobHelper.log" 打印执行日志;
 *      4、任务结果:默认任务结果为 "成功" 状态,不需要主动设置;如有诉求,比如设置任务结果为失败,可以通过 "XxlJobHelper.handleFail/handleSuccess" 自主设置任务结果;
 *
 * @author xuxueli 2019-12-11 21:52:51
 */
@Component
public class SampleXxlJob 
    private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);


    /**
     * 1、简单任务示例(Bean模式)
     */
    @XxlJob("demoJobHandler")
    public void demoJobHandler() throws Exception 
        XxlJobHelper.log("XXL-JOB, Hello World.");

        for (int i = 0; i < 5; i++) 
            XxlJobHelper.log("beat at:" + i);
            TimeUnit.SECONDS.sleep(2);
        
        // default success
    


    /**
     * 2、分片广播任务
     */
    @XxlJob("shardingJobHandler")
    public void shardingJobHandler() throws Exception 

        // 分片参数
        int shardIndex = XxlJobHelper.getShardIndex();
        int shardTotal = XxlJobHelper.getShardTotal();

        XxlJobHelper.log("分片参数:当前分片序号 = , 总分片数 = ", shardIndex, shardTotal);

        // 业务逻辑
        for (int i = 0; i < shardTotal; i++) 
            if (i == shardIndex) 
                XxlJobHelper.log("第  片, 命中分片开始处理", i);
             else 
                XxlJobHelper.log("第  片, 忽略", i);
            
        

    


    /**
     * 3、命令行任务
     */
    @XxlJob("commandJobHandler")
    public void commandJobHandler() throws Exception 
        String command = XxlJobHelper.getJobParam();
        int exitValue = -1;

        BufferedReader bufferedReader = null;
        try 
            // command process
            ProcessBuilder processBuilder = new ProcessBuilder();
            processBuilder.command(command);
            processBuilder.redirectErrorStream(true);

            Process process = processBuilder.start();
            //Process process = Runtime.getRuntime().exec(command);

            BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream());
            bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream));

            // command log
            String line;
            while ((line = bufferedReader.readLine()) != null) 
                XxlJobHelper.log(line);
            

            // command exit
            process.waitFor();
            exitValue = process.exitValue();
         catch (Exception e) 
            XxlJobHelper.log(e);
         finally 
            if (bufferedReader != null) 
                bufferedReader.close();
            
        

        if (exitValue == 0) 
            // default success
         else 
            XxlJobHelper.handleFail("command exit value("+exitValue+") is failed");
        

    


    /**
     * 4、跨平台Http任务
     *  参数示例:
     *      "url: http://www.baidu.com\\n" +
     *      "method: get\\n" +
     *      "data: content\\n";
     */
    @XxlJob("httpJobHandler")
    public void httpJobHandler() throws Exception 

        // param parse
        String param = XxlJobHelper.getJobParam();
        if (param==null || param.trim().length()==0) 
            XxlJobHelper.log("param["+ param +"] invalid.");

            XxlJobHelper.handleFail();
            return;
        

        String[] httpParams = param.split("\\n");
        String url = null;
        String method = null;
        String data = null;
        for (String httpParam: httpParams) 
            if (httpParam.startsWith("url:")) 
                url = httpParam.substring(httpParam.indexOf("url:") + 4).trim();
            
            if (httpParam.startsWith("method:")) 
                method = httpParam.substring(httpParam.indexOf("method:") + 7).trim().toUpperCase();
            
            if (httpParam.startsWith("data:")) 
                data = httpParam.substring(httpParam.indexOf("data:") + 5).trim();
            
        

        // param valid
        if (url==null || url.trim().length()==0) 
            XxlJobHelper.log("url["+ url +"] invalid.");

            XxlJobHelper.handleFail();
            return;
        
        if (method==null || !Arrays.asList("GET", "POST").contains(method)) 
            XxlJobHelper.log("method["+ method +"] invalid.");

            XxlJobHelper.handleFail();
            return;
        
        boolean isPostMethod = method.equals("POST");

        // request
        HttpURLConnection connection = null;
        BufferedReader bufferedReader = null;
        try 
            // connection
            URL realUrl = new URL(url);
            connection = (HttpURLConnection) realUrl.openConnection();

            // connection setting
            connection.setRequestMethod(method);
            connection.setDoOutput(isPostMethod);
            connection.setDoInput(true);
            connection.setUseCaches(false);
            connection.setReadTimeout(5 * 1000);
            connection.setConnectTimeout(3 * 1000);
            connection.setRequestProperty("connection", "Keep-Alive");
            connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
            connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");

            // do connection
            connection.connect();

            // data
            if (isPostMethod && data!=null && data.trim().length()>0) 
                DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
                dataOutputStream.write(data.getBytes("UTF-8"));
                dataOutputStream.flush();
                dataOutputStream.close();
            

            // valid StatusCode
            int statusCode = connection.getResponseCode();
            if (statusCode != 200) 
                throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid.");
            

            // result
            bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
            StringBuilder result = new StringBuilder();
            String line;
            while ((line = bufferedReader.readLine()) != null) 
                result.append(line);
            
            String responseMsg = result.toString();

            XxlJobHelper.log(responseMsg);

            return;
         catch (Exception e) 
            XxlJobHelper.log(e);

            XxlJobHelper.handleFail();
            return;
         finally 
            try 
                if (bufferedReader != null) 
                    bufferedReader.close();
                
                if (connection != null) 
                    connection.disconnect();
                
             catch (Exception e2) 
                XxlJobHelper.log(e2);
            
        

    

    /**
     * 5、生命周期任务示例:任务初始化与销毁时,支持自定义相关逻辑;
     */
    @XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy")
    public void demoJobHandler2() throws Exception 
        XxlJobHelper.log("XXL-JOB, Hello World.");
    
    public void init()
        logger.info("init");
    
    public void destroy()
        logger.info("destroy");
    



2. 不使用框架注入

        不使用Spring框架也能实现Bean的注入,使用类加载器调用getResourceAsStream方法读取到Properties对象实例里,然后初始化XxlJobExecutor的子类, 在init的时候将所有的声明@XxlJob的类作为bean设置在List<Object> beans里。

package com.bing.sh.job.config;

import com.bing.sh.job.executor.SimpleExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;
import java.io.InputStreamReader;
import java.util.List;
import java.util.Properties;


@Configuration
public class FrameLessXxlJobConfig 

    private Logger logger = LoggerFactory.getLogger(FrameLessXxlJobConfig.class);


    // singleTon
    private static final FrameLessXxlJobConfig instance = new FrameLessXxlJobConfig();

    public static FrameLessXxlJobConfig getInstance() 
        return instance;
    


    public SimpleExecutor initXxlJobExecutor(String appName, List<Object> beanLists) 
        Properties xxlJobProp = loadProperties("xxl-job-executor.properties");
        // init executor
        SimpleExecutor xxlJobExecutor = new SimpleExecutor();
        xxlJobExecutor.setAdminAddresses(xxlJobProp.getProperty("xxl.job.admin.addresses"));
        xxlJobExecutor.setAccessToken(xxlJobProp.getProperty("xxl.job.accessToken"));
        xxlJobExecutor.setAppname(appName);
        xxlJobExecutor.setAddress(xxlJobProp.getProperty("xxl.job.executor.address"));
        xxlJobExecutor.setIp(xxlJobProp.getProperty("xxl.job.executor.ip"));
        xxlJobExecutor.setPort(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.port")));
        xxlJobExecutor.setLogPath(xxlJobProp.getProperty("xxl.job.executor.logpath"));
        xxlJobExecutor.setLogRetentionDays(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.logretentiondays")));

        xxlJobExecutor.setXxlJobBeanLists(beanLists);
        try 
            xxlJobExecutor.start();
         catch (Exception e) 
            logger.error(e.getMessage(), e);
        
        return xxlJobExecutor;
    

    public Properties loadProperties(String fileName) 
        InputStreamReader isr = null;
        try 
            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
            isr = new InputStreamReader(classLoader.getResourceAsStream(fileName), "utf-8");
            if (isr != null) 
                Properties prop = new Properties();
                prop.load(isr);
                return prop;
            
         catch (IOException e) 
            logger.error("load  propeties  error");
        
        return null;

    



        推荐采用第二种方式注入,分布式环境下我们可以使用第二种方式注入,将executor打成jar包,然后在微服务里扫描所有包含@XxlJob的bean, 每个依赖的服务只需要配置自己服务的appName即可。

xxl.job.executor.appname=xxl-job-user-service

        当然也可以采用Springboot的形式注入,只是在配置时,我们需要在每个服务里注入xxlJob的admin url和executor的所有相关信息。

3. 使用jar包的形式集成executor

        新创建一个base-service project ,  将executor的公共的配置放入到base-service里, 执行器的端口设置为:9998。

### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
xxl.job.admin.addresses=http://127.0.0.1:8000/xxl-job-admin

### xxl-job, access token
xxl.job.accessToken=

### xxl-job executor appname
#xxl.job.executor.appname=xxl-job-executor-sample
### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
xxl.job.executor.address=
### xxl-job executor server-info
xxl.job.executor.ip=
xxl.job.executor.port=9998
### xxl-job executor log-path
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### xxl-job executor log-retention-days
xxl.job.executor.logretentiondays=30

打包发布到本地仓库和私服,添加pom.xml配置:


    <!--将本地jar发布到私服-->
    <distributionManagement>
        <repository>
            <id>maven-releases</id>
            <url>http://192.168.31.129:30081/repository/maven-releases/</url>
        </repository>

        <snapshotRepository>
            <id>maven-snapshots</id>
            <name>Internal Snapshots</name>
            <url>http://192.168.31.129:30081/repository/snapshots/</url>
        </snapshotRepository>

    </distributionManagement>

执行命令:  

mvn clean install package deploy

在自己的服务里添加base-service依赖:

<dependency>
                <groupId>com.bing.sh</groupId>
                <artifactId>base-service</artifactId>
                <version>0.0.1-release</version>
</dependency>

在user-service里的application.properties文件里配置appName:

# xxlJob
xxl.job.executor.appname=xxl-job-user-service

注入appName和所有的bean。

package com.bingbing.sh.config;

import com.bing.sh.job.config.FrameLessXxlJobConfig;
import com.bingbing.sh.job.UserJobHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Arrays;

@Configuration
public class XxlJobConfig 


    @Value("$xxl.job.executor.appname")
    private String appName;


    @Bean
    public void initJobExecutor() 
        FrameLessXxlJobConfig frameLessXxlJobConfig = new FrameLessXxlJobConfig();
        frameLessXxlJobConfig.initXxlJobExecutor(appName, Arrays.asList(new UserJobHandler()));
    


二、XxlJob 核心工作原理

1. 注册JobHandler

       Job处理器是XxlJob中调度的单位,也是最终调用目标的任务的载体,所有的Job处理器注册在了一个ConcurrentHashMap里,  在XxlJobExecutor类里,其中map的key 为@XxlJob(value=''')的value值, map的value 一个IJobHandler接口的实例实现。

private static ConcurrentMap<String, IJobHandler> jobHandlerRepository 
    = new ConcurrentHashMap<String, IJobHandler>();
    registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));

         IJobHandler有3个实现,分别为GlueJobHandler、MethodJobHandler和ScriptJobHandler。

handler名称描述
GlueJobHandler提供GLUE任务的处理器。
MethodJobHandler提供常规的Bean模式方法Job处理器。
ScriptJobHandler提供脚本处理器。

        其中MethodJobHandler能基本满足我们日常的开发需求。

         最新版本支持生命周期模式,提供init和destroy的存放方法,MethodHandler包含3个Method属性: executeMethod 、initMethod和destroyMethod,用法:

        实例化一个MethodJobHandler,然后根据XxlJob注解里的定义的init、destory和value值找到对应的method对象,封装到MethodJobHandler里。

2.  注册JobThread

        JobThread是运行job的一个线程,可以看做执行Job线程载体,存放在XxlJobExecutor类里 的JobThreadRepository,它也是一个concurrentHashMap。

private static ConcurrentMap<Integer, JobThread> jobThreadRepository 
        = new ConcurrentHashMap();

        注册JobThread方法, 每次注册时会将jobId和Jobhandler作为参数实例化一个JobThread。

    public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason)
        JobThread newJobThread = new JobThread(jobId, handler);
        // 启动线程
        newJobThread.start();
        logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:, handler:", new Object[]jobId, handler);

        JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);	// putIfAbsent | oh my god, map's put method return the old value!!!
        if (oldJobThread != null) 
            oldJobThread.toStop(removeOldReason);
            oldJobThread.interrupt();
        

        return newJobThread;
    

        直接调用newJobThread.start()启动JobThread线程,如果该job已经存在于jobThreadRepository里,那么停掉旧线程,这样能始终保证只有一个线程为Job服务,避免有些情况下会出现任务重复执行,发生定时错乱问题。

        可以通过postman调用一个http请求去kill掉该Job,查看XxlJob会在任务执行的时候,重新创建一个新的线程去替代旧线程。

        localhost:9998/kill 是executor提供的一个http请求,参数为"jobId":2。

        

         调用结果:

观察executor的控制台: 

21:23:23.916 logback [Thread-14] INFO  com.xxl.job.core.thread.JobThread - >>>>>>>>>>> xxl-job JobThread stoped, hashCode:Thread[Thread-14,10,main]
21:23:24.014 logback [xxl-rpc, EmbedServer bizThreadPool-1270369654] INFO  c.x.job.core.executor.XxlJobExecutor - >>>>>>>>>>> xxl-job regist JobThread success, jobId:2, handler:com.xxl.job.core.handler.impl.MethodJobHandler@2d99d5a5[class com.bingbing.sh.job.UserJobHandler#initUserHandler]

         也可以跟踪代码发现创建了一个新的线程去替代旧线程。

3. JobThread---- 真正执行Job的地方

         JobThread是一个自定义的线程,也是正在调用@XxlJob标记方法的地方,执行的机制是通过反射,调用的形式是通过启动JobThread线程,  在run()方法里通过handler来执行execute()方法,达到最终调用目标方法的目的。

        看下面一个Job例子,在JobThread是如何执行的呢?

    @XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy")
    public void demoJobHandler2() throws Exception 
        XxlJobHelper.log("XXL-JOB, Hello World.");
    
    public void init()
        logger.info("init");
    
    public void destroy()
        logger.info("destroy");
    

         在run方法里会首先从triggerQueue里poll一个triggerParam, triggerParam 是启动job的一组参数集,在admin 页面 启动任务时将初始化triggerParam, 下一节会提到triggerParam。

        根据调试,默认的getExecutorTimeout() 的值为0,因此直接执行handler.execute() 方法, MethodJobHandler的execute方法如下:

    public void execute() throws Exception 
        Class<?>[] paramTypes = this.method.getParameterTypes();
        if (paramTypes.length > 0) 
            this.method.invoke(this.target);
         else 
            this.method.invoke(this.target);
        

    

        我们在这里看到了最终执行Job的地方是JobThread类里的handler.execute()、handler.init()和handler.destory()方法。

        让我们接着看XxlJob是如何触发执行任务的,简单讲是怎么触发JobThread的启动,是怎么实现在admin页面通过手动的控制任务的启动与终止Job的?

3. 执行一次任务

        在控制台上执行一次任务 ,点击执行:

        核心思想: 执行一次时直接触发任务,发送Http请求 /run 给executor,netty server 接收到请求后,执行run()方法----executorBiz.run(triggerParam), 最终进入JobThread,执行任务。

        接着进入到JobTriggerPoolHelper的addTrigger()方法,这里使用了线程池去执行trigger动作。

   public void addTrigger(final int jobId,
                           final TriggerTypeEnum triggerType,
                           final int failRetryCount,
                           final String executorShardingParam,
                           final String executorParam,
                           final String addressList) 

        // choose thread pool
        ThreadPoolExecutor triggerPool_ = fastTriggerPool;
        AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
        if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10)       // job-timeout 10 times in 1 min
            triggerPool_ = slowTriggerPool;
        

        // trigger
        triggerPool_.execute(new Runnable() 
            @Override
            public void run() 

                long start = System.currentTimeMillis();

                try 
                    // do trigger
                    XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
                 catch (Exception e) 
                    logger.error(e.getMessage(), e);
                 finally 

                    // check timeout-count-map
                    long minTim_now = System.currentTimeMillis()/60000;
                    if (minTim != minTim_now) 
                        minTim = minTim_now;
                        jobTimeoutCountMap.clear();
                    

                    // incr timeout-count-map
                    long cost = System.currentTimeMillis()-start;
                    if (cost > 500)        // ob-timeout threshold 500ms
                        AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                        if (timeoutCount != null) 
                            timeoutCount.incrementAndGet();
                        
                    
                
            
        );
    

        接着进入到XxlJobTrigger类里的processTrigger方法,看processTrigger主要做了哪几件事?

1) init trigger-param,  创建一个TriggerParam实例。

2) 获取executor的address, 是从xxl_job_group表里读取出来的一个address,该address可自动注册也可在admin后台手动录入。

3) 将TriggerParam 和 address 组合,执行 runExecutor(triggerParam,address)方法。

   ReturnT<String> triggerResult = null;
        if (address != null) 
            triggerResult = runExecutor(triggerParam, address);
         else 
            triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
        

4)  调用 ExecutorBiz 接口的run方法, 实现类为ExecutorBizImpl

        try 
            ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
            runResult = executorBiz.run(triggerParam);
         catch (Exception e) 
            logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[] is running.", address, e);
            runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
        

 5)  进入到 run() 方法, 执行jobThread 的实例化, 如果有JobId对应了旧的Thread,那么需要用新线程去替换。

        // replace thread (new or exists invalid)
        if (jobThread == null) 
            jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
        

        // push data to queue
        ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);

 进入到registJobThread, 启动JobThread。

 JobThread 启动成功,意味着JobId对应的目标方法会被调度到。

4. 启动任务

        启动任务与执行一次的触发方式不同,执行一次直接会调用触发器,到executor的run()方法里执行JobThread, 而启动的任务则需要借助JobScheduleHelper来调度执行。

        同时将xxl_job_info表里的任务status 字段置为1,为后续定时任务判断job的状态为启动: 

XxlJobScheduler

        XxlJobScheduler是admin server 初始化的一个bean, 在spring 生命周期中的InitializingBean的afterPropertiesSet() 方法里初始化, 在Spring 容器启动的时会执行afterPropertiesSet() 方法。

public class XxlJobAdminConfig implements InitializingBean, DisposableBean 

    private static XxlJobAdminConfig adminConfig = null;
    public static XxlJobAdminConfig getAdminConfig() 
        return adminConfig;
    


    // ---------------------- XxlJobScheduler ----------------------

    private XxlJobScheduler xxlJobScheduler;

    @Override
    public void afterPropertiesSet() throws Exception 
        adminConfig = this;

        xxlJobScheduler = new XxlJobScheduler();
        xxlJobScheduler.init();
    

    @Override
    public void destroy() throws Exception 

        xxlJobScheduler.destroy();
    

...

其中XxlJobScheduler的init()方法初始化了一个JobScheduleHelper 帮助定时触发在admin页面配置的Job。

    public void init() throws Exception 
        // init i18n
        initI18n();

        // admin trigger pool start
        JobTriggerPoolHelper.toStart();

        // admin registry monitor run
        JobRegistryHelper.getInstance().start();

        // admin fail-monitor run
        JobFailMonitorHelper.getInstance().start();

        // admin lose-monitor run ( depend on JobTriggerPoolHelper )
        JobCompleteHelper.getInstance().start();

        // admin log report start
        JobLogReportHelper.getInstance().start();

        // start-schedule  ( depend on JobTriggerPoolHelper )
        JobScheduleHelper.getInstance().start();

        logger.info(">>>>>>>>> init xxl-job admin success.");
    

进入到JobScheduleHelper的start() 方法, start()方法初始化了2个线程:
1)  scheduleThread,  读取xxl_job_info的status为1的所有任务并通过pushTimeRing(int ringSecond, int jobId)方法将 JobId和下次执行时间放入到时间轮里,同时根据cron表达式刷新下次执行时间。

        注:  ringData是通过时间戳的取余计算出来的,以一分钟为刻度,每一秒可以作为一个key, 如果有相同的key,那么计算出来的值会放在Map的value,即List<Integer>里。

int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

2)  ringThread, 轮询时间轮,取出JobId和下次执行时间,触发Trigger。

进入scheduleThread的run方法里,执行查询xxl_job_info表 status为1的记录:

进入到pushTimeRing, TimeRing 是一个时间轮。

 TimeRing 用来存放触发时间和JobId的组合。

TimeRing

        JobScheduleHelper的start()方法里scheduleThread 将任务放到时间轮里,ringThread的daemon线程处理时间轮里的任务,时间轮需要一个线程去轮询执行,类似于kafka的时间轮机制,也就是遍历ringItemData , 然后挨个去触发Trigger。

存放任务

        ringData是一个map, key 为任务的时间戳,JobId为任务id, 如果相同时间内有多个任务,那么用List<Integer>存放任务Id列表。

    private void pushTimeRing(int ringSecond, int jobId)
        // push async ring
        List<Integer> ringItemData = ringData.get(ringSecond);
        if (ringItemData == null) 
            ringItemData = new ArrayList<Integer>();
            ringData.put(ringSecond, ringItemData);
        
        ringItemData.add(jobId);

        logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) );
    

取出任务 

        根据当前时间取出ringData里的任务id列表,然后轮询任务id列表,轮询执行trigger。

   // ring thread
        ringThread = new Thread(new Runnable() 
            @Override
            public void run() 

                while (!ringThreadToStop) 

                    // align second
                    try 
                        TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
                     catch (InterruptedException e) 
                        if (!ringThreadToStop) 
                            logger.error(e.getMessage(), e);
                        
                    

                    try 
                        // second data
                        List<Integer> ringItemData = new ArrayList<>();
                        int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
                        for (int i = 0; i < 2; i++) 
                            List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                            if (tmpData != null) 
                                ringItemData.addAll(tmpData);
                            
                        

                        // ring trigger
                        logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
                        if (ringItemData.size() > 0) 
                            // do trigger
                            for (int jobId: ringItemData) 
                                // do trigger
                                JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                            
                            // clear
                            ringItemData.clear();
                        
                     catch (Exception e) 
                        if (!ringThreadToStop) 
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:", e);
                        
                    
                
                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
            
        );
        ringThread.setDaemon(true);
        ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
        ringThread.start();

最终进入到ExecutorBizImpl的run()方法

和上述执行一次的逻辑一样会进入到XxlJobExecutor.registJobThread(int jobId, IJobHandler handler, String removeOldReason)方法,JobThread启动,调用目标方法,核心流程结束。

xxljob负载均衡用法及实现原理详解(代码片段)

...um3. ExecutorRouteRound计算server地址 4.小结    上一篇讲到XxlJob是核心调度实现,接着看XxJob是怎么做负载均衡的& 查看详情

springcloud技术栈系列4:分布式定时任务

问题 回答xxljob支持自定义http接口来添加调度任务吗?.支持的,我们基于低代码的实际场景,封装成了openfeign的接口,这实际上也是一种http接口,我们封装了两个接口,一个用来增加和更新调度任务,一个用来修改任务的状态... 查看详情

发现mariadb数据库时间晚了12个小时,xxljob定时任务调度异常

...现时区是美国时区(EDT),非中国时区:xxljob定时任务提前12个小时执行了,示例:应该是在晚上23点执行的ÿ 查看详情

xxljob间隔几天运行时间不准确

参考技术A关于xxl-job两台机器时间或者时区不同的问题解决。把xxl-job的admin跟执⾏任务的程序通过分布式部署的话,两台机器时间差必须在三分钟之内。 查看详情

让定时任务xxljob里面的任务灵活起来(代码片段)

...之前定时任务执行中漏了一条数据。这个时候,如何xxljob的任务参数传递过来了业务号。那么就只执 查看详情

xxl-job后继任务导致前一个任务执行一半,源码分析xxljob

step1查看xxljob架构原理: 查看xxljob-admin, 存在路由策略配置、运行模式、阻塞处理策略。分析xxljob源码core阻塞处理策略枚举:路由策略枚举:发现存在failover(失败转移),和busyover(忙碌转移) 路由策略实现方... 查看详情

springboot项目集成xxljob全纪录(图文详解)(代码片段)

目录xxljob介绍优点特性如下:代码配置过程1.引入xxl-job的依赖2.编写配置文件3.编写配置类4.新建Job文件夹,将自己写的类放到此文件夹下5.编写业务代码登录xxl-Job并配置1.执行器管理--新增执行器 2.任务管理--新增任务 ... 查看详情

springboot整合定时任务,可以动态编辑的定时任务(代码片段)

...0c;一个简单的管理页面不过我们当时自己写的这个不支持分布式环境,想要支持倒也不是啥难事,弄一个zookeeper或者redis作为公共的信息中心,里边记录了定时任务的各种运行情况,有了这个就能支持分布式环境... 查看详情

xxljob部署及基本使用(代码片段)

...artifactId><version>2.3.0</version></dependency>配置XxlJobSpringExe 查看详情

项目实战典型案例24.xxljob控制台不打印日志排查

目录一:背景介绍问题一:xxljob不打印日志问题二:启动权限二:问题排查过程问题一:问题二三:总结一:背景介绍问题一:xxljob不打印日志问题二:启动权限admin权限启动jar包后,xxljob... 查看详情

那些年我们追过的源码

...,etcd)sentinel(熔断限流框架)seata(分布式事务框架)canal(MySQL增量订阅中间件)xxljob(定时任务调度)tinyid(滴滴分布式id) 查看详情

分布式定时任务调度框架-quartz学习及实战记录笔记

...任务并将定时调度任务持久化到MySQL以及Quartz集群配置7.分布式定时任务调度框架学习与实战记录完整篇 查看详情

分布式定时任务调度框架-quartz学习及实战记录笔记

...任务并将定时调度任务持久化到MySQL以及Quartz集群配置7.分布式定时任务调度框架学习与实战记录完整篇 查看详情

基于springboot的定时任务实现(非分布式)

1.核心注解在springboot项目中我们可以很方便地使用spring自己的注解@Scheduled和@EnableScheduling配合来实现便捷开发定时任务。@EnableScheduling注解的作用是发现注解@Scheduled的任务并后台执行,此注解可以加到启动类上也可以加到执行调... 查看详情

点我达分布式任务调度系统-dajob

...时任务数量日益增加,常规的垂直应用架构已无法应对,分布式服务架构势在必行。同时,也迫切需要一个分布式任务调度系统来管理分布式服务中的定时任务。单一应用架构当网站流量很小时,只需一个应用,将所有功能都部... 查看详情

xxjob使用

...xl-job-executor-sample-springboot项目中com.xxl.job.executor.core.config.XxlJobConfig到自己项目2).复制对应的配置文件到自己项目,xxljob依赖3).编写定时任务demo补充:1.报警邮件配置(发送者),需要在对应邮箱设置中开启SMTP获取授权码: 查看详情

【springboot实战】分布式定时任务锁shedlock

...由于定时任务的特殊性,以及一些方法的幂等性要求,在分布式多节点部署的情况下,某个定时任务只需要执行一次。1.背景介绍ShedLock(https://github.com/lukas-krecan/ShedLock)是一个轻量级的分布式定时任务锁组件,使用其可以满足我... 查看详情

springboot定时任务原理及如何动态创建定时任务

...一个需求,同步多个省份销号数据,解绑微信粉丝。分省定时将销号数据放到SFTP服务器上,我需要开发定时任务去解析文件。因为是多省份,服务器、文件名规则、数据规则都不一定,所以要做成可配置是有一定难度的。数据... 查看详情