UNPKG

17.8 kBJavaScriptView Raw
// AMD compatibility shim: when no module loader supplies `define`,
// provide a minimal one that immediately invokes the module factory
// with the CommonJS `require`, `exports` and `module` objects.
var define;
if (typeof define === "undefined") {
	define = function (moduleFactory) {
		moduleFactory (require, exports, module);
	};
}
6
7define (function (require, exports, module) {
8
9"use strict";
10
11var EventEmitter = require ('events').EventEmitter,
12 util = require ('util'),
13 dataflows = require ('../'),
14 common = dataflows.common;
15
// Ordered list of task states; the position of a name is its numeric code.
var taskStateList = [
	'scarce', 'ready', 'running', 'idle',
	'complete', 'failed', 'skipped', 'exception'
];

// state name -> numeric state code
var taskStateNames = {};

// `isScarce`, `isReady`, ... predicates, later mixed into task.prototype
var taskStateMethods = {};

taskStateList.forEach (function (stateName, stateCode) {
	taskStateNames[stateName] = stateCode;

	var lowered = stateName.toLowerCase ();
	var predicateName = 'is' + lowered.charAt (0).toUpperCase () + lowered.slice (1);

	// Loose comparison is kept on purpose: it matches the original predicates.
	taskStateMethods[predicateName] = function () {
		return this.state == stateCode;
	};
});
35
36/**
 * Tasks are atomic synchronous/asynchronous code containers that configure
 * what must be done, what prerequisites must be satisfied before doing it,
 * and, optionally, where to store the task's result.
 *
 * `task` is an abstract base class that specific task types
 * should inherit from.
43 *
44 * The base `task` class provides methods to control the task execution
45 * externally. These methods are called by the dataflow.
46 * They cycle through a number of states ({@link #stateNames})
47 * and emit events.
48 *
49 * ### Example
50 *
51 * A sequence of task configs
52 * within the *dataflo.ws* concept. `task` objects are instantiated
53 * by `dataflow` internally.
54 * @module {Task} Task
55 */
56
57/**
58 * @class
59 * @extends events.EventEmitter
60 * @cfg {String} className (required) The name of a module-exported class
61 * to be instantiated as an asynchronous task.
62 *
63 * **Warning**: Params {@link #className} and {@link #functionName}
64 * are mutually exclusive.
65 *
66 * @cfg {String} functionName (required) The name of a module-exported function
67 * to be called as a synchronous task.
68 *
69 * **Warning**: Params {@link #className} and {@link #functionName}
70 * are mutually exclusive.
71 *
72 * @cfg {String} [method="run"] The entry point method name.
73 * This method will be called after the requirements are satisfied (if any).
74 *
75 * @cfg {Number} [retries=0] The number of times to retry to run the task.
76 *
77 * @cfg {Number} [timeout=1] The number of seconds between retries
78 * to run the task.
79 *
80 * @cfg {String} produce The name of the dataflow data field to receive
81 * the result of the task.
82 *
83 * @cfg {Function|String|String[]} require Lists requirements to check.
84 *
85 * @cfg {Boolean} important If the task is marked important,
86 * it may declare itself {@link #failed}.
87 * Used in custom {@link #method} methods.
88 *
89 * Implementations might provide either a function that checks whether
90 * the requirements are satisfied or an identifier or a list of identifiers,
91 * representing required objects.
92 *
93 * @returns {Boolean} If `require` is callable, it must return a boolean value.
94 *
95 */
function task (config) {
	// Intentionally empty: this base constructor performs no setup itself.
	// Instances are configured through the `init (config)` prototype method.
}

// Make every task an event emitter.
util.inherits (task, EventEmitter);

module.exports = task;
103
104util.extend (task.prototype, taskStateMethods, {
105
106 _launch: function () {
107 //this.emit ('log', 'RUN RETRIES : ' + this.retries);
108
109 if (this.state != 1) return;
110
111 this.state = 2;
112
113 var method = this[this.method || 'run'];
114 if (!method) {
115 this.state = 5;
116 this.emit ('error', 'no method named "' + (this.method || 'run') + "\" in current task's class");
117
118 return;
119 }
120
121 method.call (this);
122
123 if (this.timeout) {
124 this.timeoutId = setTimeout (function () {
125 this.state = 5;
126 this.emit ('log', 'timeout is over for task');
127 this._cancel();
128 }.bind (this), this.timeout);
129 }
130 },
131
132 run: function () {
133 var failed = false;
134
135 /**
136 * Apply $function to $args in $scope.
137 */
138 var origin = dataflows.global ();
139 // WTF???
140 var taskFnName = this.functionName;
141 var fnPath = taskFnName.split('#', 2);
142
143 if (fnPath.length == 2 && $global.project) {
144 origin = $global.project.require(fnPath[0]);
145 taskFnName = fnPath[1];
146 } else if (this.$origin) {
147 origin = this.$origin;
148 } else {
149 origin = dataflows.main ();
150 }
151
152 var method;
153 method = common.getByPath (taskFnName, origin);
154
155 /**
156 * Try to look up $function in the global scope.
157 */
158 if (!method || 'function' !== typeof method.value) {
159 method = common.getByPath (taskFnName, dataflows.global ());
160 }
161
162 if (!method || 'function' !== typeof method.value) {
163 method = dataflows.task (taskFnName);
164 }
165
166 if (!method || ('function' !== typeof method && 'function' !== typeof method.value)) {
167 failed = taskFnName + ' is not a function';
168 this.failed(failed);
169 }
170
171 var fn = 'function' === typeof method ? method : method.value;
172 var ctx = this.$scope || method.scope;
173
174 var args = this.$args;
175 var argsType = Object.typeOf(args);
176
177 if (null == args) {
178 args = []; // args = [ this ]; // NO WAY!!!
179 } else if (
180 this.originalConfig.$args &&
181 Object.typeOf (this.originalConfig.$args) != 'Array' &&
182 Object.typeOf (this.originalConfig.$args) != 'Arguments'
183 ) {
184 args = [args];
185 }
186
187 // console.log ('task:', taskClassName, 'function:', taskFnName, 'promise:', taskPromise, 'errback:', taskErrBack);
188
189 if (this.type === "errback") {
190 args.push ((function (err) {
191 var cbArgs = [].slice.call (arguments, 1);
192 if (err) {
193 this.failed.apply (this, arguments);
194 return;
195 };
196 this.completed.apply (this, cbArgs);
197 }).bind(this));
198 }
199
200 try {
201 var returnVal = fn.apply(ctx, args);
202 } catch (e) {
203 failed = e;
204 this.failed(failed);
205 }
206
207 if (this.type === "promise") {
208 returnVal.then (
209 this.completed.bind (this),
210 this.failed.bind (this)
211 );
212 } else if (this.type === "errback") {
213
214 } else if (failed) {
215 throw failed;
216 } else {
217 this.completed(returnVal);
218
219 // if (isVoid(returnVal)) {
220 // if (common.isEmpty(returnVal)) {
221 // this.empty();
222 // }
223 }
224 },
225
226 /**
227 * Cancels the running task. The task is registered as attempted
228 * to run.
229 *
230 * Switches the task's state from `running` to `idle`.
231 *
232 * If the {@link #retries} limit allows, attempt to run the task
233 * after a delay ({@link #timeout}).
234 *
235 * Emits {@link #event_cancel}.
236 * @method _cancel
237 */
238 _cancel: function (value) {
239
240 this.attempts ++;
241
242 if (this.state == 2) return;
243
244 this.state = 5;
245
246 if (this.cancel) this.cancel.apply (this, arguments);
247
248 this.clearOperationTimeout();
249
250 //this.emit ('log', 'CANCEL RETRIES : ' + this.retries);
251
252 if (this.attempts - this.retries - 1 < 0) {
253
254 this.state = 1;
255
256 setTimeout (this._launch.bind (this), this.delay || 0);
257
258 return;
259 }
260
261 /**
262 * Published on task cancel.
263 * @event cancel
264 */
265 this.emit ('cancel', value);
266
267 },
268
269 init: function (config) {
270
271 this.require = config.require || null;
272 this.mustProduce = config.mustProduce;
273 this.cb = config.cb;
274 this.cbScope = config.cbScope;
275 this.className = config.className;
276 this.functionName = config.functionName;
277 this.originalConfig = config.originalConfig;
278 this.flowId = config.flowId;
279 this.flowLogId = config.flowLogId;
280 this.getDict = config.getDict;
281 this.type = config.type;
282
283 this.method = config.method;
284 if (this.className && !this.method)
285 this.method = 'run';
286
287 this.id = "" + this.flowId + ":" + config.idx;
288
289 var idxLog = (config.idx < 10 ? " " : "") + config.idx;
290 if (dataflows.nodePlatform) {
291 idxLog = "\x1B[0;3" + (parseInt(config.idx) % 8) + "m" + idxLog + "\x1B[0m";
292 }
293
294 // idx is a task index within flow config
295 this.dfTaskNo = config.idx;
296 this.dfTaskLogNum = idxLog;
297
298 if (!this.logTitle) {
299 if (this.className) {
300 this.logTitle = this.className + '.' + this.method;
301 } else {
302 this.logTitle = this.functionName;
303 }
304 }
305
306 var stateList = taskStateList;
307
308 var self = this;
309
310 this.state = 0;
311
312 // default values
313
314 // TODO: this is provided only on run
315 self.timeout = config.timeout; // || 10000;
316 self.retries = config.retries || null;
317
318 self.attempts = 0;
319
320 self.important = config.important || void 0;
321
322 // `DEFAULT_CONFIG' is a formal config specification + default values
323 if (self.DEFAULT_CONFIG) {
324 util.shallowMerge(self, self.DEFAULT_CONFIG);
325 }
326
327 var state = this.checkState ();
328// console.log (this.url, 'state is', stateList[state], ' (' + state + ')', (state == 0 ? (this.require instanceof Array ? this.require.join (', ') : this.require) : ''));
329
330 },
331
332 /**
333 *
334 * Emits {@link #event_complete} with the result object
335 * that will go into the {@link #produce} field of the dataflow.
336 *
337 * @method completed
338 *
339 * @param {Object} result The product of the task.
340 */
341 completed: function (result) {
342 this.state = taskStateNames.complete;
343
344 var mustProduce = this.mustProduce;
345
346 if (mustProduce) {
347 var checkString = (mustProduce instanceof Array ? mustProduce.join (' && ') : mustProduce);
348 var satisfy = 0;
349 try {satisfy = eval ("if ("+ checkString +") 1") } catch (e) {};
350 if (!satisfy) {
351 // TODO: WebApp.Loader.instance.taskError (this);
352 console.error ("task " + this.url + " must produce " + checkString + " but it doesn't");
353 // TODO: return;
354 }
355 }
356
357 // coroutine call
358 if (typeof this.cb == 'function') {
359// console.log ('cb defined', this.cb, this.cbScope);
360
361 if (this.cbScope) {
362 this.cb.call (this.cbScope, this);
363 } else {
364 this.cb (this);
365 }
366 }
367
368 //@behrad set $empty on completion of all task types
369 if (common.isEmpty (result)) {
370 this.empty();
371 // TODO: return here, we don't need to emit complete
372 // or emit flowData event
373 }
374
375 /**
376 * Published upon task completion.
377 *
378 * @event complete
379 * @param {task.task} task
380 * @param {Object} result
381 */
382 this.emit ("complete", this, result);
383 },
384
385 /**
386 *
387 * Skips the task with a given result.
388 *
389 * Emits {@link #event_skip}.
390 *
391 * @method skipped
392 * @param {Object} result Substitutes the tasks's complete result.
393 *
394 */
395 skipped: function (result) {
396 this.state = taskStateNames.skipped;
397
398 /**
399 * Triggered when the task is {@link #skipped}.
400
401 * @event skip
402 * @param {task.task} task
403 * @param {Object} result
404 */
405 this.emit ("skip", this, result);
406 },
407
408 /**
409 * Run when the task has been completed correctly,
410 * but the result is a non-value (null or empty).
411 *
412 * Emits {@link #event_empty}.
413 *
414 * @method empty
415 */
416 empty: function () {
417 this.state = 6; // skipped, not completed? WTF?
418 this.emit ('empty', this);
419 },
420
421 /**
 * Translates task configuration from a custom field-naming scheme.
423 *
424 * @method mapFields
425 * @param {Object} item
426 */
427 mapFields: function (item) {
428 var self = this;
429
430 for (var k in self.mapping) {
431 if (item[self.mapping[k]])
432 item[k] = item[self.mapping[k]];
433 }
434
435 return item;
436 },
437
438 /**
439 * Checks requirements and updates the task state.
440 *
441 * @method checkState
442 * @return {Number} The new state code.
443 */
444 checkState: function () {
445
446 var self = this;
447
448 if (!self.require && this.state == 0) {
449 this.state = 1;
450 }
451
452 if (this.state >= 1)
453 return this.state;
454
455 var satisfy = 0;
456 if (typeof self.require == 'function') {
457 satisfy = self.require ();
458 } else {
459 try {
460 satisfy = eval ("if ("+ (
461 self.require instanceof Array
462 ? self.require.join (' && ')
463 : self.require)+") 1")
464 } catch (e) {
465
466 };
467 }
468 if (satisfy) {
469 this.state = 1;
470 return this.state;
471 }
472
473 return this.state;
474 },
475
476 /**
477 * @private
478 */
479 clearOperationTimeout: function() {
480 if (this.timeoutId) {
481 clearTimeout (this.timeoutId);
482 this.timeoutId = undefined;
483 }
484
485 },
486
487 /**
488 * @private
489 */
490 // WTF??? MODEL???
491 activityCheck: function (place, breakOnly) {
492
493 if (place!=="model.fetch data") {
494 // console.log("%%%%%%%%%%%%%place -> ", place);
495 }
496 var self = this;
497
498 if (breakOnly === void (0)) {
499 breakOnly = false;
500 }
501
502 self.clearOperationTimeout();
503
504 if (!breakOnly)
505 {
506 self.timeoutId = setTimeout(function () {
507 self.state = 5;
508 self.emit (
509 'log', 'timeout is over for ' + place + ' operation'
510 );
511 self.model.stop();
512 self._cancel();
513
514 }, self.timeout);
515 }
516 },
517
518 /**
519 * @enum stateNames
520 *
 * A map of human-readable task state names to their numeric state codes.
 *
 * The state names, in state code order starting from 0, are:
 *
 * - `scarce`
 * - `ready`
 * - `running`
 * - `idle`
 * - `complete`
 * - `failed`
 * - `skipped`
 * - `exception`
533 */
534 stateNames: taskStateNames,
535
536 /**
537 * Emits an {@link #event_error}.
538 *
539 * Cancels (calls {@link #method-cancel}) the task if it was ready
540 * or running; or just emits {@link #event_cancel} if not.
541 *
542 * Sets the status to `failed`.
543 *
544 * Sidenote: when a task fails the whole dataflow, that it belongs to, fails.
545 *
546 * @method failed
 * @return {undefined} Nothing is returned.
 * @param {Error} e Error object.
 * @param {Object} [data] When truthy, passed to the cancel step instead of `e`.
549
550 */
551 failed: function (e, data) {
552 var prevState = this.state;
553 this.state = 5;
554
555 /**
556
557 * Emitted on task fail and on internal errors.
558 * @event error
559 * @param {Error} e Error object.
560 */
561 this.emit('error', e);
562 // if task failed at scarce state
563 if (prevState)
564 this._cancel (data || e)
565 else
566 this.emit ('cancel', data || e);
567 return;
568 }
569
570
571});
572
573/**
574 * Prepare task class to run, handle errors, workaround for a $every tasks.
575 *
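 * @method prepare
 * @return {Task} The created task, or undefined when it cannot be created.
 * @param {Flow} flow The flow the task belongs to.
 * @param {DataFlows} dataflows The dataflows object.
 * @param {Function} gen Generator for the params dictionary and the requirements check.
 * @param {Object} taskParams The task configuration.
 * @param {Integer} idx Index of the task in the flow's task array.
 * @param {Array} array The task array.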
583
584 */
task.prepare = function (flow, dataflows, gen, taskParams, idx, array) {

	// Yields the flow template registered under `name`, or a falsy value.
	function lookupTemplate (name) {
		return name && flow.templates && flow.templates[name];
	}

	// Effective params: template first, then the task's own params on top.
	var actualTaskParams = {};

	var ownTemplate = lookupTemplate (taskParams.$template);
	if (ownTemplate) {
		util.extend (true, actualTaskParams, ownTemplate);
		delete actualTaskParams.$template;
	}

	// we expand templates in every place in config
	// for tasks such as every
	util.extend (true, actualTaskParams, taskParams);

	if (actualTaskParams.$every) {
		actualTaskParams.$class = 'every';

		if (!actualTaskParams.$tasks) {
			flow.logError ('missing $tasks property for $every task');
			flow.ready = false;
			return;
		}

		actualTaskParams.$tasks.forEach (function (everyTaskConf) {
			var everyTemplate = lookupTemplate (everyTaskConf.$template);
			if (!everyTemplate) return;

			// template values first, the sub-task's own values win,
			// then the merged result is written back in place
			var mergedConf = util.extend (true, {}, everyTemplate);
			util.extend (true, mergedConf, everyTaskConf);
			util.extend (true, everyTaskConf, mergedConf);
			delete everyTaskConf.$template;
		});

		actualTaskParams.flowConfig = {
			logger: flow.logger
		};
	}

	// snapshot of the expanded config, handed to the task untouched
	var originalTaskConfig = util.extend (true, {}, actualTaskParams);

	var taskClassName = actualTaskParams.className || actualTaskParams.$class || actualTaskParams.task;
	var taskFnName    = actualTaskParams.functionName || actualTaskParams.$function || actualTaskParams.fn;
	var taskPromise   = actualTaskParams.promise || actualTaskParams.$promise;
	var taskErrBack   = actualTaskParams.errback || actualTaskParams.$errback;

	if (taskClassName && taskFnName) {
		flow.logError ('defined both className and functionName, using className');
	}

	var theTask;

	// class-based task
	if (taskClassName) {
		try {
			var TaskModule = dataflows.task (taskClassName);
			theTask = new TaskModule ({
				originalConfig: originalTaskConfig,
				className: taskClassName,
				method: actualTaskParams.method || actualTaskParams.$method,
				require: gen ('checkRequirements', actualTaskParams),
				important: actualTaskParams.important || actualTaskParams.$important,
				flowLogId: flow.coloredId,
				flowId: flow.id,
				getDict: gen ('createDict'),
				timeout: actualTaskParams.timeout,
				retries: actualTaskParams.retries,
				idx: idx
			});
		} catch (e) {
			flow.logError ('instance of "'+taskClassName+'" creation failed:');
			flow.logError (e.stack);
			flow.ready = false;
		}

		return theTask;
	}

	// nothing usable in the config
	if (!(taskFnName || taskPromise || taskErrBack)) {
		flow.logError ("cannot create task from structure:\n", taskParams);
		flow.logError ('you must define $task, $function or $promise field');
		// TODO: return something
		flow.ready = false;

		return theTask;
	}

	// function-based task: a throwaway subclass that only runs `init`
	var xTaskClass = function (config) {
		this.init (config);
	};

	var taskType;

	if (taskPromise) {
		// like a plain function, but the outcome arrives through the returned promise
		taskFnName = taskPromise;
		taskType = "promise";
	}

	if (taskErrBack) {
		// node-style callbacks; takes precedence over a promise definition
		taskFnName = taskErrBack;
		taskType = "errback";
	}

	util.inherits (xTaskClass, task);

	theTask = new xTaskClass ({
		originalConfig: originalTaskConfig,
		// at this point taskFnName always holds the selected function name
		functionName: taskFnName,
		type: taskType,
		logTitle: actualTaskParams.logTitle || actualTaskParams.$logTitle || actualTaskParams.displayName,
		require: gen ('checkRequirements', actualTaskParams),
		important: actualTaskParams.important || actualTaskParams.$important,
		flowLogId: flow.coloredId,
		flowId: flow.id,
		timeout: actualTaskParams.timeout,
		retries: actualTaskParams.retries,
		idx: idx
	});

	return theTask;
}
711
712task.prototype.EmitError = task.prototype.failed;
713
714return task;
715
716});