1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 |
|
19 |
|
20 |
|
21 |
|
22 |
|
23 |
|
24 | 'use strict';
|
25 |
|
26 | module.exports = Writable;
|
27 |
|
28 |
|
29 | function WriteReq(chunk, encoding, cb) {
|
30 | this.chunk = chunk;
|
31 | this.encoding = encoding;
|
32 | this.callback = cb;
|
33 | this.next = null;
|
34 | }
|
35 |
|
36 |
|
37 |
|
38 | function CorkedRequest(state) {
|
39 | var _this = this;
|
40 |
|
41 | this.next = null;
|
42 | this.entry = null;
|
43 |
|
44 | this.finish = function () {
|
45 | onCorkedFinish(_this, state);
|
46 | };
|
47 | }
|
48 |
|
49 |
|
50 |
|
51 |
|
52 |
|
53 | var Duplex;
|
54 |
|
55 |
|
56 | Writable.WritableState = WritableState;
|
57 |
|
58 |
|
59 | var internalUtil = {
|
60 | deprecate: require('util-deprecate')
|
61 | };
|
62 |
|
63 |
|
64 |
|
65 |
|
66 | var Stream = require('./internal/streams/stream');
|
67 |
|
68 |
|
69 |
|
70 | var Buffer = require('buffer').Buffer;
|
71 |
|
72 | var OurUint8Array = global.Uint8Array || function () {};
|
73 |
|
74 | function _uint8ArrayToBuffer(chunk) {
|
75 | return Buffer.from(chunk);
|
76 | }
|
77 |
|
78 | function _isUint8Array(obj) {
|
79 | return Buffer.isBuffer(obj) || obj instanceof OurUint8Array;
|
80 | }
|
81 |
|
82 | var destroyImpl = require('./internal/streams/destroy');
|
83 |
|
84 | var _require = require('./internal/streams/state'),
|
85 | getHighWaterMark = _require.getHighWaterMark;
|
86 |
|
87 | var _require$codes = require('../errors').codes,
|
88 | ERR_INVALID_ARG_TYPE = _require$codes.ERR_INVALID_ARG_TYPE,
|
89 | ERR_METHOD_NOT_IMPLEMENTED = _require$codes.ERR_METHOD_NOT_IMPLEMENTED,
|
90 | ERR_MULTIPLE_CALLBACK = _require$codes.ERR_MULTIPLE_CALLBACK,
|
91 | ERR_STREAM_CANNOT_PIPE = _require$codes.ERR_STREAM_CANNOT_PIPE,
|
92 | ERR_STREAM_DESTROYED = _require$codes.ERR_STREAM_DESTROYED,
|
93 | ERR_STREAM_NULL_VALUES = _require$codes.ERR_STREAM_NULL_VALUES,
|
94 | ERR_STREAM_WRITE_AFTER_END = _require$codes.ERR_STREAM_WRITE_AFTER_END,
|
95 | ERR_UNKNOWN_ENCODING = _require$codes.ERR_UNKNOWN_ENCODING;
|
96 |
|
97 | var errorOrDestroy = destroyImpl.errorOrDestroy;
|
98 |
|
99 | require('inherits')(Writable, Stream);
|
100 |
|
101 | function nop() {}
|
102 |
|
103 | function WritableState(options, stream, isDuplex) {
|
104 | Duplex = Duplex || require('./_stream_duplex');
|
105 | options = options || {};
|
106 |
|
107 |
|
108 |
|
109 |
|
110 |
|
111 | if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof Duplex;
|
112 |
|
113 |
|
114 | this.objectMode = !!options.objectMode;
|
115 | if (isDuplex) this.objectMode = this.objectMode || !!options.writableObjectMode;
|
116 |
|
117 |
|
118 |
|
119 | this.highWaterMark = getHighWaterMark(this, options, 'writableHighWaterMark', isDuplex);
|
120 |
|
121 | this.finalCalled = false;
|
122 |
|
123 | this.needDrain = false;
|
124 |
|
125 | this.ending = false;
|
126 |
|
127 | this.ended = false;
|
128 |
|
129 | this.finished = false;
|
130 |
|
131 | this.destroyed = false;
|
132 |
|
133 |
|
134 |
|
135 | var noDecode = options.decodeStrings === false;
|
136 | this.decodeStrings = !noDecode;
|
137 |
|
138 |
|
139 |
|
140 | this.defaultEncoding = options.defaultEncoding || 'utf8';
|
141 |
|
142 |
|
143 |
|
144 | this.length = 0;
|
145 |
|
146 | this.writing = false;
|
147 |
|
148 | this.corked = 0;
|
149 |
|
150 |
|
151 |
|
152 |
|
153 | this.sync = true;
|
154 |
|
155 |
|
156 |
|
157 | this.bufferProcessing = false;
|
158 |
|
159 | this.onwrite = function (er) {
|
160 | onwrite(stream, er);
|
161 | };
|
162 |
|
163 |
|
164 | this.writecb = null;
|
165 |
|
166 | this.writelen = 0;
|
167 | this.bufferedRequest = null;
|
168 | this.lastBufferedRequest = null;
|
169 |
|
170 |
|
171 | this.pendingcb = 0;
|
172 |
|
173 |
|
174 | this.prefinished = false;
|
175 |
|
176 | this.errorEmitted = false;
|
177 |
|
178 | this.emitClose = options.emitClose !== false;
|
179 |
|
180 | this.autoDestroy = !!options.autoDestroy;
|
181 |
|
182 | this.bufferedRequestCount = 0;
|
183 |
|
184 |
|
185 | this.corkedRequestsFree = new CorkedRequest(this);
|
186 | }
|
187 |
|
188 | WritableState.prototype.getBuffer = function getBuffer() {
|
189 | var current = this.bufferedRequest;
|
190 | var out = [];
|
191 |
|
192 | while (current) {
|
193 | out.push(current);
|
194 | current = current.next;
|
195 | }
|
196 |
|
197 | return out;
|
198 | };
|
199 |
|
200 | (function () {
|
201 | try {
|
202 | Object.defineProperty(WritableState.prototype, 'buffer', {
|
203 | get: internalUtil.deprecate(function writableStateBufferGetter() {
|
204 | return this.getBuffer();
|
205 | }, '_writableState.buffer is deprecated. Use _writableState.getBuffer ' + 'instead.', 'DEP0003')
|
206 | });
|
207 | } catch (_) {}
|
208 | })();
|
209 |
|
210 |
|
211 |
|
212 | var realHasInstance;
|
213 |
|
214 | if (typeof Symbol === 'function' && Symbol.hasInstance && typeof Function.prototype[Symbol.hasInstance] === 'function') {
|
215 | realHasInstance = Function.prototype[Symbol.hasInstance];
|
216 | Object.defineProperty(Writable, Symbol.hasInstance, {
|
217 | value: function value(object) {
|
218 | if (realHasInstance.call(this, object)) return true;
|
219 | if (this !== Writable) return false;
|
220 | return object && object._writableState instanceof WritableState;
|
221 | }
|
222 | });
|
223 | } else {
|
224 | realHasInstance = function realHasInstance(object) {
|
225 | return object instanceof this;
|
226 | };
|
227 | }
|
228 |
|
229 | function Writable(options) {
|
230 | Duplex = Duplex || require('./_stream_duplex');
|
231 |
|
232 |
|
233 |
|
234 |
|
235 |
|
236 |
|
237 |
|
238 |
|
239 | var isDuplex = this instanceof Duplex;
|
240 | if (!isDuplex && !realHasInstance.call(Writable, this)) return new Writable(options);
|
241 | this._writableState = new WritableState(options, this, isDuplex);
|
242 |
|
243 | this.writable = true;
|
244 |
|
245 | if (options) {
|
246 | if (typeof options.write === 'function') this._write = options.write;
|
247 | if (typeof options.writev === 'function') this._writev = options.writev;
|
248 | if (typeof options.destroy === 'function') this._destroy = options.destroy;
|
249 | if (typeof options.final === 'function') this._final = options.final;
|
250 | }
|
251 |
|
252 | Stream.call(this);
|
253 | }
|
254 |
|
255 |
|
256 | Writable.prototype.pipe = function () {
|
257 | errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE());
|
258 | };
|
259 |
|
260 | function writeAfterEnd(stream, cb) {
|
261 | var er = new ERR_STREAM_WRITE_AFTER_END();
|
262 |
|
263 | errorOrDestroy(stream, er);
|
264 | process.nextTick(cb, er);
|
265 | }
|
266 |
|
267 |
|
268 |
|
269 |
|
270 | function validChunk(stream, state, chunk, cb) {
|
271 | var er;
|
272 |
|
273 | if (chunk === null) {
|
274 | er = new ERR_STREAM_NULL_VALUES();
|
275 | } else if (typeof chunk !== 'string' && !state.objectMode) {
|
276 | er = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk);
|
277 | }
|
278 |
|
279 | if (er) {
|
280 | errorOrDestroy(stream, er);
|
281 | process.nextTick(cb, er);
|
282 | return false;
|
283 | }
|
284 |
|
285 | return true;
|
286 | }
|
287 |
|
288 | Writable.prototype.write = function (chunk, encoding, cb) {
|
289 | var state = this._writableState;
|
290 | var ret = false;
|
291 |
|
292 | var isBuf = !state.objectMode && _isUint8Array(chunk);
|
293 |
|
294 | if (isBuf && !Buffer.isBuffer(chunk)) {
|
295 | chunk = _uint8ArrayToBuffer(chunk);
|
296 | }
|
297 |
|
298 | if (typeof encoding === 'function') {
|
299 | cb = encoding;
|
300 | encoding = null;
|
301 | }
|
302 |
|
303 | if (isBuf) encoding = 'buffer';else if (!encoding) encoding = state.defaultEncoding;
|
304 | if (typeof cb !== 'function') cb = nop;
|
305 | if (state.ending) writeAfterEnd(this, cb);else if (isBuf || validChunk(this, state, chunk, cb)) {
|
306 | state.pendingcb++;
|
307 | ret = writeOrBuffer(this, state, isBuf, chunk, encoding, cb);
|
308 | }
|
309 | return ret;
|
310 | };
|
311 |
|
312 | Writable.prototype.cork = function () {
|
313 | this._writableState.corked++;
|
314 | };
|
315 |
|
316 | Writable.prototype.uncork = function () {
|
317 | var state = this._writableState;
|
318 |
|
319 | if (state.corked) {
|
320 | state.corked--;
|
321 | if (!state.writing && !state.corked && !state.bufferProcessing && state.bufferedRequest) clearBuffer(this, state);
|
322 | }
|
323 | };
|
324 |
|
325 | Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
|
326 |
|
327 | if (typeof encoding === 'string') encoding = encoding.toLowerCase();
|
328 | if (!(['hex', 'utf8', 'utf-8', 'ascii', 'binary', 'base64', 'ucs2', 'ucs-2', 'utf16le', 'utf-16le', 'raw'].indexOf((encoding + '').toLowerCase()) > -1)) throw new ERR_UNKNOWN_ENCODING(encoding);
|
329 | this._writableState.defaultEncoding = encoding;
|
330 | return this;
|
331 | };
|
332 |
|
333 | Object.defineProperty(Writable.prototype, 'writableBuffer', {
|
334 |
|
335 |
|
336 |
|
337 | enumerable: false,
|
338 | get: function get() {
|
339 | return this._writableState && this._writableState.getBuffer();
|
340 | }
|
341 | });
|
342 |
|
343 | function decodeChunk(state, chunk, encoding) {
|
344 | if (!state.objectMode && state.decodeStrings !== false && typeof chunk === 'string') {
|
345 | chunk = Buffer.from(chunk, encoding);
|
346 | }
|
347 |
|
348 | return chunk;
|
349 | }
|
350 |
|
351 | Object.defineProperty(Writable.prototype, 'writableHighWaterMark', {
|
352 |
|
353 |
|
354 |
|
355 | enumerable: false,
|
356 | get: function get() {
|
357 | return this._writableState.highWaterMark;
|
358 | }
|
359 | });
|
360 |
|
361 |
|
362 |
|
363 | function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
|
364 | if (!isBuf) {
|
365 | var newChunk = decodeChunk(state, chunk, encoding);
|
366 |
|
367 | if (chunk !== newChunk) {
|
368 | isBuf = true;
|
369 | encoding = 'buffer';
|
370 | chunk = newChunk;
|
371 | }
|
372 | }
|
373 |
|
374 | var len = state.objectMode ? 1 : chunk.length;
|
375 | state.length += len;
|
376 | var ret = state.length < state.highWaterMark;
|
377 |
|
378 | if (!ret) state.needDrain = true;
|
379 |
|
380 | if (state.writing || state.corked) {
|
381 | var last = state.lastBufferedRequest;
|
382 | state.lastBufferedRequest = {
|
383 | chunk: chunk,
|
384 | encoding: encoding,
|
385 | isBuf: isBuf,
|
386 | callback: cb,
|
387 | next: null
|
388 | };
|
389 |
|
390 | if (last) {
|
391 | last.next = state.lastBufferedRequest;
|
392 | } else {
|
393 | state.bufferedRequest = state.lastBufferedRequest;
|
394 | }
|
395 |
|
396 | state.bufferedRequestCount += 1;
|
397 | } else {
|
398 | doWrite(stream, state, false, len, chunk, encoding, cb);
|
399 | }
|
400 |
|
401 | return ret;
|
402 | }
|
403 |
|
404 | function doWrite(stream, state, writev, len, chunk, encoding, cb) {
|
405 | state.writelen = len;
|
406 | state.writecb = cb;
|
407 | state.writing = true;
|
408 | state.sync = true;
|
409 | if (state.destroyed) state.onwrite(new ERR_STREAM_DESTROYED('write'));else if (writev) stream._writev(chunk, state.onwrite);else stream._write(chunk, encoding, state.onwrite);
|
410 | state.sync = false;
|
411 | }
|
412 |
|
413 | function onwriteError(stream, state, sync, er, cb) {
|
414 | --state.pendingcb;
|
415 |
|
416 | if (sync) {
|
417 |
|
418 |
|
419 | process.nextTick(cb, er);
|
420 |
|
421 |
|
422 | process.nextTick(finishMaybe, stream, state);
|
423 | stream._writableState.errorEmitted = true;
|
424 | errorOrDestroy(stream, er);
|
425 | } else {
|
426 |
|
427 |
|
428 | cb(er);
|
429 | stream._writableState.errorEmitted = true;
|
430 | errorOrDestroy(stream, er);
|
431 |
|
432 |
|
433 | finishMaybe(stream, state);
|
434 | }
|
435 | }
|
436 |
|
437 | function onwriteStateUpdate(state) {
|
438 | state.writing = false;
|
439 | state.writecb = null;
|
440 | state.length -= state.writelen;
|
441 | state.writelen = 0;
|
442 | }
|
443 |
|
444 | function onwrite(stream, er) {
|
445 | var state = stream._writableState;
|
446 | var sync = state.sync;
|
447 | var cb = state.writecb;
|
448 | if (typeof cb !== 'function') throw new ERR_MULTIPLE_CALLBACK();
|
449 | onwriteStateUpdate(state);
|
450 | if (er) onwriteError(stream, state, sync, er, cb);else {
|
451 |
|
452 | var finished = needFinish(state) || stream.destroyed;
|
453 |
|
454 | if (!finished && !state.corked && !state.bufferProcessing && state.bufferedRequest) {
|
455 | clearBuffer(stream, state);
|
456 | }
|
457 |
|
458 | if (sync) {
|
459 | process.nextTick(afterWrite, stream, state, finished, cb);
|
460 | } else {
|
461 | afterWrite(stream, state, finished, cb);
|
462 | }
|
463 | }
|
464 | }
|
465 |
|
466 | function afterWrite(stream, state, finished, cb) {
|
467 | if (!finished) onwriteDrain(stream, state);
|
468 | state.pendingcb--;
|
469 | cb();
|
470 | finishMaybe(stream, state);
|
471 | }
|
472 |
|
473 |
|
474 |
|
475 |
|
476 | function onwriteDrain(stream, state) {
|
477 | if (state.length === 0 && state.needDrain) {
|
478 | state.needDrain = false;
|
479 | stream.emit('drain');
|
480 | }
|
481 | }
|
482 |
|
483 |
|
484 | function clearBuffer(stream, state) {
|
485 | state.bufferProcessing = true;
|
486 | var entry = state.bufferedRequest;
|
487 |
|
488 | if (stream._writev && entry && entry.next) {
|
489 |
|
490 | var l = state.bufferedRequestCount;
|
491 | var buffer = new Array(l);
|
492 | var holder = state.corkedRequestsFree;
|
493 | holder.entry = entry;
|
494 | var count = 0;
|
495 | var allBuffers = true;
|
496 |
|
497 | while (entry) {
|
498 | buffer[count] = entry;
|
499 | if (!entry.isBuf) allBuffers = false;
|
500 | entry = entry.next;
|
501 | count += 1;
|
502 | }
|
503 |
|
504 | buffer.allBuffers = allBuffers;
|
505 | doWrite(stream, state, true, state.length, buffer, '', holder.finish);
|
506 |
|
507 |
|
508 | state.pendingcb++;
|
509 | state.lastBufferedRequest = null;
|
510 |
|
511 | if (holder.next) {
|
512 | state.corkedRequestsFree = holder.next;
|
513 | holder.next = null;
|
514 | } else {
|
515 | state.corkedRequestsFree = new CorkedRequest(state);
|
516 | }
|
517 |
|
518 | state.bufferedRequestCount = 0;
|
519 | } else {
|
520 |
|
521 | while (entry) {
|
522 | var chunk = entry.chunk;
|
523 | var encoding = entry.encoding;
|
524 | var cb = entry.callback;
|
525 | var len = state.objectMode ? 1 : chunk.length;
|
526 | doWrite(stream, state, false, len, chunk, encoding, cb);
|
527 | entry = entry.next;
|
528 | state.bufferedRequestCount--;
|
529 |
|
530 |
|
531 |
|
532 |
|
533 | if (state.writing) {
|
534 | break;
|
535 | }
|
536 | }
|
537 |
|
538 | if (entry === null) state.lastBufferedRequest = null;
|
539 | }
|
540 |
|
541 | state.bufferedRequest = entry;
|
542 | state.bufferProcessing = false;
|
543 | }
|
544 |
|
545 | Writable.prototype._write = function (chunk, encoding, cb) {
|
546 | cb(new ERR_METHOD_NOT_IMPLEMENTED('_write()'));
|
547 | };
|
548 |
|
549 | Writable.prototype._writev = null;
|
550 |
|
551 | Writable.prototype.end = function (chunk, encoding, cb) {
|
552 | var state = this._writableState;
|
553 |
|
554 | if (typeof chunk === 'function') {
|
555 | cb = chunk;
|
556 | chunk = null;
|
557 | encoding = null;
|
558 | } else if (typeof encoding === 'function') {
|
559 | cb = encoding;
|
560 | encoding = null;
|
561 | }
|
562 |
|
563 | if (chunk !== null && chunk !== undefined) this.write(chunk, encoding);
|
564 |
|
565 | if (state.corked) {
|
566 | state.corked = 1;
|
567 | this.uncork();
|
568 | }
|
569 |
|
570 |
|
571 | if (!state.ending) endWritable(this, state, cb);
|
572 | return this;
|
573 | };
|
574 |
|
575 | Object.defineProperty(Writable.prototype, 'writableLength', {
|
576 |
|
577 |
|
578 |
|
579 | enumerable: false,
|
580 | get: function get() {
|
581 | return this._writableState.length;
|
582 | }
|
583 | });
|
584 |
|
585 | function needFinish(state) {
|
586 | return state.ending && state.length === 0 && state.bufferedRequest === null && !state.finished && !state.writing;
|
587 | }
|
588 |
|
589 | function callFinal(stream, state) {
|
590 | stream._final(function (err) {
|
591 | state.pendingcb--;
|
592 |
|
593 | if (err) {
|
594 | errorOrDestroy(stream, err);
|
595 | }
|
596 |
|
597 | state.prefinished = true;
|
598 | stream.emit('prefinish');
|
599 | finishMaybe(stream, state);
|
600 | });
|
601 | }
|
602 |
|
603 | function prefinish(stream, state) {
|
604 | if (!state.prefinished && !state.finalCalled) {
|
605 | if (typeof stream._final === 'function' && !state.destroyed) {
|
606 | state.pendingcb++;
|
607 | state.finalCalled = true;
|
608 | process.nextTick(callFinal, stream, state);
|
609 | } else {
|
610 | state.prefinished = true;
|
611 | stream.emit('prefinish');
|
612 | }
|
613 | }
|
614 | }
|
615 |
|
616 | function finishMaybe(stream, state) {
|
617 | var need = needFinish(state);
|
618 |
|
619 | if (need) {
|
620 | prefinish(stream, state);
|
621 |
|
622 | if (state.pendingcb === 0) {
|
623 | state.finished = true;
|
624 | stream.emit('finish');
|
625 |
|
626 | if (state.autoDestroy) {
|
627 |
|
628 |
|
629 | var rState = stream._readableState;
|
630 |
|
631 | if (!rState || rState.autoDestroy && rState.endEmitted) {
|
632 | stream.destroy();
|
633 | }
|
634 | }
|
635 | }
|
636 | }
|
637 |
|
638 | return need;
|
639 | }
|
640 |
|
641 | function endWritable(stream, state, cb) {
|
642 | state.ending = true;
|
643 | finishMaybe(stream, state);
|
644 |
|
645 | if (cb) {
|
646 | if (state.finished) process.nextTick(cb);else stream.once('finish', cb);
|
647 | }
|
648 |
|
649 | state.ended = true;
|
650 | stream.writable = false;
|
651 | }
|
652 |
|
653 | function onCorkedFinish(corkReq, state, err) {
|
654 | var entry = corkReq.entry;
|
655 | corkReq.entry = null;
|
656 |
|
657 | while (entry) {
|
658 | var cb = entry.callback;
|
659 | state.pendingcb--;
|
660 | cb(err);
|
661 | entry = entry.next;
|
662 | }
|
663 |
|
664 |
|
665 | state.corkedRequestsFree.next = corkReq;
|
666 | }
|
667 |
|
668 | Object.defineProperty(Writable.prototype, 'destroyed', {
|
669 |
|
670 |
|
671 |
|
672 | enumerable: false,
|
673 | get: function get() {
|
674 | if (this._writableState === undefined) {
|
675 | return false;
|
676 | }
|
677 |
|
678 | return this._writableState.destroyed;
|
679 | },
|
680 | set: function set(value) {
|
681 |
|
682 |
|
683 | if (!this._writableState) {
|
684 | return;
|
685 | }
|
686 |
|
687 |
|
688 |
|
689 | this._writableState.destroyed = value;
|
690 | }
|
691 | });
|
692 | Writable.prototype.destroy = destroyImpl.destroy;
|
693 | Writable.prototype._undestroy = destroyImpl.undestroy;
|
694 |
|
695 | Writable.prototype._destroy = function (err, cb) {
|
696 | cb(err);
|
697 | }; |
\ | No newline at end of file |