UNPKG

2.81 kBJavaScriptView Raw
1var EventEmitter = require('events').EventEmitter;
2var deviceFormatter = require('./api_formats/siren/device.siren');
3
/**
 * In-process publish/subscribe hub.
 *
 * Wraps an EventEmitter and keeps a per-topic registry that pairs each
 * subscriber (as handed in by the caller) with the wrapper function that is
 * actually attached to the emitter, so the subscriber can be detached later.
 *
 * @constructor
 */
var PubSub = module.exports = function() {
  this.emitter = new EventEmitter();
  // Prototype-less dictionary: topics are arbitrary strings, so keys such as
  // "constructor" or "toString" must not resolve to inherited
  // Object.prototype members when the registry is probed for a topic.
  this._listeners = Object.create(null);
};
8
/**
 * Emit `data` to every listener attached to a topic.
 *
 * The topic is passed through decodeURIComponent first and the decoded string
 * is used as the emitter event name. decodeURIComponent throws a URIError on
 * a malformed percent-escape; that error is not caught here.
 *
 * @param {string} topic - Topic name, possibly percent-encoded.
 * @param {*} data - Payload forwarded unchanged to the listeners.
 */
PubSub.prototype.publish = function(topic, data) {
  this.emitter.emit(decodeURIComponent(topic), data);
};
13
/**
 * Register a subscriber for a topic.
 *
 * @param {string} topic - Topic name; used as-is as the emitter event name.
 * @param {Function|Object} callback - Either a function, which is wrapped by
 *   `_onCallback` and invoked as `callback(topic, data)` on publish, or a
 *   non-null object, which is wrapped by `_onResponse`. Any other value
 *   (including null) is ignored and nothing is registered.
 */
PubSub.prototype.subscribe = function(topic, callback) {
  var wrapper;
  if (typeof callback === 'function') {
    wrapper = this._onCallback(topic, callback);
  } else if (callback !== null && typeof callback === 'object') {
    // `typeof null` is 'object' too; a null subscriber would only fail later,
    // at publish time, when the wrapper dereferences it.
    wrapper = this._onResponse(topic, callback);
  } else {
    return;
  }

  this.emitter.on(topic, wrapper);

  // Probe with an own-property check and create the slot with defineProperty
  // so that topics named like Object.prototype members ("toString",
  // "constructor", "__proto__") get a real, own array instead of hitting an
  // inherited value or setter.
  if (!Object.prototype.hasOwnProperty.call(this._listeners, topic)) {
    Object.defineProperty(this._listeners, topic, {
      value: [],
      writable: true,
      enumerable: true,
      configurable: true
    });
  }

  // Keep the caller's subscriber next to the wrapper attached to the emitter;
  // unsubscribe looks entries up by `listener` and detaches `actual`.
  this._listeners[topic].push({ listener: callback, actual: wrapper });
};
32
/**
 * Remove a subscriber previously registered with `subscribe`.
 *
 * Does nothing when the topic has no registry entry or when `listener` is not
 * registered for it. Only the first matching registration is removed. When
 * the subscriber is an object exposing `response.end`, that response is ended
 * after the subscriber has been detached.
 *
 * @param {string} topic - Topic the subscriber was registered under.
 * @param {Function|Object} listener - The exact value that was passed to
 *   `subscribe` (compared by identity).
 */
PubSub.prototype.unsubscribe = function(topic, listener) {
  // Own-property check: a topic such as "toString" must not be mistaken for a
  // registered topic because of an inherited Object.prototype member.
  if (!Object.prototype.hasOwnProperty.call(this._listeners, topic)) {
    return;
  }

  var entries = this._listeners[topic];
  var found = -1;
  for (var i = 0; i < entries.length; i++) {
    if (entries[i].listener === listener) {
      found = i;
      break;
    }
  }

  if (found === -1) {
    return;
  }

  // Detach and update the registry before touching the response, so the
  // bookkeeping stays consistent even if end() throws or triggers a nested
  // unsubscribe for the same subscriber.
  var entry = entries.splice(found, 1)[0];
  this.emitter.removeListener(topic, entry.actual);

  if (entries.length === 0) {
    delete this._listeners[topic];
  }

  if (listener !== null && typeof listener === 'object' &&
      listener.response && typeof listener.response.end === 'function') {
    listener.response.end(); // end response for push request
  }
};
61
/**
 * Build the emitter listener for a function subscriber.
 *
 * The returned listener ignores everything but the first emitted argument and
 * calls `cb(topic, data)`, where `topic` is the topic given at subscription
 * time. The return value of `cb` is discarded.
 *
 * @param {string} topic - Topic passed back to `cb` as its first argument.
 * @param {Function} cb - Subscriber callback.
 * @returns {Function} Listener suitable for `emitter.on`.
 */
PubSub.prototype._onCallback = function(topic, cb) {
  return function onPublish(data) {
    cb(topic, data);
  };
};
68
/**
 * Build the emitter listener for a response-style (object) subscriber.
 *
 * For every published payload the returned listener opens a stream with
 * `env.response.push('/' + topic, headers)` and writes the serialised payload
 * to it. The `X-Event-Encoding` header describes how the payload was encoded:
 *
 *   - Buffer                            -> sent as-is, encoding 'buffer'
 *   - object with `query` and `device`  -> `data.device` run through the siren
 *                                          device formatter, then JSON, 'json'
 *   - any other object (including null) -> JSON, 'json'
 *   - string                            -> sent as bytes with an empty
 *                                          encoding (and logged)
 *   - anything else                     -> logged and dropped
 *
 * Serialisation failures are logged and the event is dropped for this
 * subscriber instead of propagating to the publisher.
 *
 * @param {string} topic - Topic name; the push path is '/' + topic.
 * @param {Object} env - Subscriber environment. `env.response.push` is called
 *   for every event; `env.route.params.serverId` is read for device payloads.
 * @returns {Function} Listener suitable for `emitter.on`.
 */
PubSub.prototype._onResponse = function(topic, env) {
  // `new Buffer(string)` is deprecated; prefer Buffer.from, but fall back on
  // runtimes where it is missing or is only the inherited Uint8Array.from.
  function toBuffer(text) {
    if (typeof Buffer.from === 'function' && Buffer.from !== Uint8Array.from) {
      return Buffer.from(text);
    }
    return new Buffer(text);
  }

  return function(data) {
    var encoding = '';
    var payload = data;

    if (Buffer.isBuffer(data)) {
      encoding = 'buffer';
    } else if (typeof data === 'object') {
      encoding = 'json';
      // The null check matters: `typeof null` is 'object' and must not be
      // dereferenced while looking for a device-query payload.
      if (data !== null && data.query && data.device) {
        var serverId = env.route.params.serverId;
        var loader = { path: '/servers/' + encodeURI(serverId) };
        payload = deviceFormatter({ loader: loader, env: env, model: data.device });
      }
      try {
        payload = toBuffer(JSON.stringify(payload));
      } catch (err) {
        console.error(err, err.stack);
        return;
      }
    } else if (typeof data === 'string') {
      console.error('PubSub._onResponse encoding not set.');
      // Convert to bytes so Content-Length below is a byte count, not a
      // character count.
      payload = toBuffer(data);
    } else {
      // undefined, numbers, booleans, ...: nothing that can be written to the
      // stream, so drop the event rather than fail on `.length` / `end()`.
      console.error('PubSub._onResponse encoding not set.');
      return;
    }

    var stream = env.response.push('/' + topic, {
      'Host': 'fog.argo.cx',
      'Content-Length': payload.length,
      'X-Event-Encoding': encoding
    });

    stream.on('error', function(err) {
      if (err.code === 'RST_STREAM' && err.status === 3) {
        stream.end();
      } else {
        console.error('PubSub._onResponse', err);
      }
    });

    stream.end(payload);
  };
};
108