// Retrieved from UNPKG (3.21 kB, JavaScript, raw view).
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const buffer_1 = require("buffer");
const readable_stream_1 = require("readable-stream");
const BufferedDuplex_1 = require("../BufferedDuplex");
// Module-level connection state read by the helpers in this file.
// All three are (re)assigned on every buildStream() call, so they always
// describe the most recently built connection.
let socketTask; // value returned by wx.connectSocket()
let proxy; // Transform created by buildProxy()
let stream; // BufferedDuplex handed back to the caller
/**
 * Create the Transform that forwards written chunks to the module-level
 * `socketTask` and closes that task when the writable side is flushed.
 *
 * @returns {Transform} a readable-stream Transform whose `_write` and
 *   `_flush` are overridden on the instance.
 */
function buildProxy() {
    const _proxy = new readable_stream_1.Transform();
    _proxy._write = (chunk, encoding, next) => {
        // A Buffer may be a view onto a larger ArrayBuffer (sliced or pooled
        // memory). Sending `chunk.buffer` as-is would transmit bytes that are
        // not part of this chunk, so copy out exactly the viewed range unless
        // the chunk already spans the whole backing buffer.
        const { buffer, byteOffset, byteLength } = chunk;
        const data = byteOffset === 0 && byteLength === buffer.byteLength
            ? buffer
            : buffer.slice(byteOffset, byteOffset + byteLength);
        socketTask.send({
            data,
            success() {
                next();
            },
            fail(res) {
                // Failure payloads carry the text in `errMsg` (as read in this
                // file's onError handler); a plain string is still accepted.
                const message = typeof res === 'string' ? res : res && res.errMsg;
                next(new Error(message));
            },
        });
    };
    _proxy._flush = (done) => {
        socketTask.close({
            success() {
                done();
            },
        });
    };
    return _proxy;
}
/**
 * Fill in missing connection options in place.
 *
 * Falsy `hostname` becomes 'localhost', falsy `path` becomes '/', and a
 * falsy `wsOptions` becomes a fresh empty object. Existing truthy values
 * are left untouched.
 *
 * @param {object} opts - options object; mutated by this function.
 * @returns {void}
 */
function setDefaultOpts(opts) {
    if (!opts.hostname) {
        opts.hostname = 'localhost';
    }
    if (!opts.path) {
        opts.path = '/';
    }
    if (!opts.wsOptions) {
        opts.wsOptions = {};
    }
}
/**
 * Build the WebSocket URL for a connection.
 *
 * `opts.protocol === 'wxs'` selects the `wss` scheme; anything else selects
 * `ws`. The port is written into the URL unless it is unset or equals the
 * default port of the selected scheme (80 for ws, 443 for wss). Omitting a
 * port that is NOT the scheme default would make the connection go to a
 * different port than the caller asked for.
 *
 * @param {object} opts - reads `protocol`, `hostname`, `path`, `port` and the
 *   optional `transformWsUrl(url, opts, client)` hook.
 * @param {object} client - passed through to `transformWsUrl` untouched.
 * @returns {string} the URL, after `transformWsUrl` if one is supplied.
 */
function buildUrl(opts, client) {
    const protocol = opts.protocol === 'wxs' ? 'wss' : 'ws';
    const defaultPort = protocol === 'wss' ? 443 : 80;
    const authority = opts.port && opts.port !== defaultPort
        ? `${opts.hostname}:${opts.port}`
        : opts.hostname;
    let url = `${protocol}://${authority}${opts.path}`;
    if (typeof opts.transformWsUrl === 'function') {
        url = opts.transformWsUrl(url, opts, client);
    }
    return url;
}
/**
 * Wire the module-level `socketTask` events to the module-level `stream`
 * and `proxy`.
 *
 * - open:    calls `stream.socketReady()`.
 * - message: converts the payload to a Buffer (ArrayBuffer as-is, anything
 *            else decoded as UTF-8 text) and pushes it into `proxy`.
 * - close:   emits 'close' on the stream, then ends and destroys it.
 * - error:   destroys the stream with an Error built from `error.errMsg`.
 *
 * @returns {void}
 */
function bindEventHandler() {
    socketTask.onOpen(() => {
        stream.socketReady();
    });
    socketTask.onMessage((res) => {
        const { data } = res;
        const payload = data instanceof ArrayBuffer
            ? buffer_1.Buffer.from(data)
            : buffer_1.Buffer.from(data, 'utf8');
        proxy.push(payload);
    });
    socketTask.onClose(() => {
        stream.emit('close');
        stream.end();
        stream.destroy();
    });
    socketTask.onError((error) => {
        const err = new Error(error.errMsg);
        stream.destroy(err);
    });
}
/**
 * Open a socket via `wx.connectSocket` and wrap it in a BufferedDuplex.
 *
 * Side effects: mutates `opts` (hostname, plus the defaults applied by
 * setDefaultOpts) and reassigns the module-level `socketTask`, `proxy` and
 * `stream`, which buildProxy() and bindEventHandler() read.
 *
 * @param {object} client - forwarded to buildUrl (for `transformWsUrl`).
 * @param {object} opts - connection options; `hostname` or `host` required.
 * @returns {BufferedDuplex} the duplex stream for this connection.
 * @throws {Error} when neither `opts.hostname` nor `opts.host` is set.
 */
const buildStream = (client, opts) => {
    opts.hostname = opts.hostname || opts.host;
    if (!opts.hostname) {
        throw new Error('Could not determine host. Specify host manually.');
    }
    const websocketSubProtocol = opts.protocolId === 'MQIsdp' && opts.protocolVersion === 3
        ? 'mqttv3.1'
        : 'mqtt';
    setDefaultOpts(opts);
    const url = buildUrl(opts, client);
    // Keep call-local references for the closures below. The module-level
    // variables are overwritten by the next buildStream() call, and the
    // destroy hooks of THIS stream must keep closing THIS socket.
    const task = wx.connectSocket({
        url,
        protocols: [websocketSubProtocol],
    });
    socketTask = task;
    proxy = buildProxy();
    const duplex = new BufferedDuplex_1.BufferedDuplex(opts, proxy, task);
    stream = duplex;
    duplex._destroy = (err, cb) => {
        task.close({
            success() {
                if (cb)
                    cb(err);
            },
        });
    };
    const destroyRef = duplex.destroy;
    // One-shot override: the first destroy() restores the original method,
    // then closes the socket on the next tick and only falls back to
    // _destroy if that close reports failure.
    duplex.destroy = (err, cb) => {
        duplex.destroy = destroyRef;
        setTimeout(() => {
            task.close({
                fail() {
                    duplex._destroy(err, cb);
                },
            });
        }, 0);
        return duplex;
    };
    bindEventHandler();
    return duplex;
};
// Default export: the stream builder for the WeChat mini-program runtime.
exports.default = buildStream;
//# sourceMappingURL=wx.js.map