/*
 * vasync.js: utilities for observable asynchronous control flow
 */

var mod_assert = require('assert');
var mod_events = require('events');
var mod_util = require('util');
var mod_verror = require('verror');

/*
 * Public interface
 */
exports.parallel = parallel;
exports.forEachParallel = forEachParallel;
exports.pipeline = pipeline;
exports.forEachPipeline = forEachPipeline;
exports.queue = queue;
exports.queuev = queuev;
exports.barrier = barrier;
exports.waterfall = waterfall;

/*
 * Fallback for runtimes that do not provide setImmediate(): emulate it with a
 * zero-delay setTimeout(), forwarding any extra arguments to "func".  The timer
 * handle is returned so that callers have something to cancel.
 */
if (!global.setImmediate) {
    global.setImmediate = function (func) {
        var args = Array.prototype.slice.call(arguments, 1);

        /* Build the setTimeout() argument list: (func, 0, ...extra). */
        args.unshift(0);
        args.unshift(func);
        return (setTimeout.apply(this, args));
    };
}

/*
 * This is incorporated here from jsprim because jsprim ends up pulling in a lot
 * of dependencies.  If we end up needing more from jsprim, though, we should
 * add it back and rip out this function.
 *
 * Returns true if "obj" has no enumerable properties (own or inherited, since
 * this uses a for-in loop), and false as soon as one is found.
 */
function isEmpty(obj)
{
    var key;

    for (key in obj)
        return (false);

    return (true);
}

/*
 * Given a set of functions that complete asynchronously using the standard
 * callback(err, result) pattern, invoke them all and merge the results.  See
 * README.md for details.
 *
 *    args.funcs    array of functions; each is invoked with one argument, the
 *                  completion callback for that operation
 *
 *    callback      invoked as callback(err, rv) once every function has
 *                  completed.  "err" is a MultiError wrapping each operation's
 *                  error if any operation failed, and null otherwise.
 *
 * Returns the status object "rv" right away.  It is updated in place as
 * operations complete and has these fields:
 *
 *    operations    one entry per function: 'func', 'funcname', 'status'
 *                  ('pending', 'ok' or 'fail') and, once complete, 'err' and
 *                  'result'
 *
 *    successes     results of the successful operations, in completion order
 *
 *    ndone         number of operations completed so far
 *
 *    nerrors       number of operations that failed so far
 */
function parallel(args, callback)
{
    var funcs, rv, doneOne, i;

    mod_assert.equal(typeof (args), 'object', '"args" must be an object');
    mod_assert.ok(Array.isArray(args['funcs']),
        '"args.funcs" must be specified and must be an array');
    mod_assert.equal(typeof (callback), 'function',
        'callback argument must be specified and must be a function');

    /* Work on a copy so later changes to the caller's array don't affect us. */
    funcs = args['funcs'].slice(0);

    rv = {
        'operations': new Array(funcs.length),
        'successes': [],
        'ndone': 0,
        'nerrors': 0
    };

    /* Nothing to run: still complete asynchronously, never during this call. */
    if (funcs.length === 0) {
        setImmediate(function () { callback(null, rv); });
        return (rv);
    }

    /* Builds the completion callback bound to one operation's entry. */
    doneOne = function (entry) {
        return (function (err, result) {
            /* Trips if an operation invokes its callback more than once. */
            mod_assert.equal(entry['status'], 'pending');

            entry['err'] = err;
            entry['result'] = result;
            entry['status'] = err ? 'fail' : 'ok';

            if (err)
                rv['nerrors']++;
            else
                rv['successes'].push(result);

            if (++rv['ndone'] < funcs.length)
                return;

            /* Last operation finished: gather the errors and report. */
            var errors = rv['operations'].filter(function (ent) {
                return (ent['status'] == 'fail');
            }).map(function (ent) { return (ent['err']); });

            if (errors.length > 0)
                callback(new mod_verror.MultiError(errors), rv);
            else
                callback(null, rv);
        });
    };

    for (i = 0; i < funcs.length; i++) {
        rv['operations'][i] = {
            'func': funcs[i],
            'funcname': funcs[i].name || '(anon)',
            'status': 'pending'
        };

        funcs[i](doneOne(rv['operations'][i]));
    }

    return (rv);
}

/*
 * Exactly like parallel, except that the input is specified as a single
 * function to invoke on N different inputs (rather than N functions).  "args"
 * must have the following fields:
 *
 *    func      asynchronous function to invoke on each input value; it is
 *              called as func(input, callback)
 *
 *    inputs    array of input values
 *
 * "callback" and the return value are those of parallel(), which this
 * delegates to.
 */
function forEachParallel(args, callback)
{
    var func, funcs;

    mod_assert.equal(typeof (args), 'object', '"args" must be an object');
    mod_assert.equal(typeof (args['func']), 'function',
        '"args.func" must be specified and must be a function');
    mod_assert.ok(Array.isArray(args['inputs']),
        '"args.inputs" must be specified and must be an array');

    func = args['func'];

    /* Wrap each input in a one-argument function, as parallel() expects. */
    funcs = args['inputs'].map(function (input) {
        return (function (subcallback) {
            return (func(input, subcallback));
        });
    });

    return (parallel({ 'funcs': funcs }, callback));
}

/*
 * Like parallel, but invokes functions in sequence rather than in parallel
 * and aborts if any function exits with failure.  Arguments include:
 *
 *    funcs     array of functions to invoke in sequence; each is called as
 *              func(arg, callback)
 *
 *    arg       first argument to each pipeline function
 *
 * "callback" is invoked as callback(err, rv) when the last function completes
 * or as soon as one fails ("err" is then that function's error).  The status
 * object "rv" is also returned immediately.  It has the same fields as the one
 * returned by parallel(), except that operations which have not started yet
 * have status 'waiting'.
 */
function pipeline(args, callback)
{
    var funcs, uarg, rv, next;

    mod_assert.equal(typeof (args), 'object', '"args" must be an object');
    mod_assert.ok(Array.isArray(args['funcs']),
        '"args.funcs" must be specified and must be an array');
    /*
     * Check the callback up front (as parallel() and forEachPipeline() do)
     * rather than failing with a TypeError after all the stages have run.
     */
    mod_assert.equal(typeof (callback), 'function',
        'callback argument must be specified and must be a function');

    funcs = args['funcs'].slice(0);
    uarg = args['arg'];

    rv = {
        'operations': funcs.map(function (func) {
            return ({
                'func': func,
                'funcname': func.name || '(anon)',
                'status': 'waiting'
            });
        }),
        'successes': [],
        'ndone': 0,
        'nerrors': 0
    };

    if (funcs.length === 0) {
        setImmediate(function () { callback(null, rv); });
        return (rv);
    }

    /* Completion callback shared by every stage. */
    next = function (err, result) {
        if (rv['nerrors'] > 0 ||
            rv['ndone'] >= rv['operations'].length) {
            throw new mod_verror.VError('pipeline callback ' +
                'invoked after the pipeline has already ' +
                'completed (%j)', rv);
        }

        var entry = rv['operations'][rv['ndone']++];

        mod_assert.equal(entry['status'], 'pending');

        entry['status'] = err ? 'fail' : 'ok';
        entry['err'] = err;
        entry['result'] = result;

        if (err)
            rv['nerrors']++;
        else
            rv['successes'].push(result);

        if (err || rv['ndone'] == funcs.length) {
            callback(err, rv);
        } else {
            var nextent = rv['operations'][rv['ndone']];
            nextent['status'] = 'pending';

            /*
             * We invoke the next function on the next tick so that
             * the caller (stage N) need not worry about the next
             * stage (stage N + 1) running synchronously from within
             * stage N's own call to this callback.
             */
            setImmediate(function () {
                nextent['func'](uarg, next);
            });
        }
    };

    /* The first stage is started synchronously. */
    rv['operations'][0]['status'] = 'pending';
    funcs[0](uarg, next);

    return (rv);
}

/*
 * Exactly like pipeline, except that the input is specified as a single
 * function to invoke on N different inputs (rather than N functions).  "args"
 * must have the following fields:
 *
 *    func      asynchronous function to invoke on each input value; it is
 *              called as func(input, callback)
 *
 *    inputs    array of input values
 *
 * "callback" and the return value are those of pipeline(), which this
 * delegates to.
 */
function forEachPipeline(args, callback)
{
    mod_assert.equal(typeof (args), 'object', '"args" must be an object');
    mod_assert.equal(typeof (args['func']), 'function',
        '"args.func" must be specified and must be a function');
    mod_assert.ok(Array.isArray(args['inputs']),
        '"args.inputs" must be specified and must be an array');
    mod_assert.equal(typeof (callback), 'function',
        'callback argument must be specified and must be a function');

    var func = args['func'];

    /*
     * pipeline() calls each stage as stage(arg, callback).  No "arg" is
     * supplied below, so the first parameter is ignored and the input value
     * is passed to "func" in its place.
     */
    var funcs = args['inputs'].map(function (input) {
        return (function (_, subcallback) {
            return (func(input, subcallback));
        });
    });

    return (pipeline({ 'funcs': funcs }, callback));
}


/*
 * async-compatible "queue" function: returns a new WorkQueue that runs
 * "worker" on each pushed task, with at most "concurrency" tasks at a time.
 */
function queue(worker, concurrency)
{
    return (new WorkQueue({
        'worker': worker,
        'concurrency': concurrency
    }));
}

/*
 * Like queue(), but takes the WorkQueue arguments as a single object with
 * "worker" and "concurrency" properties.
 */
function queuev(args)
{
    return (new WorkQueue(args));
}

/*
 * WorkQueue: runs a worker function over pushed tasks with bounded
 * concurrency.  "args" must have these own properties:
 *
 *    worker        function invoked as worker(task, callback) for each task
 *
 *    concurrency   positive integer: maximum number of tasks running at once
 *
 * A WorkQueue is an EventEmitter.  It emits 'saturated' when the number of
 * running tasks reaches the concurrency limit, 'empty' when the last queued
 * task has been handed to the worker, 'drain' when nothing is queued or
 * running, and 'end' once a closed queue has come to rest.  The "saturated",
 * "empty" and "drain" properties may be set to functions, which are called
 * just before the event of the same name is emitted.
 */
function WorkQueue(args)
{
    /*
     * Borrow hasOwnProperty from Object.prototype so that argument objects
     * without a prototype (Object.create(null)) are accepted as well.
     */
    var hasOwn = Object.prototype.hasOwnProperty;

    mod_assert.ok(hasOwn.call(args, 'worker'));
    mod_assert.equal(typeof (args['worker']), 'function');
    mod_assert.ok(hasOwn.call(args, 'concurrency'));
    mod_assert.equal(typeof (args['concurrency']), 'number');
    mod_assert.equal(Math.floor(args['concurrency']), args['concurrency']);
    mod_assert.ok(args['concurrency'] > 0);

    mod_events.EventEmitter.call(this);

    this.nextid = 0;                    /* id given to the last pushed task */
    this.worker = args['worker'];
    this.worker_name = args['worker'].name || 'anon';
    this.npending = 0;                  /* number of tasks handed to worker */
    this.pending = {};                  /* those tasks, indexed by id */
    this.queued = [];                   /* tasks waiting for a free slot */
    this.closed = false;                /* close() has been called */
    this.ended = false;                 /* 'end' has been emitted */
    this.killed = false;                /* kill() has been called */

    /* user-settable fields inherited from "async" interface */
    this.concurrency = args['concurrency'];
    this.saturated = undefined;
    this.empty = undefined;
    this.drain = undefined;
}

mod_util.inherits(WorkQueue, mod_events.EventEmitter);

/*
 * Enqueue one task or an array of tasks.  "callback", if given, is invoked
 * with the arguments the worker passes to its own callback, once per task.
 * Returns the id of the task, or an array of ids when given an array.
 */
WorkQueue.prototype.push = function (tasks, callback)
{
    if (!Array.isArray(tasks))
        return (this.pushOne(tasks, callback));

    var wq = this;
    return (tasks.map(function (task) {
        return (wq.pushOne(task, callback));
    }));
};

/*
 * Change the concurrency limit and immediately try to dispatch queued tasks
 * under the new limit.  Throws if the queue has been closed.
 */
WorkQueue.prototype.updateConcurrency = function (concurrency)
{
    if (this.closed)
        throw new mod_verror.VError(
            'update concurrency invoked after queue closed');
    this.concurrency = concurrency;
    this.dispatchNext();
};

/*
 * Stop accepting new tasks.  Tasks already queued or running still complete;
 * 'end' is emitted once the queue is idle.  Calling close() again is a no-op.
 */
WorkQueue.prototype.close = function ()
{
    var wq = this;

    if (wq.closed)
        return;
    wq.closed = true;

    /*
     * If the queue is already empty, just fire the "end" event on the
     * next tick.
     */
    if (wq.npending === 0 && wq.queued.length === 0) {
        setImmediate(function () {
            if (!wq.ended) {
                wq.ended = true;
                wq.emit('end');
            }
        });
    }
};

/*
 * private: enqueue a single task and return its id.  Throws if the queue has
 * been closed.
 */
WorkQueue.prototype.pushOne = function (task, callback)
{
    if (this.closed)
        throw new mod_verror.VError('push invoked after queue closed');

    var id = ++this.nextid;
    var entry = { 'id': id, 'task': task, 'callback': callback };

    this.queued.push(entry);
    this.dispatchNext();

    return (id);
};

/*
 * private: if the queue is idle, announce 'drain' (and 'end', if closed);
 * otherwise hand queued tasks to the worker until the concurrency limit is
 * reached.
 */
WorkQueue.prototype.dispatchNext = function ()
{
    var wq = this;

    if (wq.npending === 0 && wq.queued.length === 0) {
        if (wq.drain)
            wq.drain();
        wq.emit('drain');

        /*
         * The queue is closed; emit the final "end" event before we
         * come to rest.  As in close(), "ended" guards against
         * emitting it more than once.
         */
        if (wq.closed && !wq.ended) {
            wq.ended = true;
            wq.emit('end');
        }
    } else if (wq.queued.length > 0) {
        while (wq.queued.length > 0 && wq.npending < wq.concurrency) {
            var next = wq.queued.shift();
            wq.dispatch(next);

            if (wq.queued.length === 0) {
                if (wq.empty)
                    wq.empty();
                wq.emit('empty');
            }
        }
    }
};

/*
 * Mark "entry" as running and invoke the worker on it on the next tick.
 */
WorkQueue.prototype.dispatch = function (entry)
{
    var wq = this;

    mod_assert.ok(!Object.prototype.hasOwnProperty.call(this.pending,
        entry['id']));
    mod_assert.ok(this.npending < this.concurrency);
    mod_assert.ok(!this.ended);

    this.npending++;
    this.pending[entry['id']] = entry;

    if (this.npending === this.concurrency) {
        if (this.saturated)
            this.saturated();
        this.emit('saturated');
    }

    /*
     * We invoke the worker function on the next tick so that callers can
     * always assume that the callback is NOT invoked during the call to
     * push() even if the queue is not at capacity.  It also avoids O(n)
     * stack usage when used with synchronous worker functions.
     */
    setImmediate(function () {
        wq.worker(entry['task'], function (err) {
            --wq.npending;
            delete (wq.pending[entry['id']]);

            /* Pass every worker result through to the task callback. */
            if (entry['callback'])
                entry['callback'].apply(null, arguments);

            wq.dispatchNext();
        });
    });
};

/*
 * Returns the number of tasks still waiting to be dispatched; tasks already
 * handed to the worker are not counted.
 */
WorkQueue.prototype.length = function ()
{
    return (this.queued.length);
};

/*
 * Discard every task that has not been dispatched yet, clear the "drain"
 * function, and close the queue.  Tasks already running are not interrupted.
 */
WorkQueue.prototype.kill = function ()
{
    this.killed = true;
    this.queued = [];
    this.drain = undefined;
    this.close();
};

/*
 * Barriers coordinate multiple concurrent operations.  Returns a new Barrier
 * constructed from the optional "args".
 */
function barrier(args)
{
    return (new Barrier(args));
}

/*
 * Barrier: tracks a set of named pending operations and emits 'drain' once
 * the set becomes empty.  The optional "args" may contain:
 *
 *    nrecent   number of recently completed operations to remember in
 *              "this.recent" (default: 10).  A value of zero or less disables
 *              this history, in which case "recent" is not created.
 */
function Barrier(args)
{
    mod_assert.ok(!args || !args['nrecent'] ||
        typeof (args['nrecent']) == 'number',
        '"nrecent" must have type "number"');

    mod_events.EventEmitter.call(this);

    /*
     * Only fall back to the default when no usable number was supplied, so
     * that an explicit 0 disables the history just as a negative value does.
     */
    var nrecent = 10;
    if (args && typeof (args['nrecent']) == 'number' &&
        !isNaN(args['nrecent']))
        nrecent = args['nrecent'];

    if (nrecent > 0) {
        this.nrecent = nrecent;
        this.recent = [];
    }

    this.pending = {};          /* operation name -> start time (ms) */
    this.scheduled = false;     /* a 'drain' check is already queued */
}

mod_util.inherits(Barrier, mod_events.EventEmitter);

/*
 * Record that the operation called "name" has started.  It is an error to
 * start an operation that is already pending.
 */
Barrier.prototype.start = function (name)
{
    /*
     * hasOwnProperty is borrowed from Object.prototype because operation
     * names are stored as properties of "pending", so a name such as
     * "hasOwnProperty" would otherwise shadow the method.
     */
    mod_assert.ok(!Object.prototype.hasOwnProperty.call(this.pending, name),
        'operation "' + name + '" is already pending');
    this.pending[name] = Date.now();
};

/*
 * Record that the operation called "name" has completed, and schedule a
 * 'drain' event if no operations remain pending.  It is an error to complete
 * an operation that is not pending.
 */
Barrier.prototype.done = function (name)
{
    mod_assert.ok(Object.prototype.hasOwnProperty.call(this.pending, name),
        'operation "' + name + '" is not pending');

    if (this.recent) {
        this.recent.push({
            'name': name,
            'start': this.pending[name],
            'done': Date.now()
        });

        /* Keep only the "nrecent" most recent completions. */
        if (this.recent.length > this.nrecent)
            this.recent.shift();
    }

    delete (this.pending[name]);

    /*
     * If we executed at least one operation and we're now empty, we should
     * emit "drain".  But most code doesn't deal well with events being
     * processed while they're executing, so we actually schedule this event
     * for the next tick.
     *
     * We use the "scheduled" flag to avoid emitting multiple "drain" events
     * on consecutive ticks if the user starts and ends another task during
     * this tick.
     */
    if (!isEmpty(this.pending) || this.scheduled)
        return;

    this.scheduled = true;

    var self = this;

    setImmediate(function () {
        self.scheduled = false;

        /*
         * It's also possible that the user has started another task on
         * the previous tick, in which case we really shouldn't emit
         * "drain".
         */
        if (isEmpty(self.pending))
            self.emit('drain');
    });
};

/*
 * waterfall([ funcs ], callback): invoke each of the asynchronous functions
 * "funcs" in series.  Each function is passed any values emitted by the
 * previous function (none for the first function), followed by the callback to
 * invoke upon completion.  This callback must be invoked exactly once,
 * regardless of success or failure.  As conventional in Node, the first
 * argument to the callback indicates an error (if non-null).  Subsequent
 * arguments are passed to the next function in the "funcs" chain.
 *
 * If any function fails (i.e., calls its callback with an Error), then the
 * remaining functions are not invoked and "callback" is invoked with the error.
 *
 * The only difference between waterfall() and pipeline() are the arguments
 * passed to each function in the chain.  pipeline() always passes the same
 * argument followed by the callback, while waterfall() passes whatever values
 * were emitted by the previous function followed by the callback.
 *
 * "callback" is optional.  If given, it is invoked with the error (or null)
 * followed by the values emitted by the last function that ran.  The returned
 * status object has the same shape as pipeline()'s, except that each operation
 * records the array of values it emitted as 'results', and 'successes' holds
 * one such array per successful operation.
 */
function waterfall(funcs, callback)
{
    var rv, current, next;

    mod_assert.ok(Array.isArray(funcs));
    mod_assert.ok(arguments.length == 1 || typeof (callback) == 'function');
    funcs = funcs.slice(0);

    rv = {
        'operations': funcs.map(function (func) {
            return ({
                'func': func,
                'funcname': func.name || '(anon)',
                'status': 'waiting'
            });
        }),
        'successes': [],
        'ndone': 0,
        'nerrors': 0
    };

    if (funcs.length === 0) {
        if (callback)
            setImmediate(function () { callback(null, rv); });
        return (rv);
    }

    /*
     * Completion callback for stage "idx".  Each stage receives a copy of
     * this function bound to its own index, so a stage that calls back more
     * than once can be identified.
     */
    next = function (idx, err) {
        var results, nextargs, entry, nextentry;

        if (err === undefined)
            err = null;

        if (idx != current) {
            throw (new mod_verror.VError(
                'vasync.waterfall: function %d ("%s") invoked ' +
                'its callback twice', idx,
                rv['operations'][idx].funcname));
        }

        mod_assert.equal(idx, rv['ndone']);
        entry = rv['operations'][rv['ndone']++];
        results = Array.prototype.slice.call(arguments, 2);

        mod_assert.equal(entry['status'], 'pending');
        entry['status'] = err ? 'fail' : 'ok';
        entry['err'] = err;
        entry['results'] = results;

        if (err)
            rv['nerrors']++;
        else
            rv['successes'].push(results);

        /*
         * Advance past this stage whether or not the chain continues, so
         * that a repeated callback from the final (or failing) stage is
         * reported by the check above rather than by a bare assertion.
         */
        current++;

        /*
         * "results" is recorded in "rv", so the argument lists below are
         * built as new arrays instead of modifying it in place.
         */
        if (err || rv['ndone'] == funcs.length) {
            if (callback)
                callback.apply(null, [ err ].concat(results));
        } else {
            nextentry = rv['operations'][rv['ndone']];
            nextentry['status'] = 'pending';
            nextargs = results.concat(next.bind(null, current));
            setImmediate(function () {
                nextentry['func'].apply(null, nextargs);
            });
        }
    };

    /* The first stage is started synchronously and receives no values. */
    rv['operations'][0]['status'] = 'pending';
    current = 0;
    funcs[0](next.bind(null, current));
    return (rv);
}