生产者和消费者的三种实现方式(Java)

2023-05-16

什么是生产者消费者问题 ?

生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题是一个多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。

注:该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。

同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。

通常采用进程间通信的方法解决该问题。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。该问题也能被推广到多个生产者和消费者的情形。

问题分析

需要注意以下几点:

  1. 在缓冲区为空时,消费者不能再进行消费;
  2. 在缓冲区为满时,生产者不能再进行生产;
  3. 在一个线程进行生产或消费时,其余线程不能再进行生产或消费等操作,即保持线程间的同步;
  4. 注意生产和消费的互斥条件。

方式一:synchronized、wait和notify

定义 Data 资源类,类中定义资源仓库的大小,当前资源个数。资源类的incrment()和decrement()方法是synchronized 的。生产者/消费者线程共享一个资源Data。

public class NotifyAndWaitTest1 {

    // 模拟 一个资源的获取和释放

    // 步骤
    // 生产者:
    //      1 判断资源是否充裕;
    //          1 如果资源充裕,就没必要再生产了,等待消费者消费完资源为止
    //      2 如果资源不足,就必须立即生产资源
    //          1 资源生产完之后,必须通知消费者
    // 消费者:
    //      1 判断资源是否充裕;
    //          1 如果资源不足,就不能再消费了,等待生产者生产出资源为止
    //      2 如果资源充足
    //          1 直接消费,之后,再通知生产者
    //
    // 注意:
    // Object#notifyAll 方法可使所有正在等待队列中等待同一共享资源的“全部”线程从等待状态退出,进入可运行状态。
    // 此时,优先级最高的哪个线程最先执行,但也有可能是随机执行的,这要取决于JVM虚拟机的实现。
    // 即,最终也只有一个线程能被运行,上述线程优先级都相同,每次运行的线程都不确定是哪个,后来给线程设置优先级后也跟预期不一样,还是要看JVM的具体实现吧。
    //
    // 问题:
    // 1 唤醒线程的问题:
    //      1 有可能会出现极端情况,
    //          1 每次唤醒的都是生产者线程,消费者线程一直处于就绪状态,如果生产者不判断生产的必要性,那么,资源就会越积越多,超过仓库的容量。
    //          2 也可能,每次唤醒的都是消费者线程,生产者生产完第一个资源,就一直处于就绪状态,如果消费者不判断是否可以消费,那么,就会出现 资源负数
    //
    // 解决:
    // 每次被唤醒后,都判断是否应该被唤醒,否则,就再次进入阻塞状态
    // 1 方案一:(情况发生后,补救)
    //      1 生产者:每次被唤醒后,都判断(生产的必要性) 再生产资源是否会超过仓库的容量
    //      2 消费者:每次被唤醒后,都判断(是否可以消费)消费之后是否会出现 资源负数,即,我要的资源,是否都充足
    //
    // 2 方案二:(既然是Object#notifyAll 引起的问题,就不让这种情况发生)
    //      1 设置两把锁,消费者锁和生产者锁
    //      2 所有生产者共享生产者锁,所有消费者共享消费者锁

    public static void main(String[] args) throws Exception {

        Data data = new Data();

        // A 线程,生产资源 10 个
        new Thread(()->{
            for (int i = 0; i < 555; i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"A").start();

        // B 线程,消费资源 10 个
        new Thread(()->{
            for (int i = 0; i < 555; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"B").start();


        // C 线程,生产资源 10 个
        new Thread(()->{
            for (int i = 0; i < 666; i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"C").start();

        // D 线程,消费资源 10 个
        new Thread(()->{
            for (int i = 0; i < 666; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"D").start();

    }

    static class Data{
        // 当前资源个数
        private int data = 0;
        // 资源仓库的最大容量为 3
        private final int MAX_SIZE = 3;

        // +1
        public synchronized void increment() throws InterruptedException {
            // 资源充裕,等待消费者消费
            // if (data >= 1){
            //     // 无限阻塞,直到被唤醒
            //     this.wait();
            // }
            while (data + 1 > MAX_SIZE){
                // 没必要生产,无限阻塞,直到被唤醒
                this.wait();
            }

            // 生产
            data++;
            System.out.println("[" + Thread.currentThread().getName() + "], data=" + data);
            // 随机唤醒一个线程
            // 这里其实应该唤醒一个消费者
            // 但是,由于唤醒是随机的,所以,可能唤醒生产者
            // 所以,在唤醒之后,生产者要判断是否有必要生产
            // if 应该换成 while
            this.notifyAll();
        }

        // -1
        public synchronized void decrement() throws InterruptedException {
            // 资源不足,等待生产者生产
            // if (data <= 0){
            //     this.wait();
            // }
            while (data - 1 < 0){
                // 资源不足,无限等待,直到被唤醒
                this.wait();
            }

            // 消费
            data--;
            System.out.println("[" + Thread.currentThread().getName() + "], data=" + data);
            // 随机唤醒一个线程
            // 这里其实应该唤醒一个生产者
            // 但是,由于唤醒是随机的,所以,可能唤醒消费者
            // 所以,在唤醒之后,消费者要判断是否可以消费
            // if 应该换成 while
            this.notifyAll();
        }
    }
}

方式二:lock和condition的await、signalAll

public class NotifyAndWaitTest2 {

    public static void main(String[] args) {

        Data data = new Data();

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.increment();
            }
        }, "A").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.decrement();
            }
        }, "B").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.increment();
            }
        }, "C").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.decrement();
            }
        }, "D").start();
    }

    static class Data{
        // 公平锁,类似于队列,默认 =false 是非公平锁
        private final ReentrantLock lock = new ReentrantLock(true);
        // 资源监视器
        private Condition condition = lock.newCondition();
        // 当前资源数
        private int count = 0;
        // 仓库资源的最大数量
        private final int MAX_COUNT = 3;

        // ++; 生产资源
        public void increment(){
            lock.lock();
            try {
                while (count + 1 > MAX_COUNT){
                    // 无限等待,直到 被唤醒
                    condition.await();
                }
                count++;
                System.out.println("[" + Thread.currentThread().getName() +"] +1 " + count);
                // 唤醒所有
                condition.signalAll();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }

        // --;消费资源
        public void decrement(){
            lock.lock();
            try {
                while (count - 1 < 0){
                    // 无限等待,直到 被唤醒
                    condition.await();
                }
                count--;
                System.out.println("[" + Thread.currentThread().getName() +"] -1 " + count);
                // 唤醒所有
                condition.signalAll();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }

    }
}

方式三:BlockingQueue

定义Data资源类,资源类持有一个BlockingQueue。生产者/消费者线程共享一个资源类Data的成员变量,调用Queue的put()和take() 实现生产和消费。

  • BlockingQueue#put: 添加一个元素,如果队列满了,则阻塞
  • BlockingQueue#take: 移除并返回队列头部的元素,如果队列为空则阻塞
public class BlockingQueueConsumerProducer {

    public static void main(String[] args) {
        // 资源
        Data data = new Data();
        // 生产者
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.increment();
            }
        },"A").start();
        // 消费者
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.decrement();
            }
        },"B").start();

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.increment();
            }
        },"C").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.decrement();
            }
        },"D").start();
    }

    static class Data{

        // 阻塞队列
        private final BlockingQueue<Integer> resourceQueue = new LinkedBlockingQueue<>(3);

        public void increment(){
            try {
                // 1 代表资源
                // 添加一个元素,如果队列满了,则阻塞
                resourceQueue.put(1);
                System.out.println(Thread.currentThread().getName() + " "
                        + resourceQueue.size() +
                        " +");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public void decrement(){
            try {
                // 移除并返回队列头部的元素,如果队列为空则阻塞
                resourceQueue.take();
                System.out.println(Thread.currentThread().getName() + " " +
                        + resourceQueue.size() + " -");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

生产者和消费者的三种实现方式(Java) 的相关文章

  • OS2.3.7:多生产者,多消费者问题

    文章目录 0 问题描述1 问题分析2 实现3 总结 0 问题描述 桌子上有一只盘子 xff0c 每次只能向其中放入一个水果 爸爸专向盘子中放苹果 xff0c 妈妈专向盘子中放橘子 xff0c 儿子专等着吃盘子中的橘子 xff0c 女儿专等着
  • java 方法名类名命名规范

    一 命名规范 1 项目名全部小写 2 包名全部小写 3 类名首字母大写 xff0c 如果类名由多个单词组成 xff0c 每个单词的首字母都要大写 大驼峰 xff0c 如 xff1a public class MyFirstClass 4 变
  • Qt arm环境安装

    一 相关工作准备 Qt opensource 和 Qt everywhere 下载 链接 版本为5 9 8 arm linux gcc下载 链接 版本为4 8 3 tslib 下载 链接 版本为1 21 ps 可以不安装Qt opensou
  • STM32驱动ST7789V2 tft屏幕

    一 简介 本次教程使用的是1 54寸240 240像素的tft屏幕 xff0c 其接口协议为SPI协议 在使用的过程中仅需要四根数据即可驱动点亮屏幕 然后硬件使用的是STM32F103C8T6核心板 xff0c 用的是SPI2 一般购买屏幕
  • linux设置复杂度策略、登录超时处理功能

    1 在字符终端下 xff0c 实现某一用户连续错误登陆N次后 xff0c 就锁定该用户X分钟 pam tally2 执行 vi etc pam d login 在 PAM 1 0 下新起一行 xff0c 加入 auth required p
  • 飞控陀螺仪,磁力计,加速计,四元数姿态结算

    MPU6050主要包含陀螺仪和加速度计 陀螺仪主要测量角速度 xff0c 即可以测出某一时间段物体转过的角度 加速度计测量的是物体的加速度 xff0c 重力加速度即物体受重力作用的情况下具有的加速度 xff0c 物体静止时 xff0c 加速
  • 智慧物业管理系统(Springboot)

    开发工具 xff1a IDEA xff0c jdk1 8 数据库 xff1a mysql5 7 前台框架 xff1a layui 后端技术 xff1a springboot 项目描述 xff1a 1 前台住户登录 2 智慧物业管理后台 2
  • 北京大学2020公开课 AVL-Python实现代码

    class TreeNode def init self key val left 61 None right 61 None parent 61 None self key 61 key self payload 61 val self
  • Docker-2020详细教程<配合千锋Java学习营>

    Docker 2020详细教程 lt 配合千锋Java学习营 gt 2020 Docker最新超详细版教程通俗易懂 一 Docker介绍 1 下载Dcoker依的赖环境 想安装Docker xff0c 需要先将依赖的环境全部下载下来 xff
  • 使用阿里云部署Flask网页

    使用阿里云部署Flask网页 前端网页部署 阿里云apache CentOS 配置好Apache后 xff0c 将一整个html css js文件全部copy进 var www html目录下 之后就可以通过访问IP地址访问到你的index
  • MapReduce的个人理解

    MapReduce的个人理解 文章目录 MapReduce模型简介Map和Reduce函数这里给出一个简单实例 MapReduce的工作流程工作流程概述MapReduce的各个执行阶段 Shuffle过程详解Shuffle过程简介Map端的
  • Hadoop配置

    Hadoop配置 文章目录 Linux shell配置环境变量使环境变量生效Hadoop 集群安装配置到两台阿里云linux主机上Hadoop集群模式安装实验环境实验内容1 安装jdk2 下面来修改环境变量3 安装hadoop4 下面来修改
  • HDFS 的使用和管理

    HDFS 的使用和管理 文章目录 HDFS 的使用和管理实验环境实验内容实验步骤1 启动hadoop的hdfs相关进程2 用jps查看HDFS是否启动3 验证HDFS运行状态4 ls 命令5 put 命令6 moveFromLocal 命令
  • HDFS API操作

    HDFS API操作 实验环境 Linux Ubuntu 16 04 前提条件 xff1a 1 xff09 Java 运行环境部署完成 2 xff09 Hadoop 的单点部署完成 上述前提条件 xff0c 我们已经为你准备就绪了 实验内容
  • HBase的安装部署和使用

    HBase的安装部署和使用 文章目录 HBase的安装部署和使用实验环境实验内容实验步骤1 点击 34 命令行终端 34 xff0c 打开新的命令行窗口2 解压安装包3 更改文件夹名和所属用户4 设置HBASE HOME环境变量5 修改hb
  • 熟悉常用的HBase操作

    熟悉常用的HBase操作 文章目录 实验环境实验内容1 编程实现以下指定功能 xff0c 并用Hadoop提供的HBase Shell命令完成相同的任务 xff08 1 xff09 列出HBase所有的表的相关信息 xff0c 如表名 创建
  • Hive的安装部署和管理

    Hive的安装部署和管理 文章目录 实验环境实验内容实验步骤1 点击 34 命令行终端 34 xff0c 打开新窗口2 解压安装包3 更改文件夹名和所属用户4 设置HIVE HOME环境变量5 导入MySql jdbc jar包到hive
  • Hive数仓:使用桶表

    Hive数仓 xff1a 使用桶表 文章目录 Hive数仓 xff1a 使用桶表实验环境实验步骤1 点击 34 命令行终端 34 xff0c 打开新窗口2 启动MySQL3 指定元数据数据库类型并初始化Schema4 启动Hadoop5 启
  • python 获取当前文件路径

    一 Python 获取当前文件路径方法 sys path 0 获取文件当前工作目录路径 绝对路径 sys argv 0 获得模块所在的路径 由系统决定是否是全名 若显示调用python指令 xff0c 如python demo py xff
  • PySpark中的RDD基本操作

    PySpark中的RDD基本操作 课程性质 xff1a PySpark数据处理 文章目录 1 实验目标2 本次实验主要使用的 P y t h

随机推荐

  • PySpark中的RDD创建

    PySpark中的RDD创建 课程性质 xff1a PySpark数据处理 文章目录 1 实验目标2 本次实验主要使用的 P y t h
  • el-table-column的formatter的使用

    当后端返回来的数据格式需要再去处理 xff1b 可以使用formatter属性 lt el table column label 61 34 性别 34 align 61 34 center 34 formatter 61 34 genda
  • 提示“无法修正错误,因为您要求某些软件包保持现状,就是它们破坏了软件包间的依赖关系“的解决方案

    使用sudo apt get install lt packgename gt 时出现提示无法修正错误 xff0c 因为您要求某些软件包保持现状 xff0c 就是它们破坏了软件包间的依赖关系 可以换个命令 sudo aptitude ins
  • aosp下载、编译、刷机和单编framework(android 12)

    我的设备 xff1a 咸鱼上买的pixel 3a 一 aosp下载 1 安装repo mkdir bin PATH 61 bin PATH curl sSL 39 https gerrit googlesource proxy ustclu
  • LAMP架构之mysql的安装部署

    mysql的安装部署 一 mysql编译安装1 编译过程 二 LAMP架构的部署 一 mysql编译安装 官网地址如下 xff0c 进入选择版本 xff1a https downloads mysql com archives commun
  • hexo博客绑定自己的域名

    hexo博客绑定自己的域名 学习网址1 学习网址2 学习网址3 一 购买域名 登录阿里云账号 控制台 搜索框输入域名 域名注册 输入需要注册的域名 xff08 查看是否被占用 xff09 加入购物车 xff08 显示不能备案的不可买 xff
  • SimpleDateFormat类 格式化日期

    功能 xff1a 格式化和解析日期 将Date类型的日期格式化成我们需要的日期类型一般是 字符串类型将字符串类的日期再转回来 用到两个方法 format Date date xff1a 将date型转换成特定格式的字符串 parse Str
  • 队列(Java实现)

    1 1应用场景 银行排队 xff1a 1 2基本介绍 特点 队列是一个有序列表 xff0c 可以用数组或是链表来实现 遵循先入先出的原则 即 xff1a 先存入队列的数据 xff0c 要先取出 后存入的要后取出 示意图 解释 MaxSize
  • IO字节流读取文本中文乱码

    1 1问题说明 我们都知道字符流适用于读取文本 xff0c 而字节流能读取文本 照片 视频等 xff0c 但是用字节流读取文本到我们程序的控制台中会出现中文乱码的情况 xff0c 如下图 我的文本中的数据是 生活很简单 xff0c 过了今天
  • glibc所安装的工具程序

    catchsegv 当程序发生segmentation fault的时候 用来建立一个 堆栈跟踪 gencat 建立消息列表 getconf 针对文件系统的指定变量显示其系统设置值 getent 从 系统管理数据库获取一个条目 glibcb
  • 单链表(java实现)

    1 1 链表 Linked List 介绍 链表是有序的列表 xff0c 但是它在内存中是存储如下 链表是以节点的方式来存储 是链式存储每个节点包含 data 域 xff0c next 域 xff1a 指向下一个节点 如图 xff1a 发现
  • prepareStatement的使用

    1 1prepareStatement解决sql注入的问题 span class token comment 演示sql注入的安全问题 span span class token keyword public span static voi
  • 动态sql

    1 什么是动态sql sql的内容是变化的 可以根据条件获取到不同的sql语句 主要是where部分发生变化 动态sql的实现 使用的是mybatis提供的标签 2 为什么使用动态sql 使用动态sql可以解决某些功能的使用 例如使用条件查
  • 分页插件--PageHelper

    mybatis的分页查询可以通过PageHelper插件实现 在数据库中我们使用分页查询的sql语句为 xff1a select from 表名 where 条件 limit page 1 pageSize pageSize page 当前
  • springboot框架

    1 什么是springboot框架 Spring是一个开源框架 xff0c Spring是于2003 年兴起的一个轻量级的Java 开发框架 xff0c 由Rod Johnson 在其著作 Expert One On One J2EE De
  • Elasticsearch入门及整合springboot

    1 Elasticsearch概述 1 1 搜索是什么 概念 xff1a 用户输入想要的关键词 xff0c 返回含有该关键词的所有信息 场景 xff1a 1 互联网搜索 xff1a 谷歌 百度 各种新闻首页 2 站内搜索 xff08 垂直搜
  • springboot+mybatis-plus+vue完成微信支付(前后端分离)

    微信支付的学习链接 https pay weixin qq com wiki doc api native php chapter 61 9 1 一 数据库准备 t order表 主要完成订单查询 span class token comm
  • springcloud学习笔记

    第一章 微服务的介绍 1 1系统架构演变 随着互联网的发展 xff0c 网站应用的规模也在不断的扩大 xff0c 进而导致系统架构也在不断的进行变化 从互联网早起到现在 xff0c 系统架构大体经历了下面几个过程 单体应用架构 gt 垂直应
  • windows2016 AD域修改密码策略

    1 服务器管理器 gt 工具 gt 组策略管理 2 域 gt 域名 gt 组策略对象 gt Default Domain Policy 域 gt 域名 gt Default Domain Policy同样可以 gt 右键 gt 编辑 3 计
  • 生产者和消费者的三种实现方式(Java)

    什么是生产者消费者问题 生产者消费者问题 xff08 英语 xff1a Producer consumer problem xff09 xff0c 也称有限缓冲问题是一个多线程同步问题的经典案例 该问题描述了共享固定大小缓冲区的两个线程 即