1 | var EventEmitter = require('events').EventEmitter;
|
2 | var deviceFormatter = require('./api_formats/siren/device.siren');
|
3 |
|
/**
 * In-process publish/subscribe hub backed by a Node EventEmitter.
 *
 * Instance state:
 *  - emitter:    EventEmitter on which every topic is an event name.
 *  - _listeners: dictionary mapping a topic to the array of subscription
 *                records kept for that topic.
 *
 * @constructor
 */
var PubSub = module.exports = function() {
  this.emitter = new EventEmitter();
  // Topic names are arbitrary strings, so the dictionary must not inherit
  // from Object.prototype: otherwise topics such as "constructor" or
  // "__proto__" would resolve to inherited members instead of "no entry".
  this._listeners = Object.create(null);
};
|
8 |
|
/**
 * Deliver `data` to every listener registered for a topic.
 *
 * The topic is passed through decodeURIComponent before it is used as the
 * event name, so a percent-encoded topic reaches listeners registered under
 * the decoded name. A malformed escape sequence makes decodeURIComponent
 * throw a URIError, which propagates to the caller.
 *
 * @param {string} topic - Topic name, possibly percent-encoded.
 * @param {*} data - Payload passed to listeners as the single event argument.
 */
PubSub.prototype.publish = function(topic, data) {
  this.emitter.emit(decodeURIComponent(topic), data);
};
|
13 |
|
/**
 * Register a subscriber for a topic.
 *
 * Two kinds of subscriber are accepted:
 *  - a function, wrapped by `_onCallback`;
 *  - a non-null object, wrapped by `_onResponse`.
 * Anything else (null, undefined, primitives) is ignored.
 *
 * The wrapper is attached to the emitter, and a `{ listener, actual }` record
 * is stored under the topic so `unsubscribe` can later find the wrapper that
 * belongs to the original subscriber.
 *
 * @param {string} topic - Topic (event name) to listen on; used verbatim.
 * @param {Function|Object} callback - Subscriber function or environment object.
 */
PubSub.prototype.subscribe = function(topic, callback) {
  var actual;
  if (typeof callback === 'function') {
    actual = this._onCallback(topic, callback);
  } else if (callback !== null && typeof callback === 'object') {
    // `typeof null` is 'object'; without the explicit null check a null
    // subscriber would register a listener that throws on every publish.
    actual = this._onResponse(topic, callback);
  } else {
    return;
  }

  this.emitter.on(topic, actual);

  // Own-property check: a truthiness test would be fooled by inherited
  // members when the topic is e.g. "constructor" or "toString".
  if (!Object.prototype.hasOwnProperty.call(this._listeners, topic)) {
    // defineProperty (rather than assignment) creates an own data property
    // even for the key "__proto__", which plain assignment would treat as a
    // prototype change on an ordinary object.
    Object.defineProperty(this._listeners, topic, {
      value: [],
      writable: true,
      enumerable: true,
      configurable: true
    });
  }

  this._listeners[topic].push({ listener: callback, actual: actual });
};
|
32 |
|
/**
 * Remove a subscriber previously registered for a topic.
 *
 * The subscriber is matched by identity against the `listener` field of the
 * records stored under the topic. Only the first matching record is removed.
 * Unknown topics and unknown subscribers are ignored.
 *
 * When the subscriber is an object carrying a `response` with an `end`
 * method, that response is ended after the subscription has been removed.
 *
 * @param {string} topic - Topic the subscriber was registered under.
 * @param {Function|Object} listener - The exact value that was passed to
 *   `subscribe`.
 */
PubSub.prototype.unsubscribe = function(topic, listener) {
  // Own-property check so topics named like Object.prototype members
  // ("constructor", "toString", ...) are not mistaken for stored entries.
  if (!Object.prototype.hasOwnProperty.call(this._listeners, topic)) {
    return;
  }

  var entries = this._listeners[topic];
  var found = -1;
  for (var i = 0; i < entries.length; i++) {
    if (entries[i].listener === listener) {
      found = i;
      break;
    }
  }

  if (found === -1) {
    return;
  }

  // Finish all bookkeeping before touching the response: if `end()` throws or
  // synchronously re-enters `unsubscribe`, the subscription is already gone
  // and the stored state stays consistent.
  this.emitter.removeListener(topic, entries[found].actual);
  entries.splice(found, 1);

  if (entries.length === 0) {
    delete this._listeners[topic];
  }

  if (listener !== null && typeof listener === 'object' &&
      listener.response && typeof listener.response.end === 'function') {
    listener.response.end();
  }
};
|
61 |
|
/**
 * Build the emitter listener used for a function subscriber.
 *
 * The returned listener calls `cb(topic, data)` with the topic captured at
 * subscription time and the first argument of the emitted event. Any further
 * event arguments are not forwarded.
 *
 * @param {string} topic - Topic the subscriber is registered under.
 * @param {Function} cb - Subscriber invoked as `cb(topic, data)`.
 * @returns {Function} Listener suitable for `emitter.on(topic, ...)`.
 */
PubSub.prototype._onCallback = function(topic, cb) {
  return function(data) {
    cb(topic, data);
  };
};
|
68 |
|
/**
 * Build the emitter listener used for an object ("environment") subscriber.
 *
 * For every event the returned listener turns the payload into a Buffer,
 * opens a pushed stream via `env.response.push('/' + topic, headers)` and
 * writes the payload to it. (Presumably a SPDY/HTTP2 server push; confirm
 * against the server that builds `env`.)
 *
 * Payload handling and the resulting `X-Event-Encoding` header:
 *  - Buffer: sent unchanged, encoding "buffer".
 *  - object with truthy `query` and `device`: `data.device` is run through
 *    `deviceFormatter` (loader path `/servers/<serverId>` taken from
 *    `env.route.params.serverId`) and sent as JSON, encoding "json".
 *  - any other object (including null): sent as JSON, encoding "json".
 *  - other defined values (string, number, ...): logged as "encoding not
 *    set", then sent as their string form with an empty encoding.
 *  - undefined: logged and skipped, since there is nothing to send.
 * If formatting or serialisation throws, the error is logged and the event is
 * dropped for this subscriber.
 *
 * @param {string} topic - Topic the subscriber is registered under.
 * @param {Object} env - Environment providing `response.push` and, for device
 *   query events, `route.params.serverId`.
 * @returns {Function} Listener suitable for `emitter.on(topic, ...)`.
 */
PubSub.prototype._onResponse = function(topic, env) {
  return function(data) {
    var encoding = '';
    var hasValue = data !== null && data !== undefined;

    try {
      if (Buffer.isBuffer(data)) {
        encoding = 'buffer';
      } else if (hasValue && data.query && data.device) {
        var serverId = env.route.params.serverId;
        var loader = { path: '/servers/' + encodeURI(serverId) };
        var formatted = deviceFormatter({ loader: loader, env: env, model: data.device });
        data = Buffer.from(JSON.stringify(formatted));
        // The body is JSON, so advertise it like the plain-object branch does.
        encoding = 'json';
      } else if (typeof data === 'object') {
        data = Buffer.from(JSON.stringify(data));
        encoding = 'json';
      } else {
        console.error('PubSub._onResponse encoding not set.');
        if (data === undefined) {
          return;
        }
        // Convert to bytes so Content-Length is a byte count (string length
        // counts UTF-16 units, and numbers/booleans have no length at all).
        data = Buffer.from(String(data));
      }
    } catch (err) {
      // A payload that cannot be formatted or serialised must not break
      // delivery to the other subscribers of the same event.
      console.error(err, err.stack);
      return;
    }

    var stream = env.response.push('/' + topic, {
      'Host': 'fog.argo.cx',
      'Content-Length': data.length,
      'X-Event-Encoding': encoding
    });

    stream.on('error', function(err) {
      if (err.code === 'RST_STREAM' && err.status === 3) {
        // The peer reset the pushed stream; just close our side quietly.
        stream.end();
      } else {
        console.error('PubSub._onResponse', err);
      }
    });

    stream.end(data);
  };
};
|
108 |
|