待機条件を使用するプロデューサとコンシューマ

Producer and Consumer using Wait Conditions の例では、QWaitConditionQMutex を使って、プロデューサースレッドとコンシューマースレッドが共有する循環バッファへのアクセスを制御する方法を示しています。

プロデューサはバッファの終端に達するまでバッファにデータを書き込み、その時点でバッファは最初から再スタートし、既存のデータを上書きします。コンシューマースレッドは、生成されたデータを読み取り、標準エラーに書き込む。

待ち状態によって、ミューテックスだけで可能な並行処理よりも高いレベルの並行処理が可能になる。もしバッファへのアクセスが単にQMutex でガードされていたら、コンシューマースレッドはプロデューサースレッドと同時にバッファにアクセスすることはできない。しかし、両方のスレッドがバッファの異なる部分で同時に作業しても、何の問題もありません。

この例は、ProducerConsumer という2つのクラスから構成されている。QThreadこの2つのクラス間の通信に使用される循環バッファと、それを保護する同期ツールはグローバル変数です。

プロデューサーとコンシューマーの問題を解決するために、QWaitConditionQMutex を使用する代わりに、QSemaphore を使用することもできます。これは、セマフォを使用するプロデューサとコンシューマの例で行われていることです。

グローバル変数

まず、サーキュラー・バッファと関連する同期ツールについて復習しましょう:

constexpr int DataSize = 100000;
constexpr int BufferSize = 8192;

QMutex mutex; // protects the buffer and the counter
char buffer[BufferSize];
int numUsedBytes;

QWaitCondition bufferNotEmpty;
QWaitCondition bufferNotFull;

DataSize は、プロデューサが生成するデータ量です。例をできるだけシンプルにするため、定数とします。 は、サーキュラー・バッファのサイズです。これは よりも小さい。つまり、ある時点でプロデューサはバッファの終端に到達し、最初から再スタートする。BufferSize DataSize

プロデューサーとコンシューマーを同期させるには、2つの待機条件と1つのミューテックスが必要である。bufferNotEmpty 条件は、プロデューサがデータを生成したときに通知され、コンシューマに読み取りを開始できることを知らせます。bufferNotFull コンディションは、コンシューマーがあるデータを読み込んだときにシグナルが発せられ、プロデ ューサーにもっとデータを生成できることを知らせる。numUsedBytes は、データを含むバッファのバイト数である。

待ち条件、ミューテックス、numUsedBytes カウンタを組み合わせることで、プロデューサがコンシューマよりBufferSize バイト以上先にいることはなく、プロデューサがまだ生成していないデータをコンシューマが読むことはありません。

プロデューサー・クラス

Producer クラスのコードを見てみましょう:

class Producer : public QThread
{
public:
    explicit Producer(QObject *parent = nullptr)
        : QThread(parent)
    {
    }

private:
    void run() override
    {
        for (int i = 0; i < DataSize; ++i) {
            {
                const QMutexLocker locker(&mutex);
                while (numUsedBytes == BufferSize)
                    bufferNotFull.wait(&mutex);
            }

            buffer[i % BufferSize] = "ACGT"[QRandomGenerator::global()->bounded(4)];

            {
                const QMutexLocker locker(&mutex);
                ++numUsedBytes;
                bufferNotEmpty.wakeAll();
            }
        }
    }
};

プロデューサーはDataSize バイトのデータを生成する。バイトをサーキュラー・バッファに書き込む前に、まずバッファがいっぱいかどうかをチェックしなければなりません(すなわち、numUsedBytesBufferSize と等しい)。バッファが一杯の場合、スレッドはbufferNotFull の条件で待つ。

最後に、プロデューサはnumUsedBytes をインクリメントし、numUsedBytes が必ず0より大きいので、bufferNotEmpty の条件が真であることを通知します。

numUsedBytes 変数へのすべてのアクセスをミューテックスでガードする。さらに、QWaitCondition::wait ()関数は、引数としてミューテックスを受け付ける。このミューテックスは、スレッドがスリープ状態になる前にアンロックされ、スレッドが目覚めたときにロックされる。さらに、競合状態が発生しないように、ロック状態から待機状態への遷移はアトミックである。

コンシューマー・クラス

Consumer

class Consumer : public QThread
{
public:
    explicit Consumer(QObject *parent = nullptr)
        : QThread(parent)
    {
    }

private:
    void run() override
    {
        for (int i = 0; i < DataSize; ++i) {
            {
                const QMutexLocker locker(&mutex);
                while (numUsedBytes == 0)
                    bufferNotEmpty.wait(&mutex);
            }

            fprintf(stderr, "%c", buffer[i % BufferSize]);

            {
                const QMutexLocker locker(&mutex);
                --numUsedBytes;
                bufferNotFull.wakeAll();
            }
        }
        fprintf(stderr, "\n");
    }
};

コードはプロデューサーのものとよく似ている。バイトを読み込む前に、バッファが一杯かどうかではなく、空かどうかをチェックし(numUsedBytes は0)、空であればbufferNotEmpty の条件で待機する。バイトを読み込んだら、numUsedBytes をデクリメントし(インクリメントする代わりに)、bufferNotFull 条件をシグナルします(bufferNotEmpty 条件の代わりに)。

main()関数

main() では、2つのスレッドを作成し、QThread::wait ()を呼び出して、終了する前に両方のスレッドが終了する時間を確保します:

int main(int argc, char *argv[])
{
    QCoreApplication app(argc, argv);
    Producer producer;
    Consumer consumer;
    producer.start();
    consumer.start();
    producer.wait();
    consumer.wait();
    return 0;
}

では、プログラムを実行するとどうなるだろうか?最初は、プロデューサースレッドだけが何かできる。コンシューマはbufferNotEmpty の条件が通知されるのを待つためにブロックされている(numUsedBytes は0)。プロデューサーがバッファに1バイト入れると、numUsedBytes は厳密に0より大きくなり、bufferNotEmpty の条件が通知されます。この時点で、2つのことが起こり得る:コンシューマースレッドがそのバイトを引き継いで読むか、プロデューサーが2バイト目を生成するかである。

この例で示したプロデューサー-コンシューマー・モデルは、高度に同時並行なマルチスレッド・アプリケーションを書くことを可能にする。マルチプロセッサマシン上では、2つのスレッドがバッファの異なる部分で同時にアクティブになることができるため、このプログラムは、同等のミューテックスベースのプログラムの最大2倍高速になる可能性があります。

しかし、これらの利点が常に実現されるわけではないことに注意してください。QMutex のロックとアンロックにはコストがかかります。実際には、バッファをチャンクに分割し、個々のバイトの代わりにチャンクを操作することがおそらく有意義でしょう。バッファサイズも、実験に基づいて慎重に選択しなければならないパラメータです。

プロジェクト例 @ code.qt.io

©2024 The Qt Company Ltd. 本書に含まれる文書の著作権は、それぞれの所有者に帰属します。 ここで提供されるドキュメントは、Free Software Foundation が発行したGNU Free Documentation License version 1.3に基づいてライセンスされています。 Qtおよびそれぞれのロゴは、フィンランドおよびその他の国におけるThe Qt Company Ltd.の 商標です。その他すべての商標は、それぞれの所有者に帰属します。