线程池拒绝策略详解(代码片段)

IT-老牛 IT-老牛     2022-12-07     727

关键词:

文章目录

1.前言

当线程池已经关闭或达到饱和(最大线程和队列都已满)状态时,新提交的任务将会被拒绝。 ThreadPoolExecutor 定义了四种拒绝策略:

1、AbortPolicy
默认策略,在需要拒绝任务时抛出RejectedExecutionException;
2、CallerRunsPolicy
直接在 execute 方法的调用线程中运行被拒绝的任务,如果线程池已经关闭,任务将被丢弃;
3、DiscardPolicy
直接丢弃任务;
4、DiscardOldestPolicy
丢弃队列中等待时间最长的任务,并执行当前提交的任务,如果线程池已经关闭,任务将被丢弃。

我们也可以自定义拒绝策略,只需要实现 RejectedExecutionHandler; 需要注意的是,拒绝策略的运行需要指定线程池和队列的容量。

2.ThreadPoolExecutor创建线程方式

通过下面的demo来了解ThreadPoolExecutor创建线程的过程。

package com.bruce.demo7;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolSerialTest 

    public static void main(String[] args) 
        //核心线程数
        int corePoolSize = 3;
        //最大线程数
        int maximumPoolSize = 6;
        //超过 corePoolSize 线程数量的线程最大空闲时间
        long keepAliveTime = 2;
        //以秒为时间单位
        TimeUnit unit = TimeUnit.SECONDS;
        //创建工作队列,用于存放提交的等待执行任务
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(2);

        ThreadPoolExecutor threadPoolExecutor = null;

        try 
            //创建线程池
            threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
                    maximumPoolSize,
                    keepAliveTime,
                    unit,
                    workQueue,
                    new ThreadPoolExecutor.AbortPolicy());

            //循环提交任务
            for (int i = 0; i < 8; i++) 
                //提交任务的索引
                int index = (i + 1);
                threadPoolExecutor.submit(() -> 
                    //线程打印输出
                    System.out.println("大家好,我是线程:" + index);
                    try 
                        //模拟线程执行时间,10s
                        Thread.sleep(10000);
                     catch (InterruptedException e) 
                        e.printStackTrace();
                    
                );
                //每个任务提交后休眠500ms再提交下一个任务,用于保证提交顺序
                Thread.sleep(500);
            

         catch (Exception e) 
            e.printStackTrace();
         finally 
        

    


大家好,我是线程:1
大家好,我是线程:2
大家好,我是线程:3
大家好,我是线程:6
大家好,我是线程:7
大家好,我是线程:8
大家好,我是线程:4
大家好,我是线程:5

执行流程:

  1. 首先通过 ThreadPoolExecutor 构造函数创建线程池;
  2. 执行 for 循环,提交 8 个任务(恰好等于maximumPoolSize[最大线程数] + capacity[队列大小]);
    通过 threadPoolExecutor.submit 提交 Runnable 接口实现的执行任务;
  3. 提交第1个任务时,由于当前线程池中正在执行的任务为 0 ,小于 3(corePoolSize 指定),所以会创建一个线程用来执行提交的任务1;
  4. 提交第 2, 3 个任务的时候,由于当前线程池中正在执行的任务数量小于等于 3 (corePoolSize 指定),所以会为每一个提交的任务创建一个线程来执行任务;
  5. 当提交第4个任务的时候,由于当前正在执行的任务数量为 3 (因为每个线程任务执行时间为10s,所以提交第4个任务的时候,前面3个线程都还在执行中),此时会将第4个任务存放到 workQueue 队列中等待执行;
    由于 workQueue 队列的大小为 2 ,所以该队列中也就只能保存 2 个等待执行的任务,所以第5个任务也会保存到任务队列中;
  6. 当提交第6个任务的时候,因为当前线程池正在执行的任务数量为3,workQueue 队列中存储的任务数量也满了,这时会判断当前线程池中正在执行的任务的数量是否小于6(maximumPoolSize指定);
    如果小于 6 ,那么就会新创建一个线程来执行提交的任务 6;
  7. 执行第7,8个任务的时候,也要判断当前线程池中正在执行的任务数是否小于6(maximumPoolSize指定),如果小于6,那么也会立即新建线程来执行这些提交的任务;
  8. 此时,6个任务都已经提交完毕,那 workQueue 队列中的等待 任务4 和 任务5 什么时候执行呢?
    当任务1执行完毕后(10s后),执行任务1的线程并没有被销毁掉,而是获取 workQueue 中的任务4来执行;
  9. 当任务2执行完毕后,执行任务2的线程也没有被销毁,而是获取 workQueue 中的任务5来执行;

通过上面流程的分析,也就知道了之前案例的输出结果的原因。其实,线程池中会线程执行完毕后,并不会被立刻销毁,线程池中会保留 corePoolSize 数量的线程,当 workQueue 队列中存在任务或者有新提交任务时,那么会通过线程池中已有的线程来执行任务,避免了频繁的线程创建与销毁,而大于 corePoolSize 小于等于 maximumPoolSize 创建的线程,则会在空闲指定时间(keepAliveTime)后进行回收。

3.ThreadPoolExecutor拒绝策略测试

在上面的测试中,我设置的执行线程总数恰好等于maximumPoolSize[最大线程数] + capacity[队列大小],因此没有出现需要执行拒绝策略的情况,因此在这里,我再增加一个线程,提交9个任务,来演示不同的拒绝策略。

3.1.AbortPolicy

功能:当触发拒绝策略时,直接抛出拒绝执行的异常,中止策略的意思也就是打断当前执行流程
使用场景:这个就没有特殊的场景了,但是一点要正确处理抛出的异常。

package com.bruce.demo7;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolSerialTest 

    public static void main(String[] args) 
        //核心线程数
        int corePoolSize = 3;
        //最大线程数
        int maximumPoolSize = 6;
        //超过 corePoolSize 线程数量的线程最大空闲时间
        long keepAliveTime = 2;
        //以秒为时间单位
        TimeUnit unit = TimeUnit.SECONDS;
        //创建工作队列,用于存放提交的等待执行任务
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(2);

        ThreadPoolExecutor threadPoolExecutor = null;

        try 
            //创建线程池
            threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
                    maximumPoolSize,
                    keepAliveTime,
                    unit,
                    workQueue,
                    new ThreadPoolExecutor.AbortPolicy());

            //循环提交任务
            for (int i = 0; i < 9; i++) 
                //提交任务的索引
                int index = (i + 1);
                threadPoolExecutor.submit(() -> 
                    //线程打印输出
                    System.out.println("大家好,我是线程:" + index);
                    try 
                        //模拟线程执行时间,10s
                        Thread.sleep(10000);
                     catch (InterruptedException e) 
                        e.printStackTrace();
                    
                );
                //每个任务提交后休眠500ms再提交下一个任务,用于保证提交顺序
                Thread.sleep(500);
            

         catch (Exception e) 
            e.printStackTrace();
         finally 
        

    


大家好,我是线程:1
大家好,我是线程:2
大家好,我是线程:3
大家好,我是线程:6
大家好,我是线程:7
大家好,我是线程:8
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 6, active threads = 6, queued tasks = 2, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
	at com.bruce.demo7.ThreadPoolSerialTest.main(ThreadPoolSerialTest.java:37)
大家好,我是线程:4
大家好,我是线程:5

ThreadPoolExecutor中默认的策略就是AbortPolicyExecutorService接口的系列ThreadPoolExecutor因为都没有显示的设置拒绝策略,所以默认的都是这个。但是请注意,ExecutorService中的线程池实例队列都是无界的,也就是说把内存撑爆了都不会触发拒绝策略。

3.2.CallerRunsPolicy

将被拒绝的任务添加到线程池正在运行线程中去执行

功能:当触发拒绝策略时,只要线程池没有关闭,就由提交任务的当前线程处理。

使用场景:一般在不允许失败的、对性能要求不高、并发量较小的场景下使用,因为线程池一般情况下不会关闭,也就是提交的任务一定会被运行,但是由于是调用者线程自己执行的,当多次提交任务时,就会阻塞后续任务执行,性能和效率自然就慢了。

package com.bruce.demo7;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolSerialTest 

    public static void main(String[] args) 
        //核心线程数
        int corePoolSize = 3;
        //最大线程数
        int maximumPoolSize = 6;
        //超过 corePoolSize 线程数量的线程最大空闲时间
        long keepAliveTime = 2;
        //以秒为时间单位
        TimeUnit unit = TimeUnit.SECONDS;
        //创建工作队列,用于存放提交的等待执行任务
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(2);

        ThreadPoolExecutor threadPoolExecutor = null;

        try 
            //创建线程池
            threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
                    maximumPoolSize,
                    keepAliveTime,
                    unit,
                    workQueue,
                    new ThreadPoolExecutor.CallerRunsPolicy());

            //循环提交任务
            for (int i = 0; i < 9; i++) 
                //提交任务的索引
                int index = (i + 1);
                threadPoolExecutor.submit(() -> 
                    //线程打印输出
                    System.out.println(Thread.currentThread().getName()+"-->" + index);
                    try 
                        //模拟线程执行时间,10s
                        Thread.sleep(10000);
                     catch (InterruptedException e) 
                        e.printStackTrace();
                    
                );
                //每个任务提交后休眠500ms再提交下一个任务,用于保证提交顺序
                Thread.sleep(500);
            

         catch (Exception e) 
            e.printStackTrace();
         finally 
        

    


pool-1-thread-1-->1
pool-1-thread-2-->2
pool-1-thread-3-->3
pool-1-thread-4-->6
pool-1-thread-5-->7
pool-1-thread-6-->8
main-->9
pool-1-thread-1-->4
pool-1-thread-2-->5

3.3.DiscardPolicy

丢弃任务,不抛出异常

功能:直接静悄悄的丢弃这个任务,不触发任何动作

使用场景:如果你提交的任务无关紧要,你就可以使用它 。因为它就是个空实现,会悄无声息的吞噬你的的任务。所以这个策略基本上不用

package com.bruce.demo7;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolSerialTest 

    public static void main(String[] args) 
        //核心线程数
        int corePoolSize = 3;
        //最大线程数
        int maximumPoolSize = 6;
        //超过 corePoolSize 线程数量的线程最大空闲时间
        long keepAliveTime = 2;
        //以秒为时间单位
        TimeUnit unit = TimeUnit.SECONDS;
        //创建工作队列,用于存放提交的等待执行任务
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(2);

        ThreadPoolExecutor threadPoolExecutor = null;

        try 
            //创建线程池
            threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
                    maximumPoolSize,
                    keepAliveTime,
                    unit,
                    workQueue,
                    new ThreadPoolExecutor.DiscardPolicy());

            //循环提交任务
            for (int i = 0; i < 9; i++) 
                //提交任务的索引
                int index = (i + 1);
                threadPoolExecutor.submit(() -> 
                    //线程打印输出
                    System.out.println(Thread.currentThread().getName()+"-->" + index);
                    try 
                        //模拟线程执行时间,10s
                        Thread.sleep(10000);
                     catch (InterruptedException e) 
                        e.printStackTrace();
                    
                );
                //每个任务提交后休眠500ms再提交下一个任务,用于保证提交顺序
                Thread.sleep(500);
            

         catch (Exception e) 
            e.printStackTrace();
         finally 
        

    


pool-1-thread-1-->1
pool-1-thread-2-->2
pool-1-thread-3-->3
pool-1-thread-4-->6
pool-1-thread-5-->7
pool-1-thread-6-->8
pool-1-thread-1-->4
pool-1-thread-2-->5

3.4.DiscardOldestPolicy

丢弃队列最前面的任务,重新尝试执行任务。

功能:如果线程池未关闭,就弹出队列头部的元素,然后尝试执行

使用场景:这个策略还是会丢弃任务,丢弃时也是毫无声息,但是特点是丢弃的是老的未执行的任务,而且是待执行优先级较高的任务。基于这个特性,我能想到的场景就是,发布消息,和修改消息,当消息发布出去后,还未执行,此时更新的消息又来了,这个时候未执行的消息的版本比现在提交的消息版本要低就可以被丢弃了。因为队列中还有可能存在消息版本更低的消息会排队执行,所以在真正处理消息的时候一定要做好消息的版本比较。

package com.bruce.demo7;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolSerialTest 

    public static void main(String[] args) 
        //核心线程数
        int corePoolSize = 3;
        //最大线程数
        int maximumPoolSize = 6;
        //超过 corePoolSize 线程数量的线程最大空闲时间
        long keepAliveTime = 2;
        //以秒为时间单位
        TimeUnit unit = TimeUnit.SECONDS;
        //创建工作队列,用于存放提交的等待执行任务
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(2);

        ThreadPoolExecutor threadPoolExecutor = null;

        try 
            //创建线程池
            threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
                    maximumPoolSize,
                    keepAliveTime,
                    unit,
                    workQueue,
                    new ThreadPoolExecutor.DiscardOldestPolicy());

            //循环提交任务
            for (int i = 0; i < 9; i++) 
                //提交任务的索引
                int index = (i + 1);
                threadPoolExecutor.submit(() -> 
                    //线程打印输出
                    System.out.println(Thread.currentThread线程池(详解):三大方法七大参数四种拒绝策略(代码片段)

线程池(重点)一:线程池:三大方法,七大参数,四种拒绝策略  池化技术:  01:程序的运行,本质:占用系统的资源!优化资源的使用!=>池化技术  02:线程池、连接池、内存池、对象池///......创建、销毁。十分... 查看详情

线程池拒绝策略详解(代码片段)

文章目录1.前言2.ThreadPoolExecutor创建线程方式3.ThreadPoolExecutor拒绝策略测试3.1.AbortPolicy3.2.CallerRunsPolicy3.3.DiscardPolicy3.4.DiscardOldestPolicy3.5.自定义拒绝策略4.第三方实现的拒绝策略4.1.dubbo中的线程拒绝策略4.2.Netty中的线程池拒绝策略4.3... 查看详情

java-jdk线程池详解(代码片段)

线程池参数详解publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,TimeUnitunit,BlockingQueue<Runnable>workQueue,ThreadFactorythreadFactory,RejectedExecutionHandlerhandler) 参数说明corePoolSize表示常驻核心线程数量。maximumPoolSize表示线程池... 查看详情

常见线程池详解(一篇足以)(代码片段)

线程的缺点线程的创建需要开辟内存资源:本地方法栈、虚拟机栈、程序计数器等线程私有变量的内存,频繁的创建和消耗会带来一定的性能开销;使用线程不能很好的管理任务和有好的拒绝任务;线程池线程池... 查看详情

关于java线程池详解(代码片段)

Java-五种线程池,四种拒绝策略,三种阻塞队列三种阻塞队列:   BlockingQueue<Runnable>workQueue=null;   workQueue=newArrayBlockingQueue<>(5);//基于数组的先进先出队列,有界   workQueue=newLinkedBlockingQueue... 查看详情

面试官:有多少种线程池拒绝策略阻塞队列?(代码片段)

线程池的工作原理ThreadPoolExecutor(intcorePoolSize,//核心线程数intmaximumPoolSize,//最大线程数longkeepAliveTime,//空闲线程存活时间TimeUnitunit,//存活时间单位BlockingQueue<Runnable>workQueue,//阻塞队列RejectedExecutionHandlerhandle 查看详情

java并发编程线程池机制(线程池阻塞队列|线程池拒绝策略|使用threadpoolexecutor自定义线程池参数)(代码片段)

文章目录一、线程池阻塞队列二、拒绝策略三、使用ThreadPoolExecutor自定义线程池参数一、线程池阻塞队列线程池阻塞队列是线程池创建的第555个参数:BlockingQueue<Runnable>workQueue;publicThreadPoolExecutor(intcorePoolSize, //核心线程数,... 查看详情

java线程池拒绝策略(代码片段)

0前置依赖本例中会依赖assertj-core完成单测,maven依赖如下:<!--https://mvnrepository.com/artifact/org.assertj/assertj-core--><dependency><groupId>org.assertj</groupId><artifactId&g 查看详情

java线程池拒绝策略(代码片段)

0前置依赖本例中会依赖assertj-core完成单测,maven依赖如下:<!--https://mvnrepository.com/artifact/org.assertj/assertj-core--><dependency><groupId>org.assertj</groupId><artifactId&g 查看详情

java线程池工作原理及拒绝策略详解

在多线程编程中,我们经常使用线程池来管理线程,以减缓线程频繁的创建和销毁带来的资源的浪费,在创建线程池的时候,经常使用一个工厂类来创建线程池Executors,实际上Executors的内部使用的是类ThreadPoolExecutor。它有一个最... 查看详情

java八股系列——线程池拒绝策略(代码片段)

当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize时,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务AbortPolicy(默认策... 查看详情

java多线程:线程池详解(代码片段)

文章目录线程池注意点:线程池都有哪些状态?谈谈线程池的拒绝策略?线程池的队列大小通常怎么设置?1.CPU密集型任务2.IO密集型任务3.混合型任务线程池有哪些参数,各个参数的作用是什么?线程池注意点:... 查看详情

jdk线程池的拒绝策略(代码片段)

关于新疆服务请求未带入来话原因的问题      经核查,该问题是由于立单接口内部没有成功调用接续的“更新来电原因接口”导致的,接续测更新来电原因接口编码:NGCCT_UPDATESRFLAG_PUT,立单接口调用代码... 查看详情

jdk线程池的四种拒绝策略(代码片段)

...dPoolExecutor中有四种处理策略。1.CallerRunsPolicy:当最大线程数量上限&&缓冲队列满了,后续再有任务提交过来就让当前主线程去处理这个任务。2.AbortPolicy:当最大线程数量上限&&缓冲队列满了,后续再有... 查看详情

池化技术——自定义线程池(代码片段)

目录池化技术——自定义线程池1、为什么要使用线程池?1.1、池化技术的特点:1.2、线程池的好处:1.3、如何自定义一个线程池2、三大方法2.1、单个线程的线程池方法2.2、固定的线程池的大小的方法2.3、可伸缩的线程池的方法2... 查看详情

simplethreadpool给线程池增加拒绝策略和停止方法(代码片段)

给线程池增加拒绝策略和停止方法packagecom.dwz.concurrency.chapter13;importjava.util.ArrayList;importjava.util.LinkedList;importjava.util.List;publicclassSimpleThreadPool3privatefinalintsize;privatefinalintqueueSize;privatefinalstaticintDEFAULT_SIZE=10;privatefinalstaticintDEFAULT_TASK_... 查看详情

线程池的手写和拒绝策略(代码片段)

手写线程池:AbortPolicy:直接抛出RejectedExecutionException异常阻止系统正常运行。publicclassMyThreadPoolDemopublicstaticvoidmain(String[]args)ExecutorServicethreadPool=newThreadPoolExecutor(2,5,1L,TimeUnit.SECONDS,newLinkedBlockingQueue<>(3),Executors.defaultThreadFactory()... 查看详情

springboot中如何配置线程池拒绝策略,妥善处理好溢出的任务(代码片段)

...们分别学会了用@Async创建异步任务、为异步任务配置线程池、使用多个线程池隔离不同的异步任务。今天这篇,我们继续对上面的知识进行完善和优化!如果你已经看过上面几篇内容并已经掌握之后,一起来思考... 查看详情