使用 Reactor 的非阻塞 ReentrantLock

2024-01-01

我需要限制同时处理同一资源的客户端数量
所以我尝试实现模拟

lock.lock();
try {
     do work
} finally {
    lock.unlock();
}

但以非阻塞方式使用 Reactor 库。 我有这样的东西。

但我有一个问题:
有一个更好的方法吗
或者也许有人知道已实施的解决方案
或者也许这不是反应式世界中应该做的事情,并且有另一种方法可以解决此类问题?

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import javax.annotation.Nullable;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

public class NonblockingLock {
    private static final Logger LOG = LoggerFactory.getLogger(NonblockingLock.class);

    private String currentOwner;
    private final AtomicInteger lockCounter = new AtomicInteger();
    private final FluxSink<Boolean> notifierSink;
    private final Flux<Boolean> notifier;
    private final String resourceId;

    public NonblockingLock(String resourceId) {
        this.resourceId = resourceId;
        EmitterProcessor<Boolean> processor = EmitterProcessor.create(1, false);
        notifierSink = processor.sink(FluxSink.OverflowStrategy.LATEST);
        notifier = processor.startWith(true);
    }

    /**
     * Nonblocking version of
     * <pre><code>
     *     lock.lock();
     *     try {
     *         do work
     *     } finally {
     *         lock.unlock();
     *     }
     * </code></pre>
     * */
    public <T> Flux<T> processWithLock(String owner, @Nullable Duration tryLockTimeout, Flux<T> work) {
        Objects.requireNonNull(owner, "owner");
        return notifier.filter(it -> tryAcquire(owner))
                .next()
                .transform(locked -> tryLockTimeout == null ? locked : locked.timeout(tryLockTimeout))
                .doOnSubscribe(s -> LOG.debug("trying to obtain lock for resourceId: {}, by owner: {}", resourceId, owner))
                .doOnError(err -> LOG.error("can't obtain lock for resourceId: {}, by owner: {}, error: {}", resourceId, owner, err.getMessage()))
                .flatMapMany(it -> work)
                .doFinally(s -> {
                    if (tryRelease(owner)) {
                        LOG.debug("release lock resourceId: {}, owner: {}", resourceId, owner);
                        notifierSink.next(true);
                    }
                });
    }

    private boolean tryAcquire(String owner) {
        boolean acquired;
        synchronized (this) {
            if (currentOwner == null) {
                currentOwner = owner;
            }
            acquired = currentOwner.equals(owner);
            if (acquired) {
                lockCounter.incrementAndGet();
            }
        }
        return acquired;
    }

    private boolean tryRelease(String owner) {
        boolean released = false;
        synchronized (this) {
            if (currentOwner.equals(owner)) {
                int count = lockCounter.decrementAndGet();
                if (count == 0) {
                    currentOwner = null;
                    released = true;
                }
            }
        }
        return released;
    }
}

我认为它应该是这样工作的

@Test
public void processWithLock() throws Exception {
    NonblockingLock lock = new NonblockingLock("work");
    String client1 = "client1";
    String client2 = "client2";
    Flux<String> requests = getWork(client1, lock)
            //emulate async request for resource by another client
            .mergeWith(Mono.delay(Duration.ofMillis(300)).flatMapMany(it -> getWork(client2, lock)))
            //emulate async request for resource by the same client
            .mergeWith(Mono.delay(Duration.ofMillis(400)).flatMapMany(it -> getWork(client1, lock)));
    StepVerifier.create(requests)
            .expectSubscription()
            .expectNext(client1)
            .expectNext(client1)
            .expectNext(client1)
            .expectNext(client1)
            .expectNext(client1)
            .expectNext(client1)
            .expectNext(client2)
            .expectNext(client2)
            .expectNext(client2)
            .expectComplete()
            .verify(Duration.ofMillis(5000));
}
private static Flux<String> getWork(String client, NonblockingLock lock) {
    return lock.processWithLock(client, null,
            Flux.interval(Duration.ofMillis(300))
                    .take(3)
                    .map(i -> client)
                    .log(client)
    );
}

现在 Reactor 已经引入了Sinks,这样的锁更容易实现。我已经写了图书馆 https://github.com/gudzpoz/reactor-locks/,您可以这样编写代码:

import party.iroiro.lock.Lock;
import party.iroiro.lock.ReactiveLock;

Flux<String> getWork(String client, Duration delay, Lock lock) {
    return Mono.delay(delay)
              .flatMapMany(l -> lock.withLock(() ->
                  Flux.interval(Duration.ofMillis(300))
                      .take(3)
                      .map(i -> client)
                      .log(client)));
}

它内部使用一个队列Sinks.Empty跟踪锁定请求。每次解锁时,它只是从队列中轮询并发送到Mono an ON_COMPLETE信号,这可能比向所有请求者广播稍好一些Sinks.many().multicast()。它利用了以下功能:Sinks.Empty不能多次发射,因此取消锁定(对于那些想要设置超时或处理复杂情况的人)将阻止发射ON_COMPLETE,反之亦然。

并且通过包裹Flux.using围绕锁,可以确保锁在所有情况下都正确解锁,例如try-finally.

如果您有兴趣,这里是实现的一部分。原来的答案是synchronized并且可能会在竞争条件下阻塞,并且使用 CAS 操作重写以下内容,以便锁是非阻塞的。 (在库中,现在所有的锁都是通过CAS操作实现的。)

    private volatile int count = 0;  // 0 if unlocked

    public LockHandle tryLock() {
        if (COUNT.compareAndSet(this, 0, 1)) {
            // Optimistic acquiring
            return LockHandle.empty();
        } else {
            LockHandle handle = SinkUtils.queueSink(queue);
            fairDecrement(false);
            return handle;
        }
    }
    public void unlock() {
        if (fairness) {
            fairDecrement(true);
        } else {
            COUNT.set(this, 0);
            fairDecrement(false);
        }
    }
    /*
     * If not "unlocking", fairDecrement first increments COUNT so that it does not end up unlocking a lock.
     * If "unlocking", we jump directly to the decrementing.
     */
    private void fairDecrement(boolean unlocking) {
        /*
         * COUNT states:
         * - COUNT == 0: The lock is unlocked, with no ongoing decrement operations.
         * - COUNT >= 1: Either the lock is being held, or there is an ongoing decrement operation.
         *               Note that the two are mutual exclusive, since they both require COUNT++ == 0.
         *
         * If "unlocking", then we are responsible for decrements.
         *
         * Otherwise,
         * 1. If COUNT++ >= 1, either someone is holding the lock, or there is an ongoing
         *    decrement operation. Either way, some thread will eventually emit to pending requests.
         *    We increment COUNT to signal to the emitter that the queue could have potentially been
         *    appended to after its last emission.
         * 2. If COUNT++ == 0, then we are responsible for decrementing.
         */
        if (unlocking || COUNT.incrementAndGet(this) == 1) {
            do {
                if (SinkUtils.emitAnySink(queue)) {
                    /*
                     * Leaves the decrementing job to the next lock holder, who will unlock somehow.
                     */
                    return;
                }
                /*
                 * It is now safe to decrement COUNT, since there is no concurrent decrements.
                 */
            } while (COUNT.decrementAndGet(this) != 0);
        }
    }

另外,如果您想将客户端数量限制为 N 而不是 1,该库提供了ReactiveSemaphore,对应于java.util.concurrent.Semaphore.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 Reactor 的非阻塞 ReentrantLock 的相关文章

  • Spring Security 通过并发登录尝试将用户锁定

    我是安全新手 遇到了一个问题 该问题导致用户帐户被锁定 只有重新启动应用程序才能修复它 我有一个带有 spring security 4 0 2 RELEASE 应用程序的 spring boot 1 3 0 BUILD SNAPSHOT
  • 如何编写 Hibernate HQL 查询来删除所有“孙子”元素?

    我有学校 里面有团体 里面有学生 我想删除特定学校的所有学生 在 SQL 中我可以编写以下查询 DELETE FROM students1 WHERE students1 group id IN SELECT id FROM group1
  • 无法访问类型的封闭实例。 [复制]

    这个问题在这里已经有答案了 整个代码是 public class ThreadLocalTest ThreadLocal
  • 何时/为何使用/定义接口[重复]

    这个问题在这里已经有答案了 可能的重复 何时最好使用 java 中的接口 https stackoverflow com questions 2586389 when best to use an interface in java Hi
  • 如何让Spring RabbitMQ创建一个新的队列?

    根据我对rabbit mq的 有限 经验 如果您为尚不存在的队列创建新的侦听器 则会自动创建该队列 我正在尝试将 Spring AMQP 项目与rabbit mq 一起使用来设置侦听器 但出现错误 这是我的 xml 配置
  • JavaFX Platform.runLater 的使用以及从不同线程访问 UI

    我有几个问题Platform runLater 我有一个 JavaFX 应用程序类 在这个类中 我运行一个线程 该线程从网络套接字读取数据 现在当我创建一个新的Stage在线程内部 系统抛出异常 JavaFX 事件调度程序线程和我的网络读取
  • 设置 SWT Shell 的默认字体

    有没有办法为整个 Shell 设置默认字体 以便任何新控件都将使用相同的字体 看来现在我必须为我创建的每个控件设置字体 这导致了太多的冗余 默认使用的字体由平台选择 请参阅中的其他信息 类字体 SWT 标准小部件工具包 http book
  • 实现与扩展:何时使用?有什么不同?

    请用易于理解的语言进行解释或提供某些文章的链接 extends is for 延伸一类 implements is for 实施一个接口 接口和常规类之间的区别在于 在接口中您不能实现任何声明的方法 只有 实现 接口的类才能实现方法 C 中
  • @OneToMany 与 @JoinTable 错误

    我试图理解 OneToMany with JoinTable 对于这样的场景 我正在使用 JPA 2 1 Hibernate 5 0 4 和 Oracle 11 XE 当我打电话时userDao save user 下面的代码 我有 jav
  • RSA SignatureException:签名长度不正确

    我在签署 rsa 签名时遇到问题 我有一个用私钥加密的签名 然而 当我尝试使用公钥验证它时遇到问题 我得到以下异常 java security SignatureException Signature length not correct
  • 在 TestNG 中运行多个类

    我正在尝试自动化一个场景 其中我想登录一次应用程序 然后进行操作而无需再次重新登录 考虑一下 我有在特定类的 BeforeSuite 方法中登录应用程序的代码 public class TestNGClass1 public static
  • Java 唤醒休眠线程

    我阅读了其他帖子 但没有找到我正在寻找的确切答案 所以我希望有人能给出一些澄清 我有一个将运行一段时间的程序 我有一些在后台运行的线程来执行各种任务 为了简单起见 让我们考虑 3 个线程 ThreadA每 10 秒执行一次任务 其中Thre
  • 如何制作无限的jscrollpane?

    我之前已经实现过拖动滚动 但是创建无限滚动窗格的最佳方法是什么 当然不会有任何滚动条 我将实现拖动滚动 我想做的是在无限表面上实现动态加载 EDIT 当然 它实际上不会是无限的 我想问如何伪造它 您可以执行以下操作 AdjustmentCl
  • SimpleDateFormat 将 lenient 设置为 false 时出现异常

    为什么这段代码会抛出无法解析日期的异常 SimpleDateFormat f new SimpleDateFormat yyyy MM dd T HH mm ss 000Z f setLenient false String dateStr
  • 如何从 Google Custom Search API 获取超过 100 个结果

    我正在尝试使用 Google Custom Search API 在 Java 中进行研究 因此 我需要为每个查询提供一个大的结果集 然而 我似乎仅限于前 100 个结果 这比我需要的要少得多 我使用这样的列表方法 list setStar
  • 如何通过子 POJO 的属性过滤复合 ManyToMany POJO?

    我有两个像这样的房间实体 Entity public class Teacher implements Serializable PrimaryKey autoGenerate true public int id ColumnInfo n
  • Google Cloud Messaging - 立即收到或长时间延迟收到的消息

    我在大学最后一年的项目中使用谷歌云消息传递 一切正常 但我在使用 GCM 时遇到了一些麻烦 通常 消息要么几乎立即传递 要么有很大的延迟 我读过这篇文章 但我真的认为它不适用于这种情况 GCM 通常会在消息发送后立即传送消息 然而 这并不总
  • 方法签名中带或不带synchronized关键字的方法具有相同的字节码

    对于以下 2 个类 获得相同的 Java 字节码 java版本 java 版本 1 8 0 181 Java TM SE 运行时环境 构建 1 8 0 181 b13 Java HotSpot TM 64 位服务器 VM 内部版本 25 1
  • 将带有 webapp 的 WAR 部署到 Maven 中央存储库是否有意义?

    这样做有意义吗 如果是 我在哪里可以找到使用简单的 Web Hello World 执行此操作的示例 当人们从 Maven 执行 Web 应用程序时 他们会使用 Jetty 来运行它吗 我想 tomcat 太重了 任何帮助将不胜感激 谢谢
  • 编写自定义 Eclipse 调试器

    EDIT 一定有某种方法可以解决这个问题 而无需编写全新的调试器 我目前正在研究在现有 java 调试器之上构建的方法 如果有人对如何获取 Java 调试器已有的信息 有关堆栈帧 变量 原始数据等 有任何想法 那将非常有帮助 我想要做的是我

随机推荐