UNPKG

6.97 kBJavaScriptView Raw
1"use strict";
2var __importDefault = (this && this.__importDefault) || function (mod) {
3 return (mod && mod.__esModule) ? mod : { "default": mod };
4};
5Object.defineProperty(exports, "__esModule", { value: true });
6const buffer_1 = require("buffer");
7const ws_1 = __importDefault(require("ws"));
8const debug_1 = __importDefault(require("debug"));
9const readable_stream_1 = require("readable-stream");
10const is_browser_1 = __importDefault(require("../is-browser"));
11const BufferedDuplex_1 = require("../BufferedDuplex");
12const debug = (0, debug_1.default)('mqttjs:ws');
/**
 * Names of the TLS-related client options that `setDefaultOpts` copies into
 * `wsOptions` for `wss` connections when not running in a browser.
 */
const WSS_OPTIONS = ['rejectUnauthorized', 'ca', 'cert', 'key', 'pfx', 'passphrase'];
/**
 * Assemble the WebSocket URL from the connection options.
 *
 * @param {object} opts - Options supplying `protocol`, `hostname`, `port` and
 *   `path`; may also carry a `transformWsUrl` hook.
 * @param {object} client - Handed through untouched to `transformWsUrl`.
 * @returns {*} `protocol://hostname:port` followed by `path`, or whatever
 *   `transformWsUrl(url, opts, client)` returns when that option is a function.
 */
function buildUrl(opts, client) {
    const { protocol, hostname, port, path } = opts;
    const baseUrl = `${protocol}://${hostname}:${port}${path}`;
    return typeof opts.transformWsUrl === 'function'
        ? opts.transformWsUrl(baseUrl, opts, client)
        : baseUrl;
}
/**
 * Fill in missing connection defaults directly on the given options object.
 *
 * Defaults applied when the value is falsy: `hostname` -> 'localhost',
 * `port` -> 443 for the `wss` protocol and 80 otherwise, `path` -> '/',
 * `wsOptions` -> {}. For `wss` outside the browser, every own property of
 * `opts` listed in WSS_OPTIONS is copied into `wsOptions` unless `wsOptions`
 * already has an own property of that name.
 *
 * @param {object} opts - Client options; mutated in place.
 * @returns {object} The same `opts` object.
 */
function setDefaultOpts(opts) {
    if (!opts.hostname) {
        opts.hostname = 'localhost';
    }
    if (!opts.port) {
        opts.port = opts.protocol === 'wss' ? 443 : 80;
    }
    if (!opts.path) {
        opts.path = '/';
    }
    if (!opts.wsOptions) {
        opts.wsOptions = {};
    }
    const needsTlsOptions = !is_browser_1.default && opts.protocol === 'wss';
    if (needsTlsOptions) {
        const owns = (target, key) => Object.prototype.hasOwnProperty.call(target, key);
        for (const prop of WSS_OPTIONS) {
            // Values already present in wsOptions win over the top-level ones.
            if (owns(opts, prop) && !owns(opts.wsOptions, prop)) {
                opts.wsOptions[prop] = opts[prop];
            }
        }
    }
    return opts;
}
/**
 * Apply browser-side defaults to the client options.
 *
 * The hostname is resolved in this order: `opts.hostname`, then `opts.host`,
 * then the hostname of the current page (`document.URL`) when a global
 * `document` exists, and finally the default applied by `setDefaultOpts`.
 * When the hostname comes from the page and no port was given, the page's
 * explicit port (if it has one) is used as well.
 *
 * `objectMode` defaults to `false` when `binary` is `true` or unset, and to
 * `true` for any other `binary` value.
 *
 * @param {object} opts - Client options; mutated in place.
 * @returns {object} The options object returned by `setDefaultOpts`.
 */
function setDefaultBrowserOpts(opts) {
    // Resolve the host BEFORE the generic defaults run: setDefaultOpts fills
    // in a hostname whenever none is set, so doing it afterwards (as before)
    // made both fallbacks below unreachable.
    if (!opts.hostname && opts.host) {
        opts.hostname = opts.host;
    }
    if (!opts.hostname && typeof document !== 'undefined') {
        const parsed = new URL(document.URL);
        opts.hostname = parsed.hostname;
        // URL#port is '' when the page uses its scheme's default port; in that
        // case leave the port unset so setDefaultOpts picks it.
        if (!opts.port && parsed.port) {
            opts.port = Number(parsed.port);
        }
    }
    // If no host could be determined, setDefaultOpts still supplies its
    // default hostname, which is what callers effectively got before.
    const options = setDefaultOpts(opts);
    if (options.objectMode === undefined) {
        options.objectMode = !(options.binary === true || options.binary === undefined);
    }
    return options;
}
/**
 * Create the WebSocket for the non-browser stream builder.
 *
 * The sub-protocol is 'mqttv3.1' when `protocolId` is 'MQIsdp' and
 * `protocolVersion` is 3, otherwise 'mqtt'.
 *
 * @param {object} client - Not used by this function.
 * @param {string} url - Address to connect to.
 * @param {object} opts - Options; `createWebsocket(url, protocols, opts)` is
 *   used as the factory when present, otherwise the `ws` constructor is called
 *   with `opts.wsOptions`.
 * @returns {object} The socket produced by the factory or by `ws`.
 */
function createWebSocket(client, url, opts) {
    debug('createWebSocket');
    debug(`protocol: ${opts.protocolId} ${opts.protocolVersion}`);
    const isMqttV31 = opts.protocolId === 'MQIsdp' && opts.protocolVersion === 3;
    const websocketSubProtocol = isMqttV31 ? 'mqttv3.1' : 'mqtt';
    debug(`creating new Websocket for url: ${url} and protocol: ${websocketSubProtocol}`);
    if (opts.createWebsocket) {
        return opts.createWebsocket(url, [websocketSubProtocol], opts);
    }
    return new ws_1.default(url, [websocketSubProtocol], opts.wsOptions);
}
/**
 * Create the WebSocket for the browser stream builder.
 *
 * The URL comes from `buildUrl(opts, client)`. The sub-protocol is
 * 'mqttv3.1' when `protocolId` is 'MQIsdp' and `protocolVersion` is 3,
 * otherwise 'mqtt'. The socket's `binaryType` is set to 'arraybuffer'.
 *
 * @param {object} client - Forwarded to `buildUrl`.
 * @param {object} opts - Options; `createWebsocket(url, protocols, opts)` is
 *   used as the factory when present, otherwise the global `WebSocket`.
 * @returns {object} The configured socket.
 */
function createBrowserWebSocket(client, opts) {
    const isMqttV31 = opts.protocolId === 'MQIsdp' && opts.protocolVersion === 3;
    const subProtocols = [isMqttV31 ? 'mqttv3.1' : 'mqtt'];
    const url = buildUrl(opts, client);
    const socket = opts.createWebsocket
        ? opts.createWebsocket(url, subProtocols, opts)
        : new WebSocket(url, subProtocols);
    socket.binaryType = 'arraybuffer';
    return socket;
}
/**
 * Stream builder used outside the browser.
 *
 * Applies the default options, builds the URL, opens the socket and wraps it
 * with `createWebSocketStream` from `ws`. The URL is stored on the returned
 * stream as `url`, and the stream is destroyed when the socket emits 'close'.
 *
 * @param {object} client - Forwarded to `buildUrl` and `createWebSocket`.
 * @param {object} opts - Client options; defaults are applied via `setDefaultOpts`.
 * @returns {object} The stream wrapping the socket.
 */
const streamBuilder = (client, opts) => {
    debug('streamBuilder');
    const resolved = setDefaultOpts(opts);
    const targetUrl = buildUrl(resolved, client);
    const ws = createWebSocket(client, targetUrl, resolved);
    const duplex = ws_1.default.createWebSocketStream(ws, resolved.wsOptions);
    duplex.url = targetUrl;
    ws.on('close', () => {
        duplex.destroy();
    });
    return duplex;
};
/**
 * Stream builder used in the browser.
 *
 * Opens a WebSocket through `createBrowserWebSocket` and bridges it to a
 * Transform stream ("proxy"): writes to the proxy are sent over the socket,
 * incoming socket messages are pushed out of the proxy as Buffers. If the
 * socket is not yet OPEN, the proxy is wrapped in a BufferedDuplex that is
 * told when the socket opens.
 *
 * Options read here: `browserBufferSize` (default 1024 * 512) is the
 * `socket.bufferedAmount` threshold above which a write is postponed,
 * `browserBufferTimeout` (default 1000) is the `setTimeout` delay before that
 * write is retried, and `objectMode` controls the proxy's object mode and
 * whether string chunks are converted to Buffers.
 *
 * @param {object} client - Forwarded to `createBrowserWebSocket`.
 * @param {object} opts - Client options; defaults are applied via `setDefaultBrowserOpts`.
 * @returns {object} The proxy itself when the socket is already open,
 *   otherwise the BufferedDuplex wrapping it.
 */
const browserStreamBuilder = (client, opts) => {
    debug('browserStreamBuilder');
    // Assigned further down; the socket handlers only read it once events fire.
    let stream;
    const options = setDefaultBrowserOpts(opts);
    const bufferSize = options.browserBufferSize || 1024 * 512;
    const bufferTimeout = opts.browserBufferTimeout || 1000;
    const coerceToBuffer = !opts.objectMode;
    const socket = createBrowserWebSocket(client, opts);

    // Write side: forward a chunk to the socket, retrying later while the
    // socket still has more than `bufferSize` queued.
    const socketWriteBrowser = (chunk, enc, next) => {
        if (socket.bufferedAmount > bufferSize) {
            setTimeout(socketWriteBrowser, bufferTimeout, chunk, enc, next);
            return;
        }
        const payload = coerceToBuffer && typeof chunk === 'string'
            ? buffer_1.Buffer.from(chunk, 'utf8')
            : chunk;
        try {
            socket.send(payload);
        }
        catch (err) {
            return next(err);
        }
        next();
    };

    // Flush side: closing the proxy's writable end closes the socket.
    const socketEndBrowser = (done) => {
        socket.close();
        done();
    };

    const proxy = new readable_stream_1.Transform({
        objectMode: opts.objectMode,
    });
    proxy._write = socketWriteBrowser;
    proxy._flush = socketEndBrowser;
    if (!opts.objectMode) {
        proxy._writev = BufferedDuplex_1.writev.bind(proxy);
    }
    proxy.on('close', () => {
        socket.close();
    });

    const onOpen = () => {
        debug('WebSocket onOpen');
        if (stream instanceof BufferedDuplex_1.BufferedDuplex) {
            stream.socketReady();
        }
    };
    const onClose = (event) => {
        debug('WebSocket onClose', event);
        stream.end();
        stream.destroy();
    };
    const onError = (err) => {
        debug('WebSocket onError', err);
        const error = new Error('WebSocket error');
        // Keep the original event reachable from the Error.
        error['event'] = err;
        stream.destroy(error);
    };
    const onMessage = ({ data }) => {
        const chunk = data instanceof ArrayBuffer
            ? buffer_1.Buffer.from(data)
            : buffer_1.Buffer.from(data, 'utf8');
        proxy.push(chunk);
    };

    // Sockets without addEventListener get their handlers through the
    // on<event> properties instead.
    const eventListenerSupport = typeof socket.addEventListener !== 'undefined';
    const listen = (type, handler) => {
        if (eventListenerSupport) {
            socket.addEventListener(type, handler);
        }
        else {
            socket[`on${type}`] = handler;
        }
    };

    if (socket.readyState === socket.OPEN) {
        stream = proxy;
        stream.socket = socket;
    }
    else {
        stream = new BufferedDuplex_1.BufferedDuplex(opts, proxy, socket);
        listen('open', onOpen);
    }
    listen('close', onClose);
    listen('error', onError);
    listen('message', onMessage);
    return stream;
};
// Choose the stream builder once, at module load, from the is-browser flag.
if (is_browser_1.default) {
    exports.default = browserStreamBuilder;
}
else {
    exports.default = streamBuilder;
}
215//# sourceMappingURL=ws.js.map
\No newline at end of file