UNPKG

20.5 kBJavaScriptView Raw
1'use strict';
2/**
3 * Module dependencies
4 */
5/*global setImmediate:true*/
6var events = require('events'),
7 Store = require('./store'),
8 eos = require('end-of-stream'),
9 mqttPacket = require('mqtt-packet'),
10 Writable = require('readable-stream').Writable,
11 inherits = require('inherits'),
12 reInterval = require('reinterval'),
13 validations = require('./validations'),
14 setImmediate = global.setImmediate || function (callback) {
15 // works in node v0.8
16 process.nextTick(callback);
17 },
18 defaultConnectOptions = {
19 keepalive: 10,
20 reschedulePings: true,
21 protocolId: 'MQTT',
22 protocolVersion: 4,
23 reconnectPeriod: 1000,
24 connectTimeout: 30 * 1000,
25 clean: true
26 };
27
28function defaultId () {
29 return 'mqttjs_' + Math.random().toString(16).substr(2, 8);
30}
31
32function sendPacket (client, packet, cb) {
33 try {
34 var buf = mqttPacket.generate(packet);
35
36 client.emit('packetsend', packet);
37
38 if (client.stream.write(buf) && cb) {
39 cb();
40 } else if (cb) {
41 client.stream.once('drain', cb);
42 }
43 } catch (err) {
44 if (cb) {
45 cb(err);
46 } else {
47 client.emit('error', err);
48 }
49 }
50}
51
52function storeAndSend (client, packet, cb) {
53 client.outgoingStore.put(packet, function storedPacket (err) {
54 if (err) {
55 return cb && cb(err);
56 }
57 sendPacket(client, packet, cb);
58 });
59}
60
61function nop () {}
62
63/**
64 * MqttClient constructor
65 *
66 * @param {Stream} stream - stream
67 * @param {Object} [options] - connection options
68 * (see Connection#connect)
69 */
70function MqttClient (streamBuilder, options) {
71 var k,
72 that = this;
73
74 if (!(this instanceof MqttClient)) {
75 return new MqttClient(streamBuilder, options);
76 }
77
78 this.options = options || {};
79
80 // Defaults
81 for (k in defaultConnectOptions) {
82 if ('undefined' === typeof this.options[k]) {
83 this.options[k] = defaultConnectOptions[k];
84 } else {
85 this.options[k] = options[k];
86 }
87 }
88
89 this.options.clientId = this.options.clientId || defaultId();
90
91 this.streamBuilder = streamBuilder;
92
93 // Inflight message storages
94 this.outgoingStore = this.options.outgoingStore || new Store();
95 this.incomingStore = this.options.incomingStore || new Store();
96
97 // Should QoS zero messages be queued when the connection is broken?
98 this.queueQoSZero = null == this.options.queueQoSZero ? true : this.options.queueQoSZero;
99
100 // Ping timer, setup in _setupPingTimer
101 this.pingTimer = null;
102 // Is the client connected?
103 this.connected = false;
104 // Are we disconnecting?
105 this.disconnecting = false;
106 // Packet queue
107 this.queue = [];
108 // connack timer
109 this.connackTimer = null;
110 // Reconnect timer
111 this.reconnectTimer = null;
112 // MessageIDs starting with 1
113 this.nextId = Math.floor(Math.random() * 65535);
114
115 // Inflight callbacks
116 this.outgoing = {};
117
118 // Mark connected on connect
119 this.on('connect', function () {
120 this.connected = true;
121 var outStore = null;
122 outStore = this.outgoingStore.createStream();
123
124 // Control of stored messages
125 outStore.once('readable', function () {
126 function storeDeliver () {
127 var packet = outStore.read(1),
128 cb;
129 if (!packet) {
130 return;
131 }
132 // Avoid unnecesary stream read operations when disconnected
133 if (!that.disconnecting && !that.reconnectTimer && (0 < that.options.reconnectPeriod)) {
134 outStore.read(0);
135 cb = that.outgoing[packet.messageId];
136 that.outgoing[packet.messageId] = function () {
137 // Ensure that the original callback passed in to publish gets invoked
138 if (cb) {
139 cb();
140 }
141 // Ensure that the next message will only be read after callback is issued
142 storeDeliver();
143 };
144 that._sendPacket(packet);
145 } else if (outStore.destroy) {
146 outStore.destroy();
147 }
148 }
149 storeDeliver();
150 })
151 .on('error', this.emit.bind(this, 'error'));
152 });
153
154 // Mark disconnected on stream close
155 this.on('close', function () {
156 this.connected = false;
157 clearTimeout(this.connackTimer);
158 });
159
160 // Setup ping timer
161 this.on('connect', this._setupPingTimer);
162
163 // Send queued packets
164 this.on('connect', function () {
165 var queue = this.queue;
166
167 function deliver () {
168 var entry = queue.shift(),
169 packet = null;
170
171 if (!entry) {
172 return;
173 }
174
175 packet = entry.packet;
176
177 that._sendPacket(
178 packet,
179 function (err) {
180 if (entry.cb) {
181 entry.cb(err);
182 }
183 deliver();
184 }
185 );
186 }
187
188 deliver();
189 });
190
191
192 // Clear ping timer
193 this.on('close', function () {
194 if (null !== that.pingTimer) {
195 that.pingTimer.clear();
196 that.pingTimer = null;
197 }
198 });
199
200 // Setup reconnect timer on disconnect
201 this.on('close', this._setupReconnect);
202
203 events.EventEmitter.call(this);
204
205 this._setupStream();
206}
207inherits(MqttClient, events.EventEmitter);
208
209/**
210 * setup the event handlers in the inner stream.
211 *
212 * @api private
213 */
214MqttClient.prototype._setupStream = function () {
215 var connectPacket,
216 that = this,
217 writable = new Writable(),
218 parser = mqttPacket.parser(this.options),
219 completeParse = null,
220 packets = [];
221
222 this._clearReconnect();
223
224 this.stream = this.streamBuilder(this);
225
226 parser.on('packet', function (packet) {
227 packets.push(packet);
228 });
229
230 function process () {
231 var packet = packets.shift(),
232 done = completeParse;
233 if (packet) {
234 that._handlePacket(packet, process);
235 } else {
236 completeParse = null;
237 done();
238 }
239 }
240
241 writable._write = function (buf, enc, done) {
242 completeParse = done;
243 parser.parse(buf);
244 process();
245 };
246
247 this.stream.pipe(writable);
248
249 // Suppress connection errors
250 this.stream.on('error', nop);
251
252 // Echo stream close
253 eos(this.stream, this.emit.bind(this, 'close'));
254
255 // Send a connect packet
256 connectPacket = Object.create(this.options);
257 connectPacket.cmd = 'connect';
258 // avoid message queue
259 sendPacket(this, connectPacket);
260
261 // Echo connection errors
262 parser.on('error', this.emit.bind(this, 'error'));
263
264 // many drain listeners are needed for qos 1 callbacks if the connection is intermittent
265 this.stream.setMaxListeners(1000);
266
267 clearTimeout(this.connackTimer);
268 this.connackTimer = setTimeout(function () {
269 that._cleanUp(true);
270 }, this.options.connectTimeout);
271};
272
273MqttClient.prototype._handlePacket = function (packet, done) {
274 this.emit('packetreceive', packet);
275
276 switch (packet.cmd) {
277 case 'publish':
278 this._handlePublish(packet, done);
279 break;
280 case 'puback':
281 case 'pubrec':
282 case 'pubcomp':
283 case 'suback':
284 case 'unsuback':
285 this._handleAck(packet);
286 done();
287 break;
288 case 'pubrel':
289 this._handlePubrel(packet, done);
290 break;
291 case 'connack':
292 this._handleConnack(packet);
293 done();
294 break;
295 case 'pingresp':
296 this._handlePingresp(packet);
297 done();
298 break;
299 default:
300 // do nothing
301 // maybe we should do an error handling
302 // or just log it
303 break;
304 }
305};
306
307MqttClient.prototype._checkDisconnecting = function (callback) {
308 if (this.disconnecting) {
309 if (callback) {
310 callback(new Error('client disconnecting'));
311 } else {
312 this.emit('error', new Error('client disconnecting'));
313 }
314 }
315 return this.disconnecting;
316};
317
318/**
319 * publish - publish <message> to <topic>
320 *
321 * @param {String} topic - topic to publish to
322 * @param {String, Buffer} message - message to publish
323 * @param {Object} [opts] - publish options, includes:
324 * {Number} qos - qos level to publish on
325 * {Boolean} retain - whether or not to retain the message
326 * @param {Function} [callback] - function(err){}
327 * called when publish succeeds or fails
328 * @returns {MqttClient} this - for chaining
329 * @api public
330 *
331 * @example client.publish('topic', 'message');
332 * @example
333 * client.publish('topic', 'message', {qos: 1, retain: true});
334 * @example client.publish('topic', 'message', console.log);
335 */
336MqttClient.prototype.publish = function (topic, message, opts, callback) {
337 var packet;
338
339 // .publish(topic, payload, cb);
340 if ('function' === typeof opts) {
341 callback = opts;
342 opts = null;
343 }
344
345 // Default opts
346 if (!opts) {
347 opts = {qos: 0, retain: false};
348 }
349
350 if (this._checkDisconnecting(callback)) {
351 return this;
352 }
353
354 packet = {
355 cmd: 'publish',
356 topic: topic,
357 payload: message,
358 qos: opts.qos,
359 retain: opts.retain,
360 messageId: this._nextId()
361 };
362
363 switch (opts.qos) {
364 case 1:
365 case 2:
366
367 // Add to callbacks
368 this.outgoing[packet.messageId] = callback || nop;
369 this._sendPacket(packet);
370 break;
371 default:
372 this._sendPacket(packet, callback);
373 break;
374 }
375
376 return this;
377};
378
379/**
380 * subscribe - subscribe to <topic>
381 *
382 * @param {String, Array, Object} topic - topic(s) to subscribe to, supports objects in the form {'topic': qos}
383 * @param {Object} [opts] - optional subscription options, includes:
384 * {Number} qos - subscribe qos level
385 * @param {Function} [callback] - function(err, granted){} where:
386 * {Error} err - subscription error (none at the moment!)
387 * {Array} granted - array of {topic: 't', qos: 0}
388 * @returns {MqttClient} this - for chaining
389 * @api public
390 * @example client.subscribe('topic');
391 * @example client.subscribe('topic', {qos: 1});
392 * @example client.subscribe({'topic': 0, 'topic2': 1}, console.log);
393 * @example client.subscribe('topic', console.log);
394 */
395MqttClient.prototype.subscribe = function () {
396 var packet,
397 args = Array.prototype.slice.call(arguments),
398 subs = [],
399 obj = args.shift(),
400 callback = args.pop() || nop,
401 opts = args.pop(),
402 invalidTopic;
403
404 if ('string' === typeof obj) {
405 obj = [obj];
406 }
407
408 if ('function' !== typeof callback) {
409 opts = callback;
410 callback = nop;
411 }
412
413 invalidTopic = validations.validateTopics(obj);
414 if ( null !== invalidTopic ) {
415 callback(new Error('Invalid topic ' + invalidTopic));
416 return this;
417 }
418
419 if (this._checkDisconnecting(callback)) {
420 return this;
421 }
422
423 if (!opts) {
424 opts = { qos: 0 };
425 }
426
427 if (Array.isArray(obj)) {
428 obj.forEach(function (topic) {
429 subs.push({
430 topic: topic,
431 qos: opts.qos
432 });
433 });
434 } else {
435 Object
436 .keys(obj)
437 .forEach(function (k) {
438 subs.push({
439 topic: k,
440 qos: obj[k]
441 });
442 });
443 }
444
445 packet = {
446 cmd: 'subscribe',
447 subscriptions: subs,
448 qos: 1,
449 retain: false,
450 dup: false,
451 messageId: this._nextId()
452 };
453
454 this.outgoing[packet.messageId] = callback;
455
456 this._sendPacket(packet);
457
458 return this;
459};
460
461/**
462 * unsubscribe - unsubscribe from topic(s)
463 *
464 * @param {String, Array} topic - topics to unsubscribe from
465 * @param {Function} [callback] - callback fired on unsuback
466 * @returns {MqttClient} this - for chaining
467 * @api public
468 * @example client.unsubscribe('topic');
469 * @example client.unsubscribe('topic', console.log);
470 */
471MqttClient.prototype.unsubscribe = function (topic, callback) {
472 var packet = {
473 cmd: 'unsubscribe',
474 qos: 1,
475 messageId: this._nextId()
476 };
477
478 callback = callback || nop;
479
480 if (this._checkDisconnecting(callback)) {
481 return this;
482 }
483
484 if ('string' === typeof topic) {
485 packet.unsubscriptions = [topic];
486 } else if ('object' === typeof topic && topic.length) {
487 packet.unsubscriptions = topic;
488 }
489
490 this.outgoing[packet.messageId] = callback;
491
492 this._sendPacket(packet);
493
494 return this;
495};
496
497/**
498 * end - close connection
499 *
500 * @returns {MqttClient} this - for chaining
501 * @param {Boolean} force - do not wait for all in-flight messages to be acked
502 * @param {Function} cb - called when the client has been closed
503 *
504 * @api public
505 */
506MqttClient.prototype.end = function (force, cb) {
507 var that = this;
508
509 if ('function' === typeof force) {
510 cb = force;
511 force = false;
512 }
513
514 function closeStores () {
515 that.incomingStore.close(function () {
516 that.outgoingStore.close(cb);
517 });
518 }
519
520 function finish () {
521 that._cleanUp(force, closeStores);
522 }
523
524 if (this.disconnecting) {
525 return true;
526 }
527
528 this.disconnecting = true;
529
530 if (!force && 0 < Object.keys(this.outgoing).length) {
531 // wait 10ms, just to be sure we received all of it
532 this.once('outgoingEmpty', setTimeout.bind(null, finish, 10));
533 } else {
534 finish();
535 }
536
537 return this;
538};
539
540/**
541 * _reconnect - implement reconnection
542 * @api privateish
543 */
544MqttClient.prototype._reconnect = function () {
545 this.emit('reconnect');
546 this._setupStream();
547};
548
549/**
550 * _setupReconnect - setup reconnect timer
551 */
552MqttClient.prototype._setupReconnect = function () {
553 var that = this;
554
555 if (!that.disconnecting && !that.reconnectTimer && (0 < that.options.reconnectPeriod)) {
556 this.emit('offline');
557 that.reconnectTimer = setInterval(function () {
558 that._reconnect();
559 }, that.options.reconnectPeriod);
560 }
561};
562
563/**
564 * _clearReconnect - clear the reconnect timer
565 */
566MqttClient.prototype._clearReconnect = function () {
567 if (this.reconnectTimer) {
568 clearInterval(this.reconnectTimer);
569 this.reconnectTimer = false;
570 }
571};
572
573
574/**
575 * _cleanUp - clean up on connection end
576 * @api private
577 */
578MqttClient.prototype._cleanUp = function (forced, done) {
579
580 if (done) {
581 this.stream.on('close', done);
582 }
583
584 if (forced) {
585 this.stream.destroy();
586 } else {
587 this._sendPacket(
588 { cmd: 'disconnect' },
589 setImmediate.bind(
590 null,
591 this.stream.end.bind(this.stream)
592 )
593 );
594 }
595
596 if (this.reconnectTimer) {
597 this._clearReconnect();
598 this._setupReconnect();
599 }
600
601 if (null !== this.pingTimer) {
602 this.pingTimer.clear();
603 this.pingTimer = null;
604 }
605};
606
607/**
608 * _sendPacket - send or queue a packet
609 * @param {String} type - packet type (see `protocol`)
610 * @param {Object} packet - packet options
611 * @param {Function} cb - callback when the packet is sent
612 * @api private
613 */
614MqttClient.prototype._sendPacket = function (packet, cb) {
615 if (!this.connected) {
616 if (0 < packet.qos || 'publish' !== packet.cmd || this.queueQoSZero) {
617 this.queue.push({ packet: packet, cb: cb });
618 } else if (cb) {
619 cb(new Error('No connection to broker'));
620 }
621
622 return;
623 }
624
625 // When sending a packet, reschedule the ping timer
626 this._shiftPingInterval();
627
628 switch (packet.qos) {
629 case 2:
630 case 1:
631 storeAndSend(this, packet, cb);
632 break;
633 /**
634 * no need of case here since it will be caught by default
635 * and jshint comply that before default it must be a break
636 * anyway it will result in -1 evaluation
637 */
638 case 0:
639 /* falls through */
640 default:
641 sendPacket(this, packet, cb);
642 break;
643 }
644};
645
646/**
647 * _setupPingTimer - setup the ping timer
648 *
649 * @api private
650 */
651MqttClient.prototype._setupPingTimer = function () {
652 var that = this;
653
654 if (!this.pingTimer && this.options.keepalive) {
655 this.pingResp = true;
656 this.pingTimer = reInterval(function () {
657 that._checkPing();
658 }, this.options.keepalive * 1000);
659 }
660};
661
662/**
663 * _shiftPingInterval - reschedule the ping interval
664 *
665 * @api private
666 */
667MqttClient.prototype._shiftPingInterval = function () {
668 if (this.pingTimer && this.options.keepalive && this.options.reschedulePings) {
669 this.pingTimer.reschedule(this.options.keepalive * 1000);
670 }
671};
672/**
673 * _checkPing - check if a pingresp has come back, and ping the server again
674 *
675 * @api private
676 */
677MqttClient.prototype._checkPing = function () {
678 if (this.pingResp) {
679 this.pingResp = false;
680 this._sendPacket({ cmd: 'pingreq' });
681 } else {
682 // do a forced cleanup since socket will be in bad shape
683 this._cleanUp(true);
684 }
685};
686
687/**
688 * _handlePingresp - handle a pingresp
689 *
690 * @api private
691 */
692MqttClient.prototype._handlePingresp = function () {
693 this.pingResp = true;
694};
695
696/**
697 * _handleConnack
698 *
699 * @param {Object} packet
700 * @api private
701 */
702
703MqttClient.prototype._handleConnack = function (packet) {
704 var rc = packet.returnCode,
705 // TODO: move to protocol
706 errors = [
707 '',
708 'Unacceptable protocol version',
709 'Identifier rejected',
710 'Server unavailable',
711 'Bad username or password',
712 'Not authorized'
713 ];
714
715 clearTimeout(this.connackTimer);
716
717 if (0 === rc) {
718 this.emit('connect', packet);
719 } else if (0 < rc) {
720 this.emit('error',
721 new Error('Connection refused: ' + errors[rc]));
722 }
723};
724
725/**
726 * _handlePublish
727 *
728 * @param {Object} packet
729 * @api private
730 */
731/*
732those late 2 case should be rewrite to comply with coding style:
733
734case 1:
735case 0:
736 // do not wait sending a puback
737 // no callback passed
738 if (1 === qos) {
739 this._sendPacket({
740 cmd: 'puback',
741 messageId: mid
742 });
743 }
744 // emit the message event for both qos 1 and 0
745 this.emit('message', topic, message, packet);
746 this.handleMessage(packet, done);
747 break;
748default:
749 // do nothing but every switch mus have a default
750 // log or throw an error about unknown qos
751 break;
752
753for now i just suppressed the warnings
754*/
755MqttClient.prototype._handlePublish = function (packet, done) {
756 var topic = packet.topic.toString(),
757 message = packet.payload,
758 qos = packet.qos,
759 mid = packet.messageId,
760 that = this;
761
762 switch (qos) {
763 case 2:
764 this.incomingStore.put(packet, function () {
765 that._sendPacket({cmd: 'pubrec', messageId: mid}, done);
766 });
767 break;
768 case 1:
769 // do not wait sending a puback
770 // no callback passed
771 this._sendPacket({
772 cmd: 'puback',
773 messageId: mid
774 });
775 /* falls through */
776 case 0:
777 // emit the message event for both qos 1 and 0
778 this.emit('message', topic, message, packet);
779 this.handleMessage(packet, done);
780 break;
781 default:
782 // do nothing
783 // log or throw an error about unknown qos
784 break;
785 }
786};
787
788/**
789 * Handle messages with backpressure support, one at a time.
790 * Override at will.
791 *
792 * @param Packet packet the packet
793 * @param Function callback call when finished
794 * @api public
795 */
796MqttClient.prototype.handleMessage = function (packet, callback) {
797 callback();
798};
799
800/**
801 * _handleAck
802 *
803 * @param {Object} packet
804 * @api private
805 */
806
807MqttClient.prototype._handleAck = function (packet) {
808 var mid = packet.messageId,
809 type = packet.cmd,
810 response = null,
811 cb = this.outgoing[mid],
812 that = this;
813
814 if (!cb) {
815 // Server sent an ack in error, ignore it.
816 return;
817 }
818
819 // Process
820 switch (type) {
821 case 'pubcomp':
822 // same thing as puback for QoS 2
823 case 'puback':
824 // Callback - we're done
825 delete this.outgoing[mid];
826 this.outgoingStore.del(packet, cb);
827 break;
828 case 'pubrec':
829 response = {
830 cmd: 'pubrel',
831 qos: 2,
832 messageId: mid
833 };
834
835 this._sendPacket(response);
836 break;
837 case 'suback':
838 delete this.outgoing[mid];
839 this.outgoingStore.del(packet, function (err, original) {
840 if (err) {
841 // missing packet, what should we do?
842 return that.emit('error', err);
843 }
844
845 var i,
846 origSubs = original.subscriptions,
847 granted = packet.granted;
848
849 for (i = 0; i < granted.length; i += 1) {
850 origSubs[i].qos = granted[i];
851 }
852
853 cb(null, origSubs);
854 });
855 break;
856 case 'unsuback':
857 delete this.outgoing[mid];
858 this.outgoingStore.del(packet, cb);
859 break;
860 default:
861 that.emit('error', new Error('unrecognized packet type'));
862 }
863
864 if (this.disconnecting &&
865 0 === Object.keys(this.outgoing).length) {
866 this.emit('outgoingEmpty');
867 }
868};
869
870/**
871 * _handlePubrel
872 *
873 * @param {Object} packet
874 * @api private
875 */
876
877MqttClient.prototype._handlePubrel = function (packet, callback) {
878 var mid = packet.messageId,
879 that = this;
880
881 that.incomingStore.get(packet, function (err, pub) {
882 if (err) {
883 return that.emit('error', err);
884 }
885
886 if ('pubrel' !== pub.cmd) {
887 that.emit('message', pub.topic, pub.payload, pub);
888 that.incomingStore.put(packet);
889 }
890
891 that._sendPacket({cmd: 'pubcomp', messageId: mid}, callback);
892 });
893};
894
895/**
896 * _nextId
897 */
898MqttClient.prototype._nextId = function () {
899 var id = this.nextId++;
900 // Ensure 16 bit unsigned int:
901 if (65535 === id) {
902 this.nextId = 1;
903 }
904 return id;
905};
906
907module.exports = MqttClient;