UNPKG

5.54 kBJavaScriptView Raw
1var assert = require('assert')
2var util = require('util')
3
4var EventEmitter = require('events').EventEmitter
5var Buffer = require('buffer').Buffer
6
7var Queue = require('./queue')
8
// Node.js version, folded into one number: major plus the minor digits as a
// decimal fraction (v10.15.x -> 10.15). Falls back to 11 when
// process.version does not match the expected pattern.
var match = /^v(\d+)\.(\d+)\./.exec(process.version)
var version = 11
if (match) {
  version = Number(match[1]) + Number('0.' + match[2])
}

// Calling convention used for `onread`: 2 when version >= 11.1, otherwise 1.
// NOTE(review): presumably tracks a change in Node's internal onread
// signature - confirm against the Node.js changelog.
var onreadMode = 1
if (version >= 11.1) {
  onreadMode = 2
}
var mode = 'modern'

// In mode 2 the read result is stored in the stream_wrap state array
// (slot kReadBytesOrError) rather than handed to `onread` as an argument.
// `setNRead` stays undefined in mode 1.
var setNRead
if (onreadMode === 2) {
  var sw = process.binding('stream_wrap')
  setNRead = function (nread) {
    sw.streamBaseState[sw.kReadBytesOrError] = nread
  }
}
22
/**
 * Event-emitting wrapper around a stream.
 *
 * @param {Object} [stream] - stream to wrap; stored as given. The other
 *   methods of this file treat a falsy value as "no stream attached yet".
 * @param {Object} [options] - optional hooks; this file reads `close` and
 *   `getPeerName` from it. Defaults to an empty object.
 */
function Handle (stream, options) {
  EventEmitter.call(this)

  var settings = options
  if (!settings) {
    settings = {}
  }

  this._stream = stream
  this._flowing = false // set once the stream listeners have been installed
  this._reading = false // toggled by readStart() / readStop()
  this._options = settings

  // Read callback, assigned by the consumer of the handle
  this.onread = null

  // Requests that were queued and have not completed yet
  this.pending = new Queue()
}
util.inherits(Handle, EventEmitter)
37module.exports = Handle
38
39Handle.mode = mode
40
41Handle.create = function create (stream, options) {
42 return new Handle(stream, options)
43}
44
/**
 * Forward a read result to the `onread` callback using the calling
 * convention selected by `onreadMode`.
 *
 * Mode 1: `onread(nread, buffer)`.
 * Otherwise: `nread` is published through `setNRead` first, then
 * `onread(buffer)` is called.
 *
 * @param {number} nread - byte count, or a negative uv error code
 * @param {Buffer} buffer - data that was read
 */
Handle.prototype._onread = function _onread (nread, buffer) {
  if (onreadMode !== 1) {
    setNRead(nread)
    this.onread(buffer)
    return
  }

  this.onread(nread, buffer)
}
53
/**
 * Register a request in the pending queue.
 *
 * @param {string} type - request kind ('write' and 'shutdown' are used here)
 * @param {Object} req - the request object
 * @returns {*} whatever `pending.append` returns (a queue entry wrapper)
 */
Handle.prototype._queueReq = function _queueReq (type, req) {
  var wrap = this.pending.append(type, req)
  return wrap
}
57
/**
 * Drain the pending queue into a plain array.
 *
 * Repeatedly takes the first entry and collects the result of its
 * `dequeue()` call until the queue reports that it is empty.
 * NOTE(review): presumably `dequeue()` unlinks the entry and returns the
 * wrapped request - confirm against ./queue.
 *
 * @returns {Array} the dequeued values, in queue order
 */
Handle.prototype._pendingList = function _pendingList () {
  var drained = []
  var queue = this.pending

  for (; !queue.isEmpty();) {
    drained.push(queue.first().dequeue())
  }

  return drained
}
63
/**
 * Attach the underlying stream to a handle that was created without one
 * and announce it with a 'stream' event, which runs the operations that
 * were deferred while no stream was available.
 *
 * The guard accepts any falsy `_stream` (null or undefined), matching the
 * `!this._stream` checks used by the other methods; a handle constructed
 * without a stream argument holds `undefined`, not `null`.
 *
 * @param {Object} stream - the stream to attach
 * @throws {AssertionError} if a stream is already attached
 */
Handle.prototype.setStream = function setStream (stream) {
  assert(!this._stream, 'Can\'t set stream two times')
  this._stream = stream

  this.emit('stream', stream)
}
70
/**
 * Start (or resume) reading from the stream.
 *
 * Marks the handle as reading. Without a stream the call is replayed on
 * the next 'stream' event. With a stream, the stream listeners are
 * installed on first use via `_flow()` and the stream is resumed.
 *
 * @returns {number} always 0
 */
Handle.prototype.readStart = function readStart () {
  this._reading = true

  if (this._stream) {
    if (!this._flowing) {
      // First start: hook up the stream listeners exactly once
      this._flowing = true
      this._flow()
    }

    this._stream.resume()
  } else {
    this.once('stream', this.readStart)
  }

  return 0
}
87
/**
 * Stop reading from the stream.
 *
 * Clears the reading flag. With a stream attached the stream is paused;
 * otherwise the call is replayed on the next 'stream' event.
 *
 * @returns {number} always 0
 */
Handle.prototype.readStop = function readStop () {
  this._reading = false

  if (this._stream) {
    this._stream.pause()
  } else {
    this.once('stream', this.readStop)
  }

  return 0
}
98
// libuv error constants (UV_EOF, UV_ECONNRESET, UV_ECANCELED are used below)
var uv = process.binding('uv')

/**
 * Install the stream listeners that translate stream events into
 * `_onread` calls:
 *
 *  - 'data'  -> `_onread(chunk.length, chunk)`
 *  - 'end'   -> `_onread(UV_EOF, <empty buffer>)`
 *  - 'close' -> on the next setImmediate turn, and only if the handle is
 *               still reading, `_onread(UV_ECONNRESET, <empty buffer>)`
 */
Handle.prototype._flow = function flow () {
  var handle = this
  var source = this._stream

  // Report a status code together with a zero-length buffer
  function signal (code) {
    handle._onread(code, Buffer.alloc(0))
  }

  source.on('data', function (chunk) {
    handle._onread(chunk.length, chunk)
  })

  source.on('end', function () {
    signal(uv.UV_EOF)
  })

  source.on('close', function () {
    setImmediate(function () {
      if (!handle._reading) { return }
      signal(uv.UV_ECONNRESET)
    })
  })
}
119
/**
 * Cancel everything that is still pending and stop reading.
 *
 * The pending queue is drained synchronously; each drained request then
 * has `oncomplete(UV_ECANCELED, handle, req)` invoked on the next
 * setImmediate turn. `readStop()` is called before returning.
 */
Handle.prototype._close = function _close () {
  var handle = this
  var cancelled = this._pendingList()

  setImmediate(function () {
    cancelled.forEach(function (req) {
      req.oncomplete(uv.UV_ECANCELED, handle, req)
    })
  })

  this.readStop()
}
133
/**
 * Queue a shutdown request and end the stream.
 *
 * The request is queued immediately. With a stream attached the shutdown
 * runs right away; otherwise it is deferred until the 'stream' event.
 *
 * @param {Object} req - shutdown request; its `oncomplete` is invoked later
 * @returns {number} result of `_shutdown`, or 0 when deferred
 */
Handle.prototype.shutdown = function shutdown (req) {
  var wrap = this._queueReq('shutdown', req)

  if (this._stream) {
    return this._shutdown(wrap)
  }

  this.once('stream', function () {
    this._shutdown(wrap)
  })
  return 0
}
146
/**
 * End the stream and complete the queued shutdown request afterwards.
 *
 * When the stream's `end` callback fires, the entry is dequeued; if that
 * yields a request, `oncomplete(0, handle, req)` is invoked on it. A falsy
 * dequeue result means there is nothing left to complete.
 *
 * @param {Object} wrap - queue entry returned by `_queueReq`
 * @returns {number} always 0
 */
Handle.prototype._shutdown = function _shutdown (wrap) {
  var handle = this

  function onFinished () {
    var req = wrap.dequeue()
    if (req) {
      req.oncomplete(0, handle, req)
    }
  }

  this._stream.end(onFinished)
  return 0
}
157
/**
 * Close the handle.
 *
 * Pending requests are cancelled first via `_close()`. Without a stream,
 * the whole call is replayed on the next 'stream' event. With a stream,
 * the `close` option receives the callback if it was provided; otherwise
 * the callback is scheduled with `process.nextTick`.
 *
 * @param {Function} callback - invoked once closing is done
 * @returns {number} always 0
 */
Handle.prototype.close = function close (callback) {
  this._close()

  if (this._stream) {
    if (this._options.close) {
      this._options.close(callback)
    } else {
      process.nextTick(callback)
    }
  } else {
    this.once('stream', function () {
      this.close(callback)
    })
  }

  return 0
}
176
/**
 * Queue a write request and write `data` with the given encoding.
 *
 * The request is queued immediately. With a stream attached the write is
 * issued right away; otherwise it is deferred until the 'stream' event.
 *
 * @param {Object} req - write request; its `oncomplete` is invoked later
 * @param {string|Buffer} data - payload
 * @param {?string} enc - encoding name, or null for buffers
 * @returns {number} result of `_writeEnc`, or 0 when deferred
 */
Handle.prototype.writeEnc = function writeEnc (req, data, enc) {
  var wrap = this._queueReq('write', req)

  if (this._stream) {
    return this._writeEnc(wrap, req, data, enc)
  }

  this.once('stream', function () {
    this._writeEnc(wrap, req, data, enc)
  })
  return 0
}
190
/**
 * Perform a queued write on the attached stream.
 *
 * Marks the request as asynchronous and records its size in bytes. If the
 * queue entry reports empty (the request was already dequeued, e.g.
 * cancelled by `_close`), nothing is written. Otherwise the data is
 * written and, once the stream's write callback fires, the request is
 * dequeued and completed with `oncomplete(0, handle, req)`.
 *
 * @param {Object} wrap - queue entry returned by `_queueReq`
 * @param {Object} req - the write request
 * @param {string|Buffer} data - payload
 * @param {?string} enc - encoding name, or null for buffers
 * @returns {number} always 0
 */
Handle.prototype._writeEnc = function _writeEnc (wrap, req, data, enc) {
  var self = this

  req.async = true
  // `data.length` would count UTF-16 code units for strings, which differs
  // from the byte size for multi-byte encodings (utf8, ucs2). byteLength
  // returns the real size and equals `.length` for buffers.
  req.bytes = Buffer.byteLength(data, enc)

  if (wrap.isEmpty()) {
    return 0
  }

  this._stream.write(data, enc, function () {
    var req = wrap.dequeue()
    if (!req) { return }
    req.oncomplete(0, self, req)
  })

  return 0
}
209
/**
 * Write several chunks to the attached stream in one call.
 *
 * The layout of `chunks` depends on `allBuffers`:
 *  - truthy: a flat list of buffers, each written as-is;
 *  - falsy or omitted: interleaved pairs `[chunk, encoding, chunk, ...]`.
 *
 * The `chunks` array is left unmodified. The stream is dereferenced
 * without a check, so one must already be attached.
 *
 * @param {WriteWrap} req - write request (not used by this implementation)
 * @param {Array<Buffer|string>} chunks - see the layout described above
 * @param {Boolean} [allBuffers] - true when `chunks` holds only buffers
 * @returns {number} always 0
 */
Handle.prototype.writev = function _writev (req, chunks, allBuffers) {
  var i

  if (allBuffers) {
    // No encodings in the list: consuming it pairwise would pass every
    // second buffer as the "encoding" and drop it
    for (i = 0; i < chunks.length; i++) {
      this._stream.write(chunks[i])
    }
    return 0
  }

  for (i = 0; i < chunks.length; i += 2) {
    this._stream.write(chunks[i], chunks[i + 1])
  }
  return 0
}
221
// Encoding-specific write entry points. Each one forwards to `writeEnc`
// with a fixed encoding and returns its result.

/** Write a buffer (encoding is passed as null). */
Handle.prototype.writeBuffer = function writeBuffer (req, data) {
  var encoding = null
  return this.writeEnc(req, data, encoding)
}

/** Write a string encoded as 'ascii'. */
Handle.prototype.writeAsciiString = function writeAsciiString (req, data) {
  var encoding = 'ascii'
  return this.writeEnc(req, data, encoding)
}

/** Write a string encoded as 'utf8'. */
Handle.prototype.writeUtf8String = function writeUtf8String (req, data) {
  var encoding = 'utf8'
  return this.writeEnc(req, data, encoding)
}

/** Write a string encoded as 'ucs2'. */
Handle.prototype.writeUcs2String = function writeUcs2String (req, data) {
  var encoding = 'ucs2'
  return this.writeEnc(req, data, encoding)
}

/** Write a string encoded as 'binary'. */
Handle.prototype.writeBinaryString = function writeBinaryString (req, data) {
  var encoding = 'binary'
  return this.writeEnc(req, data, encoding)
}

/** Write a latin1 string; forwarded with the 'binary' encoding name. */
Handle.prototype.writeLatin1String = function writeLatin1String (req, data) {
  var encoding = 'binary'
  return this.writeEnc(req, data, encoding)
}
245
246// v0.8
/**
 * Return address information supplied by the `getPeerName` option.
 *
 * NOTE(review): this reads the `getPeerName` option even though the
 * method is named getsockname - confirm that this is intended.
 *
 * @returns {?Object} result of `options.getPeerName()`, or null when the
 *   option is not set
 */
Handle.prototype.getsockname = function getsockname () {
  if (!this._options.getPeerName) {
    return null
  }

  return this._options.getPeerName()
}
253
/**
 * Copy the peer information into a caller-provided object.
 *
 * Obtains the information from `getsockname()` and copies each of its own
 * enumerable string-keyed properties onto `out`.
 *
 * @param {Object} out - object that receives the properties
 * @returns {number} 0 on success, -1 when no information is available
 */
Handle.prototype.getpeername = function getpeername (out) {
  var info = this.getsockname()
  if (!info) { return -1 }

  var keys = Object.keys(info)
  for (var i = 0; i < keys.length; i++) {
    out[keys[i]] = info[keys[i]]
  }

  return 0
}
263}