UNPKG

5.57 kBJavaScriptView Raw
1module.exports = Socket
2
3var debug = require('debug')('simple-websocket')
4var inherits = require('inherits')
5var stream = require('readable-stream')
6var ws = require('ws') // websockets in node - will be empty object in browser
7
8var WebSocket = typeof window !== 'undefined' ? window.WebSocket : ws
9
10inherits(Socket, stream.Duplex)
11
12/**
13 * WebSocket. Same API as node core `net.Socket`. Duplex stream.
14 * @param {string} url websocket server url
15 * @param {Object} opts options to stream.Duplex
16 */
17function Socket (url, opts) {
18 var self = this
19 if (!(self instanceof Socket)) return new Socket(url, opts)
20 if (!opts) opts = {}
21 debug('new websocket: %s %o', url, opts)
22
23 opts.allowHalfOpen = false
24 if (opts.highWaterMark == null) opts.highWaterMark = 1024 * 1024
25
26 stream.Duplex.call(self, opts)
27
28 self.url = url
29 self.connected = false
30 self.destroyed = false
31
32 self._maxBufferedAmount = opts.highWaterMark
33 self._chunk = null
34 self._cb = null
35 self._interval = null
36
37 self._ws = new WebSocket(self.url)
38 self._ws.binaryType = 'arraybuffer'
39 self._ws.onopen = self._onOpen.bind(self)
40 self._ws.onmessage = self._onMessage.bind(self)
41 self._ws.onclose = self._onClose.bind(self)
42 self._ws.onerror = function () {
43 self._onError(new Error('connection error to ' + self.url))
44 }
45
46 self.on('finish', function () {
47 if (self.connected) {
48 // When stream is finished writing, close socket connection. Half open connections
49 // are currently not supported.
50 // Wait a bit before destroying so the socket flushes.
51 // TODO: is there a more reliable way to accomplish this?
52 setTimeout(function () {
53 self._destroy()
54 }, 100)
55 } else {
56 // If socket is not connected when stream is finished writing, wait until data is
57 // flushed to network at "connect" event.
58 // TODO: is there a more reliable way to accomplish this?
59 self.once('connect', function () {
60 setTimeout(function () {
61 self._destroy()
62 }, 100)
63 })
64 }
65 })
66}
67
68Socket.WEBSOCKET_SUPPORT = !!WebSocket
69
70/**
71 * Send text/binary data to the WebSocket server.
72 * @param {TypedArrayView|ArrayBuffer|Buffer|string|Blob|Object} chunk
73 */
74Socket.prototype.send = function (chunk) {
75 var self = this
76
77 var len = chunk.length || chunk.byteLength || chunk.size
78 self._ws.send(chunk)
79 debug('write: %d bytes', len)
80}
81
82Socket.prototype.destroy = function (onclose) {
83 var self = this
84 self._destroy(null, onclose)
85}
86
87Socket.prototype._destroy = function (err, onclose) {
88 var self = this
89 if (self.destroyed) return
90 if (onclose) self.once('close', onclose)
91
92 debug('destroy (error: %s)', err && err.message)
93
94 this.readable = this.writable = false
95
96 if (!self._readableState.ended) self.push(null)
97 if (!self._writableState.finished) self.end()
98
99 self.connected = false
100 self.destroyed = true
101
102 clearInterval(self._interval)
103 self._interval = null
104 self._chunk = null
105 self._cb = null
106
107 if (self._ws) {
108 var ws = self._ws
109 var onClose = function () {
110 ws.onclose = null
111 self.emit('close')
112 }
113 if (ws.readyState === WebSocket.CLOSED) {
114 onClose()
115 } else {
116 try {
117 ws.onclose = onClose
118 ws.close()
119 } catch (err) {
120 onClose()
121 }
122 }
123
124 ws.onopen = null
125 ws.onmessage = null
126 ws.onerror = null
127 }
128 self._ws = null
129
130 if (err) self.emit('error', err)
131}
132
133Socket.prototype._read = function () {}
134
135Socket.prototype._write = function (chunk, encoding, cb) {
136 var self = this
137 if (self.destroyed) return cb(new Error('cannot write after socket is destroyed'))
138
139 if (self.connected) {
140 try {
141 self.send(chunk)
142 } catch (err) {
143 return self._onError(err)
144 }
145 if (typeof ws !== 'function' && self._ws.bufferedAmount > self._maxBufferedAmount) {
146 debug('start backpressure: bufferedAmount %d', self._ws.bufferedAmount)
147 self._cb = cb
148 } else {
149 cb(null)
150 }
151 } else {
152 debug('write before connect')
153 self._chunk = chunk
154 self._cb = cb
155 }
156}
157
158Socket.prototype._onMessage = function (event) {
159 var self = this
160 if (self.destroyed) return
161 var data = event.data
162 debug('read: %d bytes', data.byteLength || data.length)
163
164 if (data instanceof ArrayBuffer) data = new Buffer(data)
165 self.push(data)
166}
167
168Socket.prototype._onOpen = function () {
169 var self = this
170 if (self.connected || self.destroyed) return
171 self.connected = true
172
173 if (self._chunk) {
174 try {
175 self.send(self._chunk)
176 } catch (err) {
177 return self._onError(err)
178 }
179 self._chunk = null
180 debug('sent chunk from "write before connect"')
181
182 var cb = self._cb
183 self._cb = null
184 cb(null)
185 }
186
187 // No backpressure in node. The `ws` module has a buggy `bufferedAmount` property.
188 // See: https://github.com/websockets/ws/issues/492
189 if (typeof ws !== 'function') {
190 self._interval = setInterval(function () {
191 if (!self._cb || !self._ws || self._ws.bufferedAmount > self._maxBufferedAmount) {
192 return
193 }
194 debug('ending backpressure: bufferedAmount %d', self._ws.bufferedAmount)
195 var cb = self._cb
196 self._cb = null
197 cb(null)
198 }, 150)
199 if (self._interval.unref) self._interval.unref()
200 }
201
202 debug('connect')
203 self.emit('connect')
204}
205
206Socket.prototype._onClose = function () {
207 var self = this
208 if (self.destroyed) return
209 debug('on close')
210 self._destroy()
211}
212
213Socket.prototype._onError = function (err) {
214 var self = this
215 if (self.destroyed) return
216 debug('error: %s', err.message || err)
217 self._destroy(err)
218}