1 | var JSCompiler = require('caql-js-compiler');
|
2 | var querytopic = require('./query_topic');
|
3 | var StreamTopic = require('zetta-events-stream-protocol').StreamTopic;
|
4 |
|
/**
 * Tells whether a topic matches one of the reserved topic shapes:
 * `_peer/<something>`, `query:<something>`, `query/<something>` or exactly
 * `logs`.
 *
 * @param {string} topic - Topic to classify.
 * @returns {boolean} true when the topic matches any reserved pattern.
 */
function eventIsSpecial(topic) {
  var specialPatterns = [
    /^_peer\/.+$/,
    /^query:.+$/,
    /^query\/.+$/,
    /^logs$/
  ];

  for (var i = 0; i < specialPatterns.length; i += 1) {
    if (specialPatterns[i].test(topic)) {
      return true;
    }
  }
  return false;
}
|
16 |
|
/**
 * Event broker constructor; also the module's export.
 *
 * @param {Object} zetta - Owning zetta instance. Other methods read
 *   `zetta.id`, `zetta.pubsub` and `zetta.runtime` from it.
 */
var EventBroker = module.exports = function(zetta) {
  this.zetta = zetta;

  // Connected peers, keyed by peer name (filled in by `peer()`).
  this.peers = {};
  // Known clients; `client()` scans this list to detect duplicates.
  this.clients = [];

  // Topics requested for peers that are not connected yet, keyed by peer
  // name. `peer()` flushes and removes the entry once the peer shows up.
  this._peerSubscriptions = {};

  // Handles returned by `zetta.runtime.observe`, keyed by query topic.
  this._deviceQueries = {};

  // NOTE(review): not read anywhere in the visible part of this file.
  this._queryCache = {};
};
|
31 |
|
/**
 * Registers a connected peer and replays every topic subscription that was
 * queued for it while it was unknown.
 *
 * @param {Object} peer - Peer object; `peer.name` is used as the registry key.
 */
EventBroker.prototype.peer = function(peer) {
  var broker = this;
  var peerName = peer.name;

  broker.peers[peerName] = peer;

  var pendingTopics = broker._peerSubscriptions[peerName];
  if (pendingTopics !== undefined) {
    // The peer is registered above, so each replayed topic goes straight to it.
    pendingTopics.forEach(function(topic) {
      broker._subscribeToPeer(peerName, topic);
    });

    delete broker._peerSubscriptions[peerName];
  }
};
|
47 |
|
/**
 * Attaches a client to the broker.
 *
 * The call is a no-op when `this.clients` already holds a client on the same
 * `ws` whose query list matches entry by entry (same parsed `ql`, same
 * `name`). Otherwise a stream-enabled client is handed to
 * `_streamEnabledClient`, and any other client gets one subscription per
 * entry of `client.query`.
 *
 * @param {Object} client - Reads `client.query` (array of `{name, topic}`),
 *   `client.ws` and `client.streamEnabled`.
 */
EventBroker.prototype.client = function(client) {
  var broker = this;

  var duplicates = this.clients.filter(function(existing) {
    if (client.query.length !== existing.query.length) {
      return false;
    }

    var matches = true;
    for (var idx = 0; idx < client.query.length; idx += 1) {
      if (!existing.query[idx]) {
        matches = false;
        break;
      }

      var existingParsed = querytopic.parse(existing.query[idx].topic);
      var incomingParsed = querytopic.parse(client.query[idx].topic);

      // Both topics must parse, and agree on query language text and name.
      var sameQuery = existingParsed && incomingParsed &&
        existingParsed.ql === incomingParsed.ql &&
        existing.query[idx].name === client.query[idx].name;

      if (!sameQuery) {
        matches = false;
      }
    }

    return matches && client.ws === existing.ws;
  });

  if (duplicates.length > 0) {
    return;
  }

  if (client.streamEnabled) {
    this._streamEnabledClient(client);
    return;
  }

  client.query.forEach(function(query) {
    var send = broker._publishNonStreamEnabledClient.bind(broker);
    broker._subscribe(client, query, send);
  });
};
|
87 |
|
/**
 * Subscribes a client to one query on the local pubsub and wires up cleanup.
 *
 * A query whose `name` is set and differs from `zetta.id` is treated as
 * remote: the peer is asked for the topic and the local pubsub topic becomes
 * `<name>/<topic>`. A local query whose topic is a device query starts a
 * device-query observer first.
 *
 * @param {Object} client - Must expose `once('close', fn)`.
 * @param {Object} query - Reads `query.name` and `query.topic`.
 * @param {Function} sendMethod - Called as
 *   `sendMethod(client, query, topic, data, sourceTopic, fromRemote)` for
 *   every message delivered on the subscription.
 * @returns {Function} Teardown function. It is also run when the client emits
 *   `close`, so it may run more than once.
 */
EventBroker.prototype._subscribe = function(client, query, sendMethod) {
  var broker = this;
  var isRemote = Boolean(query.name && query.name !== broker.zetta.id);
  var pubsubTopic = query.topic;

  if (isRemote) {
    broker._subscribeToPeer(query.name, query.topic);
    // Remote events are looked up locally under a peer-prefixed topic.
    pubsubTopic = query.name + '/' + query.topic;
  } else if (querytopic.isQuery(pubsubTopic)) {
    broker.subscribeToDeviceQuery(pubsubTopic);
  }

  var forward = function(topic, data, sourceTopic, fromRemote) {
    sendMethod(client, query, topic, data, sourceTopic, fromRemote);
  };
  broker.zetta.pubsub.subscribe(pubsubTopic, forward);

  var teardown = function() {
    broker.zetta.pubsub.unsubscribe(pubsubTopic, forward);

    // Stop the device-query observer registered under this topic, if any.
    var observer = broker._deviceQueries[pubsubTopic];
    if (observer) {
      observer.dispose();
      delete broker._deviceQueries[pubsubTopic];
    }

    if (isRemote) {
      broker._unsubscribeFromPeer(query.name, query.topic);
    }
  };

  client.once('close', function() {
    teardown();
  });

  return teardown;
};
|
131 |
|
/**
 * Send method for clients that do not use the stream protocol: forwards the
 * payload unchanged under the topic the client subscribed with.
 *
 * @param {Object} client - Must expose `send(topic, data, callback)`.
 * @param {Object} query - Only `query.topic` is read.
 * @param {string} topic - Unused.
 * @param {*} data - Payload passed through to `client.send`.
 * @param {string} sourceTopic - Unused.
 * @param {boolean} fromRemote - Unused.
 */
EventBroker.prototype._publishNonStreamEnabledClient = function(client, query, topic, data, sourceTopic, fromRemote) {
  var reportSendError = function(err) {
    if (err) {
      console.error('ws error: '+err);
    }
  };

  client.send(query.topic, data, reportSendError);
};
|
139 |
|
/**
 * Send method for stream-protocol clients: wraps the payload in an `event`
 * envelope, applies the subscription's optional filter and limit, then sends.
 *
 * Envelope fields, in order: `type`, `topic`, `timestamp`, `subscriptionId`,
 * `data`. When the payload has a truthy `data` property that value is used;
 * otherwise `data` is built from every payload key except `topic` and
 * `timestamp`.
 *
 * @param {Object} client - Uses `client.send`, `client.emit`,
 *   `client._unsubscribe` and, for filtered queries, `client._queryCache`.
 * @param {Object} query - Reads `subscriptionId`, `caql`, `limit`; increments
 *   `count` for every message that passes the filter.
 * @param {string} topic - Unused.
 * @param {Object} data - Raw payload; `data.timestamp` is copied out.
 * @param {string} sourceTopic - Topic reported to the client.
 * @param {boolean} fromRemote - Unused.
 */
EventBroker.prototype._publishStreamEnabledClient = function(client, query, topic, data, sourceTopic, fromRemote) {
  var message = {
    type: 'event',
    topic: sourceTopic,
    timestamp: data.timestamp,
    subscriptionId: query.subscriptionId
  };

  if (data.data) {
    message.data = data.data;
  } else {
    // No usable `data` field: fold the remaining payload keys into one.
    var body = {};
    var payloadKeys = Object.keys(data);
    for (var i = 0; i < payloadKeys.length; i += 1) {
      var key = payloadKeys[i];
      if (key !== 'topic' && key !== 'timestamp') {
        body[key] = data[key];
      }
    }
    message.data = body;
  }

  if (query.caql) {
    // Assumes the client has already stored a compiled query under this key
    // (not verified here) - TODO confirm against the client implementation.
    var matcher = client._queryCache[query.caql];
    var filtered = matcher.filterOne({ data: message.data });
    if (!filtered) {
      return;
    }
    // Only the first property of the filter result is forwarded.
    message.data = filtered[Object.keys(filtered)[0]];
  }

  query.count++;
  var overLimit = typeof query.limit === 'number' && query.count > query.limit;
  if (overLimit) {
    client.emit('unsubscribe', query);
    client._unsubscribe(query.subscriptionId);
    return;
  }

  client.send(sourceTopic, message, function(err) {
    if (err) {
      console.error('ws error: '+err);
    }
  });
};
|
184 |
|
/**
 * Wires a stream-protocol client to the broker by listening for its
 * `subscribe`, `unsubscribe` and `close` events.
 *
 * Each `subscribe` event builds a query from the subscription's topic object
 * and subscribes it either to the single named server or, when the server
 * name is a RegExp or `'*'`, to every currently known peer, to
 * `zetta._name`, and to peers announced later on `_peer/connect`.
 *
 * @param {Object} client - Event emitter with a `query` array; receives one
 *   query object per subscription.
 */
EventBroker.prototype._streamEnabledClient = function(client) {
  var broker = this;

  // subscriptionId -> list of teardown functions for that subscription.
  var teardownsById = {};

  // Runs and forgets every teardown registered under one subscription id.
  var runTeardowns = function(subscriptionId) {
    teardownsById[subscriptionId].forEach(function(teardown) {
      teardown();
    });
    delete teardownsById[subscriptionId];
  };

  client.on('subscribe', function(subscription) {
    // Most recent payload objects delivered for this subscription. The same
    // object arriving again (e.g. through several peer subscriptions) is
    // skipped.
    var recentlySent = [];
    var recentlySentLimit = 100;

    teardownsById[subscription.subscriptionId] = [];

    var query = {
      name: subscription.topic.serverName(),
      topic: subscription.topic.pubsubIdentifier(),
      original: subscription.topic,
      subscriptionId: subscription.subscriptionId,
      limit: subscription.limit,
      count: 0,
      caql: subscription.topic.streamQuery
    };
    client.query.push(query);

    // Send method handed to `_subscribe` for every server of this subscription.
    var deliver = function(subscriber, peerQuery, topic, data, sourceTopic, fromRemote) {
      // Ordinary local events get prefixed with this server's id.
      var needsLocalPrefix = !(peerQuery.original.isSpecial || eventIsSpecial(sourceTopic) || fromRemote);
      if (needsLocalPrefix) {
        sourceTopic = broker.zetta.id + '/' + sourceTopic;
      }

      if (recentlySent.indexOf(data) >= 0) {
        return;
      }
      recentlySent.push(data);
      if (recentlySent.length > recentlySentLimit) {
        recentlySent.shift();
      }

      broker._publishStreamEnabledClient(subscriber, peerQuery, topic, data, sourceTopic);
    };

    // Server names this subscription is already attached to.
    var attachedNames = [];

    var subscribeToPeer = function(peerName) {
      if (query.name instanceof RegExp && !query.name.exec(peerName)) {
        return;
      }

      // Shallow copy so each server gets its own query with a concrete name.
      var peerQuery = {};
      Object.keys(query).forEach(function(field) {
        peerQuery[field] = query[field];
      });
      if (peerName) {
        peerQuery.name = peerName;
      }

      if (attachedNames.indexOf(peerQuery.name) !== -1) {
        return;
      }
      attachedNames.push(peerQuery.name);

      var teardown = broker._subscribe(client, peerQuery, deliver);
      teardownsById[subscription.subscriptionId].push(teardown);
    };

    var matchesManyServers = query.name instanceof RegExp || query.name === '*';
    if (!matchesManyServers) {
      subscribeToPeer();
      return;
    }

    var onPeerConnect = function(topic, data) {
      if (data.peer.name) {
        subscribeToPeer(data.peer.name);
      }
    };
    broker.zetta.pubsub.subscribe('_peer/connect', onPeerConnect);

    teardownsById[subscription.subscriptionId].push(function() {
      broker.zetta.pubsub.unsubscribe('_peer/connect', onPeerConnect);
    });

    Object.keys(broker.peers).forEach(function(knownPeerName) {
      subscribeToPeer(knownPeerName);
    });
    subscribeToPeer(broker.zetta._name);
  });

  client.on('unsubscribe', function(subscription) {
    if (teardownsById[subscription.subscriptionId]) {
      runTeardowns(subscription.subscriptionId);
    }
  });

  client.on('close', function() {
    Object.keys(teardownsById).forEach(function(subscriptionId) {
      runTeardowns(subscriptionId);
    });
  });
};
|
294 |
|
295 |
|
296 |
|
297 |
|
/**
 * Asks a peer to subscribe to a topic, or queues the request when the peer is
 * not registered yet (`peer()` replays the queue later).
 *
 * @param {string} peerName - Key into `this.peers`.
 * @param {string} topic - Topic to subscribe to on that peer.
 */
EventBroker.prototype._subscribeToPeer = function(peerName, topic) {
  var peer = this.peers[peerName];
  if (peer) {
    peer.subscribe(topic);
    return;
  }

  var queued = this._peerSubscriptions[peerName];
  if (!queued) {
    queued = [];
    this._peerSubscriptions[peerName] = queued;
  }
  queued.push(topic);
};
|
309 |
|
310 |
|
311 |
|
/**
 * Cancels a topic subscription on a peer. For a peer that is not registered,
 * removes one matching entry from its queue of pending subscriptions and
 * drops the queue once it is empty.
 *
 * @param {string} peerName - Key into `this.peers`.
 * @param {string} topic - Topic to unsubscribe from.
 */
EventBroker.prototype._unsubscribeFromPeer = function(peerName, topic) {
  var peer = this.peers[peerName];
  if (peer) {
    peer.unsubscribe(topic);
    return;
  }

  var queued = this._peerSubscriptions[peerName];
  if (!queued) {
    return;
  }

  var position = queued.indexOf(topic);
  if (position !== -1) {
    queued.splice(position, 1);
  }

  if (queued.length === 0) {
    delete this._peerSubscriptions[peerName];
  }
};
|
328 |
|
/**
 * Starts observing the runtime for devices matching a query topic and
 * republishes every match on the local pubsub under that same topic.
 * Does nothing when an observer for the topic is already registered.
 *
 * @param {string} topic - Query topic; `querytopic.parse(topic).ql` is used
 *   as the query text, so the topic is expected to be parseable.
 */
EventBroker.prototype.subscribeToDeviceQuery = function(topic) {
  var broker = this;

  if (broker._deviceQueries[topic]) {
    return;
  }

  var parsedTopic = querytopic.parse(topic);
  var deviceQuery = broker.zetta.runtime.query().ql(parsedTopic.ql);

  var publishMatch = function(device) {
    // Publishing is deferred to a later turn of the event loop.
    setImmediate(function() {
      broker.zetta.pubsub.publish(topic, { query: topic, device: device });
    });
  };

  broker._deviceQueries[topic] = broker.zetta.runtime.observe(deviceQuery, publishMatch);
};
|