Java基础学习之并发篇:手写阻塞队列ArrayBlockingQueue

2023-11-16

学习目标

我们都知道在并发编程中,阻塞队列在多线程中的场景特别有用,比如在生产和消费者模型中,生产者生产数据到队列,队列满时需要阻塞线程,停止生产。消费者消费队列,对队列为空时阻塞线程停止消费,在Java中有提供不同场景的阻塞队列,那么接下来我们将学习

  1. ReentrantLock的Condition原理
  2. BlockingQueue的定义
  3. 了解ArrayBlockingQueue的实现
  4. 如何手写一个阻塞队列

Condition原理

在学习阻塞队列之前,我们先需要弄清楚ReentrantLockCondition机制。我们知道使用synchronized结合Object上的wait和notify方法可以实现线程间的等待通知机制。Condition同样可以实现这个功能,而且相比前者使用起来更清晰也更简单,扩展性更好。

  1. Condition能够支持不响应中断,而通过使用Object方式不支持
  2. Condition能够支持多个等待队列(new多个Condition对象),而Object方式只能支持一个
  3. Condition能够支持超时时间的设置,而Object不支持

具体看个小例子:

     	ReentrantLock lock = new ReentrantLock();
        Condition waitCond = lock.newCondition();
        Thread t1 = new Thread(() -> {
            System.out.println("before-wait...1");
            try {
                lock.lock();
                waitCond.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
            System.out.println("after-wait...1");
        });

        Thread t2 = new Thread(() -> {
            System.out.println("before-wait...2");
            try {
                lock.lock();
                waitCond.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
            System.out.println("after-wait...2");
        });
        t1.start();
        t2.start();
        Thread.sleep(1);
        lock.lock();
        waitCond.signalAll();
        System.out.println("signalAll");
        lock.unlock();

输出

before-wait...2
before-wait...1
signalAll
after-wait...2
after-wait...1

Process finished with exit code 0

可以看到,想要获得一个Condition对象,需要首先通过一个ReentrantLock锁来创建,而最终调用是AQS中的内部类ConditionObject

Condition是要和lock配合使用的,而lock的实现原理又依赖于AQS,所以AQS内部实现了ConditionObject。我们知道在锁机制的实现上,AQS内部维护了一个双向的同步队列,如果是独占式锁的话,所有获取锁失败的线程的尾插入到同步队列。Condition内部也是使用相似的方式,内部维护了一个单向的 等待队列,所有调用condition.await方法的线程会加入到等待队列中,并且线程状态转换为等待状态。


BlockingQueue

Java中定义了一个BlockingQueue这样的接口,继承Queue[public interface Queue<E> extends Collection<E>],BlockingQueue提供四种不同的处理方法

public interface BlockingQueue<E> extends Queue<E> {
    boolean add(E e);
    
  	boolean remove(Object o);
  	
    boolean offer(E e);
    E poll();
    
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    void put(E e) throws InterruptedException;
    
    E take() throws InterruptedException;
抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(o) offer(o) put(o) offer(o, timeout, timeunit)
移除方法 remove(o) poll() take(o) poll(o, timeout, timeunit)
检查方法 element() peek()

在JDK7提供了7个阻塞队列分别是:

  1. ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
  2. LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
  3. PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
  4. DelayQueue:一个使用优先级队列实现的无界阻塞队列。
  5. SynchronousQueue:一个不存储元素的阻塞队列。
  6. LinkedTransferQueue:一个由链表结构组成的无界阻塞队列
  7. LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

ArrayBlockingQueue

其中ArrayBlockingQueue是一个由数组组成的有界阻塞队列,Array顾名思义内部队列定义的是一个数组存储数据,既然是数组必然需要定义长度。
具体类型结构:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /**
     * Serialization ID. This class relies on default serialization
     * even for the items array, which is default-serialized, even if
     * it is empty. Otherwise it could not be declared final, which is
     * necessary here.
     */
    private static final long serialVersionUID = -817911632652898426L;

    /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;
    }
  • items:一个Object的数组
  • tackIndex:出队列的下标
  • putIndex:入队列的下标
  • count:队列中元素的数量
  • lock:ReentrantLock类型的锁
  • notEmpty、notFull:Condition的等待

生产者和消费者共用lock锁,数组满时阻塞插入,即生产。数组空时阻塞获取,即消费。结合ReentranLock和Condition感觉好像懂了。如果是你,可以试试怎么实现。


实现ArrayBlockingQueue

先按上定义几个变量

 	final static int MAX = 10;
    LinkedList<Integer> queue = new LinkedList<>();

    ReentrantLock lock = new ReentrantLock();
    Condition full = lock.newCondition();
    Condition emtpy = lock.newCondition();

再看看生产者怎么构建,首先对于进来的每个线程上锁,判定当前数组长度大等于最大值,那就使当前线程进入等待队列,否则添加数据进数组。当然,对于长度为1的时候需要唤醒所有的消费线程。故可以实现如下:

	// Producer
    public void create() throws InterruptedException {
        lock.lock();
        if (queue.size() == MAX) {
            full.await();
            return;
        }
        int data = readData(); // 1s
        if(queue.size() == 1) {
            emtpy.signalAll();
        }
        queue.add(data);
        lock.unlock();
    }

消费者亦同理 :

    // Comsumer
    public void calculate() throws InterruptedException {

        lock.lock();
        if (queue.size() == 0) {
            emtpy.await();
            return;
        }
        int data = queue.remove();
        System.out.println("queue-size:" + queue.size());
        if(queue.size() == MAX - 1) {
            full.signalAll();
        }
        lock.unlock();
    }

那么我们手写的生产消费模型全部代码如下:

public class ProducerCustomerModel {
    final static int MAX = 10;
    LinkedList<Integer> queue = new LinkedList<>();
    ReentrantLock lock = new ReentrantLock();
    Condition full = lock.newCondition();
    Condition emtpy = lock.newCondition();
    
    int readData() throws InterruptedException {
        Thread.sleep((long)Math.random()*1000);

        return (int)Math.floor(Math.random()*1000);
    }
    // Producer
    public void readDb() throws InterruptedException {
        lock.lock();
        if (queue.size() == MAX) {
            full.await();
            return;
        }
        int data = readData(); // 1s
        if(queue.size() == 1) {
            emtpy.signalAll();
        }
        queue.add(data);
        lock.unlock();
    }

    // Comsumer
    public void calculate() throws InterruptedException {

        lock.lock();
        if (queue.size() == 0) {
            emtpy.await();
            return;
        }
        int data = queue.remove();
        System.out.println("get date"+data+" and queue-size:" + queue.size());
        if(queue.size() == MAX - 1) {
            full.signalAll();
        }
        lock.unlock();
    }


    public static void main(String[] argv) {
        ProducerCustomerModel p = new ProducerCustomerModel();
        for(int i = 0; i < 100; i++) {
            new Thread(() -> {
                while (true) {
                    try {
                        p.readDb();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
        new Thread(() -> {
            while(true) {
                try {
                    p.calculate();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

发现输出有如下数据:

get date758 and queue-size:2
get date949 and queue-size:1
get date15 and queue-size:0
get date23 and queue-size:9
get date143 and queue-size:8
get date690 and queue-size:7
get date112 and queue-size:6
get date253 and queue-size:5
get date925 and queue-size:4
get date593 and queue-size:3
get date779 and queue-size:2
get date495 and queue-size:1
get date989 and queue-size:0

Process finished with exit code -1

其实回过头再在来看ArrayBlockingQueue其内部发现实现好像差不多就是这样的

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Java基础学习之并发篇:手写阻塞队列ArrayBlockingQueue 的相关文章

随机推荐

  • JAVA 获取指定月份的每周的开始日期和结束日期

    1 第一种情况 从1号开始到月份最后一天结束 代码如下 private DateTimeFormatter dateTimeFormatter DateTimeFormatter ofPattern yyyy MM dd public Li
  • Arduino和Python卡尔曼滤波对四元数进行姿态测定

    在本文中 我将演示使用EKF 扩展卡尔曼滤波 对四元数确定姿态的实现 并说明将多个传感器数据融合在一起以使系统正常工作的必要性 将要使用的传感器是陀螺仪 加速度计和磁力计 Arduino用于从传感器读取数据 但是数据处理将在python中完
  • 移动端开发框架

    总体概述 现在比较流行的移动APP开发框架有以下六种 网页 混合 渐进 原生 桥接 自绘 前三种体验与Web的体验相似 后三种与原生APP的体验相似 这六种框架形式 都有自己适用的范围 无所谓好坏 适用就是好 网页应用适用于传统网站APP化
  • 手写vue(三)模板渲染解析

    一 目标 创建一个Vue实例时 我们可以传入el配置项 去指定一个DOM元素作为Vue容器 而这个Vue容器中 可以使用例如插值表达式等Vue框架提供的语法 并且能够渲染到浏览器页面上 而浏览器并不能解析这些Vue语法 因此 Vue框架是通
  • python: How to Create a Python Package

    StudentScoreInfo py 学生成绩类 date 2023 06 16 edit Geovin Du geovindu 涂聚文 ide PyCharm 2023 1 python 11 import datetime impor
  • GAN生成手写数字实例讲解Colab使用教程

    Colab 全称Colaboratory 是谷歌提供的一个在线工作平台 可以与谷歌云盘协作使用 我们可以在Colab平台上运行代码 而且大部分常用的包都已经安装好 不需要再进行安装 也不需要进行环境配置 非常方便快捷 对于初学者来说非常友好
  • 颠覆传统逻辑的C程序

    1 在main之前运行的C代码 before main c include
  • k8s 部署spring cloud项目

    微服务架构是一项在云中部署应用和服务的新技术 大部分围绕微服务的争论都集中在容器或其他技术是否能很好的实施微服务 而红帽说API应该是重点 微服务可以在 自己的程序 中运行 并通过 轻量级设备与HTTP型API进行沟通 关键在于该服务可以在
  • LouvainMethod分布式运行的升级之路

    1 背景介绍 Louvain是大规模图谱的谱聚类算法 引入模块度的概念分二阶段进行聚类 直到收敛为止 分布式的代码可以在如下网址进行下载 GitHub Sotera spark distributed louvain modularity
  • Windows下SpringBoot连接Redis的正确使用姿势

    1 安装Redis 1 1通过wsl安装redis 参考官方安装文档 需要在wsl2上安装redis服务 注意我们启动redis的方式 First way 采用官方文档的方式 sudo service redis server start
  • Python自学——The One Day(Python基础——介绍)

    文章目录 Python基础 介绍 前言 编译型语言和解释型语言 Python是什么 Python的优缺点是什么 优点 缺点 Python的运行过程 Python能干什么 怎样学好Python Python基础 介绍 前言 编译型语言和解释型
  • 2014年10月4399校招笔试--游戏岗

    今天参加了4399的笔试 总的来说题目不难 不过有些题没答上来 特别是选择题最后几个关于图像的题目22 25 真心不会
  • vivado中的常用AXI接口IP核

    AXI是xilinx中常用的数据接口 种类和引脚数量极多 1 AXI GPIO AXI GPIO为AXI接口提供了一个通用的输入 输出接口 可以配置成单通道和双通道 每个通道的位宽都可以单独设置 另外 通过打开或者关闭三通道缓冲器 AXI
  • 使用 ST-LINK 烧录程序到 STM32

    前言 之前博主在使用单片机时 烧录程序用的都是串口的方式 最近公司定制了一个工业版单片机目前只支持使用 ST LINK 烧录 因此博主收集了一些资料 并整理了烧录程序的流程用于分享和后期自己回顾 准备工作 准备烧录编程器 博主直接在网上买了
  • 图像仿射变换原理4:组合变换及对应变换矩阵

    老猿Python博文目录 https blog csdn net LaoYuanPython 仿射变换博文传送门 带星号的为付费专栏文章 图像仿射变换原理1 齐次坐标来龙去脉详解 图像仿射变换原理2 矩阵变换 线性变换和图像线性变换矩阵 图
  • Linux下 VS Code 安装与 C 编程环境配置!

    对于多文件的C项目 大部分人会选择使用 cmake 来管理编译过程 对于精力充沛的朋友来说 也可以学习一下使用这个强大的工具 但我觉得如果只想在VS Code里写几行代码应对当前需求 没必要再去学习一个完全陌生的东西 也没必要把配置过程复杂
  • 捕鱼游戏java源码

    package fishlord import java awt Color import java awt Font import java awt Graphics import java awt event MouseAdapter
  • eclipse 报错 java.lang.NullPointerException at org.eclipse.jface.resource.JFaceResources.getResources

    java lang NullPointerException at org eclipse jface resource JFaceResources getResources JFaceResources java 209 删除文件 wo
  • MySQL——流程控制(IF、CASE、LOOP、WHILE、REPEAT、LEAVE、ITERATE)

    解决复杂问题不可能通过一个 SQL 语句完成 我们需要执行多个 SQL 操作 流程控制语句的作用就是控制存储过程中 SQL 语句的执行顺序 是我们完成复杂操作必不可少的一部分 接下来让我们一起开始学习吧 流程控制 只要是执行的程序 流程就分
  • Java基础学习之并发篇:手写阻塞队列ArrayBlockingQueue

    学习目标 我们都知道在并发编程中 阻塞队列在多线程中的场景特别有用 比如在生产和消费者模型中 生产者生产数据到队列 队列满时需要阻塞线程 停止生产 消费者消费队列 对队列为空时阻塞线程停止消费 在Java中有提供不同场景的阻塞队列 那么接下