1 | events = require('events')
|
2 |
|
3 | class TimeboxedAggregator
|
4 | constructor: (name, implementation, opts) ->
|
5 | @name = name
|
6 | @opts = opts
|
7 | @implementation = implementation
|
8 | @period = (opts.period or 60) * 1000
|
9 | @precision = (opts.precision or 1) * 1000
|
10 | @historySize = opts.keep or 1
|
11 | @history = []
|
12 | @cachedValue = null
|
13 | @blocks = []
|
14 | @implementation.init.call(this, opts) if @implementation.init
|
15 | @events = new events.EventEmitter()
|
16 | @staleCache = false
|
17 | on: (event, callback) ->
|
18 | @events.on(event, callback)
|
19 | push: (time, values) ->
|
20 | currentBlock = @maybeCreateNewBlock(time)
|
21 | values = [values] unless Array.isArray(values)
|
22 | for value in values
|
23 | value = @opts.before.call(this, value) if @opts.before
|
24 | currentBlock.data = @implementation.recalculateBlockData.call(this, currentBlock.data, value, currentBlock.time, time) unless value is undefined
|
25 | oldValue = @cachedValue
|
26 | @compute()
|
27 | @events.emit('change', @cachedValue, oldValue) if @cachedValue != oldValue
|
28 | compute: ->
|
29 | @cleanup()
|
30 | if @staleCache
|
31 | @cachedValue = @implementation.computeFromBlocks.call(this, @blocks)
|
32 | @cachedValue = @opts.after.call(this, @cachedValue) if @opts.after
|
33 | @updateHistory()
|
34 | @staleCache = false
|
35 | @cachedValue
|
36 | value: ->
|
37 | @cachedValue
|
38 | maybeCreateNewBlock: (time) ->
|
39 | if @blocks.length == 0
|
40 | @blocks.push( time:time, data:@defaultBlockValue() )
|
41 | return @blocks[@blocks.length-1]
|
42 | lastBlock = @blocks[@blocks.length-1]
|
43 | diff = time - lastBlock.time.getTime()
|
44 | if diff > @precision
|
45 | @blocks.push( time:time, data:@defaultBlockValue() )
|
46 | @blocks[@blocks.length-2].data = @implementation.closeBlock(lastBlock) if @implementation.closeBlock
|
47 | @staleCache = true
|
48 | @blocks[@blocks.length-1]
|
49 | cleanup: ->
|
50 | periodThreshold = new Date().getTime() - @period
|
51 | loop
|
52 | break if @blocks.length == 0 or @blocks[0].time.getTime() > periodThreshold
|
53 | @blocks.shift()
|
54 | defaultBlockValue: ->
|
55 | if @implementation.defaultBlockValue then @implementation.defaultBlockValue.call(this) else null
|
56 | updateHistory: ->
|
57 | @history.unshift(@cachedValue)
|
58 | @history.pop() if @history.length > @historySize
|
59 | @history
|
60 |
|
61 | exports.TimeboxedAggregator = TimeboxedAggregator
|
62 |
|
63 | exports.buildTimeboxedAggregator = (name, implementation)-> ((opts) -> new TimeboxedAggregator(name, implementation, opts)) |
\ | No newline at end of file |