1 | const { createHash, createHmac } = require('crypto');
|
2 | const { EventEmitter } = require('events');
|
3 | const { hostname } = require('os');
|
4 | const { inherits } = require('util');
|
5 |
|
6 | const axios = require('axios');
|
7 | const auth = require('basic-auth');
|
8 |
|
9 | const { version } = require('../package.json');
|
10 |
|
11 | const environments = {
|
12 | production: 'https://pubsub.platform.axway.com',
|
13 | preproduction: 'https://pubsub.axwaytest.net'
|
14 | };
|
15 | const fingerprint = createHash('sha256').update(hostname() || Date.now()).digest('hex');
|
16 | const preproductionEnvironments = [ 'preproduction', 'development' ];
|
17 | const userAgent = `appc-pubsub/${version} (${fingerprint})`;
|
18 |
|
19 | let pendingChecks = [];
|
20 | let debug;
|
21 |
|
22 |
|
23 |
|
24 |
|
25 |
|
26 |
|
27 |
|
28 | function PubSubClient(opts) {
|
29 | opts = opts || {};
|
30 |
|
31 |
|
32 | debug = function () {};
|
33 | opts.debug && (debug = function () {
|
34 | let args = Array.prototype.slice.call(arguments);
|
35 | args.unshift('appc:pubsub');
|
36 | console.log.apply(this, args);
|
37 | });
|
38 |
|
39 |
|
40 | let env = opts.env || (_isPreproduction() ? 'preproduction' : 'production');
|
41 | this.url = opts.url || environments[env] || environments.production;
|
42 |
|
43 |
|
44 | this.disabled = opts.disabled;
|
45 | this.key = opts.key;
|
46 | this.secret = opts.secret;
|
47 |
|
48 | if (this.disabled) {
|
49 | return;
|
50 | }
|
51 | if (!this.key) {
|
52 | throw new Error('missing key');
|
53 | }
|
54 | if (!this.secret) {
|
55 | throw new Error('missing secret');
|
56 | }
|
57 |
|
58 | this.timeout = opts.timeout || 10000;
|
59 | this.retryLimit = opts.retryLimit || 10;
|
60 | this.retries = {};
|
61 |
|
62 | this.fetchConfig();
|
63 |
|
64 |
|
65 | this.authenticateWebhook = this.authenticateWebhook.bind(this);
|
66 | this.handleWebhook = this.handleWebhook.bind(this);
|
67 | }
|
68 |
|
69 | inherits(PubSubClient, EventEmitter);
|
70 |
|
71 |
|
72 |
|
73 |
|
74 |
|
75 |
|
76 |
|
77 |
|
78 | PubSubClient.prototype.authenticateWebhook = function (req, res, next) {
|
79 | if (req._authenticatedWebhook) {
|
80 | next && next();
|
81 | return true;
|
82 | }
|
83 |
|
84 | if (!this.config.can_consume) {
|
85 | res && res.writeHead(400, { 'Content-Type': 'application/json' });
|
86 | res && res.end(JSON.stringify({
|
87 | success: false,
|
88 | message: 'This client does not have consumption enabled.'
|
89 | }));
|
90 | return false;
|
91 | }
|
92 | debug('authenticating webhook using: method =', this.config.auth_type);
|
93 |
|
94 | let conf = this.config,
|
95 | headers = req && req.headers || {},
|
96 | creds = auth(req),
|
97 |
|
98 | authenticated
|
99 |
|
100 | = conf.auth_type === 'basic'
|
101 | ? creds.name === conf.auth_user && creds.pass === conf.auth_pass
|
102 |
|
103 | : conf.auth_type === 'token'
|
104 | ? headers['x-auth-token'] === this.config.auth_token
|
105 |
|
106 | : conf.auth_type === 'key_secret'
|
107 | ? headers['x-signature'] === createHmac('SHA256', this.secret).update(JSON.stringify(req.body)).digest('hex')
|
108 |
|
109 | : true;
|
110 |
|
111 |
|
112 | if (!authenticated) {
|
113 | debug('webhook authentication failed', headers);
|
114 | res && res.writeHead(401, { 'Content-Type': 'application/json' });
|
115 | res && res.end(JSON.stringify({
|
116 | success: false,
|
117 | message: 'Unauthorized'
|
118 | }));
|
119 | return false;
|
120 | }
|
121 | req._authenticatedWebhook = true;
|
122 | next && next();
|
123 | return true;
|
124 | };
|
125 |
|
126 |
|
127 |
|
128 |
|
129 | PubSubClient.prototype.fetchConfig = function () {
|
130 | const url = new URL('/api/client/config', this.url);
|
131 | const opts = {
|
132 | url: url.href,
|
133 | headers: {
|
134 | 'user-agent': userAgent,
|
135 | APIKey: this.key,
|
136 | APISig: createHmac('SHA256', this.secret).update('{}').digest('base64')
|
137 | },
|
138 | timeout: this.timeout
|
139 | };
|
140 | debug('fetching client config', this.key);
|
141 | this.config = {};
|
142 | axios(opts)
|
143 | .then(resp => {
|
144 | const data = resp.data && resp.data.key && resp.data[resp.data.key];
|
145 | if (!data) {
|
146 | let err = new Error('invalid response');
|
147 | return debug('error', err, resp.status, resp.data);
|
148 | }
|
149 |
|
150 | if (data.can_consume) {
|
151 |
|
152 | data.topics = Object.keys(data.events || {});
|
153 |
|
154 |
|
155 | if (data.auth_type === 'basic' && data.url) {
|
156 | const url = new URL(data.url);
|
157 | data.auth_user = url.username;
|
158 | data.auth_pass = url.password;
|
159 | }
|
160 | }
|
161 |
|
162 | this.config = data;
|
163 |
|
164 | this.on('configured', () => pendingChecks.forEach(this._validateTopic.bind(this)));
|
165 | this.emit('configured', this.config);
|
166 | return debug('got config', this.config);
|
167 | })
|
168 | .catch(e => {
|
169 |
|
170 | if (e.response.status === 401) {
|
171 | e.message = 'Unauthorized';
|
172 | this.emit('unauthorized', String(e), opts);
|
173 | }
|
174 | debug('error fetching config', String(e));
|
175 | });
|
176 | };
|
177 |
|
178 |
|
179 |
|
180 |
|
181 |
|
182 |
|
183 |
|
184 |
|
185 | PubSubClient.prototype.hasSubscribedTopic = function (name, topics) {
|
186 |
|
187 | name = name.replace('event:', '');
|
188 |
|
189 | let validTopics = [ 'configured', 'unauthorized' ].concat(topics || this.config.topics || []);
|
190 | return validTopics.find(topic => {
|
191 |
|
192 | if (topic === name) {
|
193 | return topic;
|
194 | }
|
195 |
|
196 | if (!topic.includes('*')) {
|
197 | return null;
|
198 | }
|
199 | let eventSegments = name.split('.');
|
200 | let topicSegments = topic.split('.');
|
201 |
|
202 | if (!topic.includes('**') && eventSegments.length !== topicSegments.length) {
|
203 | return null;
|
204 | }
|
205 |
|
206 | return topicSegments.reduce(function (m, segment, i) {
|
207 | return m && (
|
208 |
|
209 | segment === eventSegments[i]
|
210 |
|
211 | || segment === '*'
|
212 |
|
213 | || (segment === '**' && i === topicSegments.length - 1)
|
214 | );
|
215 | }, true);
|
216 | }) || null;
|
217 | };
|
218 |
|
219 |
|
220 |
|
221 |
|
222 |
|
223 |
|
224 | PubSubClient.prototype.handleWebhook = function (req, res) {
|
225 |
|
226 | if (!this.authenticateWebhook(req, res)) {
|
227 | return;
|
228 | }
|
229 |
|
230 | let topic = req.body.topic;
|
231 | debug('event received', topic, req.body);
|
232 |
|
233 |
|
234 | if (this.hasSubscribedTopic(topic)) {
|
235 | debug('emitting event:' + topic);
|
236 | this.emit('event:' + topic, req.body);
|
237 | }
|
238 |
|
239 | res.writeHead(200, { 'Content-Type': 'application/json' });
|
240 | res.end(JSON.stringify({ success: true }));
|
241 | };
|
242 |
|
243 |
|
244 |
|
245 |
|
246 |
|
247 |
|
248 |
|
249 | PubSubClient.prototype.publish = function (event, data, options) {
|
250 | if (this.disabled) {
|
251 | return;
|
252 | }
|
253 | debug('publish', event);
|
254 | if (!event) {
|
255 | throw new Error('required event name');
|
256 | }
|
257 | if (Buffer.byteLength(event) > 255) {
|
258 | throw new Error('name length must be less than 255 bytes');
|
259 | }
|
260 | if (data && typeof data !== 'object') {
|
261 | throw new Error('data must be an object');
|
262 | }
|
263 |
|
264 | try {
|
265 | data = JSON.parse(JSON.stringify(data || {}));
|
266 | } catch (e) {
|
267 | throw new Error('data could not be parsed');
|
268 | }
|
269 |
|
270 |
|
271 | this._send(event + '-' + Date.now(), {
|
272 | data: _sanitize(data, []),
|
273 | event,
|
274 | options
|
275 | });
|
276 | };
|
277 |
|
278 |
|
279 |
|
280 |
|
281 |
|
282 |
|
283 |
|
284 |
|
285 |
|
286 |
|
287 | PubSubClient.prototype._retry = function (id, data, reason, opts) {
|
288 | debug('retry called', reason, opts);
|
289 |
|
290 | if (this.disabled) {
|
291 | return debug('retry ignored, disabled:', !!this.disabled, new Error().stack);
|
292 | }
|
293 |
|
294 | if (this.retries[id] > this.retryLimit) {
|
295 | return debug('Retry limit exceeded', new Error().stack);
|
296 | }
|
297 |
|
298 |
|
299 | setTimeout(() => this._send(id, data), Math.max(500, (Math.pow(2, this.retries[id]) - 1) * 500));
|
300 | this.emit('retry', reason, opts, this.retries[id]);
|
301 | };
|
302 |
|
303 |
|
304 |
|
305 |
|
306 |
|
307 |
|
308 |
|
309 |
|
310 | PubSubClient.prototype._send = function (id, data) {
|
311 | if (this.disabled) {
|
312 | return false;
|
313 | }
|
314 |
|
315 | debug('_send', id, data);
|
316 |
|
317 |
|
318 | if (!data || !data.event) {
|
319 | return;
|
320 | }
|
321 |
|
322 | this.retries[id] = (this.retries[id] || 0) + 1;
|
323 |
|
324 | const url = new URL('/api/event', this.url);
|
325 | let opts = {
|
326 | url: url.href,
|
327 | method: 'post',
|
328 | data,
|
329 | headers: {
|
330 | 'user-agent': userAgent,
|
331 | APIKey: this.key,
|
332 | APISig: createHmac('SHA256', this.secret).update(JSON.stringify(data)).digest('base64')
|
333 | },
|
334 | timeout: this.timeout
|
335 | };
|
336 |
|
337 | try {
|
338 | axios(opts)
|
339 | .then(resp => {
|
340 |
|
341 | delete this.retries[id];
|
342 |
|
343 |
|
344 | this.emit('response', resp, opts);
|
345 |
|
346 |
|
347 | return debug('response received, status:', resp.status);
|
348 | })
|
349 |
|
350 | .catch(e => {
|
351 | const err = new Error('invalid response');
|
352 | err.code = e.response && e.response.status || String(e);
|
353 |
|
354 |
|
355 | if (err.code === 401) {
|
356 | err.message = 'Unauthorized';
|
357 | this.emit('unauthorized', String(err), opts);
|
358 | return debug('sending event failed', String(err));
|
359 | }
|
360 |
|
361 |
|
362 | if (err.code === 400) {
|
363 | err.message = 'Failed';
|
364 | return debug('sending event failed', e.response && e.response.data || String(e));
|
365 | }
|
366 |
|
367 |
|
368 | debug('received error', String(err), opts);
|
369 | return this._retry(id, data, err.code, opts);
|
370 | });
|
371 | } catch (e) {
|
372 |
|
373 |
|
374 | debug('web request received error', e, opts);
|
375 | }
|
376 | };
|
377 |
|
378 |
|
379 |
|
380 |
|
381 |
|
382 |
|
383 | PubSubClient.prototype._validateTopic = function (name) {
|
384 | if (String(name).startsWith('event:') && !this.hasSubscribedTopic(name)) {
|
385 | debug('Unexpected event', name, ': client not configured to receive this event');
|
386 | }
|
387 | };
|
388 |
|
389 |
|
390 |
|
391 |
|
392 |
|
393 | PubSubClient.prototype.close = function () {
|
394 | debug('PubSubClient.close function has been deprecated. Websocket connections are no longer supported.');
|
395 | };
|
396 |
|
397 |
|
398 |
|
399 |
|
400 |
|
401 |
|
402 |
|
403 | PubSubClient.prototype.getFingerprint = function (callback) {
|
404 | debug('PubSubClient.getFingerprint function has been deprecated.');
|
405 | callback && callback(null, fingerprint);
|
406 | };
|
407 |
|
408 |
|
409 |
|
410 |
|
411 |
|
412 | PubSubClient.prototype.getSubscribedTopic = PubSubClient.prototype.hasSubscribedTopic;
|
413 |
|
414 | let on = PubSubClient.prototype.on;
|
415 | PubSubClient.prototype.on = function (name) {
|
416 | debug('on', name);
|
417 |
|
418 |
|
419 | if (!this.configured) {
|
420 | pendingChecks.push(name);
|
421 | } else {
|
422 | this._validateTopic(name);
|
423 | }
|
424 | return on.apply(this, arguments);
|
425 | };
|
426 |
|
427 | module.exports = PubSubClient;
|
428 |
|
429 |
|
430 |
|
431 |
|
432 |
|
433 | function _isPreproduction() {
|
434 | return String(process.env.NODE_ACS_URL).includes('.appctest.com')
|
435 | || String(process.env.NODE_ACS_URL).includes('.axwaytest.net')
|
436 | || preproductionEnvironments.includes(process.env.NODE_ENV)
|
437 | || preproductionEnvironments.includes(process.env.APPC_ENV);
|
438 | }
|
439 |
|
440 |
|
441 |
|
442 |
|
443 |
|
444 |
|
445 |
|
446 | function _sanitize(obj, seen) {
|
447 | if (!obj || typeof obj !== 'object') {
|
448 | return obj;
|
449 | }
|
450 | if (obj instanceof RegExp) {
|
451 | return obj.source;
|
452 | }
|
453 | if (obj instanceof Date) {
|
454 | return obj;
|
455 | }
|
456 | Object.keys(obj).forEach(function (key) {
|
457 | var value = obj[key],
|
458 | t = typeof value;
|
459 | if (t === 'function') {
|
460 | delete obj[key];
|
461 | } else if (/^(password|creditcard)/.test(key)) {
|
462 |
|
463 | obj[key] = '[HIDDEN]';
|
464 | } else if (value instanceof Date) {
|
465 |
|
466 | } else if (value instanceof RegExp) {
|
467 | obj[key] = value.source;
|
468 | } else if (value && t === 'object') {
|
469 | if (seen.includes(value)) {
|
470 | value = '[Circular]';
|
471 | } else {
|
472 | seen.push(value);
|
473 | value = _sanitize(value, seen);
|
474 | }
|
475 | obj[key] = value;
|
476 | }
|
477 | });
|
478 | return obj;
|
479 | }
|