1 | Promise = require('bluebird')
|
2 | Pump = require('./Pump')
|
3 | Buffer = require('./Buffer')
|
4 |
|
# A Group is a Pump that contains other, named pumps and drives them together.
# It forwards its error buffer and debug flag to the contained pumps, can expose
# buffers of inner pumps under its own names, and delegates input configuration
# (.from / .mixin) to one designated "input pump".
#
# State constants (Group.STOPPED / STARTED / PAUSED / ENDED), @_state,
# @_errorBuffer and @emit are not defined in this class; they are presumably
# provided by the Pump base class -- confirm against Pump.
module.exports = class Group extends Pump

  # Initialises the (empty) pump registry and exposed-buffer registry.
  constructor: ->
    super()
    @_pumps = {}
    @_exposedBuffers = {}

  # Registers a pump under `name` and returns it.
  #
  # name - key of the pump inside this group.
  # pump - pump instance to register; a plain `new Pump` is created when omitted.
  #
  # The pump's id becomes "<group id>/<name>" when the group has an id, otherwise
  # just `name`. The group's error buffer is handed to the pump.
  # Throws when a pump with the same name is already registered.
  addPump: (name, pump = null) ->
    throw new Error 'Pump already exists' if @_pumps[name]?
    @_pumps[name] = pump ? new Pump
    pumpId = if @_id? then "#{@_id}/#{name}" else name
    @_pumps[name].id pumpId
    @_pumps[name].errorBuffer @_errorBuffer
    @_pumps[name]

  # Returns the pump registered under `name`; throws when there is none.
  pump: (name) ->
    throw new Error "Pump #{name} does not exist" if !@_pumps[name]?
    @_pumps[name]

  # Returns the name -> pump registry object itself (not a copy).
  pumps: ->
    @_pumps

  # Starts the group: marks it started, hooks the error buffer, pushes the error
  # buffer and debug flag to every pump, then runs the pumps. When the run
  # promise fulfils, the group is ended. Returns the group for chaining.
  # Throws when the group is not in the STOPPED state.
  start: ->
    throw new Error 'Group already started' if @_state != Group.STOPPED
    @_state = Group.STARTED
    @_registerErrorBufferEvents()
    for name, pump of @_pumps
      pump.errorBuffer @_errorBuffer
      pump.debugMode @_debug
    # NOTE(review): the promise created by .then has no rejection handler, so a
    # rejected run() is not handled on this chain -- confirm this is intended.
    @run().then => @_endGroup()
    @

  # Marks the group as ended and emits 'end'.
  _endGroup: ->
    @_state = Group.ENDED
    @emit 'end'

  # When the error buffer reports 'full' while the group is started, pauses the
  # group and emits 'error' once the pause has completed.
  _registerErrorBufferEvents: ->
    @_errorBuffer.on 'full', =>
      if @_state == Group.STARTED
        @pause().then => @emit 'error'

  # Runs all currently stopped pumps and returns the promise of runPumps().
  # A no-op rejection handler is attached on a side branch only; the promise
  # returned to the caller is the original one and still carries any rejection.
  # (Presumably this silences unhandled-rejection reporting -- confirm.)
  run: ->
    result = @runPumps()
    result.catch(->)
    result

  # Starts the given pumps and returns a promise for all of them finishing.
  #
  # pumps - a pump name, an array of pump names, or null/omitted to start every
  #         pump that reports isStopped().
  runPumps: (pumps = null) ->
    pumps = @_getAllStoppedPumps() if !pumps?
    pumps = [ pumps ] if typeof pumps == 'string'
    finishPromises = []
    for pumpName in pumps
      finishPromises.push @pump(pumpName).start().whenFinished()
    Promise.all finishPromises

  # Returns the names of all registered pumps that report isStopped().
  _getAllStoppedPumps: ->
    result = []
    for name, pump of @_pumps
      result.push name if pump.isStopped()
    result

  # Makes the buffer at `bufferPath` available through .buffer(exposedName) and
  # returns that buffer. Throws when `exposedName` is already taken.
  expose: (exposedName, bufferPath) ->
    throw new Error "Already exposed a buffer with name #{exposedName}" if @_exposedBuffers[exposedName]?
    @_exposedBuffers[exposedName] = @_getBufferByPath bufferPath

  # Resolves "<pump name>[/<buffer name>]" to a buffer of a contained pump.
  # Everything after the first "/" is the buffer name (it may itself contain
  # "/"); when absent, the pump's 'output' buffer is requested.
  _getBufferByPath: (bufferPath) ->
    [ pumpName, bufferNames... ] = bufferPath.split('/')
    bufferName = if bufferNames.length then bufferNames.join('/') else 'output'
    @pump(pumpName).buffer bufferName

  # Returns a buffer by exposed name, falling back to treating `name` as a
  # buffer path. Any lookup failure is reported uniformly as "No such buffer".
  buffer: (name = 'output') ->
    # Errors raised during lookup are deliberately swallowed here; the check
    # after the try block turns a missing result into a single error message.
    try
      result = @_exposedBuffers[name] ? @_getBufferByPath name
    catch
    throw new Error "No such buffer: #{name}" if !result
    result

  # Getter/setter for the input pump. Without an argument returns the current
  # input pump; with a pump name selects that pump (throws when it does not
  # exist) and returns the group for chaining.
  inputPump: (pumpName = null) ->
    return @_inputPump if !pumpName?
    @_inputPump = @pump(pumpName)
    @

  # Adds a pump (see addPump) and selects it as the input pump. Returns the pump.
  addInputPump: (name, pump = null) ->
    result = @addPump name, pump
    @inputPump name
    result

  # Delegates .from(buffer) to the input pump. Returns the group for chaining.
  # Throws when no input pump has been selected.
  from: (buffer = null) ->
    throw new Error 'Input pump is not set, use .inputPump to set it' if !@_inputPump?
    @_inputPump.from buffer
    @

  # Delegates .mixin(mixins) to the input pump. Returns the group for chaining.
  # Throws when no input pump has been selected.
  mixin: (mixins) ->
    throw new Error 'Input pump is not set, use .inputPump to set it' if !@_inputPump?
    @_inputPump.mixin mixins
    @

  # Not supported on groups; always throws.
  process: ->
    throw new Error 'Cannot call .process() on a group: data in a group is transformed by its pumps.'

  # Pauses every pump that reports isStarted() and returns a promise that is
  # fulfilled (with Group.PAUSED) once all of them have paused; the group state
  # switches to PAUSED at that point.
  # Throws when the group is neither started nor already paused.
  pause: ->
    # Already paused: still hand back a promise so callers can always chain
    # .then on the result, whichever path was taken.
    return Promise.resolve(Group.PAUSED) if @_state == Group.PAUSED
    throw new Error 'Cannot .pause() a group that is not pumping' if @_state != Group.STARTED
    pausePromises = [ ]
    pausePromises.push pump.pause() for name, pump of @_pumps when pump.isStarted()
    Promise.all(pausePromises).then => @_state = Group.PAUSED

  # Marks the group started again and calls resume() on every registered pump.
  # Returns the group for chaining. Throws when the group is not paused.
  resume: ->
    throw new Error 'Cannot .resume() a group that is not paused' if @_state != Group.PAUSED
    @_state = Group.STARTED
    # NOTE(review): pause() only pauses pumps that report isStarted(), but every
    # pump is resumed here -- confirm Pump#resume tolerates pumps that were not
    # paused.
    pump.resume() for name, pump of @_pumps
    @

  # Getter/setter for the group id. Without an argument returns the id; with an
  # id stores it, re-ids every contained pump as "<id>/<pump name>" and returns
  # the group for chaining.
  id: (id = null) ->
    return @_id if id == null
    @_id = id
    pump.id "#{@_id}/#{name}" for name, pump of @_pumps
    @

  # Sets the debug flag that start() forwards to every pump. Returns the group
  # for chaining. Throws when the group is no longer in the stopped state; the
  # state is validated before the flag is stored so that a rejected call leaves
  # the current flag untouched.
  debugMode: (debug) ->
    throw new Error 'Cannot change debug mode after pump start' if @_state != Pump.STOPPED
    @_debug = debug
    @
|