2.2 C++ queue模拟阻塞队列的生产消费模型
代码:
为了便于理解,我们以单生产者,单消费者,来进行讲解。
![](https://img-blog.csdnimg.cn/4ed1b1615e7e486a86a3eb0a199eb0f5.png)
#include <iostream>
#include <queue>
#include <stdlib.h>
#include <pthread.h>
#define NUM 8
class BlockQueue
{
private:
std::queue<int> q;
int cap;
pthread_mutex_t lock;
pthread_cond_t full;
pthread_cond_t empty;
private:
void LockQueue()
{
pthread_mutex_lock(&lock);
}
void UnLockQueue()
{
pthread_mutex_unlock(&lock);
}
void ProductWait()
{
pthread_cond_wait(&full, &lock);
}
void ConsumeWait()
{
pthread_cond_wait(&empty, &lock);
}
void NotifyProduct()
{
pthread_cond_signal(&full);
}
void NotifyConsume()
{
pthread_cond_signal(&empty);
}
bool IsEmpty()
{
return (q.size() == 0 ? true : false);
}
bool IsFull()
{
return (q.size() == cap ? true : false);
}
public:
BlockQueue(int _cap = NUM) : cap(_cap)
{
pthread_mutex_init(&lock, NULL);
pthread_cond_init(&full, NULL);
pthread_cond_init(&empty, NULL);
}
void PushData(const int &data)
{
LockQueue();
while (IsFull())
{
NotifyConsume();
std::cout << "queue full, notify consume data, product stop." << std::endl;
ProductWait();
}
q.push(data);
// NotifyConsume();
UnLockQueue();
}
void PopData(int &data)
{
LockQueue();
while (IsEmpty())
{
NotifyProduct();
std::cout << "queue empty, notify product data, consume stop." << std::endl;
ConsumeWait();
}
data = q.front();
q.pop();
// NotifyProduct();
UnLockQueue();
}
~BlockQueue()
{
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&full);
pthread_cond_destroy(&empty);
}
};
void *consumer(void *arg)
{
BlockQueue *bqp = (BlockQueue *)arg;
int data;
for (;;)
{
bqp->PopData(data);
std::cout << "Consume data done : " << data << std::endl;
}
}
// more faster
void *producter(void *arg)
{
BlockQueue *bqp = (BlockQueue *)arg;
srand((unsigned long)time(NULL));
for (;;)
{
int data = rand() % 1024;
bqp->PushData(data);
std::cout << "Prodoct data done: " << data << std::endl;
// sleep(1);
}
}
int main()
{
BlockQueue bq;
pthread_t c, p;
pthread_create(&c, NULL, consumer, (void *)&bq);
pthread_create(&p, NULL, producter, (void *)&bq);
pthread_join(c, NULL);
pthread_join(p, NULL);
return 0;
}
模拟实现:
![](https://img-blog.csdnimg.cn/078c657686fe4981b27bbced1224f584.png)
BlockQueue.hpp:
![](https://img-blog.csdnimg.cn/cf095a2320424a9ebbd13d5481440c8e.png)
#pragma once
#include <iostream>
#include <queue>
#include <cstdlib>
#include <unistd.h>
#include <pthread.h>
using namespace std;
// 实现新需求: 我只想保存最新的5个任务,如果来了任务,老的任务,我想让他直接被丢弃(自行选择实现)
const uint32_t gDefaultCap = 5;
template <class T>
class BlockQueue
{
public:
BlockQueue(uint32_t cap = gDefaultCap) : cap_(cap)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&conCond_, nullptr);
pthread_cond_init(&proCond_, nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&conCond_);
pthread_cond_destroy(&proCond_);
}
public:
//生产接口
void push(const T &in) // const &: 纯输入
{
// 加锁
// 判断->是否适合生产->bq是否为满->程序员视角的条件->1. 满(不生产) 2. 不满(生产)
// if(满) 不生产,休眠
// else if(不满) 生产,唤醒消费者
// 解锁
lockQueue();
while (isFull()) // ifFull就是我们在临界区中设定的条件
{
// before: 当我等待的时候,会自动释放mutex_
proBlockWait(); //阻塞等待,等待被唤醒。 被唤醒 != 条件被满足(概率虽然很小),被唤醒 && 条件被满足
//解决伪唤醒(使用while)
// after: 当我醒来的时候,我是在临界区里醒来的!!
}
// 条件满足,可以生产
pushCore(in); //生产完成
// wakeupCon(); // 唤醒消费者
unlockQueue();
wakeupCon(); // 唤醒消费者
}
//消费接口
T pop()
{
// 加锁
// 判断->是否适合消费->bq是否为空->程序员视角的条件->1. 空(不消费) 2. 有(消费)
// if(空) 不消费,休眠
// else if(有) 消费,唤醒生产者
// 解锁
lockQueue();
while (isEmpty())
{
conBlockwait(); //阻塞等待,等待被唤醒,?
}
// 条件满足,可以消费
T tmp = popCore();
unlockQueue();
wakeupPro(); // 唤醒生产者
return tmp;
}
private:
void lockQueue()
{
pthread_mutex_lock(&mutex_);
}
void unlockQueue()
{
pthread_mutex_unlock(&mutex_);
}
bool isEmpty()
{
return bq_.empty();
}
bool isFull()
{
return bq_.size() == cap_;
}
void proBlockWait() // 生产者一定是在临界区中的!
{
// 1. 在阻塞线程的时候,会自动释放mutex_锁
pthread_cond_wait(&proCond_, &mutex_);
}
void conBlockwait() //阻塞等待,等待被唤醒
{
// 1. 在阻塞线程的时候,会自动释放mutex_锁
pthread_cond_wait(&conCond_, &mutex_);
// 2. 当阻塞结束,返回的时候,pthread_cond_wait,会自动帮你重新获得mutex_,然后才返回
// 为什么我们上节课,写的代码,批量退出线程的时候,发现无法退出?
//唤醒时, 调用pthread_cond_wait(&conCond_, &mutex_);,重新去竞争这个锁,多个线程去竞争一个,且那个拥有锁的线程退出没有释放锁,最后导致其他线程阻塞。
}
void wakeupPro() // 唤醒生产者
{
pthread_cond_signal(&proCond_);
}
void wakeupCon() // 唤醒消费者
{
pthread_cond_signal(&conCond_);
}
void pushCore(const T &in)
{
bq_.push(in); //生产完成
}
T popCore()
{
T tmp = bq_.front();
bq_.pop();
return tmp;
}
private:
uint32_t cap_; //容量
queue<T> bq_; // blockqueue
pthread_mutex_t mutex_; //保护阻塞队列的互斥锁
pthread_cond_t conCond_; // 让消费者等待的条件变量
pthread_cond_t proCond_; // 让生产者等待的条件变量
};
BlockQueueTest.cc: ![](https://img-blog.csdnimg.cn/a3db9077cb294a9cb0bf75af05cd5ffd.png)
#include "Task.hpp"
#include "BlockQueue.hpp"
#include <ctime>
const std::string ops = "+-*/%";
// 并发,并不是在临界区中并发(一般),而是生产前(before blockqueue),消费后(after blockqueue)对应的并发
void *consumer(void *args)
{
BlockQueue<Task> *bqp = static_cast<BlockQueue<Task> *>(args);
while (true)
{
Task t = bqp->pop(); // 消费任务
int result = t(); //处理任务 --- 任务也是要花时间的!
int one, two;
char op;
t.get(&one, &two, &op);
cout << "consumer[" << pthread_self() << "] " << (unsigned long)time(nullptr) << " 消费了一个任务: " << one << op << two << "=" << result << endl;
}
}
void *productor(void *args)
{
BlockQueue<Task> *bqp = static_cast<BlockQueue<Task> *>(args);
while (true)
{
// 1. 制作任务 --- 要不要花时间?? -- 网络,磁盘,用户
int one = rand() % 50;
int two = rand() % 20;
char op = ops[rand() % ops.size()];
Task t(one, two, op);
// 2. 生产任务
bqp->push(t);
cout << "producter[" << pthread_self() << "] " << (unsigned long)time(nullptr) << " 生产了一个任务: " << one << op << two << "=?" << endl;
sleep(1);
}
}
int main()
{
srand((unsigned long)time(nullptr) ^ getpid());
// 定义一个阻塞队列
// 创建两个线程,productor, consumer
// productor ----- consumer
// BlockQueue<int> bq;
// bq.push(10);
// int a = bq.pop();
// cout << a << endl;
// 既然可以使用int类型的数据,我们也可以使用自己封装的类型,包括任务
// BlockQueue<int> bq;
BlockQueue<Task> bq;
pthread_t c, p;
pthread_create(&c, nullptr, consumer, &bq);
pthread_create(&p, nullptr, productor, &bq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
Task.hpp:
![](https://img-blog.csdnimg.cn/a4d0ac246cf24fc394bd1fcced8c5234.png)
#pragma once
#include <iostream>
#include <string>
class Task
{
public:
Task()
:elemOne_(0)
,elemTwo_(0)
,operator_('0')
{}
Task(int one, int two, char op)
:elemOne_(one)
,elemTwo_(two)
,operator_(op)
{}
int operator() ()
{
return run();
}
int run()
{
int result = 0;
switch (operator_)
{
case '+':
result = elemOne_ + elemTwo_;
break;
case '-':
result = elemOne_ - elemTwo_;
break;
case '*':
result = elemOne_ * elemTwo_;
break;
case '/':
{
if (elemTwo_ == 0)
{
std::cout << "div zero, abort" << std::endl;
result = -1;
}
else
{
result = elemOne_ / elemTwo_;
}
}
break;
case '%':
{
if (elemTwo_ == 0)
{
std::cout << "mod zero, abort" << std::endl;
result = -1;
}
else
{
result = elemOne_ % elemTwo_;
}
}
break;
default:
std::cout << "非法操作: " << operator_ << std::endl;
break;
}
return result;
}
int get(int *e1, int *e2, char *op)
{
*e1 = elemOne_;
*e2 = elemTwo_;
*op = operator_;
}
private:
int elemOne_;
int elemTwo_;
char operator_;
};
makefile:
![](https://img-blog.csdnimg.cn/343a7e90d5d34076b82016fc68552125.png)
CC=g++
FLAGS=-std=c++11
LD=-lpthread
bin=blockQueue
src=BlockQueueTest.cc
$(bin):$(src)
$(CC) -o $@ $^ $(LD) $(FLAGS)
.PHONY:clean
clean:
rm -f $(bin)
![](https://img-blog.csdnimg.cn/f39825e3ec7e49719858366dc5da0510.png)
3.POSIX信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem);
发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);
上面的生产者-消费者的例子是基于queue的,其空间可以动态分配,现在基于固定大小的环形队列重写这个程序 (POSIX信号量)
信号量是一个计数器,描述临界资源数量的计数器。二元信号量==互斥锁