现在 Reactor 已经引入了Sinks
,这样的锁更容易实现。我已经写了图书馆 https://github.com/gudzpoz/reactor-locks/,您可以这样编写代码:
import party.iroiro.lock.Lock;
import party.iroiro.lock.ReactiveLock;
Flux<String> getWork(String client, Duration delay, Lock lock) {
return Mono.delay(delay)
.flatMapMany(l -> lock.withLock(() ->
Flux.interval(Duration.ofMillis(300))
.take(3)
.map(i -> client)
.log(client)));
}
它内部使用一个队列Sinks.Empty
跟踪锁定请求。每次解锁时,它只是从队列中轮询并发送到Mono
an ON_COMPLETE
信号,这可能比向所有请求者广播稍好一些Sinks.many().multicast()
。它利用了以下功能:Sinks.Empty
不能多次发射,因此取消锁定(对于那些想要设置超时或处理复杂情况的人)将阻止发射ON_COMPLETE
,反之亦然。
并且通过包裹Flux.using
围绕锁,可以确保锁在所有情况下都正确解锁,例如try-finally
.
如果您有兴趣,这里是实现的一部分。原来的答案是synchronized
并且可能会在竞争条件下阻塞,并且使用 CAS 操作重写以下内容,以便锁是非阻塞的。 (在库中,现在所有的锁都是通过CAS操作实现的。)
private volatile int count = 0; // 0 if unlocked
public LockHandle tryLock() {
if (COUNT.compareAndSet(this, 0, 1)) {
// Optimistic acquiring
return LockHandle.empty();
} else {
LockHandle handle = SinkUtils.queueSink(queue);
fairDecrement(false);
return handle;
}
}
public void unlock() {
if (fairness) {
fairDecrement(true);
} else {
COUNT.set(this, 0);
fairDecrement(false);
}
}
/*
* If not "unlocking", fairDecrement first increments COUNT so that it does not end up unlocking a lock.
* If "unlocking", we jump directly to the decrementing.
*/
private void fairDecrement(boolean unlocking) {
/*
* COUNT states:
* - COUNT == 0: The lock is unlocked, with no ongoing decrement operations.
* - COUNT >= 1: Either the lock is being held, or there is an ongoing decrement operation.
* Note that the two are mutual exclusive, since they both require COUNT++ == 0.
*
* If "unlocking", then we are responsible for decrements.
*
* Otherwise,
* 1. If COUNT++ >= 1, either someone is holding the lock, or there is an ongoing
* decrement operation. Either way, some thread will eventually emit to pending requests.
* We increment COUNT to signal to the emitter that the queue could have potentially been
* appended to after its last emission.
* 2. If COUNT++ == 0, then we are responsible for decrementing.
*/
if (unlocking || COUNT.incrementAndGet(this) == 1) {
do {
if (SinkUtils.emitAnySink(queue)) {
/*
* Leaves the decrementing job to the next lock holder, who will unlock somehow.
*/
return;
}
/*
* It is now safe to decrement COUNT, since there is no concurrent decrements.
*/
} while (COUNT.decrementAndGet(this) != 0);
}
}
另外,如果您想将客户端数量限制为 N 而不是 1,该库提供了ReactiveSemaphore
,对应于java.util.concurrent.Semaphore
.