使用等待条件的生产者和消费者

使用等待条件的生产者和消费者示例展示了如何使用QWaitConditionQMutex 控制对生产者线程和消费者线程共享的循环缓冲区的访问。

生产者向缓冲区写入数据,直到写到缓冲区的末尾,然后从头开始重新写入,覆盖现有数据。消费者线程读取生成的数据,并将其写入标准错误。

通过等待条件,可以获得比单用互斥更高的并发性。如果对缓冲区的访问只是由QMutex 进行保护,那么消费者线程就无法与生产者线程同时访问缓冲区。然而,让两个线程同时处理缓冲区的不同部分并没有什么坏处。

该示例包括两个类:ProducerConsumer 。这两个类都继承自 。这两个类都继承自QThread 。这两个类之间用于通信的循环缓冲区和保护缓冲区的同步工具都是全局变量。

除了使用QWaitConditionQMutex 来解决生产者-消费者问题外,另一种方法是使用QSemaphore使用 Semaphores 的生产者和消费者示例就是这样做的。

全局变量

让我们先回顾一下循环缓冲区和相关的同步工具:

constexpr int DataSize = 100000;
constexpr int BufferSize = 8192;

QMutex mutex; // protects the buffer and the counter
char buffer[BufferSize];
int numUsedBytes;

QWaitCondition bufferNotEmpty;
QWaitCondition bufferNotFull;

DataSize 是生产者将生成的数据量。 是循环缓冲区的大小。它小于 ,这意味着生产者会在某一时刻到达缓冲区的尽头,然后从头开始。BufferSize DataSize

为了同步生产者和消费者,我们需要两个等待条件和一个互斥。bufferNotEmpty 条件在生产者生成一些数据时发出信号,告诉消费者可以开始读取数据。bufferNotFull 条件在消费者读取了一些数据后发出信号,告诉生产者可以生成更多数据。numUsedBytes 是缓冲区中包含数据的字节数。

等待条件、互斥和numUsedBytes 计数器共同确保生产者永远不会比消费者早超过BufferSize 字节,也确保消费者永远不会读取生产者尚未生成的数据。

生产者类

让我们回顾一下Producer 类的代码:

class Producer : public QThread
{
public:
    explicit Producer(QObject *parent = nullptr)
        : QThread(parent)
    {
    }

private:
    void run() override
    {
        for (int i = 0; i < DataSize; ++i) {
            {
                const QMutexLocker locker(&mutex);
                while (numUsedBytes == BufferSize)
                    bufferNotFull.wait(&mutex);
            }

            buffer[i % BufferSize] = "ACGT"[QRandomGenerator::global()->bounded(4)];

            {
                const QMutexLocker locker(&mutex);
                ++numUsedBytes;
                bufferNotEmpty.wakeAll();
            }
        }
    }
};

生产者生成DataSize 字节的数据。在向循环缓冲区写入字节之前,它必须首先检查缓冲区是否已满(即numUsedBytes 等于BufferSize )。如果缓冲区已满,线程就会等待bufferNotFull 条件。

最后,生产者会递增numUsedBytes ,并提示bufferNotEmpty 条件为真,因为numUsedBytes 必然大于 0。

我们使用互斥来保护对numUsedBytes 变量的所有访问。此外,QWaitCondition::wait() 函数接受一个互斥作为参数。该互斥在线程进入睡眠状态前解锁,并在线程醒来时锁定。此外,从锁定状态到等待状态的转换是原子性的,以防止出现竞赛条件。

消费者类

让我们来看看Consumer 类:

class Consumer : public QThread
{
public:
    explicit Consumer(QObject *parent = nullptr)
        : QThread(parent)
    {
    }

private:
    void run() override
    {
        for (int i = 0; i < DataSize; ++i) {
            {
                const QMutexLocker locker(&mutex);
                while (numUsedBytes == 0)
                    bufferNotEmpty.wait(&mutex);
            }

            fprintf(stderr, "%c", buffer[i % BufferSize]);

            {
                const QMutexLocker locker(&mutex);
                --numUsedBytes;
                bufferNotFull.wakeAll();
            }
        }
        fprintf(stderr, "\n");
    }
};

其代码与生产者类非常相似。在读取字节之前,我们会检查缓冲区是否为空(numUsedBytes 为 0),而不是是否已满,如果缓冲区为空,则等待bufferNotEmpty 条件。读取字节后,我们会递减numUsedBytes (而不是递增),并向bufferNotFull 条件发出信号(而不是bufferNotEmpty 条件)。

main() 函数

main() 中,我们创建了两个线程,并调用QThread::wait() 以确保两个线程在退出前都有时间完成工作:

int main(int argc, char *argv[])
{
    QCoreApplication app(argc, argv);
    Producer producer;
    Consumer consumer;
    producer.start();
    consumer.start();
    producer.wait();
    consumer.wait();
    return 0;
}

那么,当我们运行程序时会发生什么呢?起初,生产者线程是唯一能做任何事情的线程;消费者线程被阻塞,等待bufferNotEmpty 条件发出信号(numUsedBytes 为 0)。一旦生产者将一个字节放入缓冲区,numUsedBytes 严格大于 0,bufferNotEmpty 条件就会发出信号。这时,可能会发生两种情况:要么消费者线程接管并读取该字节,要么生产者生产第二个字节。

本例中介绍的生产者-消费者模型使编写高度并发的多线程应用程序成为可能。在多处理器机器上,由于两个线程可以同时作用于缓冲区的不同部分,因此程序的速度可能是基于互斥的程序的两倍。

不过要注意的是,这些优势并不总是能实现。锁定和解锁QMutex 是有代价的。在实际操作中,将缓冲区划分为若干小块,并对小块而不是单个字节进行操作可能更有价值。缓冲区大小也是一个参数,必须根据实验结果谨慎选择。

示例项目 @ code.qt.io

© 2025 The Qt Company Ltd. Documentation contributions included herein are the copyrights of their respective owners. The documentation provided herein is licensed under the terms of the GNU Free Documentation License version 1.3 as published by the Free Software Foundation. Qt and respective logos are trademarks of The Qt Company Ltd. in Finland and/or other countries worldwide. All other trademarks are property of their respective owners.