Linux下生产者消费者模型
- 一、什么是生产者消费者模型
- 二、代码实现
- 三、运行结果与修改
一、什么是生产者消费者模型
生产者消费者模型就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦,提高效率的。
本文中使用条件变量和互斥锁保证该模型更加合理、安全。
队列我们直接使用STL中的queue来实现,但是STL并不保证临界资源(队列)的安全,所以本文中使用锁来保证线程对临界资源的访问。
条件变量使用场景:
当队列满了的时候,生产者就不应该再生产了(不要竞争锁了),应该让消费者来消费;
当队列空的时候,消费者就不应该再消费了(不要竞争锁了),应该让生产者来生产。
生产者、消费者、队列关系:
生产者与消费者我们使用两个线程模拟(单生产者单消费者),那么生产什么?消费什么呢?在此我们设置一个Task类,只需要把想要实现的功能放入Task类当中即可。大家可以自由发挥,本文生产者生产一个int类型0~2999的随机值,消费者判断是否为闰年。既然是一个Task任务,所以我们在向队列中Push和Pop的时候也是以Task为整体,所以我们的queue需要使用模板。
二、代码实现
Makefile
cpmain:CpMain.cpp
g++ $^ -o $@ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -f cpmain
注意:不管是否使用Makefile,g++后面一定要添加-lpthread导入线程库,因为线程相关操作在第三方库中
BlockQueue.hpp主要实现队列的同步与互斥
#pragma once
#include <iostream>
#include <queue>
const int g_cap = 6;
template <class T>
class BlockQueue
{
public:
BlockQueue(int cap = g_cap)
: _cap(cap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_full, nullptr);
pthread_cond_init(&_empty, nullptr);
}
void Push(const T &in)
{
LockQueue();
while (IsFull())
{
ProducterWait();
}
_bq.push(in);
WakeupConsumer();
UnLockQueue();
}
void Pop(T *out)
{
LockQueue();
while (IsEmpty())
{
ConsumerWait();
}
*out = _bq.front();
_bq.pop();
WakeupProducter();
UnLockQueue();
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_full);
pthread_cond_destroy(&_empty);
}
private:
bool IsFull()
{
return _bq.size() == _cap;
}
bool IsEmpty()
{
return _bq.size() == 0;
}
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
void UnLockQueue()
{
pthread_mutex_unlock(&_mutex);
}
void ProducterWait()
{
pthread_cond_wait(&_empty, &_mutex);
}
void ConsumerWait()
{
pthread_cond_wait(&_full, &_mutex);
}
void WakeupProducter()
{
pthread_cond_signal(&_empty);
}
void WakeupConsumer()
{
pthread_cond_signal(&_full);
}
private:
std::queue<T> _bq;
int _cap;
pthread_mutex_t _mutex;
pthread_cond_t _full;
pthread_cond_t _empty;
};
pthread_cond_wait 调用的时候,会首先自动释放_mutex,然后再挂起自己!
pthread_cond_signal 返回的时候,会首先自动竞争锁,获取到锁之后,才能返回!
Task.hpp主要实现业务逻辑
#pragma once
#include <iostream>
const int g_year = 2000;
class Task
{
public:
Task()
{
}
Task(int year)
: _year(year)
{
}
int GetYear()
{
return _year;
}
static bool IsLeapYear(int year)
{
if ((0 == year % 4 && 0 != year % 100) ||
(0 == year % 400))
{
return true;
}
return false;
}
bool handler() const
{
return IsLeapYear(_year);
}
~Task()
{
}
private:
int _year;
};
CpMain.cpp生产者生产一个任务后等待1秒,消费者会立即消费,但是此时队列为空,他只能等待生产者生产数据,不会一直竞争锁。
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <iostream>
#include <time.h>
#include <unistd.h>
void *Consumer(void *args)
{
BlockQueue<Task> *bq = (BlockQueue<Task> *)args;
while (true)
{
Task t;
bq->Pop(&t);
bool isLeapYear = t.handler();
int year = t.GetYear();
if (isLeapYear)
{
std::cout << "消费者:" << year << " 年是闰年!" << std::endl;
}
else
{
std::cout << "消费者:" << year << " 年不是闰年!" << std::endl;
}
}
}
void *Producter(void *args)
{
BlockQueue<Task> *bq = (BlockQueue<Task> *)args;
while (true)
{
int year = rand() % 3000;
Task t(year);
std::cout << "生产者:" << year << " 年是闰年吗?" << std::endl;
bq->Push(t);
sleep(1);
}
}
int main()
{
srand((long long)time(nullptr));
BlockQueue<Task> *bq = new BlockQueue<Task>();
pthread_t c, p;
pthread_create(&c, nullptr, Consumer, (void *)bq);
pthread_create(&p, nullptr, Producter, (void *)bq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
三、运行结果与修改
关于代码的可修改:
1、CpMain.cpp中生产者函数中每生产一个任务sleep1秒。当然也可以是消费者sleep1秒,场景就是生产者一次生产队列最大容量(BlockQueue中设置为6)也可能看到一次打印1~7条数据的情况,并不是代码写错了,而是生产一个任务很快,而处理任务或者打印一条数据是需要时间的,也就造成了一点误差。
2、文中使用单生产者单消费者,如果有必要也可以多添加几个线程实现多生产多消费。
3、关于Task.hpp也可以重载()使用仿函数:
Task.hpp中只需要把原来的bool handler()const
函数换成下面的函数即可:
bool operator()()
{
return IsLeapYear(_year);
}
在CpMain.cpp中也不需要调用handler
了,修改如下:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)