学习目标
我们都知道在并发编程中,阻塞队列在多线程中的场景特别有用,比如在生产和消费者模型中,生产者生产数据到队列,队列满时需要阻塞线程,停止生产。消费者消费队列,对队列为空时阻塞线程停止消费,在Java中有提供不同场景的阻塞队列,那么接下来我们将学习
- ReentrantLock的Condition原理
- BlockingQueue的定义
- 了解ArrayBlockingQueue的实现
- 如何手写一个阻塞队列
Condition原理
在学习阻塞队列之前,我们先需要弄清楚ReentrantLock
的Condition
机制。我们知道使用synchronized
结合Object上的wait和notify方法可以实现线程间的等待通知机制。Condition同样可以实现这个功能,而且相比前者使用起来更清晰也更简单,扩展性更好。
- Condition能够支持不响应中断,而通过使用Object方式不支持
- Condition能够支持多个等待队列(new多个Condition对象),而Object方式只能支持一个
- Condition能够支持超时时间的设置,而Object不支持
具体看个小例子:
ReentrantLock lock = new ReentrantLock();
Condition waitCond = lock.newCondition();
Thread t1 = new Thread(() -> {
System.out.println("before-wait...1");
try {
lock.lock();
waitCond.await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
System.out.println("after-wait...1");
});
Thread t2 = new Thread(() -> {
System.out.println("before-wait...2");
try {
lock.lock();
waitCond.await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
System.out.println("after-wait...2");
});
t1.start();
t2.start();
Thread.sleep(1);
lock.lock();
waitCond.signalAll();
System.out.println("signalAll");
lock.unlock();
输出
before-wait...2
before-wait...1
signalAll
after-wait...2
after-wait...1
Process finished with exit code 0
可以看到,想要获得一个Condition对象,需要首先通过一个ReentrantLock锁来创建,而最终调用是AQS
中的内部类ConditionObject
Condition是要和lock配合使用的,而lock的实现原理又依赖于AQS,所以AQS内部实现了ConditionObject。我们知道在锁机制的实现上,AQS内部维护了一个双向的同步队列,如果是独占式锁的话,所有获取锁失败的线程的尾插入到同步队列。Condition内部也是使用相似的方式,内部维护了一个单向的 等待队列,所有调用condition.await
方法的线程会加入到等待队列中,并且线程状态转换为等待状态。
BlockingQueue
Java中定义了一个BlockingQueue
这样的接口,继承Queue[public interface Queue<E> extends Collection<E>]
,BlockingQueue提供四种不同的处理方法
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
boolean remove(Object o);
boolean offer(E e);
E poll();
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
void put(E e) throws InterruptedException;
E take() throws InterruptedException;
|
抛出异常 |
返回特殊值 |
一直阻塞 |
超时退出 |
插入方法 |
add(o) |
offer(o) |
put(o) |
offer(o, timeout, timeunit) |
移除方法 |
remove(o) |
poll() |
take(o) |
poll(o, timeout, timeunit) |
检查方法 |
element() |
peek() |
— |
— |
在JDK7提供了7个阻塞队列,分别是:
-
ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
-
LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
-
PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
-
DelayQueue:一个使用优先级队列实现的无界阻塞队列。
-
SynchronousQueue:一个不存储元素的阻塞队列。
-
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列
-
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
ArrayBlockingQueue
其中ArrayBlockingQueue
是一个由数组组成的有界阻塞队列,Array顾名思义内部队列定义的是一个数组存储数据,既然是数组必然需要定义长度。
具体类型结构:
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/**
* Serialization ID. This class relies on default serialization
* even for the items array, which is default-serialized, even if
* it is empty. Otherwise it could not be declared final, which is
* necessary here.
*/
private static final long serialVersionUID = -817911632652898426L;
/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
}
- items:一个Object的数组
- tackIndex:出队列的下标
- putIndex:入队列的下标
- count:队列中元素的数量
- lock:ReentrantLock类型的锁
- notEmpty、notFull:Condition的等待
生产者和消费者共用lock锁,数组满时阻塞插入,即生产。数组空时阻塞获取,即消费。结合ReentranLock和Condition感觉好像懂了。如果是你,可以试试怎么实现。
实现ArrayBlockingQueue
先按上定义几个变量
final static int MAX = 10;
LinkedList<Integer> queue = new LinkedList<>();
ReentrantLock lock = new ReentrantLock();
Condition full = lock.newCondition();
Condition emtpy = lock.newCondition();
再看看生产者怎么构建,首先对于进来的每个线程上锁,判定当前数组长度大等于最大值,那就使当前线程进入等待队列,否则添加数据进数组。当然,对于长度为1的时候需要唤醒所有的消费线程。故可以实现如下:
// Producer
public void create() throws InterruptedException {
lock.lock();
if (queue.size() == MAX) {
full.await();
return;
}
int data = readData(); // 1s
if(queue.size() == 1) {
emtpy.signalAll();
}
queue.add(data);
lock.unlock();
}
消费者亦同理 :
// Comsumer
public void calculate() throws InterruptedException {
lock.lock();
if (queue.size() == 0) {
emtpy.await();
return;
}
int data = queue.remove();
System.out.println("queue-size:" + queue.size());
if(queue.size() == MAX - 1) {
full.signalAll();
}
lock.unlock();
}
那么我们手写的生产消费模型全部代码如下:
public class ProducerCustomerModel {
final static int MAX = 10;
LinkedList<Integer> queue = new LinkedList<>();
ReentrantLock lock = new ReentrantLock();
Condition full = lock.newCondition();
Condition emtpy = lock.newCondition();
int readData() throws InterruptedException {
Thread.sleep((long)Math.random()*1000);
return (int)Math.floor(Math.random()*1000);
}
// Producer
public void readDb() throws InterruptedException {
lock.lock();
if (queue.size() == MAX) {
full.await();
return;
}
int data = readData(); // 1s
if(queue.size() == 1) {
emtpy.signalAll();
}
queue.add(data);
lock.unlock();
}
// Comsumer
public void calculate() throws InterruptedException {
lock.lock();
if (queue.size() == 0) {
emtpy.await();
return;
}
int data = queue.remove();
System.out.println("get date"+data+" and queue-size:" + queue.size());
if(queue.size() == MAX - 1) {
full.signalAll();
}
lock.unlock();
}
public static void main(String[] argv) {
ProducerCustomerModel p = new ProducerCustomerModel();
for(int i = 0; i < 100; i++) {
new Thread(() -> {
while (true) {
try {
p.readDb();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
new Thread(() -> {
while(true) {
try {
p.calculate();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
发现输出有如下数据:
get date758 and queue-size:2
get date949 and queue-size:1
get date15 and queue-size:0
get date23 and queue-size:9
get date143 and queue-size:8
get date690 and queue-size:7
get date112 and queue-size:6
get date253 and queue-size:5
get date925 and queue-size:4
get date593 and queue-size:3
get date779 and queue-size:2
get date495 and queue-size:1
get date989 and queue-size:0
Process finished with exit code -1
其实回过头再在来看ArrayBlockingQueue其内部发现实现好像差不多就是这样的
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)