// Retrieved from UNPKG (JavaScript, 11.6 kB, raw view).
'use strict';

var fs = require('fs');
var Joi = require('@hapi/joi');
var net = require('net');
var Request = require('request');
var Rx = require('rx');
require('core-js/modules/es6.string.starts-with');
9
// Observable-returning wrappers around node-style (err, result...) callback APIs.
// Each call returns an observable for a single invocation of the wrapped function.
var fstat = Rx.Observable.fromNodeCallback(fs.stat);
var head = Rx.Observable.fromNodeCallback(Request.head);
var get = Rx.Observable.fromNodeCallback(Request.get);
13
// Joi schema for the options object accepted by waitOn. Validation also
// applies the defaults declared here (see waitOnImpl, which uses the
// validated value in place of the caller's object).
var WAIT_ON_SCHEMA = Joi.object().keys({
  // the list itself is mandatory and must contain at least one string;
  // without it there is nothing to poll
  resources: Joi.array().items(Joi.string().required()).required(),
  delay: Joi.number().integer().min(0).default(0),
  httpTimeout: Joi.number().integer().min(0), // no default: only forwarded to request when set
  interval: Joi.number().integer().min(0).default(250),
  log: Joi.boolean().default(false),
  reverse: Joi.boolean().default(false),
  timeout: Joi.number().integer().min(0).default(Infinity),
  verbose: Joi.boolean().default(false),
  window: Joi.number().integer().min(0).default(750),
  tcpTimeout: Joi.number().integer().min(0).default(300),

  // http options
  ca: [Joi.string(), Joi.binary()],
  cert: [Joi.string(), Joi.binary()],
  key: [Joi.string(), Joi.binary()],
  passphrase: Joi.string(),
  auth: Joi.object().keys({
    user: Joi.string(),
    username: Joi.string(),
    password: Joi.string(),
    pass: Joi.string()
  }),
  httpSignature: Joi.object().keys({
    keyId: Joi.string().required(),
    key: Joi.string().required()
  }),
  strictSSL: Joi.boolean(),
  followAllRedirects: Joi.boolean(),
  followRedirect: Joi.boolean(),
  headers: Joi.object()
});
46
/**
 Waits for resources to become available before calling callback

 Polls file, http(s), tcp ports, sockets for availability.

 Resource types are distinguished by their prefix with default being `file:`
 - file:/path/to/file - waits for file to be available and size to stabilize
 - http://foo.com:8000/bar verifies HTTP HEAD request returns 2XX
 - https://my.bar.com/cat verifies HTTPS HEAD request returns 2XX
 - http-get: - HTTP GET returns 2XX response. ex: http://m.com:90/foo
 - https-get: - HTTPS GET returns 2XX response. ex: https://my/bar
 - tcp:my.server.com:3000 verifies a service is listening on port
 - socket:/path/sock verifies a service is listening on (UDS) socket

 @param opts object configuring waitOn
 @param opts.resources array of string resources to wait for. prefix determines the type of resource with the default type of `file:`
 @param opts.delay integer - optional initial delay in ms, default 0
 @param opts.httpTimeout integer - optional http HEAD/GET timeout in ms to wait for request; no default, it is only passed to the request when set
 @param opts.interval integer - optional poll resource interval in ms, default 250ms
 @param opts.log boolean - optional flag to turn on logging to stdout
 @param opts.reverse boolean - optional flag to invert the check, i.e. wait until the resources are NOT available, default false
 @param opts.tcpTimeout - Maximum time in ms for tcp connect, default 300ms
 @param opts.timeout integer - optional timeout in ms, default Infinity. Aborts with error.
 @param opts.verbose boolean - optional flag to turn on debug output
 @param opts.window integer - optional stabilization time in ms, default 750ms. Waits this amount of time for file sizes to stabilize or other resource availability to remain unchanged. If less than interval then will be reset to interval
 @param cb optional callback function with signature cb(err) - if err is provided then, resource checks did not succeed
 if not specified, wait-on will return a promise that will be rejected if resource checks did not succeed or resolved otherwise
 */
function waitOn(opts, cb) {
  // Only a callable selects the callback API. Anything else (undefined,
  // null, ...) falls through to the promise API instead of being invoked
  // or silently ignored later on.
  if (typeof cb === 'function') {
    return waitOnImpl(opts, cb);
  }
  return new Promise(function (resolve, reject) {
    waitOnImpl(opts, function (err) {
      if (err) {
        reject(err);
      } else {
        resolve();
      }
    });
  });
}
89
/**
 * Callback-based implementation behind waitOn.
 *
 * Validates opts against WAIT_ON_SCHEMA (applying defaults), then polls all
 * resources every opts.interval ms. It succeeds once every resource is
 * available (or unavailable when opts.reverse is set) and a whole
 * opts.window passed without any change in the polled values.
 *
 * @param opts options object, see waitOn
 * @param cb callback cb(err); called with the validation error, with
 *   Error('Timeout') when opts.timeout elapses, with a polling error, or
 *   with no argument on success. It is called at most once.
 */
function waitOnImpl(opts, cb) {
  var validResult = Joi.validate(opts, WAIT_ON_SCHEMA);
  if (validResult.error) { return cb(validResult.error); }
  opts = validResult.value; // use defaults

  if (opts.window < opts.interval) {
    opts.window = opts.interval; // it needs to be at least interval
  }

  var output = (opts.verbose) ?
    console.log.bind() :
    function () { };

  var log = (opts.log) ?
    console.log.bind() :
    function () { };

  var lastWaitForOutput; // the resources last known to be waiting for

  var subsc = null; // subscription to the stream of windows
  var activeChildSub = null; // subscription to the window currently open
  var timeoutTimer = null;

  // Disposes both subscriptions so the polling timer stops firing.
  function stopPolling() {
    if (activeChildSub) {
      activeChildSub.dispose();
      activeChildSub = null;
    }
    if (subsc) {
      subsc.dispose();
      subsc = null;
    }
  }

  // Single exit point: cancels the timeout, stops polling and reports the
  // outcome to the callback exactly once.
  function cleanup(err) {
    if (timeoutTimer) {
      clearTimeout(timeoutTimer);
      timeoutTimer = null;
    }
    stopPolling();
    if (cb) {
      var done = cb;
      cb = null; // only call once
      done(err);
    }
  }

  if (opts.timeout !== Infinity) {
    timeoutTimer = setTimeout(function () {
      log('wait-on(%s) timed out waiting for: %s; exiting with error', process.pid, lastWaitForOutput);
      // go through cleanup so polling stops and a later success cannot
      // invoke the callback a second time
      cleanup(new Error('Timeout'));
    }, opts.timeout);
  }

  /* Stability checking occurs by using an Rx window,
     It waits until all of the vals from the resources are >=0,
     then it waits for a window which has no changes
     (duplicate outputs are filtered by distinctUntilChanged)
   */

  var lastValues = null;
  var src = Rx.Observable.timer(opts.delay, opts.interval)
    .concatMap(
      function (counter) {
        // check every resource in order and collect { resource: val }
        return Rx.Observable.from(opts.resources)
          .concatMap(
            function (resource, i) {
              return create$(resource, opts);
            },
            function (resource, obj) {
              return { resource: resource, val: obj.val, data: obj.data };
            }
          ).reduce(function (acc, x) {
            acc[x.resource] = x.val;
            return acc;
          }, {});
      }
    )
    .map(function (values) {
      lastValues = values; // save lastValues for later ref
      return values;
    })
    .distinctUntilChanged()
    .windowWithTime(opts.window);

  // True when the most recent poll shows every resource in the desired
  // state; also logs the list of pending resources whenever it changes.
  function lastValuesAllAvailable() {
    if (!lastValues) { return false; }
    var notReady = opts.resources
      .filter(function (k) {
        var lastValue = lastValues[k];
        var result = (typeof lastValue !== 'number' || lastValue < 0);
        return opts.reverse ? !result : result;
      });

    // only output when changes
    var notReadyString = notReady.join(', ');
    if (notReadyString && notReadyString !== lastWaitForOutput) {
      log('wait-on(%s) waiting for: %s', process.pid, notReadyString);
      lastWaitForOutput = notReadyString;
    }

    return !(notReady.length);
  }

  subsc = src.subscribe(
    function (child) {
      // x is the list of changes seen during this window; an empty list
      // means the values were stable for the whole window
      var childSub = child.toArray().subscribe(
        function (x) {
          output('child next', x);
          if (lastValuesAllAvailable() && !x.length) {
            output('stabilized');
            log('wait-on(%s) exiting successfully found all: %s', process.pid, opts.resources.join(', '));
            childSub.dispose();
            cleanup();
          }
        },
        function (err) {
          output('child err', err);
        },
        function () {
          output('child complete');
        }
      );
      activeChildSub = childSub;
    },
    function (err) {
      output('err: ', err);
      log('wait-on(%s) exiting with error', process.pid, err);
      cleanup(err);
    },
    function () {
      output('complete');
      cleanup();
    }
  );

}
211
/**
 * Picks the options understood by the request library out of the wait-on
 * options.
 *
 * @param options wait-on options object; may be undefined or null
 * @returns a new object holding only the http related options that were
 *   set. httpTimeout is renamed to timeout, all other keys keep their name.
 */
function parseHttpOptions(options) {
  // nothing to pick from; null is treated like undefined instead of throwing
  if (options === undefined || options === null) { return {}; }
  var valid = [
    // https://github.com/request/request/tree/c289759d10ebd76ff4138e81b39c81badde6e274#requestoptions-callback
    'auth', 'httpSignature', 'followRedirect', 'followAllRedirects', 'strictSSL', 'headers',
    // https://github.com/request/request/tree/c289759d10ebd76ff4138e81b39c81badde6e274#tlsssl-protocol
    'cert', 'key', 'passphrase', 'ca',
    'httpTimeout' // needs to be renamed timeout for use with request
  ];

  var parsed = {};
  valid.forEach(function (validOpt) {
    if (options[validOpt] !== undefined) {
      // for httpTimeout, need to use timeout as request param
      if (validOpt === 'httpTimeout') {
        parsed['timeout'] = options['httpTimeout'];
      } else { // all other options match their keys for request
        parsed[validOpt] = options[validOpt];
      }
    }
  });
  return parsed;
}
235
236
/**
 * Creates the observable that checks a single resource, chosen by the
 * resource's prefix. Resources without a known prefix are treated as files.
 *
 * @param resource resource string, e.g. `tcp:localhost:3000`
 * @param options validated wait-on options, passed on to the http and tcp checks
 * @returns the observable built by the matching create*$ function
 */
function create$(resource, options) {
  if (resource.startsWith('http:')) {
    return createHttp$(resource, options);
  }
  if (resource.startsWith('http-get:')) {
    // swap the pseudo scheme for the real one before issuing the GET
    return createHttpGet$('http:' + resource.slice('http-get:'.length), options);
  }
  if (resource.startsWith('https:')) {
    return createHttp$(resource, options);
  }
  if (resource.startsWith('https-get:')) {
    return createHttpGet$('https:' + resource.slice('https-get:'.length), options);
  }
  if (resource.startsWith('tcp:')) {
    return createTcp$(resource.slice('tcp:'.length), options);
  }
  if (resource.startsWith('socket:')) {
    return createSocket$(resource.slice('socket:'.length));
  }
  if (resource.startsWith('file:')) {
    return createFile$(resource.slice('file:'.length));
  }
  // default to file
  return createFile$(resource);
}
256
/**
 * Creates an observable that stats a file once.
 *
 * @param file path of the file to check
 * @returns observable emitting { val, data } where val is the file size, or
 *   -1 when the stat call fails (e.g. the file does not exist), and data is
 *   the stat object
 */
function createFile$(file) {
  return Rx.Observable.catch(
    fstat(file),
    Rx.Observable.just({ size: -1 }) // fake stat when doesn't exist
  )
    .map(function (stat) {
      return {
        val: stat.size, // key comparator used
        data: stat // additional data for debugging
      };
    });
}
269
/**
 * Creates an observable that issues one HTTP(S) HEAD request.
 *
 * @param url url to request
 * @param options wait-on options; the http related ones are passed to request
 * @returns observable emitting { val, data } where val is the status code
 *   for 2XX responses and the negated status code otherwise (-999 when the
 *   request itself failed), and data is the response
 */
function createHttp$(url, options) {
  return Rx.Observable.catch(
    head(url, parseHttpOptions(options)),
    Rx.Observable.just([{statusCode: 999}]) // fake response when the request fails
  )
    .map(function (response) {
      // the request callback receives (err, response, body); fromNodeCallback
      // emits multiple results as an array, so the response is element 0
      var statusCode = response[0].statusCode;
      return {
        // request will handle redirects before returning
        // anything but 2XX is a failure
        val: (statusCode >= 200 && statusCode <= 299) ?
          statusCode :
          -1 * statusCode,
        data: response[0]
      };
    });
}
288
/**
 * Creates an observable that issues one HTTP(S) GET request.
 *
 * @param url url to request
 * @param options wait-on options; the http related ones are passed to request
 * @returns observable emitting { val, data } where val is the status code
 *   for 2XX responses and the negated status code otherwise (-999 when the
 *   request itself failed), and data is the response
 */
function createHttpGet$(url, options) {
  return Rx.Observable.catch(
    get(url, parseHttpOptions(options)),
    Rx.Observable.just([{statusCode: 999}]) // fake response when the request fails
  )
    .map(function (response) {
      // the request callback receives (err, response, body); fromNodeCallback
      // emits multiple results as an array, so the response is element 0
      var statusCode = response[0].statusCode;
      return {
        // request will handle redirects before returning
        // anything but 2XX is a failure
        val: (statusCode >= 200 && statusCode <= 299) ?
          statusCode :
          -1 * statusCode,
        data: response[0]
      };
    });
}
307
/**
 * Creates an observable that attempts one TCP connection.
 *
 * @param hostAndPort `host:port` or just `port`; host defaults to localhost
 * @param options wait-on options, options.tcpTimeout is the connect timeout in ms
 * @returns observable emitting { val: 1 } when the connection succeeds, or
 *   { val: -1, err } on error or timeout, and then completing
 */
function createTcp$(hostAndPort, options) {
  var arrParts = hostAndPort.split(':');
  var port = arrParts[arrParts.length - 1];
  var host = arrParts[arrParts.length - 2] || 'localhost';
  return Rx.Observable.create(function (observer) {
    var conn = net.connect(port, host)
      .on('error', function (err) {
        observer.onNext({ val: -1, err: err });
        observer.onCompleted();
      })
      .on('connect', function () {
        observer.onNext({ val: 1 });
        observer.onCompleted();
        conn.end();
      })
      .on('timeout', function () {
        observer.onNext({ val: -1, err: {} });
        observer.onCompleted();
        // the socket never connected, so end() would leave the pending
        // connection attempt alive; destroy() abandons it right away
        conn.destroy();
      });
    conn.setTimeout(options.tcpTimeout);
  });
}
330
/**
 * Creates an observable that attempts one connection to a socket path.
 *
 * @param socketPath path handed to net.connect
 * @returns observable emitting { val: 1 } when the connection succeeds, or
 *   { val: -1, err } on error, and then completing
 */
function createSocket$(socketPath) {
  return Rx.Observable.create(function (observer) {
    var conn = net.connect(socketPath)
      .on('error', function (err) {
        observer.onNext({ val: -1, err: err });
        observer.onCompleted();
      })
      .on('connect', function () {
        observer.onNext({ val: 1 });
        observer.onCompleted();
        conn.end();
      });
  });
}
345
// TODO create other observables like file, return val int >= 0 for available and data for debugging

module.exports = waitOn;