UNPKG

13.9 kBJavaScriptView Raw
1var EventEmitter = require ('events').EventEmitter,
2 util = require ('util'),
3 dataflows = require ('./index'),
4 common = dataflows.common,
5 taskClass = require ('./task/base'),
6 paint = dataflows.color,
7 tokenInitiator;
8
9var $global = common.$global;
10
11var taskStateNames = taskClass.prototype.stateNames;
12
/**
 * Tells whether a value is absent.
 *
 * @param {*} val value to test
 * @returns {Boolean} true when val is undefined or null, false for anything
 * else (including 0, '' and false)
 */
function isVoid (val) {
	return val === undefined || val === null;
}
16
/**
 * Lists the requirement names whose paths cannot be resolved in dict.
 *
 * Every path is looked up with common.pathToVal; a path counts as missing
 * when the lookup yields undefined/null or throws. The requirement name is
 * pushed once per missing path, so a name may appear several times.
 *
 * @param {Object} requirements map of requirement name => list of paths
 * @param {Object} dict data to resolve the paths against
 * @returns {Array} names of unsatisfied requirements (one entry per missing path)
 */
function taskRequirements (requirements, dict) {

	var missing = [];
	var name, paths, idx, absent, value;

	for (name in requirements) {
		paths = requirements[name];
		idx = 0;

		while (idx < paths.length) {
			try {
				value = common.pathToVal (dict, paths[idx]);
				absent = (value === undefined || value === null);
			} catch (e) {
				// a lookup that blows up is treated like a missing value
				absent = true;
			}

			if (absent) {
				missing.push (name);
			}

			idx += 1;
		}
	}

	return missing;
}
35
/**
 * Recursively resolves task parameters against a dictionary.
 *
 * Strings are passed through String#interpolate (dict, marks); a string that
 * interpolates to undefined, or whose interpolation throws, is reported as
 * failed and left out of the result. Booleans, numbers, functions, dates,
 * null and undefined are copied untouched. Everything else is treated as a
 * nested array/hash and processed recursively.
 *
 * Type detection uses Object.prototype.toString locally, so the function
 * does not depend on a project-level override of the standard Object.is.
 *
 * @param {Object|Array} params parameters to resolve
 * @param {Object} dict values available to the interpolation
 * @param {String} [prefix] path of params inside the top-level structure,
 * used to build the names listed in `failed`
 * @param {*} [marks] handed to String#interpolate as is
 * @returns {Object} `modified`: resolved copy of params (failed array items
 * are dropped, so later items shift down); `failed`: paths of the values
 * that could not be resolved
 */
function checkTaskParams (params, dict, prefix, marks) {
	// TODO: modify this function because recursive changes of parameters works dirty (indexOf for value)

	// value types copied to the result without any processing
	var AllowedValueTypes = {
		Boolean: true,
		Number: true,
		Function: true,
		Date: true
	};

	// "[object Array]" => "Array"
	function typeName (value) {
		return Object.prototype.toString.call (value).slice (8, -1);
	}

	if (prefix == void 0) prefix = '';
	if (prefix) prefix += '.';

	var paramsIsArray = typeName (params) === 'Array';
	var modifiedParams = paramsIsArray ? [] : {};
	var failedParams = [];

	// arrays are appended to, hashes are assigned by key
	function keep (key, value) {
		if (paramsIsArray) {
			modifiedParams.push (value);
		} else {
			modifiedParams[key] = value;
		}
	}

	function processValue (val, key, path) {
		var type = typeName (val);

		if (type === 'String') {
			var interpolated;
			try {
				interpolated = val.interpolate (dict, marks);
			} catch (e) {
				// an interpolation error means the value is unavailable
				interpolated = undefined;
			}

			if (interpolated === undefined) {
				failedParams.push (path);
			} else {
				keep (key, interpolated);
			}
		} else if (val == null || type in AllowedValueTypes) {
			// null/undefined must not recurse: Object.keys (null) throws
			keep (key, val);
		} else { // val is hash || array
			var nested = checkTaskParams (val, dict, path, marks);

			keep (key, nested.modified);
			failedParams = failedParams.concat (nested.failed);
		}
	}

	if (paramsIsArray) {
		params.forEach (function (val, index) {
			processValue (val, index, prefix + '[' + index + ']');
		});
	} else {
		Object.keys (params).forEach (function (key) {
			processValue (params[key], key, prefix + key);
		});
	}

	return {
		modified: modifiedParams,
		failed: failedParams
	};
}
118
/**
 * @class flow
 * @extends events.EventEmitter
 *
 * The heart of the framework. Parses task configurations, loads dependencies,
 * launches tasks, stores their result. When all tasks are completed,
 * notifies subscribers (initiators).
 *
 * The constructor copies config and reqParam onto the instance, generates
 * the flow id and its colored form, and prepares the task list through
 * taskClass.prepare. When the task list cannot be resolved, the flow is
 * created with no tasks and `ready` set to false, so that run () reports
 * the failure instead of the constructor throwing.
 *
 * Relies on the globals $isServerSide, $mainModule and project.
 *
 * @cfg {Object} config (required) dataflow configuration.
 * @cfg {String} config.$class (required) Class to instantiate
 * (alias of config.className).
 * @cfg {String} config.$function (required) Synchronous function to be run
 * (instead of a class). Alias of functionName.
 * @cfg {String} config.$set Path to the property in which the produced data
 * will be stored.
 * @cfg {String} config.$method Method to be run after the class instantiation.
 * @cfg {Object} reqParam (required) dataflow parameters.
 */
var dataflow = module.exports = function (config, reqParam) {

	var self = this;

	// TODO: copy only required things
	util.extend (true, this, config); // this is immutable config skeleton
	util.extend (true, this, reqParam); // this is config fixup

	this.created = new Date().getTime();

	// here we make sure dataflow uid generated
	// TODO: check for cpu load
	var salt = (Math.random () * 1e6).toFixed(0);
	// `started` is not set before run (), so the creation time is mixed in;
	// >>> 0 keeps the xor result non-negative, the id must stay within 6 digits
	this.id = this.id || ((this.created ^ salt) >>> 0) % 1e6;
	if (!this.idPrefix) this.idPrefix = '';

	if (!this.stage) this.stage = 'dataflow';

	//if (!this.stageMarkers[this.stage])
	//	console.error ('there is no such stage marker: ' + this.stage);

	var idString = "" + this.id;
	while (idString.length < 6) {
		idString = '0' + idString;
	}

	// the id is split into three 2-char groups, each one gets its own color
	this.coloredId = [
		"" + idString[0] + idString[1],
		"" + idString[2] + idString[3],
		"" + idString[4] + idString[5]
	].map (function (item) {
		// non-numeric groups (custom ids) fall back to color 0
		if ($isServerSide) return "\x1B[0;3" + ((parseInt (item, 10) || 0) % 8) + "m" + item + "\x1B[0m";
		return item;
	}).join ('');

	this.data = this.data || { data: {} };

	self.ready = true;

	var tasks = config.tasks;

	// TODO: optimize usage - find placeholders and check only placeholders
	if (config.tasksFrom) {
		if (!tokenInitiator) tokenInitiator = require ('initiator/token');

		var initiatorConf = project.config.initiator;
		var tokenFlows = initiatorConf && initiatorConf.token && initiatorConf.token.flows;
		var flowByToken = tokenFlows && tokenFlows[config.tasksFrom];

		if (flowByToken && flowByToken.tasks) {
			tasks = flowByToken.tasks;
		} else {
			this.log ('"tasksFrom" parameter requires to have "initiator/token/flows/'+config.tasksFrom+'" configuration in project');
			this.ready = false;
			// nothing to prepare; run () fails the flow because of `ready`
			tasks = [];
		}
	} else if (!config.tasks || !config.tasks.length) {
		tasks = config.tasks = [];
	}

	// Builds the dictionary the task parameters are interpolated against.
	function createDict () {
		// TODO: very bad idea: reqParam overwrites flow.data
		var dict = util.extend (true, self.data, reqParam);
		dict.global = $global;
		dict.appMain = $mainModule.exports;

		if ($isServerSide) {
			// best effort: `project` is attached only when that global is defined
			try { dict.project = project; } catch (e) {}
		}

		return dict;
	}

	// Factory handed to taskClass.prepare: returns either the dictionary
	// builder or a requirements checker bound to the given task parameters.
	var taskGen = function (type, actualTaskParams) {
		if (type === 'createDict') return createDict;
		if (type === 'checkRequirements') return function () {
			var dict = createDict ();

			// third argument is the path prefix, marks go fourth
			var result = checkTaskParams (actualTaskParams, dict, '', self.marks);

			if (result.failed && result.failed.length > 0) {
				this.unsatisfiedRequirements = result.failed;
				return false;
			} else if (result.modified) {
				// TODO: bad
				util.extend (this, result.modified);
				return true;
			}
		}
	}


	this.tasks = tasks.map (taskClass.prepare.bind (taskClass, self, dataflows, taskGen));

};

util.inherits (dataflow, EventEmitter);
238
/**
 * Formats a number as decimal text, prefixing values below 10 with '0'.
 *
 * @param {Number} n number to format
 * @returns {String} decimal representation, e.g. 7 => '07', 12 => '12'
 */
function pad (n) {
	var digits = n.toString (10);

	if (n < 10) {
		return '0' + digits;
	}

	return digits;
}
242
// one second low resolution timer: Date.dataflowsLowRes is refreshed once
// per second and read by the log timestamp instead of creating a Date per
// log line
Date.dataflowsLowRes = new Date ();
Date.dataflowsLowResInterval = setInterval (function () {
	Date.dataflowsLowRes = new Date ();
}, 1000);

// On Node the interval handle would keep the process alive forever;
// unref () lets the process exit when nothing else is pending.
// Browsers return a numeric id with no unref, hence the guard.
if (Date.dataflowsLowResInterval && typeof Date.dataflowsLowResInterval.unref === 'function') {
	Date.dataflowsLowResInterval.unref ();
}
248
/**
 * Renders the low resolution clock (Date.dataflowsLowRes) as local time.
 *
 * @returns {String} 'YYYY-MM-DD HH:MM:SS'
 */
function timestamp () {
	var now = Date.dataflowsLowRes;

	var clock = pad (now.getHours ()) + ':' + pad (now.getMinutes ()) + ':' + pad (now.getSeconds ());

	// getMonth () is zero based
	var day = now.getFullYear () + '-' + pad (now.getMonth () + 1) + '-' + pad (now.getDate ());

	return day + ' ' + clock;
}
263
264
/**
 * Renders a value for the error log: util.inspect output with the wrapping
 * single quotes removed and every escaped single quote restored.
 *
 * @param {*} value value to render
 * @returns {String}
 */
function inspectForLog (value) {
	return util.inspect (value).replace (/(^'|'$)/g, "").replace (/\\'/g, "'");
}

util.extend (dataflow.prototype, {
	checkTaskParams: checkTaskParams,
	taskRequirements: taskRequirements,
	failed: false,
	isIdle: true,
	haveCompletedTasks: false,

	/**
	 * @method runDelayed Schedules run () asynchronously: setTimeout on the
	 * client side, process.nextTick on the server side.
	 */
	runDelayed: function () {
		var self = this;
		if ($isClientSide) {
			setTimeout (function () {self.run ();}, 0);
		} else if ($isServerSide) {
			process.nextTick (function () {self.run ()});
		}
	},

	/**
	 * @method run Initiators call this method to launch the dataflow.
	 *
	 * One pass over the task list: subscribes to new tasks, refreshes their
	 * state and launches the ready ones. While tasks are ready or running
	 * the method just returns; when tasks completed during the pass it
	 * reschedules itself; otherwise the flow is stopped and either 'failed'
	 * or 'completed' is emitted.
	 */
	run: function () {
		if (!this.started)
			this.started = new Date().getTime();

		var self = this;

		if (self.stopped)
			return;
		/* @behrad following was overriding already set failed status by failed tasks */
//		self.failed = false;
		self.isIdle = false;
		self.haveCompletedTasks = false;

		// per-state task counters, indexed by task.state
		this.taskStates = [0, 0, 0, 0, 0, 0, 0];

		// check task states

		if (!this.tasks) {
			self.emit ('failed', self);
			self.logError (this.stage + ' failed immediately due empty task list');
			self.isIdle = true;
			return;
		}

		if (!this.ready) {
			self.emit ('failed', self);
			self.logError (this.stage + ' failed immediately due unready state');
			self.isIdle = true;
			return;
		}

		this.tasks.forEach (function (task, idx) {

			if (task.subscribed === void(0)) {
				self.addEventListenersToTask (task);
			}

			task.checkState ();

			self.taskStates[task.state]++;

			if (task.isReady ()) {
				self.logTask (task, 'started');
				try {
					task._launch ();
				} catch (e) {
					task.failed (e);
				}

				// sync task support: the task already left the ready state
				if (!task.isReady()) {
					self.taskStates[task.stateNames.ready]--;
					self.taskStates[task.state]++;
				}
			}
		});

		var taskStateNames = taskClass.prototype.stateNames;

		if (this.taskStates[taskStateNames.ready] || this.taskStates[taskStateNames.running]) {
			// it is safe to continue, wait for running/ready task

			self.isIdle = true;

			return;
		} else if (self.haveCompletedTasks) {
			// stack will be happy
			self.runDelayed();

			self.isIdle = true;

			return;
		}

		self.stopped = new Date().getTime();

		var scarceTaskMessage = 'unsatisfied requirements: ';

		// TODO: display scarce tasks unsatisfied requirements
		if (this.taskStates[taskStateNames.scarce]) {
			self.tasks.forEach (function (task, idx) {
				if (task.state != taskStateNames.scarce && task.state != taskStateNames.skipped)
					return;
				if (task.important) {
					// the task may be scarce or skipped here: move the count
					// out of the state it actually had
					var previousState = task.state;
					task.failed (idx + " important task didn't start");
					self.taskStates[previousState]--;
					self.taskStates[task.state]++;
					self.failed = true;
					scarceTaskMessage += '(important) ';
				}

				if (task.state == taskStateNames.scarce || task.state == taskStateNames.failed)
					scarceTaskMessage += idx + ' ' + (task.logTitle) + ' => ' + (task.unsatisfiedRequirements || []).join (', ') + '; ';
			});
			self.log (scarceTaskMessage);
		}

		if (this.failed) {
			// dataflow stopped and failed

			self.emit ('failed', self);
			var failedtasksCount = this.taskStates[taskStateNames.failed];
			self.logError (this.stage + ' failed in ' + (self.stopped - self.started) + 'ms; failed ' + failedtasksCount + ' ' + (failedtasksCount == 1 ? 'task': 'tasks') +' out of ' + self.tasks.length);

		} else {
			// dataflow stopped and not failed

			self.emit ('completed', self);
			self.log (this.stage + ' complete in ' + (self.stopped - self.started) + 'ms');
		}

		self.isIdle = true;

	},

	// opening/closing characters wrapped around the flow id in the log
	stageMarker: {prepare: "[]", dataflow: "()", presentation: "{}"},

	/**
	 * @method _log Writes a line through console[level], prefixed with the
	 * timestamp and the flow id wrapped in the stage marker.
	 * @param {String} level console method name; 'log' when empty
	 * @param {...*} msg values to print
	 */
	_log: function (level, msg) {
//		if (this.quiet || process.quiet) return;
		var toLog = [].slice.call (arguments);
		level = toLog.shift() || 'log';

		// unknown stage: fall back to the default marker instead of throwing
		var marker = this.stageMarker[this.stage] || this.stageMarker.dataflow;

		toLog.unshift (
			timestamp (),
			marker[0] + this.idPrefix + this.coloredId + marker[1]
		);

		// TODO: also check for bad clients (like ie9)
		if ($isPhoneGap) {
			// drop the timestamp and print everything as a single string
			toLog.shift();
			toLog = [toLog.join (' ')];
		}

		// unknown level: fall back to console.log instead of throwing
		var writer = typeof console[level] === 'function' ? console[level] : console.log;
		writer.apply (console, toLog);
	},

	/**
	 * @method log Prints the arguments with the 'log' level.
	 */
	log: function () {
		var args = [].slice.call (arguments);
		args.unshift ('log');
		this._log.apply (this, args);
	},

	/**
	 * @method logTask Prints a message on behalf of a task.
	 * @param {Object} task task the message belongs to
	 * @param {*} msg message
	 */
	logTask: function (task, msg) {
		this._log ('log', task.dfTaskLogNum, task.logTitle, "("+task.state+")", msg);
	},

	/**
	 * @method logTaskError Prints an error on behalf of a task.
	 * @param {Object} task task the error belongs to
	 * @param {*} msg message
	 * @param {*} [options] extra details; when it has a `stack`, the second
	 * line of that stack is appended to the output
	 */
	logTaskError: function (task, msg, options) {
		var lastFrame = '';
		if (options && options.stack) {
			var frames = options.stack.split('\n');
			if (frames.length > 1) {
				lastFrame = frames[1].trim();
			}
		}

		this._log (
			'error',
			task.dfTaskLogNum,
			task.logTitle,
			'(' + task.state + ') ',
			paint.error (
				inspectForLog (msg),
				inspectForLog (options || '')
			),
			lastFrame
		);
	},

	/**
	 * @method logError Prints a flow level error.
	 * @param {*} msg message
	 * @param {*} [options] extra details
	 */
	logError: function (msg, options) {
		// TODO: fix by using console.error
		this._log ('error', paint.error (
			inspectForLog (msg),
			inspectForLog (options || '')
		));
	},

	/**
	 * @method addEventListenersToTask Subscribes the flow to the task
	 * events: log/warn/error are forwarded to the flow log, skip/cancel/
	 * complete/empty update the flow data and state and reschedule run ()
	 * when the flow is idle.
	 * @param {Object} task task to subscribe to
	 */
	addEventListenersToTask: function (task) {
		var self = this;

		// marks the task so run () does not subscribe twice
		task.subscribed = 1;

		// loggers
		task.on ('log', function (message) {
			self.logTask (task, message);
		});

		task.on ('warn', function (message) {
			self.logTaskError (task, message);
		});

		task.on ('error', function (e) {
			self.error = e;
			self.logTaskError (task, 'error: ', e);
		});

		// states
		task.on ('skip', function () {
//			if (task.important) {
//				self.failed = true;
//				return self.logTaskError (task, 'error ' + arguments[0]);
//			}
			self.logTask (task, 'task skipped');

			if (self.isIdle)
				self.runDelayed ();

		});

		task.on ('cancel', function (failedValue) {

			if (task.retries !== null)
				self.logTaskError (task, 'canceled, retries = ' + task.retries);

			if (!task.retries && task.$setOnFail) {
				// the failure is stored in the flow data instead of failing the flow
				common.pathToVal(self.data, task.$setOnFail, failedValue || true);
				self.haveCompletedTasks = true;
			} else {
				self.failed = true;
			}

			if (self.isIdle)
				self.runDelayed ();
		});

		task.on ('complete', function (t, result) {

			if (result) {
				if (t.produce || t.$set) {
					common.pathToVal (self.data, t.produce || t.$set, result);
				} else if (t.$mergeWith) {
					common.pathToVal (self.data, t.$mergeWith, result, common.mergeObjects);
				}
			}

			self.logTask (task, 'task completed');

			if (self.isIdle) {
				self.runDelayed ();
			} else
				// run () is in progress, it reschedules itself when it sees the flag
				self.haveCompletedTasks = true;
		});

		task.on('empty', function (t) {
			if (t.$empty || t.$setOnEmpty) {
				common.pathToVal(self.data, t.$empty || t.$setOnEmpty, true);
			}
		});

	}
});
546
// legacy: exposes common.isEmpty as a static property of the exported
// constructor
dataflow.isEmpty = common.isEmpty;