BlockingQueue、ArrayBlockingQueue、LinkedBlockingQueue原理分析

2023-11-13

阻塞队列与非阻塞队

阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完全清空队列.

1.ArrayDeque, (数组双端队列) 
2.PriorityQueue, (优先级队列) 
3.ConcurrentLinkedQueue, (基于链表的并发队列) 
4.DelayQueue, (延期阻塞队列)(阻塞队列实现了BlockingQueue接口) 
5.ArrayBlockingQueue, (基于数组的并发阻塞队列) 
6.LinkedBlockingQueue, (基于链表的FIFO阻塞队列) 
7.LinkedBlockingDeque, (基于链表的FIFO双端阻塞队列) 
8.PriorityBlockingQueue, (带优先级的无界阻塞队列) 
9.SynchronousQueue (并发同步阻塞队列)

BlockingQueue是一个接口单向的阻塞队列,

BlockingDueue是一个接口双向的阻塞队列,

其源码分析如下:

public interface BlockingQueue<E> extends Queue<E> {
   
    //将指定的元素添加到对列中,如果队列容量未满。如果满了,抛出异常
    boolean add(E e);

    //如果可以在不违反容量限制的情况下立即将指定元素插入此队列,则在成功时返回 true,如果当前 
    //没有可用空间则返回 false。 当使用容量受限的队列时,这种方法通常比 add 更可取,它可以仅通 
    //过抛出异常来插入元素失败。
    boolean offer(E e);

    //将指定的元素插入此队列的尾部,如果该队列已满,则一直等到(阻塞)。 
    void put(E e) throws InterruptedException;

     //带有等待时间的入队方法
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

   //获取并移除此队列的头部,如果没有元素则等待(阻塞), 
	//直到有元素将唤醒等待线程执行该操作 
    E take() throws InterruptedException;

   //与offer对应
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

     //查看队列还有的容量
    int remainingCapacity();

   // 删除操作,与add对应
    boolean remove(Object o);


    public boolean contains(Object o);


    int drainTo(Collection<? super E> c);


    int drainTo(Collection<? super E> c, int maxElements);
}

查看接口的继承关系图

接下来我们重点介绍 ArrayBlockingQueue、LinkedBlockingQueue、LinkedBlockingDeque

ArrayBlockingQueue的源码解析

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    //队列底层容器
    final Object[] items;

    /** 下一个 take, poll, peek or remove的数组下标位置 */
    int takeIndex;

    /** 下一个 next put, offer, or add的数组下标位置 */
    int putIndex;

    /** 数组的已有数量 */
    int count;

    /**  队列锁******/
    final ReentrantLock lock;

    /** 获取队列数据的的condition等待队列 */
    private final Condition notEmpty;

    /** 存入队列数据的condition 等待队列*/
    private final Condition notFull;

    transient Itrs itrs = null;

     //队列构造方法必须指定大小,有界队列
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    // boolean fair ture为公平锁,严格的按照FIFO的原则获取锁,默认非公平锁
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    代码省略。。。。。。。。
}

接下来看源码分析ArrayBlockingQueue是如何对队列进行add-remove put-take offer-poll

  //add方法实际是调用的offer方法如如果成功返回true 否则抛出异常   
   public boolean add(E e) {
        return super.add(e);
    }

 
    public boolean offer(E e) {
        //检查传入的数据是否为空
        checkNotNull(e);
         // 定义锁 final变量,构造器类初始化的时候就已经申请好,state变量初始化好内存。
        final ReentrantLock lock = this.lock;
        lock.lock();
        try { 
           //判断当前的count是否等于数组的容量
           // 是返回fasle 否则加入到队列中
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

      private void enqueue(E x) {
        final Object[] items = this.items;
         // putIndex 入队的下一个数组下标值
        items[putIndex] = x;
//需要将putIndex重新设置为0,这是因为当前队列执行元素获取时总是从队列头部获取,而添加元素从中//从队列尾部获取所以当队列索引(从0开始)与数组长度相等时,
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        //当队列数据不为空时唤醒取数队列的等待condtion表示可以去获取数据了
        notEmpty.signal();
    }


    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        //中断锁。当前线程可以响应中断 中断后,抛出异常
        lock.lockInterruptibly();
        try {
            while (count == items.length)
               //当队列满了以后,那么入队的condition等待队列 等待,将该线程加入的等待队列中
                notFull.await();
            //否则入队操作
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

LinkedBlockingQueue的源码解析

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
   
     //存储数据的节点
    static class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    /** 容量大小 默认为 Integer.MAX_VALUE */
    private final int capacity;

    /** 当前队列的数量 */
    private final AtomicInteger count = new AtomicInteger();

     //头节点
    transient Node<E> head;

    //尾节点
    private transient Node<E> last;

    /** 取数据锁对象 */
    private final ReentrantLock takeLock = new ReentrantLock();

    /**  Wait queue for waiting takes  */
    private final Condition notEmpty = takeLock.newCondition();

    /** put数据锁对象 */
    private final ReentrantLock putLock = new ReentrantLock();

    /** 等待队列条件 */
    private final Condition notFull = putLock.newCondition();

由上面的LinkedBlockingQueue源码可以分析到 LinkedBlockingQueue 是一个基于链表的阻塞的无边界的队列。默认的队列容量为Integer.MAX_VALUE  当然这样容易引发的问题,当入列速率远超过出队速率时,可能会造成内存溢出、即超出容量的最大值。另外ArrayBlockingQueue入队与出队只有一把锁。但是LinkedBlockingQueue入队和出队是不同的锁对象、提高了队列的吞吐量

public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    // 如果容量还够。唤醒入队的condtion去写入数据
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
             // 如果容量为0表示队列还有一条数据 (原来的值为-1) 。唤醒入队的condtion去写入数据
            signalNotEmpty();
        return c >= 0;
    }

take阻塞获取的方法

public E take() throws InterruptedException {
        E x;
        int c = -1;
        //获取当前队列大小
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();//可中断
        try {
            //如果队列没有数据,挂机当前线程到条件对象的等待队列中
            while (count.get() == 0) {
                notEmpty.await();
            }
            //如果存在数据直接删除并返回该数据
            x = dequeue();
            c = count.getAndDecrement();//队列大小减1
            if (c > 1)
                notEmpty.signal();//还有数据就唤醒后续的消费线程
        } finally {
            takeLock.unlock();
        }
        //满足条件,唤醒条件对象上等待队列中的添加线程
        if (c == capacity)
            signalNotFull();
        return x;
    }

LinkedBlockingDeque是一个双向的阻塞的无边界的队列相比于LinkedBlockingDeque

可以任意的选择向链表的的头部添加数据还是向链表的尾部添加数据。当然也可以选择取数据的方向、

public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> {

    void addFirst(E e);

    void addLast(E e);

    boolean offerFirst(E e);

    boolean offerLast(E e);

    。。。代码省略。。。
 }
    public E takeFirst() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E x;
           // 如果获取到的头结点中的数据为空 那么消费数据的线程挂起 等待
            while ( (x = unlinkFirst()) == null)
                notEmpty.await();
            return x;
        } finally {
            lock.unlock();
        }
    }


  private E unlinkFirst() {
      
        Node<E> f = first;
        if (f == null)
            return null;
        Node<E> n = f.next;
        E item = f.item;
        f.item = null;
      //头结点的下一结点赋值给自己 方便GC
        f.next = f; // help GC
  // 将头结点的的下一个结点值变成头结点
        first = n;
        if (n == null)
            last = null;
        else
          //操作成功将头结点的的前面前结点设置为null
            n.prev = null;
     // 队列总数减一
        --count;
//  入队操作唤醒
        notFull.signal();
 // 并返回头结点的内容
        return item;
    }

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

BlockingQueue、ArrayBlockingQueue、LinkedBlockingQueue原理分析 的相关文章

随机推荐

  • 【ONNX】pytorch模型导出成ONNX格式:支持多参数与动态输入

    pytorch格式的模型在部署之前一般需要做格式转换 本文介绍了如何将pytorch格式的模型导出到ONNX格式的模型 ONNX Open Neural Network Exchange 格式是一种常用的开源神经网络格式 被较多推理引擎支持
  • 老司机都在用的浏览器,体积小功能齐全,直呼内行

    现在市面上的浏览器简直是多不胜数 虽然数量多 但是好用的并不多 尤其是某些大厂的浏览器 无用的功能越来越多 越来越臃肿 体积也越来越大 使用体验还不好 有时候甚至不如一些小众浏览器 今天给大家安利2款老司机都在用的手机浏览器 体积非常小 但
  • 刷脸支付对商家来说有着巨大的应用价值

    科技发展永不停止 一步一步改善着我们的生活 回顾支付方式的变化 从最初的以物易物到货币再到移动支付 一步步的发展都越来越便捷 而刷脸支付相比于扫码支付 省去了手机这个中间媒介 用人脸作为支付凭证 大大提升了付款效率 在现如今的科技下 可以毫
  • STM32F407移植FATFS文件系统(版本 R0.09b)支持长文件名和中文名称

    FatFs文件系统 默认是不支持长文件名和中文名称的 要想支持长文件名和中文名称 需要打开ffconf h文件进行配置 一 支持长文件名 FatFs文件系统 默认是不支持长文件名的 要想支持长文件名 需要打开ffconf h文件进行配置 找
  • linux下的vsftpd的db_load命令

    db load是linux下创建虚拟账户 选项 T允许应用程序能够将文本文件转译载入进数据库 由于我们之后是将虚拟用户的信息以文件方式存储在文件里的 为了让Vsftpd这个应用程序能够通过文本来载入用户数据 必须要使用这个选项 指定了选项
  • 第十六篇

    Ln Linux ln 英文全拼 link files 命令是一个非常重要命令 它的功能是为某一个文件在另外一个位置建立一个同步的链接 当我们需要在不同的目录 用到相同的文件时 我们不需要在每一个需要的目录下都放一个必须相同的文件 我们只要
  • 【无人机】采用最基本的自由空间路损模型并且不考虑小尺度衰落(多径多普勒)固定翼无人机轨迹规划(Matlab代码实现)

    欢迎来到本博客 博主优势 博客内容尽量做到思维缜密 逻辑清晰 为了方便读者 座右铭 行百里者 半于九十 本文目录如下 目录 1 概述 2 运行结果 2 1 文献结果 2 2 Matlab代码复现结果 3 参考文献 4 Matlab代码及文章
  • SQLServer帐号管理

    1 用户的创建 如图所示 右击 登录名 选中新建登录名 2 如何创建数据库用户 在数据库的下面 创建用户名 同时通过 浏览 按钮 关联想要授权的登录账号 3 给用户分配权限 注意 此处是给需要分配权限的数据库中的用户 进行安全对象的管理 例
  • 联想笔记本Ideapad300S-14ISK安装固态硬盘和win10

    1 拆机 把背面的螺丝钉拧开 拿个卡找个缝插进去转一圈就打开了 2 内存 先把电源排线拔掉 排线旁边有标识 和 内存条外面盖着一个金属壳 把四周的小固定扣压下去就能打开 内存条型号DDR3L 买了个DDR4才发现插不进去 3 固态硬盘 把机
  • visio使用技巧

    出处 blog作者 卡纳瓦罗 里面的东西主要是看了吕贤聪的 visio2003视频教程 后总结的一些技巧 还有平时应用visio时的一些心得 1 鼠标按住尺规的边缘往外拉 会拉出一条绘图辅助线 帮助绘图 精确定位 按del键就会消失 而且选
  • 多线程与同步代码块详解

    线程是程序执行的一条路径 一个进程中可以包含多条线程 多线程并发执行可以提高程序的效率 可以同时完成多项工作 多线程并发执行的实质就是CPU在做着高速的切换 多线程的应用场景 红蜘蛛同时共享屏幕给多个电脑 迅雷开启多条线程一起下载 QQ同时
  • 微芯I/O控制器瞄准工业与嵌入式运算应用

    微芯科技 Microchip Technology 日前发布SCH322X系列I O控制器新品 该系列产品基于工业及嵌入式开发工程师的需求而开发 功能丰富且具高灵活性 新一代I O控制器系列拥有尺寸更小的包装和更长的产品生命周期 可运用于更
  • caj转pdf

    https caj2pdf cn
  • 关于不同浏览器的内核与引擎--记录一下

    https www cnblogs com gdutbean archive 2012 02 21 2362003 html https www cnblogs com guanghe p 11719334 html js引擎介绍 几种JS
  • Docker: 改变容器化世界的革命性技术

    目录 1 1什么是虚拟化 1 2什么是Docker 1 3容器与虚拟机的比较 1 4Docker组建 2 Docker安装 2 2设置ustc的镜像 2 3Docker的启动与停止 3 docker常用命令 3 1镜像 3 2容器相关命令
  • Dockerfile参数详解

    FROM 功能为指定基础镜像 并且必须是第一条指令 如果不以任何镜像为基础 那么写法为 FROM scratch 同时意味着接下来所写的指令将作为镜像的第一层开始 语法 FROM
  • discuz 手机版伪静态

    前两天发布的这篇文章发现内容与实际情况不太相符特来更改 请多多包涵 网上有些现成的插件 价格却并不亲民 一个简简单单的伪静态插件居然标价三百 我也是醉了 我就在想 伪静态翻来覆去不就那么点东西吗 你能做我就做不了啦 在此帮各位仍有相同需求的
  • BP神经网络基本介绍

    1 主要解决的是什么问题 预测类 聚类分析 2 原理 思路是啥 什么是人工神经网络 ANN 模拟大脑对信号处理的一种算法 基本原理介绍 权关系是具体不清楚他们之间的关系 并未给出准确的对应关系 只是假设他们之间的关系可以用一组权来表示 阈值
  • 企业DevOps:实施过程中需要关注的各项要点

    作者 亚马逊云科技企业市场战略总监 Stephen Orban 经验并非凭空创造 而是依靠点滴积累所实现 阿尔贝 加缪 在此次的企业DevOps探索之旅系列文章当中 我将带大家一同探讨企业在具备一定DevOps经验之后又该如何处理下一步可能
  • BlockingQueue、ArrayBlockingQueue、LinkedBlockingQueue原理分析

    阻塞队列与非阻塞队 阻塞队列与普通队列的区别在于 当队列是空的时 从队列中获取元素的操作将会被阻塞 或者当队列是满时 往队列里添加元素的操作会被阻塞 试图从空的阻塞队列中获取元素的线程将会被阻塞 直到其他的线程往空的队列插入新的元素 同样