Java并发编程之CyclicBarrier详解

2023-11-13

简介

栅栏类似于闭锁,它能阻塞一组线程直到某个事件的发生。栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。

CyclicBarrier可以使一定数量的线程反复地在栅栏位置处汇集。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达栅栏位置,那么栅栏将打开,此时所有的线程都将被释放,而栅栏将被重置以便下次使用。

CyclicBarrier源码解析

CyclicBarrier的类图如下: 


通过类图我们可以看到,CyclicBarrier内部使用了ReentrantLock和Condition两个类。它有两个构造函数:

public CyclicBarrier(int parties) {
    this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程使用await()方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

CyclicBarrier的另一个构造函数CyclicBarrier(int parties, Runnable barrierAction),用于线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。

await方法

调用await方法的线程告诉CyclicBarrier自己已经到达同步点,然后当前线程被阻塞。直到parties个参与线程调用了await方法,CyclicBarrier同样提供带超时时间的await和不带超时时间的await方法:

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        // 不超时等待
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}
public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
            BrokenBarrierException,
            TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}

这两个方法最终都会调用dowait(boolean, long)方法,它也是CyclicBarrier的核心方法,该方法定义如下:

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
            TimeoutException {
    // 获取独占锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 当前代
        final Generation g = generation;
        // 如果这代损坏了,抛出异常
        if (g.broken)
            throw new BrokenBarrierException();

        // 如果线程中断了,抛出异常
        if (Thread.interrupted()) {
            // 将损坏状态设置为true
            // 并通知其他阻塞在此栅栏上的线程
            breakBarrier();
            throw new InterruptedException();
        }

        // 获取下标
        int index = --count;
        // 如果是 0,说明最后一个线程调用了该方法
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                // 执行栅栏任务
                if (command != null)
                    command.run();
                ranAction = true;
                // 更新一代,将count重置,将generation重置
                // 唤醒之前等待的线程
                nextGeneration();
                return 0;
            } finally {
                // 如果执行栅栏任务的时候失败了,就将损坏状态设置为true
                if (!ranAction)
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                 // 如果没有时间限制,则直接等待,直到被唤醒
                if (!timed)
                    trip.await();
                // 如果有时间限制,则等待指定时间
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 当前代没有损坏
                if (g == generation && ! g.broken) {
                    // 让栅栏失效
                    breakBarrier();
                    throw ie;
                } else {
                    // 上面条件不满足,说明这个线程不是这代的
                    // 就不会影响当前这代栅栏的执行,所以,就打个中断标记
                    Thread.currentThread().interrupt();
                }
            }

            // 当有任何一个线程中断了,就会调用breakBarrier方法
            // 就会唤醒其他的线程,其他线程醒来后,也要抛出异常
            if (g.broken)
                throw new BrokenBarrierException();

            // g != generation表示正常换代了,返回当前线程所在栅栏的下标
            // 如果 g == generation,说明还没有换代,那为什么会醒了?
            // 因为一个线程可以使用多个栅栏,当别的栅栏唤醒了这个线程,就会走到这里,所以需要判断是否是当前代。
            // 正是因为这个原因,才需要generation来保证正确。
            if (g != generation)
                return index;
            
            // 如果有时间限制,且时间小于等于0,销毁栅栏并抛出异常
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // 释放独占锁
        lock.unlock();
    }
}

dowait(boolean, long)方法的主要逻辑处理比较简单,如果该线程不是最后一个调用await方法的线程,则它会一直处于等待状态,除非发生以下情况:

  • 最后一个线程到达,即index == 0
  • 某个参与线程等待超时
  • 某个参与线程被中断
  • 调用了CyclicBarrier的reset()方法。该方法会将屏障重置为初始状态

在上面的源代码中,我们可能需要注意Generation 对象,在上述代码中我们总是可以看到抛出BrokenBarrierException异常,那么什么时候抛出异常呢?如果一个线程处于等待状态时,如果其他线程调用reset(),或者调用的barrier原本就是被损坏的,则抛出BrokenBarrierException异常。同时,任何线程在等待时被中断了,则其他所有线程都将抛出BrokenBarrierException异常,并将barrier置于损坏状态。

同时,Generation描述着CyclicBarrier的更新换代。在CyclicBarrier中,同一批线程属于同一代。当有parties个线程到达barrier之后,generation就会被更新换代。其中broken标识该当前CyclicBarrier是否已经处于中断状态。

private static class Generation {
    boolean broken = false;
}

默认barrier是没有损坏的。当barrier损坏了或者有一个线程中断了,则通过breakBarrier()来终止所有的线程:

private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

在breakBarrier()中除了将broken设置为true,还会调用signalAll将在CyclicBarrier处于等待状态的线程全部唤醒。

当所有线程都已经到达barrier处(index == 0),则会通过nextGeneration()进行更新换地操作,在这个步骤中,做了三件事:唤醒所有线程,重置count,generation:

private void nextGeneration() {
    // signal completion of last generation
    trip.signalAll();
    // set up next generation
    count = parties;
    generation = new Generation();
}

除了上面讲到的栅栏更新换代以及损坏状态,我们在使用CyclicBarrier时还要要注意以下几点:

  • CyclicBarrier使用独占锁来执行await方法,并发性可能不是很高
  • 如果在等待过程中,线程被中断了,就抛出异常。但如果中断的线程所对应的CyclicBarrier不是这代的,比如,在最后一次线程执行signalAll后,并且更新了这个“代”对象。在这个区间,这个线程被中断了,那么,JDK认为任务已经完成了,就不必在乎中断了,只需要打个标记。该部分源码已在dowait(boolean, long)方法中进行了注释。
  • 如果线程被其他的CyclicBarrier唤醒了,那么g肯定等于generation,这个事件就不能return了,而是继续循环阻塞。反之,如果是当前CyclicBarrier唤醒的,就返回线程在CyclicBarrier的下标。完成了一次冲过栅栏的过程。该部分源码已在dowait(boolean, long)方法中进行了注释。

应用程序示例

我们看一个CyclicBarrier的应用示例:

public class CyclicBarrierTest {
	// 自定义工作线程
	private static class Worker extends Thread {
		private CyclicBarrier cyclicBarrier;
		
		public Worker(CyclicBarrier cyclicBarrier) {
			this.cyclicBarrier = cyclicBarrier;
		}
		
		@Override
		public void run() {
			super.run();
			
			try {
				System.out.println(Thread.currentThread().getName() + "开始等待其他线程");
				cyclicBarrier.await();
				System.out.println(Thread.currentThread().getName() + "开始执行");
				// 工作线程开始处理,这里用Thread.sleep()来模拟业务处理
				Thread.sleep(1000);
				System.out.println(Thread.currentThread().getName() + "执行完毕");
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}

	public static void main(String[] args) {
		int threadCount = 3;
		CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
		
		for (int i = 0; i < threadCount; i++) {
			System.out.println("创建工作线程" + i);
			Worker worker = new Worker(cyclicBarrier);
			worker.start();
		}
	}
}

运行结果(不唯一):

创建工作线程0
创建工作线程1
Thread-0开始等待其他线程
创建工作线程2
Thread-1开始等待其他线程
Thread-2开始等待其他线程
Thread-2开始执行
Thread-0开始执行
Thread-1开始执行
Thread-1执行完毕
Thread-0执行完毕
Thread-2执行完毕

在上述代码中,我们自定义的工作线程必须要等所有参与线程开始之后才可以执行,我们可以使用CyclicBarrier类来帮助我们完成。从程序的执行结果中也可以看出,所有的工作线程都运行await()方法之后都到达了栅栏位置,然后,3个工作线程才开始执行业务处理。

CyclicBarrier和CountDownLatch的区别

CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,可以使用多次,所以CyclicBarrier能够处理更为复杂的场景;

CyclicBarrier还提供了一些其他有用的方法,比如getNumberWaiting()方法可以获得CyclicBarrier阻塞的线程数量,isBroken()方法用来了解阻塞的线程是否被中断;

CountDownLatch允许一个或多个线程等待一组事件的产生,而CyclicBarrier用于等待其他线程运行到栅栏位置。

相关博客

AbstractQueuedSynchronizer同步队列详解

AbstractQueuedSynchronizer独占式同步状态获取与释放

Java并发编程之ReentrantLock详解

Java并发编程之Condition详解

参考资料

方腾飞:《Java并发编程的艺术》

Doug Lea:《Java并发编程实战》

【死磕Java并发】-----J.U.C之并发工具类:CyclicBarrier

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Java并发编程之CyclicBarrier详解 的相关文章

  • 使用 Jersey Client 忽略自签名 ssl 证书 [重复]

    这个问题在这里已经有答案了 我正在使用 Jersey 客户端库对 jboss 上运行的其余服务运行测试 我使用自签名证书在服务器上正确设置了 https 在本地主机上运行 但是 每当我使用 https url 运行测试时 都会收到以下错误
  • 是否可以使用检测重新定义核心 JDK 类?

    我想重新定义字节码StackOverflowError构造函数 因此当堆栈溢出发生时我有一个 钩子 我想要做的就是在构造函数的开头插入对我选择的静态方法的单个方法调用 是否有可能做到这一点 您应该能够使用两种方法之一来完成此操作 除非在过去
  • Spring Rest-API - 403 禁止错误响应

    我是 Spring 新手 我正在编写 REST API 我收到 403 删除 放置禁止错误 以下是我正在处理的示例 RequestMapping value noteId method RequestMethod PUT public Re
  • Spring Rest POST Json RequestBody 不支持内容类型

    当我尝试使用 post 方法发布新对象时 RequestBody 无法识别 contentType Spring 已经配置完毕 POST 可以与其他对象一起使用 但不能与这个特定对象一起使用 org springframework web
  • JPA 为每个项目选择最新实例

    假设我有一个会议实体 每次会议都有一个与会者和一个会议日期 在我的会议表中 我可能为每个与会者举行多个会议 每个会议都有不同的日期 我需要一个 JPA 查询 该查询将为所有与会者仅选择最新的会议 例如 如果我的桌子看起来像这样 Meetin
  • 如何使用 Gradle 2.10 将 ANTLR 词法分析器语法导入到另一个语法中?

    我一直在和 Terence Parr 一起学习 ANTLR 4权威的 ANTLR 4 参考 到目前为止我一直在使用 Gradle 2 10 及其内置 ANTLR 插件进行跟踪 然而 我在获取一些我从第 4 章第 38 41 页改编的代码以使
  • Stream#limit 返回的元素是否可以少于预期?

    如果流s下面至少有n元素 流在什么情况下sLimit可能少于n元素 如果有的话 Stream sLimit s limit n 提问原因 在这个答案 https stackoverflow com a 28082107 829571 我读到
  • JavaPreparedStatementUTF-8字符问题

    我有一份准备好的声明 PreparedStatement st 在我的代码中 我尝试使用 st setString 方法 st setString 1 userName userName 的值为 ak a setString 方法将 ak
  • java中main的返回类型

    我想知道为什么java中main方法只有void返回类型 public static void main String args 为什么main方法除了void之外没有其他返回类型 Thanks 简短的回答是 因为这就是语言规范 http
  • 在 Java 中查询 XML 的最简单方法

    我有带有 XML 的小字符串 例如 String myxml
  • 如何加快 jar 签名者的速度?

    我使用 ant 来签署我的 jars 以进行网络启动部署 Ant signjar 在 Web 启动签名时非常慢 如何加快签名过程 我找到了一种可能的解决方案 早些时候 在构建脚本 ant signjar 中 按顺序调用所有 jar 我们使用
  • 使用 eclipse 配置mockito 时出现问题。给出错误:java.lang.verifyError

    当我将我的mockito库添加到类路径中 并使用一个简单的mockito示例进行测试时 我尝试使用模拟对象为函数add返回错误的值 我得到java lang verifyerror 以下是用于测试的代码 后面是 logcat Test pu
  • 从 AlertDialog 返回值

    我想构建一个函数来创建 AlertDialog 并返回用户输入的字符串 这是我用于创建对话框的函数 如何返回该值 String m Text private String openDialog String title AlertDialo
  • 在Java中使用==而不是equals来比较不可变对象可以吗

    考虑调用静态工厂方法 valueOf 的两个 Integer 类型的引用 如下所示 Integer a Integer valueOf 10 Integer b Integer valueOf 10 考虑到Integer是不可变的 使用 而
  • 将 @RequestLine 与 Feign 一起使用

    我有一个工作 Feign 接口定义为 FeignClient content link service public interface ContentLinkServiceClient RequestMapping method Requ
  • 使用 Mockitos 传递参数化输入

    我正在使用 Mockito 进行单元测试 我想知道是否可以使用 Junit 测试中的方式发送参数化输入参数 e g InjectMocks MockClass mockClass new MockClass Test public void
  • Web服务连接超时和请求超时之间的区别

    WebClientTestService service new WebClientTestService int connectionTimeOutInMs 5000 Map
  • Web 应用程序似乎启动了名为 [22] 的线程,但未能停止它。这很可能造成内存泄漏

    我有一个 Web 应用程序 后端有 Servlet 部署在 tomcat 上 该应用程序是简单的java应用程序 我经常在服务器日志中看到此错误 严重 Web 应用程序似乎启动了一个名为 22 但未能阻止它 这很有可能 造成内存泄漏 是否存
  • Java applet 是否会违反同源策略

    我需要请求一些东西并从其他域获取信息 我知道由于同源政策 javascript 无法做到这一点 我的另一个选择是通过我的服务器发出代理请求 我不希望请求来自我的服务器的 IP 也不想为我的服务器创建额外的负载 并且希望客户端这样做 是否可以
  • 生成签名和加密的 JWT

    我正在尝试使用生成签名和加密的 JWT 令牌雨云智威汤逊 http connect2id com products nimbus jose jwt private void generateToken throws JOSEExceptio

随机推荐

  • redis中hash表内容删除的方法代码

    hash Redis hash是一个string类型的field和value的映射表 hash特别适合用于存储对象 Redis 中每个hash可以存储 232 1键值对 40多亿 实例 127 0 0 1 6379 gt HMSET run
  • 毕业设计-基于BP神经网络预测系统的设计- MATLAB

    目录 前言 课题背景和意义 实现技术思路 一 神经网络 三 图形用户界面的实现 四 神经网络预测系统的设计 五 神经网络预测系统的性能和特点 部分源代码 实现效果图样例 最后 前言 大四是整个大学期间最忙碌的时光 一边要忙着备考或实习为毕业
  • vue开发调试

    1 调试方式 1 1 为什么调试 当遇到应用逻辑出现错误 但又无法准确定位的时候 同后台项目开发一样 可以在JS实现的应用逻辑中设置断点 并进行单步 进入方法内 跳出方法等调试 从而准确定位问题根源 1 2 调试方法 本文主要讲两种方式 d
  • Linux系统安装配置curl

    1 获得安装包 从网上直接下载或者其他途径 这里直接wget wget http curl haxx se download curl 7 20 0 tar gz 2 解压到当前目录 或者 http www linuxidc com Lin
  • 贝叶斯方法应用:检测时间序列拐点

    随着时间推移 制造设备比如贴片机的位置由于各种原因会产生小的偏差 这些偏差可能是阶跃 也有可能是渐变的形式 由于偏差值很小 产线的自动光学检测设备并不会报警 然而小的偏差如果不经处理 经过一定时间累积会产生较大偏差 影响产品质量 为了能够提
  • 最好看的代码雨特效

    上代码
  • 2010年10大热门的开源NoSQL服务器软件

    NoSQL 就是反SQL 是一项全新的数据库革新运动 特别是在 2010 年得以迅猛发展 而各种开源的 NoSQL 软件突然间涌现在你面前 目前似乎没有对 NoSQL 给出一个标准的定义 也没有相应的规范 但从这些软件可以看出 NoSQL
  • WCF 第五章 一个单一实例中的多线程

    默认的InstanceContextMode行为设置指导WCF为每个请求创建一 个新的服务实例 然后在很多情况下 这不是最好的解决方案 例如 如果一个服务有一个代价很高的例行初始化 比如 一个构造器从一个数据库读取数据或者创 建一个大的内存
  • Linux OpenGauss 数据库远程连接

    目录 前言 1 Linux 安装 openGauss 2 Linux 安装cpolar 3 创建openGauss主节点端口号公网地址 4 远程连接openGauss 5 固定连接TCP公网地址 6 固定地址连接测试 前言 openGaus
  • carla二次开发(一)自定义创建地图

    提示 文章写完后 目录可以自动生成 如何生成可参考右边的帮助文档 文章目录 前言 一 为什么创建carla地图很困难 二 官方创建方法 三 blender插件创建地图 四 City Engine创建地图 五 ue4创建地图 前言 carla
  • 某A类网络10.0.0.0的子网掩码255.224.0.0,请确定可以划分的子网个数,写出每个子网的子网号及每个子网的主机范围。

    某A类网络10 0 0 0的子网掩码255 224 0 0 请确定可以划分的子网个数 写出每个子网的子网号及每个子网的主机范围 可以确定划分的子网有2 3 2 6个 子网号 10 32 0 0主机范围 10 32 0 1 10 63 255
  • 数据结构计算题

    1 将下列函数按它们在 n 时的无穷大阶数 从小到大排列 n n n3 7n5 nlogn 2n 2 n3 log2n n1 2 log2n 3 2 n n n2 log2n 解答 log2n n1 2 log2n n nlog2n n2
  • c语言a b等于c的编程,简单的a+b (C语言代码)

    解题思路 题目中要求多次输入 所以需要一个死循环来进行控制 一般采用while 1 或者for 注意事项 scanf 函数需要加上取地址符 且它的返回值 它的返回值可以分成三种情况 1 正整数 表示正确输入参数的个数 例如执行 scanf
  • Java学习笔记——String类

    目录 API概述 案例 键盘录入字符串 String 概述 String类的常见构造方法 创建字符串对象的区别 String常见的面试题 字符串的比较 案例 用户登录 遍历字符串 案例 手机号屏蔽 字符串截取方法 案例 敏感词替换 字符串替
  • 决策树概述+模块介绍+重要参数(criterion+random_state&splitter+减枝参数+目标权重参数)+回归树(参数+实例+拟合正弦曲线)+泰坦尼克号生存者预测实例

    文章目录 什么是sklearn 一 决策树概述 一 概述 二 基础概念 三 决策树算法的核心是要解决两个问题 二 模块sklearn tree的使用 一 模块介绍 二 使用介绍 三 重要参数 一 criterion 二 random sta
  • JavaScript_day02

    文章目录 BOM与DOM操作 BOM操作 window子对象 history对象 location对象 掌握 弹出框 计时器相关 DOM操作 查找标签 节点操作 innerText 和 innerHTML 获取值操作 class css操作
  • 电机控制基础——定时器捕获单输入脉冲原理

    1 问题引出 在单片机与嵌入式开发中 某些场景需要捕获传感器的高电平 或低电平 信号的持续时间 如红外解码信号 编码器输入信号等 如下图 以单一的一段高电平输入信号为例 如何测量这段高电平的时间呢 从直观上理解 就是要不断的检测这个信号 当
  • IPv6与Volp

    文章目录 前言 1 IP v4与IP v6 1 1 IP v4的概念与存在的问题 1 2 ipv6概述 1 3 对比IP v4 IP v6的优点 1 3 ipv4与ipv6的包头比较 1 4 IP v6的基本术语 1 5 IP v6地址表示
  • 自主异常检测算法(Matlab代码实现)

    欢迎来到本博客 博主优势 博客内容尽量做到思维缜密 逻辑清晰 为了方便读者 座右铭 行百里者 半于九十 本文目录如下 目录 1 概述 2 运行结果 3 Matlab代码 数据 4 参考文献 1 概述 文献来源 本文介绍了一种在实证数据分析
  • Java并发编程之CyclicBarrier详解

    简介 栅栏类似于闭锁 它能阻塞一组线程直到某个事件的发生 栅栏与闭锁的关键区别在于 所有的线程必须同时到达栅栏位置 才能继续执行 闭锁用于等待事件 而栅栏用于等待其他线程 CyclicBarrier可以使一定数量的线程反复地在栅栏位置处汇集