UNPKG

8.71 kBJavaScriptView Raw
1var EventEmitter = require('events').EventEmitter;
2var util = require('util');
3var async = require('async');
4var decompile = require('calypso-query-decompiler');
5var Rx = require('rx');
6var Logger = require('./logger');
7var Query = require('calypso').Query;
8var PubSub = require('./pubsub_service');
9var VirtualDevice = require('./virtual_device');
10var querytopic = require('./query_topic');
11
12var DeviceRegistry = require('./device_registry');
13
14var Runtime = module.exports = function(opts) {
15 EventEmitter.call(this);
16
17 if (!opts) {
18 opts = {};
19 }
20 this.registry = opts.registry || new DeviceRegistry();
21 this.pubsub = opts.pubsub || new PubSub();
22 this._log = opts.log || new Logger({pubsub: this.pubsub});
23 this.httpServer = opts.httpServer|| {};
24
25 this.exposed = {};
26 this.path = '/devices';
27 this.exposeQuery = null;
28 this._jsDevices = {};
29
30 // store remove virtual devices per server
31 this._remoteDevices = {}; // { <server_name>: { <device_id>: virtualDevice } }
32 this._remoteSubscriptions = {}; // { <server_name>: { <query>: listener } }
33
34 this._observable = Rx.Observable.fromEvent(this, 'deviceready');
35
36 this._peerRequestExtensions = [];
37 this._peerResponseExtensions = [];
38
39 this.filter = this._observable.filter.bind(this._observable);
40 this.map = this._observable.map.bind(this._observable);
41 this.take = this._observable.take.bind(this._observable);
42 this.zip = this._observable.zip.bind(this._observable);
43 this.subscribe = this._observable.subscribe.bind(this._observable);
44};
45util.inherits(Runtime, EventEmitter);
46
47Logger.LEVELS.forEach(function(level) {
48 Runtime.prototype[level] = function(message, data) {
49 this._log[level]('user-log', message, data);
50 };
51});
52
53Runtime.prototype.from = function(server) {
54 var q = Query.of('devices');
55 q.remote = true;
56 q.server = server;
57
58 return q;
59};
60
61Runtime.prototype.ql = function(q) {
62 return Query.of('devices').ql(q);
63};
64
65Runtime.prototype.query = function() {
66 return Query.of('devices');
67};
68
69Runtime.prototype.where = function(q) {
70 return Query.of('devices').where(q);
71};
72
73Runtime.prototype.expose = function(query) {
74 var self = this;
75 if(typeof query === 'string' && query === '*') {
76 query = new Query(query);
77 }
78
79 this.on('deviceready', function(device) {
80 self.registry.match(query, device, function(err, match) {
81 if (match) {
82 self._exposeDevice(device);
83 }
84 });
85 });
86};
87
88
89Runtime.prototype._exposeDevice = function(device) {
90 this.exposed[this.path + '/' + device.id] = device;
91};
92
93//This is the new observe syntax. It will take an array of queries, and a callback.
94Runtime.prototype.observe = function(queries, cb) {
95 var self = this;
96 var filters = [];
97 var observable = this._observable;
98
99 if (!Array.isArray(queries)) {
100 queries = [queries];
101 }
102
103 if(Object.keys(this._jsDevices).length) {
104 var existingDeviceObservable = Rx.Observable.create(function(observer) {
105 Object.keys(self._jsDevices).forEach(function(deviceId) {
106 observer.onNext(self._jsDevices[deviceId]);
107 });
108 });
109 observable = Rx.Observable.merge(this._observable, existingDeviceObservable);
110 }
111
112 var filters = [];
113 var self = this;
114 queries.forEach(function(query) {
115 if (query.remote === true) {
116 var ql = decompile(query);
117 var toRemove = 'select * ';
118 if (ql.slice(0, toRemove.length) === toRemove) {
119 ql = ql.slice(toRemove.length);
120 }
121
122 var queryObservable = Rx.Observable.create(function(observer) {
123
124 // peer not connected or query is for all peers
125 if (!self.httpServer.peers[query.server] || query.server === '*') {
126 // init peer on connect / reconnect
127 self.pubsub.subscribe('_peer/connect', function(ev, data) {
128 if (data.peer.name === query.server) {
129 // subscribe to the topic on peer, but keep track of topics in runtime
130 // to only ever setup a topic once.
131 self.on(data.peer.name + '/remotedeviceready', function(device) {
132 self.registry.match(query, device, function(err, match) {
133 if (match) {
134 observer.onNext(device);
135 }
136 });
137 });
138
139 self._initRemoteQueryListener(ql, data.peer);
140 }
141 });
142 }
143
144 function setupForPeer(peerName) {
145 var peer = self.httpServer.peers[peerName];
146 if (!peer) {
147 return;
148 }
149
150 // iterate through existing remote devices
151 if (self._remoteDevices[peer.name]) {
152 Object.keys(self._remoteDevices[peer.name]).forEach(function(deviceId) {
153 var device = self._remoteDevices[peer.name][deviceId];
154 self.registry.match(query, device, function(err, match) {
155 if (!match) {
156 return;
157 }
158
159 // Handle when peer for remote device was dissconnected
160 // TODO: Probably should not handle it only on device._socket but device state is disconnected
161 if (device._socket.status !== 'connected') {
162 device._socket.once('connected', function() {
163 observer.onNext(device);
164 });
165 return;
166 }
167
168 observer.onNext(device);
169 });
170 });
171 }
172
173
174 // listen for devices comming online from remote per observer
175 self.on(peer.name + '/remotedeviceready', function(device) {
176 self.registry.match(query, device, function(err, match) {
177 if (match) {
178 observer.onNext(device);
179 }
180 });
181 });
182
183 self._initRemoteQueryListener(ql, peer);
184 }
185
186 if (query.server === '*') {
187 var peersSetup = [];
188 Object.keys(self.httpServer.peers).forEach(function(peerName) {
189 peersSetup.push(peerName);
190 setupForPeer(peerName);
191 });
192
193 // setup all future peers
194 self.pubsub.subscribe('_peer/connect', function(e, data) {
195 if (peersSetup.indexOf(data.peer.name) === -1) {
196 peersSetup.push(data.peer.name);
197 setupForPeer(data.peer.name);
198 }
199 })
200 } else {
201 setupForPeer(query.server);
202 }
203 });
204
205 filters.push(queryObservable);
206 } else {
207 var queryObservable = observable.flatMap(function(device) {
208 return Rx.Observable.create(function(observer) {
209 self.registry.match(query, device, function(err, match) {
210 if (match) {
211 observer.onNext(device);
212 }
213 });
214 });
215 });
216
217 filters.push(queryObservable);
218 }
219 });
220
221 var source = null;
222 if(filters.length > 1) {
223 filters.push(function() {
224 return Array.prototype.slice.call(arguments);
225 });
226 source = Rx.Observable.zip.apply(null, filters);
227 } else {
228 source = filters[0];
229 }
230
231 return !cb ? source :
232 source
233 .subscribe(function(args){
234 if (Array.isArray(args)) {
235 cb.apply(null, args);
236 } else {
237 cb.apply(null, [args]);
238 }
239 });
240};
241
242// raw db -
243Runtime.prototype.find = function() {
244 return this.registry.find.apply(this.registry, arguments);
245};
246
247Runtime.prototype.onPeerRequest = function(fn) {
248 this._peerRequestExtensions.push(fn);
249};
250
251Runtime.prototype.onPeerResponse = function(fn) {
252 this._peerResponseExtensions.push(fn);
253};
254
255Runtime.prototype.modifyPeerRequest = function(ws) {
256 ws.extendRequest(this._peerRequestExtensions);
257};
258
259Runtime.prototype.modifyPeerResponse = function(ws) {
260 ws.extendResponse(this._peerResponseExtensions);
261};
262
263Runtime.prototype._initRemoteQueryListener = function(ql, peer) {
264 var self = this;
265
266 if (!this._remoteDevices[peer.name]) {
267 this._remoteDevices[peer.name] = {};
268 this._remoteSubscriptions[peer.name] = {};
269 }
270
271 var topic = querytopic.format({ql: ql});
272
273 // already subscribed to the query topic
274 if(this._remoteSubscriptions[peer.name][topic]) {
275 return;
276 }
277
278 // set up reactive query with peer and call onNext when available.
279 peer.subscribe(encodeURIComponent(topic));
280
281 this._remoteSubscriptions[peer.name][topic] = function(data) {
282 // device already in local memory, dont fire again
283 if (self._remoteDevices[peer.name][data.properties.id]) {
284 return;
285 }
286 var virtualDevice = new VirtualDevice(data, peer);
287 self._remoteDevices[peer.name][data.properties.id] = virtualDevice;
288 self.emit(peer.name + '/remotedeviceready', virtualDevice);
289 };
290
291 peer.on(topic, this._remoteSubscriptions[peer.name][topic]);
292};