WebSockets MQTT-Abonnement
Kombination eines MQTT-Clients mit einer WebSocket-Verbindung.
WebSockets MQTT Subscription zeigt, wie man eine benutzerdefinierte QIODevice entwirft, um eine Web-Socket-Verbindung mit QMqttClient zu kombinieren.
Erstellen eines benutzerdefinierten QIODevice
Das neue benutzerdefinierte Gerät, WebSocketIODevice
, muss eine Unterklasse von QIODevice sein:
class WebSocketIODevice : public QIODevice { Q_OBJECT public: WebSocketIODevice(QObject *parent = nullptr); bool isSequential() const override; qint64 bytesAvailable() const override; bool open(OpenMode mode) override; void close() override; qint64 readData(char *data, qint64 maxlen) override; qint64 writeData(const char *data, qint64 len) override; void setUrl(const QUrl &url); void setProtocol(const QByteArray &data); Q_SIGNALS: void socketConnected(); public slots: void handleBinaryMessage(const QByteArray &msg); void onSocketConnected(); private: QByteArray m_protocol; QByteArray m_buffer; QWebSocket m_socket; QUrl m_url; };
Entwerfen einer Klasse zur Verwaltung der Verbindung und des Abonnements
WebSocketIODevice
wird ein privates Mitglied der Klasse ClientSubscription
neben QMqttClient und QMqttSubscription sein:
private: QMqttClient m_client; QMqttSubscription *m_subscription; QUrl m_url; QString m_topic; WebSocketIODevice m_device; int m_version;
Abonnieren und Empfangen von Nachrichten
Die Hauptlogik ist in der Methode connectAndSubscribe()
der Klasse ClientSubscription
implementiert. Bevor Sie eine MQTT-Verbindung darüber initialisieren können, müssen Sie überprüfen, ob der Websocket erfolgreich verbunden wurde. Nachdem die MQTT-Verbindung hergestellt wurde, kann QMqttClient das Topic abonnieren. Wenn die Subskription erfolgreich ist, kann QMqttSubscription verwendet werden, um Nachrichten von dem abonnierten Topic zu empfangen, die von der Methode handleMessage()
der Klasse ClientSubscription
verarbeitet werden.
void ClientSubscription::connectAndSubscribe() { qCDebug(lcWebSocketMqtt)<< "Connecting to broker at "<< m_url; m_device.setUrl(m_url); m_device.setProtocol(m_version == 3 ? "mqttv3.1": "mqtt"); connect(&m_device, &WebSocketIODevice::socketConnected, this, [this]() { qCDebug(lcWebSocketMqtt)<< "WebSocket connected, initializing MQTT connection."; m_client.setProtocolVersion(m_version == 3 ? QMqttClient::MQTT_3_1 : QMqttClient::MQTT_3_1_1); m_client.setTransport(&m_device, QMqttClient::IODevice); connect(&m_client, &QMqttClient::connected, this, [this]() { qCDebug(lcWebSocketMqtt)<< "MQTT Verbindung aufgebaut"; m_subscription = m_client.subscribe(m_topic); if (!m_subscription) { qDebug() << "Failed to subscribe to " << m_topic; emit errorOccured(); } connect(m_subscription, &QMqttSubscription::stateChanged, [](QMqttSubscription::SubscriptionState s) { qCDebug(lcWebSocketMqtt)<< "Subscription state changed:"<< s; }); connect(m_subscription, &QMqttSubscription::messageReceived, [this](QMqttMessage msg) { handleMessage(msg.payload()); }); }); m_client.connectToHost(); }); if (!m_device.open(QIODevice::ReadWrite)) qDebug() << "Could not open socket device"; }
Dateien:
© 2025 The Qt Company Ltd. Documentation contributions included herein are the copyrights of their respective owners. The documentation provided herein is licensed under the terms of the GNU Free Documentation License version 1.3 as published by the Free Software Foundation. Qt and respective logos are trademarks of The Qt Company Ltd. in Finland and/or other countries worldwide. All other trademarks are property of their respective owners.