UNPKG

11.3 kBJavaScriptView Raw
1var JSCompiler = require('caql-js-compiler');
2var querytopic = require('./query_topic');
3var StreamTopic = require('zetta-events-stream-protocol').StreamTopic;
4
5function eventIsSpecial(topic) {
6 var SPECIAL = [
7 /^_peer\/.+$/,
8 /^query:.+$/,
9 /^query\/.+$/,
10 /^logs$/
11 ];
12 return SPECIAL.some(function(regExp) {
13 return regExp.exec(topic);
14 });
15}
16
17var EventBroker = module.exports = function(zetta) {
18 this.zetta = zetta;
19
20 this.peers = {};
21 this.clients = [];
22
23 // List of subscriptions for a peer that has not yet been connected
24 this._peerSubscriptions = {}; // { <serverName>: [] }
25
26 // Hash of all current subscribed device queries and their observables.
27 this._deviceQueries = {}; // { <queryString>: Observable }
28
29 this._queryCache = {};
30};
31
32EventBroker.prototype.peer = function(peer) {
33 var self = this;
34 this.peers[peer.name] = peer;
35
36 // No awaiting subscriptions for that peer
37 if (this._peerSubscriptions[peer.name] === undefined) {
38 return;
39 }
40 // subscribe to topics for that peer
41 this._peerSubscriptions[peer.name].forEach(function(topic) {
42 self._subscribeToPeer(peer.name, topic);
43 });
44
45 delete this._peerSubscriptions[peer.name];
46};
47
48EventBroker.prototype.client = function(client) {
49 var self = this;
50 var c = this.clients.filter(function(cl) {
51 if (client.query.length !== cl.query.length) {
52 return null;
53 }
54
55 var stillValid = true;
56 for (var i = 0; i < client.query.length; i++) {
57 if (!cl.query[i]) {
58 stillValid = false;
59 break;
60 }
61
62 var clq = querytopic.parse(cl.query[i].topic);
63 var clientq = querytopic.parse(client.query[i].topic);
64
65 if ((!clq || !clientq) || clq.ql !== clientq.ql || cl.query[i].name !== client.query[i].name) {
66 stillValid = false;
67 }
68 }
69
70 return stillValid && client.ws === cl.ws;
71 });
72
73 if (c.length > 0) {
74 return;
75 }
76
77 if (client.streamEnabled) {
78 this._streamEnabledClient(client);
79 return;
80 }
81
82 // query: { name: <serverName>, topic: <pubsub topic>}
83 client.query.forEach(function(query) {
84 self._subscribe(client, query, self._publishNonStreamEnabledClient.bind(self));
85 });
86};
87
88EventBroker.prototype._subscribe = function(client, query, sendMethod) {
89 var self = this;
90 var subscriptionTopic = query.topic;
91 var isRemote = false;
92 if (query.name && query.name !== self.zetta.id) {
93 isRemote = true;
94 // subscribe through peer socket
95 self._subscribeToPeer(query.name, query.topic);
96 // Change subsciptions topic to append <serverName>/<topic> when it comes accross pubsub
97 subscriptionTopic = query.name + '/' + query.topic;
98 } else {
99 // If topic is device query setup an obserable
100 if (querytopic.isQuery(subscriptionTopic)) {
101 self.subscribeToDeviceQuery(subscriptionTopic);
102 }
103 }
104
105 var handler = function(topic, data, sourceTopic, fromRemote) {
106 sendMethod(client, query, topic, data, sourceTopic, fromRemote);
107 };
108 self.zetta.pubsub.subscribe(subscriptionTopic, handler);
109
110 var unsubscribe = function() {
111 self.zetta.pubsub.unsubscribe(subscriptionTopic, handler);
112
113 // If topic is a device query disposeof it.
114 if (self._deviceQueries[subscriptionTopic]) {
115 self._deviceQueries[subscriptionTopic].dispose();
116 delete self._deviceQueries[subscriptionTopic];
117 }
118
119 if (isRemote) {
120 // Use original query.topic to unsubscribe from peer
121 self._unsubscribeFromPeer(query.name, query.topic);
122 }
123 };
124
125 if(!client.streamEnabled) {
126 client.once('close', function() {
127 unsubscribe();
128 });
129 }
130
131 return unsubscribe;
132};
133
134EventBroker.prototype._publishNonStreamEnabledClient = function(client, query, topic, data, sourceTopic, fromRemote) {
135 client.send(query.topic, data, function(err){
136 if (err) {
137 console.error('ws error: '+err);
138 }
139 });
140};
141
142EventBroker.prototype._publishStreamEnabledClient = function(client, query, topic, data, sourceTopic, fromRemote) {
143
144 var origData = data;
145
146 var newMsg = {};
147 newMsg.type = 'event';
148 newMsg.topic = sourceTopic;
149 newMsg.timestamp = data.timestamp || new Date().getTime();
150 newMsg.subscriptionId = query.subscriptionId;
151
152 // check if topic is a device query, rewrite sent topic as the original topic
153 var qt = querytopic.parse(query.original.pubsubIdentifier());
154 if (qt) {
155 newMsg.topic = query.original.hash();
156 }
157
158 if (data.data !== undefined) {
159 newMsg.data = data.data;
160 } else {
161 // handle device and server /logs stream
162 newMsg.data = {};
163 var filtered = ['topic', 'timestamp'];
164 Object.keys(data)
165 .filter(function(key) { return filtered.indexOf(key) === -1; })
166 .forEach(function(key) {
167 newMsg.data[key] = data[key];
168 })
169 }
170 data = newMsg;
171
172 if (query.caql) {
173 var compiled = client._queryCache[query.caql];
174 var result = compiled.filterOne({ data: data.data });
175 if (result) {
176 data.data = result[Object.keys(result)[0]];
177 } else {
178 return;
179 }
180 }
181
182 query.count++;
183 if (typeof query.limit === 'number' && query.count > query.limit) {
184 client.emit('unsubscribe', query);
185 client._unsubscribe(query.subscriptionId)
186 return;
187 }
188 if (client.filterMultiple) {
189 // If query has caql statement don't filter
190 if (query.caql !== null) {
191 data.subscriptionId = [data.subscriptionId];
192 } else {
193 var found = client.hasBeenSent(origData);
194 if (found) {
195 return;
196 }
197
198 var subscriptionsIds = [];
199 client._subscriptions.forEach(function(subscription) {
200 // Only provide id if topic matches and topic doesn't have a caql statement
201 if (subscription.topic.match(sourceTopic) && subscription.topic.streamQuery === null) {
202 subscriptionsIds.push(subscription.subscriptionId);
203 }
204 });
205
206 data.subscriptionId = subscriptionsIds;
207 }
208 }
209
210 client.send(sourceTopic, data, function(err){
211 if (err) {
212 console.error('ws error: '+err);
213 }
214 });
215};
216
217EventBroker.prototype._streamEnabledClient = function(client) {
218 var self = this;
219
220 // Keep a list of unsubscribe functions to unsubscribe from pubsub
221 var unsubscriptions = {}; // { <subscriptionId>: [unsubscribe1, unsubscribe2, ...] }
222
223 client.on('subscribe', function(subscription) {
224
225 // Sendcache per subscription
226 var sendCache = [];
227 var sendCacheSize = 100;
228
229 unsubscriptions[subscription.subscriptionId] = [];
230
231 var query = {
232 name: subscription.topic.serverName(),
233 topic: subscription.topic.pubsubIdentifier(),
234 original: subscription.topic,
235 subscriptionId: subscription.subscriptionId,
236 limit: subscription.limit,
237 count: 0,
238 caql: subscription.topic.streamQuery
239 };
240
241 // If topic is a device query appened unique identifier to query
242 var qt = querytopic.parse(query.topic);
243 if (qt) {
244 query.topic = querytopic.format(qt);
245 }
246
247 client.query.push(query);
248
249 var connectedPeers = [];
250 var subscribeToPeer = function(peerName) {
251 if(query.name instanceof RegExp && !query.name.exec(peerName)) {
252 return;
253 }
254
255 var copiedQuery = {};
256 Object.keys(query).forEach(function(key) {
257 copiedQuery[key] = query[key];
258 });
259
260 if(peerName) {
261 copiedQuery.name = peerName;
262 }
263
264 if(connectedPeers.indexOf(copiedQuery.name) === -1) {
265 connectedPeers.push(copiedQuery.name);
266
267 var unsubscribe = self._subscribe(client, copiedQuery, function(client, query, topic, data, sourceTopic, fromRemote) {
268
269 // Not a sepcial and topic like _peer/connect and the query is local.
270 if (!query.original.isSpecial && !eventIsSpecial(sourceTopic) && !fromRemote) {
271 // Add local serverName to topic for local pubsub because it's not on the actual topic
272 sourceTopic = self.zetta.id + '/' + sourceTopic;
273 }
274
275 // B/c everything goes through the local pubsub queies that have * for the serverName
276 // may match twice. one for the local query and one for each peer query
277 if (sendCache.indexOf(data) >= 0) {
278 return;
279 } else {
280 sendCache.push(data);
281 if (sendCache.length > sendCacheSize) {
282 sendCache.shift();
283 }
284 }
285
286 self._publishStreamEnabledClient(client, query, topic, data, sourceTopic);
287 });
288
289 unsubscriptions[subscription.subscriptionId].push(unsubscribe);
290 }
291 };
292
293 if(query.name instanceof RegExp || query.name === '*') {
294 var peerConnectSubscription = function(topic, data) {
295 // Only subscribe to peer acceptor direction for peers
296 if (data.peer.name) {
297 subscribeToPeer(data.peer.name);
298 }
299 };
300 self.zetta.pubsub.subscribe('_peer/connect', peerConnectSubscription);
301 // Unsubscribe to peer/connect after topic is unsubscribed from
302 unsubscriptions[subscription.subscriptionId].push(function() {
303 self.zetta.pubsub.unsubscribe('_peer/connect', peerConnectSubscription);
304 });
305
306 Object.keys(self.peers).forEach(subscribeToPeer);
307 subscribeToPeer(self.zetta._name);
308 } else {
309 subscribeToPeer();
310 }
311 });
312
313 client.on('unsubscribe', function(subscription) {
314 if (unsubscriptions[subscription.subscriptionId]) {
315 // Unsubscribe to all subscriptions
316 unsubscriptions[subscription.subscriptionId].forEach(function(unsubscribe) {
317 unsubscribe();
318 });
319 delete unsubscriptions[subscription.subscriptionId];
320 }
321 });
322
323 // Unsubscribe to all subscriptions if the client disconnects
324 client.on('close', function() {
325 Object.keys(unsubscriptions).forEach(function(subscriptionId) {
326 unsubscriptions[subscriptionId].forEach(function(unsubscribe) {
327 unsubscribe();
328 });
329 delete unsubscriptions[subscriptionId];
330 })
331 });
332};
333
334
335// Subscribe to peer has been conneced. If peer is not connected keep a list of topics for
336// when it does connect.
337EventBroker.prototype._subscribeToPeer = function(peerName, topic) {
338 var peer = this.peers[peerName];
339 if (peer) {
340 peer.subscribe(topic);
341 } else {
342 if (!this._peerSubscriptions[peerName]) {
343 this._peerSubscriptions[peerName] = [];
344 }
345 this._peerSubscriptions[peerName].push(topic);
346 }
347};
348
349// Unsubscribe from peer if peer has been connected. If not remove topic
350// from list of topics.
351EventBroker.prototype._unsubscribeFromPeer = function(peerName, topic) {
352 var peer = this.peers[peerName];
353 if (peer) {
354 peer.unsubscribe(topic);
355 } else {
356 if (this._peerSubscriptions[peerName]) {
357 var idx = this._peerSubscriptions[peerName].indexOf(topic);
358 if (idx !== -1) {
359 this._peerSubscriptions[peerName].splice(idx, 1);
360 }
361 if (this._peerSubscriptions[peerName].length === 0) {
362 delete this._peerSubscriptions[peerName];
363 }
364 }
365 }
366};
367
368EventBroker.prototype.subscribeToDeviceQuery = function(topic) {
369 if (this._deviceQueries[topic]) {
370 return;
371 }
372
373 var qt = querytopic.parse(topic);
374 var self = this;
375 var q = self.zetta.runtime.query().ql(qt.ql);
376 this._deviceQueries[topic] = this.zetta.runtime.observe(q, function(device) {
377 setImmediate(function() {
378 self.zetta.pubsub.publish(topic, { query: topic, device: device });
379 });
380 });
381};