什么是生产者消费者问题 ?
生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题是一个多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。
注:该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。
同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。
通常采用进程间通信的方法解决该问题。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。该问题也能被推广到多个生产者和消费者的情形。
问题分析
需要注意以下几点:
- 在缓冲区为空时,消费者不能再进行消费;
- 在缓冲区为满时,生产者不能再进行生产;
- 在一个线程进行生产或消费时,其余线程不能再进行生产或消费等操作,即保持线程间的同步;
- 注意生产和消费的互斥条件。
方式一:synchronized、wait和notify
定义 Data 资源类,类中定义资源仓库的大小,当前资源个数。资源类的incrment()和decrement()方法是synchronized 的。生产者/消费者线程共享一个资源Data。
public class NotifyAndWaitTest1 {
public static void main(String[] args) throws Exception {
Data data = new Data();
new Thread(()->{
for (int i = 0; i < 555; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 555; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
new Thread(()->{
for (int i = 0; i < 666; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
new Thread(()->{
for (int i = 0; i < 666; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"D").start();
}
static class Data{
private int data = 0;
private final int MAX_SIZE = 3;
public synchronized void increment() throws InterruptedException {
while (data + 1 > MAX_SIZE){
this.wait();
}
data++;
System.out.println("[" + Thread.currentThread().getName() + "], data=" + data);
this.notifyAll();
}
public synchronized void decrement() throws InterruptedException {
while (data - 1 < 0){
this.wait();
}
data--;
System.out.println("[" + Thread.currentThread().getName() + "], data=" + data);
this.notifyAll();
}
}
}
方式二:lock和condition的await、signalAll
public class NotifyAndWaitTest2 {
public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.increment();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.decrement();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.increment();
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.decrement();
}
}, "D").start();
}
static class Data{
private final ReentrantLock lock = new ReentrantLock(true);
private Condition condition = lock.newCondition();
private int count = 0;
private final int MAX_COUNT = 3;
public void increment(){
lock.lock();
try {
while (count + 1 > MAX_COUNT){
condition.await();
}
count++;
System.out.println("[" + Thread.currentThread().getName() +"] +1 " + count);
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void decrement(){
lock.lock();
try {
while (count - 1 < 0){
condition.await();
}
count--;
System.out.println("[" + Thread.currentThread().getName() +"] -1 " + count);
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
方式三:BlockingQueue
定义Data资源类,资源类持有一个BlockingQueue。生产者/消费者线程共享一个资源类Data的成员变量,调用Queue的put()和take() 实现生产和消费。
BlockingQueue#put
: 添加一个元素,如果队列满了,则阻塞BlockingQueue#take
: 移除并返回队列头部的元素,如果队列为空则阻塞
public class BlockingQueueConsumerProducer {
public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.increment();
}
},"A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.decrement();
}
},"B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.increment();
}
},"C").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.decrement();
}
},"D").start();
}
static class Data{
private final BlockingQueue<Integer> resourceQueue = new LinkedBlockingQueue<>(3);
public void increment(){
try {
resourceQueue.put(1);
System.out.println(Thread.currentThread().getName() + " "
+ resourceQueue.size() +
" +");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void decrement(){
try {
resourceQueue.take();
System.out.println(Thread.currentThread().getName() + " " +
+ resourceQueue.size() + " -");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)