UNPKG

3.77 kBtext/coffeescriptView Raw
1Promise = require('bluebird')
2Pump = require('./Pump')
3Buffer = require('./Buffer')
4
# A Group is a Pump that owns a set of named sub-pumps. Starting the group
# starts every stopped sub-pump; buffers of sub-pumps can be looked up by
# "pumpName/bufferName" paths or through names registered with `expose`.
module.exports = class Group extends Pump

  # Creates an empty group: no sub-pumps, no exposed buffers.
  constructor: ->
    super()
    @_pumps = {}
    @_exposedBuffers = {}

  # Registers a sub-pump under `name` and returns it.
  #
  # name - key of the pump inside this group.
  # pump - pump instance to register; a fresh `Pump` is created when omitted.
  #
  # The pump id becomes "<group id>/<name>" when the group has an id, `name`
  # otherwise, and the pump is pointed at the group's error buffer.
  # Throws when a pump is already registered under `name`.
  addPump: (name, pump = null) ->
    throw new Error 'Pump already exists' if @_pumps[name]?
    added = pump ? new Pump
    @_pumps[name] = added
    pumpId = if @_id? then "#{@_id}/#{name}" else name
    added.id pumpId
    added.errorBuffer @_errorBuffer
    added

  # Returns the sub-pump registered under `name`; throws when there is none.
  pump: (name) ->
    throw new Error "Pump #{name} does not exist" if !@_pumps[name]?
    @_pumps[name]

  # Returns the internal name -> pump registry (not a copy).
  pumps: ->
    @_pumps

  # Starts the group: marks it STARTED, hooks the error buffer, pushes the
  # group's error buffer and debug flag down to every sub-pump, then runs the
  # pumps. `_endGroup` is invoked once the promise returned by `run` fulfils.
  # Returns the group for chaining. Throws unless the group is STOPPED.
  start: ->
    throw new Error 'Group already started' if @_state != Group.STOPPED
    @_state = Group.STARTED
    @_registerErrorBufferEvents()
    for name, pump of @_pumps
      pump.errorBuffer @_errorBuffer
      pump.debugMode @_debug
    @run()
      .then => @_endGroup()
    @

  # Marks the group ENDED and emits 'end'.
  _endGroup: ->
    @_state = Group.ENDED
    @emit 'end'

  # When the error buffer reports 'full' while the group is STARTED, pauses
  # the group and emits 'error' once pausing has completed.
  _registerErrorBufferEvents: ->
    @_errorBuffer.on 'full', =>
      if @_state == Group.STARTED
        @pause()
          .then => @emit 'error'

  # Runs all stopped sub-pumps and returns the promise from `runPumps`.
  #
  # A no-op rejection handler is attached to that promise, but the promise
  # returned to the caller is the original one, so callers still observe a
  # rejection.
  run: ->
    result = @runPumps()
    # The runPumps promise is only rejected when the error buffer is full and
    # the sub-group is stopped. All errors are in the error buffer now, so we
    # can safely discard this error.
    result.catch ->
    result

  # Starts the given sub-pumps and returns a promise for all of them finishing.
  #
  # pumps - a pump name, an array of pump names, or null/omitted to run every
  #         sub-pump that currently reports `isStopped()`.
  #
  # Throws (via `pump`) when a name is not registered.
  runPumps: (pumps = null) ->
    pumps ?= @_getAllStoppedPumps()
    pumps = [ pumps ] if typeof pumps == 'string'
    Promise.all (@pump(pumpName).start().whenFinished() for pumpName in pumps)

  # Returns the names of all sub-pumps whose `isStopped()` is truthy.
  _getAllStoppedPumps: ->
    (name for name, pump of @_pumps when pump.isStopped())

  # Publishes the buffer found at `bufferPath` under `exposedName`, so that
  # `buffer(exposedName)` returns it. Returns the exposed buffer.
  # Throws when `exposedName` is already taken.
  expose: (exposedName, bufferPath) ->
    throw new Error "Already exposed a buffer with name #{exposedName}" if @_exposedBuffers[exposedName]?
    @_exposedBuffers[exposedName] = @_getBufferByPath bufferPath

  # Resolves "pumpName[/bufferName...]" to a buffer of a sub-pump. Everything
  # after the first '/' is passed on as the buffer name, which lets the path
  # reach into nested groups; with no buffer part, 'output' is used.
  _getBufferByPath: (bufferPath) ->
    [ pumpName, bufferNames... ] = bufferPath.split('/')
    bufferName = if bufferNames.length then bufferNames.join('/') else 'output'
    @pump(pumpName).buffer bufferName

  # Returns the buffer called `name`: an exposed buffer if one is registered
  # under that name, otherwise the buffer at path `name`.
  # Throws "No such buffer" when neither lookup yields a buffer.
  buffer: (name = 'output') ->
    try
      result = @_exposedBuffers[name] ? @_getBufferByPath name
    catch
      # Lookup failures are deliberately swallowed here; they are reported
      # uniformly by the "No such buffer" error below.

    throw new Error "No such buffer: #{name}" if !result
    result

  # Getter/setter for the input pump.
  # Without an argument returns the current input pump; with a pump name,
  # selects that sub-pump as the input pump and returns the group.
  inputPump: (pumpName = null) ->
    return @_inputPump if !pumpName?
    @_inputPump = @pump(pumpName)
    @

  # Adds a sub-pump (see `addPump`) and selects it as the input pump.
  # Returns the added pump.
  addInputPump: (name, pump = null) ->
    result = @addPump name, pump
    @inputPump name
    result

  # Forwards `from` to the input pump. Returns the group.
  # Throws when no input pump has been selected.
  from: (buffer = null) ->
    throw new Error 'Input pump is not set, use .inputPump to set it' if !@_inputPump?
    @_inputPump.from buffer
    @

  # Forwards `mixin` to the input pump. Returns the group.
  # Throws when no input pump has been selected.
  mixin: (mixins) ->
    throw new Error 'Input pump is not set, use .inputPump to set it' if !@_inputPump?
    @_inputPump.mixin mixins
    @

  # Not supported on groups; always throws.
  process: ->
    throw new Error 'Cannot call .process() on a group: data in a group is transformed by its pumps.'

  # Pauses every sub-pump that reports `isStarted()` and marks the group
  # PAUSED once all of them have paused.
  #
  # Always returns a promise (resolving to Group.PAUSED), including when the
  # group is already paused, so callers can chain `.then` unconditionally.
  # Throws when the group is neither STARTED nor PAUSED.
  pause: ->
    return Promise.resolve Group.PAUSED if @_state == Group.PAUSED
    throw new Error 'Cannot .pause() a group that is not pumping' if @_state != Group.STARTED
    pausePromises = (pump.pause() for name, pump of @_pumps when pump.isStarted())
    Promise.all(pausePromises).then => @_state = Group.PAUSED

  # Marks the group STARTED again and calls `resume` on every sub-pump.
  # Returns the group. Throws unless the group is PAUSED.
  #
  # NOTE(review): `pause` only pauses sub-pumps that were started, while this
  # resumes all of them - confirm Pump::resume tolerates non-paused pumps.
  resume: ->
    throw new Error 'Cannot .resume() a group that is not paused' if @_state != Group.PAUSED
    @_state = Group.STARTED
    pump.resume() for name, pump of @_pumps
    @

  # Getter/setter for the group id.
  # Without an argument returns the id; with an id, stores it, re-ids every
  # sub-pump as "<id>/<pump name>" and returns the group.
  id: (id = null) ->
    return @_id if id == null
    @_id = id
    pump.id "#{@_id}/#{name}" for name, pump of @_pumps
    @

  # Sets the debug flag that `start` hands to every sub-pump.
  # Returns the group. Throws unless the group is STOPPED; the state is
  # checked before the flag is stored, so a rejected call changes nothing.
  debugMode: (debug) ->
    throw new Error 'Cannot change debug mode after pump start' if @_state != Group.STOPPED
    @_debug = debug
    @