1 | var EventEmitter = require('events').EventEmitter;
|
2 | var StreamTopic = require('zetta-events-stream-protocol').StreamTopic;
|
3 | var ObjectStream = require('zetta-streams').ObjectStream;
|
4 | var deviceFormatter = require('./api_formats/siren/device.siren');
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 | function socketFdFromEnv(env) {
|
12 |
|
13 | if (env.request && env.request.socket) {
|
14 | return env.request.socket.remotePort;
|
15 | } else {
|
16 | return null;
|
17 | }
|
18 | }
|
19 |
|
20 | var PubSub = module.exports = function() {
|
21 | this.emitter = new EventEmitter();
|
22 |
|
23 |
|
24 | this.emitter.setMaxListeners(Infinity);
|
25 |
|
26 | this._listeners = {};
|
27 |
|
28 |
|
29 |
|
30 | this._sendCache = {};
|
31 | this._maxCacheSize = 100;
|
32 | };
|
33 |
|
34 | PubSub.prototype.publish = function(topic, data, fromRemote) {
|
35 | fromRemote = !!(fromRemote);
|
36 | var x = decodeURIComponent(topic);
|
37 | this.emitter.emit(x, data, fromRemote);
|
38 | this.emitter.emit('_data', x, data, fromRemote);
|
39 | };
|
40 |
|
41 | PubSub.prototype.subscribe = function(topic, callback) {
|
42 | var self = this;
|
43 | if (typeof topic === 'string') {
|
44 | topic = StreamTopic.parse(topic);
|
45 | }
|
46 |
|
47 | var f = function(t, data, fromRemote) {
|
48 | if (topic.match(t)) {
|
49 | if (typeof callback === 'function') {
|
50 | self._onCallback(topic, t, data, fromRemote, callback);
|
51 | } else if (typeof callback === 'object') {
|
52 |
|
53 | if (!fromRemote) {
|
54 | self._onResponse(topic, t, data, fromRemote, callback);
|
55 | }
|
56 | }
|
57 | }
|
58 | };
|
59 |
|
60 | this.emitter.on('_data', f);
|
61 |
|
62 | if (!this._listeners[topic.hash()]) {
|
63 | this._listeners[topic.hash()] = [];
|
64 | }
|
65 |
|
66 | this._listeners[topic.hash()].push({ listener: callback, actual: f });
|
67 | };
|
68 |
|
69 | PubSub.prototype.unsubscribe = function(topic, listener) {
|
70 | if (typeof topic === 'string') {
|
71 | topic = StreamTopic.parse(topic);
|
72 | }
|
73 |
|
74 | if (!this._listeners[topic.hash()]) {
|
75 | return;
|
76 | }
|
77 |
|
78 | var found = -1;
|
79 | this._listeners[topic.hash()].some(function(l, idx) {
|
80 | if (l.listener === listener) {
|
81 | found = idx;
|
82 | return true;
|
83 | }
|
84 | });
|
85 |
|
86 | if (found === -1) {
|
87 | return;
|
88 | }
|
89 |
|
90 | if (typeof listener === 'object') {
|
91 | var underlyingSocketFd = socketFdFromEnv(listener);
|
92 | if (underlyingSocketFd !== null) {
|
93 | delete this._sendCache[underlyingSocketFd];
|
94 | }
|
95 | listener.response.end();
|
96 | }
|
97 |
|
98 | this.emitter.removeListener('_data', this._listeners[topic.hash()][found].actual);
|
99 | this._listeners[topic.hash()].splice(found, 1);
|
100 |
|
101 | if (this._listeners[topic.hash()].length === 0) {
|
102 | delete this._listeners[topic.hash()];
|
103 | }
|
104 | };
|
105 |
|
106 | PubSub.prototype._onCallback = function(topic, sourceTopic, data, fromRemote, cb) {
|
107 | var self = this;
|
108 | cb(topic, data, sourceTopic, fromRemote);
|
109 | };
|
110 |
|
111 |
|
112 |
|
113 |
|
114 |
|
115 |
|
116 | PubSub.prototype._onResponse = function(topic, sourceTopic, data, fromRemote, env) {
|
117 |
|
118 |
|
119 |
|
120 | var underlyingSocketFd = socketFdFromEnv(env);
|
121 |
|
122 | if (this._sendCache[underlyingSocketFd] === undefined) {
|
123 | this._sendCache[underlyingSocketFd] = [];
|
124 | }
|
125 |
|
126 | if (this._sendCache[underlyingSocketFd].indexOf(data) >= 0) {
|
127 | return;
|
128 | } else {
|
129 | this._sendCache[underlyingSocketFd].push(data);
|
130 | if (this._sendCache[underlyingSocketFd].length > this._maxCacheSize) {
|
131 | this._sendCache[underlyingSocketFd].shift();
|
132 | }
|
133 | }
|
134 |
|
135 | var self = this;
|
136 | var encoding = '';
|
137 | if(Buffer.isBuffer(data)) {
|
138 | encoding = 'application/octet-stream';
|
139 | } else if (data.query && data.device) {
|
140 | var serverId = env.route.params.serverId;
|
141 | var loader = { path: '/servers/' + encodeURIComponent(serverId) };
|
142 | data = deviceFormatter({ loader: loader, env: env, model: data.device });
|
143 | encoding = 'application/json';
|
144 | data = new Buffer(JSON.stringify(data));
|
145 | } else if (typeof data == 'object') {
|
146 | encoding = 'application/json';
|
147 |
|
148 |
|
149 | if (sourceTopic.indexOf('_peer/') === 0 && typeof data.peer === 'object') {
|
150 | data = ObjectStream.format(sourceTopic, data.peer.properties());
|
151 | }
|
152 |
|
153 | try {
|
154 | data = new Buffer(JSON.stringify(data));
|
155 | } catch (err) {
|
156 | console.error(err, err.stack);
|
157 | return;
|
158 | }
|
159 | } else {
|
160 | console.error('PubSub._onResponse encoding not set.');
|
161 | }
|
162 |
|
163 | var pushOpts = {
|
164 |
|
165 | request: {
|
166 | 'Host': encodeURIComponent(serverId) + '.unreachable.zettajs.io',
|
167 | 'Content-Length': data.length,
|
168 | 'Content-Type': encoding,
|
169 |
|
170 |
|
171 | 'Topic': topic.pubsubIdentifier(),
|
172 | }
|
173 | };
|
174 | var stream = env.response.push('/' + sourceTopic, pushOpts);
|
175 |
|
176 | stream.on('error', function(err) {
|
177 | if (err.code === 'RST_STREAM' && err.status === 3) {
|
178 | stream.end();
|
179 | } else {
|
180 | console.error('PubSub._onCallback', err);
|
181 | }
|
182 | });
|
183 |
|
184 | stream.end(data);
|
185 | };
|
186 |
|