Producer und Consumer unter Verwendung von Wartebedingungen

Das Beispiel Producer and Consumer using Wait Conditions zeigt, wie man QWaitCondition und QMutex verwendet, um den Zugriff auf einen Ringpuffer zu steuern, der von einem Producer-Thread und einem Consumer-Thread gemeinsam genutzt wird.

Der Producer-Thread schreibt Daten in den Puffer, bis er das Ende des Puffers erreicht, woraufhin er von vorne beginnt und die vorhandenen Daten überschreibt. Der Consumer-Thread liest die Daten, wenn sie produziert werden, und schreibt sie in den Standardfehler.

Wartebedingungen ermöglichen ein höheres Maß an Gleichzeitigkeit als dies mit Mutexen allein möglich ist. Wären die Zugriffe auf den Puffer einfach durch ein QMutex geschützt, könnte der Consumer-Thread nicht gleichzeitig mit dem Producer-Thread auf den Puffer zugreifen. Es kann jedoch nicht schaden, wenn beide Threads gleichzeitig an verschiedenen Teilen des Puffers arbeiten.

Das Beispiel umfasst zwei Klassen: Producer und Consumer. Beide erben von QThread. Der zirkuläre Puffer, der für die Kommunikation zwischen diesen beiden Klassen verwendet wird, und die Synchronisationswerkzeuge, die ihn schützen, sind globale Variablen.

Eine Alternative zur Verwendung von QWaitCondition und QMutex zur Lösung des Producer-Consumer-Problems ist die Verwendung von QSemaphore. Dies wird im Beispiel Producer and Consumer using Semaphores verwendet.

Globale Variablen

Beginnen wir mit der Betrachtung des Ringpuffers und der zugehörigen Synchronisationswerkzeuge:

constexpr int DataSize = 100000;
constexpr int BufferSize = 8192;

QMutex mutex; // protects the buffer and the counter
char buffer[BufferSize];
int numUsedBytes;

QWaitCondition bufferNotEmpty;
QWaitCondition bufferNotFull;

DataSize ist die Menge der Daten, die der Producer erzeugt. Um das Beispiel so einfach wie möglich zu halten, setzen wir sie als Konstante ein. BufferSize ist die Größe des Ringpuffers. Sie ist kleiner als DataSize, was bedeutet, dass der Producer irgendwann das Ende des Puffers erreicht und wieder von vorne beginnt.

Um den Producer und den Consumer zu synchronisieren, benötigen wir zwei Wartebedingungen und einen Mutex. Die Bedingung bufferNotEmpty wird signalisiert, wenn der Producer einige Daten erzeugt hat und dem Consumer mitteilt, dass er mit dem Lesen beginnen kann. Die Bedingung bufferNotFull wird signalisiert, wenn der Verbraucher einige Daten gelesen hat und dem Erzeuger mitteilt, dass er weitere Daten erzeugen kann. Die numUsedBytes ist die Anzahl der Bytes im Puffer, die Daten enthalten.

Zusammen stellen die Wartebedingungen, der Mutex und der numUsedBytes Zähler sicher, dass der Producer dem Consumer nie mehr als BufferSize Bytes voraus ist und dass der Consumer nie Daten liest, die der Producer noch nicht erzeugt hat.

Producer-Klasse

Schauen wir uns den Code für die Klasse Producer an:

class Producer : public QThread
{
public:
    explicit Producer(QObject *parent = nullptr)
        : QThread(parent)
    {
    }

private:
    void run() override
    {
        for (int i = 0; i < DataSize; ++i) {
            {
                const QMutexLocker locker(&mutex);
                while (numUsedBytes == BufferSize)
                    bufferNotFull.wait(&mutex);
            }

            buffer[i % BufferSize] = "ACGT"[QRandomGenerator::global()->bounded(4)];

            {
                const QMutexLocker locker(&mutex);
                ++numUsedBytes;
                bufferNotEmpty.wakeAll();
            }
        }
    }
};

Der Producer erzeugt DataSize Bytes an Daten. Bevor er ein Byte in den Ringpuffer schreibt, muss er zunächst prüfen, ob der Puffer voll ist (d. h., numUsedBytes ist gleich BufferSize). Wenn der Puffer voll ist, wartet der Thread auf die Bedingung bufferNotFull.

Am Ende inkrementiert der Produzent numUsedBytes und signalisiert, dass die Bedingung bufferNotEmpty wahr ist, da numUsedBytes notwendigerweise größer als 0 ist.

Wir schützen alle Zugriffe auf die Variable numUsedBytes mit einem Mutex. Darüber hinaus akzeptiert die Funktion QWaitCondition::wait() einen Mutex als Argument. Dieser Mutex wird entsperrt, bevor der Thread in den Ruhezustand versetzt wird, und gesperrt, wenn der Thread wieder erwacht. Außerdem ist der Übergang vom gesperrten Zustand in den Wartezustand atomar, um das Auftreten von Race Conditions zu verhindern.

Verbraucher-Klasse

Wenden wir uns nun der Klasse Consumer zu:

class Consumer : public QThread
{
public:
    explicit Consumer(QObject *parent = nullptr)
        : QThread(parent)
    {
    }

private:
    void run() override
    {
        for (int i = 0; i < DataSize; ++i) {
            {
                const QMutexLocker locker(&mutex);
                while (numUsedBytes == 0)
                    bufferNotEmpty.wait(&mutex);
            }

            fprintf(stderr, "%c", buffer[i % BufferSize]);

            {
                const QMutexLocker locker(&mutex);
                --numUsedBytes;
                bufferNotFull.wakeAll();
            }
        }
        fprintf(stderr, "\n");
    }
};

Der Code ist dem des Produzenten sehr ähnlich. Bevor wir das Byte lesen, prüfen wir, ob der Puffer leer ist (numUsedBytes ist 0), anstatt ob er voll ist, und warten auf die Bedingung bufferNotEmpty, wenn er leer ist. Nachdem wir das Byte gelesen haben, dekrementieren wir numUsedBytes (anstatt es zu inkrementieren) und signalisieren die bufferNotFull Bedingung (anstatt der bufferNotEmpty Bedingung).

Die Funktion main()

In main() erstellen wir die beiden Threads und rufen QThread::wait() auf, um sicherzustellen, dass beide Threads Zeit haben, sich zu beenden, bevor wir das Programm beenden:

int main(int argc, char *argv[])
{
    QCoreApplication app(argc, argv);
    Producer producer;
    Consumer consumer;
    producer.start();
    consumer.start();
    producer.wait();
    consumer.wait();
    return 0;
}

Was passiert nun, wenn wir das Programm ausführen? Zunächst ist der Producer-Thread der einzige, der etwas tun kann; der Consumer-Thread ist blockiert und wartet darauf, dass die Bedingung bufferNotEmpty signalisiert wird (numUsedBytes ist 0). Sobald der Producer ein Byte in den Puffer eingegeben hat, ist numUsedBytes größer als 0, und die Bedingung bufferNotEmpty wird signalisiert. Zu diesem Zeitpunkt können zwei Dinge geschehen: Entweder übernimmt der Consumer-Thread und liest das Byte, oder der Producer kann ein zweites Byte produzieren.

Das in diesem Beispiel vorgestellte Producer-Consumer-Modell ermöglicht es, hochgradig nebenläufige Multithreading-Anwendungen zu schreiben. Auf einer Multiprozessormaschine ist das Programm potenziell bis zu doppelt so schnell wie das entsprechende Mutex-basierte Programm, da die beiden Threads gleichzeitig auf verschiedenen Teilen des Puffers aktiv sein können.

Beachten Sie jedoch, dass diese Vorteile nicht immer realisiert werden. Das Sperren und Entsperren von QMutex hat seinen Preis. In der Praxis würde es sich wahrscheinlich lohnen, den Puffer in Chunks aufzuteilen und auf Chunks statt auf einzelnen Bytes zu arbeiten. Auch die Puffergröße ist ein Parameter, der sorgfältig und experimentell gewählt werden muss.

Beispielprojekt @ code.qt.io

© 2025 The Qt Company Ltd. Documentation contributions included herein are the copyrights of their respective owners. The documentation provided herein is licensed under the terms of the GNU Free Documentation License version 1.3 as published by the Free Software Foundation. Qt and respective logos are trademarks of The Qt Company Ltd. in Finland and/or other countries worldwide. All other trademarks are property of their respective owners.