QT 生产者消费者模型

2023-05-16

首先感谢下这个博主 我很多都是在他这边借鉴的,下面是他的文章的原链接

https://blog.csdn.net/qq_38318941/article/details/102558351

下面是我对他的线程方面的改动 以及我的一些理解,我才用的是官方推荐的moveToThread的方法来创建多线程的。废话不多说,开始讲解关于生产者消费者模型的搭建。

QT中的生产者消费者模型涉及到很多知识,其中生产者和消费者涉及到多线程问题,还有由于生产者消费者要操作同一块内存的数据所以,这又涉及到队列(QQueue)和信号量(QSemaphore)的综合应用.

这里对需要进行同一块操作的内存进行包装成一个类blockMsgQueue

blockMsgQueue.h文件

#ifndef BLOCKMSGQUEUE_H
#define BLOCKMSGQUEUE_H

#include <QSemaphore>
#include <QMutex>
#include <QQueue>

class blockMsgQueue
{
public:
    blockMsgQueue(const int &maxMsgsNum, const unsigned int &msgSize);
    ~blockMsgQueue();

    void addMsg(const char *msgPack);
    void getMsg(char *msgPack);

private:
    int m_maxMsgsNum;
    unsigned int m_msgSize;
    QSemaphore *m_pFreeMsgs;
    QSemaphore *m_pUsedMsgs;
    QQueue<char *> m_queue;
    QMutex m_mutex;
};
#endif // BLOCKMSGQUEUE_H

blockMsgQueue.c文件

#include "blockMsgQueue.h"

blockmsgqueue::blockMsgQueue(const int &m_maxMsgsNum, const unsigned int &msgSize)
    :m_maxMsgsNum(m_maxMsgsNum),
     m_msgSize(msgSize)
{
    m_pFreeMsgs = new QSemaphore(m_maxMsgsNum);
    m_pUsedMsgs = new QSemaphore(0);

}

blockmsgqueue::~blockmsgqueue()
{

}

void blockmsgqueue::addMsg(const char *msgPack)
{
    m_pFreeMsgs->acquire(1);
    m_mutex.lock();
    char *msgIn = new char[m_msgSize];
    memcpy(msgIn, msgPack, m_msgSize);
    m_queue.enqueue(msgIn);
    m_mutex.unlock();
    m_pUsedMsgs->release(1);
}

void blockmsgqueue::getMsg(char *msgPack)
{
    m_pUsedMsgs->acquire(1);
    m_mutex.lock();
    char *msgOut = m_queue.dequeue();
    memcpy(msgPack, msgOut, m_msgSize);
    m_mutex.unlock();
    m_pFreeMsgs->release(1);
}

对这个类的解释:

m_maxMsgsNum:最大的队列数量就是可以操作内存块的块数。

m_msgSize:每小块内存的尺寸。

信号量:

QSemaphore(n):建立对象时可以给n个资源,就是给多少块内存。

void acquire(n):这个函数,操作一次就减少n个资源,如果现在资源不到n个就会阻塞(在这个项目中如果没有一个资源就阻塞等待能有可用的资源)。

void release(n):这个函数,操作一次就增加n个资源。

在操作同一块内存块组,需要两个信号量来配合使用,一个是空闲内存块的信号量m_pFreeMsgs,用来标识是否有可用的内存块,这个是给生产者使用的。另一个是已经使用了多少内存块的信号量m_pUsedMsgs,用来标识是否有转载有用户数据的内存块,这个是给消费者使用的。

m_pFreeMsgs = new QSemaphore(m_maxMsgsNum);

m_pUsedMsgs = new QSemaphore(0);

这里是预先设定好信号量,预示着有10个空的内存块,0个装载用户数据的内存块

队列:

队列是一种数据结构,一种先入先出的数据结构

enqueue:入队

dequeue:出队

这个函数要将数据装载在共享内存块组内

void blockmsgqueue::addMsg(const char *msgPack)

{

m_pFreeMsgs->acquire(1);//用掉一个内存块,在下面要装载数据了

m_mutex.lock();//因为要操作同一块内存,就要锁住防止数据出问题

char *msgIn = new char[m_msgSize];

memcpy(msgIn, msgPack, m_msgSize);//将数据放入新申请下来的内存

//这里可以看做将数据打包好

m_queue.enqueue(msgIn);//将数据包放入队列中

m_mutex.unlock();//解锁队列操作

m_pUsedMsgs->release(1);//说明装载用户数据的内存块又多了一块

}

这个函数是将装载共享内存数据拿出来

void blockmsgqueue::getMsg(char *msgPack)

{

m_pUsedMsgs->acquire(1);//说明装载用户数据的内存块又少了一块

m_mutex.lock();//因为要操作同一块内存,就要锁住防止数据出问题

char *msgOut = m_queue.dequeue();//将数据从队列中取出

memcpy(msgPack, msgOut, m_msgSize);//并拷贝在可以拿出来的地址

m_mutex.unlock();//解锁队列操作

m_pFreeMsgs->release(1);//说明又有一块空闲内存块可以用了

}

下面就是对这个类的使用了,因为QT官方推荐使用moveToThread()方法,所以这里就用这种方法来写

共享资源就用同一个头文件来定义

CommonRes.h文件

#ifndef COMMONRES_H
#define COMMONRES_H


typedef struct
{
    char *buf;
    unsigned int bufLen;
}MSG_PACK;

const int msgsNum = 170;

#endif // COMMONRES_H

生产者 producerthread.h

#ifndef PRODUCERTHREAD_H
#define PRODUCERTHREAD_H

#include <QObject>
#include "blockmsgqueue.h"
#include <QElapsedTimer>
#include "CommonRes.h"

class producerThread : public QObject
{
    Q_OBJECT
public:
    explicit producerThread(QObject *parent = nullptr);
    ~producerThread();

    bool setMsgQueue(blockmsgqueue *pMsg);
signals:


public slots:
    void work();

private:
    void producerSleep(int semc);

    blockmsgqueue *m_ProducerMsgQueue;

};

#endif // PRODUCERTHREAD_H

producerthread.c

#include "producerthread.h"
#include <QRandomGenerator>
#include <QDebug>
producerThread::producerThread(QObject *parent)
    : QObject{parent}
{
    m_ProducerMsgQueue = nullptr;
}

producerThread::~producerThread()
{

}

bool producerThread::setMsgQueue(blockmsgqueue *pMsg)
{
    m_ProducerMsgQueue = pMsg;
    if(m_ProducerMsgQueue == nullptr)
    {
        qDebug()<<"Producer Thread:Message queue initialize failed";
        return false;
    }
    return true;
}

void producerThread::work()
{
    for(int i=0; i<msgsNum; i++)
    {
        //char Index = msgsNum%2;
        MSG_PACK pack;
        pack.bufLen = 8;
        pack.buf = new char[pack.bufLen];
        char str[8] = {0};
        for (int j = 0; j < pack.bufLen - 1; j++)
        {
            str[j] = "ABCDE12345"[QRandomGenerator::global()->bounded(10)];
        }

        memcpy(pack.buf, str, pack.bufLen);
        //if(m_ProducerMsgQueue == nullptr) break;
        m_ProducerMsgQueue->addMsg((char*)&pack);
        qDebug()<<QString("producer pack %1: %2").arg(i+1).arg(pack.buf);

        producerSleep(30);
    }
}

void producerThread::producerSleep(int msec)
{
    QElapsedTimer t;
    t.start();
    while(t.elapsed()<msec);
}

这里需要注意的点事是 要使用 bool setMsgQueue(blockmsgqueue *pMsg);这个函数将在外面申请的共享内存传入到生产者中进行操作。

消费者 consumerthread.h

#ifndef CONSUMERTHREAD_H
#define CONSUMERTHREAD_H

#include <QObject>
#include "blockmsgqueue.h"
#include <QElapsedTimer>
#include "CommonRes.h"

class consumerThread : public QObject
{
    Q_OBJECT
public:
    explicit consumerThread(QObject *parent = nullptr);
    ~consumerThread();

    bool setMsgQueue(blockmsgqueue *pMsg);
signals:

public slots:
    void work();

private:
    void consumerSleep(int msec);

    blockmsgqueue *m_ConsumerMsgQueue;

};

#endif // CONSUMERTHREAD_H

consumerthread.c

#include "consumerthread.h"
#include <QThread>
#include <QDebug>
#include <QString>
consumerThread::consumerThread(QObject *parent)
    : QObject{parent}
{
    m_ConsumerMsgQueue = nullptr;
}

consumerThread::~consumerThread()
{
    m_ConsumerMsgQueue = nullptr;
}

bool consumerThread::setMsgQueue(blockmsgqueue *pMsg)
{
    m_ConsumerMsgQueue = pMsg;
    if(m_ConsumerMsgQueue == nullptr)
    {
        qDebug()<<"Consumer Thread:Message queue initialize failed!";
        return false;
    }
    else
        return true;
}

void consumerThread::work()
{
    QString comsumer_i = QString::number(quintptr(QThread::currentThreadId()));
    //consumer_i++;
    //int c_i = consumer_i;
    for(int i=0; i<msgsNum; i++)
    {
        MSG_PACK Cpack;
        if(m_ConsumerMsgQueue == nullptr)
        {
            qDebug()<<"m_ConsumerMsgQueue empty";
            break;
        }
        else
        {
            m_ConsumerMsgQueue->getMsg((char*)&Cpack);
        }
        //takei++;
        qDebug()<<QString("comsumer %1 take pack %2").arg(comsumer_i).arg(Cpack.buf);
        //qDebug() << QString("comsumer %1 take pack %2:  %3").arg(c_i).arg(takei).arg(Cpack.buf);
        delete[] Cpack.buf;
        Cpack.buf = nullptr;
        consumerSleep(100);
    }

}

void consumerThread::consumerSleep(int msec)
{
    QElapsedTimer t;
    t.start();
    while(t.elapsed()<msec);
}

消费者也是一样的道理,要使用 bool setMsgQueue(blockmsgqueue *pMsg);这个函数将在外面申请的共享内存传入到生产者中进行操作。

MainWindow.h

#ifndef MAINWINDOW_H
#define MAINWINDOW_H

#include <QMainWindow>
#include "blockmsgqueue.h"
#include "producerthread.h"
#include "consumerthread.h"
#include <QThread>
#include <QDebug>

#include "blockmsgqueue.h"

QT_BEGIN_NAMESPACE
namespace Ui { class MainWindow; }
QT_END_NAMESPACE

class MainWindow : public QMainWindow
{
    Q_OBJECT

public:
    MainWindow(QWidget *parent = nullptr);
    ~MainWindow();


    blockmsgqueue *msgQueue = new blockmsgqueue(10, sizeof(MSG_PACK));

private:
    Ui::MainWindow *ui;

    producerThread *producer;

    consumerThread *consumer1;

    consumerThread *consumer2;

    consumerThread *consumer3;

    QThread *producerTh;

    QThread *consumerTh1;

    QThread *consumerTh2;

    QThread *consumerTh3;



signals:
    void startProducerWork();

    void startconsumer1Work();

    void startconsumer2Work();

    void startconsumer3Work();
private slots:
    void on_pushButton_clicked();
    void on_pushButton_2_clicked();
};
#endif // MAINWINDOW_H

MainWindow.c

#include "mainwindow.h"
#include "ui_mainwindow.h"

MainWindow::MainWindow(QWidget *parent)
    : QMainWindow(parent)
    , ui(new Ui::MainWindow)
{
    ui->setupUi(this);




    producer = new producerThread();
    producer->setMsgQueue(msgQueue);

    consumer1 = new consumerThread();
    consumer1->setMsgQueue(msgQueue);

    consumer2 = new consumerThread();
    consumer2->setMsgQueue(msgQueue);

    consumer3 = new consumerThread();
    consumer3->setMsgQueue(msgQueue);

    producerTh = new QThread();

    consumerTh1 = new QThread();
    consumerTh2 = new QThread();
    consumerTh3 = new QThread();

    connect(this, &MainWindow::startProducerWork, producer, &producerThread::work);

    connect(this, &MainWindow::startconsumer1Work, consumer1, &consumerThread::work);
    connect(this, &MainWindow::startconsumer2Work, consumer2, &consumerThread::work);
    connect(this, &MainWindow::startconsumer3Work, consumer3, &consumerThread::work);

    connect(producerTh, &QThread::finished, producer, &QObject::deleteLater);

    connect(consumerTh1, &QThread::finished, consumer1, &QObject::deleteLater);
    connect(consumerTh2, &QThread::finished, consumer2, &QObject::deleteLater);
    connect(consumerTh3, &QThread::finished, consumer3, &QObject::deleteLater);

    producer->moveToThread(producerTh);
    consumer1->moveToThread(consumerTh1);
    consumer2->moveToThread(consumerTh2);
    consumer3->moveToThread(consumerTh3);

}

MainWindow::~MainWindow()
{

    if(producerTh->isRunning())
    {
        producerTh->quit();
        producerTh->wait();
    }
    if(consumerTh1->isRunning())
    {
        consumerTh1->quit();
        consumerTh1->wait();
    }
    if(consumerTh2->isRunning())
    {
        consumerTh2->quit();
        consumerTh2->wait();
    }
    if(consumerTh3->isRunning())
    {
        consumerTh3->quit();
        consumerTh3->wait();
    }

    delete ui;
}


void MainWindow::on_pushButton_clicked()
{

    producerTh->start();
    consumerTh1->start();
    consumerTh2->start();
    consumerTh3->start();
    qDebug()<<"start";
    emit startProducerWork();
    emit startconsumer1Work();
    emit startconsumer2Work();
    emit startconsumer3Work();

}


void MainWindow::on_pushButton_2_clicked()
{

    if(producerTh->isRunning())
    {
        producerTh->quit();
        producerTh->wait();
    }
    if(consumerTh1->isRunning())
    {
        consumerTh1->quit();
        consumerTh1->wait();
    }
    if(consumerTh2->isRunning())
    {
        consumerTh2->quit();
        consumerTh2->wait();
    }
    if(consumerTh3->isRunning())
    {
        consumerTh3->quit();
        consumerTh3->wait();
    }

}

接下来就是对这整个进行讲解,该项目有个问题 就是在消费者使用后,旧的队列没有删除,而新进队的队列都是新new的,集体代码在这块

void blockmsgqueue::addMsg(const char *msgPack)

{

m_pFreeMsgs->acquire(1);

m_mutex.lock();

char *msgIn = new char[m_msgSize];

memcpy(msgIn, msgPack, m_msgSize);

m_queue.enqueue(msgIn);//这里就是不断地new,然后进队

m_mutex.unlock();

m_pUsedMsgs->release(1);

}

void blockmsgqueue::getMsg(char *msgPack)

{

m_pUsedMsgs->acquire(1);

m_mutex.lock();

char *msgOut = m_queue.dequeue();//而这里出队后并没有对 //m_queue.dequeue()进行处 //理,这句只是将 //m_queue.dequeue()赋值给 //msgOut,并没有删除操作

memcpy(msgPack, msgOut, m_msgSize);

m_mutex.unlock();

m_pFreeMsgs->release(1);

}

就像这样

为了解决这个问题可以采用新的blockMsgQueue类像这样

blockMsgQueue.h

#ifndef BLOCKMSGQUEUE_H
#define BLOCKMSGQUEUE_H

#include <QSemaphore>
#include <QMutex>
#include <QQueue>

class blockmsgqueue
{
public:
    blockmsgqueue(const int &maxMsgsNum, const unsigned int &msgSize);
    ~blockmsgqueue();

    void addMsg(const char *msgPack);
    void getMsg(char *msgPack);

private:
    int m_maxMsgsNum;
    unsigned int m_msgSize;
    QSemaphore *m_pFreeMsgs;
    QSemaphore *m_pUsedMsgs;
    QQueue<char *> m_freeQueue;
    QQueue<char *> m_queue;
    QMutex m_mutex;

};

#endif // BLOCKMSGQUEUE_H

blockMsgQueue.c

#include "blockmsgqueue.h"

blockmsgqueue::blockmsgqueue(const int &m_maxMsgsNum, const unsigned int &msgSize)
    :m_maxMsgsNum(m_maxMsgsNum),
     m_msgSize(msgSize)
{
    m_pFreeMsgs = new QSemaphore(m_maxMsgsNum);
    m_pUsedMsgs = new QSemaphore(0);


    for(int i=0; i<m_maxMsgsNum; i++)
    {
        char *freeMsg = new char[m_msgSize];
        m_freeQueue.enqueue(freeMsg);
    }

}

blockmsgqueue::~blockmsgqueue()
{

    if(m_pFreeMsgs)
    {
        m_pFreeMsgs = nullptr;
    }
    if(m_pUsedMsgs)
    {
        m_pUsedMsgs = nullptr;
    }

    QQueue<char*>::iterator ite1;
    QQueue<char*>::iterator ite2;

    for(ite1 = m_freeQueue.begin(); ite1 != m_freeQueue.end(); ++ite1)
    {
        delete[] (*ite1);
        (*ite1) = nullptr;
    }
    for(ite2 = m_queue.begin(); ite2 != m_queue.end(); ++ite2)
    {
        delete[] (*ite2);
        (*ite2) = nullptr;
    }

}

void blockmsgqueue::addMsg(const char *msgPack)
{
    m_pFreeMsgs->acquire(1);
    m_mutex.lock();
    char *msgIn = m_freeQueue.dequeue();
    //char *msgIn = new char[m_msgSize];
    memcpy(msgIn, msgPack, m_msgSize);
    m_queue.enqueue(msgIn);
    m_mutex.unlock();
    m_pUsedMsgs->release(1);
}

void blockmsgqueue::getMsg(char *msgPack)
{
    m_pUsedMsgs->acquire(1);
    m_mutex.lock();
    char *msgOut = m_queue.dequeue();
    memcpy(msgPack, msgOut, m_msgSize);
    m_freeQueue.enqueue(msgOut);
    m_mutex.unlock();
    m_pFreeMsgs->release(1);
}

这个的功能就是多了一个新的队列和旧的队列循环使用

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

QT 生产者消费者模型 的相关文章

随机推荐