UNPKG

16.3 kBJavaScriptView Raw
1var EventEmitter = require ('events').EventEmitter,
2 util = require ('util'),
3 dataflows = require ('./'),
4 common = dataflows.common,
5 taskClass = require ('./task/base'),
6 paint = dataflows.color,
7 confFu = require ('conf-fu'),
8 tokenInitiator;
9
10// var $global = common.$global;
11
12var taskStateNames = taskClass.prototype.stateNames;
13
14function isVoid(val) {
15 return void 0 == val;
16}
17
18function taskRequirements (requirements, dict) {
19
20 var result = [];
21
22 for (var k in requirements) {
23 var requirement = requirements[k];
24 for (var i = 0; i < requirement.length; i++) {
25 try {
26 if (isVoid (common.pathToVal (dict, requirement[i])))
27 result.push (k);
28 } catch (e) {
29 result.push (k);
30 }
31 }
32 }
33
34 return result;
35}
36
37function checkTaskParams (params, dict, prefix, marks) {
38 // parse task params
39 // TODO: modify this function because recursive changes of parameters works dirty (indexOf for value)
40
41 var AllowedValueTypes = {
42 Boolean: true,
43 Number: true,
44 Function: true,
45 Date: true
46 };
47
48 if (prefix == void 0) prefix = '';
49 if (prefix) prefix += '.';
50
51 var modifiedParams;
52 var failedParams = [];
53
54 if (params === null || params === undefined) {
55 // nothing
56 } else if (Object.is('Array', params)) { // params is array
57
58 modifiedParams = [];
59
60 params.forEach(function (val, index, arr) {
61 if (Object.is('String', val)) { // string
62
63 try {
64 var tmp = val.interpolate(dict, marks);
65 if (tmp === undefined) {
66 failedParams.push (prefix+'['+index+']');
67 } else {
68 modifiedParams.push(tmp);
69 }
70 } catch (e) {
71 failedParams.push (prefix+'['+index+']');
72 }
73
74 } else if (Object.typeOf(val) in AllowedValueTypes) {
75 modifiedParams.push(val);
76 } else {
77 var result = checkTaskParams(
78 val, dict, prefix+'['+index+']', marks
79 );
80
81 modifiedParams.push(result.modified);
82 failedParams = failedParams.concat (result.failed);
83 }
84 });
85
86 } else { // params is hash
87 modifiedParams = {};
88
89 Object.keys(params).forEach(function (key) {
90 var val = params[key];
91 var valCheck = val;
92
93 if (Object.is('String', val)) {
94 try {
95 var tmp = val.interpolate(dict, marks);
96 if (tmp === undefined) {
97 failedParams.push (prefix+key);
98 } else {
99 modifiedParams[key] = tmp;
100 }
101 } catch (e) {
102 //console.error('ERR!');
103 failedParams.push (prefix+key);
104 }
105 } else if (Object.typeOf(val) in AllowedValueTypes) {
106 modifiedParams[key] = val;
107 } else { // val is hash || array
108 var result = checkTaskParams(val, dict, prefix+key, marks);
109
110 modifiedParams[key] = result.modified;
111 failedParams = failedParams.concat (result.failed);
112 }
113 });
114 }
115
116 return {
117 modified: modifiedParams,
118 failed: failedParams || []
119 };
120}
121
122var pid = (typeof process !== "undefined") ? ((process.pid & 0x7fff) << 16) : 0;
123
124/**
125 * @class flow
126 * @extends events.EventEmitter
127 *
128 * The heart of the framework. Parses task configurations, loads dependencies,
129 * launches tasks, stores their result. When all tasks are completed,
130 * notifies subscribers (inititators).
131 *
132 * @cfg {Object} config (required) dataflow configuration.
133 * @cfg {String} config.tasks (required) tasks in that dataflow.
134 * @cfg {String} config.templates task templates
135 * @cfg {String} config.data data for tasks.
136 * @cfg {String} config.stage default is dataflow.
137 * @cfg {Object} reqParam (required) dataflow parameters.
138 */
139var dataflow = module.exports = function (config, reqParam) {
140
141 var self = this;
142
143 // TODO: copy only required things
144 // util.extend (true, this, config); // this is immutable config skeleton
145 // util.extend (true, this, reqParam); // this is config fixup
146
147 this.created = this.getDate ();
148
149 // here we make sure dataflow uid generated
150
151 var idLength = 8;
152 // idPrefix is used for dataflows running winthing other dataflows, like `every` task
153 if ("idPrefix" in config) this.idPrefix = config.idPrefix;
154 if (this.idPrefix) {
155 this.id = this.id || dataflow.nextId ();
156 idLength = 4;
157 } else {
158 this.idPrefix = '';
159 this.id = this.id || (pid | dataflow.nextId ());
160 }
161
162 if ("stage" in config) this.stage = config.stage;
163 if (!this.stage) this.stage = 'dataflow';
164
165 if (config.logger) {
166 this.logger = this._log = config.logger;
167 }
168
169 //if (!this.stageMarkers[this.stage])
170 // console.error ('there is no such stage marker: ' + this.stage);
171
172 var idString = this.id.toString(16);
173
174 while (idString.length < idLength) {idString = '0' + idString};
175 var idChunks = [
176 "" + idString[0] + idString[1],
177 "" + idString[2] + idString[3],
178 ];
179 if (idLength === 8) {
180 idChunks.push (
181 "" + idString[4] + idString[5],
182 "" + idString[6] + idString[7]
183 );
184 }
185 this.coloredId = idChunks.map (function (item) {
186 if ($isServerSide) {
187 return "\x1B[0;3" + (parseInt(item, 16) % 8) + "m" + item + "\x1B[0m";
188 } else {
189
190 }
191 return item;
192 }).join ('');
193
194 // TODO: legacy, it is better to remove data.data
195 this.data = this.data || { data: {} };
196
197 this.templates = config.templates || {};
198
199// console.log ('!!!!!!!!!!!!!!!!!!!' + this.data.keys.length);
200
201// console.log ('config, reqParam', config, reqParam);
202
203 self.ready = true;
204
205 var tasks = config.tasks;
206
207 // TODO: optimize usage - find placeholders and check only placeholders
208 if (config.tasksFrom) {
209 if (!tokenInitiator) tokenInitiator = require ('initiator/token');
210
211 var flowByToken;
212
213 if (
214 !project.config.initiator
215 || !project.config.initiator.token
216 || !project.config.initiator.token.flows
217 || !(flowByToken = project.config.initiator.token.flows[config.tasksFrom])
218 || !flowByToken.tasks
219 ) {
220 this.log ('"tasksFrom" parameter requires to have "initiator/token/flows'+config.tasksFrom+'" configuration in project');
221 this.ready = false;
222 }
223
224 tasks = flowByToken.tasks;
225 } else if (!config.tasks || !config.tasks.length) {
226 config.tasks = [];
227 }
228
229 function createDict () {
230 // TODO: very bad idea: reqParam overwrites flow.data
231 var dict = util.extend (true, self.data, reqParam);
232 // dict.global = $global;
233 dict.appMain = $mainModule.exports;
234
235 if ($isServerSide) {
236 try { dict.project = project; } catch (e) {}
237 }
238
239 return dict;
240 }
241
242 var taskGen = function (type, actualTaskParams) {
243 if (type === 'createDict') return createDict;
244 if (type === 'checkRequirements') return function () {
245 var dict = createDict ();
246
247 var result = checkTaskParams (actualTaskParams, dict, self.marks);
248
249 if (result.failed && result.failed.length > 0) {
250 this.unsatisfiedRequirements = result.failed;
251 return false;
252 } else if (result.modified) {
253 // TODO: bad
254 util.extend (this, result.modified);
255 return true;
256 }
257 }
258 }
259
260
261 this.tasks = tasks.map (taskClass.prepare.bind (taskClass, self, dataflows, taskGen));
262
263};
264
265util.inherits (dataflow, EventEmitter);
266
267var seq = 0;
268
269dataflow.nextId = function () {
270 seq++;
271 if (seq > 65535) {
272 seq = 0;
273 }
274 return seq;
275}
276
277function pad(n) {
278 return n < 10 ? '0' + n.toString(10) : n.toString(10);
279}
280
281function formattedDate (lowRes) {
282 var time = [
283 pad(lowRes.getHours()),
284 pad(lowRes.getMinutes()),
285 pad(lowRes.getSeconds())
286 ].join(':');
287 var date = [
288 lowRes.getFullYear(),
289 pad(lowRes.getMonth() + 1),
290 pad(lowRes.getDate())
291 ].join ('-');
292 return [date, time].join(' ')
293}
294
295// one second low resolution timer
296// test: http://jsperf.com/low-res-timer
297
298function lowResTimer () {
299 lowResTimer.refCount ++;
300// console.log ('low res timer refcount++', lowResTimer.refCount);
301 lowResTimer.dateString = formattedDate (
302 lowResTimer.date = new Date ()
303 );
304
305 lowResTimer.interval = setInterval (function () {
306 lowResTimer.dateString = formattedDate (
307 lowResTimer.date = new Date ()
308 );
309 }, 100);
310 // Probably bug in nodejs
311 if (lowResTimer.interval.unref)
312 lowResTimer.interval.unref();
313}
314
315lowResTimer.refCount = 0;
316
317lowResTimer.free = function () {
318 lowResTimer.refCount --;
319// console.log ('low res timer refcount--', lowResTimer.refCount);
320 if (lowResTimer.refCount < 1) {
321 delete lowResTimer.date;
322 delete lowResTimer.dateString;
323 clearInterval (lowResTimer.interval);
324 }
325}
326
327lowResTimer.getDateString = function () {
328 return lowResTimer.dateString || formattedDate (new Date ());
329}
330
331lowResTimer.getDate = function () {
332 return lowResTimer.date || new Date ();
333}
334
335dataflow.lastId = 0;
336
337util.extend (dataflow.prototype, {
338 checkTaskParams: checkTaskParams,
339 taskRequirements: taskRequirements,
340 failed: false,
341 isIdle: true,
342 haveCompletedTasks: false,
343 timerStarted: false,
344
345 getDateString: function () {
346 if (!this.timerStarted) {
347 lowResTimer();
348 }
349 this.timerStarted = true;
350 return lowResTimer.getDateString ();
351 },
352 getDate: function () {
353 if (!this.timerStarted) {
354 lowResTimer();
355 }
356 this.timerStarted = true;
357 return lowResTimer.getDate ();
358 },
359 getDateAndStopTimer: function () {
360 var date = lowResTimer.getDate ();
361 lowResTimer.free();
362 return date;
363 },
364 /**
365 * @method run Initiators call this method to launch the dataflow.
366 */
367 runDelayed: function () {
368 var self = this;
369 if ($isClientSide) {
370 setTimeout (this.run.bind (this), 0);
371 } else if ($isServerSide) {
372 process.nextTick (this.run.bind (this));
373 }
374 },
375
376 run: function () {
377 if (!this.started)
378 this.started = this.getDate().getTime();
379
380 var flow = this;
381
382 if (flow.stopped)
383 return;
384 /* @behrad following was overriding already set failed status by failed tasks */
385// flow.failed = false;
386 flow.isIdle = false;
387 flow.haveCompletedTasks = false;
388
389// flow.log ('dataflow run');
390
391 var taskStateNames = taskClass.prototype.stateNames;
392 this.taskStates = [0, 0, 0, 0, 0, 0, 0, 0];
393
394 // check task states
395
396 if (!this.tasks) {
397 flow.emit ('failed', flow);
398 flow.logError (this.stage + ' failed immediately due empty task list');
399 flow.isIdle = true;
400 return;
401 }
402
403 if (!this.ready) {
404 flow.emit ('failed', flow);
405 flow.logError (this.stage + ' failed immediately due unready state');
406 flow.isIdle = true;
407 return;
408 }
409
410 this.tasks.forEach (function (task, idx) {
411
412 if (!task) {
413 flow.failed = true;
414 // flow.emit ('failed', flow);
415 flow.logError (flow.stage + ' task is undefined');
416 flow.taskStates[taskStateNames.failed]++;
417 return;
418 }
419
420 if (task.subscribed === void(0)) {
421 flow.addEventListenersToTask (task);
422 }
423
424 task.checkState ();
425
426 flow.taskStates[task.state]++;
427
428// console.log ('task.className, task.state\n', task, task.state, task.isReady ());
429
430 if (task.isReady () && !flow.failed) {
431 flow.logTask (task, 'started');
432 // TODO: add zones/domains
433 // dataflows.zone.run (function () {/* here is task code */}, function () {/* here is error handler */});
434 try {
435 task._launch ();
436 } catch (e) {
437 // TODO: set task state to exception
438 // on exception we should fail instantly
439 task.failed (e);
440 // flow.logTaskError (task, 'failed to run', e);
441 }
442
443 // sync task support
444 if (!task.isReady()) {
445 flow.taskStates[task.stateNames.ready]--;
446 flow.taskStates[task.state]++;
447 }
448 }
449 });
450
451
452 if (!flow.failed) {
453 if (this.taskStates[taskStateNames.ready] || this.taskStates[taskStateNames.running]) {
454 // it is save to continue, wait for running/ready task
455 // console.log ('have running tasks');
456
457 flow.isIdle = true;
458
459 return;
460 } else if (flow.haveCompletedTasks) {
461 // console.log ('have completed tasks');
462 // stack will be happy
463 flow.runDelayed();
464
465 flow.isIdle = true;
466
467 return;
468 }
469 }
470
471 flow.stopped = this.getDateAndStopTimer().getTime();
472
473 var scarceTaskMessage = 'unsatisfied requirements: ';
474
475 // TODO: display scarce tasks unsatisfied requirements
476 if (this.taskStates[taskStateNames.scarce]) {
477 flow.tasks.map (function (task, idx) {
478 if (task.state != taskStateNames.scarce && task.state != taskStateNames.skipped)
479 return;
480 if (task.important) {
481 task.failed (idx + " important task didn't start");
482 flow.taskStates[taskStateNames.scarce]--;
483 flow.taskStates[task.state]++;
484 flow.failed = true;
485 scarceTaskMessage += '(important) ';
486 }
487
488 if (task.state == taskStateNames.scarce || task.state == taskStateNames.failed)
489 scarceTaskMessage += idx + ' ' + (task.logTitle) + ' => ' + task.unsatisfiedRequirements.join (', ') + '; ';
490 });
491 flow.log (scarceTaskMessage);
492 }
493
494 if (flow.verbose) {
495 var requestDump = '???';
496 try {
497 requestDump = JSON.stringify (flow.request)
498 } catch (e) {
499 if ((""+e).match (/circular/))
500 requestDump = 'CIRCULAR'
501 else
502 requestDump = e
503 };
504 }
505
506 if (this.failed) {
507 // dataflow stopped and failed
508
509 flow.emit ('failed', flow);
510 var failedtasksCount = this.taskStates[taskStateNames.failed]
511 flow.logError (this.stage + ' failed in ' + (flow.stopped - flow.started) + 'ms; failed ' + failedtasksCount + ' ' + (failedtasksCount == 1 ? 'task': 'tasks') +' out of ' + flow.tasks.length);
512
513 } else {
514 // dataflow stopped and not failed
515
516 flow.emit ('completed', flow);
517 flow.log (this.stage + ' completed in ' + (flow.stopped - flow.started) + 'ms');
518 }
519
520 flow.isIdle = true;
521
522 },
523 stageMarker: {prepare: "[]", dataflow: "()", presentation: "{}"},
524 _log: function (level, msg) {
525// if (this.quiet || process.quiet) return;
526 var toLog = [].slice.call (arguments);
527 var level = toLog.shift() || 'log';
528 toLog.unshift (
529 this.stageMarker[this.stage][0] + this.idPrefix + this.coloredId + this.stageMarker[this.stage][1]
530 );
531
532 // TODO: also check for bad clients (like ie9)
533 if ($isCordova) {
534 toLog = [toLog.join (' ')];
535 } else {
536 toLog.unshift (
537 this.getDateString ()
538 );
539 }
540
541 console[level].apply (console, toLog);
542 },
543 log: function () {
544 var args = [].slice.call (arguments);
545 args.unshift ('log');
546 this._log.apply (this, args);
547 },
548 logTask: function (task, msg) {
549 this._log ('log', task.dfTaskLogNum, task.logTitle, "("+task.state+")", msg);
550 },
551 logTaskError: function (task, msg, options) {
552 var lastFrame = '';
553 if (options && options.stack) {
554 var frames = options.stack.split('\n');
555 var len = frames.length;
556 if (frames.length > 1) {
557 lastFrame = frames[1].trim();
558 }
559 }
560
561 this._log (
562 'error',
563 task.dfTaskLogNum,
564 task.logTitle,
565 '(' + task.state + ')',
566 paint.error (
567 util.inspect (msg).replace (/(^'|'$)/g, "").replace (/\\'/, "'"),
568 util.inspect (options || '').replace (/(^'|'$)/g, "").replace (/\\'/, "'")
569 ),
570 lastFrame
571 );
572 },
573 logError: function (msg, options) {
574 // TODO: fix by using console.error
575 this._log ('error', paint.error (
576 util.inspect (msg).replace (/(^'|'$)/g, "").replace (/\\'/g, "'").replace (/\\n/g, "\n"),
577 util.inspect (options || '').replace (/(^'|'$)/g, "").replace (/\\'/, "'")
578 ));
579 },
580 addEventListenersToTask: function (task) {
581 var self = this;
582
583 task.subscribed = 1;
584
585 // loggers
586 task.on ('log', function (message) {
587 self.logTask (task, message);
588 });
589
590 task.on ('warn', function (message) {
591 self.logTaskError (task, message);
592 });
593
594 task.on ('error', function (e) {
595 self.error = e;
596 self.logTaskError (task, 'error: ', e);
597 });
598
599 // states
600 task.on ('skip', function () {
601// if (task.important) {
602// self.failed = true;
603// return self.logTaskError (task, 'error ' + arguments[0]);
604// }
605 self.logTask (task, 'task skipped');
606
607 if (self.isIdle)
608 self.runDelayed ();
609
610 });
611
612 task.on ('cancel', function (failedValue) {
613
614 if (task.retries !== null)
615 self.logTaskError (task, 'canceled, retries = ' + task.retries);
616
617 if (!task.retries && task.$setOnFail) {
618 common.pathToVal(self.data, task.$setOnFail, failedValue || true);
619 self.haveCompletedTasks = true;
620 } else {
621 self.failed = true;
622 }
623
624 if (self.isIdle)
625 self.runDelayed ();
626 });
627
628 task.on ('complete', function (t, result) {
629
630 if (result) {
631 if (t.produce || t.$set) {
632 common.pathToVal (self.data, t.produce || t.$set, result);
633 } else if (t.$mergeWith) {
634 common.pathToVal (self.data, t.$mergeWith, result, common.mergeObjects);
635 }
636 }
637
638 self.logTask (task, 'task completed');
639
640 if (self.isIdle) {
641 self.runDelayed ();
642 } else
643 self.haveCompletedTasks = true;
644 });
645
646 task.on('empty', function (t) {
647 if (t.$empty || t.$setOnEmpty) {
648 common.pathToVal(self.data, t.$empty || t.$setOnEmpty, true);
649 }
650 });
651
652 }
653});
654
655// legacy
656dataflow.isEmpty = confFu.isEmpty;