使用 Reactor 的非阻塞 ReentrantLock



lock.lock();
try {
     do work
} finally {
    lock.unlock();
}

但需要借助 Reactor 库以非阻塞方式实现。我目前写出了下面这样的代码。


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import javax.annotation.Nullable;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

public class NonblockingLock {
    private static final Logger LOG = LoggerFactory.getLogger(NonblockingLock.class);

    private String currentOwner;
    private final AtomicInteger lockCounter = new AtomicInteger();
    private final FluxSink<Boolean> notifierSink;
    private final Flux<Boolean> notifier;
    private final String resourceId;

    public NonblockingLock(String resourceId) {
        this.resourceId = resourceId;
        EmitterProcessor<Boolean> processor = EmitterProcessor.create(1, false);
        notifierSink = processor.sink(FluxSink.OverflowStrategy.LATEST);
        notifier = processor.startWith(true);

     * Nonblocking version of
     * <pre><code>
     *     lock.lock();
     *     try {
     *         do work
     *     } finally {
     *         lock.unlock();
     *     }
     * </code></pre>
     * */
    public <T> Flux<T> processWithLock(String owner, @Nullable Duration tryLockTimeout, Flux<T> work) {
        Objects.requireNonNull(owner, "owner");
        return notifier.filter(it -> tryAcquire(owner))
                .transform(locked -> tryLockTimeout == null ? locked : locked.timeout(tryLockTimeout))
                .doOnSubscribe(s -> LOG.debug("trying to obtain lock for resourceId: {}, by owner: {}", resourceId, owner))
                .doOnError(err -> LOG.error("can't obtain lock for resourceId: {}, by owner: {}, error: {}", resourceId, owner, err.getMessage()))
                .flatMapMany(it -> work)
                .doFinally(s -> {
                    if (tryRelease(owner)) {
                        LOG.debug("release lock resourceId: {}, owner: {}", resourceId, owner);

    private boolean tryAcquire(String owner) {
        boolean acquired;
        synchronized (this) {
            if (currentOwner == null) {
                currentOwner = owner;
            acquired = currentOwner.equals(owner);
            if (acquired) {
        return acquired;

    private boolean tryRelease(String owner) {
        boolean released = false;
        synchronized (this) {
            if (currentOwner.equals(owner)) {
                int count = lockCounter.decrementAndGet();
                if (count == 0) {
                    currentOwner = null;
                    released = true;
        return released;


// Demo scenario for NonblockingLock: builds a merged Flux in which client1 requests the lock
// immediately, client2 requests it after 300 ms, and client1 requests it again after 400 ms,
// all against the same lock instance.
// NOTE(review): the excerpt ends after `requests` is built; no subscription/verification and no
// closing brace are visible here — restore them from the original source.
public void processWithLock() throws Exception {
    NonblockingLock lock = new NonblockingLock("work");
    String client1 = "client1";
    String client2 = "client2";
    Flux<String> requests = getWork(client1, lock)
            //emulate async request for resource by another client
            .mergeWith(Mono.delay(Duration.ofMillis(300)).flatMapMany(it -> getWork(client2, lock)))
            //emulate async request for resource by the same client
            .mergeWith(Mono.delay(Duration.ofMillis(400)).flatMapMany(it -> getWork(client1, lock)));
// Runs a piece of work for `client` under `lock` via processWithLock, passing null as the
// lock-acquisition timeout; the visible tail maps each element of the work to the client id.
// NOTE(review): the Flux passed as the `work` argument is truncated in this excerpt — only its
// trailing `.map(i -> client)` survives, and the closing parentheses/brace are missing.
private static Flux<String> getWork(String client, NonblockingLock lock) {
    return lock.processWithLock(client, null,
                    .map(i -> client)

现在 Reactor 已经引入了 Sinks,这样的锁更容易实现。我写了一个库 https://github.com/gudzpoz/reactor-locks/ ,您可以这样编写代码:

import party.iroiro.lock.Lock;
import party.iroiro.lock.ReactiveLock;

// Variant of getWork using the Lock type imported from party.iroiro.lock: waits for `delay`,
// then runs the work inside lock.withLock(...); the visible tail maps each element to the client id.
// NOTE(review): the body of the supplier passed to withLock is truncated in this excerpt — only
// its trailing `.map(i -> client)` survives, and the closing parentheses/brace are missing.
Flux<String> getWork(String client, Duration delay, Lock lock) {
    return Mono.delay(delay)
              .flatMapMany(l -> lock.withLock(() ->
                      .map(i -> client)

它在内部使用一个由 Sinks.Empty 组成的队列来跟踪加锁请求。每次解锁时,它只从队列中取出一个元素,并向对应的 Mono 发出 ON_COMPLETE 信号,这可能比使用 Sinks.many().multicast() 向所有请求者广播的性能稍好一些。它利用了 Sinks.Empty 不能被多次发射的特性:取消加锁请求(供需要设置超时或处理复杂情况的人使用)会阻止 ON_COMPLETE 的发射,反之亦然。


如果您有兴趣,下面是实现的一部分。原来的答案使用了 synchronized,在竞争条件下可能会阻塞;下面的代码已改用 CAS 操作重写,使锁成为非阻塞的。(在库中,现在所有的锁都是通过 CAS 操作实现的。)

    private volatile int count = 0;  // 0 if unlocked

    // Requests the lock without blocking the calling thread.
    // Fast path: if the compare-and-set of COUNT from 0 to 1 succeeds, an empty handle is returned.
    // Otherwise a handle obtained from SinkUtils.queueSink(queue) is returned.
    // NOTE(review): COUNT is presumably an atomic field updater over `count` — its declaration is
    // not visible here. The closing braces of the else branch and the method are missing from this
    // excerpt, and further statements may have been dropped; confirm against the library source.
    public LockHandle tryLock() {
        if (COUNT.compareAndSet(this, 0, 1)) {
            // Optimistic acquiring
            return LockHandle.empty();
        } else {
            LockHandle handle = SinkUtils.queueSink(queue);
            return handle;
    // Releases the lock. In the non-fair branch COUNT is simply reset to 0.
    // NOTE(review): the body of the `fairness` branch is missing from this excerpt — presumably it
    // delegates to fairDecrement(true); confirm against the library source. Closing braces are
    // also missing.
    public void unlock() {
        if (fairness) {
        } else {
            COUNT.set(this, 0);
     * If not "unlocking", fairDecrement first increments COUNT so that it does not end up unlocking a lock.
     * If "unlocking", we jump directly to the decrementing.
    // Emits to queued requests on behalf of the caller. The loop is entered when `unlocking` is
    // true or when this call's increment moved COUNT from 0 to 1; it then repeatedly calls
    // SinkUtils.emitAnySink(queue) until COUNT.decrementAndGet(this) reaches 0.
    // NOTE(review): the comment delimiters (/* ... */), the body of the `if (emitAnySink)` branch
    // and the closing braces were lost in this excerpt; the undelimited text lines below are the
    // original explanatory comments and are kept verbatim.
    private void fairDecrement(boolean unlocking) {
         * COUNT states:
         * - COUNT == 0: The lock is unlocked, with no ongoing decrement operations.
         * - COUNT >= 1: Either the lock is being held, or there is an ongoing decrement operation.
         *               Note that the two are mutual exclusive, since they both require COUNT++ == 0.
         * If "unlocking", then we are responsible for decrements.
         * Otherwise,
         * 1. If COUNT++ >= 1, either someone is holding the lock, or there is an ongoing
         *    decrement operation. Either way, some thread will eventually emit to pending requests.
         *    We increment COUNT to signal to the emitter that the queue could have potentially been
         *    appended to after its last emission.
         * 2. If COUNT++ == 0, then we are responsible for decrementing.
        if (unlocking || COUNT.incrementAndGet(this) == 1) {
            do {
                if (SinkUtils.emitAnySink(queue)) {
                     * Leaves the decrementing job to the next lock holder, who will unlock somehow.
                 * It is now safe to decrement COUNT, since there is no concurrent decrements.
            } while (COUNT.decrementAndGet(this) != 0);

另外,如果您想将客户端数量限制为 N 而不是 1,该库提供了ReactiveSemaphore,对应于java.util.concurrent.Semaphore.


