UNPKG

9.01 kBJavaScriptView Raw
1var getParameterNames = require('get-parameter-names');
2var _ = require('underscore');
3var taskUtils = require('./task');
4var Promise = require('bluebird');
5
6// Swallow bluebird's annoying error messages
7Promise.onPossiblyUnhandledRejection(function() {});
8
9var modules = {};
10
11var wagner = function(name) {
12 if (modules[name]) {
13 return modules[name];
14 }
15 modules[name] = wagnerFactory(name);
16 return modules[name];
17};
18
19wagner.module = function(name, dependencies) {
20 modules[name] = wagnerFactory(name);
21 if (dependencies && dependencies.length) {
22 for (var i = 0; i < dependencies.length; ++i) {
23 var module = modules[dependencies[i]];
24 modules[name]._addTasks(module._getTasks());
25 }
26 }
27
28 return modules[name];
29};
30
31function popCallback(arr) {
32 if (arr.length && arr[arr.length - 1] === 'callback') {
33 arr.hasCallback = true;
34 arr.pop();
35 }
36
37 return arr;
38}
39
40var wagnerFactory = function(name) {
41 var tasks = {};
42
43 var step = function(alreadyExecuted, tasks, callback) {
44 var done = false;
45
46 _.each(tasks, function(task) {
47 if (alreadyExecuted[task.name] || task.executing) {
48 return;
49 }
50
51 var paramNames = popCallback(getParameterNames(task.task));
52 var ready = true;
53 var args = [];
54 for (var i = 0; i < paramNames.length; ++i) {
55 if (!alreadyExecuted[paramNames[i]]) {
56 ready = false;
57 break;
58 }
59 args.push(alreadyExecuted[paramNames[i]].value);
60 }
61
62 if (ready) {
63 task.executing = true;
64
65 if (paramNames.hasCallback) {
66 args.push(function(error, value) {
67 if (done) {
68 return;
69 }
70
71 if (error) {
72 done = true;
73 return callback(error);
74 }
75 alreadyExecuted[task.name] = { value: value };
76
77 if (Object.keys(alreadyExecuted).length === Object.keys(tasks).length) {
78 done = true;
79 return callback(null);
80 }
81
82 step(alreadyExecuted, tasks, callback);
83 });
84
85 try {
86 task.task.apply(null, args);
87 } catch(error) {
88 done = true;
89 return callback(error);
90 }
91 } else {
92 try {
93 alreadyExecuted[task.name] = { value: task.task.apply(null, args) };
94 } catch(error) {
95 done = true;
96 process.nextTick(function() {
97 return callback(error);
98 });
99 return;
100 }
101
102 if (Object.keys(alreadyExecuted).length === Object.keys(tasks).length) {
103 done = true;
104 process.nextTick(function() {
105 return callback(null);
106 });
107 return;
108 }
109
110 process.nextTick(function() {
111 step(alreadyExecuted, tasks, callback);
112 });
113 }
114 }
115 });
116 };
117
118 var wagner = {
119 name: name,
120 task: function(name, func) {
121 var paramNames = getParameterNames(func);
122 if (paramNames.length &&
123 paramNames[paramNames.length - 1] === 'callback') {
124 paramNames.pop();
125 }
126
127 tasks[name] = {
128 task: func,
129 name: name,
130 dep: paramNames
131 };
132
133 return wagner;
134 },
135 factory: function(name, func) {
136 tasks[name] = {
137 task: func,
138 name: name,
139 dep: getParameterNames(func)
140 };
141
142 return wagner;
143 },
144 _getTasks: function() {
145 return tasks;
146 },
147 _addTasks: function(newTasks) {
148 for (var key in newTasks) {
149 tasks[key] = newTasks[key];
150 }
151 },
152 clear: function() {
153 tasks = {};
154 },
155 parallel: function(map, func, callback) {
156 var results = {};
157 var errors;
158 _.each(map, function(value, key) {
159 try {
160 func(
161 value,
162 key,
163 function(error, result) {
164 if (error) {
165 results[key] = { error: error };
166 errors = errors || [];
167 errors.push(error);
168 } else {
169 results[key] = { result: result };
170 }
171 if (Object.keys(results).length === Object.keys(map).length) {
172 return callback(errors, results);
173 }
174 });
175 } catch(error) {
176 results[key] = { error: error };
177 errors = errors || [];
178 errors.push(error);
179
180 if (Object.keys(results).length === Object.keys(map).length) {
181 return callback(errors, results);
182 }
183 }
184 });
185 },
186 series: function(arr, func, callback) {
187 var results = [];
188
189 var next = function(index) {
190 if (index >= arr.length) {
191 return callback(null, results);
192 }
193
194 try {
195 func(arr[index], index, function(error, result) {
196 if (error) {
197 return callback({ index: index, error: error });
198 }
199
200 results.push(result);
201 next(index + 1);
202 });
203 } catch(error) {
204 return callback({ index: index, error: error });
205 }
206 };
207
208 next(0);
209 },
210 invoke: function(func, locals) {
211 var paramNames = getParameterNames(func);
212
213 // Remove error param
214 var hasErrorParameter = paramNames.length &&
215 ['error', 'err'].indexOf(paramNames[0]) != -1;
216 if (hasErrorParameter) {
217 paramNames.shift();
218 }
219
220 var newTasks = _.clone(tasks);
221 _.each(locals, function(value, key) {
222 newTasks[key] = {
223 name: key,
224 dep: [],
225 value: value
226 };
227 });
228
229 var orderedTasks = taskUtils.dfs(newTasks, paramNames);
230 orderedTasks = _.map(orderedTasks, function(taskName) {
231 return newTasks[taskName];
232 });
233
234 var alreadyExecuted = {};
235 _.each(locals, function(value, key) {
236 alreadyExecuted[key] = { value: value };
237 });
238
239 var allSync = true;
240 for (var i = 0; i < orderedTasks.length; ++i) {
241 if (!orderedTasks[i].isSync) {
242 allSync = false;
243 throw 'Called invoke() with async dependency ' + orderedTasks[i].name;
244 }
245 }
246
247 var sorted = taskUtils.topoSort(newTasks, _.pluck(orderedTasks, 'name'));
248
249 for (var i = 0; i < sorted.length; ++i) {
250 var task = newTasks[sorted[i]];
251 if (task.value) {
252 alreadyExecuted[task.name] = { value: task.value };
253 } else {
254 params = getParameterNames(task.task);
255 var args = [];
256 for (var j = 0; j < params.length; ++j) {
257 args.push(alreadyExecuted[params[j]].value);
258 }
259 alreadyExecuted[task.name] = { value: task.task.apply(null, args) }
260 }
261 }
262
263 var args = [];
264 if (hasErrorParameter) {
265 args.push(undefined);
266 }
267 for (var i = 0; i < paramNames.length; ++i) {
268 args.push(alreadyExecuted[paramNames[i]].value);
269 }
270
271 return func.apply(null, args);
272 },
273 invokeAsync: function(func, locals) {
274 var promiseFulfill;
275 var promiseReject;
276
277 var promise = new Promise(function(resolve, reject) {
278 promiseFulfill = resolve;
279 promiseReject = reject;
280 });
281
282 promise.fulfill = function() {
283 promiseFulfill.apply(null, arguments);
284 };
285
286 promise.reject = function() {
287 promiseReject.apply(null, arguments);
288 };
289
290 var paramNames = getParameterNames(func);
291
292 // Remove error param
293 var hasErrorParameter = paramNames.length &&
294 ['error', 'err'].indexOf(paramNames[0]) != -1;
295 if (hasErrorParameter) {
296 paramNames.shift();
297 }
298
299 var newTasks = _.clone(tasks);
300 _.each(locals, function(value, key) {
301 newTasks[key] = {
302 name: key,
303 dep: [],
304 value: value
305 };
306 });
307
308 var orderedTasks = taskUtils.dfs(newTasks, paramNames);
309 orderedTasks = _.map(orderedTasks, function(taskName) {
310 return newTasks[taskName];
311 });
312
313 var alreadyExecuted = {};
314 _.each(locals, function(value, key) {
315 alreadyExecuted[key] = { value: value };
316 });
317
318 step(alreadyExecuted, orderedTasks, function(error) {
319 if (error) {
320 promise.reject(error);
321 if (hasErrorParameter) {
322 return func(error);
323 } else {
324 throw error;
325 }
326 }
327
328 var args = [];
329 if (hasErrorParameter) {
330 args.push(undefined);
331 }
332 for (var i = 0; i < paramNames.length; ++i) {
333 args.push(alreadyExecuted[paramNames[i]].value);
334 }
335
336 promise.fulfill(func.apply(null, args));
337 });
338
339 return promise;
340 }
341 };
342
343 return wagner;
344};
345
346var instance = wagnerFactory('global');
347_.each(instance, function(value, key) {
348 if (typeof value === 'function') {
349 wagner[key] = function() {
350 return instance[key].apply(instance, arguments);
351 };
352 }
353});
354
355module.exports = wagner;
\No newline at end of file