UNPKG

18.7 kBJavaScriptView Raw
1var __extends = (this && this.__extends) || (function () {
2 var extendStatics = function (d, b) {
3 extendStatics = Object.setPrototypeOf ||
4 ({ __proto__: [] } instanceof Array && function (d, b) { d.__proto__ = b; }) ||
5 function (d, b) { for (var p in b) if (b.hasOwnProperty(p)) d[p] = b[p]; };
6 return extendStatics(d, b);
7 };
8 return function (d, b) {
9 extendStatics(d, b);
10 function __() { this.constructor = d; }
11 d.prototype = b === null ? Object.create(b) : (__.prototype = b.prototype, new __());
12 };
13})();
14var __assign = (this && this.__assign) || function () {
15 __assign = Object.assign || function(t) {
16 for (var s, i = 1, n = arguments.length; i < n; i++) {
17 s = arguments[i];
18 for (var p in s) if (Object.prototype.hasOwnProperty.call(s, p))
19 t[p] = s[p];
20 }
21 return t;
22 };
23 return __assign.apply(this, arguments);
24};
25var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
26 function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
27 return new (P || (P = Promise))(function (resolve, reject) {
28 function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
29 function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
30 function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
31 step((generator = generator.apply(thisArg, _arguments || [])).next());
32 });
33};
34var __generator = (this && this.__generator) || function (thisArg, body) {
35 var _ = { label: 0, sent: function() { if (t[0] & 1) throw t[1]; return t[1]; }, trys: [], ops: [] }, f, y, t, g;
36 return g = { next: verb(0), "throw": verb(1), "return": verb(2) }, typeof Symbol === "function" && (g[Symbol.iterator] = function() { return this; }), g;
37 function verb(n) { return function (v) { return step([n, v]); }; }
38 function step(op) {
39 if (f) throw new TypeError("Generator is already executing.");
40 while (_) try {
41 if (f = 1, y && (t = op[0] & 2 ? y["return"] : op[0] ? y["throw"] || ((t = y["return"]) && t.call(y), 0) : y.next) && !(t = t.call(y, op[1])).done) return t;
42 if (y = 0, t) op = [op[0] & 2, t.value];
43 switch (op[0]) {
44 case 0: case 1: t = op; break;
45 case 4: _.label++; return { value: op[1], done: false };
46 case 5: _.label++; y = op[1]; op = [0]; continue;
47 case 7: op = _.ops.pop(); _.trys.pop(); continue;
48 default:
49 if (!(t = _.trys, t = t.length > 0 && t[t.length - 1]) && (op[0] === 6 || op[0] === 2)) { _ = 0; continue; }
50 if (op[0] === 3 && (!t || (op[1] > t[0] && op[1] < t[3]))) { _.label = op[1]; break; }
51 if (op[0] === 6 && _.label < t[1]) { _.label = t[1]; t = op; break; }
52 if (t && _.label < t[2]) { _.label = t[2]; _.ops.push(op); break; }
53 if (t[2]) _.ops.pop();
54 _.trys.pop(); continue;
55 }
56 op = body.call(thisArg, _);
57 } catch (e) { op = [6, e]; y = 0; } finally { f = t = 0; }
58 if (op[0] & 5) throw op[1]; return { value: op[0] ? op[1] : void 0, done: true };
59 }
60};
61var __rest = (this && this.__rest) || function (s, e) {
62 var t = {};
63 for (var p in s) if (Object.prototype.hasOwnProperty.call(s, p) && e.indexOf(p) < 0)
64 t[p] = s[p];
65 if (s != null && typeof Object.getOwnPropertySymbols === "function")
66 for (var i = 0, p = Object.getOwnPropertySymbols(s); i < p.length; i++) {
67 if (e.indexOf(p[i]) < 0 && Object.prototype.propertyIsEnumerable.call(s, p[i]))
68 t[p[i]] = s[p[i]];
69 }
70 return t;
71};
72/*
73 * Copyright 2017-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
74 *
75 * Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with
76 * the License. A copy of the License is located at
77 *
78 * http://aws.amazon.com/apache2.0/
79 *
80 * or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
81 * CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
82 * and limitations under the License.
83 */
84import * as Paho from 'paho-mqtt';
85import { v4 as uuid } from 'uuid';
86import Observable from 'zen-observable-ts';
87import { AbstractPubSubProvider } from './PubSubProvider';
88import { ConsoleLogger as Logger } from '@aws-amplify/core';
89var logger = new Logger('MqttOverWSProvider');
90export function mqttTopicMatch(filter, topic) {
91 var filterArray = filter.split('/');
92 var length = filterArray.length;
93 var topicArray = topic.split('/');
94 for (var i = 0; i < length; ++i) {
95 var left = filterArray[i];
96 var right = topicArray[i];
97 if (left === '#')
98 return topicArray.length >= length;
99 if (left !== '+' && left !== right)
100 return false;
101 }
102 return length === topicArray.length;
103}
104var ClientsQueue = /** @class */ (function () {
105 function ClientsQueue() {
106 this.promises = new Map();
107 }
108 ClientsQueue.prototype.get = function (clientId, clientFactory) {
109 return __awaiter(this, void 0, void 0, function () {
110 var promise;
111 return __generator(this, function (_a) {
112 promise = this.promises.get(clientId);
113 if (promise) {
114 return [2 /*return*/, promise];
115 }
116 promise = clientFactory(clientId);
117 this.promises.set(clientId, promise);
118 return [2 /*return*/, promise];
119 });
120 });
121 };
122 Object.defineProperty(ClientsQueue.prototype, "allClients", {
123 get: function () {
124 return Array.from(this.promises.keys());
125 },
126 enumerable: true,
127 configurable: true
128 });
129 ClientsQueue.prototype.remove = function (clientId) {
130 this.promises.delete(clientId);
131 };
132 return ClientsQueue;
133}());
134var topicSymbol = typeof Symbol !== 'undefined' ? Symbol('topic') : '@@topic';
135var MqttOverWSProvider = /** @class */ (function (_super) {
136 __extends(MqttOverWSProvider, _super);
137 function MqttOverWSProvider(options) {
138 if (options === void 0) { options = {}; }
139 var _this = _super.call(this, __assign(__assign({}, options), { clientId: options.clientId || uuid() })) || this;
140 _this._clientsQueue = new ClientsQueue();
141 _this._topicObservers = new Map();
142 _this._clientIdObservers = new Map();
143 return _this;
144 }
145 Object.defineProperty(MqttOverWSProvider.prototype, "clientId", {
146 get: function () {
147 return this.options.clientId;
148 },
149 enumerable: true,
150 configurable: true
151 });
152 Object.defineProperty(MqttOverWSProvider.prototype, "endpoint", {
153 get: function () {
154 return this.options.aws_pubsub_endpoint;
155 },
156 enumerable: true,
157 configurable: true
158 });
159 Object.defineProperty(MqttOverWSProvider.prototype, "clientsQueue", {
160 get: function () {
161 return this._clientsQueue;
162 },
163 enumerable: true,
164 configurable: true
165 });
166 Object.defineProperty(MqttOverWSProvider.prototype, "isSSLEnabled", {
167 get: function () {
168 return !this.options
169 .aws_appsync_dangerously_connect_to_http_endpoint_for_testing;
170 },
171 enumerable: true,
172 configurable: true
173 });
174 MqttOverWSProvider.prototype.getTopicForValue = function (value) {
175 return typeof value === 'object' && value[topicSymbol];
176 };
177 MqttOverWSProvider.prototype.getProviderName = function () {
178 return 'MqttOverWSProvider';
179 };
180 MqttOverWSProvider.prototype.onDisconnect = function (_a) {
181 var _this = this;
182 var clientId = _a.clientId, errorCode = _a.errorCode, args = __rest(_a, ["clientId", "errorCode"]);
183 if (errorCode !== 0) {
184 logger.warn(clientId, JSON.stringify(__assign({ errorCode: errorCode }, args), null, 2));
185 var topicsToDelete_1 = [];
186 var clientIdObservers = this._clientIdObservers.get(clientId);
187 if (!clientIdObservers) {
188 return;
189 }
190 clientIdObservers.forEach(function (observer) {
191 observer.error('Disconnected, error code: ' + errorCode);
192 // removing observers for disconnected clientId
193 _this._topicObservers.forEach(function (observerForTopic, observerTopic) {
194 observerForTopic.delete(observer);
195 if (observerForTopic.size === 0) {
196 topicsToDelete_1.push(observerTopic);
197 }
198 });
199 });
200 // forgiving any trace of clientId
201 this._clientIdObservers.delete(clientId);
202 // Removing topics that are not listen by an observer
203 topicsToDelete_1.forEach(function (topic) {
204 _this._topicObservers.delete(topic);
205 });
206 }
207 };
208 MqttOverWSProvider.prototype.newClient = function (_a) {
209 var url = _a.url, clientId = _a.clientId;
210 return __awaiter(this, void 0, void 0, function () {
211 var client;
212 var _this = this;
213 return __generator(this, function (_b) {
214 switch (_b.label) {
215 case 0:
216 logger.debug('Creating new MQTT client', clientId);
217 client = new Paho.Client(url, clientId);
218 // client.trace = (args) => logger.debug(clientId, JSON.stringify(args, null, 2));
219 client.onMessageArrived = function (_a) {
220 var topic = _a.destinationName, msg = _a.payloadString;
221 _this._onMessage(topic, msg);
222 };
223 client.onConnectionLost = function (_a) {
224 var errorCode = _a.errorCode, args = __rest(_a, ["errorCode"]);
225 _this.onDisconnect(__assign({ clientId: clientId, errorCode: errorCode }, args));
226 };
227 return [4 /*yield*/, new Promise(function (resolve, reject) {
228 client.connect({
229 useSSL: _this.isSSLEnabled,
230 mqttVersion: 3,
231 onSuccess: function () { return resolve(client); },
232 onFailure: reject,
233 });
234 })];
235 case 1:
236 _b.sent();
237 return [2 /*return*/, client];
238 }
239 });
240 });
241 };
242 MqttOverWSProvider.prototype.connect = function (clientId, options) {
243 if (options === void 0) { options = {}; }
244 return __awaiter(this, void 0, void 0, function () {
245 var _this = this;
246 return __generator(this, function (_a) {
247 switch (_a.label) {
248 case 0: return [4 /*yield*/, this.clientsQueue.get(clientId, function (clientId) {
249 return _this.newClient(__assign(__assign({}, options), { clientId: clientId }));
250 })];
251 case 1: return [2 /*return*/, _a.sent()];
252 }
253 });
254 });
255 };
256 MqttOverWSProvider.prototype.disconnect = function (clientId) {
257 return __awaiter(this, void 0, void 0, function () {
258 var client;
259 return __generator(this, function (_a) {
260 switch (_a.label) {
261 case 0: return [4 /*yield*/, this.clientsQueue.get(clientId, function () { return null; })];
262 case 1:
263 client = _a.sent();
264 if (client && client.isConnected()) {
265 client.disconnect();
266 }
267 this.clientsQueue.remove(clientId);
268 return [2 /*return*/];
269 }
270 });
271 });
272 };
273 MqttOverWSProvider.prototype.publish = function (topics, msg) {
274 return __awaiter(this, void 0, void 0, function () {
275 var targetTopics, message, url, client;
276 return __generator(this, function (_a) {
277 switch (_a.label) {
278 case 0:
279 targetTopics = [].concat(topics);
280 message = JSON.stringify(msg);
281 return [4 /*yield*/, this.endpoint];
282 case 1:
283 url = _a.sent();
284 return [4 /*yield*/, this.connect(this.clientId, { url: url })];
285 case 2:
286 client = _a.sent();
287 logger.debug('Publishing to topic(s)', targetTopics.join(','), message);
288 targetTopics.forEach(function (topic) { return client.send(topic, message); });
289 return [2 /*return*/];
290 }
291 });
292 });
293 };
294 MqttOverWSProvider.prototype._onMessage = function (topic, msg) {
295 try {
296 var matchedTopicObservers_1 = [];
297 this._topicObservers.forEach(function (observerForTopic, observerTopic) {
298 if (mqttTopicMatch(observerTopic, topic)) {
299 matchedTopicObservers_1.push(observerForTopic);
300 }
301 });
302 var parsedMessage_1 = JSON.parse(msg);
303 if (typeof parsedMessage_1 === 'object') {
304 parsedMessage_1[topicSymbol] = topic;
305 }
306 matchedTopicObservers_1.forEach(function (observersForTopic) {
307 observersForTopic.forEach(function (observer) { return observer.next(parsedMessage_1); });
308 });
309 }
310 catch (error) {
311 logger.warn('Error handling message', error, msg);
312 }
313 };
314 MqttOverWSProvider.prototype.subscribe = function (topics, options) {
315 var _this = this;
316 if (options === void 0) { options = {}; }
317 var targetTopics = [].concat(topics);
318 logger.debug('Subscribing to topic(s)', targetTopics.join(','));
319 return new Observable(function (observer) {
320 targetTopics.forEach(function (topic) {
321 // this._topicObservers is used to notify the observers according to the topic received on the message
322 var observersForTopic = _this._topicObservers.get(topic);
323 if (!observersForTopic) {
324 observersForTopic = new Set();
325 _this._topicObservers.set(topic, observersForTopic);
326 }
327 observersForTopic.add(observer);
328 });
329 // @ts-ignore
330 var client;
331 var _a = options.clientId, clientId = _a === void 0 ? _this.clientId : _a;
332 // this._clientIdObservers is used to close observers when client gets disconnected
333 var observersForClientId = _this._clientIdObservers.get(clientId);
334 if (!observersForClientId) {
335 observersForClientId = new Set();
336 }
337 observersForClientId.add(observer);
338 _this._clientIdObservers.set(clientId, observersForClientId);
339 (function () { return __awaiter(_this, void 0, void 0, function () {
340 var _a, url, _b, e_1;
341 return __generator(this, function (_c) {
342 switch (_c.label) {
343 case 0:
344 _a = options.url;
345 if (!(_a === void 0)) return [3 /*break*/, 2];
346 return [4 /*yield*/, this.endpoint];
347 case 1:
348 _b = _c.sent();
349 return [3 /*break*/, 3];
350 case 2:
351 _b = _a;
352 _c.label = 3;
353 case 3:
354 url = _b;
355 _c.label = 4;
356 case 4:
357 _c.trys.push([4, 6, , 7]);
358 return [4 /*yield*/, this.connect(clientId, { url: url })];
359 case 5:
360 client = _c.sent();
361 targetTopics.forEach(function (topic) {
362 client.subscribe(topic);
363 });
364 return [3 /*break*/, 7];
365 case 6:
366 e_1 = _c.sent();
367 observer.error(e_1);
368 return [3 /*break*/, 7];
369 case 7: return [2 /*return*/];
370 }
371 });
372 }); })();
373 return function () {
374 logger.debug('Unsubscribing from topic(s)', targetTopics.join(','));
375 if (client) {
376 _this._clientIdObservers.get(clientId).delete(observer);
377 // No more observers per client => client not needed anymore
378 if (_this._clientIdObservers.get(clientId).size === 0) {
379 _this.disconnect(clientId);
380 _this._clientIdObservers.delete(clientId);
381 }
382 targetTopics.forEach(function (topic) {
383 var observersForTopic = _this._topicObservers.get(topic) ||
384 new Set();
385 observersForTopic.delete(observer);
386 // if no observers exists for the topic, topic should be removed
387 if (observersForTopic.size === 0) {
388 _this._topicObservers.delete(topic);
389 if (client.isConnected()) {
390 client.unsubscribe(topic);
391 }
392 }
393 });
394 }
395 return null;
396 };
397 });
398 };
399 return MqttOverWSProvider;
400}(AbstractPubSubProvider));
401export { MqttOverWSProvider };
402//# sourceMappingURL=MqttOverWSProvider.js.map
\No newline at end of file