1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 | import {inherits, deprecate} from 'util';
|
7 | import {Buffer} from 'buffer';
|
8 | Writable.WritableState = WritableState;
|
9 | import {EventEmitter} from 'events';
|
10 | import {Duplex} from './duplex';
|
11 | import {nextTick} from 'process';
|
12 | inherits(Writable, EventEmitter);
|
13 |
|
/**
 * No-op function; `write()` substitutes it when the caller passes no
 * callback so later code can invoke the callback unconditionally.
 */
function nop() {
  // intentionally empty
}
|
15 |
|
/**
 * A single queued write: one node of the singly linked list of buffered
 * requests.
 *
 * @param {*} chunk - The data to write.
 * @param {string} encoding - Encoding label stored alongside the chunk.
 * @param {Function} cb - Callback stored as `callback`.
 */
function WriteReq(chunk, encoding, cb) {
  Object.assign(this, {
    chunk: chunk,
    encoding: encoding,
    callback: cb,
    // Link to the following request; filled in when another write is queued.
    next: null
  });
}
|
22 |
|
/**
 * Bookkeeping object attached to every writable stream.
 *
 * @param {Object} [options] - Stream options; the fields read here are
 *   `objectMode`, `writableObjectMode`, `highWaterMark`, `decodeStrings`
 *   and `defaultEncoding`.
 * @param {Object} stream - The stream that owns this state.
 */
function WritableState(options, stream) {
  // Deprecated accessor kept for backwards compatibility; forwards to
  // getBuffer().
  Object.defineProperty(this, 'buffer', {
    get: deprecate(function () {
      return this.getBuffer();
    }, '_writableState.buffer is deprecated. Use _writableState.getBuffer instead.')
  });

  const opts = options || {};

  // In object mode every chunk counts as 1 instead of its byte length.
  // Duplex streams may enable it for the writable side only.
  let objectMode = !!opts.objectMode;
  if (stream instanceof Duplex && !objectMode) {
    objectMode = !!opts.writableObjectMode;
  }
  this.objectMode = objectMode;

  // Threshold at which write() starts returning false. An explicit 0 is
  // honoured; any other falsy value falls back to the default.
  const requestedHwm = opts.highWaterMark;
  const fallbackHwm = this.objectMode ? 16 : 16 * 1024;
  const hasRequestedHwm = Boolean(requestedHwm) || requestedHwm === 0;
  // Double bitwise NOT truncates the value to an integer.
  this.highWaterMark = ~~(hasRequestedHwm ? requestedHwm : fallbackHwm);

  // Lifecycle flags.
  this.needDrain = false;
  this.ending = false;
  this.ended = false;
  this.finished = false;

  // Strings are converted to Buffers before _write() unless the caller
  // passes decodeStrings: false.
  this.decodeStrings = opts.decodeStrings !== false;

  // Encoding used when write() is called without one.
  this.defaultEncoding = opts.defaultEncoding || 'utf8';

  // Amount of data accepted but not yet flushed.
  this.length = 0;

  // True while a _write()/_writev() call is outstanding.
  this.writing = false;

  // Nesting depth of cork() calls; writes are buffered while non-zero.
  this.corked = 0;

  // True while inside the _write()/_writev() call itself, so completion
  // callbacks can tell synchronous from asynchronous completion.
  this.sync = true;

  // True while clearBuffer() is draining the queue.
  this.bufferProcessing = false;

  // Completion callback handed to _write()/_writev().
  this.onwrite = function (er) {
    onwrite(stream, er);
  };

  // Callback and length of the write currently in flight.
  this.writecb = null;
  this.writelen = 0;

  // Head and tail of the linked list of buffered requests.
  this.bufferedRequest = null;
  this.lastBufferedRequest = null;

  // Number of write callbacks still owed to callers.
  this.pendingcb = 0;

  // Whether 'prefinish' has been emitted.
  this.prefinished = false;

  // Whether an 'error' has been emitted from a failed write.
  this.errorEmitted = false;

  // Number of requests in the buffered list.
  this.bufferedRequestCount = 0;

  // Spare CorkedRequest holder used for the next batched write.
  this.corkedRequestsFree = new CorkedRequest(this);
}
|
120 |
|
/**
 * Returns the buffered write requests as an array, oldest first, by walking
 * the linked list that starts at `bufferedRequest`.
 *
 * @returns {Array} A new array of the queued request objects.
 */
WritableState.prototype.getBuffer = function writableStateGetBuffer() {
  const queued = [];
  for (let req = this.bufferedRequest; req; req = req.next) {
    queued.push(req);
  }
  return queued;
};
|
130 |
|
131 | export default Writable;
|
/**
 * Writable stream constructor. May be called without `new`.
 *
 * @param {Object} [options] - Forwarded to WritableState; `write` and
 *   `writev` functions, when given, become the instance's `_write` and
 *   `_writev` implementations.
 */
export function Writable(options) {
  // Duplex instances are accepted too so Duplex can reuse this constructor.
  const isStreamInstance = this instanceof Writable || this instanceof Duplex;
  if (!isStreamInstance) {
    return new Writable(options);
  }

  this._writableState = new WritableState(options, this);

  // Kept for backwards compatibility with code that checks `.writable`.
  this.writable = true;

  if (options) {
    ['write', 'writev'].forEach((hook) => {
      if (typeof options[hook] === 'function') {
        this['_' + hook] = options[hook];
      }
    });
  }

  EventEmitter.call(this);
}
|
151 |
|
152 |
|
/**
 * Piping from a writable-only stream is not possible; emits an 'error'
 * event instead of doing anything.
 */
Writable.prototype.pipe = function () {
  const notReadable = new Error('Cannot pipe, not readable');
  this.emit('error', notReadable);
};
|
156 |
|
/**
 * Reports a write attempted after the stream has ended: emits 'error' on
 * the stream right away and hands the same error to `cb` on the next tick.
 *
 * @param {Object} stream - Stream to emit the error on.
 * @param {Function} cb - Write callback to notify.
 */
function writeAfterEnd(stream, cb) {
  const failure = new Error('write after end');

  stream.emit('error', failure);
  // Deferred so the callback never runs before write() has returned.
  nextTick(cb, failure);
}
|
163 |
|
164 |
|
165 |
|
166 |
|
167 |
|
168 |
|
/**
 * Checks whether `chunk` may be written to the stream.
 *
 * `null` is always rejected. Outside object mode, anything that is not a
 * Buffer, a string or `undefined` is rejected. On rejection an 'error' is
 * emitted on the stream and `cb` receives the same error on the next tick.
 *
 * @param {Object} stream - Stream to emit the error on.
 * @param {Object} state - Writable state; only `objectMode` is read.
 * @param {*} chunk - Candidate chunk.
 * @param {Function} cb - Write callback to notify on rejection.
 * @returns {boolean} true when the chunk is acceptable.
 */
function validChunk(stream, state, chunk, cb) {
  let problem = false;

  if (chunk === null) {
    problem = new TypeError('May not write null values to stream');
  } else {
    const acceptable = Buffer.isBuffer(chunk) ||
      typeof chunk === 'string' ||
      chunk === undefined ||
      state.objectMode;
    if (!acceptable) {
      problem = new TypeError('Invalid non-string/buffer chunk');
    }
  }

  if (!problem) {
    return true;
  }

  stream.emit('error', problem);
  nextTick(cb, problem);
  return false;
}
|
187 |
|
/**
 * Writes a chunk to the stream.
 *
 * @param {*} chunk - Data to write.
 * @param {string|Function} [encoding] - Encoding for string chunks, or the
 *   callback when no encoding is given.
 * @param {Function} [cb] - Called once the chunk has been handled.
 * @returns {boolean} The value from writeOrBuffer() when the chunk was
 *   accepted; false when the stream has ended or the chunk is invalid.
 */
Writable.prototype.write = function (chunk, encoding, cb) {
  const state = this._writableState;

  // Support the write(chunk, cb) call shape.
  if (typeof encoding === 'function') {
    cb = encoding;
    encoding = null;
  }

  if (Buffer.isBuffer(chunk)) {
    encoding = 'buffer';
  } else if (!encoding) {
    encoding = state.defaultEncoding;
  }

  if (typeof cb !== 'function') {
    cb = nop;
  }

  if (state.ended) {
    writeAfterEnd(this, cb);
    return false;
  }

  if (!validChunk(this, state, chunk, cb)) {
    return false;
  }

  state.pendingcb++;
  return writeOrBuffer(this, state, chunk, encoding, cb);
};
|
208 |
|
/**
 * Increments the cork depth; while it is non-zero, writes are queued
 * instead of being passed to _write().
 */
Writable.prototype.cork = function () {
  this._writableState.corked++;
};
|
214 |
|
/**
 * Decrements the cork depth and, once it reaches zero with nothing else in
 * progress, flushes the queued writes.
 */
Writable.prototype.uncork = function () {
  const state = this._writableState;

  if (!state.corked) {
    return;
  }

  state.corked--;

  // Only flush when fully uncorked and no write or flush is already running.
  const blocked = state.writing ||
    state.corked ||
    state.finished ||
    state.bufferProcessing;
  if (!blocked && state.bufferedRequest) {
    clearBuffer(this, state);
  }
};
|
224 |
|
/**
 * Sets the encoding used for string chunks written without an explicit
 * encoding.
 *
 * @param {string} encoding - Encoding name; string values are lower-cased
 *   before being stored.
 * @returns {Writable} this, for chaining.
 * @throws {TypeError} When the encoding is not in the supported list.
 */
Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
  if (typeof encoding === 'string') {
    encoding = encoding.toLowerCase();
  }

  const supported = [
    'hex', 'utf8', 'utf-8', 'ascii', 'binary', 'base64',
    'ucs2', 'ucs-2', 'utf16le', 'utf-16le', 'raw'
  ];
  // Non-string values are coerced to a string for the lookup only.
  const label = (encoding + '').toLowerCase();
  if (!supported.includes(label)) {
    throw new TypeError('Unknown encoding: ' + encoding);
  }

  this._writableState.defaultEncoding = encoding;
  return this;
};
|
232 |
|
/**
 * Converts a string chunk to a Buffer when the stream is not in object mode
 * and string decoding has not been disabled; any other chunk is returned
 * unchanged.
 *
 * @param {Object} state - Writable state (`objectMode`, `decodeStrings`).
 * @param {*} chunk - Chunk being written.
 * @param {string} encoding - Encoding used for the conversion.
 * @returns {*} The Buffer, or the original chunk.
 */
function decodeChunk(state, chunk, encoding) {
  const shouldDecode = !state.objectMode &&
    state.decodeStrings !== false &&
    typeof chunk === 'string';
  return shouldDecode ? Buffer.from(chunk, encoding) : chunk;
}
|
239 |
|
240 |
|
241 |
|
242 |
|
/**
 * Either dispatches the chunk to the underlying writer immediately or, when
 * a write is already in flight or the stream is corked, appends it to the
 * queue of buffered requests.
 *
 * @param {Object} stream - The writable stream.
 * @param {Object} state - The stream's writable state.
 * @param {*} chunk - Chunk to write.
 * @param {string} encoding - Encoding of the chunk.
 * @param {Function} cb - Callback for this write.
 * @returns {boolean} false once the buffered length has reached the high
 *   water mark, true otherwise.
 */
function writeOrBuffer(stream, state, chunk, encoding, cb) {
  chunk = decodeChunk(state, chunk, encoding);

  if (Buffer.isBuffer(chunk)) {
    encoding = 'buffer';
  }

  // Object-mode chunks each count as one unit.
  const size = state.objectMode ? 1 : chunk.length;
  state.length += size;

  const belowMark = state.length < state.highWaterMark;
  if (!belowMark) {
    // Ask for a 'drain' event once the backlog has been flushed.
    state.needDrain = true;
  }

  if (!state.writing && !state.corked) {
    doWrite(stream, state, false, size, chunk, encoding, cb);
    return belowMark;
  }

  // Append to the tail of the linked list of pending requests.
  const previousTail = state.lastBufferedRequest;
  const request = new WriteReq(chunk, encoding, cb);
  state.lastBufferedRequest = request;
  if (previousTail) {
    previousTail.next = request;
  } else {
    state.bufferedRequest = request;
  }
  state.bufferedRequestCount += 1;

  return belowMark;
}
|
270 |
|
/**
 * Records the in-flight write on `state` and invokes the stream's
 * `_writev` (batched) or `_write` (single chunk) implementation.
 *
 * @param {Object} stream - The writable stream.
 * @param {Object} state - The stream's writable state.
 * @param {boolean} writev - true to call `_writev`, false for `_write`.
 * @param {number} len - Length accounted to this write.
 * @param {*} chunk - Chunk, or array of requests for `_writev`.
 * @param {string} encoding - Encoding passed to `_write`.
 * @param {Function} cb - Callback stored as `state.writecb`.
 */
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
  Object.assign(state, {
    writelen: len,
    writecb: cb,
    writing: true,
    // Stays true only for the duration of the call below, so a completion
    // arriving during the call can be recognised as synchronous.
    sync: true
  });

  if (writev) {
    stream._writev(chunk, state.onwrite);
  } else {
    stream._write(chunk, encoding, state.onwrite);
  }

  state.sync = false;
}
|
279 |
|
/**
 * Handles a failed write: settles the write callback with the error, marks
 * the error as emitted and emits 'error' on the stream.
 *
 * @param {Object} stream - The writable stream.
 * @param {Object} state - The stream's writable state.
 * @param {boolean} sync - Whether the write completed synchronously.
 * @param {Error} er - The error reported by the writer.
 * @param {Function} cb - Callback of the failed write.
 */
function onwriteError(stream, state, sync, er, cb) {
  state.pendingcb--;

  if (sync) {
    // Synchronous completion: defer the callback to the next tick.
    nextTick(cb, er);
  } else {
    cb(er);
  }

  stream._writableState.errorEmitted = true;
  stream.emit('error', er);
}
|
287 |
|
/**
 * Clears the in-flight write bookkeeping after a write has completed and
 * subtracts its length from the buffered total.
 *
 * @param {Object} state - The stream's writable state.
 */
function onwriteStateUpdate(state) {
  const completedLength = state.writelen;

  state.writing = false;
  state.writecb = null;
  state.length -= completedLength;
  state.writelen = 0;
}
|
294 |
|
/**
 * Completion handler for `_write`/`_writev`. Updates the state, reports an
 * error if there was one, otherwise flushes any queued writes and runs the
 * after-write steps.
 *
 * @param {Object} stream - The writable stream.
 * @param {Error} [er] - Error reported by the writer, if any.
 */
function onwrite(stream, er) {
  const state = stream._writableState;
  // Capture these before onwriteStateUpdate() resets the state.
  const completedSync = state.sync;
  const callback = state.writecb;

  onwriteStateUpdate(state);

  if (er) {
    onwriteError(stream, state, completedSync, er, callback);
    return;
  }

  // Evaluated before clearing the buffer, which may start another write.
  const finished = needFinish(state);

  const shouldFlush = !finished &&
    !state.corked &&
    !state.bufferProcessing &&
    state.bufferedRequest;
  if (shouldFlush) {
    clearBuffer(stream, state);
  }

  if (completedSync) {
    // Keep the user callback asynchronous even for synchronous writers.
    nextTick(afterWrite, stream, state, finished, callback);
  } else {
    afterWrite(stream, state, finished, callback);
  }
}
|
319 |
|
/**
 * Steps run after a successful write: possibly emit 'drain', settle the
 * write callback, then check whether the stream can finish.
 *
 * @param {Object} stream - The writable stream.
 * @param {Object} state - The stream's writable state.
 * @param {boolean} finished - Result of needFinish() taken by the caller.
 * @param {Function} cb - Callback of the completed write.
 */
function afterWrite(stream, state, finished, cb) {
  if (!finished) {
    onwriteDrain(stream, state);
  }

  state.pendingcb--;
  cb();

  finishMaybe(stream, state);
}
|
326 |
|
327 |
|
328 |
|
329 |
|
/**
 * Emits 'drain' when the buffer has been fully flushed and a drain was
 * requested, clearing the request flag first.
 *
 * @param {Object} stream - The writable stream.
 * @param {Object} state - The stream's writable state.
 */
function onwriteDrain(stream, state) {
  const drained = state.length === 0 && state.needDrain;
  if (!drained) {
    return;
  }

  state.needDrain = false;
  stream.emit('drain');
}
|
336 |
|
337 |
|
/**
 * Flushes the queue of buffered write requests.
 *
 * When the stream implements `_writev` and more than one request is queued,
 * the whole queue is handed over in a single batched call. Otherwise the
 * requests are dispatched one at a time until a write stays in flight.
 *
 * `state.bufferedRequestCount` is kept equal to the number of requests left
 * in the queue: it is decremented per dispatched request and zeroed only
 * after a batched write, instead of being reset to 0 while requests are
 * still queued.
 *
 * @param {Object} stream - The writable stream.
 * @param {Object} state - The stream's writable state.
 */
function clearBuffer(stream, state) {
  state.bufferProcessing = true;
  var entry = state.bufferedRequest;

  if (stream._writev && entry && entry.next) {
    // Batched path: collect every queued request for a single _writev call.
    var buffer = [];
    var holder = state.corkedRequestsFree;
    // The holder remembers the head of the chain so its finish() callback
    // can settle every request's callback.
    holder.entry = entry;

    while (entry) {
      buffer.push(entry);
      entry = entry.next;
    }

    doWrite(stream, state, true, state.length, buffer, '', holder.finish);

    // Accounts for the batched write itself, on top of the per-request
    // counts taken when each request was queued.
    state.pendingcb++;
    state.lastBufferedRequest = null;
    // Make sure a free holder is available for the next batched write.
    if (holder.next) {
      state.corkedRequestsFree = holder.next;
      holder.next = null;
    } else {
      state.corkedRequestsFree = new CorkedRequest(state);
    }
    state.bufferedRequestCount = 0;
  } else {
    // Slow path: dispatch requests one by one.
    while (entry) {
      var chunk = entry.chunk;
      var encoding = entry.encoding;
      var cb = entry.callback;
      var len = state.objectMode ? 1 : chunk.length;

      doWrite(stream, state, false, len, chunk, encoding, cb);
      entry = entry.next;
      state.bufferedRequestCount--;

      // If the write did not complete synchronously, stop here; the
      // remaining requests stay queued for the next flush.
      if (state.writing) {
        break;
      }
    }

    if (entry === null) state.lastBufferedRequest = null;
  }

  state.bufferedRequest = entry;
  state.bufferProcessing = false;
}
|
394 |
|
/**
 * Default `_write`: reports through the callback that no implementation
 * was supplied.
 */
Writable.prototype._write = function (chunk, encoding, cb) {
  const notImplemented = new Error('not implemented');
  cb(notImplemented);
};

// No batched writer by default; clearBuffer() checks this for truthiness.
Writable.prototype._writev = null;
|
400 |
|
/**
 * Signals that no more data will be written, optionally writing a final
 * chunk first.
 *
 * @param {*|Function} [chunk] - Final chunk, or the callback.
 * @param {string|Function} [encoding] - Encoding of the chunk, or the
 *   callback.
 * @param {Function} [cb] - Passed on to endWritable() when the stream is
 *   not already ending or finished.
 */
Writable.prototype.end = function (chunk, encoding, cb) {
  const state = this._writableState;

  // Support end(cb) and end(chunk, cb).
  if (typeof chunk === 'function') {
    cb = chunk;
    chunk = null;
    encoding = null;
  } else if (typeof encoding === 'function') {
    cb = encoding;
    encoding = null;
  }

  const hasChunk = chunk !== null && chunk !== undefined;
  if (hasChunk) {
    this.write(chunk, encoding);
  }

  // Collapse any nested corks so a single uncork() flushes everything.
  if (state.corked) {
    state.corked = 1;
    this.uncork();
  }

  // Repeated end() calls are ignored.
  const alreadyClosing = state.ending || state.finished;
  if (!alreadyClosing) {
    endWritable(this, state, cb);
  }
};
|
424 |
|
/**
 * Tells whether the stream is ready to finish: it is ending, nothing is
 * buffered or in flight, and it has not finished already.
 *
 * @param {Object} state - The stream's writable state.
 * @returns {boolean} Result of the combined condition.
 */
function needFinish(state) {
  const nothingQueued = state.length === 0 && state.bufferedRequest === null;
  return state.ending && nothingQueued && !state.finished && !state.writing;
}
|
428 |
|
/**
 * Emits 'prefinish' on the stream, at most once per state object.
 *
 * @param {Object} stream - The writable stream.
 * @param {Object} state - The stream's writable state.
 */
function prefinish(stream, state) {
  if (state.prefinished) {
    return;
  }

  // Flag is set before emitting so a re-entrant call does not emit again.
  state.prefinished = true;
  stream.emit('prefinish');
}
|
435 |
|
/**
 * Finishes the stream if it is ready: always runs the prefinish step, and
 * emits 'finish' only when no write callbacks are outstanding.
 *
 * @param {Object} stream - The writable stream.
 * @param {Object} state - The stream's writable state.
 * @returns {boolean} The result of needFinish().
 */
function finishMaybe(stream, state) {
  const need = needFinish(state);
  if (!need) {
    return need;
  }

  // Sampled before prefinish runs, so its listeners cannot affect the
  // decision.
  const noCallbacksOwed = state.pendingcb === 0;

  prefinish(stream, state);
  if (noCallbacksOwed) {
    state.finished = true;
    stream.emit('finish');
  }

  return need;
}
|
449 |
|
/**
 * Puts the stream into its ending state and arranges for `cb` to run once
 * it has finished.
 *
 * @param {Object} stream - The writable stream.
 * @param {Object} state - The stream's writable state.
 * @param {Function} [cb] - Called on the next tick if already finished,
 *   otherwise on the 'finish' event.
 */
function endWritable(stream, state, cb) {
  state.ending = true;
  finishMaybe(stream, state);

  if (cb) {
    if (state.finished) {
      nextTick(cb);
    } else {
      stream.once('finish', cb);
    }
  }

  state.ended = true;
  stream.writable = false;
}
|
459 |
|
460 |
|
461 |
|
/**
 * Holder for one batched write. `entry` points at the head of the chain of
 * requests in the batch, and `finish` is the completion callback that
 * settles every request in that chain.
 *
 * @param {Object} state - The writable state the holder belongs to.
 */
function CorkedRequest(state) {
  this.next = null;
  this.entry = null;

  this.finish = (err) => {
    let request = this.entry;
    this.entry = null;

    // Settle each request in the batch with the shared result.
    while (request) {
      const done = request.callback;
      state.pendingcb--;
      done(err);
      request = request.next;
    }

    // Hand this holder back for reuse.
    if (state.corkedRequestsFree) {
      state.corkedRequestsFree.next = this;
    } else {
      state.corkedRequestsFree = this;
    }
  };
}
|