收藏
回答

在微信小程序调用mqtt.js Client.end()方法是出现内部异常?

import BaseService from "./BaseService";
// import mqtt from "../utils/mqtt/mqtt.min"
import mqtt from "../utils/mqtt/mqtt-2.18.8"
import MqttConstant from "../constants/MqttConstant";
import LiveMessageError from "../dto/LiveMessageError";
import ErrorConst from "../constants/ErrorConst";
export default class MqttClientService extends BaseService {
    constructor(liveMessageApi) {
        super(liveMessageApi)
        this.mqttClient = null;
        this.topics = [];
        this.mqttOption = {
            keepalive: 60,
            clean: true,
            reconnectPeriod: 0,
            connectTimeout: 30 * 1000,
            username: "",
            password: "",
            protocolId: 'MQTT',
            protocolVersion: 4
        };
    }
    /**
     * 连接EMQX服务
     * @param {Array} serverUrls 
     * @param {Object} mqttOption 
     * @param {Array} topics
     */
    connectMqtt = async (serverUrl, mqttOption, topics) => {
        try {
            if (this.mqttClient && this.mqttClient.connected) {
                return;
            }
            this.topics = topics;
            let option = Object.assign({}, this.mqttOption, mqttOption);
            console.log("EMQX Connect option:", option);
            this.mqttClient = mqtt.connect(serverUrl, option);
            this.mqttClient.on("connect", (connack) => {
                console.log("EMQX服务连接成功");
                this.mqttClient.subscribe(topics, {
                    qos: MqttConstant.QOS_2
                }, (err, granted) => {
                    if (err != null) {
                        console.log("EMQX TOPIC订阅失败:", err);
                    } else {
                        console.log("EMQX TOPIC订阅成功:");
                        for (let i = 0; i < granted.length; i++) {
                            let grantedData = granted[i];
                            console.log("topic:", grantedData.topic, "qos:", grantedData.qos);
                        }
                        /**
                         * 发布事件
                         */
                        const message = {
                            eventName: MqttConstant.MQTT_CLIENT_EVENT_NAME_SUBSCRIBE_SUCCESS,
                            eventData: {
                                appId: this.liveMessageApi.options.appId,
                                clientId: this.liveMessageApi.clientId,
                                accId: this.liveMessageApi.userInfo.accId,
                                accToken: this.liveMessageApi.userInfo.accToken,
                            }
                        }
                        this.publishMessage(MqttConstant.MQTT_PUBLISH_TOPIC_CLIENT_EVENT, message);
                    }
                });
            });


            this.mqttClient.on("reconnect", () => {
                console.log("EMQX Client reconnect");
            });
            // this.mqttClient.on("close", () => {
            //     console.log("EMQX Client close");
            // });
            this.mqttClient.on("disconnect", (packet) => {
                console.log("EMQX Client disconnect");
            });
            this.mqttClient.on("offline", () => {
                console.log("EMQX Client offline");
            });
            this.mqttClient.on("error", (error) => {
                console.log("EMQX Client error");
            });
            // this.mqttClient.on("end", () => {
            //     console.log("EMQX Client end");
            // });


            this.mqttClient.on("message", (topic, payload, packet) => {
                console.log("EMQX Client 收到消息-topic:", topic);
                console.log("EMQX Client 收到消息-payload:", payload.toString());
            });
        } catch (error) {
            console.log("EMQX Client connectMqtt异常:", error);
        }
    }
    /**
     * 关闭
     */
    disconnect = () => {
        try {
            if (this.mqttClient && this.mqttClient.connected) {
                this.mqttClient.end();
                this.mqttClient = null;
            }
        } catch (error) {


        }
    }


    /**
     * 发布消息
     */


    publishMessage = async (topic, message) => {
        return new Promise((reslove, reject) => {
            try {
                if (this.mqttClient && this.mqttClient.connected) {
                    let messageStr = JSON.stringify(message);
                    this.mqttClient.publish(topic, messageStr, {
                        qos: 2
                    }, (err) => {
                        if (err) {
                            console.log("EMQX Client 消息发布错误:", err);
                            reject(err);
                        } else {
                            console.log("EMQX Client 发布消息-topic:", topic, "payload:", message);
                            reslove(true);
                        }
                    });


                } else {
                    reject(new LiveMessageError(ErrorConst.ERROR_CODE_MQTT_UNCONNECT, "客户端未连接EMQX服务"));
                }
            } catch (error) {
                console.log("EMQX Client 消息发布异常:", error);
                reject(error);
            }
        });
    }
}
回答关注问题邀请回答
收藏

2 个回答

  • T17_18
    T17_18
    2021-09-10

    请问下楼主解决了吗,我也遇到了这个情况

    2021-09-10
    有用
    回复
  • 😶
    😶
    2021-01-20

    小程序只有五个socket通道,超出了5个,建议app.js中做初始化。全局都使用一个。这样会解决这个问题。

    2021-01-20
    有用
    回复 5
    • Cgs
      Cgs
      2021-01-20
      2021-01-20
      回复
    • Cgs
      Cgs
      2021-01-20
      打扰了,按照您的回复在app.js进行初始也会出现报错,还有其他方案吗?
      2021-01-20
      回复
    • Cgs
      Cgs
      2021-01-20
      断点调试,当page index.js destory 执行完后没有报错,此时会进入vm运行代码,之后就出现错误,是否是与底层有冲突?有空麻烦看看,谢谢
      2021-01-20
      回复
    • Cgs
      Cgs
      2021-01-20
      或者您们有集成mqtt.js 案例发一份参考下
      2021-01-20
      回复
    • 筱筱
      筱筱
      2023-07-18回复Cgs
      请问这个问题解决了吗?
      2023-07-18
      回复
登录 后发表内容
问题标签