UNPKG

14.2 kBJavaScriptView Raw
1const { createHash, createHmac } = require('crypto');
2const { EventEmitter } = require('events');
3const { hostname } = require('os');
4const { inherits } = require('util');
5
6const axios = require('axios');
7const auth = require('basic-auth');
8
9const { version } = require('../package.json');
10
// Base URLs of the PubSub service, keyed by environment name.
const environments = {
	production: 'https://pubsub.platform.axway.com',
	preproduction: 'https://pubsub.axwaytest.net'
};
// SHA-256 hex digest of the host name, sent as part of the user agent.
// hash.update() only accepts strings/buffers, so the timestamp fallback used
// when the host name is empty has to be stringified first.
const fingerprint = createHash('sha256').update(hostname() || String(Date.now())).digest('hex');
// Values of NODE_ENV / APPC_ENV that select the preproduction service.
const preproductionEnvironments = [ 'preproduction', 'development' ];
// NOTE(review): the trailing ')' has no matching '(' - it is kept byte-for-byte
// because the server may already rely on this exact user agent format; confirm.
const userAgent = `appc-pubsub/${version} ${fingerprint})`;

// Event names passed to on() that still have to be checked against the
// subscribed topics once the client config has been fetched.
let pendingChecks = [];
// Module-wide debug logger; (re)assigned by the PubSubClient constructor.
let debug;
21
/**
 * Class constructor
 *
 * @class PubSubClient
 * @param {Object} opts options for configuring the client
 * @param {Boolean} [opts.debug] log debug output through console.log
 * @param {String} [opts.env] environment name used to pick the service url
 * @param {String} [opts.url] explicit service url, overrides opts.env
 * @param {Boolean} [opts.disabled] create an inert client (no key/secret needed, nothing is sent)
 * @param {String} opts.key client key (required unless disabled)
 * @param {String} opts.secret client secret (required unless disabled)
 * @param {Number} [opts.timeout=10000] request timeout passed to axios
 * @throws {Error} 'missing key' / 'missing secret' when the client is enabled without credentials
 */
function PubSubClient(opts) {
	EventEmitter.call(this);
	opts = opts || {};

	// Stub debug logging function and extend if enabled.
	debug = function () {};
	if (opts.debug) {
		debug = function (...args) {
			console.log('appc:pubsub', ...args);
		};
	}

	// An explicit opts.env wins; otherwise the environment is derived from process.env.
	// NOTE(review): opts.env = 'development' has no entry in `environments` and
	// therefore resolves to the production url - confirm that this is intended.
	const env = opts.env || (_isPreproduction() ? 'preproduction' : 'production');
	this.url = opts.url || environments[env] || environments.production;

	this.disabled = opts.disabled;
	this.key = opts.key;
	this.secret = opts.secret;

	// Initialise state and bindings before any early return so that a disabled
	// client still exposes usable (bound) webhook handlers and an empty config.
	this.config = {};
	this.timeout = opts.timeout || 10000;
	this.retryLimit = 10;

	// These functions need the client binding for use as a middleware/route
	this.authenticateWebhook = this.authenticateWebhook.bind(this);
	this.handleWebhook = this.handleWebhook.bind(this);

	if (this.disabled) {
		return;
	}
	// Require key and secret.
	if (!this.key) {
		throw new Error('missing key');
	}
	if (!this.secret) {
		throw new Error('missing secret');
	}

	this.fetchConfig();
}

inherits(PubSubClient, EventEmitter);
69
/**
 * Authenticates a webhook request as being from pubsub server. Can be used as middleware.
 *
 * The check depends on the fetched client config (`auth_type`):
 * 'basic' compares the Basic credentials, 'token' compares the `x-auth-token`
 * header, 'key_secret' compares the `x-signature` header with the hex
 * HMAC-SHA256 of the JSON-serialized body; any other value means no
 * authentication is configured and the request is accepted.
 * On failure a JSON error is written to `res` (400 when consumption is not
 * enabled, 401 when the credentials do not match) and `next` is not called.
 *
 * @param {http.ClientRequest} req request object containing auth details
 * @param {http.ServerResponse} [res] response object for responding with errors
 * @param {Function} [next] optional callback function for use in middleware
 * @return {Boolean} whether the request is authenticated
 */
PubSubClient.prototype.authenticateWebhook = function (req, res, next) {
	if (req && req._authenticatedWebhook) {
		next && next();
		return true;
	}
	// The config may be missing entirely if it was never fetched.
	const conf = this.config || {};
	// Make sure the client has consumption enabled
	if (!conf.can_consume) {
		res && res.writeHead(400, { 'Content-Type': 'application/json' });
		res && res.end(JSON.stringify({
			success: false,
			message: 'This client does not have consumption enabled.'
		}));
		return false;
	}
	debug('authenticating webhook using: method =', conf.auth_type);

	const headers = req && req.headers || {};
	let authenticated;
	// NOTE(review): the comparisons below use ===; consider crypto.timingSafeEqual
	// to avoid leaking secret material through timing differences.
	switch (conf.auth_type) {
		case 'basic': {
			// The parser yields undefined when the Authorization header is absent
			// or malformed, which must fail authentication instead of throwing.
			const creds = req && req.headers ? auth(req) : undefined;
			authenticated = Boolean(creds)
				&& creds.name === conf.auth_user
				&& creds.pass === conf.auth_pass;
			break;
		}
		case 'token': {
			// A missing header must never match a missing configured token.
			const token = headers['x-auth-token'];
			authenticated = typeof token === 'string' && token === conf.auth_token;
			break;
		}
		case 'key_secret': {
			// JSON.stringify(undefined) is undefined, which hmac.update() rejects.
			const payload = JSON.stringify(req && req.body);
			authenticated = typeof payload === 'string'
				&& headers['x-signature'] === createHmac('SHA256', this.secret).update(payload).digest('hex');
			break;
		}
		default:
			// ...otherwise there's no authentication for the client
			authenticated = true;
	}

	// Make sure the request is from pubsub server
	if (!authenticated) {
		debug('webhook authentication failed', headers);
		res && res.writeHead(401, { 'Content-Type': 'application/json' });
		res && res.end(JSON.stringify({
			success: false,
			message: 'Unauthorized'
		}));
		return false;
	}
	// Remember the result so a later handleWebhook() call does not re-check.
	if (req) {
		req._authenticatedWebhook = true;
	}
	next && next();
	return true;
};
124
/**
 * Fetch client config from the server.
 *
 * Resets `this.config` to an empty object, requests `/api/client/config`
 * signed with the client secret and, on success, stores the returned config,
 * emits 'configured' and checks the event names queued by on() against the
 * subscribed topics. A 401 response emits 'unauthorized'. Failures are only
 * logged through the debug logger.
 *
 * @returns {Promise} settles once the request has been handled
 */
PubSubClient.prototype.fetchConfig = function () {
	const endpoint = new URL('/api/client/config', this.url);
	const opts = {
		url: endpoint.href,
		headers: {
			'user-agent': userAgent,
			APIKey: this.key,
			APISig: createHmac('SHA256', this.secret).update('{}').digest('base64')
		},
		timeout: this.timeout
	};
	// URL#username / URL#password are percent-encoded; fall back to the raw
	// value when it is not valid percent-encoding.
	const decode = value => {
		try {
			return decodeURIComponent(value);
		} catch (e) {
			return value;
		}
	};
	debug('fetching client config');
	this.config = {};
	return axios(opts)
		.then(resp => {
			// The payload names its own config property through `key`.
			const data = resp.data && resp.data.key && resp.data[resp.data.key];
			if (!data) {
				const err = new Error('invalid response');
				return debug('error', err, resp.status, resp.data);
			}

			if (data.can_consume) {
				// Extract topic from keys of event map.
				data.topics = Object.keys(data.events || {});

				// Get basic auth creds from the url
				if (data.auth_type === 'basic' && data.url) {
					const webhookUrl = new URL(data.url);
					data.auth_user = decode(webhookUrl.username);
					data.auth_pass = decode(webhookUrl.password);
				}
			}

			this.config = data;

			this.emit('configured', this.config);
			// Validate directly instead of registering another 'configured'
			// listener on every fetch.
			pendingChecks.forEach(name => this._validateTopic(name));
			return debug('got config', this.config);
		})
		.catch(e => {
			// Network errors and timeouts carry no `response`.
			const status = e && e.response && e.response.status;
			// if 401 that means the apikey, secret is wrong.
			if (status === 401) {
				e.message = 'Unauthorized';
				this.emit('unauthorized', String(e), opts);
			}
			debug('error fetching config', String(e));
		});
};
176
/**
 * Validates that event name is in client's subscribed topics (or provided topic list).
 *
 * Topics are dot-separated. A `*` segment matches exactly one event segment,
 * a trailing `**` segment matches any remaining segments (including none).
 * The internal events 'configured' and 'unauthorized' always match.
 *
 * @param {String} name topic/event name, optionally prefixed with 'event:'
 * @param {Array} topics (optional) set of topics to validate against, defaults to this.config.topics
 * @returns {String|null} the first topic that matched the event, or null if none did
 */
PubSubClient.prototype.hasSubscribedTopic = function (name, topics) {
	// Webhook payloads are untrusted, so a missing/non-string name simply does not match.
	if (typeof name !== 'string') {
		return null;
	}
	// Event names are prefixed, so strip it (only when it really is a prefix).
	const prefix = 'event:';
	const eventName = name.startsWith(prefix) ? name.slice(prefix.length) : name;
	const eventSegments = eventName.split('.');
	// Add internal events since they will be emitted.
	const validTopics = [ 'configured', 'unauthorized' ].concat(topics || (this.config && this.config.topics) || []);

	const matched = validTopics.find(topic => {
		// Name matches topic
		if (topic === eventName) {
			return true;
		}
		// Fall out if exact match missed and topic does not have wildcard.
		if (!topic.includes('*')) {
			return false;
		}
		const topicSegments = topic.split('.');
		const lastIndex = topicSegments.length - 1;
		// Only a trailing double-splat allows the segment counts to differ.
		if (topicSegments[lastIndex] !== '**' && eventSegments.length !== topicSegments.length) {
			return false;
		}
		return topicSegments.every((segment, i) => (
			// segment matched
			segment === eventSegments[i]
			// segment was wildcarded and there is an event segment to consume
			|| (segment === '*' && i < eventSegments.length)
			// segment was terminus and double-splatted
			|| (segment === '**' && i === lastIndex)
		));
	});
	return matched || null;
};
217
/**
 * Webhook handler route that exposes events using the EventEmitter pattern.
 *
 * After authenticating the request, the event in `req.body` is emitted as
 * `event:<topic>` when the topic matches one of the subscribed topics, and a
 * JSON success response is written. A body without a topic is answered with a
 * 400 JSON error instead of throwing.
 *
 * @param {http.ClientRequest} req Request object (expects a parsed `body`)
 * @param {http.ServerResponse} res Response object
 */
PubSubClient.prototype.handleWebhook = function (req, res) {
	// Make sure the request has been authenticated
	// (authenticateWebhook writes the error response itself on failure).
	if (!this.authenticateWebhook(req, res)) {
		return;
	}

	const body = req.body;
	const topic = body && body.topic;
	if (typeof topic !== 'string' || !topic) {
		debug('event rejected, missing topic', body);
		res.writeHead(400, { 'Content-Type': 'application/json' });
		res.end(JSON.stringify({
			success: false,
			message: 'Missing event topic.'
		}));
		return;
	}
	debug('event received', topic, body);

	// Only emit events for topics this client is subscribed to
	if (this.hasSubscribedTopic(topic)) {
		debug('emitting event:' + topic);
		this.emit('event:' + topic, body);
	}

	res.writeHead(200, { 'Content-Type': 'application/json' });
	res.end(JSON.stringify({ success: true }));
};
241
/**
 * Publish an event with name and optional data.
 *
 * Does nothing on a disabled client. The payload is deep-cloned through a
 * JSON round trip and sanitized before it is handed to _send(), so the
 * caller's object is never modified.
 *
 * @param {String} name name of the event
 * @param {Object} data optional event payload or undefined/null if no event data
 * @param {Object} options the options object
 * @throws {Error} if the name is missing or longer than 255 bytes, or if data
 * is not an object or cannot be cloned
 */
PubSubClient.prototype.publish = function (name, data, options) {
	if (this.disabled) {
		return;
	}
	debug('publish', name);

	if (!name) {
		throw new Error('required event name');
	}
	const nameBytes = Buffer.byteLength(name);
	if (nameBytes > 255) {
		throw new Error('name length must be less than 255 bytes');
	}
	const hasInvalidData = Boolean(data) && typeof data !== 'object';
	if (hasInvalidData) {
		throw new Error('data must be an object');
	}

	// Work on a copy so sanitizing never touches the caller's object.
	let payload;
	try {
		const serialized = JSON.stringify(data || {});
		payload = JSON.parse(serialized);
	} catch (e) {
		throw new Error('data could not be cloned');
	}

	const message = {
		event: name,
		data: _sanitize(payload, []),
		options
	};
	this._send(message);
};
274
/**
 * Schedules another _send() of the given data, unless the client is disabled
 * or the retry counter has passed the retry limit, and emits 'retry'.
 * @private
 * @param {Object} data the data object
 * @param {String} reason the retry reason
 * @param {Object} opts the options object
 * @returns {void}
 */
PubSubClient.prototype._retry = function (data, reason, opts) {
	debug('retry called', reason, opts);

	const isDisabled = Boolean(this.disabled);
	if (isDisabled) {
		return debug('retry ignored, disabled:', isDisabled, new Error().stack);
	}

	const limitExceeded = this.retry > this.retryLimit;
	if (limitExceeded) {
		return debug('Retry limit exceeded', new Error().stack);
	}

	// exponential backoff based on the retry counter, never below 500ms
	const exponentialDelay = (Math.pow(2, this.retry) - 1) * 500;
	const delay = Math.max(500, exponentialDelay);
	setTimeout(() => this._send(data), delay);

	this.emit('retry', reason, opts, this.retry);
};
298
/**
 * Sending event to the server.
 *
 * Posts the data to `/api/event`, signed with the client secret. A successful
 * response resets the retry counter and emits 'response'. A failed request
 * (HTTP error, network error or timeout) is passed to _retry(); a 401
 * additionally emits 'unauthorized'.
 *
 * @private
 * @param {Object} data the data object
 * @return {Boolean|undefined} false when the client is disabled, otherwise nothing
 */
PubSubClient.prototype._send = function (data) {
	if (this.disabled) {
		return false;
	}

	debug('_send', data);

	this.retry = (this.retry || 0) + 1;
	// shouldn't get here, but empty data slot
	if (!data) {
		return;
	}

	this._sending = true;
	const ticket = Date.now();
	this._sendingTS = ticket;

	const endpoint = new URL('/api/event', this.url);
	const opts = {
		url: endpoint.href,
		method: 'post',
		data,
		headers: {
			'user-agent': userAgent,
			APIKey: this.key,
			APISig: createHmac('SHA256', this.secret).update(JSON.stringify(data)).digest('base64')
		},
		timeout: this.timeout
	};

	// check current and if the same, change state, otherwise a new event has come
	// in since we got here
	const settle = () => {
		if (ticket === this._sendingTS) {
			this._sending = false;
		}
	};

	const onResponse = resp => {
		settle();
		debug('received web response', resp && resp.status);

		// reset our retry count on successfully sending
		this.retry = 0;
		// emit an event
		this.emit('response', resp, opts);
		return debug('response received, status:', resp && resp.status, 'opts:', opts);
	};

	const onFailure = e => {
		settle();
		// Network errors and timeouts have no `response`; fall back to the error code.
		const status = e && e.response && e.response.status;
		// an error which isn't a validation or authorization error, retry sending
		if (status !== 400 && status !== 401) {
			debug('web request failed', String(e), opts);
			return this._retry(data, status || (e && e.code), opts);
		}
		const err = new Error('invalid response');
		err.code = status;
		// if 401 that means the apikey, secret is wrong.
		if (err.code === 401) {
			err.message = 'Unauthorized';
			this.emit('unauthorized', String(err), opts);
		}
		debug('web request received error', String(err), opts);
		// NOTE(review): 400/401 responses are still retried, exactly as before,
		// although resending the same request is unlikely to succeed - confirm.
		return this._retry(data, err.code, opts);
	};

	try {
		debug('sending web event', opts);
		axios(opts)
			// Two-argument form: an exception thrown by a 'response' listener
			// must not be mistaken for a failed request and trigger a resend.
			.then(onResponse, onFailure)
			.catch(e => debug('error while handling web response', String(e)));
	} catch (e) {
		debug('web request received error', e, opts);
		this._retry(data, e.code, opts);
	}
};
378
/**
 * Determines if event is a subscribed topic and logs a debug message when it
 * is not. Nothing is returned or thrown; the check is purely advisory.
 * @private
 * @param {String} name event name
 */
PubSubClient.prototype._validateTopic = function (name) {
	// Use hasSubscribedTopic directly; getSubscribedTopic is only its deprecated alias.
	if (!this.hasSubscribedTopic(name)) {
		debug('Unexpected event', name, ': client not configured to receive this event');
	}
};
389
/**
 * No-op kept for backwards compatibility; it only logs a deprecation notice.
 * @deprecated Websocket connections are no longer supported. Deprecated in v1.4.0. Removed in v2.0.0.
 */
PubSubClient.prototype.close = function () {
	const notice = 'PubSubClient.close function has been deprecated. Websocket connections are no longer supported.';
	debug(notice);
};
397
/**
 * Deprecated accessor for the module-level fingerprint: logs a deprecation
 * notice and, when a callback is given, calls it node-style with the fingerprint.
 * @deprecated Deprecated in v1.6.0. Removed in v2.0.0.
 * @param {Function} callback callback invoked as callback(null, fingerprint)
 * @returns {void}
 */
PubSubClient.prototype.getFingerprint = function (callback) {
	debug('PubSubClient.getFingerprint function has been deprecated.');
	if (callback) {
		callback(null, fingerprint);
	}
};
408
/**
 * Deprecated alias of hasSubscribedTopic (same function object).
 * @deprecated Renamed to hasSubscribedTopic in v1.6.0. Removed in v2.0.0.
 */
PubSubClient.prototype.getSubscribedTopic = PubSubClient.prototype.hasSubscribedTopic;

/**
 * Registers a listener like EventEmitter#on and additionally checks the event
 * name against the subscribed topics, so that listeners for events this client
 * will never receive show up in the debug output.
 *
 * @param {String} name event name
 * @returns {PubSubClient} the client, for chaining
 */
PubSubClient.prototype.on = function (name) {
	debug('on', name);
	// If the topics have been fetched then we can attempt to warn about events
	// that aren't configured to be received by this client; until then the name
	// is queued and checked once the config arrives.
	const topicsFetched = Boolean(this.config) && Array.isArray(this.config.topics);
	if (!topicsFetched) {
		pendingChecks.push(name);
	} else {
		this._validateTopic(name);
	}
	return EventEmitter.prototype.on.apply(this, arguments);
};
427
428module.exports = PubSubClient;
429
/**
 * Checks process.env flags to determine if env is preproduction or development:
 * either NODE_ACS_URL points at a known test domain, or NODE_ENV / APPC_ENV
 * holds one of the preproduction environment names.
 * @return {Boolean} flag if preproduction or development env
 */
function _isPreproduction() {
	const acsUrl = String(process.env.NODE_ACS_URL);
	const testDomains = [ '.appctest.com', '.axwaytest.net' ];
	if (testDomains.some(domain => acsUrl.includes(domain))) {
		return true;
	}
	const envNames = [ process.env.NODE_ENV, process.env.APPC_ENV ];
	return envNames.some(envName => preproductionEnvironments.includes(envName));
}
440
/**
 * Removes potentially private data. The object is modified in place:
 * function properties are deleted, values of keys starting with `password` or
 * `creditcard` are replaced with '[HIDDEN]', RegExp values are replaced by
 * their source, Date values are left untouched and references back to an
 * enclosing object are replaced with '[Circular]'.
 *
 * @param {Object} obj object to sanitize
 * @param {Array} [seen] objects currently being traversed (the ancestors of
 * `obj`); callers normally pass an empty array
 * @return {Object} sanitized object (the same instance), or the RegExp source /
 * the value itself for non-plain inputs
 */
function _sanitize(obj, seen = []) {
	if (!obj || typeof obj !== 'object') {
		return obj;
	}
	if (obj instanceof RegExp) {
		return obj.source;
	}
	if (obj instanceof Date) {
		return obj;
	}
	// Track the object itself while its children are visited, so that a
	// reference back to it (including a direct self-reference) is detected.
	seen.push(obj);
	Object.keys(obj).forEach(key => {
		const value = obj[key];
		const type = typeof value;
		if (type === 'function') {
			delete obj[key];
		} else if (/^(password|creditcard)/.test(key)) {
			// the server side does masking as well, but doesn't hurt to do it at origin
			obj[key] = '[HIDDEN]';
		} else if (value instanceof Date) {
			// do nothing
		} else if (value instanceof RegExp) {
			obj[key] = value.source;
		} else if (value && type === 'object') {
			obj[key] = seen.includes(value) ? '[Circular]' : _sanitize(value, seen);
		}
	});
	// Unwind, so the same object reachable through two different (non-circular)
	// paths is not mistaken for a cycle.
	seen.pop();
	return obj;
}