// UNPKG - 1.97 kB - JavaScript - View Raw
var Stream = require('stream')

// through
//
// A stream that does nothing but re-emit its input.
// Useful for aggregating a series of changing but not ending streams
// into one stream.

// `through` is a hoisted function declaration, so it can be exported here
// ahead of its definition; it is also exposed as a property of itself.
exports = module.exports = through
through.through = through

//create a readable writable stream.

/**
 * Create a stream that is both readable and writable.
 *
 * @param {Function} [write] - invoked as `write.call(stream, data)` for each
 *   `stream.write(data)`. Defaults to re-emitting the chunk as a 'data' event.
 * @param {Function|string|boolean} [end] - what `stream.end()` runs:
 *   - falsy or 'sync' (default): emit 'end' immediately;
 *   - 'async' or true: emit 'end' immediately unless the stream is paused,
 *     in which case wait for the next 'drain' (so a paused stream must
 *     eventually be resumed, or it will never end);
 *   - a function: called with the stream as `this`.
 * @returns {Stream} the new stream, with `write`, `end`, `destroy`, `pause`
 *   and `resume` methods and a `paused` flag.
 * @throws {Error} from `stream.end()` when it is called after the stream has
 *   already been ended or destroyed ('cannot call end twice').
 */
function through (write, end) {
  write = write || function (data) { this.emit('data', data) }

  if (!end || end === 'sync') {
    //use sync end. (default)
    end = function () { this.emit('end') }
  } else if (end === 'async' || end === true) {
    //use async end.
    //must eventually call drain if paused.
    //else will not end.
    end = function () {
      if (!this.paused) return this.emit('end')
      var self = this
      this.once('drain', function () {
        self.emit('end')
      })
    }
  }
  //otherwise: use the custom end function exactly as supplied

  var ended = false
  var destroyed = false
  var stream = new Stream()
  stream.readable = stream.writable = true
  stream.paused = false

  // Returns false while paused so that upstream writers can back off.
  stream.write = function (data) {
    write.call(this, data)
    return !stream.paused
  }

  //this will be registered as the first 'end' listener
  //must call destroy next tick, to make sure we're after any
  //stream piped from here.
  stream.on('end', function () {
    stream.readable = false
    if (!stream.writable) {
      process.nextTick(function () {
        stream.destroy()
      })
    }
  })

  stream.end = function (data) {
    if (ended) throw new Error('cannot call end twice')
    ended = true
    // arguments.length (not a truthiness test) so that falsy chunks such as
    // 0 or '' passed to end() are still written.
    if (arguments.length) stream.write(data)
    this.writable = false
    end.call(this)
    // If 'end' was emitted synchronously the stream is no longer readable,
    // so it can be torn down right away.
    if (!this.readable) this.destroy()
  }

  // Idempotent: 'close' is emitted at most once.
  stream.destroy = function () {
    if (destroyed) return
    destroyed = true
    ended = true
    stream.writable = stream.readable = false
    stream.emit('close')
  }

  stream.pause = function () {
    stream.paused = true
  }

  // Only emits 'drain' when the stream was actually paused.
  stream.resume = function () {
    if (stream.paused) {
      stream.paused = false
      stream.emit('drain')
    }
  }

  return stream
}