WebSockets MQTT 订阅

将 MQTT 客户端与网络套接字连接相结合。

WebSockets MQTT Subscription展示了如何设计一个自定义QIODevice ,将Web 套接字连接与QMqttClient 结合起来。

创建自定义 QIODevice

新的自定义设备WebSocketIODevice 必须是QIODevice 的子类:

class WebSocketIODevice : public QIODevice
{
    Q_OBJECT
public:
    WebSocketIODevice(QObject *parent = nullptr);

    bool isSequential() const override;
    qint64 bytesAvailable() const override;

    bool open(OpenMode mode) override;
    void close() override;

    qint64 readData(char *data, qint64 maxlen) override;
    qint64 writeData(const char *data, qint64 len) override;

    void setUrl(const QUrl &url);
    void setProtocol(const QByteArray &data);
Q_SIGNALS:
    void socketConnected();

public slots:
    void handleBinaryMessage(const QByteArray &msg);
    void onSocketConnected();

private:
    QByteArray m_protocol;
    QByteArray m_buffer;
    QWebSocket m_socket;
    QUrl m_url;
};

设计管理连接和订阅的类

WebSocketIODevice 将与 和 一起成为 类的私有成员:QMqttClient QMqttSubscription ClientSubscription

private:
    QMqttClient m_client;
    QMqttSubscription *m_subscription;
    QUrl m_url;
    QString m_topic;
    WebSocketIODevice m_device;
    int m_version;

订阅和接收信息

主要逻辑在ClientSubscription 类的connectAndSubscribe() 方法中实现。在通过它初始化 MQTT 连接之前,您需要验证网络套接字是否已成功连接。建立 MQTT 连接后,QMqttClient 就可以订阅主题。如果订阅成功,QMqttSubscription 可用于接收来自订阅主题的消息,这些消息将由ClientSubscription 类的handleMessage() 方法处理。

voidClientSubscription::connectAndSubscribe() { qCDebug(lcWebSocketMqtt)<< "Connecting to broker at "<<m_url; m_device.setUrl(m_url); m_device.setProtocol(m_version== 3 ? "mqttv3.1":"mqtt"); connect(&m_device, &WebSocketIODevice::socketConnected, this, [this]() { qCDebug(lcWebSocketMqtt)<< "WebSocket 已连接,正在初始化 MQTT 连接"; m_client.setProtocolVersion(m_version== 3 ?QMqttClient::MQTT_3_1  QMqttClient::MQTT_3_1_1); m_client.setTransport(&m_device QMqttClientm_client.setTransport(&m_device, ::IODevice); connect(&m_client, &m_device, ::connected,this,[thisQMqttClient连接(&m_client, &::connected, this,[this]() { qCDebug(lcWebSocketMqtt)<< "MQTT 连接已建立"; m_subscription=m_client.subscribe(m_topic);if(!m_subscription) {                qDebug() << "Failed to subscribe to " << m_topic;
               emiterrorOccured(); } connect(m_subscription, &QMqttSubscription::stateChanged, [](QMqttSubscription::SubscriptionState s) { qCDebug(lcWebSocketMqtt)<< "Subscription state changed:"<<s; }); connect(m_subscription, &::messageReceived, [this](QMqttSubscription::messageReceived, [this](QMqttMessagemsg) { handleMessage(msg.payload()); }); }); m_client.connectToHost(); });if(!m_device.open(QIODevice::ReadWrite))        qDebug() << "Could not open socket device";
}

文件:

© 2025 The Qt Company Ltd. Documentation contributions included herein are the copyrights of their respective owners. The documentation provided herein is licensed under the terms of the GNU Free Documentation License version 1.3 as published by the Free Software Foundation. Qt and respective logos are trademarks of The Qt Company Ltd. in Finland and/or other countries worldwide. All other trademarks are property of their respective owners.