juc并发编程之completablefuture基础用法(代码片段)

知道什么是码怪吗? 知道什么是码怪吗?     2022-12-03     161

关键词:

目录

实现多线程的四种方式

方式一:继承Thread类

方式二:实现Runnable接口

方式三:实现Callable接口

方式四:线程池

创建异步对象

回调方法

handle方法 

线程串行化 

任务组合

组合任务单任务完成及执行


实现多线程的四种方式

方式一:继承Thread类

public static class Thread01 extends Thread 
    @Override
    public void run() 
    

执行任务方式: 

Thread thread = new Thread01();
thread.start();

方式二:实现Runnable接口

public static class Runable01 implements Runnable 
    @Override
    public void run() 
    

执行任务方式:  

Runable01 runable01 = new Runable01();
new Thread(runable01).start();

方式三:实现Callable接口

    public static class Callable01 implements Callable<Object> 
        @Override
        public Integer call() throws Exception 
            return 0;
        
    

 执行任务方式:

FutureTask<Integer> futureTask = new FutureTask<>(new Callable01());
new Thread(futureTask).start();// 执行任务
System.out.println(futureTask.get());// 得到返回值

方式四:线程池

前三种方式执行的过程相当于每次新任务都会新增一个线程,这是十分消耗内存的,为了统一管理线程,引入了线程池。执行任务的线程从线程池中拿。

查看线程池构造方法的源码,其七个构造参数分别表示的含义如下:

参数名 参数类型参数含义
corePoolSizeint核心线程数,即线程池中一直保持的线程数量。
maximumPoolSizeint允许的最大线程数
keepAliveTimelong线程数大于核心线程数时,线程在该时间下没有收到任务就会自动释放。
unitTimeUnit时间单位
workQueueBlockingQueue<Runnable>阻塞队列,存储等待执行的任务
threadFactory       ThreadFactory线程工厂,创造线程
handlerRejectedExecutionHandler拒绝策略

ThreadPoolExecutor executor = new ThreadPoolExecutor(
                20,// 核心线程数
                200,// 最大线程数
                10,// 线程关闭时间
                TimeUnit.SECONDS,// 时间:秒
                new LinkedBlockingDeque<>(100000),// 存储的异步任务最大数
                Executors.defaultThreadFactory(),// 线程工厂
                new ThreadPoolExecutor.AbortPolicy());//拒绝策略

 接下来为了方便测试,我们开启一个固定10个线程的线程池。

ExecutorService executor = Executors.newFixedThreadPool(10);// 开启一个固定10个线程的线程池

创建异步对象

CompletableFuture 提供了四个静态方法来创建一个异步操作。分别为:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

public static CompletableFuture<Void> runAsync(Runnable runnable)

public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

这四种方法的区别在于:

方法名区别
supplyAsync(Supplier<U> supplier)可以返回结果
runAsync(Runnable runnable)    不能返回结果
supplyAsync(Supplier<U> supplier, Executor executor)可以返回结果,可指定线程池
runAsync(Runnable runnable, Executor executor)不能返回结果,可指定线程池
CompletableFuture.runAsync(() -> 
    System.out.println("runAsync方法,当前线程:" + Thread.currentThread().getId());
, executor);

CompletableFuture.supplyAsync(() -> 
    System.out.println("supplyAsync方法,当前线程:" + Thread.currentThread().getId());
    return "这是结果";
, executor);

回调方法

public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) 

public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)

public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

方法以Async结尾,表示此任务可能被其他线程执行,并且也可以指定线程池。如果不以Async结尾,则是继续用当前的线程执行任务。

方法名区别
whenComplete继续以当前线程执行本次任务
whenCompleteAsync将当前任务提交给线程池处理

whenCompleteAsync方法

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 
    System.out.println("supplyAsync方法,当前线程:" + Thread.currentThread().getId());
    int i = 10 / 0;// 制造数学异常
    return i;
, executor).whenCompleteAsync((res, excption) -> // 只能感知异常,无法处理异常
    System.out.println("res:   " + res + "excption:   " + excption);
, executor).exceptionally(throwable -> // 处理异常
    return 10;// 返回默认值
);
System.out.println("结果为:" + future.get());// 获取返回值

whenComplete方法 

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 
    System.out.println("supplyAsync方法,当前线程:" + Thread.currentThread().getId());
    int i = 10 / 0;// 制造数学异常
    return i;
, executor).whenComplete((res, excption) -> // 只能感知异常,无法处理异常
    System.out.println("res:   " + res + "excption:   " + excption);
).exceptionally(throwable -> // 处理异常
    return 10;// 返回默认值
);
System.out.println("结果为:" + future.get());

执行结果为: 

handle方法 

public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)

public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)

public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)

方法以Async结尾,表示此任务可能被其他线程执行,并且也可以指定线程池。如果不以Async结尾,则是继续用当前的线程执行任务。

该方法可对结果做最后的处理,可以处理异常并且有返回值。

下面这段代码,如果无异常,返回500,有异常返回0。

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 
    System.out.println("supplyAsync方法,当前线程:" + Thread.currentThread().getId());
    int i = 10 / 2 ;
    return i;
, executor).handle((res, thr) -> // 感知异常并处理异常
    System.out.println("res:   " + res + "    thr:   " + thr);
    if (res != null) return res * 100;// 如果结果不为空
    if (thr != null) return 0;// 如果异常不为空
    return 100;
);
System.out.println(future.get());

正常执行结果:

异常执行结果:

线程串行化 

主要就是以下9个方法,以Async结尾的表示可以被其他线程执行,而参数中有Executor的表示可以指定线程池。

 三种方式的区别:

方法名区别
thenApply获取上一个任务的结果,并且返回当前任务的结果
thenAccept获取上一个任务的结果,无返回值
thenRun无法获取上一个任务的结果,无返回值
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 
    System.out.println("supplyAsync方法,当前线程:" + Thread.currentThread().getId());
    int i = 10 / 2;
    return i;
, executor).thenApplyAsync((res) -> // 前一个任务处理完成之后,后续处理
    System.out.println("任务二启动");
    return res * 10;
, executor);
System.out.println(future.get());

执行结果:

任务组合

其意思就是将两个任务组合在一起,当两个任务都执行完成之后在执行给定的任务。

其方法名和参数值至此以及无需赘述了。 

方法名区别
thenCombine获取两个任务的返回结果,并且返回当前任务的返回值
thenAcceptBoth获取两个任务的返回结果,并且无返回值
runAfterBoth不获取两个任务的结果,并且无返回值

下面编写了两个任务

CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> 
    System.out.println("任务一开始:" + Thread.currentThread().getId());
    System.out.println("任务一结束");
    return 10;
, executor);

CompletableFuture<Integer> future02 = CompletableFuture.supplyAsync(() -> 
    System.out.println("任务二开始:" + Thread.currentThread().getId());
    System.out.println("任务二结束");
    return 123;
, executor);

 runAfterAsync方法

future01.runAfterBothAsync(future02, () -> // 无法获取任务的返回值,也不返回结果
    System.out.println("开始执行,无返回值也无参数");
, executor);

执行结果 

thenAcceptBothAsync方法

future01.thenAcceptBothAsync(future02, (f1, f2) -> // 获取两个任务的返回值,不返回结果
    System.out.println("两个任务结果之和为:" + f1 * f2);
, executor);

执行结果  

  

 thenCombineAsync方法

CompletableFuture<String> future = future01.thenCombineAsync(future02, (f1, f2) -> // 获取两个任务的返回值,并返回结果
    System.out.println("两个任务结果之和为:" + f1 * f2);
    return "hello world";
, executor);
System.out.println("thenCombineAsync返回值为: "+future.get());

执行结果  

 

组合任务单任务完成及执行

其意思就是将两个任务组合在一起,只要有一个任务完成了就执行给定的任务。

方法名区别
applyToEither获取先完成任务的结果,有返回值
acceptEither获取先完成任务的结果,无返回值
runAfterEither不获取任务的结果,无返回值

下面编写了两个任务,其中任务一睡眠500ms

CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> 
    System.out.println("任务一开始:" + Thread.currentThread().getId());
    try 
        Thread.sleep(500);// 线程睡眠500ms
     catch (InterruptedException e) 
        e.printStackTrace();
    
    System.out.println("任务一结束");
    return 10;
, executor);

CompletableFuture<Integer> future02 = CompletableFuture.supplyAsync(() -> 
    System.out.println("任务二开始:" + Thread.currentThread().getId());
    System.out.println("任务二结束");
    return 123;
, executor);

runAfterEither方法

future01.runAfterEitherAsync(future02, () -> 
    System.out.println("runAfterEitherAsync开始执行");
, executor);

执行结果

acceptEitherAsync方法

future01.acceptEitherAsync(future02, (res) -> 
    System.out.println("acceptEitherAsync方法开始执行    " + res);
, executor);

执行结果 

 

applyToEitherAsync方法

CompletableFuture<String> result = future01.applyToEitherAsync(future02, (res) -> 
    System.out.println("applyToEitherAsync方法开始执行    " + res);
    return "res:   " + res.toString();
, executor);
System.out.println(result.get());

执行结果 

多任务组合

将多个任务进行组合。 

allOf:等待所有任务完成。

anyOf:只要有一个任务完成。

下面代码编写了3个任务

CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> 
    System.out.println("任务一开始:" + Thread.currentThread().getId());
    try 
        Thread.sleep(500);// 线程睡眠500ms
     catch (InterruptedException e) 
        e.printStackTrace();
    
    System.out.println("任务一结束");
    return 10;
, executor);

CompletableFuture<Integer> future02 = CompletableFuture.supplyAsync(() -> 
    System.out.println("任务二开始:" + Thread.currentThread().getId());
    System.out.println("任务二结束");
    return 20;
, executor);

CompletableFuture<Integer> future03 = CompletableFuture.supplyAsync(() -> 
    System.out.println("任务三开始:" + Thread.currentThread().getId());
    System.out.println("任务三结束");
    return 30;
, executor);

allOf & anyOf 

CompletableFuture<Void> allOf = future01.allOf(future02, future03);
CompletableFuture<Object> anyOf = future01.anyOf(future02, future03);

juc并发编程之completablefuture基础用法(代码片段)

目录实现多线程的四种方式方式一:继承Thread类方式二:实现Runnable接口方式三:实现Callable接口方式四:线程池创建异步对象回调方法handle方法 线程串行化 任务组合组合任务单任务完成及执行实现多线程的四... 查看详情

java并发编程系列之三juc概述

上篇文章为解决多线程中出现的同步问题引入了锁的概念,上篇文章介绍的是Synchronized关键字锁,本篇文章介绍更加轻量级的锁Lock接口及引出JUC的相关知识。本文不力争阐释清楚JUC框架的所有内容,而是站在一定的高度下,了... 查看详情

java并发编程系列之三juc概述

上篇文章为解决多线程中出现的同步问题引入了锁的概念,上篇文章介绍的是Synchronized关键字锁,本篇文章介绍更加轻量级的锁Lock接口及引出JUC的相关知识。本文不力争阐释清楚JUC框架的所有内容,而是站在一定的高度下,了... 查看详情

juc并发编程--经典实例之卖票&转账(代码片段)

1.卖票代码:importlombok.extern.slf4j.Slf4j;importjava.util.ArrayList;importjava.util.List;importjava.util.Random;importjava.util.Vector;@Slf4j(topic="c.ExerciseSell")publicclassExerciseSel 查看详情

juc并发编程共享模式之工具juc读写锁stampedlock--介绍&使用(代码片段)

1.StampedLock1.1介绍该类自JDK8加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用加解读锁:longstamp=lock.readLock();lock.unlockRead(stamp);加解写锁:longstamp=lock.writeLock();lock.unlockWrite(stamp);乐... 查看详情

juc并发编程共享模式之工具juc线程安全的集合类--线程安全的集合类概述

...,线程安全的方法上面都是使用synchronized修饰,并发的性能比较低。时至今日不推荐使用。示例:1.2使用Collections装饰的线程安全集合在原来线程不安全的集合里面的方法多加了一个synchronized修饰,使得原来线程不安全... 查看详情

juc并发编程--应用之统筹(烧水泡茶)(代码片段)

1.应用之统筹(烧水泡茶)阅读华罗庚《统筹方法》,给出烧水泡茶的多线程解决方案,提示:参考图二,用两个线程(两个人协作)模拟烧水泡茶过程文中办法乙、丙都相当于任务串行而图一相当于启... 查看详情

juc并发编程共享模式之工具threadpoolexecutor--正确处理线程池异常(代码片段)

1.正确处理线程池异常1.1如果不处理异常示例代码:packagecom.tian;importlombok.extern.slf4j.Slf4j;importjava.util.concurrent.ExecutionException;importjava.util.concurrent.Executors;importjava.util.concurrent.ScheduledExecuto 查看详情

juc并发编程多线程设计模式--同步模式之balking(代码片段)

1.同步模式之Balking1.1定义Balking(犹豫)模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做了,直接结束返回1.2实现例如:publicclassMonitorService//用来表示是否已经有线... 查看详情

juc并发编程共享模式之工具threadpoolexecutor--线程池应用之定时任务(在每周周四执行定时任务)(代码片段)

1.线程池应用之定时任务示例:在每周周四执行定时任务packagecom.tian;importjava.time.DayOfWeek;importjava.time.Duration;importjava.time.LocalDateTime;importjava.util.concurrent.Executors;importjava.util.concurrent.ScheduledExec 查看详情

juc-多线程之forkjoin;异步调用completablefuture(代码片段)

一、ForkJoinForkJoin是在Java7提供的一个用于并行执行任务的框架,ForkJoin从字面上看Fork是分岔的意思,Join是结合的意思,核心思想就是把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果࿰... 查看详情

juc并发编程共享模式之工具jucreentrantlock--reentrantlock原理(代码片段)

1.ReentrantLock原理1.1非公平锁实现原理加锁解锁流程先从构造器开始看,默认为非公平锁实现:publicReentrantLock()sync=newNonfairSync();NonfairSync继承自AQS没有竞争时第一个竞争出现时:Thread-1执行了CAS尝试将state由0改为1,结果失... 查看详情

juc并发编程--jit即时编译器之锁清除(代码片段)

1.1锁消除导入相关依赖:因为JMH是JDK9自带的,如果是JDK9之前的版本需要加入如下依赖<dependency><groupId>org.openjdk.jmh</groupId><artifactId>jmh-core</artifactId><version>1.23</version>&l 查看详情

juc并发编程共享模式之工具juc读写锁reentrantreadwritelock--reentrantreadwritelock(不可重入锁)使用&注意事项(代码片段)

...作远远高于写操作时,这时候使用读写锁让读-读可以并发,提高性能。类似于数据库中的select...from...lockinsharemode提供一个数据容器类内部分别使用读锁保护数据的read()方法,写锁保护数据的write()方法1.1示例代码:读-... 查看详情

juc并发编程共享模式之工具jucsemaphore(信号量)--semaphore原理

1.Semaphore原理Semaphore有点像一个停车场,permits就好像停车位数量,当线程获得了permits就像是获得了停车位,然后停车场显示空余车位减一。刚开始permits(state)为3,这时5个线程来获取资源假设其中Thread-1&... 查看详情

juc并发编程共享模式之工具jucsemaphore(信号量)--介绍&使用(代码片段)

1.Semaphore1.1介绍信号量,用来限制能同时访问共享资源的线程上限。作用类似于停车场里面的信号指示牌:显示还有多少空车位。1.2使用1.2.1不使用Semaphorepackagecom.tian;importlombok.extern.slf4j.Slf4j;@Slf4j(topic="c.TestSemaphore")... 查看详情

juc并发编程详解java关键字之volatile(代码片段)

📢📢📢📣📣📣哈喽!大家好,我是【Bug终结者】,【CSDNJava领域优质创作者】🏆,阿里云专家博主🏆,51CTO人气博主🏆,InfoQ写作专家🏆一位上进心 查看详情

juc并发编程共享模式之工具jucconcurrenthashmap--concurrenthashmap的错误使用和正确使用(示例:统计单词个数)(代码片段)

1.练习:单词计数1.1错误使用1.1.1生成测试数据packagecom.tian;importjava.io.FileOutputStream;importjava.io.IOException;importjava.io.OutputStreamWriter;importjava.io.PrintWriter;importjava.util.ArrayList;importjava.util 查看详情