# vasync: utilities for observable asynchronous control flow

This module provides facilities for asynchronous control flow. There are many
modules that do this already (notably async.js). This one's claim to fame is
aided debuggability: each of the contained functions returns a "status" object
with the following fields:

* operations: array corresponding to the input functions, with:
    * func: input function
    * status: "pending", "ok", or "fail"
    * err: returned "err" value, if any
    * result: returned "result" value, if any
* successes: "result" field for each of "operations" where "status" == "ok"
* ndone: number of input operations that have completed
* nerrors: number of input operations that have failed

You can use this from a debugger (or your own monitoring code) to understand
the state of an ongoing asynchronous operation. For example, you could see how
far into a pipeline some particular operation is.
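
For instance, a minimal monitoring sketch (not one of the module's own
examples) could poll the status object returned by `parallel`, which is
described in the next section; the paths and polling interval here are
arbitrary:

    var mod_fs = require('fs');
    var mod_vasync = require('vasync');

    /* Kick off some asynchronous work and keep the status object around. */
    var status = mod_vasync.parallel({
        'funcs': [
            function (callback) { mod_fs.stat('/tmp', callback); },
            function (callback) { mod_fs.stat('/var', callback); }
        ]
    }, function (err) {
        clearInterval(timer);   /* stop polling once everything has finished */
    });

    /* Poll the synchronously-returned status object from monitoring code. */
    var timer = setInterval(function () {
        console.log('%d of %d operations done (%d errors)',
            status.ndone, status.operations.length, status.nerrors);
    }, 100);
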
### parallel(args, callback): invoke N functions in parallel and merge the results

This function takes a list of input functions (specified by the "funcs" property
of "args") and runs them all. These input functions are expected to be
asynchronous: they get a "callback" argument and should invoke it as
callback(err, result). The error and result will be saved and made available to
the original caller when all of these functions complete.

All errors are combined into a single "err" parameter to the final callback (see
below). You can also observe the progress of the operation as it goes by
examining the object returned synchronously by this function.

Example usage:

    var mod_fs = require('fs');
    var mod_util = require('util');
    var mod_vasync = require('vasync');

    var status = mod_vasync.parallel({
        'funcs': [
            function f1 (callback) { mod_fs.stat('/tmp', callback); },
            function f2 (callback) { mod_fs.stat('/noexist', callback); },
            function f3 (callback) { mod_fs.stat('/var', callback); }
        ]
    }, function (err, results) {
        console.log('error: %s', err.message);
        console.log('results: %s', mod_util.inspect(results, null, 3));
    });

    console.log('status: %s', mod_util.inspect(status, null, 3));

In the first tick, this outputs:

    status: { operations:
       [ { func: [Function: f1], status: 'pending' },
         { func: [Function: f2], status: 'pending' },
         { func: [Function: f3], status: 'pending' } ],
      successes: [],
      ndone: 0,
      nerrors: 0 }

showing that there are three operations pending and none has yet been started.
When the program finishes, it outputs this error:

    error: first of 1 error: ENOENT, no such file or directory '/noexist'

which encapsulates all of the intermediate failures. This model allows you to
write the final callback like you normally would:

    if (err)
        return (callback(err));

and still propagate useful information to callers that don't deal with multiple
errors (i.e. most callers).

The example also prints out the detailed final status, including all of the
errors and return values:

    results: { operations:
       [ { func: [Function: f1],
           status: 'ok',
           err: null,
           result:
            { dev: 140247096,
              ino: 879368309,
              mode: 17407,
              nlink: 9,
              uid: 0,
              gid: 3,
              rdev: 0,
              size: 754,
              blksize: 4096,
              blocks: 8,
              atime: Thu, 12 Apr 2012 23:18:57 GMT,
              mtime: Tue, 17 Apr 2012 23:56:34 GMT,
              ctime: Tue, 17 Apr 2012 23:56:34 GMT } },
         { func: [Function: f2],
           status: 'fail',
           err: { [Error: ENOENT, no such file or directory '/noexist'] errno: 34, code: 'ENOENT', path: '/noexist' },
           result: undefined },
         { func: [Function: f3],
           status: 'ok',
           err: null,
           result:
            { dev: 23658528,
              ino: 5,
              mode: 16877,
              nlink: 27,
              uid: 0,
              gid: 0,
              rdev: -1,
              size: 27,
              blksize: 2560,
              blocks: 3,
              atime: Fri, 09 Sep 2011 14:28:55 GMT,
              mtime: Wed, 04 Apr 2012 17:51:20 GMT,
              ctime: Wed, 04 Apr 2012 17:51:20 GMT } } ],
      successes:
       [ { dev: 234881026,
           ino: 24965,
           mode: 17407,
           nlink: 8,
           uid: 0,
           gid: 0,
           rdev: 0,
           size: 272,
           blksize: 4096,
           blocks: 0,
           atime: Tue, 01 May 2012 16:02:24 GMT,
           mtime: Tue, 01 May 2012 19:10:35 GMT,
           ctime: Tue, 01 May 2012 19:10:35 GMT },
         { dev: 234881026,
           ino: 216,
           mode: 16877,
           nlink: 26,
           uid: 0,
           gid: 0,
           rdev: 0,
           size: 884,
           blksize: 4096,
           blocks: 0,
           atime: Tue, 01 May 2012 16:02:24 GMT,
           mtime: Fri, 14 Aug 2009 21:23:03 GMT,
           ctime: Thu, 28 Oct 2010 21:51:39 GMT } ],
      ndone: 3,
      nerrors: 1 }

You can use this if you want to handle all of the errors individually or to get
at all of the individual return values.
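
For instance, a final callback along these lines (the function name is made up
for illustration) could walk the "operations" array, using the fields described
at the top of this document, to report each outcome individually:

    function finalCallback(err, results) {
        /* Report each operation's outcome individually. */
        results.operations.forEach(function (operation) {
            if (operation.status == 'fail')
                console.error('"%s" failed: %s',
                    operation.func.name, operation.err.message);
            else if (operation.status == 'ok')
                console.log('"%s" returned a result',
                    operation.func.name);
        });
    }
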


### forEachParallel(args, callback): invoke the same function on N inputs in parallel

This function is exactly like `parallel`, except that the input is specified as
a *single* function ("func") and a list of inputs ("inputs"). The function is
invoked on each input in parallel.

This example is essentially equivalent to the one above:

    mod_vasync.forEachParallel({
        'func': mod_fs.stat,
        'inputs': [ '/var', '/nonexistent', '/tmp' ]
    }, function (err, results) {
        console.log('error: %s', err.message);
        console.log('results: %s', mod_util.inspect(results, null, 3));
    });


### pipeline(args, callback): invoke N functions in series (and stop on failure)

The arguments for this function are:

* funcs: input functions, to be invoked in series
* arg: arbitrary argument that will be passed to each function

The functions are invoked in order as `func(arg, callback)`, where "arg" is the
user-supplied argument from "args" and "callback" should be invoked in the usual
way. If any function emits an error, the whole pipeline stops.
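
The main example below does not make use of "arg", so here is a separate,
minimal sketch (with a made-up shared "state" object) showing how the argument
can be used to thread state through the stages:

    var mod_fs = require('fs');
    var mod_vasync = require('vasync');

    mod_vasync.pipeline({
        'arg': { 'dir': '/tmp', 'stat': null },     /* shared state */
        'funcs': [
            function doStat(state, callback) {
                mod_fs.stat(state.dir, function (err, st) {
                    state.stat = st;    /* stash the result for later stages */
                    callback(err);
                });
            },
            function doReport(state, callback) {
                console.log('%s has mode %d', state.dir, state.stat.mode);
                callback();
            }
        ]
    }, function (err) {
        if (err)
            console.error('pipeline failed: %s', err.message);
    });
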

The return value and the arguments to the final callback are exactly the same as
for `parallel`. The error object for the final callback is just the error
returned by whichever pipeline function failed (if any).

The following example is similar to the earlier `parallel` example, except that
it runs the steps in sequence and stops early because `pipeline` aborts on the
first error:

    console.log(mod_vasync.pipeline({
        'funcs': [
            function f1 (_, callback) { mod_fs.stat('/tmp', callback); },
            function f2 (_, callback) { mod_fs.stat('/noexist', callback); },
            function f3 (_, callback) { mod_fs.stat('/var', callback); }
        ]
    }, function (err, results) {
        console.log('error: %s', err.message);
        console.log('results: %s', mod_util.inspect(results, null, 3));
    }));

As a result, the status after the first tick looks like this:

    { operations:
       [ { func: [Function: f1], status: 'pending' },
         { func: [Function: f2], status: 'waiting' },
         { func: [Function: f3], status: 'waiting' } ],
      successes: [],
      ndone: 0,
      nerrors: 0 }

(Note that the second and third stages are "waiting" here, whereas under
`parallel` they would be "pending".) The error reported is:

    error: ENOENT, no such file or directory '/noexist'

and the complete result is:

    results: { operations:
       [ { func: [Function: f1],
           status: 'ok',
           err: null,
           result:
            { dev: 140247096,
              ino: 879368309,
              mode: 17407,
              nlink: 9,
              uid: 0,
              gid: 3,
              rdev: 0,
              size: 754,
              blksize: 4096,
              blocks: 8,
              atime: Thu, 12 Apr 2012 23:18:57 GMT,
              mtime: Tue, 17 Apr 2012 23:56:34 GMT,
              ctime: Tue, 17 Apr 2012 23:56:34 GMT } },
         { func: [Function: f2],
           status: 'fail',
           err: { [Error: ENOENT, no such file or directory '/noexist'] errno: 34, code: 'ENOENT', path: '/noexist' },
           result: undefined },
         { func: [Function: f3], status: 'waiting' } ],
      successes:
       [ { dev: 234881026,
           ino: 24965,
           mode: 17407,
           nlink: 8,
           uid: 0,
           gid: 0,
           rdev: 0,
           size: 272,
           blksize: 4096,
           blocks: 0,
           atime: Tue, 01 May 2012 16:02:24 GMT,
           mtime: Tue, 01 May 2012 19:10:35 GMT,
           ctime: Tue, 01 May 2012 19:10:35 GMT } ],
      ndone: 2,
      nerrors: 1 }


### queue(worker, concurrency): fixed-size worker queue
### queuev(args)

These functions return an object that allows up to a fixed number of tasks to be
dispatched at any given time. The interface is compatible with that provided
by the "async" Node library, except that the returned object's fields represent
a public interface you can use to introspect what's going on.

The arguments are:

* worker: a function invoked as `worker(task, callback)`, where `task` is a
  task dispatched to this queue and `callback` should be invoked when the task
  completes.
* concurrency: a positive integer indicating the maximum number of tasks that
  may be dispatched at any time.

With concurrency = 1, the queue serializes all operations.

The object provides the following method:

* push(task, [callback]): add a task (or array of tasks) to the queue, with an
  optional callback to be invoked when each task completes. If a list of tasks
  is added, the callback is invoked for each one (see the sketch immediately
  below).
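
For example, here is a small sketch (not one of the module's own examples; the
paths are arbitrary) of pushing an array of tasks with a per-task callback:

    var mod_fs = require('fs');
    var mod_vasync = require('vasync');

    /* Queue that stats files, at most two at a time (as in Example 1 below). */
    var queue = mod_vasync.queue(mod_fs.stat, 2);

    /* Push an array of tasks; the callback runs once for each completed task. */
    queue.push([ '/tmp/file1', '/tmp/file2', '/tmp/file3' ], function () {
        console.log('one task completed');
    });
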

The object also provides the length() method, the "concurrency" field, and
hooks for "saturated", "empty", and "drain" for compatibility with node-async
(a sketch of the "drain" hook appears just before Example 1 below). In
addition, several fields may be inspected to see what's currently going on with
the queue, but **these fields must not be modified directly**:

* worker: worker function, as passed into "queue"/"queuev"
* worker_name: worker function's "name" field
* npending: the number of tasks currently being processed
* pending: an object (*not* an array) describing the tasks currently being processed
* queued: array of tasks currently queued for processing

If the tasks are themselves simple objects, then the entire queue may be
serialized (as via JSON.stringify) for debugging and monitoring tools. Using
the above fields, you can see what this queue is doing (worker_name), which
tasks are queued, which tasks are being processed, and so on.
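
Since the hooks are intended to be compatible with node-async, a minimal sketch
of the "drain" hook (assuming the node-async convention of assigning a function
to the corresponding property; the paths are arbitrary) might look like this:

    var mod_fs = require('fs');
    var mod_vasync = require('vasync');

    var queue = mod_vasync.queue(mod_fs.stat, 2);

    /* Invoked once the last outstanding task has completed. */
    queue.drain = function () {
        console.log('all queued tasks have completed');
    };

    queue.push([ '/tmp/file1', '/tmp/file2' ]);
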

### Example 1: Stat several files

Here's an example demonstrating the queue:

    var mod_fs = require('fs');
    var mod_vasync = require('../lib/vasync');

    var queue;

    function doneOne()
    {
        console.log('task completed; queue state:\n%s\n',
            JSON.stringify(queue, null, 4));
    }

    queue = mod_vasync.queue(mod_fs.stat, 2);

    console.log('initial queue state:\n%s\n', JSON.stringify(queue, null, 4));

    queue.push('/tmp/file1', doneOne);
    queue.push('/tmp/file2', doneOne);
    queue.push('/tmp/file3', doneOne);
    queue.push('/tmp/file4', doneOne);

    console.log('all tasks pushed:\n%s\n', JSON.stringify(queue, null, 4));

The initial queue state looks like this:

    initial queue state:
    {
        "nextid": 0,
        "worker_name": "anon",
        "npending": 0,
        "pending": {},
        "queued": [],
        "concurrency": 2
    }

After four tasks have been pushed, we see that two of them have been dispatched
and the remaining two are queued up:

    all tasks pushed:
    {
        "nextid": 4,
        "worker_name": "anon",
        "npending": 2,
        "pending": {
            "1": {
                "id": 1,
                "task": "/tmp/file1"
            },
            "2": {
                "id": 2,
                "task": "/tmp/file2"
            }
        },
        "queued": [
            {
                "id": 3,
                "task": "/tmp/file3"
            },
            {
                "id": 4,
                "task": "/tmp/file4"
            }
        ],
        "concurrency": 2
    }

As they complete, we see tasks moving from "queued" to "pending", and completed
tasks disappear:

    task completed; queue state:
    {
        "nextid": 4,
        "worker_name": "anon",
        "npending": 1,
        "pending": {
            "3": {
                "id": 3,
                "task": "/tmp/file3"
            }
        },
        "queued": [
            {
                "id": 4,
                "task": "/tmp/file4"
            }
        ],
        "concurrency": 2
    }

When all tasks have completed, the queue state looks much as it did initially:

    task completed; queue state:
    {
        "nextid": 4,
        "worker_name": "anon",
        "npending": 0,
        "pending": {},
        "queued": [],
        "concurrency": 2
    }


### Example 2: A simple serializer

You can use a queue with concurrency 1, where the tasks are themselves
functions, to ensure that an arbitrary asynchronous function never runs
concurrently with another one, no matter what each one does. Since the tasks
are the actual functions to be invoked, the worker function just invokes each
one:

    var mod_vasync = require('../lib/vasync');

    var queue = mod_vasync.queue(
        function (task, callback) { task(callback); }, 1);

    queue.push(function (callback) {
        console.log('first task begins');
        setTimeout(function () {
            console.log('first task ends');
            callback();
        }, 500);
    });

    queue.push(function (callback) {
        console.log('second task begins');
        process.nextTick(function () {
            console.log('second task ends');
            callback();
        });
    });

This example outputs:

    $ node examples/queue-serializer.js
    first task begins
    first task ends
    second task begins
    second task ends