1 |
|
2 |
|
3 |
|
4 |
|
5 | var mod_assert = require('assert');
|
6 | var mod_events = require('events');
|
7 | var mod_util = require('util');
|
8 | var mod_verror = require('verror');
|
9 |
|
10 |
|
11 |
|
12 |
|
/*
 * Public interface.  Every name exported here is a function defined later in
 * this file (function declarations are hoisted, so the forward references
 * are safe).
 */
exports.parallel = parallel;
exports.forEachParallel = forEachParallel;
exports.pipeline = pipeline;
exports.tryEach = tryEach;
exports.forEachPipeline = forEachPipeline;
exports.queue = queue;
exports.queuev = queuev;
exports.barrier = barrier;
exports.waterfall = waterfall;
|
22 |
|
/*
 * Fallback for runtimes that do not provide setImmediate(): emulate it with
 * a zero-delay setTimeout(), forwarding any extra arguments to "func".
 */
if (!global.setImmediate) {
	global.setImmediate = function (func) {
		var extra = Array.prototype.slice.call(arguments, 1);
		var timerArgs = [ func, 0 ].concat(extra);
		setTimeout.apply(this, timerArgs);
	};
}
|
31 |
|
32 |
|
33 |
|
34 |
|
35 |
|
36 |
|
/*
 * Returns true if "obj" has no enumerable properties.  Inherited enumerable
 * properties count, because for..in walks the prototype chain.  null and
 * undefined are reported as empty (for..in over them iterates nothing).
 */
function isEmpty(obj)
{
	var empty = true;
	var prop;

	for (prop in obj) {
		/* A single enumerable property is enough to decide. */
		empty = false;
		break;
	}

	return (empty);
}
|
44 |
|
45 |
|
46 |
|
47 |
|
48 |
|
49 |
|
/*
 * parallel(args, callback): invoke every function in args.funcs, handing
 * each a completion callback of the form (err, result), and call "callback"
 * once all of them have completed.
 *
 *    args.funcs    array of functions, each taking a single callback
 *
 *    callback      invoked as callback(err, status).  "err" is a MultiError
 *                  wrapping every failure, or null if nothing failed.
 *
 * Returns (and passes to the callback) a status object with:
 *
 *    operations    one entry per function: func, funcname, status
 *                  ('pending', 'ok' or 'fail') and, once done, err/result
 *    successes     results of the functions that succeeded, in completion
 *                  order
 *    ndone         number of functions that have completed
 *    nerrors       number of functions that failed
 *
 * With an empty "funcs" array the callback is deferred with setImmediate.
 * A function that invokes its callback twice trips an assertion.
 */
function parallel(args, callback)
{
	mod_assert.equal(typeof (args), 'object', '"args" must be an object');
	mod_assert.ok(Array.isArray(args['funcs']),
	    '"args.funcs" must be specified and must be an array');
	mod_assert.equal(typeof (callback), 'function',
	    'callback argument must be specified and must be a function');

	/* Work on a copy so later changes to the caller's array are ignored. */
	var tasks = args['funcs'].slice(0);
	var total = tasks.length;
	var idx;

	var state = {
	    'operations': new Array(total),
	    'successes': [],
	    'ndone': 0,
	    'nerrors': 0
	};

	if (total === 0) {
		setImmediate(function () { callback(null, state); });
		return (state);
	}

	/* Report the overall outcome once the last operation has finished. */
	var finish = function () {
		var failed = state['operations'].filter(function (op) {
			return (op['status'] == 'fail');
		});
		var failures = failed.map(function (op) {
			return (op['err']);
		});

		if (failures.length > 0)
			callback(new mod_verror.MultiError(failures), state);
		else
			callback(null, state);
	};

	/* Build the completion callback bound to one operation record. */
	var completionFor = function (op) {
		return (function (err, result) {
			mod_assert.equal(op['status'], 'pending');

			op['err'] = err;
			op['result'] = result;

			if (err) {
				op['status'] = 'fail';
				state['nerrors']++;
			} else {
				op['status'] = 'ok';
				state['successes'].push(result);
			}

			state['ndone']++;
			if (state['ndone'] >= total)
				finish();
		});
	};

	for (idx = 0; idx < total; idx++) {
		state['operations'][idx] = {
		    'func': tasks[idx],
		    'funcname': tasks[idx].name || '(anon)',
		    'status': 'pending'
		};

		tasks[idx](completionFor(state['operations'][idx]));
	}

	return (state);
}
|
113 |
|
114 |
|
115 |
|
116 |
|
117 |
|
118 |
|
119 |
|
120 |
|
121 |
|
122 |
|
/*
 * forEachParallel(args, callback): call args.func once per element of
 * args.inputs, as func(input, subcallback), by delegating to parallel().
 *
 *    args.func      function invoked for each input
 *    args.inputs    array of inputs
 *    callback       passed through to parallel() unchanged
 *
 * Returns whatever parallel() returns.
 */
function forEachParallel(args, callback)
{
	mod_assert.equal(typeof (args), 'object', '"args" must be an object');
	mod_assert.equal(typeof (args['func']), 'function',
	    '"args.func" must be specified and must be a function');
	mod_assert.ok(Array.isArray(args['inputs']),
	    '"args.inputs" must be specified and must be an array');

	var worker = args['func'];

	/* Turn one input into the single-callback form parallel() expects. */
	var bindInput = function (input) {
		return (function (subcallback) {
			return (worker(input, subcallback));
		});
	};

	var bound = args['inputs'].map(bindInput);

	return (parallel({ 'funcs': bound }, callback));
}
|
142 |
|
143 |
|
144 |
|
145 |
|
146 |
|
147 |
|
148 |
|
149 |
|
150 |
|
/*
 * pipeline(args, callback): run args.funcs through waterfall_impl() in
 * "pipeline" mode, stopping at the first error.
 *
 *    args.funcs    array of functions (copied before use)
 *    args.arg      value forwarded to waterfall_impl() as "uarg"
 *    callback      forwarded to waterfall_impl() as-is
 *
 * Returns whatever waterfall_impl() returns.
 */
function pipeline(args, callback)
{
	mod_assert.equal(typeof (args), 'object', '"args" must be an object');
	mod_assert.ok(Array.isArray(args['funcs']),
	    '"args.funcs" must be specified and must be an array');

	return (waterfall_impl({
	    'funcs': args['funcs'].slice(0),
	    'callback': callback,
	    'args': { impl: 'pipeline', uarg: args['arg'] },
	    'stop_when': 'error',
	    'res_type': 'values'
	}));
}
|
166 |
|
/*
 * tryEach(funcs, callback): run "funcs" through waterfall_impl() in
 * "tryEach" mode, which stops at the first success.
 *
 *    funcs       array of functions (copied before use)
 *    callback    optional; if a second argument is passed at all it must
 *                be a function
 *
 * Returns whatever waterfall_impl() returns.
 */
function tryEach(funcs, callback)
{
	var nargs = arguments.length;

	mod_assert.ok(Array.isArray(funcs),
	    '"funcs" must be specified and must be an array');
	mod_assert.ok(nargs == 1 || typeof (callback) == 'function',
	    '"callback" must be a function');

	return (waterfall_impl({
	    'funcs': funcs.slice(0),
	    'callback': callback,
	    'args': { impl: 'tryEach' },
	    'stop_when': 'success',
	    'res_type': 'array'
	}));
}
|
182 |
|
183 |
|
184 |
|
185 |
|
186 |
|
187 |
|
188 |
|
189 |
|
190 |
|
191 |
|
/*
 * forEachPipeline(args, callback): call args.func once per element of
 * args.inputs, as func(input, subcallback), by delegating to pipeline().
 *
 *    args.func      function invoked for each input
 *    args.inputs    array of inputs
 *    callback       required; passed through to pipeline()
 *
 * Returns whatever pipeline() returns.
 */
function forEachPipeline(args, callback) {
	mod_assert.equal(typeof (args), 'object', '"args" must be an object');
	mod_assert.equal(typeof (args['func']), 'function',
	    '"args.func" must be specified and must be a function');
	mod_assert.ok(Array.isArray(args['inputs']),
	    '"args.inputs" must be specified and must be an array');
	mod_assert.equal(typeof (callback), 'function',
	    'callback argument must be specified and must be a function');

	var worker = args['func'];

	/*
	 * pipeline() stages take (arg, callback).  No 'arg' is supplied
	 * here, so the first parameter is ignored and the input is bound
	 * through the closure instead.
	 */
	var bindInput = function (input) {
		return (function (_, subcallback) {
			return (worker(input, subcallback));
		});
	};

	var stages = args['inputs'].map(bindInput);

	return (pipeline({'funcs': stages}, callback));
}
|
211 |
|
212 |
|
213 |
|
214 |
|
215 |
|
/*
 * queue(worker, concurrency): positional-argument constructor for a
 * WorkQueue.  Both values are validated by the WorkQueue constructor.
 */
function queue(worker, concurrency)
{
	var options = {
	    'worker': worker,
	    'concurrency': concurrency
	};

	return (new WorkQueue(options));
}
|
223 |
|
/*
 * queuev(args): named-argument constructor for a WorkQueue.  "args" is
 * handed to the WorkQueue constructor untouched.
 */
function queuev(args)
{
	var wq = new WorkQueue(args);
	return (wq);
}
|
228 |
|
/*
 * WorkQueue(args): an EventEmitter that feeds queued tasks to a worker
 * function, keeping at most "concurrency" tasks outstanding at a time.
 *
 *    args.worker         function (task, callback) that processes a task
 *    args.concurrency    positive integer limit on outstanding tasks
 *
 * Events emitted: 'saturated', 'empty', 'drain' and 'end'.  The optional
 * "saturated", "empty" and "drain" properties, when set to functions, are
 * called just before the event of the same name is emitted.
 */
function WorkQueue(args)
{
	/*
	 * Use the Object.prototype method directly so that option objects
	 * without a prototype (Object.create(null)) are accepted too.
	 */
	var hasOwn = Object.prototype.hasOwnProperty;

	mod_assert.ok(hasOwn.call(args, 'worker'));
	mod_assert.equal(typeof (args['worker']), 'function');
	mod_assert.ok(hasOwn.call(args, 'concurrency'));
	mod_assert.equal(typeof (args['concurrency']), 'number');
	mod_assert.equal(Math.floor(args['concurrency']), args['concurrency']);
	mod_assert.ok(args['concurrency'] > 0);

	mod_events.EventEmitter.call(this);

	this.nextid = 0;		/* id handed to the last pushed task */
	this.worker = args['worker'];
	this.worker_name = args['worker'].name || 'anon';
	this.npending = 0;		/* tasks dispatched but not completed */
	this.pending = {};		/* dispatched entries, keyed by id */
	this.queued = [];		/* entries waiting to be dispatched */
	this.closed = false;		/* close() has been called */
	this.ended = false;		/* 'end' has been emitted */
	this.killed = false;		/* kill() has been called */

	this.concurrency = args['concurrency'];
	this.saturated = undefined;
	this.empty = undefined;
	this.drain = undefined;
}

mod_util.inherits(WorkQueue, mod_events.EventEmitter);

/*
 * push(tasks, callback): enqueue one task, or each element of an array of
 * tasks.  "callback" (optional) is invoked with the worker's callback
 * arguments for every task pushed by this call.  Returns the task id, or an
 * array of ids when given an array.  Throws if the queue is closed.
 */
WorkQueue.prototype.push = function (tasks, callback)
{
	if (!Array.isArray(tasks))
		return (this.pushOne(tasks, callback));

	var wq = this;
	return (tasks.map(function (task) {
		return (wq.pushOne(task, callback));
	}));
};

/*
 * updateConcurrency(concurrency): replace the concurrency limit and
 * immediately try to dispatch more work.  The new value is not validated.
 * Throws if the queue is closed.
 */
WorkQueue.prototype.updateConcurrency = function (concurrency)
{
	if (this.closed)
		throw new mod_verror.VError(
		    'update concurrency invoked after queue closed');
	this.concurrency = concurrency;
	this.dispatchNext();
};

/*
 * close(): refuse further pushes.  Calling it again is a no-op.  If the
 * queue is already idle, 'end' is emitted on a later tick; otherwise
 * dispatchNext() emits it once the remaining work has completed.
 */
WorkQueue.prototype.close = function ()
{
	var wq = this;

	if (wq.closed)
		return;
	wq.closed = true;

	if (wq.npending === 0 && wq.queued.length === 0) {
		setImmediate(function () {
			/* dispatchNext() may have emitted 'end' meanwhile. */
			if (!wq.ended) {
				wq.ended = true;
				wq.emit('end');
			}
		});
	}
};

/*
 * pushOne(task, callback): enqueue a single task and try to dispatch it.
 * Returns the id assigned to the task.  Throws if the queue is closed.
 */
WorkQueue.prototype.pushOne = function (task, callback)
{
	if (this.closed)
		throw new mod_verror.VError('push invoked after queue closed');

	var id = ++this.nextid;
	var entry = { 'id': id, 'task': task, 'callback': callback };

	this.queued.push(entry);
	this.dispatchNext();

	return (id);
};

/*
 * dispatchNext(): advance the queue.  When nothing is queued or pending,
 * signal 'drain' (and 'end' if the queue is closed).  Otherwise dispatch
 * queued entries until the concurrency limit is reached, signalling 'empty'
 * when the last queued entry has been handed off.
 */
WorkQueue.prototype.dispatchNext = function ()
{
	var wq = this;
	if (wq.npending === 0 && wq.queued.length === 0) {
		if (wq.drain)
			wq.drain();
		wq.emit('drain');

		/* A closed queue that has drained will never run again. */
		if (wq.closed) {
			wq.ended = true;
			wq.emit('end');
		}
	} else if (wq.queued.length > 0) {
		while (wq.queued.length > 0 && wq.npending < wq.concurrency) {
			var next = wq.queued.shift();
			wq.dispatch(next);

			if (wq.queued.length === 0) {
				if (wq.empty)
					wq.empty();
				wq.emit('empty');
			}
		}
	}
};

/*
 * dispatch(entry): mark "entry" as pending, signal 'saturated' if that
 * reaches the concurrency limit, and run the worker on a later tick.  When
 * the worker calls back, the entry is retired, the entry's own callback
 * (if any) receives the worker's arguments, and the queue is advanced.
 */
WorkQueue.prototype.dispatch = function (entry)
{
	var wq = this;

	mod_assert.ok(!this.pending.hasOwnProperty(entry['id']));
	mod_assert.ok(this.npending < this.concurrency);
	mod_assert.ok(!this.ended);

	this.npending++;
	this.pending[entry['id']] = entry;

	if (this.npending === this.concurrency) {
		if (this.saturated)
			this.saturated();
		this.emit('saturated');
	}

	/*
	 * The worker is deferred with setImmediate, so it never runs (and
	 * the task callback is never invoked) during the push() call itself.
	 */
	setImmediate(function () {
		wq.worker(entry['task'], function (err) {
			--wq.npending;
			delete (wq.pending[entry['id']]);

			if (entry['callback'])
				entry['callback'].apply(null, arguments);

			wq.dispatchNext();
		});
	});
};

/*
 * length(): number of tasks waiting to be dispatched.  Tasks that have
 * already been dispatched are not counted.
 */
WorkQueue.prototype.length = function ()
{
	return (this.queued.length);
};

/*
 * kill(): discard all tasks that have not been dispatched yet, clear the
 * "drain" handler property and close the queue.  Tasks already dispatched
 * are not cancelled by this method.
 */
WorkQueue.prototype.kill = function ()
{
	this.killed = true;
	this.queued = [];
	this.drain = undefined;
	this.close();
};
|
393 |
|
394 |
|
395 |
|
396 |
|
/*
 * barrier(args): factory for Barrier objects.  "args" is passed to the
 * Barrier constructor unchanged.
 */
function barrier(args)
{
	var instance = new Barrier(args);
	return (instance);
}
|
401 |
|
/*
 * Barrier(args): an EventEmitter that tracks a set of named outstanding
 * operations and emits 'drain' when the set becomes empty.
 *
 *    args.nrecent    (optional number) how many completed operations to
 *                    remember in "recent".  Defaults to 10.  A value that
 *                    is not greater than zero disables the history, in
 *                    which case "recent" and "nrecent" are left unset.
 */
function Barrier(args)
{
	mod_assert.ok(!args || !args['nrecent'] ||
	    typeof (args['nrecent']) == 'number',
	    '"nrecent" must have type "number"');

	mod_events.EventEmitter.call(this);

	/*
	 * Only fall back to the default when no usable number was given, so
	 * that an explicit 0 disables the history instead of silently
	 * turning into 10.
	 */
	var nrecent = 10;
	if (args && typeof (args['nrecent']) == 'number' &&
	    !isNaN(args['nrecent']))
		nrecent = args['nrecent'];

	if (nrecent > 0) {
		this.nrecent = nrecent;
		this.recent = [];
	}

	this.pending = {};		/* operation name -> start time (ms) */
	this.scheduled = false;		/* a drain check is already queued */
}

mod_util.inherits(Barrier, mod_events.EventEmitter);

/*
 * start(name): record that operation "name" has begun.  Asserts that no
 * operation with the same name is currently pending.
 */
Barrier.prototype.start = function (name)
{
	/*
	 * "name" is caller-chosen, so look it up with the Object.prototype
	 * method: a pending operation called "hasOwnProperty" would
	 * otherwise shadow the method on this.pending.
	 */
	mod_assert.ok(!Object.prototype.hasOwnProperty.call(this.pending, name),
	    'operation "' + name + '" is already pending');
	this.pending[name] = Date.now();
};

/*
 * done(name): record that operation "name" has completed.  Asserts that the
 * operation is pending.  If this leaves nothing pending, a check is
 * scheduled with setImmediate, and 'drain' is emitted only if nothing has
 * been started again by the time the check runs.
 */
Barrier.prototype.done = function (name)
{
	mod_assert.ok(Object.prototype.hasOwnProperty.call(this.pending, name),
	    'operation "' + name + '" is not pending');

	if (this.recent) {
		this.recent.push({
		    'name': name,
		    'start': this.pending[name],
		    'done': Date.now()
		});

		/* Keep only the newest "nrecent" records. */
		if (this.recent.length > this.nrecent)
			this.recent.shift();
	}

	delete (this.pending[name]);

	if (!isEmpty(this.pending) || this.scheduled)
		return;

	/* Coalesce: at most one drain check is queued at a time. */
	this.scheduled = true;

	var self = this;

	setImmediate(function () {
		self.scheduled = false;

		/* New operations may have started since the check was queued. */
		if (isEmpty(self.pending))
			self.emit('drain');
	});
};
|
477 |
|
478 |
|
479 |
|
480 |
|
481 |
|
482 |
|
483 |
|
484 |
|
485 |
|
486 |
|
487 |
|
488 |
|
489 |
|
490 |
|
491 |
|
492 |
|
493 |
|
494 |
|
/*
 * waterfall(funcs, callback): run "funcs" through waterfall_impl() in
 * "waterfall" mode, stopping at the first error.
 *
 *    funcs       array of functions (copied before use)
 *    callback    optional; if a second argument is passed at all it must
 *                be a function
 *
 * Returns whatever waterfall_impl() returns.
 */
function waterfall(funcs, callback)
{
	var nargs = arguments.length;

	mod_assert.ok(Array.isArray(funcs),
	    '"funcs" must be specified and must be an array');
	mod_assert.ok(nargs == 1 || typeof (callback) == 'function',
	    '"callback" must be a function');

	return (waterfall_impl({
	    'funcs': funcs.slice(0),
	    'callback': callback,
	    'args': { impl: 'waterfall' },
	    'stop_when': 'error',
	    'res_type': 'values'
	}));
}
|
510 |
|
/*
 * waterfall_impl(opts): shared engine behind pipeline(), waterfall() and
 * tryEach().  Runs opts.funcs one at a time; every function after the first
 * is started from setImmediate.
 *
 *    opts.funcs        array of functions to run in order
 *    opts.callback     optional completion callback
 *    opts.args.impl    'pipeline', 'waterfall' or 'tryEach'
 *    opts.args.uarg    (pipeline only) value passed as the first argument
 *                      to every function; may be undefined
 *    opts.stop_when    'error' (stop at the first failure) or 'success'
 *                      (stop at the first success)
 *    opts.res_type     'values' (spread the last function's results into
 *                      the callback's arguments) or 'array' (pass them as
 *                      one array when there is more than one)
 *
 * How each function is invoked:
 *
 *    pipeline     func(uarg, callback)
 *    waterfall    func(<results of the previous function>..., callback)
 *    tryEach      func(callback)
 *
 * Returns a status object with "operations" (one record per function:
 * func, funcname, status, and once complete err plus result/results),
 * "successes", "ndone" and "nerrors".  In pipeline mode each record's
 * "result" is the value the function passed to its callback and the
 * completion callback is invoked as callback(err, status); in the other
 * modes "results" is the array of values the function passed.
 *
 * Throws a VError if a function invokes its callback after the next
 * function has been scheduled.
 */
function waterfall_impl(opts)
{
	mod_assert.ok(typeof (opts) === 'object');
	var rv, current, next;
	var funcs = opts.funcs;
	var callback = opts.callback;
	var impl;

	mod_assert.ok(Array.isArray(funcs),
	    '"opts.funcs" must be specified and must be an array');
	mod_assert.ok(arguments.length == 1,
	    'Function "waterfall_impl" must take only 1 arg');
	mod_assert.ok(opts.res_type === 'values' ||
	    opts.res_type === 'array',
	    '"opts.res_type" must either be "values" or "array"');
	mod_assert.ok(opts.stop_when === 'error' ||
	    opts.stop_when === 'success',
	    '"opts.stop_when" must either be "error" or "success"');
	mod_assert.ok(opts.args.impl === 'pipeline' ||
	    opts.args.impl === 'waterfall' || opts.args.impl === 'tryEach',
	    '"opts.args.impl" must be "pipeline", "waterfall", or "tryEach"');

	/*
	 * opts.args.uarg is deliberately not validated: pipeline() forwards
	 * the caller's optional "arg", which is undefined when the caller
	 * did not supply one.
	 */
	impl = opts.args.impl;

	rv = {
	    'operations': funcs.map(function (func) {
		return ({
		    'func': func,
		    'funcname': func.name || '(anon)',
		    'status': 'waiting'
		});
	    }),
	    'successes': [],
	    'ndone': 0,
	    'nerrors': 0
	};

	if (funcs.length === 0) {
		if (callback)
			setImmediate(function () {
				var res = (impl === 'pipeline') ? rv
				    : undefined;
				callback(null, res);
			});
		return (rv);
	}

	/*
	 * Completion callback handed to each function, pre-bound to that
	 * function's index.  Arguments after "err" are the function's
	 * results.
	 */
	next = function (idx, err) {
		var results, recorded, entry, nextentry, nfunc_args;

		if (err === undefined)
			err = null;

		if (idx != current) {
			throw (new mod_verror.VError(
			    'vasync.waterfall: function %d ("%s") invoked ' +
			    'its callback twice', idx,
			    rv['operations'][idx].funcname));
		}

		mod_assert.equal(idx, rv['ndone'],
		    'idx should be equal to ndone');
		entry = rv['operations'][rv['ndone']++];

		/*
		 * "results" is recorded in rv and must not be modified
		 * afterwards; argument lists for the final callback and for
		 * the next function are built as separate arrays below.
		 */
		results = Array.prototype.slice.call(arguments, 2);

		mod_assert.equal(entry['status'], 'pending',
		    'status should be pending');
		entry['status'] = err ? 'fail' : 'ok';
		entry['err'] = err;

		if (impl === 'pipeline') {
			/* A pipeline stage reports a single result value. */
			recorded = results[0];
			entry['result'] = recorded;
		} else {
			recorded = results;
			entry['results'] = recorded;
		}

		if (err)
			rv['nerrors']++;
		else
			rv['successes'].push(recorded);

		if ((opts.stop_when === 'error' && err) ||
		    (opts.stop_when === 'success' &&
		    rv['successes'].length > 0) ||
		    rv['ndone'] == funcs.length) {
			if (!callback)
				return;

			if (impl === 'pipeline') {
				/*
				 * Same contract as the empty-funcs case
				 * above: pipeline reports the status object.
				 */
				callback(err, rv);
			} else if (opts.res_type === 'values' ||
			    results.length <= 1) {
				callback.apply(null, [ err ].concat(results));
			} else {
				callback(err, results);
			}
			return;
		}

		nextentry = rv['operations'][rv['ndone']];
		nextentry['status'] = 'pending';
		current++;

		if (impl === 'pipeline')
			nfunc_args = [ opts.args.uarg ];
		else
			nfunc_args = results.slice(0);
		nfunc_args.push(next.bind(null, current));

		setImmediate(function () {
			var nfunc = nextentry['func'];

			/*
			 * tryEach functions receive only the callback; the
			 * previous function's results are not forwarded.
			 */
			if (impl !== 'tryEach') {
				nfunc.apply(null, nfunc_args);
			} else {
				nfunc(next.bind(null, current));
			}
		});
	};

	rv['operations'][0]['status'] = 'pending';
	current = 0;
	if (impl !== 'pipeline') {
		funcs[0](next.bind(null, current));
	} else {
		funcs[0](opts.args.uarg, next.bind(null, current));
	}
	return (rv);
}
|