首先感谢下这个博主 我很多都是在他这边借鉴的,下面是他的文章的原链接
https://blog.csdn.net/qq_38318941/article/details/102558351
下面是我对他的线程方面的改动 以及我的一些理解,我才用的是官方推荐的moveToThread的方法来创建多线程的。废话不多说,开始讲解关于生产者消费者模型的搭建。
QT中的生产者消费者模型涉及到很多知识,其中生产者和消费者涉及到多线程问题,还有由于生产者消费者要操作同一块内存的数据所以,这又涉及到队列(QQueue)和信号量(QSemaphore)的综合应用.
这里对需要进行同一块操作的内存进行包装成一个类blockMsgQueue
blockMsgQueue.h文件
#ifndef BLOCKMSGQUEUE_H
#define BLOCKMSGQUEUE_H
#include <QSemaphore>
#include <QMutex>
#include <QQueue>
class blockMsgQueue
{
public:
blockMsgQueue(const int &maxMsgsNum, const unsigned int &msgSize);
~blockMsgQueue();
void addMsg(const char *msgPack);
void getMsg(char *msgPack);
private:
int m_maxMsgsNum;
unsigned int m_msgSize;
QSemaphore *m_pFreeMsgs;
QSemaphore *m_pUsedMsgs;
QQueue<char *> m_queue;
QMutex m_mutex;
};
#endif // BLOCKMSGQUEUE_H
blockMsgQueue.c文件
#include "blockMsgQueue.h"
blockmsgqueue::blockMsgQueue(const int &m_maxMsgsNum, const unsigned int &msgSize)
:m_maxMsgsNum(m_maxMsgsNum),
m_msgSize(msgSize)
{
m_pFreeMsgs = new QSemaphore(m_maxMsgsNum);
m_pUsedMsgs = new QSemaphore(0);
}
blockmsgqueue::~blockmsgqueue()
{
}
void blockmsgqueue::addMsg(const char *msgPack)
{
m_pFreeMsgs->acquire(1);
m_mutex.lock();
char *msgIn = new char[m_msgSize];
memcpy(msgIn, msgPack, m_msgSize);
m_queue.enqueue(msgIn);
m_mutex.unlock();
m_pUsedMsgs->release(1);
}
void blockmsgqueue::getMsg(char *msgPack)
{
m_pUsedMsgs->acquire(1);
m_mutex.lock();
char *msgOut = m_queue.dequeue();
memcpy(msgPack, msgOut, m_msgSize);
m_mutex.unlock();
m_pFreeMsgs->release(1);
}
对这个类的解释:
m_maxMsgsNum:最大的队列数量就是可以操作内存块的块数。
m_msgSize:每小块内存的尺寸。
信号量:
QSemaphore(n):建立对象时可以给n个资源,就是给多少块内存。
void acquire(n):这个函数,操作一次就减少n个资源,如果现在资源不到n个就会阻塞(在这个项目中如果没有一个资源就阻塞等待能有可用的资源)。
void release(n):这个函数,操作一次就增加n个资源。
在操作同一块内存块组,需要两个信号量来配合使用,一个是空闲内存块的信号量m_pFreeMsgs,用来标识是否有可用的内存块,这个是给生产者使用的。另一个是已经使用了多少内存块的信号量m_pUsedMsgs,用来标识是否有转载有用户数据的内存块,这个是给消费者使用的。
m_pFreeMsgs = new QSemaphore(m_maxMsgsNum);
m_pUsedMsgs = new QSemaphore(0);
这里是预先设定好信号量,预示着有10个空的内存块,0个装载用户数据的内存块
队列:
队列是一种数据结构,一种先入先出的数据结构
enqueue:入队
dequeue:出队
这个函数要将数据装载在共享内存块组内
void blockmsgqueue::addMsg(const char *msgPack)
{
m_pFreeMsgs->acquire(1);//用掉一个内存块,在下面要装载数据了
m_mutex.lock();//因为要操作同一块内存,就要锁住防止数据出问题
char *msgIn = new char[m_msgSize];
memcpy(msgIn, msgPack, m_msgSize);//将数据放入新申请下来的内存
//这里可以看做将数据打包好
m_queue.enqueue(msgIn);//将数据包放入队列中
m_mutex.unlock();//解锁队列操作
m_pUsedMsgs->release(1);//说明装载用户数据的内存块又多了一块
}
这个函数是将装载共享内存数据拿出来
void blockmsgqueue::getMsg(char *msgPack)
{
m_pUsedMsgs->acquire(1);//说明装载用户数据的内存块又少了一块
m_mutex.lock();//因为要操作同一块内存,就要锁住防止数据出问题
char *msgOut = m_queue.dequeue();//将数据从队列中取出
memcpy(msgPack, msgOut, m_msgSize);//并拷贝在可以拿出来的地址
m_mutex.unlock();//解锁队列操作
m_pFreeMsgs->release(1);//说明又有一块空闲内存块可以用了
}
下面就是对这个类的使用了,因为QT官方推荐使用moveToThread()方法,所以这里就用这种方法来写
共享资源就用同一个头文件来定义
CommonRes.h文件
#ifndef COMMONRES_H
#define COMMONRES_H
typedef struct
{
char *buf;
unsigned int bufLen;
}MSG_PACK;
const int msgsNum = 170;
#endif // COMMONRES_H
生产者 producerthread.h
#ifndef PRODUCERTHREAD_H
#define PRODUCERTHREAD_H
#include <QObject>
#include "blockmsgqueue.h"
#include <QElapsedTimer>
#include "CommonRes.h"
class producerThread : public QObject
{
Q_OBJECT
public:
explicit producerThread(QObject *parent = nullptr);
~producerThread();
bool setMsgQueue(blockmsgqueue *pMsg);
signals:
public slots:
void work();
private:
void producerSleep(int semc);
blockmsgqueue *m_ProducerMsgQueue;
};
#endif // PRODUCERTHREAD_H
producerthread.c
#include "producerthread.h"
#include <QRandomGenerator>
#include <QDebug>
producerThread::producerThread(QObject *parent)
: QObject{parent}
{
m_ProducerMsgQueue = nullptr;
}
producerThread::~producerThread()
{
}
bool producerThread::setMsgQueue(blockmsgqueue *pMsg)
{
m_ProducerMsgQueue = pMsg;
if(m_ProducerMsgQueue == nullptr)
{
qDebug()<<"Producer Thread:Message queue initialize failed";
return false;
}
return true;
}
void producerThread::work()
{
for(int i=0; i<msgsNum; i++)
{
//char Index = msgsNum%2;
MSG_PACK pack;
pack.bufLen = 8;
pack.buf = new char[pack.bufLen];
char str[8] = {0};
for (int j = 0; j < pack.bufLen - 1; j++)
{
str[j] = "ABCDE12345"[QRandomGenerator::global()->bounded(10)];
}
memcpy(pack.buf, str, pack.bufLen);
//if(m_ProducerMsgQueue == nullptr) break;
m_ProducerMsgQueue->addMsg((char*)&pack);
qDebug()<<QString("producer pack %1: %2").arg(i+1).arg(pack.buf);
producerSleep(30);
}
}
void producerThread::producerSleep(int msec)
{
QElapsedTimer t;
t.start();
while(t.elapsed()<msec);
}
这里需要注意的点事是 要使用 bool setMsgQueue(blockmsgqueue *pMsg);这个函数将在外面申请的共享内存传入到生产者中进行操作。
消费者 consumerthread.h
#ifndef CONSUMERTHREAD_H
#define CONSUMERTHREAD_H
#include <QObject>
#include "blockmsgqueue.h"
#include <QElapsedTimer>
#include "CommonRes.h"
class consumerThread : public QObject
{
Q_OBJECT
public:
explicit consumerThread(QObject *parent = nullptr);
~consumerThread();
bool setMsgQueue(blockmsgqueue *pMsg);
signals:
public slots:
void work();
private:
void consumerSleep(int msec);
blockmsgqueue *m_ConsumerMsgQueue;
};
#endif // CONSUMERTHREAD_H
consumerthread.c
#include "consumerthread.h"
#include <QThread>
#include <QDebug>
#include <QString>
consumerThread::consumerThread(QObject *parent)
: QObject{parent}
{
m_ConsumerMsgQueue = nullptr;
}
consumerThread::~consumerThread()
{
m_ConsumerMsgQueue = nullptr;
}
bool consumerThread::setMsgQueue(blockmsgqueue *pMsg)
{
m_ConsumerMsgQueue = pMsg;
if(m_ConsumerMsgQueue == nullptr)
{
qDebug()<<"Consumer Thread:Message queue initialize failed!";
return false;
}
else
return true;
}
void consumerThread::work()
{
QString comsumer_i = QString::number(quintptr(QThread::currentThreadId()));
//consumer_i++;
//int c_i = consumer_i;
for(int i=0; i<msgsNum; i++)
{
MSG_PACK Cpack;
if(m_ConsumerMsgQueue == nullptr)
{
qDebug()<<"m_ConsumerMsgQueue empty";
break;
}
else
{
m_ConsumerMsgQueue->getMsg((char*)&Cpack);
}
//takei++;
qDebug()<<QString("comsumer %1 take pack %2").arg(comsumer_i).arg(Cpack.buf);
//qDebug() << QString("comsumer %1 take pack %2: %3").arg(c_i).arg(takei).arg(Cpack.buf);
delete[] Cpack.buf;
Cpack.buf = nullptr;
consumerSleep(100);
}
}
void consumerThread::consumerSleep(int msec)
{
QElapsedTimer t;
t.start();
while(t.elapsed()<msec);
}
消费者也是一样的道理,要使用 bool setMsgQueue(blockmsgqueue *pMsg);这个函数将在外面申请的共享内存传入到生产者中进行操作。
MainWindow.h
#ifndef MAINWINDOW_H
#define MAINWINDOW_H
#include <QMainWindow>
#include "blockmsgqueue.h"
#include "producerthread.h"
#include "consumerthread.h"
#include <QThread>
#include <QDebug>
#include "blockmsgqueue.h"
QT_BEGIN_NAMESPACE
namespace Ui { class MainWindow; }
QT_END_NAMESPACE
class MainWindow : public QMainWindow
{
Q_OBJECT
public:
MainWindow(QWidget *parent = nullptr);
~MainWindow();
blockmsgqueue *msgQueue = new blockmsgqueue(10, sizeof(MSG_PACK));
private:
Ui::MainWindow *ui;
producerThread *producer;
consumerThread *consumer1;
consumerThread *consumer2;
consumerThread *consumer3;
QThread *producerTh;
QThread *consumerTh1;
QThread *consumerTh2;
QThread *consumerTh3;
signals:
void startProducerWork();
void startconsumer1Work();
void startconsumer2Work();
void startconsumer3Work();
private slots:
void on_pushButton_clicked();
void on_pushButton_2_clicked();
};
#endif // MAINWINDOW_H
MainWindow.c
#include "mainwindow.h"
#include "ui_mainwindow.h"
MainWindow::MainWindow(QWidget *parent)
: QMainWindow(parent)
, ui(new Ui::MainWindow)
{
ui->setupUi(this);
producer = new producerThread();
producer->setMsgQueue(msgQueue);
consumer1 = new consumerThread();
consumer1->setMsgQueue(msgQueue);
consumer2 = new consumerThread();
consumer2->setMsgQueue(msgQueue);
consumer3 = new consumerThread();
consumer3->setMsgQueue(msgQueue);
producerTh = new QThread();
consumerTh1 = new QThread();
consumerTh2 = new QThread();
consumerTh3 = new QThread();
connect(this, &MainWindow::startProducerWork, producer, &producerThread::work);
connect(this, &MainWindow::startconsumer1Work, consumer1, &consumerThread::work);
connect(this, &MainWindow::startconsumer2Work, consumer2, &consumerThread::work);
connect(this, &MainWindow::startconsumer3Work, consumer3, &consumerThread::work);
connect(producerTh, &QThread::finished, producer, &QObject::deleteLater);
connect(consumerTh1, &QThread::finished, consumer1, &QObject::deleteLater);
connect(consumerTh2, &QThread::finished, consumer2, &QObject::deleteLater);
connect(consumerTh3, &QThread::finished, consumer3, &QObject::deleteLater);
producer->moveToThread(producerTh);
consumer1->moveToThread(consumerTh1);
consumer2->moveToThread(consumerTh2);
consumer3->moveToThread(consumerTh3);
}
MainWindow::~MainWindow()
{
if(producerTh->isRunning())
{
producerTh->quit();
producerTh->wait();
}
if(consumerTh1->isRunning())
{
consumerTh1->quit();
consumerTh1->wait();
}
if(consumerTh2->isRunning())
{
consumerTh2->quit();
consumerTh2->wait();
}
if(consumerTh3->isRunning())
{
consumerTh3->quit();
consumerTh3->wait();
}
delete ui;
}
void MainWindow::on_pushButton_clicked()
{
producerTh->start();
consumerTh1->start();
consumerTh2->start();
consumerTh3->start();
qDebug()<<"start";
emit startProducerWork();
emit startconsumer1Work();
emit startconsumer2Work();
emit startconsumer3Work();
}
void MainWindow::on_pushButton_2_clicked()
{
if(producerTh->isRunning())
{
producerTh->quit();
producerTh->wait();
}
if(consumerTh1->isRunning())
{
consumerTh1->quit();
consumerTh1->wait();
}
if(consumerTh2->isRunning())
{
consumerTh2->quit();
consumerTh2->wait();
}
if(consumerTh3->isRunning())
{
consumerTh3->quit();
consumerTh3->wait();
}
}
接下来就是对这整个进行讲解,该项目有个问题 就是在消费者使用后,旧的队列没有删除,而新进队的队列都是新new的,集体代码在这块
void blockmsgqueue::addMsg(const char *msgPack)
{
m_pFreeMsgs->acquire(1);
m_mutex.lock();
char *msgIn = new char[m_msgSize];
memcpy(msgIn, msgPack, m_msgSize);
m_queue.enqueue(msgIn);//这里就是不断地new,然后进队
m_mutex.unlock();
m_pUsedMsgs->release(1);
}
void blockmsgqueue::getMsg(char *msgPack)
{
m_pUsedMsgs->acquire(1);
m_mutex.lock();
char *msgOut = m_queue.dequeue();//而这里出队后并没有对 //m_queue.dequeue()进行处 //理,这句只是将 //m_queue.dequeue()赋值给 //msgOut,并没有删除操作
memcpy(msgPack, msgOut, m_msgSize);
m_mutex.unlock();
m_pFreeMsgs->release(1);
}
就像这样
为了解决这个问题可以采用新的blockMsgQueue类像这样
blockMsgQueue.h
#ifndef BLOCKMSGQUEUE_H
#define BLOCKMSGQUEUE_H
#include <QSemaphore>
#include <QMutex>
#include <QQueue>
class blockmsgqueue
{
public:
blockmsgqueue(const int &maxMsgsNum, const unsigned int &msgSize);
~blockmsgqueue();
void addMsg(const char *msgPack);
void getMsg(char *msgPack);
private:
int m_maxMsgsNum;
unsigned int m_msgSize;
QSemaphore *m_pFreeMsgs;
QSemaphore *m_pUsedMsgs;
QQueue<char *> m_freeQueue;
QQueue<char *> m_queue;
QMutex m_mutex;
};
#endif // BLOCKMSGQUEUE_H
blockMsgQueue.c
#include "blockmsgqueue.h"
blockmsgqueue::blockmsgqueue(const int &m_maxMsgsNum, const unsigned int &msgSize)
:m_maxMsgsNum(m_maxMsgsNum),
m_msgSize(msgSize)
{
m_pFreeMsgs = new QSemaphore(m_maxMsgsNum);
m_pUsedMsgs = new QSemaphore(0);
for(int i=0; i<m_maxMsgsNum; i++)
{
char *freeMsg = new char[m_msgSize];
m_freeQueue.enqueue(freeMsg);
}
}
blockmsgqueue::~blockmsgqueue()
{
if(m_pFreeMsgs)
{
m_pFreeMsgs = nullptr;
}
if(m_pUsedMsgs)
{
m_pUsedMsgs = nullptr;
}
QQueue<char*>::iterator ite1;
QQueue<char*>::iterator ite2;
for(ite1 = m_freeQueue.begin(); ite1 != m_freeQueue.end(); ++ite1)
{
delete[] (*ite1);
(*ite1) = nullptr;
}
for(ite2 = m_queue.begin(); ite2 != m_queue.end(); ++ite2)
{
delete[] (*ite2);
(*ite2) = nullptr;
}
}
void blockmsgqueue::addMsg(const char *msgPack)
{
m_pFreeMsgs->acquire(1);
m_mutex.lock();
char *msgIn = m_freeQueue.dequeue();
//char *msgIn = new char[m_msgSize];
memcpy(msgIn, msgPack, m_msgSize);
m_queue.enqueue(msgIn);
m_mutex.unlock();
m_pUsedMsgs->release(1);
}
void blockmsgqueue::getMsg(char *msgPack)
{
m_pUsedMsgs->acquire(1);
m_mutex.lock();
char *msgOut = m_queue.dequeue();
memcpy(msgPack, msgOut, m_msgSize);
m_freeQueue.enqueue(msgOut);
m_mutex.unlock();
m_pFreeMsgs->release(1);
}
这个的功能就是多了一个新的队列和旧的队列循环使用