import BaseService from "./BaseService";
import mqtt from "../utils/mqtt/mqtt-2.18.8"
import MqttConstant from "../constants/MqttConstant";
import LiveMessageError from "../dto/LiveMessageError";
import ErrorConst from "../constants/ErrorConst";
/**
 * Service that owns a single MQTT (EMQX) client: it connects, subscribes to
 * the requested topics, announces a successful subscription on the client
 * event topic, and publishes JSON messages.
 *
 * NOTE(review): the handlers read `this.liveMessageApi`; this assumes
 * BaseService stores its constructor argument under that name - confirm.
 */
export default class MqttClientService extends BaseService {
  /**
   * @param {object} liveMessageApi - Owning API object, forwarded to BaseService.
   */
  constructor(liveMessageApi) {
    super(liveMessageApi);
    // The one client this service manages; null while no client exists.
    this.mqttClient = null;
    // Topics passed to the most recent connectMqtt() call.
    this.topics = [];
    // Defaults merged under the caller's options in connectMqtt().
    this.mqttOption = {
      keepalive: 60,
      clean: true,
      // MQTT.js treats 0 as "automatic reconnect disabled".
      reconnectPeriod: 0,
      connectTimeout: 30 * 1000,
      username: "",
      password: "",
      protocolId: 'MQTT',
      protocolVersion: 4
    };
  }

  /**
   * Opens the MQTT connection and subscribes to `topics` once connected.
   * Does nothing when a connected client already exists. A client that exists
   * but is not connected (still connecting, or offline) is closed first so
   * that repeated calls never hold more than one socket.
   *
   * Errors are logged, never thrown.
   *
   * @param {string} serverUrl - Broker URL handed to mqtt.connect().
   * @param {object} mqttOption - Overrides for the default connect options.
   * @param {string|string[]|object} topics - Topics handed to client.subscribe().
   * @returns {Promise<void>}
   */
  connectMqtt = async (serverUrl, mqttOption, topics) => {
    try {
      if (this.mqttClient && this.mqttClient.connected) {
        return;
      }
      if (this.mqttClient) {
        // Stale client: force-close it so its socket and listeners are released
        // before a replacement is created.
        try {
          this.mqttClient.end(true);
        } catch (endError) {
          console.log("EMQX Client connectMqtt异常:", endError);
        }
        this.mqttClient = null;
      }
      this.topics = topics;
      const option = Object.assign({}, this.mqttOption, mqttOption);
      // Never write the credential itself to the log.
      console.log("EMQX Connect option:", {
        ...option,
        password: option.password ? "******" : option.password
      });
      // Handlers close over `client` rather than `this.mqttClient`, which may
      // be null or a different client by the time an event fires.
      const client = mqtt.connect(serverUrl, option);
      this.mqttClient = client;
      client.on("connect", () => {
        console.log("EMQX服务连接成功");
        client.subscribe(topics, {
          qos: MqttConstant.QOS_2
        }, (err, granted) => {
          if (err != null) {
            console.log("EMQX TOPIC订阅失败:", err);
            return;
          }
          console.log("EMQX TOPIC订阅成功:");
          for (const grantedData of granted) {
            console.log("topic:", grantedData.topic, "qos:", grantedData.qos);
          }
          const message = {
            eventName: MqttConstant.MQTT_CLIENT_EVENT_NAME_SUBSCRIBE_SUCCESS,
            eventData: {
              appId: this.liveMessageApi.options.appId,
              clientId: this.liveMessageApi.clientId,
              accId: this.liveMessageApi.userInfo.accId,
              accToken: this.liveMessageApi.userInfo.accToken,
            }
          };
          // publishMessage() rejects on failure; handle it here so a failed
          // announcement does not become an unhandled promise rejection.
          this.publishMessage(MqttConstant.MQTT_PUBLISH_TOPIC_CLIENT_EVENT, message)
            .catch((publishError) => {
              console.log("EMQX Client 消息发布异常:", publishError);
            });
        });
      });
      client.on("reconnect", () => {
        console.log("EMQX Client reconnect");
      });
      client.on("disconnect", () => {
        console.log("EMQX Client disconnect");
      });
      client.on("offline", () => {
        console.log("EMQX Client offline");
      });
      client.on("error", (error) => {
        console.log("EMQX Client error", error);
      });
      client.on("message", (topic, payload) => {
        console.log("EMQX Client 收到消息-topic:", topic);
        console.log("EMQX Client 收到消息-payload:", payload.toString());
      });
    } catch (error) {
      console.log("EMQX Client connectMqtt异常:", error);
    }
  }

  /**
   * Closes the current client, if any, and forgets it. Unlike a check on
   * `connected`, this also releases a client that is still connecting.
   * Failures are logged, never thrown.
   */
  disconnect = () => {
    const client = this.mqttClient;
    if (!client) {
      return;
    }
    // Drop the reference first so the service is reusable even if end() throws.
    this.mqttClient = null;
    try {
      client.end();
    } catch (error) {
      console.log("EMQX Client disconnect异常:", error);
    }
  }

  /**
   * Publishes `message` as JSON on `topic` with QoS 2.
   *
   * @param {string} topic - Destination topic.
   * @param {*} message - Value serialized with JSON.stringify().
   * @returns {Promise<boolean>} Resolves to true once the publish callback
   *   reports success. Rejects with a LiveMessageError when no connected
   *   client exists, or with the underlying error when serialization or
   *   publishing fails.
   */
  publishMessage = async (topic, message) => {
    return new Promise((resolve, reject) => {
      try {
        if (!(this.mqttClient && this.mqttClient.connected)) {
          reject(new LiveMessageError(ErrorConst.ERROR_CODE_MQTT_UNCONNECT, "客户端未连接EMQX服务"));
          return;
        }
        const messageStr = JSON.stringify(message);
        this.mqttClient.publish(topic, messageStr, {
          qos: 2
        }, (err) => {
          if (err) {
            console.log("EMQX Client 消息发布错误:", err);
            reject(err);
          } else {
            console.log("EMQX Client 发布消息-topic:", topic, "payload:", message);
            resolve(true);
          }
        });
      } catch (error) {
        console.log("EMQX Client 消息发布异常:", error);
        reject(error);
      }
    });
  }
}
请问下楼主解决了吗,我也遇到了这个情况
小程序最多只能同时存在 5 个 socket 连接,你这里超出了 5 个。建议在 app.js 中只初始化一次,全局共用同一个实例,这样就能解决这个问题。
// app.js
import LiveMessageApi from "./live-message/index";
App({
  /** Launch hook: only writes a marker to the console. */
  onLaunch() {
    console.log("onLaunch");
  },
  // Created once, when this registration object is evaluated, so that every
  // page can reach the same LiveMessageApi instance through the app object.
  apiInstance: new LiveMessageApi(),
});
//Page index.js
"accId": "zhangsan",
// Application object returned by getApp(); the handlers below read and clear
// its `apiInstance` property.
const app = getApp();

Page({
  /** Load hook: only writes a marker to the console. */
  onLoad() {
    console.log("onLoad");
    // Obtain the LiveMessageApi instance here (no code for this step is present).
  },

  /**
   * Tears down the shared API instance, if one is still attached to the app,
   * and clears the reference so later calls do nothing.
   */
  liveMessageDestory() {
    const api = app.apiInstance;
    if (!api) {
      return;
    }
    api.destory();
    app.apiInstance = null;
  },
});