1 | const { createHash, createHmac } = require('crypto');
|
2 | const { EventEmitter } = require('events');
|
3 | const { hostname } = require('os');
|
4 | const { inherits } = require('util');
|
5 |
|
6 | const axios = require('axios');
|
7 | const auth = require('basic-auth');
|
8 |
|
9 | const { version } = require('../package.json');
|
10 |
|
11 | const environments = {
|
12 | production: 'https://pubsub.platform.axway.com',
|
13 | preproduction: 'https://pubsub.axwaytest.net'
|
14 | };
|
15 | const fingerprint = createHash('sha256').update(hostname() || Date.now()).digest('hex');
|
16 | const preproductionEnvironments = [ 'preproduction', 'development' ];
|
17 | const userAgent = `appc-pubsub/${version} ${fingerprint})`;
|
18 |
|
19 | let pendingChecks = [];
|
20 | let debug;
|
21 |
|
22 |
|
23 |
|
24 |
|
25 |
|
26 |
|
27 |
|
28 | function PubSubClient(opts) {
|
29 | opts = opts || {};
|
30 |
|
31 |
|
32 | debug = function () {};
|
33 | opts.debug && (debug = function () {
|
34 | let args = Array.prototype.slice.call(arguments);
|
35 | args.unshift('appc:pubsub');
|
36 | console.log.apply(this, args);
|
37 | });
|
38 |
|
39 |
|
40 | let env = opts.env || (_isPreproduction() ? 'preproduction' : 'production');
|
41 | this.url = opts.url || environments[env] || environments.production;
|
42 |
|
43 |
|
44 | this.disabled = opts.disabled;
|
45 | this.key = opts.key;
|
46 | this.secret = opts.secret;
|
47 |
|
48 | if (this.disabled) {
|
49 | return;
|
50 | }
|
51 | if (!this.key) {
|
52 | throw new Error('missing key');
|
53 | }
|
54 | if (!this.secret) {
|
55 | throw new Error('missing secret');
|
56 | }
|
57 |
|
58 | this.timeout = opts.timeout || 10000;
|
59 | this.retryLimit = 10;
|
60 |
|
61 | this.fetchConfig();
|
62 |
|
63 |
|
64 | this.authenticateWebhook = this.authenticateWebhook.bind(this);
|
65 | this.handleWebhook = this.handleWebhook.bind(this);
|
66 | }
|
67 |
|
68 | inherits(PubSubClient, EventEmitter);
|
69 |
|
70 |
|
71 |
|
72 |
|
73 |
|
74 |
|
75 |
|
76 |
|
77 | PubSubClient.prototype.authenticateWebhook = function (req, res, next) {
|
78 | if (req._authenticatedWebhook) {
|
79 | next && next();
|
80 | return true;
|
81 | }
|
82 |
|
83 | if (!this.config.can_consume) {
|
84 | res && res.writeHead(400, { 'Content-Type': 'application/json' });
|
85 | res && res.end(JSON.stringify({
|
86 | success: false,
|
87 | message: 'This client does not have consumption enabled.'
|
88 | }));
|
89 | return false;
|
90 | }
|
91 | debug('authenticating webhook using: method =', this.config.auth_type);
|
92 |
|
93 | let conf = this.config,
|
94 | headers = req && req.headers || {},
|
95 | creds = auth(req),
|
96 |
|
97 | authenticated
|
98 |
|
99 | = conf.auth_type === 'basic'
|
100 | ? creds.name === conf.auth_user && creds.pass === conf.auth_pass
|
101 |
|
102 | : conf.auth_type === 'token'
|
103 | ? headers['x-auth-token'] === this.config.auth_token
|
104 |
|
105 | : conf.auth_type === 'key_secret'
|
106 | ? headers['x-signature'] === createHmac('SHA256', this.secret).update(JSON.stringify(req.body)).digest('hex')
|
107 |
|
108 | : true;
|
109 |
|
110 |
|
111 | if (!authenticated) {
|
112 | debug('webhook authentication failed', headers);
|
113 | res && res.writeHead(401, { 'Content-Type': 'application/json' });
|
114 | res && res.end(JSON.stringify({
|
115 | success: false,
|
116 | message: 'Unauthorized'
|
117 | }));
|
118 | return false;
|
119 | }
|
120 | req._authenticatedWebhook = true;
|
121 | next && next();
|
122 | return true;
|
123 | };
|
124 |
|
125 |
|
126 |
|
127 |
|
128 | PubSubClient.prototype.fetchConfig = function () {
|
129 | const url = new URL('/api/client/config', this.url);
|
130 | const opts = {
|
131 | url: url.href,
|
132 | headers: {
|
133 | 'user-agent': userAgent,
|
134 | APIKey: this.key,
|
135 | APISig: createHmac('SHA256', this.secret).update('{}').digest('base64')
|
136 | },
|
137 | timeout: this.timeout
|
138 | };
|
139 | debug('fetching client config');
|
140 | this.config = {};
|
141 | axios(opts)
|
142 | .then(resp => {
|
143 | const data = resp.data && resp.data.key && resp.data[resp.data.key];
|
144 | if (!data) {
|
145 | let err = new Error('invalid response');
|
146 | return debug('error', err, resp.statusCode, resp.data);
|
147 | }
|
148 |
|
149 | if (data.can_consume) {
|
150 |
|
151 | data.topics = Object.keys(data.events || {});
|
152 |
|
153 |
|
154 | if (data.auth_type === 'basic' && data.url) {
|
155 | const url = new URL(data.url);
|
156 | data.auth_user = url.username;
|
157 | data.auth_pass = url.password;
|
158 | }
|
159 | }
|
160 |
|
161 | this.config = data;
|
162 |
|
163 | this.on('configured', () => pendingChecks.forEach(this._validateTopic.bind(this)));
|
164 | this.emit('configured', this.config);
|
165 | return debug('got config', this.config);
|
166 | })
|
167 | .catch(e => {
|
168 |
|
169 | if (e.response.status === 401) {
|
170 | e.message = 'Unauthorized';
|
171 | this.emit('unauthorized', String(e), opts);
|
172 | }
|
173 | debug('error fetching config', String(e));
|
174 | });
|
175 | };
|
176 |
|
177 |
|
178 |
|
179 |
|
180 |
|
181 |
|
182 |
|
183 |
|
184 | PubSubClient.prototype.hasSubscribedTopic = function (name, topics) {
|
185 |
|
186 | name = name.replace('event:', '');
|
187 |
|
188 | let validTopics = [ 'configured', 'unauthorized' ].concat(topics || this.config.topics || []);
|
189 | return validTopics.find(topic => {
|
190 |
|
191 | if (topic === name) {
|
192 | return topic;
|
193 | }
|
194 |
|
195 | if (!topic.includes('*')) {
|
196 | return null;
|
197 | }
|
198 | let eventSegments = name.split('.');
|
199 | let topicSegments = topic.split('.');
|
200 |
|
201 | if (!topic.includes('**') && eventSegments.length !== topicSegments.length) {
|
202 | return null;
|
203 | }
|
204 |
|
205 | return topicSegments.reduce(function (m, segment, i) {
|
206 | return m && (
|
207 |
|
208 | segment === eventSegments[i]
|
209 |
|
210 | || segment === '*'
|
211 |
|
212 | || (segment === '**' && i === topicSegments.length - 1)
|
213 | );
|
214 | }, true);
|
215 | }) || null;
|
216 | };
|
217 |
|
218 |
|
219 |
|
220 |
|
221 |
|
222 |
|
223 | PubSubClient.prototype.handleWebhook = function (req, res) {
|
224 |
|
225 | if (!this.authenticateWebhook(req, res)) {
|
226 | return;
|
227 | }
|
228 |
|
229 | let topic = req.body.topic;
|
230 | debug('event received', topic, req.body);
|
231 |
|
232 |
|
233 | if (this.hasSubscribedTopic(topic)) {
|
234 | debug('emitting event:' + topic);
|
235 | this.emit('event:' + topic, req.body);
|
236 | }
|
237 |
|
238 | res.writeHead(200, { 'Content-Type': 'application/json' });
|
239 | res.end(JSON.stringify({ success: true }));
|
240 | };
|
241 |
|
242 |
|
243 |
|
244 |
|
245 |
|
246 |
|
247 |
|
248 | PubSubClient.prototype.publish = function (name, data, options) {
|
249 | if (this.disabled) {
|
250 | return;
|
251 | }
|
252 | debug('publish', name);
|
253 | if (!name) {
|
254 | throw new Error('required event name');
|
255 | }
|
256 | if (Buffer.byteLength(name) > 255) {
|
257 | throw new Error('name length must be less than 255 bytes');
|
258 | }
|
259 | if (data && typeof data !== 'object') {
|
260 | throw new Error('data must be an object');
|
261 | }
|
262 |
|
263 | try {
|
264 | data = JSON.parse(JSON.stringify(data || {}));
|
265 | } catch (e) {
|
266 | throw new Error('data could not be cloned');
|
267 | }
|
268 | this._send({
|
269 | event: name,
|
270 | data: _sanitize(data, []),
|
271 | options: options
|
272 | });
|
273 | };
|
274 |
|
275 |
|
276 |
|
277 |
|
278 |
|
279 |
|
280 |
|
281 |
|
282 |
|
283 | PubSubClient.prototype._retry = function (data, reason, opts) {
|
284 | debug('retry called', reason, opts);
|
285 |
|
286 | if (this.disabled) {
|
287 | return debug('retry ignored, disabled:', !!this.disabled, new Error().stack);
|
288 | }
|
289 |
|
290 | if (this.retry > this.retryLimit) {
|
291 | return debug('Retry limit exceeded', new Error().stack);
|
292 | }
|
293 |
|
294 |
|
295 | setTimeout(() => this._send(data), Math.max(500, (Math.pow(2, this.retry) - 1) * 500));
|
296 | this.emit('retry', reason, opts, this.retry);
|
297 | };
|
298 |
|
299 |
|
300 |
|
301 |
|
302 |
|
303 |
|
304 |
|
305 | PubSubClient.prototype._send = function (data) {
|
306 | if (this.disabled) {
|
307 | return false;
|
308 | }
|
309 |
|
310 | debug('_send', data);
|
311 |
|
312 | this.retry = (this.retry || 0) + 1;
|
313 |
|
314 | if (!data) {
|
315 | return;
|
316 | }
|
317 |
|
318 | this._sending = true;
|
319 | let ticket = Date.now();
|
320 | this._sendingTS = ticket;
|
321 |
|
322 | const url = new URL('/api/event', this.url);
|
323 | let opts = {
|
324 | url: url.href,
|
325 | method: 'post',
|
326 | data,
|
327 | headers: {
|
328 | 'user-agent': userAgent,
|
329 | APIKey: this.key,
|
330 | APISig: createHmac('SHA256', this.secret).update(JSON.stringify(data)).digest('base64')
|
331 | },
|
332 | timeout: this.timeout
|
333 | };
|
334 |
|
335 | try {
|
336 | debug('sending web event', opts);
|
337 | axios(opts)
|
338 | .then(resp => {
|
339 |
|
340 |
|
341 | if (ticket === this._sendingTS) {
|
342 | this._sending = false;
|
343 | }
|
344 | debug('received web response', resp && resp.status);
|
345 |
|
346 |
|
347 | this.retry = 0;
|
348 |
|
349 | this.emit('response', resp, opts);
|
350 | return debug('response received, status:', resp.statusCode, 'opts:', opts);
|
351 | })
|
352 |
|
353 | .catch(e => {
|
354 |
|
355 | if (e.response.status && ![ 400, 401 ].includes(e.response.status)) {
|
356 | return this._retry(data, e.response.status);
|
357 | }
|
358 | const err = new Error('invalid response');
|
359 | err.code = e.response.status;
|
360 |
|
361 | if (err.code === 401) {
|
362 | err.message = 'Unauthorized';
|
363 | this.emit('unauthorized', String(err), opts);
|
364 | }
|
365 | debug('web request received error', String(err), opts);
|
366 |
|
367 |
|
368 | if (ticket === this._sendingTS) {
|
369 | this._sending = false;
|
370 | }
|
371 | return this._retry(data, err.code, opts);
|
372 | });
|
373 | } catch (e) {
|
374 | debug('web request received error', e, opts);
|
375 | this._retry(data, e.code, opts);
|
376 | }
|
377 | };
|
378 |
|
379 |
|
380 |
|
381 |
|
382 |
|
383 |
|
384 | PubSubClient.prototype._validateTopic = function (name) {
|
385 | if (!this.getSubscribedTopic(name)) {
|
386 | debug('Unexpected event', name, ': client not configured to receive this event');
|
387 | }
|
388 | };
|
389 |
|
390 |
|
391 |
|
392 |
|
393 |
|
394 | PubSubClient.prototype.close = function () {
|
395 | debug('PubSubClient.close function has been deprecated. Websocket connections are no longer supported.');
|
396 | };
|
397 |
|
398 |
|
399 |
|
400 |
|
401 |
|
402 |
|
403 |
|
404 | PubSubClient.prototype.getFingerprint = function (callback) {
|
405 | debug('PubSubClient.getFingerprint function has been deprecated.');
|
406 | callback && callback(null, fingerprint);
|
407 | };
|
408 |
|
409 |
|
410 |
|
411 |
|
412 |
|
413 | PubSubClient.prototype.getSubscribedTopic = PubSubClient.prototype.hasSubscribedTopic;
|
414 |
|
415 | let on = PubSubClient.prototype.on;
|
416 | PubSubClient.prototype.on = function (name) {
|
417 | debug('on', name);
|
418 |
|
419 |
|
420 | if (!this.configued) {
|
421 | pendingChecks.push(name);
|
422 | } else {
|
423 | this._validateTopic(name);
|
424 | }
|
425 | return on.apply(this, arguments);
|
426 | };
|
427 |
|
428 | module.exports = PubSubClient;
|
429 |
|
430 |
|
431 |
|
432 |
|
433 |
|
434 | function _isPreproduction() {
|
435 | return String(process.env.NODE_ACS_URL).includes('.appctest.com')
|
436 | || String(process.env.NODE_ACS_URL).includes('.axwaytest.net')
|
437 | || preproductionEnvironments.includes(process.env.NODE_ENV)
|
438 | || preproductionEnvironments.includes(process.env.APPC_ENV);
|
439 | }
|
440 |
|
441 |
|
442 |
|
443 |
|
444 |
|
445 |
|
446 |
|
447 | function _sanitize(obj, seen) {
|
448 | if (!obj || typeof obj !== 'object') {
|
449 | return obj;
|
450 | }
|
451 | if (obj instanceof RegExp) {
|
452 | return obj.source;
|
453 | }
|
454 | if (obj instanceof Date) {
|
455 | return obj;
|
456 | }
|
457 | Object.keys(obj).forEach(function (key) {
|
458 | var value = obj[key],
|
459 | t = typeof value;
|
460 | if (t === 'function') {
|
461 | delete obj[key];
|
462 | } else if (/^(password|creditcard)/.test(key)) {
|
463 |
|
464 | obj[key] = '[HIDDEN]';
|
465 | } else if (value instanceof Date) {
|
466 |
|
467 | } else if (value instanceof RegExp) {
|
468 | obj[key] = value.source;
|
469 | } else if (value && t === 'object') {
|
470 | if (seen.includes(value)) {
|
471 | value = '[Circular]';
|
472 | } else {
|
473 | seen.push(value);
|
474 | value = _sanitize(value, seen);
|
475 | }
|
476 | obj[key] = value;
|
477 | }
|
478 | });
|
479 | return obj;
|
480 | }
|