Linux下生产者消费者模型

2023-05-16

Linux下生产者消费者模型

  • 一、什么是生产者消费者模型
  • 二、代码实现
  • 三、运行结果与修改

一、什么是生产者消费者模型

生产者消费者模型就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦,提高效率的。

本文中使用条件变量和互斥锁保证该模型更加合理、安全。

队列我们直接使用STL中的queue来实现,但是STL并不保证临界资源(队列)的安全,所以本文中使用锁来保证线程对临界资源的访问。

条件变量使用场景:

当队列满了的时候,生产者就不应该再生产了(不要竞争锁了),应该让消费者来消费;
当队列空的时候,消费者就不应该再消费了(不要竞争锁了),应该让生产者来生产。

生产者、消费者、队列关系:

生产者与消费者我们使用两个线程模拟(单生产者单消费者),那么生产什么?消费什么呢?在此我们设置一个Task类,只需要把想要实现的功能放入Task类当中即可。大家可以自由发挥,本文生产者生产一个int类型0~2999的随机值,消费者判断是否为闰年。既然是一个Task任务,所以我们在向队列中Push和Pop的时候也是以Task为整体,所以我们的queue需要使用模板。

二、代码实现

Makefile

cpmain:CpMain.cpp
	g++ $^ -o $@ -std=c++11 -lpthread
.PHONY:clean
clean:
	rm -f cpmain

注意:不管是否使用Makefile,g++后面一定要添加-lpthread导入线程库,因为线程相关操作在第三方库中

BlockQueue.hpp主要实现队列的同步与互斥

#pragma once
#include <iostream>
#include <queue>

const int g_cap = 6;

template <class T>
class BlockQueue
{
public:
  BlockQueue(int cap = g_cap)
      : _cap(cap)
  {
    // 初始化锁
    pthread_mutex_init(&_mutex, nullptr);
    // 初始化条件变量
    pthread_cond_init(&_full, nullptr);
    pthread_cond_init(&_empty, nullptr);
  }

  void Push(const T &in)
  {
    // 向队列中生产资源
    LockQueue();
    while (IsFull())
    // 使用while判断而不是if  主要是为了避免挂起失败和伪唤醒的情况
    {
      // 如果_bq队列满了,生产者不应该继续生产而是等待

      ProducterWait();
      // pthread_cond_wait
      //  调用的时候,会首先自动释放_mutex,然后再挂起自己!
      // pthread_cond_signal
      //  返回的时候,会首先自动竞争锁,获取到锁之后,才能返回!
    }
    _bq.push(in);
    // 唤醒消费者
    WakeupConsumer();
    // 唤醒代码与解锁代码谁先谁后无所谓,因为唤醒必须要竞争到锁才行,生产者没释放锁之前,消费者不可能被唤醒
    UnLockQueue();
  }

  void Pop(T *out)
  {
    // 消费队列中的资源
    LockQueue();
    while (IsEmpty())
    {
      // 如果_bq队列空了,消费者不应该继续消费而是等待
      ConsumerWait();
    }
    *out = _bq.front();
    _bq.pop();
    // 唤醒生产者
    WakeupProducter();
    UnLockQueue();
  }

  ~BlockQueue()
  {
    // 释放锁
    pthread_mutex_destroy(&_mutex);
    // 释放条件变量
    pthread_cond_destroy(&_full);
    pthread_cond_destroy(&_empty);
  }

private:
  bool IsFull()
  {
    return _bq.size() == _cap;
  }

  bool IsEmpty()
  {
    return _bq.size() == 0;
  }

  void LockQueue()
  {
    pthread_mutex_lock(&_mutex);
  }

  void UnLockQueue()
  {
    pthread_mutex_unlock(&_mutex);
  }

  void ProducterWait()
  {
    // 调用的时候,会首先自动释放_mutex,然后再挂起自己!
    pthread_cond_wait(&_empty, &_mutex);
    // 生产者等待空这个条件变量
  }

  void ConsumerWait()
  {
    pthread_cond_wait(&_full, &_mutex);
    // 消费者等待满这个条件变量
  }

  void WakeupProducter()
  {
    // 返回的时候,会首先自动竞争锁,获取到锁之后,才能返回!
    pthread_cond_signal(&_empty);
  }

  void WakeupConsumer()
  {
    pthread_cond_signal(&_full);
  }

private:
  std::queue<T> _bq;      // 阻塞队列
  int _cap;               // 队列容量
  pthread_mutex_t _mutex; // 互斥锁
  pthread_cond_t _full;   // 条件变量->队列满限制生产者->消费者在该条件下等待
  pthread_cond_t _empty;  // 条件变量->队列空限制消费者->生产者在该条件下等待
};

pthread_cond_wait 调用的时候,会首先自动释放_mutex,然后再挂起自己!
pthread_cond_signal 返回的时候,会首先自动竞争锁,获取到锁之后,才能返回!

Task.hpp主要实现业务逻辑

#pragma once

#include <iostream>

const int g_year = 2000;

class Task
{
public:
  Task()
  {
  }

  Task(int year)
      : _year(year)
  {
  }

  int GetYear()
  {
    return _year;
  }

  // 检测一个年份是否为闰年 与当前对象没有关系
  static bool IsLeapYear(int year)
  {
    if ((0 == year % 4 && 0 != year % 100) ||
        (0 == year % 400))
    {
      return true;
    }

    return false;
  }

  // 检测当前对象是否为闰年
  bool handler() const
  {
    return IsLeapYear(_year);
  }

  ~Task()
  {
  }

private:
  int _year;
};

CpMain.cpp生产者生产一个任务后等待1秒,消费者会立即消费,但是此时队列为空,他只能等待生产者生产数据,不会一直竞争锁。

#include "BlockQueue.hpp"
#include "Task.hpp"

#include <iostream>
#include <time.h>
#include <unistd.h>

void *Consumer(void *args)
{
  BlockQueue<Task> *bq = (BlockQueue<Task> *)args;
  while (true)
  {
    Task t;
    bq->Pop(&t);
    bool isLeapYear = t.handler();
    int year = t.GetYear();
    if (isLeapYear)
    {
      std::cout << "消费者:" << year << " 年是闰年!" << std::endl;
    }
    else
    {
      std::cout << "消费者:" << year << " 年不是闰年!" << std::endl;
    }
  }
}

void *Producter(void *args)
{
  BlockQueue<Task> *bq = (BlockQueue<Task> *)args;
  while (true)
  {
    // 产生0~2099的随机数
    int year = rand() % 3000;
    Task t(year);
    std::cout << "生产者:" << year << " 年是闰年吗?" << std::endl;
    bq->Push(t);
    sleep(1);
  }
}

int main()
{
  srand((long long)time(nullptr)); // 生成随机数
  BlockQueue<Task> *bq = new BlockQueue<Task>();
  pthread_t c, p; // 创建两个线程充当生产者和消费者
  pthread_create(&c, nullptr, Consumer, (void *)bq);
  pthread_create(&p, nullptr, Producter, (void *)bq);

  pthread_join(c, nullptr); // 等待线程
  pthread_join(p, nullptr);
  return 0;
}

三、运行结果与修改

在这里插入图片描述
关于代码的可修改:

1、CpMain.cpp中生产者函数中每生产一个任务sleep1秒。当然也可以是消费者sleep1秒,场景就是生产者一次生产队列最大容量(BlockQueue中设置为6)也可能看到一次打印1~7条数据的情况,并不是代码写错了,而是生产一个任务很快,而处理任务或者打印一条数据是需要时间的,也就造成了一点误差。

2、文中使用单生产者单消费者,如果有必要也可以多添加几个线程实现多生产多消费。

3、关于Task.hpp也可以重载()使用仿函数:
Task.hpp中只需要把原来的bool handler()const函数换成下面的函数即可:

  // 仿函数
  bool operator()()
  {
    return IsLeapYear(_year);
  }

在CpMain.cpp中也不需要调用handler了,修改如下:
在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Linux下生产者消费者模型 的相关文章

  • Tomcat Intellij Idea:远程部署

    RackSpace 云服务器 Ubuntu 12 04 Intellij Idea 11 1 2 Windows 8 Tomcat 7 0 26 JDK 6 在 Intellij Idea 上 当我尝试在远程 Tomcat 7 服务器上运行
  • 为什么 Linux 原始套接字的 RX 环大小限制为 4GB?

    背景 我试图mmap 我的原始套接字的 RX 环形缓冲区64 bitLinux 应用程序 我的环由 4096 个块组成 每个块大小为 1MB 总共 4GB 请注意 每个 1MB 块中可以有许多帧 如果您好奇 请参阅此文档了解背景信息 htt
  • 在 C 中使用单个消息队列是否可以实现双向通信

    我希望服务器向客户端发送一些消息 并让客户端确认它 我被分配了这个任务 我可以在 C linux 中使用单个消息队列来完成它还是我需要创建两个 谢谢 是的 可以使用 sysV 消息队列来做到这一点 从您之前的问题来看 您正在使用该队列 您可
  • python获取上传/下载速度

    我想在我的计算机上监控上传和下载速度 一个名为 conky 的程序已经在 conky conf 中执行了以下操作 Connection quality alignr wireless link qual perc wlan0 downspe
  • 使用 \r 并打印一些文本后如何清除控制台中的一行?

    对于我当前的项目 有一些代码很慢并且我无法使其更快 为了获得一些关于已完成 必须完成多少的反馈 我创建了一个进度片段 您可以在下面看到 当你看到最后一行时 sys stdout write r100 80 n I use 80覆盖最终剩余的
  • linux-x64 二进制文件无法在 linuxmusl-x64 平台上使用错误

    我正在安装Sharp用于使用 package json 的 Nodejs 项目的 docker 映像上的映像压缩包 当我创建容器时 我收到有关 Sharp 包的以下错误 app node modules sharp lib libvips
  • Linux 上的 Pervasive ODBC 错误 [01000][unixODBC][驱动程序管理器]无法打开 lib '/usr/local/psql/lib/odbcci.so':找不到文件

    我正在尝试让 Pervasive v10 客户端 ODBC 在 Centos 6 上运行 据我所知 没有 64 位 ODBC 客户端 因此我必须使用 32 位客户端 我终于成功安装了它 但尝试使用时出现以下错误 isql v mydsn 0
  • GMail 421 4.7.0 稍后重试,关闭连接

    我试图找出为什么它无法使用 GMail 从我的服务器发送邮件 为此 我使用 SwiftMailer 但我可以将问题包含在以下独立代码中
  • 并行运行 shell 脚本

    我有一个 shell 脚本 打乱大型文本文件 600 万行和 6 列 根据第一列对文件进行排序 输出 1000 个文件 所以伪代码看起来像这样 file1 sh bin bash for i in seq 1 1000 do Generat
  • .net-core:ILDASM / ILASM 的等效项

    net core 是否有相当于 ILDASM ILASM 的功能 具体来说 我正在寻找在 Linux 上运行的东西 因此为什么是 net core ildasm 和 ilasm 工具都是使用此存储库中的 CoreCLR 构建的 https
  • 从 Xlib 转换为 xcb

    我目前正在将我的一个应用程序从 Xlib 移植到 libxcb 但在查找有关我有时使用的 XInput2 扩展的信息时遇到了一些麻烦 libxcb 中有 XInput2 实现吗 如果是的话 在哪里可以找到文档 目前我在使用此功能时遇到问题
  • 如何使用waf构建共享库?

    我想使用构建一个共享库waf http code google com p waf 因为它看起来比 GNU 自动工具更容易 更简洁 到目前为止 我实际上有几个与我开始编写的 wscript 有关的问题 VERSION 0 0 1 APPNA
  • 内核的panic()函数是否完全冻结所有其他进程?

    我想确认内核的panic 功能和其他类似kernel halt and machine halt 一旦触发 保证机器完全冻结 那么 所有的内核和用户进程都被冻结了吗 是panic 可以被调度程序中断吗 中断处理程序仍然可以执行吗 用例 如果
  • 与 pthread 的进程间互斥

    我想使用一个互斥体 它将用于同步对两个不同进程共享的内存中驻留的某些变量的访问 我怎样才能做到这一点 执行该操作的代码示例将非常感激 以下示例演示了 Pthread 进程间互斥体的创建 使用和销毁 将示例推广到多个进程作为读者的练习 inc
  • 绕过 dev/urandom|random 进行测试

    我想编写一个功能测试用例 用已知的随机数值来测试程序 我已经在单元测试期间用模拟对其进行了测试 但我也希望用于功能测试 当然不是全部 最简单的方法是什么 dev urandom仅覆盖一个进程 有没有办法做类似的事情chroot对于单个文件并
  • linux下如何从文本文件中获取值

    我有一些文本格式的文件 xxx conf 我在这个文件中有一些文本 disablelog 1 当我使用 grep r disablelog oscam conf 输出是 disablelog 1 但我只需要值1 请问你有什么想法吗 一种方法
  • 如何使用Android获取Linux内核的版本?

    如何在 Android 应用程序中获取 Linux 内核的版本 不是 100 确定 但我认为调用 uname r 需要 root 访问权限 无论如何 有一种不太肮脏的方法可以做到这一点 那就是 System getProperty os v
  • 这种文件锁定方法可以接受吗?

    我们有 10 个 Linux 机器 每周必须运行 100 个不同的任务 这些计算机主要在我们晚上在家时执行这些任务 我的一位同事正在开发一个项目 通过使用 Python 自动启动任务来优化运行时间 他的程序将读取任务列表 抓取一个打开的任务
  • 如何在 Mac OSX Mavericks 中正确运行字符串工具?

    如何在 Mac OSX Mavericks 中正确运行字符串工具 我尝试按照我在网上找到的示例来运行它 strings a UserParser class 但我收到此错误 错误 Applications Xcode app Content
  • 复制目录内容

    我想将目录 tmp1 的内容复制到另一个目录 tmp2 tmp1 可能包含文件和其他目录 我想使用C C 复制tmp1的内容 包括模式 如果 tmp1 包含目录树 我想递归复制它们 最简单的解决方案是什么 我找到了一个解决方案来打开目录并读

随机推荐

  • Android Studio编译无错,但在模拟器上无法运行App

    今天在学习郭霖老师的 第一行代码 第三版时碰到一个问题 xff0c 明明运行无错 xff0c 却无法在模拟器上正常运行 一开始提示说 Waiting for all target devices to come online xff0c 在
  • 突破CloudFlare五秒盾付费版

    使用的第三方库 cloudscraper 可以绕过免费版的五秒盾 但遇到付费版就无能为力了 最近在爬币圈的网站 xff0c 其中有一个网站叫做 xff1a Codebase 1 使用的就是付费版的 CloudFlare 五秒盾 当我们使用
  • CentOS-7 配置 SSH 远程登录

    CentOS 7 配置 SSH 远程登录 CentOS 配置 SSH 远程登录一 环境二 配置网络1 检查网络状态2 下载 net tools 工具3 检查主机与虚拟机是否正常连通 三 SSH 配置1 检查 CentOS 系统是否已经安装了
  • Linux中如何使用systemctl进行服务的管理?

    一 运行级别的分类 runlevel 运行级别0 xff1a 系统停机状态 运行级别1 xff1a 单用户工作状态 xff0c root权限 xff0c 用于系统维护 xff0c 禁止远程登陆 运行级别2 xff1a 多用户状态 没有NFS
  • Hibernate的配置

    Hibernate 的配置 1 创建工程 xff1b 2 导入 Jar 包 xff1b 3 写 Hibernate 配置文件 xff08 hibernate cfg xml xff09 a 数据库连接信息 b 映射文件包含配置 4 创建表和
  • vue3 axios

    Axios 是一个基于 promise 网络请求库 xff0c 作用于node js 和浏览器中 简介 Axios 是一个用于浏览器和Node的基于承诺的简单HTTP客户端 它提供了一个易于使用的库 xff0c 占地面积小 它还有一个可扩展
  • Linux系统四种安软方式

    Linux软件安装方式 本地安装 把需要的软件压缩包下载到linux主机 xff0c 在主机上直接解压启动即可完成本地安装 xff0c 即绿色版安装 把需要的rpm软件包下载到linux主机 xff0c 在主机上使用rpm命令完成本地安装
  • 关于vscode安装扩展插件提示:获取扩展失败,XHR error

    在我们安装vscode扩展插件时 xff0c 出现报错 xff1a error while fetching extensions XHR error 搜了很多网友的解决方案 xff0c 比如修改网络代理设置 xff0c 修改hosts文件
  • simulink模块,提供xpctarget下驱动源码

    simulink模块 xff0c 提供xpctarget下驱动源码 77999632700099250风中的蜗牛
  • SQL SERVER创建字段注释

    第一种方法是用SQL SERVER的管理工具 表设计中的列属性自带说明 xff0c 填写会自动生成注释 第二种方法 如果在navicat等工具上无法可视化创建注释的 xff0c 需要执行语句 EXEC sys sp addextendedp
  • Android平台上如何让应用程序获得系统权限以及如何使用platform密钥给apk签名

    您好 xff0c 欢迎关注我的专栏 xff0c 本篇文章是关于 Flutter 的系列文 xff0c 从简单的 Flutter 介绍开始 xff0c 一步步带你了解进入 Flutter 的世界 你最好有一定的移动开发经验 xff0c 如果没
  • Android 11 Settings源码入门,我就不信你还听不明白了

    前言 曾听过很多人说Android学习很简单 xff0c 做个App就上手了 xff0c 工作机会多 xff0c 毕业后也比较容易找工作 这种观点可能是很多Android开发者最开始入行的原因之一 在工作初期 xff0c 工作主要是按照业务
  • Linux破解密码

    1 重启虚拟机 xff0c 在引导界面按 e xff08 按鼠标左键 xff0c 用键盘控制上下 xff09 xff0c 进入类界面 xff0c 把中间的ro改为 rw rd break 2 按住Ctrl 43 x xff0c 进入紧急界面
  • “移除”虚拟机和“从磁盘中删除”虚拟机的区别

    1 二者的区别 xff1a 移除 虚拟机操作只是在虚拟机上删除了 xff0c 并没有在Windows系统中删除相关文件 xff0c 是部分删除 xff1b 而 从磁盘中删除 是既在虚拟机上删除了 xff0c 也删除了Windows系统中的相
  • Linux常用命令

    一条命令的结构 xff1a 用户名 64 主机名 工作目录 提示符 lt 命令 gt 选项 参数1 参数2 一 文件操作类命令 1 touch命令 xff1a 用于建立文件或更新文件的修改日期 1 语法格式 xff1a touch 参数 文
  • 内部类

    链接 xff1a https www nowcoder com questionTerminal 48524c47dd924887be6684b17175fa40 1 为什么使用内部类 使用内部类最吸引人的原因是 xff1a 每个内部类都能
  • CentOS 8本地离线YUM源的配置

    1 准备好CentOS 8相同版本号的系统镜像文件 2 添加光驱硬件 xff0c 在光驱中调用iso镜像文件 xff08 具体操作 xff1a 先打开设置里面的CD DVD xff0c 再点击使用ISO镜像文件 xff0c 选择浏览会跳转到
  • Linux操作系统:管理用户和组

    任务一 xff1a Linux用户类型和组群 1 Linux系统下的用户账户分为三种 超级用户 xff08 root xff09 xff1a 拥有系统的最高权限 xff0c 可以不受限制的对任何文件和命令进行操作 xff0c 对系统具有绝对
  • 在CentOS_8中添加新的硬盘

    添加新硬盘的具体步骤 xff1a 第一步 xff1a 第二步 xff1a 第三步 xff1a xff08 注意 xff1a 这里选择 SATA A xff0c 其优点是可随时使用 xff0c 无需重启 xff1b 而 SCSI S 需要重启
  • Linux下生产者消费者模型

    Linux下生产者消费者模型 一 什么是生产者消费者模型二 代码实现三 运行结果与修改 一 什么是生产者消费者模型 生产者消费者模型就是通过一个容器来解决生产者和消费者的强耦合问题 生产者和消费者彼此之间不直接通讯 xff0c 而通过阻塞队