UNPKG

19.1 kBJavaScriptView Raw
1'use strict';
2/**
3 * Module dependencies
4 */
5/*global setImmediate:true*/
// Core, third-party and local dependencies.
var events = require('events');
var Store = require('./store');
var eos = require('end-of-stream');
var mqttPacket = require('mqtt-packet');
var Writable = require('readable-stream').Writable;
var inherits = require('inherits');

// Fall back to process.nextTick where setImmediate is missing (node v0.8).
var setImmediate = global.setImmediate || function (callback) {
  process.nextTick(callback);
};

// Values copied into the client options for every key the caller leaves
// undefined.
var defaultConnectOptions = {
  // multiplied by 1000 before being handed to setInterval, i.e. seconds
  keepalive: 10,
  protocolId: 'MQTT',
  protocolVersion: 4,
  // handed to setInterval as is, i.e. milliseconds
  reconnectPeriod: 1000,
  // handed to setTimeout as is, i.e. milliseconds
  connectTimeout: 30 * 1000,
  clean: true
};
24
/**
 * Generate a default client identifier.
 *
 * The suffix is built from a random 32-bit integer that is left-padded with
 * zeros, so it is always exactly eight hex digits long. Slicing the decimal
 * part of Math.random().toString(16) could yield fewer digits, or none at
 * all when Math.random() returned 0.
 *
 * @returns {String} 'mqttjs_' followed by eight hexadecimal digits
 * @api private
 */
function defaultId () {
  var random = Math.floor(Math.random() * 0x100000000);

  return 'mqttjs_' + ('0000000' + random.toString(16)).slice(-8);
}
28
/**
 * Serialize a packet and write it straight to the client's stream,
 * bypassing the store and the offline queue.
 *
 * Only serialization and the write itself are guarded by the try block. The
 * callback is invoked outside of it, so an exception thrown by the callback
 * propagates to the caller instead of being caught here and reported back
 * to the very same callback a second time.
 *
 * @param {MqttClient} client - client whose `stream` receives the bytes
 * @param {Object} packet - packet handed to mqttPacket.generate
 * @param {Function} [cb] - function(err){} called once: with the error when
 *   generating or writing throws, on 'drain' when write() returned false,
 *   immediately otherwise. Without a callback, failures are emitted as
 *   'error' on the client.
 * @api private
 */
function sendPacket (client, packet, cb) {
  var flushed;

  try {
    flushed = client.stream.write(mqttPacket.generate(packet));
  } catch (err) {
    if (cb) {
      cb(err);
    } else {
      client.emit('error', err);
    }
    return;
  }

  if (!cb) {
    return;
  }

  if (flushed) {
    cb();
  } else {
    // the stream signalled backpressure: report completion once it drains
    client.stream.once('drain', cb);
  }
}
45
/**
 * Persist a packet in the client's outgoing store and transmit it once the
 * store has acknowledged it.
 *
 * @param {MqttClient} client - client owning `outgoingStore`
 * @param {Object} packet - packet to store and send
 * @param {Function} [cb] - function(err){} receiving the store error, or
 *   passed on to sendPacket when storing succeeded
 * @api private
 */
function storeAndSend (client, packet, cb) {
  function onStored (err) {
    if (!err) {
      sendPacket(client, packet, cb);
      return;
    }
    return cb && cb(err);
  }

  client.outgoingStore.put(packet, onStored);
}
54
/**
 * No-op used wherever a callback or listener is optional.
 */
function nop () {
}
56
/**
 * MqttClient constructor
 *
 * Can be called with or without `new`. Registers the internal 'connect' and
 * 'close' listeners and then opens the first connection via _setupStream.
 *
 * @param {Function} streamBuilder - function(client){} returning the stream
 *   to talk over; called by _setupStream each time a connection is set up
 * @param {Object} [options] - connection options; keys it leaves undefined
 *   are filled in from defaultConnectOptions (the object is modified in
 *   place), and `clientId` falls back to a generated id
 * @api public
 */
function MqttClient (streamBuilder, options) {
  var k,
    that = this;

  if (!(this instanceof MqttClient)) {
    return new MqttClient(streamBuilder, options);
  }

  this.options = options || {};

  // Defaults: only fill in what the caller left undefined
  for (k in defaultConnectOptions) {
    if ('undefined' === typeof this.options[k]) {
      this.options[k] = defaultConnectOptions[k];
    }
  }

  this.options.clientId = this.options.clientId || defaultId();

  this.streamBuilder = streamBuilder;

  // Inflight message storages
  this.outgoingStore = this.options.outgoingStore || new Store();
  this.incomingStore = this.options.incomingStore || new Store();

  // Ping timer, setup in _setupPingTimer
  this.pingTimer = null;
  // Is the client connected?
  this.connected = false;
  // Are we intentionally disconnecting?
  this.disconnecting = false;
  // Packet queue
  this.queue = [];
  // connack timer
  this.connackTimer = null;
  // Reconnect timer
  this.reconnectTimer = null;
  // MessageIDs start at a random value in 1..65535; 0 is not a valid MQTT
  // packet identifier, so it must never be the first id handed out
  this.nextId = Math.floor(Math.random() * 65535) + 1;

  // Inflight callbacks
  this.outgoing = {};

  // Mark connected on connect
  this.on('connect', function () {
    this.connected = true;
    var outStore = null;
    outStore = this.outgoingStore.createStream();

    // Control of stored messages
    outStore.once('readable', function () {
      function storeDeliver () {
        var packet = outStore.read(1);
        if (!packet) {
          return;
        }
        // Avoid unnecessary stream read operations when disconnected
        if (!that.disconnecting && !that.reconnectTimer && (0 < that.options.reconnectPeriod)) {
          outStore.read(0);
          // Ensure that the next message will only be read after callback is issued
          that.outgoing[packet.messageId] = storeDeliver;
          that._sendPacket(packet);
        } else if (outStore.destroy) {
          outStore.destroy();
        }
      }
      storeDeliver();
    })
    .on('error', this.emit.bind(this, 'error'));
  });

  // Mark disconnected on stream close
  this.on('close', function () {
    this.connected = false;
  });

  // Setup ping timer
  this.on('connect', this._setupPingTimer);

  // Send queued packets, one at a time, each after the previous callback
  this.on('connect', function () {
    var queue = this.queue;

    function deliver () {
      var entry = queue.shift(),
        packet = null;

      if (!entry) {
        return;
      }

      packet = entry.packet;

      that._sendPacket(
        packet,
        function (err) {
          if (entry.cb) {
            entry.cb(err);
          }
          deliver();
        }
      );
    }

    deliver();
  });


  // Clear ping timer
  this.on('close', function () {
    if (null !== that.pingTimer) {
      clearInterval(that.pingTimer);
      that.pingTimer = null;
    }
  });

  // Setup reconnect timer on disconnect
  this.on('close', this._setupReconnect);

  events.EventEmitter.call(this);

  this._setupStream();
}
191inherits(MqttClient, events.EventEmitter);
192
/**
 * Build a new stream with the stream builder, wire it to a packet parser
 * and send the connect packet.
 *
 * Incoming bytes are piped into a Writable whose write callback is only
 * released once every packet parsed from that chunk has been handled, so
 * packets are processed strictly one after another.
 *
 * @api private
 */
MqttClient.prototype._setupStream = function () {
  var self = this,
    sink = new Writable(),
    parser = mqttPacket.parser(this.options),
    pending = [],
    writeDone = null,
    connectPacket;

  this._clearReconnect();

  this.stream = this.streamBuilder(this);

  parser.on('packet', function (packet) {
    pending.push(packet);
  });

  // Handle the parsed packets one by one; when none is left, release the
  // write callback of the chunk they came from.
  function drainPending () {
    var next = pending.shift(),
      finish = writeDone;

    if (!next) {
      writeDone = null;
      finish();
      return;
    }

    self._handlePacket(next, drainPending);
  }

  sink._write = function (buf, enc, done) {
    writeDone = done;
    parser.parse(buf);
    drainPending();
  };

  this.stream.pipe(sink);

  // Suppress connection errors
  this.stream.on('error', nop);

  // Echo stream close
  eos(this.stream, this.emit.bind(this, 'close'));

  // Send a connect packet; the options stay reachable through the prototype
  connectPacket = Object.create(this.options);
  connectPacket.cmd = 'connect';
  // written directly to the stream to avoid the message queue
  sendPacket(this, connectPacket);

  // Echo parser errors
  parser.on('error', this.emit.bind(this, 'error'));

  // many drain listeners are needed for qos 1 callbacks if the connection is intermittent
  this.stream.setMaxListeners(1000);

  // Force a cleanup if the connection attempt outlives connectTimeout
  clearTimeout(this.connackTimer);
  this.connackTimer = setTimeout(function () {
    self._cleanUp(true);
  }, this.options.connectTimeout);
};
256
/**
 * Dispatch an incoming packet to its handler.
 *
 * `done` must be called exactly once per packet: the caller uses it to move
 * on to the next parsed packet. Handlers that take `done` call it
 * themselves; for the synchronous ones it is called here.
 *
 * @param {Object} packet - parsed packet, dispatched on `packet.cmd`
 * @param {Function} done - called when the packet has been dealt with
 * @api private
 */
MqttClient.prototype._handlePacket = function (packet, done) {
  switch (packet.cmd) {
    case 'publish':
      this._handlePublish(packet, done);
      break;
    case 'puback':
    case 'pubrec':
    case 'pubcomp':
    case 'suback':
    case 'unsuback':
      this._handleAck(packet);
      done();
      break;
    case 'pubrel':
      this._handlePubrel(packet, done);
      break;
    case 'connack':
      this._handleConnack(packet);
      done();
      break;
    case 'pingresp':
      this._handlePingresp(packet);
      done();
      break;
    default:
      // Unexpected packet type: ignore it, but still signal completion so
      // that the packets behind it are not held up forever.
      done();
      break;
  }
};
288
/**
 * Report an error when the client is in the middle of disconnecting.
 *
 * @param {Function} [callback] - function(err){} receiving the error;
 *   without it the error is emitted as an 'error' event instead
 * @returns {Boolean} the current value of `this.disconnecting`
 * @api private
 */
MqttClient.prototype._checkDisconnecting = function (callback) {
  if (this.disconnecting) {
    if (callback) {
      callback(new Error('client disconnecting'));
    } else {
      // the event name has to be given explicitly; passing the Error alone
      // would use the Error object itself as the event name
      this.emit('error', new Error('client disconnecting'));
    }
  }
  return this.disconnecting;
};
299
/**
 * publish - publish <message> to <topic>
 *
 * @param {String} topic - topic to publish to
 * @param {String, Buffer} message - message to publish
 * @param {Object} [opts] - publish options, includes:
 *    {Number} qos - qos level to publish on
 *    {Boolean} retain - whether or not to retain the message
 *    defaults to {qos: 0, retain: false} when omitted
 * @param {Function} [callback] - function(err){}
 *    for qos 1 and 2 it is registered under the message id in
 *    `this.outgoing`; otherwise it is passed along with the packet
 * @returns {MqttClient} this - for chaining
 * @api public
 *
 * @example client.publish('topic', 'message');
 * @example
 *     client.publish('topic', 'message', {qos: 1, retain: true});
 * @example client.publish('topic', 'message', console.log);
 */
MqttClient.prototype.publish = function (topic, message, opts, callback) {
  var packet,
    qos;

  // .publish(topic, payload, cb);
  if ('function' === typeof opts) {
    callback = opts;
    opts = null;
  }

  // Default opts
  opts = opts || {qos: 0, retain: false};

  if (this._checkDisconnecting(callback)) {
    return this;
  }

  qos = opts.qos;
  packet = {
    cmd: 'publish',
    topic: topic,
    payload: message,
    qos: qos,
    retain: opts.retain,
    messageId: this._nextId()
  };

  if (1 === qos || 2 === qos) {
    // Acknowledged delivery: park the callback until the ack arrives
    this.outgoing[packet.messageId] = callback || nop;
    this._sendPacket(packet);
  } else {
    this._sendPacket(packet, callback);
  }

  return this;
};
360
/**
 * subscribe - subscribe to <topic>
 *
 * @param {String, Array, Object} topic - topic(s) to subscribe to, supports objects in the form {'topic': qos}
 * @param {Object} [opts] - optional subscription options, includes:
 *    {Number} qos - subscribe qos level (defaults to 0; not used for the
 *    object form, which carries a qos per topic)
 * @param {Function} [callback] - function(err, granted){} where:
 *    {Error} err - subscription error, e.g. when the client is disconnecting
 *    {Array} granted - array of {topic: 't', qos: 0}
 * @returns {MqttClient} this - for chaining
 * @api public
 * @example client.subscribe('topic');
 * @example client.subscribe('topic', {qos: 1});
 * @example client.subscribe({'topic': 0, 'topic2': 1}, console.log);
 * @example client.subscribe('topic', console.log);
 */
MqttClient.prototype.subscribe = function () {
  var packet,
    args = Array.prototype.slice.call(arguments),
    subs = [],
    obj = args.shift(),
    callback = args.pop() || nop,
    opts = args.pop();

  if ('string' === typeof obj) {
    obj = [obj];
  }

  // .subscribe(topic, opts): the last argument was the options, not a
  // callback. This has to be sorted out before the disconnecting check,
  // which calls `callback` and would otherwise call the options object.
  if ('function' !== typeof callback) {
    opts = callback;
    callback = nop;
  }

  if (this._checkDisconnecting(callback)) {
    return this;
  }

  if (!opts) {
    opts = { qos: 0 };
  }

  if (Array.isArray(obj)) {
    obj.forEach(function (topic) {
      subs.push({
        topic: topic,
        qos: opts.qos
      });
    });
  } else {
    Object
      .keys(obj)
      .forEach(function (k) {
        subs.push({
          topic: k,
          qos: obj[k]
        });
      });
  }

  packet = {
    cmd: 'subscribe',
    subscriptions: subs,
    qos: 1,
    retain: false,
    dup: false,
    messageId: this._nextId()
  };

  // Park the callback until the suback arrives
  this.outgoing[packet.messageId] = callback;

  this._sendPacket(packet);

  return this;
};
435
/**
 * unsubscribe - unsubscribe from topic(s)
 *
 * @param {String, Array} topic - topics to unsubscribe from; a string is
 *    wrapped in an array, a non-empty array-like object is used as is
 * @param {Function} [callback] - callback fired on unsuback
 * @returns {MqttClient} this - for chaining
 * @api public
 * @example client.unsubscribe('topic');
 * @example client.unsubscribe('topic', console.log);
 */
MqttClient.prototype.unsubscribe = function (topic, callback) {
  var packet = {
      cmd: 'unsubscribe',
      qos: 1,
      messageId: this._nextId()
    },
    onUnsuback = callback || nop;

  if (this._checkDisconnecting(onUnsuback)) {
    return this;
  }

  if ('string' === typeof topic) {
    packet.unsubscriptions = [topic];
  } else if ('object' === typeof topic && topic.length) {
    packet.unsubscriptions = topic;
  }

  // Park the callback until the unsuback arrives
  this.outgoing[packet.messageId] = onUnsuback;
  this._sendPacket(packet);

  return this;
};
471
/**
 * end - close connection
 *
 * Calling it again while a disconnect is already in progress is a no-op
 * (the `cb` of such a repeated call is not invoked).
 *
 * @returns {MqttClient} this - for chaining
 * @param {Boolean} force - do not wait for all in-flight messages to be acked
 * @param {Function} cb - called when the client has been closed
 *
 * @api public
 */
MqttClient.prototype.end = function (force, cb) {
  var that = this;

  // .end(cb);
  if ('function' === typeof force) {
    cb = force;
    force = false;
  }

  function closeStores () {
    that.incomingStore.close(function () {
      that.outgoingStore.close(cb);
    });
  }

  function finish () {
    that._cleanUp(force, closeStores);
  }

  if (this.disconnecting) {
    // keep the call chainable, as on every other path
    return this;
  }

  this.disconnecting = true;

  if (!force && 0 < Object.keys(this.outgoing).length) {
    // wait 10ms, just to be sure we received all of it
    this.once('outgoingEmpty', setTimeout.bind(null, finish, 10));
  } else {
    finish();
  }

  return this;
};
514
/**
 * _reconnect - announce a reconnection attempt with a 'reconnect' event,
 * then set up a fresh stream
 * @api privateish
 */
MqttClient.prototype._reconnect = function () {
  var client = this;

  client.emit('reconnect');
  client._setupStream();
};
523
/**
 * _setupReconnect - emit 'offline' and start the reconnect timer
 *
 * Does nothing while the client is disconnecting on purpose, while a
 * reconnect timer is already running, or when `options.reconnectPeriod`
 * is not greater than 0.
 */
MqttClient.prototype._setupReconnect = function () {
  var self = this;

  if (self.disconnecting || self.reconnectTimer) {
    return;
  }

  if (!(0 < self.options.reconnectPeriod)) {
    return;
  }

  self.emit('offline');
  self.reconnectTimer = setInterval(function () {
    self._reconnect();
  }, self.options.reconnectPeriod);
};
537
/**
 * _clearReconnect - stop the reconnect timer, if one is running
 *
 * The timer slot is reset to null, the same "no timer" value the
 * constructor assigns, rather than to false.
 */
MqttClient.prototype._clearReconnect = function () {
  if (this.reconnectTimer) {
    clearInterval(this.reconnectTimer);
    this.reconnectTimer = null;
  }
};
547
548
/**
 * _cleanUp - clean up on connection end
 *
 * @param {Boolean} forced - destroy the stream right away instead of
 *   sending a disconnect packet and ending the stream afterwards
 * @param {Function} [done] - registered as a 'close' listener on the stream
 * @api private
 */
MqttClient.prototype._cleanUp = function (forced, done) {
  var stream = this.stream;

  if (done) {
    stream.on('close', done);
  }

  if (!forced) {
    // end the stream one tick after the disconnect packet has been sent
    this._sendPacket(
      { cmd: 'disconnect' },
      setImmediate.bind(null, stream.end.bind(stream))
    );
  } else {
    stream.destroy();
  }

  this._clearReconnect();

  if (null !== this.pingTimer) {
    clearInterval(this.pingTimer);
    this.pingTimer = null;
  }
};
577
/**
 * _sendPacket - send or queue a packet
 *
 * While the client is not connected the packet is appended to the queue.
 * Otherwise packets with qos 1 or 2 go through the outgoing store first and
 * everything else is written directly.
 *
 * @param {Object} packet - packet to send
 * @param {Function} cb - callback when the packet is sent
 * @returns {Number|undefined} the new queue length when the packet was
 *   queued, nothing otherwise
 * @api private
 */
MqttClient.prototype._sendPacket = function (packet, cb) {
  var needsStore;

  if (!this.connected) {
    return this.queue.push({ packet: packet, cb: cb });
  }

  needsStore = (2 === packet.qos || 1 === packet.qos);

  if (needsStore) {
    storeAndSend(this, packet, cb);
  } else {
    // qos 0 and packets without a qos
    sendPacket(this, packet, cb);
  }
};
607
/**
 * _setupPingTimer - start the keepalive interval
 *
 * Does nothing when a ping timer already exists or `options.keepalive` is
 * falsy. The interval runs _checkPing every `keepalive` seconds.
 *
 * @api private
 */
MqttClient.prototype._setupPingTimer = function () {
  var self = this;

  if (this.pingTimer || !this.options.keepalive) {
    return;
  }

  // start out as if the previous ping had been answered
  this.pingResp = true;
  this.pingTimer = setInterval(function () {
    self._checkPing();
  }, this.options.keepalive * 1000);
};
623
/**
 * _checkPing - send the next pingreq if the previous one was answered,
 * force a cleanup otherwise
 *
 * @api private
 */
MqttClient.prototype._checkPing = function () {
  if (!this.pingResp) {
    // do a forced cleanup since socket will be in bad shape
    this._cleanUp(true);
    return;
  }

  // cleared here, set again by _handlePingresp
  this.pingResp = false;
  this._sendPacket({ cmd: 'pingreq' });
};
638
/**
 * _handlePingresp - record that the outstanding ping has been answered
 *
 * Sets the flag _checkPing looks at before sending the next pingreq.
 *
 * @api private
 */
MqttClient.prototype._handlePingresp = function () {
  var client = this;

  client.pingResp = true;
};
647
/**
 * _handleConnack - handle the server's answer to the connect packet
 *
 * Stops the connack timer, then emits 'connect' for return code 0 or an
 * 'error' for any positive return code. The error carries the return code
 * in `code`; codes without a known description get a generic message
 * instead of the text "undefined".
 *
 * @param {Object} packet
 * @api private
 */
MqttClient.prototype._handleConnack = function (packet) {
  var rc = packet.returnCode,
    // TODO: move to protocol
    errors = [
      '',
      'Unacceptable protocol version',
      'Identifier rejected',
      'Server unavailable',
      'Bad username or password',
      'Not authorized'
    ],
    err;

  clearTimeout(this.connackTimer);

  if (0 === rc) {
    this.emit('connect', packet);
  } else if (0 < rc) {
    err = new Error('Connection refused: ' +
      (errors[rc] || ('Unknown return code ' + rc)));
    err.code = rc;
    this.emit('error', err);
  }
};
676
677/**
678 * _handlePublish
679 *
680 * @param {Object} packet
681 * @api private
682 */
683/*
684those late 2 case should be rewrite to comply with coding style:
685
686case 1:
687case 0:
688 // do not wait sending a puback
689 // no callback passed
690 if (1 === qos) {
691 this._sendPacket({
692 cmd: 'puback',
693 messageId: mid
694 });
695 }
696 // emit the message event for both qos 1 and 0
697 this.emit('message', topic, message, packet);
698 this.handleMessage(packet, done);
699 break;
700default:
701 // do nothing but every switch mus have a default
702 // log or throw an error about unknown qos
703 break;
704
705for now i just suppressed the warnings
706*/
/**
 * Handle an incoming publish according to its qos level.
 *
 * - qos 2: the packet is put into the incoming store and answered with a
 *   pubrec; `done` is passed along with the pubrec
 * - qos 1: a puback is sent without waiting for it, then the message is
 *   delivered as for qos 0
 * - qos 0: 'message' is emitted and handleMessage decides when `done` runs
 * - anything else: the packet is ignored, but `done` is still called so the
 *   packets behind it are not held up
 *
 * @param {Object} packet - parsed publish packet
 * @param {Function} done - called when the packet has been dealt with
 * @api private
 */
MqttClient.prototype._handlePublish = function (packet, done) {
  var topic = packet.topic.toString(),
    message = packet.payload,
    qos = packet.qos,
    mid = packet.messageId,
    that = this;

  switch (qos) {
    case 2:
      this.incomingStore.put(packet, function () {
        that._sendPacket({cmd: 'pubrec', messageId: mid}, done);
      });
      break;
    case 1:
      // do not wait sending a puback
      // no callback passed
      this._sendPacket({
        cmd: 'puback',
        messageId: mid
      });
      /* falls through */
    case 0:
      // emit the message event for both qos 1 and 0
      this.emit('message', topic, message, packet);
      this.handleMessage(packet, done);
      break;
    default:
      // unknown qos: nothing to deliver, just release the pipeline
      done();
      break;
  }
};
739
/**
 * Hook called for every delivered qos 0 / qos 1 publish, one at a time:
 * the next incoming packet is only processed after `callback` has run.
 * The default implementation calls back immediately. Override at will.
 *
 * @param Packet packet the packet
 * @param Function callback call when finished
 * @api public
 */
MqttClient.prototype.handleMessage = function (packet, callback) {
  // nothing to do by default: release the pipeline right away
  callback();
};
751
/**
 * _handleAck - handle puback, pubrec, pubcomp, suback and unsuback
 *
 * Looks up the callback parked under the packet's message id; acks without
 * one are ignored. When the client is disconnecting and no callback is
 * left in flight afterwards, 'outgoingEmpty' is emitted.
 *
 * @param {Object} packet
 * @api private
 */
MqttClient.prototype._handleAck = function (packet) {
  var mid = packet.messageId,
    type = packet.cmd,
    response = null,
    cb = this.outgoing[mid],
    that = this;

  if (!cb) {
    // Server sent an ack in error, ignore it.
    return;
  }

  // Process
  switch (type) {
    case 'pubcomp':
      // same thing as puback for QoS 2
    case 'puback':
      // Callback - we're done
      delete this.outgoing[mid];
      this.outgoingStore.del(packet, cb);
      break;
    case 'pubrec':
      // the callback stays parked until the pubcomp arrives
      response = {
        cmd: 'pubrel',
        qos: 2,
        messageId: mid
      };

      this._sendPacket(response);
      break;
    case 'suback':
      delete this.outgoing[mid];
      this.outgoingStore.del(packet, function (err, original) {
        var i,
          origSubs,
          granted;

        // Check the error before touching `original`: on failure there may
        // be no stored packet to read the subscriptions from.
        if (err) {
          // missing packet, what should we do?
          return that.emit('error', err);
        }

        origSubs = original.subscriptions;
        granted = packet.granted;

        for (i = 0; i < granted.length; i += 1) {
          origSubs[i].qos = granted[i];
        }

        cb(null, origSubs);
      });
      break;
    case 'unsuback':
      delete this.outgoing[mid];
      this.outgoingStore.del(packet, cb);
      break;
    default:
      that.emit('error', new Error('unrecognized packet type'));
  }

  if (this.disconnecting &&
      0 === Object.keys(this.outgoing).length) {
    this.emit('outgoingEmpty');
  }
};
821
/**
 * _handlePubrel - handle the release of a qos 2 message
 *
 * Fetches the entry stored under the packet's id from the incoming store.
 * If that entry is not itself a pubrel, the message is emitted and the
 * pubrel is stored in its place. A pubcomp is sent in both cases. A store
 * error is emitted as 'error' and nothing else happens.
 *
 * @param {Object} packet
 * @param {Function} callback - passed along with the pubcomp
 * @api private
 */
MqttClient.prototype._handlePubrel = function (packet, callback) {
  var self = this,
    messageId = packet.messageId;

  self.incomingStore.get(packet, function (err, stored) {
    var firstRelease;

    if (err) {
      return self.emit('error', err);
    }

    firstRelease = ('pubrel' !== stored.cmd);

    if (firstRelease) {
      self.emit('message', stored.topic, stored.payload, stored);
      self.incomingStore.put(packet);
    }

    self._sendPacket({cmd: 'pubcomp', messageId: messageId}, callback);
  });
};
846
/**
 * _nextId - hand out the current message id and advance the counter
 *
 * @returns {Number} the id to use; after 65535 has been handed out the
 *   counter starts over at 1
 * @api private
 */
MqttClient.prototype._nextId = function () {
  var current = this.nextId++;

  // Ensure 16 bit unsigned int:
  if (current === 65535) {
    this.nextId = 1;
  }

  return current;
};
858
859module.exports = MqttClient;