UNPKG

2.94 kBJavaScriptView Raw
1/**
2 * Module dependencies.
3 */
4
5try {
6 var EventEmitter = require('events').EventEmitter;
7 if (!EventEmitter) throw new Error();
8} catch (err) {
9 var Emitter = require('emitter');
10}
11
/**
 * Defer.
 *
 * Schedules `fn` to run asynchronously: `process.nextTick` when a usable
 * `process` global exists, otherwise a zero-delay `setTimeout` wrapper.
 */

var defer;
if (typeof process === 'undefined' || !process || typeof process.nextTick !== 'function') {
  defer = function(fn){ setTimeout(fn); };
} else {
  defer = process.nextTick;
}
19
/**
 * Noop.
 *
 * Does nothing and returns `undefined`; stands in for a missing
 * completion callback in `Batch#end()`.
 */

function noop() {
  // intentionally empty
}
25
26/**
27 * Expose `Batch`.
28 */
29
30module.exports = Batch;
31
/**
 * Create a new Batch.
 *
 * Works with or without `new`. Every argument is queued in order, as if
 * `push()` had been called for each one. A fresh batch starts with
 * unlimited concurrency and with "throws" enabled.
 *
 * @param {...Function} fns functions to queue
 * @return {Batch}
 * @api public
 */

function Batch() {
  // Called as a plain function: initialise a fresh instance with the same
  // arguments so `Batch(a, b)` queues exactly what `new Batch(a, b)` does.
  if (!(this instanceof Batch)) {
    var batch = Object.create(Batch.prototype);
    Batch.apply(batch, arguments);
    return batch;
  }

  this.fns = [];
  this.concurrency(Infinity);
  this.throws(true);
  for (var i = 0, len = arguments.length; i < len; ++i) {
    this.push(arguments[i]);
  }
}
45
/**
 * Give `Batch.prototype` event-emitter behaviour.
 *
 * If `EventEmitter` was loaded, its prototype is placed on the prototype
 * chain; otherwise the `Emitter` fallback is applied to the prototype.
 */

if (!EventEmitter) {
  Emitter(Batch.prototype);
} else {
  Batch.prototype.__proto__ = EventEmitter.prototype;
}
55
/**
 * Set concurrency to `n`.
 *
 * The value is stored on the batch and read by `end()` as the number of
 * queued functions to start up front.
 *
 * @param {Number} n
 * @return {Batch} this batch, for chaining
 * @api public
 */

Batch.prototype.concurrency = function(n){
  var batch = this;
  batch.n = n;
  return batch;
};
68
/**
 * Queue a function.
 *
 * Appends `fn` to the end of the batch's function list.
 *
 * @param {Function} fn
 * @return {Batch} this batch, for chaining
 * @api public
 */

Batch.prototype.push = function(fn){
  var queue = this.fns;
  queue.push(fn);
  return this;
};
81
/**
 * Set whether the batch stops at the first error.
 *
 * The argument is coerced to a boolean and stored for `end()` to read.
 *
 * @param {Boolean} throws
 * @return {Batch} this batch, for chaining
 * @api public
 */

Batch.prototype.throws = function(throws) {
  this.e = Boolean(throws);
  return this;
};
93
/**
 * Execute all queued functions, starting at most `concurrency` of them
 * up front and one more each time a running one completes, then call `cb`.
 *
 * Each queued function is invoked as `fn(done)` and reports its outcome
 * through `done(err, res)`; a synchronous throw is handled as `done(err)`.
 * A "progress" event is emitted for every completion that does not abort
 * the batch.
 *
 * - throws enabled: the first error ends the batch with `cb(err)` and all
 *   later completions are ignored; on success `cb(null, results)`.
 * - throws disabled: `cb(errors, results)` receives two arrays indexed by
 *   queue position once every function has completed.
 *
 * `cb` is always invoked through `defer`, never synchronously.
 *
 * @param {Function} [cb] completion callback; defaults to a noop
 * @return {Batch} this batch, for chaining (also when the queue is empty)
 * @api public
 */

Batch.prototype.end = function(cb){
  var self = this
    , fns = this.fns
    , total = fns.length
    , pending = total
    , results = []
    , errors = []
    , max = this.n
    , throws = this.e
    , index = 0
    , done;

  cb = cb || noop;

  // empty: report asynchronously, and still return the batch so that
  // chaining behaves the same as for a non-empty queue
  if (!fns.length) {
    defer(function(){
      cb(null, results);
    });
    return this;
  }

  // process: start the next queued function, if any remain
  function next() {
    var i = index++;
    var fn = fns[i];
    if (!fn) return;
    var start = new Date;

    try {
      fn(callback);
    } catch (err) {
      callback(err);
    }

    function callback(err, res){
      // the batch already failed in throws mode; ignore late completions
      if (done) return;

      if (err && throws) {
        done = true;
        defer(function(){
          cb(err);
        });
        return;
      }

      // `pending` has not been decremented yet, hence the + 1
      var complete = total - pending + 1;
      var end = new Date;

      results[i] = res;
      errors[i] = err;

      self.emit('progress', {
        index: i,
        value: res,
        error: err,
        pending: pending,
        total: total,
        complete: complete,
        percent: complete / total * 100 | 0,
        start: start,
        end: end,
        duration: end - start
      });

      if (--pending) next();
      else defer(function(){
        if (!throws) cb(errors, results);
        else cb(null, results);
      });
    }
  }

  // concurrency: start up to `max` functions; each completion starts another
  for (var i = 0; i < fns.length; i++) {
    if (i == max) break;
    next();
  }

  return this;
};