UNPKG

17.1 kB · JavaScript · View Raw
1/*
2 * vasync.js: utilities for observable asynchronous control flow
3 */
4
5var mod_assert = require('assert');
6var mod_events = require('events');
7var mod_util = require('util');
8var mod_verror = require('verror');
9
/*
 * Public interface: everything this module exports.  Everything else defined
 * in this file is internal.
 */

/* Run a set of operations concurrently. */
exports.parallel = parallel;
exports.forEachParallel = forEachParallel;

/* Run a set of operations one at a time. */
exports.pipeline = pipeline;
exports.forEachPipeline = forEachPipeline;
exports.waterfall = waterfall;
exports.tryEach = tryEach;

/* Coordinate operations over time. */
exports.queue = queue;
exports.queuev = queuev;
exports.barrier = barrier;
22
/*
 * Fallback for environments without setImmediate(): run the function via a
 * zero-delay setTimeout(), forwarding any extra arguments to it.
 */
if (!global.setImmediate) {
	global.setImmediate = function (func) {
		var extra = Array.prototype.slice.call(arguments, 1);
		setTimeout.apply(this, [ func, 0 ].concat(extra));
	};
}
31
/*
 * Returns true if "obj" has no enumerable properties, own or inherited (the
 * keys a for-in loop would visit).  null and undefined count as empty.
 *
 * This is incorporated here from jsprim because jsprim ends up pulling in a lot
 * of dependencies. If we end up needing more from jsprim, though, we should
 * add it back and rip out this function.
 */
function isEmpty(obj)
{
	var sawKey = false;

	/* for-in rather than Object.keys(), so inherited keys count too. */
	for (var k in obj) {
		sawKey = true;
		break;
	}

	return (!sawKey);
}
44
/*
 * Given a set of functions that complete asynchronously using the standard
 * callback(err, result) pattern, invoke them all and merge the results. See
 * README.md for details.
 *
 * "args.funcs" is an array of functions.  Each is invoked immediately with a
 * single callback argument.  Returns a status object with these fields:
 *
 *	operations	one record per function (func, funcname, status, and,
 *			once it completes, err and result)
 *
 *	successes	results of the successful functions, in completion order
 *
 *	ndone, nerrors	counts of completed and failed functions
 *
 * Once every function has called back, "callback" receives a MultiError of
 * the failures (in "funcs" order), or null, plus that same status object.
 * If every function calls back synchronously, "callback" runs before
 * parallel() returns.  With no functions, it runs on a later tick.
 */
function parallel(args, callback)
{
	mod_assert.equal(typeof (args), 'object', '"args" must be an object');
	mod_assert.ok(Array.isArray(args['funcs']),
	    '"args.funcs" must be specified and must be an array');
	mod_assert.equal(typeof (callback), 'function',
	    'callback argument must be specified and must be a function');

	var tasks = args['funcs'].slice(0);
	var ntasks = tasks.length;
	var state = {
	    'operations': new Array(ntasks),
	    'successes': [],
	    'ndone': 0,
	    'nerrors': 0
	};

	if (ntasks === 0) {
		setImmediate(function () { callback(null, state); });
		return (state);
	}

	/* Builds the completion callback for one operation record. */
	function makeDone(op) {
		return (function (err, result) {
			mod_assert.equal(op['status'], 'pending');

			op['err'] = err;
			op['result'] = result;
			op['status'] = err ? 'fail' : 'ok';

			if (err) {
				state['nerrors']++;
			} else {
				state['successes'].push(result);
			}

			state['ndone']++;
			if (state['ndone'] < ntasks)
				return;

			var failures = [];
			state['operations'].forEach(function (o) {
				if (o['status'] == 'fail')
					failures.push(o['err']);
			});

			if (failures.length === 0) {
				callback(null, state);
			} else {
				callback(new mod_verror.MultiError(failures),
				    state);
			}
		});
	}

	for (var idx = 0; idx < ntasks; idx++) {
		var op = {
		    'func': tasks[idx],
		    'funcname': tasks[idx].name || '(anon)',
		    'status': 'pending'
		};
		state['operations'][idx] = op;
		tasks[idx](makeDone(op));
	}

	return (state);
}
113
/*
 * Exactly like parallel(), except that the work is specified as a single
 * function to invoke on each of N inputs (rather than N functions).  "args"
 * must have the following fields:
 *
 *	func	asynchronous function, invoked as func(input, callback) once
 *		per input value
 *
 *	inputs	array of input values
 *
 * Returns whatever parallel() returns.
 */
function forEachParallel(args, callback)
{
	mod_assert.equal(typeof (args), 'object', '"args" must be an object');
	mod_assert.equal(typeof (args['func']), 'function',
	    '"args.func" must be specified and must be a function');
	mod_assert.ok(Array.isArray(args['inputs']),
	    '"args.inputs" must be specified and must be an array');

	var worker = args['func'];

	/* Bind one input to "worker", yielding a single parallel() task. */
	function bindInput(input) {
		return (function (subcallback) {
			return (worker(input, subcallback));
		});
	}

	return (parallel({ 'funcs': args['inputs'].map(bindInput) }, callback));
}
142
/*
 * Like parallel(), but invokes the functions in series rather than in
 * parallel, and aborts if any function exits with failure.  "args" fields:
 *
 *	funcs	array of asynchronous functions, invoked one at a time and in
 *		order, each as func(arg, callback)
 *
 *	arg	first argument to each pipeline function (optional)
 *
 * "callback" is optional, but if given it must be a function.  Returns an
 * object describing each operation's progress, as parallel() does.
 */
function pipeline(args, callback)
{
	mod_assert.equal(typeof (args), 'object', '"args" must be an object');
	mod_assert.ok(Array.isArray(args['funcs']),
	    '"args.funcs" must be specified and must be an array');

	/*
	 * A missing (falsy) callback is allowed, since waterfall_impl() skips
	 * it.  Reject anything else that isn't a function now, rather than
	 * failing with a TypeError once the pipeline finishes.
	 */
	mod_assert.ok(!callback || typeof (callback) == 'function',
	    '"callback" must be a function');

	var opts = {
	    'funcs': args['funcs'].slice(0),
	    'callback': callback,
	    'args': { impl: 'pipeline', uarg: args['arg'] },
	    'stop_when': 'error',
	    'res_type': 'values'
	};
	return (waterfall_impl(opts));
}
166
/*
 * tryEach(funcs, [callback]): invoke the asynchronous functions in "funcs" one
 * at a time, passing each only a callback, and stop at the first one that
 * succeeds.  "callback", if given, receives the outcome of the last function
 * invoked: its error (null on success), followed by its single result, or by
 * an array of its results if it produced more than one.
 */
function tryEach(funcs, callback)
{
	mod_assert.ok(Array.isArray(funcs),
	    '"funcs" must be specified and must be an array');
	mod_assert.ok(arguments.length == 1 || typeof (callback) == 'function',
	    '"callback" must be a function');

	return (waterfall_impl({
	    'funcs': funcs.slice(0),
	    'callback': callback,
	    'args': { impl: 'tryEach' },
	    'stop_when': 'success',
	    'res_type': 'array'
	}));
}
182
/*
 * Exactly like pipeline(), except that the work is specified as a single
 * function to invoke on each of N inputs (rather than N functions), one input
 * at a time.  "args" must have the following fields:
 *
 *	func	asynchronous function, invoked as func(input, callback) once
 *		per input value
 *
 *	inputs	array of input values
 *
 * "callback" is required.  Returns whatever pipeline() returns.
 */
function forEachPipeline(args, callback)
{
	mod_assert.equal(typeof (args), 'object', '"args" must be an object');
	mod_assert.equal(typeof (args['func']), 'function',
	    '"args.func" must be specified and must be a function');
	mod_assert.ok(Array.isArray(args['inputs']),
	    '"args.inputs" must be specified and must be an array');
	mod_assert.equal(typeof (callback), 'function',
	    'callback argument must be specified and must be a function');

	var worker = args['func'];

	/*
	 * Build the pipeline stage for one input.  pipeline() hands every
	 * stage its shared "arg" first; the stage ignores it and works on its
	 * own bound input instead.
	 */
	function stageFor(input) {
		return (function (_ignored, subcallback) {
			return (worker(input, subcallback));
		});
	}

	return (pipeline({ 'funcs': args['inputs'].map(stageFor) }, callback));
}
211
212
/*
 * queue(worker, concurrency): async-compatible "queue" function.  Returns a
 * WorkQueue that runs "worker" on each pushed task, with at most
 * "concurrency" tasks outstanding at once.  queuev() is the same thing with
 * the arguments given as one object.
 */
function queue(worker, concurrency)
{
	var args = {};
	args['worker'] = worker;
	args['concurrency'] = concurrency;
	return (new WorkQueue(args));
}
223
/*
 * queuev(args): like queue(), but takes the WorkQueue arguments ("worker" and
 * "concurrency") as a single object, which is passed through unchanged.
 */
function queuev(args)
{
	var wq = new WorkQueue(args);
	return (wq);
}
228
/*
 * WorkQueue: runs "args.worker" on queued tasks, with at most
 * "args.concurrency" (a positive integer) tasks outstanding at once.  Emits
 * "saturated", "empty", "drain", and "end" events.  As in the "async" module,
 * callers may also assign the "saturated", "empty", and "drain" fields to
 * functions, which are called alongside the matching events.
 */
function WorkQueue(args)
{
	mod_assert.ok(args.hasOwnProperty('worker'));
	mod_assert.equal(typeof (args['worker']), 'function');
	mod_assert.ok(args.hasOwnProperty('concurrency'));
	mod_assert.equal(typeof (args['concurrency']), 'number');
	mod_assert.equal(Math.floor(args['concurrency']), args['concurrency']);
	mod_assert.ok(args['concurrency'] > 0);

	mod_events.EventEmitter.call(this);

	var worker = args['worker'];

	/* internal bookkeeping */
	this.nextid = 0;		/* id most recently assigned by pushOne() */
	this.worker = worker;
	this.worker_name = worker.name || 'anon';
	this.npending = 0;		/* tasks handed to the worker */
	this.pending = {};		/* those same tasks, keyed by id */
	this.queued = [];		/* tasks not yet dispatched */
	this.closed = false;		/* close() has been called */
	this.ended = false;		/* "end" has been emitted */

	/* user-settable fields inherited from "async" interface */
	this.concurrency = args['concurrency'];
	this.saturated = undefined;
	this.empty = undefined;
	this.drain = undefined;
}

mod_util.inherits(WorkQueue, mod_events.EventEmitter);
257
/*
 * Enqueue one task, or each element of an array of tasks.  "callback"
 * (optional) is attached to every task pushed by this call.  Returns the new
 * task's id, or an array of ids when given an array.
 */
WorkQueue.prototype.push = function (tasks, callback)
{
	if (Array.isArray(tasks)) {
		var self = this;
		return (tasks.map(function (t) {
			return (self.pushOne(t, callback));
		}));
	}

	return (this.pushOne(tasks, callback));
};
268
/*
 * Change the maximum number of tasks that may be outstanding at once, then
 * dispatch whatever queued tasks the new limit allows.  Like the constructor,
 * requires "concurrency" to be a positive integer.  Throws if the queue has
 * already been closed.
 */
WorkQueue.prototype.updateConcurrency = function (concurrency)
{
	if (this.closed)
		throw new mod_verror.VError(
		    'update concurrency invoked after queue closed');

	/*
	 * Validate exactly as the constructor does, so the queue keeps a
	 * positive-integer limit after an update.  For example, a limit of 0
	 * would leave queued tasks undispatched forever, and a non-integer
	 * limit would never equal "npending", so "saturated" would never fire.
	 */
	mod_assert.equal(typeof (concurrency), 'number',
	    '"concurrency" must be a number');
	mod_assert.equal(Math.floor(concurrency), concurrency,
	    '"concurrency" must be an integer');
	mod_assert.ok(concurrency > 0, '"concurrency" must be positive');

	this.concurrency = concurrency;
	this.dispatchNext();
};
277
/*
 * Stop accepting new tasks.  Tasks already queued or running still complete,
 * and "end" is emitted once the queue has drained.  If the queue is idle
 * right now, "end" is emitted on the next tick.  Calling close() again has
 * no effect.
 */
WorkQueue.prototype.close = function ()
{
	if (this.closed)
		return;

	this.closed = true;

	var idle = (this.npending === 0 && this.queued.length === 0);
	if (!idle)
		return;

	var self = this;
	setImmediate(function () {
		/* dispatchNext() may have emitted "end" in the meantime. */
		if (self.ended)
			return;
		self.ended = true;
		self.emit('end');
	});
};
299
/*
 * private: append a single task (and its optional callback) to the queue,
 * then try to dispatch.  Returns the task's id.  Throws if the queue has been
 * closed.
 */
WorkQueue.prototype.pushOne = function (task, callback)
{
	if (this.closed)
		throw new mod_verror.VError('push invoked after queue closed');

	this.nextid += 1;
	var id = this.nextid;

	this.queued.push({ 'id': id, 'task': task, 'callback': callback });
	this.dispatchNext();

	return (id);
};
314
/*
 * private: react to a change in queue state.  If nothing is queued or
 * running, signal "drain" (the drain field, then the event) and, if the
 * queue is closed, also mark it ended and emit "end".  Otherwise start as many
 * queued tasks as the concurrency limit allows, signalling "empty" each time
 * the last queued task is handed off.
 */
WorkQueue.prototype.dispatchNext = function ()
{
	var wq = this;
	var idle = (wq.npending === 0 && wq.queued.length === 0);

	if (idle) {
		if (wq.drain)
			wq.drain();
		wq.emit('drain');

		/* Once closed, "end" is the final event before we come to rest. */
		if (wq.closed) {
			wq.ended = true;
			wq.emit('end');
		}
		return;
	}

	while (wq.queued.length > 0 && wq.npending < wq.concurrency) {
		wq.dispatch(wq.queued.shift());

		if (wq.queued.length > 0)
			continue;

		if (wq.empty)
			wq.empty();
		wq.emit('empty');
	}
};
344
/*
 * private: hand one queued task to the worker.  Records it as pending,
 * signals "saturated" when this fills the last free slot, and invokes the
 * worker on the next tick.  When the worker calls back, the task leaves the
 * pending set, the task's own callback (if any) receives the worker's
 * arguments, and dispatchNext() runs.  A worker that calls back twice for the
 * same task triggers a VError.
 */
WorkQueue.prototype.dispatch = function (entry)
{
	var wq = this;
	var finished = false;

	mod_assert.ok(!this.pending.hasOwnProperty(entry['id']));
	mod_assert.ok(this.npending < this.concurrency);
	mod_assert.ok(!this.ended);

	this.npending++;
	this.pending[entry['id']] = entry;

	if (this.npending === this.concurrency) {
		if (this.saturated)
			this.saturated();
		this.emit('saturated');
	}

	/*
	 * We invoke the worker function on the next tick so that callers can
	 * always assume that the callback is NOT invoked during the call to
	 * push() even if the queue is not at capacity. It also avoids O(n)
	 * stack usage when used with synchronous worker functions.
	 */
	setImmediate(function () {
		wq.worker(entry['task'], function (err) {
			/*
			 * A second invocation would decrement "npending" again
			 * (letting the queue exceed its concurrency) and re-run
			 * the task's callback.  Treat it as a programmer error,
			 * as waterfall() does.
			 */
			if (finished) {
				throw (new mod_verror.VError(
				    'vasync.queue: worker "%s" invoked its ' +
				    'callback twice for task %d',
				    wq.worker_name, entry['id']));
			}
			finished = true;

			--wq.npending;
			delete (wq.pending[entry['id']]);

			if (entry['callback'])
				entry['callback'].apply(null, arguments);

			wq.dispatchNext();
		});
	});
};
380
/*
 * Returns the number of tasks waiting to be dispatched.  Tasks the worker is
 * currently processing are not counted.
 */
WorkQueue.prototype.length = function ()
{
	var waiting = this.queued;
	return (waiting.length);
};
385
/*
 * Discard every task not yet dispatched, clear the "drain" callback field,
 * and close the queue.  Tasks already handed to the worker are not
 * interrupted.  The "drain" event itself is still emitted by dispatchNext().
 */
WorkQueue.prototype.kill = function ()
{
	var wq = this;

	wq.killed = true;
	wq.drain = undefined;

	/* Empty the backlog first, so close() sees it and can end promptly. */
	wq.queued = [];
	wq.close();
};
393
/*
 * barrier([args]): create a Barrier, which coordinates multiple concurrent
 * operations.  "args" is passed to the Barrier constructor unchanged.
 */
function barrier(args)
{
	var b = new Barrier(args);
	return (b);
}
401
/*
 * Barrier: coordinates multiple concurrent operations, identified by name.
 * Bracket each operation with start(name) and done(name).  The barrier
 * emits "drain" once none are pending.
 *
 * "args.nrecent" (optional) is how many recently completed operations to
 * remember in "recent" (default 10).  A value of 0 or less disables that
 * history entirely.
 */
function Barrier(args)
{
	mod_assert.ok(!args || !args['nrecent'] ||
	    typeof (args['nrecent']) == 'number',
	    '"nrecent" must have type "number"');

	mod_events.EventEmitter.call(this);

	/*
	 * Fall back to the default only when "nrecent" is absent or not a
	 * usable number.  A truthiness test here would turn an explicit 0
	 * (history disabled) back into 10.
	 */
	var nrecent = 10;
	if (args && typeof (args['nrecent']) == 'number' &&
	    !isNaN(args['nrecent']))
		nrecent = args['nrecent'];

	if (nrecent > 0) {
		this.nrecent = nrecent;
		this.recent = [];
	}

	this.pending = {};
	this.scheduled = false;
}

mod_util.inherits(Barrier, mod_events.EventEmitter);
422
/*
 * Record that the operation "name" has started, noting the start time.
 * Throws if an operation with that name is already pending.
 */
Barrier.prototype.start = function (name)
{
	var pending = this.pending;

	/*
	 * Call hasOwnProperty via Object.prototype.  Operation names are
	 * chosen by the caller, and one named "hasOwnProperty" would otherwise
	 * shadow the method on "pending" and break every later check.
	 */
	mod_assert.ok(!Object.prototype.hasOwnProperty.call(pending, name),
	    'operation "' + name + '" is already pending');
	pending[name] = Date.now();
};
429
/*
 * Record that the pending operation "name" has completed.  If history is
 * enabled, remember it in "recent" (keeping at most "nrecent" entries).  When
 * this leaves no operations pending, "drain" is emitted on the next tick,
 * unless another operation has been started by then.  Throws if "name" is
 * not pending.
 */
Barrier.prototype.done = function (name)
{
	var pending = this.pending;

	/*
	 * Call hasOwnProperty via Object.prototype.  Operation names are
	 * chosen by the caller, and one named "hasOwnProperty" would otherwise
	 * shadow the method on "pending".
	 */
	mod_assert.ok(Object.prototype.hasOwnProperty.call(pending, name),
	    'operation "' + name + '" is not pending');

	if (this.recent) {
		this.recent.push({
		    'name': name,
		    'start': pending[name],
		    'done': Date.now()
		});

		if (this.recent.length > this.nrecent)
			this.recent.shift();
	}

	delete (pending[name]);

	/*
	 * If we executed at least one operation and we're now empty, we should
	 * emit "drain". But most code doesn't deal well with events being
	 * processed while they're executing, so we actually schedule this event
	 * for the next tick.
	 *
	 * We use the "scheduled" flag to avoid emitting multiple "drain" events
	 * on consecutive ticks if the user starts and ends another task during
	 * this tick.
	 */
	if (!isEmpty(pending) || this.scheduled)
		return;

	this.scheduled = true;

	var self = this;

	setImmediate(function () {
		self.scheduled = false;

		/*
		 * It's also possible that the user has started another task on
		 * the previous tick, in which case we really shouldn't emit
		 * "drain".
		 */
		if (isEmpty(self.pending))
			self.emit('drain');
	});
};
477
/*
 * waterfall(funcs, [callback]): invoke the asynchronous functions in "funcs"
 * in series.  The first function receives only a callback.  Each later one
 * receives whatever values its predecessor passed to its own callback (after
 * the error argument), followed by a callback.  Every such callback must be
 * invoked exactly once, with an error (or null) first, as is conventional in
 * Node.
 *
 * If any function fails (calls its callback with an Error), the remaining
 * functions are not invoked and "callback" receives that error.
 *
 * waterfall() differs from pipeline() only in what each function receives.
 * pipeline() always passes the same argument followed by the callback.
 * waterfall() passes along whatever the previous function emitted.
 */
function waterfall(funcs, callback)
{
	mod_assert.ok(Array.isArray(funcs),
	    '"funcs" must be specified and must be an array');
	mod_assert.ok(arguments.length == 1 || typeof (callback) == 'function',
	    '"callback" must be a function');

	return (waterfall_impl({
	    'funcs': funcs.slice(0),
	    'callback': callback,
	    'args': { impl: 'waterfall' },
	    'stop_when': 'error',
	    'res_type': 'values'
	}));
}
510
/*
 * waterfall_impl(opts): the engine shared by pipeline(), waterfall(), and
 * tryEach().  Invokes the functions in "opts.funcs" one at a time: the first
 * immediately, each later one via setImmediate() after its predecessor calls
 * back.  Every outcome is recorded in the returned status object, which has
 * the same fields as the one parallel() returns.  "opts" fields:
 *
 *	funcs		array of asynchronous functions, invoked in order
 *
 *	callback	optional; invoked once, when the sequence stops
 *
 *	args.impl	"pipeline": each function is invoked as f(uarg, cb),
 *			and "callback" receives (err, status object), just as
 *			when "funcs" is empty.
 *			"waterfall": the first function is invoked as f(cb);
 *			each later one as f(v1, ..., vN, cb), where v1..vN are
 *			the values its predecessor passed after its error.
 *			"tryEach": each function is invoked as f(cb).
 *
 *	args.uarg	"pipeline" only: the value passed to every function;
 *			may be undefined
 *
 *	stop_when	"error": stop at the first failure; "success": stop at
 *			the first success.  Either way, the sequence also
 *			stops after the last function.
 *
 *	res_type	for "waterfall" and "tryEach", how the last function's
 *			values reach "callback".  "values" spreads them as
 *			arguments after the error.  "array" does the same for
 *			zero or one value, but passes several as one array.
 *
 * Each operation record gets "status" ("waiting", "pending", "ok", or
 * "fail") and "err", plus either "result" (pipeline: the first value the
 * function produced) or "results" (otherwise: all of its values).  A
 * function that invokes its callback more than once triggers a VError.
 */
function waterfall_impl(opts)
{
	mod_assert.ok(typeof (opts) === 'object');
	var rv, current, next, impl;
	var funcs = opts.funcs;
	var callback = opts.callback;

	mod_assert.ok(Array.isArray(funcs),
	    '"opts.funcs" must be specified and must be an array');
	mod_assert.ok(arguments.length == 1,
	    'Function "waterfall_impl" must take only 1 arg');
	mod_assert.ok(opts.res_type === 'values' ||
	    opts.res_type === 'array',
	    '"opts.res_type" must either be "values" or "array"');
	mod_assert.ok(opts.stop_when === 'error' ||
	    opts.stop_when === 'success',
	    '"opts.stop_when" must either be "error" or "success"');
	mod_assert.ok(opts.args.impl === 'pipeline' ||
	    opts.args.impl === 'waterfall' || opts.args.impl === 'tryEach',
	    '"opts.args.impl" must be "pipeline", "waterfall", or "tryEach"');
	impl = opts.args.impl;

	/*
	 * "opts.args.uarg" is deliberately not validated for "pipeline": it is
	 * optional (forEachPipeline() never supplies one).
	 */

	rv = {
	    'operations': funcs.map(function (func) {
		return ({
		    'func': func,
		    'funcname': func.name || '(anon)',
		    'status': 'waiting'
		});
	    }),
	    'successes': [],
	    'ndone': 0,
	    'nerrors': 0
	};

	if (funcs.length === 0) {
		if (callback)
			setImmediate(function () {
				var res = (impl === 'pipeline') ? rv
				    : undefined;
				callback(null, res);
			});
		return (rv);
	}

	next = function (idx, err) {
		var res_key, res_val, args, entry, nextentry, nextcb;

		if (err === undefined)
			err = null;

		/*
		 * "current" only advances when another function is started,
		 * so a repeated callback from the function that ended the
		 * sequence still matches it.  Comparing against "ndone" as
		 * well catches that case too.
		 */
		if (idx != current || idx != rv['ndone']) {
			throw (new mod_verror.VError(
			    'vasync.waterfall: function %d ("%s") invoked ' +
			    'its callback twice', idx,
			    rv['operations'][idx].funcname));
		}

		entry = rv['operations'][rv['ndone']++];

		/* The values this function passed after "err". */
		args = Array.prototype.slice.call(arguments, 2);
		if (impl === 'pipeline') {
			/*
			 * Record the function's own result, but hand the next
			 * function the shared "uarg" rather than that result.
			 */
			res_key = 'result';
			res_val = args[0];
			args = [ opts.args.uarg ];
		} else {
			/*
			 * Record a copy.  "args" is modified below (the next
			 * callback appended, or "err" prepended), and that
			 * must not leak into the recorded results.
			 */
			res_key = 'results';
			res_val = args.slice(0);
		}

		mod_assert.equal(entry['status'], 'pending',
		    'status should be pending');
		entry['status'] = err ? 'fail' : 'ok';
		entry['err'] = err;
		entry[res_key] = res_val;

		if (err)
			rv['nerrors']++;
		else
			rv['successes'].push(res_val);

		if ((opts.stop_when === 'error' && err) ||
		    (opts.stop_when === 'success' &&
		    rv['successes'].length > 0) ||
		    rv['ndone'] == funcs.length) {
			if (!callback)
				return;

			if (impl === 'pipeline') {
				/*
				 * Like parallel(), and like the empty-"funcs"
				 * case above, report the status object.
				 */
				callback(err, rv);
			} else if (opts.res_type === 'values' ||
			    args.length <= 1) {
				args.unshift(err);
				callback.apply(null, args);
			} else {
				/* res_type "array" with several values */
				callback(err, args);
			}
			return;
		}

		nextentry = rv['operations'][rv['ndone']];
		nextentry['status'] = 'pending';
		current++;
		nextcb = next.bind(null, current);

		setImmediate(function () {
			var nfunc = nextentry['func'];

			/*
			 * tryEach() passes nothing but the callback from task
			 * to task.  Its results are collected above and only
			 * reach the final "callback".  pipeline() and
			 * waterfall() pass "args" (the shared "uarg", or the
			 * previous function's values) followed by the callback.
			 */
			if (impl === 'tryEach') {
				nfunc(nextcb);
			} else {
				args.push(nextcb);
				nfunc.apply(null, args);
			}
		});
	};

	rv['operations'][0]['status'] = 'pending';
	current = 0;
	if (impl !== 'pipeline') {
		funcs[0](next.bind(null, current));
	} else {
		funcs[0](opts.args.uarg, next.bind(null, current));
	}
	return (rv);
}