高并发多线程基础之threadpoolexecutor源代码分析(代码片段)

踩踩踩从踩 踩踩踩从踩     2023-01-03     225

关键词:

前言

这篇主要讲述ThreadPoolExecutor的源码分析,贯穿整篇文章主要就 线程池的创建、参数分析、提交任务,通过源码及注释充分理解线程池的工作原理。

线程池

为什么需要需要线程池

首先线程并不是越多越好,过多的创建线程消耗大量的资源,反而达到适得其反的效果;如何正确创建线程并且去控制。

线程池的组成部分

线程池管理器:用于创建并管理线程池,包括创建线程池,销毁线程池,添加新任务;
工作线程:线程池中的线程、可以循环的执行任务、在没有任务时处于等待状态
任务接口:每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务收尾工作,任务执行的状态等
任务队列:用于存放没有处理的任务。提供一种缓存机制

下面是我写的另一篇文章包括为什么使用线程池,有哪些优点,以及几种Executors中提供给我们的工厂创建方法等

高并发多线程基础之线程池

ThreadPoolExecutor源码解析

 

源码注释

因为太长了因此我把中文的意思整理了一下,大概介绍的是几个方面

  • 为什么使用线程池

  线程池解决两个不同的问题:它们通常在执行大量任务时提供改进的性能异步任务,由于减少了每个任务的调用开销,它们提供了一种界定和管理资源的手段,包括线程,在执行任务集合时使用。每个@code ThreadPoolExecutor还维护一些基本的统计信息,例如已完成任务的数量。

  • 提供给我们使用的Executors的工厂方法

更方便的@link Executors工厂方法@link执行器#newCachedThreadPool(无限线程池,具有自动线程回收),@link Executors#newFixedThreadPool(固定大小的线程池)和@linkExecutors#newSingleThreadExecutor(单后台线程),即最常见用法的预配置设置情景。

  • 核心和最大池大小

ThreadPoolExecutor自动调整大小,依据我们设置的corePoolSize(核心线程)和maximumPoolSize(最大线程大小);在方法@link#execute(Runnable)中提交新任务时,如果运行的线程少于corePoolSize,则会创建一个新线程创建以处理请求,即使其他工作线程闲散的。如果超过corePoolSize但小于maximumPoolSize线程运行时,将仅创建一个新线程,如果队列已满。通过设置corePoolSize和maximumPoolSize。并且这两个参数可以动态改变的

  • 按需进行线程操作

默认情况下,即使是核心线程也会在最初创建并仅在新任务到达时启动,但这可以被覆盖,动态使用方法prestartCoreThread 或者 prestartAllCoreThreads 。如果需要,您可能需要预启动线程,使用非空队列构造池

  • 创建新线程

使用@link ThreadFactory创建新线程,如果未指定@link Executors#defaultThreadFactory则用于创建所有线程都位于同一ThreadGroup 和具有相同@code NORM_PRIORITY优先级和非守护进程状态。通过提供不同的ThreadFactory,您可以更改线程的名称、线程组、优先级、守护进程状态等。如果@code ThreadFactory在请求时未能创建线程,执行器将继续,但可能无法执行任何任务。

  • 保持活跃时间

如果池当前有多个corePoolSize线程,如果多余的线程空闲了更长的时间,那么它们将被终止;这提供了一种在以下情况下减少资源消耗的方法:池未被积极使用。如果池变得更活跃,稍后,将构造新线程。在关闭之前终止。默认情况下,“保持活动”策略仅当存在多个corePoolSize线程时适用。仅当存在多个corePoolSize线程时适用。但是方法@link#allowCoreThreadTimeOut(布尔值)可用于只要keepAliveTime值不为零

在后面的源代码中就可以看出 默认核心线程是不会超时,只有空闲的线程达到keepAliveTime 才会被杀掉,但是源代码中提供了allowCoreThreadTimeOut 方法只要keepAliveTime 不为0,并且设置allowCoreThreadTimeOut 则可以超时杀死线程

  • 任务队列

任何@link BlockingQueue都可用于传输和保持提交的任务。此队列的使用与池大小交互:
如果运行的线程少于corePoolSize,则执行器始终更喜欢添加新线程而不是排队
如果corePoolSize或多个线程正在运行,则执行器总是更喜欢将请求排队,而不是添加新请求线程
如果请求无法排队,则会创建一个新线程,除非这将超过maximumPoolSize,在这种情况下,任务将拒绝 

排队的一般策略有三种:

直接切换。对于工作来说是一个不错的默认选择队列是一个@link SynchronousQueue,它将任务交给线程而不是拿着它们。这里是对任务进行排队的尝试 如果没有线程可立即运行,则将失败,因此将构造新线程。此政策可避免在以下情况下锁定:处理可能具有内部依赖关系的请求集。直接切换通常需要无限的MaximumPoolSize来实现避免拒绝新提交的任务。这反过来又承认了

无界队列。LinkedBlockingQueue 不超过corePoolSize 线程将永远不会被创建。(以及maximumPoolSize的值因此没有任何影响。)这可能是适当的每个任务都完全独立于其他任务,因此任务不能相互影响执行;例如,在网页服务器中。而这种排队方式有助于消除短暂的请求爆发,它承认命令继续到达时的无限工作队列增长平均速度比处理速度快

有界队列。有界队列,例如ArrayBlockingQueue 有助于防止资源耗尽 与有限的MaximumPoolSize一起使用,但可能更难,调整和控制。队列大小和最大池大小可以交易,互相关闭:使用大型队列和小型池可以最大限度地减少CPU使用率、操作系统资源和上下文切换开销

  • 拒绝策略

方法@link#execute(Runnable)中提交的新任务将被删除,当执行者被关闭时,以及执行器对最大线程和工作队列使用有限边界容量,并且已饱和。在这两种情况下,@code execute方法,则会出现拒绝策略。RejectedExecutionHandler rejectedExecution 提供了四个预定义的处理程序包括:
在默认的@link ThreadPoolExecutor.AbortPolicy中处理程序在拒绝
在@link ThreadPoolExecutor.callerRunPolicy中,线程 调用@code execute本身的运行任务。这提供了一个简单的反馈控制机制,将降低提交新任务
在@link ThreadPoolExecutor.DiscardPolicy中,一个任务无法执行的命令被简单地删除
在@link ThreadPoolExecutor.DiscardOldestPolicy中,如果执行器未关闭,任务位于工作队列的最前面删除,然后重试执行可能再次失败,导致重复此操作。
可以定义和使用其他类型的RejectedExecutionHandler 类特别是当政策设计为仅在特定条件下有效时

  • 钩子方法

此类提供了@code protected可重写的 @link#beforeExecute(线程,可运行)和 @link#afterExecute(Runnable,Throwable)调用的方法,在执行每个任务之前和之后。这些可以用来操纵执行环境;例如,重新初始化线程局部变量、收集统计信息或添加日志项。此外,可以重写方法@link#terminated以执行人完成后需要进行的任何特殊处理完全终止。

  • 队列维护

方法@link#getQueue()允许访问工作队列,用于监视和调试。将此方法用于强烈反对任何其他目的。提供了两种方法,@link#remove(Runnable)和@link#purge可用于当有大量排队的任务时,协助进行存储回收取消

  • 终止线程池

程序中不再引用的池没有剩余的线程将自动@code shutdown。如果您希望确保回收未引用的池,可以调用shutdown

  • 最后包括Doug Lea 写的一个扩展实例 

重写一个或多个受保护的挂钩方法,例如下面是一个子类,它添加了一个简单的暂停/恢复功能:

class PausableThreadPoolExecutor extends ThreadPoolExecutor 
    private boolean isPaused;
    private ReentrantLock pauseLock = new ReentrantLock();
    private Condition unpaused = pauseLock.newCondition();
 
    public PausableThreadPoolExecutor(...)  super(...); 
 
    protected void beforeExecute(Thread t, Runnable r) 
      super.beforeExecute(t, r);
      pauseLock.lock();
      try 
        while (isPaused) unpaused.await();
       catch (InterruptedException ie) 
        t.interrupt();
       finally 
        pauseLock.unlock();
      
    
 
    public void pause() 
      pauseLock.lock();
      try 
        isPaused = true;
       finally 
        pauseLock.unlock();
      
    
 
    public void resume() 
      pauseLock.lock();
      try 
        isPaused = false;
        unpaused.signalAll();
       finally 
        pauseLock.unlock();
      
    
  

成员属性

主要是主池控制状态 中各个状态, ctl 是个 Integer 的原子变量用来记录线程池状态 和 线程池中线程个数 ,​​​​​​​

     //默认是RUNNING状态,线程个数为0
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //线程个数 
    private static final int COUNT_BITS = Integer.SIZE - 3;
    //线程最大个数(低29位)00011111111111111111111111111111
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

   //运行状态存储在高阶位中
    //(高3位):11100000000000000000000000000000
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

这里描述的是两个概念领域:

  • workerCount,指示有效线程数

workerCount是已注册的任务数,

  • 运行状态提供主要的生命周期控制,具有以下值

正在运行(RUNNING):接受新任务和处理排队的任务
关机(SHUTDOWN):不接受新任务,但处理排队的任务
停止(STOP):不接受新任务,不处理排队的任务,并中断正在进行的任务
整理(TIDYING):所有任务都已终止,workerCount为零,正在转换为状态整理的线程将运行终止的钩子方法
终止(TERMINATED):terminated方法调用完成以后的状态

  • 线程池状态之间进行转换

RUNNING ->shutdown:显式调用 shutdown() 方法,或者隐式调用了 finalize(),它里面调用了 shutdown() 方法。
RUNNING or SHUTDOWN -> STOP:显式调用 shutdownNow() 方法时候。
SHUTDOWN -> TIDYING:当线程池和任务队列都为空的时候。
STOP -> TIDYING:当线程池为空的时候。
TIDYING -> TERMINATED:当 terminated() hook 方法执行完成时候。

任务队列属性,用作存放注册的任务

   private final BlockingQueue<Runnable> workQueue;

在添加任务时,可以添加callable,这是在AbstractExecutorService 抽象方法中做了统一的封装,然后统一调用的是execute方法

 public <T> Future<T> submit(Callable<T> task) 
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    

然后包括主线程锁,存储工作线程的集合

//这里用来锁定工作线程和相关记录的安全性
private final ReentrantLock mainLock = new ReentrantLock();

private final HashSet<Worker> workers = new HashSet<Worker>();

private final Condition termination = mainLock.newCondition();

后面的包括拒绝handler ,以及活跃时间,是否允许核心线程超时杀死、核心线程数、最大线程数这些属性在核心关键方法中充分使用到。

private volatile RejectedExecutionHandler handler;
private volatile ThreadFactory threadFactory;
 private volatile long keepAliveTime;
private volatile boolean allowCoreThreadTimeOut;
 private volatile int corePoolSize;
private volatile int maximumPoolSize;

属性这里设置了默认拒绝策略, 默认的拒绝策略就是执行时抛出异常exception包括下面线程池为我们自定义好了集中拒绝策略可供我们使用的

 private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

 public static class AbortPolicy implements RejectedExecutionHandler 

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) 
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        
    


  public static class DiscardPolicy implements RejectedExecutionHandler 
  
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) 
        
    


public static class DiscardOldestPolicy implements RejectedExecutionHandler 
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) 
            if (!e.isShutdown()) 
                e.getQueue().poll();
                e.execute(r);
            
        
    

构造方法

默认构造方法,需要传递几个重要的参数

  • 第一个参数为核心线程数,表示默认会创建核心线程去处理任务
  • 第二个参数为最大线程数,表示当任务队列满了过后,会继续创建线程数为
  • 第三和第四个参数一起用的,超时时间,当大于核心线程空置超过5秒则销毁线程
  • 第五个参数,则是创建一个任务队列。
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) 
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    

默认的拒绝策略和创建线程的工厂,可以使用的两种线程工厂

DefaultThreadFactory就是创建一个普通的线程,非守护线程,优先级为5。

PrivilegedThreadFactory  创建的线程工厂,增加了两个特性:ClassLoader和AccessControlContext,从而使运行在此类线程中的任务具有与当前线程相同的访问控制和类加载器。

/**
*线程工厂捕获访问控制上下文和类加载器
*/
 static class PrivilegedThreadFactory extends DefaultThreadFactory 
        private final AccessControlContext acc;
        private final ClassLoader ccl;

        PrivilegedThreadFactory() 
            super();
            SecurityManager sm = System.getSecurityManager();
            if (sm != null) 
                // Calls to getContextClassLoader from this class
                // never trigger a security check, but we check
                // whether our callers have this permission anyways.
                sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);

                // Fail fast
                sm.checkPermission(new RuntimePermission("setContextClassLoader"));
            
            this.acc = AccessController.getContext();
            this.ccl = Thread.currentThread().getContextClassLoader();
        

        public Thread newThread(final Runnable r) 
            return super.newThread(new Runnable() 
                public void run() 
                    AccessController.doPrivileged(new PrivilegedAction<Void>() 
                        public Void run() 
                            Thread.currentThread().setContextClassLoader(ccl);
                            r.run();
                            return null;
                        
                    , acc);
                
            );
        
    

execute方法

执行添加任务方法 

用于提交任务到线程池中

public void execute(Runnable command) 
        if (command == null)
            throw new NullPointerException();
		/*
		*分三步进行:
		*
		* 1. 如果正在运行的线程少于corePoolSize,请尝试
		*以给定命令作为第一个线程启动新线程
		*任务。对addWorker的调用以原子方式检查运行状态和
		*workerCount,从而防止可能增加
		*在不应该的情况下,通过返回false执行线程。
		*
		* 2. 如果任务可以成功排队,那么我们仍然需要
		*再次检查是否应该添加线程
		*(因为自上次检查以来,已有的已死亡)或
		*自进入此方法后,池已关闭。所以我们
		*重新检查状态,如有必要,在以下情况下回滚排队
		*已停止,如果没有线程,则启动新线程。
		*
		* 3. 如果我们无法将任务排队,那么我们将尝试添加一个新任务
		*线。如果失败了,我们知道我们已经被关闭或饱和了
		*所以拒绝这个任务。
		*/

        //第一步
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) 
            if (addWorker(command, true))
                return;
            c = ctl.get();
        
        //第二步
        if (isRunning(c) && workQueue.offer(command)) 
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        
        //第三步
        else if (!addWorker(command, false))
            reject(command);
    

这里分为下面三部分

  • 如果正在运行的线程少于corePoolSize,请尝试以给定命令作为第一个线程启动新线程任务。对addWorker的调用以原子方式检查运行状态和workerCount,从而防止可能增加在不应该的情况下,通过返回false执行线程。 
  • 如果任务可以成功排队,那么我们仍然需要再次检查是否应该添加线程(因为自上次检查以来,已有的已死亡)或自进入此方法后,池已关闭。所以我们重新检查状态,如有必要,在以下情况下回滚排队已停止,如果没有线程,则启动新线程。

addWorker方法

检查是否可以添加与当前工作线程相关的新工作线程池状态和给定的界限(核心或最大值)。如果是,相应地调整工人数量,如有可能,调整创建并启动新的辅助进程,运行firstTask作为其第一项任务。如果池停止或停止,此方法返回false有资格关闭。如果线程工厂在被询问时无法创建线程。如果线程由于线程工厂返回,创建失败null,或由于异常(通常是中的OutOfMemoryError)而导致Thread.start()),我们可以干净地回滚。

 private boolean addWorker(Runnable firstTask, boolean core) 
        //第一部分
        retry:
        for (;;) 
            int c = ctl.get();
            int rs = runStateOf(c);

            // 仅在必要时检查队列是否为空。
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) 
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // 其他CAS由于workerCount更改而失败;重试内部循环
            
        

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        //第二部分
        try 
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) 
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try 
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) 
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    
                 finally 
                    mainLock.unlock();
                
                if (workerAdded) 
                    t.start();
                    workerStarted = true;
                
            
         finally 
            if (! workerStarted)
                addWorkerFailed(w);
        
        return workerStarted;
    

主要分两部分

  • 第一部分通过cas操作 增加线程池线程数,利用双重循环防止出现异常的情况
for (;;) 
            int c = ctl.get();
            int rs = runStateOf(c);

            // 仅在必要时检查队列是否为空。
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) 
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // 其他CAS由于workerCount更改而失败;重试内部循环
            
        

 会返回false的情况,当前线程池状态为 STOP,TIDYING,TERMINATED;

当前线程池状态为 SHUTDOWN 并且已经有了第一个任务则会直接返回false;

  • 第二部分发安全的把任务添加到 workers 里面,启动任务执行。
w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) 
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try 
                    //在保持锁定的同时重新检查。
                    //在ThreadFactory出现故障或
                    //在获得锁之前关闭。
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) 
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    
                 finally 
                    mainLock.unlock();
                
                if (workerAdded) 
                    t.start();
                    workerStarted = true;
                
            
  • 最后在做了一个当工作线程启动失败时,会将任务删除掉,并尝试看是否终止掉线程池
  private void addWorkerFailed(Worker w) 
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try 
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
         finally 
            mainLock.unlock();
        
    

工作任务Worker

工作任务主要维护中断控制状态线程运行任务,以及其他次要簿记。此类有机会扩展AbstractQueuedSynchronizer为了简化获取和释放每个锁周围的锁任务执行。这可以防止发生以下情况的中断:用于唤醒等待来自的任务的工作线程而是中断正在运行的任务。我们实现了一个简单的非重入互斥锁而非使用ReentrantLock,因为我们不希望辅助任务能够当调用池控制方法时重新获取锁,如setCorePoolSize。此外,为了抑制中断,直到线程实际上开始运行任务,我们初始化锁将状态设置为负值,并在启动时将其清除(在运行工人)。

run方法

将主运行循环委托给外部runWorker

public void run() 
            runWorker(this);
        

 runworker方法中做的循环触发

 final void runWorker(Worker w) 
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try 
            while (task != null || (task = getTask()) != null) 
                w.lock();
                //如果池正在停止,请确保线程被中断;
                //如果没有,请确保线程没有中断。这
                //在第二种情况下需要重新检查才能处理
                //清除中断时立即关闭循环
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try 
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try 
                        task.run();
                     catch (RuntimeException x) 
                        thrown = x; throw x;
                     catch (Error x) 
                        thrown = x; throw x;
                     catch (Throwable x) 
                        thrown = x; throw new Error(x);
                     finally 
                        afterExecute(task, thrown);
                    
                 finally 
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                
            
            completedAbruptly = false;
         finally 
            processWorkerExit(w, completedAbruptly);
        
    

主工作运行循环。重复地从队列和执行它们,同时处理一些问题:

  • 我们可以从最初的任务开始,在这种情况下不需要第一个。否则,只要池是运行时,我们从getTask获取任务。如果返回null,则工作进程由于池状态或配置更改而退出参数。其他退出由异常抛出导致外部代码,在这种情况下completedjustly有效通常导致processWorkerExit替换此线程。
  • 在运行任何任务之前,将获取锁以防止其他池在任务执行时中断,然后确保除非池停止,否则此线程没有它的中断设置。
  • 每个任务运行之前都会调用beforeExecute,这可能会引发异常,在这种情况下,我们会导致线程死亡(使用completedjustly true中断循环)未经处理这项任务。
  • 假设在PreExecute正常完成之前,我们运行任务,正在收集任何抛出的异常以发送到afterExecute。我们分别处理RuntimeException和Error(两者都是规格保证了我们的陷阱)和任意丢弃。因为我们无法在Runnable.run中重新播放丢弃的内容,所以在退出时将它们包装在错误中(到线程的UncaughtExceptionHandler)。也会引发任何异常保守地导致线程死亡。
  • task.run完成后,我们调用afterExecute,它可能还将引发异常,这也将导致线程死。根据JLS第14.20节,此例外情况是即使task.run抛出,也将生效。
  • 异常机制的净效果是afterExecute线程的UncaughtExceptionHandler具有同样的准确性
    我们可以提供的关于客户遇到的任何问题的信息用户代码。

interruptIdleWorkers方法

该方法用于中断可能正在等待任务的线程,并根据传入的参数 onlyone判断是否只中止一个参数 

  private void interruptIdleWorkers(boolean onlyOne) 
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try 
            for (Worker w : workers) 
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) 
                    try 
                        t.interrupt();
                     catch (SecurityException ignore) 
                     finally 
                        w.unlock();
                    
                
                if (onlyOne)
                    break;
            
         finally 
            mainLock.unlock();
        
    

shutdown()方法和shutdownNow()方法

showdown方法 这里会将当前所有的线程状态更改为shutdown状态,线程池会在执行完正在执行的任务和队列中等待的任务后才彻底关闭。正在执行的任务会继续执行下去,没有被执行的则中断。

 public void shutdown() 
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try 
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
         finally 
            mainLock.unlock();
        
        tryTerminate();
    

调用的 interruptIdleWorkers方法作用则是终止可能正在等待任务的线程

shutdownNow方法会将线程状态更改为STOP状态,正在执行的任务则被停止,没被执行任务的则返回。

 public List<Runnable> shutdownNow() 
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try 
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
         finally 
            mainLock.unlock();
        
        tryTerminate();
        return tasks;
    

调用的interruptWorkers的作用则是 将所有的正在运行的任务终止掉,

 private void interruptWorkers() 
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try 
            for (Worker w : workers)
                w.interruptIfStarted();
         finally 
            mainLock.unlock();
        
    

并将当前的任务队列删除并返回

tasks = drainQueue();

    线程池日志打印线程名称

这个名称默认是在线程创建工厂中自动给生成的,如果想要他打印的名称换一种,可以自定义个线程工厂,例如下面的,然后在用构造参数放进去,这就能达到自定义的效果了

static class MyThreadFactory implements ThreadFactory 
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() 
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "my-pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        

        public Thread newThread(Runnable r) 
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        
    

总结

最后整个线程池,ThreadPoolExecutor 主要就分为几个模块,包括工作线程任务、任务队列、拒绝策略模块、线程创建工厂模块,整个线程池的生命周期贯穿整个线程池的运行。

高并发多线程之线程基础中生命周期线程封闭cpu缓存(代码片段)

...的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。一个进程包括由操作系统分配的内存空间,包含一个或多个线程。一个线程不能独立的存在,它必须是进程的一... 查看详情

高并发多线程基础之threadpoolexecutor源代码分析(代码片段)

前言这篇主要讲述ThreadPoolExecutor的源码分析,贯穿整篇文章主要就线程池的创建、参数分析、提交任务,通过源码及注释充分理解线程池的工作原理。线程池为什么需要需要线程池首先线程并不是越多越好,过多的创... 查看详情

java开发之高并发必备篇——线程基础

​​提到高并发,这几年几乎是火遍编程界的网络名词了。无它,随着现在互联网的高速发展特别是电商平台类的应用快速发展,互联网服务内容也越来越丰富,用户越来越多,淘宝、天猫、京东、“拼夕夕... 查看详情

java并发编程系列之二线程基础

上篇文章对并发的理论基础进行了回顾,主要是为什么使用多线程、多线程会引发什么问题及引发的原因,和怎么使用Java中的多线程去解决这些问题。正所谓,知其然知其所以然,这是学习一个知识遵循的原则。推荐读者先行... 查看详情

多线程与高并发基础三

多线程顺序打印A1B2C3到Z26面试题:写一个固定容量同步容器,拥有put和get方法,能够支持2个生产者线程以及10个消费者线程的阻塞调用实现一个容器,提供两个方法,add,size写两个线程,线程1添加10个元素到容器中,线程2实现... 查看详情

高并发多线程安全之信号量线程组守护线程线程栅栏等的分析(代码片段)

高并发多线程安全之原子性问题、CAS机制及问题解决方案多线程编程之java内存模型(JMM)与可见性问题前言前两篇文章主要分析的是线程安全中java内存模型;以及常见的线程安全三大问题:原子性、可见性、有序性问题这... 查看详情

高并发编程必备基础--转载自并发编程网

文章转载自并发编程网 本文链接地址:高并发编程必备基础一、前言借用Java并发编程实践中的话”编写正确的程序并不容易,而编写正常的并发程序就更难了”,相比于顺序执行的情况,多线程的线程安全问题是微妙而且... 查看详情

java——多线程高并发系列之创建多线程的三种方式(threadrunnablecallable)(代码片段)

...xff09;写在前面历时一个星期,终于整完了Java多线程高并发这个系 查看详情

并发编程之多线程基础-线程五种状态

原文地址:https://www.cnblogs.com/wangyichuan/p/5990821.html 线程从创建、运行到结束总是处于下面五个状态之一:新建状态、就绪状态、运行状态、阻塞状态及死亡状态。    1.新建状态(New):      &n... 查看详情

java——多线程高并发系列之线程池(executor)的理解与使用(代码片段)

文章目录:写在前面Demo1(使用Executors创建线程池)Demo2(使用ThreadPoolExecutor创建线程池)关于ThreadPoolExecutor中的七大参数、四种拒绝策略线程池的执行策略写在前面可以以newThread(()->线程执行的任务).start();... 查看详情

java——多线程高并发系列之threadlocal的使用(代码片段)

文章目录:写在前面Demo1Demo2Demo3写在前面除了控制资源的访问外,还可以通过增加资源来保证线程安全。ThreadLocal主要解决为每个线程绑定自己的值。Demo1packagecom.szh.threadlocal;/***ThreadLocal的基本使用*/publicclassTest01//定义一... 查看详情

高并发多线程安全之原子性问题cas机制及问题解决方案(代码片段)

...、以及有序性问题;这篇文章主要讲解的是多线程高并发的原子性问题,以及解决原子性问题、CAS机制、自旋锁的优缺点、以及ABA问题等解决什么是原子操作定义即一个操作或者多个操作,要么全部执行并且执行的过... 查看详情

java——多线程高并发系列之lockreentrantlock(代码片段)

文章目录:写在前面说说synchronized和Lock的区别?Demo1(先演示一下锁的可重入性)Demo2(ReentrantLock的基本使用)Demo3(使用Lock锁同步不同方法中的代码块)Demo4(ReentrantLock锁的可重入性)Demo 查看详情

并发编程之多线程基础-守护线程与非守护线程

守护线程与主线程之间的联系:Java中有两种线程,一种是用户线程,另一种是守护线程。用户线程是指用户自定义创建的线程,主线程停止,用户线程不会停止用户线程属于非守护线程守护线程当进程不存在或主线程停止,守护... 查看详情

java——多线程高并发系列之readwritelock读写锁(代码片段)

文章目录:写在前面Demo1(读读共享)Demo2(写写互斥)Demo3(读写互斥)写在前面synchronized内部锁与ReentrantLock锁都是独占锁(排它锁),同一时间只允许一个线程执行同步代码块,可以保证线程的... 查看详情

java——多线程高并发系列之synchronized关键字(代码片段)

文章目录:写在前面Demo1(synchronized面对同一个实例对象)Demo2(synchronized面对多个实例对象)Demo3(synchronized面对一个publicstaticfinal常量)Demo4(synchronized同步代码块分别位于实例方法、静态方法中& 查看详情

并发编程之多线程基础篇及面试

线程与进程区别每个正在系统上运行的程序都是一个进程。每个进程包含一到多个线程。线程是一组指令的集合,或者是程序的特殊段,它可以在程序里独立执行。也可以把它理解为代码运行的上下文。所以线程基本上是轻量级... 查看详情

java——多线程高并发系列之arraylisthashsethashmap集合线程不安全的解决方案(代码片段)

...下图这样的异常信息:👇👇👇这是一个并发修改异常,首先ArrayList肯定是线程不安全的,产生这个异常的原因就是可能第一个线程刚进入ArrayList集合中要进行add操作时, 查看详情