'use strict';

import EventEmitter from 'events';
import {inherits, debuglog} from 'util';
import {Buffer} from 'buffer';
import BufferList from './buffer-list';
import {StringDecoder} from 'string_decoder';
import {Duplex} from './duplex';
import {nextTick} from 'process';

Readable.ReadableState = ReadableState;

var debug = debuglog('stream');
inherits(Readable, EventEmitter);

function prependListener(emitter, event, fn) {
  // Sadly this is not cacheable as some libraries bundle their own
  // event emitter implementation with them.
  if (typeof emitter.prependListener === 'function') {
    return emitter.prependListener(event, fn);
  } else {
    // This is a hack to make sure that our error handler is attached before any
    // userland ones. NEVER DO THIS. This is here only because this code needs
    // to continue to work with older versions of Node.js that do not include
    // the prependListener() method. The goal is to eventually remove this hack.
    if (!emitter._events || !emitter._events[event])
      emitter.on(event, fn);
    else if (Array.isArray(emitter._events[event]))
      emitter._events[event].unshift(fn);
    else
      emitter._events[event] = [fn, emitter._events[event]];
  }
}

function listenerCount(emitter, type) {
  return emitter.listeners(type).length;
}
function ReadableState(options, stream) {
  options = options || {};

  // object stream flag. Used to make read(n) ignore n and to
  // make all the buffer merging and length checks go away
  this.objectMode = !!options.objectMode;

  if (stream instanceof Duplex) this.objectMode = this.objectMode || !!options.readableObjectMode;

  // the point at which it stops calling _read() to fill the buffer
  // Note: 0 is a valid value, means "don't call _read preemptively ever"
  var hwm = options.highWaterMark;
  var defaultHwm = this.objectMode ? 16 : 16 * 1024;
  this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;

  // cast to ints.
  this.highWaterMark = ~~this.highWaterMark;

  // A linked list is used to store data chunks instead of an array because the
  // linked list can remove elements from the beginning faster than
  // array.shift()
  this.buffer = new BufferList();
  this.length = 0;
  this.pipes = null;
  this.pipesCount = 0;
  this.flowing = null;
  this.ended = false;
  this.endEmitted = false;
  this.reading = false;

  // a flag to tell whether data pushed from inside _read() arrived on the
  // same tick as the read() call, or on a later tick. We set this to true at
  // first, because any actions that shouldn't happen until "later" should
  // generally also not happen before the first _read() call.
  this.sync = true;

  // whenever we return null, then we set a flag to say
  // that we're awaiting a 'readable' event emission.
  this.needReadable = false;
  this.emittedReadable = false;
  this.readableListening = false;
  this.resumeScheduled = false;

  // Crypto is kind of old and crusty. Historically, its default string
  // encoding is 'binary' so we have to make this configurable.
  // Everything else in the universe uses 'utf8', though.
  this.defaultEncoding = options.defaultEncoding || 'utf8';

  // when piping, we only care about 'readable' events that happen
  // after read()ing all the bytes and not getting any pushback.
  this.ranOut = false;

  // the number of writers that are awaiting a drain event in .pipe()s
  this.awaitDrain = 0;

  // if true, a maybeReadMore has been scheduled
  this.readingMore = false;

  this.decoder = null;
  this.encoding = null;
  if (options.encoding) {
    this.decoder = new StringDecoder(options.encoding);
    this.encoding = options.encoding;
  }
}
export default Readable;

export function Readable(options) {
  if (!(this instanceof Readable)) return new Readable(options);

  this._readableState = new ReadableState(options, this);

  // legacy
  this.readable = true;

  if (options && typeof options.read === 'function') this._read = options.read;

  EventEmitter.call(this);
}
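
// Example (illustrative sketch, not part of this module): a Readable can be
// built without subclassing by passing a read() function in the options, as
// the constructor above wires options.read onto this._read.
//
//   var i = 0;
//   var counter = new Readable({
//     read: function () {
//       // push() queues a chunk; pushing null signals end-of-stream.
//       this.push(i < 3 ? String(i++) : null);
//     }
//   });
//   counter.on('data', function (chunk) { console.log(chunk.toString()); });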

// Manually shove something into the read() buffer.
// This returns true if the highWaterMark has not been hit yet,
// similar to how Writable.write() returns true if you should
// write() some more.
Readable.prototype.push = function (chunk, encoding) {
  var state = this._readableState;

  if (!state.objectMode && typeof chunk === 'string') {
    encoding = encoding || state.defaultEncoding;
    if (encoding !== state.encoding) {
      chunk = Buffer.from(chunk, encoding);
      encoding = '';
    }
  }

  return readableAddChunk(this, state, chunk, encoding, false);
};
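
// Example (illustrative sketch): push() returns false once the buffered
// length reaches the highWaterMark, which is the signal for the producer to
// stop until _read() is called again. `source` here is a hypothetical
// pausable producer, not part of this module.
//
//   var r = new Readable({
//     highWaterMark: 4,
//     read: function () { source.resume(); } // demand: ask for more
//   });
//   source.on('chunk', function (chunk) {
//     if (!r.push(chunk)) source.pause();    // backpressure: stop producing
//   });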

// Unshift should *always* be something directly out of read()
Readable.prototype.unshift = function (chunk) {
  var state = this._readableState;
  return readableAddChunk(this, state, chunk, '', true);
};
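
// Example (illustrative sketch): unshift() puts bytes that were read but not
// consumed back at the front of the buffer, e.g. when a parser reads past a
// header boundary. `findHeaderEnd` and `parseHeader` are hypothetical helpers.
//
//   var chunk = socket.read();
//   var headerEnd = findHeaderEnd(chunk);
//   socket.unshift(chunk.slice(headerEnd));  // give the body bytes back
//   parseHeader(chunk.slice(0, headerEnd));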

Readable.prototype.isPaused = function () {
  return this._readableState.flowing === false;
};

function readableAddChunk(stream, state, chunk, encoding, addToFront) {
  var er = chunkInvalid(state, chunk);
  if (er) {
    stream.emit('error', er);
  } else if (chunk === null) {
    state.reading = false;
    onEofChunk(stream, state);
  } else if (state.objectMode || (chunk && chunk.length > 0)) {
    if (state.ended && !addToFront) {
      var e = new Error('stream.push() after EOF');
      stream.emit('error', e);
    } else if (state.endEmitted && addToFront) {
      var _e = new Error('stream.unshift() after end event');
      stream.emit('error', _e);
    } else {
      var skipAdd;
      if (state.decoder && !addToFront && !encoding) {
        chunk = state.decoder.write(chunk);
        skipAdd = !state.objectMode && chunk.length === 0;
      }

      if (!addToFront) state.reading = false;

      // Don't add to the buffer if we've decoded to an empty string chunk and
      // we're not in object mode
      if (!skipAdd) {
        // if we want the data now, just emit it.
        if (state.flowing && state.length === 0 && !state.sync) {
          stream.emit('data', chunk);
          stream.read(0);
        } else {
          // update the buffer info.
          state.length += state.objectMode ? 1 : chunk.length;
          if (addToFront) state.buffer.unshift(chunk);
          else state.buffer.push(chunk);

          if (state.needReadable) emitReadable(stream);
        }
      }

      maybeReadMore(stream, state);
    }
  } else if (!addToFront) {
    state.reading = false;
  }

  return needMoreData(state);
}

// Returns true while it is still OK to push() more data, i.e. while the
// buffer is below the highWaterMark. Also, if we have no data yet, we can
// stand some more bytes. This works around cases where hwm=0, such as the
// repl. And if the push() triggered a 'readable' event, and the user called
// read(largeNumber) such that needReadable was set, then we ought to push
// more, so that another 'readable' event will be triggered.
function needMoreData(state) {
  return !state.ended && (state.needReadable || state.length < state.highWaterMark || state.length === 0);
}

// backwards compatibility.
Readable.prototype.setEncoding = function (enc) {
  this._readableState.decoder = new StringDecoder(enc);
  this._readableState.encoding = enc;
  return this;
};
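
// Example (illustrative sketch): once an encoding is set, read() and 'data'
// deliver decoded strings instead of Buffers, and a multi-byte character
// split across chunks is reassembled by the StringDecoder.
//
//   var r = new Readable({ read: function () {} });
//   r.setEncoding('utf8');
//   r.push(Buffer.from([0xe2, 0x82]));  // first two bytes of '€'
//   r.push(Buffer.from([0xac]));        // final byte of '€'
//   r.on('data', function (s) { console.log(typeof s, s); }); // string €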

// Don't raise the hwm > 8MB
var MAX_HWM = 0x800000;
function computeNewHighWaterMark(n) {
  if (n >= MAX_HWM) {
    n = MAX_HWM;
  } else {
    // Get the next highest power of 2 to prevent increasing hwm excessively in
    // tiny amounts
    n--;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    n++;
  }
  return n;
}
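
// Example (illustrative sketch): the bit-twiddling above rounds a requested
// size up to the next power of two (capped at MAX_HWM), so a large read(n)
// grows the highWaterMark in big steps rather than tiny increments.
//
//   computeNewHighWaterMark(15);        // 16
//   computeNewHighWaterMark(16);        // 16
//   computeNewHighWaterMark(16385);     // 32768
//   computeNewHighWaterMark(0x900000);  // 0x800000 (capped)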

// This function is designed to be inlinable, so please take care when making
// changes to the function body.
function howMuchToRead(n, state) {
  if (n <= 0 || (state.length === 0 && state.ended)) return 0;
  if (state.objectMode) return 1;
  if (n !== n) {
    // n is NaN (e.g. read() with no argument): return everything, but only
    // flow one buffer at a time
    if (state.flowing && state.length) return state.buffer.head.data.length;
    else return state.length;
  }
  // If we're asking for more than the current hwm, then raise the hwm.
  if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n);
  if (n <= state.length) return n;
  // Don't have enough
  if (!state.ended) {
    state.needReadable = true;
    return 0;
  }
  return state.length;
}

// you can override either this method, or the async _read(n) below.
Readable.prototype.read = function (n) {
  debug('read', n);
  n = parseInt(n, 10);
  var state = this._readableState;
  var nOrig = n;

  if (n !== 0) state.emittedReadable = false;

  // if we're doing read(0) to trigger a readable event, but we
  // already have a bunch of data in the buffer, then just trigger
  // the 'readable' event and move on.
  if (n === 0 && state.needReadable && (state.length >= state.highWaterMark || state.ended)) {
    debug('read: emitReadable', state.length, state.ended);
    if (state.length === 0 && state.ended) endReadable(this);
    else emitReadable(this);
    return null;
  }

  n = howMuchToRead(n, state);

  // if we've ended, and we're now clear, then finish it up.
  if (n === 0 && state.ended) {
    if (state.length === 0) endReadable(this);
    return null;
  }

  // All the actual chunk generation logic needs to be
  // *below* the call to _read. The reason is that in certain
  // synthetic stream cases, such as passthrough streams, _read
  // may be a completely synchronous operation which may change
  // the state of the read buffer, providing enough data when
  // before there was *not* enough.
  //
  // So, the steps are:
  // 1. Figure out what the state of things will be after we do
  // a read from the buffer.
  //
  // 2. If that resulting state will trigger a _read, then call _read.
  // Note that this may be asynchronous, or synchronous. Yes, it is
  // deeply ugly to write APIs this way, but that still doesn't mean
  // that the Readable class should behave improperly, as streams are
  // designed to be sync/async agnostic.
  // Take note if the _read call is sync or async (ie, if the read call
  // has returned yet), so that we know whether or not it's safe to emit
  // 'readable' etc.
  //
  // 3. Actually pull the requested chunks out of the buffer and return.

  // if we need a readable event, then we need to do some reading.
  var doRead = state.needReadable;
  debug('need readable', doRead);

  // if we currently have less than the highWaterMark, then also read some
  if (state.length === 0 || state.length - n < state.highWaterMark) {
    doRead = true;
    debug('length less than watermark', doRead);
  }

  // however, if we've ended, then there's no point, and if we're already
  // reading, then it's unnecessary.
  if (state.ended || state.reading) {
    doRead = false;
    debug('reading or ended', doRead);
  } else if (doRead) {
    debug('do read');
    state.reading = true;
    state.sync = true;
    // if the length is currently zero, then we *need* a readable event.
    if (state.length === 0) state.needReadable = true;
    // call internal read method
    this._read(state.highWaterMark);
    state.sync = false;
    // If _read pushed data synchronously, then `reading` will be false,
    // and we need to re-evaluate how much data we can return to the user.
    if (!state.reading) n = howMuchToRead(nOrig, state);
  }

  var ret;
  if (n > 0) ret = fromList(n, state);
  else ret = null;

  if (ret === null) {
    state.needReadable = true;
    n = 0;
  } else {
    state.length -= n;
  }

  if (state.length === 0) {
    // If we have nothing in the buffer, then we want to know
    // as soon as we *do* get something into the buffer.
    if (!state.ended) state.needReadable = true;

    // If we tried to read() past the EOF, then emit end on the next tick.
    if (nOrig !== n && state.ended) endReadable(this);
  }

  if (ret !== null) this.emit('data', ret);

  return ret;
};
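
// Example (illustrative sketch): the usual paused-mode pattern is to call
// read() in a loop from a 'readable' handler until it returns null; another
// 'readable' event fires once more data has been buffered. `consume` is a
// hypothetical handler.
//
//   r.on('readable', function () {
//     var chunk;
//     while ((chunk = r.read()) !== null) {
//       consume(chunk);
//     }
//   });
//   r.on('end', function () { console.log('no more data'); });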

function chunkInvalid(state, chunk) {
  var er = null;
  if (!Buffer.isBuffer(chunk) && typeof chunk !== 'string' && chunk !== null && chunk !== undefined && !state.objectMode) {
    er = new TypeError('Invalid non-string/buffer chunk');
  }
  return er;
}

function onEofChunk(stream, state) {
  if (state.ended) return;
  if (state.decoder) {
    var chunk = state.decoder.end();
    if (chunk && chunk.length) {
      state.buffer.push(chunk);
      state.length += state.objectMode ? 1 : chunk.length;
    }
  }
  state.ended = true;

  // emit 'readable' now to make sure it gets picked up.
  emitReadable(stream);
}

// Don't emit readable right away in sync mode, because this can trigger
// another read() call => stack overflow. This way, it might trigger
// a nextTick recursion warning, but that's not so bad.
function emitReadable(stream) {
  var state = stream._readableState;
  state.needReadable = false;
  if (!state.emittedReadable) {
    debug('emitReadable', state.flowing);
    state.emittedReadable = true;
    if (state.sync) nextTick(emitReadable_, stream);
    else emitReadable_(stream);
  }
}

function emitReadable_(stream) {
  debug('emit readable');
  stream.emit('readable');
  flow(stream);
}

// at this point, the user has presumably seen the 'readable' event,
// and called read() to consume some data. that may have triggered
// in turn another _read(n) call, in which case reading = true if
// it's in progress.
// However, if we're not ended, or reading, and the length < hwm,
// then go ahead and try to read some more preemptively.
function maybeReadMore(stream, state) {
  if (!state.readingMore) {
    state.readingMore = true;
    nextTick(maybeReadMore_, stream, state);
  }
}

function maybeReadMore_(stream, state) {
  var len = state.length;
  while (!state.reading && !state.flowing && !state.ended && state.length < state.highWaterMark) {
    debug('maybeReadMore read 0');
    stream.read(0);
    if (len === state.length) {
      // didn't get any data, stop spinning.
      break;
    } else {
      len = state.length;
    }
  }
  state.readingMore = false;
}

// abstract method. to be overridden in specific implementation classes.
// implementations should fetch up to n bytes and deliver them with
// this.push(chunk), pushing null when the source is exhausted.
// for virtual (non-string, non-buffer) streams, "length" is somewhat
// arbitrary, and perhaps not very meaningful.
Readable.prototype._read = function (n) {
  this.emit('error', new Error('not implemented'));
};
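
// Example (illustrative sketch): concrete streams override _read(n) to fetch
// up to n bytes and deliver them via push(). `inherits` is the util import
// used above; `fetchChunk` is a hypothetical asynchronous source.
//
//   function MySource(options) {
//     Readable.call(this, options);
//   }
//   inherits(MySource, Readable);
//   MySource.prototype._read = function (n) {
//     var self = this;
//     fetchChunk(n, function (err, chunk) {
//       if (err) return self.emit('error', err);
//       self.push(chunk); // pass null when the source is exhausted
//     });
//   };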

Readable.prototype.pipe = function (dest, pipeOpts) {
  var src = this;
  var state = this._readableState;

  switch (state.pipesCount) {
    case 0:
      state.pipes = dest;
      break;
    case 1:
      state.pipes = [state.pipes, dest];
      break;
    default:
      state.pipes.push(dest);
      break;
  }
  state.pipesCount += 1;
  debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts);

  var doEnd = (!pipeOpts || pipeOpts.end !== false);

  var endFn = doEnd ? onend : cleanup;
  if (state.endEmitted) nextTick(endFn);
  else src.once('end', endFn);

  dest.on('unpipe', onunpipe);
  function onunpipe(readable) {
    debug('onunpipe');
    if (readable === src) {
      cleanup();
    }
  }

  function onend() {
    debug('onend');
    dest.end();
  }

  // when the dest drains, it reduces the awaitDrain counter
  // on the source. This would be more elegant with a .once()
  // handler in flow(), but adding and removing repeatedly is
  // too slow.
  var ondrain = pipeOnDrain(src);
  dest.on('drain', ondrain);

  var cleanedUp = false;
  function cleanup() {
    debug('cleanup');
    // cleanup event handlers once the pipe is broken
    dest.removeListener('close', onclose);
    dest.removeListener('finish', onfinish);
    dest.removeListener('drain', ondrain);
    dest.removeListener('error', onerror);
    dest.removeListener('unpipe', onunpipe);
    src.removeListener('end', onend);
    src.removeListener('end', cleanup);
    src.removeListener('data', ondata);

    cleanedUp = true;

    // if the reader is waiting for a drain event from this
    // specific writer, then it would cause it to never start
    // flowing again.
    // So, if this is awaiting a drain, then we just call it now.
    // If we don't know, then assume that we are waiting for one.
    if (state.awaitDrain && (!dest._writableState || dest._writableState.needDrain)) ondrain();
  }

  // If the user pushes more data while we're writing to dest then we'll end up
  // in ondata again. However, we only want to increase awaitDrain once because
  // dest will only emit one 'drain' event for the multiple writes.
  // => Introduce a guard on increasing awaitDrain.
  var increasedAwaitDrain = false;
  src.on('data', ondata);
  function ondata(chunk) {
    debug('ondata');
    increasedAwaitDrain = false;
    var ret = dest.write(chunk);
    if (false === ret && !increasedAwaitDrain) {
      // If the user unpiped during `dest.write()`, it is possible
      // to get stuck in a permanently paused state if that write
      // also returned false.
      // => Check whether `dest` is still a piping destination.
      if (((state.pipesCount === 1 && state.pipes === dest) || (state.pipesCount > 1 && indexOf(state.pipes, dest) !== -1)) && !cleanedUp) {
        debug('false write response, pause', src._readableState.awaitDrain);
        src._readableState.awaitDrain++;
        increasedAwaitDrain = true;
      }
      src.pause();
    }
  }

  // if the dest has an error, then stop piping into it.
  // however, don't suppress the throwing behavior for this.
  function onerror(er) {
    debug('onerror', er);
    unpipe();
    dest.removeListener('error', onerror);
    if (listenerCount(dest, 'error') === 0) dest.emit('error', er);
  }

  // Make sure our error handler is attached before userland ones.
  prependListener(dest, 'error', onerror);

  // Both close and finish should trigger unpipe, but only once.
  function onclose() {
    dest.removeListener('finish', onfinish);
    unpipe();
  }
  dest.once('close', onclose);
  function onfinish() {
    debug('onfinish');
    dest.removeListener('close', onclose);
    unpipe();
  }
  dest.once('finish', onfinish);

  function unpipe() {
    debug('unpipe');
    src.unpipe(dest);
  }

  // tell the dest that it's being piped to
  dest.emit('pipe', src);

  // start the flow if it hasn't been started already.
  if (!state.flowing) {
    debug('pipe resume');
    src.resume();
  }

  return dest;
};
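
// Example (illustrative sketch): pipe() handles backpressure automatically;
// passing { end: false } keeps the destination open so it can be finished by
// hand (or receive another source) after this one ends. Errors are not
// forwarded, so handle them on both streams.
//
//   src.pipe(dest, { end: false });
//   src.on('end', function () { dest.end('trailer\n'); });
//   src.on('error', function (err) { /* source failed */ });
//   dest.on('error', function (err) { /* destination failed */ });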

function pipeOnDrain(src) {
  return function () {
    var state = src._readableState;
    debug('pipeOnDrain', state.awaitDrain);
    if (state.awaitDrain) state.awaitDrain--;
    if (state.awaitDrain === 0 && src.listeners('data').length) {
      state.flowing = true;
      flow(src);
    }
  };
}

Readable.prototype.unpipe = function (dest) {
  var state = this._readableState;

  // if we're not piping anywhere, then do nothing.
  if (state.pipesCount === 0) return this;

  // just one destination. most common case.
  if (state.pipesCount === 1) {
    // passed in one, but it's not the right one.
    if (dest && dest !== state.pipes) return this;

    if (!dest) dest = state.pipes;

    // got a match.
    state.pipes = null;
    state.pipesCount = 0;
    state.flowing = false;
    if (dest) dest.emit('unpipe', this);
    return this;
  }

  // slow case. multiple pipe destinations.

  if (!dest) {
    // remove all.
    var dests = state.pipes;
    var len = state.pipesCount;
    state.pipes = null;
    state.pipesCount = 0;
    state.flowing = false;

    for (var _i = 0; _i < len; _i++) {
      dests[_i].emit('unpipe', this);
    }
    return this;
  }

  // try to find the right one.
  var i = indexOf(state.pipes, dest);
  if (i === -1) return this;

  state.pipes.splice(i, 1);
  state.pipesCount -= 1;
  if (state.pipesCount === 1) state.pipes = state.pipes[0];

  dest.emit('unpipe', this);

  return this;
};
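
// Example (illustrative sketch): unpipe() detaches one destination, or every
// destination when called with no argument, and stops the flow that pipe()
// started.
//
//   src.pipe(dest);
//   setTimeout(function () {
//     src.unpipe(dest);  // stop writing to dest...
//     dest.end();        // ...and finish it off manually
//   }, 1000);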

// set up data events if they are asked for
// Ensure readable listeners eventually get something
Readable.prototype.on = function (ev, fn) {
  var res = EventEmitter.prototype.on.call(this, ev, fn);

  if (ev === 'data') {
    // Start flowing on next tick if stream isn't explicitly paused
    if (this._readableState.flowing !== false) this.resume();
  } else if (ev === 'readable') {
    var state = this._readableState;
    if (!state.endEmitted && !state.readableListening) {
      state.readableListening = state.needReadable = true;
      state.emittedReadable = false;
      if (!state.reading) {
        nextTick(nReadingNextTick, this);
      } else if (state.length) {
        emitReadable(this, state);
      }
    }
  }

  return res;
};
Readable.prototype.addListener = Readable.prototype.on;

function nReadingNextTick(self) {
  debug('readable nexttick read 0');
  self.read(0);
}

// pause() and resume() are remnants of the legacy readable stream API
// If the user uses them, then switch into old mode.
Readable.prototype.resume = function () {
  var state = this._readableState;
  if (!state.flowing) {
    debug('resume');
    state.flowing = true;
    resume(this, state);
  }
  return this;
};

function resume(stream, state) {
  if (!state.resumeScheduled) {
    state.resumeScheduled = true;
    nextTick(resume_, stream, state);
  }
}

function resume_(stream, state) {
  if (!state.reading) {
    debug('resume read 0');
    stream.read(0);
  }

  state.resumeScheduled = false;
  state.awaitDrain = 0;
  stream.emit('resume');
  flow(stream);
  if (state.flowing && !state.reading) stream.read(0);
}

Readable.prototype.pause = function () {
  debug('call pause flowing=%j', this._readableState.flowing);
  if (false !== this._readableState.flowing) {
    debug('pause');
    this._readableState.flowing = false;
    this.emit('pause');
  }
  return this;
};
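
// Example (illustrative sketch): pause() and resume() toggle flowing mode,
// which is how a 'data' consumer applies backpressure by hand. `writeSlowly`
// is a hypothetical asynchronous sink.
//
//   r.on('data', function (chunk) {
//     r.pause();                        // stop 'data' events for now
//     writeSlowly(chunk, function () {
//       r.resume();                     // pick the flow back up
//     });
//   });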

function flow(stream) {
  var state = stream._readableState;
  debug('flow', state.flowing);
  while (state.flowing && stream.read() !== null) {}
}

// wrap an old-style stream as the async data source.
// This is *not* part of the readable stream interface.
// It is an ugly unfortunate mess of history.
Readable.prototype.wrap = function (stream) {
  var state = this._readableState;
  var paused = false;

  var self = this;
  stream.on('end', function () {
    debug('wrapped end');
    if (state.decoder && !state.ended) {
      var chunk = state.decoder.end();
      if (chunk && chunk.length) self.push(chunk);
    }

    self.push(null);
  });

  stream.on('data', function (chunk) {
    debug('wrapped data');
    if (state.decoder) chunk = state.decoder.write(chunk);

    // don't skip over falsy values in objectMode
    if (state.objectMode && (chunk === null || chunk === undefined)) return;
    else if (!state.objectMode && (!chunk || !chunk.length)) return;

    var ret = self.push(chunk);
    if (!ret) {
      paused = true;
      stream.pause();
    }
  });

  // proxy all the other methods.
  // important when wrapping filters and duplexes.
  for (var i in stream) {
    if (this[i] === undefined && typeof stream[i] === 'function') {
      this[i] = function (method) {
        return function () {
          return stream[method].apply(stream, arguments);
        };
      }(i);
    }
  }

  // proxy certain important events.
  var events = ['error', 'close', 'destroy', 'pause', 'resume'];
  forEach(events, function (ev) {
    stream.on(ev, self.emit.bind(self, ev));
  });

  // when we try to consume some more bytes, simply unpause the
  // underlying stream.
  self._read = function (n) {
    debug('wrapped _read', n);
    if (paused) {
      paused = false;
      stream.resume();
    }
  };

  return self;
};
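
// Example (illustrative sketch): wrap() adapts a pre-streams2, 'data'-only
// emitter so that it supports read(), pipe() and pause/resume backpressure.
// `OldStyleStream` and `handle` stand in for any legacy source and consumer.
//
//   var legacy = new OldStyleStream();
//   var modern = new Readable({ highWaterMark: 1024 }).wrap(legacy);
//   modern.on('readable', function () {
//     var chunk;
//     while ((chunk = modern.read()) !== null) handle(chunk);
//   });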

// exposed for testing purposes only.
Readable._fromList = fromList;

// Pluck off n bytes from an array of buffers.
// Length is the combined lengths of all the buffers in the list.
// This function is designed to be inlinable, so please take care when making
// changes to the function body.
function fromList(n, state) {
  // nothing buffered
  if (state.length === 0) return null;

  var ret;
  if (state.objectMode) {
    ret = state.buffer.shift();
  } else if (!n || n >= state.length) {
    // read it all, truncate the list
    if (state.decoder) ret = state.buffer.join('');
    else if (state.buffer.length === 1) ret = state.buffer.head.data;
    else ret = state.buffer.concat(state.length);
    state.buffer.clear();
  } else {
    // read part of list
    ret = fromListPartial(n, state.buffer, state.decoder);
  }

  return ret;
}

// Extracts only enough buffered data to satisfy the amount requested.
// This function is designed to be inlinable, so please take care when making
// changes to the function body.
function fromListPartial(n, list, hasStrings) {
  var ret;
  if (n < list.head.data.length) {
    // slice is the same for buffers and strings
    ret = list.head.data.slice(0, n);
    list.head.data = list.head.data.slice(n);
  } else if (n === list.head.data.length) {
    // first chunk is a perfect match
    ret = list.shift();
  } else {
    // result spans more than one buffer
    ret = hasStrings ? copyFromBufferString(n, list) : copyFromBuffer(n, list);
  }
  return ret;
}

// Copies a specified amount of characters from the list of buffered data
// chunks.
// This function is designed to be inlinable, so please take care when making
// changes to the function body.
function copyFromBufferString(n, list) {
  var p = list.head;
  var c = 1;
  var ret = p.data;
  n -= ret.length;
  while (p = p.next) {
    var str = p.data;
    var nb = n > str.length ? str.length : n;
    if (nb === str.length) ret += str;
    else ret += str.slice(0, n);
    n -= nb;
    if (n === 0) {
      if (nb === str.length) {
        ++c;
        if (p.next) list.head = p.next;
        else list.head = list.tail = null;
      } else {
        list.head = p;
        p.data = str.slice(nb);
      }
      break;
    }
    ++c;
  }
  list.length -= c;
  return ret;
}

// Copies a specified amount of bytes from the list of buffered data chunks.
// This function is designed to be inlinable, so please take care when making
// changes to the function body.
function copyFromBuffer(n, list) {
  var ret = Buffer.allocUnsafe(n);
  var p = list.head;
  var c = 1;
  p.data.copy(ret);
  n -= p.data.length;
  while (p = p.next) {
    var buf = p.data;
    var nb = n > buf.length ? buf.length : n;
    buf.copy(ret, ret.length - n, 0, nb);
    n -= nb;
    if (n === 0) {
      if (nb === buf.length) {
        ++c;
        if (p.next) list.head = p.next;
        else list.head = list.tail = null;
      } else {
        list.head = p;
        p.data = buf.slice(nb);
      }
      break;
    }
    ++c;
  }
  list.length -= c;
  return ret;
}

function endReadable(stream) {
  var state = stream._readableState;

  // If we get here before consuming all the bytes, then that is a
  // bug in node. Should never happen.
  if (state.length > 0) throw new Error('"endReadable()" called on non-empty stream');

  if (!state.endEmitted) {
    state.ended = true;
    nextTick(endReadableNT, state, stream);
  }
}

function endReadableNT(state, stream) {
  // Check that we didn't get one last unshift.
  if (!state.endEmitted && state.length === 0) {
    state.endEmitted = true;
    stream.readable = false;
    stream.emit('end');
  }
}

function forEach(xs, f) {
  for (var i = 0, l = xs.length; i < l; i++) {
    f(xs[i], i);
  }
}

function indexOf(xs, x) {
  for (var i = 0, l = xs.length; i < l; i++) {
    if (xs[i] === x) return i;
  }
  return -1;
}