你真的懂threadpoolexecutor线程池技术吗?看了源码你会有全新的认识(代码片段)

author author     2023-01-21     752

关键词:

Java是一门多线程的语言,基本上生产环境的Java项目都离不开多线程。而线程则是其中最重要的系统资源之一,如果这个资源利用得不好,很容易导致程序低效率,甚至是出问题。

有以下场景,有个电话拨打系统,有一堆需要拨打的任务要执行,首先肯定是考虑多线程异步去执行。假如我每执行一个拨打任务都new一个Thread去执行,当同时有1万个任务需要执行的时候,那么就会新建1万个线程,加上线程各种初始销毁等操作,这个消耗是巨大的。而其实往往实现这些功能的时候,并不是完全需要实时马上完成,只是希望在可控范围内尽量提高执行的并发性能。

因此线程池技术应用而生,Java中最常用的线程池技术就是ThreadPoolExecutor。接下来就整体看看ThreadPoolExecutor的实现。<!-- more -->
这个类的注解非常多,很多也是重点,所以就不从注解开始看起。先从使用说起,有个概念先。

基本使用

        // 核心线程
        int corePoolSize = 5;
        // 最大线程
        int maximumPoolSize = 10;
        // 线程空闲回收时间
        int keepAliveTime = 30;
        // 线程空闲回调时间单位
        TimeUnit unit = TimeUnit.SECONDS;
        // 队列大小
        int queueSize = 20;
        // 队列
        BlockingQueue workQueue = new ArrayBlockingQueue<Runnable>(queueSize);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        executor.execute(() -> 
            // do something 1
        );
        executor.execute(() -> 
            // do something 2
        );

定义好一些必要的参数,构建一个ThreadPoolExecutor对象。然后调用对象的execute()方法即可。
参数说明:

  • corePoolSize,线程池保留的最小线程数。如果线程池中的线程少于此数目,则在执行execut()时创建。
  • maximumPoolSize,线程池中允许拥有的最大线程数。
  • keepAliveTime、unit,当线程闲置时,保持线程存活的时间。
  • workQueue,工作队列,存放提交的等待任务,其中有队列大小的限制。

线程管理机制

非常多人误解了corePoolSize、maximumPoolSize、workQueue的相互关系。不少人认为无论队列选择什么,corePoolSize和maximumPoolSize一定是有用,定义一定是生效的,其实并不然啊!
看下线程基本规则注解说明
技术分享图片

  1. 默认情况下,线程池在初始的时候,线程数为0。当接收到一个任务时,如果线程池中存活的线程数小于corePoolSize核心线程,则新建一个线程。
  2. 如果所有运行的核心线程都都在忙,超出核心线程处理的任务,执行器更多地选择把任务放进队列,而不是新建一个线程。
  3. 如果一个任务提交不了到队列,在不超出最大线程数量情况下,会新建线程。超出了就会报错。

另外,如果想在线程初始化时候就有核心线程,可以调用prestartCoreThread()或prestartAllCoreThread(),前者是初始一个,后者是初始全部。

再看看排队策略
技术分享图片

  • 直接提交,用SynchronousQueue。特点是不保存,直接提交给线程,如果没没线程,则新建一个。
  • 无限提交,用类似LinkedBlockingQueue×××队列。特点是保存所以核心线程处理不了的任务,队列无上限,最大线程也没用。
  • 有限提交,用类似ArrayBlockingQueue有界队列。特点是可以保存超过核心线程的任务,并且队列也是有上限的。超过上限,新建线程(满了抛错)。更好地保护资源,防止崩溃,也是最常用的排队策略。

从以上规则可以看出来,核心线程数和最大线程数,还有队列结构是相互影响的,如何排队,队列多大,最大线程是多少都是不一定的。

再看看保持存活机制
技术分享图片
当超过核心线程数的线程,线程池会让该线程保持存活keepAliveTime时间,超过该时间则会销毁该线程。
另外默认对非核心线程有效,若想核心线程也适用于这个机制,可以调用allowCoreThreadTimeOut()方法。这样的话就没有核心线程这一说了。

综合以上,线程池在多次执行任务后,会一直维持部分线程存活,即使它是闲置的。这样的目的是为了减少线程销毁创建的开销,下次有个任务需要执行,直接从池子里拿线程就能用了。但核心线程不能维护太多,因为也需要一定开销。最大的线程数保护了整个系统的稳定性,避免并发量大的时候,把线程挤满。工作队列则是保证了任务顺序和暂存,系统的可靠性。线程存活规则的目的和维护核心线程的目的类似,但降低了它的存活的时间。

另外还有拒绝机制,它提供了一些异常情况下的解决方案。
技术分享图片

ctl线程状态控制

这个ctl变量是整个线程池的核心控制状态。
技术分享图片
这个ctl代表了两个变量

  • workerCount,生效的线程数。基本可以理解为存活的线程,但某个时候有暂时性的差异。
  • runState,线程池的运行状态。
    其中,ctl(int32位)的低29位代表workerCount,所以最大线程数为(2^29)-1。另外3位表示runState。

runState有以下几种状态:
技术分享图片

  • RUNNING:接收新任务,处理队列任务。
  • SHUTDOWN:不接收新任务,但处理队列任务。
  • STOP:不接收新任务,也不处理队列任务,并且中断所有处理中的任务。
  • TIDYING:所有任务都被终结,有效线程为0。会触发terminated()方法。
  • TERMINATED:当terminated()方法执行结束。

当调用了shutdown(),状态会从RUNNING变成SHUTDOWN,不再接收新任务,此时会处理完队列里面的任务。
如果调用的是shutdownNow(),状态会直接变成STOP。
当线程或者队列都是空的时候,状态就会变成TIDYING。
当terminated()执行完的时候,就会变成TERMINATED。

execute()

带着对上面的规则与机制的认识,现在从就这这个入口开始看看源码,到底整个流程是怎么实现的。
技术分享图片

  1. 如果少于核心线程在跑,用这个任务尝试创建一个新线程。
  2. 如果一个任务成功入队,再次检查下线程池状态看是否需要入队,因为可能在入队过程中,状态发送了变化。如果确认入队且没有存活线程,则新建一个空线程。
  3. 如果进不了队,则尝试新建一个线程,如果都失败了。拒绝这个task
    对于第二点最后为什么新建一个线程?很容易猜想到,会有一个轮询的机制让下个task出队,直接利用这个空闲线程。

注释基本解释了所有代码,代码也没什么特别的。其中最主要的还是addWoker()这个方法,下面来看看。

addWoker()

先了解下这个方法的整体思路
技术分享图片
从描述可知,addwoker失败,会在线程池状态不对、线程满了或者线程工厂创建线程池失败时候发生。
这个方法比较长,分两段看。先看第一段。
技术分享图片
retry:这种写法,如果比较少看源码的,应该是前所未见的了。这是个循环的位置标记,是java的语法之一。看回代码,这里面for循环还嵌套里一个for循环,而retry:是标记第一个for循环的,后面breakcontinue语句都指向到了retry。说明breakcontinue是都是操作外层的for循环。retry可以是任何变量命名合法的字符。

然后看看外出for循环的if语句
技术分享图片
这个if判断想要执行到return false;,队列为空是一个必要条件。因为addWork()不单只接收新任务会调用到,处理队列中的任务也会调用到。而前面提到SHUTDOWN状态下还会处理队列中的任务的,所以队列不为空是会让它继续执行下去的。

对于内层的for循环
技术分享图片
会先判断worker的数据是否符合corePoolSize和maximumPoolSize的定义,不满足则返回失败。
然后尝试CAS让workerCount自增,如果CAS失败还是继续自旋去自增,直到成功。除非线程池状态发生了变化,发退回到外层for循环重新执行,判断线程池的状态。

第一段的代码,就是让workerCount在符合条件下自增

第二段代码
技术分享图片
这段比较好理解,先创建一个Worker对象,这个Worker里面包含一个由线程工厂创建的线程,和一个需要执行的任务(可以为空)。如果线程创建成功了,那么就加一个重入锁去把这个新建的Worker对象放到workers成员变量中,在加入之前需要重新判断下线程池的状态和新建线程的状态。如果worker添加到workers成员变量中,就启动这个新建的线程。最后如果添加失败,则执行addWorkFailed(w)
技术分享图片
如果失败了,加锁操作回滚下wokers、workerCount,然后判断下状态看看是否需要终结线程池。

addWorker()大概的流程就这样。

总结

对于其他方法,没有什么特别的,在此不再过多的叙述,有兴趣的可以翻翻源码阅读下。
回顾总结下上面的核心要点

  1. 当核心线程满且忙碌时,线程池倾向于把提交的任务放进队列,而不是新建线程。
  2. 根据选择队列的不同,maximumPoolSize不一定有用的。具体有三种不同的策略。
  3. ctl是线程池的核心控制状态,包含的runState线程池运行状态和workCount有效线程数。
  4. retry:是一种标记循环的语法,retry可以是任何变量命名合法字符。

更多技术文章、精彩干货,请关注
博客:zackku.com
微信公众号:Zack说码
技术分享图片

阿里推荐的线程使用方法threadpoolexecutor(代码片段)

...统资源的开销,然后之所以不用Executors自定义线程池,用ThreadPoolExecutor是为了规范线程池的使用,还有让其他人更好懂线程池的运行规则。先说一下关于线程的概念任务:线程需要执行的代码,也就是Runnable任务队列:线程满了... 查看详情

你真的懂timeout吗?(代码片段)

服务为什么需要timeout呢?提前释放资源记得在上家公司时,一个python服务与公网交互,request库发出去的请求没有设置timeout...而且还是个定时任务,占用了超多fd同时微服务场景下某下游的服务阻塞卡顿,这样... 查看详情

java线程池threadpoolexecutor源码简析(代码片段)

...厂方法来创建各种特性的线程池,其中大多数是返回ThreadPoolExecutor对象。因 查看详情

java线程池threadpoolexecutor源码简析(代码片段)

...厂方法来创建各种特性的线程池,其中大多数是返回ThreadPoolExecutor对象。因 查看详情

并发编程之线程池threadpoolexecutor(代码片段)

...,进而影响执行效率。所以,线程池就应运而生。线程池ThreadPoolExecutor可以通过idea先看下线程池的类图,了解一下它的继承关系和大概结构。它继承自AbstractExecutorService类,这是一个抽象类,不过里边的方法都是已经实现好的。... 查看详情

线程池之threadpoolexecutor(代码片段)

线程池之ThreadPoolExecutor+面试题线程池介绍线程池(ThreadPool):把一个或多个线程通过统一的方式进行调度和重复使用的技术,避免了因为线程过多而带来使用上的开销。为什么要使用线程池?可重复使用已有线程,避免对象创... 查看详情

java线程池之线程池原理

1/**2*线程池原理3*/4publicclassThreadPoolExecutor{56//大部分线程池都试调用的ThreadPoolExecutor这个类7//如果你想自定义线程池,创建一个ThreadPoolExecutor对象,传入参数即可89}  查看详情

你真的懂线程同步么?(代码片段)

  前言:学进程时,学习的重点应该进程间通信,而学习线程时,重点就应该是线程同步了。想过为什么?fork创建子进程之后,子进程有自己的独立地址空间和PCB,想和父进程或其它进程通信,就需要各种通信方式,例如无... 查看详情

threadpoolexecutor线程池

1packageunit;23importjava.util.concurrent.ArrayBlockingQueue;4importjava.util.concurrent.ThreadPoolExecutor;5importjava.util.concurrent.TimeUnit;67publicclassThreadPoolExecutor线程池{8publicstaticvoidmai 查看详情

线程池threadpoolexecutor(代码片段)

1-ThreadPool线程池1.1为什么要用线程池1.2线程池的架构--ThreadPoolExecutor1.3线程池的构造方法packagethread20200415;importjava.util.concurrent.Executor;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Execut 查看详情

threadpoolexecutor线程池设计思路(代码片段)

ThreadPoolExecutor线程池设计思路引言ThreadPoolExecutor线程池相关属性线程池状态记录execute执行任务工作线程抽象为Workerexecute中添加工作线程Worker是如何run的Worker从工作队列获取任务Worker收尾工作尝试终结当前线程池手动调用关闭线... 查看详情

threadpoolexecutor线程池参数设置技巧

ThreadPoolExecutor线程池参数设置技巧一、ThreadPoolExecutor的重要参数?corePoolSize:核心线程数核心线程会一直存活,及时没有任务需要执行当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理设置allowCoreThrea... 查看详情

threadpoolexecutor线程池

...3、对线程进行一些简单的管理。在java中,线程池的类为ThreadPoolExecutor,该类提供了四种构造方法://五个参数的构造函数publicThreadPoolExecutor(intcorePoolSize,intmaximumPoo 查看详情

线程池之threadpoolexecutor

所属包:java.util.concurrent.ThreadPoolExecutor 类关系:publicclassThreadPoolExecutorextendsAbstractExecutorService1.继承关系ThreadPoolExecutor继承了一个抽象类:AbstractExecutorServicepublicabstractclassAbstractExecu 查看详情

threadpoolexecutor线程池

TestThreadPoolExecutorMainpackagecore.test.threadpool;importjava.util.concurrent.ArrayBlockingQueue;importjava.util.concurrent.ThreadPoolExecutor;importjava.util.concurrent.TimeUnit;/***ThreadPoolExec 查看详情

threadpoolexecutor线程池参数设置技巧

 一、ThreadPoolExecutor的重要参数 corePoolSize:核心线程数核心线程会一直存活,及时没有任务需要执行当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理设置allowCoreThreadTimeout=true(默认false)时,... 查看详情

threadpoolexecutor线程池参数设置技巧

一、ThreadPoolExecutor的重要参数 corePoolSize:核心线程数核心线程会一直存活,及时没有任务需要执行当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理设置allowCoreThreadTimeout=true(默认false)时,核... 查看详情

线程池源码分析-threadpoolexecutor(代码片段)

...录线程池接口分析以及FutureTask设计实现线程池源码分析-ThreadPoolExecutor该系列打算从一个最简单的Executor执行器开始一步一步扩展到ThreadPoolExecutor,希望能粗略的描述出线程池的各个实现细节。针对JDK1.7中的线程池#2ThreadPoolExecutor... 查看详情