UNPKG

5.66 kBJavaScriptView Raw
1var EventEmitter = require('events').EventEmitter;
2var StreamTopic = require('zetta-events-stream-protocol').StreamTopic;
3var ObjectStream = require('zetta-streams').ObjectStream;
4var deviceFormatter = require('./api_formats/siren/device.siren');
5
// Derives a per-connection identifier from an argo `env`. Despite the name,
// the value handed back is the remote port of the request's socket, not a
// file descriptor. Returns null when the env carries no request or socket.
//
// TODO(adammagaluk): This is broken in the new spdy version.
// Always returning null right now.
// Cannot get the underlying socket because of `handle.js` in node-spdy.
// See: Handle.prototype.assignClientRequest
// We could try and use the spdy state and its stream count.
// TODO(adammagaluk): Find better solution.
function socketFdFromEnv(env) {
  var request = env.request;
  if (!request || !request.socket) {
    return null;
  }
  return request.socket.remotePort;
}
19
// PubSub constructor (also the module's export). Creates the internal event
// emitter and the bookkeeping structures the prototype methods work on.
var PubSub = module.exports = function() {
  var emitter = new EventEmitter();
  // Lift the listener cap so the max-listeners warning never pops up.
  emitter.setMaxListeners(Infinity);
  this.emitter = emitter;

  // Registered subscriptions, keyed by the subscription topic's hash().
  this._listeners = {};

  // The send cache ensures only one event is sent to a cloud connection for a
  // topic subscription; duplicates can happen now because of wildcards and
  // regexes in topics.
  // Shape: { <socketFd>: [event1, event2, ... ] }
  this._sendCache = {};
  // Keep a list of the last 100 events per peer connection.
  this._maxCacheSize = 100;
};
33
// Publishes `data` on `topic`.
//
// topic: topic string; it is URI-decoded before being emitted, so a malformed
//   percent-escape makes decodeURIComponent throw a URIError.
// data: payload, handed to listeners untouched.
// fromRemote: coerced to a boolean and forwarded to listeners.
PubSub.prototype.publish = function(topic, data, fromRemote) {
  var isRemote = Boolean(fromRemote);
  var decodedTopic = decodeURIComponent(topic);

  // First notify anything bound directly to the decoded topic name...
  this.emitter.emit(decodedTopic, data, isRemote);
  // ...then the catch-all '_data' channel, which also carries the topic.
  this.emitter.emit('_data', decodedTopic, data, isRemote);
};
40
// Registers `callback` for every published event whose topic matches `topic`.
//
// topic: a topic string (parsed with StreamTopic.parse) or an object that
//   already offers match() and hash().
// callback: either a function, which is dispatched through _onCallback, or an
//   object, which is dispatched through _onResponse.
PubSub.prototype.subscribe = function(topic, callback) {
  var pubsub = this;
  var subscription = (typeof topic === 'string') ? StreamTopic.parse(topic) : topic;

  function onData(sourceTopic, data, fromRemote) {
    if (!subscription.match(sourceTopic)) {
      return;
    }

    var kind = typeof callback;
    if (kind === 'function') {
      pubsub._onCallback(subscription, sourceTopic, data, fromRemote, callback);
      return;
    }

    // Only send to peer if event did not come from a downstream peer
    if (kind === 'object' && !fromRemote) {
      pubsub._onResponse(subscription, sourceTopic, data, fromRemote, callback);
    }
  }

  this.emitter.on('_data', onData);

  // Remember the wrapper next to the subscriber so unsubscribe can find and
  // detach the exact handler that was attached above.
  var key = subscription.hash();
  var entries = this._listeners[key] || (this._listeners[key] = []);
  entries.push({ listener: callback, actual: onData });
};
68
// Removes a subscription previously added with subscribe().
//
// topic: a topic string (parsed with StreamTopic.parse) or an object that
//   offers hash().
// listener: the same function or object that was passed to subscribe().
//
// Does nothing when the topic has no subscriptions or the listener is not
// among them. For object listeners the push response is ended as well.
PubSub.prototype.unsubscribe = function(topic, listener) {
  var subscription = (typeof topic === 'string') ? StreamTopic.parse(topic) : topic;
  var key = subscription.hash();
  var entries = this._listeners[key];
  if (!entries) {
    return;
  }

  // Locate the first entry registered for this exact listener.
  var position = -1;
  for (var i = 0; i < entries.length; i++) {
    if (entries[i].listener === listener) {
      position = i;
      break;
    }
  }
  if (position === -1) {
    return;
  }

  if (typeof listener === 'object') {
    // Drop the de-duplication cache kept for this connection, if it can be
    // identified.
    var underlyingSocketFd = socketFdFromEnv(listener);
    if (underlyingSocketFd !== null) {
      delete this._sendCache[underlyingSocketFd];
    }
    listener.response.end(); // end response for push request
  }

  // Detach the wrapper that subscribe() attached, then forget the entry.
  this.emitter.removeListener('_data', entries[position].actual);
  entries.splice(position, 1);

  if (entries.length === 0) {
    delete this._listeners[key];
  }
};
105
// Delivers a matched event to a function subscriber.
//
// topic: StreamTopic that was used to subscribe
// sourceTopic: topic string emitted
// data: event payload
// fromRemote: flag forwarded untouched to the subscriber
// cb: subscriber function; invoked as cb(topic, data, sourceTopic, fromRemote).
//   Note that `data` comes before `sourceTopic` in the callback's arguments,
//   unlike in this method's own parameter list.
PubSub.prototype._onCallback = function(topic, sourceTopic, data, fromRemote, cb) {
  cb(topic, data, sourceTopic, fromRemote);
};
110
111
// Pushes a published event to a peer through the push-capable response held
// in `env`, at most once per connection.
//
// topic: StreamTopic that was used to subscribe
// sourceTopic: topic string emitted
// data: event payload. Buffers are sent as-is, { query, device } pairs are
//   rendered with the siren device formatter, other objects are sent as JSON
//   and anything else is sent as its string form.
// fromRemote: not used in this method
// env: argo env for the subscription request
PubSub.prototype._onResponse = function(topic, sourceTopic, data, fromRemote, env) {

  // topic.pubsubIdentifier() -> Topic of the subscription
  // sourceTopic -> source from the event.
  var underlyingSocketFd = socketFdFromEnv(env);
  // TODO(adammagaluk): underlyingSocketFd is sometimes null. When it is, the
  // cache below is stored under the "null" key and is therefore shared by
  // every connection whose id could not be determined.
  var sent = this._sendCache[underlyingSocketFd];
  if (sent === undefined) {
    sent = this._sendCache[underlyingSocketFd] = [];
  }

  // Already pushed on this connection (overlapping subscriptions).
  if (sent.indexOf(data) >= 0) {
    return;
  }
  sent.push(data);
  if (sent.length > this._maxCacheSize) {
    sent.shift();
  }

  // Resolved up front because the Host header needs it for every payload
  // type, not just for device queries. Stays undefined when the env has no
  // route params.
  var params = env.route && env.route.params;
  var serverId = params ? params.serverId : undefined;

  var hasValue = data !== null && data !== undefined;
  var encoding = '';
  var payload = data;
  var isJson = false;

  if (Buffer.isBuffer(data)) {
    encoding = 'application/octet-stream';
  } else if (hasValue && data.query && data.device) {
    var loader = { path: '/servers/' + encodeURIComponent(serverId) };
    payload = deviceFormatter({ loader: loader, env: env, model: data.device });
    isJson = true;
  } else if (typeof data === 'object') {
    // used for _peer/connect _peer/disconnect
    if (hasValue && sourceTopic.indexOf('_peer/') === 0 &&
        data.peer && typeof data.peer === 'object') {
      payload = ObjectStream.format(sourceTopic, data.peer.properties());
    }
    isJson = true;
  } else {
    console.error('PubSub._onResponse encoding not set.');
    // Still push the value, but as bytes: Content-Length then is a byte count
    // and stream.end() receives a chunk type it accepts.
    payload = Buffer.from(String(data));
  }

  if (isJson) {
    encoding = 'application/json';
    try {
      // Throws on cycles, and Buffer.from rejects the `undefined` that
      // JSON.stringify yields for unserializable values; either way the
      // event is logged and dropped.
      payload = Buffer.from(JSON.stringify(payload));
    } catch (err) {
      console.error(err, err.stack);
      return;
    }
  }

  var pushOpts = {
    // Headers must be within request: now
    request: {
      'Host': encodeURIComponent(serverId) + '.unreachable.zettajs.io',
      'Content-Length': payload.length,
      'Content-Type': encoding,
      // TODO: Added topic. This allows the server to check the topic versus
      // the streamurl.
      'Topic': topic.pubsubIdentifier()
    }
  };
  var stream = env.response.push('/' + sourceTopic, pushOpts);

  stream.on('error', function(err) {
    if (err.code === 'RST_STREAM' && err.status === 3) {
      stream.end();
    } else {
      console.error('PubSub._onResponse', err);
    }
  });

  stream.end(payload);
};
186