ウェブソケットMQTTサブスクリプション

MQTTクライアントとWebソケット接続の組み合わせ。

WebSockets MQTT Subscriptionでは、Web ソケット接続とQMqttClient を組み合わせたカスタムQIODevice を設計する方法を紹介します。

カスタム QIODevice の作成

新しいカスタム・デバイス、WebSocketIODevice は、QIODevice のサブクラスでなければなりません:

class WebSocketIODevice : public QIODevice
{
    Q_OBJECT
public:
    WebSocketIODevice(QObject *parent = nullptr);

    bool isSequential() const override;
    qint64 bytesAvailable() const override;

    bool open(OpenMode mode) override;
    void close() override;

    qint64 readData(char *data, qint64 maxlen) override;
    qint64 writeData(const char *data, qint64 len) override;

    void setUrl(const QUrl &url);
    void setProtocol(const QByteArray &data);
Q_SIGNALS:
    void socketConnected();

public slots:
    void handleBinaryMessage(const QByteArray &msg);
    void onSocketConnected();

private:
    QByteArray m_protocol;
    QByteArray m_buffer;
    QWebSocket m_socket;
    QUrl m_url;
};

接続とサブスクリプションを管理するクラスの設計

WebSocketIODevice は、 および と並んで、 クラスのプライベート・メンバになります:QMqttClient QMqttSubscription ClientSubscription

private:
    QMqttClient m_client;
    QMqttSubscription *m_subscription;
    QUrl m_url;
    QString m_topic;
    WebSocketIODevice m_device;
    int m_version;

メッセージの購読と受信

メインロジックはClientSubscription クラスのconnectAndSubscribe() メソッドに実装されています。MQTT 接続を初期化する前に、Web ソケットが正常に接続されたことを確認する必要があります。MQTT 接続が確立されると、QMqttClient はトピックを購読できる。サブスクリプションに成功すると、QMqttSubscription を使用して、サブスクライブしたトピックからClientSubscription クラスのhandleMessage() メソッドで処理されるメッセージを受信できるようになります。

void ClientSubscription::connectAndSubscribe()
{
    qCDebug(lcWebSocketMqtt) << "Connecting to broker at " << m_url;

    m_device.setUrl(m_url);
    m_device.setProtocol(m_version == 3 ? "mqttv3.1" : "mqtt");

    connect(&m_device, &WebSocketIODevice::socketConnected, this, [this]() {
        qCDebug(lcWebSocketMqtt) << "WebSocket connected, initializing MQTT connection.";

        m_client.setProtocolVersion(m_version == 3 ? QMqttClient::MQTT_3_1 : QMqttClient::MQTT_3_1_1);
        m_client.setTransport(&m_device, QMqttClient::IODevice);

        connect(&m_client, &QMqttClient::connected, this, [this]() {
            qCDebug(lcWebSocketMqtt) << "MQTT connection established";

            m_subscription = m_client.subscribe(m_topic);
            if (!m_subscription) {
                qDebug() << "Failed to subscribe to " << m_topic;
                emit errorOccured();
            }

            connect(m_subscription, &QMqttSubscription::stateChanged,
                    [](QMqttSubscription::SubscriptionState s) {
                qCDebug(lcWebSocketMqtt) << "Subscription state changed:" << s;
            });

            connect(m_subscription, &QMqttSubscription::messageReceived,
                    [this](QMqttMessage msg) {
                handleMessage(msg.payload());
            });
        });

        m_client.connectToHost();
    });
    if (!m_device.open(QIODevice::ReadWrite))
        qDebug() << "Could not open socket device";
}

ファイル

本ドキュメントに含まれる文書の著作権は、それぞれの所有者に帰属します 本書で提供されるドキュメントは、Free Software Foundation が発行したGNU Free Documentation License version 1.3に基づいてライセンスされています。 Qtおよびそれぞれのロゴは、フィンランドおよびその他の国におけるThe Qt Company Ltd.の 商標です。その他すべての商標は、それぞれの所有者に帰属します。