executor框架周期/延时任务schedulethreadpoolexecutor(代码片段)

潜龙在渊 潜龙在渊     2022-10-21     803

关键词:

ScheduledThreadPoolExecutor 介绍

??ScheduledThreadPoolExecutor 是一个可以实现定时任务的 ThreadPoolExecutor(线程池)。比 timer 更加灵活,效率更高!

//继承自 ThreadPoolExecutor 类,并实现了 ScheduledExecutorService 接口
public class ScheduledThreadPoolExecutorextends extends  ThreadPoolExecutor implements ScheduledExecutorService

ScheduledThreadPoolExecutor的四个构造方法如下:

    public ScheduledThreadPoolExecutor(int corePoolSize) 
        super(corePoolSize, Integer.MAX_VALUE, 0L, TimeUnit.NANOSECONDS, new DelayedWorkQueue());
    

    public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) 
        super(corePoolSize, Integer.MAX_VALUE, 0L, TimeUnit.NANOSECONDS, new DelayedWorkQueue(), threadFactory);
    

    public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) 
        super(corePoolSize, Integer.MAX_VALUE, 0L, TimeUnit.NANOSECONDS, new DelayedWorkQueue(), handler);
    

    public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) 
        super(arg0, Integer.MAX_VALUE, 0L, TimeUnit.NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler);
    

??ScheduledThreadPoolExecutor 继承于 ThreadPoolExecutor ,从其构造方法可以看出,此线程池的线程也不会空闲超时(keepAliveTime = 0),同时使用队列是无边界的DelayedWorkQueue;要注意是,虽然此类继承自 ThreadPoolExecutor,但是有几个继承的调整方法对此类并无作用,特别是在此类中设置 maximumPoolSize 是没有意义的,因为ScheduleThreadPoolExecutor 使用了无边界的任务队列,所以根本不需要创建多于 corePoolsize 数量的线程。

扩展此类的注意事项:

??此类重写 AbstractExecutorService 的 submit 方法,以生成内部对象控制每个任务的延迟和调度。若要保留功能性,子类中任何进一步重写的这些方法都必须调用超类版本,超类版本有效地禁用附加任务的定制。但是,此类提供 protected 访问类型的扩展方法 decorateTask(为 Runnable 和 Callable 各提供一种版本),可定制用于通过 execute、submit、schedule、scheduleAtFixedRate 和 scheduleWithFixedDelay 进入的执行命令的具体任务类型。

//修改或替换用于执行 callable 的任务。
protected <V> RunnableScheduledFuture<V> decorateTask( Callable<V> callable, RunnableScheduledFuture<V> task) 

//修改或替换用于执行 runnable 的任务。
protected <V> RunnableScheduledFuture<V> decorateTask( Runnable runnable, RunnableScheduledFuture<V> task)

下面是 schedule()方法的源码,可以看出提交的 Runnable 任务通过 decorateTask() 方法封装成 RunnableScheduledFuture 对象,然后才处理这个对象。其他的submit()、execute() 等也是如此。

public ScheduledFuture<?> schedule(Runnable arg0, long arg1, TimeUnit arg3) 
        if (arg0 != null && arg3 != null) 
            //封装Runnable对象
            RunnableScheduledFuture arg4 = this.decorateTask((Runnable) arg0,
                    new ScheduledFutureTask(this, arg0, (Object) null, this.triggerTime(arg1, arg3)));
            this.delayedExecute(arg4);
            return arg4;
         else 
            throw new NullPointerException();
        
    

??默认情况下,ScheduledThreadPoolExecutor 使用一个扩展 FutureTask 的任务类型。但是,可以使用下列形式的子类修改或替换该类型。

 public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor 

   static class CustomTask<V> implements RunnableScheduledFuture<V>  ... 

   protected <V> RunnableScheduledFuture<V> decorateTask(
                Runnable r, RunnableScheduledFuture<V> task) 
       return new CustomTask<V>(r, task);
   

   protected <V> RunnableScheduledFuture<V> decorateTask(
                Callable<V> c, RunnableScheduledFuture<V> task) 
       return new CustomTask<V>(c, task);
   
   // ... add constructors, etc.
 


ScheduleThreadPoolExecutor 主要的方法介绍

1. 零延时的 execute()、submit() 方法

?? execute()、submit() 方法都被重写了,本质上调用的还是 schedule() 方法;从下面的源码可以看出,这两个方法提交的任务都是延时为0的 “实时任务”;

public void execute(Runnable arg0) 
        this.schedule(arg0, 0L, TimeUnit.NANOSECONDS);
    

    public Future<?> submit(Runnable arg0) 
        return this.schedule(arg0, 0L, TimeUnit.NANOSECONDS);
    

2. 提交一个延时任务的 schedule() 方法

方法描述:

<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit):
创建并执行在给定延迟后启用的 ScheduledFuture。
ScheduledFuture<?> schedule(Runnable command, long delay,imeUnit unit):
创建并执行在给定延迟后启用的一次性操作。

3、 提交周期性的任务 scheduleAtFixedRate()scheduleWithFixedDelay()

方法描述:

ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit):
initialDelay 是此周期任务的开始执行时的延时时间(即只在第一次开始执行时延时,此后周期性地执行这个任务)。
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit):
指定了首次执行前的初始延时时间,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。

两者的区别:

scheduleAtFixedRate: 固定的周期时间。此方法的 period 参数所指的间隔时间是 从上一周期的任务开始时间到当前周期的任务开始时间的间隔。当上一周期任务执行结束了,如果任务的执行时间大于 指定的周期时间period ,那么便可以开始此周期任务的下一周期的执行。否则,便是间隔时间还没有达到一周期的时间,还需要继续等待,直到周期时间到来;总的来说,可以分为以下两种情况:

  • 任务的执行时间 > period参数:那么周期运行的时间便是 任务的执行时间。
  • 任务的执行时间 < period参数:那么周期运行的时间便是 period参数。

scheduleWithFixedDelay: 固定的间隔时间。此方法的 delay 参数所指的间隔时间是 从上一周期的任务的执行结束时间到当前周期的任务开始时间的间隔,是指定任务的固定的运行间隔,与任务的执行时间无关。

@ Example1 scheduleAtFixedRate 测试

简单起见,下面创建了只有一个线程 ScheduledThreadPoolExecutor 对象,也只提交一个周期任务。 下面的例子中,任务的执行时间大于 period 参数。

public class Test 
    public static void main(String[] args) 
        //池中只有一个线程
        ScheduledThreadPoolExecutor schedulePool = new ScheduledThreadPoolExecutor(1);
        //作为一个周期任务提交,period 为1000ms,任务执行时间为2000ms
        schedulePool.scheduleAtFixedRate(new MyRunnable(), 50, 1000, TimeUnit.MILLISECONDS);
    
class MyRunnable implements Runnable 

    int perio = 1;

    @Override
    public void run() 
       //为周期任务捕获异常,避免异常影响下一周期的任务执行
        try 
            System.out.println("---------------第 " + perio + " 周期-------------");
            System.out.println("begin = " + System.currentTimeMillis() / 1000);//秒
            //任务执行时间
            Thread.sleep(2000);
            System.out.println("end =   " + System.currentTimeMillis() / 1000);
            perio++;
         catch (InterruptedException e) 
            e.printStackTrace();
         catch (Exception e)  
            e.printStackTrace();
        

    

运行结果:

---------------第 1 周期-------------
begin = 1513938114
end = 1513938116
---------------第 2 周期-------------
begin = 1513938116
end = 1513938118
---------------第 3 周期-------------
begin = 1513938118
end = 1513938120
---------------第 4 周期-------------
begin = 1513938120
end = 1513938122
---------------第 5 周期-------------
begin = 1513938122
end = 1513938124

从结果可以看出,任务的周期执行是连着的,没有间隔时间。这是因为任务的运行时间大于周期执行时间,即当任务还没结束时,周期时间已经到了,所以任务刚结束,就可以进行下一周期的执行。


@ Example2 scheduleWithFixedDelay 测试

??同样也是上面的例子,将周期方法换成 scheduleWithFixedDelay( )

public class Test 
    public static void main(String[] args) 
        //池中只有一个线程
        ScheduledThreadPoolExecutor schedulePool = new ScheduledThreadPoolExecutor(1);
        //作为一个周期任务提交,delay 为1000ms
        schedulePool.scheduleAtFixedRate(new MyRunnable(), 50, 1000, TimeUnit.MILLISECONDS);
    

运行结果:

---------------第 1 周期-------------
begin = 1513938832
end = 1513938834
---------------第 2 周期-------------
begin = 1513938835
end = 1513938837
---------------第 3 周期-------------
begin = 1513938838
end = 1513938840
---------------第 4 周期-------------
begin = 1513938841
end = 1513938843

上面的scheduleWithFixedDelay例子的任务是间隔一个固定的时间执行的,无论任务的执行时间是否大于周期时间。

4. 线程池关闭

两个关闭线程池的方法,一旦线程池被关闭,就会拒绝以后提交的所有任务

void shutdown()
在以前已提交任务的执行中发起一个有序的关闭,但是不接受新任务。线程池中的周期任务、延时任务,根据下面的两个策略来判断是否继续正常运行,还是停止运行。

List<Runnable> shutdownNow()
尝试停止所有正在执行的任务、暂停等待任务的处理,并返回等待执行的任务列表。对于正在运行,尝试通过中断该线程来结束线程。对于尚未运行的任务,则都不再执行。

线程池关闭(shutdown())下的两个策略的描述

  • void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value):
    在调用线程池调用了 shutdown()方法后,是否继续执行现有延时任务(就是通过 schedule()方法提交的延时任务 )的策略;默认值为false;在以下两种种的情况下,延时任务将会被终止:

  • void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value)
    在调用线程池调用了 shutdown()方法后,是否继续执行现有周期任务(通过 scheduleAtFixedRate、scheduleWithFixedDelay 提交的周期任务)的策略;默认值为false;在以下两种的情况下,周期任务将会被终止:

获取这个两个策略的设置值:

boolean getContinueExistingPeriodicTasksAfterShutdownPolicy():
取有关在此执行程序已 shutdown 的情况下、是否继续执行现有定期任务的策略。
boolean getExecuteExistingDelayedTasksAfterShutdownPolicy():
获取有关在此执行程序已 shutdown 的情况下是否继续执行现有延迟任务的策略

@ Example3 shoutdown下的周期任务测试

??还是基于上面的例子进行改造,main线程休眠10秒后,shutdown线程池。在默认的情况下(策略为false),因为间隔为1s,任务执行时间为2s,所以 shutdown 后,最多能执行4个周期;但是下面的例子,将策略的值设置为true,shutdown后,周期任务也可以正常运行下去。

public class Test 
    public static void main(String[] args) throws InterruptedException 
        //池中只有一个线程
        ScheduledThreadPoolExecutor schedulePool = new ScheduledThreadPoolExecutor(1);
        //shutdown时,周期任务的策略
        schedulePool.setContinueExistingPeriodicTasksAfterShutdownPolicy(true);
         //作为周期任务提交 
        ScheduledFuture future = schedulePool.scheduleWithFixedDelay(new MyRunnable(), 50, 1000, TimeUnit.MILLISECONDS);
        
        Thread.sleep(10*1000);
        
        schedulePool.shutdown();
    

运行结果:

---------------第 1 周期-------------
begin = 1513945378
end = 1513945380
---------------第 2 周期-------------
begin = 1513945381
end = 1513945383
---------------第 3 周期-------------
begin = 1513945384
end = 1513945386
---------------第 4 周期-------------
begin = 1513945387
end = 1513945389
---------------第 5 周期-------------
begin = 1513945390
end = 1513945392
---------------第 6 周期-------------
begin = 1513945393
end = 1513945395
.......

5. 移除任务、取消任务

BlockingQueue
返回此执行程序使用的任务队列。此队列中的每个元素都是一个 ScheduledFuture,包括用 execute 所提交的那些任务
boolean remove(Runnable task):
从执行程序的内部队列中移除此任务(如果存在),从而如果尚未开始,则其不再运行。
void setRemoveOnCancelPolicy(boolean value):
此方法是在1.7引入的,是用于对调用cancel()的任务的处理策略:是否马上移除出队列;默认为false;
周期任务也可以通过 ScheduledFuture的 cancel()取消运行;

Executors 提供了两个常用的ScheduledThreadPoolExecutor

??这两个常用的ScheduledThreadPoolExecutor:SingleThreadScheduledExecutor(单线程的线程池)、ScheduledThreadPool(线程数量固定的线程池),下面是 Executors 对应的源代码。

public static ScheduledExecutorService newSingleThreadScheduledExecutor() 
        return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
    

    public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory arg) 
        return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, arg));
    

    public static ScheduledExecutorService newScheduledThreadPool(int arg) 
        return new ScheduledThreadPoolExecutor(arg);
    

    public static ScheduledExecutorService newScheduledThreadPool(int arg, ThreadFactory arg0) 
        return new ScheduledThreadPoolExecutor(arg, arg0);
    

java-延时执行-参数-任务

定义任务ScheduledExecutorServicescheduledExecutorService=Executors.newScheduledThreadPool(50);scheduledExecutorService.schedule(newDoorGuardDelUserThreadPool(vd,companyDao,preRegistrationDataDao,doorGuard 查看详情

executors之scheduledthreadexecutor

参考技术AScheduledThreadPoolExecutor继承自ThreadPoolExecutor实现了ScheduledExecutorService接口。主要完成定时或者周期的执行线程任务。执行过程:(1)scheduledExecutorService.schedule(myThread,5,TimeUnit.SECONDS)结果:可以看出主线程启动子线程:scheduled... 查看详情

executor框架简介

Executor框架是在Java5中引入的,可以通过该框架来控制线程的启动,执行,关闭,简化并发编程。Executor框架把任务提交和执行解耦,要执行任务的人只需要把任务描述清楚提交即可,任务的执行提交人不需要去关心。通过Executor... 查看详情

executor框架

  Eexecutor作为灵活且强大的异步执行框架,其支持多种不同类型的任务执行策略,提供了一种标准的方法将任务的提交过程和执行过程解耦开发,基于生产者-消费者模式,其提交任务的线程相当于生产者,执行任务的线程相... 查看详情

线程池框架executor

Eexecutor作为灵活且强大的异步执行框架,其支持多种不同类型的任务执行策略,提供了一种标准的方法将任务的提交过程和执行过程解耦开发,基于生产者-消费者模式,其提交任务的线程相当于生产者,执行任务的线程相当于... 查看详情

apscheduler定时任务使用(代码片段)

...结束时间主要模块触发器[triggers]作业店[jobstores]执行者[executors]调度器[schedulers]使用安装pipinstallapscheduler创建调度器fromdatetimeimportdatetimeimportjsonfromapscheduler.schedulers.backgroundimportBackgroundSchedulerfromapscheduler.jobstores.mongodbimportMongoDBJobS... 查看详情

celery---一个懂得异步任务,延时任务,周期任务的芹菜(代码片段)

...celey是芹菜celery是基于Python实现的模块,用于执行异步延时周期任务的其结构组成是由  1.用户任务app  2.管道任务broker用于存储任务官方推荐redisrabbitMQ/backend用于存储任务执行结果的  3.员工workerCelery的简单示例fromceleryimpor... 查看详情

celery时区设置问题源码探究

参考技术A项目中有使用到Celery框架,主要使用Celery来在使用Django搭建的项目中创建延时任务及周期任务。在使用过程中出现过延时任务及周期任务到预定时间未能执行的情况。Google、百度了一些网友的分析及解决方案,大多认... 查看详情

java并发专题之十juc-locks之线程池框架概述

环境   jdkversion:jdk1.8.0_171一、Executor接口执行器接口,也是最顶层的抽象核心接口,分离了任务和任务的执行。二、ExecutorService接口在Executor的基础上提供了执行器生命周期管理,任务异步执行等功能。在Executor的基础上增强了... 查看详情

jdk多任务执行框架(executor框架)

Executor的常用方法为了更好的控制多线程,JDK提供了一套线程框架Executor,帮助开发人员有效地进行线程控制。它们都在java.util.concurrent包中,是JDK开发包的核心。其中有一个重要的类:Executors,他扮演这线程工厂的角色,我们通... 查看详情

延时任务实现方案总结(代码片段)

...引入一个依赖如下所示<dependency><groupId>org.quartz-scheduler</groupId><artifactId>quartz</artifactId& 查看详情

celery异步,延时任务,周期任务(代码片段)

  celery中文译为芹菜,是一个分布式任务队列. 是异步的,所以能处理大量消息  最新的celery不支持windows下使用了,所以在使用pycharm安装celery模块之后,需要再安装eventlet模块才能测试运行.一.异步任务启动客户端:s1,s2要在项... 查看详情

java多线程-定时器-并发与并行-线程生命周期(代码片段)

...件发送。定时器实现方式:方式一:Timer方式二:ScheduledExecutorServiceTimer定时器构造器和方法如下:构造器说明publicTimer()创建Timer定时器对象publicstaticvoidmain(String[]args)//创建一个定时器任务Timertimer=newTimer();方法说明schedule(... 查看详情

ucosii如何分配任务之间的延时节拍时间

...分配任务之间的延时节拍时间参考技术A我说的是UCOSII的周期性执行任务,不是时间分片。而周期性任务的结构如下:voidMyTask(void*pdata)//周期性执行的任务函数进行准备工作的代码;for(;;)//无限循环,也可用while(1)任务实体代码;OSTi... 查看详情

schedule:一个简单实用的python周期任务调度工具!(代码片段)

欢迎关注,专注Python、数据分析、数据挖掘、好玩工具!如果你想周期性地执行某个Python脚本,最出名的选择应该是Crontab脚本,但是Crontab具有以下缺点:1.不方便执行秒级任务。2.当需要执行的定时任务有上... 查看详情

kotlin延时执行任务操作(代码片段)

Timer().schedule(3000)//执行的任务 查看详情

springboot集成schedule(深度理解)(代码片段)

...务框架做一个比较:从以上表格可以看出,SpringSchedule框架功能完善,简单易用。对于中小型项目需求,SpringSchedule是完全可以胜任的。  1、springboot集成schedule1.1添加 查看详情

java并发编程-executor框架executor,

...一些列的小任务,即Runnable,然后将这些任务提交给一个Executor执行, Executor.execute(Runnalbe) 。Executor在执行时使用其内部的线程池来完成操作。Executor的子接口有:ExecutorService,ScheduledExecutorService,已知实现类:AbstractExec 查看详情