UNPKG

2.4 kBtext/coffeescriptView Raw
1events = require('events')
2
3class TimeboxedAggregator
4 constructor: (name, implementation, opts) ->
5 @name = name
6 @opts = opts
7 @implementation = implementation
8 @period = (opts.period or 60) * 1000
9 @precision = (opts.precision or 1) * 1000
10 @historySize = opts.keep or 1
11 @history = []
12 @cachedValue = null
13 @blocks = []
14 @implementation.init.call(this, opts) if @implementation.init
15 @events = new events.EventEmitter()
16 @staleCache = false
17 on: (event, callback) ->
18 @events.on(event, callback)
19 push: (time, values) ->
20 currentBlock = @maybeCreateNewBlock(time)
21 values = [values] unless Array.isArray(values)
22 for value in values
23 value = @opts.before.call(this, value) if @opts.before
24 currentBlock.data = @implementation.recalculateBlockData.call(this, currentBlock.data, value, currentBlock.time, time) unless value is undefined
25 oldValue = @cachedValue
26 @compute()
27 @events.emit('change', @cachedValue, oldValue) if @cachedValue != oldValue
28 compute: ->
29 @cleanup()
30 if @staleCache
31 @cachedValue = @implementation.computeFromBlocks.call(this, @blocks)
32 @cachedValue = @opts.after.call(this, @cachedValue) if @opts.after
33 @updateHistory()
34 @staleCache = false
35 @cachedValue
36 value: ->
37 @cachedValue
38 maybeCreateNewBlock: (time) ->
39 if @blocks.length == 0
40 @blocks.push( time:time, data:@defaultBlockValue() )
41 return @blocks[@blocks.length-1]
42 lastBlock = @blocks[@blocks.length-1]
43 diff = time - lastBlock.time.getTime()
44 if diff > @precision
45 @blocks.push( time:time, data:@defaultBlockValue() )
46 @blocks[@blocks.length-2].data = @implementation.closeBlock(lastBlock) if @implementation.closeBlock
47 @staleCache = true
48 @blocks[@blocks.length-1]
49 cleanup: ->
50 periodThreshold = new Date().getTime() - @period
51 loop
52 break if @blocks.length == 0 or @blocks[0].time.getTime() > periodThreshold
53 @blocks.shift()
54 defaultBlockValue: ->
55 if @implementation.defaultBlockValue then @implementation.defaultBlockValue.call(this) else null
56 updateHistory: ->
57 @history.unshift(@cachedValue)
58 @history.pop() if @history.length > @historySize
59 @history
60
61exports.TimeboxedAggregator = TimeboxedAggregator
62
63exports.buildTimeboxedAggregator = (name, implementation)-> ((opts) -> new TimeboxedAggregator(name, implementation, opts))
\No newline at end of file