UNPKG

17.6 kBJavaScriptView Raw
1var define;
2if (typeof define === "undefined")
3 define = function (classInstance) {
4 classInstance (require, exports, module);
5 }
6
7define (function (require, exports, module) {
8
9"use strict";
10
11var EventEmitter = require ('events').EventEmitter,
12 util = require ('util'),
13 dataflows = require ('../'),
14 common = dataflows.common;
15
16var taskStateList = [
17 'scarce', 'ready', 'running', 'idle',
18 'complete', 'failed', 'skipped', 'exception'
19];
20
21var taskStateNames = {};
22
23var taskStateMethods = {};
24
25for (var stateNum = 0; stateNum < taskStateList.length; stateNum++) {
26 taskStateNames[taskStateList[stateNum]] = stateNum;
27
28 var fName = 'is' + taskStateList[stateNum].toLowerCase().replace(
29 /\b([a-z])/i, function(c) {return c.toUpperCase()}
30 );
31 taskStateMethods[fName] = function (x) {
32 return function () {return this.state == x};
33 } (stateNum);
34}
35
36/**
37 * Tasks are code containers atomic synchronous/asynchronous entities that configure
38 * what must be done, what prerequisitives must be satisfied before doing it,
39 * and, optionally, where to store the task's result.
40 *
41 * `task` is an abstract base class that specific task types
42 * should inherite from.
43 *
44 * The base `task` class provides methods to control the task execution
45 * externally. These methods are called by the dataflow.
46 * They cycle through a number of states ({@link #stateNames})
47 * and emit events.
48 *
49 * ### Example
50 *
51 * A sequence of task configs
52 * within the *dataflo.ws* concept. `task` objects are instantiated
53 * by `dataflow` internally.
54 * @module {Task} Task
55 */
56
57/**
58 * @class
59 * @extends events.EventEmitter
60 * @cfg {String} className (required) The name of a module-exported class
61 * to be instantiated as an asynchronous task.
62 *
63 * **Warning**: Params {@link #className} and {@link #functionName}
64 * are mutually exclusive.
65 *
66 * @cfg {String} functionName (required) The name of a module-exported function
67 * to be called as a synchronous task.
68 *
69 * **Warning**: Params {@link #className} and {@link #functionName}
70 * are mutually exclusive.
71 *
72 * @cfg {String} [method="run"] The entry point method name.
73 * This method will be called after the requirements are satisfied (if any).
74 *
75 * @cfg {Number} [retries=0] The number of times to retry to run the task.
76 *
77 * @cfg {Number} [timeout=1] The number of seconds between retries
78 * to run the task.
79 *
80 * @cfg {String} produce The name of the dataflow data field to receive
81 * the result of the task.
82 *
83 * @cfg {Function|String|String[]} require Lists requirements to check.
84 *
85 * @cfg {Boolean} important If the task is marked important,
86 * it may declare itself {@link #failed}.
87 * Used in custom {@link #method} methods.
88 *
89 * Implementations might provide either a function that checks whether
90 * the requirements are satisfied or an identifier or a list of identifiers,
91 * representing required objects.
92 *
93 * @returns {Boolean} If `require` is callable, it must return a boolean value.
94 *
95 */
96function task (config) {
97
98}
99
100module.exports = task;
101
102util.inherits (task, EventEmitter);
103
104util.extend (task.prototype, taskStateMethods, {
105
106 _launch: function () {
107 //this.emit ('log', 'RUN RETRIES : ' + this.retries);
108
109 if (this.state != 1) return;
110
111 this.state = 2;
112
113 var method = this[this.method || 'run'];
114 if (!method) {
115 this.state = 5;
116 this.emit ('error', 'no method named "' + (this.method || 'run') + "\" in current task's class");
117
118 return;
119 }
120
121 method.call (this);
122
123 if (this.timeout) {
124 this.timeoutId = setTimeout (function () {
125 this.state = 5;
126 this.emit ('log', 'timeout is over for task');
127 this._cancel();
128 }.bind (this), this.timeout);
129 }
130 },
131
132 run: function () {
133 var failed = false;
134
135 /**
136 * Apply $function to $args in $scope.
137 */
138 var origin = null;
139 // WTF???
140 var taskFnName = this.functionName;
141 var fnPath = taskFnName.split('#', 2);
142
143 if (fnPath.length == 2 && $global.project) {
144 origin = $global.project.require(fnPath[0]);
145 taskFnName = fnPath[1];
146 } else if (this.$origin) {
147 origin = this.$origin;
148 } else {
149 origin = $global.$mainModule.exports;
150 }
151
152 var method;
153 method = common.getByPath (taskFnName, origin);
154
155 /**
156 * Try to look up $function in the global scope.
157 */
158 if (!method || 'function' !== typeof method.value) {
159 method = common.getByPath (taskFnName);
160 }
161
162 if (!method || 'function' !== typeof method.value) {
163 method = dataflows.task (taskFnName);
164 }
165
166 if (!method || ('function' !== typeof method && 'function' !== typeof method.value)) {
167 failed = taskFnName + ' is not a function';
168 this.failed(failed);
169 }
170
171 var fn = 'function' === typeof method ? method : method.value;
172 var ctx = this.$scope || method.scope;
173
174 var args = this.$args;
175 var argsType = Object.typeOf(args);
176
177 if (null == args) {
178 args = [ this ];
179 } else if (
180 this.originalConfig.$args &&
181 Object.typeOf (this.originalConfig.$args) != 'Array' &&
182 Object.typeOf (this.originalConfig.$args) != 'Arguments'
183 ) {
184 args = [ args ];
185 }
186
187 // console.log ('task:', taskClassName, 'function:', taskFnName, 'promise:', taskPromise, 'errback:', taskErrBack);
188
189 if (this.type === "errback") {
190 args.push ((function (err) {
191 var cbArgs = [].slice.call (arguments, 1);
192 if (err) {
193 this.failed.apply (this, arguments);
194 return;
195 };
196 this.completed.apply (this, cbArgs);
197 }).bind(this));
198 }
199
200 try {
201 var returnVal = fn.apply(ctx, args);
202 } catch (e) {
203 failed = e;
204 this.failed(failed);
205 }
206
207 if (this.type === "promise") {
208 returnVal.then (
209 this.completed.bind (this),
210 this.failed.bind (this)
211 );
212 } else if (this.type === "errback") {
213
214 } else if (failed) {
215 throw failed;
216 } else {
217 this.completed(returnVal);
218
219 // if (isVoid(returnVal)) {
220 // if (common.isEmpty(returnVal)) {
221 // this.empty();
222 // }
223 }
224 },
225
226 /**
227 * Cancels the running task. The task is registered as attempted
228 * to run.
229 *
230 * Switches the task's state from `running` to `idle`.
231 *
232 * If the {@link #retries} limit allows, attempt to run the task
233 * after a delay ({@link #timeout}).
234 *
235 * Emits {@link #event_cancel}.
236 * @method _cancel
237 */
238 _cancel: function (value) {
239
240 this.attempts ++;
241
242 if (this.state == 2) return;
243
244 this.state = 5;
245
246 if (this.cancel) this.cancel.apply (this, arguments);
247
248 this.clearOperationTimeout();
249
250 //this.emit ('log', 'CANCEL RETRIES : ' + this.retries);
251
252 if (this.attempts - this.retries - 1 < 0) {
253
254 this.state = 1;
255
256 setTimeout (this._launch.bind (this), this.delay || 0);
257
258 return;
259 }
260
261 /**
262 * Published on task cancel.
263 * @event cancel
264 */
265 this.emit ('cancel', value);
266
267 },
268
269 init: function (config) {
270
271 this.require = config.require || null;
272 this.mustProduce = config.mustProduce;
273 this.cb = config.cb;
274 this.cbScope = config.cbScope;
275 this.className = config.className;
276 this.functionName = config.functionName;
277 this.originalConfig = config.originalConfig;
278 this.flowId = config.flowId;
279 this.flowLogId = config.flowLogId;
280 this.getDict = config.getDict;
281 this.type = config.type;
282
283 this.method = config.method;
284 if (this.className && !this.method)
285 this.method = 'run';
286
287 this.id = "" + this.flowId + ":" + config.idx;
288
289 var idxLog = (config.idx < 10 ? " " : "") + config.idx;
290 if ($isServerSide) {
291 idxLog = "\x1B[0;3" + (parseInt(config.idx) % 8) + "m" + idxLog + "\x1B[0m";
292 }
293
294 // idx is a task index within flow config
295 this.dfTaskNo = config.idx;
296 this.dfTaskLogNum = idxLog;
297
298 if (!this.logTitle) {
299 if (this.className) {
300 this.logTitle = this.className + '.' + this.method;
301 } else {
302 this.logTitle = this.functionName;
303 }
304 }
305
306 var stateList = taskStateList;
307
308 var self = this;
309
310 this.state = 0;
311
312 // default values
313
314 // TODO: this is provided only on run
315 self.timeout = config.timeout; // || 10000;
316 self.retries = config.retries || null;
317
318 self.attempts = 0;
319
320 self.important = config.important || void 0;
321
322 // `DEFAULT_CONFIG' is a formal config specification + default values
323 if (self.DEFAULT_CONFIG) {
324 util.shallowMerge(self, self.DEFAULT_CONFIG);
325 }
326
327 var state = this.checkState ();
328// console.log (this.url, 'state is', stateList[state], ' (' + state + ')', (state == 0 ? (this.require instanceof Array ? this.require.join (', ') : this.require) : ''));
329
330 },
331
332 /**
333 *
334 * Emits {@link #event_complete} with the result object
335 * that will go into the {@link #produce} field of the dataflow.
336 *
337 * @method completed
338 *
339 * @param {Object} result The product of the task.
340 */
341 completed: function (result) {
342 this.state = 4;
343
344 var mustProduce = this.mustProduce;
345
346 if (mustProduce) {
347 var checkString = (mustProduce instanceof Array ? mustProduce.join (' && ') : mustProduce);
348 var satisfy = 0;
349 try {satisfy = eval ("if ("+ checkString +") 1") } catch (e) {};
350 if (!satisfy) {
351 // TODO: WebApp.Loader.instance.taskError (this);
352 console.error ("task " + this.url + " must produce " + checkString + " but it doesn't");
353 // TODO: return;
354 }
355 }
356
357 // coroutine call
358 if (typeof this.cb == 'function') {
359// console.log ('cb defined', this.cb, this.cbScope);
360
361 if (this.cbScope) {
362 this.cb.call (this.cbScope, this);
363 } else {
364 this.cb (this);
365 }
366 }
367
368 //@behrad set $empty on completion of all task types
369 if (common.isEmpty (result)) {
370 this.empty();
371 }
372
373 /**
374 * Published upon task completion.
375 *
376 * @event complete
377 * @param {task.task} task
378 * @param {Object} result
379 */
380 this.emit ("complete", this, result);
381 },
382
383 /**
384 *
385 * Skips the task with a given result.
386 *
387 * Emits {@link #event_skip}.
388 *
389 * @method skipped
390 * @param {Object} result Substitutes the tasks's complete result.
391 *
392 */
393 skipped: function (result) {
394 this.state = 6;
395
396 /**
397 * Triggered when the task is {@link #skipped}.
398
399 * @event skip
400 * @param {task.task} task
401 * @param {Object} result
402 */
403 this.emit ("skip", this, result);
404 },
405
406 /**
407 * Run when the task has been completed correctly,
408 * but the result is a non-value (null or empty).
409 *
410 * Emits {@link #event_empty}.
411 *
412 * @method empty
413 */
414 empty: function () {
415 this.state = 6; // completed
416 this.emit('empty', this);
417 },
418
419 /**
420 * Translates task configuration from custom field-naming cheme.
421 *
422 * @method mapFields
423 * @param {Object} item
424 */
425 mapFields: function (item) {
426 var self = this;
427
428 for (var k in self.mapping) {
429 if (item[self.mapping[k]])
430 item[k] = item[self.mapping[k]];
431 }
432
433 return item;
434 },
435
436 /**
437 * Checks requirements and updates the task state.
438 *
439 * @method checkState
440 * @return {Number} The new state code.
441 */
442 checkState: function () {
443
444 var self = this;
445
446 if (!self.require && this.state == 0) {
447 this.state = 1;
448 }
449
450 if (this.state >= 1)
451 return this.state;
452
453 var satisfy = 0;
454 if (typeof self.require == 'function') {
455 satisfy = self.require ();
456 } else {
457 try {
458 satisfy = eval ("if ("+ (
459 self.require instanceof Array
460 ? self.require.join (' && ')
461 : self.require)+") 1")
462 } catch (e) {
463
464 };
465 }
466 if (satisfy) {
467 this.state = 1;
468 return this.state;
469 }
470
471 return this.state;
472 },
473
474 /**
475 * @private
476 */
477 clearOperationTimeout: function() {
478 if (this.timeoutId) {
479 clearTimeout (this.timeoutId);
480 this.timeoutId = undefined;
481 }
482
483 },
484
485 /**
486 * @private
487 */
488 // WTF??? MODEL???
489 activityCheck: function (place, breakOnly) {
490
491 if (place!=="model.fetch data") {
492 // console.log("%%%%%%%%%%%%%place -> ", place);
493 }
494 var self = this;
495
496 if (breakOnly === void (0)) {
497 breakOnly = false;
498 }
499
500 self.clearOperationTimeout();
501
502 if (!breakOnly)
503 {
504 self.timeoutId = setTimeout(function () {
505 self.state = 5;
506 self.emit (
507 'log', 'timeout is over for ' + place + ' operation'
508 );
509 self.model.stop();
510 self._cancel();
511
512 }, self.timeout);
513 }
514 },
515
516 /**
517 * @enum stateNames
518 *
519 * A map of the task state codes to human-readable state descriptions.
520 *
521 * The states codes are:
522 *
523 * - `scarce`
524 * - `ready`
525 * - `running`
526 * - `idle`
527 * - `complete`
528 * - `failed`
529 * - `skipped`
530 * - `empty`
531 */
532 stateNames: taskStateNames,
533
534 /**
535 * Emits an {@link #event_error}.
536 *
537 * Cancels (calls {@link #method-cancel}) the task if it was ready
538 * or running; or just emits {@link #event_cancel} if not.
539 *
540 * Sets the status to `failed`.
541 *
542 * Sidenote: when a task fails the whole dataflow, that it belongs to, fails.
543 *
544 * @method failed
545 * @return {Boolean} Always true.
546 * @param {Error} Error object.
547
548 */
549 failed: function (e, data) {
550 var prevState = this.state;
551 this.state = 5;
552
553 /**
554
555 * Emitted on task fail and on internal errors.
556 * @event error
557 * @param {Error} e Error object.
558 */
559 this.emit('error', e);
560 // if task failed at scarce state
561 if (prevState)
562 this._cancel (data || e)
563 else
564 this.emit ('cancel', data || e);
565 return;
566 }
567
568
569});
570
571/**
572 * Prepare task class to run, handle errors, workaround for a $every tasks.
573 *
574 * @method prepare
575 * @return {Task}.
576 * @param {Flow} flow for task.
577 * @param {DataFlows} dataflows object.
578 * @param {Function} generator for params dictionary and check requirements.
579 * @param {Integer} index in task array for that flow.
580 * @param {Array} task array.
581
582 */
583task.prepare = function (flow, dataflows, gen, taskParams, idx, array) {
584 var theTask;
585
586 var actualTaskParams = {
587 };
588 var taskTemplateName = taskParams.$template;
589 if (taskTemplateName && flow.templates && flow.templates[taskTemplateName]) {
590 util.extend (true, actualTaskParams, flow.templates[taskTemplateName]);
591 delete actualTaskParams.$template;
592 }
593
594 // we expand templates in every place in config
595 // for tasks such as every
596 util.extend (true, actualTaskParams, taskParams);
597
598 if (actualTaskParams.$every) {
599 actualTaskParams.$class = 'every';
600 if (!actualTaskParams.$tasks) {
601 flow.logError ('missing $tasks property for $every task');
602 flow.ready = false;
603 return;
604 }
605 actualTaskParams.$tasks.forEach (function (everyTaskConf, idx) {
606 var taskTemplateName = everyTaskConf.$template;
607 if (taskTemplateName && flow.templates && flow.templates[taskTemplateName]) {
608 var newEveryTaskConf = util.extend (true, {}, flow.templates[taskTemplateName]);
609 // WTF???
610 util.extend (true, newEveryTaskConf, everyTaskConf);
611 util.extend (true, everyTaskConf, newEveryTaskConf);
612 delete everyTaskConf.$template;
613 // console.log (everyTaskConf, actualTaskParams.$tasks[idx]);//everyTaskConf.$tasks
614 }
615
616 });
617
618 actualTaskParams.flowConfig = {
619 logger: flow.logger
620 };
621 }
622
623 // var originalTaskConfig = JSON.parse(JSON.stringify(actualTaskParams));
624 var originalTaskConfig = util.extend (true, {}, actualTaskParams);
625
626 // check for data persistence in flow.templates[taskTemplateName], taskParams
627
628 // console.log (taskParams);
629
630 var taskClassName = actualTaskParams.className || actualTaskParams.$class || actualTaskParams.task;
631 var taskFnName = actualTaskParams.functionName || actualTaskParams.$function || actualTaskParams.fn;
632 var taskPromise = actualTaskParams.promise || actualTaskParams.$promise;
633 var taskErrBack = actualTaskParams.errback || actualTaskParams.$errback;
634
635 // console.log ('task:', taskClassName, 'function:', taskFnName, 'promise:', taskPromise, 'errback:', taskErrBack);
636
637 if (taskClassName && taskFnName)
638 flow.logError ('defined both className and functionName, using className');
639
640 if (taskClassName) {
641
642 try {
643 var taskModule = dataflows.task (taskClassName);
644 theTask = new taskModule ({
645 originalConfig: originalTaskConfig,
646 className: taskClassName,
647 method: actualTaskParams.method || actualTaskParams.$method,
648 require: gen ('checkRequirements', actualTaskParams),
649 important: actualTaskParams.important || actualTaskParams.$important,
650 flowLogId: flow.coloredId,
651 flowId: flow.id,
652 getDict: gen ('createDict'),
653 timeout: actualTaskParams.timeout,
654 retries: actualTaskParams.retries,
655 idx: idx
656 });
657 } catch (e) {
658 flow.logError ('instance of "'+taskClassName+'" creation failed:');
659 flow.logError (e.stack);
660 // throw ('instance of "'+taskClassName+'" creation failed:');
661 flow.ready = false;
662 }
663
664 } else if (taskFnName || taskPromise || taskErrBack) {
665
666 var xTaskClass = function (config) {
667 this.init (config);
668 };
669
670 var taskType;
671
672 if (taskPromise) {
673 // functions and promises similar, but function return value, promise promisepromise promise
674 taskFnName = taskPromise;
675 taskType = "promise";
676 }
677
678 if (taskErrBack) {
679 // nodestyled callbacks
680 taskFnName = taskErrBack;
681 taskType = "errback";
682 }
683
684 util.inherits (xTaskClass, task);
685
686 theTask = new xTaskClass ({
687 originalConfig: originalTaskConfig,
688 functionName: taskFnName || taskPromise || taskErrBack,
689 type: taskType,
690 logTitle: actualTaskParams.logTitle || actualTaskParams.$logTitle || actualTaskParams.displayName,
691 require: gen ('checkRequirements', actualTaskParams),
692 important: actualTaskParams.important || actualTaskParams.$important,
693 flowLogId: flow.coloredId,
694 flowId: flow.id,
695 timeout: actualTaskParams.timeout,
696 retries: actualTaskParams.retries,
697 idx: idx
698 });
699
700 } else {
701 flow.logError ("cannot create task from structure:\n", taskParams);
702 flow.logError ('you must define $task, $function or $promise field');
703 // TODO: return something
704 flow.ready = false;
705 }
706
707 return theTask;
708}
709
710task.prototype.EmitError = task.prototype.failed;
711
712return task;
713
714});