UNPKG

10.3 kBJavaScriptView Raw
1'use strict';
2
3var fs = require('fs');
4var Joi = require('joi');
5var net = require('net');
6var Request = require('request');
7var Rx = require('rx');
8require('core-js/modules/es6.string.starts-with');
9
10var fstat = Rx.Observable.fromNodeCallback(fs.stat);
11var head = Rx.Observable.fromNodeCallback(Request.head);
12var get = Rx.Observable.fromNodeCallback(Request.get);
13
14var WAIT_ON_SCHEMA = Joi.object().keys({
15 resources: Joi.array().items(Joi.string().required()),
16 delay: Joi.number().integer().min(0).default(0),
17 interval: Joi.number().integer().min(0).default(250),
18 log: Joi.boolean().default(false),
19 reverse: Joi.boolean().default(false),
20 timeout: Joi.number().integer().min(0).default(Infinity),
21 verbose: Joi.boolean().default(false),
22 window: Joi.number().integer().min(0).default(750),
23
24 // http options
25 ca: [Joi.string(), Joi.binary()],
26 cert: [Joi.string(), Joi.binary()],
27 key: [Joi.string(), Joi.binary()],
28 passphrase: Joi.string(),
29 auth: Joi.object().keys({
30 user: Joi.string(),
31 username: Joi.string(),
32 password: Joi.string(),
33 pass: Joi.string()
34 }),
35 httpSignature: Joi.object().keys({
36 keyId: Joi.string().required(),
37 key: Joi.string().required()
38 }),
39 strictSSL: Joi.boolean(),
40 followAllRedirects: Joi.boolean(),
41 followRedirect: Joi.boolean()
42});
43
44/**
45 Waits for resources to become available before calling callback
46
47 Polls file, http(s), tcp ports, sockets for availability.
48
49 Resource types are distinquished by their prefix with default being `file:`
50 - file:/path/to/file - waits for file to be available and size to stabilize
51 - http://foo.com:8000/bar verifies HTTP HEAD request returns 2XX
52 - https://my.bar.com/cat verifies HTTPS HEAD request returns 2XX
53 - http-get: - HTTP GET returns 2XX response. ex: http://m.com:90/foo
54 - https-get: - HTTPS GET returns 2XX response. ex: https://my/bar
55 - tcp:my.server.com:3000 verifies a service is listening on port
56 - socket:/path/sock verifies a service is listening on (UDS) socket
57
58 @param opts object configuring waitOn
59 @param opts.resources array of string resources to wait for. prefix determines the type of resource with the default type of `file:`
60 @param opts.delay integer - optional initial delay in ms, default 0
61 @param opts.interval integer - optional poll resource interval in ms, default 250ms
62 @param opts.log boolean - optional flag to turn on logging to stdout
63 @param opts.timeout integer - optional timeout in ms, default Infinity. Aborts with error.
64 @param opts.verbose boolean - optional flag to turn on debug output
65 @param opts.window integer - optional stabilization time in ms, default 750ms. Waits this amount of time for file sizes to stabilize or other resource availability to remain unchanged. If less than interval then will be reset to interval
66 @param cb callback function with signature cb(err) - if err is provided then, resource checks did not succeed
67 */
68function waitOn(opts, cb) {
69 var validResult = Joi.validate(opts, WAIT_ON_SCHEMA);
70 if (validResult.error) { return cb(validResult.error); }
71 opts = validResult.value; // use defaults
72
73 if (opts.window < opts.interval) {
74 opts.window = opts.interval; // it needs to be at least interval
75 }
76
77 var output = (opts.verbose) ?
78 console.log.bind() :
79 function () { };
80
81 var log = (opts.log) ?
82 console.log.bind() :
83 function () { };
84
85 var lastWaitForOutput; // the resources last known to be waiting for
86
87 var timeoutTimer = null;
88 if (opts.timeout !== Infinity) {
89 timeoutTimer = setTimeout(function () {
90 log('wait-on(%s) timed out waiting for: %s; exiting with error', process.pid, lastWaitForOutput);
91 cb(new Error('Timeout'))
92 }, opts.timeout);
93 }
94
95 function cleanup(err) {
96 if (timeoutTimer) {
97 clearTimeout(timeoutTimer);
98 timeoutTimer = null;
99 }
100 if (cb) {
101 cb(err);
102 cb = null; // only call once
103 }
104 }
105
106 /* Stability checking occurs by using an Rx window,
107 It waits until all of the vals from the resources are >=0,
108 then it waits for a window which has no changes
109 (duplicate outputs are filtered by distinctUntilChanged)
110 */
111
112 var lastValues = null;
113 var src = Rx.Observable.timer(opts.delay, opts.interval)
114 .concatMap(
115 function (counter) {
116 return Rx.Observable.from(opts.resources)
117 .concatMap(
118 function (resource, i) {
119 return create$(resource, opts);
120 },
121 function (resource, obj) {
122 return { resource: resource, val: obj.val, data: obj.data };
123 }
124 ).reduce(function (acc, x) {
125 acc[x.resource] = x.val;
126 return acc;
127 }, {});
128 }
129 )
130 .map(function (values) {
131 lastValues = values; // save lastValues for later ref
132 return values;
133 })
134 .distinctUntilChanged()
135 .windowWithTime(opts.window);
136
137 function lastValuesAllAvailable() {
138 if (!lastValues) { return false; }
139 var notReady = opts.resources
140 .filter(function (k) {
141 var lastValue = lastValues[k];
142 var result = (typeof lastValue !== 'number' || lastValue < 0);
143 return opts.reverse ? !result : result;
144 });
145
146 // only output when changes
147 var notReadyString = notReady.join(', ');
148 if (notReadyString && notReadyString !== lastWaitForOutput) {
149 log('wait-on(%s) waiting for: %s', process.pid, notReadyString);
150 lastWaitForOutput = notReadyString;
151 }
152
153 return !(notReady.length);
154 }
155
156 var subsc = src.subscribe(
157 function (child) {
158 var childSub = child.toArray().subscribe(
159 function (x) {
160 output('child next', x);
161 if (lastValuesAllAvailable() && !x.length) {
162 output('stabilized');
163 log('wait-on(%s) exiting successfully found all: %s', process.pid, opts.resources.join(', '));
164 childSub.dispose();
165 subsc.dispose();
166 cleanup();
167 }
168 },
169 function (err) {
170 output('child err', err);
171 },
172 function () {
173 output('child complete');
174 }
175 );
176 },
177 function (err) {
178 output('err: ', err);
179 log('wait-on(%s) exiting with error', process.pid, err);
180 cleanup(err);
181 },
182 function () {
183 output('complete');
184 cleanup();
185 }
186 );
187
188}
189
190function parseHttpOptions(options) {
191 if (options === undefined) return {}
192 var valid = [
193 // https://github.com/request/request/tree/c289759d10ebd76ff4138e81b39c81badde6e274#requestoptions-callback
194 'auth', 'httpSignature', 'followRedirect', 'followAllRedirects', 'strictSSL',
195 // https://github.com/request/request/tree/c289759d10ebd76ff4138e81b39c81badde6e274#tlsssl-protocol
196 'cert', 'key', 'passphrase', 'ca'
197 ];
198
199 var parsed = {};
200 valid.forEach(function (validOpt) {
201 if (options[validOpt] !== undefined) {
202 parsed[validOpt] = options[validOpt];
203 }
204 });
205 return parsed;
206}
207
208
209function create$(resource, options) {
210 if (resource.startsWith('http:')) {
211 return createHttp$(resource, options);
212 } else if (resource.startsWith('http-get:')) {
213 return createHttpGet$('http:' + resource.slice('http-get:'.length), options);
214 } else if (resource.startsWith('https:')) {
215 return createHttp$(resource, options);
216 } else if (resource.startsWith('https-get:')) {
217 return createHttpGet$('https:' + resource.slice('https-get:'.length), options);
218 } else if (resource.startsWith('tcp:')) {
219 return createTcp$(resource.slice('tcp:'.length));
220 } else if (resource.startsWith('socket:')) {
221 return createSocket$(resource.slice('socket:'.length));
222 } else if (resource.startsWith('file:')) {
223 return createFile$(resource.slice('file:'.length));
224 } else { // default to file
225 return createFile$(resource);
226 }
227}
228
229function createFile$(file) {
230 return Rx.Observable.catch(
231 fstat(file),
232 Rx.Observable.just({ size: -1 }) // fake stat when doesn't exist
233 )
234 .map(function (stat) {
235 return {
236 val: stat.size, // key comparator used
237 data: stat // additional data for debugging
238 };
239 });
240}
241
242function createHttp$(url, options) {
243 return Rx.Observable.catch(
244 head(url, parseHttpOptions(options)),
245 Rx.Observable.just([{statusCode: 999}])
246 )
247 .map(function (response) {
248 // Why is response in array here?
249 var statusCode = response[0].statusCode;
250 return {
251 // request will handle redirects before returning
252 // anything but 2XX is a failure
253 val: (statusCode >= 200 && statusCode <= 299) ?
254 statusCode :
255 -1 * statusCode,
256 data: response[0]
257 };
258 });
259}
260
261function createHttpGet$(url, options) {
262 return Rx.Observable.catch(
263 get(url, parseHttpOptions(options)),
264 Rx.Observable.just([{statusCode: 999}])
265 )
266 .map(function (response) {
267 // Why is response in array here?
268 var statusCode = response[0].statusCode;
269 return {
270 // request will handle redirects before returning
271 // anything but 2XX is a failure
272 val: (statusCode >= 200 && statusCode <= 299) ?
273 statusCode :
274 -1 * statusCode,
275 data: response[0]
276 };
277 });
278}
279
280function createTcp$(hostAndPort) {
281 var arrParts = hostAndPort.split(':');
282 var port = arrParts[arrParts.length - 1];
283 var host = arrParts[arrParts.length - 2] || 'localhost';
284 return Rx.Observable.create(function (observer) {
285 var conn = net.connect(port, host)
286 .on('error', function (err) {
287 observer.onNext({ val: -1, err: err });
288 observer.onCompleted();
289 })
290 .on('connect', function () {
291 observer.onNext({ val: 1 });
292 observer.onCompleted();
293 conn.end();
294 });
295 });
296}
297
298function createSocket$(socketPath) {
299 return Rx.Observable.create(function (observer) {
300 var conn = net.connect(socketPath)
301 .on('error', function (err) {
302 observer.onNext({ val: -1, err: err });
303 observer.onCompleted();
304 })
305 .on('connect', function () {
306 observer.onNext({ val: 1 });
307 observer.onCompleted();
308 conn.end();
309 });
310 });
311}
312
313// TODO create other observables like file, return val int >= 0 for available and data for debugging
314
315module.exports = waitOn;