UNPKG

16.6 kBJavaScriptView Raw
1'use strict';
2
3const basicAuth = require('basic-auth');
4const exec = require('child_process').exec;
5const url = require('url');
6const crypto = require('crypto');
7const util = require('util');
8const request = require('request');
9const EventEmitter = require('events').EventEmitter;
10const version = require('../package.json').version;
11
12let debug;
13let environments = {
14 production: 'https://pubsub.platform.axway.com',
15 preproduction: 'https://pubsub.axwaytest.net'
16};
17let fingerprint;
18let pendingChecks = [];
19
20/**
21 * Checks process.env flags to determine if env is preproduction or developement.
22 * @return {Boolean} flag if preproduction or development env
23 */
24function isPreproduction() {
25 return process.env.NODE_ACS_URL
26 && process.env.NODE_ACS_URL.includes('.appctest.com')
27 || process.env.NODE_ENV === 'preproduction'
28 || process.env.APPC_ENV === 'preproduction'
29 || process.env.NODE_ENV === 'development'
30 || process.env.APPC_ENV === 'development';
31}
32
33/**
34 * Returns sha1 digested value.
35 * @param {String} value value to digest
36 * @returns {String} digested value
37 */
38function sha1(value) {
39 return crypto
40 .createHash('sha1')
41 .update(value)
42 .digest('hex');
43}
44
45/**
46 * Gets unique fingerprint for the machine (hashed) which is used for server-side client id tracking.
47 * @param {Function} callback the callback function
48 * @param {String} append append to command
49 * @return {void}
50 */
51function getComputerFingerprint(callback, append) {
52 if (fingerprint) {
53 return callback && callback(null, fingerprint);
54 }
55
56 let cmd;
57 switch (process.platform) {
58 case 'darwin':
59 // serial number + uuid is a good fingerprint
60 cmd = 'ioreg -c IOPlatformExpertDevice -d 2 | awk -F\\" \'/IOPlatformSerialNumber|IOPlatformUUID/{ print $(NF-1) }\';';
61 debug('running:', cmd);
62 return exec(cmd, function (err, stdout) {
63 if (err) {
64 return callback(err);
65 }
66 fingerprint = sha1(stdout + process.pid);
67 callback && callback(null, fingerprint);
68 });
69 case 'win32':
70 case 'windows':
71 cmd = 'reg query HKLM\\Software\\Microsoft\\Cryptography /v MachineGuid';
72 if (append) {
73 cmd += append;
74 }
75 debug('running:', cmd);
76 return exec(cmd, function (err, stdout) {
77 if (err && !append) {
78 debug('trying again, forcing it to use 64bit registry view');
79 return getComputerFingerprint(callback, ' /reg:64');
80 } else if (err) {
81 return callback(err);
82 }
83 let tokens = stdout.trim().split(/\s/),
84 serial = tokens[tokens.length - 1];
85 fingerprint = sha1(serial + process.pid);
86 callback && callback(null, fingerprint);
87 });
88 case 'linux':
89 cmd = 'ifconfig | grep eth0 | grep -i hwaddr | awk \'{print $1$5}\' | sed \'s/://g\' | xargs echo | sed \'s/ //g\'';
90 debug('running:', cmd);
91 return exec(cmd, function (err, stdout) {
92 if (err) {
93 return callback(err);
94 }
95 let serial = stdout.trim();
96 fingerprint = sha1(serial + process.pid);
97 callback && callback(null, fingerprint);
98 });
99 default:
100 return callback(new Error('Unknown platform:' + process.platform));
101 }
102}
103
104/**
105 * Class constructor
106 *
107 * @class PubSubClient
108 * @param {Object} opts options for configuring the client
109 */
110function PubSubClient(opts) {
111 opts = opts || {};
112
113 // Stub debug logging function and extend if enabled.
114 debug = function () {};
115 opts.debug && (debug = function () {
116 let args = Array.prototype.slice.call(arguments);
117 args.unshift('appc:pubsub');
118 console.log.apply(this, args);
119 });
120
121 // prefer the environment settings over config
122 let env = opts.env || (isPreproduction() ? 'preproduction' : 'production');
123 this.url = opts.url || environments[env] || environments.production;
124
125 // Require key and secret.
126 this.disabled = opts.disabled;
127 this.key = opts.key;
128 this.secret = opts.secret;
129
130 if (this.disabled) {
131 return;
132 }
133 if (!this.key) {
134 throw new Error('missing key');
135 }
136 if (!this.secret) {
137 throw new Error('missing secret');
138 }
139
140 this.timeout = opts.timeout || 10000;
141
142 getComputerFingerprint();
143 this.fetchConfig();
144 // These functions need the client binding for use as a middleware/route
145 this.authenticateWebhook = this.authenticateWebhook.bind(this);
146 this.handleWebhook = this.handleWebhook.bind(this);
147}
148
149util.inherits(PubSubClient, EventEmitter);
150
151/**
152 * Fetch client config from the server.
153 * @param {Function} callback the callback function
154 */
155PubSubClient.prototype.fetchConfig = function () {
156 // some random data to sign for the signature
157 let data = {},
158 opts = {
159 url: url.resolve(this.url, '/api/client/config'),
160 method: 'get',
161 json: data,
162 headers: {
163 'User-Agent': 'Appcelerator PubSub Client/' + version + ' (' + fingerprint + ')',
164 APIKey: this.key,
165 APISig: crypto.createHmac('SHA256', this.secret).update(JSON.stringify(data)).digest('base64')
166 },
167 gzip: true,
168 timeout: this.timeout,
169 followAllRedirects: true,
170 rejectUnauthorized: this.url.includes(environments.production.split('.').slice(-2).join('.'))
171 };
172 debug('fetching client config');
173 this.config = {};
174 request(opts, function (err, resp, body) {
175 if (err) {
176 return debug('error', err);
177 }
178 let data = body && body[body.key];
179
180 if (!data || resp.statusCode !== 200) {
181 let err = new Error('invalid response');
182 err.code = resp.statusCode;
183 // if 401 that means the apikey, secret is wrong. disable before raising an error
184 if (resp.statusCode === 401) {
185 err.message = 'Unauthorized';
186 this.emit('unauthorized', err, opts);
187 }
188 return debug('error', err, resp.statusCode, resp.body);
189 }
190
191 this._parseConfig(data);
192
193 this.on('configured', () => pendingChecks.forEach(this._validateTopic.bind(this)));
194 this.emit('configured', this.config);
195 debug('got config', this.config);
196 }.bind(this));
197};
198
199PubSubClient.prototype._parseConfig = function (data) {
200 if (data.can_consume) {
201 // Extract topic from keys of event map.
202 data.topics = Object.keys(data.events || {});
203
204 // Get basic auth creds from the url
205 if (data.auth_type === 'basic' && data.url) {
206 let details = (url.parse(data.url) || '').auth.split(':');
207 data.auth_user = details[0];
208 data.auth_pass = details[1];
209 }
210 }
211 this.config = data;
212};
213
214/**
215 * Authenticates a webhook request as being from pubsub server. Can be used as middleware.
216 * @param {http.ClientRequest} req request object containing auth details
217 * @param {http.ServerResponse} [res] response object for responding with errors
218 * @param {Function} [next] optional callback function for use in middleware
219 * @return {Boolean} whether the request is authenticated
220 */
221PubSubClient.prototype.authenticateWebhook = function (req, res, next) {
222 if (req._authenticatedWebhook) {
223 next && next();
224 return true;
225 }
226 // Make sure the client has consumption enabled
227 if (!this.config.can_consume) {
228 res && res.writeHead(400, { 'Content-Type': 'application/json' });
229 res && res.end(JSON.stringify({
230 success: false,
231 message: 'This client does not have consumption enabled.'
232 }));
233 return false;
234 }
235 debug('authenticating webhook using: method =', this.config.auth_type);
236
237 let conf = this.config,
238 headers = req && req.headers || {},
239 user = basicAuth(req),
240 // Validate request using clients authentication method
241 authenticated
242 // Check the basic auth credentials match...
243 = conf.auth_type === 'basic'
244 ? user.name === conf.auth_user && user.pass === conf.auth_pass
245 // ...or the request has the correct auth token
246 : conf.auth_type === 'token'
247 ? headers['x-auth-token'] === this.config.auth_token
248 // ...or the signature matches the body signed with the client secret
249 : conf.auth_type === 'key_secret'
250 ? headers['x-signature'] === crypto.createHmac('SHA256', this.secret).update(JSON.stringify(req.body)).digest('hex')
251 // ...otherwise there's no authentication for the client
252 : true;
253
254 // Make sure the request is from pubsub server
255 if (!authenticated) {
256 debug('webhook authentication failed', headers);
257 res && res.writeHead(401, { 'Content-Type': 'application/json' });
258 res && res.end(JSON.stringify({
259 success: false,
260 message: 'Unauthorized'
261 }));
262 return false;
263 }
264 req._authenticatedWebhook = true;
265 next && next();
266 return true;
267};
268
269/**
270 * Webhook handler route that exposes events using the EventEmitter pattern.
271 * @param {http.ClientRequest} req Request object
272 * @param {http.ServerResponse} res Response object
273 */
274PubSubClient.prototype.handleWebhook = function (req, res) {
275 // Make sure the request has been authenticated
276 if (!this.authenticateWebhook(req, res)) {
277 return;
278 }
279
280 let topic = req.body.topic;
281 debug('event received', topic, req.body);
282
283 // Search for any configured regex matches and emit using those too
284 if (this.getSubscribedTopic(topic)) {
285 debug('emitting event:' + topic);
286 this.emit('event:' + topic, req.body);
287 }
288
289 res.writeHead(200, { 'Content-Type': 'application/json' });
290 res.end(JSON.stringify({ success: true }));
291};
292
293/**
294 * Validates that event name is in client's subscribed topics (or provided topic list).
295 * @param {String} name topic/event name
296 * @param {Array} topics (optional) set of topics to validate against, defaults to this.config.topics
297 * @returns {Boolean} true if event matched topics
298 */
299PubSubClient.prototype.getSubscribedTopic = function (name, topics) {
300 // Event names are prefixed, so strip it.
301 name = name.replace('event:', '');
302 // Add internal events since they will be emitted.
303 let validTopics = [ 'configured', 'unauthorized' ].concat(topics || this.config.topics || []);
304 return validTopics.find(topic => {
305 // Name matches topic
306 if (topic === name) {
307 return topic;
308 }
309 // Fall out if exact match missed and topic does not have wildcard.
310 if (!topic.includes('*')) {
311 return null;
312 }
313 let eventSegments = name.split('.');
314 let topicSegments = topic.split('.');
315 // Fall out if topic is not double-splatted and segment counts do not match.
316 if (!topic.includes('**') && eventSegments.length !== topicSegments.length) {
317 return null;
318 }
319 // Check if name matches topic segment checks.
320 return topicSegments.reduce(function (m, segment, i) {
321 return m && (
322 // segment matched
323 segment === eventSegments[i]
324 // segment was wildcarded
325 || segment === '*'
326 // segment was terminus and double-splatted
327 || (segment === '**' && i === topicSegments.length - 1)
328 );
329 }, true);
330 }) || null;
331};
332
333/**
334 * Returns fingerprint if set, or passes to callback, or generates fingerprint.
335 * @param {Function} callback the callback function
336 * @return {String} fingerprint
337 */
338PubSubClient.prototype.getFingerprint = function (callback) {
339 if (fingerprint && callback) {
340 return callback(null, fingerprint);
341 } else if (fingerprint) {
342 return fingerprint;
343 } else if (callback) {
344 return getComputerFingerprint(callback);
345 }
346 throw new Error('fingerprint has not yet been generated. invoke this function with a callback');
347};
348
349function serialize(obj, seen) {
350 if (!obj || typeof(obj) !== 'object') {
351 return obj;
352 }
353 if (obj instanceof RegExp) {
354 return obj.source;
355 }
356 if (obj instanceof Date) {
357 return obj;
358 }
359 Object.keys(obj).forEach(function (key) {
360 var value = obj[key],
361 t = typeof(value);
362 if (t === 'function') {
363 delete obj[key];
364 } else if (/^(password|creditcard)/.test(key)) {
365 // the server side does masking as well, but doesn't hurt to do it at origin
366 obj[key] = '[HIDDEN]';
367 } else if (value instanceof Date) {
368 // do nothing
369 } else if (value instanceof RegExp) {
370 obj[key] = value.source;
371 } else if (t === 'object') {
372 if (seen.includes(value)) {
373 value = '[Circular]';
374 } else {
375 seen.push(value);
376 value = serialize(value, seen);
377 }
378 obj[key] = value;
379 }
380 });
381 return obj;
382}
383
384/**
385 * publish an event with name and optional data
386 * @param {String} name name of the event
387 * @param {Object} data optional event payload or undefined/null if no event data
388 * @param {Object} options the options object
389 * @return {void}
390 */
391PubSubClient.prototype.publish = function (name, data, options) {
392 if (this.disabled) {
393 return;
394 }
395 debug('publish', name);
396 if (!name) {
397 throw new Error('required event name');
398 }
399 if (Buffer.byteLength(name) > 255) {
400 throw new Error('name length must be less than 255 bytes');
401 }
402 if (data && typeof(data) !== 'object') {
403 throw new Error('data must be an object');
404 }
405 // Clone data before serialization pass so objects are not modified.
406 try {
407 data = JSON.parse(JSON.stringify(data || {}));
408 } catch (e) {
409 throw new Error('data could not be cloned');
410 }
411 this._send({
412 event: name,
413 data: serialize(data, []),
414 options: options
415 });
416};
417
418/**
419 * Retry event.
420 *
421 * @private
422 * @param {Object} data the data object
423 * @param {String} reason the retry reason
424 * @param {Object} opts the options object
425 */
426PubSubClient.prototype._retry = function (data, reason, opts) {
427 debug('retry called', reason, opts);
428 if (this.disabled) {
429 debug('retry ignored, disabled:', !!this.disabled, new Error().stack);
430 return;
431 }
432 // run again with a small backoff each time
433 setTimeout(() => this._send(data), Math.max(500, this.retry * 500));
434 this.emit('retry', reason, opts, this.retry);
435};
436
437/**
438 * Sending event to the server.
439 *
440 * @private
441 * @param {Object} data the data object
442 * @return {void}
443 */
444PubSubClient.prototype._send = function (data) {
445 if (this.disabled) {
446 return false;
447 }
448
449 debug('_send', data);
450 let self = this;
451 if (!fingerprint) {
452 // fetch the fingerprint and re-run this method again
453 getComputerFingerprint(() => this._send.apply(this, arguments));
454 return false;
455 }
456
457 this.retry = (this.retry || 0) + 1;
458 // shouldn't get here, but empty data slot
459 if (!data) {
460 return;
461 }
462
463 this._sending = true;
464 let ticket = Date.now();
465 this._sendingTS = ticket;
466
467 let opts = {
468 url: url.resolve(this.url, '/api/event'),
469 method: 'post',
470 json: data,
471 headers: {
472 'User-Agent': 'Appcelerator PubSub Client/' + version + ' (' + fingerprint + ')',
473 APIKey: this.key,
474 APISig: crypto.createHmac('SHA256', this.secret).update(JSON.stringify(data)).digest('base64')
475 },
476 gzip: true,
477 timeout: this.timeout,
478 followAllRedirects: true,
479 rejectUnauthorized: this.url.includes(environments.production.split('.').slice(-2).join('.'))
480 };
481
482 try {
483 debug('sending web event', opts);
484 let req = request(opts);
485 // handle response
486 req.on('response', function (resp) {
487 // check current and if the same, change state, otherwise a new event has come
488 // in since we got here
489 if (ticket === self._sendingTS) {
490 self._sending = false;
491 }
492 debug('received web response', resp && resp.statusCode);
493 if (resp && resp.statusCode !== 200) {
494 // an error which isn't a security error, try to push again
495 if (resp.statusCode && !(/^(400|401)$/).test(resp.statusCode)) {
496 return self._retry(data, resp.statusCode);
497 }
498 let err = new Error('invalid response');
499 err.code = resp.statusCode;
500 // if 401 that means the apikey, secret is wrong. disable before raising an error
501 if (resp.statusCode === 401) {
502 err.message = 'Unauthorized';
503 self.emit('unauthorized', err, opts);
504 }
505 debug('error', err, resp.statusCode, resp.body);
506 } else if (resp) {
507 // reset our retry count on successfully sending
508 self.retry = 0;
509 // emit an event
510 self.emit('response', resp, opts);
511 debug('response received, status:', resp.statusCode, 'opts:', opts);
512 }
513 });
514 // handle HTTP errors
515 req.on('error', function (err) {
516 debug('web request received error', err, opts);
517 // check current and if the same, change state, otherwise a new event has come
518 // in since we got here
519 if (ticket === self._sendingTS) {
520 self._sending = false;
521 }
522 return self._retry(data, err.code, opts);
523 });
524 } catch (E) {
525 debug('web request received error', E, opts);
526 self._retry(data, E.code, opts);
527 }
528};
529
530let on = PubSubClient.prototype.on;
531PubSubClient.prototype.on = function (name) {
532 debug('on', name);
533 // If the topics have been fetched then we can attempt to warn about events
534 // that aren't configured to be received by this client
535 if (!this.configued) {
536 pendingChecks.push(name);
537 } else {
538 this._validateTopic(name);
539 }
540 return on.apply(this, arguments);
541};
542
543PubSubClient.prototype._validateTopic = function (name) {
544 if (!this.getSubscribedTopic(name)) {
545 debug('Unexpected event', name, ': client not configured to receive this event');
546 }
547};
548
549/**
550 * Stub deprecated close function.
551 */
552PubSubClient.prototype.close = function () {
553 debug('pubsub.close function has been deprecated. Websocket connections are no longer supported.');
554};
555
556module.exports = PubSubClient;