高并发多线程基础之线程池(代码片段)

踩踩踩从踩 踩踩踩从踩     2022-12-31     510

关键词:

高并发多线程之线程基础中生命周期、线程封闭、cpu缓存

前言

本篇文章描述我们jdk给我们提供的线程池;了解为什么使用线程池,有哪些优点,以及几种Executors中提供给我们的工厂创建方法等

线程池的原理

为什么要使用线程池

首先线程并不是越多越好,过多的创建线程消耗大量的资源,反而达到适得其反的效果;如何正确创建线程并且去控制。

  • 线程不仅仅java中的一个对象,每个线程都有自己的工作内存空间。

        线程创建、销毁需要时间,消耗性能

        线程过多,会占用很多的内存

  • 操作系统需要频繁切换线程上下文 (都处于ruannable的状态),影响性能。
  • 如果创建时间+销毁时间>执行任务时间,就很不合算。

让一个线程执行很多任务,线程池得推出就是为了方便控制线程的数量

概念

帮我们创建和管理线程的一个中心 

线程池管理器:用于创建并管理线程池,包括创建线程池,销毁线程池,添加新任务;

工作线程:线程池中的线程、可以循环的执行任务、在没有任务时处于等待状态

任务接口:每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务收尾工作,任务执行的状态等

任务队列:用于存放没有处理的任务。提供一种缓存机制

 需要实现runnable接口的对象

类的层次

  • Executor 最上层接口,之定义了executor
  • ExecutorService接口 继承了Executor接口,并扩展出Callable、future、关闭方法
  • ScheduledExecutorService接口  继承ExecutorService接口,扩展出定时任务执行方法
  • ThreadPoolExecutor 基础、标准的线程池实现
  • ScheduledThreadPoolExecutor 继承ThreadPoolExecutor  ,并实现ScheduledExecutorService接口相关的定时任务方法

ExecutorService 接口

定义出submit 方法和 invokeAll方法,包括invokeAny 方法;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

其中定义出的invokeAll 的意义就是提交任务列表,全部执行完毕,返回执行结果;或者定义好超时时间,自动返回结果

  <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

 invokeAny 的意义是执行一个成功,就返回结果。

在来看submit  的源代码。

  Future<?> submit(Runnable task);
  <T> Future<T> submit(Callable<T> task);

在其中的任务,包括callable和runnable;返回指定类型。  

class MyCallable implements Callable<Integer>

    @Override
    public Integer call() throws Exception 
        return 1;
    



class MyRunnable implements Runnable

    @Override
    public void run() 
        return ;
    

在测试 submit方法

static ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 5, TimeUnit.SECONDS,
			new LinkedBlockingQueue<Runnable>());

	public static void main(String args[]) throws ExecutionException, InterruptedException 
		// Runnable Test
		Future run_future = pool.submit(new MyRunnable());
		System.out.println("run_future: " + run_future.get());

		// Callable Test
		Future call_future = pool.submit(new MyCallable());
		System.out.println("call_future:" + call_future.get());

	

得到得结果,虽然runnable 会返回future,但是是返回null。

run_future: null
call_future:1

除非使用重载得方法。预先设定好返回值;得到得结果就是设置的返回结果

// Runnable Test
		Future run_future = pool.submit(new MyRunnable(), 1);
		System.out.println("run_future: " + run_future.get());

提供的 在阻塞所有的任务的方法

/**

*阻塞,直到关闭后所有任务都已完成执行请求,或者发生超时,或者当前线程正在运行
*中断,以先发生者为准。
*/   
 boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

在来继续分析一下参数的意思

  • 第一个参数为核心线程数,表示默认会创建5个核心线程去处理任务
  • 第二个参数为最大线程数,表示当任务队列满了过后,会继续创建线程数为10个处理任务
  • 第三和第四个参数一起用的,超时时间,当大于核心线程空置超过5秒则销毁线程
  • 第五个参数,则是创建一个任务队列。
  ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                5,
                10,
                5,      //超过核心线程数的线程,如果超过5s(keepAliveTime)还没有任务给他执行,这个线程就会被销毁
                TimeUnit.SECONDS,                       //keepAliveTime 的时间单位
                new LinkedBlockingQueue<Runnable>(5)     //传入无界的等待队列
        );
 /**
     * 测试: 提交15个执行时间需要3秒的任务,看线程池的状况
     *
     * @param threadPoolExecutor 传入不同的线程池,看不同的结果
     * @throws Exception
     */
    public void testCommon(ThreadPoolExecutor threadPoolExecutor) throws Exception 
        // 测试: 提交15个执行时间需要3秒的任务,看超过大小的2个,对应的处理情况
        for (int i = 0; i < 30; i++) 
            int n = i;
            threadPoolExecutor.submit(new Runnable() 
                @Override
                public void run() 
                    try 
                        System.out.println("任务" + n +" 开始执行");
                        Thread.sleep(3000L);
                        System.err.println("任务" + n +" 执行结束");
                     catch (InterruptedException e) 
                        e.printStackTrace();
                    
                
            );
            System.out.println("任务" + i + " 提交成功");
        

        while(true)
            // 查看线程数量,查看队列等待数量
            Thread.sleep(1000L);
            System.out.println(">>> 线程数量:" + threadPoolExecutor.getPoolSize());
            System.out.println(">>> 队列任务数量:" + threadPoolExecutor.getQueue().size());
        

    

这里得到测试的结果,引申出的线程池的原理

 只有当execute 或者submit 执行任务时,才去创建新线程;默认的拒绝的策略是抛异常

  ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                5,
                10,             //最大线程数 10
                5,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(5),   //等待队列容量为3

                //最多容纳13个任务,超出的会被拒绝执行
                new RejectedExecutionHandler()     //指定 任务拒绝策略
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) 
                        System.err.println("有任务被拒绝执行了");
                    
                );

可以自己定义出拒绝策略,只需要添加第六个参数 ,重写拒绝策略

Executors类工具类

这个是jdk提供的封装好的线程池,在开发中,方便快捷,因此采用这个工具类

  • newFixedThreadPool(int nThreads) 创建一个固定大小、任务队列无界的线程池,核心线程等于最大线程数
 public static ExecutorService newFixedThreadPool(int nThreads) 
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    
  • newCachedThreadPool() 创建的是一个大小无界的缓冲线程池。它的任务队列是一个同步队列。任务加入到池中,如果 池中有空闲线程,则用空闲线程执行,如无则创建新线程执行。池中的线程空闲超过60秒,将被销毁释放。线程数随任务的多少变化。适用于执行耗时较小的异步任务。池的核心线程数=0 ,最大线程数= Integer.MAX_VALUE
     
  public static ExecutorService newCachedThreadPool() 
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    
这里的SynchronousQueue 通过队列是不存任何元素的,同步队列,马上获取到数据。
这个队列为0,也就是说put方法, 是put不进去的,只有take方法,不断在读取的,take到,就可以put进去
  • newSingleThreadExecutor() 只有一个线程来执行无界任务队列的单一线程池。该线程池确保任务按加入的顺序一个一 个依次执行。当唯一的线程因任务异常中止时,将创建一个新的线程来继续执行后续的任务。与newFixedThreadPool(1) 的区别在于,单一线程池的池大小在newSingleThreadExecutor方法中硬编码,不能再改变的。
  public static ExecutorService newSingleThreadExecutor() 
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    
  • newScheduledThreadPool(int corePoolSize) 能定时执行任务的线程池。该池的核心线程数由参数指定,最大线程数= Integer.MAX_VALUE
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) 
        return new ScheduledThreadPoolExecutor(corePoolSize);
    

线程数量

如何确定合适的线程数量

  计算型任务:cpu数量的1到2倍

   IO型任务:相对于计算型任务,需要多一些线程,要根据io阻塞时长考量决定。也可以根据需要最大和最小数据量自动增减。例如tomcat最大为200

线程池基本实现

任务仓库

 //1、需要一个任务仓库
    private BlockingQueue<Runnable> blockingQueue;


    //2、 集合容器,存放工作线程
    private List<Thread> workers;

    //3、普通线程要执行多个task,需要封装一下
    public static class Worker extends Thread

        private FixedSizeThreadPool pool;

        public Worker(FixedSizeThreadPool pool)
            this.pool = pool;
        

        @Override
        public void run() 
            while(this.pool.isWorking || this.pool.blockingQueue.size() > 0)
                Runnable task = null;

                try 
                    //如果没有任务,就阻塞等待任务
                    if (this.pool.isWorking)
                        task = this.pool.blockingQueue.take();
                    else
                        task = this.pool.blockingQueue.poll();
                 catch (InterruptedException e) 
                    e.printStackTrace();
                

                if (task != null)
                    task.run();
                
            
        
    

其他的基本就是实现submit方法 和executor方法,都比较简单的,简易版本的线程池。

最后

本篇文章主要介绍的是高并发多线程中线程池,从应用和一些代码理解线程池的应用,平常的使用方式,之后我会更新ThreadPoolExecutor从源码分析线程池怎么工作的,希望有更深入的理解

java——多线程高并发系列之线程池(executor)的理解与使用(代码片段)

文章目录:写在前面Demo1(使用Executors创建线程池)Demo2(使用ThreadPoolExecutor创建线程池)关于ThreadPoolExecutor中的七大参数、四种拒绝策略线程池的执行策略写在前面可以以newThread(()->线程执行的任务).start();... 查看详情

高并发多线程基础之线程间通信与数据共享及其应用(代码片段)

前言本篇文章主要介绍的是java多线程之间如何通信,协同处理任务,以及数据共享,定时任务处理等操作。多线程之间通信的方式在实际开发过程中多个线程同时操作,有两种情况的,数据共享和线程间协作... 查看详情

高并发多线程之线程基础中生命周期线程封闭cpu缓存(代码片段)

...的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。一个进程包括由操作系统分配的内存空间,包含一个或多个线程。一个线程不能独立的存在,它必须是进程的一... 查看详情

高并发之——不得不说的线程池与threadpoolexecutor类浅析(代码片段)

...说,Java的线程池技术是Java最核心的技术之一,在Java的高并发领域中,Java的线程池技术是一个永远绕不开的话题。既然Java的线程池技术这么重要(怎么能说是这么重要呢?那是相当的重要,那家伙老重要了,哈哈哈),那么,... 查看详情

高并发中,那些不得不说的线程池与threadpoolexecutor类(代码片段)

...细节的底层原理和源码实现。本文分享自华为云社区《高并发之——不得不说的线程池与ThreadPoolExecutor类浅析》,作者:冰河。既然Java中支持以多线程 查看详情

高并发之——从源码角度分析创建线程池究竟有哪些方式(代码片段)

前言在Java的高并发领域,线程池一直是一个绕不开的话题。有些童鞋一直在使用线程池,但是,对于如何创建线程池仅仅停留在使用Executors工具类的方式,那么,创建线程池究竟存在哪几种方式呢?就让我们一起从创建线程池... 查看详情

高并发的epoll+线程池,业务在线程池内(代码片段)

我们知道,服务器并发模型通常可分为单线程和多线程模型,这里的线程通常是指“I/O线程”,即负责I/O操作,协调分配任务的“管理线程”,而实际的请求和任务通常交由所谓“工作者线程”处理。通常多线程模型下,每个线... 查看详情

并发编程系列之如何正确使用线程池?(代码片段)

并发编程系列博客并发编程系列之如何正确使用线程池?在上一章节的学习中,我们掌握了线程的基本知识,接着本博客会继续学习多线程中的线程池知识1、线程是不是越多越好?在学习多线程之前,读者可... 查看详情

高并发内存池(代码片段)

从零实现一个高并发内存池高并发内存池整体框架设计申请内存部分高并发内存池-ThreadCache高并发内存池-CentralCache高并发内存池-PagaCache释放内存部分高并发内存池-ThreadCache高并发内存池-CentralCache高并发内存池-PagaCache优化部分... 查看详情

高并发内存池(代码片段)

从零实现一个高并发内存池高并发内存池整体框架设计申请内存部分高并发内存池-ThreadCache高并发内存池-CentralCache高并发内存池-PagaCache释放内存部分高并发内存池-ThreadCache高并发内存池-CentralCache高并发内存池-PagaCache优化部分... 查看详情

多线程之线程池threadpoolexecutor(代码片段)

作用并发编程的艺术降低资源消耗提高响应速度提高线程的可管理性码出高效java开发手册利用线程池管理并复用线、控制最大并发数等实现任务线程队列缓存策略和拒绝机制实现某些与时间相关的功能,如定时执行,周... 查看详情

多线程之线程池threadpoolexecutor(代码片段)

作用并发编程的艺术降低资源消耗提高响应速度提高线程的可管理性码出高效java开发手册利用线程池管理并复用线、控制最大并发数等实现任务线程队列缓存策略和拒绝机制实现某些与时间相关的功能,如定时执行,周... 查看详情

高并发多线程安全之信号量线程组守护线程线程栅栏等的分析(代码片段)

高并发多线程安全之原子性问题、CAS机制及问题解决方案多线程编程之java内存模型(JMM)与可见性问题前言前两篇文章主要分析的是线程安全中java内存模型;以及常见的线程安全三大问题:原子性、可见性、有序性问题这... 查看详情

java——多线程高并发系列之创建多线程的三种方式(threadrunnablecallable)(代码片段)

...xff09;写在前面历时一个星期,终于整完了Java多线程高并发这个系 查看详情

juc并发编程之completablefuture基础用法(代码片段)

目录实现多线程的四种方式方式一:继承Thread类方式二:实现Runnable接口方式三:实现Callable接口方式四:线程池创建异步对象回调方法handle方法 线程串行化 任务组合组合任务单任务完成及执行实现多线程的四... 查看详情

juc并发编程之completablefuture基础用法(代码片段)

目录实现多线程的四种方式方式一:继承Thread类方式二:实现Runnable接口方式三:实现Callable接口方式四:线程池创建异步对象回调方法handle方法 线程串行化 任务组合组合任务单任务完成及执行实现多线程的四... 查看详情

测开之并发编程篇・《并发并行线程队列进程和协程》(代码片段)

并发编程并发和并行多任务概念并发和并行同步和异步多线程threading模块介绍自定义线程类线程任务函数的传递多线程资源共享和资源竞争问题GIL全局解释锁互斥锁通过锁解决资源竞争问题死锁队列队列的方法FIFO先入先出队列LI... 查看详情

java——多线程高并发系列之threadlocal的使用(代码片段)

文章目录:写在前面Demo1Demo2Demo3写在前面除了控制资源的访问外,还可以通过增加资源来保证线程安全。ThreadLocal主要解决为每个线程绑定自己的值。Demo1packagecom.szh.threadlocal;/***ThreadLocal的基本使用*/publicclassTest01//定义一... 查看详情