1 | "use strict";
|
/**
 * TypeScript emit helper: exposes property `key` of `source` on `target`
 * under the name `alias` (defaults to `key`).
 *
 * Where `Object.create` exists the property is installed with
 * `Object.defineProperty`; a live getter is used unless the source property
 * is a non-writable, non-configurable data property or an accessor on an
 * `__esModule` object, in which case the original descriptor is reused.
 * Otherwise the value is simply copied.
 */
var __createBinding = (this && this.__createBinding) || (function () {
    if (!Object.create) {
        // Fallback: plain one-time copy of the value.
        return function (target, source, key, alias) {
            if (alias === undefined) {
                alias = key;
            }
            target[alias] = source[key];
        };
    }
    return function (target, source, key, alias) {
        if (alias === undefined) {
            alias = key;
        }
        var descriptor = Object.getOwnPropertyDescriptor(source, key);
        var needsGetter;
        if (!descriptor) {
            needsGetter = true;
        }
        else if ("get" in descriptor) {
            needsGetter = !source.__esModule;
        }
        else {
            needsGetter = descriptor.writable || descriptor.configurable;
        }
        if (needsGetter) {
            descriptor = {
                enumerable: true,
                get: function () { return source[key]; },
            };
        }
        Object.defineProperty(target, alias, descriptor);
    };
})();
|
/**
 * TypeScript emit helper: stores `value` as the `default` property of `target`.
 * Uses an enumerable `Object.defineProperty` when `Object.create` is available,
 * and a plain assignment otherwise.
 */
var __setModuleDefault = (this && this.__setModuleDefault) || (function () {
    if (Object.create) {
        return function (target, value) {
            Object.defineProperty(target, "default", { enumerable: true, value: value });
        };
    }
    return function (target, value) {
        target["default"] = value;
    };
})();
|
/**
 * TypeScript emit helper for `import * as ns`.
 *
 * Objects flagged `__esModule` are returned untouched. Anything else is
 * wrapped in a fresh object that re-exposes every own enumerable key except
 * "default" (via `__createBinding`) and carries the original value as its
 * `default` (via `__setModuleDefault`).
 */
var __importStar = (this && this.__importStar) || function (mod) {
    if (mod && mod.__esModule) {
        return mod;
    }
    var namespace = {};
    if (mod != null) {
        for (var key in mod) {
            if (key === "default") {
                continue;
            }
            if (!Object.prototype.hasOwnProperty.call(mod, key)) {
                continue;
            }
            __createBinding(namespace, mod, key);
        }
    }
    __setModuleDefault(namespace, mod);
    return namespace;
};
|
/**
 * TypeScript emit helper for default imports: returns `mod` unchanged when it
 * is flagged `__esModule`, otherwise wraps it as `{ default: mod }`.
 */
var __importDefault = (this && this.__importDefault) || function (mod) {
    if (mod && mod.__esModule) {
        return mod;
    }
    return { "default": mod };
};
|
28 | Object.defineProperty(exports, "__esModule", { value: true });
|
29 | const topic_alias_recv_1 = __importDefault(require("./topic-alias-recv"));
|
30 | const mqtt_packet_1 = __importDefault(require("mqtt-packet"));
|
31 | const default_message_id_provider_1 = __importDefault(require("./default-message-id-provider"));
|
32 | const readable_stream_1 = require("readable-stream");
|
33 | const default_1 = __importDefault(require("rfdc/default"));
|
34 | const validations = __importStar(require("./validations"));
|
35 | const debug_1 = __importDefault(require("debug"));
|
36 | const store_1 = __importDefault(require("./store"));
|
37 | const handlers_1 = __importDefault(require("./handlers"));
|
38 | const shared_1 = require("./shared");
|
39 | const TypedEmitter_1 = require("./TypedEmitter");
|
40 | const KeepaliveManager_1 = __importDefault(require("./KeepaliveManager"));
|
41 | const is_browser_1 = __importStar(require("./is-browser"));
|
/**
 * Uses the platform `setImmediate` when it exists; otherwise falls back to a
 * shim that defers `callback(...params)` through `shared_1.nextTick`.
 */
const setImmediate = globalThis.setImmediate ||
    ((callback, ...params) => {
        (0, shared_1.nextTick)(() => {
            callback(...params);
        });
    });
|
/**
 * Fallback values copied onto the client options by the MqttClient
 * constructor for every key the caller left `undefined`.
 */
const defaultConnectOptions = {
    // Sent in the CONNECT packet (presumably seconds, per MQTT keep-alive — confirm).
    keepalive: 60,
    // Not read in this part of the file; consumed elsewhere.
    reschedulePings: true,
    // Protocol name / level sent in the CONNECT packet.
    protocolId: 'MQTT',
    protocolVersion: 4,
    // Interval, in milliseconds, of the reconnect timer; values <= 0 disable it.
    reconnectPeriod: 1000,
    // Milliseconds to wait for CONNACK before emitting 'connack timeout'.
    connectTimeout: 30 * 1000,
    // `clean` flag sent in the CONNECT packet.
    clean: true,
    // When true, subscribe() records topics for later re-subscription.
    resubscribe: true,
    // When false, mqtt-packet's `writeToStream.cacheNumbers` is switched off.
    writeCache: true,
    // Not read in this part of the file; consumed elsewhere.
    timerVariant: 'auto',
};
|
61 | class MqttClient extends TypedEmitter_1.TypedEventEmitter {
|
62 | static defaultId() {
|
63 | return `mqttjs_${Math.random().toString(16).substr(2, 8)}`;
|
64 | }
|
65 | constructor(streamBuilder, options) {
|
66 | super();
|
67 | this.options = options || {};
|
68 | for (const k in defaultConnectOptions) {
|
69 | if (typeof this.options[k] === 'undefined') {
|
70 | this.options[k] = defaultConnectOptions[k];
|
71 | }
|
72 | else {
|
73 | this.options[k] = options[k];
|
74 | }
|
75 | }
|
76 | this.log = this.options.log || (0, debug_1.default)('mqttjs:client');
|
77 | this.noop = this._noop.bind(this);
|
78 | this.log('MqttClient :: version:', MqttClient.VERSION);
|
79 | if (is_browser_1.isWebWorker) {
|
80 | this.log('MqttClient :: environment', 'webworker');
|
81 | }
|
82 | else {
|
83 | this.log('MqttClient :: environment', is_browser_1.default ? 'browser' : 'node');
|
84 | }
|
85 | this.log('MqttClient :: options.protocol', options.protocol);
|
86 | this.log('MqttClient :: options.protocolVersion', options.protocolVersion);
|
87 | this.log('MqttClient :: options.username', options.username);
|
88 | this.log('MqttClient :: options.keepalive', options.keepalive);
|
89 | this.log('MqttClient :: options.reconnectPeriod', options.reconnectPeriod);
|
90 | this.log('MqttClient :: options.rejectUnauthorized', options.rejectUnauthorized);
|
91 | this.log('MqttClient :: options.properties.topicAliasMaximum', options.properties
|
92 | ? options.properties.topicAliasMaximum
|
93 | : undefined);
|
94 | this.options.clientId =
|
95 | typeof options.clientId === 'string'
|
96 | ? options.clientId
|
97 | : MqttClient.defaultId();
|
98 | this.log('MqttClient :: clientId', this.options.clientId);
|
99 | this.options.customHandleAcks =
|
100 | options.protocolVersion === 5 && options.customHandleAcks
|
101 | ? options.customHandleAcks
|
102 | : (...args) => {
|
103 | args[3](null, 0);
|
104 | };
|
105 | if (!this.options.writeCache) {
|
106 | mqtt_packet_1.default.writeToStream.cacheNumbers = false;
|
107 | }
|
108 | this.streamBuilder = streamBuilder;
|
109 | this.messageIdProvider =
|
110 | typeof this.options.messageIdProvider === 'undefined'
|
111 | ? new default_message_id_provider_1.default()
|
112 | : this.options.messageIdProvider;
|
113 | this.outgoingStore = options.outgoingStore || new store_1.default();
|
114 | this.incomingStore = options.incomingStore || new store_1.default();
|
115 | this.queueQoSZero =
|
116 | options.queueQoSZero === undefined ? true : options.queueQoSZero;
|
117 | this._resubscribeTopics = {};
|
118 | this.messageIdToTopic = {};
|
119 | this.keepaliveManager = null;
|
120 | this.connected = false;
|
121 | this.disconnecting = false;
|
122 | this.reconnecting = false;
|
123 | this.queue = [];
|
124 | this.connackTimer = null;
|
125 | this.reconnectTimer = null;
|
126 | this._storeProcessing = false;
|
127 | this._packetIdsDuringStoreProcessing = {};
|
128 | this._storeProcessingQueue = [];
|
129 | this.outgoing = {};
|
130 | this._firstConnection = true;
|
131 | if (options.properties && options.properties.topicAliasMaximum > 0) {
|
132 | if (options.properties.topicAliasMaximum > 0xffff) {
|
133 | this.log('MqttClient :: options.properties.topicAliasMaximum is out of range');
|
134 | }
|
135 | else {
|
136 | this.topicAliasRecv = new topic_alias_recv_1.default(options.properties.topicAliasMaximum);
|
137 | }
|
138 | }
|
139 | this.on('connect', () => {
|
140 | const { queue } = this;
|
141 | const deliver = () => {
|
142 | const entry = queue.shift();
|
143 | this.log('deliver :: entry %o', entry);
|
144 | let packet = null;
|
145 | if (!entry) {
|
146 | this._resubscribe();
|
147 | return;
|
148 | }
|
149 | packet = entry.packet;
|
150 | this.log('deliver :: call _sendPacket for %o', packet);
|
151 | let send = true;
|
152 | if (packet.messageId && packet.messageId !== 0) {
|
153 | if (!this.messageIdProvider.register(packet.messageId)) {
|
154 | send = false;
|
155 | }
|
156 | }
|
157 | if (send) {
|
158 | this._sendPacket(packet, (err) => {
|
159 | if (entry.cb) {
|
160 | entry.cb(err);
|
161 | }
|
162 | deliver();
|
163 | });
|
164 | }
|
165 | else {
|
166 | this.log('messageId: %d has already used. The message is skipped and removed.', packet.messageId);
|
167 | deliver();
|
168 | }
|
169 | };
|
170 | this.log('connect :: sending queued packets');
|
171 | deliver();
|
172 | });
|
173 | this.on('close', () => {
|
174 | this.log('close :: connected set to `false`');
|
175 | this.connected = false;
|
176 | this.log('close :: clearing connackTimer');
|
177 | clearTimeout(this.connackTimer);
|
178 | this._destroyKeepaliveManager();
|
179 | if (this.topicAliasRecv) {
|
180 | this.topicAliasRecv.clear();
|
181 | }
|
182 | this.log('close :: calling _setupReconnect');
|
183 | this._setupReconnect();
|
184 | });
|
185 | if (!this.options.manualConnect) {
|
186 | this.log('MqttClient :: setting up stream');
|
187 | this.connect();
|
188 | }
|
189 | }
|
190 | handleAuth(packet, callback) {
|
191 | callback();
|
192 | }
|
193 | handleMessage(packet, callback) {
|
194 | callback();
|
195 | }
|
196 | _nextId() {
|
197 | return this.messageIdProvider.allocate();
|
198 | }
|
199 | getLastMessageId() {
|
200 | return this.messageIdProvider.getLastAllocated();
|
201 | }
|
202 | connect() {
|
203 | var _a;
|
204 | const writable = new readable_stream_1.Writable();
|
205 | const parser = mqtt_packet_1.default.parser(this.options);
|
206 | let completeParse = null;
|
207 | const packets = [];
|
208 | this.log('connect :: calling method to clear reconnect');
|
209 | this._clearReconnect();
|
210 | this.log('connect :: using streamBuilder provided to client to create stream');
|
211 | this.stream = this.streamBuilder(this);
|
212 | parser.on('packet', (packet) => {
|
213 | this.log('parser :: on packet push to packets array.');
|
214 | packets.push(packet);
|
215 | });
|
216 | const work = () => {
|
217 | this.log('work :: getting next packet in queue');
|
218 | const packet = packets.shift();
|
219 | if (packet) {
|
220 | this.log('work :: packet pulled from queue');
|
221 | (0, handlers_1.default)(this, packet, nextTickWork);
|
222 | }
|
223 | else {
|
224 | this.log('work :: no packets in queue');
|
225 | const done = completeParse;
|
226 | completeParse = null;
|
227 | this.log('work :: done flag is %s', !!done);
|
228 | if (done)
|
229 | done();
|
230 | }
|
231 | };
|
232 | const nextTickWork = () => {
|
233 | if (packets.length) {
|
234 | (0, shared_1.nextTick)(work);
|
235 | }
|
236 | else {
|
237 | const done = completeParse;
|
238 | completeParse = null;
|
239 | done();
|
240 | }
|
241 | };
|
242 | writable._write = (buf, enc, done) => {
|
243 | completeParse = done;
|
244 | this.log('writable stream :: parsing buffer');
|
245 | parser.parse(buf);
|
246 | work();
|
247 | };
|
248 | const streamErrorHandler = (error) => {
|
249 | this.log('streamErrorHandler :: error', error.message);
|
250 | if (error.code) {
|
251 | this.log('streamErrorHandler :: emitting error');
|
252 | this.emit('error', error);
|
253 | }
|
254 | else {
|
255 | this.noop(error);
|
256 | }
|
257 | };
|
258 | this.log('connect :: pipe stream to writable stream');
|
259 | this.stream.pipe(writable);
|
260 | this.stream.on('error', streamErrorHandler);
|
261 | this.stream.on('close', () => {
|
262 | this.log('(%s)stream :: on close', this.options.clientId);
|
263 | this._flushVolatile();
|
264 | this.log('stream: emit close to MqttClient');
|
265 | this.emit('close');
|
266 | });
|
267 | this.log('connect: sending packet `connect`');
|
268 | const connectPacket = {
|
269 | cmd: 'connect',
|
270 | protocolId: this.options.protocolId,
|
271 | protocolVersion: this.options.protocolVersion,
|
272 | clean: this.options.clean,
|
273 | clientId: this.options.clientId,
|
274 | keepalive: this.options.keepalive,
|
275 | username: this.options.username,
|
276 | password: this.options.password,
|
277 | properties: this.options.properties,
|
278 | };
|
279 | if (this.options.will) {
|
280 | connectPacket.will = Object.assign(Object.assign({}, this.options.will), { payload: (_a = this.options.will) === null || _a === void 0 ? void 0 : _a.payload });
|
281 | }
|
282 | if (this.topicAliasRecv) {
|
283 | if (!connectPacket.properties) {
|
284 | connectPacket.properties = {};
|
285 | }
|
286 | if (this.topicAliasRecv) {
|
287 | connectPacket.properties.topicAliasMaximum =
|
288 | this.topicAliasRecv.max;
|
289 | }
|
290 | }
|
291 | this._writePacket(connectPacket);
|
292 | parser.on('error', this.emit.bind(this, 'error'));
|
293 | if (this.options.properties) {
|
294 | if (!this.options.properties.authenticationMethod &&
|
295 | this.options.properties.authenticationData) {
|
296 | this.end(() => this.emit('error', new Error('Packet has no Authentication Method')));
|
297 | return this;
|
298 | }
|
299 | if (this.options.properties.authenticationMethod &&
|
300 | this.options.authPacket &&
|
301 | typeof this.options.authPacket === 'object') {
|
302 | const authPacket = Object.assign({ cmd: 'auth', reasonCode: 0 }, this.options.authPacket);
|
303 | this._writePacket(authPacket);
|
304 | }
|
305 | }
|
306 | this.stream.setMaxListeners(1000);
|
307 | clearTimeout(this.connackTimer);
|
308 | this.connackTimer = setTimeout(() => {
|
309 | this.log('!!connectTimeout hit!! Calling _cleanUp with force `true`');
|
310 | this.emit('error', new Error('connack timeout'));
|
311 | this._cleanUp(true);
|
312 | }, this.options.connectTimeout);
|
313 | return this;
|
314 | }
|
315 | publish(topic, message, opts, callback) {
|
316 | this.log('publish :: message `%s` to topic `%s`', message, topic);
|
317 | const { options } = this;
|
318 | if (typeof opts === 'function') {
|
319 | callback = opts;
|
320 | opts = null;
|
321 | }
|
322 | opts = opts || {};
|
323 | const defaultOpts = {
|
324 | qos: 0,
|
325 | retain: false,
|
326 | dup: false,
|
327 | };
|
328 | opts = Object.assign(Object.assign({}, defaultOpts), opts);
|
329 | const { qos, retain, dup, properties, cbStorePut } = opts;
|
330 | if (this._checkDisconnecting(callback)) {
|
331 | return this;
|
332 | }
|
333 | const publishProc = () => {
|
334 | let messageId = 0;
|
335 | if (qos === 1 || qos === 2) {
|
336 | messageId = this._nextId();
|
337 | if (messageId === null) {
|
338 | this.log('No messageId left');
|
339 | return false;
|
340 | }
|
341 | }
|
342 | const packet = {
|
343 | cmd: 'publish',
|
344 | topic,
|
345 | payload: message,
|
346 | qos,
|
347 | retain,
|
348 | messageId,
|
349 | dup,
|
350 | };
|
351 | if (options.protocolVersion === 5) {
|
352 | packet.properties = properties;
|
353 | }
|
354 | this.log('publish :: qos', qos);
|
355 | switch (qos) {
|
356 | case 1:
|
357 | case 2:
|
358 | this.outgoing[packet.messageId] = {
|
359 | volatile: false,
|
360 | cb: callback || this.noop,
|
361 | };
|
362 | this.log('MqttClient:publish: packet cmd: %s', packet.cmd);
|
363 | this._sendPacket(packet, undefined, cbStorePut);
|
364 | break;
|
365 | default:
|
366 | this.log('MqttClient:publish: packet cmd: %s', packet.cmd);
|
367 | this._sendPacket(packet, callback, cbStorePut);
|
368 | break;
|
369 | }
|
370 | return true;
|
371 | };
|
372 | if (this._storeProcessing ||
|
373 | this._storeProcessingQueue.length > 0 ||
|
374 | !publishProc()) {
|
375 | this._storeProcessingQueue.push({
|
376 | invoke: publishProc,
|
377 | cbStorePut: opts.cbStorePut,
|
378 | callback,
|
379 | });
|
380 | }
|
381 | return this;
|
382 | }
|
383 | publishAsync(topic, message, opts) {
|
384 | return new Promise((resolve, reject) => {
|
385 | this.publish(topic, message, opts, (err, packet) => {
|
386 | if (err) {
|
387 | reject(err);
|
388 | }
|
389 | else {
|
390 | resolve(packet);
|
391 | }
|
392 | });
|
393 | });
|
394 | }
|
395 | subscribe(topicObject, opts, callback) {
|
396 | const version = this.options.protocolVersion;
|
397 | if (typeof opts === 'function') {
|
398 | callback = opts;
|
399 | }
|
400 | callback = callback || this.noop;
|
401 | let resubscribe = false;
|
402 | let topicsList = [];
|
403 | if (typeof topicObject === 'string') {
|
404 | topicObject = [topicObject];
|
405 | topicsList = topicObject;
|
406 | }
|
407 | else if (Array.isArray(topicObject)) {
|
408 | topicsList = topicObject;
|
409 | }
|
410 | else if (typeof topicObject === 'object') {
|
411 | resubscribe = topicObject.resubscribe;
|
412 | delete topicObject.resubscribe;
|
413 | topicsList = Object.keys(topicObject);
|
414 | }
|
415 | const invalidTopic = validations.validateTopics(topicsList);
|
416 | if (invalidTopic !== null) {
|
417 | setImmediate(callback, new Error(`Invalid topic ${invalidTopic}`));
|
418 | return this;
|
419 | }
|
420 | if (this._checkDisconnecting(callback)) {
|
421 | this.log('subscribe: discconecting true');
|
422 | return this;
|
423 | }
|
424 | const defaultOpts = {
|
425 | qos: 0,
|
426 | };
|
427 | if (version === 5) {
|
428 | defaultOpts.nl = false;
|
429 | defaultOpts.rap = false;
|
430 | defaultOpts.rh = 0;
|
431 | }
|
432 | opts = Object.assign(Object.assign({}, defaultOpts), opts);
|
433 | const properties = opts.properties;
|
434 | const subs = [];
|
435 | const parseSub = (topic, subOptions) => {
|
436 | subOptions = (subOptions || opts);
|
437 | if (!Object.prototype.hasOwnProperty.call(this._resubscribeTopics, topic) ||
|
438 | this._resubscribeTopics[topic].qos < subOptions.qos ||
|
439 | resubscribe) {
|
440 | const currentOpts = {
|
441 | topic,
|
442 | qos: subOptions.qos,
|
443 | };
|
444 | if (version === 5) {
|
445 | currentOpts.nl = subOptions.nl;
|
446 | currentOpts.rap = subOptions.rap;
|
447 | currentOpts.rh = subOptions.rh;
|
448 | currentOpts.properties = properties;
|
449 | }
|
450 | this.log('subscribe: pushing topic `%s` and qos `%s` to subs list', currentOpts.topic, currentOpts.qos);
|
451 | subs.push(currentOpts);
|
452 | }
|
453 | };
|
454 | if (Array.isArray(topicObject)) {
|
455 | topicObject.forEach((topic) => {
|
456 | this.log('subscribe: array topic %s', topic);
|
457 | parseSub(topic);
|
458 | });
|
459 | }
|
460 | else {
|
461 | Object.keys(topicObject).forEach((topic) => {
|
462 | this.log('subscribe: object topic %s, %o', topic, topicObject[topic]);
|
463 | parseSub(topic, topicObject[topic]);
|
464 | });
|
465 | }
|
466 | if (!subs.length) {
|
467 | callback(null, []);
|
468 | return this;
|
469 | }
|
470 | const subscribeProc = () => {
|
471 | const messageId = this._nextId();
|
472 | if (messageId === null) {
|
473 | this.log('No messageId left');
|
474 | return false;
|
475 | }
|
476 | const packet = {
|
477 | cmd: 'subscribe',
|
478 | subscriptions: subs,
|
479 | messageId,
|
480 | };
|
481 | if (properties) {
|
482 | packet.properties = properties;
|
483 | }
|
484 | if (this.options.resubscribe) {
|
485 | this.log('subscribe :: resubscribe true');
|
486 | const topics = [];
|
487 | subs.forEach((sub) => {
|
488 | if (this.options.reconnectPeriod > 0) {
|
489 | const topic = { qos: sub.qos };
|
490 | if (version === 5) {
|
491 | topic.nl = sub.nl || false;
|
492 | topic.rap = sub.rap || false;
|
493 | topic.rh = sub.rh || 0;
|
494 | topic.properties = sub.properties;
|
495 | }
|
496 | this._resubscribeTopics[sub.topic] = topic;
|
497 | topics.push(sub.topic);
|
498 | }
|
499 | });
|
500 | this.messageIdToTopic[packet.messageId] = topics;
|
501 | }
|
502 | this.outgoing[packet.messageId] = {
|
503 | volatile: true,
|
504 | cb(err, packet2) {
|
505 | if (!err) {
|
506 | const { granted } = packet2;
|
507 | for (let i = 0; i < granted.length; i += 1) {
|
508 | subs[i].qos = granted[i];
|
509 | }
|
510 | }
|
511 | callback(err, subs);
|
512 | },
|
513 | };
|
514 | this.log('subscribe :: call _sendPacket');
|
515 | this._sendPacket(packet);
|
516 | return true;
|
517 | };
|
518 | if (this._storeProcessing ||
|
519 | this._storeProcessingQueue.length > 0 ||
|
520 | !subscribeProc()) {
|
521 | this._storeProcessingQueue.push({
|
522 | invoke: subscribeProc,
|
523 | callback,
|
524 | });
|
525 | }
|
526 | return this;
|
527 | }
|
528 | subscribeAsync(topicObject, opts) {
|
529 | return new Promise((resolve, reject) => {
|
530 | this.subscribe(topicObject, opts, (err, granted) => {
|
531 | if (err) {
|
532 | reject(err);
|
533 | }
|
534 | else {
|
535 | resolve(granted);
|
536 | }
|
537 | });
|
538 | });
|
539 | }
|
540 | unsubscribe(topic, opts, callback) {
|
541 | if (typeof topic === 'string') {
|
542 | topic = [topic];
|
543 | }
|
544 | if (typeof opts === 'function') {
|
545 | callback = opts;
|
546 | }
|
547 | callback = callback || this.noop;
|
548 | const invalidTopic = validations.validateTopics(topic);
|
549 | if (invalidTopic !== null) {
|
550 | setImmediate(callback, new Error(`Invalid topic ${invalidTopic}`));
|
551 | return this;
|
552 | }
|
553 | if (this._checkDisconnecting(callback)) {
|
554 | return this;
|
555 | }
|
556 | const unsubscribeProc = () => {
|
557 | const messageId = this._nextId();
|
558 | if (messageId === null) {
|
559 | this.log('No messageId left');
|
560 | return false;
|
561 | }
|
562 | const packet = {
|
563 | cmd: 'unsubscribe',
|
564 | messageId,
|
565 | unsubscriptions: [],
|
566 | };
|
567 | if (typeof topic === 'string') {
|
568 | packet.unsubscriptions = [topic];
|
569 | }
|
570 | else if (Array.isArray(topic)) {
|
571 | packet.unsubscriptions = topic;
|
572 | }
|
573 | if (this.options.resubscribe) {
|
574 | packet.unsubscriptions.forEach((topic2) => {
|
575 | delete this._resubscribeTopics[topic2];
|
576 | });
|
577 | }
|
578 | if (typeof opts === 'object' && opts.properties) {
|
579 | packet.properties = opts.properties;
|
580 | }
|
581 | this.outgoing[packet.messageId] = {
|
582 | volatile: true,
|
583 | cb: callback,
|
584 | };
|
585 | this.log('unsubscribe: call _sendPacket');
|
586 | this._sendPacket(packet);
|
587 | return true;
|
588 | };
|
589 | if (this._storeProcessing ||
|
590 | this._storeProcessingQueue.length > 0 ||
|
591 | !unsubscribeProc()) {
|
592 | this._storeProcessingQueue.push({
|
593 | invoke: unsubscribeProc,
|
594 | callback,
|
595 | });
|
596 | }
|
597 | return this;
|
598 | }
|
599 | unsubscribeAsync(topic, opts) {
|
600 | return new Promise((resolve, reject) => {
|
601 | this.unsubscribe(topic, opts, (err, packet) => {
|
602 | if (err) {
|
603 | reject(err);
|
604 | }
|
605 | else {
|
606 | resolve(packet);
|
607 | }
|
608 | });
|
609 | });
|
610 | }
|
611 | end(force, opts, cb) {
|
612 | this.log('end :: (%s)', this.options.clientId);
|
613 | if (force == null || typeof force !== 'boolean') {
|
614 | cb = cb || opts;
|
615 | opts = force;
|
616 | force = false;
|
617 | }
|
618 | if (typeof opts !== 'object') {
|
619 | cb = cb || opts;
|
620 | opts = null;
|
621 | }
|
622 | this.log('end :: cb? %s', !!cb);
|
623 | if (!cb || typeof cb !== 'function') {
|
624 | cb = this.noop;
|
625 | }
|
626 | const closeStores = () => {
|
627 | this.log('end :: closeStores: closing incoming and outgoing stores');
|
628 | this.disconnected = true;
|
629 | this.incomingStore.close((e1) => {
|
630 | this.outgoingStore.close((e2) => {
|
631 | this.log('end :: closeStores: emitting end');
|
632 | this.emit('end');
|
633 | if (cb) {
|
634 | const err = e1 || e2;
|
635 | this.log('end :: closeStores: invoking callback with args');
|
636 | cb(err);
|
637 | }
|
638 | });
|
639 | });
|
640 | if (this._deferredReconnect) {
|
641 | this._deferredReconnect();
|
642 | }
|
643 | };
|
644 | const finish = () => {
|
645 | this.log('end :: (%s) :: finish :: calling _cleanUp with force %s', this.options.clientId, force);
|
646 | this._cleanUp(force, () => {
|
647 | this.log('end :: finish :: calling process.nextTick on closeStores');
|
648 | (0, shared_1.nextTick)(closeStores);
|
649 | }, opts);
|
650 | };
|
651 | if (this.disconnecting) {
|
652 | cb();
|
653 | return this;
|
654 | }
|
655 | this._clearReconnect();
|
656 | this.disconnecting = true;
|
657 | if (!force && Object.keys(this.outgoing).length > 0) {
|
658 | this.log('end :: (%s) :: calling finish in 10ms once outgoing is empty', this.options.clientId);
|
659 | this.once('outgoingEmpty', setTimeout.bind(null, finish, 10));
|
660 | }
|
661 | else {
|
662 | this.log('end :: (%s) :: immediately calling finish', this.options.clientId);
|
663 | finish();
|
664 | }
|
665 | return this;
|
666 | }
|
667 | endAsync(force, opts) {
|
668 | return new Promise((resolve, reject) => {
|
669 | this.end(force, opts, (err) => {
|
670 | if (err) {
|
671 | reject(err);
|
672 | }
|
673 | else {
|
674 | resolve();
|
675 | }
|
676 | });
|
677 | });
|
678 | }
|
679 | removeOutgoingMessage(messageId) {
|
680 | if (this.outgoing[messageId]) {
|
681 | const { cb } = this.outgoing[messageId];
|
682 | this._removeOutgoingAndStoreMessage(messageId, () => {
|
683 | cb(new Error('Message removed'));
|
684 | });
|
685 | }
|
686 | return this;
|
687 | }
|
688 | reconnect(opts) {
|
689 | this.log('client reconnect');
|
690 | const f = () => {
|
691 | if (opts) {
|
692 | this.options.incomingStore = opts.incomingStore;
|
693 | this.options.outgoingStore = opts.outgoingStore;
|
694 | }
|
695 | else {
|
696 | this.options.incomingStore = null;
|
697 | this.options.outgoingStore = null;
|
698 | }
|
699 | this.incomingStore = this.options.incomingStore || new store_1.default();
|
700 | this.outgoingStore = this.options.outgoingStore || new store_1.default();
|
701 | this.disconnecting = false;
|
702 | this.disconnected = false;
|
703 | this._deferredReconnect = null;
|
704 | this._reconnect();
|
705 | };
|
706 | if (this.disconnecting && !this.disconnected) {
|
707 | this._deferredReconnect = f;
|
708 | }
|
709 | else {
|
710 | f();
|
711 | }
|
712 | return this;
|
713 | }
|
714 | _flushVolatile() {
|
715 | if (this.outgoing) {
|
716 | this.log('_flushVolatile :: deleting volatile messages from the queue and setting their callbacks as error function');
|
717 | Object.keys(this.outgoing).forEach((messageId) => {
|
718 | if (this.outgoing[messageId].volatile &&
|
719 | typeof this.outgoing[messageId].cb === 'function') {
|
720 | this.outgoing[messageId].cb(new Error('Connection closed'));
|
721 | delete this.outgoing[messageId];
|
722 | }
|
723 | });
|
724 | }
|
725 | }
|
726 | _flush() {
|
727 | if (this.outgoing) {
|
728 | this.log('_flush: queue exists? %b', !!this.outgoing);
|
729 | Object.keys(this.outgoing).forEach((messageId) => {
|
730 | if (typeof this.outgoing[messageId].cb === 'function') {
|
731 | this.outgoing[messageId].cb(new Error('Connection closed'));
|
732 | delete this.outgoing[messageId];
|
733 | }
|
734 | });
|
735 | }
|
736 | }
|
737 | _removeTopicAliasAndRecoverTopicName(packet) {
|
738 | let alias;
|
739 | if (packet.properties) {
|
740 | alias = packet.properties.topicAlias;
|
741 | }
|
742 | let topic = packet.topic.toString();
|
743 | this.log('_removeTopicAliasAndRecoverTopicName :: alias %d, topic %o', alias, topic);
|
744 | if (topic.length === 0) {
|
745 | if (typeof alias === 'undefined') {
|
746 | return new Error('Unregistered Topic Alias');
|
747 | }
|
748 | topic = this.topicAliasSend.getTopicByAlias(alias);
|
749 | if (typeof topic === 'undefined') {
|
750 | return new Error('Unregistered Topic Alias');
|
751 | }
|
752 | packet.topic = topic;
|
753 | }
|
754 | if (alias) {
|
755 | delete packet.properties.topicAlias;
|
756 | }
|
757 | }
|
758 | _checkDisconnecting(callback) {
|
759 | if (this.disconnecting) {
|
760 | if (callback && callback !== this.noop) {
|
761 | callback(new Error('client disconnecting'));
|
762 | }
|
763 | else {
|
764 | this.emit('error', new Error('client disconnecting'));
|
765 | }
|
766 | }
|
767 | return this.disconnecting;
|
768 | }
|
769 | _reconnect() {
|
770 | this.log('_reconnect: emitting reconnect to client');
|
771 | this.emit('reconnect');
|
772 | if (this.connected) {
|
773 | this.end(() => {
|
774 | this.connect();
|
775 | });
|
776 | this.log('client already connected. disconnecting first.');
|
777 | }
|
778 | else {
|
779 | this.log('_reconnect: calling connect');
|
780 | this.connect();
|
781 | }
|
782 | }
|
783 | _setupReconnect() {
|
784 | if (!this.disconnecting &&
|
785 | !this.reconnectTimer &&
|
786 | this.options.reconnectPeriod > 0) {
|
787 | if (!this.reconnecting) {
|
788 | this.log('_setupReconnect :: emit `offline` state');
|
789 | this.emit('offline');
|
790 | this.log('_setupReconnect :: set `reconnecting` to `true`');
|
791 | this.reconnecting = true;
|
792 | }
|
793 | this.log('_setupReconnect :: setting reconnectTimer for %d ms', this.options.reconnectPeriod);
|
794 | this.reconnectTimer = setInterval(() => {
|
795 | this.log('reconnectTimer :: reconnect triggered!');
|
796 | this._reconnect();
|
797 | }, this.options.reconnectPeriod);
|
798 | }
|
799 | else {
|
800 | this.log('_setupReconnect :: doing nothing...');
|
801 | }
|
802 | }
|
803 | _clearReconnect() {
|
804 | this.log('_clearReconnect : clearing reconnect timer');
|
805 | if (this.reconnectTimer) {
|
806 | clearInterval(this.reconnectTimer);
|
807 | this.reconnectTimer = null;
|
808 | }
|
809 | }
|
810 | _cleanUp(forced, done, opts = {}) {
|
811 | if (done) {
|
812 | this.log('_cleanUp :: done callback provided for on stream close');
|
813 | this.stream.on('close', done);
|
814 | }
|
815 | this.log('_cleanUp :: forced? %s', forced);
|
816 | if (forced) {
|
817 | if (this.options.reconnectPeriod === 0 && this.options.clean) {
|
818 | this._flush();
|
819 | }
|
820 | this.log('_cleanUp :: (%s) :: destroying stream', this.options.clientId);
|
821 | this.stream.destroy();
|
822 | }
|
823 | else {
|
824 | const packet = Object.assign({ cmd: 'disconnect' }, opts);
|
825 | this.log('_cleanUp :: (%s) :: call _sendPacket with disconnect packet', this.options.clientId);
|
826 | this._sendPacket(packet, () => {
|
827 | this.log('_cleanUp :: (%s) :: destroying stream', this.options.clientId);
|
828 | setImmediate(() => {
|
829 | this.stream.end(() => {
|
830 | this.log('_cleanUp :: (%s) :: stream destroyed', this.options.clientId);
|
831 | });
|
832 | });
|
833 | });
|
834 | }
|
835 | if (!this.disconnecting && !this.reconnecting) {
|
836 | this.log('_cleanUp :: client not disconnecting/reconnecting. Clearing and resetting reconnect.');
|
837 | this._clearReconnect();
|
838 | this._setupReconnect();
|
839 | }
|
840 | this._destroyKeepaliveManager();
|
841 | if (done && !this.connected) {
|
842 | this.log('_cleanUp :: (%s) :: removing stream `done` callback `close` listener', this.options.clientId);
|
843 | this.stream.removeListener('close', done);
|
844 | done();
|
845 | }
|
846 | }
|
847 | _storeAndSend(packet, cb, cbStorePut) {
|
848 | this.log('storeAndSend :: store packet with cmd %s to outgoingStore', packet.cmd);
|
849 | let storePacket = packet;
|
850 | let err;
|
851 | if (storePacket.cmd === 'publish') {
|
852 | storePacket = (0, default_1.default)(packet);
|
853 | err = this._removeTopicAliasAndRecoverTopicName(storePacket);
|
854 | if (err) {
|
855 | return cb && cb(err);
|
856 | }
|
857 | }
|
858 | this.outgoingStore.put(storePacket, (err2) => {
|
859 | if (err2) {
|
860 | return cb && cb(err2);
|
861 | }
|
862 | cbStorePut();
|
863 | this._writePacket(packet, cb);
|
864 | });
|
865 | }
|
866 | _applyTopicAlias(packet) {
|
867 | if (this.options.protocolVersion === 5) {
|
868 | if (packet.cmd === 'publish') {
|
869 | let alias;
|
870 | if (packet.properties) {
|
871 | alias = packet.properties.topicAlias;
|
872 | }
|
873 | const topic = packet.topic.toString();
|
874 | if (this.topicAliasSend) {
|
875 | if (alias) {
|
876 | if (topic.length !== 0) {
|
877 | this.log('applyTopicAlias :: register topic: %s - alias: %d', topic, alias);
|
878 | if (!this.topicAliasSend.put(topic, alias)) {
|
879 | this.log('applyTopicAlias :: error out of range. topic: %s - alias: %d', topic, alias);
|
880 | return new Error('Sending Topic Alias out of range');
|
881 | }
|
882 | }
|
883 | }
|
884 | else if (topic.length !== 0) {
|
885 | if (this.options.autoAssignTopicAlias) {
|
886 | alias = this.topicAliasSend.getAliasByTopic(topic);
|
887 | if (alias) {
|
888 | packet.topic = '';
|
889 | packet.properties = Object.assign(Object.assign({}, packet.properties), { topicAlias: alias });
|
890 | this.log('applyTopicAlias :: auto assign(use) topic: %s - alias: %d', topic, alias);
|
891 | }
|
892 | else {
|
893 | alias = this.topicAliasSend.getLruAlias();
|
894 | this.topicAliasSend.put(topic, alias);
|
895 | packet.properties = Object.assign(Object.assign({}, packet.properties), { topicAlias: alias });
|
896 | this.log('applyTopicAlias :: auto assign topic: %s - alias: %d', topic, alias);
|
897 | }
|
898 | }
|
899 | else if (this.options.autoUseTopicAlias) {
|
900 | alias = this.topicAliasSend.getAliasByTopic(topic);
|
901 | if (alias) {
|
902 | packet.topic = '';
|
903 | packet.properties = Object.assign(Object.assign({}, packet.properties), { topicAlias: alias });
|
904 | this.log('applyTopicAlias :: auto use topic: %s - alias: %d', topic, alias);
|
905 | }
|
906 | }
|
907 | }
|
908 | }
|
909 | else if (alias) {
|
910 | this.log('applyTopicAlias :: error out of range. topic: %s - alias: %d', topic, alias);
|
911 | return new Error('Sending Topic Alias out of range');
|
912 | }
|
913 | }
|
914 | }
|
915 | }
|
916 | _noop(err) {
|
917 | this.log('noop ::', err);
|
918 | }
|
919 | _writePacket(packet, cb) {
|
920 | this.log('_writePacket :: packet: %O', packet);
|
921 | this.log('_writePacket :: emitting `packetsend`');
|
922 | this.emit('packetsend', packet);
|
923 | this.log('_writePacket :: writing to stream');
|
924 | const result = mqtt_packet_1.default.writeToStream(packet, this.stream, this.options);
|
925 | this.log('_writePacket :: writeToStream result %s', result);
|
926 | if (!result && cb && cb !== this.noop) {
|
927 | this.log('_writePacket :: handle events on `drain` once through callback.');
|
928 | this.stream.once('drain', cb);
|
929 | }
|
930 | else if (cb) {
|
931 | this.log('_writePacket :: invoking cb');
|
932 | cb();
|
933 | }
|
934 | }
|
935 | _sendPacket(packet, cb, cbStorePut, noStore) {
|
936 | this.log('_sendPacket :: (%s) :: start', this.options.clientId);
|
937 | cbStorePut = cbStorePut || this.noop;
|
938 | cb = cb || this.noop;
|
939 | const err = this._applyTopicAlias(packet);
|
940 | if (err) {
|
941 | cb(err);
|
942 | return;
|
943 | }
|
944 | if (!this.connected) {
|
945 | if (packet.cmd === 'auth') {
|
946 | this._writePacket(packet, cb);
|
947 | return;
|
948 | }
|
949 | this.log('_sendPacket :: client not connected. Storing packet offline.');
|
950 | this._storePacket(packet, cb, cbStorePut);
|
951 | return;
|
952 | }
|
953 | if (noStore) {
|
954 | this._writePacket(packet, cb);
|
955 | return;
|
956 | }
|
957 | switch (packet.cmd) {
|
958 | case 'publish':
|
959 | break;
|
960 | case 'pubrel':
|
961 | this._storeAndSend(packet, cb, cbStorePut);
|
962 | return;
|
963 | default:
|
964 | this._writePacket(packet, cb);
|
965 | return;
|
966 | }
|
967 | switch (packet.qos) {
|
968 | case 2:
|
969 | case 1:
|
970 | this._storeAndSend(packet, cb, cbStorePut);
|
971 | break;
|
972 | case 0:
|
973 | default:
|
974 | this._writePacket(packet, cb);
|
975 | break;
|
976 | }
|
977 | this.log('_sendPacket :: (%s) :: end', this.options.clientId);
|
978 | }
|
979 | _storePacket(packet, cb, cbStorePut) {
|
980 | this.log('_storePacket :: packet: %o', packet);
|
981 | this.log('_storePacket :: cb? %s', !!cb);
|
982 | cbStorePut = cbStorePut || this.noop;
|
983 | let storePacket = packet;
|
984 | if (storePacket.cmd === 'publish') {
|
985 | storePacket = (0, default_1.default)(packet);
|
986 | const err = this._removeTopicAliasAndRecoverTopicName(storePacket);
|
987 | if (err) {
|
988 | return cb && cb(err);
|
989 | }
|
990 | }
|
991 | const qos = storePacket.qos || 0;
|
992 | if ((qos === 0 && this.queueQoSZero) || storePacket.cmd !== 'publish') {
|
993 | this.queue.push({ packet: storePacket, cb });
|
994 | }
|
995 | else if (qos > 0) {
|
996 | cb = this.outgoing[storePacket.messageId]
|
997 | ? this.outgoing[storePacket.messageId].cb
|
998 | : null;
|
999 | this.outgoingStore.put(storePacket, (err) => {
|
1000 | if (err) {
|
1001 | return cb && cb(err);
|
1002 | }
|
1003 | cbStorePut();
|
1004 | });
|
1005 | }
|
1006 | else if (cb) {
|
1007 | cb(new Error('No connection to broker'));
|
1008 | }
|
1009 | }
|
1010 | _setupKeepaliveManager() {
|
1011 | this.log('_setupKeepaliveManager :: keepalive %d (seconds)', this.options.keepalive);
|
1012 | if (!this.keepaliveManager && this.options.keepalive) {
|
1013 | this.keepaliveManager = new KeepaliveManager_1.default(this, this.options.timerVariant);
|
1014 | }
|
1015 | }
|
1016 | _destroyKeepaliveManager() {
|
1017 | if (this.keepaliveManager) {
|
1018 | this.log('_destroyKeepaliveManager :: destroying keepalive manager');
|
1019 | this.keepaliveManager.destroy();
|
1020 | this.keepaliveManager = null;
|
1021 | }
|
1022 | }
|
1023 | reschedulePing() {
|
1024 | if (this.keepaliveManager &&
|
1025 | this.options.keepalive &&
|
1026 | this.options.reschedulePings) {
|
1027 | this._reschedulePing();
|
1028 | }
|
1029 | }
|
1030 | _reschedulePing() {
|
1031 | this.log('_reschedulePing :: rescheduling ping');
|
1032 | this.keepaliveManager.reschedule();
|
1033 | }
|
1034 | sendPing() {
|
1035 | this.log('_sendPing :: sending pingreq');
|
1036 | this._sendPacket({ cmd: 'pingreq' });
|
1037 | }
|
1038 | onKeepaliveTimeout() {
|
1039 | this.emit('error', new Error('Keepalive timeout'));
|
1040 | this.log('onKeepaliveTimeout :: calling _cleanUp with force true');
|
1041 | this._cleanUp(true);
|
1042 | }
|
1043 | _resubscribe() {
|
1044 | this.log('_resubscribe');
|
1045 | const _resubscribeTopicsKeys = Object.keys(this._resubscribeTopics);
|
1046 | if (!this._firstConnection &&
|
1047 | (this.options.clean ||
|
1048 | (this.options.protocolVersion >= 4 &&
|
1049 | !this.connackPacket.sessionPresent)) &&
|
1050 | _resubscribeTopicsKeys.length > 0) {
|
1051 | if (this.options.resubscribe) {
|
1052 | if (this.options.protocolVersion === 5) {
|
1053 | this.log('_resubscribe: protocolVersion 5');
|
1054 | for (let topicI = 0; topicI < _resubscribeTopicsKeys.length; topicI++) {
|
1055 | const resubscribeTopic = {};
|
1056 | resubscribeTopic[_resubscribeTopicsKeys[topicI]] =
|
1057 | this._resubscribeTopics[_resubscribeTopicsKeys[topicI]];
|
1058 | resubscribeTopic.resubscribe = true;
|
1059 | this.subscribe(resubscribeTopic, {
|
1060 | properties: resubscribeTopic[_resubscribeTopicsKeys[topicI]]
|
1061 | .properties,
|
1062 | });
|
1063 | }
|
1064 | }
|
1065 | else {
|
1066 | this._resubscribeTopics.resubscribe = true;
|
1067 | this.subscribe(this._resubscribeTopics);
|
1068 | }
|
1069 | }
|
1070 | else {
|
1071 | this._resubscribeTopics = {};
|
1072 | }
|
1073 | }
|
1074 | this._firstConnection = false;
|
1075 | }
|
1076 | _onConnect(packet) {
|
1077 | if (this.disconnected) {
|
1078 | this.emit('connect', packet);
|
1079 | return;
|
1080 | }
|
1081 | this.connackPacket = packet;
|
1082 | this.messageIdProvider.clear();
|
1083 | this._setupKeepaliveManager();
|
1084 | this.connected = true;
|
1085 | const startStreamProcess = () => {
|
1086 | let outStore = this.outgoingStore.createStream();
|
1087 | const remove = () => {
|
1088 | outStore.destroy();
|
1089 | outStore = null;
|
1090 | this._flushStoreProcessingQueue();
|
1091 | clearStoreProcessing();
|
1092 | };
|
1093 | const clearStoreProcessing = () => {
|
1094 | this._storeProcessing = false;
|
1095 | this._packetIdsDuringStoreProcessing = {};
|
1096 | };
|
1097 | this.once('close', remove);
|
1098 | outStore.on('error', (err) => {
|
1099 | clearStoreProcessing();
|
1100 | this._flushStoreProcessingQueue();
|
1101 | this.removeListener('close', remove);
|
1102 | this.emit('error', err);
|
1103 | });
|
1104 | const storeDeliver = () => {
|
1105 | if (!outStore) {
|
1106 | return;
|
1107 | }
|
1108 | const packet2 = outStore.read(1);
|
1109 | let cb;
|
1110 | if (!packet2) {
|
1111 | outStore.once('readable', storeDeliver);
|
1112 | return;
|
1113 | }
|
1114 | this._storeProcessing = true;
|
1115 | if (this._packetIdsDuringStoreProcessing[packet2.messageId]) {
|
1116 | storeDeliver();
|
1117 | return;
|
1118 | }
|
1119 | if (!this.disconnecting && !this.reconnectTimer) {
|
1120 | cb = this.outgoing[packet2.messageId]
|
1121 | ? this.outgoing[packet2.messageId].cb
|
1122 | : null;
|
1123 | this.outgoing[packet2.messageId] = {
|
1124 | volatile: false,
|
1125 | cb(err, status) {
|
1126 | if (cb) {
|
1127 | cb(err, status);
|
1128 | }
|
1129 | storeDeliver();
|
1130 | },
|
1131 | };
|
1132 | this._packetIdsDuringStoreProcessing[packet2.messageId] =
|
1133 | true;
|
1134 | if (this.messageIdProvider.register(packet2.messageId)) {
|
1135 | this._sendPacket(packet2, undefined, undefined, true);
|
1136 | }
|
1137 | else {
|
1138 | this.log('messageId: %d has already used.', packet2.messageId);
|
1139 | }
|
1140 | }
|
1141 | else if (outStore.destroy) {
|
1142 | outStore.destroy();
|
1143 | }
|
1144 | };
|
1145 | outStore.on('end', () => {
|
1146 | let allProcessed = true;
|
1147 | for (const id in this._packetIdsDuringStoreProcessing) {
|
1148 | if (!this._packetIdsDuringStoreProcessing[id]) {
|
1149 | allProcessed = false;
|
1150 | break;
|
1151 | }
|
1152 | }
|
1153 | this.removeListener('close', remove);
|
1154 | if (allProcessed) {
|
1155 | clearStoreProcessing();
|
1156 | this._invokeAllStoreProcessingQueue();
|
1157 | this.emit('connect', packet);
|
1158 | }
|
1159 | else {
|
1160 | startStreamProcess();
|
1161 | }
|
1162 | });
|
1163 | storeDeliver();
|
1164 | };
|
1165 | startStreamProcess();
|
1166 | }
|
1167 | _invokeStoreProcessingQueue() {
|
1168 | if (!this._storeProcessing && this._storeProcessingQueue.length > 0) {
|
1169 | const f = this._storeProcessingQueue[0];
|
1170 | if (f && f.invoke()) {
|
1171 | this._storeProcessingQueue.shift();
|
1172 | return true;
|
1173 | }
|
1174 | }
|
1175 | return false;
|
1176 | }
|
1177 | _invokeAllStoreProcessingQueue() {
|
1178 | while (this._invokeStoreProcessingQueue()) {
|
1179 | }
|
1180 | }
|
1181 | _flushStoreProcessingQueue() {
|
1182 | for (const f of this._storeProcessingQueue) {
|
1183 | if (f.cbStorePut)
|
1184 | f.cbStorePut(new Error('Connection closed'));
|
1185 | if (f.callback)
|
1186 | f.callback(new Error('Connection closed'));
|
1187 | }
|
1188 | this._storeProcessingQueue.splice(0);
|
1189 | }
|
1190 | _removeOutgoingAndStoreMessage(messageId, cb) {
|
1191 | delete this.outgoing[messageId];
|
1192 | this.outgoingStore.del({ messageId }, (err, packet) => {
|
1193 | cb(err, packet);
|
1194 | this.messageIdProvider.deallocate(messageId);
|
1195 | this._invokeStoreProcessingQueue();
|
1196 | });
|
1197 | }
|
1198 | }
|
// Attach the MQTTJS_VERSION value from ./shared to the class as a static
// property (presumably the package version string — confirm in ./shared).
1199 | MqttClient.VERSION = shared_1.MQTTJS_VERSION;
|
// Publish the client class as this CommonJS module's default export.
1200 | exports.default = MqttClient;
|
1201 |
|
\ | No newline at end of file |