UNPKG

14.1 kBJavaScriptView Raw
1var assert = require('assert');
2var http = require('http');
3var zetta = require('../');
4var zettacluster = require('zetta-cluster');
5var Scout = require('./fixture/example_scout');
6var ExampleDevice = require('./fixture/example_driver');
7var VirtualDevice = require('../lib/virtual_device');
8var LedJSON = require('./fixture/virtual_device.json');
9var decompiler = require('calypso-query-decompiler');
10var ZScout = require('zetta-scout');
11var util = require('util');
12var WebSocket = require('ws');
13var MemRegistry = require('./fixture/mem_registry');
14var MemPeerRegistry = require('./fixture/mem_peer_registry');
15
16
17function FakeScout() {
18 ZScout.call(this);
19};
20util.inherits(FakeScout, ZScout);
21
22FakeScout.prototype.init = function(cb) {cb();};
23
24
25var mockSocket = {
26 on: function(){},
27 subscribe: function(topic, cb){
28 if(cb) {
29 cb();
30 }
31 },
32 unsubscribe: function(){}
33};
34
35describe('Remote queries', function() {
36 var cluster = null;
37 var detroit1 = null;
38 var chicago = null;
39 var cloud = null;
40 var urlLocal = null;
41 var urlProxied = null
42 var urlRoot = null;
43
44 beforeEach(function(done) {
45 cluster = zettacluster({ zetta: zetta })
46 .server('cloud', [Scout])
47 .server('detroit1', [Scout], ['cloud'])
48 .server('chicago', [Scout], ['cloud'])
49 .on('ready', function() {
50 urlRoot = 'localhost:' + cluster.servers['cloud']._testPort;
51 urlProxied = 'localhost:' + cluster.servers['cloud']._testPort + '/servers/detroit1';
52 urlLocal = 'localhost:' + cluster.servers['detroit1']._testPort + '/servers/detroit1';
53
54 detroit1 = cluster.servers['detroit1'];
55 chicago = cluster.servers['chicago'];
56 cloud = cluster.servers['cloud'];
57 done();
58 })
59 .run(function(err){
60 if (err) {
61 return done(err);
62 }
63 });
64 });
65
66 afterEach(function(done) {
67 cluster.stop();
68 setTimeout(done, 10); // fix issues with server not being closed before a new one starts
69 });
70
71 describe('remote query events', function() {
72
73 it('should fire a remote query event on detroit1 after peers connect', function(done) {
74 var query = cloud.runtime.from('detroit1').where({type: 'testdriver'});
75 cloud.runtime.observe([query], function(testdriver){
76 });
77 var key = Object.keys(cloud.runtime._remoteSubscriptions['detroit1'])[0];
78 detroit1.pubsub.subscribe(key, function() {
79 done();
80 });
81 });
82
83 it('should fire remote query for both server detroit1 and chicago', function(done) {
84 var query1 = cloud.runtime.from('detroit1').where({type: 'testdriver'});
85 var query2 = cloud.runtime.from('chicago').where({type: 'testdriver'});
86 cloud.runtime.observe([query1, query2], function(d1, d2){
87 done();
88 });
89 })
90
91 it('should return devices from both Z1 and Z2 after peers connects', function(done) {
92 var query1 = cloud.runtime.from('Z1').where({type: 'testdriver'});
93 var query2 = cloud.runtime.from('Z2').where({type: 'testdriver'});
94 cloud.runtime.observe([query1, query2], function(d1, d2){
95 done();
96 });
97
98 var z = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() })
99 .name('Z1')
100 .use(Scout)
101 .silent()
102 .link('http://' + urlRoot)
103 .listen(0);
104
105 var z = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() })
106 .name('Z2')
107 .use(Scout)
108 .silent()
109 .link('http://' + urlRoot)
110 .listen(0);
111 })
112
113 it('should return all test devices when quering .from(\'*\')', function(done) {
114 var query = cloud.runtime.from('*').where({type: 'testdriver'});
115 var count = 0;
116 cloud.runtime.observe(query, function(device){
117 count++;
118 if (count === 2) {
119 done();
120 }
121 });
122 });
123
124 it('should return all test devices from quering .from(\'*\') when a new peer connects', function(done) {
125 var query = cloud.runtime.from('*').where({type: 'testdriver'});
126 var count = 0;
127 cloud.runtime.observe(query, function(device){
128 count++;
129 if (count === 3) {
130 done();
131 }
132 });
133
134 var z = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() })
135 .name('local')
136 .use(Scout)
137 .silent()
138 .link('http://' + urlRoot)
139 .listen(0);
140 })
141
142 it('adding a device on the remote server should add a device to app with star query', function(done) {
143 var query = cloud.runtime.from('*').where({type: 'testdriver'});
144 var recv = 0;
145 cloud.runtime.observe([query], function(testdriver){
146 recv++;
147 });
148
149 var detroit = cluster.servers['detroit1'];
150 var scout = new FakeScout();
151 scout.server = detroit.runtime;
152 scout.discover(ExampleDevice);
153
154 setTimeout(function() {
155 assert.equal(recv, 3);
156 done();
157 }, 100);
158 });
159
160 it('should pass a remote query to peer socket through subscribe', function(done) {
161 var query = cloud.runtime.from('detroit2').where({type: 'testdriver'});
162 var ql = decompiler(query);
163 var remove = 'select * ';
164 if(ql.slice(0, remove.length) === remove) {
165 ql = ql.slice(remove.length);
166 }
167
168 cloud.runtime.observe([query], function(testdriver){
169 });
170
171 var sock = {
172 subscribe: function(){},
173 on: function(ev, data){
174 if(ev.indexOf('query:') === 0) {
175 done();
176 }
177 },
178 name: 'detroit2'
179 };
180
181 cloud.pubsub.publish('_peer/connect', { peer: sock });
182 });
183
184 it('adding a device on the remote server should add a device to app', function(done) {
185 var query = cloud.runtime.from('detroit1').where({type: 'testdriver'});
186 var recv = 0;
187 cloud.runtime.observe([query], function(testdriver){
188 recv++;
189 });
190
191 var detroit = cluster.servers['detroit1'];
192 var scout = new FakeScout();
193 scout.server = detroit.runtime;
194 scout.discover(ExampleDevice);
195
196 setTimeout(function() {
197 assert.equal(recv, 2);
198 done();
199 }, 100);
200 });
201
202 });
203
204 describe('Peer Reconnects', function() {
205
206 it('runtime should only pass the device once to app', function(done) {
207 var query = cloud.runtime.from('detroit1').where({type: 'testdriver'});
208 var recv = 0;
209 cloud.runtime.observe([query], function(testdriver){
210 recv++;
211 });
212
213 var socket = cluster.servers['cloud'].httpServer.peers['detroit1'];
214 setTimeout(function(){
215 socket.close();
216 }, 100);
217
218 cloud.pubsub.subscribe('_peer/connect', function(ev, data) {
219 if (data.peer.name === 'detroit1') {
220 assert.equal(recv, 1);
221 done();
222 }
223 });
224 });
225
226 it('runtime should ony pass the device once to app for each peer', function(done) {
227 var query = cloud.runtime.from('*').where({type: 'testdriver'});
228 var recv = 0;
229 cloud.runtime.observe([query], function(testdriver){
230 recv++;
231 });
232
233 var socket = cluster.servers['cloud'].httpServer.peers['detroit1'];
234 setTimeout(function(){
235 socket.close();
236 }, 100);
237
238 cloud.pubsub.subscribe('_peer/connect', function(ev, data) {
239 if (data.peer.name === 'detroit1') {
240 assert.equal(recv, 2);
241 done();
242 }
243 });
244 })
245
246
247 it('should send back 1 result for peer after a reconnet', function(done) {
248 var socket = new WebSocket("ws://" + urlProxied + '/events?topic=query/where type = "testdriver"');
249 var recv = 0;
250
251 var socketP = cluster.servers['cloud'].httpServer.peers['detroit1'];
252 setTimeout(function(){
253 socketP.close();
254 cloud.pubsub.subscribe('_peer/connect', function(ev, data) {
255 if (data.peer.name === 'detroit1') {
256 setTimeout(function() {
257 assert.equal(recv, 1);
258 done();
259 }, 100);
260 }
261 });
262 }, 100);
263
264 socket.on('message', function(data) {
265 var json = JSON.parse(data);
266 // test links are properly set
267 json.links.forEach(function(link) {
268 assert(link.href.indexOf(urlProxied) > -1)
269 });
270 assert.equal(json.properties.type, 'testdriver');
271 recv++;
272 });
273
274
275 });
276 });
277
278
279 describe('Websocket Local Queries', function() {
280
281 it('should send back 1 result for local device', function(done) {
282 var socket = new WebSocket("ws://" + urlLocal + '/events?topic=query/where type = "testdriver"');
283 socket.on('open', function(err) {
284 socket.on('message', function(data) {
285 var json = JSON.parse(data);
286 // test links are properly set
287 json.links.forEach(function(link) {
288 assert(link.href.indexOf(urlLocal) > -1)
289 });
290
291 assert.equal(json.properties.type, 'testdriver');
292 done();
293 });
294 });
295 });
296
297 it('should send back 2 results for local device after a device is added', function(done) {
298 var socket = new WebSocket("ws://" + urlLocal + '/events?topic=query/where type = "testdriver"');
299 socket.on('open', function(err) {
300 var recv = 0;
301
302 setTimeout(function(){
303 var detroit = cluster.servers['detroit1'];
304 var scout = new FakeScout();
305 scout.server = detroit.runtime;
306 scout.discover(ExampleDevice);
307 }, 50);
308
309 socket.on('message', function(data) {
310 var json = JSON.parse(data);
311 assert.equal(json.properties.type, 'testdriver');
312 recv++;
313
314 if (recv === 2) {
315 done();
316 }
317 });
318 });
319
320 });
321
322 it('reconnecting should only have 1 result', function(done) {
323 var socket = new WebSocket("ws://" + urlLocal + '/events?topic=query/where type = "testdriver"');
324 socket.on('open', function(err) {
325 socket.on('message', function(data) {
326 var json = JSON.parse(data);
327 assert.equal(json.properties.type, 'testdriver');
328 socket.close();
329
330 var socket2 = new WebSocket("ws://" + urlLocal + '/events?topic=query/where type = "testdriver"');
331 socket2.on('open', function(err) {
332 socket2.on('message', function(data) {
333 var json = JSON.parse(data);
334 assert.equal(json.properties.type, 'testdriver');
335 done();
336 });
337 });
338
339 });
340 });
341 });
342
343 });
344
345
346
347
348
349 describe('Websocket Proxied Queries', function() {
350
351 it('should send back 1 result for local device', function(done) {
352 var socket = new WebSocket("ws://" + urlProxied + '/events?topic=query/where type = "testdriver"');
353 socket.on('open', function(err) {
354 socket.on('message', function(data) {
355 var json = JSON.parse(data);
356
357 // test links are properly set
358 json.links.forEach(function(link) {
359 assert(link.href.indexOf(urlProxied) > -1)
360 });
361
362 assert.equal(json.properties.type, 'testdriver');
363 done();
364 });
365 });
366 });
367
368 it('should send back 2 results for local device after a device is added', function(done) {
369 var socket = new WebSocket("ws://" + urlProxied + '/events?topic=query/where type = "testdriver"');
370 socket.on('open', function(err) {
371 var recv = 0;
372
373 setTimeout(function(){
374 var detroit = cluster.servers['detroit1'];
375 var scout = new FakeScout();
376 scout.server = detroit.runtime;
377 scout.discover(ExampleDevice);
378 }, 50);
379
380 socket.on('message', function(data) {
381 var json = JSON.parse(data);
382 assert.equal(json.properties.type, 'testdriver');
383 recv++;
384
385 if (recv === 2) {
386 done();
387 }
388 });
389 });
390
391 });
392
393 it('reconnecting should only have 1 result', function(done) {
394 var socket = new WebSocket("ws://" + urlProxied + '/events?topic=query/where type = "testdriver"');
395 socket.on('open', function(err) {
396 socket.on('message', function(data) {
397 var json = JSON.parse(data);
398 assert.equal(json.properties.type, 'testdriver');
399 socket.close();
400
401 var socket2 = new WebSocket("ws://" + urlProxied + '/events?topic=query/where type = "testdriver"');
402 socket2.on('open', function(err) {
403 socket2.on('message', function(data) {
404 var json = JSON.parse(data);
405 assert.equal(json.properties.type, 'testdriver');
406 done();
407 });
408 });
409
410 });
411 });
412 });
413
414 });
415
416 describe('Websocket Cross-Server Queries', function() {
417
418 it('should send back 2 results', function(done) {
419 var socket = new WebSocket("ws://" + urlRoot + '/events?topic=query/where type = "testdriver"');
420 socket.on('open', function(err) {
421 var count = 0;
422 socket.on('message', function(data) {
423 var json = JSON.parse(data);
424
425 // test links are properly set
426 json.links.forEach(function(link) {
427 assert(link.href.indexOf(urlRoot) > -1)
428 });
429
430 assert.equal(json.properties.type, 'testdriver');
431 count++;
432
433 if (count == 2) {
434 done();
435 }
436 });
437 });
438 });
439
440 it('should send back 3 results after a device is added', function(done) {
441 var socket = new WebSocket("ws://" + urlRoot + '/events?topic=query/where type = "testdriver"');
442 socket.on('open', function(err) {
443 var recv = 0;
444
445 setTimeout(function(){
446 var detroit = cluster.servers['detroit1'];
447 var scout = new FakeScout();
448 scout.server = detroit.runtime;
449 scout.discover(ExampleDevice);
450 }, 50);
451
452 socket.on('message', function(data) {
453 var json = JSON.parse(data);
454 assert.equal(json.properties.type, 'testdriver');
455 recv++;
456
457 if (recv === 3) {
458 done();
459 }
460 });
461 });
462
463 });
464 });
465
466});
467