セマフォを使ったプロデューサとコンシューマ

セマフォを使ったProducerとConsumerの例では、QSemaphore を使ってプロデューサースレッドとコンシューマースレッドが共有する循環バッファへのアクセスを制御する方法を示します。

プロデューサはバッファの終端に達するまでバッファにデータを書き込み、その時点でバッファは最初から再スタートし、既存のデータを上書きします。コンシューマースレッドは、生成されたデータを読み取り、標準エラーに書き込む。

セマフォは、ミューテックスよりも高いレベルの並行処理を可能にする。もしバッファへのアクセスがQMutex でガードされていたら、コンシューマースレッドはプロデューサースレッドと同時にバッファにアクセスすることはできない。しかし、両方のスレッドがバッファの異なる部分で同時に作業しても、何の問題もありません。

この例は、ProducerConsumer という2つのクラスから構成されている。QThreadこの2つのクラス間の通信に使用される循環バッファと、それを保護するセマフォはグローバル変数です。

プロデューサーとコンシューマーの問題を解決するためにQSemaphore を使用する代わりに、QWaitConditionQMutex を使用することもできます。これは、待機条件を使用するプロデューサとコンシューマの例で行われていることです。

グローバル変数

まず、サーキュラー・バッファと関連するセマフォについて復習しましょう:

constexpr int DataSize = 100000;

constexpr int BufferSize = 8192;
char buffer[BufferSize];

QSemaphore freeBytes(BufferSize);
QSemaphore usedBytes;

DataSize は、プロデューサが生成するデータ量です。例をできるだけシンプルにするために、定数とします。 は、サーキュラー・バッファのサイズです。これは、 よりも小さく、ある時点でプロデューサがバッファの終端に到達し、最初から再スタートすることを意味する。BufferSize DataSize

プロデューサとコンシューマを同期させるために、2つのセマフォが必要である。freeBytes セマフォは、バッファの "空き "領域(プロデューサがまだデータで満たされていない領域、またはコンシューマがすでに読み込んだ領域)を制御します。usedBytes のセマフォは、バッファの "used" 領域(プロデューサがデータを入れたが、コンシューマがまだ読んでいない領域)を制御します。

これらのセマフォは、プロデューサがコンシューマよりBufferSize バイト以上先にいることがないようにし、プロデューサがまだ生成していないデータをコンシューマが読むことがないようにします。

freeBytes セマフォはBufferSize で初期化される。なぜなら、最初はバッファ全体が空だからである。usedBytes セマフォは 0 に初期化されます(何も指定されていない場合のデフォルト値)。

プロデューサー・クラス

Producer クラスのコードを見てみましょう:

class Producer : public QThread
{
public:
    void run() override
    {
        for (int i = 0; i < DataSize; ++i) {
            freeBytes.acquire();
            buffer[i % BufferSize] = "ACGT"[QRandomGenerator::global()->bounded(4)];
            usedBytes.release();
        }
    }
};

プロデューサーはDataSize バイトのデータを生成します。バイトを循環バッファに書き込む前に、freeBytes セマフォを使用して "空き "バイトを取得する必要があります。コンシューマがプロデューサのペースについていけなかった場合、QSemaphore::acquire ()の呼び出しはブロックされるかもしれない。

最後に、プロデューサはusedBytes セマフォを使用してバイトを解放する。free "バイトは "used "バイトに正常に変換され、コンシューマが読む準備ができた。

コンシューマ・クラス

次にConsumer

class Consumer : public QThread
{
public:
    void run() override
    {
        for (int i = 0; i < DataSize; ++i) {
            usedBytes.acquire();
            fprintf(stderr, "%c", buffer[i % BufferSize]);
            freeBytes.release();
        }
        fprintf(stderr, "\n");
    }
};

コードはプロデューサーの場合とよく似ていますが、今回はその逆ではなく、「使用済み」バイトを獲得して「空き」バイトを解放する点が異なります。

main()関数

main() では、2つのスレッドを作成し、QThread::wait ()を呼び出して、終了する前に両方のスレッドが終了する時間を確保する:

int main(int argc, char *argv[])
{
    QCoreApplication app(argc, argv);
    Producer producer;
    Consumer consumer;
    producer.start();
    consumer.start();
    producer.wait();
    consumer.wait();
    return 0;
}

では、プログラムを実行するとどうなるのか?コンシューマーは、usedBytes セマフォが解放されるのを待つためにブロックされています(その最初のavailable() カウントは 0 です)。プロデューサーが1バイトをバッファに入れると、freeBytes.available()BufferSize - 1 になり、usedBytes.available() は 1 になる。この時点で、2つのことが起こり得る:コンシューマースレッドがそのバイトを引き継いで読むか、プロデューサースレッドが2バイト目を生成するかである。

この例で示したプロデューサー-コンシューマー・モデルにより、高度に同時並行的なマルチスレッド・アプリケーションを書くことが可能になる。マルチプロセッサマシンでは、2つのスレッドがバッファの異なる部分で同時にアクティブになることができるため、このプログラムは、同等のミューテックスベースのプログラムの最大2倍高速になる可能性があります。

しかし、これらの利点が常に実現されるわけではないことに注意してください。QSemaphore の取得と解放にはコストがかかります。実際には、バッファをチャンクに分割し、個々のバイトの代わりにチャンクを操作することがおそらく有意義でしょう。バッファサイズも、実験に基づいて慎重に選択しなければならないパラメータです。

プロジェクト例 @ code.qt.io

© 2025 The Qt Company Ltd. Documentation contributions included herein are the copyrights of their respective owners. The documentation provided herein is licensed under the terms of the GNU Free Documentation License version 1.3 as published by the Free Software Foundation. Qt and respective logos are trademarks of The Qt Company Ltd. in Finland and/or other countries worldwide. All other trademarks are property of their respective owners.