1 | 'use strict';
|
2 |
|
3 | const basicAuth = require('basic-auth');
|
4 | const exec = require('child_process').exec;
|
5 | const url = require('url');
|
6 | const crypto = require('crypto');
|
7 | const util = require('util');
|
8 | const request = require('request');
|
9 | const EventEmitter = require('events').EventEmitter;
|
10 | const version = require('../package.json').version;
|
11 |
|
12 | let debug;
|
13 | let environments = {
|
14 | production: 'https://pubsub.platform.axway.com',
|
15 | preproduction: 'https://pubsub.axwaytest.net'
|
16 | };
|
17 | let fingerprint;
|
18 | let pendingChecks = [];
|
19 |
|
20 |
|
21 |
|
22 |
|
23 |
|
24 | function isPreproduction() {
|
25 | return process.env.NODE_ACS_URL
|
26 | && process.env.NODE_ACS_URL.includes('.appctest.com')
|
27 | || process.env.NODE_ENV === 'preproduction'
|
28 | || process.env.APPC_ENV === 'preproduction'
|
29 | || process.env.NODE_ENV === 'development'
|
30 | || process.env.APPC_ENV === 'development';
|
31 | }
|
32 |
|
33 |
|
34 |
|
35 |
|
36 |
|
37 |
|
38 | function sha1(value) {
|
39 | return crypto
|
40 | .createHash('sha1')
|
41 | .update(value)
|
42 | .digest('hex');
|
43 | }
|
44 |
|
45 |
|
46 |
|
47 |
|
48 |
|
49 |
|
50 |
|
51 | function getComputerFingerprint(callback, append) {
|
52 | if (fingerprint) {
|
53 | return callback && callback(null, fingerprint);
|
54 | }
|
55 |
|
56 | let cmd;
|
57 | switch (process.platform) {
|
58 | case 'darwin':
|
59 |
|
60 | cmd = 'ioreg -c IOPlatformExpertDevice -d 2 | awk -F\\" \'/IOPlatformSerialNumber|IOPlatformUUID/{ print $(NF-1) }\';';
|
61 | debug('running:', cmd);
|
62 | return exec(cmd, function (err, stdout) {
|
63 | if (err) {
|
64 | return callback(err);
|
65 | }
|
66 | fingerprint = sha1(stdout + process.pid);
|
67 | callback && callback(null, fingerprint);
|
68 | });
|
69 | case 'win32':
|
70 | case 'windows':
|
71 | cmd = 'reg query HKLM\\Software\\Microsoft\\Cryptography /v MachineGuid';
|
72 | if (append) {
|
73 | cmd += append;
|
74 | }
|
75 | debug('running:', cmd);
|
76 | return exec(cmd, function (err, stdout) {
|
77 | if (err && !append) {
|
78 | debug('trying again, forcing it to use 64bit registry view');
|
79 | return getComputerFingerprint(callback, ' /reg:64');
|
80 | } else if (err) {
|
81 | return callback(err);
|
82 | }
|
83 | let tokens = stdout.trim().split(/\s/),
|
84 | serial = tokens[tokens.length - 1];
|
85 | fingerprint = sha1(serial + process.pid);
|
86 | callback && callback(null, fingerprint);
|
87 | });
|
88 | case 'linux':
|
89 | cmd = 'ifconfig | grep eth0 | grep -i hwaddr | awk \'{print $1$5}\' | sed \'s/://g\' | xargs echo | sed \'s/ //g\'';
|
90 | debug('running:', cmd);
|
91 | return exec(cmd, function (err, stdout) {
|
92 | if (err) {
|
93 | return callback(err);
|
94 | }
|
95 | let serial = stdout.trim();
|
96 | fingerprint = sha1(serial + process.pid);
|
97 | callback && callback(null, fingerprint);
|
98 | });
|
99 | default:
|
100 | return callback(new Error('Unknown platform:' + process.platform));
|
101 | }
|
102 | }
|
103 |
|
104 |
|
105 |
|
106 |
|
107 |
|
108 |
|
109 |
|
110 | function PubSubClient(opts) {
|
111 | opts = opts || {};
|
112 |
|
113 |
|
114 | debug = function () {};
|
115 | opts.debug && (debug = function () {
|
116 | let args = Array.prototype.slice.call(arguments);
|
117 | args.unshift('appc:pubsub');
|
118 | console.log.apply(this, args);
|
119 | });
|
120 |
|
121 |
|
122 | let env = opts.env || (isPreproduction() ? 'preproduction' : 'production');
|
123 | this.url = opts.url || environments[env] || environments.production;
|
124 |
|
125 |
|
126 | this.disabled = opts.disabled;
|
127 | this.key = opts.key;
|
128 | this.secret = opts.secret;
|
129 |
|
130 | if (this.disabled) {
|
131 | return;
|
132 | }
|
133 | if (!this.key) {
|
134 | throw new Error('missing key');
|
135 | }
|
136 | if (!this.secret) {
|
137 | throw new Error('missing secret');
|
138 | }
|
139 |
|
140 | this.timeout = opts.timeout || 10000;
|
141 |
|
142 | getComputerFingerprint();
|
143 | this.fetchConfig();
|
144 |
|
145 | this.authenticateWebhook = this.authenticateWebhook.bind(this);
|
146 | this.handleWebhook = this.handleWebhook.bind(this);
|
147 | }
|
148 |
|
149 | util.inherits(PubSubClient, EventEmitter);
|
150 |
|
151 |
|
152 |
|
153 |
|
154 |
|
155 | PubSubClient.prototype.fetchConfig = function () {
|
156 |
|
157 | let data = {},
|
158 | opts = {
|
159 | url: url.resolve(this.url, '/api/client/config'),
|
160 | method: 'get',
|
161 | json: data,
|
162 | headers: {
|
163 | 'User-Agent': 'Appcelerator PubSub Client/' + version + ' (' + fingerprint + ')',
|
164 | APIKey: this.key,
|
165 | APISig: crypto.createHmac('SHA256', this.secret).update(JSON.stringify(data)).digest('base64')
|
166 | },
|
167 | gzip: true,
|
168 | timeout: this.timeout,
|
169 | followAllRedirects: true,
|
170 | rejectUnauthorized: this.url.includes(environments.production.split('.').slice(-2).join('.'))
|
171 | };
|
172 | debug('fetching client config');
|
173 | this.config = {};
|
174 | request(opts, function (err, resp, body) {
|
175 | if (err) {
|
176 | return debug('error', err);
|
177 | }
|
178 | let data = body && body[body.key];
|
179 |
|
180 | if (!data || resp.statusCode !== 200) {
|
181 | let err = new Error('invalid response');
|
182 | err.code = resp.statusCode;
|
183 |
|
184 | if (resp.statusCode === 401) {
|
185 | err.message = 'Unauthorized';
|
186 | this.emit('unauthorized', err, opts);
|
187 | }
|
188 | return debug('error', err, resp.statusCode, resp.body);
|
189 | }
|
190 |
|
191 | this._parseConfig(data);
|
192 |
|
193 | this.on('configured', () => pendingChecks.forEach(this._validateTopic.bind(this)));
|
194 | this.emit('configured', this.config);
|
195 | debug('got config', this.config);
|
196 | }.bind(this));
|
197 | };
|
198 |
|
199 | PubSubClient.prototype._parseConfig = function (data) {
|
200 | if (data.can_consume) {
|
201 |
|
202 | data.topics = Object.keys(data.events || {});
|
203 |
|
204 |
|
205 | if (data.auth_type === 'basic' && data.url) {
|
206 | let details = (url.parse(data.url) || '').auth.split(':');
|
207 | data.auth_user = details[0];
|
208 | data.auth_pass = details[1];
|
209 | }
|
210 | }
|
211 | this.config = data;
|
212 | };
|
213 |
|
214 |
|
215 |
|
216 |
|
217 |
|
218 |
|
219 |
|
220 |
|
221 | PubSubClient.prototype.authenticateWebhook = function (req, res, next) {
|
222 | if (req._authenticatedWebhook) {
|
223 | next && next();
|
224 | return true;
|
225 | }
|
226 |
|
227 | if (!this.config.can_consume) {
|
228 | res && res.writeHead(400, { 'Content-Type': 'application/json' });
|
229 | res && res.end(JSON.stringify({
|
230 | success: false,
|
231 | message: 'This client does not have consumption enabled.'
|
232 | }));
|
233 | return false;
|
234 | }
|
235 | debug('authenticating webhook using: method =', this.config.auth_type);
|
236 |
|
237 | let conf = this.config,
|
238 | headers = req && req.headers || {},
|
239 | user = basicAuth(req),
|
240 |
|
241 | authenticated
|
242 |
|
243 | = conf.auth_type === 'basic'
|
244 | ? user.name === conf.auth_user && user.pass === conf.auth_pass
|
245 |
|
246 | : conf.auth_type === 'token'
|
247 | ? headers['x-auth-token'] === this.config.auth_token
|
248 |
|
249 | : conf.auth_type === 'key_secret'
|
250 | ? headers['x-signature'] === crypto.createHmac('SHA256', this.secret).update(JSON.stringify(req.body)).digest('hex')
|
251 |
|
252 | : true;
|
253 |
|
254 |
|
255 | if (!authenticated) {
|
256 | debug('webhook authentication failed', headers);
|
257 | res && res.writeHead(401, { 'Content-Type': 'application/json' });
|
258 | res && res.end(JSON.stringify({
|
259 | success: false,
|
260 | message: 'Unauthorized'
|
261 | }));
|
262 | return false;
|
263 | }
|
264 | req._authenticatedWebhook = true;
|
265 | next && next();
|
266 | return true;
|
267 | };
|
268 |
|
269 |
|
270 |
|
271 |
|
272 |
|
273 |
|
274 | PubSubClient.prototype.handleWebhook = function (req, res) {
|
275 |
|
276 | if (!this.authenticateWebhook(req, res)) {
|
277 | return;
|
278 | }
|
279 |
|
280 | let topic = req.body.topic;
|
281 | debug('event received', topic, req.body);
|
282 |
|
283 |
|
284 | if (this.getSubscribedTopic(topic)) {
|
285 | debug('emitting event:' + topic);
|
286 | this.emit('event:' + topic, req.body);
|
287 | }
|
288 |
|
289 | res.writeHead(200, { 'Content-Type': 'application/json' });
|
290 | res.end(JSON.stringify({ success: true }));
|
291 | };
|
292 |
|
293 |
|
294 |
|
295 |
|
296 |
|
297 |
|
298 |
|
299 | PubSubClient.prototype.getSubscribedTopic = function (name, topics) {
|
300 |
|
301 | name = name.replace('event:', '');
|
302 |
|
303 | let validTopics = [ 'configured', 'unauthorized' ].concat(topics || this.config.topics || []);
|
304 | return validTopics.find(topic => {
|
305 |
|
306 | if (topic === name) {
|
307 | return topic;
|
308 | }
|
309 |
|
310 | if (!topic.includes('*')) {
|
311 | return null;
|
312 | }
|
313 | let eventSegments = name.split('.');
|
314 | let topicSegments = topic.split('.');
|
315 |
|
316 | if (!topic.includes('**') && eventSegments.length !== topicSegments.length) {
|
317 | return null;
|
318 | }
|
319 |
|
320 | return topicSegments.reduce(function (m, segment, i) {
|
321 | return m && (
|
322 |
|
323 | segment === eventSegments[i]
|
324 |
|
325 | || segment === '*'
|
326 |
|
327 | || (segment === '**' && i === topicSegments.length - 1)
|
328 | );
|
329 | }, true);
|
330 | }) || null;
|
331 | };
|
332 |
|
333 |
|
334 |
|
335 |
|
336 |
|
337 |
|
338 | PubSubClient.prototype.getFingerprint = function (callback) {
|
339 | if (fingerprint && callback) {
|
340 | return callback(null, fingerprint);
|
341 | } else if (fingerprint) {
|
342 | return fingerprint;
|
343 | } else if (callback) {
|
344 | return getComputerFingerprint(callback);
|
345 | }
|
346 | throw new Error('fingerprint has not yet been generated. invoke this function with a callback');
|
347 | };
|
348 |
|
349 | function serialize(obj, seen) {
|
350 | if (!obj || typeof(obj) !== 'object') {
|
351 | return obj;
|
352 | }
|
353 | if (obj instanceof RegExp) {
|
354 | return obj.source;
|
355 | }
|
356 | if (obj instanceof Date) {
|
357 | return obj;
|
358 | }
|
359 | Object.keys(obj).forEach(function (key) {
|
360 | var value = obj[key],
|
361 | t = typeof(value);
|
362 | if (t === 'function') {
|
363 | delete obj[key];
|
364 | } else if (/^(password|creditcard)/.test(key)) {
|
365 |
|
366 | obj[key] = '[HIDDEN]';
|
367 | } else if (value instanceof Date) {
|
368 |
|
369 | } else if (value instanceof RegExp) {
|
370 | obj[key] = value.source;
|
371 | } else if (t === 'object') {
|
372 | if (seen.includes(value)) {
|
373 | value = '[Circular]';
|
374 | } else {
|
375 | seen.push(value);
|
376 | value = serialize(value, seen);
|
377 | }
|
378 | obj[key] = value;
|
379 | }
|
380 | });
|
381 | return obj;
|
382 | }
|
383 |
|
384 |
|
385 |
|
386 |
|
387 |
|
388 |
|
389 |
|
390 |
|
391 | PubSubClient.prototype.publish = function (name, data, options) {
|
392 | if (this.disabled) {
|
393 | return;
|
394 | }
|
395 | debug('publish', name);
|
396 | if (!name) {
|
397 | throw new Error('required event name');
|
398 | }
|
399 | if (Buffer.byteLength(name) > 255) {
|
400 | throw new Error('name length must be less than 255 bytes');
|
401 | }
|
402 | if (data && typeof(data) !== 'object') {
|
403 | throw new Error('data must be an object');
|
404 | }
|
405 |
|
406 | try {
|
407 | data = JSON.parse(JSON.stringify(data || {}));
|
408 | } catch (e) {
|
409 | throw new Error('data could not be cloned');
|
410 | }
|
411 | this._send({
|
412 | event: name,
|
413 | data: serialize(data, []),
|
414 | options: options
|
415 | });
|
416 | };
|
417 |
|
418 |
|
419 |
|
420 |
|
421 |
|
422 |
|
423 |
|
424 |
|
425 |
|
426 | PubSubClient.prototype._retry = function (data, reason, opts) {
|
427 | debug('retry called', reason, opts);
|
428 | if (this.disabled) {
|
429 | debug('retry ignored, disabled:', !!this.disabled, new Error().stack);
|
430 | return;
|
431 | }
|
432 |
|
433 | setTimeout(() => this._send(data), Math.max(500, this.retry * 500));
|
434 | this.emit('retry', reason, opts, this.retry);
|
435 | };
|
436 |
|
437 |
|
438 |
|
439 |
|
440 |
|
441 |
|
442 |
|
443 |
|
444 | PubSubClient.prototype._send = function (data) {
|
445 | if (this.disabled) {
|
446 | return false;
|
447 | }
|
448 |
|
449 | debug('_send', data);
|
450 | let self = this;
|
451 | if (!fingerprint) {
|
452 |
|
453 | getComputerFingerprint(() => this._send.apply(this, arguments));
|
454 | return false;
|
455 | }
|
456 |
|
457 | this.retry = (this.retry || 0) + 1;
|
458 |
|
459 | if (!data) {
|
460 | return;
|
461 | }
|
462 |
|
463 | this._sending = true;
|
464 | let ticket = Date.now();
|
465 | this._sendingTS = ticket;
|
466 |
|
467 | let opts = {
|
468 | url: url.resolve(this.url, '/api/event'),
|
469 | method: 'post',
|
470 | json: data,
|
471 | headers: {
|
472 | 'User-Agent': 'Appcelerator PubSub Client/' + version + ' (' + fingerprint + ')',
|
473 | APIKey: this.key,
|
474 | APISig: crypto.createHmac('SHA256', this.secret).update(JSON.stringify(data)).digest('base64')
|
475 | },
|
476 | gzip: true,
|
477 | timeout: this.timeout,
|
478 | followAllRedirects: true,
|
479 | rejectUnauthorized: this.url.includes(environments.production.split('.').slice(-2).join('.'))
|
480 | };
|
481 |
|
482 | try {
|
483 | debug('sending web event', opts);
|
484 | let req = request(opts);
|
485 |
|
486 | req.on('response', function (resp) {
|
487 |
|
488 |
|
489 | if (ticket === self._sendingTS) {
|
490 | self._sending = false;
|
491 | }
|
492 | debug('received web response', resp && resp.statusCode);
|
493 | if (resp && resp.statusCode !== 200) {
|
494 |
|
495 | if (resp.statusCode && !(/^(400|401)$/).test(resp.statusCode)) {
|
496 | return self._retry(data, resp.statusCode);
|
497 | }
|
498 | let err = new Error('invalid response');
|
499 | err.code = resp.statusCode;
|
500 |
|
501 | if (resp.statusCode === 401) {
|
502 | err.message = 'Unauthorized';
|
503 | self.emit('unauthorized', err, opts);
|
504 | }
|
505 | debug('error', err, resp.statusCode, resp.body);
|
506 | } else if (resp) {
|
507 |
|
508 | self.retry = 0;
|
509 |
|
510 | self.emit('response', resp, opts);
|
511 | debug('response received, status:', resp.statusCode, 'opts:', opts);
|
512 | }
|
513 | });
|
514 |
|
515 | req.on('error', function (err) {
|
516 | debug('web request received error', err, opts);
|
517 |
|
518 |
|
519 | if (ticket === self._sendingTS) {
|
520 | self._sending = false;
|
521 | }
|
522 | return self._retry(data, err.code, opts);
|
523 | });
|
524 | } catch (E) {
|
525 | debug('web request received error', E, opts);
|
526 | self._retry(data, E.code, opts);
|
527 | }
|
528 | };
|
529 |
|
530 | let on = PubSubClient.prototype.on;
|
531 | PubSubClient.prototype.on = function (name) {
|
532 | debug('on', name);
|
533 |
|
534 |
|
535 | if (!this.configued) {
|
536 | pendingChecks.push(name);
|
537 | } else {
|
538 | this._validateTopic(name);
|
539 | }
|
540 | return on.apply(this, arguments);
|
541 | };
|
542 |
|
543 | PubSubClient.prototype._validateTopic = function (name) {
|
544 | if (!this.getSubscribedTopic(name)) {
|
545 | debug('Unexpected event', name, ': client not configured to receive this event');
|
546 | }
|
547 | };
|
548 |
|
549 |
|
550 |
|
551 |
|
552 | PubSubClient.prototype.close = function () {
|
553 | debug('pubsub.close function has been deprecated. Websocket connections are no longer supported.');
|
554 | };
|
555 |
|
556 | module.exports = PubSubClient;
|