UNPKG

11.3 kBJavaScriptView Raw
1var JSCompiler = require('caql-js-compiler');
2var querytopic = require('./query_topic');
3var StreamTopic = require('zetta-events-stream-protocol').StreamTopic;
4
/**
 * Tell whether a topic is one of the "special" topics: peer events
 * (`_peer/<something>`), device queries (`query:<...>` or `query/<...>`),
 * or the bare `logs` topic.
 *
 * @param {string} topic - topic to classify
 * @returns {boolean} true when any of the special patterns matches
 */
function eventIsSpecial(topic) {
  var specialPatterns = [
    /^_peer\/.+$/,
    /^query:.+$/,
    /^query\/.+$/,
    /^logs$/
  ];

  for (var i = 0; i < specialPatterns.length; i++) {
    if (specialPatterns[i].test(topic)) {
      return true;
    }
  }
  return false;
}
16
/**
 * Broker that connects websocket clients to the pubsub of a zetta instance
 * and to the peers it knows about.
 *
 * @param {Object} zetta - owning zetta instance; other methods of this module
 *   read `id`, `_name`, `pubsub` and `runtime` from it
 */
var EventBroker = module.exports = function(zetta) {
  var broker = this;

  broker.zetta = zetta;

  // Connected peers keyed by name: { <serverName>: peer }
  broker.peers = {};
  broker.clients = [];

  // Topics requested for a peer that has not connected yet: { <serverName>: [topic, ...] }
  broker._peerSubscriptions = {};

  // Active device queries and the observable backing each: { <queryString>: Observable }
  broker._deviceQueries = {};

  broker._queryCache = {};
};
31
/**
 * Register a connected peer and replay every topic subscription that was
 * queued for it while it was not connected.
 *
 * @param {Object} peer - peer connection; stored under `peer.name`
 */
EventBroker.prototype.peer = function(peer) {
  this.peers[peer.name] = peer;

  var queuedTopics = this._peerSubscriptions[peer.name];
  if (queuedTopics !== undefined) {
    // The peer is registered now, so these go straight to the peer
    queuedTopics.forEach(function(topic) {
      this._subscribeToPeer(peer.name, topic);
    }, this);

    delete this._peerSubscriptions[peer.name];
  }
};
47
/**
 * Attach a websocket client to the broker.
 *
 * A client is ignored when `this.clients` already holds one on the same
 * websocket whose query list is equivalent (same length, and at every
 * position the same parsed `ql` and the same server name). Stream-enabled
 * clients are handed to `_streamEnabledClient`; every other client gets one
 * pubsub subscription per entry in `client.query`.
 *
 * @param {Object} client - client exposing `ws`, `streamEnabled` and
 *   `query` ([{ name: <serverName>, topic: <pubsub topic> }, ...])
 */
EventBroker.prototype.client = function(client) {
  var self = this;

  // True when `known` is on the same socket and carries an equivalent query list
  function isSameClient(known) {
    if (client.query.length !== known.query.length) {
      return false;
    }

    var equivalent = true;
    for (var i = 0; i < client.query.length; i++) {
      if (!known.query[i]) {
        equivalent = false;
        break;
      }

      var knownParsed = querytopic.parse(known.query[i].topic);
      var incomingParsed = querytopic.parse(client.query[i].topic);

      var sameEntry = knownParsed && incomingParsed &&
        knownParsed.ql === incomingParsed.ql &&
        known.query[i].name === client.query[i].name;
      if (!sameEntry) {
        equivalent = false;
      }
    }

    return equivalent && client.ws === known.ws;
  }

  var duplicates = this.clients.filter(function(known) {
    return isSameClient(known);
  });
  if (duplicates.length > 0) {
    return;
  }

  if (client.streamEnabled) {
    this._streamEnabledClient(client);
    return;
  }

  // query: { name: <serverName>, topic: <pubsub topic> }
  client.query.forEach(function(query) {
    var send = self._publishNonStreamEnabledClient.bind(self);
    self._subscribe(client, query, send);
  });
};
87
/**
 * Subscribe a client to one query on the local pubsub.
 *
 * When the query names a server other than this one, the topic is also
 * subscribed through that peer and the local pubsub topic becomes
 * `<serverName>/<topic>`. Otherwise, when the topic is a device query, the
 * backing observable is set up via `subscribeToDeviceQuery`.
 *
 * @param {Object} client - client the events are delivered to; must expose `once`
 * @param {Object} query - { name: <serverName>, topic: <pubsub topic> }
 * @param {Function} sendMethod - called as
 *   sendMethod(client, query, topic, data, sourceTopic, fromRemote) per event
 * @returns {Function} unsubscribe function; only its first call has an effect
 */
EventBroker.prototype._subscribe = function(client, query, sendMethod) {
  var self = this;
  var subscriptionTopic = query.topic;
  var isRemote = false;
  if (query.name && query.name !== self.zetta.id) {
    isRemote = true;
    // Subscribe through the peer socket
    self._subscribeToPeer(query.name, query.topic);
    // Events from a peer arrive on the local pubsub as <serverName>/<topic>
    subscriptionTopic = query.name + '/' + query.topic;
  } else if (querytopic.isQuery(subscriptionTopic)) {
    // Local device query: make sure an observable is feeding the topic
    self.subscribeToDeviceQuery(subscriptionTopic);
  }

  var handler = function(topic, data, sourceTopic, fromRemote) {
    sendMethod(client, query, topic, data, sourceTopic, fromRemote);
  };
  self.zetta.pubsub.subscribe(subscriptionTopic, handler);

  // The returned function is invoked by callers (stream-enabled clients call
  // it on 'unsubscribe' and on 'close') AND by the 'close' listener below.
  // Without this guard the teardown ran twice, and the second
  // _unsubscribeFromPeer call could remove another subscriber's pending
  // entry for the same peer topic.
  var released = false;
  var unsubscribe = function() {
    if (released) {
      return;
    }
    released = true;

    self.zetta.pubsub.unsubscribe(subscriptionTopic, handler);

    // If the topic is a device query, dispose of its observable
    if (self._deviceQueries[subscriptionTopic]) {
      self._deviceQueries[subscriptionTopic].dispose();
      delete self._deviceQueries[subscriptionTopic];
    }

    if (isRemote) {
      // The peer was subscribed with the original query.topic
      self._unsubscribeFromPeer(query.name, query.topic);
    }
  };

  client.once('close', function() {
    unsubscribe();
  });

  return unsubscribe;
};
131
/**
 * Send an event to a client that is not stream-enabled. The message is sent
 * under the topic of the client's own query; `topic`, `sourceTopic` and
 * `fromRemote` are accepted but not used.
 *
 * @param {Object} client - client exposing send(topic, data, callback)
 * @param {Object} query - subscription query; `query.topic` is the sent topic
 * @param {string} topic - unused
 * @param {*} data - payload passed through unchanged
 * @param {string} sourceTopic - unused
 * @param {boolean} fromRemote - unused
 */
EventBroker.prototype._publishNonStreamEnabledClient = function(client, query, topic, data, sourceTopic, fromRemote) {
  // Send failures are logged, not propagated
  function reportSendError(err) {
    if (!err) {
      return;
    }
    console.error('ws error: '+err);
  }

  client.send(query.topic, data, reportSendError);
};
139
/**
 * Wrap a pubsub event in a stream-protocol `event` message and send it to a
 * stream-enabled client.
 *
 * Steps, in order:
 *  1. Build { type, topic, timestamp, subscriptionId, data }. For device-query
 *     subscriptions the topic is the hash of the original stream topic.
 *  2. When the query has a caql statement, run the compiled filter from
 *     `client._queryCache`; drop the event if it yields nothing.
 *  3. Count the event; once the count exceeds a numeric `query.limit`, emit
 *     'unsubscribe', unsubscribe the client and drop the event.
 *  4. When `client.filterMultiple` is set, turn `subscriptionId` into an array.
 *
 * @param {Object} client - stream-enabled client
 * @param {Object} query - subscription query (original, subscriptionId, limit, count, caql)
 * @param {string} topic - pubsub topic the event arrived on (unused here)
 * @param {Object} data - raw event from pubsub
 * @param {string} sourceTopic - topic reported to the client
 * @param {boolean} fromRemote - unused
 */
EventBroker.prototype._publishStreamEnabledClient = function(client, query, topic, data, sourceTopic, fromRemote) {
  var message = {
    type: 'event',
    topic: sourceTopic,
    timestamp: data.timestamp || new Date().getTime(),
    subscriptionId: query.subscriptionId
  };

  // Device query: report the hash of the original topic instead
  if (querytopic.parse(query.original.pubsubIdentifier())) {
    message.topic = query.original.hash();
  }

  if (data.data !== undefined) {
    message.data = data.data;
  } else {
    // Device and server /logs streams: everything except topic and timestamp is the payload
    var payload = {};
    var keys = Object.keys(data);
    for (var k = 0; k < keys.length; k++) {
      var key = keys[k];
      if (key !== 'topic' && key !== 'timestamp') {
        payload[key] = data[key];
      }
    }
    message.data = payload;
  }

  if (query.caql) {
    var matched = client._queryCache[query.caql].filterOne({ data: message.data });
    if (!matched) {
      return;
    }
    message.data = matched[Object.keys(matched)[0]];
  }

  query.count++;
  var limitExceeded = typeof query.limit === 'number' && query.count > query.limit;
  if (limitExceeded) {
    client.emit('unsubscribe', query);
    client._unsubscribe(query.subscriptionId);
    return;
  }

  if (client.filterMultiple) {
    if (query.caql !== null) {
      // Queries with a caql statement are not de-duplicated
      message.subscriptionId = [message.subscriptionId];
    } else {
      if (client.hasBeenSent(data)) {
        return;
      }

      // Collect every caql-free subscription of this client that matches the topic
      var matchingIds = [];
      client._subscriptions.forEach(function(candidate) {
        var applies = candidate.topic.match(sourceTopic) && candidate.topic.streamQuery === null;
        if (applies) {
          matchingIds.push(candidate.subscriptionId);
        }
      });

      message.subscriptionId = matchingIds;
    }
  }

  client.send(sourceTopic, message, function(err) {
    if (err) {
      console.error('ws error: '+err);
    }
  });
};
214
/**
 * Wire up a stream-enabled client: react to its 'subscribe', 'unsubscribe'
 * and 'close' events by creating and tearing down pubsub subscriptions.
 *
 * A subscription whose server name is `*` or a RegExp is fanned out to every
 * known peer, to this server, and to peers announced later on
 * `_peer/connect`; any other subscription produces a single `_subscribe` call.
 *
 * @param {Object} client - stream-enabled client exposing `on` and a `query` array
 */
EventBroker.prototype._streamEnabledClient = function(client) {
  var self = this;

  // Teardown callbacks per subscription: { <subscriptionId>: [fn1, fn2, ...] }
  var teardowns = {};

  // Run every teardown callback of one subscription, then forget the subscription
  function release(subscriptionId) {
    teardowns[subscriptionId].forEach(function(teardown) {
      teardown();
    });
    delete teardowns[subscriptionId];
  }

  client.on('subscribe', function(subscription) {
    // Most recent payloads sent for this subscription, used to drop repeats
    var recentlySent = [];
    var recentlySentMax = 100;

    teardowns[subscription.subscriptionId] = [];

    var query = {
      name: subscription.topic.serverName(),
      topic: subscription.topic.pubsubIdentifier(),
      original: subscription.topic,
      subscriptionId: subscription.subscriptionId,
      limit: subscription.limit,
      count: 0,
      caql: subscription.topic.streamQuery
    };

    // Device query topics are replaced by their formatted form
    var parsedTopic = querytopic.parse(query.topic);
    if (parsedTopic) {
      query.topic = querytopic.format(parsedTopic);
    }

    client.query.push(query);

    // Server names this subscription is already attached to
    var attachedNames = [];

    // Forward one pubsub event to the client unless the same payload was sent recently
    var deliver = function(target, peerQuery, topic, data, sourceTopic, fromRemote) {
      // Ordinary local topics do not carry the server name yet, so prepend it.
      // Special topics (like _peer/connect) and remote events are left alone.
      var needsServerName = !(peerQuery.original.isSpecial || eventIsSpecial(sourceTopic) || fromRemote);
      var reportedTopic = needsServerName ? self.zetta.id + '/' + sourceTopic : sourceTopic;

      // Everything passes through the local pubsub, so a query with * as the
      // server name may match once locally and once per peer.
      if (recentlySent.indexOf(data) >= 0) {
        return;
      }
      recentlySent.push(data);
      if (recentlySent.length > recentlySentMax) {
        recentlySent.shift();
      }

      self._publishStreamEnabledClient(target, peerQuery, topic, data, reportedTopic);
    };

    var subscribeToPeer = function(peerName) {
      if (query.name instanceof RegExp && !query.name.exec(peerName)) {
        return;
      }

      // Each server gets its own copy of the query
      var peerQuery = {};
      var fields = Object.keys(query);
      for (var f = 0; f < fields.length; f++) {
        peerQuery[fields[f]] = query[fields[f]];
      }
      if (peerName) {
        peerQuery.name = peerName;
      }

      if (attachedNames.indexOf(peerQuery.name) !== -1) {
        return;
      }
      attachedNames.push(peerQuery.name);

      var unsubscribe = self._subscribe(client, peerQuery, function(target, q, topic, data, sourceTopic, fromRemote) {
        deliver(target, q, topic, data, sourceTopic, fromRemote);
      });
      teardowns[subscription.subscriptionId].push(unsubscribe);
    };

    var fansOut = query.name instanceof RegExp || query.name === '*';
    if (!fansOut) {
      subscribeToPeer();
      return;
    }

    var onPeerConnect = function(topic, data) {
      // Only peers that report a name are subscribed
      if (data.peer.name) {
        subscribeToPeer(data.peer.name);
      }
    };
    self.zetta.pubsub.subscribe('_peer/connect', onPeerConnect);
    // Stop listening for new peers once the subscription goes away
    teardowns[subscription.subscriptionId].push(function() {
      self.zetta.pubsub.unsubscribe('_peer/connect', onPeerConnect);
    });

    Object.keys(self.peers).forEach(function(knownName) {
      subscribeToPeer(knownName);
    });
    subscribeToPeer(self.zetta._name);
  });

  client.on('unsubscribe', function(subscription) {
    if (teardowns[subscription.subscriptionId]) {
      release(subscription.subscriptionId);
    }
  });

  // A disconnecting client drops every subscription it still holds
  client.on('close', function() {
    Object.keys(teardowns).forEach(function(subscriptionId) {
      release(subscriptionId);
    });
  });
};
331
332
/**
 * Subscribe to a topic on a peer. When the peer is connected the
 * subscription is made right away; otherwise the topic is queued under the
 * peer's name until the peer connects.
 *
 * @param {string} peerName - name of the peer
 * @param {string} topic - topic to subscribe to on that peer
 */
EventBroker.prototype._subscribeToPeer = function(peerName, topic) {
  var connectedPeer = this.peers[peerName];
  if (connectedPeer) {
    connectedPeer.subscribe(topic);
    return;
  }

  var queue = this._peerSubscriptions[peerName];
  if (!queue) {
    queue = [];
    this._peerSubscriptions[peerName] = queue;
  }
  queue.push(topic);
};
346
/**
 * Unsubscribe from a topic on a peer. When the peer is connected the
 * unsubscribe goes to the peer; otherwise one queued occurrence of the topic
 * is removed, and the peer's queue is dropped once it is empty.
 *
 * @param {string} peerName - name of the peer
 * @param {string} topic - topic to unsubscribe from
 */
EventBroker.prototype._unsubscribeFromPeer = function(peerName, topic) {
  var connectedPeer = this.peers[peerName];
  if (connectedPeer) {
    connectedPeer.unsubscribe(topic);
    return;
  }

  var queue = this._peerSubscriptions[peerName];
  if (!queue) {
    return;
  }

  var position = queue.indexOf(topic);
  if (position !== -1) {
    queue.splice(position, 1);
  }

  if (queue.length === 0) {
    delete this._peerSubscriptions[peerName];
  }
};
365
/**
 * Start observing a device query and publish every device it yields on the
 * local pubsub under the query topic. Does nothing when the topic already
 * has an entry in `_deviceQueries`.
 *
 * @param {string} topic - device query topic; it is parsed with
 *   `querytopic.parse` and the resulting `ql` is handed to the runtime query
 */
EventBroker.prototype.subscribeToDeviceQuery = function(topic) {
  var self = this;

  var alreadyObserved = this._deviceQueries[topic];
  if (alreadyObserved) {
    return;
  }

  // Publish one matching device on the query topic
  function publishDevice(device) {
    self.zetta.pubsub.publish(topic, { query: topic, device: device });
  }

  var parsed = querytopic.parse(topic);
  var runtimeQuery = self.zetta.runtime.query().ql(parsed.ql);
  var observable = this.zetta.runtime.observe(runtimeQuery, function(device) {
    // Publishing is deferred to a later turn of the event loop
    setImmediate(publishDevice, device);
  });
  this._deviceQueries[topic] = observable;
};