1 | var EventEmitter = require('events').EventEmitter;
|
2 | var util = require('util');
|
3 | var async = require('async');
|
4 | var decompile = require('calypso-query-decompiler');
|
5 | var Rx = require('rx');
|
6 | var Logger = require('./logger');
|
7 | var Query = require('calypso').Query;
|
8 | var PubSub = require('./pubsub_service');
|
9 | var VirtualDevice = require('./virtual_device');
|
10 | var querytopic = require('./query_topic');
|
11 |
|
12 | var DeviceRegistry = require('./device_registry');
|
13 |
|
14 | var Runtime = module.exports = function(opts) {
|
15 | EventEmitter.call(this);
|
16 |
|
17 | if (!opts) {
|
18 | opts = {};
|
19 | }
|
20 | this.registry = opts.registry || new DeviceRegistry();
|
21 | this.pubsub = opts.pubsub || new PubSub();
|
22 | this._log = opts.log || new Logger({pubsub: this.pubsub});
|
23 | this.httpServer = opts.httpServer|| {};
|
24 |
|
25 | this.exposed = {};
|
26 | this.path = '/devices';
|
27 | this.exposeQuery = null;
|
28 | this._jsDevices = {};
|
29 |
|
30 |
|
31 | this._remoteDevices = {};
|
32 | this._remoteSubscriptions = {};
|
33 |
|
34 | this._observable = Rx.Observable.fromEvent(this, 'deviceready');
|
35 |
|
36 | this._peerRequestExtensions = [];
|
37 | this._peerResponseExtensions = [];
|
38 |
|
39 | this.filter = this._observable.filter.bind(this._observable);
|
40 | this.map = this._observable.map.bind(this._observable);
|
41 | this.take = this._observable.take.bind(this._observable);
|
42 | this.zip = this._observable.zip.bind(this._observable);
|
43 | this.subscribe = this._observable.subscribe.bind(this._observable);
|
44 | };
|
45 | util.inherits(Runtime, EventEmitter);
|
46 |
|
47 | Logger.LEVELS.forEach(function(level) {
|
48 | Runtime.prototype[level] = function(message, data) {
|
49 | this._log[level]('user-log', message, data);
|
50 | };
|
51 | });
|
52 |
|
53 | Runtime.prototype.from = function(server) {
|
54 | var q = Query.of('devices');
|
55 | q.remote = true;
|
56 | q.server = server;
|
57 |
|
58 | return q;
|
59 | };
|
60 |
|
61 | Runtime.prototype.ql = function(q) {
|
62 | return Query.of('devices').ql(q);
|
63 | };
|
64 |
|
65 | Runtime.prototype.query = function() {
|
66 | return Query.of('devices');
|
67 | };
|
68 |
|
69 | Runtime.prototype.where = function(q) {
|
70 | return Query.of('devices').where(q);
|
71 | };
|
72 |
|
73 | Runtime.prototype.expose = function(query) {
|
74 | var self = this;
|
75 | if(typeof query === 'string' && query === '*') {
|
76 | query = new Query(query);
|
77 | }
|
78 |
|
79 | this.on('deviceready', function(device) {
|
80 | self.registry.match(query, device, function(err, match) {
|
81 | if (match) {
|
82 | self._exposeDevice(device);
|
83 | }
|
84 | });
|
85 | });
|
86 | };
|
87 |
|
88 |
|
89 | Runtime.prototype._exposeDevice = function(device) {
|
90 | this.exposed[this.path + '/' + device.id] = device;
|
91 | };
|
92 |
|
93 |
|
94 | Runtime.prototype.observe = function(queries, cb) {
|
95 | var self = this;
|
96 | var filters = [];
|
97 | var observable = this._observable;
|
98 |
|
99 | if (!Array.isArray(queries)) {
|
100 | queries = [queries];
|
101 | }
|
102 |
|
103 | if(Object.keys(this._jsDevices).length) {
|
104 | var existingDeviceObservable = Rx.Observable.create(function(observer) {
|
105 | Object.keys(self._jsDevices).forEach(function(deviceId) {
|
106 | observer.onNext(self._jsDevices[deviceId]);
|
107 | });
|
108 | });
|
109 | observable = Rx.Observable.merge(this._observable, existingDeviceObservable);
|
110 | }
|
111 |
|
112 | var filters = [];
|
113 | var self = this;
|
114 | queries.forEach(function(query) {
|
115 | if (query.remote === true) {
|
116 | var ql = decompile(query);
|
117 | var toRemove = 'select * ';
|
118 | if (ql.slice(0, toRemove.length) === toRemove) {
|
119 | ql = ql.slice(toRemove.length);
|
120 | }
|
121 |
|
122 | var queryObservable = Rx.Observable.create(function(observer) {
|
123 |
|
124 |
|
125 | if (!self.httpServer.peers[query.server] || query.server === '*') {
|
126 |
|
127 | self.pubsub.subscribe('_peer/connect', function(ev, data) {
|
128 | if (data.peer.name === query.server) {
|
129 |
|
130 |
|
131 | self.on(data.peer.name + '/remotedeviceready', function(device) {
|
132 | self.registry.match(query, device, function(err, match) {
|
133 | if (match) {
|
134 | observer.onNext(device);
|
135 | }
|
136 | });
|
137 | });
|
138 |
|
139 | self._initRemoteQueryListener(ql, data.peer);
|
140 | }
|
141 | });
|
142 | }
|
143 |
|
144 | function setupForPeer(peerName) {
|
145 | var peer = self.httpServer.peers[peerName];
|
146 | if (!peer) {
|
147 | return;
|
148 | }
|
149 |
|
150 |
|
151 | if (self._remoteDevices[peer.name]) {
|
152 | Object.keys(self._remoteDevices[peer.name]).forEach(function(deviceId) {
|
153 | var device = self._remoteDevices[peer.name][deviceId];
|
154 | self.registry.match(query, device, function(err, match) {
|
155 | if (!match) {
|
156 | return;
|
157 | }
|
158 |
|
159 |
|
160 |
|
161 | if (device._socket.status !== 'connected') {
|
162 | device._socket.once('connected', function() {
|
163 | observer.onNext(device);
|
164 | });
|
165 | return;
|
166 | }
|
167 |
|
168 | observer.onNext(device);
|
169 | });
|
170 | });
|
171 | }
|
172 |
|
173 |
|
174 |
|
175 | self.on(peer.name + '/remotedeviceready', function(device) {
|
176 | self.registry.match(query, device, function(err, match) {
|
177 | if (match) {
|
178 | observer.onNext(device);
|
179 | }
|
180 | });
|
181 | });
|
182 |
|
183 | self._initRemoteQueryListener(ql, peer);
|
184 | }
|
185 |
|
186 | if (query.server === '*') {
|
187 | var peersSetup = [];
|
188 | Object.keys(self.httpServer.peers).forEach(function(peerName) {
|
189 | peersSetup.push(peerName);
|
190 | setupForPeer(peerName);
|
191 | });
|
192 |
|
193 |
|
194 | self.pubsub.subscribe('_peer/connect', function(e, data) {
|
195 | if (peersSetup.indexOf(data.peer.name) === -1) {
|
196 | peersSetup.push(data.peer.name);
|
197 | setupForPeer(data.peer.name);
|
198 | }
|
199 | })
|
200 | } else {
|
201 | setupForPeer(query.server);
|
202 | }
|
203 | });
|
204 |
|
205 | filters.push(queryObservable);
|
206 | } else {
|
207 | var queryObservable = observable.flatMap(function(device) {
|
208 | return Rx.Observable.create(function(observer) {
|
209 | self.registry.match(query, device, function(err, match) {
|
210 | if (match) {
|
211 | observer.onNext(device);
|
212 | }
|
213 | });
|
214 | });
|
215 | });
|
216 |
|
217 | filters.push(queryObservable);
|
218 | }
|
219 | });
|
220 |
|
221 | var source = null;
|
222 | if(filters.length > 1) {
|
223 | filters.push(function() {
|
224 | return Array.prototype.slice.call(arguments);
|
225 | });
|
226 | source = Rx.Observable.zip.apply(null, filters);
|
227 | } else {
|
228 | source = filters[0];
|
229 | }
|
230 |
|
231 | return !cb ? source :
|
232 | source
|
233 | .subscribe(function(args){
|
234 | if (Array.isArray(args)) {
|
235 | cb.apply(null, args);
|
236 | } else {
|
237 | cb.apply(null, [args]);
|
238 | }
|
239 | });
|
240 | };
|
241 |
|
242 |
|
243 | Runtime.prototype.find = function() {
|
244 | return this.registry.find.apply(this.registry, arguments);
|
245 | };
|
246 |
|
247 | Runtime.prototype.onPeerRequest = function(fn) {
|
248 | this._peerRequestExtensions.push(fn);
|
249 | };
|
250 |
|
251 | Runtime.prototype.onPeerResponse = function(fn) {
|
252 | this._peerResponseExtensions.push(fn);
|
253 | };
|
254 |
|
255 | Runtime.prototype.modifyPeerRequest = function(ws) {
|
256 | ws.extendRequest(this._peerRequestExtensions);
|
257 | };
|
258 |
|
259 | Runtime.prototype.modifyPeerResponse = function(ws) {
|
260 | ws.extendResponse(this._peerResponseExtensions);
|
261 | };
|
262 |
|
263 | Runtime.prototype._initRemoteQueryListener = function(ql, peer) {
|
264 | var self = this;
|
265 |
|
266 | if (!this._remoteDevices[peer.name]) {
|
267 | this._remoteDevices[peer.name] = {};
|
268 | this._remoteSubscriptions[peer.name] = {};
|
269 | }
|
270 |
|
271 | var topic = querytopic.format({ql: ql});
|
272 |
|
273 |
|
274 | if(this._remoteSubscriptions[peer.name][topic]) {
|
275 | return;
|
276 | }
|
277 |
|
278 |
|
279 | peer.subscribe(encodeURIComponent(topic));
|
280 |
|
281 | this._remoteSubscriptions[peer.name][topic] = function(data) {
|
282 |
|
283 | if (self._remoteDevices[peer.name][data.properties.id]) {
|
284 | return;
|
285 | }
|
286 | var virtualDevice = new VirtualDevice(data, peer);
|
287 | self._remoteDevices[peer.name][data.properties.id] = virtualDevice;
|
288 | self.emit(peer.name + '/remotedeviceready', virtualDevice);
|
289 | };
|
290 |
|
291 | peer.on(topic, this._remoteSubscriptions[peer.name][topic]);
|
292 | };
|