UNPKG

14.3 kBJavaScriptView Raw
1const { createHash, createHmac } = require('crypto');
2const { EventEmitter } = require('events');
3const { hostname } = require('os');
4const { inherits } = require('util');
5
6const axios = require('axios');
7const auth = require('basic-auth');
8
9const { version } = require('../package.json');
10
// Base URL of the PubSub service for each supported environment name.
const environments = {
  production: 'https://pubsub.platform.axway.com',
  preproduction: 'https://pubsub.axwaytest.net'
};
// SHA-256 hex digest identifying this host in the user agent. `hash.update()` only
// accepts strings/buffers, so the timestamp fallback (used when the hostname is
// empty) must be stringified or the module would throw while loading.
const fingerprint = createHash('sha256').update(hostname() || String(Date.now())).digest('hex');
// NODE_ENV / APPC_ENV values that select the preproduction service.
const preproductionEnvironments = [ 'preproduction', 'development' ];
const userAgent = `appc-pubsub/${version} (${fingerprint})`;

// Listener names registered before the client config was fetched; shared by all clients.
let pendingChecks = [];
// Module-wide debug logger. Starts as a no-op so prototype methods are safe to call
// before any client has been constructed; the constructor swaps it when `opts.debug` is set.
let debug = function () {};
21
/**
 * Class constructor
 *
 * Creates a PubSub client. Unless `opts.disabled` is set, a key and secret are
 * required and the client configuration is fetched from the server immediately.
 *
 * @class PubSubClient
 * @param {Object} [opts] options for configuring the client
 * @param {Boolean} [opts.debug] log debug output to the console (module-wide setting)
 * @param {String} [opts.env] environment name used to pick the server url
 * @param {String} [opts.url] explicit server url, overrides `opts.env`
 * @param {Boolean} [opts.disabled] create an inert client that never contacts the server
 * @param {String} [opts.key] client API key (required unless disabled)
 * @param {String} [opts.secret] client API secret (required unless disabled)
 * @param {Number} [opts.timeout=10000] request timeout in milliseconds
 * @param {Number} [opts.retryLimit=10] number of retries for a failed event
 * @throws {Error} when the client is enabled and key or secret is missing
 */
function PubSubClient(opts) {
  opts = opts || {};

  // Run the parent constructor so the emitter state is initialised explicitly.
  EventEmitter.call(this);

  // Stub debug logging function and extend if enabled.
  debug = function () {};
  if (opts.debug) {
    debug = (...args) => console.log('appc:pubsub', ...args);
  }

  // An explicit opts.env wins; otherwise the environment is derived from process.env.
  const env = opts.env || (_isPreproduction() ? 'preproduction' : 'production');
  this.url = opts.url || environments[env] || environments.production;

  this.disabled = opts.disabled;
  this.key = opts.key;
  this.secret = opts.secret;
  // Always present so webhook helpers can read it, even on a disabled client.
  this.config = {};

  if (this.disabled) {
    return;
  }
  // Require key and secret.
  if (!this.key) {
    throw new Error('missing key');
  }
  if (!this.secret) {
    throw new Error('missing secret');
  }

  // An explicit 0 is honoured (e.g. retryLimit 0 disables retries); any other
  // falsy value falls back to the default.
  const numberOr = (value, fallback) => (value === 0 ? 0 : value || fallback);
  this.timeout = numberOr(opts.timeout, 10000);
  this.retryLimit = numberOr(opts.retryLimit, 10);
  // Send-attempt counters keyed by event id.
  this.retries = {};

  this.fetchConfig();

  // These functions need the client binding for use as a middleware/route
  this.authenticateWebhook = this.authenticateWebhook.bind(this);
  this.handleWebhook = this.handleWebhook.bind(this);
}

inherits(PubSubClient, EventEmitter);
70
/**
 * Authenticates a webhook request as being from pubsub server. Can be used as middleware.
 *
 * The check depends on the `auth_type` of the fetched client config: basic auth
 * credentials, an `x-auth-token` header, or an `x-signature` header holding the
 * hex HMAC-SHA256 of the JSON body signed with the client secret. Any other
 * `auth_type` means the client has no webhook authentication.
 *
 * @param {http.ClientRequest} req request object containing auth details
 * @param {http.ServerResponse} [res] response object for responding with errors
 * @param {Function} [next] optional callback function for use in middleware
 * @return {Boolean} whether the request is authenticated
 */
PubSubClient.prototype.authenticateWebhook = function (req, res, next) {
  // A request that already passed (e.g. as middleware) is not checked twice.
  if (req._authenticatedWebhook) {
    next && next();
    return true;
  }

  // Writes a JSON error (when a response object was given) and reports failure.
  const reject = (status, message) => {
    if (res) {
      res.writeHead(status, { 'Content-Type': 'application/json' });
      res.end(JSON.stringify({ success: false, message }));
    }
    return false;
  };

  // The config may be missing entirely, e.g. on a disabled client.
  const conf = this.config || {};

  // Make sure the client has consumption enabled
  if (!conf.can_consume) {
    return reject(400, 'This client does not have consumption enabled.');
  }
  debug('authenticating webhook using: method =', conf.auth_type);

  const headers = req && req.headers || {};
  let authenticated;

  // NOTE(review): secrets are compared with ===; consider crypto.timingSafeEqual.
  switch (conf.auth_type) {
    case 'basic': {
      // basic-auth yields undefined for a missing or malformed Authorization
      // header; that must be an authentication failure, not a TypeError.
      const creds = auth(req);
      authenticated = Boolean(creds)
        && creds.name === conf.auth_user
        && creds.pass === conf.auth_pass;
      break;
    }
    case 'token': {
      // Require an actual header value so a missing header can never equal a
      // missing configured token.
      const token = headers['x-auth-token'];
      authenticated = typeof token === 'string' && token === conf.auth_token;
      break;
    }
    case 'key_secret': {
      // JSON.stringify(undefined) is undefined, which cannot be signed: a request
      // without a parsed body is rejected instead of crashing the HMAC.
      const payload = JSON.stringify(req.body);
      authenticated = typeof payload === 'string'
        && headers['x-signature'] === createHmac('SHA256', this.secret).update(payload).digest('hex');
      break;
    }
    default:
      // ...otherwise there's no authentication for the client
      authenticated = true;
  }

  // Make sure the request is from pubsub server
  if (!authenticated) {
    debug('webhook authentication failed', headers);
    return reject(401, 'Unauthorized');
  }

  req._authenticatedWebhook = true;
  next && next();
  return true;
};
125
/**
 * Fetch client config from the server.
 *
 * Requests `/api/client/config` signed with the client key/secret. On success the
 * config is stored on `this.config`, the client is flagged as configured, listener
 * names queued before configuration are validated and a `configured` event is
 * emitted. A 401 response emits `unauthorized`. Failures are only logged through
 * the debug logger; nothing is thrown or returned.
 */
PubSubClient.prototype.fetchConfig = function () {
  const endpoint = new URL('/api/client/config', this.url);
  const opts = {
    url: endpoint.href,
    headers: {
      'user-agent': userAgent,
      APIKey: this.key,
      // The config request has no payload, so the signature covers an empty JSON object.
      APISig: createHmac('SHA256', this.secret).update('{}').digest('base64')
    },
    timeout: this.timeout
  };
  debug('fetching client config', this.key);
  this.config = {};
  axios(opts)
    .then(resp => {
      // The payload is expected to be wrapped as { key: <name>, <name>: <config> }.
      const data = resp.data && resp.data.key && resp.data[resp.data.key];
      if (!data) {
        const err = new Error('invalid response');
        return debug('error', err, resp.status, resp.data);
      }

      if (data.can_consume) {
        // Extract topic from keys of event map.
        data.topics = Object.keys(data.events || {});

        // Get basic auth creds from the url
        // NOTE(review): URL#username/#password are percent-encoded; confirm the
        // server sends credentials in the same form.
        if (data.auth_type === 'basic' && data.url) {
          const webhookUrl = new URL(data.url);
          data.auth_user = webhookUrl.username;
          data.auth_pass = webhookUrl.password;
        }
      }

      this.config = data;
      // Flag read by `on`: listeners added from now on are validated immediately
      // instead of being queued forever.
      this.configured = true;

      // Validate listener names registered before the config arrived. Done
      // directly rather than through an extra 'configured' listener, which
      // would pile up on every call.
      pendingChecks.forEach(name => this._validateTopic(name));
      this.emit('configured', this.config);
      return debug('got config', this.config);
    })
    .catch(e => {
      // Network errors and timeouts carry no `response`; guard the lookup so this
      // handler cannot itself throw and leave an unhandled rejection.
      const status = e && e.response && e.response.status;
      // if 401 that means the apikey, secret is wrong.
      if (status === 401) {
        e.message = 'Unauthorized';
        this.emit('unauthorized', String(e), opts);
      }
      debug('error fetching config', String(e));
    });
};
177
/**
 * Validates that event name is in client's subscribed topics (or provided topic list).
 *
 * Topics are dot-separated. A `*` segment matches exactly one event segment and a
 * trailing `**` segment matches everything from its position onwards. The internal
 * `configured` and `unauthorized` events are always considered subscribed.
 *
 * @param {String} name topic/event name, optionally prefixed with `event:`
 * @param {Array} topics (optional) set of topics to validate against, defaults to this.config.topics
 * @returns {String|null} the first topic that matched, or null when nothing matched
 */
PubSubClient.prototype.hasSubscribedTopic = function (name, topics) {
  // A missing or non-string name can never match a topic.
  if (typeof name !== 'string') {
    return null;
  }
  // Event names are prefixed, so strip it (only as a prefix, never mid-name).
  const prefix = 'event:';
  const eventName = name.startsWith(prefix) ? name.slice(prefix.length) : name;
  const eventSegments = eventName.split('.');

  // Add internal events since they will be emitted.
  const configured = topics || (this.config && this.config.topics) || [];
  const validTopics = [ 'configured', 'unauthorized' ].concat(configured);

  const matched = validTopics.find(topic => {
    if (typeof topic !== 'string') {
      return false;
    }
    // Name matches topic
    if (topic === eventName) {
      return true;
    }
    // Fall out if exact match missed and topic does not have wildcard.
    if (!topic.includes('*')) {
      return false;
    }
    const topicSegments = topic.split('.');
    // Fall out if topic is not double-splatted and segment counts do not match.
    if (!topic.includes('**') && eventSegments.length !== topicSegments.length) {
      return false;
    }
    const terminus = topicSegments.length - 1;
    return topicSegments.every((segment, i) => {
      // segment was terminus and double-splatted: the remainder matches
      if (segment === '**' && i === terminus) {
        return true;
      }
      // the event ran out of segments, so neither a literal nor `*` can match
      if (i >= eventSegments.length) {
        return false;
      }
      // segment was wildcarded or matched
      return segment === '*' || segment === eventSegments[i];
    });
  });
  return matched || null;
};
218
/**
 * Webhook handler route that exposes events using the EventEmitter pattern.
 *
 * Authenticates the request, then emits `event:<topic>` with the request body when
 * the topic is one this client is subscribed to. Responds 200 for every accepted
 * request (subscribed or not) and 400 when the body carries no topic.
 *
 * @param {http.ClientRequest} req Request object; `req.body` must be the parsed JSON payload
 * @param {http.ServerResponse} res Response object
 */
PubSubClient.prototype.handleWebhook = function (req, res) {
  // Make sure the request has been authenticated (a failure response was already sent)
  if (!this.authenticateWebhook(req, res)) {
    return;
  }

  const body = req.body;
  const topic = body && body.topic;

  // Without a parsed body or a topic there is nothing to dispatch; answer with a
  // client error instead of throwing inside the route.
  if (typeof topic !== 'string' || topic === '') {
    debug('event rejected, missing topic', body);
    res.writeHead(400, { 'Content-Type': 'application/json' });
    res.end(JSON.stringify({ success: false, message: 'Missing event topic' }));
    return;
  }
  debug('event received', topic, body);

  // Only emit topics (exact or wildcard) that this client is subscribed to
  if (this.hasSubscribedTopic(topic)) {
    debug('emitting event:' + topic);
    this.emit('event:' + topic, body);
  }

  res.writeHead(200, { 'Content-Type': 'application/json' });
  res.end(JSON.stringify({ success: true }));
};
242
/**
 * Publish an event with name and optional data.
 *
 * The data is deep-cloned through JSON before being sanitized, so the caller's
 * object is never modified. Does nothing on a disabled client.
 *
 * @param {String} event name of the event
 * @param {Object} data optional event payload or undefined/null if no event data
 * @param {Object} options the options object
 * @throws {Error} when the name is missing or longer than 255 bytes, or when data
 *   is not an object or cannot be serialized to JSON
 * @throws {TypeError} when the name is not a string
 */
PubSubClient.prototype.publish = function (event, data, options) {
  if (this.disabled) {
    return;
  }
  debug('publish', event);
  if (!event) {
    throw new Error('required event name');
  }
  // Buffer.byteLength would reject non-strings with an obscure message.
  if (typeof event !== 'string') {
    throw new TypeError('event name must be a string');
  }
  // Names of up to and including 255 bytes are accepted.
  if (Buffer.byteLength(event) > 255) {
    throw new Error('name length must be less than 255 bytes');
  }
  if (data && typeof data !== 'object') {
    throw new Error('data must be an object');
  }
  // Clone data before serialization pass so objects are not modified.
  let payload;
  try {
    payload = JSON.parse(JSON.stringify(data || {}));
  } catch (e) {
    throw new Error('data could not be parsed');
  }

  // Generate identifier and send event. The per-client sequence number keeps ids
  // unique when the same event is published twice within one millisecond;
  // otherwise both sends would share a single retry counter.
  this._publishSequence = (this._publishSequence || 0) + 1;
  const id = `${event}-${Date.now()}-${this._publishSequence}`;
  this._send(id, {
    data: _sanitize(payload, []),
    event,
    options
  });
};
277
/**
 * Retry event.
 *
 * Schedules another `_send` for the event after a backoff delay that grows with
 * the number of attempts already made, then emits `retry`. Gives up, and forgets
 * the event's attempt counter, once the retry limit is exceeded.
 *
 * @private
 * @param {String} id hash to identify the event being sent
 * @param {Object} data the data object
 * @param {String} reason the retry reason
 * @param {Object} opts the options object
 * @returns {void}
 */
PubSubClient.prototype._retry = function (id, data, reason, opts) {
  debug('retry called', reason, opts);

  if (this.disabled) {
    return debug('retry ignored, disabled:', !!this.disabled, new Error().stack);
  }

  // Attempts made so far; an unknown id counts as zero so the delay is never NaN.
  const attempts = this.retries[id] || 0;

  if (attempts > this.retryLimit) {
    // Drop the counter, otherwise every abandoned event leaks an entry.
    delete this.retries[id];
    return debug('Retry limit exceeded', new Error().stack);
  }

  // run again with exponential backoff each time (never sooner than 500ms)
  const delay = Math.max(500, (Math.pow(2, attempts) - 1) * 500);
  setTimeout(() => this._send(id, data), delay);
  this.emit('retry', reason, opts, attempts);
};
302
/**
 * Sending event to the server.
 *
 * Posts the event to `/api/event`, signed with the client key/secret. Emits
 * `response` on success and `unauthorized` on a 401. Responses of 400 and 401 are
 * final; any other failure is handed to `_retry`.
 *
 * @private
 * @param {String} id hash to identify the event being sent
 * @param {Object} data the data object
 * @return {void|Boolean} false when the client is disabled, otherwise undefined
 */
PubSubClient.prototype._send = function (id, data) {
  if (this.disabled) {
    return false;
  }

  debug('_send', id, data);

  // shouldn't get here, but empty data slot
  if (!data || !data.event) {
    return;
  }

  // Count this attempt; _retry uses the counter for its limit and backoff.
  this.retries[id] = (this.retries[id] || 0) + 1;

  const endpoint = new URL('/api/event', this.url);
  const opts = {
    url: endpoint.href,
    method: 'post',
    data,
    headers: {
      'user-agent': userAgent,
      APIKey: this.key,
      APISig: createHmac('SHA256', this.secret).update(JSON.stringify(data)).digest('base64')
    },
    timeout: this.timeout
  };

  const onSuccess = resp => {
    // reset our retry count on successfully sending
    delete this.retries[id];

    // emit an event
    this.emit('response', resp, opts);

    // log status
    return debug('response received, status:', resp.status);
  };

  // handle HTTP errors
  const onFailure = e => {
    const err = new Error('invalid response');
    err.code = e && e.response && e.response.status || String(e);

    // if 401, that means the apikey or secret is wrong or event is not allowed; do not attempt to retry
    if (err.code === 401) {
      // No retry will follow, so release the attempt counter.
      delete this.retries[id];
      err.message = 'Unauthorized';
      this.emit('unauthorized', String(err), opts);
      return debug('sending event failed', String(err));
    }

    // if 400, that means the event failed validation; do not attempt to retry
    if (err.code === 400) {
      delete this.retries[id];
      return debug('sending event failed', e.response && e.response.data || String(e));
    }

    // Otherwise, since it wasn't a validation or authorization error, log and retry.
    debug('received error', String(err), opts);
    return this._retry(id, data, err.code, opts);
  };

  try {
    axios(opts)
      // Two-argument form: an exception thrown by a 'response' listener must not be
      // mistaken for a failed request, or a delivered event would be sent again.
      .then(onSuccess, onFailure)
      // Exceptions thrown by listeners/handlers end up here and are only logged.
      .catch(e => debug('error while handling response', String(e)));
  } catch (e) {
    // axios throwing outright (and not getting caught) likely means invalid opts.
    // Log, but do not retry.
    delete this.retries[id];
    debug('web request received error', e, opts);
  }
};
377
/**
 * Logs a debug warning when a listener is registered for an `event:` name that
 * the client is not subscribed to. Names without the `event:` prefix are ignored.
 * @private
 * @param {String} name event name
 */
PubSubClient.prototype._validateTopic = function (name) {
  const isEventName = String(name).startsWith('event:');
  if (!isEventName) {
    return;
  }
  if (this.hasSubscribedTopic(name)) {
    return;
  }
  debug('Unexpected event', name, ': client not configured to receive this event');
};
388
/**
 * No-op kept for backwards compatibility; it only logs a deprecation notice.
 * @deprecated Websocket connections are no longer supported. Deprecated in v1.4.0. Removed in v2.0.0.
 */
PubSubClient.prototype.close = function () {
  const notice = 'PubSubClient.close function has been deprecated. Websocket connections are no longer supported.';
  debug(notice);
};
396
/**
 * Deprecated accessor for the host fingerprint: logs a deprecation notice and, when
 * a callback is given, calls it node-style with the module fingerprint.
 * @deprecated Deprecated in v1.6.0. Removed in v2.0.0.
 * @param {Function} callback callback invoked as `callback(null, fingerprint)`
 * @returns {void}
 */
PubSubClient.prototype.getFingerprint = function (callback) {
  debug('PubSubClient.getFingerprint function has been deprecated.');
  if (callback) {
    callback(null, fingerprint);
  }
};
407
/**
 * Deprecated alias: the very same function object as `hasSubscribedTopic`.
 * @deprecated Renamed to hasSubscribedTopic in v1.6.0. Removed in v2.0.0.
 */
PubSubClient.prototype.getSubscribedTopic = PubSubClient.prototype.hasSubscribedTopic;
413
// Keep a handle on the inherited implementation before wrapping it.
const on = PubSubClient.prototype.on;

/**
 * Wraps the inherited `on` so that listener names can be checked against the
 * client's subscribed topics. While `this.configured` is falsy the name is queued
 * in the module-level pending list; afterwards it is validated right away.
 * The listener itself is always registered through the inherited `on`.
 *
 * @param {String} name event name the listener is registered for
 * @returns {*} whatever the inherited `on` returns
 */
PubSubClient.prototype.on = function (name) {
  debug('on', name);
  if (this.configured) {
    // Topics are known, so warn immediately about events this client cannot receive.
    this._validateTopic(name);
  } else {
    pendingChecks.push(name);
  }
  return on.apply(this, arguments);
};

module.exports = PubSubClient;
428
/**
 * Checks process.env flags to determine if env is preproduction or development.
 *
 * True when NODE_ACS_URL points at an `.appctest.com` or `.axwaytest.net` host, or
 * when NODE_ENV or APPC_ENV is one of the preproduction environment names.
 * @return {Boolean} flag if preproduction or development env
 */
function _isPreproduction() {
  // String() turns an unset variable into 'undefined', which matches neither domain.
  const acsUrl = String(process.env.NODE_ACS_URL);
  if (acsUrl.includes('.appctest.com') || acsUrl.includes('.axwaytest.net')) {
    return true;
  }
  return [ process.env.NODE_ENV, process.env.APPC_ENV ]
    .some(value => preproductionEnvironments.includes(value));
}
439
/**
 * Removes potentially private data.
 *
 * Works in place on `obj` and recursively on nested objects/arrays: function values
 * are deleted, values whose key starts with `password` or `creditcard` are replaced
 * with `[HIDDEN]`, RegExp values are replaced by their source, Dates are left
 * untouched and references back to an object currently being sanitized are replaced
 * with `[Circular]`.
 *
 * @param {Object} obj object to sanitize (mutated)
 * @param {Array} [seen] objects currently being sanitized (the ancestor chain);
 *   callers normally pass an empty array
 * @return {Object} sanitized object (same reference), or the RegExp source / the
 *   unchanged value for non-plain input
 */
function _sanitize(obj, seen) {
  if (!obj || typeof obj !== 'object') {
    return obj;
  }
  if (obj instanceof RegExp) {
    return obj.source;
  }
  if (obj instanceof Date) {
    return obj;
  }

  const ancestors = Array.isArray(seen) ? seen : [];
  // Track the object itself so a direct self-reference is recognised as a cycle.
  ancestors.push(obj);

  Object.keys(obj).forEach(key => {
    const value = obj[key];
    if (typeof value === 'function') {
      delete obj[key];
    } else if (/^(password|creditcard)/.test(key)) {
      // the server side does masking as well, but doesn't hurt to do it at origin
      obj[key] = '[HIDDEN]';
    } else if (value instanceof Date) {
      // do nothing
    } else if (value instanceof RegExp) {
      obj[key] = value.source;
    } else if (value && typeof value === 'object') {
      // Only an object that is one of its own ancestors is circular; an object
      // merely referenced twice is sanitized normally.
      obj[key] = ancestors.includes(value) ? '[Circular]' : _sanitize(value, ancestors);
    }
  });

  // Leaving this object: siblings may legitimately reference it again.
  ancestors.pop();
  return obj;
}