// Captured from the UNPKG file viewer (20.3 kB, JavaScript, "View Raw").
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
var tslib_1 = require("tslib");
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
var Paho = tslib_1.__importStar(require("../vendor/paho-mqtt"));
var uuid_1 = require("uuid");
var zen_observable_ts_1 = tslib_1.__importDefault(require("zen-observable-ts"));
var PubSubProvider_1 = require("./PubSubProvider");
var PubSub_1 = require("../types/PubSub");
var core_1 = require("@aws-amplify/core");
var ConnectionStateMonitor_1 = require("../utils/ConnectionStateMonitor");
var ReconnectionMonitor_1 = require("../utils/ReconnectionMonitor");
var constants_1 = require("./constants");
// Module-wide logger; every log line from this file is tagged 'MqttOverWSProvider'.
var logger = new core_1.ConsoleLogger('MqttOverWSProvider');
/**
 * Tests whether a concrete MQTT topic is selected by a topic filter.
 *
 * Both strings are split on '/'. Filter levels are compared left to right:
 *   - '+' accepts whatever is at that level;
 *   - '#' accepts the rest of the topic, including the case where the topic
 *     stops one level earlier (the parent level), so 'sport/#' selects both
 *     'sport' and 'sport/tennis';
 *   - any other level must be strictly equal to the topic level.
 *
 * @param {string} filter - Topic filter, possibly containing '+' / '#'.
 * @param {string} topic - Topic name a message was received on.
 * @returns {boolean} true when the filter selects the topic.
 */
function mqttTopicMatch(filter, topic) {
    var filterLevels = filter.split('/');
    var topicLevels = topic.split('/');
    var filterDepth = filterLevels.length;
    for (var i = 0; i < filterDepth; ++i) {
        var expected = filterLevels[i];
        if (expected === '#') {
            // Every level before '#' has already matched. '#' stands for zero
            // or more levels, so the topic may be one level shorter than the
            // filter (the parent-level match).
            return topicLevels.length >= filterDepth - 1;
        }
        if (expected !== '+' && expected !== topicLevels[i]) {
            return false;
        }
    }
    // Without a trailing '#', filter and topic must have the same depth.
    return filterDepth === topicLevels.length;
}
// Public export of the topic-filter matcher.
exports.mqttTopicMatch = mqttTopicMatch;
/**
 * Cache of in-flight / settled client promises keyed by client id, so that
 * concurrent requests for the same id share a single creation attempt.
 */
var ClientsQueue = /** @class */ (function () {
    function ClientsQueue() {
        // clientId -> Promise produced by the factory for that id
        this.promises = new Map();
    }
    /**
     * Returns the promise cached for `clientId`, creating it with
     * `clientFactory` when nothing is cached.
     *
     * @param {string} clientId - Key of the client.
     * @param {function(string): Promise} [clientFactory] - Called with the
     *   client id when no promise is cached. When omitted and nothing is
     *   cached, the result resolves to undefined.
     * @returns {Promise} Promise for the client. It rejects when the factory
     *   throws or its promise rejects; a failed attempt is removed from the
     *   cache so that a later call can retry.
     */
    ClientsQueue.prototype.get = function (clientId, clientFactory) {
        var promises = this.promises;
        var cached = promises.get(clientId);
        if (cached) {
            return Promise.resolve(cached);
        }
        if (!clientFactory) {
            return Promise.resolve(undefined);
        }
        var pending;
        try {
            // Promise.resolve normalises a factory that returns a plain value
            // or a foreign thenable, so the cache only ever holds promises.
            pending = Promise.resolve(clientFactory(clientId));
        }
        catch (error) {
            // A factory that throws synchronously is reported as a rejection
            // and nothing is cached.
            return Promise.reject(error);
        }
        promises.set(clientId, pending);
        pending.then(undefined, function () {
            // Only evict our own entry: the id may have been removed and
            // re-populated by a newer attempt while this one was in flight.
            if (promises.get(clientId) === pending) {
                promises.delete(clientId);
            }
        });
        return pending;
    };
    Object.defineProperty(ClientsQueue.prototype, "allClients", {
        /** Ids that currently have a cached promise, in insertion order. */
        get: function () {
            return Array.from(this.promises.keys());
        },
        enumerable: true,
        configurable: true
    });
    /**
     * Drops the cached promise for `clientId`, if any.
     *
     * @param {string} clientId - Key of the client to forget.
     */
    ClientsQueue.prototype.remove = function (clientId) {
        this.promises.delete(clientId);
    };
    return ClientsQueue;
}());
/**
 * Dispatches an event on the Hub 'pubsub' channel.
 *
 * @param {string} event - Event name placed in the payload.
 * @param {*} data - Event data placed in the payload.
 * @param {string} message - Human-readable message placed in the payload.
 */
var dispatchPubSubEvent = function (event, data, message) {
    var payload = { event: event, data: data, message: message };
    core_1.Hub.dispatch('pubsub', payload, 'PubSub', constants_1.AMPLIFY_SYMBOL);
};
// Property key under which the originating topic is attached to parsed
// messages; falls back to a plain string key where Symbol is unavailable.
var topicSymbol = typeof Symbol !== 'undefined' ? Symbol('topic') : '@@topic';
/**
 * PubSub provider that creates Paho MQTT clients, tracks their connection
 * state, and fans incoming messages out to the observers subscribed to the
 * matching topic filters.
 */
var MqttOverWSProvider = /** @class */ (function (_super) {
    tslib_1.__extends(MqttOverWSProvider, _super);
    /**
     * Runs `fn` immediately and returns a promise for its result. A
     * synchronous throw becomes a rejection, and a returned promise/thenable
     * is adopted.
     */
    function attempt(fn) {
        return new Promise(function (resolve) {
            resolve(fn());
        });
    }
    /**
     * Starts `provider.disconnect(clientId)` without waiting for it; a failure
     * is logged instead of surfacing as an unhandled rejection.
     */
    function disconnectAndLog(provider, clientId) {
        attempt(function () { return provider.disconnect(clientId); }).catch(function (error) {
            logger.debug('Error disconnecting client', clientId, error);
        });
    }
    /**
     * @param {Object} [options] - Provider options, forwarded to the base
     *   class. `clientId` defaults to a freshly generated UUID v4.
     */
    function MqttOverWSProvider(options) {
        if (options === void 0) { options = {}; }
        var _this = _super.call(this, tslib_1.__assign(tslib_1.__assign({}, options), { clientId: options.clientId || uuid_1.v4() })) || this;
        _this._clientsQueue = new ClientsQueue();
        _this.connectionStateMonitor = new ConnectionStateMonitor_1.ConnectionStateMonitor();
        _this.reconnectionMonitor = new ReconnectionMonitor_1.ReconnectionMonitor();
        // topic filter -> Set of observers subscribed to it
        _this._topicObservers = new Map();
        // client id -> Set of observers attached through that client
        _this._clientIdObservers = new Map();
        // Monitor the connection health state and pass changes along to Hub
        _this.connectionStateMonitor.connectionStateObservable.subscribe(function (connectionStateChange) {
            dispatchPubSubEvent(constants_1.CONNECTION_STATE_CHANGE, {
                provider: _this,
                connectionState: connectionStateChange,
            }, "Connection state is " + connectionStateChange);
            _this.connectionState = connectionStateChange;
            // Trigger reconnection when the connection is disrupted
            if (connectionStateChange === PubSub_1.ConnectionState.ConnectionDisrupted) {
                _this.reconnectionMonitor.record(ReconnectionMonitor_1.ReconnectEvent.START_RECONNECT);
            }
            else if (connectionStateChange !== PubSub_1.ConnectionState.Connecting) {
                // Trigger connected to halt reconnection attempts
                _this.reconnectionMonitor.record(ReconnectionMonitor_1.ReconnectEvent.HALT_RECONNECT);
            }
        });
        return _this;
    }
    Object.defineProperty(MqttOverWSProvider.prototype, "clientId", {
        /** Client id taken from the provider options. */
        get: function () {
            return this.options.clientId;
        },
        enumerable: true,
        configurable: true
    });
    Object.defineProperty(MqttOverWSProvider.prototype, "endpoint", {
        /** Promise for the `aws_pubsub_endpoint` option. */
        get: function () {
            return Promise.resolve(this.options.aws_pubsub_endpoint);
        },
        enumerable: true,
        configurable: true
    });
    Object.defineProperty(MqttOverWSProvider.prototype, "clientsQueue", {
        /** The queue caching client promises per client id. */
        get: function () {
            return this._clientsQueue;
        },
        enumerable: true,
        configurable: true
    });
    Object.defineProperty(MqttOverWSProvider.prototype, "isSSLEnabled", {
        /** false only when the "dangerously connect to http endpoint" testing option is truthy. */
        get: function () {
            return !this.options['aws_appsync_dangerously_connect_to_http_endpoint_for_testing'];
        },
        enumerable: true,
        configurable: true
    });
    /** @returns {string} The provider name, 'MqttOverWSProvider'. */
    MqttOverWSProvider.prototype.getProviderName = function () {
        return 'MqttOverWSProvider';
    };
    /**
     * Reacts to a lost connection. A zero `errorCode` is ignored; otherwise
     * the details are logged and, when observers are registered for
     * `clientId`, that client is disconnected.
     *
     * @param {Object} _a - `{ clientId, errorCode, ...details }`.
     */
    MqttOverWSProvider.prototype.onDisconnect = function (_a) {
        var clientId = _a.clientId, errorCode = _a.errorCode, args = tslib_1.__rest(_a, ["clientId", "errorCode"]);
        if (errorCode === 0) {
            return;
        }
        logger.warn(clientId, JSON.stringify(tslib_1.__assign({ errorCode: errorCode }, args), null, 2));
        if (!clientId || !this._clientIdObservers.get(clientId)) {
            return;
        }
        disconnectAndLog(this, clientId);
    };
    /**
     * Creates a Paho client for `url` / `clientId`, wires its message and
     * connection-lost handlers, and starts connecting.
     *
     * @param {Object} _a - `{ url, clientId }`.
     * @returns {Promise} Resolves with the client once the connect attempt has
     *   finished, whether it succeeded or failed. On failure the client id is
     *   removed from the clients queue and a CLOSED change is recorded.
     */
    MqttOverWSProvider.prototype.newClient = function (_a) {
        var _this = this;
        var url = _a.url, clientId = _a.clientId;
        var client;
        return new Promise(function (resolve) {
            logger.debug('Creating new MQTT client', clientId);
            _this.connectionStateMonitor.record(ConnectionStateMonitor_1.CONNECTION_CHANGE.OPENING_CONNECTION);
            client = new Paho.Client(url, clientId);
            client.onMessageArrived = function (_a) {
                var topic = _a.destinationName, msg = _a.payloadString;
                _this._onMessage(topic, msg);
            };
            client.onConnectionLost = function (_a) {
                var errorCode = _a.errorCode, args = tslib_1.__rest(_a, ["errorCode"]);
                _this.onDisconnect(tslib_1.__assign({ clientId: clientId, errorCode: errorCode }, args));
                _this.connectionStateMonitor.record(ConnectionStateMonitor_1.CONNECTION_CHANGE.CLOSED);
            };
            client.connect({
                useSSL: _this.isSSLEnabled,
                mqttVersion: 3,
                onSuccess: function () { resolve(true); },
                onFailure: function () {
                    if (clientId) {
                        _this._clientsQueue.remove(clientId);
                    }
                    _this.connectionStateMonitor.record(ConnectionStateMonitor_1.CONNECTION_CHANGE.CLOSED);
                    resolve(false);
                },
            });
        }).then(function (connected) {
            if (connected) {
                _this.connectionStateMonitor.record(ConnectionStateMonitor_1.CONNECTION_CHANGE.CONNECTION_ESTABLISHED);
            }
            return client;
        });
    };
    /**
     * Returns the client for `clientId`, creating it through the clients
     * queue when none is cached. A newly created client is subscribed to
     * every topic that currently has observers.
     *
     * @param {string} clientId - Id of the client to get or create.
     * @param {Object} [options] - Forwarded to `newClient` (e.g. `url`).
     * @returns {Promise} Promise for the client.
     */
    MqttOverWSProvider.prototype.connect = function (clientId, options) {
        var _this = this;
        if (options === void 0) { options = {}; }
        return attempt(function () {
            return _this.clientsQueue.get(clientId, function (queuedClientId) {
                return attempt(function () {
                    return _this.newClient(tslib_1.__assign(tslib_1.__assign({}, options), { clientId: queuedClientId }));
                }).then(function (client) {
                    if (client) {
                        // Once connected, subscribe to all topics registered observers
                        _this._topicObservers.forEach(function (_observers, topic) {
                            client.subscribe(topic);
                        });
                    }
                    return client;
                });
            });
        });
    };
    /**
     * Disconnects the cached client for `clientId` (when it is connected),
     * removes it from the clients queue and records a CLOSED change.
     *
     * @param {string} clientId - Id of the client to disconnect.
     * @returns {Promise<void>}
     */
    MqttOverWSProvider.prototype.disconnect = function (clientId) {
        var _this = this;
        return attempt(function () {
            return _this.clientsQueue.get(clientId);
        }).then(function (client) {
            if (client && client.isConnected()) {
                client.disconnect();
            }
            _this.clientsQueue.remove(clientId);
            _this.connectionStateMonitor.record(ConnectionStateMonitor_1.CONNECTION_CHANGE.CLOSED);
        });
    };
    /**
     * Sends `msg`, JSON-serialised, to each topic through the cached client
     * for this provider's client id. When no client is cached nothing is sent
     * and the failure is only logged at debug level.
     *
     * @param {string|string[]} topics - One topic or a list of topics.
     * @param {*} msg - Value to serialise with JSON.stringify.
     * @returns {Promise<void>}
     */
    MqttOverWSProvider.prototype.publish = function (topics, msg) {
        var _this = this;
        return attempt(function () {
            var targetTopics = [].concat(topics);
            var message = JSON.stringify(msg);
            return Promise.resolve(_this.clientsQueue.get(_this.clientId)).then(function (client) {
                if (client) {
                    logger.debug('Publishing to topic(s)', targetTopics.join(','), message);
                    targetTopics.forEach(function (topic) { return client.send(topic, message); });
                }
                else {
                    logger.debug('Publishing to topic(s) failed', targetTopics.join(','), message);
                }
            });
        });
    };
    /**
     * Parses an incoming payload as JSON and forwards it to every observer
     * whose topic filter matches `topic`. Object payloads are tagged with the
     * topic under `topicSymbol`. Any error is logged as a warning and the
     * message is dropped.
     *
     * @param {string} topic - Topic the message arrived on.
     * @param {string} msg - Raw payload string.
     */
    MqttOverWSProvider.prototype._onMessage = function (topic, msg) {
        try {
            var matchedObserverSets_1 = [];
            this._topicObservers.forEach(function (observersForTopic, observerTopic) {
                if (mqttTopicMatch(observerTopic, topic)) {
                    matchedObserverSets_1.push(observersForTopic);
                }
            });
            var parsedMessage_1 = JSON.parse(msg);
            // typeof null is 'object' too; a JSON `null` payload cannot carry
            // the topic tag and must still be delivered.
            if (parsedMessage_1 !== null && typeof parsedMessage_1 === 'object') {
                parsedMessage_1[topicSymbol] = topic;
            }
            matchedObserverSets_1.forEach(function (observersForTopic) {
                observersForTopic.forEach(function (observer) { return observer.next(parsedMessage_1); });
            });
        }
        catch (error) {
            logger.warn('Error handling message', error, msg);
        }
    };
    /**
     * Returns an Observable of messages for the given topic filter(s). Each
     * subscriber is registered per topic and per client id, a connection is
     * requested, and the subscriber is attached to the reconnection monitor
     * so the connection is requested again on reconnect events.
     *
     * @param {string|string[]} topics - One topic filter or a list of them.
     * @param {Object} [options] - `clientId` (defaults to the provider's
     *   client id) and `url` (defaults to the `endpoint` getter).
     * @returns {Observable} Observable whose teardown removes the subscriber's
     *   registrations and, for the last subscriber of a client, disconnects
     *   that client.
     */
    MqttOverWSProvider.prototype.subscribe = function (topics, options) {
        var _this = this;
        if (options === void 0) { options = {}; }
        var targetTopics = [].concat(topics);
        logger.debug('Subscribing to topic(s)', targetTopics.join(','));
        return new zen_observable_ts_1.default(function (observer) {
            // Both are per subscriber: the returned Observable can be
            // subscribed to more than once.
            var reconnectSubscription;
            var closed = false;
            targetTopics.forEach(function (topic) {
                // this._topicObservers is used to notify the observers according to the topic received on the message
                var observersForTopic = _this._topicObservers.get(topic);
                if (!observersForTopic) {
                    observersForTopic = new Set();
                    _this._topicObservers.set(topic, observersForTopic);
                }
                observersForTopic.add(observer);
            });
            var clientId = options.clientId === void 0 ? _this.clientId : options.clientId;
            // this._clientIdObservers is used to close observers when client gets disconnected
            var observersForClientId = _this._clientIdObservers.get(clientId);
            if (!observersForClientId) {
                observersForClientId = new Set();
                _this._clientIdObservers.set(clientId, observersForClientId);
            }
            observersForClientId.add(observer);
            // Resolves the url, gets or creates the client and subscribes it
            // to this subscriber's topics. Never rejects: failures are logged.
            var getClient = function () {
                return attempt(function () {
                    return options.url === void 0 ? _this.endpoint : options.url;
                }).then(function (url) {
                    return _this.connect(clientId, { url: url });
                }).then(function (client) {
                    // Skip the broker subscription when the subscriber left
                    // while the connection was being formed.
                    if (closed || client === undefined) {
                        return;
                    }
                    targetTopics.forEach(function (topic) {
                        client.subscribe(topic);
                    });
                }).catch(function (error) {
                    logger.debug('Error forming connection', error);
                });
            };
            // Establish the initial connection
            getClient().then(function () {
                if (closed) {
                    // Already unsubscribed: do not register a reconnect
                    // listener that nothing would ever remove.
                    return;
                }
                // Add an observable to the reconnection list to manage reconnection for this subscription
                reconnectSubscription = new zen_observable_ts_1.default(function (reconnectObserver) {
                    _this.reconnectionMonitor.addObserver(reconnectObserver);
                }).subscribe(function () {
                    getClient();
                });
            }).catch(function (error) {
                logger.debug('Error registering for reconnection', error);
            });
            return function () {
                closed = true;
                if (reconnectSubscription) {
                    reconnectSubscription.unsubscribe();
                }
                return attempt(function () {
                    return _this.clientsQueue.get(clientId);
                }).then(function (client) { return client; }, function () {
                    // A failed connection attempt counts as "no client".
                    return undefined;
                }).then(function (client) {
                    // Observer bookkeeping is always cleaned up, even when no
                    // client is cached any more; only calls on the client
                    // itself depend on having one.
                    var observersForClient = _this._clientIdObservers.get(clientId);
                    if (observersForClient) {
                        observersForClient.delete(observer);
                        // No more observers per client => client not needed anymore
                        if (observersForClient.size === 0) {
                            if (client) {
                                disconnectAndLog(_this, clientId);
                                _this.connectionStateMonitor.record(ConnectionStateMonitor_1.CONNECTION_CHANGE.CLOSING_CONNECTION);
                            }
                            _this._clientIdObservers.delete(clientId);
                        }
                    }
                    targetTopics.forEach(function (topic) {
                        var observersForTopic = _this._topicObservers.get(topic);
                        if (!observersForTopic) {
                            return;
                        }
                        observersForTopic.delete(observer);
                        // if no observers exists for the topic, topic should be removed
                        if (observersForTopic.size === 0) {
                            _this._topicObservers.delete(topic);
                            if (client && client.isConnected()) {
                                client.unsubscribe(topic);
                            }
                        }
                    });
                    return null;
                }).catch(function (error) {
                    logger.debug('Error cleaning up subscription', error);
                    return null;
                });
            };
        });
    };
    return MqttOverWSProvider;
}(PubSubProvider_1.AbstractPubSubProvider));
// Public export of the provider class.
exports.MqttOverWSProvider = MqttOverWSProvider;
//# sourceMappingURL=MqttOverWSProvider.js.map