# Retrieved via UNPKG (text/coffeescript, 4.29 kB).

# Base class for Reader: node's legacy Stream (an EventEmitter with pipe()).
Stream     = require("stream").Stream
# Used by Reader::_listen to collect listener-removal callbacks.
disposable = require("disposable")
3
4
5
module.exports = class Reader extends Stream

  ###
   Re-emits the events of an optional source emitter and can buffer the
   "data" chunks it sees, so that consumers calling dump() after data has
   started flowing can still have the buffered chunks replayed to them.

   @param source  optional emitter whose events are forwarded (kept as @source)
  ###

  constructor: (@source) ->
    super()

    # 0 lifts the EventEmitter listener cap, so node does not print its
    # "possible memory leak" warning when many listeners are attached.
    @setMaxListeners(0)

    @_listen()


  ###
   Names of the source events that are forwarded by _listen().
   Needs to be overridable in case there is more to listen to (e.g. headers).
  ###

  _listenTo: () -> ["data", "end", "error"]

  ###
   Wires the reader up: forwards every event named by _listenTo() from
   @source (when there is one), and tracks the reader's own "data", "end"
   and "error" events in @_buffer, @ended and @error.
  ###

  _listen: () ->

    @_buffer = []

    # Collects the callbacks that detach the forwarders from the source.
    # Nothing in this class disposes it at the moment.
    listeners = disposable.create()

    # source given? forward its events through this reader
    if @source
      for event in @_listenTo()
        do (event) =>

          # arg1 = data if present
          # arg2 = usually the encoding
          onEvent = (arg1, arg2) =>
            # flag for dump(): events have already been transmitted
            @_started = true
            @emit event, arg1, arg2

          @source.on event, onEvent
          listeners.add () =>
            @source.removeListener event, onEvent

    @on "data", (data, encoding) =>

      # chunks are only kept once caching has been switched on via cache()
      return if not @_cache

      @_buffer.push { chunk: data, encoding: encoding }

    # listen for end, then flag as finished
    @on "end", () =>
      throw new Error("Cannot end more than once") if @ended
      @ended = true

    # remember the error so _dumpCached() can replay it
    @on "error", (err) => @error = err

  ###
   Forwards setEncoding() to the source when the source supports it.
  ###

  setEncoding: (encoding) -> @source?.setEncoding?(encoding)

  ###
   Forwards pause() to the source when the source supports it.
  ###

  pause: () -> @source?.pause?()

  ###
   Forwards resume() to the source when the source supports it.
  ###

  resume: () -> @source?.resume?()

  ###
   Forwards destroy() to the source when the source supports it.
  ###

  destroy: () -> @source?.destroy?()

  ###
   Forwards destroySoon() to the source when the source supports it.
  ###

  destroySoon: () -> @source?.destroySoon?()

  ###
   Flags the reader that data should be cached as it is coming in.

   Called with an argument it sets the flag; a falsy value is ignored when
   chunks are already buffered (the flag is then forced to true).
   Always returns the current flag.
  ###

  cache: (value) ->

    # data already being cached? too late to switch it off
    @_cache = value or !!@_buffer.length if arguments.length
    @_cache


  ###
   Listens on the reader and hands its content to `callback`.

   @param callback  either a function, or an object mapping event names to
                    listeners (which implies ops.stream)
   @param ops       optional flags read by _dumpCallback():
                    stream - pass the reader itself to the callback
                    batch  - always pass the collected chunks as an array
                    each   - invoke the callback once per collected chunk
  ###

  dump: (callback, ops) ->

    ops = {} if not ops

    wrappedCallback = @_dumpCallback callback, ops

    # has the stream already started? create a NEW reader so any OTHER
    # calls on dump() do not get the same replayed data twice.
    pipedStream = if @_started then new Reader @ else @

    # hand over the (possibly wrapped) stream
    wrappedCallback.call @, null, pipedStream

    # not started? then there is nothing cached to replay
    return if not @_started

    # replay what has been recorded so far into the new reader
    @_dumpCached pipedStream, ops

  ###
   Builds the function dump() invokes with (err, reader).

   - listener object: returns a function attaching each listener to the reader
   - ops.stream:      returns `callback` untouched
   - otherwise:       returns a collector that buffers "data" chunks and calls
                      `callback` when the reader emits "end" or "error"
  ###

  _dumpCallback: (callback, ops) ->

    # if the callback is an object, then it is a set of listeners
    if typeof callback == 'object'

      # listeners only make sense on the stream itself
      ops.stream = true

      listeners = callback

      # replace the callback: once the stream is handed over,
      # the listeners are attached to it
      callback = (err, stream) ->
        stream.on type, listeners[type] for type of listeners

    return callback if ops.stream

    # not streaming: the callback expects the content to arrive at once, so
    # collect the chunks and deliver them according to the options given
    return (err, reader) =>

      # error to boot? do NOT continue
      return callback err if err

      buffer = []

      onEnd = (err) =>

        # content needs to be sent back as an array?
        return callback.call @, err, buffer if ops.batch

        # no data to return?
        return callback.call @, err if not buffer.length

        # treat the callback as a foreach function?
        if ops.each
          callback.call @, err, chunk for chunk in buffer

        # otherwise send back the single chunk, or the whole array when more
        # than one chunk was collected, so callers with single-chunk content
        # do not have to unwrap response[0]. ops.batch disables this shortcut.
        else
          result = if buffer.length > 1 then buffer else buffer[0]
          callback.call @, err, result

      # start listening to piped data
      reader.on "data", (data, encoding) -> buffer.push(data)
      reader.on "error", onEnd
      reader.on "end", onEnd

  ###
   Replays the recorded state into `pipedReader`: every buffered chunk, then
   "end" if this reader has ended, then "error" if an error was recorded.
  ###

  _dumpCached: (pipedReader) ->

    pipedReader.emit "data", data.chunk, data.encoding for data in @_buffer
    pipedReader.emit "end" if @ended

    # pass the recorded error along; without it listeners receive undefined
    # and a dump() callback cannot tell the failure from a clean finish
    pipedReader.emit "error", @error if @error


# Advertise the reader as a readable stream.
Reader::readable = true
202
203
204
205
# (the original file has no newline at end of file)