UNPKG

2.72 kBJavaScriptView Raw
1'use strict'
2/*
3 * merge2
4 * https://github.com/teambition/merge2
5 *
6 * Copyright (c) 2014 Yan Qing
7 * Licensed under the MIT license.
8 */
9var Stream = require('stream')
10var PassThrough = Stream.PassThrough
11var slice = Array.prototype.slice
12
13module.exports = function merge2 () {
14 var streamsQueue = []
15 var merging = false
16 var args = slice.call(arguments)
17 var options = args[args.length - 1]
18
19 if (options && !Array.isArray(options) && options.pipe == null) args.pop()
20 else options = {}
21
22 var doEnd = options.end !== false
23 if (options.objectMode == null) options.objectMode = true
24 if (options.highWaterMark == null) options.highWaterMark = 64 * 1024
25 var mergedStream = PassThrough(options)
26
27 function addStream () {
28 for (var i = 0, len = arguments.length; i < len; i++) {
29 streamsQueue.push(pauseStreams(arguments[i], options))
30 }
31 mergeStream()
32 return this
33 }
34
35 function mergeStream () {
36 if (merging) return
37 merging = true
38
39 var streams = streamsQueue.shift()
40 if (!streams) return endStream()
41 if (!Array.isArray(streams)) streams = [streams]
42
43 var pipesCount = streams.length + 1
44
45 function next () {
46 if (--pipesCount > 0) return
47 merging = false
48 mergeStream()
49 }
50
51 function pipe (stream) {
52 function onend () {
53 stream.removeListener('merge2UnpipeEnd', onend)
54 stream.removeListener('end', onend)
55 next()
56 }
57 // skip ended stream
58 if (stream._readableState.endEmitted) return next()
59
60 stream.on('merge2UnpipeEnd', onend)
61 stream.on('end', onend)
62 stream.pipe(mergedStream, {end: false})
63 // compatible for old stream
64 stream.resume()
65 }
66
67 for (var i = 0; i < streams.length; i++) pipe(streams[i])
68
69 next()
70 }
71
72 function endStream () {
73 merging = false
74 // emit 'queueDrain' when all streams merged.
75 mergedStream.emit('queueDrain')
76 return doEnd && mergedStream.end()
77 }
78
79 mergedStream.setMaxListeners(0)
80 mergedStream.add = addStream
81 mergedStream.on('unpipe', function (stream) {
82 stream.emit('merge2UnpipeEnd')
83 })
84
85 if (args.length) addStream.apply(null, args)
86 return mergedStream
87}
88
89// check and pause streams for pipe.
90function pauseStreams (streams, options) {
91 if (!Array.isArray(streams)) {
92 // Backwards-compat with old-style streams
93 if (!streams._readableState && streams.pipe) streams = streams.pipe(PassThrough(options))
94 if (!streams._readableState || !streams.pause || !streams.pipe) {
95 throw new Error('Only readable stream can be merged.')
96 }
97 streams.pause()
98 } else {
99 for (var i = 0, len = streams.length; i < len; i++) streams[i] = pauseStreams(streams[i])
100 }
101 return streams
102}