Linux----生产者和消费者模型

2023-05-16

文章目录

    • 生产者和消费者模型的概念
    • 基于阻塞队列下的生产者和消费者模型
    • 信号量
    • 基于循环队列下的生产者和消费者模型
    • 线程池的概念
    • 读者-写者模型
    • 自旋锁

生产者和消费者模型的概念

优点:
1,将生产环节和消费环节解耦。
2,极大的提高效率。
3,支持并发。

321原则
3种关系:
1,生产者和生产者:竞争 和 互斥。
2,消费者和消费者:竞争 和 互斥。
3,生产者和消费者: 同步 和 互斥。

2种角色:
生产者和消费者:本质是执行流

1个交易场所:一段缓冲区(缓冲区)。
在这里插入图片描述

基于阻塞队列下的生产者和消费者模型

当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素,
当队列满时,往队列里存放元素的操作也会被阻塞。
主要练习的是条件变量,并对条件变量的操作函数进行掌握
在这里插入图片描述

C++代码

amespace chen
{
    const int default_cap = 5;
    template <class T>
    class block_queue
    {
    public:
        block_queue()
            : _cap(default_cap)
        {
            pthread_mutex_init(&_mtx, nullptr);
            pthread_cond_init(&_isfull, nullptr);
            pthread_cond_init(&_isempty, nullptr);
        }

        //加锁
        void pq_lock()
        {
            pthread_mutex_lock(&_mtx);
        }

        //解锁
        void pq_unlock()
        {
            pthread_mutex_unlock(&_mtx);
        }
        
        //让消费者等待,当队列为空的时候
        void consumer_wait()
        {
            pthread_cond_wait(&_isempty, &_mtx);
        }
        
        //唤醒消费者
        void wakeup_consumer()
        {
            pthread_cond_signal(&_isempty);
        }

        void product_wait()
        {
            pthread_cond_wait(&_isfull, &_mtx);
        }

        void wakeup_product()
        {
            pthread_cond_signal(&_isfull);
        }

        bool isfull()
        {
            return _cap == _bq.size();
        }

        bool isempty()
        {
            return _bq.size() == 0;
        }

        //生产者生产
        void Push(const T& in)
        { 
            //会有多个生产者线程同时访问,临界区需要加锁
            pq_lock();

            //为什么要while不用if呢?
            //防止挂起失败或者伪唤醒,如果是if就是直接向下执行了,但是队列任然是满的
            while(isfull()) 
            {
                cout << "队列为满: " << _cap <<  endl;
                product_wait();
            } 
            
            _bq.push(in);
            //整个位置说明已经有数据了,如果说消费者被挂起了,应该被唤醒
            wakeup_consumer();
            pq_unlock();

        }
          
        //消费者消费
        void Pop(T* out)
        {
            //多个消费者线程访问,需要加锁的
            pq_lock();
            while(isempty())
            {
                consumer_wait();
            }

            *out = _bq.front();
            _bq.pop();
            //消费者已经消费了,说明有位置了,如果生产者被挂起了,这时候应该唤醒它
            wakeup_product();
            pq_unlock();
        }

        ~block_queue()
        {
            pthread_mutex_destroy(&_mtx);
            pthread_cond_destroy(&_isfull);
            pthread_cond_destroy(&_isempty);
        }

    private:
        queue<T> _bq;
        int _cap;
        pthread_mutex_t _mtx;      //保护临界资源的锁
        pthread_cond_t _isfull;      //条件变量,用来表示队列为满,消费者该消费
        pthread_cond_t _isempty;     //条件变量,表示队列为空,生产者该生产了
    };

}

信号量

什么是信号量呢?
本质就是一个计数器,描述临界资源的数量。
临界资源本质上可以划分一个一个的小资源,通过信号量,完全可以让多个线程去
同时访问临界资源中的不同区域,从而实现并发。

线程操作函数:
在这里插入图片描述

基于循环队列下的生产者和消费者模型

复习一下循环队列
在这里插入图片描述
我们如何实现基于循环队列下的生产者和消费者模型呢? 我们这里不需要多加一个位置
利用信号量:定义两个信号量。 一个表示sem_blank 临界资源格子的数量, 一个表示sem_data临界资源数据的数量。

同一个位置,为空或者为满,生产者和消费者需要同步互斥。
不在同一个位置,可以并发执行。

C++代码:

#pragma once

#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <time.h>
#include <unistd.h>
#include <vector>
using namespace std;

namespace chen
{
   template <class T>
   class ring_queue
   {
   public:
      ring_queue(int cap = 10)
          : _cap(cap) //循环队列默认容量大小为10
            ,_pstep(0)
            ,_cstep(0)
      {
         _ring_queue.resize(_cap,0);
         pthread_mutex_init(&_pmtx, nullptr);
         pthread_mutex_init(&_cmtx, nullptr);

         sem_init(&_blanksem, 0, 10); // 0表示线程间共享,10是个数
         sem_init(&_datasem, 0, 0);
      }

      void Push(const T &val)
      {
         //申请信号量
         sem_wait(&_blanksem); // p()操作 --

         pthread_mutex_lock(&_pmtx);
         _ring_queue[_pstep] = val;
         cout << "生产下标" << _pstep << endl;
         _pstep++;
         _pstep %= _cap;
         pthread_mutex_unlock(&_pmtx);
         

         sem_post(&_datasem); // v()操作 ++
      }

      void Pop(T *out)
      {
         sem_wait(&_datasem);
         
         //要多消费者,需要加锁 ,step相当于是临界资源了
         pthread_mutex_lock(&_cmtx);
         *out = _ring_queue[_cstep];

         cout << "消费下标" << _cstep << endl;

         _cstep++; 
         _cstep %= _cap;
         pthread_mutex_unlock(&_cmtx);

         sem_post(&_blanksem);
      }

      ~ring_queue()
      {
         pthread_mutex_destroy(&_pmtx);
         pthread_mutex_destroy(&_cmtx);

         sem_destroy(&_blanksem);
         sem_destroy(&_datasem);
      }

   private:
      vector<T> _ring_queue;
      int _cap;

      //两把锁,多生产,多消费,因为pstep,_cstep对于生产者和消费者是临界资源
      pthread_mutex_t _pmtx;
      pthread_mutex_t _cmtx;

      sem_t _blanksem; //信号量,格子的个数,生产者关注的
      sem_t _datasem;  //信号量,数据的个数,消费者关注的

      int _pstep; //下标位置
      int _cstep;
   };
}

线程池的概念

在这里插入图片描述

#pragma once

#include <iostream>
#include <pthread.h>
#include <queue>
#include <unistd.h>

using namespace std;

namespace chen
{
    template<class T>
    class thread_pool
    {
    public:
           thread_pool()
           :_num(5)
           {
               pthread_cond_init(&_cond, nullptr);
               pthread_mutex_init(&_mtx, nullptr);
           }
           
           void Lock()
           {
              pthread_mutex_lock(&_mtx);
           }

           void Unlock()
           {
              pthread_mutex_unlock(&_mtx);
           }

           void Wait()
           {
              pthread_cond_wait(&_cond, &_mtx);
           }

           void Wakeup()
           {
              pthread_cond_signal(&_cond);
           }

          // 在类中要让线程执行类内成员方法,是不行的,因为this指针要占用第一个传参位置
          // 必须让线程执行静态方法,
           static void* pthread_run(void* args)
           {
               pthread_detach(pthread_self()); //线程分离,不需要等待。

               thread_pool<T>* tp = (thread_pool<T>*)args;

               while(true)
               {
                    tp->Lock();
                    while(tp->_task_queue.empty())
                    {
                        tp->Wait();
                    }
                      
                    tp->pop_task(); 

                    tp->Unlock();

                    sleep(5);
               }
                
           }

           void init_thread_pool()
           {
               pthread_t tid;
               for(int i = 0; i < _num; i++)
               {
                   pthread_create(&tid, nullptr, pthread_run, (void*)this); //这里必须要传参
               }
           }

           void push_task(const T& in)
           {
                //多个线程可能会产生任务,也是需要加锁的,为了方便,这里只用一个线程产生任务
    
                _task_queue.push(in);
             
                 
                Wakeup(); //可以唤醒消费线程来执行了
           }

           void pop_task()
           {
              // *out = _task_queue.front();
               _task_queue.pop();
               cout << "我是线程:" << pthread_self() << "正在处理任务" << endl;
           }


           ~thread_pool()
           {
               pthread_cond_destroy(&_cond);
               pthread_mutex_destroy(&_mtx);
           }
    private:
            int _num;
            queue<T> _task_queue;
            pthread_cond_t  _cond; //当任务队列中没有任务的时候,所有的线程都要挂起等待。
            pthread_mutex_t _mtx;  //保护临界区
    };
}

读者-写者模型

适合的是多读少写的场景
写独占,读共享,读锁优先级高

321原则
3:3种关系:
读者和读者 (无关系,因为读者不会取走数据,与消费者生产者模型不同,是因为消费者会取走资源)
写者和写者:互斥
读者和写者:互斥,同步

2:2种角色 2个执行流(线程)

1个交易场所:一段缓冲区 或者一个STL容器(queue,vector)

优先级的概念:
读者优先:当读者和写者同时到来的时候,让读者先访问。

写者优先:当读者和写者同时来的时候, 当前读者和将要晚来的读者都要等待,都不进入临界区,当临界区没有读者的时候
该写者才进入临界区。

自旋锁

在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Linux----生产者和消费者模型 的相关文章

随机推荐