1 | # vasync: utilities for observable asynchronous control flow
|
2 |
|
3 | This module provides facilities for asynchronous control flow. There are many
|
4 | modules that do this already (notably async.js). This one's claim to fame is
|
5 | aided debuggability: each of the contained functions returns a "status" object
|
6 | with the following fields:
|
7 |
|
8 | operations array corresponding to the input functions, with
|
9 |
|
10 | func input function
|
11 |
|
12 | status "pending", "ok", or "fail"
|
13 |
|
14 | err returned "err" value, if any
|
15 |
|
16 | result returned "result" value, if any
|
17 |
|
18 | successes "result" field for each of "operations" where
|
19 | "status" == "ok"
|
20 |
|
21 | ndone number of input operations that have completed
|
22 |
|
23 | nerrors number of input operations that have failed
|
24 |
|
25 | You can use this from a debugger (or your own monitoring code) to understand
|
26 | the state of an ongoing asynchronous operation. For example, you could see how
|
27 | far into a pipeline some particular operation is.
|
28 |
|
29 |
|
30 | ### parallel(args, callback): invoke N functions in parallel and merge the results
|
31 |
|
32 | This function takes a list of input functions (specified by the "funcs" property
|
33 | of "args") and runs them all. These input functions are expected to be
|
34 | asynchronous: they get a "callback" argument and should invoke it as
|
35 | callback(err, result). The error and result will be saved and made available to
|
36 | the original caller when all of these functions complete.
|
37 |
|
38 | All errors are combined into a single "err" parameter to the final callback (see
|
39 | below). You can also observe the progress of the operation as it goes by
|
40 | examining the object returned synchronously by this function.
|
41 |
|
42 | Example usage:
|
43 |
|
44 | status = mod_vasync.parallel({
|
45 | 'funcs': [
|
46 | function f1 (callback) { mod_fs.stat('/tmp', callback); },
|
47 | function f2 (callback) { mod_fs.stat('/noexist', callback); },
|
48 | function f3 (callback) { mod_fs.stat('/var', callback); }
|
49 | ]
|
50 | }, function (err, results) {
|
51 | console.log('error: %s', err.message);
|
52 | console.log('results: %s', mod_util.inspect(results, null, 3));
|
53 | });
|
54 |
|
55 | console.log('status: %s', mod_util.inspect(status, null, 3));
|
56 |
|
57 | In the first tick, this outputs:
|
58 |
|
59 | status: { operations:
|
60 | [ { func: [Function: f1], status: 'pending' },
|
61 | { func: [Function: f2], status: 'pending' },
|
62 | { func: [Function: f3], status: 'pending' } ],
|
63 | successes: [],
|
64 | ndone: 0,
|
65 | nerrors: 0 }
|
66 |
|
67 | showing that there are three operations pending and none has yet completed.
|
68 | When the program finishes, it outputs this error:
|
69 |
|
70 | error: first of 1 error: ENOENT, no such file or directory '/noexist'
|
71 |
|
72 | which encapsulates all of the intermediate failures. This model allows you to
|
73 | write the final callback like you normally would:
|
74 |
|
75 | if (err)
|
76 | return (callback(err));
|
77 |
|
78 | and still propagate useful information to callers that don't deal with multiple
|
79 | errors (i.e. most callers).
|
80 |
|
81 | The example also prints out the detailed final status, including all of the
|
82 | errors and return values:
|
83 |
|
84 | results: { operations:
|
85 | [ { func: [Function: f1],
|
86 | status: 'ok',
|
87 | err: null,
|
88 | result:
|
89 | { dev: 140247096,
|
90 | ino: 879368309,
|
91 | mode: 17407,
|
92 | nlink: 9,
|
93 | uid: 0,
|
94 | gid: 3,
|
95 | rdev: 0,
|
96 | size: 754,
|
97 | blksize: 4096,
|
98 | blocks: 8,
|
99 | atime: Thu, 12 Apr 2012 23:18:57 GMT,
|
100 | mtime: Tue, 17 Apr 2012 23:56:34 GMT,
|
101 | ctime: Tue, 17 Apr 2012 23:56:34 GMT } },
|
102 | { func: [Function: f2],
|
103 | status: 'fail',
|
104 | err: { [Error: ENOENT, no such file or directory '/noexist'] errno: 34, code: 'ENOENT', path: '/noexist' },
|
105 | result: undefined },
|
106 | { func: [Function: f3],
|
107 | status: 'ok',
|
108 | err: null,
|
109 | result:
|
110 | { dev: 23658528,
|
111 | ino: 5,
|
112 | mode: 16877,
|
113 | nlink: 27,
|
114 | uid: 0,
|
115 | gid: 0,
|
116 | rdev: -1,
|
117 | size: 27,
|
118 | blksize: 2560,
|
119 | blocks: 3,
|
120 | atime: Fri, 09 Sep 2011 14:28:55 GMT,
|
121 | mtime: Wed, 04 Apr 2012 17:51:20 GMT,
|
122 | ctime: Wed, 04 Apr 2012 17:51:20 GMT } } ],
|
123 | successes:
|
124 | [ { dev: 234881026,
|
125 | ino: 24965,
|
126 | mode: 17407,
|
127 | nlink: 8,
|
128 | uid: 0,
|
129 | gid: 0,
|
130 | rdev: 0,
|
131 | size: 272,
|
132 | blksize: 4096,
|
133 | blocks: 0,
|
134 | atime: Tue, 01 May 2012 16:02:24 GMT,
|
135 | mtime: Tue, 01 May 2012 19:10:35 GMT,
|
136 | ctime: Tue, 01 May 2012 19:10:35 GMT },
|
137 | { dev: 234881026,
|
138 | ino: 216,
|
139 | mode: 16877,
|
140 | nlink: 26,
|
141 | uid: 0,
|
142 | gid: 0,
|
143 | rdev: 0,
|
144 | size: 884,
|
145 | blksize: 4096,
|
146 | blocks: 0,
|
147 | atime: Tue, 01 May 2012 16:02:24 GMT,
|
148 | mtime: Fri, 14 Aug 2009 21:23:03 GMT,
|
149 | ctime: Thu, 28 Oct 2010 21:51:39 GMT } ],
|
150 | ndone: 3,
|
151 | nerrors: 1 }
|
152 |
|
153 | You can use this if you want to handle all of the errors individually or to get
|
154 | at all of the individual return values.
|
155 |
|
156 |
|
157 | ### forEachParallel(args, callback): invoke the same function on N inputs in parallel
|
158 |
|
159 | This function is exactly like `parallel`, except that the input is specified as
|
160 | a *single* function ("func") and a list of inputs ("inputs"). The function is
|
161 | invoked on each input in parallel.
|
162 |
|
163 | This example is exactly equivalent to the one above:
|
164 |
|
165 | mod_vasync.forEachParallel({
|
166 | 'func': mod_fs.stat,
|
167 | 'inputs': [ '/tmp', '/noexist', '/var' ]
|
168 | }, function (err, results) {
|
169 | console.log('error: %s', err.message);
|
170 | console.log('results: %s', mod_util.inspect(results, null, 3));
|
171 | });
|
172 |
|
173 |
|
174 | ### pipeline(args, callback): invoke N functions in series (and stop on failure)
|
175 |
|
176 | The arguments for this function are:
|
177 |
|
178 | * funcs: input functions, to be invoked in series
|
179 | * arg: arbitrary argument that will be passed to each function
|
180 |
|
181 | The functions are invoked in order as `func(arg, callback)`, where "arg" is the
|
182 | user-supplied argument from "args" and "callback" should be invoked in the usual
|
183 | way. If any function emits an error, the whole pipeline stops.
|
184 |
|
185 | The return value and the arguments to the final callback are exactly the same as
|
186 | for `parallel`. The error object for the final callback is just the error
|
187 | returned by whatever pipeline function failed (if any).
|
188 |
|
189 | This example is similar to the one above, except that it runs the steps in
|
190 | sequence and stops early because `pipeline` stops on the first error:
|
191 |
|
192 | console.log(mod_vasync.pipeline({
|
193 | 'funcs': [
|
194 | function f1 (_, callback) { mod_fs.stat('/tmp', callback); },
|
195 | function f2 (_, callback) { mod_fs.stat('/noexist', callback); },
|
196 | function f3 (_, callback) { mod_fs.stat('/var', callback); }
|
197 | ]
|
198 | }, function (err, results) {
|
199 | console.log('error: %s', err.message);
|
200 | console.log('results: %s', mod_util.inspect(results, null, 3));
|
201 | }));
|
202 |
|
203 | As a result, the status after the first tick looks like this:
|
204 |
|
205 | { operations:
|
206 | [ { func: [Function: f1], status: 'pending' },
|
207 | { func: [Function: f2], status: 'waiting' },
|
208 | { func: [Function: f3], status: 'waiting' } ],
|
209 | successes: [],
|
210 | ndone: 0,
|
211 | nerrors: 0 }
|
212 |
|
213 | (Note that the second and third stages are now "waiting", rather than "pending"
|
214 | as in the `parallel` case.) The error reported is:
|
215 |
|
216 | error: ENOENT, no such file or directory '/noexist'
|
217 |
|
218 | and the complete result is:
|
219 |
|
220 | results: { operations:
|
221 | [ { func: [Function: f1],
|
222 | status: 'ok',
|
223 | err: null,
|
224 | result:
|
225 | { dev: 140247096,
|
226 | ino: 879368309,
|
227 | mode: 17407,
|
228 | nlink: 9,
|
229 | uid: 0,
|
230 | gid: 3,
|
231 | rdev: 0,
|
232 | size: 754,
|
233 | blksize: 4096,
|
234 | blocks: 8,
|
235 | atime: Thu, 12 Apr 2012 23:18:57 GMT,
|
236 | mtime: Tue, 17 Apr 2012 23:56:34 GMT,
|
237 | ctime: Tue, 17 Apr 2012 23:56:34 GMT } },
|
238 | { func: [Function: f2],
|
239 | status: 'fail',
|
240 | err: { [Error: ENOENT, no such file or directory '/noexist'] errno: 34, code: 'ENOENT', path: '/noexist' },
|
241 | result: undefined },
|
242 | { func: [Function: f3], status: 'waiting' } ],
|
243 | successes:
|
244 | [ { dev: 234881026,
|
245 | ino: 24965,
|
246 | mode: 17407,
|
247 | nlink: 8,
|
248 | uid: 0,
|
249 | gid: 0,
|
250 | rdev: 0,
|
251 | size: 272,
|
252 | blksize: 4096,
|
253 | blocks: 0,
|
254 | atime: Tue, 01 May 2012 16:02:24 GMT,
|
255 | mtime: Tue, 01 May 2012 19:10:35 GMT,
|
256 | ctime: Tue, 01 May 2012 19:10:35 GMT } ],
|
257 | ndone: 2,
|
258 | nerrors: 1 }
|
259 |
|
260 |
|
261 | ### queue(worker, concurrency): fixed-size worker queue
|
262 | ### queuev(args)
|
263 |
|
264 | This function returns an object that allows up to a fixed number of tasks to be
|
265 | dispatched at any given time. The interface is compatible with that provided
|
266 | by the "async" Node library, except that the returned object's fields represent
|
267 | a public interface you can use to introspect what's going on.
|
268 |
|
269 | The arguments are:
|
270 |
|
271 | * worker: a function invoked as `worker(task, callback)`, where `task` is a
|
272 | task dispatched to this queue and `callback` should be invoked when the task
|
273 | completes.
|
274 | * concurrency: a positive integer indicating the maximum number of tasks that
|
275 | may be dispatched at any time.
|
276 |
|
277 | With concurrency = 1, the queue serializes all operations.
|
278 |
|
279 | The object provides the following method:
|
280 |
|
281 | * push(task, [callback]): add a task (or array of tasks) to the queue, with an
|
282 | optional callback to be invoked when each task completes. If a list of tasks
|
283 | is added, the callback is invoked for each one.
|
284 |
|
285 | The object also provides the length() method, the "concurrency" field, and
|
286 | hooks for "saturated", "empty", and "drain" for compatibility with node-async.
|
287 | In addition, several fields may be inspected to see what's currently going on
|
288 | with the queue, but **these fields must not be modified directly**:
|
289 |
|
290 | * worker: worker function, as passed into "queue"/"queuev"
|
291 | * worker_name: worker function's "name" field
|
292 | * npending: the number of tasks currently being processed
|
293 | * pending: an object (*not* an array) describing the tasks currently being processed
|
294 | * queued: array of tasks currently queued for processing
|
295 |
|
296 | If the tasks are themselves simple objects, then the entire queue may be
|
297 | serialized (as via JSON.stringify) for debugging and monitoring tools. Using
|
298 | the above fields, you can see what this queue is doing (worker_name), which
|
299 | tasks are queued, which tasks are being processed, and so on.
|
300 |
|
301 | ### Example 1: Stat several files
|
302 |
|
303 | Here's an example demonstrating the queue:
|
304 |
|
305 | var mod_fs = require('fs');
|
306 | var mod_vasync = require('../lib/vasync');
|
307 |
|
308 | var queue;
|
309 |
|
310 | function doneOne()
|
311 | {
|
312 | console.log('task completed; queue state:\n%s\n',
|
313 | JSON.stringify(queue, null, 4));
|
314 | }
|
315 |
|
316 | queue = mod_vasync.queue(mod_fs.stat, 2);
|
317 |
|
318 | console.log('initial queue state:\n%s\n', JSON.stringify(queue, null, 4));
|
319 |
|
320 | queue.push('/tmp/file1', doneOne);
|
321 | queue.push('/tmp/file2', doneOne);
|
322 | queue.push('/tmp/file3', doneOne);
|
323 | queue.push('/tmp/file4', doneOne);
|
324 |
|
325 | console.log('all tasks dispatched:\n%s\n', JSON.stringify(queue, null, 4));
|
326 |
|
327 | The initial queue state looks like this:
|
328 |
|
329 | initial queue state:
|
330 | {
|
331 | "nextid": 0,
|
332 | "worker_name": "anon",
|
333 | "npending": 0,
|
334 | "pending": {},
|
335 | "queued": [],
|
336 | "concurrency": 2
|
337 | }
|
338 |
|
339 | After four tasks have been pushed, we see that two of them have been dispatched
|
340 | and the remaining two are queued up:
|
341 |
|
342 | all tasks dispatched:
|
343 | {
|
344 | "nextid": 4,
|
345 | "worker_name": "anon",
|
346 | "npending": 2,
|
347 | "pending": {
|
348 | "1": {
|
349 | "id": 1,
|
350 | "task": "/tmp/file1"
|
351 | },
|
352 | "2": {
|
353 | "id": 2,
|
354 | "task": "/tmp/file2"
|
355 | }
|
356 | },
|
357 | "queued": [
|
358 | {
|
359 | "id": 3,
|
360 | "task": "/tmp/file3"
|
361 | },
|
362 | {
|
363 | "id": 4,
|
364 | "task": "/tmp/file4"
|
365 | }
|
366 | ],
|
367 | "concurrency": 2
|
368 | }
|
369 |
|
370 | As they complete, we see tasks moving from "queued" to "pending", and completed
|
371 | tasks disappear:
|
372 |
|
373 | task completed; queue state:
|
374 | {
|
375 | "nextid": 4,
|
376 | "worker_name": "anon",
|
377 | "npending": 1,
|
378 | "pending": {
|
379 | "3": {
|
380 | "id": 3,
|
381 | "task": "/tmp/file3"
|
382 | }
|
383 | },
|
384 | "queued": [
|
385 | {
|
386 | "id": 4,
|
387 | "task": "/tmp/file4"
|
388 | }
|
389 | ],
|
390 | "concurrency": 2
|
391 | }
|
392 |
|
393 | When all tasks have completed, the queue state looks like it started:
|
394 |
|
395 | task completed; queue state:
|
396 | {
|
397 | "nextid": 4,
|
398 | "worker_name": "anon",
|
399 | "npending": 0,
|
400 | "pending": {},
|
401 | "queued": [],
|
402 | "concurrency": 2
|
403 | }
|
404 |
|
405 |
|
406 | ### Example 2: A simple serializer
|
407 |
|
408 | You can use a queue with concurrency 1 and where the tasks are themselves
|
409 | functions to ensure that an arbitrary asynchronous function never runs
|
410 | concurrently with another one, no matter what each one does. Since the tasks
|
411 | are the actual functions to be invoked, the worker function just invokes each
|
412 | one:
|
413 |
|
414 | var mod_vasync = require('../lib/vasync');
|
415 |
|
416 | var queue = mod_vasync.queue(
|
417 | function (task, callback) { task(callback); }, 1);
|
418 |
|
419 | queue.push(function (callback) {
|
420 | console.log('first task begins');
|
421 | setTimeout(function () {
|
422 | console.log('first task ends');
|
423 | callback();
|
424 | }, 500);
|
425 | });
|
426 |
|
427 | queue.push(function (callback) {
|
428 | console.log('second task begins');
|
429 | process.nextTick(function () {
|
430 | console.log('second task ends');
|
431 | callback();
|
432 | });
|
433 | });
|
434 |
|
435 | This example outputs:
|
436 |
|
437 | $ node examples/queue-serializer.js
|
438 | first task begins
|
439 | first task ends
|
440 | second task begins
|
441 | second task ends
|