1 | var assert = require('assert')
|
2 | var util = require('util')
|
3 |
|
4 | var EventEmitter = require('events').EventEmitter
|
5 | var Buffer = require('buffer').Buffer
|
6 |
|
7 | var Queue = require('./queue')
|
8 |
|
9 |
|
10 | var match = /^v(\d+)\.(\d+)\./.exec(process.version)
|
11 | var version = match ? Number(match[1]) + Number('0.' + match[2]) : 11
|
12 | var onreadMode = version >= 11.1 ? 2 : 1
|
13 | var mode = 'modern'
|
14 |
|
15 | var setNRead
|
16 | if (onreadMode === 2) {
|
17 | var sw = process.binding('stream_wrap')
|
18 | setNRead = function (nread) {
|
19 | sw.streamBaseState[sw.kReadBytesOrError] = nread
|
20 | }
|
21 | }
|
22 |
|
23 | function Handle (stream, options) {
|
24 | EventEmitter.call(this)
|
25 |
|
26 | this._stream = stream
|
27 | this._flowing = false
|
28 | this._reading = false
|
29 | this._options = options || {}
|
30 |
|
31 | this.onread = null
|
32 |
|
33 |
|
34 | this.pending = new Queue()
|
35 | }
|
36 | util.inherits(Handle, EventEmitter)
|
37 | module.exports = Handle
|
38 |
|
39 | Handle.mode = mode
|
40 |
|
41 | Handle.create = function create (stream, options) {
|
42 | return new Handle(stream, options)
|
43 | }
|
44 |
|
45 | Handle.prototype._onread = function _onread (nread, buffer) {
|
46 | if (onreadMode === 1) {
|
47 | this.onread(nread, buffer)
|
48 | } else {
|
49 | setNRead(nread)
|
50 | this.onread(buffer)
|
51 | }
|
52 | }
|
53 |
|
54 | Handle.prototype._queueReq = function _queueReq (type, req) {
|
55 | return this.pending.append(type, req)
|
56 | }
|
57 |
|
58 | Handle.prototype._pendingList = function _pendingList () {
|
59 | var list = []
|
60 | while (!this.pending.isEmpty()) { list.push(this.pending.first().dequeue()) }
|
61 | return list
|
62 | }
|
63 |
|
64 | Handle.prototype.setStream = function setStream (stream) {
|
65 | assert(this._stream === null, 'Can\'t set stream two times')
|
66 | this._stream = stream
|
67 |
|
68 | this.emit('stream', stream)
|
69 | }
|
70 |
|
71 | Handle.prototype.readStart = function readStart () {
|
72 | this._reading = true
|
73 |
|
74 | if (!this._stream) {
|
75 | this.once('stream', this.readStart)
|
76 | return 0
|
77 | }
|
78 |
|
79 | if (!this._flowing) {
|
80 | this._flowing = true
|
81 | this._flow()
|
82 | }
|
83 |
|
84 | this._stream.resume()
|
85 | return 0
|
86 | }
|
87 |
|
88 | Handle.prototype.readStop = function readStop () {
|
89 | this._reading = false
|
90 |
|
91 | if (!this._stream) {
|
92 | this.once('stream', this.readStop)
|
93 | return 0
|
94 | }
|
95 | this._stream.pause()
|
96 | return 0
|
97 | }
|
98 |
|
99 | var uv = process.binding('uv')
|
100 |
|
101 | Handle.prototype._flow = function flow () {
|
102 | var self = this
|
103 | this._stream.on('data', function (chunk) {
|
104 | self._onread(chunk.length, chunk)
|
105 | })
|
106 |
|
107 | this._stream.on('end', function () {
|
108 | self._onread(uv.UV_EOF, Buffer.alloc(0))
|
109 | })
|
110 |
|
111 | this._stream.on('close', function () {
|
112 | setImmediate(function () {
|
113 | if (self._reading) {
|
114 | self._onread(uv.UV_ECONNRESET, Buffer.alloc(0))
|
115 | }
|
116 | })
|
117 | })
|
118 | }
|
119 |
|
120 | Handle.prototype._close = function _close () {
|
121 | var list = this._pendingList()
|
122 |
|
123 | var self = this
|
124 | setImmediate(function () {
|
125 | for (var i = 0; i < list.length; i++) {
|
126 | var req = list[i]
|
127 | req.oncomplete(uv.UV_ECANCELED, self, req)
|
128 | }
|
129 | })
|
130 |
|
131 | this.readStop()
|
132 | }
|
133 |
|
134 | Handle.prototype.shutdown = function shutdown (req) {
|
135 | var wrap = this._queueReq('shutdown', req)
|
136 |
|
137 | if (!this._stream) {
|
138 | this.once('stream', function () {
|
139 | this._shutdown(wrap)
|
140 | })
|
141 | return 0
|
142 | }
|
143 |
|
144 | return this._shutdown(wrap)
|
145 | }
|
146 |
|
147 | Handle.prototype._shutdown = function _shutdown (wrap) {
|
148 | var self = this
|
149 | this._stream.end(function () {
|
150 | var req = wrap.dequeue()
|
151 | if (!req) { return }
|
152 |
|
153 | req.oncomplete(0, self, req)
|
154 | })
|
155 | return 0
|
156 | }
|
157 |
|
158 | Handle.prototype.close = function close (callback) {
|
159 | this._close()
|
160 |
|
161 | if (!this._stream) {
|
162 | this.once('stream', function () {
|
163 | this.close(callback)
|
164 | })
|
165 | return 0
|
166 | }
|
167 |
|
168 | if (this._options.close) {
|
169 | this._options.close(callback)
|
170 | } else {
|
171 | process.nextTick(callback)
|
172 | }
|
173 |
|
174 | return 0
|
175 | }
|
176 |
|
177 | Handle.prototype.writeEnc = function writeEnc (req, data, enc) {
|
178 | var wrap = this._queueReq('write', req)
|
179 |
|
180 | if (!this._stream) {
|
181 | this.once('stream', function () {
|
182 | this._writeEnc(wrap, req, data, enc)
|
183 | })
|
184 |
|
185 | return 0
|
186 | }
|
187 |
|
188 | return this._writeEnc(wrap, req, data, enc)
|
189 | }
|
190 |
|
191 | Handle.prototype._writeEnc = function _writeEnc (wrap, req, data, enc) {
|
192 | var self = this
|
193 |
|
194 | req.async = true
|
195 | req.bytes = data.length
|
196 |
|
197 | if (wrap.isEmpty()) {
|
198 | return 0
|
199 | }
|
200 |
|
201 | this._stream.write(data, enc, function () {
|
202 | var req = wrap.dequeue()
|
203 | if (!req) { return }
|
204 | req.oncomplete(0, self, req)
|
205 | })
|
206 |
|
207 | return 0
|
208 | }
|
209 |
|
210 |
|
211 |
|
212 |
|
213 |
|
214 |
|
215 | Handle.prototype.writev = function _writev (req, chunks, allBuffers) {
|
216 | while (chunks.length > 0) {
|
217 | this._stream.write(chunks.shift(), chunks.shift())
|
218 | }
|
219 | return 0
|
220 | }
|
221 |
|
222 | Handle.prototype.writeBuffer = function writeBuffer (req, data) {
|
223 | return this.writeEnc(req, data, null)
|
224 | }
|
225 |
|
226 | Handle.prototype.writeAsciiString = function writeAsciiString (req, data) {
|
227 | return this.writeEnc(req, data, 'ascii')
|
228 | }
|
229 |
|
230 | Handle.prototype.writeUtf8String = function writeUtf8String (req, data) {
|
231 | return this.writeEnc(req, data, 'utf8')
|
232 | }
|
233 |
|
234 | Handle.prototype.writeUcs2String = function writeUcs2String (req, data) {
|
235 | return this.writeEnc(req, data, 'ucs2')
|
236 | }
|
237 |
|
238 | Handle.prototype.writeBinaryString = function writeBinaryString (req, data) {
|
239 | return this.writeEnc(req, data, 'binary')
|
240 | }
|
241 |
|
242 | Handle.prototype.writeLatin1String = function writeLatin1String (req, data) {
|
243 | return this.writeEnc(req, data, 'binary')
|
244 | }
|
245 |
|
246 |
|
247 | Handle.prototype.getsockname = function getsockname () {
|
248 | if (this._options.getPeerName) {
|
249 | return this._options.getPeerName()
|
250 | }
|
251 | return null
|
252 | }
|
253 |
|
254 | Handle.prototype.getpeername = function getpeername (out) {
|
255 | var res = this.getsockname()
|
256 | if (!res) { return -1 }
|
257 |
|
258 | Object.keys(res).forEach(function (key) {
|
259 | out[key] = res[key]
|
260 | })
|
261 |
|
262 | return 0
|
263 | }
|