_Linux多线程--生产者消费者模型篇

2023-05-16

文章目录

  • 1. 为何要使用生产者消费者模型
  • 2. 基于BlockingQueue的生产者消费者模型
  • 3. C++ queue模拟阻塞队列的生产消费模型
    • 条件变量使用规范
    • 简单测试
      • 1. BlockQueue (缓存--超市)
      • 2. ConProd.cc
      • 3. 结果展示
    • 升级版测试&&设计与RAII风格的加锁方式
      • 1. BlockQueue.hpp
      • 2. Task.hpp
      • 3. LockGuard.hpp(RAII风格的加锁方式)
      • 4. ConProd.cc
      • 5. 结果展示:

1. 为何要使用生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

  • 例子:
    就好比一个正在运行的网络程序;我们拿数据需要时间,我们处理数据也需要时间;那么当没数据时,另一个处理数据线程不就一直等吗?这就有点浪费效率了。那么中间有个缓存,就可以大大提高效率了。

生产者消费者模型优点

  • 解耦
  • 支持并发
  • 支持忙闲不均

在这里插入图片描述

  • 三种关系:
    • 生产者和生成者(互斥关系、竞争)
    • 消费者和消费者(互斥关系、竞争)
    • 生产者和消费者(互斥、同步竞争)
  • 二种角色: 生产者/消费者
  • 一个交易场所:超市

2. 基于BlockingQueue的生产者消费者模型

BlockingQueue

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

在这里插入图片描述
—(图片摘于相关教材资料)

3. C++ queue模拟阻塞队列的生产消费模型

条件变量使用规范

  • 等待条件代码
pthread_mutex_lock(&mutex);
while (条件为假)
pthread_cond_wait(cond, mutex);
修改条件
pthread_mutex_unlock(&mutex);
  • 给条件发送信号代码
pthread_mutex_lock(&mutex);
设置条件为真
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex);
  • 例子:
    在这里插入图片描述
  • 注意:
  • pthread_cond_wait第二个参数是一个锁,当成功调用wait之后,传入的锁,会被自动释放!
  • 当我被唤醒时,我从哪里醒来呢??
    • 从哪里阻塞挂起,就从哪里唤醒, 被唤醒的时候,我们还是在临界区被唤醒的
    • 当我们被唤醒的时候,pthread_cond_wait,会自动帮助我们线程获取锁
  • pthread_cond_wait: 但是只要是一个函数,就可能调用失败 && pthread_cond_wait: 可能存在 伪唤醒 的情况
    • 不用if判断;用while在这里插入图片描述

简单测试

1. BlockQueue (缓存–超市)

#pragma once

#include <iostream>
#include <queue>
#include <pthread.h>

#define gDefaultCap 5
template <class T>
class BlockQueue
{
    bool isQueueEmpty()
    {
        return _bq.size() == 0;
    }
    bool isQueueFull()
    {
        return _bq.size() == _capacity;
    }

public:
    BlockQueue(int capacity = gDefaultCap) : _capacity(capacity)
    {
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_Empty, nullptr);
        pthread_cond_init(&_Full, nullptr);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mtx);
        pthread_cond_destroy(&_Empty);
        pthread_cond_destroy(&_Full);
    }
    void push(const T &in) // 生产者
    {
        pthread_mutex_lock(&_mtx);
        while (isQueueFull())
            pthread_cond_wait(&_Full, &_mtx); // 检测临界资源条件
        // 数据满了; 等待消费者消费数据
        _bq.push(in); // 生产者放数据
        //if(_bq.size() >= _capacity/2) pthread_cond_signal(&_Empty); //数据积累到>=一半时再发送
        pthread_cond_signal(&_Empty); // 唤醒; 通知消费者,可以消费了
        pthread_mutex_unlock(&_mtx);
    }
    void pop(T *out) // 消费者
    {
        pthread_mutex_lock(&_mtx);
        while (isQueueEmpty())
            pthread_cond_wait(&_Empty, &_mtx); // 检测临界资源条件
        // 数据为空; 等待生产者生产数据
        *out = _bq.front();
        _bq.pop();           // 消费者消费
        pthread_cond_signal(&_Full); // 唤醒; 通知生产者,可以生成了了
        pthread_mutex_unlock(&_mtx);
    }

public:
    std::queue<T> _bq;     // 阻塞队列
    int _capacity;         // 容量上限
    pthread_mutex_t _mtx;  // 通过互斥锁保证队列安全
    pthread_cond_t _Empty; // 用它来表示bq 是否空的条件
    pthread_cond_t _Full;  //  用它来表示bq 是否满的条件
};

2. ConProd.cc

#include <iostream>
#include <pthread.h>
#include <ctime>
#include <unistd.h>

#include "BlockQueue.hpp"

void *productor(void *args)
{
    BlockQueue<int> *bq = (BlockQueue<int> *)args;
    while (true)
    {
        int a = rand() % 10 + 1;
        bq->push(a);
        std::cout << "productor生成的数据是" << a << std::endl;
    }
    return nullptr;
}

void *consumer(void *args)
{
    BlockQueue<int> *bq = (BlockQueue<int> *)args;
    while (true)
    {
        usleep(rand()%1000);
        int a;
        bq->pop(&a);
        std::cout << "consumer消费的数据是" << a << std::endl;
        sleep(1);
    }
    return nullptr;
}

int main()
{
    // 随机数种子
    srand((uint64_t)time(nullptr) ^ getpid() ^ 0x202300);
    BlockQueue<int> *bq = new BlockQueue<int>();
    // 创建线程
    pthread_t c, p;
    pthread_create(&c, nullptr, consumer, bq);
    pthread_create(&p, nullptr, productor, bq);

    // 等待线程
    pthread_join(c, nullptr);
    pthread_join(p, nullptr);
    // 释放资源
    delete bq;

    return 0;
}

3. 结果展示

在这里插入图片描述

升级版测试&&设计与RAII风格的加锁方式

1. BlockQueue.hpp

#pragma once

#include <iostream>
#include <queue>
#include <pthread.h>

#include "LockGuard.hpp"
#define gDefaultCap 5
template <class T>
class BlockQueue
{
    bool isQueueEmpty()
    {
        return _bq.size() == 0;
    }
    bool isQueueFull()
    {
        return _bq.size() == _capacity;
    }

public:
    BlockQueue(int capacity = gDefaultCap) : _capacity(capacity)
    {
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_Empty, nullptr);
        pthread_cond_init(&_Full, nullptr);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mtx);
        pthread_cond_destroy(&_Empty);
        pthread_cond_destroy(&_Full);
    }
    void push(const T &in) // 生产者
    {
        // pthread_mutex_lock(&_mtx);
        LockGuard Lockguard(_mtx);
        while (isQueueFull())   //访问临界资源,100%确定,资源是就绪的!
            pthread_cond_wait(&_Full, &_mtx); // 检测临界资源条件
        // 数据满了; 等待消费者消费数据
        _bq.push(in); // 生产者放数据
        //if(_bq.size() >= _capacity/2) pthread_cond_signal(&_Empty); //数据积累到>=一半时再发送
        pthread_cond_signal(&_Empty); // 唤醒; 通知消费者,可以消费了
        //pthread_mutex_unlock(&_mtx);
    }// 出了函数后自动调用lockgrard 析构函数
    void pop(T *out) // 消费者
    {
        // pthread_mutex_lock(&_mtx);
        LockGuard Lockguard(_mtx);
        while (isQueueEmpty())
            pthread_cond_wait(&_Empty, &_mtx); // 检测临界资源条件
        // 数据为空; 等待生产者生产数据
        *out = _bq.front();
        _bq.pop();           // 消费者消费
        pthread_cond_signal(&_Full); // 唤醒; 通知生产者,可以生成了了
        // pthread_mutex_unlock(&_mtx);
    }

public:
    std::queue<T> _bq;     // 阻塞队列
    int _capacity;         // 容量上限
    pthread_mutex_t _mtx;  // 通过互斥锁保证队列安全
    pthread_cond_t _Empty; // 用它来表示bq 是否空的条件
    pthread_cond_t _Full;  //  用它来表示bq 是否满的条件
};

2. Task.hpp

#pragma once

#include <iostream>
#include <functional>

using func_t = std::function<int(int, int)>;

class Task
{
public:
    Task() {}                                                   // 便于获取任务
    Task(int x, int y, func_t func) : _x(x), _y(y), _func(func) // 制作任务
    {}

    int operator()() // 仿函数
    {
        return _func(_x, _y); // 函数调用
    }

public:
    int _x;
    int _y;
    func_t _func;
};

3. LockGuard.hpp(RAII风格的加锁方式)

#pragma once

#include <iostream>
#include <pthread.h>

class Mutex
{
public:
    Mutex(pthread_mutex_t& mtx):_mtx(mtx)
    {}
    void lock()
    {
        std::cout << "要进行加锁" << std::endl;
        pthread_mutex_lock(&_mtx);
    }
    void unlock()
    {
        std::cout << "要进行解锁" << std::endl;
        pthread_mutex_unlock(&_mtx);
    }
    ~Mutex()
    {}
public:
    pthread_mutex_t _mtx;
};

// RAII风格的加锁方式
class LockGuard
{
public:
    LockGuard(pthread_mutex_t& mtx):_mx(mtx)
    {
        _mx.lock();
    }
    ~LockGuard()
    {
        _mx.unlock();
    }

private:
    Mutex _mx;    
};

4. ConProd.cc

#include <iostream>
#include <pthread.h>
#include <ctime>
#include <unistd.h>

#include "BlockQueue.hpp"
#include "Task.hpp"

int myMul(int x, int y)
{
    return x * y;
}

void *productor(void *args)
{
    // BlockQueue<int> *bq = (BlockQueue<int> *)args;
    BlockQueue<Task> *bq = (BlockQueue<Task> *)args;
    while (true)
    {
        // int a = rand() % 10 + 1;
        // bq->push(a);
        // std::cout << "productor生成的数据是" << a << std::endl;

        // 制作任务
        int x = rand() % 10 + 1;
        usleep(rand() % 1000);
        int y = rand() % 5 + 1;
        std::cout << pthread_self() << ":发布任务..." << x << 'x' << y << "=" << '?' << std::endl; 
        Task t(x, y, myMul);
        bq->push(t);
        sleep(1);  
    }
    return nullptr;
}

void *consumer(void *args)
{
    // BlockQueue<int> *bq = (BlockQueue<int> *)args;
    BlockQueue<Task> *bq = (BlockQueue<Task> *)args;
    while (true)
    {
        // usleep(rand() % 1000);
        // int a;
        // bq->pop(&a);
        // std::cout << "consumer消费的数据是" << a << std::endl;
        // sleep(1);

        // 获取任务
        Task t;
        bq->pop(&t);

        // 完成任务
        std::cout << pthread_self() << ":执行任务..." << t._x << 'x' << t._y << "=" << t() << std::endl;
    }
    return nullptr;
}

#define CONSUMER_NUM 6
#define PRODUCTOR_NUM 3
int main()
{
    // 随机数种子
    srand((uint64_t)time(nullptr) ^ getpid() ^ 0x202300);
    // BlockQueue<int> *bq = new BlockQueue<int>();
    BlockQueue<Task> *bq = new BlockQueue<Task>();

    // 创建线程
    pthread_t c[CONSUMER_NUM], p[PRODUCTOR_NUM];
    for(int i=0; i<CONSUMER_NUM; ++i)
    {
        pthread_create(c+i, nullptr, consumer, bq);
    }
    for(int i=0; i<PRODUCTOR_NUM; ++i)
    {
        pthread_create(p+i, nullptr, productor, bq);
    }

    // 等待线程
    for(int i=0; i<CONSUMER_NUM; ++i)
    {
        pthread_join(c[i], nullptr);
    }
    for(int i=0; i<PRODUCTOR_NUM; ++i)
    {
        pthread_join(p[i], nullptr);
    }

    // 释放资源
    delete bq;

    return 0;
}

5. 结果展示:

在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

_Linux多线程--生产者消费者模型篇 的相关文章

  • 二维码生成以及扫一扫解析二维码原理

    二维码生成以及扫一扫解析二维码原理 1 生成URL xff0c 确定要通过二维码传达的信息 xff0c 也就是通过扫一扫可以获得地址和数据信息 1 得到随机数 xff0c 用随机数得到签名 xff0c 签名验证身份 String ranSt
  • idea 不能生成target

    1 改module https blog csdn net qq 15304369 article details 93715206 2 pom配置文件 修改为 xff1a lt packaging gt jar lt packaging
  • mariadb 遇到的坑

    mariadb13 3 25 配置文件失效 xff08 折腾了很久 xff09 xff0c 当时我需要配置主从 xff0c 发现binlog无法打开 xff0c 配置了bin log项还是不行 xff01 当my cnf 文件权限过大时 x
  • CV资料汇总

    1 图像风格迁移 Neural Style 简史 https www sohu com a 221597595 236505 2 一文让你理解什么是卷积神经网络 https www jianshu com p 1ea2949c0056
  • skinmagic 对话框菜单展示

    我偶用skinmagic xff0c 在换对话框皮肤时候 xff0c 发现菜单不见了 xff0c 几经折腾 xff0c 发现SetWindowSkin m hWnd 34 Dialog 34 在iniInstance xff08 xff09
  • 系统如何支持高并发

    给个例子 xff0c 你的系统部署的机器是4核8G xff0c 数据库服务器是16核32G 此时假设你的系统用户量总共就10万 xff0c 用户量很少 xff0c 日活用户按照不同系统的场景有区别 xff0c 我们取一个较为客观的比例 xf
  • Firewalld防火墙基础

    目录 一 Firewalld 概述 1 1 Firewalld的简述 1 2 Firewalld 和 iptables的区别 1 3 firewalld的区域 1 3 1 firewalld的9个区域 1 3 2 firewalld的数据处
  • CentOS7安装Oracle JDK

    CentOS7默认安装的是OpenJDK 如果安装Oracle JDK xff0c 需要按如下方式操作 xff1a 1 登录http www oracle com technetwork java javase downloads inde
  • 百度2014校招笔试题(一)

    算法和程序设计题 xff1a 1 题意 xff1a 一幢大楼的底层有1001根电线 xff0c 这些电线一直延伸到大楼楼顶 xff0c 你需要确定底层的1001个线头和楼顶的1001次线头的对应关系 你有一个电池 xff0c 一个灯泡 xf
  • Acwing 1175.最大联通子图(tarjan缩点求scc)

    Acwing 1175 最大连通子图 题意 一个有向图 G 61 V E G 61 V
  • 用github搭建个人(博客网站

    x1f308 博客主页 xff1a 卿云阁 x1f48c 欢迎关注 x1f389 点赞 x1f44d 收藏 留言 x1f4dd x1f31f 本文由卿云阁原创 xff01 x1f64f 作者水平很有限 xff0c 如果发现错误 xff0c
  • 多线程下HashMap的死循环

    多线程下HashMap的死循环 Java的HashMap是非线程安全的 多线程下应该用ConcurrentHashMap 多线程下 HashMap 的问题 xff08 这里主要说死循环问题 xff09 xff1a 1 多线程put操作后 x
  • 找出一个图中所有的强连通子图

    如果一个有向图中的没对顶点都可以从通过路径可达 xff0c 那么就称这个图是强连通的 一个 strongly connected component就是一个有向图中最大的强连通子图 下图中就有三个强连通子图 xff1a 应用kosaraju
  • win7启动分区不存在,使用分区工具修正

    DiskGenius 分区右键 激活当前分区
  • getElementById获取不到td标签

    一次测试中发现 然后使用getElementById获取不到此标签 xff0c 将td改成div即可 不知道是不是单独使用td标签的问题 code
  • 应用宝YSDK支付接入技术细节

    前言 应用宝是出了名的坑 xff0c 主要体现在 xff1a 文档杂乱繁多信息不全或描述模糊文档格式不规范技术支持很不及时 并且可以明显察觉到为了兼容QQ和微信 xff0c 应用宝的接入规范有诸多不合理的地方 来来回回折腾了一周 xff0c
  • 用Word2007批量设置图片位置

    转自 xff1a http www ccw com cn college htm2010 20100727 877695 shtml Word2007的 查找和替换 功能并不仅仅可以对文字进行批量的查找替换 xff0c 还有很多神奇的功能
  • java-生产者消费者问题以及解决办法

    文章目录 1 生产者消费者问题概述2 生产者消费者问题的解决办法2 1 解决思路2 2 实现方法2 3 代码实现2 3 1 wait 和nofity 方法2 3 2 await signal 方法2 3 3 BlockingQueue阻塞队
  • 【Remote Development】VSCode 基于 SSH 进行远程开发

    系统需求 我们在 VSCode 下载由微软官方推出的 Remote SSH 插件 查看一下里面的描述 xff0c 对于远程机器的要求如下 xff1a Local A supported OpenSSH compatible SSH clie
  • git idea创建新分支,获取/合并主支代码的2个方法

    其他sql格式也在更新中 xff0c 可直接查看这个系列 xff0c 要是没有你需要的格式 xff0c 可在评论或私信我 个人目录 获取主支代码的2个方法 1 xff0c 创建一个分支 xff0c 获取主支的所有代码 xff08 场景 xf

随机推荐

  • spring手把手超详细讲解(基本配置,基于xml)

    spring教程 1 1 容器概述1 1 1 配置元数据1 1 2 容器的实例化1 1 3 容器的使用 1 2 bean的概述1 2 1 命名bean1 2 2 实例化Bean 1 3 依赖1 3 1 依赖注入1 3 2 使用 属性1 3
  • 18.5 重载全局new、delete、定位new及重载等

    一 xff1a 重载全局operator new和operator delete操作符 span class token macro property span class token directive hash span span cl
  • java进程占用CPU过高常见的两种情况及分析定位

    java进程爆cpu的快速定位 1 背景 在程序开发的过程中 xff0c 难免遇到进程占用cpu过高 xff08 现网居多 开发环境 xff09 的情况 xff0c 现网出现这种情况就需要及时的能定位到问题 xff0c 快速解决 xff0c
  • 【Android ViewBinding】内存泄露

    场景 在MainActivity中分别加载两个Fragment处理业务 首先触发加载SecondFragment xff1a MainActivity触发 supportFragmentManager commit add R id con
  • Shell小脚本实现一键关机/重启虚拟机

    利用Shell脚本实现一键关机 重启虚拟机 xff0c 解决每次虚拟机关机或重启都需要手动一个个关机或重启的烦恼 xff01 1 脚本一 xff1a shut sh span class token comment bin bash spa
  • LAMP环境搭建

    前言 一 在虚拟机上安装Linux系统 二 安装Apache 1 下载好后 xff0c 看了看版本 xff0c 不是太老 xff0c 就没有继续安装 2 开启Apache服务 3 设置Apache开机启动服务 4 尝试一下是否启动了服务 x
  • 小程序跳坑之安卓真机不能访问服务器的问题

    因为一项目 xff0c 有几个页面都需要访问服务器 xff0c 从服务器上下载数据 xff0c 在苹果和开发者工具上都运行完美 xff0c 唯独一款安卓手机 xff0c 访问不了 xff0c 经测试 xff0c 发现是汉字编码问题 xff0
  • python Tkinter 界面button调用多进程函数,弹出多个相同界面

    这是我的界面button command的函数start simulate 这是我的多进程函数 xff1a 点击之后 xff0c 弹出多个相同界面 把调用多进程的函数在 if name 61 61 39 main 39 这里调用就不会出现多
  • python入门之if-else语句

    文章目录 一 if语句二 elif语句三 if嵌套语句四 else语句1五 else语句2六 if else语句举例1七 if else语句举例2 一 if语句 span class token keyword if span False
  • Ubuntu 16.04 远程桌面

    1 安装xrdp sudo apt get install xrdp 2 安装vnc4server 我这里是安装xrdp的时候自动安装的 我看网上很多说是需要单独安装的 3 安装xfce4 sudo apt get install xubu
  • GitLab端口冲突 解决办法

    访问gitlab xff0c 出现 xff1a 502 GitLab在使用的过程中 xff0c 会开启80端口 xff0c 如果80端口被其他的应用程序占用 xff0c 则GitLab的该项服务不能使用 xff0c 所以访问GitLab会失
  • Android开发 之 确认凭证

    确认凭证 主要目的 xff1a 设置不用验证时间 设置为30秒 xff0c 当超过30秒后则需要重新验证身份才能操作 您的应用可以根据用户在多久之前最后一次解锁设备来验证其身份 此功能让用户不必费心记忆应用特定密码 xff0c 您也无需实现
  • inner join、outer join、right join、left join 之间的区别

    inner join outer join right join left join 之间的区别 一 sql的left join right join inner join之间的区别 left join 左联接 返回包括左表中的所有记录和右
  • "大泥球"仍然是最常见的软件设计

    大泥球 xff0c 是指杂乱无章 错综复杂 邋遢不堪 随意拼贴的大堆代码 这些年来 xff0c 为了对付这个泥球 xff0c 我们看到了多种指导方法 xff0c 比如SOLID GRASP 和KISS xff0c 与其他诸多年代久远的 提倡
  • 3322.org带来的麻烦

    大概是3322 org被短时间攻破 xff0c 下载他的动态域名客户端的时候下到一个病毒Trojandropper js adagent gd xff0c 把江民关了 xff0c 并且再也开不开 系统还原不行 xff0c 安全模式也进不去
  • Qt学习笔记:多线程的使用

    文章目录 前言1 何时使用线程2 QThread类实现多线程2 1 多线程的实现方法2 2 线程休眠2 3 正确结束线程 3 线程同步3 1 互斥量3 2 信号量3 3 条件变量 4 线程池参考资料 前言 程序中调用耗时的操作 xff08
  • 上位机开发笔记:环形缓冲区

    文章目录 前言1 环形缓冲区工作机制1 1 实现原理1 2 区分缓冲区满或者空1 总是保持一个存储单元为空2 使用计数数据3 镜像指示位 2 Qt实现环形缓冲区2 1 QByteArray环形缓冲区2 2 QSemaphore实现环形缓冲区
  • IDEA搭建Spring框架环境

    IDEA搭建Spring框架环境 一 spring 框架概念 spring 是众多开源 java 项目中的一员 xff0c 基于分层的 javaEE 应用一站式轻量 级开源框架 xff0c 主要核心是 Ioc 控制反转 依赖注入 与 Aop
  • SQL SERVER中索引类型包括的三种类型分别是

    xfeff xfeff 唯一索引 UNIQUE 聚集索引 CLUSTERED xff09 非聚集索引 NONCLUSTERED xff09 主键与唯一索引的区别 主键是一种约束 xff0c 唯一索引是一种索引 xff0c 两者在本质上是不同
  • _Linux多线程--生产者消费者模型篇

    文章目录 1 为何要使用生产者消费者模型2 基于BlockingQueue的生产者消费者模型3 C 43 43 queue模拟阻塞队列的生产消费模型条件变量使用规范简单测试1 BlockQueue 缓存 超市 2 ConProd cc3 结