1 | var JSCompiler = require('caql-js-compiler');
|
2 | var querytopic = require('./query_topic');
|
3 | var StreamTopic = require('zetta-events-stream-protocol').StreamTopic;
|
4 |
|
5 | function eventIsSpecial(topic) {
|
6 | var SPECIAL = [
|
7 | /^_peer\/.+$/,
|
8 | /^query:.+$/,
|
9 | /^query\/.+$/,
|
10 | /^logs$/
|
11 | ];
|
12 | return SPECIAL.some(function(regExp) {
|
13 | return regExp.exec(topic);
|
14 | });
|
15 | }
|
16 |
|
17 | var EventBroker = module.exports = function(zetta) {
|
18 | this.zetta = zetta;
|
19 |
|
20 | this.peers = {};
|
21 | this.clients = [];
|
22 |
|
23 |
|
24 | this._peerSubscriptions = {};
|
25 |
|
26 |
|
27 | this._deviceQueries = {};
|
28 |
|
29 | this._queryCache = {};
|
30 | };
|
31 |
|
32 | EventBroker.prototype.peer = function(peer) {
|
33 | var self = this;
|
34 | this.peers[peer.name] = peer;
|
35 |
|
36 |
|
37 | if (this._peerSubscriptions[peer.name] === undefined) {
|
38 | return;
|
39 | }
|
40 |
|
41 | this._peerSubscriptions[peer.name].forEach(function(topic) {
|
42 | self._subscribeToPeer(peer.name, topic);
|
43 | });
|
44 |
|
45 | delete this._peerSubscriptions[peer.name];
|
46 | };
|
47 |
|
48 | EventBroker.prototype.client = function(client) {
|
49 | var self = this;
|
50 | var c = this.clients.filter(function(cl) {
|
51 | if (client.query.length !== cl.query.length) {
|
52 | return null;
|
53 | }
|
54 |
|
55 | var stillValid = true;
|
56 | for (var i = 0; i < client.query.length; i++) {
|
57 | if (!cl.query[i]) {
|
58 | stillValid = false;
|
59 | break;
|
60 | }
|
61 |
|
62 | var clq = querytopic.parse(cl.query[i].topic);
|
63 | var clientq = querytopic.parse(client.query[i].topic);
|
64 |
|
65 | if ((!clq || !clientq) || clq.ql !== clientq.ql || cl.query[i].name !== client.query[i].name) {
|
66 | stillValid = false;
|
67 | }
|
68 | }
|
69 |
|
70 | return stillValid && client.ws === cl.ws;
|
71 | });
|
72 |
|
73 | if (c.length > 0) {
|
74 | return;
|
75 | }
|
76 |
|
77 | if (client.streamEnabled) {
|
78 | this._streamEnabledClient(client);
|
79 | return;
|
80 | }
|
81 |
|
82 |
|
83 | client.query.forEach(function(query) {
|
84 | self._subscribe(client, query, self._publishNonStreamEnabledClient.bind(self));
|
85 | });
|
86 | };
|
87 |
|
88 | EventBroker.prototype._subscribe = function(client, query, sendMethod) {
|
89 | var self = this;
|
90 | var subscriptionTopic = query.topic;
|
91 | var isRemote = false;
|
92 | if (query.name && query.name !== self.zetta.id) {
|
93 | isRemote = true;
|
94 |
|
95 | self._subscribeToPeer(query.name, query.topic);
|
96 |
|
97 | subscriptionTopic = query.name + '/' + query.topic;
|
98 | } else {
|
99 |
|
100 | if (querytopic.isQuery(subscriptionTopic)) {
|
101 | self.subscribeToDeviceQuery(subscriptionTopic);
|
102 | }
|
103 | }
|
104 |
|
105 | var handler = function(topic, data, sourceTopic, fromRemote) {
|
106 | sendMethod(client, query, topic, data, sourceTopic, fromRemote);
|
107 | };
|
108 | self.zetta.pubsub.subscribe(subscriptionTopic, handler);
|
109 |
|
110 | var unsubscribe = function() {
|
111 | self.zetta.pubsub.unsubscribe(subscriptionTopic, handler);
|
112 |
|
113 |
|
114 | if (self._deviceQueries[subscriptionTopic]) {
|
115 | self._deviceQueries[subscriptionTopic].dispose();
|
116 | delete self._deviceQueries[subscriptionTopic];
|
117 | }
|
118 |
|
119 | if (isRemote) {
|
120 |
|
121 | self._unsubscribeFromPeer(query.name, query.topic);
|
122 | }
|
123 | };
|
124 |
|
125 | if(!client.streamEnabled) {
|
126 | client.once('close', function() {
|
127 | unsubscribe();
|
128 | });
|
129 | }
|
130 |
|
131 | return unsubscribe;
|
132 | };
|
133 |
|
134 | EventBroker.prototype._publishNonStreamEnabledClient = function(client, query, topic, data, sourceTopic, fromRemote) {
|
135 | client.send(query.topic, data, function(err){
|
136 | if (err) {
|
137 | console.error('ws error: '+err);
|
138 | }
|
139 | });
|
140 | };
|
141 |
|
142 | EventBroker.prototype._publishStreamEnabledClient = function(client, query, topic, data, sourceTopic, fromRemote) {
|
143 |
|
144 | var origData = data;
|
145 |
|
146 | var newMsg = {};
|
147 | newMsg.type = 'event';
|
148 | newMsg.topic = sourceTopic;
|
149 | newMsg.timestamp = data.timestamp || new Date().getTime();
|
150 | newMsg.subscriptionId = query.subscriptionId;
|
151 |
|
152 |
|
153 | var qt = querytopic.parse(query.original.pubsubIdentifier());
|
154 | if (qt) {
|
155 | newMsg.topic = query.original.hash();
|
156 | }
|
157 |
|
158 | if (data.data !== undefined) {
|
159 | newMsg.data = data.data;
|
160 | } else {
|
161 |
|
162 | newMsg.data = {};
|
163 | var filtered = ['topic', 'timestamp'];
|
164 | Object.keys(data)
|
165 | .filter(function(key) { return filtered.indexOf(key) === -1; })
|
166 | .forEach(function(key) {
|
167 | newMsg.data[key] = data[key];
|
168 | })
|
169 | }
|
170 | data = newMsg;
|
171 |
|
172 | if (query.caql) {
|
173 | var compiled = client._queryCache[query.caql];
|
174 | var result = compiled.filterOne({ data: data.data });
|
175 | if (result) {
|
176 | data.data = result[Object.keys(result)[0]];
|
177 | } else {
|
178 | return;
|
179 | }
|
180 | }
|
181 |
|
182 | query.count++;
|
183 | if (typeof query.limit === 'number' && query.count > query.limit) {
|
184 | client.emit('unsubscribe', query);
|
185 | client._unsubscribe(query.subscriptionId)
|
186 | return;
|
187 | }
|
188 | if (client.filterMultiple) {
|
189 |
|
190 | if (query.caql !== null) {
|
191 | data.subscriptionId = [data.subscriptionId];
|
192 | } else {
|
193 | var found = client.hasBeenSent(origData);
|
194 | if (found) {
|
195 | return;
|
196 | }
|
197 |
|
198 | var subscriptionsIds = [];
|
199 | client._subscriptions.forEach(function(subscription) {
|
200 |
|
201 | if (subscription.topic.match(sourceTopic) && subscription.topic.streamQuery === null) {
|
202 | subscriptionsIds.push(subscription.subscriptionId);
|
203 | }
|
204 | });
|
205 |
|
206 | data.subscriptionId = subscriptionsIds;
|
207 | }
|
208 | }
|
209 |
|
210 | client.send(sourceTopic, data, function(err){
|
211 | if (err) {
|
212 | console.error('ws error: '+err);
|
213 | }
|
214 | });
|
215 | };
|
216 |
|
217 | EventBroker.prototype._streamEnabledClient = function(client) {
|
218 | var self = this;
|
219 |
|
220 |
|
221 | var unsubscriptions = {};
|
222 |
|
223 | client.on('subscribe', function(subscription) {
|
224 |
|
225 |
|
226 | var sendCache = [];
|
227 | var sendCacheSize = 100;
|
228 |
|
229 | unsubscriptions[subscription.subscriptionId] = [];
|
230 |
|
231 | var query = {
|
232 | name: subscription.topic.serverName(),
|
233 | topic: subscription.topic.pubsubIdentifier(),
|
234 | original: subscription.topic,
|
235 | subscriptionId: subscription.subscriptionId,
|
236 | limit: subscription.limit,
|
237 | count: 0,
|
238 | caql: subscription.topic.streamQuery
|
239 | };
|
240 |
|
241 |
|
242 | var qt = querytopic.parse(query.topic);
|
243 | if (qt) {
|
244 | query.topic = querytopic.format(qt);
|
245 | }
|
246 |
|
247 | client.query.push(query);
|
248 |
|
249 | var connectedPeers = [];
|
250 | var subscribeToPeer = function(peerName) {
|
251 | if(query.name instanceof RegExp && !query.name.exec(peerName)) {
|
252 | return;
|
253 | }
|
254 |
|
255 | var copiedQuery = {};
|
256 | Object.keys(query).forEach(function(key) {
|
257 | copiedQuery[key] = query[key];
|
258 | });
|
259 |
|
260 | if(peerName) {
|
261 | copiedQuery.name = peerName;
|
262 | }
|
263 |
|
264 | if(connectedPeers.indexOf(copiedQuery.name) === -1) {
|
265 | connectedPeers.push(copiedQuery.name);
|
266 |
|
267 | var unsubscribe = self._subscribe(client, copiedQuery, function(client, query, topic, data, sourceTopic, fromRemote) {
|
268 |
|
269 |
|
270 | if (!query.original.isSpecial && !eventIsSpecial(sourceTopic) && !fromRemote) {
|
271 |
|
272 | sourceTopic = self.zetta.id + '/' + sourceTopic;
|
273 | }
|
274 |
|
275 |
|
276 |
|
277 | if (sendCache.indexOf(data) >= 0) {
|
278 | return;
|
279 | } else {
|
280 | sendCache.push(data);
|
281 | if (sendCache.length > sendCacheSize) {
|
282 | sendCache.shift();
|
283 | }
|
284 | }
|
285 |
|
286 | self._publishStreamEnabledClient(client, query, topic, data, sourceTopic);
|
287 | });
|
288 |
|
289 | unsubscriptions[subscription.subscriptionId].push(unsubscribe);
|
290 | }
|
291 | };
|
292 |
|
293 | if(query.name instanceof RegExp || query.name === '*') {
|
294 | var peerConnectSubscription = function(topic, data) {
|
295 |
|
296 | if (data.peer.name) {
|
297 | subscribeToPeer(data.peer.name);
|
298 | }
|
299 | };
|
300 | self.zetta.pubsub.subscribe('_peer/connect', peerConnectSubscription);
|
301 |
|
302 | unsubscriptions[subscription.subscriptionId].push(function() {
|
303 | self.zetta.pubsub.unsubscribe('_peer/connect', peerConnectSubscription);
|
304 | });
|
305 |
|
306 | Object.keys(self.peers).forEach(subscribeToPeer);
|
307 | subscribeToPeer(self.zetta._name);
|
308 | } else {
|
309 | subscribeToPeer();
|
310 | }
|
311 | });
|
312 |
|
313 | client.on('unsubscribe', function(subscription) {
|
314 | if (unsubscriptions[subscription.subscriptionId]) {
|
315 |
|
316 | unsubscriptions[subscription.subscriptionId].forEach(function(unsubscribe) {
|
317 | unsubscribe();
|
318 | });
|
319 | delete unsubscriptions[subscription.subscriptionId];
|
320 | }
|
321 | });
|
322 |
|
323 |
|
324 | client.on('close', function() {
|
325 | Object.keys(unsubscriptions).forEach(function(subscriptionId) {
|
326 | unsubscriptions[subscriptionId].forEach(function(unsubscribe) {
|
327 | unsubscribe();
|
328 | });
|
329 | delete unsubscriptions[subscriptionId];
|
330 | })
|
331 | });
|
332 | };
|
333 |
|
334 |
|
335 |
|
336 |
|
337 | EventBroker.prototype._subscribeToPeer = function(peerName, topic) {
|
338 | var peer = this.peers[peerName];
|
339 | if (peer) {
|
340 | peer.subscribe(topic);
|
341 | } else {
|
342 | if (!this._peerSubscriptions[peerName]) {
|
343 | this._peerSubscriptions[peerName] = [];
|
344 | }
|
345 | this._peerSubscriptions[peerName].push(topic);
|
346 | }
|
347 | };
|
348 |
|
349 |
|
350 |
|
351 | EventBroker.prototype._unsubscribeFromPeer = function(peerName, topic) {
|
352 | var peer = this.peers[peerName];
|
353 | if (peer) {
|
354 | peer.unsubscribe(topic);
|
355 | } else {
|
356 | if (this._peerSubscriptions[peerName]) {
|
357 | var idx = this._peerSubscriptions[peerName].indexOf(topic);
|
358 | if (idx !== -1) {
|
359 | this._peerSubscriptions[peerName].splice(idx, 1);
|
360 | }
|
361 | if (this._peerSubscriptions[peerName].length === 0) {
|
362 | delete this._peerSubscriptions[peerName];
|
363 | }
|
364 | }
|
365 | }
|
366 | };
|
367 |
|
368 | EventBroker.prototype.subscribeToDeviceQuery = function(topic) {
|
369 | if (this._deviceQueries[topic]) {
|
370 | return;
|
371 | }
|
372 |
|
373 | var qt = querytopic.parse(topic);
|
374 | var self = this;
|
375 | var q = self.zetta.runtime.query().ql(qt.ql);
|
376 | this._deviceQueries[topic] = this.zetta.runtime.observe(q, function(device) {
|
377 | setImmediate(function() {
|
378 | self.zetta.pubsub.publish(topic, { query: topic, device: device });
|
379 | });
|
380 | });
|
381 | };
|