UNPKG

5.17 kBJavaScriptView Raw
// Fallback for environments that provide no AMD-style `define`:
// the module factory is called immediately with the CommonJS
// `require`, `exports` and `module` bindings.
var define;
if (define === void 0) {
    define = function (factory) {
        factory(require, exports, module);
    };
}
7
8define (function (require, exports, module) {
9
10var util = require('util');
11var common = require('../common');
12var flow = require('../flow');
13var task = require('./base');
14
/**
 * Constructor for the "every" task.
 *
 * Calls `this.init(cfg)` (inherited; presumably copies the configuration
 * onto the instance -- verify against the base task), then resets the
 * result counter and the result accumulator.
 *
 * The accumulator is an array by default and a plain object when
 * `$collectObject` is set. Setting `$collectObject` together with
 * `$collect` or `$collectArray` is reported as a configuration error.
 *
 * @param {Object} cfg task configuration, passed through to `init`
 */
var EveryTask = function (cfg) {
    this.init(cfg);

    this.count = 0;
    this.results = [];

    var collectsIntoArray = Boolean(this.$collect || this.$collectArray);
    var collectsIntoObject = Boolean(this.$collectObject);

    // The two collection modes cannot be combined.
    if (collectsIntoArray && collectsIntoObject) {
        console.error('options $collectArray and $collectObject are mutually exclusive');
        this.failed('Configuration error');
    }

    // Object mode swaps the accumulator even after a configuration error,
    // exactly as before.
    if (collectsIntoObject) {
        this.results = {};
    }
};
30
// EveryTask derives from the base task required from './base'.
util.inherits(EveryTask, task);

// Members shared through the prototype. `util.extend` is not part of
// Node's core `util` module; presumably it is added by the project
// (e.g. via '../common') -- verify.
util.extend(EveryTask.prototype, {
    constructor: EveryTask,

    // Default values for the task options.
    DEFAULT_CONFIG: {
        $tasks: [],
        $every: [],
        $collect: '',
        $set: ''
    }
});
43
/**
 * Resolve a dot-separated property path against an object.
 *
 * @param {Object} obj   root object to read from
 * @param {String} path  property path, e.g. 'a.b.c'
 * @returns {*} the value found at `path`, or `undefined` when `obj` is
 *   null/undefined or any step of the path is null/undefined.
 *   Other falsy values (0, '', false) are returned as is.
 */
EveryTask.prototype.getProperty = function (obj, path) {
    // Without this guard a null/undefined root (for instance a sub-flow
    // that produced no data) made the first property access throw.
    if (null == obj) {
        return undefined;
    }

    var val = obj;
    var hasProp = path.split('.').every(function (prop) {
        val = val[prop];
        // `every` stops at the first null/undefined step, so the next
        // iteration never dereferences an empty value.
        return null != val;
    });
    return hasProp ? val : undefined;
};
52
/**
 * Account for one finished sub-flow and, once a result has been counted
 * for every key of `$every`, emit the final outcome of the task.
 *
 * - If any sub-flow has failed (`subtaskFail`), the task fails with
 *   'Task failed' on this and every later call.
 * - In a collect mode the task completes with the accumulated results,
 *   or fails with 'No results' when nothing was collected.
 * - Without a collect mode it completes with `{ ok: true }`.
 */
EveryTask.prototype.onFlowResult = function () {
    this.count += 1;

    // TODO: failed and completed dataflows should be tracked separately,
    // so that the task fails only when one or more dataflows failed;
    // otherwise an "empty" outcome should be emitted.
    if (this.subtaskFail) {
        this.failed('Task failed');
        return;
    }

    var expected = Object.keys(this.$every).length;
    if (!(this.count >= expected)) {
        // Still waiting for other sub-flows.
        return;
    }

    var collectedCount;
    if (this.$collect || this.$collectArray) {
        collectedCount = this.results.length;
    } else if (this.$collectObject) {
        collectedCount = Object.keys(this.results).length;
    } else {
        // Nothing to collect: report plain success.
        this.completed({ ok: true });
        return;
    }

    if (collectedCount) {
        this.completed(this.results);
    } else {
        this.failed('No results');
    }
};
82
/**
 * 'completed' handler for a sub-flow.
 *
 * Depending on the collect mode, reads a value from `df.data` at the
 * configured property path and stores it:
 * - `$collect` / `$collectArray`: the value is appended to `this.results`;
 * - `$collectObject`: every enumerable field of the value is copied onto
 *   `this.results` (later sub-flows overwrite equal keys).
 * Values that resolve to `undefined` are ignored.
 *
 * Afterwards the result is counted and the next queued sub-flow is
 * scheduled.
 *
 * @param {Object} df the finished sub-flow; only `df.data` is read here
 */
EveryTask.prototype._onCompleted = function (df) {
    var arrayPath = this.$collect || this.$collectArray;
    var collected;

    if (arrayPath) {
        collected = this.getProperty(df.data, arrayPath);
        if (collected !== undefined) {
            this.results.push(collected);
        }
    } else if (this.$collectObject) {
        collected = this.getProperty(df.data, this.$collectObject);
        if (collected !== undefined) {
            for (var field in collected) {
                this.results[field] = collected[field];
            }
        }
    }

    this.onFlowResult();
    this.runNextFlow();
};
104
/**
 * 'failed' handler for a sub-flow: flags the whole task as failed,
 * counts the result and schedules the next queued sub-flow.
 *
 * @param {Object} df the failed sub-flow (not inspected)
 */
EveryTask.prototype._onFailed = function (df) {
    // Sticky flag: once set, onFlowResult reports 'Task failed'.
    this.subtaskFail = true;

    this.onFlowResult();
    this.runNextFlow();
};
112
/**
 * Schedule the next queued sub-flow, if any, to start asynchronously.
 *
 * The queue (`this.executionList`) is inspected only when the deferred
 * callback fires. `process.nextTick` is used when a `process` global
 * exists, otherwise `setTimeout(..., 0)`.
 */
EveryTask.prototype.runNextFlow = function () {
    var owner = this;

    var startQueued = function () {
        if (!owner.executionList.length) {
            return;
        }
        var next = owner.executionList.shift();
        next.run();
    };

    var hasProcess = typeof process !== 'undefined';
    if (hasProcess) {
        process.nextTick(startQueued);
    } else {
        setTimeout(startQueued, 0);
    }
};
126
/**
 * Rewrite bracketed references `[$...]` / `[*...]` into the braced form
 * `{$...}` / `{*...}` for every string found under `source[origKey]`.
 *
 * The tree is walked through arrays and objects; rewritten strings are
 * stored at the same position in `dest`, which must therefore mirror the
 * structure of `source` (callers in this file pass the same tree for
 * both). Nested object keys equal to `origKey` are skipped, so the
 * references inside them are left untouched.
 *
 * Relies on `Object.typeOf`, which is not a standard built-in;
 * presumably it is installed by the project -- verify.
 *
 * @param {Object} source  tree to read from
 * @param {Object} dest    tree to write rewritten strings into
 * @param {String} origKey key of the subtree to process
 */
function unquote (source, dest, origKey) {
    var bracketRef = /\[([$*][^\]]+)\]/g;

    function visit (srcNode, destNode, key) {
        var value = srcNode[key];
        var kind = Object.typeOf(value);

        if (kind == 'String') {
            var rewritten = value.replace(bracketRef, '{$1}');
            if (rewritten != value) {
                destNode[key] = rewritten;
            }
            return;
        }

        if (kind == 'Array') {
            value.forEach(function (_, index) {
                visit(value, destNode[key], index);
            });
            return;
        }

        if (kind == 'Object') {
            Object.keys(value).forEach(function (childKey) {
                if (origKey != childKey) {
                    visit(value, destNode[key], childKey);
                }
            });
        }
    }

    visit(source, dest, origKey);
}
155
/**
 * Start the task: prepare one sub-flow per key of `this.$every` and
 * launch the first batch.
 *
 * At most `this.concurrency` sub-flows (default 10) are started here;
 * the remaining ones stay queued in `this.executionList` and are started
 * one at a time by `runNextFlow` as running sub-flows finish.
 *
 * If preparing any sub-flow throws, the task fails with the thrown
 * reason as its message and nothing is started.
 */
EveryTask.prototype.run = function () {
    // Work on a deep copy of the ORIGINAL (not yet interpolated) config.
    // Re-running already interpolated values would interpolate them twice
    // and lose functions.
    // `unquote` then turns [$...] references into {$...} inside `$tasks`,
    // leaving references inside nested `$tasks` keys untouched.
    var everyTasks = util.extend(true, {}, this.originalConfig);
    unquote(everyTasks, everyTasks, '$tasks');

    // Object.keys works for both arrays and objects.
    var keys = Object.keys(this.$every);

    // NOTE(review): when `$every` is empty no sub-flow is created, so
    // neither `completed` nor `failed` is ever emitted from here --
    // confirm whether an explicit outcome is expected in that case.

    this.executionList = [];
    try {
        // prepareDF pushes each prepared sub-flow onto executionList.
        keys.forEach(this.prepareDF.bind(this, everyTasks));
    } catch (e) {
        // Report why preparation failed instead of dropping the reason.
        this.failed(e instanceof Error ? e.message : String(e));
        return;
    }

    var concurrencyMax = this.concurrency || 10;
    var concurrency = Math.min(this.executionList.length, concurrencyMax);

    for (var toRun = 0; toRun < concurrency; toRun++) {
        var df = this.executionList.shift();
        df.run();
    }
};
190
/**
 * Build the sub-flow for one key of `$every` and queue it.
 *
 * The sub-flow gets its own deep copy of the dictionary, extended with
 * an `every` entry describing the current iteration:
 * `{ item, index, data, length }`. Its config is a deep copy of
 * `this.flowConfig` with `tasks` taken from `everyTasks.$tasks` and
 * `idPrefix` derived from `this.flowLogId`.
 *
 * Written to be used as an array-iteration callback with `everyTasks`
 * pre-bound.
 *
 * @param {Object} everyTasks  config tree whose `$tasks` the sub-flow runs
 * @param {String} item        current key of `this.$every`
 * @param {Number} idx         position of the key (unused)
 * @param {Array}  keys        all keys of `this.$every`
 * @returns {Object} the prepared sub-flow, also pushed to `executionList`
 * @throws {String} "subflow not ready" when the created flow is not ready
 */
EveryTask.prototype.prepareDF = function (everyTasks, item, idx, keys) {
    var iteration = {
        item: this.$every[item],
        index: item,
        data: this.$every,
        length: keys.length
    };

    // The dictionary is shared between iterations, so each sub-flow
    // receives a private deep copy.
    var localDict = util.extend(true, {}, this.getDict());
    localDict.every = iteration;

    var subConfig = util.extend(true, {}, this.flowConfig);
    subConfig.tasks = everyTasks.$tasks;
    subConfig.idPrefix = this.flowLogId + '>';

    var subflow = new flow(subConfig, localDict);
    if (!subflow.ready) {
        throw "subflow not ready";
    }

    var handlers = {
        completed: this._onCompleted.bind(this),
        failed: this._onFailed.bind(this)
    };
    subflow.on('completed', handlers.completed);
    subflow.on('failed', handlers.failed);

    this.executionList.push(subflow);
    return subflow;
};
221
222module.exports = EveryTask;
223
224return EveryTask;
225
226});