UNPKG

16.4 kBJavaScriptView Raw
1"use strict";
2var task_resolve_1 = require("./task.resolve");
3var task_return_values_1 = require("./task.return.values");
4var Rx = require("rx");
5var debug = require("debug")("cb:task.runner");
6var _ = require("../lodash.custom");
7var once = require("once");
8var domain = require("domain");
9var TaskReportType;
10(function (TaskReportType) {
11 TaskReportType[TaskReportType["start"] = "start"] = "start";
12 TaskReportType[TaskReportType["end"] = "end"] = "end";
13 TaskReportType[TaskReportType["error"] = "error"] = "error";
14})(TaskReportType = exports.TaskReportType || (exports.TaskReportType = {}));
15var TaskSkipReasons;
16(function (TaskSkipReasons) {
17 TaskSkipReasons[TaskSkipReasons["SkipFlag"] = "SkipFlag"] = "SkipFlag";
18 TaskSkipReasons[TaskSkipReasons["IfChanged"] = "IfChanged"] = "IfChanged";
19})(TaskSkipReasons = exports.TaskSkipReasons || (exports.TaskSkipReasons = {}));
20/**
21 * This creates a wrapper around the actual function that will be run.
22 * This done to allow the before/after reporting to work as expected for consumers
23 */
24function time(scheduler) {
25 return scheduler ? scheduler.now() : new Date().getTime();
26}
27exports.time = time;
28function createObservableFromSequenceItem(item, trigger, ctx) {
29 var taskTrigger = _.assign({}, trigger, ctx.toJS());
30 return Rx.Observable.create(function (observer) {
31 var startTime = time(taskTrigger.config.scheduler);
32 /**
33 * Complete immediately if this item was marked
34 * as 'skipped'
35 */
36 if (!taskTrigger.config.force && item.task.skipped) {
37 var additionalStats = {
38 skipped: true,
39 skippedReason: TaskSkipReasons.SkipFlag
40 };
41 var stats_1 = getStartStats(startTime, additionalStats);
42 observer.onNext(getTaskReport(TaskReportType.start, item, stats_1));
43 observer.onNext(getTaskReport(TaskReportType.end, item, getEndStats(stats_1, startTime, additionalStats)));
44 observer.onCompleted();
45 return;
46 }
47 /**
48 * Complete immediately if this item was marked
49 * with an 'ifChanged' predicate
50 */
51 if (!taskTrigger.config.force && item.task.ifChanged.length && ctx.hasIn(["ifChanged"])) {
52 var hasChanges = ctx
53 .get("ifChanged")
54 .filter(function (x) {
55 return item.task.ifChanged.indexOf(x.get("userInput")) !== -1;
56 })
57 .some(function (x) { return x.get("changed"); });
58 if (!hasChanges) {
59 var additionalStats = {
60 skipped: true,
61 skippedReason: TaskSkipReasons.IfChanged
62 };
63 var stats_2 = getStartStats(startTime, additionalStats);
64 observer.onNext(getTaskReport(TaskReportType.start, item, stats_2));
65 observer.onNext(getTaskReport(TaskReportType.end, item, getEndStats(stats_2, startTime, additionalStats)));
66 observer.onCompleted();
67 return;
68 }
69 }
70 /**
71 * Timestamp when this task starts
72 * @type {TaskStats}
73 */
74 var stats = getStartStats(startTime, { skipped: false });
75 debug("> seqUID " + item.seqUID + " started");
76 /**
77 * Task started
78 */
79 observer.onNext(getTaskReport(TaskReportType.start, item, stats));
80 /**
81 * Exit after 1 second if we're in a 'dry run'
82 */
83 if (taskTrigger.config.dryRun) {
84 return Rx.Observable
85 .just("dryRun")
86 .delay(taskTrigger.config.dryRunDuration, taskTrigger.config.scheduler)
87 .do(function (_) {
88 observer.onNext(getTaskReport(TaskReportType.end, item, getEndStats(stats, time(taskTrigger.config.scheduler))));
89 observer.onCompleted();
90 }).subscribe();
91 }
92 if (item.task.type === task_resolve_1.TaskTypes.InlineFunction
93 || item.task.type === task_resolve_1.TaskTypes.ExternalTask
94 || item.task.type === task_resolve_1.TaskTypes.Adaptor) {
95 var argCount = item.factory.length;
96 var cb_1 = once(function (err) {
97 if (err) {
98 observer.onError(err);
99 return;
100 }
101 observer.onNext(getTaskReport(TaskReportType.end, item, getEndStats(stats, time(taskTrigger.config.scheduler))));
102 observer.onCompleted();
103 });
104 var d_1 = domain.create();
105 d_1.once("error", function (err) {
106 cb_1(err);
107 });
108 var domainBoundFn = d_1.bind(item.factory.bind(null, item.options, taskTrigger));
109 var done = function (err) {
110 d_1.removeListener("error", function (err) {
111 cb_1(err);
112 });
113 d_1.exit();
114 return cb_1.apply(null, arguments);
115 };
116 var result = domainBoundFn(done);
117 if (result) {
118 var returns = task_return_values_1.default(result, done);
119 /**
120 * If the return value does not need to be consumed,
121 * but it is IS a function, assume it's the tear-down logic
122 * for this task - which also means it MUST signify completion
123 * via the callback
124 */
125 if (!returns && typeof result === "function") {
126 if (argCount >= 3) {
127 return result;
128 }
129 else {
130 done(new Error("You returned tear-down logic, but you never asked for the completion callback"));
131 return;
132 }
133 }
134 }
135 else {
136 /**
137 * Assume sync function if nothing returned
138 * and 3rd argument was not asked for
139 */
140 if (argCount < 3) {
141 done();
142 return;
143 }
144 }
145 }
146 }).catch(function (error) {
147 /**
148 * **--**--MAGIC--**--**
149 * If a task throws an error of any kind, we want that error to propagate as normal,
150 * but we want tp prepend an error report so that the error report can be observed
151 * before the sequence ends.
152 */
153 return Rx.Observable.concat(Rx.Observable.just(getTaskErrorReport(item, getErrorStats(error, time(taskTrigger.config.scheduler)))), Rx.Observable.throw(error));
154 });
155}
156exports.createObservableFromSequenceItem = createObservableFromSequenceItem;
157/**
158 * Factory for TaskReports
159 */
160function getTaskReport(type, item, stats) {
161 return { type: type, item: item, stats: stats };
162}
163/**
164 * Create a new stats object with startTime
165 */
166function getStartStats(startTime, additional) {
167 return _.assign({}, additional, {
168 startTime: startTime,
169 started: true,
170 endTime: 0,
171 duration: 0,
172 completed: false,
173 errors: []
174 });
175}
176exports.getStartStats = getStartStats;
177/**
178 * Create a new stats object with completed/duration flags etc
179 */
180function getEndStats(stats, endTime, additional) {
181 return _.assign({}, stats, additional, {
182 endTime: endTime,
183 duration: endTime - stats.startTime,
184 completed: true
185 });
186}
187/**
188 * Factory for TaskReports that errored
189 */
190function getTaskErrorReport(item, stats) {
191 return { type: TaskReportType.error, item: item, stats: stats };
192}
193/**
194 * Get basic stats for a task error
195 */
196function getErrorStats(error, endTime) {
197 if (error._cbError) {
198 return {
199 endTime: endTime,
200 completed: false,
201 errors: [error],
202 cbError: true,
203 cbExitCode: error._cbExitCode
204 };
205 }
206 return {
207 endTime: endTime,
208 completed: false,
209 errors: [error]
210 };
211}
212//# sourceMappingURL=data:application/json;base64,
\No newline at end of file