UNPKG

46.5 kBJavaScriptView Raw
1"use strict";
2var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) {
3 if (k2 === undefined) k2 = k;
4 var desc = Object.getOwnPropertyDescriptor(m, k);
5 if (!desc || ("get" in desc ? !m.__esModule : desc.writable || desc.configurable)) {
6 desc = { enumerable: true, get: function() { return m[k]; } };
7 }
8 Object.defineProperty(o, k2, desc);
9}) : (function(o, m, k, k2) {
10 if (k2 === undefined) k2 = k;
11 o[k2] = m[k];
12}));
13var __setModuleDefault = (this && this.__setModuleDefault) || (Object.create ? (function(o, v) {
14 Object.defineProperty(o, "default", { enumerable: true, value: v });
15}) : function(o, v) {
16 o["default"] = v;
17});
18var __importStar = (this && this.__importStar) || function (mod) {
19 if (mod && mod.__esModule) return mod;
20 var result = {};
21 if (mod != null) for (var k in mod) if (k !== "default" && Object.prototype.hasOwnProperty.call(mod, k)) __createBinding(result, mod, k);
22 __setModuleDefault(result, mod);
23 return result;
24};
25var __importDefault = (this && this.__importDefault) || function (mod) {
26 return (mod && mod.__esModule) ? mod : { "default": mod };
27};
28Object.defineProperty(exports, "__esModule", { value: true });
29const topic_alias_recv_1 = __importDefault(require("./topic-alias-recv"));
30const mqtt_packet_1 = __importDefault(require("mqtt-packet"));
31const default_message_id_provider_1 = __importDefault(require("./default-message-id-provider"));
32const readable_stream_1 = require("readable-stream");
33const default_1 = __importDefault(require("rfdc/default"));
34const validations = __importStar(require("./validations"));
35const debug_1 = __importDefault(require("debug"));
36const store_1 = __importDefault(require("./store"));
37const handlers_1 = __importDefault(require("./handlers"));
38const shared_1 = require("./shared");
39const TypedEmitter_1 = require("./TypedEmitter");
40const PingTimer_1 = __importDefault(require("./PingTimer"));
41const is_browser_1 = __importStar(require("./is-browser"));
42const setImmediate = globalThis.setImmediate ||
43 ((...args) => {
44 const callback = args.shift();
45 (0, shared_1.nextTick)(() => {
46 callback(...args);
47 });
48 });
49const defaultConnectOptions = {
50 keepalive: 60,
51 reschedulePings: true,
52 protocolId: 'MQTT',
53 protocolVersion: 4,
54 reconnectPeriod: 1000,
55 connectTimeout: 30 * 1000,
56 clean: true,
57 resubscribe: true,
58 writeCache: true,
59 timerVariant: 'auto',
60};
61class MqttClient extends TypedEmitter_1.TypedEventEmitter {
62 static defaultId() {
63 return `mqttjs_${Math.random().toString(16).substr(2, 8)}`;
64 }
65 constructor(streamBuilder, options) {
66 super();
67 this.options = options || {};
68 for (const k in defaultConnectOptions) {
69 if (typeof this.options[k] === 'undefined') {
70 this.options[k] = defaultConnectOptions[k];
71 }
72 else {
73 this.options[k] = options[k];
74 }
75 }
76 this.log = this.options.log || (0, debug_1.default)('mqttjs:client');
77 this.noop = this._noop.bind(this);
78 this.log('MqttClient :: version:', MqttClient.VERSION);
79 if (is_browser_1.isWebWorker) {
80 this.log('MqttClient :: environment', 'webworker');
81 }
82 else {
83 this.log('MqttClient :: environment', is_browser_1.default ? 'browser' : 'node');
84 }
85 this.log('MqttClient :: options.protocol', options.protocol);
86 this.log('MqttClient :: options.protocolVersion', options.protocolVersion);
87 this.log('MqttClient :: options.username', options.username);
88 this.log('MqttClient :: options.keepalive', options.keepalive);
89 this.log('MqttClient :: options.reconnectPeriod', options.reconnectPeriod);
90 this.log('MqttClient :: options.rejectUnauthorized', options.rejectUnauthorized);
91 this.log('MqttClient :: options.properties.topicAliasMaximum', options.properties
92 ? options.properties.topicAliasMaximum
93 : undefined);
94 this.options.clientId =
95 typeof options.clientId === 'string'
96 ? options.clientId
97 : MqttClient.defaultId();
98 this.log('MqttClient :: clientId', this.options.clientId);
99 this.options.customHandleAcks =
100 options.protocolVersion === 5 && options.customHandleAcks
101 ? options.customHandleAcks
102 : (...args) => {
103 args[3](null, 0);
104 };
105 if (!this.options.writeCache) {
106 mqtt_packet_1.default.writeToStream.cacheNumbers = false;
107 }
108 this.streamBuilder = streamBuilder;
109 this.messageIdProvider =
110 typeof this.options.messageIdProvider === 'undefined'
111 ? new default_message_id_provider_1.default()
112 : this.options.messageIdProvider;
113 this.outgoingStore = options.outgoingStore || new store_1.default();
114 this.incomingStore = options.incomingStore || new store_1.default();
115 this.queueQoSZero =
116 options.queueQoSZero === undefined ? true : options.queueQoSZero;
117 this._resubscribeTopics = {};
118 this.messageIdToTopic = {};
119 this.pingTimer = null;
120 this.connected = false;
121 this.disconnecting = false;
122 this.reconnecting = false;
123 this.queue = [];
124 this.connackTimer = null;
125 this.reconnectTimer = null;
126 this._storeProcessing = false;
127 this._packetIdsDuringStoreProcessing = {};
128 this._storeProcessingQueue = [];
129 this.outgoing = {};
130 this._firstConnection = true;
131 if (options.properties && options.properties.topicAliasMaximum > 0) {
132 if (options.properties.topicAliasMaximum > 0xffff) {
133 this.log('MqttClient :: options.properties.topicAliasMaximum is out of range');
134 }
135 else {
136 this.topicAliasRecv = new topic_alias_recv_1.default(options.properties.topicAliasMaximum);
137 }
138 }
139 this.on('connect', () => {
140 const { queue } = this;
141 const deliver = () => {
142 const entry = queue.shift();
143 this.log('deliver :: entry %o', entry);
144 let packet = null;
145 if (!entry) {
146 this._resubscribe();
147 return;
148 }
149 packet = entry.packet;
150 this.log('deliver :: call _sendPacket for %o', packet);
151 let send = true;
152 if (packet.messageId && packet.messageId !== 0) {
153 if (!this.messageIdProvider.register(packet.messageId)) {
154 send = false;
155 }
156 }
157 if (send) {
158 this._sendPacket(packet, (err) => {
159 if (entry.cb) {
160 entry.cb(err);
161 }
162 deliver();
163 });
164 }
165 else {
166 this.log('messageId: %d has already used. The message is skipped and removed.', packet.messageId);
167 deliver();
168 }
169 };
170 this.log('connect :: sending queued packets');
171 deliver();
172 });
173 this.on('close', () => {
174 this.log('close :: connected set to `false`');
175 this.connected = false;
176 this.log('close :: clearing connackTimer');
177 clearTimeout(this.connackTimer);
178 this._destroyPingTimer();
179 if (this.topicAliasRecv) {
180 this.topicAliasRecv.clear();
181 }
182 this.log('close :: calling _setupReconnect');
183 this._setupReconnect();
184 });
185 if (!this.options.manualConnect) {
186 this.log('MqttClient :: setting up stream');
187 this.connect();
188 }
189 }
190 handleAuth(packet, callback) {
191 callback();
192 }
193 handleMessage(packet, callback) {
194 callback();
195 }
196 _nextId() {
197 return this.messageIdProvider.allocate();
198 }
199 getLastMessageId() {
200 return this.messageIdProvider.getLastAllocated();
201 }
202 connect() {
203 var _a;
204 const writable = new readable_stream_1.Writable();
205 const parser = mqtt_packet_1.default.parser(this.options);
206 let completeParse = null;
207 const packets = [];
208 this.log('connect :: calling method to clear reconnect');
209 this._clearReconnect();
210 this.log('connect :: using streamBuilder provided to client to create stream');
211 this.stream = this.streamBuilder(this);
212 parser.on('packet', (packet) => {
213 this.log('parser :: on packet push to packets array.');
214 packets.push(packet);
215 });
216 const work = () => {
217 this.log('work :: getting next packet in queue');
218 const packet = packets.shift();
219 if (packet) {
220 this.log('work :: packet pulled from queue');
221 (0, handlers_1.default)(this, packet, nextTickWork);
222 }
223 else {
224 this.log('work :: no packets in queue');
225 const done = completeParse;
226 completeParse = null;
227 this.log('work :: done flag is %s', !!done);
228 if (done)
229 done();
230 }
231 };
232 const nextTickWork = () => {
233 if (packets.length) {
234 (0, shared_1.nextTick)(work);
235 }
236 else {
237 const done = completeParse;
238 completeParse = null;
239 done();
240 }
241 };
242 writable._write = (buf, enc, done) => {
243 completeParse = done;
244 this.log('writable stream :: parsing buffer');
245 parser.parse(buf);
246 work();
247 };
248 const streamErrorHandler = (error) => {
249 this.log('streamErrorHandler :: error', error.message);
250 if (error.code) {
251 this.log('streamErrorHandler :: emitting error');
252 this.emit('error', error);
253 }
254 else {
255 this.noop(error);
256 }
257 };
258 this.log('connect :: pipe stream to writable stream');
259 this.stream.pipe(writable);
260 this.stream.on('error', streamErrorHandler);
261 this.stream.on('close', () => {
262 this.log('(%s)stream :: on close', this.options.clientId);
263 this._flushVolatile();
264 this.log('stream: emit close to MqttClient');
265 this.emit('close');
266 });
267 this.log('connect: sending packet `connect`');
268 const connectPacket = {
269 cmd: 'connect',
270 protocolId: this.options.protocolId,
271 protocolVersion: this.options.protocolVersion,
272 clean: this.options.clean,
273 clientId: this.options.clientId,
274 keepalive: this.options.keepalive,
275 username: this.options.username,
276 password: this.options.password,
277 properties: this.options.properties,
278 };
279 if (this.options.will) {
280 connectPacket.will = Object.assign(Object.assign({}, this.options.will), { payload: (_a = this.options.will) === null || _a === void 0 ? void 0 : _a.payload });
281 }
282 if (this.topicAliasRecv) {
283 if (!connectPacket.properties) {
284 connectPacket.properties = {};
285 }
286 if (this.topicAliasRecv) {
287 connectPacket.properties.topicAliasMaximum =
288 this.topicAliasRecv.max;
289 }
290 }
291 this._writePacket(connectPacket);
292 parser.on('error', this.emit.bind(this, 'error'));
293 if (this.options.properties) {
294 if (!this.options.properties.authenticationMethod &&
295 this.options.properties.authenticationData) {
296 this.end(() => this.emit('error', new Error('Packet has no Authentication Method')));
297 return this;
298 }
299 if (this.options.properties.authenticationMethod &&
300 this.options.authPacket &&
301 typeof this.options.authPacket === 'object') {
302 const authPacket = Object.assign({ cmd: 'auth', reasonCode: 0 }, this.options.authPacket);
303 this._writePacket(authPacket);
304 }
305 }
306 this.stream.setMaxListeners(1000);
307 clearTimeout(this.connackTimer);
308 this.connackTimer = setTimeout(() => {
309 this.log('!!connectTimeout hit!! Calling _cleanUp with force `true`');
310 this.emit('error', new Error('connack timeout'));
311 this._cleanUp(true);
312 }, this.options.connectTimeout);
313 return this;
314 }
315 publish(topic, message, opts, callback) {
316 this.log('publish :: message `%s` to topic `%s`', message, topic);
317 const { options } = this;
318 if (typeof opts === 'function') {
319 callback = opts;
320 opts = null;
321 }
322 opts = opts || {};
323 const defaultOpts = {
324 qos: 0,
325 retain: false,
326 dup: false,
327 };
328 opts = Object.assign(Object.assign({}, defaultOpts), opts);
329 const { qos, retain, dup, properties, cbStorePut } = opts;
330 if (this._checkDisconnecting(callback)) {
331 return this;
332 }
333 const publishProc = () => {
334 let messageId = 0;
335 if (qos === 1 || qos === 2) {
336 messageId = this._nextId();
337 if (messageId === null) {
338 this.log('No messageId left');
339 return false;
340 }
341 }
342 const packet = {
343 cmd: 'publish',
344 topic,
345 payload: message,
346 qos,
347 retain,
348 messageId,
349 dup,
350 };
351 if (options.protocolVersion === 5) {
352 packet.properties = properties;
353 }
354 this.log('publish :: qos', qos);
355 switch (qos) {
356 case 1:
357 case 2:
358 this.outgoing[packet.messageId] = {
359 volatile: false,
360 cb: callback || this.noop,
361 };
362 this.log('MqttClient:publish: packet cmd: %s', packet.cmd);
363 this._sendPacket(packet, undefined, cbStorePut);
364 break;
365 default:
366 this.log('MqttClient:publish: packet cmd: %s', packet.cmd);
367 this._sendPacket(packet, callback, cbStorePut);
368 break;
369 }
370 return true;
371 };
372 if (this._storeProcessing ||
373 this._storeProcessingQueue.length > 0 ||
374 !publishProc()) {
375 this._storeProcessingQueue.push({
376 invoke: publishProc,
377 cbStorePut: opts.cbStorePut,
378 callback,
379 });
380 }
381 return this;
382 }
383 publishAsync(topic, message, opts) {
384 return new Promise((resolve, reject) => {
385 this.publish(topic, message, opts, (err, packet) => {
386 if (err) {
387 reject(err);
388 }
389 else {
390 resolve(packet);
391 }
392 });
393 });
394 }
395 subscribe(topicObject, opts, callback) {
396 const version = this.options.protocolVersion;
397 if (typeof opts === 'function') {
398 callback = opts;
399 }
400 callback = callback || this.noop;
401 let resubscribe = false;
402 let topicsList = [];
403 if (typeof topicObject === 'string') {
404 topicObject = [topicObject];
405 topicsList = topicObject;
406 }
407 else if (Array.isArray(topicObject)) {
408 topicsList = topicObject;
409 }
410 else if (typeof topicObject === 'object') {
411 resubscribe = topicObject.resubscribe;
412 delete topicObject.resubscribe;
413 topicsList = Object.keys(topicObject);
414 }
415 const invalidTopic = validations.validateTopics(topicsList);
416 if (invalidTopic !== null) {
417 setImmediate(callback, new Error(`Invalid topic ${invalidTopic}`));
418 return this;
419 }
420 if (this._checkDisconnecting(callback)) {
421 this.log('subscribe: discconecting true');
422 return this;
423 }
424 const defaultOpts = {
425 qos: 0,
426 };
427 if (version === 5) {
428 defaultOpts.nl = false;
429 defaultOpts.rap = false;
430 defaultOpts.rh = 0;
431 }
432 opts = Object.assign(Object.assign({}, defaultOpts), opts);
433 const properties = opts.properties;
434 const subs = [];
435 const parseSub = (topic, subOptions) => {
436 subOptions = (subOptions || opts);
437 if (!Object.prototype.hasOwnProperty.call(this._resubscribeTopics, topic) ||
438 this._resubscribeTopics[topic].qos < subOptions.qos ||
439 resubscribe) {
440 const currentOpts = {
441 topic,
442 qos: subOptions.qos,
443 };
444 if (version === 5) {
445 currentOpts.nl = subOptions.nl;
446 currentOpts.rap = subOptions.rap;
447 currentOpts.rh = subOptions.rh;
448 currentOpts.properties = properties;
449 }
450 this.log('subscribe: pushing topic `%s` and qos `%s` to subs list', currentOpts.topic, currentOpts.qos);
451 subs.push(currentOpts);
452 }
453 };
454 if (Array.isArray(topicObject)) {
455 topicObject.forEach((topic) => {
456 this.log('subscribe: array topic %s', topic);
457 parseSub(topic);
458 });
459 }
460 else {
461 Object.keys(topicObject).forEach((topic) => {
462 this.log('subscribe: object topic %s, %o', topic, topicObject[topic]);
463 parseSub(topic, topicObject[topic]);
464 });
465 }
466 if (!subs.length) {
467 callback(null, []);
468 return this;
469 }
470 const subscribeProc = () => {
471 const messageId = this._nextId();
472 if (messageId === null) {
473 this.log('No messageId left');
474 return false;
475 }
476 const packet = {
477 cmd: 'subscribe',
478 subscriptions: subs,
479 messageId,
480 };
481 if (properties) {
482 packet.properties = properties;
483 }
484 if (this.options.resubscribe) {
485 this.log('subscribe :: resubscribe true');
486 const topics = [];
487 subs.forEach((sub) => {
488 if (this.options.reconnectPeriod > 0) {
489 const topic = { qos: sub.qos };
490 if (version === 5) {
491 topic.nl = sub.nl || false;
492 topic.rap = sub.rap || false;
493 topic.rh = sub.rh || 0;
494 topic.properties = sub.properties;
495 }
496 this._resubscribeTopics[sub.topic] = topic;
497 topics.push(sub.topic);
498 }
499 });
500 this.messageIdToTopic[packet.messageId] = topics;
501 }
502 this.outgoing[packet.messageId] = {
503 volatile: true,
504 cb(err, packet2) {
505 if (!err) {
506 const { granted } = packet2;
507 for (let i = 0; i < granted.length; i += 1) {
508 subs[i].qos = granted[i];
509 }
510 }
511 callback(err, subs);
512 },
513 };
514 this.log('subscribe :: call _sendPacket');
515 this._sendPacket(packet);
516 return true;
517 };
518 if (this._storeProcessing ||
519 this._storeProcessingQueue.length > 0 ||
520 !subscribeProc()) {
521 this._storeProcessingQueue.push({
522 invoke: subscribeProc,
523 callback,
524 });
525 }
526 return this;
527 }
528 subscribeAsync(topicObject, opts) {
529 return new Promise((resolve, reject) => {
530 this.subscribe(topicObject, opts, (err, granted) => {
531 if (err) {
532 reject(err);
533 }
534 else {
535 resolve(granted);
536 }
537 });
538 });
539 }
540 unsubscribe(topic, opts, callback) {
541 if (typeof topic === 'string') {
542 topic = [topic];
543 }
544 if (typeof opts === 'function') {
545 callback = opts;
546 }
547 callback = callback || this.noop;
548 const invalidTopic = validations.validateTopics(topic);
549 if (invalidTopic !== null) {
550 setImmediate(callback, new Error(`Invalid topic ${invalidTopic}`));
551 return this;
552 }
553 if (this._checkDisconnecting(callback)) {
554 return this;
555 }
556 const unsubscribeProc = () => {
557 const messageId = this._nextId();
558 if (messageId === null) {
559 this.log('No messageId left');
560 return false;
561 }
562 const packet = {
563 cmd: 'unsubscribe',
564 messageId,
565 unsubscriptions: [],
566 };
567 if (typeof topic === 'string') {
568 packet.unsubscriptions = [topic];
569 }
570 else if (Array.isArray(topic)) {
571 packet.unsubscriptions = topic;
572 }
573 if (this.options.resubscribe) {
574 packet.unsubscriptions.forEach((topic2) => {
575 delete this._resubscribeTopics[topic2];
576 });
577 }
578 if (typeof opts === 'object' && opts.properties) {
579 packet.properties = opts.properties;
580 }
581 this.outgoing[packet.messageId] = {
582 volatile: true,
583 cb: callback,
584 };
585 this.log('unsubscribe: call _sendPacket');
586 this._sendPacket(packet);
587 return true;
588 };
589 if (this._storeProcessing ||
590 this._storeProcessingQueue.length > 0 ||
591 !unsubscribeProc()) {
592 this._storeProcessingQueue.push({
593 invoke: unsubscribeProc,
594 callback,
595 });
596 }
597 return this;
598 }
599 unsubscribeAsync(topic, opts) {
600 return new Promise((resolve, reject) => {
601 this.unsubscribe(topic, opts, (err, packet) => {
602 if (err) {
603 reject(err);
604 }
605 else {
606 resolve(packet);
607 }
608 });
609 });
610 }
611 end(force, opts, cb) {
612 this.log('end :: (%s)', this.options.clientId);
613 if (force == null || typeof force !== 'boolean') {
614 cb = cb || opts;
615 opts = force;
616 force = false;
617 }
618 if (typeof opts !== 'object') {
619 cb = cb || opts;
620 opts = null;
621 }
622 this.log('end :: cb? %s', !!cb);
623 if (!cb || typeof cb !== 'function') {
624 cb = this.noop;
625 }
626 const closeStores = () => {
627 this.log('end :: closeStores: closing incoming and outgoing stores');
628 this.disconnected = true;
629 this.incomingStore.close((e1) => {
630 this.outgoingStore.close((e2) => {
631 this.log('end :: closeStores: emitting end');
632 this.emit('end');
633 if (cb) {
634 const err = e1 || e2;
635 this.log('end :: closeStores: invoking callback with args');
636 cb(err);
637 }
638 });
639 });
640 if (this._deferredReconnect) {
641 this._deferredReconnect();
642 }
643 };
644 const finish = () => {
645 this.log('end :: (%s) :: finish :: calling _cleanUp with force %s', this.options.clientId, force);
646 this._cleanUp(force, () => {
647 this.log('end :: finish :: calling process.nextTick on closeStores');
648 (0, shared_1.nextTick)(closeStores);
649 }, opts);
650 };
651 if (this.disconnecting) {
652 cb();
653 return this;
654 }
655 this._clearReconnect();
656 this.disconnecting = true;
657 if (!force && Object.keys(this.outgoing).length > 0) {
658 this.log('end :: (%s) :: calling finish in 10ms once outgoing is empty', this.options.clientId);
659 this.once('outgoingEmpty', setTimeout.bind(null, finish, 10));
660 }
661 else {
662 this.log('end :: (%s) :: immediately calling finish', this.options.clientId);
663 finish();
664 }
665 return this;
666 }
667 endAsync(force, opts) {
668 return new Promise((resolve, reject) => {
669 this.end(force, opts, (err) => {
670 if (err) {
671 reject(err);
672 }
673 else {
674 resolve();
675 }
676 });
677 });
678 }
679 removeOutgoingMessage(messageId) {
680 if (this.outgoing[messageId]) {
681 const { cb } = this.outgoing[messageId];
682 this._removeOutgoingAndStoreMessage(messageId, () => {
683 cb(new Error('Message removed'));
684 });
685 }
686 return this;
687 }
688 reconnect(opts) {
689 this.log('client reconnect');
690 const f = () => {
691 if (opts) {
692 this.options.incomingStore = opts.incomingStore;
693 this.options.outgoingStore = opts.outgoingStore;
694 }
695 else {
696 this.options.incomingStore = null;
697 this.options.outgoingStore = null;
698 }
699 this.incomingStore = this.options.incomingStore || new store_1.default();
700 this.outgoingStore = this.options.outgoingStore || new store_1.default();
701 this.disconnecting = false;
702 this.disconnected = false;
703 this._deferredReconnect = null;
704 this._reconnect();
705 };
706 if (this.disconnecting && !this.disconnected) {
707 this._deferredReconnect = f;
708 }
709 else {
710 f();
711 }
712 return this;
713 }
714 _flushVolatile() {
715 if (this.outgoing) {
716 this.log('_flushVolatile :: deleting volatile messages from the queue and setting their callbacks as error function');
717 Object.keys(this.outgoing).forEach((messageId) => {
718 if (this.outgoing[messageId].volatile &&
719 typeof this.outgoing[messageId].cb === 'function') {
720 this.outgoing[messageId].cb(new Error('Connection closed'));
721 delete this.outgoing[messageId];
722 }
723 });
724 }
725 }
726 _flush() {
727 if (this.outgoing) {
728 this.log('_flush: queue exists? %b', !!this.outgoing);
729 Object.keys(this.outgoing).forEach((messageId) => {
730 if (typeof this.outgoing[messageId].cb === 'function') {
731 this.outgoing[messageId].cb(new Error('Connection closed'));
732 delete this.outgoing[messageId];
733 }
734 });
735 }
736 }
737 _removeTopicAliasAndRecoverTopicName(packet) {
738 let alias;
739 if (packet.properties) {
740 alias = packet.properties.topicAlias;
741 }
742 let topic = packet.topic.toString();
743 this.log('_removeTopicAliasAndRecoverTopicName :: alias %d, topic %o', alias, topic);
744 if (topic.length === 0) {
745 if (typeof alias === 'undefined') {
746 return new Error('Unregistered Topic Alias');
747 }
748 topic = this.topicAliasSend.getTopicByAlias(alias);
749 if (typeof topic === 'undefined') {
750 return new Error('Unregistered Topic Alias');
751 }
752 packet.topic = topic;
753 }
754 if (alias) {
755 delete packet.properties.topicAlias;
756 }
757 }
758 _checkDisconnecting(callback) {
759 if (this.disconnecting) {
760 if (callback && callback !== this.noop) {
761 callback(new Error('client disconnecting'));
762 }
763 else {
764 this.emit('error', new Error('client disconnecting'));
765 }
766 }
767 return this.disconnecting;
768 }
769 _reconnect() {
770 this.log('_reconnect: emitting reconnect to client');
771 this.emit('reconnect');
772 if (this.connected) {
773 this.end(() => {
774 this.connect();
775 });
776 this.log('client already connected. disconnecting first.');
777 }
778 else {
779 this.log('_reconnect: calling connect');
780 this.connect();
781 }
782 }
783 _setupReconnect() {
784 if (!this.disconnecting &&
785 !this.reconnectTimer &&
786 this.options.reconnectPeriod > 0) {
787 if (!this.reconnecting) {
788 this.log('_setupReconnect :: emit `offline` state');
789 this.emit('offline');
790 this.log('_setupReconnect :: set `reconnecting` to `true`');
791 this.reconnecting = true;
792 }
793 this.log('_setupReconnect :: setting reconnectTimer for %d ms', this.options.reconnectPeriod);
794 this.reconnectTimer = setInterval(() => {
795 this.log('reconnectTimer :: reconnect triggered!');
796 this._reconnect();
797 }, this.options.reconnectPeriod);
798 }
799 else {
800 this.log('_setupReconnect :: doing nothing...');
801 }
802 }
803 _clearReconnect() {
804 this.log('_clearReconnect : clearing reconnect timer');
805 if (this.reconnectTimer) {
806 clearInterval(this.reconnectTimer);
807 this.reconnectTimer = null;
808 }
809 }
810 _cleanUp(forced, done, opts = {}) {
811 if (done) {
812 this.log('_cleanUp :: done callback provided for on stream close');
813 this.stream.on('close', done);
814 }
815 this.log('_cleanUp :: forced? %s', forced);
816 if (forced) {
817 if (this.options.reconnectPeriod === 0 && this.options.clean) {
818 this._flush();
819 }
820 this.log('_cleanUp :: (%s) :: destroying stream', this.options.clientId);
821 this.stream.destroy();
822 }
823 else {
824 const packet = Object.assign({ cmd: 'disconnect' }, opts);
825 this.log('_cleanUp :: (%s) :: call _sendPacket with disconnect packet', this.options.clientId);
826 this._sendPacket(packet, () => {
827 this.log('_cleanUp :: (%s) :: destroying stream', this.options.clientId);
828 setImmediate(() => {
829 this.stream.end(() => {
830 this.log('_cleanUp :: (%s) :: stream destroyed', this.options.clientId);
831 });
832 });
833 });
834 }
835 if (!this.disconnecting && !this.reconnecting) {
836 this.log('_cleanUp :: client not disconnecting/reconnecting. Clearing and resetting reconnect.');
837 this._clearReconnect();
838 this._setupReconnect();
839 }
840 this._destroyPingTimer();
841 if (done && !this.connected) {
842 this.log('_cleanUp :: (%s) :: removing stream `done` callback `close` listener', this.options.clientId);
843 this.stream.removeListener('close', done);
844 done();
845 }
846 }
847 _storeAndSend(packet, cb, cbStorePut) {
848 this.log('storeAndSend :: store packet with cmd %s to outgoingStore', packet.cmd);
849 let storePacket = packet;
850 let err;
851 if (storePacket.cmd === 'publish') {
852 storePacket = (0, default_1.default)(packet);
853 err = this._removeTopicAliasAndRecoverTopicName(storePacket);
854 if (err) {
855 return cb && cb(err);
856 }
857 }
858 this.outgoingStore.put(storePacket, (err2) => {
859 if (err2) {
860 return cb && cb(err2);
861 }
862 cbStorePut();
863 this._writePacket(packet, cb);
864 });
865 }
866 _applyTopicAlias(packet) {
867 if (this.options.protocolVersion === 5) {
868 if (packet.cmd === 'publish') {
869 let alias;
870 if (packet.properties) {
871 alias = packet.properties.topicAlias;
872 }
873 const topic = packet.topic.toString();
874 if (this.topicAliasSend) {
875 if (alias) {
876 if (topic.length !== 0) {
877 this.log('applyTopicAlias :: register topic: %s - alias: %d', topic, alias);
878 if (!this.topicAliasSend.put(topic, alias)) {
879 this.log('applyTopicAlias :: error out of range. topic: %s - alias: %d', topic, alias);
880 return new Error('Sending Topic Alias out of range');
881 }
882 }
883 }
884 else if (topic.length !== 0) {
885 if (this.options.autoAssignTopicAlias) {
886 alias = this.topicAliasSend.getAliasByTopic(topic);
887 if (alias) {
888 packet.topic = '';
889 packet.properties = Object.assign(Object.assign({}, packet.properties), { topicAlias: alias });
890 this.log('applyTopicAlias :: auto assign(use) topic: %s - alias: %d', topic, alias);
891 }
892 else {
893 alias = this.topicAliasSend.getLruAlias();
894 this.topicAliasSend.put(topic, alias);
895 packet.properties = Object.assign(Object.assign({}, packet.properties), { topicAlias: alias });
896 this.log('applyTopicAlias :: auto assign topic: %s - alias: %d', topic, alias);
897 }
898 }
899 else if (this.options.autoUseTopicAlias) {
900 alias = this.topicAliasSend.getAliasByTopic(topic);
901 if (alias) {
902 packet.topic = '';
903 packet.properties = Object.assign(Object.assign({}, packet.properties), { topicAlias: alias });
904 this.log('applyTopicAlias :: auto use topic: %s - alias: %d', topic, alias);
905 }
906 }
907 }
908 }
909 else if (alias) {
910 this.log('applyTopicAlias :: error out of range. topic: %s - alias: %d', topic, alias);
911 return new Error('Sending Topic Alias out of range');
912 }
913 }
914 }
915 }
916 _noop(err) {
917 this.log('noop ::', err);
918 }
919 _writePacket(packet, cb) {
920 this.log('_writePacket :: packet: %O', packet);
921 this.log('_writePacket :: emitting `packetsend`');
922 this.emit('packetsend', packet);
923 this.log('_writePacket :: writing to stream');
924 const result = mqtt_packet_1.default.writeToStream(packet, this.stream, this.options);
925 this.log('_writePacket :: writeToStream result %s', result);
926 if (!result && cb && cb !== this.noop) {
927 this.log('_writePacket :: handle events on `drain` once through callback.');
928 this.stream.once('drain', cb);
929 }
930 else if (cb) {
931 this.log('_writePacket :: invoking cb');
932 cb();
933 }
934 }
935 _sendPacket(packet, cb, cbStorePut, noStore) {
936 this.log('_sendPacket :: (%s) :: start', this.options.clientId);
937 cbStorePut = cbStorePut || this.noop;
938 cb = cb || this.noop;
939 const err = this._applyTopicAlias(packet);
940 if (err) {
941 cb(err);
942 return;
943 }
944 if (!this.connected) {
945 if (packet.cmd === 'auth') {
946 this._writePacket(packet, cb);
947 return;
948 }
949 this.log('_sendPacket :: client not connected. Storing packet offline.');
950 this._storePacket(packet, cb, cbStorePut);
951 return;
952 }
953 if (noStore) {
954 this._writePacket(packet, cb);
955 return;
956 }
957 switch (packet.cmd) {
958 case 'publish':
959 break;
960 case 'pubrel':
961 this._storeAndSend(packet, cb, cbStorePut);
962 return;
963 default:
964 this._writePacket(packet, cb);
965 return;
966 }
967 switch (packet.qos) {
968 case 2:
969 case 1:
970 this._storeAndSend(packet, cb, cbStorePut);
971 break;
972 case 0:
973 default:
974 this._writePacket(packet, cb);
975 break;
976 }
977 this.log('_sendPacket :: (%s) :: end', this.options.clientId);
978 }
979 _storePacket(packet, cb, cbStorePut) {
980 this.log('_storePacket :: packet: %o', packet);
981 this.log('_storePacket :: cb? %s', !!cb);
982 cbStorePut = cbStorePut || this.noop;
983 let storePacket = packet;
984 if (storePacket.cmd === 'publish') {
985 storePacket = (0, default_1.default)(packet);
986 const err = this._removeTopicAliasAndRecoverTopicName(storePacket);
987 if (err) {
988 return cb && cb(err);
989 }
990 }
991 const qos = storePacket.qos || 0;
992 if ((qos === 0 && this.queueQoSZero) || storePacket.cmd !== 'publish') {
993 this.queue.push({ packet: storePacket, cb });
994 }
995 else if (qos > 0) {
996 cb = this.outgoing[storePacket.messageId]
997 ? this.outgoing[storePacket.messageId].cb
998 : null;
999 this.outgoingStore.put(storePacket, (err) => {
1000 if (err) {
1001 return cb && cb(err);
1002 }
1003 cbStorePut();
1004 });
1005 }
1006 else if (cb) {
1007 cb(new Error('No connection to broker'));
1008 }
1009 }
1010 _setupPingTimer() {
1011 this.log('_setupPingTimer :: keepalive %d (seconds)', this.options.keepalive);
1012 if (!this.pingTimer && this.options.keepalive) {
1013 this.pingTimer = new PingTimer_1.default(this.options.keepalive, () => {
1014 this._checkPing();
1015 }, this.options.timerVariant);
1016 this.pingResp = Date.now();
1017 }
1018 }
1019 _destroyPingTimer() {
1020 if (this.pingTimer) {
1021 this.log('_destroyPingTimer :: destroying ping timer');
1022 this.pingTimer.destroy();
1023 this.pingTimer = null;
1024 }
1025 }
1026 _shiftPingInterval() {
1027 if (this.pingTimer &&
1028 this.options.keepalive &&
1029 this.options.reschedulePings) {
1030 this._reschedulePing();
1031 }
1032 }
1033 _reschedulePing() {
1034 this.log('_reschedulePing :: rescheduling ping');
1035 this.pingTimer.reschedule();
1036 }
1037 _checkPing() {
1038 this.log('_checkPing :: checking ping...');
1039 const timeSincePing = Date.now() - this.pingResp - 100;
1040 if (timeSincePing <= this.options.keepalive * 1000) {
1041 this.log('_checkPing :: ping response received in time');
1042 this._sendPing();
1043 }
1044 else {
1045 this.emit('error', new Error('Keepalive timeout'));
1046 this.log('_checkPing :: calling _cleanUp with force true');
1047 this._cleanUp(true);
1048 }
1049 }
1050 _sendPing() {
1051 this.log('_sendPing :: sending pingreq');
1052 this._sendPacket({ cmd: 'pingreq' });
1053 }
1054 _resubscribe() {
1055 this.log('_resubscribe');
1056 const _resubscribeTopicsKeys = Object.keys(this._resubscribeTopics);
1057 if (!this._firstConnection &&
1058 (this.options.clean ||
1059 (this.options.protocolVersion >= 4 &&
1060 !this.connackPacket.sessionPresent)) &&
1061 _resubscribeTopicsKeys.length > 0) {
1062 if (this.options.resubscribe) {
1063 if (this.options.protocolVersion === 5) {
1064 this.log('_resubscribe: protocolVersion 5');
1065 for (let topicI = 0; topicI < _resubscribeTopicsKeys.length; topicI++) {
1066 const resubscribeTopic = {};
1067 resubscribeTopic[_resubscribeTopicsKeys[topicI]] =
1068 this._resubscribeTopics[_resubscribeTopicsKeys[topicI]];
1069 resubscribeTopic.resubscribe = true;
1070 this.subscribe(resubscribeTopic, {
1071 properties: resubscribeTopic[_resubscribeTopicsKeys[topicI]]
1072 .properties,
1073 });
1074 }
1075 }
1076 else {
1077 this._resubscribeTopics.resubscribe = true;
1078 this.subscribe(this._resubscribeTopics);
1079 }
1080 }
1081 else {
1082 this._resubscribeTopics = {};
1083 }
1084 }
1085 this._firstConnection = false;
1086 }
1087 _onConnect(packet) {
1088 if (this.disconnected) {
1089 this.emit('connect', packet);
1090 return;
1091 }
1092 this.connackPacket = packet;
1093 this.messageIdProvider.clear();
1094 this._setupPingTimer();
1095 this.connected = true;
1096 const startStreamProcess = () => {
1097 let outStore = this.outgoingStore.createStream();
1098 const remove = () => {
1099 outStore.destroy();
1100 outStore = null;
1101 this._flushStoreProcessingQueue();
1102 clearStoreProcessing();
1103 };
1104 const clearStoreProcessing = () => {
1105 this._storeProcessing = false;
1106 this._packetIdsDuringStoreProcessing = {};
1107 };
1108 this.once('close', remove);
1109 outStore.on('error', (err) => {
1110 clearStoreProcessing();
1111 this._flushStoreProcessingQueue();
1112 this.removeListener('close', remove);
1113 this.emit('error', err);
1114 });
1115 const storeDeliver = () => {
1116 if (!outStore) {
1117 return;
1118 }
1119 const packet2 = outStore.read(1);
1120 let cb;
1121 if (!packet2) {
1122 outStore.once('readable', storeDeliver);
1123 return;
1124 }
1125 this._storeProcessing = true;
1126 if (this._packetIdsDuringStoreProcessing[packet2.messageId]) {
1127 storeDeliver();
1128 return;
1129 }
1130 if (!this.disconnecting && !this.reconnectTimer) {
1131 cb = this.outgoing[packet2.messageId]
1132 ? this.outgoing[packet2.messageId].cb
1133 : null;
1134 this.outgoing[packet2.messageId] = {
1135 volatile: false,
1136 cb(err, status) {
1137 if (cb) {
1138 cb(err, status);
1139 }
1140 storeDeliver();
1141 },
1142 };
1143 this._packetIdsDuringStoreProcessing[packet2.messageId] =
1144 true;
1145 if (this.messageIdProvider.register(packet2.messageId)) {
1146 this._sendPacket(packet2, undefined, undefined, true);
1147 }
1148 else {
1149 this.log('messageId: %d has already used.', packet2.messageId);
1150 }
1151 }
1152 else if (outStore.destroy) {
1153 outStore.destroy();
1154 }
1155 };
1156 outStore.on('end', () => {
1157 let allProcessed = true;
1158 for (const id in this._packetIdsDuringStoreProcessing) {
1159 if (!this._packetIdsDuringStoreProcessing[id]) {
1160 allProcessed = false;
1161 break;
1162 }
1163 }
1164 this.removeListener('close', remove);
1165 if (allProcessed) {
1166 clearStoreProcessing();
1167 this._invokeAllStoreProcessingQueue();
1168 this.emit('connect', packet);
1169 }
1170 else {
1171 startStreamProcess();
1172 }
1173 });
1174 storeDeliver();
1175 };
1176 startStreamProcess();
1177 }
1178 _invokeStoreProcessingQueue() {
1179 if (!this._storeProcessing && this._storeProcessingQueue.length > 0) {
1180 const f = this._storeProcessingQueue[0];
1181 if (f && f.invoke()) {
1182 this._storeProcessingQueue.shift();
1183 return true;
1184 }
1185 }
1186 return false;
1187 }
1188 _invokeAllStoreProcessingQueue() {
1189 while (this._invokeStoreProcessingQueue()) {
1190 }
1191 }
1192 _flushStoreProcessingQueue() {
1193 for (const f of this._storeProcessingQueue) {
1194 if (f.cbStorePut)
1195 f.cbStorePut(new Error('Connection closed'));
1196 if (f.callback)
1197 f.callback(new Error('Connection closed'));
1198 }
1199 this._storeProcessingQueue.splice(0);
1200 }
1201 _removeOutgoingAndStoreMessage(messageId, cb) {
1202 delete this.outgoing[messageId];
1203 this.outgoingStore.del({ messageId }, (err, packet) => {
1204 cb(err, packet);
1205 this.messageIdProvider.deallocate(messageId);
1206 this._invokeStoreProcessingQueue();
1207 });
1208 }
1209}
1210MqttClient.VERSION = shared_1.MQTTJS_VERSION;
1211exports.default = MqttClient;
1212//# sourceMappingURL=client.js.map
\No newline at end of file