/**
 * AMD fallback: when no `define` is available in this scope, install a
 * minimal one that immediately invokes the module body with the
 * `require`, `exports` and `module` names visible here.
 */
var define;

if ('undefined' === typeof define) {
	define = function (moduleBody) {
		// No loader involved: the body runs synchronously, right away.
		moduleBody (require, exports, module);
	};
}
|
7 |
|
8 | define (function (require, exports, module) {
|
9 |
|
10 | var util = require('util');
|
11 | var common = require('../common');
|
12 | var flow = require('../flow');
|
13 | var task = require('./base');
|
14 |
|
/**
 * Task that runs a sub-flow for every entry of `$every` and optionally
 * collects one value from each sub-flow result.
 *
 * Collection is configured through `$collect` / `$collectArray` (values are
 * appended to an array) or `$collectObject` (fields are merged into an
 * object). Setting both flavours at once is reported as a configuration
 * error.
 *
 * @param {Object} cfg - task configuration, handed to `this.init`.
 */
var EveryTask = function (cfg) {
	this.init (cfg);

	// Bookkeeping for finished sub-flows; array collection is the default.
	this.count = 0;
	this.results = [];

	var collectsIntoArray = this.$collect || this.$collectArray;

	if (collectsIntoArray && this.$collectObject) {
		console.error ('options $collectArray and $collectObject are mutually exclusive');
		this.failed ('Configuration error');
	}

	// Object collection merges fields, so it needs an object accumulator.
	if (this.$collectObject) {
		this.results = {};
	}
};
|
30 |
|
// EveryTask derives from the base task required from './base'.
util.inherits (EveryTask, task);

// Members copied onto the prototype: the constructor back-reference and the
// default values of the options this task declares.
var everyTaskMembers = {
	constructor: EveryTask,

	DEFAULT_CONFIG: {
		$tasks: [],
		$every: [],
		$collect: '',
		$set: ''
	}
};

util.extend (EveryTask.prototype, everyTaskMembers);
|
43 |
|
/**
 * Resolve a dot-separated property path against an object.
 *
 * @param {*} obj - root value to start from; may be null or undefined.
 * @param {string} path - dot-separated property names, e.g. 'a.b.c'.
 * @returns {*} the value found at the end of the path, or `undefined` when
 *   the root, any intermediate link, or the final value is null/undefined.
 */
EveryTask.prototype.getProperty = function (obj, path) {
	var current = obj;
	var segments = path.split ('.');

	for (var i = 0; i < segments.length; i++) {
		// A missing root or link ends the lookup instead of throwing a
		// TypeError on the property access below.
		if (null == current) {
			return undefined;
		}
		current = current[segments[i]];
	}

	// A null leaf is reported as "not found", same as undefined.
	return null == current ? undefined : current;
};
|
52 |
|
/**
 * Account for one finished sub-flow and settle the task when appropriate.
 *
 * - If any sub-flow has failed (`this.subtaskFail`), reports 'Task failed'.
 * - Otherwise, once as many sub-flows have finished as `$every` has keys:
 *   in a collecting mode the task completes with `this.results` when
 *   something was collected and fails with 'No results' when nothing was;
 *   without a collecting mode it completes with `{ ok: true }`.
 *
 * NOTE(review): after a failure this reports `failed` again for every
 * further sub-flow that finishes - confirm the base task tolerates that.
 */
EveryTask.prototype.onFlowResult = function () {
	this.count = this.count + 1;

	if (this.subtaskFail) {
		this.failed ('Task failed');
		return;
	}

	var expected = Object.keys (this.$every).length;
	if (this.count < expected) {
		// Still waiting for other sub-flows.
		return;
	}

	var collectedCount;
	if (this.$collect || this.$collectArray) {
		collectedCount = this.results.length;
	} else if (this.$collectObject) {
		collectedCount = Object.keys (this.results).length;
	} else {
		// Nothing to collect: finishing every sub-flow is the whole result.
		this.completed ({ ok: true });
		return;
	}

	if (collectedCount) {
		this.completed (this.results);
	} else {
		this.failed ('No results');
	}
};
|
82 |
|
/**
 * 'completed' handler for a sub-flow: harvest the configured value from
 * the sub-flow data, record the result, then schedule the next queued
 * sub-flow.
 *
 * @param {Object} df - the finished sub-flow; its `data` is read.
 */
EveryTask.prototype._onCompleted = function (df) {
	var listPath = this.$collect || this.$collectArray;
	var harvested;

	if (listPath) {
		// Array mode: append the value found at the configured path.
		harvested = this.getProperty (df.data, listPath);
		if (harvested !== undefined) {
			this.results.push (harvested);
		}
	} else if (this.$collectObject) {
		// Object mode: copy each enumerable field of the found value.
		harvested = this.getProperty (df.data, this.$collectObject);
		if (harvested !== undefined) {
			for (var fieldName in harvested) {
				this.results[fieldName] = harvested[fieldName];
			}
		}
	}

	this.onFlowResult ();
	this.runNextFlow ();
};
|
104 |
|
/**
 * 'failed' handler for a sub-flow: flag the failure, record the result,
 * then schedule the next queued sub-flow.
 *
 * @param {Object} df - the failed sub-flow (not inspected here).
 */
EveryTask.prototype._onFailed = function (df) {
	// The flag must be set first: onFlowResult reads it.
	this.subtaskFail = true;

	this.onFlowResult ();
	this.runNextFlow ();
};
|
112 |
|
/**
 * Schedule the start of the next queued sub-flow, if any.
 *
 * The start is deferred (`process.nextTick` where `process` exists,
 * `setTimeout(…, 0)` otherwise); the queue is inspected only when the
 * deferred callback actually runs.
 */
EveryTask.prototype.runNextFlow = function () {
	var owner = this;

	function startQueued () {
		if (!owner.executionList.length) {
			return;
		}
		var nextFlow = owner.executionList.shift ();
		nextFlow.run ();
	}

	if (typeof process !== 'undefined') {
		process.nextTick (startQueued);
	} else {
		setTimeout (startQueued, 0);
	}
};
|
126 |
|
/**
 * Rewrite "quoted" placeholders - `[$…]` / `[*…]` - into `{$…}` / `{*…}`
 * in every string found under `source[origKey]`, writing the rewritten
 * strings into the matching position under `dest`.
 *
 * Arrays and objects are walked recursively; inside objects, properties
 * named `origKey` are skipped, so nested occurrences stay quoted.
 *
 * @param {Object} source - tree to read from.
 * @param {Object} dest - tree to write to; must mirror `source`'s shape.
 * @param {string} origKey - key to start from and to skip while descending.
 */
function unquote (source, dest, origKey) {
	var quoted = /\[([$*][^\]]+)\]/g;

	function walk (parent, target, key) {
		var node = parent[key];
		var kind = Object.typeOf (node);

		if ('String' == kind) {
			var rewritten = node.replace (quoted, '{$1}');
			// Only touch the destination when something actually changed.
			if (rewritten != node) {
				target[key] = rewritten;
			}
			return;
		}

		if ('Array' == kind) {
			node.forEach (function (_, index) {
				walk (node, target[key], index);
			});
			return;
		}

		if ('Object' == kind) {
			Object.keys (node).filter (function (name) {
				return origKey != name;
			}).forEach (function (name) {
				walk (node, target[key], name);
			});
		}
	}

	walk (source, dest, origKey);
}
|
155 |
|
/**
 * Start the task: build one sub-flow per key of `$every` and launch the
 * first batch.
 *
 * A deep copy of `this.originalConfig` is taken and the quoted
 * placeholders under its `$tasks` are unquoted, so every sub-flow gets the
 * task templates in interpolable form. At most `this.concurrency`
 * (default 10) sub-flows are started here; the remaining ones stay in
 * `this.executionList` and are started by `runNextFlow`.
 *
 * If preparing any sub-flow throws, the task fails with the thrown reason
 * and nothing is started. If `$every` has no keys, no sub-flow would ever
 * report back, so the task is settled immediately through `onFlowResult`.
 */
EveryTask.prototype.run = function () {
	var everyTasks = util.extend (true, {}, this.originalConfig);
	unquote (everyTasks, everyTasks, '$tasks');

	var keys = Object.keys (this.$every);

	this.executionList = [];

	if (keys.length === 0) {
		// Nothing to iterate over: settle now instead of waiting forever
		// for sub-flow events that will never arrive.
		this.onFlowResult ();
		return;
	}

	try {
		// prepareDF pushes each prepared sub-flow onto executionList.
		keys.forEach (this.prepareDF.bind (this, everyTasks));
	} catch (e) {
		// Surface why preparation failed rather than failing silently.
		var reason = (e && e.message) || e;
		this.failed (reason);
		return;
	}

	var concurrencyMax = this.concurrency || 10;
	var concurrency = Math.min (this.executionList.length, concurrencyMax);

	for (var toRun = 0; toRun < concurrency; toRun++) {
		var df = this.executionList.shift ();
		df.run ();
	}
};
|
190 |
|
/**
 * Build the sub-flow for one entry of `$every` and queue it.
 *
 * The sub-flow gets a deep copy of this task's dictionary extended with an
 * `every` record (`item`, `index`, `data`, `length`) describing the current
 * entry, and a deep copy of `this.flowConfig` whose `tasks` are the given
 * task templates.
 *
 * Shaped as an array-iteration callback with `everyTasks` pre-bound.
 *
 * @param {Object} everyTasks - configuration copy whose `$tasks` are used.
 * @param {string} item - current key of `$every`.
 * @param {number} idx - position of `item` within `keys` (unused).
 * @param {string[]} keys - all keys of `$every`.
 * @returns {Object} the prepared sub-flow, also pushed onto
 *   `this.executionList`.
 * @throws {Error} 'subflow not ready' when the created flow is not ready.
 */
EveryTask.prototype.prepareDF = function (everyTasks, item, idx, keys) {
	var every = {
		item: this.$every[item],
		index: item,
		data: this.$every,
		length: keys.length
	};

	// Copy the dictionary so sub-flows do not share the `every` record.
	var dict = util.extend (true, {}, this.getDict ());
	dict.every = every;

	var flowConfig = util.extend (true, {}, this.flowConfig);
	flowConfig.tasks = everyTasks.$tasks;
	flowConfig.idPrefix = this.flowLogId + '>';

	var df = new flow (flowConfig, dict);

	if (!df.ready) {
		// A real Error keeps a stack trace and a `message` for the caller.
		throw new Error ('subflow not ready');
	}

	df.on ('completed', this._onCompleted.bind (this));
	df.on ('failed', this._onFailed.bind (this));

	this.executionList.push (df);
	return df;
};
|
221 |
|
222 | module.exports = EveryTask;
|
223 |
|
224 | return EveryTask;
|
225 |
|
226 | });
|