UNPKG

5.26 kBJavaScriptView Raw
1var querytopic = require('./query_topic');
2
/**
 * Decide whether a published topic satisfies a subscribed topic.
 *
 * Matching is plain strict equality; no wildcard or pattern expansion
 * happens here.
 *
 * @param {*} a - First topic (the broker passes the published topic).
 * @param {*} b - Second topic (the broker passes a client's query topic).
 * @returns {boolean} true only when both values are strictly equal.
 */
function topicMatch(a, b) {
  var differs = a !== b;
  return !differs;
}
6
/**
 * Creates a broker that tracks peers, clients and their topic subscriptions.
 *
 * @constructor
 * @param {Object} zetta - Owning server object. Other methods of the broker
 *   read its `id`, `pubsub` and `runtime` members.
 */
var EventBroker = module.exports = function(zetta) {
  // Known peers, keyed by peer name.
  this.peers = {};
  this.zetta = zetta;
  // Clients currently attached to the broker.
  this.clients = [];

  // Local subscriptions: {<topic>: {count: <number>, listener: <function>}}
  this.subscriptions = {};
  // Remote subscriptions: {<server_name>: {<topic>: <count>}}
  this._peerSubscriptions = {};

  // Forwarding listeners: {<server_name>: {<topic>: <listener>}}
  this._publishListeners = {};
  // Value returned by runtime.observe() for each query topic: {<topic>: <handle>}
  this._deviceQueries = {};
};
18
/**
 * Registers a peer and wires up every topic already recorded for its name.
 *
 * Subscriptions may have been recorded before the peer was known; each
 * recorded topic is passed to `_setupPublishListener` so the peer starts
 * delivering it.
 *
 * @param {Object} peer - Peer object; its `name` is used as the registry key.
 */
EventBroker.prototype.peer = function(peer) {
  this.peers[peer.name] = peer;

  if (!this._peerSubscriptions[peer.name]) {
    this._peerSubscriptions[peer.name] = {};
  }

  // Replay the topics that were requested for this peer name.
  var pendingTopics = Object.keys(this._peerSubscriptions[peer.name]);
  for (var i = 0; i < pendingTopics.length; i++) {
    this._setupPublishListener(peer, pendingTopics[i]);
  }
};
28
/**
 * Ensures a single forwarding listener exists for `topic` on `peer`.
 *
 * The listener re-publishes whatever the peer emits on the topic through
 * `_publish`. Calling this again for the same peer name and topic is a no-op.
 *
 * @param {Object} peer - Peer object providing `name`, `on` and `subscribe`.
 * @param {string} topic - Topic to listen for and subscribe to on the peer.
 */
EventBroker.prototype._setupPublishListener = function(peer, topic) {
  var broker = this;
  var listenersForPeer = this._publishListeners[peer.name];

  if (!listenersForPeer) {
    listenersForPeer = this._publishListeners[peer.name] = {};
  }

  // A listener is already registered for this topic; nothing more to do.
  if (listenersForPeer[topic]) {
    return;
  }

  listenersForPeer[topic] = function(data) {
    broker._publish(topic, data);
  };

  peer.on(topic, listenersForPeer[topic]);
  peer.subscribe(topic);
};
47
/**
 * Attaches a client to the broker and subscribes each of its queries.
 *
 * A client is ignored when an already attached client uses the same `ws`
 * object and carries an equivalent query list: same length and, position by
 * position, the same server name and the same parsed `ql`. A topic that
 * `querytopic.parse` cannot parse never counts as equivalent.
 *
 * When the client emits 'close', each of its queries is unsubscribed and the
 * client is removed from the broker.
 *
 * @param {Object} client - Client object providing `ws`, `query` (array of
 *   `{name, topic}`) and `on(event, handler)`.
 */
EventBroker.prototype.client = function(client) {
  var self = this;

  // True when `other` is the same socket carrying an equivalent query list.
  // The cheap identity and length checks run first so topics are only parsed
  // when the answer still depends on them.
  function isDuplicateOf(other) {
    if (client.ws !== other.ws || client.query.length !== other.query.length) {
      return false;
    }

    for (var i = 0; i < client.query.length; i++) {
      var existing = other.query[i];
      if (!existing || existing.name !== client.query[i].name) {
        return false;
      }

      var existingParsed = querytopic.parse(existing.topic);
      var incomingParsed = querytopic.parse(client.query[i].topic);
      if (!existingParsed || !incomingParsed || existingParsed.ql !== incomingParsed.ql) {
        return false;
      }
    }

    return true;
  }

  if (this.clients.some(isDuplicateOf)) {
    return;
  }

  this.clients.push(client);

  client.on('close', function() {
    // Release the subscriptions first, then forget the client.
    client.query.forEach(self._unsubscribe.bind(self));
    var idx = self.clients.indexOf(client);
    if (idx === -1) {
      return;
    }
    self.clients.splice(idx, 1);
  });
  client.query.forEach(this._subscribe.bind(this));
};
89
/**
 * Records one more subscriber for `query.topic` on the server `query.name`.
 *
 * Local topics (name equals `zetta.id`) are reference counted; the first
 * subscriber registers a pubsub listener and, for query topics, starts a
 * device query. Topics for other servers are counted per server name; the
 * first subscriber wires a forwarding listener if that peer is already known.
 *
 * @param {Object} query - Subscription descriptor with `name` and `topic`.
 */
EventBroker.prototype._subscribe = function(query) {
  var topic = query.topic;
  var serverName = query.name;

  if (serverName !== this.zetta.id) {
    // Topic lives on another server: count it under that server's name.
    var peerTopics = this._peerSubscriptions[serverName];
    if (!peerTopics) {
      peerTopics = this._peerSubscriptions[serverName] = {};
    }

    if (!peerTopics[topic]) {
      peerTopics[topic] = 0;
      var peer = this.peers[serverName];
      if (peer) {
        this._setupPublishListener(peer, topic);
      }
    }

    peerTopics[topic]++;
    return;
  }

  // Local topic.
  var entry = this.subscriptions[topic];
  if (!entry) {
    entry = this.subscriptions[topic] = { count: 0, listener: null };
  }

  // Only the first subscriber touches pubsub and the device query.
  if (entry.count === 0) {
    entry.listener = this._onLocalPubsub.bind(this);
    this.zetta.pubsub.subscribe(topic, entry.listener);

    if (querytopic.isQuery(topic)) {
      this.subscribeToDeviceQuery(topic);
    }
  }

  entry.count++;
};
128
/**
 * Releases one subscriber for `query.topic` on the server `query.name`.
 *
 * Local topics (name equals `zetta.id`): the reference count is decremented;
 * when it reaches zero the pubsub listener is removed and any device query
 * for the topic is disposed. A topic with no local subscription is ignored.
 *
 * Topics on other servers: the per-server count is decremented; when no
 * subscriber remains the forwarding listener is removed and, if the peer is
 * known, the peer is told to unsubscribe. A server or topic with no recorded
 * count is treated as having had a single subscriber.
 *
 * @param {Object} query - Subscription descriptor with `name` and `topic`.
 */
EventBroker.prototype._unsubscribe = function(query) {
  var topic = query.topic;

  if (query.name === this.zetta.id) {
    var local = this.subscriptions[topic];

    // Nothing recorded for this topic (never subscribed, or already fully
    // released): there is no listener to remove, so do not throw.
    if (!local) {
      return;
    }

    local.count--;
    if (local.count > 0) {
      return;
    }

    // Last local subscriber is gone: unsubscribe locally.
    this.zetta.pubsub.unsubscribe(topic, local.listener);
    delete this.subscriptions[topic];

    if (this._deviceQueries[topic]) {
      this._deviceQueries[topic].dispose();
      delete this._deviceQueries[topic];
    }
    return;
  }

  var peerTopics = this._peerSubscriptions[query.name];
  if (!peerTopics) {
    // Start from an empty table. Seeding it with a literal `topic` key would
    // leave a bogus entry behind that peer() later treats as a real topic.
    peerTopics = this._peerSubscriptions[query.name] = {};
  }

  // A missing count behaves like a single remaining subscriber.
  var remaining = (peerTopics[topic] || 1) - 1;
  if (remaining > 0) {
    peerTopics[topic] = remaining;
    return;
  }

  delete peerTopics[topic];

  var peer = this.peers[query.name];
  var listeners = this._publishListeners[query.name];

  if (listeners && listeners[topic]) {
    if (peer) {
      peer.removeListener(topic, listeners[topic]);
    }

    delete listeners[topic];
  }

  if (peer) {
    peer.unsubscribe(topic);
  }
};
174
175
/**
 * Sends `data` to every client query whose topic matches `topic`.
 *
 * A client is sent the message once per matching query. Send errors are
 * written to the console and otherwise ignored.
 *
 * @param {string} topic - Topic the data was published on.
 * @param {*} data - Payload handed to `client.send`.
 */
EventBroker.prototype._publish = function(topic, data) {
  // Completion callback for client.send: report the failure, nothing else.
  function reportSendError(err) {
    if (err) {
      console.error('ws error: '+err);
    }
  }

  this.clients.forEach(function(client) {
    client.query.forEach(function(query) {
      if (topicMatch(topic, query.topic)) {
        client.send(topic, data, reportSendError);
      }
    });
  });
};
191
/**
 * Listener bound to local pubsub topics; hands the event to `_publish`
 * unchanged.
 *
 * @param {string} topic - Topic the event was published on.
 * @param {*} data - Event payload.
 */
EventBroker.prototype._onLocalPubsub = function(topic, data) {
  var broker = this;
  broker._publish(topic, data);
};
195
196
/**
 * Starts observing the runtime for devices matching a query topic.
 *
 * The topic is parsed with `querytopic.parse` and its `ql` is used to build a
 * runtime query. Every device reported by the observation is published on the
 * same topic as `{ query: topic, device: device }`. The value returned by
 * `runtime.observe` is kept per topic; a topic that already has one is left
 * alone.
 *
 * @param {string} topic - Query topic to observe.
 */
EventBroker.prototype.subscribeToDeviceQuery = function(topic) {
  var broker = this;
  var alreadyObserved = Boolean(this._deviceQueries[topic]);

  if (!alreadyObserved) {
    var parsed = querytopic.parse(topic);
    var deviceQuery = this.zetta.runtime.query().ql(parsed.ql);

    this._deviceQueries[topic] = this.zetta.runtime.observe(deviceQuery, function(device) {
      broker.zetta.pubsub.publish(topic, { query: topic, device: device });
    });
  }
};