WebSockets MQTT Subscription

Combining an MQTT client with a web socket connection.

WebSockets MQTT Subscription shows how to design a custom QIODevice to combine a web socket connection with QMqttClient.

Creating a Custom QIODevice

The new custom device, WebSocketIODevice, has to be a subclass of QIODevice:

class WebSocketIODevice : public QIODevice
{
    Q_OBJECT
public:
    WebSocketIODevice(QObject *parent = nullptr);

    bool isSequential() const override;
    qint64 bytesAvailable() const override;

    bool open(OpenMode mode) override;
    void close() override;

    qint64 readData(char *data, qint64 maxlen) override;
    qint64 writeData(const char *data, qint64 len) override;

    void setUrl(const QUrl &url);
    void setProtocol(const QByteArray &data);
Q_SIGNALS:
    void socketConnected();

public slots:
    void handleBinaryMessage(const QByteArray &msg);
    void onSocketConnected();

private:
    QByteArray m_protocol;
    QByteArray m_buffer;
    QWebSocket m_socket;
    QUrl m_url;
};

Designing a Class to Manage the Connection and Subscription

WebSocketIODevice will be a private member of the ClientSubscription class alongside the QMqttClient and the QMqttSubscription:

private:
    QMqttClient m_client;
    QMqttSubscription *m_subscription;
    QUrl m_url;
    QString m_topic;
    WebSocketIODevice m_device;
    int m_version;

Subscribing to and Receiving Messages

The main logic is implemented in the connectAndSubscribe() method of the ClientSubscription class. You need to verify that the web socket has successfully connected before you can initialize an MQTT connection over it. After the MQTT connection has been established, the QMqttClient can subscribe to the topic. If the subscription is successful, the QMqttSubscription can be used to receive messages from the subscribed topic that will be handled by the handleMessage() method of the ClientSubscription class.

void ClientSubscription::connectAndSubscribe()
{
    qCDebug(lcWebSocketMqtt) << "Connecting to broker at " << m_url;

    m_device.setUrl(m_url);
    m_device.setProtocol(m_version == 3 ? "mqttv3.1" : "mqtt");

    connect(&m_device, &WebSocketIODevice::socketConnected, this, [this]() {
        qCDebug(lcWebSocketMqtt) << "WebSocket connected, initializing MQTT connection.";

        m_client.setProtocolVersion(m_version == 3 ? QMqttClient::MQTT_3_1 : QMqttClient::MQTT_3_1_1);
        m_client.setTransport(&m_device, QMqttClient::IODevice);

        connect(&m_client, &QMqttClient::connected, this, [this]() {
            qCDebug(lcWebSocketMqtt) << "MQTT connection established";

            m_subscription = m_client.subscribe(m_topic);
            if (!m_subscription) {
                qDebug() << "Failed to subscribe to " << m_topic;
                emit errorOccured();
            }

            connect(m_subscription, &QMqttSubscription::stateChanged,
                    [](QMqttSubscription::SubscriptionState s) {
                qCDebug(lcWebSocketMqtt) << "Subscription state changed:" << s;
            });

            connect(m_subscription, &QMqttSubscription::messageReceived,
                    [this](QMqttMessage msg) {
                handleMessage(msg.payload());
            });
        });

        m_client.connectToHost();
    });
    if (!m_device.open(QIODevice::ReadWrite))
        qDebug() << "Could not open socket device";
}

Files:

© 2024 The Qt Company Ltd. Documentation contributions included herein are the copyrights of their respective owners. The documentation provided herein is licensed under the terms of the GNU Free Documentation License version 1.3 as published by the Free Software Foundation. Qt and respective logos are trademarks of The Qt Company Ltd. in Finland and/or other countries worldwide. All other trademarks are property of their respective owners.