java-生产者消费者问题以及解决办法

2023-05-16

文章目录

    • 1.生产者消费者问题概述
    • 2.生产者消费者问题的解决办法
      • 2.1 解决思路
      • 2.2 实现方法
      • 2.3 代码实现
        • 2.3.1 wait()和nofity()方法
        • 2.3.2 await()/signal()方法
        • 2.3.3 BlockingQueue阻塞队列方法
        • 2.3.4 Semaphore信号量
        • 2.3.5 管道
          • 2.4.5.1 PipedInoutStream/PipedOutputStream(操作字节流)
          • 2.4.5.2 PipedReader/PipedWriter(操作字符流)

1.生产者消费者问题概述

生产者消费者问题是多线程同步的一个经典问题。生产者消费者同时使用一块缓冲区,生产者生产商品放入缓冲区,消费者从缓冲区取出商品。我们需要保证的是,当缓冲区满时,生产者不可生产商品;当缓冲区为空时,消费者不可取出商品。

这是一个线程同步问题,生产者和消费者共享同一个资源,并且生产者和消费者之间相互依赖,互为条件。

  • 对于生产者,没有生产产品之前,要通知消费者等待,而生产了产品之后,又需要马上通知消费者消费
  • 对于消费者,在消费之后,要通知生产者已经结束消费,需要生产新的产品以供消费。
  • 在生产者消费者问题中,仅有synchronized是不够的
    • synchronized可阻止并发更新同一个共享资源,实现了同步
    • synchronized不能用来实现不同线程之间的消息传递(通信)

Java提供了几个方法解决线程之间的通信问题

方法名作用
wait()表示线程一直等待,直到其他线程通知,与sleep不同,会释放锁
wait(long timeout)指定等待的毫秒数
notify()唤醒一个处于等待状态的线程
notifyAll()唤醒同一个对象上所有调用wait()方法的线程,优先级别高的线程优先调度

注意:均是Object类的方法,都只能在同步方法或者同步代码块中使用,否则会抛出异常IIIegalMonitorStateException

2.生产者消费者问题的解决办法

2.1 解决思路

  • 采用某种机制保护生产者消费者之间的同步,有较高的效率,并且易于实现,代码的可控制性较好,属于常用的方式
  • 在生产者和消费者之间建立一个管道,管道缓冲区不易控制,被传输数据对象不易于封装等,实用性不强。

解决核心在于保证同一资源被多个线程并发访问时的完整性,常用的同步方法时采用信号或者加锁机制,保证资源在任意时刻至多被一个线程访问

2.2 实现方法

  • wait()和nofity()方法
  • await()和signal()方法
  • BlockingQueue阻塞队列方法
  • 管道法
  • 信号量

2.3 代码实现

2.3.1 wait()和nofity()方法

当缓冲区已满时,生产者线程停止执行,放弃锁,使自己处于等状态,让其他线程执行;
当缓冲区已空时,消费者线程停止执行,放弃锁,使自己处于等状态,让其他线程执行。

当生产者向缓冲区放入一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态;
当消费者从缓冲区取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。
示例:

//测试生产者消费者模型--利用缓冲区解决
public class TestProductAndConsume1 {
    public static void main(String[] args) {
        SynContainer synContainer = new SynContainer();

        new Product(synContainer).start();
        new Consumer(synContainer).start();
    }
}

//生产者
class Product extends Thread {
    SynContainer container;

    public Product(SynContainer container){
        this.container = container;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            container.push(new Chicken(i));
            System.out.println("生产了第" + i + "只鸡");
        }
    }
}

//消费者
class Consumer extends Thread {
    SynContainer container;

    public Consumer(SynContainer container){
        this.container = container;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            System.out.println("消费了" + container.pop().id + "只鸡");
        }
    }
}

//产品
class Chicken {
    int id;

    public Chicken(int id) {
        this.id = id;
    }
}

//缓冲区
class SynContainer {
    //需要一个容器大小
    Chicken[] chickens = new Chicken[10];
    //容器计数器
    int count = 0;

    //生产者放入产品
    public synchronized void push(Chicken chicken){
        //容器满了,等待消费者消费
        if (count == chickens.length){
            //通知消费者消费,生产者等待
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //如果容器没有满,放入产品
        chickens[count] = chicken;
        count++;

        //通知消费者消费
        this.notifyAll();
    }

    //消费者消费产品
    public synchronized Chicken pop(){
        //判断能否消费
        if (count == 0){
            //等待生产者生产,消费者消费
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        //如果可以消费
        count--;
        Chicken chicken = chickens[count];

        //吃完了,通知生产者生产
        this.notifyAll();
        return chicken;
    }
}

notify()方法可使所有正在等待队列中等待同一共享资源的"全部"线程从等待状态退出,进入可运行状态。此时,优先级最高的哪个线程最先执行,但也有可能是随机执行的,这要取决于JVM虚拟机的实现。即最终也只有一个线程能被运行,上述线程优先级都相同,每次运行的线程都不确定是哪个,后来给线程设置优先级后也跟预期不一样,主要看JVM具体实现。

2.3.2 await()/signal()方法

在JDK5中,用ReenteantLock和Condition可以实现等待/通知模型,具有更大的灵活性。通过在Lock对象上调用newCondition()方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。

示例:(在这里只需要改动缓冲区即仓库类)

//缓冲区
class SynContainer {
    //需要一个容器大小
    Chicken[] chickens = new Chicken[10];
    //容器计数器
    int count = 0;

    // 锁
    private final Lock lock = new ReentrantLock();
    // 仓库满的条件变量
    private final Condition full = lock.newCondition();
    // 仓库空的条件变量
    private final Condition empty = lock.newCondition();

    //生产者放入产品
    public void push(Chicken chicken){
        //获得锁
        lock.lock();
        try {
            //容器满了,等待消费者消费
            if (count == chickens.length) {
                //通知消费者消费,生产者等待
                try {
                    full.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //如果容器没有满,放入产品
            chickens[count] = chicken;
            count++;

            //通知消费者消费
            empty.signalAll();
        } finally {
            lock.unlock();//释放锁
        }
    }

    //消费者消费产品
    public Chicken pop(){
        //获得锁
        lock.lock();
        Chicken chicken = null;
        try {
            //判断能否消费
            if (count == 0) {
                //等待生产者生产,消费者消费
                try {
                    empty.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            //如果可以消费
            count--;
            chicken = chickens[count];

            //吃完了,通知生产者生产
            full.signalAll();
        } finally {
            lock.unlock();
        }
        return chicken;
    }
}

2.3.3 BlockingQueue阻塞队列方法

BlockingQueue是JDk5新增内容,它是一个已经在内部实现了同步的队列,实现方式采用的是我们第2种await()/signal()方法。它可以在生成对象时指定容量大小,用于阻塞操作的是put()和take()方法。

put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。

take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞。

示例:(在这里只需要改动缓冲区即仓库类)

//缓冲区
class SynContainer {
    // 仓库存储的载体
    private LinkedBlockingQueue<Object> list = new LinkedBlockingQueue<>(10);

    //生产者放入产品
    public void push(Chicken chicken) {
        try {
            list.put(chicken);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    //消费者消费产品
    public Chicken pop() {
        Chicken chicken = null;
        try {
            chicken = (Chicken) list.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return chicken;
    }
}

可能会出现put()或take()和System.out.println()输出不匹配的情况,是由于它们之间没有同步造成的。BlockingQueue可以放心使用,这可不是它的问题,只是在它和别的对象之间的同步有问题。

2.3.4 Semaphore信号量

Semaphonre是一种基于计数的信号量。它可以设定一个阈值,基于此,多个线程竞争获取许可对象,做完自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。Semaphore可以用来构建一些对象池,比如数据库 连接池,我们也可以创建计数为1的Semaphore,将其作为一种类似互斥锁的机制,这也叫二元信号量,表示两种互斥状态。计数为0的Semphore是可以release的,然后就可以acquire(即一开始使线程阻塞从而完成其他执行)。

示例:(在这里只需要改动缓冲区即仓库类)

//缓冲区
class SynContainer {
    // 仓库存储的载体
    private LinkedList<Object> list = new LinkedList<Object>();

    // 仓库的最大容量
    final Semaphore notFull = new Semaphore(10);
    // 将线程挂起,等待其他来触发
    final Semaphore notEmpty = new Semaphore(0);
    // 互斥锁
    final Semaphore mutex = new Semaphore(1);

    //生产者放入产品
    public void push(Chicken chicken) {
        try {
            notFull.acquire();
            mutex.acquire();
            list.add(chicken);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            mutex.release();
            notEmpty.release();
        }
    }

    //消费者消费产品
    public Chicken pop() {
        Chicken chicken = null;
        try {
            notEmpty.acquire();
            mutex.acquire();
            chicken = (Chicken) list.remove();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            mutex.release();
            notFull.release();
        }
        return chicken;
    }
}

2.3.5 管道

管道是一种特殊的流,用于不同线程间直接传送数据,一个线程发送数据到输出管道,另一个线程从输入管道读数据。

inputStream.connect(outputStream)或outputStream.connect(inputStream)作用是使两个Stream之间产生通信链接,这样才可以将数据进行输入和输出。

这种方式只适用于两个线程之间通信,不适合多个线程之间通信。

2.4.5.1 PipedInoutStream/PipedOutputStream(操作字节流)

示例:

//测试生产者消费者问题--管道-PipedInoutStream/PipedOutputStream(操作字节流)
public class TestProductAndConsume5 {
    public static void main(String[] args) {
        Producer producer = new Producer();
        Consumer consumer = new Consumer();
        Thread thread1 = new Thread(producer);
        Thread thread2 = new Thread(consumer);
        try {
            producer.getPipedOutputStream().connect(consumer.getPipedInputStream());
            thread2.start();
            thread1.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

class Producer implements Runnable {

    private PipedOutputStream pipedOutputStream;

    public Producer() {
        pipedOutputStream = new PipedOutputStream();
    }

    public PipedOutputStream getPipedOutputStream() {
        return pipedOutputStream;
    }

    @SneakyThrows
    @Override
    public void run() {
        try {
            for (int i = 0; i < 5; i++) {
                pipedOutputStream.write(("This is a test , ID = " + i + "!\n").getBytes());
            }
            pipedOutputStream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

class Consumer implements Runnable {

    private PipedInputStream pipedInputStream;

    public Consumer() {
        pipedInputStream = new PipedInputStream();
    }

    public PipedInputStream getPipedInputStream() {
        return pipedInputStream;
    }

    @SneakyThrows
    @Override
    public void run() {
        int length = -1;
        byte[] buffer = new byte[1024];
        try {
            while ((length = pipedInputStream.read(buffer)) != -1) {
                System.out.println(new String(buffer, 0, length));
            }
            pipedInputStream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
2.4.5.2 PipedReader/PipedWriter(操作字符流)

示例与操作字节流基本一致,只是替换相应流,这里不再贴代码

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

java-生产者消费者问题以及解决办法 的相关文章

  • Java lambda表达式使用笔记

    package com allsaints music admin import com allsaints music admin service entrymgr bak Student import lombok Data impor
  • .NET Framework 与 .NET Core 的区别与联系

    当今 net 生态系统如下 xff1a 从上面图中我们可以看到 net 主要分为三个部分 net FrameWork net Core Xamarin XAMARIN 主要用来构建APP的 xff08 包括IOS xff0c Android
  • .net 代码命名规范

    CAST 源代码命名规范手册 v1 1 Pascal 命名 xff1a 每一个单词首字母必须大写 Camel 命名 xff1a 第一个单词首字母小写 xff0c 其余单词首字母必须大写 任何命名必须优先使用英文单词表达意思 xff0c 若不
  • SpringBoot项目配置文件编写方式参考

    背景 为防止出现各环境配置文件不同步的情况 xff0c 现根据实际开发情况 xff0c 制定该配置文件编写参考 介绍 SpringBoot使用一个全局的配置文件 xff0c 配置文件名是固定的 xff1b application prope
  • xshell 连接 Linux kvm图形界面报错问题

    如下的报错 zyq 64 zyq sudo virt manager zyq 64 zyq virt manager 18561 Gtk WARNING cannot open display localhost 10 0 以上前提是需先安
  • 一文让你看懂Golang如何打造实时聊天系统

    项目截图 简介 在本次课程中 xff0c 我们来学习使用WebSocket来打造一个实时聊天系统 我们会从一下几个方面来进行学习 xff1a 什么是websocket xff1b Websocket与传统的HTTP协议有什么区别 xff1b
  • 【IBM MQ】使用IBM MQ远程连接时报错AMQ 4043解决思路

    我使用IBM MQ客户端远程连接队列管理器时 xff0c 报错 AMQ 4043 xff0c 百度基本找不到解决办法 xff0c 唯一一个解决方法是这个 xff08 https stackoom com question 1sroR xff
  • Arch Linux安装桌面及常用软件

    Arch Linux安装桌面及常用软件 安装桌面环境 显卡驱动 span class token comment 查看显卡情况 span lspci k span class token operator span span class t
  • Python 报错处理 paramiko.ssh_exception.SSHException: Error reading SSH protocol banner

    使用多进程启动多个ssh报错 xff1a Exception client Error reading SSH protocol banner Errno 104 Connection reset by peer During handli
  • 信息加密(简单的字母转换)

    题目 在传递信息的过程中 xff0c 为了加密 xff0c 有时需要按一定规则将文本转换成密文发送出去 有一种加密规则是这样的 xff1a 1 对于字母字符 xff0c 将其转换成其后的第3个字母 例如 xff1a A D xff0c a
  • js按钮绑定点击事件

    1 第一种 34 btn 34 click function 操作 2 第二种 document getElementById 39 foo 39 addEventListener 39 click 39 function 3 第三种 xf
  • 微信小程序与微信公众号同一用户登录问题

    微信小程序与微信公众号同一用户登录问题 最近在做微信小程序与微信公众号登录合并的接口 整理相关资料以及个人认识的心得写了这篇文章与大家一起分享 首先 xff0c 简单说下我遇到的问题是我们的程序调用微信小程序得到openid 然后通过ope
  • 快速编译system.img和boot.img的方法【转】

    本文转载自 xff1a http www cnblogs com wanqieddy archive 2012 10 22 2734024 html 快速编译system img xff0c 可以使用这个命令 xff1a make syst
  • SSM----SpringMVC

    SpringMVC 1 什么是SpringMVC Spring MVC属于SpringFrameWork的后续产品 xff0c 已经融合在Spring Web Flow里面 Spring 框架提供了构建 Web 应用程序的全功能 MVC 模
  • mvp契约类

    public class MainActivity extends AppCompatActivity implements IContract IView private IContract IPresenter presenter 64
  • Rxjava和Retrofit结合使用大量请求时候出现OOM的问题

    在使用RxJava 43 Retrofit的过程中 出现了OOM的问题 报错日志如下 java lang OutOfMemoryError pthread create 1040KB stack failed Try again at ja
  • vc++中进程间的通信

    进程通常被定义为一个正在运行的程序的实例 xff0c 它由两个部分组成 xff1a 一个是操作系统用来管理进程的内核对象 内核对象也是系统用来存放关于进程的统计信息的地方 另一个是地址空间 xff0c 它包含所有的可执行模块或DLL模块的代
  • 如何用YOLO+Tesseract实现定制OCR系统

    转载 AI开发者 xff1a https mp weixin qq com s Eq6POwgyME WJYK9NWpzDw 什么是 OCR xff1f OCR 指的是光学字符识别 它用于从扫描的文档或图片中读取文本 这项技术被用来将几乎任
  • cnn-过拟合(over-fitting)

    概念 为了得到一致假设而使假设变得过度严格称为过拟合 1 给定一个假设空间H xff0c 一个假设h属于H xff0c 如果存在其他的假设h 属于H 使得在训练样例上h的错误率比h 小 xff0c 但在整个实例分布上h 比h的错误率小 xf
  • cnn-欠拟合(underfitting)

    模型不能很好拟合数据 称之为欠拟合 直白的说 xff1a 模型没有找到数据规律或不完整 xff0c 泛化能力不强 在训练和测试数据集上 xff0c 预测或训练结果都和真实结果相差很远 一般解决方法 增加新特征 xff0c 可以考虑加入进特征

随机推荐

  • ERROR: Command errored out with exit status 1: python setup.py egg_info Check the logs for full comm

    类似这种问题 xff0c 不一定是pip版本不对 xff0c 有可能是某个文件不存在 xff0c 例如 在python3 5环境中安装scikit image pip install scikit image 61 61 0 12 就出现
  • AI最新资讯,持续更新

    三星 人造人 项目曝光 xff01 效果太逼真 xff0c 可自主生成新表情 动作和对话 https mp weixin qq com s 417fL3oYVE1vOwsVHMmqow Det3D 首个通用 3D 目标检测框架 https
  • 二维码生成以及扫一扫解析二维码原理

    二维码生成以及扫一扫解析二维码原理 1 生成URL xff0c 确定要通过二维码传达的信息 xff0c 也就是通过扫一扫可以获得地址和数据信息 1 得到随机数 xff0c 用随机数得到签名 xff0c 签名验证身份 String ranSt
  • idea 不能生成target

    1 改module https blog csdn net qq 15304369 article details 93715206 2 pom配置文件 修改为 xff1a lt packaging gt jar lt packaging
  • mariadb 遇到的坑

    mariadb13 3 25 配置文件失效 xff08 折腾了很久 xff09 xff0c 当时我需要配置主从 xff0c 发现binlog无法打开 xff0c 配置了bin log项还是不行 xff01 当my cnf 文件权限过大时 x
  • CV资料汇总

    1 图像风格迁移 Neural Style 简史 https www sohu com a 221597595 236505 2 一文让你理解什么是卷积神经网络 https www jianshu com p 1ea2949c0056
  • skinmagic 对话框菜单展示

    我偶用skinmagic xff0c 在换对话框皮肤时候 xff0c 发现菜单不见了 xff0c 几经折腾 xff0c 发现SetWindowSkin m hWnd 34 Dialog 34 在iniInstance xff08 xff09
  • 系统如何支持高并发

    给个例子 xff0c 你的系统部署的机器是4核8G xff0c 数据库服务器是16核32G 此时假设你的系统用户量总共就10万 xff0c 用户量很少 xff0c 日活用户按照不同系统的场景有区别 xff0c 我们取一个较为客观的比例 xf
  • Firewalld防火墙基础

    目录 一 Firewalld 概述 1 1 Firewalld的简述 1 2 Firewalld 和 iptables的区别 1 3 firewalld的区域 1 3 1 firewalld的9个区域 1 3 2 firewalld的数据处
  • CentOS7安装Oracle JDK

    CentOS7默认安装的是OpenJDK 如果安装Oracle JDK xff0c 需要按如下方式操作 xff1a 1 登录http www oracle com technetwork java javase downloads inde
  • 百度2014校招笔试题(一)

    算法和程序设计题 xff1a 1 题意 xff1a 一幢大楼的底层有1001根电线 xff0c 这些电线一直延伸到大楼楼顶 xff0c 你需要确定底层的1001个线头和楼顶的1001次线头的对应关系 你有一个电池 xff0c 一个灯泡 xf
  • Acwing 1175.最大联通子图(tarjan缩点求scc)

    Acwing 1175 最大连通子图 题意 一个有向图 G 61 V E G 61 V
  • 用github搭建个人(博客网站

    x1f308 博客主页 xff1a 卿云阁 x1f48c 欢迎关注 x1f389 点赞 x1f44d 收藏 留言 x1f4dd x1f31f 本文由卿云阁原创 xff01 x1f64f 作者水平很有限 xff0c 如果发现错误 xff0c
  • 多线程下HashMap的死循环

    多线程下HashMap的死循环 Java的HashMap是非线程安全的 多线程下应该用ConcurrentHashMap 多线程下 HashMap 的问题 xff08 这里主要说死循环问题 xff09 xff1a 1 多线程put操作后 x
  • 找出一个图中所有的强连通子图

    如果一个有向图中的没对顶点都可以从通过路径可达 xff0c 那么就称这个图是强连通的 一个 strongly connected component就是一个有向图中最大的强连通子图 下图中就有三个强连通子图 xff1a 应用kosaraju
  • win7启动分区不存在,使用分区工具修正

    DiskGenius 分区右键 激活当前分区
  • getElementById获取不到td标签

    一次测试中发现 然后使用getElementById获取不到此标签 xff0c 将td改成div即可 不知道是不是单独使用td标签的问题 code
  • 应用宝YSDK支付接入技术细节

    前言 应用宝是出了名的坑 xff0c 主要体现在 xff1a 文档杂乱繁多信息不全或描述模糊文档格式不规范技术支持很不及时 并且可以明显察觉到为了兼容QQ和微信 xff0c 应用宝的接入规范有诸多不合理的地方 来来回回折腾了一周 xff0c
  • 用Word2007批量设置图片位置

    转自 xff1a http www ccw com cn college htm2010 20100727 877695 shtml Word2007的 查找和替换 功能并不仅仅可以对文字进行批量的查找替换 xff0c 还有很多神奇的功能
  • java-生产者消费者问题以及解决办法

    文章目录 1 生产者消费者问题概述2 生产者消费者问题的解决办法2 1 解决思路2 2 实现方法2 3 代码实现2 3 1 wait 和nofity 方法2 3 2 await signal 方法2 3 3 BlockingQueue阻塞队