executor框架

huanStephen huanStephen     2022-10-10     522

关键词:

  JDK5的java.util.concurrent包中的执行器将为你管理Thread对象,从而简化了并发编程。Executor在客户端和任务执行之间提供了一个间接层;与客户端直接执行任务不同,这个中介对象执行任务。Executor允许你管理异步任务的执行,而无须显示的管理线程的周期。

  Eexecutor作为灵活且强大的异步执行框架,其支持多种不同类型的任务执行策略,提供了一种标准的方法将任务的提交过程和执行过程解耦开发,基于生产者-消费者模式,其提交任务的线程相当于生产者,执行任务的线程相当于消费者,并用Runnable来表示任务,Executor的实现还提供了对生命周期的支持,以及统计信息收集,应用程序管理机制和性能监视等机制。

  在HotSoop VM的线程模型中,java线程被一对一映射为本地操作系统线程。java线程启动时会创建一个本地操作系统线程,当该java线程终止时,这个操作系统线程也会被回收,操作系统会调度所有线程并将它们分配给可用的CPU。

  在上层,java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程。在底层,操作系统内核将这些线程映射到硬件处理器上。

 

组成部分

任务

包括被执行任务需要实现的接口:Runnable接口或Callable接口。

任务的执行

包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。

Executor框架有两个关键类实现了ExecutorService接口ThreadPoolExecutor和ScheduledThreadPoolExecutor。

异步计算的结果

包括接口Future和实现Future接口的FutureTask类。

 

成员构成图

Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。

ExecutorService接口继承了Executor,在其上做了一些shutdown()、submit()的扩展,可以说是真正的线程池接口。

AbstractExecutorService抽象类实现了ExecutorService接口中的大部分方法。

ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。

ScheduledExecutorService接口继承了ExecutorService接口,提供了带“周期执行”功能得ExecutorService。

ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大。

Future接口和实现Future接口的FutureTask类,代表异步计算结果。

Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行。

 

使用流程

主线程首先要创建实现Runnable或者Callable接口的任务对象。

工具类Executors可以把一个Runnable对象封装为一个Callable对象(Executors.callable(Runnable task)或Executors.callable(Runnable task,Object result))。然后可以把Runnable对象直接交给ExecutorService执行(ExecutorService.execute(Runnablecommand));或者也可以吧Runnable对象或Callable对象交给ExecutorService执行(ExecutorService.submit(Runnable task))或ExecutorService.submit(Callable<T> task)。

如果执行ExecutorService.submit(...),ExecutorService将返回一个实现Future接口的对象(到目前为止的JDK中,返回的是FutureTask对象)。由于FutureTask实现了Runnable,程序员也可以创建FutureTask,然后直接交给ExecutorService执行。

最后,主线程可以执行Future.get()方法来等待任务执行完成。主线程也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。

 

分类

ThreadPoolExecutor通常使用工厂类Executors来创建,通常分为以下3类:

FixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<Runnable>(),
                        threadFactory);
}

 

固定线程数适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场景,适用于负载比较重的服务器。

FixedThreadPool的corePoolSize和maximumPoolSize都被设置为创建FixedThreadPool时指定的参数nThreads。当线程池中的线程数大于corePoolSize时,keepAliveTime为多余的空闲线程等待新任务的最长时间,超过这个时间后多余的线程将被终止。这里吧keepAliveTime设置为0L,意味着多余的空闲线程会被立即终止。

FixedThreadPool使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列容量为Integer.MAX_VALUE)。使用无界队列作为工作队列会对线程带来如下影响:

  1. 当线程池中的数量达到corePoolSize后,新任务将在无界队列中等待,因此线程池中的线程数量不会超过corePoolSize。
  2. 由于1,使用无界队列时maximumPoolSize将是一个无效参数。
  3. 由于1和2,使用无界队列时keepAliveTime将是一个无效参数。
  4. 由于使用无界队列,运行中的FixedThreadPool(未执行方法shutdown()或shutdownNow())不会拒绝任务(不会调用RejectedExecutionHandler.rejectedExecution方法)。

SingleThreadExecutor

创建使用单个线程的SingleThreadExecutor的API,适用于需要保证顺序地执行各个人物,并且在任意时间点,不会有多个线程是活动的应用场景。

这对于你希望在另一个线程中连续运行的人物事物(长期存活的任务)来说是很有用的。例如监听进入的套接字连接任务,它对于希望在线程中运行的短任务也同样方便,例如更新本地或远程日志的小任务或者是事件分发线程。

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
}

有无参数的方法,corePoolSize和maximumPoolSize被设置为1,其他参数与FixedThreadPool相同。

CachedThreadPool

创建一个会根据需要创建新线程的CachedThreadPool的API。大小无界的线程池,适用于执行很多的短期异步任务的小程序,或者是负载较轻的服务器。

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
}

有无参数方法,corePoolSize被设置为0,即corePool为空,maximumPoolSize被设置为Integer.MAX_VALUE,即maximumPool是无界的,这里吧keepAliveTime设置为60L,意味着CacheThreadPool中的空闲线程等待现任务的最长时间为60秒,空闲60秒后将会被终止。CachedThreadPool使用SynchronousQueue作为线程池的工作队列,但CachedThreadPool的maximumPool是无界的,这意味着,如果主线程提交任务的速度高于maximumPool中的线程处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。

WorkStealingPool

public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}

有无参数方法,利用所有运行的处理器数据来创建一个工作窃取的线程池,使用forkjoin实现。

 

ScheduledThreadPoolExecutor

包含多个线程,适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求而需要限制后台线程数量的应用场景。

步骤

  1. 线程从DelayQueue中获取已到期的ScheduledFutureTask(DelayQueue.task())。到期任务是指ScheduledFutureTask的time大于等于当前时间。
  2. 线程执行这个ScheduledFutureTask。
  3. 线程修改ScheduledFutureTask的time变量为下次将要被执行的时间。
  4. 线程把这个修改time之后的ScheduledFutureTask放回DelayQueue中(DelayQueue.add())。

提交定时任务的方法

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)

向定时任务线程池提交一个延时Runnable任务(仅执行一次)

public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)

向定时任务线程池提交一个延时的Callable任务(仅执行一次)

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)

向定时任务线程池提交一个固定时间间隔的任务

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)

向定时任务线程池提交一个固定延时间隔执行的任务

scheduleAtFixedRate和scheduleWithFixedDelay区别

固定时间间隔的任务不论每次任务话费多少时间,下次任务开始执行时间是确定的,当然执行任务的时间不能超过执行周期

固定延时间隔的任务执行完任务以后都会延时一个固定时间。由于操作系统调度以及每次任务执行的语句可能不同,所以每次任务执行所花费的时间不确定,也就导致了每次任务的执行周期存在一定的波动

ScheduleThreadPoolExecutor相较于Timer的优势

  • Timer是基于绝对时间的延时执行或周期执行,当系统时间改变,则任务的执行会受到影响。而ScheduleThreadPoolExecutor中,任务基于相对进行周期或延时操作。
  • Timer也可以提交多个TimeTask任务,但是有一个线程来执行所有TimeTask,这样并发性受到影响,而ScheduleThreadPoolExecutor可以设定线程池中线程的数量。
  • Timer不会捕捉TimerTask的异常,只是简单的停止,这样势必会影响其他TimeTask的执行,而ScheduleThreadPoolExecutor中,如果一个线程因某些原因停止,线程池可以自动创建新的线程来维护线程池中线程的数量。

scheduleAtFixedRate超时问题

若任务处理时长超出设置的定时频率时长,本次任务执行完才开始下次任务,下次任务已经处于超时状态,马上开始执行

 

Callable、Future和FutureTask

Future接口和实现Future接口的FutureTask类用来表示异步计算的结果。

当我们把Runnable接口或Callable接口的实现类提交(submit)给ThreadPoolExecutor或ScheduledThreadPoolExecutor时,ThreadPoolExecutor或ScheduledThreadPoolExecutor会向我们返回一个FutureTask对象。

Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或Scheduled-ThreadPoolExecutor执行。它们之间的区别是Runnable不会返回结果,而Callable可以返回结果。

除了可以自己创建实现Callable接口的对象外,还可以使用工厂类Executors来把一个Runnable包装成一个Callable。

Executors提供的,把一个Runnable包装成一个Callable的API。

public static Callable<Object> callable(Runnable task)  // 假设返回对象Callable1

Executors提供的,把一个Runnable和一个待返回的结果包装成一个Callable的API。

public static <T> Callable<T> callable(Runnable task, T result)  // 假设返回对象Callable2

当任务成功完成后FutureTask.get()将返回该任务的结果。例如,如果提交的是对象Callable1,FutureTask.get()方法将返回null;如果提交的是对象Callable2,FutureTask.get()方法将返回result对象。

FutureTask除了实现Future接口外,还实现了Runnable接口。因此,FutureTask可以交给Executor执行,也可以由调用线程直接执行(FutureTask.run())。

当FutureTask处于未启动或已启动状态时,执行FutureTask.get()方法将导致调用线程阻塞;当FutureTask处于已完成状态时,执行FutureTask.get()方法将导致调用线程立即返回结果或抛出异常。

当FutureTask处于未启动状态时,执行FutureTask.cancel()方法将导致此任务永远不会被执行;当FutureTask处于已启动状态时,执行FutureTask.cancel(true)方法将以中断执行此任务线程的方式来试图停止任务;当FutureTask处于已启动状态时,执行FutureTask.cancel(false)方法将不会对正在执行此任务的线程产生影响(让正在执行的任务运行完成);当FutureTask处于已完成状态时,执行FutureTask.cancel(…)方法将返回false。

 任务完成与否判断

可以使用isDone()方法来判断任务Future是否已经完成,也可以通过get()方法来获取结果来判断,两者不同点在于前者在任务未完成之时并不会阻塞,而后者在没有完成的时候回阻塞。

class TaskWithResult implements Callable<String> {
    private int id;
    public TaskWithResult(int id) {
        this.id = id;
    }
    public String call() throws InterruptedException {
        Thread.sleep(3000);    // 休眠3秒
        return "result of TaskWithResult " + id;
    }
}
public class CallableDemo {
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        ArrayList<Future<String>> results = new ArrayList<Future<String>>();
        for(int i = 0; i < 5; i++)
            results.add(exec.submit(new TaskWithResult(i)));
        int i = 0;
        for(Future<String> fs : results) {
            try {
                System.out.println(fs.isDone());    // 判断是否完成
                System.out.println(i);
                System.out.println(fs.get());
            } catch(InterruptedException e) {
                System.out.println(e);
                return;
            } catch(ExecutionException e) {
                System.out.println(e);
            } finally {
                exec.shutdown();
            }
            i ++;
        }
    }
}

输出:

false
0
result of TaskWithResult 0
true
1
result of TaskWithResult 1
true
2
result of TaskWithResult 2
true
3
result of TaskWithResult 3
true
4
result of TaskWithResult 4

执行开始后输出false和0,并在这里阻塞,待3秒完成后输出后续结果。这里由于get()的线程未完成,则发生了阻塞,而isDone()方法则不会。

 

CompletionService

CompletionService实际上可以看做是Executor和BlockingQueue的结合体。CompletionService在接收到要执行的任务时,通过类似BlockingQueue的put和take获得任务执行的结果。CompletionService的一个实现是ExecutorCompletionService,ExecutorCompletionService把具体的计算任务交给Executor完成。在实现上,ExecutorCompletionService在构造函数中会创建一个BlockingQueue(使用的基于链表的无界队列LinkedBlockingQueue),该BlockingQueue的作用是保存Executor执行的结果。当计算完成时,调用FutureTask的done方法。当提交一个任务到ExecutorCompletionService时,首先将任务包装成QueueingFuture,它是FutureTask的一个子类,然后改写FutureTask的done方法,之后把Executor执行的计算结果放入BlockingQueue中。

与ExecutorService最主要的区别在于submit的task不一定是按照加入时的顺序完成的。CompletionService对ExecutorService进行了包装,内部维护一个保存Future对象的BlockingQueue。只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法其实就是Producer-Consumer中的Consumer。它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。所以,先完成的必定先被取出。这样就减少了不必要的等待时间。

总结:

使用方法一,自己创建一个集合来保存Future存根并循环调用其返回结果的时候,主线程并不能保证首先获得的是最先完成任务的线程返回值。它只是按加入线程池的顺序返回。因为take方法是阻塞方法,后面的任务完成了,前面的任务却没有完成,主程序就那样等待在那儿,只到前面的完成了,它才知道原来后面的也完成了。

 

executor框架executors工厂类(代码片段)

Executors简介Executors是一个工厂类,其提供的是Executor、ExecutorService、ScheduledExecutorService、ThreadFactory和Callable类的实例的工厂方法;提供常用配置的ExecutorService、ScheduledExecutorService的实现方法;创建并返回ThreadFactory的方法,它可将... 查看详情

java并发编程-executor框架executor,

...一些列的小任务,即Runnable,然后将这些任务提交给一个Executor执行, Executor.execute(Runnalbe) 。Executor在执行时使用其内部的线程池来完成操作。Executor的子接口有:ExecutorService,ScheduledExecutorService,已知实现类:AbstractExec 查看详情

jdk多任务执行框架(executor框架)

Executor的常用方法为了更好的控制多线程,JDK提供了一套线程框架Executor,帮助开发人员有效地进行线程控制。它们都在java.util.concurrent包中,是JDK开发包的核心。其中有一个重要的类:Executors,他扮演这线程工厂的角色,我们通... 查看详情

什么是executors框架?

查看详情

什么是executors框架?

Executor框架是一个根据一组执行策略调用,调度,执行和控制的异步任务的框架。无限制的创建线程会引起应用程序内存溢出。所以创建一个线程池是个更好的的解决方案,因为可以限制线程的数量并且可以回收再利用这些线程... 查看详情

executor框架

...离开来。工作单元包括Runnable和Callable,而执行机制由Executor框架提供。Executor框架两级调度模型HotSpotVM的线程模型中底层,Java线程(jav 查看详情

什么是executors框架?

Executor框架是一个根据一组执行策略调用,调度,执行和控制的异步任务的框 架。无限制的创建线程会引起应用程序内存溢出。所以创建一个线程池是个更好的的 解决方案,因为可以限制线程的数量并且可以回收再利用... 查看详情

java并发之executor框架

前言Executor框架概览ExecutorExecutorServiceScheduledExecutorServiceThreadPoolExecutorScheduledThreadPoolExecutorExecutors结语前言在学习JUC的过程中我发现,JUC这个包下面的文档写的十分的好,清楚又易于理解,这篇博客便是参考JUC中和Executor框架相关... 查看详情

juc系列executor框架之completionservice(代码片段)

【JUC系列】Executor框架之CompletionService文章目录【JUC系列】Executor框架之CompletionServiceCompletionServiceExecutorCompletionService成员变量内部类QueueingFuture构造函数任务执行使用案例场景一:场景二:需要优先阅读【JUC系列】Executor... 查看详情

java并发executor框架

  Executor框架简介Java的线程既是工作单元,也是执行机制。从JDK5开始,把工作单元和执行机制分离开来。Executor框架由3大部分组成任务。被执行任务需要实现的接口:Runnable接口或Callable接口异步计算的结果。Future接口和F... 查看详情

java并发executor框架机制与线程池配置使用

【Java并发】Executor框架机制与线程池配置使用一,Executor框架Executor框架便是Java5中引入的,其内部使用了线程池机制,在java.util.cocurrent包下,通过该框架来控制线程的启动、执行和关闭,可以简化并发编程的操作。因此,在Java5... 查看详情

java并发之executor框架

...分的好,清楚又易于理解,这篇博客便是参考JUC中和 Executor 框架相关的一些类文档汇总出来的。当然了,Executor框架涉及到的类还是不少的,全部汇总的话时间成本太高,有点亏,所以这里主要就集中在了 Executor ... 查看详情

2,executor线程池(代码片段)

一,Executor框架简介在Java5之后,并发编程引入了一堆新的启动、调度和管理线程的API。Executor框架便是Java5中引入的,其内部使用了线程池机制,它在java.util.cocurrent包下,通过该框架来控制线程的启动、执行和关闭,可以简化并... 查看详情

executor框架完整解读(代码片段)

...分离开来。工作单元包括Runnable和Callable,而执行机制由Executor框架提供.在HotSpotVM的线程模型中,Java线程被一对一映射为本地操作系统线程。Java线程启动时会创建一个本地操作系统线程;当该Java线程终止时,这个操作系统线程... 查看详情

jdk线程池框架executor源码阅读

Executor框架ExecutorExecutorServiceAbstractExecutorServiceThreadPoolExecutorThreadPoolExecutor继承AbstractExecutorService,是一个线程池的具体的实现主要成员1.ctlprivatefinalAtomicIntegerctl=newAtomicInteger(ctlOf(RUNNING,0 查看详情

java线程池executor框架概述

...a线程池还有一个很重要的意义:Java线程池就是JDK5推出的Executor框架,在此之前Java线程既是工作任务又是执行机制,而Executor框架把工作任务与执行机制分离开来:工作任务包括Runnable接口和Callable接口,而执行机制由Executor接口... 查看详情

)(代码片段)

文章目录《Java并发编程的艺术》读后笔记-Executor框架简介(第十章)1.Executor框架的两级调度模型2.Executor框架的结构与成员2.1Executor框架的结构2.2Executor框架的成员一、ThreadPoolExecutor二、ScheduledThreadPoolExecutor三、Future接口... 查看详情

基础线程机制--executor线程池框架(代码片段)

基础线程机制Executor线程池框架1.引入Executor的原因(1)newThread()的缺点???每次newThread()耗费性能???调用newThread()创建的线程缺乏管理,被称为野线程,而且可以无限制的创建,之间相互竞争,导致过多的系统资源被占用导致系统... 查看详情