// Retrieved from UNPKG (raw view, 34.1 kB, JavaScript)
1/* jshint -W097 */
2/* jshint -W030 */
3/* jshint strict:true */
4/* jslint node: true */
5/* jslint esversion: 6 */
6'use strict';
7
8
9const mqtt = require('mqtt-connection');
10const ping = require('ping');
11const tcpp = require('tcp-ping');
12const net = require('net');
13const request = require('request');
14const datapoints = require(__dirname + '/datapoints');
15
/**
 * Pause for the given number of milliseconds.
 * @param {number} ms - delay in milliseconds
 * @returns {Promise<void>} promise that resolves (without a value) once the delay has elapsed
 */
function sleep(ms) {
    return new Promise((done) => {
        setTimeout(done, ms);
    });
}
23
/**
 * Checks whether the given function was declared with `async`.
 * @param {function} funct - function to inspect
 * @returns {boolean|undefined} true for an async function, false for any other
 *   value that has a constructor, undefined when `funct` is falsy or has no constructor
 */
function isAsync(funct) {
    if (!funct || !funct.constructor) return undefined;
    // async functions are instances of the hidden AsyncFunction constructor
    return funct.constructor.name === 'AsyncFunction';
}
32
/**
 * Probe a host with the `ping` module (legacy variant of pingAsync).
 * @param {string} host - hostname or ip-address
 * @returns {Promise<boolean>} resolves to true when the probe reports the host alive,
 *   otherwise false; the promise is never rejected
 */
function pingAsyncOld(host) {
    return new Promise((resolve) => {
        ping.sys.probe(host, (isAlive) => {
            resolve(isAlive ? true : false);
        });
    });
}
48
/**
 * Probe a host with a TCP connect via the `tcp-ping` module.
 * @param {*} host - hostname like www.google.de or an address like 192.168.20.1
 * @param {*} port - port to probe; any falsy value falls back to 80
 * @returns {Promise<*>} resolves with the availability flag reported by the probe;
 *   the probe's error argument is ignored and the promise is never rejected
 */
function pingAsync(host, port) {
    const targetPort = port || 80;
    return new Promise((resolve) => {
        tcpp.probe(host, targetPort, (error, isAlive) => {
            resolve(isAlive);
        });
    });
}
62
/**
 * Promise wrapper around the callback based `request` module.
 * @param {string|object} url - url or a `request` options object
 * @returns {Promise<*>} resolves with the response body
 * @throws rejects with the transport error reported by `request`, or with an
 *   Error when the request succeeded but delivered an empty body
 */
function requestAsync(url) {
    return new Promise((resolve, reject) => {
        request(url, (error, res, body) => {
            if (!error && body) {
                resolve(body);
                return;
            }
            // never reject with null/undefined: callers log the rejection reason
            reject(error || new Error('Empty response body'));
        });
    });
}
78
/**
 * Walks a value recursively and applies `String.prototype.replace` to every string found.
 * Objects and arrays are modified in place and returned as the same reference;
 * a string source yields the replaced string; every other value is returned untouched.
 * @param {object} source - value to process
 * @param {regex} pattern - search pattern
 * @param {string} replacement - replacement text
 * @returns {*} the processed source
 */
function recursiveSubStringReplace(source, pattern, replacement) {
    const visit = (node) => {
        if (typeof node === 'string') {
            return node.replace(pattern, replacement);
        }
        if (typeof node !== 'object' || node === null) {
            return node;
        }
        for (const key of Object.keys(node)) {
            node[key] = visit(node[key]);
        }
        return node;
    };
    return visit(source);
}
104
/**
 * Server side handler for the MQTT connection of one Shelly device.
 * It answers the MQTT protocol packets of the device, mirrors published
 * topics into ioBroker states via the objectHelper, sends commands back to
 * the device and polls values by HTTP that are not published by MQTT.
 */
class MQTTClient {
    /**
     * @param {object} adapter - ioBroker adapter instance (config, log, namespace)
     * @param {object} objectHelper - helper used to create/update objects and states
     * @param {object} eventEmitter - emitter delivering the 'onFirmwareUpdate' event
     * @param {object} stream - network stream of the connected device
     */
    constructor(adapter, objectHelper, eventEmitter, stream) {
        this.active = true;
        this.adapter = adapter;
        this.objectHelper = objectHelper;
        this.eventEmitter = eventEmitter;
        this.packet = undefined;
        this.qos = {};
        this.messageIds = [];
        this.messageId = 1;
        this.states = {};
        this.stream = stream;
        this.client = undefined;
        this.device = {};
        this.http = {};
        this.auth = undefined;
        this.polltime = this.adapter.config.polltime * 1000 || 5000;
        this.ip = undefined;
        this.id = undefined;
        this.devicename = undefined;
        this.devicetype = undefined;
        this.deviceid = undefined;
        this.serialid = undefined;
        this.mqttprefix = undefined;
        this.deviceexist = undefined;
        this.httptimeout = 5 * 1000;
        this.firmwareUpdateHandler = undefined;
        const httpusername = this.adapter.config.httpusername;
        const httppassword = this.adapter.config.httppassword;
        if (httpusername && httppassword && httpusername.length > 0 && httppassword.length > 0) {
            this.auth = 'Basic ' + Buffer.from(httpusername + ':' + httppassword).toString('base64');
        }
        this.start();
    }

    /**
     * To make sure that an instance is not running more than once, we check for running instances.
     * If an instance runs with the same name (shellyswitch-12345), the old instance is destroyed.
     * @param {object} self - my instance
     */
    static _registerRun(self) {
        if (!self) return;
        if (!this.clientlist) this.clientlist = {};
        const name = self.getId();
        if (name && this.clientlist[name]) this.clientlist[name].destroy();
        this.clientlist[name] = self;
    }

    /**
     * Builds the options object for an HTTP request to the device.
     * @param {string} path - path (and query) appended to http://<ip>
     * @param {object} [extra] - additional request options, for example { qs: value }
     * @returns {object} request options; the Authorization header is only set if credentials are configured
     */
    _buildRequestParams(path, extra) {
        const params = Object.assign({
            url: 'http://' + this.getIP() + path,
            timeout: this.httptimeout
        }, extra);
        if (this.auth) {
            params.headers = { 'Authorization': this.auth };
        }
        return params;
    }

    /**
     * Converts a raw value to the type declared in the state definition.
     * @param {string} type - common.type of the state
     * @param {*} value - raw value
     * @returns {*} 'true'/'false' become booleans for boolean states, defined values
     *   become numbers for number states, everything else is returned unchanged
     */
    _convertValue(type, value) {
        if (type === 'boolean' && value === 'false') return false;
        if (type === 'boolean' && value === 'true') return true;
        if (type === 'number' && value !== undefined) return Number(value);
        return value;
    }

    /**
     * Schedules the next run of httpIoBrokerState() and logs a failure of that
     * run instead of leaving a rejected promise unhandled.
     * @param {number} delay - delay in milliseconds
     */
    _scheduleHttpPoll(delay) {
        this.timerid = setTimeout(() => {
            this.httpIoBrokerState().catch((error) => {
                this.adapter.log.error('Error in function httpIoBrokerState for ' + this.getName() + ' (' + error + ')');
            });
        }, delay);
    }

    /**
     * Get IP of device back. For example
     * 192.168.1.2
     */
    getIP() {
        if (!this.ip && this.stream && this.stream.remoteAddress) {
            this.ip = this.stream.remoteAddress;
        }
        return this.ip;
    }

    /**
     * Get the ID of the Shelly Device. For example: shellyplug-s-12345
     */
    getId() {
        if (!this.id && this.packet && this.packet.clientId) {
            this.id = this.packet.clientId;
        }
        return this.id;
    }

    /**
     * Get the Shelly Device type with the serialnumber of the device back.
     * Device type could be for example SHRGBW2. The serial number of the
     * device like 1234 will be added
     * Example: SHRGBW2#1234#1
     */
    getDeviceName() {
        if (!this.devicename) {
            this.devicename = this.getDeviceType() + '#' + this.getSerialId() + '#1';
        }
        return this.devicename;
    }

    /**
     * Get the Shelly Device type without serialnumber of the device back.
     * Example: SHRGBW2
     */
    getDeviceType() {
        if (!this.devicetype) {
            this.devicetype = datapoints.getDeviceNameForMQTT(this.getDeviceId());
        }
        return this.devicetype;
    }

    /**
     * Get the deviceid back without serial number.
     * For example, you get shellyplug-s for shellyplug-s-12345 back
     */
    getDeviceId() {
        if (!this.deviceid) {
            let id = this.getId();
            if (id) {
                // an id of the form a/b/c is reduced to its middle part first
                id = id.replace(/(.+?)\/(.+?)\/(.*)/, '$2');
                this.deviceid = id.replace(/(.+)-(.+)/, '$1');
            }
        }
        return this.deviceid;
    }

    /**
     * Get the serialid back without devicename.
     * For example, you get 12345 for shellyplug-s-12345 back
     */
    getSerialId() {
        if (!this.serialid) {
            let id = this.getId();
            if (id) {
                id = id.replace(/(.+?)\/(.+?)\/(.*)/, '$2');
                this.serialid = id.replace(/(.+)-(.+)/, '$2');
            }
        }
        return this.serialid;
    }

    /**
     * Checks if the Shelly device type exists in the configuration. If it is missing
     * you have to add a configuration in the ./lib/devices directory
     */
    deviceExist() {
        if (this.deviceexist === undefined) {
            this.deviceexist = datapoints.getDeviceNameForMQTT(this.getDeviceId()) ? true : false;
        }
        return this.deviceexist;
    }

    /**
     * Returns a string for logging with the IP address, the ids and the name of the Shelly device
     */
    getName() {
        const name = this.getDeviceName(); // SHRGBW2#1234#1
        const ip = this.getIP(); // 192.168.11.1
        const deviceid = this.getDeviceId(); // shellyplug-s
        const id = this.getId(); // shellyplug-s-12345
        return ip + ' (' + deviceid + ' / ' + id + ' / ' + name + ')';
    }

    /**
     * Cleanup, destroy this object. Stops all timers, detaches the firmware
     * update listener and closes the MQTT connection. Calling it twice is a no-op.
     */
    destroy() {
        if (!this.active) return;
        this.adapter.log.info('Destroy ' + this.getName());
        this.active = false;
        clearTimeout(this.timerid);
        clearInterval(this.resendid);
        clearTimeout(this.autoupdateid);
        // cancel the pending QoS resend timers, they would only find an empty queue
        for (const messageId of Object.keys(this.qos)) {
            clearTimeout(this.qos[messageId].resendid);
        }
        // a destroyed client must not react to firmware update events any more
        if (this.firmwareUpdateHandler && this.eventEmitter && typeof this.eventEmitter.removeListener === 'function') {
            this.eventEmitter.removeListener('onFirmwareUpdate', this.firmwareUpdateHandler);
        }
        this.firmwareUpdateHandler = undefined;
        // release the registry entry so the destroyed instance is not kept alive
        const registeredId = this.getId();
        if (registeredId && MQTTClient.clientlist && MQTTClient.clientlist[registeredId] === this) {
            delete MQTTClient.clientlist[registeredId];
        }
        this.qos = {};
        this.messageId = 1;
        this.states = {};
        this.device = {};
        this.http = {};
        this.ip = undefined;
        this.id = undefined;
        this.devicename = undefined;
        this.devicetype = undefined;
        this.deviceid = undefined;
        this.serialid = undefined;
        this.deviceexist = undefined;
        this.mqttprefix = undefined;
        if (this.client) {
            this.client.removeAllListeners();
            this.client.destroy();
        }
    }

    /**
     * Sends MQTT Messages, for example to change a state.
     * qos, dup and retain are optional, the callback may take the place of any of them.
     * @param {*} topic
     * @param {*} state
     * @param {*} qos
     * @param {*} dup
     * @param {*} retain
     * @param {function} cb
     */
    sendState2Client(topic, state, qos, dup, retain, cb) {
        if (typeof qos === 'function') {
            cb = qos;
            dup = false;
            qos = undefined;
        }
        if (typeof dup === 'function') {
            cb = dup;
            dup = false;
            retain = undefined;
        }
        if (typeof retain === 'function') {
            cb = retain;
            retain = undefined;
        }
        qos = qos ? (Number.parseInt(qos, 10) || 0) : 0;
        // MQTT packet identifiers are 16 bit and must not be 0: wrap within 1..65535
        this.messageId = (this.messageId % 0xFFFF) + 1;
        this.adapter.log.debug('Send state to ' + this.getName() + ' : ' + topic + ' = ' + state + ' (' + this.messageId + ')');
        this.client.publish({ topic: topic, payload: state, qos: qos, retain: retain, messageId: this.messageId }, cb);
        // if qos > 0 remember the message until the device acknowledged it
        if (qos > 0) {
            this.deleteResendState2ClientFromTopic(topic);
            this.resendState2Client('publish', this.messageId, { topic: topic, payload: state, qos: qos, dup: true, retain: retain, messageId: this.messageId });
        }
    }

    /**
     * Remembers an unacknowledged QoS packet and sends it again every 5 seconds,
     * at most 10 times. After the last attempt the entry stays in the queue
     * until it is acknowledged or the client is destroyed.
     * @param {string} cmd - 'publish', 'pubrel', 'pubrec' or 'pubcomp'
     * @param {number} messageId - MQTT packet identifier
     * @param {object} [message] - publish packet, only used for cmd 'publish'
     */
    resendState2Client(cmd, messageId, message) {
        const retaintime = 5 * 1000;
        const current = this.qos[messageId];
        if (!current || current.cmd !== cmd || current.message !== message) {
            // stop the timer of the replaced entry, otherwise it fires later and
            // overwrites the new entry with the old command
            if (current) clearTimeout(current.resendid);
            this.qos[messageId] = {
                ts: Date.now(),
                cmd: cmd,
                count: 0,
                message: message
            };
        }
        const entry = this.qos[messageId];
        if (entry.count >= 10) return;
        clearTimeout(entry.resendid);
        entry.resendid = setTimeout(() => {
            // the entry was acknowledged or replaced in the meantime
            if (this.qos[messageId] !== entry) return;
            entry.count++;
            entry.ts = Date.now();
            switch (entry.cmd) {
                case 'publish':
                    this.client.publish(entry.message);
                    break;
                case 'pubrel':
                    this.client.pubrel({ messageId: messageId });
                    break;
                case 'pubrec':
                    this.client.pubrec({ messageId: messageId });
                    break;
                case 'pubcomp':
                    this.client.pubcomp({ messageId: messageId });
                    break;
                default:
                    break;
            }
            this.resendState2Client(cmd, messageId, message);
        }, retaintime);
    }

    /**
     * Removes a packet from the resend queue and stops its timer.
     * @param {number} messageId - MQTT packet identifier
     */
    deleteResendState2Client(messageId) {
        if (this.qos[messageId]) {
            clearTimeout(this.qos[messageId].resendid);
            delete this.qos[messageId];
        }
    }

    /**
     * Removes all queued 'publish' packets for a topic from the resend queue.
     * @param {string} topic - MQTT topic
     */
    deleteResendState2ClientFromTopic(topic) {
        for (const messageId of Object.keys(this.qos)) {
            const entry = this.qos[messageId];
            if (entry.message && entry.cmd === 'publish' && entry.message.topic === topic) {
                clearTimeout(entry.resendid);
                delete this.qos[messageId];
            }
        }
    }

    /**
     * Returns the resend queue entry for a packet identifier.
     * @param {number} messageId - MQTT packet identifier
     * @returns {object|undefined} queue entry or undefined
     */
    getResendState2Client(messageId) {
        return this.qos[messageId];
    }

    /**
     * Delete old states in objects under shelly.X.
     * For example if the configuration for the device changed.
     * Afterwards every channel of the adapter without any state below it is deleted.
     */
    async deleteOldStates() {
        const prefix = this.adapter.namespace + '.' + this.getDeviceName() + '.';
        const obj = await this.adapter.getAdapterObjectsAsync();
        const dps = datapoints.getAll('mqtt')[this.getDeviceId()];
        if (dps) {
            for (const key of Object.keys(obj)) {
                const entry = obj[key];
                if (!entry || !entry._id) continue;
                if (entry.type !== 'state' || !entry._id.startsWith(prefix)) continue;
                const stateid = entry._id.slice(prefix.length);
                if (dps[stateid]) continue;
                try {
                    await this.adapter.delObjectAsync(entry._id);
                    delete obj[entry._id];
                    this.adapter.log.info('Delete old state: ' + entry._id);
                } catch (error) {
                    this.adapter.log.error('Could not delete old state: ' + entry._id);
                }
            }
        }
        // delete empty channels
        for (const key of Object.keys(obj)) {
            const channel = obj[key];
            if (!channel || !channel._id || channel.type !== 'channel') continue;
            const hasState = Object.keys(obj).some((otherKey) => {
                const other = obj[otherKey];
                return Boolean(other && other.type === 'state' && other._id && other._id.startsWith(channel._id));
            });
            if (hasState) continue;
            try {
                await this.adapter.delObjectAsync(channel._id);
                delete obj[channel._id];
                this.adapter.log.info('Delete old channel: ' + channel._id);
            } catch (error) {
                this.adapter.log.error('Could not delete old channel: ' + channel._id);
            }
        }
    }

    /**
     * Builds the handler that is called when a state with mqtt_cmd is changed in ioBroker.
     * @param {object} state - state definition
     * @param {string} stateid - id of the state
     * @returns {function} async control function
     */
    _createMqttControl(state, stateid) {
        return async (value) => {
            const cmd = state.mqtt.mqtt_cmd;
            if (state.mqtt && state.mqtt.mqtt_cmd_funct) {
                try {
                    value = isAsync(state.mqtt.mqtt_cmd_funct) ? await state.mqtt.mqtt_cmd_funct(value, this) : state.mqtt.mqtt_cmd_funct(value, this);
                } catch (error) {
                    this.adapter.log.error('Error in function state.mqtt.mqtt_cmd_funct for state ' + stateid + ' for ' + this.getName() + ' (' + error + ')');
                }
            }
            this.sendState2Client(cmd, value, this.adapter.config.qos);
            // forget the cached value so the next message of the device is stored again
            delete this.states[stateid];
        };
    }

    /**
     * Builds the handler that is called when a state with http_cmd is changed in ioBroker.
     * @param {object} state - state definition
     * @param {string} stateid - id of the state
     * @returns {function} async control function
     */
    _createHttpControl(state, stateid) {
        return async (value) => {
            if (state.mqtt && state.mqtt.http_cmd_funct) {
                try {
                    value = isAsync(state.mqtt.http_cmd_funct) ? await state.mqtt.http_cmd_funct(value, this) : state.mqtt.http_cmd_funct(value, this);
                } catch (error) {
                    this.adapter.log.error('Error in function state.mqtt.http_cmd_funct for state ' + stateid + ' for ' + this.getName() + ' (' + error + ')');
                }
            }
            let params;
            try {
                params = this._buildRequestParams(state.mqtt.http_cmd, { qs: value });
                this.adapter.log.debug('Call url ' + JSON.stringify(params) + ' for ' + this.getName());
                const body = await requestAsync(params);
                // a rejected login is delivered as a regular body, not as an error
                if (body === '401 Unauthorized') {
                    this.adapter.log.error('Wrong http username or http password! Please enter the user credential from restricted login for ' + this.getName());
                }
            } catch (error) {
                this.adapter.log.error('Error in function state.mqtt.http_cmd for state ' + stateid + ' and request' + JSON.stringify(params) + ' for ' + this.getName() + ' (' + error + ')');
            }
            delete this.states[stateid];
        };
    }

    /**
     * Create objects under shelly.0 for a new shelly device.
     * The Shelly device has to exist in the ./lib/devices/ directory
     */
    createObjects() {
        if (Object.keys(this.device).length !== 0) return;
        let devices = datapoints.getDeviceByType(this.getDeviceId(), 'mqtt');
        if (devices) {
            devices = recursiveSubStringReplace(devices, new RegExp('<mqttprefix>', 'g'), this.mqttprefix);
            const deviceid = this.getDeviceName();
            for (const statename of Object.keys(devices)) {
                const state = devices[statename];
                state.state = statename;
                const mqttdef = state.mqtt || {};
                if (!this.states[deviceid]) {
                    this.states[deviceid] = deviceid;
                    this.objectHelper.setOrUpdateObject(deviceid, {
                        type: 'device',
                        common: {
                            name: 'Device ' + deviceid
                        },
                        native: {}
                    }, ['name']);
                }
                // the part before the first dot is the channel of the state
                const channel = statename.split('.')[0];
                if (channel !== statename) {
                    const channelid = deviceid + '.' + channel;
                    if (!this.states[channelid]) {
                        this.states[channelid] = channelid;
                        this.objectHelper.setOrUpdateObject(channelid, {
                            type: 'channel',
                            common: {
                                name: 'Channel ' + channel
                            }
                        }, ['name']);
                    }
                }
                const stateid = deviceid + '.' + statename;
                let controlFunction;
                if (mqttdef.mqtt_cmd) {
                    controlFunction = this._createMqttControl(state, stateid);
                } else if (mqttdef.http_cmd) {
                    controlFunction = this._createHttpControl(state, stateid);
                }
                // values which are not published by MQTT are polled by HTTP
                if (mqttdef.http_publish && !mqttdef.mqtt_publish) {
                    if (!this.http[mqttdef.http_publish]) this.http[mqttdef.http_publish] = [];
                    this.http[mqttdef.http_publish].push(statename);
                }
                let value;
                if (mqttdef.mqtt_init_value) value = mqttdef.mqtt_init_value;
                this.objectHelper.setOrUpdateObject(stateid, {
                    type: 'state',
                    common: state.common
                }, ['name'], value, controlFunction);
            }
            this.device = devices;
        }
        this.objectHelper.processObjectQueue(() => { });
    }

    /**
     * Returns all state definitions of the device which are published under the topic.
     * @param {string} topic - MQTT topic
     * @returns {object[]} matching state definitions
     */
    getDevices(topic) {
        const states = [];
        for (const key of Object.keys(this.device)) {
            const state = this.device[key];
            if (state.mqtt && state.mqtt.mqtt_publish && topic === state.mqtt.mqtt_publish) states.push(state);
        }
        return states;
    }

    /**
     * State changes from the device will be saved in the ioBroker states
     * @param {string} topic - MQTT topic of the message
     * @param {object} payload - can be every type of value which has a toString() method
     */
    async createIoBrokerState(topic, payload) {
        this.adapter.log.debug('MQTT Message for ' + this.getId() + ' : ' + topic + ' / ' + JSON.stringify(payload));
        const dps = this.getDevices(topic);
        for (const dp of dps) {
            const stateid = this.getDeviceName() + '.' + dp.state;
            let value = payload.toString();
            this.adapter.log.debug('Create State : ' + stateid + ', Payload: ' + JSON.stringify(payload) + ' for ' + this.getId());
            this.adapter.log.debug('Create State : ' + stateid + ', Payload: ' + payload.toString() + ' for ' + this.getId());
            try {
                if (dp.mqtt && dp.mqtt.mqtt_publish === topic) {
                    if (dp.mqtt.mqtt_publish_funct) {
                        value = isAsync(dp.mqtt.mqtt_publish_funct) ? await dp.mqtt.mqtt_publish_funct(value, this) : dp.mqtt.mqtt_publish_funct(value, this);
                    }
                    value = this._convertValue(dp.common.type, value);
                    if (value !== undefined && (!Object.prototype.hasOwnProperty.call(this.states, stateid) || this.states[stateid] !== value || this.adapter.config.updateUnchangedObjects)) {
                        this.adapter.log.debug('State change : ' + stateid + ', Value: ' + JSON.stringify(value) + ' for ' + this.getName());
                        this.adapter.log.debug('MQTT Message for ' + this.getId() + ' : ' + topic + ' = ' + value);
                        this.states[stateid] = value;
                        this.objectHelper.setOrUpdateObject(stateid, {
                            type: 'state',
                            common: dp.common
                        }, ['name'], value);
                    }
                }
            } catch (error) {
                this.adapter.log.error('Error ' + error + ' in function dp.mqtt.mqtt_publish_funct for state ' + stateid + ' for ' + this.getName());
            }
        }
        this.objectHelper.processObjectQueue(() => { });
    }

    /**
     * Missing data in MQTT will be pulled by http. The function reschedules
     * itself as long as the client is active and there is something to poll.
     */
    async httpIoBrokerState() {
        // a destroyed client must neither ping nor reschedule itself
        if (!this.active) return;
        if (!this.httpIoBrokerStateTime || Date.now() >= (this.httpIoBrokerStateTime + (1000 * 60 * 10))) {
            this.httpIoBrokerStateTime = Date.now();
        }
        const alive = await pingAsync(this.getIP());
        if (!this.active) return;
        if (alive === false) {
            this._scheduleHttpPoll(100);
            return;
        }
        for (const path of Object.keys(this.http)) {
            if (!this.active) return;
            const states = this.http[path];
            let params;
            try {
                params = this._buildRequestParams(path);
                this.adapter.log.debug('http request' + JSON.stringify(params) + ' for ' + this.getName());
                const body = await requestAsync(params);
                if (!this.active) return;
                for (const statename of states) {
                    const state = this.device[statename];
                    if (!state || !state.state) continue;
                    const stateid = this.getDeviceName() + '.' + state.state;
                    let value = body;
                    try {
                        if (state.mqtt && state.mqtt.http_publish_funct) {
                            value = isAsync(state.mqtt.http_publish_funct) ? await state.mqtt.http_publish_funct(value, this) : state.mqtt.http_publish_funct(value, this);
                        }
                        value = this._convertValue(state.common.type, value);
                        if (value !== undefined && (!Object.prototype.hasOwnProperty.call(this.states, stateid) || this.states[stateid] !== value || this.adapter.config.updateUnchangedObjects)) {
                            this.adapter.log.debug('Set http state ' + stateid + ', Value: ' + JSON.stringify(value) + ' for ' + this.getName());
                            this.states[stateid] = value;
                            this.objectHelper.setOrUpdateObject(stateid, {
                                type: 'state',
                                common: state.common
                            }, ['name'], value);
                        }
                        this.polltime = this.adapter.config.polltime * 1000 || 5000;
                    } catch (error) {
                        if (error.name && error.name.startsWith('TypeError')) {
                            this.adapter.log.debug('Could not find property for state ' + stateid + ' and request' + JSON.stringify(params) + ' for ' + this.getName() + ' (' + error + ')');
                        } else {
                            // slow down the polling while the device answers with errors
                            this.polltime = 60 * 1000;
                            if (body && body === '401 Unauthorized') {
                                this.adapter.log.error('Wrong http username or http password! Please enter the user credential from restricted login for ' + this.getName());
                                break;
                            } else {
                                this.adapter.log.error('Error in function httpIoBrokerState for state ' + stateid + ' and request' + JSON.stringify(params) + ' for ' + this.getName() + ' (' + error + ')');
                            }
                        }
                    }
                }
                this.objectHelper.processObjectQueue(() => { });
            } catch (error) {
                // failed requests are retried with the next poll, so they are only reported on debug level
                this.adapter.log.debug('http request' + JSON.stringify(params) + ' for ' + this.getName() + ' failed (' + error + ')');
            }
        }
        if (this.active && this.http && Object.keys(this.http).length > 0) {
            this._scheduleHttpPoll(this.polltime);
        }
    }

    /**
     * Starts a firmware update every minute as long as autoupdate is configured
     * and the client is active.
     */
    async firmwareUpdatePolling() {
        if (!this.active || !this.adapter.config.autoupdate) return;
        await this.firmwareUpdate(true);
        if (!this.active) return;
        this.autoupdateid = setTimeout(() => {
            this.firmwareUpdatePolling().catch((error) => {
                this.adapter.log.error('Error in function firmwareUpdatePolling for ' + this.getName() + ' (' + error + ')');
            });
        }, 60 * 1000);
    }

    /**
     * Calls the /ota?update=true url of the device.
     * @param {boolean} update - nothing happens if falsy
     */
    async firmwareUpdate(update) {
        if (!update || !this.active) return;
        this.adapter.log.debug('Calling function firmwareUpdate');
        let params;
        try {
            params = this._buildRequestParams('/ota?update=true');
            this.adapter.log.debug('Call url ' + JSON.stringify(params) + ' for ' + this.getName());
            await requestAsync(params);
        } catch (error) {
            this.adapter.log.error('Error in function firmwareUpdate and request' + JSON.stringify(params) + ' for ' + this.getName() + ' (' + error + ')');
        }
    }

    /**
     * Wraps the stream into an MQTT connection and registers all listeners.
     */
    start() {
        this.client = mqtt(this.stream);
        // keep the handler reference so destroy() can detach it again
        this.firmwareUpdateHandler = async () => await this.firmwareUpdate(true);
        this.eventEmitter.on('onFirmwareUpdate', this.firmwareUpdateHandler);
        this.listener();
    }

    /**
     * Reads the MQTT prefix (mqtt.id) from the /settings page of the device.
     * @returns {Promise<string|undefined>} the prefix or undefined if it could not be read
     */
    async setMqttPrefixHttp() {
        if (this.mqttprefix) {
            return this.mqttprefix;
        }
        let body;
        let params;
        try {
            params = this._buildRequestParams('/settings');
            this.adapter.log.debug('Call url ' + JSON.stringify(params) + ' for ' + this.getName());
            body = await requestAsync(params);
            if (body) {
                const settings = JSON.parse(body);
                this.mqttprefix = settings.mqtt.id;
                return this.mqttprefix;
            }
        } catch (error) {
            // a rejected login is delivered as body and fails in JSON.parse
            if (body && body === '401 Unauthorized') {
                this.adapter.log.error('Wrong http username or http password! Please enter the user credential from restricted login for ' + this.getName());
            } else {
                this.adapter.log.error('Error in function setMqttPrefixHttp() for request' + JSON.stringify(params) + ' for ' + this.getName() + ' (' + error + ')');
            }
        }
        return undefined;
    }

    /**
     * Takes the MQTT prefix from the topic of the last will,
     * for example "shellies/huhu-shellybutton1-A4CF12F454A3/online"
     * @param {string} topic - topic of the last will
     * @returns {string|undefined} the prefix or undefined if the topic does not start with shellies/
     */
    setMqttPrefixByWill(topic) {
        if (this.mqttprefix) {
            return this.mqttprefix;
        }
        if (topic) {
            const parts = topic.split('/');
            if (parts[0] === 'shellies') {
                this.mqttprefix = parts[1];
                return this.mqttprefix;
            }
        }
        return undefined;
    }

    /**
     * Registers the handlers for all MQTT packets and events of the connection.
     */
    listener() {
        // client connected
        this.client.on('connect', async (packet) => {
            this.packet = packet;
            try {
                if (!this.deviceExist()) {
                    this.adapter.log.error('Shelly Device unknown, configuration for Shelly device ' + this.getName() + ' for MQTT does not exist!');
                    this.client.connack({ returnCode: 4 });
                    return;
                }
                const authorized = packet.username === this.adapter.config.mqttusername &&
                    packet.password !== undefined && packet.password !== null &&
                    packet.password.toString() === this.adapter.config.mqttpassword;
                if (!authorized) {
                    this.adapter.log.error('Wrong MQTT authentification for : ' + this.getName());
                    this.client.connack({ returnCode: 4 });
                    return;
                }
                // check for existing instances
                MQTTClient._registerRun(this);
                this.adapter.log.info('Shelly device ' + this.getName() + ' with MQTT connected!');
                // remember the last will
                if (packet.will) {
                    this.will = packet.will;
                }
                if (this.will && this.will.topic) {
                    this.setMqttPrefixByWill(this.will.topic);
                } else {
                    await this.setMqttPrefixHttp();
                    // the connection may have been closed while waiting for the device
                    if (!this.active) return;
                }
                this.deleteOldStates().catch((error) => {
                    this.adapter.log.error('Error in function deleteOldStates for ' + this.getName() + ' (' + error + ')');
                });
                this.createObjects();
                this.httpIoBrokerState().catch((error) => {
                    this.adapter.log.error('Error in function httpIoBrokerState for ' + this.getName() + ' (' + error + ')');
                });
                this.client.connack({ returnCode: 0 });
            } catch (error) {
                this.adapter.log.error('Error while connecting ' + this.getName() + ' (' + error + ')');
            }
        });
        this.client.on('close', (status) => {
            this.adapter.log.info('Close Client: ' + this.getName() + ' (' + status + ')');
            this.destroy();
        });
        this.client.on('error', (error) => {
            this.adapter.log.info('Error Client : ' + this.getName() + ' (' + error + ')');
        });
        this.client.on('disconnect', () => {
            this.adapter.log.info('Client Disconnect : ' + this.getName());
            this.destroy();
        });
        this.client.on('timeout', () => {
            this.adapter.log.info('Client Timeout : ' + this.getName());
        });
        this.client.on('publish', (packet) => {
            if (packet.payload) this.adapter.log.debug('Publish ' + this.getName() + ' payload: ' + packet.topic + ' = ' + packet.payload.toString());
            // inside a docker container stream.remoteAddress is sometimes not the address of
            // the device. We replace it with the address of the announce message
            if (packet.topic === 'shellies/announce' && packet.payload) {
                try {
                    const ip = JSON.parse(packet.payload).ip;
                    if (ip) this.ip = ip;
                } catch (error) {
                    // we do not change anything
                }
            }
            this.createIoBrokerState(packet.topic, packet.payload).catch((error) => {
                this.adapter.log.error('Error in function createIoBrokerState for ' + this.getName() + ' (' + error + ')');
            });
            switch (packet.qos) {
                case 1:
                    this.client.puback({ messageId: packet.messageId });
                    break;
                case 2:
                    this.client.pubrec({ messageId: packet.messageId });
                    this.resendState2Client('pubrec', packet.messageId);
                    break;
                default:
                    break;
            }
        });
        // this.client pinged
        this.client.on('pingreq', () => {
            // send a pingresp
            this.client.pingresp();
        });
        // response for QoS2: the device received our publish
        this.client.on('pubrec', (packet) => {
            const qosmsg = this.getResendState2Client(packet.messageId);
            if (qosmsg && qosmsg.cmd === 'publish') {
                this.client.pubrel({ messageId: packet.messageId });
                this.resendState2Client('pubrel', packet.messageId);
            } else {
                this.adapter.log.warn('Client ' + this.getName() + ' received pubrec for unknown messageId ' + packet.messageId);
            }
        });
        // response for QoS2: the device completed our publish, the queue entry
        // is in state 'pubrel' since we answered its pubrec
        this.client.on('pubcomp', (packet) => {
            const qosmsg = this.getResendState2Client(packet.messageId);
            if (qosmsg && qosmsg.cmd === 'pubrel') {
                this.deleteResendState2Client(packet.messageId);
            } else {
                this.adapter.log.warn('Client ' + this.getName() + ' received pubcomp for unknown messageId ' + packet.messageId);
            }
        });
        // response for QoS2: the device releases a message it published to us
        this.client.on('pubrel', (packet) => {
            const qosmsg = this.getResendState2Client(packet.messageId);
            if (qosmsg && qosmsg.cmd === 'pubrec') {
                this.deleteResendState2Client(packet.messageId);
                this.client.pubcomp({ messageId: packet.messageId });
            } else {
                this.adapter.log.warn('Client ' + this.getName() + ' received pubrel for unknown messageId ' + packet.messageId);
            }
        });
        // response for QoS1
        this.client.on('puback', (packet) => {
            // remove this message from queue
            const qosmsg = this.getResendState2Client(packet.messageId);
            if (qosmsg && qosmsg.cmd === 'publish') {
                this.deleteResendState2Client(packet.messageId);
            } else {
                this.adapter.log.warn('Client ' + this.getName() + ' received puback for unknown messageId ' + packet.messageId);
            }
        });
        this.client.on('unsubscribe', (packet) => {
            this.client.unsuback({ messageId: packet.messageId });
        });
        // this.client subscribed
        this.client.on('subscribe', (packet) => {
            // send a suback with messageId and granted QoS level
            this.adapter.log.debug('Subscribe for ' + this.getName() + ' : ' + JSON.stringify(packet));
            const granted = [];
            for (const subscription of packet.subscriptions || []) {
                granted.push(subscription.qos);
            }
            if (packet.topic) this.adapter.log.debug('subscribe topic: ' + packet.topic);
            this.client.suback({ granted: granted, messageId: packet.messageId });
        });
        // timeout idle streams after 5 minutes
        this.client.stream.setTimeout(1000 * 60 * 5);
    }
}
919
/**
 * TCP server which accepts the MQTT connections of the Shelly devices and
 * creates one MQTTClient per incoming connection.
 */
class MQTTServer {

    /**
     * @param {object} adapter - ioBroker adapter instance (config, log)
     * @param {object} objectHelper - helper passed on to every MQTTClient
     * @param {object} eventEmitter - emitter passed on to every MQTTClient
     */
    constructor(adapter, objectHelper, eventEmitter) {
        this.messageId = 1;
        this.server = new net.Server();
        this.adapter = adapter;
        this.clients = [];
        this.objectHelper = objectHelper;
        this.eventEmitter = eventEmitter;
    }

    /**
     * Registers the connection handling and starts listening on the
     * configured port and bind address.
     */
    listen() {
        this.server.on('connection', (stream) => {
            const client = new MQTTClient(this.adapter, this.objectHelper, this.eventEmitter, stream);
            // log the reason, then tear down the client and its stream
            const teardown = (reason, error) => {
                const detail = error ? ': ' + error : '';
                this.adapter.log.info('Server ' + reason + ' for ' + stream.remoteAddress + ' (' + client.getName() + ')' + detail);
                client.destroy();
                stream.destroy();
            };
            stream.on('timeout', () => teardown('Timeout'));
            stream.on('unload', () => teardown('Unload'));
            stream.on('error', (error) => teardown('Error', error));
        });
        this.server.on('close', () => {
            this.adapter.log.info('Closing listender ');
        });
        this.server.on('error', (error) => {
            this.adapter.log.error('Error in listender ' + error);
        });
        // listen on the configured port, for example 1883
        this.server.listen(this.adapter.config.port, this.adapter.config.bind, () => {
        });
    }

}
964
// public interface of this module: only the server class is exported
module.exports = { MQTTServer };