UNPKG

10.2 kBJavaScriptView Raw
1var EventEmitter = require('events').EventEmitter;
2var util = require('util');
3var async = require('async');
4var decompile = require('calypso-query-decompiler');
5var Rx = require('rx');
6var Logger = require('./logger');
7var Query = require('calypso').Query;
8var PubSub = require('./pubsub_service');
9var VirtualDevice = require('./virtual_device');
10var querytopic = require('./query_topic');
11
12var DeviceRegistry = require('./device_registry');
13
/**
 * Runtime constructor. A Runtime is an EventEmitter on which devices are
 * announced via the 'deviceready' event, and which re-exposes a handful of Rx
 * operators over that event stream.
 *
 * @param {Object} [opts] - optional collaborators
 * @param {Object} [opts.registry] - device registry; defaults to a new DeviceRegistry
 * @param {Object} [opts.pubsub] - pubsub service; defaults to a new PubSub
 * @param {Object} [opts.log] - logger; defaults to a new Logger bound to the pubsub
 * @param {Object} [opts.httpServer] - http server; defaults to an empty object
 */
var Runtime = module.exports = function(opts) {
  EventEmitter.call(this);
  var self = this;
  var options = opts || {};

  this.registry = options.registry || new DeviceRegistry();
  this.pubsub = options.pubsub || new PubSub();
  // The default logger is wired to whichever pubsub was chosen above.
  this._log = options.log || new Logger({pubsub: this.pubsub});
  this.httpServer = options.httpServer || {};

  this.exposed = {};
  this.path = '/devices';
  this.exposeQuery = null;
  this._jsDevices = {};

  // Remote virtual devices and topic listeners are stored per server.
  this._remoteDevices = {}; // { <server_name>: { <device_id>: virtualDevice } }
  this._remoteSubscriptions = {}; // { <server_name>: { <query>: listener } }

  var deviceStream = Rx.Observable.fromEvent(this, 'deviceready');
  this._observable = deviceStream;

  // Route every ready device's 'destroy' event to the runtime's teardown.
  this.on('deviceready', function(readyDevice) {
    readyDevice.on('destroy', function(doomedDevice, done) {
      self._onDestroy(doomedDevice, done);
    });
  });

  this._peerRequestExtensions = [];
  this._peerResponseExtensions = [];

  // Expose selected Rx operators directly on the runtime, bound to the
  // 'deviceready' stream.
  ['filter', 'map', 'take', 'zip', 'subscribe'].forEach(function(operator) {
    self[operator] = deviceStream[operator].bind(deviceStream);
  });
};
util.inherits(Runtime, EventEmitter);
52
// For every level name in Logger.LEVELS, add a same-named method to Runtime
// that forwards (message, data) to the runtime's logger under 'user-log'.
Logger.LEVELS.forEach(function(levelName) {
  Runtime.prototype[levelName] = function(message, data) {
    var logger = this._log;
    logger[levelName]('user-log', message, data);
  };
});
58
/**
 * Start a query over 'devices' that targets a remote server.
 *
 * @param {string} server - value stored on the query's `server` property
 * @returns {Query} the query, with `remote` set to true and `server` set
 */
Runtime.prototype.from = function(server) {
  var remoteQuery = Query.of('devices');

  remoteQuery.remote = true;
  remoteQuery.server = server;

  return remoteQuery;
};
66
/**
 * Start a query over 'devices' and apply `q` through the query's `ql` method.
 *
 * @param {*} q - passed unchanged to `Query#ql`
 * @returns {*} whatever `Query#ql` returns
 */
Runtime.prototype.ql = function(q) {
  var devicesQuery = Query.of('devices');
  return devicesQuery.ql(q);
};
70
/**
 * Start a fresh, unfiltered query over 'devices'.
 *
 * @returns {Query} the result of `Query.of('devices')`
 */
Runtime.prototype.query = function() {
  var devicesQuery = Query.of('devices');
  return devicesQuery;
};
74
/**
 * Start a query over 'devices' and apply `q` through the query's `where` method.
 *
 * @param {*} q - passed unchanged to `Query#where`
 * @returns {*} whatever `Query#where` returns
 */
Runtime.prototype.where = function(q) {
  var devicesQuery = Query.of('devices');
  return devicesQuery.where(q);
};
78
/**
 * Expose every device announced from now on that matches `query`.
 *
 * Registers a 'deviceready' listener; each call adds another listener.
 * Errors reported by the registry match are ignored.
 *
 * @param {Query|string} query - a query, or the string '*', which is wrapped
 *   in `new Query('*')`
 */
Runtime.prototype.expose = function(query) {
  var runtime = this;
  // Only the string '*' is converted; anything else is handed to the
  // registry as given.
  var exposeQuery = (query === '*') ? new Query(query) : query;

  function exposeIfMatching(device) {
    runtime.registry.match(exposeQuery, device, function(err, matched) {
      if (!matched) {
        return;
      }
      runtime._exposeDevice(device);
    });
  }

  this.on('deviceready', exposeIfMatching);
};
93
94
/**
 * Record `device` in the `exposed` map, keyed by `<path>/<device.id>`.
 *
 * @param {Object} device - must carry an `id` property
 */
Runtime.prototype._exposeDevice = function(device) {
  var exposedPath = this.path + '/' + device.id;
  this.exposed[exposedPath] = device;
};
98
// Emit `device` on `observer` when the registry reports that it matches
// `query`. Match errors are ignored.
function emitIfMatch(runtime, query, device, observer) {
  runtime.registry.match(query, device, function(err, match) {
    if (match) {
      observer.onNext(device);
    }
  });
}

// Build the observable for a local query: every device from `deviceStream`
// is checked against `query` and re-emitted when it matches.
function observeLocalQuery(runtime, deviceStream, query) {
  return deviceStream.flatMap(function(device) {
    return Rx.Observable.create(function(observer) {
      runtime.registry.match(query, device, function(err, match) {
        if (match) {
          observer.onNext(device);
        }
        // Each inner observable answers for exactly one device. Completing it
        // lets flatMap release the inner subscription instead of keeping one
        // alive per 'deviceready' event.
        observer.onCompleted();
      });
    });
  });
}

// Build the observable for a remote query (`query.remote === true`). Devices
// come from `<peer>/remotedeviceready` events and from remote devices already
// held in `runtime._remoteDevices`.
function observeRemoteQuery(runtime, query) {
  var ql = decompile(query);
  var toRemove = 'select * ';
  // The topic sent to the peer carries only the part after 'select * '.
  if (ql.slice(0, toRemove.length) === toRemove) {
    ql = ql.slice(toRemove.length);
  }

  return Rx.Observable.create(function(observer) {
    // httpServer defaults to {} in the constructor, so `peers` may be missing.
    function lookupPeer(peerName) {
      var peers = runtime.httpServer.peers;
      return peers ? peers[peerName] : undefined;
    }

    // listen for devices coming online from the remote, per observer
    function listenForNewDevices(peerName) {
      runtime.on(peerName + '/remotedeviceready', function(device) {
        emitIfMatch(runtime, query, device, observer);
      });
    }

    function setupForPeer(peerName) {
      var peer = lookupPeer(peerName);
      if (!peer) {
        return;
      }

      // iterate through existing remote devices
      var knownDevices = runtime._remoteDevices[peer.name];
      if (knownDevices) {
        Object.keys(knownDevices).forEach(function(deviceId) {
          var device = knownDevices[deviceId];
          runtime.registry.match(query, device, function(err, match) {
            if (!match) {
              return;
            }

            // Handle the case where the peer for the remote device was disconnected.
            // TODO: Probably should not handle it only on device._socket but device state is disconnected
            if (device._socket.status !== 'connected') {
              device._socket.once('connected', function() {
                observer.onNext(device);
              });
              return;
            }

            observer.onNext(device);
          });
        });
      }

      listenForNewDevices(peer.name);
      runtime._initRemoteQueryListener(ql, peer);
    }

    if (query.server === '*') {
      // Query is for all peers: set up the ones known now ...
      var peersSetup = [];
      Object.keys(runtime.httpServer.peers || {}).forEach(function(peerName) {
        peersSetup.push(peerName);
        setupForPeer(peerName);
      });

      // ... and every future peer, once per peer name.
      runtime.pubsub.subscribe('_peer/connect', function(e, data) {
        if (peersSetup.indexOf(data.peer.name) === -1) {
          peersSetup.push(data.peer.name);
          setupForPeer(data.peer.name);
        }
      });
      return;
    }

    if (lookupPeer(query.server)) {
      setupForPeer(query.server);
      return;
    }

    // Peer not connected yet: initialise it on connect / reconnect.
    // '_peer/connect' can fire repeatedly for the same peer, so the
    // 'remotedeviceready' listener is attached only once; otherwise every
    // reconnect would add a listener and multiply the emissions.
    var listening = false;
    runtime.pubsub.subscribe('_peer/connect', function(ev, data) {
      if (data.peer.name !== query.server) {
        return;
      }
      if (!listening) {
        listening = true;
        listenForNewDevices(data.peer.name);
      }
      // The runtime tracks topics per peer, so this sets a topic up only once.
      runtime._initRemoteQueryListener(ql, data.peer);
    });
  });
}

/**
 * Observe devices matching one or more queries.
 *
 * Local queries are evaluated against the 'deviceready' stream, plus the
 * devices already present in `_jsDevices` when there are any. Remote queries
 * (`query.remote === true`) are evaluated against devices announced by the
 * peer named in `query.server`, or by every peer when it is '*'.
 *
 * When more than one query is given the per-query streams are zipped, so a
 * result is produced only once each query has yielded a device.
 *
 * @param {Query|Query[]} queries - a single query or an array of queries
 * @param {Function} [cb] - called with one device per query, in query order
 * @returns {*} the combined Rx observable when `cb` is omitted, otherwise the
 *   value returned by subscribing `cb` to it
 */
Runtime.prototype.observe = function(queries, cb) {
  var self = this;
  var observable = this._observable;
  var queryList = Array.isArray(queries) ? queries : [queries];

  // Replay devices that were registered before this call alongside the live
  // 'deviceready' stream.
  if (Object.keys(this._jsDevices).length) {
    var existingDeviceObservable = Rx.Observable.create(function(observer) {
      Object.keys(self._jsDevices).forEach(function(deviceId) {
        observer.onNext(self._jsDevices[deviceId]);
      });
    });
    observable = Rx.Observable.merge(this._observable, existingDeviceObservable);
  }

  var filters = queryList.map(function(query) {
    return query.remote === true ?
      observeRemoteQuery(self, query) :
      observeLocalQuery(self, observable, query);
  });

  var source = null;
  if (filters.length > 1) {
    // The trailing function is zip's result selector: it collects one device
    // per query into an array.
    filters.push(function() {
      return Array.prototype.slice.call(arguments);
    });
    source = Rx.Observable.zip.apply(null, filters);
  } else {
    source = filters[0];
  }

  if (!cb) {
    return source;
  }

  return source.subscribe(function(args) {
    if (Array.isArray(args)) {
      cb.apply(null, args);
    } else {
      cb.call(null, args);
    }
  });
};
247
/**
 * Raw registry lookup: forwards every argument to `registry.find`.
 *
 * @returns {*} whatever `registry.find` returns
 */
Runtime.prototype.find = function() {
  var registry = this.registry;
  return registry.find.apply(registry, arguments);
};
252
/**
 * Register an extension to be applied to peer requests.
 *
 * @param {Function} fn - appended to the list handed to `ws.extendRequest`
 *   by `modifyPeerRequest`
 */
Runtime.prototype.onPeerRequest = function(fn) {
  var requestExtensions = this._peerRequestExtensions;
  requestExtensions.push(fn);
};
256
/**
 * Register an extension to be applied to peer responses.
 *
 * @param {Function} fn - appended to the list handed to `ws.extendResponse`
 *   by `modifyPeerResponse`
 */
Runtime.prototype.onPeerResponse = function(fn) {
  var responseExtensions = this._peerResponseExtensions;
  responseExtensions.push(fn);
};
260
/**
 * Hand the registered peer request extensions to `ws`.
 *
 * @param {Object} ws - object providing `extendRequest(extensions)`
 */
Runtime.prototype.modifyPeerRequest = function(ws) {
  var requestExtensions = this._peerRequestExtensions;
  ws.extendRequest(requestExtensions);
};
264
/**
 * Hand the registered peer response extensions to `ws`.
 *
 * @param {Object} ws - object providing `extendResponse(extensions)`
 */
Runtime.prototype.modifyPeerResponse = function(ws) {
  var responseExtensions = this._peerResponseExtensions;
  ws.extendResponse(responseExtensions);
};
268
/**
 * Subscribe to the query topic for `ql` on `peer`, at most once per
 * peer name and topic.
 *
 * Data arriving on the topic is passed to `_createRemoteDevice`.
 *
 * @param {string} ql - query text used to format the topic
 * @param {Object} peer - peer providing `name`, `subscribe(topic)` and `on(topic, fn)`
 */
Runtime.prototype._initRemoteQueryListener = function(ql, peer) {
  var runtime = this;
  var peerName = peer.name;

  // First contact with this peer: create its device and subscription maps.
  if (!this._remoteDevices[peerName]) {
    this._remoteDevices[peerName] = {};
    this._remoteSubscriptions[peerName] = {};
  }

  var subscriptions = this._remoteSubscriptions[peerName];
  var topic = querytopic.format({ql: ql});

  // Nothing to do when the query topic is already subscribed.
  if (subscriptions[topic]) {
    return;
  }

  // Set up the reactive query with the peer.
  peer.subscribe(topic);

  var onRemoteDevice = function(data) {
    runtime._createRemoteDevice(peer, data);
  };
  subscriptions[topic] = onRemoteDevice;

  peer.on(topic, onRemoteDevice);
};
293
/**
 * Create a VirtualDevice for a device announced by `peer`, store it, and
 * emit `<peer.name>/remotedeviceready` with it.
 *
 * @param {Object} peer - the announcing peer; `peer.name` keys the device map
 * @param {Object} data - device payload; `data.properties.id` is the device id
 * @returns {VirtualDevice|undefined} the new device, or undefined when a
 *   device with that id is already stored for the peer
 */
Runtime.prototype._createRemoteDevice = function(peer, data) {
  var runtime = this;

  // Device already in local memory: do not create or announce it again.
  var alreadyKnown = Boolean(runtime._remoteDevices[peer.name][data.properties.id]);
  if (alreadyKnown) {
    return;
  }

  var remoteDevice = new VirtualDevice(data, peer);

  // Forget the device once it reports its own remote destruction.
  remoteDevice.on('remote-destroy', function() {
    delete runtime._remoteDevices[peer.name][data.properties.id];
  });

  runtime._remoteDevices[peer.name][data.properties.id] = remoteDevice;
  runtime.emit(peer.name + '/remotedeviceready', remoteDevice);
  return remoteDevice;
};
308
/**
 * Handler for a device's 'destroy' event: destroys the device and logs any
 * failure on the runtime logger.
 *
 * @param {Object} device - the device to destroy
 * @param {Function} [cb] - called with the error, if any, once done
 */
Runtime.prototype._onDestroy = function(device, cb) {
  var runtime = this;
  var done = cb || function() {};

  this._destroyDevice(device, function(err) {
    if (err) {
      var message = 'Device (' + device.id + ') could not be deleted. Error: ' + err.message;
      runtime._log.emit('error', 'server', message);
    }

    done(err);
  });
};
322
/**
 * Mark `device` as destroyed, send its 'zetta-device-destroy' log stream
 * event, then remove it from the registry and from `_jsDevices`.
 *
 * The device's state is set to 'zetta-device-destroy' before anything else,
 * including when the device turns out to lack `_sendLogStreamEvent`.
 *
 * @param {Object} device - the device to destroy
 * @param {Function} [cb] - called with an Error on failure, or null on success
 */
Runtime.prototype._destroyDevice = function(device, cb) {
  var runtime = this;
  var done = cb || function() {};

  device.state = 'zetta-device-destroy';

  // Devices without a log stream hook cannot be destroyed through this path.
  if (typeof device._sendLogStreamEvent !== 'function') {
    runtime._log.emit('error', 'server', 'Device (' + device.id + ') could not be deleted. Error: Device incompatible with delete functionality.');
    done(new Error('Device not compatible'));
    return;
  }

  device._sendLogStreamEvent('zetta-device-destroy', [], function() {
    runtime.registry.remove(device, function(err) {
      if (err) {
        done(err);
        return;
      }

      // Drop the local reference only after the registry removal succeeded.
      delete runtime._jsDevices[device.id];
      done(null);
    });
  });
};