UNPKG

6.84 kBJavaScriptView Raw
1"use strict";
2var async = require("async"), mm = require("micromatch");
/**
 * Tunnels are runnable workflow units that can watch nests.
 *
 * A tunnel holds three kinds of handlers that are triggered, in this order,
 * every time a job arrives (see `arrive`):
 *   1. "run" callbacks, each invoked as `callback(job, nest)`;
 *   2. "runSync" callbacks, invoked one after another as
 *      `callback(job, nest, done)`; the next one starts when `done` is called;
 *   3. an optional matcher that groups jobs whose names share a base name.
 * Errors thrown by run/runSync callbacks are routed to the handler registered
 * with `fail`, if any.
 */
var Tunnel = (function () {
    /**
     * Normalizes the configured match pattern(s) to an array so that a single
     * pattern string is accepted as well as an array of patterns.
     * @param pattern  Pattern array, single pattern, or null/undefined.
     * @returns {Array} Possibly empty list of patterns.
     */
    function toPatternList(pattern) {
        if (pattern === null || pattern === undefined) {
            return [];
        }
        return Array.isArray(pattern) ? pattern : [pattern];
    }
    /**
     * Computes the base name of `name` for every pattern it matches.
     * The base is the part of the name that precedes the pattern text with
     * its first "*" removed. When that text does not occur literally in the
     * name (e.g. the pattern holds more than one wildcard) the base is "".
     * @param {string} name
     * @param {Array} patterns
     * @returns {Array} One base per matching pattern, in pattern order.
     */
    function basesFor(name, patterns) {
        var bases = [];
        patterns.forEach(function (pattern) {
            if (mm.isMatch(name, pattern) === true) {
                // substring() clamps the -1 of a failed indexOf to 0,
                // which yields the empty base described above.
                bases.push(name.substring(0, name.indexOf(pattern.replace("*", ""))));
            }
        });
        return bases;
    }
    /**
     * @param e        Environment object; this class only calls
     *                 `e.log(level, message, actor, instances)` on it.
     * @param theName  Name reported by `getName`.
     */
    function Tunnel(e, theName) {
        this.match_obj = {
            queue: [],
            run: null,
            pattern: null,
            orphan_minutes: null
        };
        this.e = e;
        this.nests = [];
        this.name = theName;
        this.run_list = [];
        this.run_sync_list = [];
        this.job_counter = 0;
    }
    Tunnel.prototype.toString = function () {
        return "Tunnel";
    };
    Tunnel.prototype.getName = function () {
        return this.name;
    };
    Tunnel.prototype.getNests = function () {
        return this.nests;
    };
    Tunnel.prototype.getRunList = function () {
        return this.run_list;
    };
    Tunnel.prototype.getRunSyncList = function () {
        return this.run_sync_list;
    };
    /**
     * Instructs the tunnel to watch a nest for new jobs.
     * Registers this tunnel on the nest, then calls the nest's `load` and
     * `watch` methods and remembers the nest.
     * @param nest
     */
    Tunnel.prototype.watch = function (nest) {
        nest.register(this);
        nest.load();
        nest.watch();
        this.nests.push(nest);
    };
    /**
     * Entry point for a new job: counts it, logs it and triggers the run,
     * runSync and match handlers.
     * @param job
     * @param nest
     */
    Tunnel.prototype.arrive = function (job, nest) {
        this.job_counter++;
        this.e.log(1, "Job " + this.job_counter + " triggered tunnel arrive.", this, [job, nest]);
        this.executeRun(job, nest);
        this.executeRunSync(job, nest);
        this.executeMatch(job, nest);
    };
    /**
     * Run program logic asynchronously.
     * @param callback  Invoked as `callback(job, nest)` for every arriving job.
     */
    Tunnel.prototype.run = function (callback) {
        this.run_list.push(callback);
    };
    /**
     * Run program logic synchronously.
     * @param callback  Invoked as `callback(job, nest, done)`; the next
     *                  runSync callback starts once `done()` is called.
     */
    Tunnel.prototype.runSync = function (callback) {
        this.run_sync_list.push(callback);
    };
    /**
     * Failed jobs runner.
     * @param callback  Invoked as `callback(job, nest, reason)`.
     */
    Tunnel.prototype.fail = function (callback) {
        this.run_fail = callback;
    };
    /**
     * Asynchronous run event. Every registered run callback is invoked; a
     * callback that throws is reported through `executeFail` and does not
     * prevent the remaining callbacks from running.
     * @param job
     * @param nest
     */
    Tunnel.prototype.executeRun = function (job, nest) {
        var tn = this;
        tn.run_list.forEach(function (callback) {
            try {
                callback(job, nest);
            }
            catch (err) {
                // Fail if an error is thrown
                tn.executeFail(job, nest, err);
            }
        });
    };
    /**
     * Synchronous run event. Runs the runSync callbacks in series. A callback
     * that throws before calling `done` stops the series and is reported
     * through `executeFail`, mirroring `executeRun`, instead of escaping into
     * `arrive` and preventing the match step.
     * @param job
     * @param nest
     */
    Tunnel.prototype.executeRunSync = function (job, nest) {
        var tn = this;
        var successfulRuns = 0;
        if (tn.run_sync_list.length === 0) {
            return;
        }
        async.eachSeries(tn.run_sync_list, function (run, doNextRun) {
            var finished = false;
            try {
                run(job, nest, function () {
                    finished = true;
                    successfulRuns++;
                    doNextRun();
                });
            }
            catch (err) {
                if (finished) {
                    // This run already completed, so the error comes from
                    // later work executed inside its `done` call; it cannot
                    // be reported through an already-used series callback.
                    throw err;
                }
                doNextRun(err);
            }
        }, function (err) {
            if (err) {
                tn.executeFail(job, nest, err);
            }
            tn.e.log(0, "Completed " + successfulRuns + "/" + tn.getRunSyncList().length + " synchronous run list(s).", tn, [job, nest]);
        });
    };
    /**
     * Fail run event. Logs the failure and forwards it to the handler
     * registered with `fail`; when no handler was registered the failure is
     * only logged.
     * @param job
     * @param nest
     * @param reason
     */
    Tunnel.prototype.executeFail = function (job, nest, reason) {
        var tn = this;
        tn.e.log(3, "Failed for reason \"" + reason + "\".", tn, [job, nest]);
        if (typeof tn.run_fail === "function") {
            tn.run_fail(job, nest, reason);
        }
    };
    /**
     * Interface for matching two or more files together based on an array of glob filename patterns.
     * @param pattern        Array of glob patterns (a single pattern is also accepted).
     * @param orphanMinutes  Minutes an unmatched job may wait in the queue before it is failed.
     * @param callback       Invoked as `callback(jobs)` with the array of matched jobs.
     */
    Tunnel.prototype.match = function (pattern, orphanMinutes, callback) {
        this.match_obj.pattern = pattern;
        this.match_obj.orphan_minutes = orphanMinutes;
        this.match_obj.run = callback;
    };
    /**
     * Match execution. Does nothing unless `match` registered a callback.
     *
     * The arriving job's base name is compared with the base names of the
     * queued jobs. Every queued job sharing the base is removed from the
     * queue (once, however many patterns it matches) and handed to the match
     * callback together with the arriving job, queued jobs first (newest to
     * oldest). If nothing matches, the job is queued and an orphan timer is
     * armed for that job only: if it is still queued when the timer fires it
     * is removed and `job.fail("Orphan timeout.")` is called.
     * @param job
     * @param nest
     */
    Tunnel.prototype.executeMatch = function (job, nest) {
        var tn = this;
        var matcher = tn.match_obj;
        if (!matcher.run) {
            return;
        }
        tn.e.log(0, "Executing matching process.", tn);
        var patterns = toPatternList(matcher.pattern);
        // When several patterns match the arriving job, the last one decides
        // its base; with no matching pattern the base stays undefined and can
        // never equal a queued job's (string) base.
        var jobBases = basesFor(job.getName(), patterns);
        var jobBase = jobBases[jobBases.length - 1];
        var matchedJobs = [];
        // Walk backwards so that splicing never shifts an unvisited index.
        for (var i = matcher.queue.length - 1; i >= 0; i--) {
            var queuedJob = matcher.queue[i];
            if (basesFor(queuedJob.getName(), patterns).indexOf(jobBase) !== -1) {
                // Pull out the queued job from the queue
                matcher.queue.splice(i, 1);
                matchedJobs.push(queuedJob);
            }
        }
        if (matchedJobs.length > 0) {
            matchedJobs.push(job);
            tn.e.log(0, "Matched " + matchedJobs.length + " jobs.", tn);
            matcher.run(matchedJobs);
            return;
        }
        // Not found: queue this job and give it its own orphan deadline.
        matcher.queue.push(job);
        setTimeout(function () {
            var position = tn.match_obj.queue.indexOf(job);
            if (position === -1) {
                // The job was matched in the meantime.
                return;
            }
            tn.match_obj.queue.splice(position, 1);
            job.fail("Orphan timeout.");
            tn.e.log(0, "Orphan timeout executed on job \"" + job.getName() + "\".", tn);
        }, matcher.orphan_minutes * 60000);
    };
    return Tunnel;
}());
200exports.Tunnel = Tunnel;