UNPKG

2.08 kBJavaScriptView Raw
1var loop = require('./loop.js')
// item: {payload, priority}

// state: idle => executing => idle (a flush only starts while idle)
5
6module.exports = function (maxBatch, maxDelay, handler) {
7 var myBuffers = [] // job buffers
8 var myState = 'idle'
9
10 var myExecute = function (buffers) {
11 var payloads = buffers.map(function (i) {
12 return i.payload
13 })
14 var pro = handler(payloads)
15 // func return result instead of a promise
16 // we treat the out put as a promise
17 if (!pro.then) {
18 var outs = pro
19 pro = {
20 then: function (f) {
21 return f(outs)
22 },
23 }
24 }
25
26 var resolves = buffers.map(function (i) {
27 return i.rs
28 })
29 return new Promise(function (resolve) {
30 pro.then(function (outs) {
31 resolves.map(function (rs, i) {
32 rs(outs[i])
33 })
34 resolve()
35 })
36 })
37 }
38
39 var myFlush = function () {
40 if (myState === 'executing') return
41 myState = 'executing'
42
43 loop(function () {
44 if (myBuffers.length <= maxBatch) return Promise.resolve(false)
45 var buffers = myBuffers.splice(0, maxBatch) // myBuffers would also be trimmed
46 return myExecute(buffers).then(function () {
47 return true // continue loop
48 })
49 }).then(function () {
50 if (myBuffers.length === 0) {
51 myState = 'idle'
52 return
53 }
54
55 // last batch
56 var nextms = maxDelay - (Date.now() - myBuffers[0].created)
57 if (nextms >= 1) {
58 // set timeout for the first item in queue
59 // the settedTimeout property will make sure that we won't
60 // set multiple timeouts for this one
61 myState = 'idle'
62
63 if (myBuffers[0].settedTimeout) return
64 myBuffers[0].settedTimeout = true
65 setTimeout(myFlush, nextms)
66 return
67 }
68
69 // have to execute remaining items
70 var buffers = myBuffers
71 myBuffers = []
72 myExecute(buffers).then(function () {
73 // myBuffers may have been added new item, must check after executed the last batch
74 myState = 'idle'
75 myFlush()
76 })
77 })
78 }
79
80 this.push = function (payload, priority) {
81 return new Promise(function (resolve) {
82 myBuffers.push({ payload: payload, priority: priority, created: Date.now(), rs: resolve })
83 myFlush()
84 })
85 }
86
87 return this
88}