1 | import { Observable } from 'rxjs';
|
2 | import { ConsoleLogger, Hub } from '@aws-amplify/core';
|
3 | import { amplifyUuid, AMPLIFY_SYMBOL } from '@aws-amplify/core/internals/utils';
|
4 | import { ConnectionState } from '../types/PubSub.mjs';
|
5 | import * as Paho from '../vendor/paho-mqtt.js';
|
6 | import { ConnectionStateMonitor, CONNECTION_CHANGE } from '../utils/ConnectionStateMonitor.mjs';
|
7 | import { ReconnectionMonitor, ReconnectEvent } from '../utils/ReconnectionMonitor.mjs';
|
8 | import { AbstractPubSub } from './PubSub.mjs';
|
9 | import { CONNECTION_STATE_CHANGE } from './constants.mjs';
|
10 |
|
11 |
|
12 |
|
// Logger shared by everything in this module, created with the name 'MqttOverWS'.
const logger = new ConsoleLogger('MqttOverWS');
|
/**
 * Tests whether a concrete MQTT topic is matched by a topic filter.
 *
 * Both strings are split on '/'. A '+' level in the filter accepts any single
 * topic level; a '#' level accepts the rest of the topic as long as the topic
 * has at least as many levels as the filter (so 'a/#' does not match 'a').
 *
 * @param {string} filter - Topic filter, possibly containing '+' / '#' levels.
 * @param {string} topic - Concrete topic name to test.
 * @returns {boolean} `true` when the topic is matched by the filter.
 */
function mqttTopicMatch(filter, topic) {
    const filterLevels = filter.split('/');
    const topicLevels = topic.split('/');
    const filterDepth = filterLevels.length;
    for (const [position, filterLevel] of filterLevels.entries()) {
        if (filterLevel === '#') {
            // Multi-level wildcard: the outcome depends only on the topic depth.
            return topicLevels.length >= filterDepth;
        }
        const isSingleLevelWildcard = filterLevel === '+';
        if (!isSingleLevelWildcard && filterLevel !== topicLevels[position]) {
            return false;
        }
    }
    // Without a '#', filter and topic must have exactly the same depth.
    return topicLevels.length === filterDepth;
}
|
/**
 * Cache of client promises keyed by client id, so that concurrent callers
 * asking for the same id share one creation attempt.
 */
class ClientsQueue {
    constructor() {
        // clientId -> promise returned by the factory for that id
        this.promises = new Map();
    }

    /**
     * Returns the cached promise for `clientId`, creating it with
     * `clientFactory` when none is cached.
     *
     * @param {string} clientId - Key identifying the client.
     * @param {(clientId: string) => Promise<*>} [clientFactory] - Called with
     *   `clientId` to create the client when nothing is cached.
     * @returns {Promise<*>} The client, or `undefined` when nothing is cached
     *   and no factory was supplied. Rejects if the factory's promise rejects.
     */
    async get(clientId, clientFactory) {
        const cachedPromise = this.promises.get(clientId);
        if (cachedPromise) {
            return cachedPromise;
        }
        if (!clientFactory) {
            return undefined;
        }
        const newPromise = clientFactory(clientId);
        this.promises.set(clientId, newPromise);
        newPromise.catch(() => {
            // Evict only our own entry: the id may have been removed and
            // re-created while this attempt was still pending, and a stale
            // failure must not throw away the newer client.
            if (this.promises.get(clientId) === newPromise) {
                this.promises.delete(clientId);
            }
        });
        return newPromise;
    }

    /** @returns {string[]} The ids that currently have a cached promise. */
    get allClients() {
        return Array.from(this.promises.keys());
    }

    /**
     * Drops the cached promise for `clientId`, if any.
     *
     * @param {string} clientId - Key identifying the client.
     */
    remove(clientId) {
        this.promises.delete(clientId);
    }
}
|
/**
 * Dispatches a PubSub event on the Hub 'pubsub' channel.
 *
 * @param {object} payload - Event payload, forwarded unchanged to `Hub.dispatch`.
 */
function dispatchPubSubEvent(payload) {
    Hub.dispatch('pubsub', payload, 'PubSub', AMPLIFY_SYMBOL);
}
|
// Property key used to attach the originating topic to a received message;
// falls back to a plain string key in environments without `Symbol`.
const topicSymbol = typeof Symbol === 'undefined' ? '@@topic' : Symbol('topic');
|
/**
 * PubSub provider backed by the vendored Paho MQTT client.
 *
 * Clients are cached per client id in a `ClientsQueue`. Observers are tracked
 * both per topic filter (to route incoming messages) and per client id (to
 * close the connection once its last observer unsubscribes).
 */
class MqttOverWS extends AbstractPubSub {
    /**
     * @param {object} [options] - Provider options. Fields read by this class:
     *   `clientId` (a generated UUID is used when falsy), `endpoint` and
     *   `aws_appsync_dangerously_connect_to_http_endpoint_for_testing`.
     */
    constructor(options = {}) {
        super({ ...options, clientId: options.clientId || amplifyUuid() });
        this._clientsQueue = new ClientsQueue();
        this.connectionStateMonitor = new ConnectionStateMonitor();
        this.reconnectionMonitor = new ReconnectionMonitor();
        // topic filter -> Set of observers subscribed to it
        this._topicObservers = new Map();
        // client id -> Set of observers using that client
        this._clientIdObservers = new Map();
        // Publish every connection-state change on the Hub and keep the
        // reconnection monitor in step with it.
        this.connectionStateMonitor.connectionStateObservable.subscribe(connectionStateChange => {
            dispatchPubSubEvent({
                event: CONNECTION_STATE_CHANGE,
                data: {
                    provider: this,
                    connectionState: connectionStateChange,
                },
                message: `Connection state is ${connectionStateChange}`,
            });
            this.connectionState = connectionStateChange;
            if (connectionStateChange === ConnectionState.ConnectionDisrupted) {
                // A disrupted connection starts reconnection attempts...
                this.reconnectionMonitor.record(ReconnectEvent.START_RECONNECT);
            }
            else if (connectionStateChange !== ConnectionState.Connecting) {
                // ...and any state other than Connecting halts them.
                this.reconnectionMonitor.record(ReconnectEvent.HALT_RECONNECT);
            }
        });
    }

    /** @returns {string} The client id taken from the provider options. */
    get clientId() {
        return this.options.clientId;
    }

    /** @returns {Promise<*>} Promise resolving to the `endpoint` option. */
    get endpoint() {
        return Promise.resolve(this.options.endpoint);
    }

    /** @returns {ClientsQueue} The per-client-id cache of client promises. */
    get clientsQueue() {
        return this._clientsQueue;
    }

    /**
     * @returns {boolean} `false` only when the
     *   `aws_appsync_dangerously_connect_to_http_endpoint_for_testing` option is truthy.
     */
    get isSSLEnabled() {
        return !this.options
            .aws_appsync_dangerously_connect_to_http_endpoint_for_testing;
    }

    /**
     * Handles a lost connection reported for a client. A non-zero `errorCode`
     * is logged and, when the client id still has registered observers, the
     * client is disconnected.
     *
     * @param {object} params
     * @param {string} [params.clientId] - Id of the client that lost its connection.
     * @param {number} [params.errorCode] - Error code; `0` is ignored.
     */
    onDisconnect({ clientId, errorCode, ...args }) {
        if (errorCode !== 0) {
            logger.warn(clientId, JSON.stringify({ errorCode, ...args }, null, 2));
            if (!clientId) {
                return;
            }
            const clientIdObservers = this._clientIdObservers.get(clientId);
            if (!clientIdObservers) {
                return;
            }
            // Fire-and-forget: the returned promise is intentionally not awaited.
            this.disconnect(clientId);
        }
    }

    /**
     * Creates a Paho client for `url`/`clientId`, wires its message and
     * connection-lost handlers and attempts to connect.
     *
     * @param {object} params
     * @param {string} params.url - Endpoint passed to `Paho.Client`.
     * @param {string} params.clientId - Client id passed to `Paho.Client`.
     * @returns {Promise<object>} The Paho client. It is returned whether or not
     *   the connection attempt succeeded; on failure the client id is removed
     *   from the queue and a CLOSED connection change is recorded.
     */
    async newClient({ url, clientId }) {
        logger.debug('Creating new MQTT client', clientId);
        this.connectionStateMonitor.record(CONNECTION_CHANGE.OPENING_CONNECTION);
        const client = new Paho.Client(url, clientId);
        client.onMessageArrived = ({ destinationName: topic, payloadString: msg, }) => {
            this._onMessage(topic, msg);
        };
        client.onConnectionLost = ({ errorCode, ...args }) => {
            this.onDisconnect({ clientId, errorCode, ...args });
            this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
        };
        // Adapt Paho's callback API; the promise never rejects and resolves
        // to whether the connection was established.
        const connected = await new Promise((resolve, _reject) => {
            client.connect({
                useSSL: this.isSSLEnabled,
                mqttVersion: 3,
                onSuccess: () => {
                    resolve(true);
                },
                onFailure: () => {
                    if (clientId)
                        this._clientsQueue.remove(clientId);
                    this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
                    resolve(false);
                },
            });
        });
        if (connected) {
            this.connectionStateMonitor.record(CONNECTION_CHANGE.CONNECTION_ESTABLISHED);
        }
        return client;
    }

    /**
     * Returns the cached client for `clientId`, creating one when none is
     * cached. A newly created client is subscribed to every topic filter that
     * currently has observers.
     *
     * @param {string} clientId - Id of the client to fetch or create.
     * @param {object} [options] - Extra fields forwarded to `newClient` (e.g. `url`).
     * @returns {Promise<object>} The Paho client.
     */
    async connect(clientId, options = {}) {
        return this.clientsQueue.get(clientId, async (inputClientId) => {
            const client = await this.newClient({
                ...options,
                clientId: inputClientId,
            });
            if (client) {
                this._topicObservers.forEach((_value, key) => {
                    client.subscribe(key);
                });
            }
            return client;
        });
    }

    /**
     * Disconnects the cached client for `clientId` if it is connected, removes
     * it from the queue and records a CLOSED connection change.
     *
     * @param {string} clientId - Id of the client to disconnect.
     * @returns {Promise<void>}
     */
    async disconnect(clientId) {
        const client = await this.clientsQueue.get(clientId);
        if (client && client.isConnected()) {
            client.disconnect();
        }
        this.clientsQueue.remove(clientId);
        this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
    }

    /**
     * Sends `message`, JSON-serialised, to each topic using the client cached
     * for this provider's own client id. When no such client is cached the
     * failure is only logged at debug level.
     *
     * @param {object} params
     * @param {string|string[]} params.topics - One topic or a list of topics.
     * @param {*} params.message - Value serialised with `JSON.stringify`.
     * @returns {Promise<void>}
     */
    async publish({ topics, message }) {
        const targetTopics = [].concat(topics);
        const msg = JSON.stringify(message);
        const client = await this.clientsQueue.get(this.clientId);
        if (client) {
            logger.debug('Publishing to topic(s)', targetTopics.join(','), message);
            targetTopics.forEach(topic => {
                client.send(topic, msg);
            });
        }
        else {
            logger.debug('Publishing to topic(s) failed', targetTopics.join(','), message);
        }
    }

    /**
     * Parses an incoming payload as JSON and delivers it to every observer
     * whose topic filter matches `topic`. Object payloads are tagged with the
     * originating topic under `topicSymbol`. Any error is logged, not thrown.
     *
     * @param {string} topic - Topic the message arrived on.
     * @param {string} msg - Raw payload string; expected to be JSON.
     */
    _onMessage(topic, msg) {
        try {
            const matchedTopicObservers = [];
            this._topicObservers.forEach((observerForTopic, observerTopic) => {
                if (mqttTopicMatch(observerTopic, topic)) {
                    matchedTopicObservers.push(observerForTopic);
                }
            });
            const parsedMessage = JSON.parse(msg);
            // `typeof null` is 'object': without the explicit null check a JSON
            // `null` payload would throw on assignment and never be delivered.
            if (parsedMessage !== null && typeof parsedMessage === 'object') {
                parsedMessage[topicSymbol] = topic;
            }
            matchedTopicObservers.forEach(observersForTopic => {
                observersForTopic.forEach(observer => {
                    observer.next(parsedMessage);
                });
            });
        }
        catch (error) {
            logger.warn('Error handling message', error, msg);
        }
    }

    /**
     * Returns a cold Observable. Each subscription registers its observer for
     * the given topics, connects (or reuses) the client for the chosen client
     * id, subscribes that client to the topics and repeats this whenever the
     * reconnection monitor notifies. Unsubscribing removes the observer,
     * unsubscribes topics left without observers and disconnects the client
     * once its last observer is gone.
     *
     * @param {object} params
     * @param {string|string[]} params.topics - One topic filter or a list of them.
     * @param {object} [params.options] - Optional `clientId` (defaults to this
     *   provider's client id) and `url` (defaults to `this.endpoint`).
     * @returns {Observable} Observable emitting the parsed messages.
     */
    subscribe({ topics, options = {}, }) {
        const targetTopics = [].concat(topics);
        logger.debug('Subscribing to topic(s)', targetTopics.join(','));
        return new Observable(observer => {
            // Both variables are per subscriber: the returned Observable may be
            // subscribed more than once and every subscription needs its own
            // reconnect handle and teardown state.
            let reconnectSubscription;
            let isUnsubscribed = false;
            targetTopics.forEach(topic => {
                let observersForTopic = this._topicObservers.get(topic);
                if (!observersForTopic) {
                    observersForTopic = new Set();
                    this._topicObservers.set(topic, observersForTopic);
                }
                observersForTopic.add(observer);
            });
            const { clientId = this.clientId } = options;
            let observersForClientId = this._clientIdObservers.get(clientId);
            if (!observersForClientId) {
                observersForClientId = new Set();
                this._clientIdObservers.set(clientId, observersForClientId);
            }
            observersForClientId.add(observer);
            (async () => {
                const getClient = async () => {
                    try {
                        const { url = await this.endpoint } = options;
                        const client = await this.connect(clientId, { url });
                        // Skip the topic subscriptions if the observer went away
                        // while the connection was being formed.
                        if (client !== undefined && !isUnsubscribed) {
                            targetTopics.forEach(topic => {
                                client.subscribe(topic);
                            });
                        }
                    }
                    catch (e) {
                        logger.debug('Error forming connection', e);
                    }
                };
                // Establish the initial connection.
                await getClient();
                // Teardown may already have run while connecting; registering a
                // reconnect handler now would leave it active forever.
                if (isUnsubscribed) {
                    return;
                }
                // Re-run the connection logic on every reconnection notification.
                reconnectSubscription = new Observable(reconnectSubscriptionObserver => {
                    this.reconnectionMonitor.addObserver(reconnectSubscriptionObserver);
                }).subscribe(() => {
                    getClient();
                });
            })();
            return async () => {
                isUnsubscribed = true;
                reconnectSubscription?.unsubscribe();
                let client;
                try {
                    client = await this.clientsQueue.get(clientId);
                }
                catch (e) {
                    // A failed client promise must not prevent the observer
                    // bookkeeping below from being cleaned up.
                    logger.debug('Error retrieving client during unsubscribe', e);
                }
                // The observer registrations are released even without a
                // client, so a failed connection does not leak observers.
                const remainingForClientId = this._clientIdObservers.get(clientId);
                remainingForClientId?.delete(observer);
                if (remainingForClientId?.size === 0) {
                    if (client) {
                        // Last observer for this client id: close the connection.
                        this.disconnect(clientId);
                        this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSING_CONNECTION);
                    }
                    this._clientIdObservers.delete(clientId);
                }
                targetTopics.forEach(topic => {
                    const observersForTopic = this._topicObservers.get(topic) ||
                        new Set();
                    observersForTopic.delete(observer);
                    // Nobody listens on this topic any more: drop it locally and,
                    // when possible, on the connected client as well.
                    if (observersForTopic.size === 0) {
                        this._topicObservers.delete(topic);
                        if (client && client.isConnected()) {
                            client.unsubscribe(topic);
                        }
                    }
                });
                return null;
            };
        });
    }
}
|
279 |
|
280 | export { MqttOverWS, mqttTopicMatch };
|
281 |
|