jdk多任务执行框架(executor框架)

Mr.years Mr.years     2022-10-14     626

关键词:

Executor的常用方法

为了更好的控制多线程,JDK提供了一套线程框架Executor,帮助开发人员有效地进行线程控制。它们都在java.util.concurrent包中,是JDK开发包的核心。其中有一个重要的类:Executors,他扮演这线程工厂的角色,我们通过Executors可以创建特定功能的线程池。

  1. newFixedThreadPool()方法,该方法返回一个固定数量的线程池,该方法的线程数始终不变,当有一个任务提交时,如线程池中有空闲,则立即执行,如没有,则会被暂缓在一个任务队列中等待有空闲的线程去执行。
  2. newSingleThreadExecutor()方法,创建一个线程的线程池,若空闲则执行,若没有空闲线程则暂缓在任务队列中。
  3. newCachedThreadPool()方法,返回一个可以根据实际情况调整线程个数的线程池,不限制最大线程数量,若有空闲的线程则执行任务,若无任务则不创建线程。并且每一个空闲线程会在60秒后自动收回。
  4.  newScheduleThreadPool()方法,返回一个ScheduledExecutorService对象,但该线程池可以指定线程的数量。

查看它们的源码:

技术分享图片
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

     public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        //这里super是ThreadPoolExecutor类
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }
    
View Code

从源码看以看出它们都是由ThreadPoolExecutor构造出来的。

ThreadPoolExecutor构造方法概述

ThreadPoolExecutor(

  int corePoolSize,//核心线程数,线程池刚初始化的时候实例化线程个数

  int maximumPoolSize,//最大线程数

  long keepLongTime,//空闲时间,过时回收

  TimeUnit unit,//时间单位

  BlockingQueue<Runable> worker,//线程暂缓处

  ThreadFactory threadFactory,

  RejectExecuteHandle handle//拒绝执行的方法

)

所以上面几个方法主要是根据不同的参数来执行不同的行为。其中newScheduleThreadPool方法稍复杂一点,它的方法值是ScheduledExecutorService。

ScheduledExecutorService主要有两个方法scheduleWithFixedDelay,scheduleAtFixedRate

1、scheduleAtFixedRate 方法,以固定的频率来执行某项计划(任务),即固定的频率来执行某项计划,它不受计划执行时间的影响。到时间就执行。 它不受计划执行时间的影响。
2、scheduleWithFixedDealy,相对固定的延迟后,执行某项计划(任务), 即无论某个任务执行多长时间,等执行完了,我再延迟指定的时间去执行。它受计划执行时间的影响。

技术分享图片
class Temp extends Thread {
    public void run() {
        try {
            System.out.println( new Date().toString() + "..run start");
            Thread.sleep(3000);
            System.out.println( new Date().toString() + "...run end");
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

public class ScheduledJob {
    
    public static void main(String args[]) throws Exception {
    
        Temp command = new Temp();
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        System.out.println(new Date().toString() + "...start");
        //scheduler.scheduleWithFixedDelay(command, 5, 2, TimeUnit.SECONDS);
        scheduler.scheduleAtFixedRate(command, 5, 2, TimeUnit.SECONDS);
    }
}
View Code

scheduleAtFixedRate 的执行结果:run方法延迟了5秒执行,run方法执行了3秒,已经超过了我们指定的间隔2秒,所以它在第一个run方法执行完后,立即执行了第二个run方法

Tue Feb 06 15:48:01 CST 2018...start
Tue Feb 06 15:48:06 CST 2018..run start
Tue Feb 06 15:48:09 CST 2018...run end
Tue Feb 06 15:48:09 CST 2018..run start
Tue Feb 06 15:48:12 CST 2018...run end
Tue Feb 06 15:48:12 CST 2018..run start

scheduleWithFixedDealy的执行结果:run方法延迟了5秒执行,run方法执行了3秒,已经超过了我们指定的间隔2秒,但是它还是等了2秒后,再执行了第二个run方法

Tue Feb 06 15:53:04 CST 2018...start
Tue Feb 06 15:53:10 CST 2018..run start
Tue Feb 06 15:53:13 CST 2018...run end
Tue Feb 06 15:53:15 CST 2018..run start
Tue Feb 06 15:53:18 CST 2018...run end
Tue Feb 06 15:53:20 CST 2018..run start
Tue Feb 06 15:53:23 CST 2018...run end

自定义线程池

如果Executors工厂类无法满足我们的需求,可以自己去创建线程池。在自己创建线程池时,这个构造方法对于队列是什么类型比较关键:

使用有届队列(ArrayBlockingQueue):若有新的任务需要执行时,如果线程池实际线程数小于corePoolSize,则优先创建线程,若大于corePoolSize,则会将任务加入队列中等待执行,若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建行的线程,诺线程数大于maximumPoolSize,则执行拒绝策略。
使用无界队列(LinkedBlockingQueue):除非系统资源耗尽,否则无界的任务队列不存在入队失败的情况。当有新任务来时,系统的线程数小于corePoolSize时,则新建线程执行任务,当达到corePoolSize后,就不会继续增加,若后续还有新的任务加入,而没有空闲的线程资源,则任务直接进入队列等待。若任务创建和处理的速度差异很大时,无界队列会保持快速增长,直到耗尽系统内存。注意:maximumPoolSize在无界队列时没有作用
JDK拒绝策略:
AbortPolicy:直接抛出异常组织系统正常工作 CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务
DiscardOldestPolicy:丢弃最老的一个请求,尝试再次提交当前任务
DiscardPolicy:丢弃无法处理的任务,不做任何处理。
如果需要自定义拒绝策略可以实现RejectedExecutionHandler接口

有届队列的例子

技术分享图片
public class UseThreadPoolExecutor1 {
    public static void main(String[] args) {
        /**
         * 在使用有界队列时,若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程,
         * 若大于corePoolSize,则会将任务加入队列,
         * 若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程,
         * 若线程数大于maximumPoolSize,则执行拒绝策略。或其他自定义方式。
         * 
         */    
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1,                 //coreSize
                2,                 //MaxSize
                60,             //60
                TimeUnit.SECONDS, 
                new ArrayBlockingQueue<Runnable>(3)            //指定一种队列 (有界队列)
                //new LinkedBlockingQueue<Runnable>()
//                , new MyRejected()
                //, new DiscardOldestPolicy()
                );
        
        MyTask mt1 = new MyTask(1, "任务1");
        MyTask mt2 = new MyTask(2, "任务2");
        MyTask mt3 = new MyTask(3, "任务3");
        MyTask mt4 = new MyTask(4, "任务4");
        MyTask mt5 = new MyTask(5, "任务5");
        MyTask mt6 = new MyTask(6, "任务6");
        
        pool.execute(mt1);
        pool.execute(mt2);
        pool.execute(mt3);
        pool.execute(mt4);
        pool.execute(mt5);
        pool.execute(mt6);
        
        pool.shutdown();
        
    }
}
View Code

只执行前四个任务(把任务5,6先不执行)

run taskId =1
run taskId =2
run taskId =3
run taskId =4

它是一个一个执行的,原因是第一个线程来的时候,立即执行了(coreSize=1),当后面三个线程来的时候会被加入到queue中(new ArrayBlockingQueue<Runnable>(3))

只执行前五个任务

run taskId =1
run taskId =5
run taskId =2
run taskId =3
run taskId =4

它是两个两个执行的,原因是第五个任务来的时候已经超出了queue的size(3),但是总线程数有小于MaxSize(2),所以任务5和任务1一起执行了,之后会从queue中拿两个再执行

执行留个任务

run taskId =1
Exception in thread "main" run taskId =5
java.util.concurrent.RejectedExecutionException: Task 6 rejected from [email protected][Running, pool size = 2, active threads = 2, queued tasks = 3, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at com.bjsxt.height.concurrent018.UseThreadPoolExecutor1.main(UseThreadPoolExecutor1.java:46)
run taskId =2
run taskId =3
run taskId =4

任务6来的时候,queue已经满了,总线程数也满了,这是会执行拒绝策略,JDK默认的是AbortPolicy,直接抛出异常,我们也可以改,就想代码里注释掉的一样

无界队列的例子

技术分享图片
public class UseThreadPoolExecutor2 implements Runnable{

    private static AtomicInteger count = new AtomicInteger(0);
    
    @Override
    public void run() {
        try {
            int temp = count.incrementAndGet();
            System.out.println("任务" + temp);
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    public static void main(String[] args) throws Exception{
        //System.out.println(Runtime.getRuntime().availableProcessors());
        BlockingQueue<Runnable> queue = 
                new LinkedBlockingQueue<Runnable>();
        ExecutorService executor  = new ThreadPoolExecutor(
                    5,         //core
                    10,     //max
                    120L,     //2fenzhong
                    TimeUnit.SECONDS,
                    queue);
        
        for(int i = 0 ; i < 20; i++){
            executor.execute(new UseThreadPoolExecutor2());
        }
        Thread.sleep(1000);
        System.out.println("queue size:" + queue.size());        //10
        Thread.sleep(2000);
    }
}
View Code

执行结果:

任务1
任务2
任务3
任务4
任务5
queue size:15
任务6
任务7
任务9
任务8
任务10
任务11
任务12
任务13
任务14
任务15
任务16
任务17
任务19
任务20
任务18

任务是5个5个执行的,而且把剩余的任务全部加到queue中。第二个参数没有作用,不会创建10个线程,如果是有届队列,就会起作用。

Task的代码:

技术分享图片
public class MyTask implements Runnable {

    private int taskId;
    private String taskName;
    
    public MyTask(int taskId, String taskName){
        this.taskId = taskId;
        this.taskName = taskName;
    }
    
    public int getTaskId() {
        return taskId;
    }

    public void setTaskId(int taskId) {
        this.taskId = taskId;
    }

    public String getTaskName() {
        return taskName;
    }

    public void setTaskName(String taskName) {
        this.taskName = taskName;
    }

    @Override
    public void run() {
        try {
            System.out.println("run taskId =" + this.taskId);
            Thread.sleep(5*1000);
            //System.out.println("end taskId =" + this.taskId);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }        
    }
    
    public String toString(){
        return Integer.toString(this.taskId);
    }

}
View Code

 MyRejected的代码:

技术分享图片
public class MyRejected implements RejectedExecutionHandler{

    
    public MyRejected(){
    }
    
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("自定义处理..");
        System.out.println("当前被拒绝任务为:" + r.toString());
    }

}
View Code

 








executor框架完整解读(代码片段)

...分离开来。工作单元包括Runnable和Callable,而执行机制由Executor框架提供.在HotSpotVM的线程模型中,Java线程被一对一映射为本地操作系统线程。Java线程启动时会创建一个本地操作系统线程;当该Java线程终止时,这个操作系统线程... 查看详情

java并发executor框架

  Executor框架简介Java的线程既是工作单元,也是执行机制。从JDK5开始,把工作单元和执行机制分离开来。Executor框架由3大部分组成任务。被执行任务需要实现的接口:Runnable接口或Callable接口异步计算的结果。Future接口和F... 查看详情

java线程池executor框架概述

...a线程池还有一个很重要的意义:Java线程池就是JDK5推出的Executor框架,在此之前Java线程既是工作任务又是执行机制,而Executor框架把工作任务与执行机制分离开来:工作任务包括Runnable接口和Callable接口,而执行机制由Executor接口... 查看详情

executor框架(代码片段)

...离开来。工作单元包括Runnable和Callable,而执行机制由Executor框架提供。Executor框架两级调度模型HotSpotVM的线程模型中底层,Java线程(java.lang.Thread)被一一映射为操作系统线程。Java线程启动时,创建一个本地操... 查看详情

executor框架(代码片段)

...离开来。工作单元包括Runnable和Callable,而执行机制由Executor框架提供。Executor框架两级调度模型HotSpotVM的线程模型中底层,Java线程(java.lang.Thread)被一一映射为操作系统线程。Java线程启动时,创建一个本地操... 查看详情

executor框架简介

Executor框架是在Java5中引入的,可以通过该框架来控制线程的启动,执行,关闭,简化并发编程。Executor框架把任务提交和执行解耦,要执行任务的人只需要把任务描述清楚提交即可,任务的执行提交人不需要去关心。通过Executor... 查看详情

executor框架

  Eexecutor作为灵活且强大的异步执行框架,其支持多种不同类型的任务执行策略,提供了一种标准的方法将任务的提交过程和执行过程解耦开发,基于生产者-消费者模式,其提交任务的线程相当于生产者,执行任务的线程相... 查看详情

线程池框架executor

Eexecutor作为灵活且强大的异步执行框架,其支持多种不同类型的任务执行策略,提供了一种标准的方法将任务的提交过程和执行过程解耦开发,基于生产者-消费者模式,其提交任务的线程相当于生产者,执行任务的线程相当于... 查看详情

戏(细)说executor框架线程池任务执行全过程(上)

原文链接:http://ifeve.com/executor-framework-thread-pool-task-execution-part-01/内容综述基于Executor接口中将任务提交和任务执行解耦的设计,ExecutorService和其各种功能强大的实现类提供了非常简便方式来提交任务并获取任务执行结果,封装... 查看详情

executor框架completionservice接口

??CompletionService接口是一个独立的接口,并没有扩展ExecutorService。其默认实现类是ExecutorCompletionService;??接口CompletionService的功能是:以异步的方式一边执行未完成的任务,一边记录、处理已完成任务的结果。从而可以将任务的执... 查看详情

深入浅出java线程池使用原理2

一、Executor框架介绍Executor框架将Java多线程程序分解成若干个任务,将这些任务分配给若干个线程来处理,并得到任务的结果1.1、Executor框架组成任务:被执行任务需要实现的接口:Runnable接口或Callable接口任务的执行:任务执行... 查看详情

concurrent包分析之executor框架(代码片段)

...tomcat这种servlet容器的线程池都设置了最大线程数量的。Executor框架组成Eexecutor接口:包含Eexecutor、ExecutorService、ScheduledExecutorServiceThreadPool线程池:包含ThreadPoolExecutor、ScheduledThreadPoolExecutorFork/Join框架:JDK1.7新增类之间的关系如... 查看详情

java并发编程-executor框架executor,

...一些列的小任务,即Runnable,然后将这些任务提交给一个Executor执行, Executor.execute(Runnalbe) 。Executor在执行时使用其内部的线程池来完成操作。Executor的子接口有:ExecutorService,ScheduledExecutorService,已知实现类:AbstractExec 查看详情

java并发编程学习10-任务执行与executor框架(代码片段)

任务执行何为任务?任务通常是一些抽象且离散的工作单元。大多数并发应用程序都是围绕着“任务执行”来构造的。而围绕着“任务执行”来设计应用程序结构时,首先要做的就是要找出清晰的任务边界。大多数服务器应用程... 查看详情

juc系列executor框架之futuretask(代码片段)

JUC系列Executor框架之FutureTaskJDK版本1.8文章目录JUC系列Executor框架之FutureTask使用示例示例一示例二源码分析类图Callable接口Future接口RunnableFuture接口内部类WaitNode成员变量任务状态状态关系图构造函数核心方法执行任务获取任务执行... 查看详情

executor框架

为了更好的控制多线程,JDK提供了一套线程框架Executor,帮助开发人员有效的进行线程控制。他们都在Java.utilconcurrent包中,是JDK并发包的核心。其中有一个比较重要的类:Executors,他扮演着线程工厂的角色,我们通过Executors可以... 查看详情

java并发编程学习10-任务执行与executor框架(代码片段)

任务执行何为任务?任务通常是一些抽象且离散的工作单元。大多数并发应用程序都是围绕着“任务执行”来构造的。而围绕着“任务执行”来设计应用程序结构时,首先要做的就是要找出清晰的任务边界。大多数服务... 查看详情

java并发编程学习10-任务执行与executor框架(代码片段)

任务执行何为任务?任务通常是一些抽象且离散的工作单元。大多数并发应用程序都是围绕着“任务执行”来构造的。而围绕着“任务执行”来设计应用程序结构时,首先要做的就是要找出清晰的任务边界。大多数服务... 查看详情