UNPKG

5.97 kBtext/coffeescriptView Raw
1EventEmitter = require('events').EventEmitter
2Promise = require 'bluebird'
3Buffer = require './Buffer'
4BufferDebugMixin = require './mixin/BufferDebugMixin'
5
6module.exports = class Pump extends EventEmitter
7 @STOPPED: 0
8 @STARTED: 1
9 @PAUSED: 2
10 @ENDED: 3
11
12 constructor: ->
13 @_state = Pump.STOPPED
14 @_from = null
15 @_id = null
16 @_errorBuffer = new Buffer
17 @_debug = false
18 @buffers
19 output: new Buffer
20
21 id: (id = null) ->
22 return @_id if id == null
23 @_id = id
24 @
25
26 from: (buffer = null) ->
27 return @_from if buffer == null
28 if @_state == Pump.STARTED
29 throw new Error 'Cannot change source buffer after pumping has been started'
30 if buffer instanceof Buffer
31 @_from = buffer
32 else if buffer instanceof Pump
33 @_from = buffer.buffer()
34 else if buffer instanceof require('stream')
35 @_from = new Buffer
36 size: 1000
37 buffer.on 'data', (data) => @_from.write data
38 buffer.on 'end', => @_from.seal()
39 buffer.on 'error', (err) => @writeError err
40 @_from.on 'full', -> buffer.pause()
41 @_from.on 'release', -> buffer.resume()
42 else if buffer instanceof Array
43 @_from = new Buffer
44 content: buffer
45 sealed: true
46 else
47 throw new Error 'Argument must be datapumps.Buffer or stream'
48
49 @_from.on 'end', => do @sourceEnded
50 @
51
52 writeError: (err) ->
53 return if @_errorBuffer.isFull()
54 @_errorBuffer.write
55 error: err
56 pump: @_id
57 @
58
59 sourceEnded: ->
60 @currentRead.cancel() if @currentRead
61
62 buffers: (buffers = null) ->
63 return @_buffers if buffers == null
64 throw new Error 'Cannot change output buffers after pumping has been started' if @_state == Pump.STARTED
65 @_buffers = buffers
66 @
67
68 buffer: (name = 'output', buffer = null) ->
69 if buffer == null
70 throw new Error("No such buffer: #{name}") if !@_buffers[name]
71 @_buffers[name]
72 else
73 throw new Error 'Cannot change output buffers after pumping has been started' if @_state == Pump.STARTED
74 throw new Error 'buffer must be a datapumps.Buffer' if !(buffer instanceof Buffer)
75 @_buffers[name] = buffer
76 @
77
78 to: (pump, bufferName) ->
79 pump.from @buffer bufferName
80 @
81
82 start: ->
83 throw new Error 'Source is not configured' if !@_from
84 throw new Error 'Pump is already started' if @_state != Pump.STOPPED
85 console.log "#{(new Date()).toISOString() } [#{@_id ? '(root)'}] Pump started" if @_debug
86 @_state = Pump.STARTED
87 for name, buffer of @_buffers
88 buffer.on 'end', @_outputBufferEnded.bind @
89 do @_pump
90 @
91
92 _outputBufferEnded: ->
93 allEnded = true
94 for name, buffer of @_buffers
95 allEnded = false if !buffer.isEnded()
96 return if !allEnded
97
98 @_state = Pump.ENDED
99 console.log "#{(new Date()).toISOString() } [#{@_id ? '(root)'}] Pump ended" if @_debug
100 @emit 'end'
101
102 _pump: ->
103 return @sealOutputBuffers() if @_from.isEnded()
104 return if @_state == Pump.PAUSED
105
106 (@currentRead = @_from.readAsync())
107 .cancellable()
108 .then (data) =>
109 @currentRead = null
110 @_processing = @_process data, @
111 throw new Error ".process() did not return a Promise" if not (@_processing instanceof Promise)
112 return @_processing
113 .catch(Promise.CancellationError, ->)
114 .catch (err) => @writeError err
115 .done => do @_pump
116
117 sealOutputBuffers: ->
118 for name, buffer of @_buffers
119 do buffer.seal if !buffer.isSealed()
120
121 _process: (data) ->
122 @copy data
123
124 copy: (data, buffers = null) ->
125 buffers = [ 'output' ] if !buffers?
126 buffers = [ buffers ] if typeof buffers == 'string'
127 throw new Error 'buffers must be an array of buffer names or a single buffers name' if !Array.isArray buffers
128
129 if buffers.length == 1
130 @buffer(buffers[0]).writeAsync data
131 else
132 Promise.all(@buffer(buffer).writeAsync data for buffer in buffers)
133
134 process: (fn) ->
135 throw new Error('.process() argument must be a Promise returning function ') if typeof fn != 'function'
136 @_process = fn
137 @
138
139 mixin: (mixins) ->
140 mixins = if Array.isArray mixins then mixins else [ mixins ]
141 mixin @ for mixin in mixins
142 @
143
144 isStopped: ->
145 @_state == Pump.STOPPED
146
147 isStarted: ->
148 @_state == Pump.STARTED
149
150 isPaused: ->
151 @_state == Pump.PAUSED
152
153 isEnded: ->
154 @_state == Pump.ENDED
155
156 createBuffer: (options = {}) ->
157 new Buffer options
158
159 errorBuffer: (buffer = null) ->
160 return @_errorBuffer if buffer == null
161 @_errorBuffer = buffer
162 @
163
164 # returns a promise that resolves when the pump is paused
165 pause: ->
166 return if @_state == Pump.PAUSED
167 throw new Error 'Cannot .pause() a pump that is not running' if @_state != Pump.STARTED
168 if @_processing? and Promise.resolve(@_processing).isPending()
169 @_processing.then => @_state = Pump.PAUSED
170 else
171 @_state = Pump.PAUSED
172 Promise.resolve()
173
174 resume: ->
175 throw new Error 'Cannot .resume() a pump that is not paused' if @_state != Pump.PAUSED
176 @_state = Pump.STARTED
177 do @_pump
178 @
179
180 whenFinished: ->
181 return Promise.resolve() if @isEnded()
182
183 new Promise (resolve, reject) =>
184 @on 'end', -> resolve()
185 @on 'error', -> reject 'Pumping failed. See .errorBuffer() contents for error messages'
186
187 logErrorsToConsole: ->
188 @errorBuffer().on 'write', (errorRecord) =>
189 name = errorRecord.pump ? '(root)'
190 if @_debug
191 console.log "Error in pump #{name}:"
192 console.log errorRecord.error.stack
193 else
194 console.log "Error in pump #{name}: #{errorRecord.error}"
195 @
196
197 debug: ->
198 @debugMode true
199 @
200
201 debugMode: (@_debug) ->
202 throw new Error 'Cannot change debug mode after pump start' if @_state != Pump.STOPPED
203 @mixin BufferDebugMixin if @_debug
204 @
205
206 run: ->
207 @start().whenFinished()
208 .then =>
209 result = {}
210 for name, buffer of @_buffers
211 result[name] = buffer.getContent()
212
213 return result