UNPKG

10.2 kBJavaScriptView Raw
1var JSCompiler = require('caql-js-compiler');
2var querytopic = require('./query_topic');
3var StreamTopic = require('zetta-events-stream-protocol').StreamTopic;
4
5function eventIsSpecial(topic) {
6 var SPECIAL = [
7 /^_peer\/.+$/,
8 /^query:.+$/,
9 /^query\/.+$/,
10 /^logs$/
11 ];
12 return SPECIAL.some(function(regExp) {
13 return regExp.exec(topic);
14 });
15}
16
17var EventBroker = module.exports = function(zetta) {
18 this.zetta = zetta;
19
20 this.peers = {};
21 this.clients = [];
22
23 // List of subscriptions for a peer that has not yet been connected
24 this._peerSubscriptions = {}; // { <serverName>: [] }
25
26 // Hash of all current subscribed device queries and their observables.
27 this._deviceQueries = {}; // { <queryString>: Observable }
28
29 this._queryCache = {};
30};
31
32EventBroker.prototype.peer = function(peer) {
33 var self = this;
34 this.peers[peer.name] = peer;
35
36 // No awaiting subscriptions for that peer
37 if (this._peerSubscriptions[peer.name] === undefined) {
38 return;
39 }
40 // subscribe to topics for that peer
41 this._peerSubscriptions[peer.name].forEach(function(topic) {
42 self._subscribeToPeer(peer.name, topic);
43 });
44
45 delete this._peerSubscriptions[peer.name];
46};
47
48EventBroker.prototype.client = function(client) {
49 var self = this;
50 var c = this.clients.filter(function(cl) {
51 if (client.query.length !== cl.query.length) {
52 return null;
53 }
54
55 var stillValid = true;
56 for (var i = 0; i < client.query.length; i++) {
57 if (!cl.query[i]) {
58 stillValid = false;
59 break;
60 }
61
62 var clq = querytopic.parse(cl.query[i].topic);
63 var clientq = querytopic.parse(client.query[i].topic);
64
65 if ((!clq || !clientq) || clq.ql !== clientq.ql || cl.query[i].name !== client.query[i].name) {
66 stillValid = false;
67 }
68 }
69
70 return stillValid && client.ws === cl.ws;
71 });
72
73 if (c.length > 0) {
74 return;
75 }
76
77 if (client.streamEnabled) {
78 this._streamEnabledClient(client);
79 return;
80 }
81
82 // query: { name: <serverName>, topic: <pubsub topic>}
83 client.query.forEach(function(query) {
84 self._subscribe(client, query, self._publishNonStreamEnabledClient.bind(self));
85 });
86};
87
88EventBroker.prototype._subscribe = function(client, query, sendMethod) {
89 var self = this;
90 var subscriptionTopic = query.topic;
91 var isRemote = false;
92 if (query.name && query.name !== self.zetta.id) {
93 isRemote = true;
94 // subscribe through peer socket
95 self._subscribeToPeer(query.name, query.topic);
96 // Change subsciptions topic to append <serverName>/<topic> when it comes accross pubsub
97 subscriptionTopic = query.name + '/' + query.topic;
98 } else {
99 // If topic is device query setup an obserable
100 if (querytopic.isQuery(subscriptionTopic)) {
101 self.subscribeToDeviceQuery(subscriptionTopic);
102 }
103 }
104
105 var handler = function(topic, data, sourceTopic, fromRemote) {
106 sendMethod(client, query, topic, data, sourceTopic, fromRemote);
107 };
108 self.zetta.pubsub.subscribe(subscriptionTopic, handler);
109
110 var unsubscribe = function() {
111 self.zetta.pubsub.unsubscribe(subscriptionTopic, handler);
112
113 // If topic is a device query disposeof it.
114 if (self._deviceQueries[subscriptionTopic]) {
115 self._deviceQueries[subscriptionTopic].dispose();
116 delete self._deviceQueries[subscriptionTopic];
117 }
118
119 if (isRemote) {
120 // Use original query.topic to unsubscribe from peer
121 self._unsubscribeFromPeer(query.name, query.topic);
122 }
123 };
124
125 client.once('close', function() {
126 unsubscribe();
127 });
128
129 return unsubscribe;
130};
131
132EventBroker.prototype._publishNonStreamEnabledClient = function(client, query, topic, data, sourceTopic, fromRemote) {
133 client.send(query.topic, data, function(err){
134 if (err) {
135 console.error('ws error: '+err);
136 }
137 });
138};
139
140EventBroker.prototype._publishStreamEnabledClient = function(client, query, topic, data, sourceTopic, fromRemote) {
141 var newMsg = {};
142 newMsg.type = 'event';
143 newMsg.topic = sourceTopic;
144 newMsg.timestamp = data.timestamp;
145 newMsg.subscriptionId = query.subscriptionId;
146
147 if (data.data) {
148 newMsg.data = data.data;
149 } else {
150 // handle device and server /logs stream
151 newMsg.data = {};
152 var filtered = ['topic', 'timestamp'];
153 Object.keys(data)
154 .filter(function(key) { return filtered.indexOf(key) === -1; })
155 .forEach(function(key) {
156 newMsg.data[key] = data[key];
157 })
158 }
159 data = newMsg;
160
161 if (query.caql) {
162 var compiled = client._queryCache[query.caql];
163 var result = compiled.filterOne({ data: data.data });
164 if (result) {
165 data.data = result[Object.keys(result)[0]];
166 } else {
167 return;
168 }
169 }
170
171 query.count++;
172 if (typeof query.limit === 'number' && query.count > query.limit) {
173 client.emit('unsubscribe', query);
174 client._unsubscribe(query.subscriptionId)
175 return;
176 }
177
178 client.send(sourceTopic, data, function(err){
179 if (err) {
180 console.error('ws error: '+err);
181 }
182 });
183};
184
185EventBroker.prototype._streamEnabledClient = function(client) {
186 var self = this;
187
188 // Keep a list of unsubscribe functions to unsubscribe from pubsub
189 var unsubscriptions = {}; // { <subscriptionId>: [unsubscribe1, unsubscribe2, ...] }
190
191 client.on('subscribe', function(subscription) {
192
193 // Sendcache per subscription
194 var sendCache = [];
195 var sendCacheSize = 100;
196
197 unsubscriptions[subscription.subscriptionId] = [];
198
199 var query = {
200 name: subscription.topic.serverName(),
201 topic: subscription.topic.pubsubIdentifier(),
202 original: subscription.topic,
203 subscriptionId: subscription.subscriptionId,
204 limit: subscription.limit,
205 count: 0,
206 caql: subscription.topic.streamQuery
207 };
208 client.query.push(query);
209
210 var connectedPeers = [];
211 var subscribeToPeer = function(peerName) {
212 if(query.name instanceof RegExp && !query.name.exec(peerName)) {
213 return;
214 }
215
216 var copiedQuery = {};
217 Object.keys(query).forEach(function(key) {
218 copiedQuery[key] = query[key];
219 });
220
221 if(peerName) {
222 copiedQuery.name = peerName;
223 }
224
225 if(connectedPeers.indexOf(copiedQuery.name) === -1) {
226 connectedPeers.push(copiedQuery.name);
227
228 var unsubscribe = self._subscribe(client, copiedQuery, function(client, query, topic, data, sourceTopic, fromRemote) {
229
230 // Not a sepcial and topic like _peer/connect and the query is local.
231 if (!query.original.isSpecial && !eventIsSpecial(sourceTopic) && !fromRemote) {
232 // Add local serverName to topic for local pubsub because it's not on the actual topic
233 sourceTopic = self.zetta.id + '/' + sourceTopic;
234 }
235
236 // B/c everything goes through the local pubsub queies that have * for the serverName
237 // may match twice. one for the local query and one for each peer query
238 if (sendCache.indexOf(data) >= 0) {
239 return;
240 } else {
241 sendCache.push(data);
242 if (sendCache.length > sendCacheSize) {
243 sendCache.shift();
244 }
245 }
246
247 self._publishStreamEnabledClient(client, query, topic, data, sourceTopic);
248 });
249
250 unsubscriptions[subscription.subscriptionId].push(unsubscribe);
251 }
252 };
253
254 if(query.name instanceof RegExp || query.name === '*') {
255 var peerConnectSubscription = function(topic, data) {
256 // Only subscribe to peer acceptor direction for peers
257 if (data.peer.name) {
258 subscribeToPeer(data.peer.name);
259 }
260 };
261 self.zetta.pubsub.subscribe('_peer/connect', peerConnectSubscription);
262 // Unsubscribe to peer/connect after topic is unsubscribed from
263 unsubscriptions[subscription.subscriptionId].push(function() {
264 self.zetta.pubsub.unsubscribe('_peer/connect', peerConnectSubscription);
265 });
266
267 Object.keys(self.peers).forEach(subscribeToPeer);
268 subscribeToPeer(self.zetta._name);
269 } else {
270 subscribeToPeer();
271 }
272 });
273
274 client.on('unsubscribe', function(subscription) {
275 if (unsubscriptions[subscription.subscriptionId]) {
276 // Unsubscribe to all subscriptions
277 unsubscriptions[subscription.subscriptionId].forEach(function(unsubscribe) {
278 unsubscribe();
279 });
280 delete unsubscriptions[subscription.subscriptionId];
281 }
282 });
283
284 // Unsubscribe to all subscriptions if the client disconnects
285 client.on('close', function() {
286 Object.keys(unsubscriptions).forEach(function(subscriptionId) {
287 unsubscriptions[subscriptionId].forEach(function(unsubscribe) {
288 unsubscribe();
289 });
290 delete unsubscriptions[subscriptionId];
291 })
292 });
293};
294
295
296// Subscribe to peer has been conneced. If peer is not connected keep a list of topics for
297// when it does connect.
298EventBroker.prototype._subscribeToPeer = function(peerName, topic) {
299 var peer = this.peers[peerName];
300 if (peer) {
301 peer.subscribe(topic);
302 } else {
303 if (!this._peerSubscriptions[peerName]) {
304 this._peerSubscriptions[peerName] = [];
305 }
306 this._peerSubscriptions[peerName].push(topic);
307 }
308};
309
310// Unsubscribe from peer if peer has been connected. If not remove topic
311// from list of topics.
312EventBroker.prototype._unsubscribeFromPeer = function(peerName, topic) {
313 var peer = this.peers[peerName];
314 if (peer) {
315 peer.unsubscribe(topic);
316 } else {
317 if (this._peerSubscriptions[peerName]) {
318 var idx = this._peerSubscriptions[peerName].indexOf(topic);
319 if (idx !== -1) {
320 this._peerSubscriptions[peerName].splice(idx, 1);
321 }
322 if (this._peerSubscriptions[peerName].length === 0) {
323 delete this._peerSubscriptions[peerName];
324 }
325 }
326 }
327};
328
329EventBroker.prototype.subscribeToDeviceQuery = function(topic) {
330 if (this._deviceQueries[topic]) {
331 return;
332 }
333
334 var qt = querytopic.parse(topic);
335 var self = this;
336 var q = self.zetta.runtime.query().ql(qt.ql);
337 this._deviceQueries[topic] = this.zetta.runtime.observe(q, function(device) {
338 setImmediate(function() {
339 self.zetta.pubsub.publish(topic, { query: topic, device: device });
340 });
341 });
342};