1 | module.exports = Socket
|
2 |
|
3 | var debug = require('debug')('simple-websocket')
|
4 | var inherits = require('inherits')
|
5 | var stream = require('readable-stream')
|
6 | var ws = require('ws')
|
7 |
|
8 | var WebSocket = typeof window !== 'undefined' ? window.WebSocket : ws
|
9 |
|
10 | inherits(Socket, stream.Duplex)
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 | function Socket (url, opts) {
|
18 | var self = this
|
19 | if (!(self instanceof Socket)) return new Socket(url, opts)
|
20 | if (!opts) opts = {}
|
21 | debug('new websocket: %s %o', url, opts)
|
22 |
|
23 | opts.allowHalfOpen = false
|
24 | if (opts.highWaterMark == null) opts.highWaterMark = 1024 * 1024
|
25 |
|
26 | stream.Duplex.call(self, opts)
|
27 |
|
28 | self.url = url
|
29 | self.connected = false
|
30 | self.destroyed = false
|
31 |
|
32 | self._maxBufferedAmount = opts.highWaterMark
|
33 | self._chunk = null
|
34 | self._cb = null
|
35 | self._interval = null
|
36 |
|
37 | self._ws = new WebSocket(self.url)
|
38 | self._ws.binaryType = 'arraybuffer'
|
39 | self._ws.onopen = self._onOpen.bind(self)
|
40 | self._ws.onmessage = self._onMessage.bind(self)
|
41 | self._ws.onclose = self._onClose.bind(self)
|
42 | self._ws.onerror = function () {
|
43 | self._onError(new Error('connection error to ' + self.url))
|
44 | }
|
45 |
|
46 | self.on('finish', function () {
|
47 | if (self.connected) {
|
48 |
|
49 |
|
50 |
|
51 |
|
52 | setTimeout(function () {
|
53 | self._destroy()
|
54 | }, 100)
|
55 | } else {
|
56 |
|
57 |
|
58 |
|
59 | self.once('connect', function () {
|
60 | setTimeout(function () {
|
61 | self._destroy()
|
62 | }, 100)
|
63 | })
|
64 | }
|
65 | })
|
66 | }
|
67 |
|
68 | Socket.WEBSOCKET_SUPPORT = !!WebSocket
|
69 |
|
70 |
|
71 |
|
72 |
|
73 |
|
74 | Socket.prototype.send = function (chunk) {
|
75 | var self = this
|
76 |
|
77 | var len = chunk.length || chunk.byteLength || chunk.size
|
78 | self._ws.send(chunk)
|
79 | debug('write: %d bytes', len)
|
80 | }
|
81 |
|
82 | Socket.prototype.destroy = function (onclose) {
|
83 | var self = this
|
84 | self._destroy(null, onclose)
|
85 | }
|
86 |
|
87 | Socket.prototype._destroy = function (err, onclose) {
|
88 | var self = this
|
89 | if (self.destroyed) return
|
90 | if (onclose) self.once('close', onclose)
|
91 |
|
92 | debug('destroy (error: %s)', err && err.message)
|
93 |
|
94 | this.readable = this.writable = false
|
95 |
|
96 | if (!self._readableState.ended) self.push(null)
|
97 | if (!self._writableState.finished) self.end()
|
98 |
|
99 | self.connected = false
|
100 | self.destroyed = true
|
101 |
|
102 | clearInterval(self._interval)
|
103 | self._interval = null
|
104 | self._chunk = null
|
105 | self._cb = null
|
106 |
|
107 | if (self._ws) {
|
108 | var ws = self._ws
|
109 | var onClose = function () {
|
110 | ws.onclose = null
|
111 | self.emit('close')
|
112 | }
|
113 | if (ws.readyState === WebSocket.CLOSED) {
|
114 | onClose()
|
115 | } else {
|
116 | try {
|
117 | ws.onclose = onClose
|
118 | ws.close()
|
119 | } catch (err) {
|
120 | onClose()
|
121 | }
|
122 | }
|
123 |
|
124 | ws.onopen = null
|
125 | ws.onmessage = null
|
126 | ws.onerror = null
|
127 | }
|
128 | self._ws = null
|
129 |
|
130 | if (err) self.emit('error', err)
|
131 | }
|
132 |
|
133 | Socket.prototype._read = function () {}
|
134 |
|
135 | Socket.prototype._write = function (chunk, encoding, cb) {
|
136 | var self = this
|
137 | if (self.destroyed) return cb(new Error('cannot write after socket is destroyed'))
|
138 |
|
139 | if (self.connected) {
|
140 | try {
|
141 | self.send(chunk)
|
142 | } catch (err) {
|
143 | return self._onError(err)
|
144 | }
|
145 | if (typeof ws !== 'function' && self._ws.bufferedAmount > self._maxBufferedAmount) {
|
146 | debug('start backpressure: bufferedAmount %d', self._ws.bufferedAmount)
|
147 | self._cb = cb
|
148 | } else {
|
149 | cb(null)
|
150 | }
|
151 | } else {
|
152 | debug('write before connect')
|
153 | self._chunk = chunk
|
154 | self._cb = cb
|
155 | }
|
156 | }
|
157 |
|
158 | Socket.prototype._onMessage = function (event) {
|
159 | var self = this
|
160 | if (self.destroyed) return
|
161 | var data = event.data
|
162 | debug('read: %d bytes', data.byteLength || data.length)
|
163 |
|
164 | if (data instanceof ArrayBuffer) data = new Buffer(data)
|
165 | self.push(data)
|
166 | }
|
167 |
|
168 | Socket.prototype._onOpen = function () {
|
169 | var self = this
|
170 | if (self.connected || self.destroyed) return
|
171 | self.connected = true
|
172 |
|
173 | if (self._chunk) {
|
174 | try {
|
175 | self.send(self._chunk)
|
176 | } catch (err) {
|
177 | return self._onError(err)
|
178 | }
|
179 | self._chunk = null
|
180 | debug('sent chunk from "write before connect"')
|
181 |
|
182 | var cb = self._cb
|
183 | self._cb = null
|
184 | cb(null)
|
185 | }
|
186 |
|
187 |
|
188 |
|
189 | if (typeof ws !== 'function') {
|
190 | self._interval = setInterval(function () {
|
191 | if (!self._cb || !self._ws || self._ws.bufferedAmount > self._maxBufferedAmount) {
|
192 | return
|
193 | }
|
194 | debug('ending backpressure: bufferedAmount %d', self._ws.bufferedAmount)
|
195 | var cb = self._cb
|
196 | self._cb = null
|
197 | cb(null)
|
198 | }, 150)
|
199 | if (self._interval.unref) self._interval.unref()
|
200 | }
|
201 |
|
202 | debug('connect')
|
203 | self.emit('connect')
|
204 | }
|
205 |
|
206 | Socket.prototype._onClose = function () {
|
207 | var self = this
|
208 | if (self.destroyed) return
|
209 | debug('on close')
|
210 | self._destroy()
|
211 | }
|
212 |
|
213 | Socket.prototype._onError = function (err) {
|
214 | var self = this
|
215 | if (self.destroyed) return
|
216 | debug('error: %s', err.message || err)
|
217 | self._destroy(err)
|
218 | }
|