1 | var EventEmitter = require('events').EventEmitter;
|
2 | var util = require('util');
|
3 | var async = require('async');
|
4 | var decompile = require('calypso-query-decompiler');
|
5 | var Rx = require('rx');
|
6 | var Logger = require('./logger');
|
7 | var Query = require('calypso').Query;
|
8 | var PubSub = require('./pubsub_service');
|
9 | var VirtualDevice = require('./virtual_device');
|
10 | var querytopic = require('./query_topic');
|
11 |
|
12 | var DeviceRegistry = require('./device_registry');
|
13 |
|
/**
 * Runtime constructor.
 *
 * Builds the runtime's collaborators from `opts` (falling back to freshly
 * constructed defaults), initialises its bookkeeping maps and exposes a few
 * Rx operators of the `deviceready` event stream directly on the instance.
 *
 * @param {Object} [opts]
 * @param {Object} [opts.registry]   device registry; defaults to `new DeviceRegistry()`
 * @param {Object} [opts.pubsub]     pubsub service; defaults to `new PubSub()`
 * @param {Object} [opts.log]        logger; defaults to `new Logger({pubsub: ...})`
 * @param {Object} [opts.httpServer] http server; defaults to `{}`
 */
var Runtime = module.exports = function(opts) {
  EventEmitter.call(this);

  var runtime = this;
  var options = opts || {};

  this.registry = options.registry || new DeviceRegistry();
  this.pubsub = options.pubsub || new PubSub();
  // The default logger is handed the same pubsub instance the runtime uses.
  this._log = options.log || new Logger({pubsub: this.pubsub});
  this.httpServer = options.httpServer || {};

  this.exposed = {};
  this.path = '/devices';
  this.exposeQuery = null;
  this._jsDevices = {};

  // Both maps are keyed by peer name (see _initRemoteQueryListener).
  this._remoteDevices = {};
  this._remoteSubscriptions = {};

  // Observable view over this emitter's 'deviceready' events.
  var deviceStream = Rx.Observable.fromEvent(this, 'deviceready');
  this._observable = deviceStream;

  // Every device announced as ready gets its 'destroy' event routed to
  // _onDestroy.
  this.on('deviceready', function(readyDevice) {
    readyDevice.on('destroy', function(destroyed, done) {
      runtime._onDestroy(destroyed, done);
    });
  });

  this._peerRequestExtensions = [];
  this._peerResponseExtensions = [];

  // Re-export selected operators of the device stream, bound to that stream.
  ['filter', 'map', 'take', 'zip', 'subscribe'].forEach(function(operator) {
    runtime[operator] = deviceStream[operator].bind(deviceStream);
  });
};
|
// Runtime inherits from EventEmitter.
util.inherits(Runtime, EventEmitter);

// Generate one Runtime method per entry of Logger.LEVELS. Each generated
// method forwards (message, data) to the same-named method of the runtime's
// logger, with 'user-log' as the first argument.
Logger.LEVELS.forEach(function(levelName) {
  Runtime.prototype[levelName] = function(message, data) {
    var logger = this._log;
    logger[levelName]('user-log', message, data);
  };
});
|
58 |
|
/**
 * Start a 'devices' query that is flagged as remote and tagged with the
 * given server.
 *
 * @param {*} server value stored on the query's `server` property
 * @returns {Query} the new query, with `remote === true`
 */
Runtime.prototype.from = function(server) {
  var remoteQuery = Query.of('devices');

  remoteQuery.remote = true;
  remoteQuery.server = server;

  return remoteQuery;
};
|
66 |
|
/**
 * Start a 'devices' query and apply `q` to it through the query's `ql` method.
 *
 * @param {*} q argument forwarded to `Query#ql`
 * @returns {*} whatever `Query#ql` returns
 */
Runtime.prototype.ql = function(q) {
  var deviceQuery = Query.of('devices');
  return deviceQuery.ql(q);
};
|
70 |
|
/**
 * Start a fresh query over 'devices'.
 *
 * @returns {Query} the result of `Query.of('devices')`
 */
Runtime.prototype.query = function() {
  var deviceQuery = Query.of('devices');
  return deviceQuery;
};
|
74 |
|
/**
 * Start a 'devices' query and apply `q` to it through the query's `where`
 * method.
 *
 * @param {*} q argument forwarded to `Query#where`
 * @returns {*} whatever `Query#where` returns
 */
Runtime.prototype.where = function(q) {
  var deviceQuery = Query.of('devices');
  return deviceQuery.where(q);
};
|
78 |
|
/**
 * Expose every device that becomes ready and matches `query`.
 *
 * The literal string '*' is wrapped in a Query; any other value is handed to
 * the registry unchanged. Matching happens per 'deviceready' event, so only
 * devices announced after this call are considered.
 *
 * @param {Query|string} query query to match ready devices against
 */
Runtime.prototype.expose = function(query) {
  var runtime = this;
  var exposeQuery = (query === '*') ? new Query(query) : query;

  function exposeWhenMatched(device) {
    // The registry's error argument is not inspected here; only a truthy
    // match exposes the device.
    runtime.registry.match(exposeQuery, device, function(err, matched) {
      if (!matched) {
        return;
      }
      runtime._exposeDevice(device);
    });
  }

  this.on('deviceready', exposeWhenMatched);
};
|
93 |
|
94 |
|
/**
 * Record `device` in the `exposed` map under the key `<path>/<device.id>`.
 *
 * @param {Object} device device whose `id` forms the last key segment
 */
Runtime.prototype._exposeDevice = function(device) {
  var exposedKey = this.path + '/' + device.id;
  this.exposed[exposedKey] = device;
};
|
98 |
|
99 |
|
/**
 * Observe devices matching one or more queries.
 *
 * Local queries (those without `remote === true`) are matched against the
 * runtime's device stream; when devices are already present in `_jsDevices`
 * they are replayed to each subscriber in addition to future 'deviceready'
 * events. Remote queries (`remote === true`, as produced by `from()`) are
 * matched against devices announced by the peer named in `query.server`, or
 * by every peer when `query.server === '*'`.
 *
 * With more than one query the per-query observables are zipped, so the
 * callback receives one device per query, in query order.
 *
 * @param {Query|Query[]} queries a single query or an array of queries
 * @param {Function} [cb] invoked with the matched device(s)
 * @returns {*} the source observable when `cb` is omitted, otherwise the
 *   value returned by `source.subscribe`
 */
Runtime.prototype.observe = function(queries, cb) {
  var self = this;
  var observable = this._observable;

  if (!Array.isArray(queries)) {
    queries = [queries];
  }

  // Replay devices that are already known alongside the live stream.
  if (Object.keys(this._jsDevices).length) {
    var existingDeviceObservable = Rx.Observable.create(function(observer) {
      Object.keys(self._jsDevices).forEach(function(deviceId) {
        observer.onNext(self._jsDevices[deviceId]);
      });
    });
    observable = Rx.Observable.merge(this._observable, existingDeviceObservable);
  }

  // Current peer map. The constructor defaults `httpServer` to `{}`, which
  // has no `peers` property; treat that as "no peers connected" instead of
  // throwing. Read lazily on every use because the map can change over time.
  function connectedPeers() {
    return self.httpServer.peers || {};
  }

  // Push `device` to `observer` when the registry reports a match.
  function emitIfMatch(query, device, observer) {
    self.registry.match(query, device, function(err, match) {
      if (match) {
        observer.onNext(device);
      }
    });
  }

  // Forward the peer's future remote devices and make sure the peer is
  // subscribed to the query topic.
  function listenForRemoteDevices(query, ql, peer, observer) {
    self.on(peer.name + '/remotedeviceready', function(device) {
      emitIfMatch(query, device, observer);
    });

    self._initRemoteQueryListener(ql, peer);
  }

  // Emit remote devices already created for `peer`, deferring those whose
  // socket is not connected yet until it reports 'connected'.
  function replayKnownRemoteDevices(query, peer, observer) {
    var known = self._remoteDevices[peer.name];
    if (!known) {
      return;
    }

    Object.keys(known).forEach(function(deviceId) {
      var device = known[deviceId];
      self.registry.match(query, device, function(err, match) {
        if (!match) {
          return;
        }

        if (device._socket.status !== 'connected') {
          device._socket.once('connected', function() {
            observer.onNext(device);
          });
          return;
        }

        observer.onNext(device);
      });
    });
  }

  // Build the observable for a remote query.
  function observeRemote(query) {
    // The decompiled query text is computed once, up front; a leading
    // 'select * ' is dropped before it is used as the remote query string.
    var ql = decompile(query);
    var toRemove = 'select * ';
    if (ql.slice(0, toRemove.length) === toRemove) {
      ql = ql.slice(toRemove.length);
    }

    return Rx.Observable.create(function(observer) {
      // Wire up a peer that is currently connected; no-op otherwise.
      function setupForPeer(peerName) {
        var peer = connectedPeers()[peerName];
        if (!peer) {
          return;
        }

        replayKnownRemoteDevices(query, peer, observer);
        listenForRemoteDevices(query, ql, peer, observer);
      }

      // Target peer not connected yet (or wildcard query): wait for a peer
      // with exactly the requested name to connect.
      if (!connectedPeers()[query.server] || query.server === '*') {
        self.pubsub.subscribe('_peer/connect', function(ev, data) {
          if (data.peer.name === query.server) {
            listenForRemoteDevices(query, ql, data.peer, observer);
          }
        });
      }

      if (query.server === '*') {
        // Wildcard: set up every connected peer now and each newly
        // connecting peer once.
        var peersSetup = [];
        Object.keys(connectedPeers()).forEach(function(peerName) {
          peersSetup.push(peerName);
          setupForPeer(peerName);
        });

        self.pubsub.subscribe('_peer/connect', function(e, data) {
          if (peersSetup.indexOf(data.peer.name) === -1) {
            peersSetup.push(data.peer.name);
            setupForPeer(data.peer.name);
          }
        });
      } else {
        setupForPeer(query.server);
      }
    });
  }

  // Build the observable for a local query.
  function observeLocal(query) {
    return observable.flatMap(function(device) {
      return Rx.Observable.create(function(observer) {
        emitIfMatch(query, device, observer);
      });
    });
  }

  var filters = queries.map(function(query) {
    return query.remote === true ? observeRemote(query) : observeLocal(query);
  });

  var source = null;
  if (filters.length > 1) {
    // Result selector: gather one device per query into an array.
    filters.push(function() {
      return Array.prototype.slice.call(arguments);
    });
    source = Rx.Observable.zip.apply(null, filters);
  } else {
    source = filters[0];
  }

  if (!cb) {
    return source;
  }

  return source.subscribe(function(args) {
    if (Array.isArray(args)) {
      cb.apply(null, args);
    } else {
      cb.apply(null, [args]);
    }
  });
};
|
247 |
|
248 |
|
/**
 * Delegate to the registry's `find`, passing every argument through.
 *
 * @returns {*} whatever `registry.find` returns
 */
Runtime.prototype.find = function() {
  var registry = this.registry;
  return registry.find.apply(registry, arguments);
};
|
252 |
|
/**
 * Register a peer request extension.
 *
 * @param {*} fn appended to the list later handed to `ws.extendRequest`
 */
Runtime.prototype.onPeerRequest = function(fn) {
  var requestExtensions = this._peerRequestExtensions;
  requestExtensions.push(fn);
};
|
256 |
|
/**
 * Register a peer response extension.
 *
 * @param {*} fn appended to the list later handed to `ws.extendResponse`
 */
Runtime.prototype.onPeerResponse = function(fn) {
  var responseExtensions = this._peerResponseExtensions;
  responseExtensions.push(fn);
};
|
260 |
|
/**
 * Hand the registered peer request extensions to `ws`.
 *
 * @param {Object} ws object providing an `extendRequest(extensions)` method
 */
Runtime.prototype.modifyPeerRequest = function(ws) {
  var requestExtensions = this._peerRequestExtensions;
  ws.extendRequest(requestExtensions);
};
|
264 |
|
/**
 * Hand the registered peer response extensions to `ws`.
 *
 * @param {Object} ws object providing an `extendResponse(extensions)` method
 */
Runtime.prototype.modifyPeerResponse = function(ws) {
  var responseExtensions = this._peerResponseExtensions;
  ws.extendResponse(responseExtensions);
};
|
268 |
|
/**
 * Subscribe `peer` to the topic derived from `ql`, at most once per
 * (peer name, topic) pair, and create a remote device for every message
 * the peer emits on that topic.
 *
 * @param {string} ql query text used to format the topic
 * @param {Object} peer peer exposing `name`, `subscribe` and `on`
 */
Runtime.prototype._initRemoteQueryListener = function(ql, peer) {
  var runtime = this;
  var peerName = peer.name;

  // First contact with this peer: create its device and subscription maps.
  if (!this._remoteDevices[peerName]) {
    this._remoteDevices[peerName] = {};
    this._remoteSubscriptions[peerName] = {};
  }

  var topic = querytopic.format({ql: ql});
  var peerSubscriptions = this._remoteSubscriptions[peerName];

  // A handler is already registered for this topic on this peer.
  if (peerSubscriptions[topic]) {
    return;
  }

  // The peer is subscribed with the URI-encoded topic, while the event
  // listener below is registered under the unencoded topic.
  peer.subscribe(encodeURIComponent(topic));

  var onRemoteDeviceData = function(data) {
    runtime._createRemoteDevice(peer, data);
  };
  peerSubscriptions[topic] = onRemoteDeviceData;

  peer.on(topic, onRemoteDeviceData);
};
|
293 |
|
/**
 * Create and announce a VirtualDevice for a device reported by `peer`,
 * unless one is already tracked under the same peer name and device id.
 *
 * @param {Object} peer peer the device belongs to; `peer.name` keys the map
 * @param {Object} data device payload; `data.properties.id` keys the device
 * @returns {VirtualDevice|undefined} the new device, or undefined when the
 *   device was already tracked
 */
Runtime.prototype._createRemoteDevice = function(peer, data) {
  var runtime = this;

  var alreadyTracked = runtime._remoteDevices[peer.name][data.properties.id];
  if (alreadyTracked) {
    return;
  }

  var remoteDevice = new VirtualDevice(data, peer);

  // Stop tracking the device once it reports 'remote-destroy'.
  remoteDevice.on('remote-destroy', function() {
    delete runtime._remoteDevices[peer.name][data.properties.id];
  });

  runtime._remoteDevices[peer.name][data.properties.id] = remoteDevice;
  runtime.emit(peer.name + '/remotedeviceready', remoteDevice);

  return remoteDevice;
};
|
308 |
|
/**
 * Handle a device's 'destroy' event: destroy the device, log a failure if
 * there is one, and report the outcome to `cb`.
 *
 * @param {Object} device device to destroy
 * @param {Function} [cb] called with the error (if any) from `_destroyDevice`
 */
Runtime.prototype._onDestroy = function(device, cb) {
  var runtime = this;
  var done = cb ? cb : function() {};

  this._destroyDevice(device, function(err) {
    if (err) {
      runtime._log.emit('error', 'server', 'Device (' + device.id + ') could not be deleted. Error: ' + err.message);
    }

    done(err);
  });
};
|
322 |
|
/**
 * Destroy a device: mark its state, send the 'zetta-device-destroy' log
 * stream event, remove it from the registry and, on success, drop it from
 * `_jsDevices`.
 *
 * Devices without a `_sendLogStreamEvent` function cannot be destroyed; the
 * failure is logged and reported to `cb` as an Error.
 *
 * @param {Object} device device to destroy; its `state` is overwritten
 * @param {Function} [cb] called with an error, or `null` on success
 */
Runtime.prototype._destroyDevice = function(device, cb) {
  var runtime = this;
  var done = cb || function() {};

  // The state is set before the capability check, so it changes even for
  // devices that turn out to be incompatible.
  device.state = 'zetta-device-destroy';

  if (typeof device._sendLogStreamEvent !== 'function') {
    runtime._log.emit('error', 'server', 'Device (' + device.id + ') could not be deleted. Error: Device incompatible with delete functionality.');
    done(new Error('Device not compatible'));
    return;
  }

  device._sendLogStreamEvent('zetta-device-destroy', [], function() {
    runtime.registry.remove(device, function(err) {
      if (err) {
        done(err);
        return;
      }

      delete runtime._jsDevices[device.id];
      done(null);
    });
  });
};
|