Semaphores Example
The producer writes data to the buffer until it reaches the end of the buffer, at which point it restarts from the beginning, overwriting existing data. The consumer thread reads the data as it is produced and writes it to standard error.
Semaphores make it possible to have a higher level of concurrency than mutexes. If accesses to the buffer were guarded by a QMutex, the consumer thread couldn't access the buffer at the same time as the producer thread. Yet, there is no harm in having both threads working on different parts of the buffer at the same time.
The example comprises two classes: Producer and Consumer. Both inherit from QThread. The circular buffer used for communicating between these two classes and the semaphores that protect it are global variables.
An alternative to using QSemaphore to solve the producer-consumer problem is to use QWaitCondition and QMutex. This is what the Wait Conditions example does.
Global Variables
Let's start by reviewing the circular buffer and the associated semaphores:
    #ifdef Q_WS_S60
    const int DataSize = 300;
    #else
    const int DataSize = 100000;
    #endif
    const int BufferSize = 8192;
    char buffer[BufferSize];

    QSemaphore freeBytes(BufferSize);
    QSemaphore usedBytes;
DataSize is the amount of data that the producer will generate. To keep the example as simple as possible, we make it a constant. BufferSize is the size of the circular buffer. In the default build it is less than DataSize, meaning that at some point the producer will reach the end of the buffer and restart from the beginning. (In the Q_WS_S60 build, DataSize is only 300, so the buffer never wraps.)
To synchronize the producer and the consumer, we need two semaphores. The freeBytes semaphore controls the "free" area of the buffer (the area that the producer hasn't filled with data yet or that the consumer has already read). The usedBytes semaphore controls the "used" area of the buffer (the area that the producer has filled but that the consumer hasn't read yet).
Together, the semaphores ensure that the producer is never more than BufferSize bytes ahead of the consumer, and that the consumer never reads data that the producer hasn't generated yet.
The freeBytes semaphore is initialized with BufferSize, because initially the entire buffer is empty. The usedBytes semaphore is initialized to 0 (the default value if none is specified).
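The bookkeeping described above can be checked with a small sketch. It does not use Qt: SketchSemaphore is a hypothetical stand-in for QSemaphore built on the C++ standard library, and countsAfter() is an illustrative helper that is not part of the example. It replays a number of producer and consumer steps on a single thread and reports the two counts.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <utility>

// Hypothetical stand-in for QSemaphore (assumption: standard library only).
class SketchSemaphore {
public:
    explicit SketchSemaphore(int initial = 0) : count_(initial) {}

    // Block until n resources are available, then take them.
    void acquire(int n = 1) {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [&] { return count_ >= n; });
        count_ -= n;
    }

    // Give back n resources and wake any waiting thread.
    void release(int n = 1) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            count_ += n;
        }
        cond_.notify_all();
    }

    int available() {
        std::lock_guard<std::mutex> lock(mutex_);
        return count_;
    }

private:
    std::mutex mutex_;
    std::condition_variable cond_;
    int count_;
};

const int BufferSize = 8192;

// Replays `produced` producer steps followed by `consumed` consumer steps
// and returns {freeBytes.available(), usedBytes.available()}.
// Requires consumed <= produced <= BufferSize so that no acquire() blocks.
std::pair<int, int> countsAfter(int produced, int consumed) {
    assert(consumed <= produced && produced <= BufferSize);
    SketchSemaphore freeBytes(BufferSize);  // the whole buffer starts free
    SketchSemaphore usedBytes;              // nothing to read yet
    for (int i = 0; i < produced; ++i) {
        freeBytes.acquire();   // producer claims a free byte
        usedBytes.release();   // and hands it over as a used byte
    }
    for (int i = 0; i < consumed; ++i) {
        usedBytes.acquire();   // consumer claims a used byte
        freeBytes.release();   // and hands it back as a free byte
    }
    return {freeBytes.available(), usedBytes.available()};
}
```

Whenever both threads are between steps, the two counts add up to BufferSize; for instance, countsAfter(5, 2) leaves 3 used bytes and BufferSize - 3 free bytes.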
Producer Class
Let's review the code for the Producer class:
    class Producer : public QThread
    {
    public:
        void run()
        {
            qsrand(QTime(0,0,0).secsTo(QTime::currentTime()));
            for (int i = 0; i < DataSize; ++i) {
                freeBytes.acquire();
                buffer[i % BufferSize] = "ACGT"[(int)qrand() % 4];
                usedBytes.release();
            }
        }
    };
The producer generates DataSize bytes of data. Before it writes a byte to the circular buffer, it must acquire a "free" byte using the freeBytes semaphore. The QSemaphore::acquire() call blocks if the buffer is full, that is, if the consumer hasn't kept pace with the producer.
At the end, the producer releases a byte using the usedBytes semaphore. The "free" byte has successfully been transformed into a "used" byte, ready to be read by the consumer.
Consumer Class
Let's now turn to the Consumer class:
    class Consumer : public QThread
    {
        Q_OBJECT
    public:
        void run()
        {
            for (int i = 0; i < DataSize; ++i) {
                usedBytes.acquire();
    #ifdef Q_WS_S60
                QString text(buffer[i % BufferSize]);
                freeBytes.release();
                emit stringConsumed(text);
    #else
                fprintf(stderr, "%c", buffer[i % BufferSize]);
                freeBytes.release();
    #endif
            }
            fprintf(stderr, "\n");
        }

    signals:
        void stringConsumed(const QString &text);

    protected:
        bool finish;
    };
The code is very similar to the producer, except that this time we acquire a "used" byte and release a "free" byte, instead of the opposite.
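To see the two loops cooperate without a Qt installation, here is a rough portable sketch of the same pattern. It rests on several assumptions: std::thread replaces QThread, the hypothetical SketchSemaphore class replaces QSemaphore, and the producer writes a fixed "ACGT" pattern instead of qrand() output so that the result can be verified. runPipeline() is an illustrative name, not part of the example.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for QSemaphore (assumption: standard library only).
class SketchSemaphore {
public:
    explicit SketchSemaphore(int initial = 0) : count_(initial) {}

    void acquire(int n = 1) {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [&] { return count_ >= n; });
        count_ -= n;
    }

    void release(int n = 1) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            count_ += n;
        }
        cond_.notify_all();
    }

private:
    std::mutex mutex_;
    std::condition_variable cond_;
    int count_;
};

// Runs one producer and one consumer over a circular buffer of bufferSize
// bytes and returns everything the consumer read, in order.
std::string runPipeline(int dataSize, int bufferSize) {
    assert(bufferSize > 0);
    std::vector<char> buffer(bufferSize);
    SketchSemaphore freeBytes(bufferSize);
    SketchSemaphore usedBytes;

    std::thread producer([&] {
        for (int i = 0; i < dataSize; ++i) {
            freeBytes.acquire();                     // wait for a free slot
            buffer[i % bufferSize] = "ACGT"[i % 4];  // deterministic data
            usedBytes.release();                     // slot is now readable
        }
    });

    std::string consumed;
    std::thread consumer([&] {
        for (int i = 0; i < dataSize; ++i) {
            usedBytes.acquire();                     // wait for data
            consumed.push_back(buffer[i % bufferSize]);
            freeBytes.release();                     // slot may be reused
        }
    });

    producer.join();
    consumer.join();
    return consumed;
}
```

Even with a buffer much smaller than the data, for example runPipeline(1000, 16), the consumer should receive every byte exactly once and in order, because neither thread can touch a slot the other still owns.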
The main() Function
In main(), we create the two threads and call QThread::wait() to ensure that both threads get time to finish before we exit:
    int main(int argc, char *argv[])
    {
    #ifdef Q_WS_S60
        // Self made console for Symbian
        QApplication app(argc, argv);
        QPlainTextEdit console;
        console.setReadOnly(true);
        console.setTextInteractionFlags(Qt::NoTextInteraction);
        console.showMaximized();

        Producer producer;
        Consumer consumer;

        QObject::connect(&consumer, SIGNAL(stringConsumed(const QString&)),
                         &console, SLOT(insertPlainText(QString)),
                         Qt::BlockingQueuedConnection);

        producer.start();
        consumer.start();

        app.exec();
    #else
        QCoreApplication app(argc, argv);
        Producer producer;
        Consumer consumer;
        producer.start();
        consumer.start();
        producer.wait();
        consumer.wait();
        return 0;
    #endif
    }
So what happens when we run the program? Initially, the producer thread is the only one that can do anything; the consumer is blocked waiting for the usedBytes semaphore to be released (its initial available() count is 0). Once the producer has put one byte in the buffer, freeBytes.available() is BufferSize - 1 and usedBytes.available() is 1. At that point, two things can happen: either the consumer thread takes over and reads that byte, or the producer gets to produce a second byte.
The producer-consumer model presented in this example makes it possible to write highly concurrent multithreaded applications. On a multiprocessor machine, the program is potentially up to twice as fast as the equivalent mutex-based program, since the two threads can be active at the same time on different parts of the buffer.
Be aware though that these benefits aren't always realized. Acquiring and releasing a QSemaphore has a cost. In practice, it would probably be worthwhile to divide the buffer into chunks and to operate on chunks instead of individual bytes. The buffer size is also a parameter that must be selected carefully, based on experimentation.
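One way the chunking idea might look is sketched below. QSemaphore::acquire() and QSemaphore::release() accept a count, so a whole chunk can be reserved or published with a single call. The sketch itself stays Qt-free and makes the same assumptions as a plain standard C++ port would: SketchSemaphore is a hypothetical stand-in for QSemaphore, std::thread replaces QThread, and runChunkedPipeline() and chunkSize are illustrative names. The chunk size must not exceed the buffer size, otherwise the producer could never acquire a full chunk.

```cpp
#include <algorithm>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for QSemaphore (assumption: standard library only).
class SketchSemaphore {
public:
    explicit SketchSemaphore(int initial = 0) : count_(initial) {}

    void acquire(int n = 1) {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [&] { return count_ >= n; });
        count_ -= n;
    }

    void release(int n = 1) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            count_ += n;
        }
        cond_.notify_all();
    }

private:
    std::mutex mutex_;
    std::condition_variable cond_;
    int count_;
};

// Same pipeline as the byte-wise version, but each semaphore operation
// covers up to chunkSize bytes. Returns what the consumer read, in order.
std::string runChunkedPipeline(int dataSize, int bufferSize, int chunkSize) {
    assert(chunkSize > 0 && chunkSize <= bufferSize);
    std::vector<char> buffer(bufferSize);
    SketchSemaphore freeBytes(bufferSize);
    SketchSemaphore usedBytes;

    std::thread producer([&] {
        for (int start = 0; start < dataSize; start += chunkSize) {
            const int n = std::min(chunkSize, dataSize - start);
            freeBytes.acquire(n);              // reserve the whole chunk
            for (int i = start; i < start + n; ++i)
                buffer[i % bufferSize] = "ACGT"[i % 4];
            usedBytes.release(n);              // publish the whole chunk
        }
    });

    std::string consumed;
    std::thread consumer([&] {
        for (int start = 0; start < dataSize; start += chunkSize) {
            const int n = std::min(chunkSize, dataSize - start);
            usedBytes.acquire(n);              // wait for a full chunk
            for (int i = start; i < start + n; ++i)
                consumed.push_back(buffer[i % bufferSize]);
            freeBytes.release(n);              // hand the chunk back
        }
    });

    producer.join();
    consumer.join();
    return consumed;
}
```

With a chunk size of 16, the number of semaphore operations drops by roughly a factor of 16 compared with the byte-wise loops. Whether that translates into a measurable speedup depends on the platform and would need to be measured.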
© 2016 The Qt Company Ltd. Documentation contributions included herein are the copyrights of their respective owners. The documentation provided herein is licensed under the terms of the GNU Free Documentation License version 1.3 as published by the Free Software Foundation. Qt and respective logos are trademarks of The Qt Company Ltd. in Finland and/or other countries worldwide. All other trademarks are property of their respective owners.