快速回答是肯定的,它们是线程安全的。但我们不要把它留在那里......
首先是一点家务管理,BlockingQueue
是一个接口,任何非线程安全的实现都将违反记录的契约。您包含的链接指的是LinkedBlockingQueue
,这有一些聪明之处。
The 您添加的链接 https://stackoverflow.com/questions/2695426/are-linkedblockingqueues-insert-and-remove-methods-thread-safe/22006181#22006181做了一个有趣的观察,是的,里面有两把锁LinkedBlockingQueue
。然而,它无法理解“简单”实现可能遇到的边缘情况实际上正在被处理,这就是为什么 take 和 put 方法比人们最初预期的更复杂的原因。
LinkedBlockingQueue
进行了优化以避免在读取和写入时使用相同的锁,这减少了争用,但是为了正确的行为,它依赖于队列不为空。当队列中有元素时,推入点和弹出点不在同一内存区域,可以避免争用。然而,当队列为空时,争用就无法避免,因此需要额外的代码来处理这种常见的“边缘”情况。这是代码复杂性和性能/可扩展性之间的常见权衡。
那么问题来了,如何LinkedBlockingQueue
知道队列何时为空/非空,从而处理线程?答案是它使用一个AtomicInteger
and a Condition
作为两个额外的并发数据结构。这AtomicInteger
用于检查队列的长度是否为零,条件用于等待信号以在队列可能处于所需状态时通知等待线程。这种额外的协调确实会产生开销,但在测量中表明,当增加并发线程的数量时,该技术的开销低于使用单个锁引入的争用。
下面我复制了代码LinkedBlockingQueue
并添加了解释它们如何工作的评论。在高水平上,take()
首先锁定所有其他调用take()
然后发出信号put()
有必要的。put()
以类似的方式工作,首先它会阻止所有其他调用put()
然后发出信号take()
如果需要的话。
来自put()
method:
// putLock coordinates the calls to put() only; further coordination
// between put() and take() follows below
putLock.lockInterruptibly();
try {
// block while the queue is full; count is shared between put() and take()
// and is safely visible between cores but prone to change between calls
// a while loop is used because state can change between signals, which is
// why signals get rechecked and resent.. read on to see more of that
while (count.get() == capacity) {
notFull.await();
}
// we know that the queue is not full so add
enqueue(e);
c = count.getAndIncrement();
// if the queue is not full, send a signal to wake up
// any thread that is possibly waiting for the queue to be a little
// emptier -- note that this is logically part of 'take()' but it
// has to be here because take() blocks itself
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
From take()
takeLock.lockInterruptibly();
try {
// wait for the queue to stop being empty
while (count.get() == 0) {
notEmpty.await();
}
// remove element
x = dequeue();
// decrement shared count
c = count.getAndDecrement();
// send signal that the queue is not empty
// note that this is logically part of put(), but
// for thread coordination reasons is here
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();