UNPKG

2.73 kBJavaScriptView Raw
1'use strict'
2
3var Transform = require('readable-stream').Transform
4var duplexify = require('duplexify')
5
/* global wx */

// Module-level connection state shared by the helpers in this file.
// buildStream() assigns all three, so a later buildStream() call overwrites
// the values set by an earlier one.

// Socket task object returned by wx.connectSocket().
var socketTask
// Transform stream created by buildProxy(); outgoing writes are sent through
// socketTask and incoming socket messages are pushed into it.
var proxy
// Object-mode duplexify stream that buildStream() returns to its caller.
var stream
10
/**
 * Create the Transform stream that bridges stream writes to the WeChat
 * socket task held in the module-level `socketTask`.
 *
 * - `_write` forwards each chunk to `socketTask.send()` and reports the
 *   outcome through the stream callback.
 * - `_flush` closes the socket once the writable side has finished.
 *
 * @returns {Transform} a Transform with `_write` and `_flush` overridden
 */
function buildProxy () {
  var proxy = new Transform()

  proxy._write = function (chunk, encoding, next) {
    // `chunk` is expected to be a Buffer / typed-array view. Such a view may
    // cover only part of a larger (pooled or sliced) ArrayBuffer, so sending
    // `chunk.buffer` as-is could put unrelated bytes on the wire. Copy exactly
    // the bytes this chunk spans.
    var payload = chunk.buffer.slice(
      chunk.byteOffset,
      chunk.byteOffset + chunk.byteLength
    )

    socketTask.send({
      data: payload,
      success: function () {
        next()
      },
      fail: function (res) {
        // The error handler in this file reads `res.errMsg`, so accept the
        // same `{ errMsg }` shape here while still tolerating a bare string.
        var message = res && res.errMsg !== undefined ? res.errMsg : res
        next(new Error(message))
      }
    })
  }

  proxy._flush = function socketEnd (done) {
    socketTask.close({
      success: function () {
        done()
      },
      fail: function () {
        // Without a failure path the flush callback would never fire and the
        // stream would never finish when the close request is rejected.
        done()
      }
    })
  }

  return proxy
}
34
/**
 * Fill in connection defaults on `opts` in place.
 *
 * Any of `hostname`, `path` or `wsOptions` that is missing or falsy is
 * replaced by 'localhost', '/' and a fresh empty object respectively.
 * Values that are already truthy are left untouched.
 *
 * @param {Object} opts - connection options; mutated
 */
function setDefaultOpts (opts) {
  // Built on every call so each opts object receives its own wsOptions object.
  var fallbacks = {
    hostname: 'localhost',
    path: '/',
    wsOptions: {}
  }

  Object.keys(fallbacks).forEach(function (key) {
    if (!opts[key]) {
      opts[key] = fallbacks[key]
    }
  })
}
47
/**
 * Build the WebSocket URL for a connection.
 *
 * The 'wxs' protocol maps to the secure 'wss' scheme; anything else maps to
 * 'ws'. The port is left out only when it is the default port of the chosen
 * scheme (80 for ws, 443 for wss), so a non-default combination such as
 * ws + 443 keeps its explicit port. If `opts.transformWsUrl` is a function,
 * its return value replaces the generated URL.
 *
 * @param {Object} opts - connection options (`protocol`, `hostname`, `path`,
 *   optional `port`, optional `transformWsUrl(url, opts, client)`)
 * @param {Object} client - passed through to `opts.transformWsUrl`
 * @returns {string} the URL to connect to
 */
function buildUrl (opts, client) {
  var protocol = opts.protocol === 'wxs' ? 'wss' : 'ws'
  var defaultPort = protocol === 'wss' ? 443 : 80
  var authority = opts.hostname

  // Number() lets a port supplied as a numeric string compare correctly
  // against the scheme default.
  if (opts.port && Number(opts.port) !== defaultPort) {
    authority += ':' + opts.port
  }

  var url = protocol + '://' + authority + opts.path
  if (typeof opts.transformWsUrl === 'function') {
    url = opts.transformWsUrl(url, opts, client)
  }
  return url
}
59
/**
 * Register the open / message / close / error listeners on the module-level
 * `socketTask`, wiring socket events to the module-level `stream` and
 * `proxy`.
 */
function bindEventHandler () {
  // Socket is open: attach the proxy as both sides of the duplex stream and
  // announce the connection.
  function handleOpen () {
    stream.setReadable(proxy)
    stream.setWritable(proxy)
    stream.emit('connect')
  }

  // Incoming frame: normalise to a Buffer (binary frames arrive as
  // ArrayBuffer, anything else is treated as UTF-8 text) and push it into the
  // proxy.
  function handleMessage (res) {
    var payload = res.data
    var chunk = payload instanceof ArrayBuffer
      ? Buffer.from(payload)
      : Buffer.from(payload, 'utf8')
    proxy.push(chunk)
  }

  // Socket closed: end and then destroy the stream.
  function handleClose () {
    stream.end()
    stream.destroy()
  }

  // Socket error: destroy the stream with an Error built from `res.errMsg`.
  function handleError (res) {
    stream.destroy(new Error(res.errMsg))
  }

  socketTask.onOpen(handleOpen)
  socketTask.onMessage(handleMessage)
  socketTask.onClose(handleClose)
  socketTask.onError(handleError)
}
84
/**
 * Open a WeChat Mini Program WebSocket and expose it as a duplex stream.
 *
 * Side effects: mutates `opts` (fills `hostname` from `host` and applies
 * defaults via setDefaultOpts) and reassigns the module-level `socketTask`,
 * `proxy` and `stream`.
 *
 * @param {Object} client - passed through to buildUrl (and from there to
 *   `opts.transformWsUrl`, if provided)
 * @param {Object} opts - connection options; `hostname` or `host` is required
 * @returns {Object} the object-mode duplexify stream for this connection
 * @throws {Error} when neither `opts.hostname` nor `opts.host` is set
 */
function buildStream (client, opts) {
  opts.hostname = opts.hostname || opts.host

  if (!opts.hostname) {
    throw new Error('Could not determine host. Specify host manually.')
  }

  // 'MQIsdp' with protocol version 3 selects the 'mqttv3.1' sub-protocol;
  // every other combination uses 'mqtt'.
  var websocketSubProtocol =
    (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
      ? 'mqttv3.1'
      : 'mqtt'

  setDefaultOpts(opts)

  var url = buildUrl(opts, client)
  socketTask = wx.connectSocket({
    url: url,
    // wx.connectSocket takes a list of sub-protocol names, not a bare string.
    protocols: [websocketSubProtocol]
  })

  proxy = buildProxy()
  stream = duplexify.obj()

  // Replace the stream's _destroy: ask the socket to close and, once that
  // succeeds, hand the error to the optional callback.
  stream._destroy = function (err, cb) {
    socketTask.close({
      success: function () {
        cb && cb(err)
      }
    })
  }

  // One-shot destroy wrapper: the first call restores the original destroy
  // method and, on the next tick, requests a socket close. Arguments passed to
  // that first call are not used. If the close request fails, _destroy is
  // invoked directly with a blank Error.
  var destroyRef = stream.destroy
  stream.destroy = function () {
    stream.destroy = destroyRef

    var self = this
    process.nextTick(function () {
      socketTask.close({
        fail: function () {
          self._destroy(new Error())
        }
      })
    })
  }.bind(stream)

  bindEventHandler()

  return stream
}
133
// The module's only export: the factory that opens the socket and returns the
// duplex stream.
module.exports = buildStream