生产者消费者模型详解(代码片段)

小赵小赵福星高照~ 小赵小赵福星高照~     2022-12-09     688

关键词:

生产者消费者模型

文章目录

什么是生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

我们首先举个例子便于理解生产者消费者模型:

我们平常都会去超市买一些需要的物品,给我们提供产品的是供应商,我们是消费者,我们是去超市消费,给我们提供产品的而是供应商,为什么我们会去超市呢?因为超市给我们提供了交易场所,生产者消费者模型本质是生产者把产品放到超市里面,消费者把产品从超市中拿出去。

计算机中的数据相当于产品,数据的加工处理是CPU以任务形式加工处理,生产者和消费者代表的是多线程或者多进程,超市相当于一段内存

生活中生产者将产品放在一个交易场所,让我们自己去拿,为什么这样呢?是因为效率高,比如菜鸟驿站的场景,快递公司把一批包裹放进菜鸟驿站,然后我们去拿就好了,提高了效率

厂商将交易场所当作缓冲区来用,生产者和消费者一般是进程和线程,空间或者交易场所:一块"内存块",产品相当于数据

生产者消费者321原则(自己总结:3种关系,2种角色。一个交易场所):

3种关系,生产者和生产者,生产者和消费者,消费者和消费者

生产者和生产者竞争关系(互斥),生产者和消费者(同步),消费者和消费者竞争关系(互斥)

2种角色,生产者和消费者

1个交易场所:一块"内存块"

int add(int a,int b)

    return a+b;

int main()

    int a = 10;
    int b = 20;
    int c = add(a,b);

方式一:

方式二:

第一种方式main函数生产a和b后调用add,main函数需要等待add函数执行完才能继续工作,这种称为强耦合,这种工作方式是串行,这种方式耦合度高,而第二种方式是线程1去执行main函数生成a和b,线程2去执行add函数,它们直接有一块区域来给线程1放数据和给线程2取数据,当线程2在add的时候,线程1也可以放数据,这种方式称为并行,也称之为解耦

并行是一种需求,表示有很多业务活动同时进行。异步是一种代码编写方式,同步和串行:同步是一种代码编写方式,串行是一种需求

基于BlockingQueue的生产者消费者模型

阻塞队列有什么用?

如果不把数据往阻塞队列里面放,假设数据通过网络给服务器用了10ms,服务器让线程1把数据写入数据库用了5ms,一共15ms,用户一共等待15ms,如果把数据往阻塞队列里面放,因为阻塞队列是在内存当中的,而数据库是文件是需要进行IO的,放在阻塞队列的时间要比放在数据库的时间少一些,所以用户等待的时间就少一些。

下面我们写一个单生产者单消费者模型:

单生产者单消费者模型

Makefile的编写:

main:main.cc
    g++ $^ -o $@ -lpthread
.PHONY:clean
clean:
	rm -f main

BlockQueue.hpp

#ifndef __BLOCK_QUEUE_H__
#define __BLOCK_QUEUE_H__
#include<iostream>
#include<queue>
#include<pthread.h>
#include<unistd.h>
class BlockQueue

private:
    std::queue<int> q;
    size_t _cap;
    pthread_mutex_t lock;
    pthread_cond_t c_cond;//消费者的条件不满足时,将来消费者在该条件变量下等
    pthread_cond_t p_cond;//生产者的条件不满足时,将来生产者在该条件下等
public:
    bool IsFull()
    
        return q.size() >= _cap;
    
    bool IsEmpty()
    
        return q.empty();
    
    void LockQueue()
    
        pthread_mutex_lock(&lock);
    
    void UnLockQueue()
    
        pthread_mutex_unlock(&lock);
    
    void WakeUpComsumer()
    
    	pthread_cond_signal(&c_cond);
    
    void WakeUpProductor()
    
    	pthread_cond_signal(&p_cond);
    
    void ProducterWait()
    
    	pthread_cond_wait(&p_cond,&lock);
        //这里为什么要传锁,我们在等待时肯定是条件不满足了,我们通过判断才知道条件满不满足,
        //判断就需要保证进入临界区,我们是持有锁进入的,wait的时候必须要释放锁
    	//在调用该函数的时候,自动会释放lock
        //当该函数被返回时,返回到了临界区内,所以,该函数会让该线程重新持有该锁
    
    void ComsumerWait()
    
    	pthread_cond_wait(&c_cond,&lock);//在消费者释放锁时,生产者正申请锁,而消费者在等待
    
public:
    BlockQueue(size_t cap):_cap(cap)
    
        pthread_mutex_init(&lock,nullptr);
        pthread_cond_init(&c_cond,nullptr);
        pthread_cond_init(&p_cond,nullptr);
    
    void Put(int in)
    
        LockQueue();
        //if(isFull())
        while(isFull())
        
            WakeUpComsumor();//唤醒消费者
            ProducterWait();//生产者等待
        
    	q.push(in);
        UnLockQueue();
    
    void Get(int& out)
    
        LockQueue();
        //if(IsEmpty)
        while(IsEmpty())
        
            WakeUpProductor();
            ComsumerWait();//消费者者等待
        
        out = q.front();
        q.pop();
        LockQueue();
    
    ~BlockQueue()
    
        pthread_cond_destroy(&lcok);
        pthread_cond_destroy(&c_cond);
        pthread_cond_destroy(&p_cond);   
    
;
#endif

main.cc

#include"BlockQueue.hpp"
using namespace std;
void* consumer_run(void* arg)

    BlockQueue *bq = (BlockQueue*)arg;
    while(true)
    
        int n = 0;
        bq->Get(n);
        cout<<"consumer data is : " << n <<endl;
    

void* productor_run(void* arg)

    BlockQueue *bq = (BlockQueue*)arg;
	while(true)
    
        int data = rand()%10+1;
        bq->Put(data);
        cout<<"product data is : "<<data<<endl;
    

int main()

    BlockQueue *bq = new BlockQueue(5);
    pthread_t c,p;
    pthread_create(&c,nullptr,consumer_run,(void*)bq);
    pthread_create(&p,nullptr,productor_run,(void*)bq);
    
    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    
    delete bq;
    return 0;

上面代码有个细节不能用if判断满和空,万一被提前唤醒或者等待函数调用失败,它们会继续执行push数据和pop数据,此时就是非法操作了

多生产者多消费者模型

上面是单生产者单消费者模型,只解决了生产者和消费者之间的关系,如果是多生产者多消费者模型那么需要添加另外两种关系:还有生产者和生产者的关系(互斥),以及消费者和消费者的关系(互斥)

所以我们只需要在外面生产者和消费者的例程中放数据和取数据加锁即可:

#include"BlockQueue.hpp"
using namespace std;
pthread_mutex_t c_lock;
pthread_mutex_t p_lock;

void* consumer_run(void* arg)

    BlockQueue *bq = (BlockQueue*)arg;
    while(true)
    
        int n = 0;
        pthread_mutex_lock(&c_lock);
        bq->Get(n);
        pthread_mutex_unlock(&c_lock);
        cout<<"consumer data is : " << n <<endl;
    

void* productor_run(void* arg)

    BlockQueue *bq = (BlockQueue*)arg;
	while(true)
    
        pthread_mutex_lock(&p_lock);
        int data = rand()%10+1;
        bq->Put(data);
        pthread_mutex_unlock(&p_lock);
        cout<<"product data is : "<<data<<endl;
    

int main()

	BlockQueue *bq = new BlockQueue(5);
	pthread_t c,p;
	pthread_mutex_init(&c_lock,nullptr);
	pthread_mutex_init(&p_lock,nullptr);
    
    pthread_create(&c,nullptr,consumer_run,(void*)bq);
    pthread_create(&c,nullptr,consumer_run,(void*)bq);
    pthread_create(&c,nullptr,consumer_run,(void*)bq);
    pthread_create(&c,nullptr,consumer_run,(void*)bq);
    
    pthread_create(&p,nullptr,productor_run,(void*)bq);
    pthread_create(&p,nullptr,productor_run,(void*)bq);
    pthread_create(&p,nullptr,productor_run,(void*)bq);
    pthread_create(&p,nullptr,productor_run,(void*)bq);
    pthread_create(&p,nullptr,productor_run,(void*)bq);
    
    
    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    
    delete(bq);
    pthread_mutex_destroy(&c_lock);
	pthread_mutex_destroy(&p_lock);
    return 0;

那么上面做的事情有什么用呢?下面我们来看下面这个代码:

#ifndef __QUEUE_BLOCK_H__
#define __QUEUE_BLOCK_H__
#include<iostream>
#include<queue>
#include<pthread.h>
#include<unistd.h>
class Task

public:
    int _x;
    int _y;
public:
    Task()
    
    Task(int x,int y):_x(x),_y(y)
    
    int Run()
    
        return _x+_y;
    
    ~Task()
    
;
class BlockQueue

private:
    std::queue<Task> q;
    size_t _cap;
    pthread_mutex_t lock;
    pthread_cond_t c_cond;//消费者的条件不满足时,将来消费者在该条件变量下等
    pthread_cond_t p_cond;//生产者的条件不满足时,将来生产者在该条件下等
public:
    bool IsFull()
    
        return q.size() >= _cap;
    
    bool IsEmpty()
    
        return q.empty();
    
    void LockQueue()
    
        pthread_mutex_lock(&lock);
    
    void UnLockQueue()
    
        pthread_mutex_unlock(&lock);
    
    void WakeUpComsumer()
    
    	pthread_cond_signal(&c_cond);
    
    void WakeUpProductor()
    
    	pthread_cond_signal(&p_cond);
    
    void ProducterWait()
    
    	pthread_cond_wait(&p_cond,&lock);
        //这里为什么要传锁,我们在等待时肯定是条件不满足了,我们通过判断才知道条件满不满足,
        //判断就需要保证进入临界区,我们是持有锁进入的,wait的时候必须要释放锁
    	//在调用该函数的时候,自动会释放lock
        //当该函数被返回时,返回到了临界区内,所以,该函数会让该线程重新持有该锁
    
    void ComsumerWait()
    
    	pthread_cond_wait(&c_cond,&lock);//在消费者释放锁时,生产者正申请锁,而消费者在等待
    
public:
    BlockQueue(size_t cap):_cap(cap)
    
        pthread_mutex_init(&lock,nullptr);
        pthread_cond_init(&c_cond,nullptr);
        pthread_cond_init(&p_cond,nullptr);
    
    void Put(Task t)
    
        LockQueue();
        //if(isFull())
        while(isFull())
        
            WakeUpComsumor();//唤醒消费者
            ProducterWait();//生产者等待
        
    	q.push(t);
        UnLockQueue();
    
    void Get(Task& t)
    
        LockQueue();
        //if(IsEmpty)
        while(IsEmpty())
        
            WakeUpProductor();
            ComsumerWait();//消费者者等待
        
        t = q.front();
        q.pop();
        UnLockQueue();
    
    ~BlockQueue()
    
        pthread_cond_destroy(&lcok);
        pthread_cond_destroy(&c_cond);
        pthread_cond_destroy(&p_cond);   
    
;

#endif
#include"BlockQueue.hpp"

pthread_mutex_t c_lock;
pthread_mutex_t p_lock;
void* r1(void* arg)

    //生产者
   BlockQueue* bq =(BlockQueue*)arg;
   while(true)
   
       pthread_mutex_lock(&p_lock);
       int x = rand()%10+1;
       int y = rand()%100+1;
       Task t(x,y);
       bq->Put(t);
       pthread_mutex_unlock(&p_lock);
       cout<<"Product Task is: "<<x<<'+'<<y<<"= ?"<<endl;
   

void* r2(void* arg)

    //消费者
    BlockQueue* bq = (BlockQueue*)arg;
    while(true)
    
        //取数据
        Task t;
        pthread_mutex_lock(&c_lock);
        bq->Get(t);
        pthread_mutex_unlock(&c_lock);
        cout<<"Consumer: "<<t._x<<"+"<<t._y<<"="<<t.Run()<<endl;
    

int main()

    
    BlockQueue* bq = new BlockQueue(5);
    pthread_t c,p;
    pthread_mutex_init(&p_lock,NULL);
    pthread_mutex_init(&c_lock,NULL);
    pthread_create(&p,NULL,r1,(void*)bq);
    pthread_create(&p,NULL,r1,(void*)bq);
    pthread_create(&p,NULL,r1,(void*)bq);
    pthread_create(&p,NULL,r1,(void*)bq);

    pthread_create(&c,NULL,r2,(void*)bq);
    pthread_create(&c,NULL,r2,(void*)bq);
    pthread_create(生产者消费者模型详解(代码片段)

生产者消费者模型文章目录生产者消费者模型什么是生产者消费者模型基于BlockingQueue的生产者消费者模型单生产者单消费者模型多生产者多消费者模型什么是生产者消费者模型生产者消费者模式就是通过一个容器来解决生产者... 查看详情

生产者消费者模型详解(代码片段)

生产者消费者模型文章目录生产者消费者模型什么是生产者消费者模型基于BlockingQueue的生产者消费者模型单生产者单消费者模型多生产者多消费者模型什么是生产者消费者模型生产者消费者模式就是通过一个容器来解决生产者... 查看详情

深入了解androidhandler机制原理详解(代码片段)

...ff1a;子线程handler主线程其实构成了线程模型中的经典问题生产者-消费者模型。生产者-消费者模型:生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中 查看详情

万字详解linux系列多线程(下)(代码片段)

...(2)代码使用(3)关于pthread_cond_wait二、生产者消费者模型1.什么是生产者消费者模型2.相关概念(1)一个交易场所(2)三种角色(3)三种关系3.基于阻塞队列的单生产者、单消费者模型&#x... 查看详情

rabbitmq详解(代码片段)

...消息持久化3.1交换机持久化3.2队列持久化3.3签收机制3.3.1生产者签收机制事务机制Confirm机制3.3.2消费 查看详情

生产者-消费者问题详解(代码片段)

1.前言  生产者-消费者问题是经典的线程同步问题(我会用java和c分别实现),主要牵扯到三个点: 一:能否互斥访问共享资源(不能同时访问共享数据); 二:当公共容器满时,生产者能否继续生产(生产者应阻塞并... 查看详情

linux生产者消费者模型(代码片段)

文章目录Linux生产者消费者模型生产者消费者的概念生产者消费者模型的特点生产者消费者模型的优点基于BlockingQueue的生产者消费者模型C++queue模拟阻塞队列实现生产者消费者模型Linux生产者消费者模型生产者消费者的概念... 查看详情

生产者消费者模型(代码片段)

1、生产者消费者模型作用和示例如下:1)通过平衡生产者的生产能力和消费者的消费能力来提升整个系统的运行效率,这是生产者消费者模型最重要的作用2)解耦,这是生产者消费者模型附带的作用,解耦意味着生产者和消费... 查看详情

linux多线程——生产者消费者模型(代码片段)

目录一.生产者消费者模型    1.1什么是生成者消费者模型    1.2生产者消费者模型的优点    1.3基于阻塞队列实现生产者消费者模型     1.4POSIX信号量    1.4.1信号量概念    1.4.2P操作和V操作    1.4.3理解信号量   ... 查看详情

c++实现生产者和消费者模型(代码片段)

C++实现生产者和消费者模型C++实现生产者和消费者模型1、实现细节2、单生产者-单消费者模型3、单生产者-多消费者模型4、多生产者-单消费者模型5、多生产者-多消费者模型参考C++实现生产者和消费者模型1、... 查看详情

生产者消费者模型(代码片段)

什么是消费者生产者模型:生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给... 查看详情

生产者消费者模型及队列,进程池(代码片段)

 生产者消费者模型生产者消费者模型主要是为了解耦可以借助队列来实现生产者消费者模型栈:先进后出(FirstInLastOut简称FILO)队列:先进先出(FirstInFirstOut简称FIFO)importqueue#不能进行多进程之间的数据传输(1)frommultiprocessingimportQueu... 查看详情

生产者消费者模型(代码片段)

一生产者消费者模型介绍为什么要使用生产者消费者模型生产者指的是生产数据的任务,消费者指的是处理数据的任务,在并发编程中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,... 查看详情

生产者消费者模型(代码片段)

1.生产者与消费者之间的关系(1)生产者与生产者之间的关系      互斥(2)消费者与消费者之间的关系      互斥(3)生产者与消费者之间的关系      同步与互斥2.生产者消费者模型的描述  ... 查看详情

生产者-消费者模型(代码片段)

1.条件变量+互斥锁实现生产者-消费者模型:/*借助条件变量模拟生产者-消费者问题*/#include<stdlib.h>#include<unistd.h>#include<pthread.h>#include<stdio.h>/*链表作为公享数据,需被互斥量保护*///模拟箩筐structmsgstructmsg*next;intnu... 查看详情

生产者消费者模型(代码片段)

#生产者消费者模型#在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程#如果生产者速度快而消费者处理速度慢,或生产者处理速度慢而消费者处理速度快,这样就会发生等待#为了解决这个问题于是就引入... 查看详情

生产者消费者模型(代码片段)

frommultiprocessingimportProcess,Queueimporttimedefproducer(q):foriinrange(6):res=‘包子%s‘%itime.sleep(0.5)print(‘生产者生产了%s‘%res)q.put(res)defcosumer(q):whileTrue:res=q.get()ifresisNone:breaktime.sleep(2 查看详情

条件变量生产者和消费者模型(代码片段)

生产者消费者条件变量模型线程同步典型的案例即为生产者消费者模型,而借助条件变量来实现这一模型,是比较常见的一种方法。假定有两个线程,一个模拟生产者行为,一个模拟消费者行为。两个线程同时操作一个共享资源... 查看详情