// Retrieved from UNPKG (19.7 kB, JavaScript, raw view).
import { __assign, __awaiter, __extends, __generator, __rest } from "tslib";
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import * as Paho from '../vendor/paho-mqtt';
import { v4 as uuid } from 'uuid';
import Observable from 'zen-observable-ts';
import { AbstractPubSubProvider } from './PubSubProvider';
import { ConnectionState, } from '../types/PubSub';
import { ConsoleLogger as Logger, Hub } from '@aws-amplify/core';
import { ConnectionStateMonitor, CONNECTION_CHANGE, } from '../utils/ConnectionStateMonitor';
import { ReconnectEvent, ReconnectionMonitor, } from '../utils/ReconnectionMonitor';
import { AMPLIFY_SYMBOL, CONNECTION_STATE_CHANGE } from './constants';
// Module-level logger; every log line emitted by this file goes through it.
var logger = new Logger('MqttOverWSProvider');
/**
 * Tests whether a topic filter matches a concrete topic name.
 *
 * Both strings are split on '/' and compared level by level:
 *  - '+' in the filter accepts any value at that level;
 *  - '#' in the filter ends the comparison and matches when the topic has at
 *    least as many levels as the filter (so 'a/#' matches 'a/b' and 'a/b/c',
 *    but not 'a');
 *  - any other filter level must equal the topic level exactly.
 * Without a '#', the filter and the topic must have the same number of levels.
 *
 * @param {string} filter Topic filter, possibly containing '+' and '#'.
 * @param {string} topic Concrete topic name to test.
 * @returns {boolean} true when the filter matches the topic.
 */
export function mqttTopicMatch(filter, topic) {
    var filterArray = filter.split('/');
    var length = filterArray.length;
    var topicArray = topic.split('/');
    for (var i = 0; i < length; ++i) {
        var left = filterArray[i];
        var right = topicArray[i];
        if (left === '#') {
            // Multi-level wildcard: only the level count matters from here on.
            return topicArray.length >= length;
        }
        if (left !== '+' && left !== right) {
            return false;
        }
    }
    // Every filter level matched; reject topics that carry extra levels.
    return length === topicArray.length;
}
/**
 * Cache of in-flight or settled client promises, keyed by client id.
 *
 * Storing the promise (rather than the resolved client) means concurrent
 * callers asking for the same client id share a single factory invocation.
 */
var ClientsQueue = /** @class */ (function () {
    function ClientsQueue() {
        // clientId -> promise returned by the client factory
        this.promises = new Map();
    }
    /**
     * Returns the cached client promise for `clientId`, creating it with
     * `clientFactory` when nothing is cached.
     *
     * @param clientId Key identifying the client.
     * @param clientFactory Optional function `(clientId) => Promise<client>`,
     *   called only when no promise is cached for `clientId`.
     * @returns A promise that settles like the cached/new client promise, or
     *   resolves to `undefined` when nothing is cached and no factory is given.
     *   A synchronous throw from `clientFactory` surfaces as a rejection.
     */
    ClientsQueue.prototype.get = function (clientId, clientFactory) {
        var _this = this;
        return new Promise(function (resolve) {
            var cachedPromise = _this.promises.get(clientId);
            if (cachedPromise) {
                resolve(cachedPromise);
                return;
            }
            if (!clientFactory) {
                resolve(undefined);
                return;
            }
            var newPromise = clientFactory(clientId);
            _this.promises.set(clientId, newPromise);
            newPromise.catch(function () {
                // Evict the failed promise so a later get() can retry, but only if
                // it is still the cached one: the entry may have been removed and
                // replaced by a newer promise while this one was pending.
                if (_this.promises.get(clientId) === newPromise) {
                    _this.promises.delete(clientId);
                }
            });
            resolve(newPromise);
        });
    };
    Object.defineProperty(ClientsQueue.prototype, "allClients", {
        /** Client ids that currently have a cached promise. */
        get: function () {
            return Array.from(this.promises.keys());
        },
        enumerable: true,
        configurable: true
    });
    /**
     * Drops the cached promise for `clientId`, if any.
     * @param clientId Key identifying the client.
     */
    ClientsQueue.prototype.remove = function (clientId) {
        this.promises.delete(clientId);
    };
    return ClientsQueue;
}());
/**
 * Publishes a PubSub event on the Amplify Hub 'pubsub' channel.
 *
 * @param event Event name placed in the Hub payload.
 * @param data Event data placed in the Hub payload.
 * @param message Human-readable message placed in the Hub payload.
 */
var dispatchPubSubEvent = function (event, data, message) {
    Hub.dispatch('pubsub', { event: event, data: data, message: message }, 'PubSub', AMPLIFY_SYMBOL);
};
// Property key under which the originating topic is attached to parsed object
// messages; falls back to a plain string key where Symbol is unavailable.
var topicSymbol = typeof Symbol !== 'undefined' ? Symbol('topic') : '@@topic';
/**
 * PubSub provider that talks MQTT through the vendored Paho client.
 *
 * Clients are cached per client id in a ClientsQueue. Observers are tracked
 * both per topic (to route incoming messages) and per client id (to know when
 * a client is no longer needed). Connection state changes are forwarded to
 * the Hub and drive the reconnection monitor.
 */
var MqttOverWSProvider = /** @class */ (function (_super) {
    __extends(MqttOverWSProvider, _super);
    /**
     * @param options Provider options; `clientId` defaults to a generated UUID
     *   when it is missing or falsy.
     */
    function MqttOverWSProvider(options) {
        if (options === void 0) { options = {}; }
        var _this = _super.call(this, __assign(__assign({}, options), { clientId: options.clientId || uuid() })) || this;
        _this._clientsQueue = new ClientsQueue();
        _this.connectionStateMonitor = new ConnectionStateMonitor();
        _this.reconnectionMonitor = new ReconnectionMonitor();
        // topic filter -> Set of observers subscribed to it
        _this._topicObservers = new Map();
        // client id -> Set of observers using that client
        _this._clientIdObservers = new Map();
        // Monitor the connection health state and pass changes along to Hub
        _this.connectionStateMonitor.connectionStateObservable.subscribe(function (connectionStateChange) {
            dispatchPubSubEvent(CONNECTION_STATE_CHANGE, {
                provider: _this,
                connectionState: connectionStateChange,
            }, "Connection state is " + connectionStateChange);
            _this.connectionState = connectionStateChange;
            // Trigger reconnection when the connection is disrupted
            if (connectionStateChange === ConnectionState.ConnectionDisrupted) {
                _this.reconnectionMonitor.record(ReconnectEvent.START_RECONNECT);
            }
            else if (connectionStateChange !== ConnectionState.Connecting) {
                // Trigger connected to halt reconnection attempts
                _this.reconnectionMonitor.record(ReconnectEvent.HALT_RECONNECT);
            }
        });
        return _this;
    }
    Object.defineProperty(MqttOverWSProvider.prototype, "clientId", {
        /** Default client id taken from the provider options. */
        get: function () {
            return this.options.clientId;
        },
        enumerable: true,
        configurable: true
    });
    Object.defineProperty(MqttOverWSProvider.prototype, "endpoint", {
        /** Promise resolving to the `aws_pubsub_endpoint` option. */
        get: function () {
            return Promise.resolve(this.options.aws_pubsub_endpoint);
        },
        enumerable: true,
        configurable: true
    });
    Object.defineProperty(MqttOverWSProvider.prototype, "clientsQueue", {
        /** The per-client-id cache of client promises. */
        get: function () {
            return this._clientsQueue;
        },
        enumerable: true,
        configurable: true
    });
    Object.defineProperty(MqttOverWSProvider.prototype, "isSSLEnabled", {
        /** true unless the "dangerously connect to http endpoint" testing option is set. */
        get: function () {
            return !this.options['aws_appsync_dangerously_connect_to_http_endpoint_for_testing'];
        },
        enumerable: true,
        configurable: true
    });
    /** @returns The provider name, 'MqttOverWSProvider'. */
    MqttOverWSProvider.prototype.getProviderName = function () {
        return 'MqttOverWSProvider';
    };
    /**
     * Handles a lost connection reported by a client.
     *
     * Nothing happens for `errorCode === 0`. Otherwise the event is logged as
     * a warning and, when the client id has registered observers, the client
     * is disconnected.
     *
     * @param _a Object with `clientId`, `errorCode` and any further fields
     *   reported with the connection loss (logged only).
     */
    MqttOverWSProvider.prototype.onDisconnect = function (_a) {
        var clientId = _a.clientId, errorCode = _a.errorCode, args = __rest(_a, ["clientId", "errorCode"]);
        if (errorCode !== 0) {
            logger.warn(clientId, JSON.stringify(__assign({ errorCode: errorCode }, args), null, 2));
            if (!clientId) {
                return;
            }
            var clientIdObservers = this._clientIdObservers.get(clientId);
            if (!clientIdObservers) {
                return;
            }
            this.disconnect(clientId);
        }
    };
    /**
     * Creates a Paho client for `url` / `clientId`, wires its message and
     * connection-lost callbacks, and attempts to connect.
     *
     * The returned promise always resolves with the client object, even when
     * the connection attempt failed; in that case the client id has been
     * removed from the clients queue and CLOSED has been recorded.
     *
     * @param _a Object with `url` and `clientId`.
     * @returns Promise resolving to the Paho client.
     */
    MqttOverWSProvider.prototype.newClient = function (_a) {
        var url = _a.url, clientId = _a.clientId;
        return __awaiter(this, void 0, void 0, function () {
            var client, connected;
            var _this = this;
            return __generator(this, function (_b) {
                switch (_b.label) {
                    case 0:
                        logger.debug('Creating new MQTT client', clientId);
                        this.connectionStateMonitor.record(CONNECTION_CHANGE.OPENING_CONNECTION);
                        client = new Paho.Client(url, clientId);
                        client.onMessageArrived = function (_a) {
                            var topic = _a.destinationName, msg = _a.payloadString;
                            _this._onMessage(topic, msg);
                        };
                        client.onConnectionLost = function (_a) {
                            var errorCode = _a.errorCode, args = __rest(_a, ["errorCode"]);
                            _this.onDisconnect(__assign({ clientId: clientId, errorCode: errorCode }, args));
                            _this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
                        };
                        // Connection failure is reported as `false`, never as a rejection.
                        return [4 /*yield*/, new Promise(function (resolve) {
                                client.connect({
                                    useSSL: _this.isSSLEnabled,
                                    mqttVersion: 3,
                                    onSuccess: function () { return resolve(true); },
                                    onFailure: function () {
                                        if (clientId)
                                            _this._clientsQueue.remove(clientId);
                                        _this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
                                        resolve(false);
                                    },
                                });
                            })];
                    case 1:
                        connected = _b.sent();
                        if (connected) {
                            this.connectionStateMonitor.record(CONNECTION_CHANGE.CONNECTION_ESTABLISHED);
                        }
                        return [2 /*return*/, client];
                }
            });
        });
    };
    /**
     * Returns the cached client for `clientId`, creating and connecting a new
     * one when none is cached. A newly created client is subscribed to every
     * topic that currently has registered observers.
     *
     * @param clientId Client id to connect with.
     * @param options Options forwarded to `newClient` (e.g. `url`).
     * @returns Promise resolving to the client.
     */
    MqttOverWSProvider.prototype.connect = function (clientId, options) {
        if (options === void 0) { options = {}; }
        return __awaiter(this, void 0, void 0, function () {
            var _this = this;
            return __generator(this, function (_a) {
                switch (_a.label) {
                    case 0: return [4 /*yield*/, this.clientsQueue.get(clientId, function (clientId) { return __awaiter(_this, void 0, void 0, function () {
                            var client;
                            return __generator(this, function (_a) {
                                switch (_a.label) {
                                    case 0: return [4 /*yield*/, this.newClient(__assign(__assign({}, options), { clientId: clientId }))];
                                    case 1:
                                        client = _a.sent();
                                        if (client) {
                                            // Once connected, subscribe to all topics registered observers
                                            this._topicObservers.forEach(function (_value, key) {
                                                client.subscribe(key);
                                            });
                                        }
                                        return [2 /*return*/, client];
                                }
                            });
                        }); })];
                    case 1: return [2 /*return*/, _a.sent()];
                }
            });
        });
    };
    /**
     * Disconnects the cached client for `clientId` (if it is connected),
     * removes it from the clients queue and records CLOSED.
     *
     * @param clientId Client id to disconnect.
     */
    MqttOverWSProvider.prototype.disconnect = function (clientId) {
        return __awaiter(this, void 0, void 0, function () {
            var client;
            return __generator(this, function (_a) {
                switch (_a.label) {
                    case 0: return [4 /*yield*/, this.clientsQueue.get(clientId)];
                    case 1:
                        client = _a.sent();
                        if (client && client.isConnected()) {
                            client.disconnect();
                        }
                        this.clientsQueue.remove(clientId);
                        this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
                        return [2 /*return*/];
                }
            });
        });
    };
    /**
     * Sends `msg`, serialized as JSON, to one or more topics using the cached
     * client for the provider's default client id. When no client is cached,
     * nothing is sent and the failure is only logged at debug level.
     *
     * @param topics Topic name or array of topic names.
     * @param msg Value to serialize with JSON.stringify.
     */
    MqttOverWSProvider.prototype.publish = function (topics, msg) {
        return __awaiter(this, void 0, void 0, function () {
            var targetTopics, message, client;
            return __generator(this, function (_a) {
                switch (_a.label) {
                    case 0:
                        targetTopics = [].concat(topics);
                        message = JSON.stringify(msg);
                        return [4 /*yield*/, this.clientsQueue.get(this.clientId)];
                    case 1:
                        client = _a.sent();
                        if (client) {
                            logger.debug('Publishing to topic(s)', targetTopics.join(','), message);
                            targetTopics.forEach(function (topic) { return client.send(topic, message); });
                        }
                        else {
                            logger.debug('Publishing to topic(s) failed', targetTopics.join(','), message);
                        }
                        return [2 /*return*/];
                }
            });
        });
    };
    /**
     * Routes an incoming message to every observer whose topic filter matches
     * `topic`. The payload is parsed as JSON; object payloads are tagged with
     * the originating topic under `topicSymbol`. Any error (including invalid
     * JSON) is logged as a warning and the message is dropped.
     *
     * @param topic Topic the message arrived on.
     * @param msg Raw message payload string.
     */
    MqttOverWSProvider.prototype._onMessage = function (topic, msg) {
        try {
            var matchedTopicObservers_1 = [];
            this._topicObservers.forEach(function (observerForTopic, observerTopic) {
                if (mqttTopicMatch(observerTopic, topic)) {
                    matchedTopicObservers_1.push(observerForTopic);
                }
            });
            var parsedMessage_1 = JSON.parse(msg);
            // `typeof null` is 'object' too; a null payload cannot carry the topic
            // tag and must be delivered untouched instead of throwing here.
            if (parsedMessage_1 !== null && typeof parsedMessage_1 === 'object') {
                // @ts-ignore
                parsedMessage_1[topicSymbol] = topic;
            }
            matchedTopicObservers_1.forEach(function (observersForTopic) {
                observersForTopic.forEach(function (observer) { return observer.next(parsedMessage_1); });
            });
        }
        catch (error) {
            logger.warn('Error handling message', error, msg);
        }
    };
    /**
     * Returns an Observable that, for each subscriber, registers the observer
     * for the given topics, connects (or reuses) the client and subscribes it
     * to the topics, and re-runs that connection step whenever the
     * reconnection monitor notifies.
     *
     * @param topics Topic filter or array of topic filters.
     * @param options Optional `clientId` (defaults to the provider's client id)
     *   and `url` (defaults to the provider's endpoint).
     * @returns Observable of parsed messages for the matching topics.
     */
    MqttOverWSProvider.prototype.subscribe = function (topics, options) {
        var _this = this;
        if (options === void 0) { options = {}; }
        var targetTopics = [].concat(topics);
        logger.debug('Subscribing to topic(s)', targetTopics.join(','));
        return new Observable(function (observer) {
            // Per-subscriber state: each subscriber owns its own reconnect
            // subscription, and remembers whether it has already been torn down.
            var reconnectSubscription;
            var isUnsubscribed = false;
            targetTopics.forEach(function (topic) {
                // this._topicObservers is used to notify the observers according to the topic received on the message
                var observersForTopic = _this._topicObservers.get(topic);
                if (!observersForTopic) {
                    observersForTopic = new Set();
                    _this._topicObservers.set(topic, observersForTopic);
                }
                observersForTopic.add(observer);
            });
            var _a = options.clientId, clientId = _a === void 0 ? _this.clientId : _a;
            // this._clientIdObservers is used to close observers when client gets disconnected
            var observersForClientId = _this._clientIdObservers.get(clientId);
            if (!observersForClientId) {
                observersForClientId = new Set();
                _this._clientIdObservers.set(clientId, observersForClientId);
            }
            observersForClientId.add(observer);
            (function () { return __awaiter(_this, void 0, void 0, function () {
                var getClient;
                var _this = this;
                return __generator(this, function (_a) {
                    switch (_a.label) {
                        case 0:
                            // Connects (or reuses) the client and subscribes it to the
                            // target topics; failures are logged, never thrown.
                            getClient = function () { return __awaiter(_this, void 0, void 0, function () {
                                var _a, url, _b, client_1, e_1;
                                return __generator(this, function (_c) {
                                    switch (_c.label) {
                                        case 0:
                                            _c.trys.push([0, 5, , 6]);
                                            _a = options.url;
                                            if (!(_a === void 0)) return [3 /*break*/, 2];
                                            return [4 /*yield*/, this.endpoint];
                                        case 1:
                                            _b = _c.sent();
                                            return [3 /*break*/, 3];
                                        case 2:
                                            _b = _a;
                                            _c.label = 3;
                                        case 3:
                                            url = _b;
                                            return [4 /*yield*/, this.connect(clientId, { url: url })];
                                        case 4:
                                            client_1 = _c.sent();
                                            if (client_1 !== undefined) {
                                                targetTopics.forEach(function (topic) {
                                                    client_1.subscribe(topic);
                                                });
                                            }
                                            return [3 /*break*/, 6];
                                        case 5:
                                            e_1 = _c.sent();
                                            logger.debug('Error forming connection', e_1);
                                            return [3 /*break*/, 6];
                                        case 6: return [2 /*return*/];
                                    }
                                });
                            }); };
                            // Establish the initial connection
                            return [4 /*yield*/, getClient()];
                        case 1:
                            // Establish the initial connection
                            _a.sent();
                            // The subscriber may have gone away while connecting; do not
                            // register a reconnect handler nobody would ever cancel.
                            if (isUnsubscribed) {
                                return [2 /*return*/];
                            }
                            // Add an observable to the reconnection list to manage reconnection for this subscription
                            reconnectSubscription = new Observable(function (observer) {
                                _this.reconnectionMonitor.addObserver(observer);
                            }).subscribe(function () {
                                getClient();
                            });
                            return [2 /*return*/];
                    }
                });
            }); })();
            // Teardown, run when the subscriber unsubscribes.
            return function () { return __awaiter(_this, void 0, void 0, function () {
                var client, remainingForClientId;
                var _this = this;
                return __generator(this, function (_a) {
                    switch (_a.label) {
                        case 0:
                            isUnsubscribed = true;
                            reconnectSubscription === null || reconnectSubscription === void 0 ? void 0 : reconnectSubscription.unsubscribe();
                            return [4 /*yield*/, this.clientsQueue.get(clientId)];
                        case 1:
                            client = _a.sent();
                            // Observer bookkeeping is cleaned up even when no client is
                            // cached (e.g. the connection failed), so stale observers and
                            // topics do not linger and get re-subscribed later.
                            remainingForClientId = this._clientIdObservers.get(clientId);
                            if (remainingForClientId) {
                                remainingForClientId.delete(observer);
                                // No more observers per client => client not needed anymore
                                if (remainingForClientId.size === 0) {
                                    if (client) {
                                        this.disconnect(clientId);
                                        this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSING_CONNECTION);
                                    }
                                    this._clientIdObservers.delete(clientId);
                                }
                            }
                            targetTopics.forEach(function (topic) {
                                var observersForTopic = _this._topicObservers.get(topic) ||
                                    new Set();
                                observersForTopic.delete(observer);
                                // if no observers exists for the topic, topic should be removed
                                if (observersForTopic.size === 0) {
                                    _this._topicObservers.delete(topic);
                                    if (client && client.isConnected()) {
                                        client.unsubscribe(topic);
                                    }
                                }
                            });
                            return [2 /*return*/, null];
                    }
                });
            }); };
        });
    };
    return MqttOverWSProvider;
}(AbstractPubSubProvider));
export { MqttOverWSProvider };
//# sourceMappingURL=MqttOverWSProvider.js.map