UNPKG

37.1 kBJavaScriptView Raw
1'use strict'
2
3/**
4 * Module dependencies
5 */
6var events = require('events')
7var Store = require('./store')
8var mqttPacket = require('mqtt-packet')
9var Writable = require('readable-stream').Writable
10var inherits = require('inherits')
11var reInterval = require('reinterval')
12var validations = require('./validations')
13var xtend = require('xtend')
14var setImmediate = global.setImmediate || function (callback) {
15 // works in node v0.8
16 process.nextTick(callback)
17}
18var defaultConnectOptions = {
19 keepalive: 60,
20 reschedulePings: true,
21 protocolId: 'MQTT',
22 protocolVersion: 4,
23 reconnectPeriod: 1000,
24 connectTimeout: 30 * 1000,
25 clean: true,
26 resubscribe: true
27}
28var errors = {
29 0: '',
30 1: 'Unacceptable protocol version',
31 2: 'Identifier rejected',
32 3: 'Server unavailable',
33 4: 'Bad username or password',
34 5: 'Not authorized',
35 16: 'No matching subscribers',
36 17: 'No subscription existed',
37 128: 'Unspecified error',
38 129: 'Malformed Packet',
39 130: 'Protocol Error',
40 131: 'Implementation specific error',
41 132: 'Unsupported Protocol Version',
42 133: 'Client Identifier not valid',
43 134: 'Bad User Name or Password',
44 135: 'Not authorized',
45 136: 'Server unavailable',
46 137: 'Server busy',
47 138: 'Banned',
48 139: 'Server shutting down',
49 140: 'Bad authentication method',
50 141: 'Keep Alive timeout',
51 142: 'Session taken over',
52 143: 'Topic Filter invalid',
53 144: 'Topic Name invalid',
54 145: 'Packet identifier in use',
55 146: 'Packet Identifier not found',
56 147: 'Receive Maximum exceeded',
57 148: 'Topic Alias invalid',
58 149: 'Packet too large',
59 150: 'Message rate too high',
60 151: 'Quota exceeded',
61 152: 'Administrative action',
62 153: 'Payload format invalid',
63 154: 'Retain not supported',
64 155: 'QoS not supported',
65 156: 'Use another server',
66 157: 'Server moved',
67 158: 'Shared Subscriptions not supported',
68 159: 'Connection rate exceeded',
69 160: 'Maximum connect time',
70 161: 'Subscription Identifiers not supported',
71 162: 'Wildcard Subscriptions not supported'
72}
73
74function defaultId () {
75 return 'mqttjs_' + Math.random().toString(16).substr(2, 8)
76}
77
78function sendPacket (client, packet, cb) {
79 client.emit('packetsend', packet)
80
81 var result = mqttPacket.writeToStream(packet, client.stream, client.options)
82
83 if (!result && cb) {
84 client.stream.once('drain', cb)
85 } else if (cb) {
86 cb()
87 }
88}
89
90function flush (queue) {
91 if (queue) {
92 Object.keys(queue).forEach(function (messageId) {
93 if (typeof queue[messageId].cb === 'function') {
94 queue[messageId].cb(new Error('Connection closed'))
95 delete queue[messageId]
96 }
97 })
98 }
99}
100
101function flushVolatile (queue) {
102 if (queue) {
103 Object.keys(queue).forEach(function (messageId) {
104 if (queue[messageId].volatile && typeof queue[messageId].cb === 'function') {
105 queue[messageId].cb(new Error('Connection closed'))
106 delete queue[messageId]
107 }
108 })
109 }
110}
111
112function storeAndSend (client, packet, cb, cbStorePut) {
113 client.outgoingStore.put(packet, function storedPacket (err) {
114 if (err) {
115 return cb && cb(err)
116 }
117 cbStorePut()
118 sendPacket(client, packet, cb)
119 })
120}
121
122function nop () {}
123
124/**
125 * MqttClient constructor
126 *
127 * @param {Stream} stream - stream
128 * @param {Object} [options] - connection options
129 * (see Connection#connect)
130 */
131function MqttClient (streamBuilder, options) {
132 var k
133 var that = this
134
135 if (!(this instanceof MqttClient)) {
136 return new MqttClient(streamBuilder, options)
137 }
138
139 this.options = options || {}
140
141 // Defaults
142 for (k in defaultConnectOptions) {
143 if (typeof this.options[k] === 'undefined') {
144 this.options[k] = defaultConnectOptions[k]
145 } else {
146 this.options[k] = options[k]
147 }
148 }
149
150 this.options.clientId = (typeof options.clientId === 'string') ? options.clientId : defaultId()
151
152 this.options.customHandleAcks = (options.protocolVersion === 5 && options.customHandleAcks) ? options.customHandleAcks : function () { arguments[3](0) }
153
154 this.streamBuilder = streamBuilder
155
156 // Inflight message storages
157 this.outgoingStore = options.outgoingStore || new Store()
158 this.incomingStore = options.incomingStore || new Store()
159
160 // Should QoS zero messages be queued when the connection is broken?
161 this.queueQoSZero = options.queueQoSZero === undefined ? true : options.queueQoSZero
162
163 // map of subscribed topics to support reconnection
164 this._resubscribeTopics = {}
165
166 // map of a subscribe messageId and a topic
167 this.messageIdToTopic = {}
168
169 // Ping timer, setup in _setupPingTimer
170 this.pingTimer = null
171 // Is the client connected?
172 this.connected = false
173 // Are we disconnecting?
174 this.disconnecting = false
175 // Packet queue
176 this.queue = []
177 // connack timer
178 this.connackTimer = null
179 // Reconnect timer
180 this.reconnectTimer = null
181 // Is processing store?
182 this._storeProcessing = false
183 // Packet Ids are put into the store during store processing
184 this._packetIdsDuringStoreProcessing = {}
185 /**
186 * MessageIDs starting with 1
187 * ensure that nextId is min. 1, see https://github.com/mqttjs/MQTT.js/issues/810
188 */
189 this.nextId = Math.max(1, Math.floor(Math.random() * 65535))
190
191 // Inflight callbacks
192 this.outgoing = {}
193
194 // True if connection is first time.
195 this._firstConnection = true
196
197 // Mark disconnected on stream close
198 this.on('close', function () {
199 this.connected = false
200 clearTimeout(this.connackTimer)
201 })
202
203 // Send queued packets
204 this.on('connect', function () {
205 var queue = this.queue
206
207 function deliver () {
208 var entry = queue.shift()
209 var packet = null
210
211 if (!entry) {
212 return
213 }
214
215 packet = entry.packet
216
217 that._sendPacket(
218 packet,
219 function (err) {
220 if (entry.cb) {
221 entry.cb(err)
222 }
223 deliver()
224 }
225 )
226 }
227
228 deliver()
229 })
230
231 // Clear ping timer
232 this.on('close', function () {
233 if (that.pingTimer !== null) {
234 that.pingTimer.clear()
235 that.pingTimer = null
236 }
237 })
238
239 // Setup reconnect timer on disconnect
240 this.on('close', this._setupReconnect)
241
242 events.EventEmitter.call(this)
243
244 this._setupStream()
245}
246inherits(MqttClient, events.EventEmitter)
247
248/**
249 * setup the event handlers in the inner stream.
250 *
251 * @api private
252 */
253MqttClient.prototype._setupStream = function () {
254 var connectPacket
255 var that = this
256 var writable = new Writable()
257 var parser = mqttPacket.parser(this.options)
258 var completeParse = null
259 var packets = []
260
261 this._clearReconnect()
262
263 this.stream = this.streamBuilder(this)
264
265 parser.on('packet', function (packet) {
266 packets.push(packet)
267 })
268
269 function nextTickWork () {
270 if (packets.length) {
271 process.nextTick(work)
272 } else {
273 var done = completeParse
274 completeParse = null
275 done()
276 }
277 }
278
279 function work () {
280 var packet = packets.shift()
281
282 if (packet) {
283 that._handlePacket(packet, nextTickWork)
284 } else {
285 var done = completeParse
286 completeParse = null
287 if (done) done()
288 }
289 }
290
291 writable._write = function (buf, enc, done) {
292 completeParse = done
293 parser.parse(buf)
294 work()
295 }
296
297 this.stream.pipe(writable)
298
299 // Suppress connection errors
300 this.stream.on('error', nop)
301
302 // Echo stream close
303 this.stream.on('close', function () {
304 flushVolatile(that.outgoing)
305 that.emit('close')
306 })
307
308 // Send a connect packet
309 connectPacket = Object.create(this.options)
310 connectPacket.cmd = 'connect'
311 // avoid message queue
312 sendPacket(this, connectPacket)
313
314 // Echo connection errors
315 parser.on('error', this.emit.bind(this, 'error'))
316
317 // auth
318 if (this.options.properties) {
319 if (!this.options.properties.authenticationMethod && this.options.properties.authenticationData) {
320 this.emit('error', new Error('Packet has no Authentication Method'))
321 return this
322 }
323 if (this.options.properties.authenticationMethod && this.options.authPacket && typeof this.options.authPacket === 'object') {
324 var authPacket = xtend({cmd: 'auth', reasonCode: 0}, this.options.authPacket)
325 sendPacket(this, authPacket)
326 }
327 }
328
329 // many drain listeners are needed for qos 1 callbacks if the connection is intermittent
330 this.stream.setMaxListeners(1000)
331
332 clearTimeout(this.connackTimer)
333 this.connackTimer = setTimeout(function () {
334 that._cleanUp(true)
335 }, this.options.connectTimeout)
336}
337
338MqttClient.prototype._handlePacket = function (packet, done) {
339 var options = this.options
340
341 if (options.protocolVersion === 5 && options.properties && options.properties.maximumPacketSize && options.properties.maximumPacketSize < packet.length) {
342 this.emit('error', new Error('exceeding packets size ' + packet.cmd))
343 this.end({reasonCode: 149, properties: { reasonString: 'Maximum packet size was exceeded' }})
344 return this
345 }
346
347 this.emit('packetreceive', packet)
348
349 switch (packet.cmd) {
350 case 'publish':
351 this._handlePublish(packet, done)
352 break
353 case 'puback':
354 case 'pubrec':
355 case 'pubcomp':
356 case 'suback':
357 case 'unsuback':
358 this._handleAck(packet)
359 done()
360 break
361 case 'pubrel':
362 this._handlePubrel(packet, done)
363 break
364 case 'connack':
365 this._handleConnack(packet)
366 done()
367 break
368 case 'pingresp':
369 this._handlePingresp(packet)
370 done()
371 break
372 case 'disconnect':
373 this._handleDisconnect(packet)
374 done()
375 break
376 default:
377 // do nothing
378 // maybe we should do an error handling
379 // or just log it
380 break
381 }
382}
383
384MqttClient.prototype._checkDisconnecting = function (callback) {
385 if (this.disconnecting) {
386 if (callback) {
387 callback(new Error('client disconnecting'))
388 } else {
389 this.emit('error', new Error('client disconnecting'))
390 }
391 }
392 return this.disconnecting
393}
394
395/**
396 * publish - publish <message> to <topic>
397 *
398 * @param {String} topic - topic to publish to
399 * @param {String, Buffer} message - message to publish
400 * @param {Object} [opts] - publish options, includes:
401 * {Number} qos - qos level to publish on
402 * {Boolean} retain - whether or not to retain the message
403 * {Boolean} dup - whether or not mark a message as duplicate
404 * {Function} cbStorePut - function(){} called when message is put into `outgoingStore`
405 * @param {Function} [callback] - function(err){}
406 * called when publish succeeds or fails
407 * @returns {MqttClient} this - for chaining
408 * @api public
409 *
410 * @example client.publish('topic', 'message');
411 * @example
412 * client.publish('topic', 'message', {qos: 1, retain: true, dup: true});
413 * @example client.publish('topic', 'message', console.log);
414 */
415MqttClient.prototype.publish = function (topic, message, opts, callback) {
416 var packet
417 var options = this.options
418
419 // .publish(topic, payload, cb);
420 if (typeof opts === 'function') {
421 callback = opts
422 opts = null
423 }
424
425 // default opts
426 var defaultOpts = {qos: 0, retain: false, dup: false}
427 opts = xtend(defaultOpts, opts)
428
429 if (this._checkDisconnecting(callback)) {
430 return this
431 }
432
433 packet = {
434 cmd: 'publish',
435 topic: topic,
436 payload: message,
437 qos: opts.qos,
438 retain: opts.retain,
439 messageId: this._nextId(),
440 dup: opts.dup
441 }
442
443 if (options.protocolVersion === 5) {
444 packet.properties = opts.properties
445 if ((!options.properties && packet.properties && packet.properties.topicAlias) || ((opts.properties && options.properties) &&
446 ((opts.properties.topicAlias && options.properties.topicAliasMaximum && opts.properties.topicAlias > options.properties.topicAliasMaximum) ||
447 (!options.properties.topicAliasMaximum && opts.properties.topicAlias)))) {
448 /*
449 if we are don`t setup topic alias or
450 topic alias maximum less than topic alias or
451 server don`t give topic alias maximum,
452 we are removing topic alias from packet
453 */
454 delete packet.properties.topicAlias
455 }
456 }
457
458 switch (opts.qos) {
459 case 1:
460 case 2:
461 // Add to callbacks
462 this.outgoing[packet.messageId] = {
463 volatile: false,
464 cb: callback || nop
465 }
466 if (this._storeProcessing) {
467 this._packetIdsDuringStoreProcessing[packet.messageId] = false
468 this._storePacket(packet, undefined, opts.cbStorePut)
469 } else {
470 this._sendPacket(packet, undefined, opts.cbStorePut)
471 }
472 break
473 default:
474 if (this._storeProcessing) {
475 this._storePacket(packet, callback, opts.cbStorePut)
476 } else {
477 this._sendPacket(packet, callback, opts.cbStorePut)
478 }
479 break
480 }
481
482 return this
483}
484
485/**
486 * subscribe - subscribe to <topic>
487 *
488 * @param {String, Array, Object} topic - topic(s) to subscribe to, supports objects in the form {'topic': qos}
489 * @param {Object} [opts] - optional subscription options, includes:
490 * {Number} qos - subscribe qos level
491 * @param {Function} [callback] - function(err, granted){} where:
492 * {Error} err - subscription error (none at the moment!)
493 * {Array} granted - array of {topic: 't', qos: 0}
494 * @returns {MqttClient} this - for chaining
495 * @api public
496 * @example client.subscribe('topic');
497 * @example client.subscribe('topic', {qos: 1});
498 * @example client.subscribe({'topic': {qos: 0}, 'topic2': {qos: 1}}, console.log);
499 * @example client.subscribe('topic', console.log);
500 */
501MqttClient.prototype.subscribe = function () {
502 var packet
503 var args = new Array(arguments.length)
504 for (var i = 0; i < arguments.length; i++) {
505 args[i] = arguments[i]
506 }
507 var subs = []
508 var obj = args.shift()
509 var resubscribe = obj.resubscribe
510 var callback = args.pop() || nop
511 var opts = args.pop()
512 var invalidTopic
513 var that = this
514 var version = this.options.protocolVersion
515
516 delete obj.resubscribe
517
518 if (typeof obj === 'string') {
519 obj = [obj]
520 }
521
522 if (typeof callback !== 'function') {
523 opts = callback
524 callback = nop
525 }
526
527 invalidTopic = validations.validateTopics(obj)
528 if (invalidTopic !== null) {
529 setImmediate(callback, new Error('Invalid topic ' + invalidTopic))
530 return this
531 }
532
533 if (this._checkDisconnecting(callback)) {
534 return this
535 }
536
537 var defaultOpts = {
538 qos: 0
539 }
540 if (version === 5) {
541 defaultOpts.nl = false
542 defaultOpts.rap = false
543 defaultOpts.rh = 0
544 }
545 opts = xtend(defaultOpts, opts)
546
547 if (Array.isArray(obj)) {
548 obj.forEach(function (topic) {
549 if (!that._resubscribeTopics.hasOwnProperty(topic) ||
550 that._resubscribeTopics[topic].qos < opts.qos ||
551 resubscribe) {
552 var currentOpts = {
553 topic: topic,
554 qos: opts.qos
555 }
556 if (version === 5) {
557 currentOpts.nl = opts.nl
558 currentOpts.rap = opts.rap
559 currentOpts.rh = opts.rh
560 currentOpts.properties = opts.properties
561 }
562 subs.push(currentOpts)
563 }
564 })
565 } else {
566 Object
567 .keys(obj)
568 .forEach(function (k) {
569 if (!that._resubscribeTopics.hasOwnProperty(k) ||
570 that._resubscribeTopics[k].qos < obj[k].qos ||
571 resubscribe) {
572 var currentOpts = {
573 topic: k,
574 qos: obj[k].qos
575 }
576 if (version === 5) {
577 currentOpts.nl = obj[k].nl
578 currentOpts.rap = obj[k].rap
579 currentOpts.rh = obj[k].rh
580 currentOpts.properties = opts.properties
581 }
582 subs.push(currentOpts)
583 }
584 })
585 }
586
587 packet = {
588 cmd: 'subscribe',
589 subscriptions: subs,
590 qos: 1,
591 retain: false,
592 dup: false,
593 messageId: this._nextId()
594 }
595
596 if (opts.properties) {
597 packet.properties = opts.properties
598 }
599
600 if (!subs.length) {
601 callback(null, [])
602 return
603 }
604
605 // subscriptions to resubscribe to in case of disconnect
606 if (this.options.resubscribe) {
607 var topics = []
608 subs.forEach(function (sub) {
609 if (that.options.reconnectPeriod > 0) {
610 var topic = { qos: sub.qos }
611 if (version === 5) {
612 topic.nl = sub.nl || false
613 topic.rap = sub.rap || false
614 topic.rh = sub.rh || 0
615 topic.properties = sub.properties
616 }
617 that._resubscribeTopics[sub.topic] = topic
618 topics.push(sub.topic)
619 }
620 })
621 that.messageIdToTopic[packet.messageId] = topics
622 }
623
624 this.outgoing[packet.messageId] = {
625 volatile: true,
626 cb: function (err, packet) {
627 if (!err) {
628 var granted = packet.granted
629 for (var i = 0; i < granted.length; i += 1) {
630 subs[i].qos = granted[i]
631 }
632 }
633
634 callback(err, subs)
635 }
636 }
637
638 this._sendPacket(packet)
639
640 return this
641}
642
643/**
644 * unsubscribe - unsubscribe from topic(s)
645 *
646 * @param {String, Array} topic - topics to unsubscribe from
647 * @param {Object} [opts] - optional subscription options, includes:
648 * {Object} properties - properties of unsubscribe packet
649 * @param {Function} [callback] - callback fired on unsuback
650 * @returns {MqttClient} this - for chaining
651 * @api public
652 * @example client.unsubscribe('topic');
653 * @example client.unsubscribe('topic', console.log);
654 */
655MqttClient.prototype.unsubscribe = function () {
656 var packet = {
657 cmd: 'unsubscribe',
658 qos: 1,
659 messageId: this._nextId()
660 }
661 var that = this
662 var args = new Array(arguments.length)
663 for (var i = 0; i < arguments.length; i++) {
664 args[i] = arguments[i]
665 }
666 var topic = args.shift()
667 var callback = args.pop() || nop
668 var opts = args.pop()
669
670 if (typeof topic === 'string') {
671 topic = [topic]
672 }
673
674 if (typeof callback !== 'function') {
675 opts = callback
676 callback = nop
677 }
678
679 if (this._checkDisconnecting(callback)) {
680 return this
681 }
682
683 if (typeof topic === 'string') {
684 packet.unsubscriptions = [topic]
685 } else if (typeof topic === 'object' && topic.length) {
686 packet.unsubscriptions = topic
687 }
688
689 if (this.options.resubscribe) {
690 packet.unsubscriptions.forEach(function (topic) {
691 delete that._resubscribeTopics[topic]
692 })
693 }
694
695 if (typeof opts === 'object' && opts.properties) {
696 packet.properties = opts.properties
697 }
698
699 this.outgoing[packet.messageId] = {
700 volatile: true,
701 cb: callback
702 }
703
704 this._sendPacket(packet)
705
706 return this
707}
708
709/**
710 * end - close connection
711 *
712 * @returns {MqttClient} this - for chaining
713 * @param {Boolean} force - do not wait for all in-flight messages to be acked
714 * @param {Function} cb - called when the client has been closed
715 *
716 * @api public
717 */
718MqttClient.prototype.end = function () {
719 var that = this
720
721 var force = arguments[0]
722 var opts = arguments[1]
723 var cb = arguments[2]
724
725 if (force == null || typeof force !== 'boolean') {
726 cb = opts || nop
727 opts = force
728 force = false
729 if (typeof opts !== 'object') {
730 cb = opts
731 opts = null
732 if (typeof cb !== 'function') {
733 cb = nop
734 }
735 }
736 }
737
738 if (typeof opts !== 'object') {
739 cb = opts
740 opts = null
741 }
742
743 cb = cb || nop
744
745 function closeStores () {
746 that.disconnected = true
747 that.incomingStore.close(function () {
748 that.outgoingStore.close(function () {
749 if (cb) {
750 cb.apply(null, arguments)
751 }
752 that.emit('end')
753 })
754 })
755 if (that._deferredReconnect) {
756 that._deferredReconnect()
757 }
758 }
759
760 function finish () {
761 // defer closesStores of an I/O cycle,
762 // just to make sure things are
763 // ok for websockets
764 that._cleanUp(force, setImmediate.bind(null, closeStores), opts)
765 }
766
767 if (this.disconnecting) {
768 return this
769 }
770
771 this._clearReconnect()
772
773 this.disconnecting = true
774
775 if (!force && Object.keys(this.outgoing).length > 0) {
776 // wait 10ms, just to be sure we received all of it
777 this.once('outgoingEmpty', setTimeout.bind(null, finish, 10))
778 } else {
779 finish()
780 }
781
782 return this
783}
784
785/**
786 * removeOutgoingMessage - remove a message in outgoing store
787 * the outgoing callback will be called withe Error('Message removed') if the message is removed
788 *
789 * @param {Number} mid - messageId to remove message
790 * @returns {MqttClient} this - for chaining
791 * @api public
792 *
793 * @example client.removeOutgoingMessage(client.getLastMessageId());
794 */
795MqttClient.prototype.removeOutgoingMessage = function (mid) {
796 var cb = this.outgoing[mid] ? this.outgoing[mid].cb : null
797 delete this.outgoing[mid]
798 this.outgoingStore.del({messageId: mid}, function () {
799 cb(new Error('Message removed'))
800 })
801 return this
802}
803
804/**
805 * reconnect - connect again using the same options as connect()
806 *
807 * @param {Object} [opts] - optional reconnect options, includes:
808 * {Store} incomingStore - a store for the incoming packets
809 * {Store} outgoingStore - a store for the outgoing packets
810 * if opts is not given, current stores are used
811 * @returns {MqttClient} this - for chaining
812 *
813 * @api public
814 */
815MqttClient.prototype.reconnect = function (opts) {
816 var that = this
817 var f = function () {
818 if (opts) {
819 that.options.incomingStore = opts.incomingStore
820 that.options.outgoingStore = opts.outgoingStore
821 } else {
822 that.options.incomingStore = null
823 that.options.outgoingStore = null
824 }
825 that.incomingStore = that.options.incomingStore || new Store()
826 that.outgoingStore = that.options.outgoingStore || new Store()
827 that.disconnecting = false
828 that.disconnected = false
829 that._deferredReconnect = null
830 that._reconnect()
831 }
832
833 if (this.disconnecting && !this.disconnected) {
834 this._deferredReconnect = f
835 } else {
836 f()
837 }
838 return this
839}
840
841/**
842 * _reconnect - implement reconnection
843 * @api privateish
844 */
845MqttClient.prototype._reconnect = function () {
846 this.emit('reconnect')
847 this._setupStream()
848}
849
850/**
851 * _setupReconnect - setup reconnect timer
852 */
853MqttClient.prototype._setupReconnect = function () {
854 var that = this
855
856 if (!that.disconnecting && !that.reconnectTimer && (that.options.reconnectPeriod > 0)) {
857 if (!this.reconnecting) {
858 this.emit('offline')
859 this.reconnecting = true
860 }
861 that.reconnectTimer = setInterval(function () {
862 that._reconnect()
863 }, that.options.reconnectPeriod)
864 }
865}
866
867/**
868 * _clearReconnect - clear the reconnect timer
869 */
870MqttClient.prototype._clearReconnect = function () {
871 if (this.reconnectTimer) {
872 clearInterval(this.reconnectTimer)
873 this.reconnectTimer = null
874 }
875}
876
877/**
878 * _cleanUp - clean up on connection end
879 * @api private
880 */
881MqttClient.prototype._cleanUp = function (forced, done) {
882 var opts = arguments[2]
883 if (done) {
884 this.stream.on('close', done)
885 }
886
887 if (forced) {
888 if ((this.options.reconnectPeriod === 0) && this.options.clean) {
889 flush(this.outgoing)
890 }
891 this.stream.destroy()
892 } else {
893 var packet = xtend({ cmd: 'disconnect' }, opts)
894 this._sendPacket(
895 packet,
896 setImmediate.bind(
897 null,
898 this.stream.end.bind(this.stream)
899 )
900 )
901 }
902
903 if (!this.disconnecting) {
904 this._clearReconnect()
905 this._setupReconnect()
906 }
907
908 if (this.pingTimer !== null) {
909 this.pingTimer.clear()
910 this.pingTimer = null
911 }
912
913 if (done && !this.connected) {
914 this.stream.removeListener('close', done)
915 done()
916 }
917}
918
919/**
920 * _sendPacket - send or queue a packet
921 * @param {String} type - packet type (see `protocol`)
922 * @param {Object} packet - packet options
923 * @param {Function} cb - callback when the packet is sent
924 * @param {Function} cbStorePut - called when message is put into outgoingStore
925 * @api private
926 */
927MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut) {
928 cbStorePut = cbStorePut || nop
929
930 if (!this.connected) {
931 this._storePacket(packet, cb, cbStorePut)
932 return
933 }
934
935 // When sending a packet, reschedule the ping timer
936 this._shiftPingInterval()
937
938 switch (packet.cmd) {
939 case 'publish':
940 break
941 case 'pubrel':
942 storeAndSend(this, packet, cb, cbStorePut)
943 return
944 default:
945 sendPacket(this, packet, cb)
946 return
947 }
948
949 switch (packet.qos) {
950 case 2:
951 case 1:
952 storeAndSend(this, packet, cb, cbStorePut)
953 break
954 /**
955 * no need of case here since it will be caught by default
956 * and jshint comply that before default it must be a break
957 * anyway it will result in -1 evaluation
958 */
959 case 0:
960 /* falls through */
961 default:
962 sendPacket(this, packet, cb)
963 break
964 }
965}
966
967/**
968 * _storePacket - queue a packet
969 * @param {String} type - packet type (see `protocol`)
970 * @param {Object} packet - packet options
971 * @param {Function} cb - callback when the packet is sent
972 * @param {Function} cbStorePut - called when message is put into outgoingStore
973 * @api private
974 */
975MqttClient.prototype._storePacket = function (packet, cb, cbStorePut) {
976 cbStorePut = cbStorePut || nop
977
978 if (((packet.qos || 0) === 0 && this.queueQoSZero) || packet.cmd !== 'publish') {
979 this.queue.push({ packet: packet, cb: cb })
980 } else if (packet.qos > 0) {
981 cb = this.outgoing[packet.messageId] ? this.outgoing[packet.messageId].cb : null
982 this.outgoingStore.put(packet, function (err) {
983 if (err) {
984 return cb && cb(err)
985 }
986 cbStorePut()
987 })
988 } else if (cb) {
989 cb(new Error('No connection to broker'))
990 }
991}
992
993/**
994 * _setupPingTimer - setup the ping timer
995 *
996 * @api private
997 */
998MqttClient.prototype._setupPingTimer = function () {
999 var that = this
1000
1001 if (!this.pingTimer && this.options.keepalive) {
1002 this.pingResp = true
1003 this.pingTimer = reInterval(function () {
1004 that._checkPing()
1005 }, this.options.keepalive * 1000)
1006 }
1007}
1008
1009/**
1010 * _shiftPingInterval - reschedule the ping interval
1011 *
1012 * @api private
1013 */
1014MqttClient.prototype._shiftPingInterval = function () {
1015 if (this.pingTimer && this.options.keepalive && this.options.reschedulePings) {
1016 this.pingTimer.reschedule(this.options.keepalive * 1000)
1017 }
1018}
1019/**
1020 * _checkPing - check if a pingresp has come back, and ping the server again
1021 *
1022 * @api private
1023 */
1024MqttClient.prototype._checkPing = function () {
1025 if (this.pingResp) {
1026 this.pingResp = false
1027 this._sendPacket({ cmd: 'pingreq' })
1028 } else {
1029 // do a forced cleanup since socket will be in bad shape
1030 this._cleanUp(true)
1031 }
1032}
1033
1034/**
1035 * _handlePingresp - handle a pingresp
1036 *
1037 * @api private
1038 */
1039MqttClient.prototype._handlePingresp = function () {
1040 this.pingResp = true
1041}
1042
1043/**
1044 * _handleConnack
1045 *
1046 * @param {Object} packet
1047 * @api private
1048 */
1049
1050MqttClient.prototype._handleConnack = function (packet) {
1051 var options = this.options
1052 var version = options.protocolVersion
1053 var rc = version === 5 ? packet.reasonCode : packet.returnCode
1054
1055 clearTimeout(this.connackTimer)
1056
1057 if (packet.properties) {
1058 if (packet.properties.topicAliasMaximum) {
1059 if (!options.properties) { options.properties = {} }
1060 options.properties.topicAliasMaximum = packet.properties.topicAliasMaximum
1061 }
1062 if (packet.properties.serverKeepAlive && options.keepalive) {
1063 options.keepalive = packet.properties.serverKeepAlive
1064 this._shiftPingInterval()
1065 }
1066 if (packet.properties.maximumPacketSize) {
1067 if (!options.properties) { options.properties = {} }
1068 options.properties.maximumPacketSize = packet.properties.maximumPacketSize
1069 }
1070 }
1071
1072 if (rc === 0) {
1073 this.reconnecting = false
1074 this._onConnect(packet)
1075 } else if (rc > 0) {
1076 var err = new Error('Connection refused: ' + errors[rc])
1077 err.code = rc
1078 this.emit('error', err)
1079 }
1080}
1081
1082/**
1083 * _handlePublish
1084 *
1085 * @param {Object} packet
1086 * @api private
1087 */
1088/*
1089those late 2 case should be rewrite to comply with coding style:
1090
1091case 1:
1092case 0:
1093 // do not wait sending a puback
1094 // no callback passed
1095 if (1 === qos) {
1096 this._sendPacket({
1097 cmd: 'puback',
1098 messageId: mid
1099 });
1100 }
1101 // emit the message event for both qos 1 and 0
1102 this.emit('message', topic, message, packet);
1103 this.handleMessage(packet, done);
1104 break;
1105default:
1106 // do nothing but every switch mus have a default
1107 // log or throw an error about unknown qos
1108 break;
1109
1110for now i just suppressed the warnings
1111*/
1112MqttClient.prototype._handlePublish = function (packet, done) {
1113 done = typeof done !== 'undefined' ? done : nop
1114 var topic = packet.topic.toString()
1115 var message = packet.payload
1116 var qos = packet.qos
1117 var mid = packet.messageId
1118 var that = this
1119 var options = this.options
1120 var validReasonCodes = [0, 16, 128, 131, 135, 144, 145, 151, 153]
1121
1122 switch (qos) {
1123 case 2: {
1124 options.customHandleAcks(topic, message, packet, function (error, code) {
1125 if (!(error instanceof Error)) {
1126 code = error
1127 error = null
1128 }
1129 if (error) { return that.emit('error', error) }
1130 if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for pubrec')) }
1131 if (code) {
1132 that._sendPacket({cmd: 'pubrec', messageId: mid, reasonCode: code}, done)
1133 } else {
1134 that.incomingStore.put(packet, function () {
1135 that._sendPacket({cmd: 'pubrec', messageId: mid}, done)
1136 })
1137 }
1138 })
1139 break
1140 }
1141 case 1: {
1142 // emit the message event
1143 options.customHandleAcks(topic, message, packet, function (error, code) {
1144 if (!(error instanceof Error)) {
1145 code = error
1146 error = null
1147 }
1148 if (error) { return that.emit('error', error) }
1149 if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for puback')) }
1150 if (!code) { that.emit('message', topic, message, packet) }
1151 that.handleMessage(packet, function (err) {
1152 if (err) {
1153 return done && done(err)
1154 }
1155 that._sendPacket({cmd: 'puback', messageId: mid, reasonCode: code}, done)
1156 })
1157 })
1158 break
1159 }
1160 case 0:
1161 // emit the message event
1162 this.emit('message', topic, message, packet)
1163 this.handleMessage(packet, done)
1164 break
1165 default:
1166 // do nothing
1167 // log or throw an error about unknown qos
1168 break
1169 }
1170}
1171
1172/**
1173 * Handle messages with backpressure support, one at a time.
1174 * Override at will.
1175 *
1176 * @param Packet packet the packet
1177 * @param Function callback call when finished
1178 * @api public
1179 */
1180MqttClient.prototype.handleMessage = function (packet, callback) {
1181 callback()
1182}
1183
1184/**
1185 * _handleAck
1186 *
1187 * @param {Object} packet
1188 * @api private
1189 */
1190
1191MqttClient.prototype._handleAck = function (packet) {
1192 /* eslint no-fallthrough: "off" */
1193 var mid = packet.messageId
1194 var type = packet.cmd
1195 var response = null
1196 var cb = this.outgoing[mid] ? this.outgoing[mid].cb : null
1197 var that = this
1198 var err
1199
1200 if (!cb) {
1201 // Server sent an ack in error, ignore it.
1202 return
1203 }
1204
1205 // Process
1206 switch (type) {
1207 case 'pubcomp':
1208 // same thing as puback for QoS 2
1209 case 'puback':
1210 var pubackRC = packet.reasonCode
1211 // Callback - we're done
1212 if (pubackRC && pubackRC > 0 && pubackRC !== 16) {
1213 err = new Error('Publish error: ' + errors[pubackRC])
1214 err.code = pubackRC
1215 cb(err, packet)
1216 }
1217 delete this.outgoing[mid]
1218 this.outgoingStore.del(packet, cb)
1219 break
1220 case 'pubrec':
1221 response = {
1222 cmd: 'pubrel',
1223 qos: 2,
1224 messageId: mid
1225 }
1226 var pubrecRC = packet.reasonCode
1227
1228 if (pubrecRC && pubrecRC > 0 && pubrecRC !== 16) {
1229 err = new Error('Publish error: ' + errors[pubrecRC])
1230 err.code = pubrecRC
1231 cb(err, packet)
1232 } else {
1233 this._sendPacket(response)
1234 }
1235 break
1236 case 'suback':
1237 delete this.outgoing[mid]
1238 for (var grantedI = 0; grantedI < packet.granted.length; grantedI++) {
1239 if ((packet.granted[grantedI] & 0x80) !== 0) {
1240 // suback with Failure status
1241 var topics = this.messageIdToTopic[mid]
1242 if (topics) {
1243 topics.forEach(function (topic) {
1244 delete that._resubscribeTopics[topic]
1245 })
1246 }
1247 }
1248 }
1249 cb(null, packet)
1250 break
1251 case 'unsuback':
1252 delete this.outgoing[mid]
1253 cb(null)
1254 break
1255 default:
1256 that.emit('error', new Error('unrecognized packet type'))
1257 }
1258
1259 if (this.disconnecting &&
1260 Object.keys(this.outgoing).length === 0) {
1261 this.emit('outgoingEmpty')
1262 }
1263}
1264
1265/**
1266 * _handlePubrel
1267 *
1268 * @param {Object} packet
1269 * @api private
1270 */
1271MqttClient.prototype._handlePubrel = function (packet, callback) {
1272 callback = typeof callback !== 'undefined' ? callback : nop
1273 var mid = packet.messageId
1274 var that = this
1275
1276 var comp = {cmd: 'pubcomp', messageId: mid}
1277
1278 that.incomingStore.get(packet, function (err, pub) {
1279 if (!err) {
1280 that.emit('message', pub.topic, pub.payload, pub)
1281 that.handleMessage(pub, function (err) {
1282 if (err) {
1283 return callback(err)
1284 }
1285 that.incomingStore.del(pub, nop)
1286 that._sendPacket(comp, callback)
1287 })
1288 } else {
1289 that._sendPacket(comp, callback)
1290 }
1291 })
1292}
1293
1294/**
1295 * _handleDisconnect
1296 *
1297 * @param {Object} packet
1298 * @api private
1299 */
1300MqttClient.prototype._handleDisconnect = function (packet) {
1301 this.emit('disconnect', packet)
1302}
1303
1304/**
1305 * _nextId
1306 * @return unsigned int
1307 */
1308MqttClient.prototype._nextId = function () {
1309 // id becomes current state of this.nextId and increments afterwards
1310 var id = this.nextId++
1311 // Ensure 16 bit unsigned int (max 65535, nextId got one higher)
1312 if (this.nextId === 65536) {
1313 this.nextId = 1
1314 }
1315 return id
1316}
1317
1318/**
1319 * getLastMessageId
1320 * @return unsigned int
1321 */
1322MqttClient.prototype.getLastMessageId = function () {
1323 return (this.nextId === 1) ? 65535 : (this.nextId - 1)
1324}
1325
1326/**
1327 * _resubscribe
1328 * @api private
1329 */
1330MqttClient.prototype._resubscribe = function (connack) {
1331 var _resubscribeTopicsKeys = Object.keys(this._resubscribeTopics)
1332 if (!this._firstConnection &&
1333 (this.options.clean || (this.options.protocolVersion === 5 && !connack.sessionPresent)) &&
1334 _resubscribeTopicsKeys.length > 0) {
1335 if (this.options.resubscribe) {
1336 if (this.options.protocolVersion === 5) {
1337 for (var topicI = 0; topicI < _resubscribeTopicsKeys.length; topicI++) {
1338 var resubscribeTopic = {}
1339 resubscribeTopic[_resubscribeTopicsKeys[topicI]] = this._resubscribeTopics[_resubscribeTopicsKeys[topicI]]
1340 resubscribeTopic.resubscribe = true
1341 this.subscribe(resubscribeTopic, {properties: resubscribeTopic[_resubscribeTopicsKeys[topicI]].properties})
1342 }
1343 } else {
1344 this._resubscribeTopics.resubscribe = true
1345 this.subscribe(this._resubscribeTopics)
1346 }
1347 } else {
1348 this._resubscribeTopics = {}
1349 }
1350 }
1351
1352 this._firstConnection = false
1353}
1354
1355/**
1356 * _onConnect
1357 *
1358 * @api private
1359 */
1360MqttClient.prototype._onConnect = function (packet) {
1361 if (this.disconnected) {
1362 this.emit('connect', packet)
1363 return
1364 }
1365
1366 var that = this
1367
1368 this._setupPingTimer()
1369 this._resubscribe(packet)
1370
1371 this.connected = true
1372
1373 function startStreamProcess () {
1374 var outStore = that.outgoingStore.createStream()
1375
1376 function clearStoreProcessing () {
1377 that._storeProcessing = false
1378 that._packetIdsDuringStoreProcessing = {}
1379 }
1380
1381 that.once('close', remove)
1382 outStore.on('error', function (err) {
1383 clearStoreProcessing()
1384 that.removeListener('close', remove)
1385 that.emit('error', err)
1386 })
1387
1388 function remove () {
1389 outStore.destroy()
1390 outStore = null
1391 clearStoreProcessing()
1392 }
1393
1394 function storeDeliver () {
1395 // edge case, we wrapped this twice
1396 if (!outStore) {
1397 return
1398 }
1399 that._storeProcessing = true
1400
1401 var packet = outStore.read(1)
1402
1403 var cb
1404
1405 if (!packet) {
1406 // read when data is available in the future
1407 outStore.once('readable', storeDeliver)
1408 return
1409 }
1410
1411 // Skip already processed store packets
1412 if (that._packetIdsDuringStoreProcessing[packet.messageId]) {
1413 storeDeliver()
1414 return
1415 }
1416
1417 // Avoid unnecessary stream read operations when disconnected
1418 if (!that.disconnecting && !that.reconnectTimer) {
1419 cb = that.outgoing[packet.messageId] ? that.outgoing[packet.messageId].cb : null
1420 that.outgoing[packet.messageId] = {
1421 volatile: false,
1422 cb: function (err, status) {
1423 // Ensure that the original callback passed in to publish gets invoked
1424 if (cb) {
1425 cb(err, status)
1426 }
1427
1428 storeDeliver()
1429 }
1430 }
1431 that._packetIdsDuringStoreProcessing[packet.messageId] = true
1432 that._sendPacket(packet)
1433 } else if (outStore.destroy) {
1434 outStore.destroy()
1435 }
1436 }
1437
1438 outStore.on('end', function () {
1439 var allProcessed = true
1440 for (var id in that._packetIdsDuringStoreProcessing) {
1441 if (!that._packetIdsDuringStoreProcessing[id]) {
1442 allProcessed = false
1443 break
1444 }
1445 }
1446 if (allProcessed) {
1447 clearStoreProcessing()
1448 that.removeListener('close', remove)
1449 that.emit('connect', packet)
1450 } else {
1451 startStreamProcess()
1452 }
1453 })
1454 storeDeliver()
1455 }
1456 // start flowing
1457 startStreamProcess()
1458}
1459
1460module.exports = MqttClient