UNPKG

11.8 kBJavaScriptView Raw
1import { Observable } from 'rxjs';
2import { ConsoleLogger, Hub } from '@aws-amplify/core';
3import { amplifyUuid, AMPLIFY_SYMBOL } from '@aws-amplify/core/internals/utils';
4import { ConnectionState } from '../types/PubSub.mjs';
5import * as Paho from '../vendor/paho-mqtt.js';
6import { ConnectionStateMonitor, CONNECTION_CHANGE } from '../utils/ConnectionStateMonitor.mjs';
7import { ReconnectionMonitor, ReconnectEvent } from '../utils/ReconnectionMonitor.mjs';
8import { AbstractPubSub } from './PubSub.mjs';
9import { CONNECTION_STATE_CHANGE } from './constants.mjs';
10
11// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
12// SPDX-License-Identifier: Apache-2.0
13const logger = new ConsoleLogger('MqttOverWS');
14function mqttTopicMatch(filter, topic) {
15 const filterArray = filter.split('/');
16 const { length } = filterArray;
17 const topicArray = topic.split('/');
18 for (let i = 0; i < length; ++i) {
19 const left = filterArray[i];
20 const right = topicArray[i];
21 if (left === '#')
22 return topicArray.length >= length;
23 if (left !== '+' && left !== right)
24 return false;
25 }
26 return length === topicArray.length;
27}
28class ClientsQueue {
29 constructor() {
30 this.promises = new Map();
31 }
32 async get(clientId, clientFactory) {
33 const cachedPromise = this.promises.get(clientId);
34 if (cachedPromise)
35 return cachedPromise;
36 if (clientFactory) {
37 const newPromise = clientFactory(clientId);
38 this.promises.set(clientId, newPromise);
39 newPromise.catch(() => this.promises.delete(clientId));
40 return newPromise;
41 }
42 return undefined;
43 }
44 get allClients() {
45 return Array.from(this.promises.keys());
46 }
47 remove(clientId) {
48 this.promises.delete(clientId);
49 }
50}
51const dispatchPubSubEvent = (payload) => {
52 Hub.dispatch('pubsub', payload, 'PubSub', AMPLIFY_SYMBOL);
53};
54const topicSymbol = typeof Symbol !== 'undefined' ? Symbol('topic') : '@@topic';
55class MqttOverWS extends AbstractPubSub {
56 constructor(options = {}) {
57 super({ ...options, clientId: options.clientId || amplifyUuid() });
58 this._clientsQueue = new ClientsQueue();
59 this.connectionStateMonitor = new ConnectionStateMonitor();
60 this.reconnectionMonitor = new ReconnectionMonitor();
61 this._topicObservers = new Map();
62 this._clientIdObservers = new Map();
63 // Monitor the connection health state and pass changes along to Hub
64 this.connectionStateMonitor.connectionStateObservable.subscribe(connectionStateChange => {
65 dispatchPubSubEvent({
66 event: CONNECTION_STATE_CHANGE,
67 data: {
68 provider: this,
69 connectionState: connectionStateChange,
70 },
71 message: `Connection state is ${connectionStateChange}`,
72 });
73 this.connectionState = connectionStateChange;
74 // Trigger reconnection when the connection is disrupted
75 if (connectionStateChange === ConnectionState.ConnectionDisrupted) {
76 this.reconnectionMonitor.record(ReconnectEvent.START_RECONNECT);
77 }
78 else if (connectionStateChange !== ConnectionState.Connecting) {
79 // Trigger connected to halt reconnection attempts
80 this.reconnectionMonitor.record(ReconnectEvent.HALT_RECONNECT);
81 }
82 });
83 }
84 get clientId() {
85 return this.options.clientId;
86 }
87 get endpoint() {
88 return Promise.resolve(this.options.endpoint);
89 }
90 get clientsQueue() {
91 return this._clientsQueue;
92 }
93 get isSSLEnabled() {
94 return !this.options
95 .aws_appsync_dangerously_connect_to_http_endpoint_for_testing;
96 }
97 onDisconnect({ clientId, errorCode, ...args }) {
98 if (errorCode !== 0) {
99 logger.warn(clientId, JSON.stringify({ errorCode, ...args }, null, 2));
100 if (!clientId) {
101 return;
102 }
103 const clientIdObservers = this._clientIdObservers.get(clientId);
104 if (!clientIdObservers) {
105 return;
106 }
107 this.disconnect(clientId);
108 }
109 }
110 async newClient({ url, clientId }) {
111 logger.debug('Creating new MQTT client', clientId);
112 this.connectionStateMonitor.record(CONNECTION_CHANGE.OPENING_CONNECTION);
113 // eslint-disable-next-line @typescript-eslint/ban-ts-comment
114 // @ts-ignore this module is expected to not have declaration file
115 const client = new Paho.Client(url, clientId);
116 client.onMessageArrived = ({ destinationName: topic, payloadString: msg, }) => {
117 this._onMessage(topic, msg);
118 };
119 client.onConnectionLost = ({ errorCode, ...args }) => {
120 this.onDisconnect({ clientId, errorCode, ...args });
121 this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
122 };
123 const connected = await new Promise((resolve, _reject) => {
124 client.connect({
125 useSSL: this.isSSLEnabled,
126 mqttVersion: 3,
127 onSuccess: () => {
128 resolve(true);
129 },
130 onFailure: () => {
131 if (clientId)
132 this._clientsQueue.remove(clientId);
133 this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
134 resolve(false);
135 },
136 });
137 });
138 if (connected) {
139 this.connectionStateMonitor.record(CONNECTION_CHANGE.CONNECTION_ESTABLISHED);
140 }
141 return client;
142 }
143 async connect(clientId, options = {}) {
144 return this.clientsQueue.get(clientId, async (inputClientId) => {
145 const client = await this.newClient({
146 ...options,
147 clientId: inputClientId,
148 });
149 if (client) {
150 // Once connected, subscribe to all topics registered observers
151 this._topicObservers.forEach((_value, key) => {
152 client.subscribe(key);
153 });
154 }
155 return client;
156 });
157 }
158 async disconnect(clientId) {
159 const client = await this.clientsQueue.get(clientId);
160 if (client && client.isConnected()) {
161 client.disconnect();
162 }
163 this.clientsQueue.remove(clientId);
164 this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
165 }
166 async publish({ topics, message }) {
167 const targetTopics = [].concat(topics);
168 const msg = JSON.stringify(message);
169 const client = await this.clientsQueue.get(this.clientId);
170 if (client) {
171 logger.debug('Publishing to topic(s)', targetTopics.join(','), message);
172 targetTopics.forEach(topic => {
173 client.send(topic, msg);
174 });
175 }
176 else {
177 logger.debug('Publishing to topic(s) failed', targetTopics.join(','), message);
178 }
179 }
180 _onMessage(topic, msg) {
181 try {
182 const matchedTopicObservers = [];
183 this._topicObservers.forEach((observerForTopic, observerTopic) => {
184 if (mqttTopicMatch(observerTopic, topic)) {
185 matchedTopicObservers.push(observerForTopic);
186 }
187 });
188 const parsedMessage = JSON.parse(msg);
189 if (typeof parsedMessage === 'object') {
190 parsedMessage[topicSymbol] = topic;
191 }
192 matchedTopicObservers.forEach(observersForTopic => {
193 observersForTopic.forEach(observer => {
194 observer.next(parsedMessage);
195 });
196 });
197 }
198 catch (error) {
199 logger.warn('Error handling message', error, msg);
200 }
201 }
202 subscribe({ topics, options = {}, }) {
203 const targetTopics = [].concat(topics);
204 logger.debug('Subscribing to topic(s)', targetTopics.join(','));
205 let reconnectSubscription;
206 return new Observable(observer => {
207 targetTopics.forEach(topic => {
208 // this._topicObservers is used to notify the observers according to the topic received on the message
209 let observersForTopic = this._topicObservers.get(topic);
210 if (!observersForTopic) {
211 observersForTopic = new Set();
212 this._topicObservers.set(topic, observersForTopic);
213 }
214 observersForTopic.add(observer);
215 });
216 const { clientId = this.clientId } = options;
217 // this._clientIdObservers is used to close observers when client gets disconnected
218 let observersForClientId = this._clientIdObservers.get(clientId);
219 if (!observersForClientId) {
220 observersForClientId = new Set();
221 }
222 if (observersForClientId) {
223 observersForClientId.add(observer);
224 this._clientIdObservers.set(clientId, observersForClientId);
225 }
226 (async () => {
227 const getClient = async () => {
228 try {
229 const { url = await this.endpoint } = options;
230 const client = await this.connect(clientId, { url });
231 if (client !== undefined) {
232 targetTopics.forEach(topic => {
233 client.subscribe(topic);
234 });
235 }
236 }
237 catch (e) {
238 logger.debug('Error forming connection', e);
239 }
240 };
241 // Establish the initial connection
242 await getClient();
243 // Add an observable to the reconnection list to manage reconnection for this subscription
244 reconnectSubscription = new Observable(reconnectSubscriptionObserver => {
245 this.reconnectionMonitor.addObserver(reconnectSubscriptionObserver);
246 }).subscribe(() => {
247 getClient();
248 });
249 })();
250 return async () => {
251 const client = await this.clientsQueue.get(clientId);
252 reconnectSubscription?.unsubscribe();
253 if (client) {
254 this._clientIdObservers.get(clientId)?.delete(observer);
255 // No more observers per client => client not needed anymore
256 if (this._clientIdObservers.get(clientId)?.size === 0) {
257 this.disconnect(clientId);
258 this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSING_CONNECTION);
259 this._clientIdObservers.delete(clientId);
260 }
261 targetTopics.forEach(topic => {
262 const observersForTopic = this._topicObservers.get(topic) ||
263 new Set();
264 observersForTopic.delete(observer);
265 // if no observers exists for the topic, topic should be removed
266 if (observersForTopic.size === 0) {
267 this._topicObservers.delete(topic);
268 if (client.isConnected()) {
269 client.unsubscribe(topic);
270 }
271 }
272 });
273 }
274 return null;
275 };
276 });
277 }
278}
279
280export { MqttOverWS, mqttTopicMatch };
281//# sourceMappingURL=MqttOverWS.mjs.map