线程池续:你必须要知道的线程池submit()实现原理之futuretask!(代码片段)

一枝花算不算浪漫 一枝花算不算浪漫     2022-12-25     445

关键词:

FutureTask思维导图.png

前言

上一篇内容写了Java中线程池的实现原理及源码分析,说好的是实实在在的大满足,想通过一篇文章让大家对线程池有个透彻的了解,但是文章写完总觉得还缺点什么?

上篇文章只提到线程提交的execute()方法,并没有讲解线程提交的submit()方法,submit()有一个返回值,可以获取线程执行的结果Future<T>,这一讲就那深入学习下submit()FutureTask实现原理。

使用场景&示例

使用场景

我能想到的使用场景就是在并行计算的时候,例如一个方法中调用methodA()、methodB(),我们可以通过线程池异步去提交方法A、B,然后在主线程中获取组装方法A、B计算后的结果,能够大大提升方法的吞吐量。

FutureTask使用场景.png

使用示例

/**
 * @author wangmeng
 * @date 2020/5/28 15:30
 */
public class FutureTaskTest 
    public static void main(String[] args) throws InterruptedException, ExecutionException 
        ExecutorService threadPool = Executors.newCachedThreadPool();

        System.out.println("====执行FutureTask线程任务====");
        Future<String> futureTask = threadPool.submit(new Callable<String>() 
            @Override
            public String call() throws Exception 
                System.out.println("FutureTask执行业务逻辑");
                Thread.sleep(2000);
                System.out.println("FutureTask业务逻辑执行完毕!");
                return "欢迎关注: 一枝花算不算浪漫!";
            
        );

        System.out.println("====执行主线程任务====");
        Thread.sleep(1000);
        boolean flag = true;
        while(flag)
            if(futureTask.isDone() && !futureTask.isCancelled())
                System.out.println("FutureTask异步任务执行结果:" + futureTask.get());
                flag = false;
            
        

        threadPool.shutdown();
    

上面的使用很简单,submit()内部传递的实际上是个Callable接口,我们自己实现其中的call()方法,我们通过futureTask既可以获取到具体的返回值。

submit()实现原理

submit() 是也是提交任务到线程池,只是它可以获取任务返回结果,返回结果是通过FutureTask来实现的,先看下ThreadPoolExecutor中代码实现:

public class ThreadPoolExecutor extends AbstractExecutorService 
    public <T> Future<T> submit(Callable<T> task) 
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    


public abstract class AbstractExecutorService implements ExecutorService 
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) 
        return new FutureTask<T>(callable);
    

提交任务还是执行execute()方法,只是task被包装成了FutureTask ,也就是在excute()中启动线程后会执行FutureTask.run()方法。

再来具体看下它执行的完整链路图:

submit()全链路图.png

上图可以看到,执行任务并返回执行结果的核心逻辑实在FutureTask中,我们以FutureTask.run/get 两个方法为突破口,一点点剖析FutureTask的实现原理。

FutureTask源码初探

先看下FutureTask中部分属性:

FutureTask属性.png

public class FutureTask<V> implements RunnableFuture<V> 
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    private Callable<V> callable;
    private Object outcome;
    private volatile Thread runner;
    private volatile WaitNode waiters;

  1. state

当前task状态,共有7中类型。
NEW: 当前任务尚未执行
COMPLETING: 当前任务正在结束,尚未完全结束,一种临界状态
NORMAL:当前任务正常结束
EXCEPTIONAL: 当前任务执行过程中发生了异常。
CANCELLED: 当前任务被取消
INTERRUPTING: 当前任务中断中..
INTERRUPTED: 当前任务已中断

  1. callble

用户提交任务传递的Callable,自定义call方法,实现业务逻辑

  1. outcome

任务结束时,outcome保存执行结果或者异常信息。

  1. runner

当前任务被线程执行期间,保存当前任务的线程对象引用

  1. waiters

因为会有很多线程去get当前任务的结果,所以这里使用了一种stack数据结构来保存

FutureTask.run()实现原理

我们已经知道在线程池runWorker()中最终会调用到FutureTask.run()方法中,我们就来看下它的执行原理吧:

run()执行逻辑.png

具体代码如下:

public class FutureTask<V> implements RunnableFuture<V> 
    public void run() 
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
            return;
        try 
            Callable<V> c = callable;
            if (c != null && state == NEW) 
                V result;
                boolean ran;
                try 
                    result = c.call();
                    ran = true;
                 catch (Throwable ex) 
                    result = null;
                    ran = false;
                    setException(ex);
                
                if (ran)
                    set(result);
            
         finally 
            runner = null;
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        
    

首先是判断FutureTaskstate状态,必须是NEW才可以继续执行。

然后通过CAS修改runner引用为当前线程。

接着执行用户自定义的call()方法,将返回结果设置到result中,result可能为正常返回也可能为异常信息。这里主要是调用set()/setException()

FutureTask.set()实现原理

set()方法的实现很简单,直接看下代码:

public class FutureTask<V> implements RunnableFuture<V> 
    protected void set(V v) 
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) 
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
            finishCompletion();
        
    

call()返回的数据赋值给全局变量outcome上,然后修改state状态为NORMAL,最后调用finishCompletion()来做挂起线程的唤醒操作,这个方法等到get()后面再来讲解。

FutureTask.get()实现原理

get().png

接着看下代码:

public class FutureTask<V> implements RunnableFuture<V> 
    public V get() throws InterruptedException, ExecutionException 
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    

如果FutureTaskstateNORMAL或者COMPLETING,说明当前任务并没有执行完成,调用get()方法会被阻塞,具体的阻塞逻辑在awaitDone()方法:

private int awaitDone(boolean timed, long nanos) throws InterruptedException 

        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) 
            if (Thread.interrupted()) 
                removeWaiter(q);
                throw new InterruptedException();
            

            int s = state;
            if (s > COMPLETING) 
                if (q != null)
                    q.thread = null;
                return s;
            
            else if (s == COMPLETING)
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
            else if (timed) 
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) 
                    removeWaiter(q);
                    return state;
                
                LockSupport.parkNanos(this, nanos);
            
            else
                LockSupport.park(this);
        
    

这个方法可以说是FutureTask中最核心的方法了,一步步来分析:

如果timed不为空,这说明指定nanos时间还未返回结果,线程就会退出。

q是一个WaitNode对象,是将当前引用线程封装在一个stack数据结构中,WaitNode对象属性如下:

 static final class WaitNode 
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode()  thread = Thread.currentThread(); 

接着判断当前线程是否中断,如果中断则抛出中断异常。

下面就进入一轮轮的if... else if...判断逻辑,我们还是采用分支的方式去分析。

分支一:if (s > COMPLETING)

此时get()方法已经有结果了,无论是正常返回的结果,还是异常、中断、取消等,此时直接返回state状态,然后执行report()方法。

分支二:else if (s == COMPLETING)

条件成立,说明当前任务接近完成状态,这里让当前线程再释放cpu,进行下一轮抢占cpu

分支三:else if (q == null)

第一次自旋执行,WaitNode还没有初始化,初始化q=new WaitNode();

分支四:else if (!queued)

queued代表当前线程是否入栈,如果没有入栈则进行入栈操作,顺便将全局变量waiters指向栈顶元素。

分支五/六:LockSupport.park

如果设置了超时时间,则使用parkNanos来挂起当前线程,否则使用park()

经过这么一轮自旋循环后,如果执行call()还没有返回结果,那么调用get()方法的线程都会被挂起。

被挂起的线程会等待run()返回结果后依次唤醒,具体的执行逻辑在finishCompletion()中。

最终stack结构中数据如下:

WaitNode.png

FutureTask.finishCompletion()实现原理

finishCompletion().png

具体实现代码如下:

private void finishCompletion() 
    for (WaitNode q; (q = waiters) != null;) 
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) 
            for (;;) 
                Thread t = q.thread;
                if (t != null) 
                    q.thread = null;
                    LockSupport.unpark(t);
                
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null;
                q = next;
            
            break;
        
    

    done();

    callable = null;

代码实现很简单,看过get()方法后,我们知道所有调用get()方法的线程,在run()还没有返回结果前,都会保存到一个有WaitNode构成的statck数据结构中,而且每个线程都会被挂起。

这里是遍历waiters栈顶元素,然后依次查询起next节点进行唤醒,唤醒后的节点接着会往后调用report()方法。

FutureTask.report()实现原理

report().png

具体代码如下:

private V report(int s) throws ExecutionException 
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);

这个方法很简单,因为执行到了这里,说明当前state状态肯定大于COMPLETING,判断如果是正常返回,那么返回outcome数据。

如果state是取消状态,抛出CancellationException异常。

如果状态都不满足,则说明执行中出现了差错,直接抛出ExecutionException

FutureTask.cancel()实现原理

cancel().png

public boolean cancel(boolean mayInterruptIfRunning) 
    if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try 
        if (mayInterruptIfRunning) 
            try 
                Thread t = runner;
                if (t != null)
                    t.interrupt();
             finally 
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            
        
     finally 
        finishCompletion();
    
    return true;

cancel()方法的逻辑很简单,就是修改state状态为CANCELLED,然后调用finishCompletion()来唤醒等待的线程。

这里如果mayInterruptIfRunning,就会先中断当前线程,然后再去唤醒等待的线程。

总结

FutureTask的实现原理其实很简单,每个方法基本上都画了一个简单的流程图来方便立即。

后面还打算分享一个BlockingQueue相关的源码解读,这样线程池也可以算是完结了。

在这之前可能会先分享一个SpringCloud常见配置代码分析、最佳实践等手册,方便工作中使用,也是对之前看过的源码一种总结。敬请期待!
欢迎关注:
原创干货分享.png

java线程池submit阻塞获取结果实现原理

前言Java线程池中提交任务运行,通常使用​​execute()​​方法就足够了。那如果想要实现在主线程中阻塞获取线程池任务运行的结果,该怎么办呢?答案是用​​submit()​​方法提交任务。这也是面试中经常被问到的一个知识点... 查看详情

13.threadpoolexecutor线程池之submit方法

jdk1.7.0_79   在上一篇《ThreadPoolExecutor线程池原理及其execute方法》中提到了线程池ThreadPoolExecutor的原理以及它的execute方法。本文解析ThreadPoolExecutor#submit。  对于一个任务的执行有时我们不需要它返回结果,但是有我们需... 查看详情

线程池

Executors(线程池)(jdk1.5特性)   jdk1.5之前,我们必须要手动实现自己的线程池,jdk1.5之后,Java内置支持线程池?线程池的优点   线程池里的每一个线程代码结束后,并不会死亡,而是再次回到线程池中成为空闲状态,等待下... 查看详情

线程池中execute和submit的区别

...统组的人去修了,在leader的指导下,发现了app中的问题:线程池启动线程的方法为submit,而没有对submit的返回值做处理,在运行过程中也没有catchNPE,任务一直在线程池中错误循环,所以导致了问题,最终将所有线程池的submit方... 查看详情

线程基础:线程池——基本使用(中)

(接上文:《线程基础:线程池(5)——基本使用(上)》)3-4、JAVA主要线程池的继承结构我们先来总结一下上文中讨论过的内容,首先就是JAVA中ThreadPoolExecutor类的继承结构。例如以下图所看到的:ThreadPoolExecutor:这个线程池... 查看详情

线程基础:线程池——基本使用(中)

(接上文:《线程基础:线程池(5)——基本使用(上)》)3-4、JAVA主要线程池的继承结构我们先来总结一下上文中讨论过的内容,首先就是JAVA中ThreadPoolExecutor类的继承结构。如下图所示:ThreadPoolExecutor:这个线程池就是我们... 查看详情

线程基础:线程池——基本使用(中)

(接上文:《线程基础:线程池(5)——基本使用(上)》)3-4、JAVA主要线程池的继承结构我们先来总结一下上文中讨论过的内容,首先就是JAVA中ThreadPoolExecutor类的继承结构。如下图所示:ThreadPoolExecutor:这个线程池就是我们... 查看详情

python线程池threadpoolexecutor.submit的数据丢失问题

参考技术AThreadPoolExecutor是Executor的子类,它使用线程池来异步执行调用。关于concurrent.futures模块下的ThreadPoolExecutor类在使用submit的时候,如果参数传进去的是生成器对象,在某些情况下,生成器对象会被消耗掉一部分或者是全部... 查看详情

mdc实现线程池tranceid全链路传递(代码片段)

1、首先定义一个自定义线程池类继承ThreadPoolTaskExecutorpublicclassThreadPoolExecutorMdcWrapperextendsThreadPoolTaskExecutor@Overridepublicvoidexecute(Runnabletask)super.execute(ThreadMdcUtil.wrap(task,MDC.getCopyOfContextMap()));@Overridepublic<T>Future<T>submit(C... 查看详情

使用线程池

/***JDK提供了ExecutorService实现了线程池功能:*线程池内部维护一组线程,可以高效执行大量小任务;*Executors提供了静态方法创建不同类型的ExecutorService;*必须调用shutdown()关闭ExecutorService;*ScheduledThreadPool可以定期调度多个任务... 查看详情

京东二面:线程池中的线程抛出了异常,该如何处理?大部分人都会答错!(代码片段)

在实际开发中,我们常常会用到线程池,但任务一旦提交到线程池之后,如果发生异常之后,怎么处理?怎么获取到异常信息?在了解这个问题之前,可以先看一下线程池的源码解析,从源码中我们知道了线程池的提交方式:sub... 查看详情

线程池的submit和execute方法区别

线程池中的execute方法大家都不陌生,即开启线程执行池中的任务。还有一个方法submit也可以做到,它的功能是提交指定的任务去执行并且返回Future对象,即执行的结果。下面简要介绍一下两者的三个区别:1、接收的参数不一样2... 查看详情

多线程使用线程池实现一个简单线程池(代码片段)

@TOC为什么要引入线程池我们知道我们每次创建启动销毁一个线程的消耗是较大的所以引入线程池的最大的好处就是减少每次启动销毁线程的损耗那么他是如何实现减少的?简单举个例子:为什么线程池子比系统申请释放快?为了... 查看详情

java线程池(代码片段)

相信在实际工作中,大家对于线程池的使用并不陌生,例如以下几个应用场景:支付成功之后,异步发送短信通知用户;公司的OA系统中,提交某些申请之后,异步发送给各个部门负责人进行审批;请求某个接口时,需要做些日... 查看详情

django异步任务线程池(代码片段)

...非常慢,一些不需要即时响应的任务可以放到后台的异步线程中完成,发起异步任务的请求就可以立即响应选择用线程池的原因是:线程比进程更为可控。不像子进程,子线程会在所属进程结束时立即结束。线程可共享内存。请... 查看详情

2-10进程池与线程池(代码片段)

一进程池与线程池在刚开始学多进程或多线程时,我们迫不及待地基于多进程或多线程实现并发的套接字通信,然而这种实现方式的致命缺陷是:服务的开启的进程数或线程数都会随着并发的客户端数目地增多而增多,这会对服... 查看详情

你知道线程池是如何退出程序的吗?

摘要:本文,我们就来从源码角度深度解析线程池是如何优雅的退出程序的。本文分享自华为云社区《【高并发】从源码角度深度解析线程池是如何实现优雅退出的》,作者:冰河。本文,我们就来从源码角度深度解析线程池是... 查看详情

线程池详解(代码片段)

管理一组线程集合,方便线程的复用,免了频繁创建和销毁线程所带来的开销,相关类的继承关系如下:Executor仅声明了一个方法execute,代表要执行某个任务。ExecutorService接口在其父类接口基础上,声明了包含但不限于shutdown、s... 查看详情