java生产消费模型—arrayblockingqueue详解(代码片段)

xinxinblog xinxinblog     2022-12-04     185

关键词:

背景需求

  生产消费模型是线程协作关系中十分常见的一种。通常,一个(多个)线程负责生产,一个(多个)线程可以从生产的列表中获取并消费;生产的内容可以按需求设计,可以是一个Integer,可以是String,可以Object,也可以是任意类型的对象,只要有生产消费的需求。

  例如,厨师负责生产美食,放在桌子上,服务员负责取走(消费)美食。这里,厨师就扮演着生产者的身份,美食是生产的内容,服务员就扮演着消费者的身份。

  下面用这个厨师与服务员的案例来分析下生产消费模型需要实现哪些功能才能满足需求:

如何实现这个需求

  若要实现以上的需求,我们该考虑哪些方面呢?

(1)厨师是厨师,负责做美食;服务员负责消费美食。厨师与服务员可以同时运行(两个独立线程)。

(2)厨师与服务员作为两个独立线程,必须有一个约定好的公共区域:厨师把生产好的美食往这个区域放,服务员从这个区域取。并且,厨师与服务员并不想和对方接触过多(低耦合),只想和这个公共区域(桌子)打交道。

(3)通常,先生产的内容应该被先消费(先做的美食先送给顾客,防止凉了),符合FIFO特性。若要选取某种数据结构的容器作为公共区域,Queue是最佳方案(符合FIFO特性)。

(4)并发有危险厨师和服务员都在这个公共区域(Queue)中操作,同时操作可能存在问题。例如服务员正在从区域A拿盘子时,厨师把新的盘子也往区域A放,会发生碰撞;又如,同一个盘子可能有多个服务员过来争抢;也可能,多个厨师做好了美食把盘子往同一个区域放,也会发生碰撞。

  因此,需要实现并发的保护:厨师(生产者)往桌子(Queue)上放盘子(生产)之前,先获取锁,以保证他在操作共享区域(Queue)时没有其他厨师或者服务员过来争抢导致发生冲突;在放完之后,释放掉锁,让其他的厨师或者服务员操作。服务员操作时也是一个道理,要先获取锁,操作完成之后要释放锁。

(5)阻塞的需求:若桌子(Queue)空了,服务员该怎么办呢?是每隔几秒钟过来看一下桌子?不好,因为这样太累(轮训方式开销大,并不知道什么时候Queue中才会有新的盘子)。

  比较好的方案是:在桌子上放一个BB机(Queue中实现条件变量),和厨师约定好:若桌子空了,服务员可以去睡觉,等厨师做好饭了,通过BB机呼叫一下服务员(唤醒消费线程)(若Queue消费完毕,消费线程可以阻塞等待【 队列非空】的条件,当生产线程有新的生产内容,把内容放进Queue之后,通过条件变量唤醒消费线程)。而桌子没空的时候(Queue中一直有数据),服务员可以一直工作,则不需要睡觉(消费线程一直消费,不需要等待)。

  同理,也可能出现相反的场景:服务员比较少,端盘子比较慢,而厨师比较多,做饭比较快(生产速度快于消费速度)。这时,若桌子无限大(无界队列),那厨师会一直往桌子上放,导致桌子上盘子越来越多;而若桌子大小有限(有界队列),那么当桌子放满了之后,那就没地儿放了,咋办?

  可以用一样的方式,再在队列内部添加一个条件变量,当队列满了,生产者则等待该队列【队列未满】条件的发生,同时休眠等待。当消费者消费一次之后,触发【队列未满】的条件,这时生产者可以被唤醒继续工作。

Java类库中成熟的设计-ArrayBlockingQueue

为了满足无数场景下以上类似的需求,jdk中加入了该线程安全阻塞FIFO队列的实现类:ArrayBlockingQueue,继承关系如下:

 

技术图片

 

首先,BlockingQueue最基础的是个集合Collection;

同时,实现了Queue的接口,因此具备普通Queue的特性,可以offer/add以添加元素至队列尾部,可以poll以从队列头部取内容,可以peek查看队列头的元素。

同时,实现了BlockingQueue的接口,在Queue基础上实现的特性:

(1)一个是线程安全,可以并发offer,可以并发poll,可以并发同时offer和poll,内部是加锁ReentrantLock实现的;

(2)另一个,就是阻塞功能。

  >> 当调用blockingQueue.put(E e)接口想将元素入队列时,若队列未满,则直接入队列(enqueue);

  若队列已满,则notFull.await()休眠等待条件变量【notFull队列未满】的发生,才唤醒线程继续生产。

  >> 当调用blockingQueue.take()接口时想从队列中取队列头的元素时,若队列为空,则直接取走(dequeue);

  若队列已空,则notEmpty.await()休眠等到条件变量【notEmpry队列未满】的发生,才唤醒线程继续消费。

源码解读

下面,带着以上这些概念的基础,看下源码实现。

首先,成员:

/** The queued items */
final Object[] items; //保存生产内容对象

/** items index for next take, poll, peek or remove */
int takeIndex; //数组下一个要消费位置

/** items index for next put, offer, or add */
int putIndex; //数组中下一个要生产存放的位置

/** Number of elements in the queue */
int count; //当前总共存放的内容对象数量

/** Main lock guarding all access */
final ReentrantLock lock; //并发操作的互斥,读取、写入之前都要获取该锁

/** Condition for waiting takes */
private final Condition notEmpty; //队列非空的条件变量,用于唤醒因队列空掉而阻塞的消费者线程

/** Condition for waiting puts */
private final Condition notFull; //队列非满的条件变量,用于唤醒因队列已满导致阻塞的生产者线程

  从以上的成员可以看得出来,数据是存放在数组Object[] items,并用putIndex指示下一个将要存放的位置,用getIndex存放下一个将要取元素的位置。

例如,假设items容量为5

在存入之前,应该是这样:

<<operation0>>
0      1      2      3      4
null    null    null    null    null
putIndex=0
takeIndex=0

存了一个‘A‘之后,应该是这样: putIndex++

<<operation1>>
0      1      2      3      4
‘A‘     null     null    null     null
      putIndex=1
getIndex=0

  

再存入一个‘B‘之后,应该是这样:putIndex++

<<operation2>>

0      1      2      3      4
‘A‘     ‘B’      null    null     null
            putIndex=2
getIndex=0

 

取一个元素出来,应该是这样:对头的元素‘A‘被取出来了,getIndex++

<<operation3>>
0      1      2      3      4
null    ‘B’      null    null     null
            putIndex=2
      getIndex=1

  

再存入2个元素:

<<operation4>>
0      1      2      3      4
null    ‘B’      ‘C‘      ‘d‘     null
                          putIndex=3
      getIndex=1

 

此时putIndex已经到头(4),若要再存入,则循环到0:


<<operaion5>>
0      1      2      3      4
null    ‘B’      ‘C‘      ‘d‘     ‘E‘
putIndex=0
      getIndex=1

 

此时,若再存入一个,则满了

<<operation6>>
0      1      2      3      4
‘F‘     ‘B’      ‘C‘      ‘d‘     ‘E‘
      putIndex=1
      getIndex=1

会发现,putIndex已经赶上了getIndex,没有空间了,那么生产者就会阻塞并等待【队列非满】条件变量的发生。

等到消费者再取一个元素出来,就会触发【队列非满】条件变量,让生产者线程唤醒继续生产。

<<operation7>>
0      1      2      3      4
‘F‘     null     ‘C‘      ‘d‘     ‘E‘
      putIndex=1
            getIndex=2

  

下面贴出部分源码,对应上述思路:

take(), put()

    public E take() throws InterruptedException 
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try 
            while (count == 0)
                notEmpty.await();
            return dequeue();
         finally 
            lock.unlock();
        
    

private E dequeue() 
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;

  

    public void put(E e) throws InterruptedException 
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try 
            while (count == items.length)
                notFull.await();
            enqueue(e);
         finally 
            lock.unlock();
        
    

private void enqueue(E x) 
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();

 

offer(), add()

再贴一下其他类似接口的源码:

public boolean offer(E e) 
	checkNotNull(e);
	final ReentrantLock lock = this.lock;
	lock.lock();
	try 
		if (count == items.length)
			return false;
		else 
			enqueue(e);
			return true;
		
	 finally 
		lock.unlock();
	



public boolean add(E e) 
	if (offer(e))
		return true;
	else
		throw new IllegalStateException("Queue full");

 offer, add与put职责类型,区别在于:

offer若因队列满了直接返回false,比较温和;而add因队列满了会抛出异常,比较强制;而put若队列满了,会阻塞等待知道队列有位置了再插入元素。

 

poll()

 

    public E poll() 
        final ReentrantLock lock = this.lock;
        lock.lock();
        try 
            return (count == 0) ? null : dequeue();
         finally 
            lock.unlock();
        
    

 

poll()与take()类似,区别在于:

poll时若队列为空,那么直接返回null;而take时,若队列为空,会阻塞直到队列不为空了,再返回队列中的数据;

 














生产者消费者模型

内容:1.什么是生产者消费者模型2.python实现生产者消费者模型3.Java实现生产者消费者模型 参考:https://www.cnblogs.com/Eva-J/articles/8253549.html  查看详情

java实现多线程生产者消费模型及优化方案

生产者-消费者模型是进程间通信的重要内容之一。其原理十分简单,但自己用语言实现往往会出现很多的问题,下面我们用一系列代码来展现在编码中容易出现的问题以及最优解决方案。/*  单生产者、单消费者生产烤鸭  ... 查看详情

java生产者——消费者线程模型

...hronized)的使用。其PopThread类对应消费者,PushThread类对应生产者,SafeStack对应存放资源的仓库。下面的TestSafeStack创建了1个生产者对象,1个存放资源的仓库对象,2个消费者对象。消费者类:1/*2*通过实现Runnable接口实现线程3*/4pub... 查看详情

生产者消费者模型-java代码实现

什么是生产者-消费者模式  比如有两个进程A和B,它们共享一个固定大小的缓冲区,A进程产生数据放入缓冲区,B进程从缓冲区中取出数据进行计算,那么这里其实就是一个生产者和消费者的模式,A相当于生产者,B相当于消... 查看详情

java第62节生产者消费者模型

2016-07-02packagecom.java1995;importjava.util.List;/***生产者**@authorAdministrator**/publicclassProducerextendsThread{privateList<Integer>list;privateintmax;//构造方法publicProducer(Stringname,intmax, 查看详情

生产者消费者主线程在其他线程运行时退出[重复]

...码sn-p:publicclassProducerConsumerTestpublicstaticvoidmain(String[]args)ArrayBlockingQ 查看详情

java多线程的生产者与消费者模型,线程间的通信

java多线程中的生产者与消费者模式:首先有一个阻塞队列,生产者将生产的东西放到队列里,消费者再从队列中取。当队列中的东西数量达到其容量就发生阻塞。importjava.util.Random;importjava.util.concurrent.ArrayBlockingQueue;importjava.util.c... 查看详情

生产者消费者模型详解(代码片段)

生产者消费者模型文章目录生产者消费者模型什么是生产者消费者模型基于BlockingQueue的生产者消费者模型单生产者单消费者模型多生产者多消费者模型什么是生产者消费者模型生产者消费者模式就是通过一个容器来解决生产者... 查看详情

linux生产者消费者模型(代码片段)

文章目录Linux生产者消费者模型生产者消费者的概念生产者消费者模型的特点生产者消费者模型的优点基于BlockingQueue的生产者消费者模型C++queue模拟阻塞队列实现生产者消费者模型Linux生产者消费者模型生产者消费者的概念... 查看详情

java多线程_生产者消费者模式1

生产者消费者模型    具体来讲,就是在一个系统中,存在生产者和消费者两种角色,他们通过内存缓冲区进行通信,生产者生产消费者需要的资料,消费者把资料做成产品。生产消费者模式如下图。(图片来自网... 查看详情

消息中间件activemq学习笔记[java编码mq,消费者生产者基本模型](代码片段)

...:尚硅谷ActiveMQ教程快速入门文章目录1.Java编码MQ,模拟基础生产者消费者自定义消息生产者自定义同步阻塞式的消息消费者异步监听方式的消费者关于3种常见的消费者问题队列案例总结2.Topic主题队列.主题比较MQ产品比较JMSTopic主... 查看详情

生产者和消费者模型

生产者-消费者模型网上有很多生产者-消费者模型的定义和实现。本文研究最常用的有界生产者-消费者模型,简单概括如下:生产者持续生产,直到缓冲区满,阻塞;缓冲区不满后,继续生产消费者持续消费,直到缓冲区空,阻... 查看详情

生产者消费者模型(代码片段)

一生产者消费者模型介绍为什么要使用生产者消费者模型生产者指的是生产数据的任务,消费者指的是处理数据的任务,在并发编程中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,... 查看详情

java多线程:生产者消费者模型(代码片段)

文章目录1.生产者消费者1.1生产者和消费者模式概述1.2经典案例:生产者和消费者1.2.1Object类的等待和唤醒方法1.2.2代码实现1.3生产者和消费者案例优化1.3.1代码实现1.生产者消费者生产者消费者模式是一个十分经典的多线程协... 查看详情

生产者消费者模型(代码片段)

1、生产者消费者模型作用和示例如下:1)通过平衡生产者的生产能力和消费者的消费能力来提升整个系统的运行效率,这是生产者消费者模型最重要的作用2)解耦,这是生产者消费者模型附带的作用,解耦意味着生产者和消费... 查看详情

c++实现生产者和消费者模型(代码片段)

C++实现生产者和消费者模型C++实现生产者和消费者模型1、实现细节2、单生产者-单消费者模型3、单生产者-多消费者模型4、多生产者-单消费者模型5、多生产者-多消费者模型参考C++实现生产者和消费者模型1、... 查看详情

生产者消费者模型

 生产者消费者模型在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。  为什么要使用生产者和消费者模式在线程世... 查看详情

生产者消费者模型

  生产者消费者模型在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。  为什么要使用生产者和消费者模式在线... 查看详情