1 | EventEmitter = require('events').EventEmitter
|
2 | Promise = require 'bluebird'
|
3 | Buffer = require './Buffer'
|
4 | BufferDebugMixin = require './mixin/BufferDebugMixin'
|
5 |
|
6 | module.exports = class Pump extends EventEmitter
|
7 | @STOPPED: 0
|
8 | @STARTED: 1
|
9 | @PAUSED: 2
|
10 | @ENDED: 3
|
11 |
|
12 | constructor: ->
|
13 | @_state = Pump.STOPPED
|
14 | @_from = null
|
15 | @_id = null
|
16 | @_errorBuffer = new Buffer
|
17 | @_debug = false
|
18 | @buffers
|
19 | output: new Buffer
|
20 |
|
21 | id: (id = null) ->
|
22 | return @_id if id == null
|
23 | @_id = id
|
24 | @
|
25 |
|
26 | from: (buffer = null) ->
|
27 | return @_from if buffer == null
|
28 | if @_state == Pump.STARTED
|
29 | throw new Error 'Cannot change source buffer after pumping has been started'
|
30 | if buffer instanceof Buffer
|
31 | @_from = buffer
|
32 | else if buffer instanceof Pump
|
33 | @_from = buffer.buffer()
|
34 | else if buffer instanceof require('stream')
|
35 | @_from = new Buffer
|
36 | size: 1000
|
37 | buffer.on 'data', (data) => @_from.write data
|
38 | buffer.on 'end', => @_from.seal()
|
39 | buffer.on 'error', (err) => @writeError err
|
40 | @_from.on 'full', -> buffer.pause()
|
41 | @_from.on 'release', -> buffer.resume()
|
42 | else if buffer instanceof Array
|
43 | @_from = new Buffer
|
44 | content: buffer
|
45 | sealed: true
|
46 | else
|
47 | throw new Error 'Argument must be datapumps.Buffer or stream'
|
48 |
|
49 | @_from.on 'end', => do @sourceEnded
|
50 | @
|
51 |
|
52 | writeError: (err) ->
|
53 | return if @_errorBuffer.isFull()
|
54 | @_errorBuffer.write
|
55 | error: err
|
56 | pump: @_id
|
57 | @
|
58 |
|
59 | sourceEnded: ->
|
60 | @currentRead.cancel() if @currentRead
|
61 |
|
62 | buffers: (buffers = null) ->
|
63 | return @_buffers if buffers == null
|
64 | throw new Error 'Cannot change output buffers after pumping has been started' if @_state == Pump.STARTED
|
65 | @_buffers = buffers
|
66 | @
|
67 |
|
68 | buffer: (name = 'output', buffer = null) ->
|
69 | if buffer == null
|
70 | throw new Error("No such buffer: #{name}") if !@_buffers[name]
|
71 | @_buffers[name]
|
72 | else
|
73 | throw new Error 'Cannot change output buffers after pumping has been started' if @_state == Pump.STARTED
|
74 | throw new Error 'buffer must be a datapumps.Buffer' if !(buffer instanceof Buffer)
|
75 | @_buffers[name] = buffer
|
76 | @
|
77 |
|
78 | to: (pump, bufferName) ->
|
79 | pump.from @buffer bufferName
|
80 | @
|
81 |
|
82 | start: ->
|
83 | throw new Error 'Source is not configured' if !@_from
|
84 | throw new Error 'Pump is already started' if @_state != Pump.STOPPED
|
85 | console.log "#{(new Date()).toISOString() } [#{@_id ? '(root)'}] Pump started" if @_debug
|
86 | @_state = Pump.STARTED
|
87 | for name, buffer of @_buffers
|
88 | buffer.on 'end', @_outputBufferEnded.bind @
|
89 | do @_pump
|
90 | @
|
91 |
|
92 | _outputBufferEnded: ->
|
93 | allEnded = true
|
94 | for name, buffer of @_buffers
|
95 | allEnded = false if !buffer.isEnded()
|
96 | return if !allEnded
|
97 |
|
98 | @_state = Pump.ENDED
|
99 | console.log "#{(new Date()).toISOString() } [#{@_id ? '(root)'}] Pump ended" if @_debug
|
100 | @emit 'end'
|
101 |
|
102 | _pump: ->
|
103 | return @sealOutputBuffers() if @_from.isEnded()
|
104 | return if @_state == Pump.PAUSED
|
105 |
|
106 | (@currentRead = @_from.readAsync())
|
107 | .cancellable()
|
108 | .then (data) =>
|
109 | @currentRead = null
|
110 | @_processing = @_process data, @
|
111 | throw new Error ".process() did not return a Promise" if not (@_processing instanceof Promise)
|
112 | return @_processing
|
113 | .catch(Promise.CancellationError, ->)
|
114 | .catch (err) => @writeError err
|
115 | .done => do @_pump
|
116 |
|
117 | sealOutputBuffers: ->
|
118 | for name, buffer of @_buffers
|
119 | do buffer.seal if !buffer.isSealed()
|
120 |
|
121 | _process: (data) ->
|
122 | @copy data
|
123 |
|
124 | copy: (data, buffers = null) ->
|
125 | buffers = [ 'output' ] if !buffers?
|
126 | buffers = [ buffers ] if typeof buffers == 'string'
|
127 | throw new Error 'buffers must be an array of buffer names or a single buffers name' if !Array.isArray buffers
|
128 |
|
129 | if buffers.length == 1
|
130 | @buffer(buffers[0]).writeAsync data
|
131 | else
|
132 | Promise.all(@buffer(buffer).writeAsync data for buffer in buffers)
|
133 |
|
134 | process: (fn) ->
|
135 | throw new Error('.process() argument must be a Promise returning function ') if typeof fn != 'function'
|
136 | @_process = fn
|
137 | @
|
138 |
|
139 | mixin: (mixins) ->
|
140 | mixins = if Array.isArray mixins then mixins else [ mixins ]
|
141 | mixin @ for mixin in mixins
|
142 | @
|
143 |
|
144 | isStopped: ->
|
145 | @_state == Pump.STOPPED
|
146 |
|
147 | isStarted: ->
|
148 | @_state == Pump.STARTED
|
149 |
|
150 | isPaused: ->
|
151 | @_state == Pump.PAUSED
|
152 |
|
153 | isEnded: ->
|
154 | @_state == Pump.ENDED
|
155 |
|
156 | createBuffer: (options = {}) ->
|
157 | new Buffer options
|
158 |
|
159 | errorBuffer: (buffer = null) ->
|
160 | return @_errorBuffer if buffer == null
|
161 | @_errorBuffer = buffer
|
162 | @
|
163 |
|
164 |
|
165 | pause: ->
|
166 | return if @_state == Pump.PAUSED
|
167 | throw new Error 'Cannot .pause() a pump that is not running' if @_state != Pump.STARTED
|
168 | if @_processing? and Promise.resolve(@_processing).isPending()
|
169 | @_processing.then => @_state = Pump.PAUSED
|
170 | else
|
171 | @_state = Pump.PAUSED
|
172 | Promise.resolve()
|
173 |
|
174 | resume: ->
|
175 | throw new Error 'Cannot .resume() a pump that is not paused' if @_state != Pump.PAUSED
|
176 | @_state = Pump.STARTED
|
177 | do @_pump
|
178 | @
|
179 |
|
180 | whenFinished: ->
|
181 | return Promise.resolve() if @isEnded()
|
182 |
|
183 | new Promise (resolve, reject) =>
|
184 | @on 'end', -> resolve()
|
185 | @on 'error', -> reject 'Pumping failed. See .errorBuffer() contents for error messages'
|
186 |
|
187 | logErrorsToConsole: ->
|
188 | @errorBuffer().on 'write', (errorRecord) =>
|
189 | name = errorRecord.pump ? '(root)'
|
190 | if @_debug
|
191 | console.log "Error in pump #{name}:"
|
192 | console.log errorRecord.error.stack
|
193 | else
|
194 | console.log "Error in pump #{name}: #{errorRecord.error}"
|
195 | @
|
196 |
|
197 | debug: ->
|
198 | @debugMode true
|
199 | @
|
200 |
|
201 | debugMode: (@_debug) ->
|
202 | throw new Error 'Cannot change debug mode after pump start' if @_state != Pump.STOPPED
|
203 | @mixin BufferDebugMixin if @_debug
|
204 | @
|
205 |
|
206 | run: ->
|
207 | @start().whenFinished()
|
208 | .then =>
|
209 | result = {}
|
210 | for name, buffer of @_buffers
|
211 | result[name] = buffer.getContent()
|
212 |
|
213 | return result
|