UNPKG

14.1 kBJavaScriptView Raw
1var assert = require('assert');
2var http = require('http');
3var zetta = require('../');
4var zettacluster = require('zetta-cluster');
5var Scout = require('./fixture/example_scout');
6var ExampleDevice = require('./fixture/example_driver');
7var VirtualDevice = require('../lib/virtual_device');
8var LedJSON = require('./fixture/virtual_device.json');
9var decompiler = require('calypso-query-decompiler');
10var ZScout = require('zetta-scout');
11var util = require('util');
12var WebSocket = require('ws');
13var MemRegistry = require('./fixture/mem_registry');
14var MemPeerRegistry = require('./fixture/mem_peer_registry');
15
16
/**
 * Scout stub used by the tests to register extra devices on demand.
 *
 * It inherits from ZScout and adds no discovery logic of its own: tests
 * assign `scout.server` and then call `scout.discover(...)` directly.
 */
function FakeScout() {
  ZScout.call(this);
}

util.inherits(FakeScout, ZScout);

/**
 * No-op initialisation hook: signals completion straight away.
 *
 * @param {Function} next - completion callback, invoked synchronously with no arguments.
 */
FakeScout.prototype.init = function(next) {
  next();
};
23
24
/**
 * Bare-bones stand-in for a peer socket.
 *
 * `on` and `unsubscribe` do nothing; `subscribe` immediately invokes the
 * optional acknowledgement callback, if one is supplied.
 */
var mockSocket = {
  // Event registration is ignored.
  on: function() {},

  // Acknowledge the subscription synchronously when a callback is given.
  subscribe: function(topic, cb) {
    if (!cb) {
      return;
    }
    cb();
  },

  // Removing a subscription is ignored.
  unsubscribe: function() {}
};
34
/**
 * Integration tests for queries that are evaluated across zetta peers.
 *
 * Every test runs against a fresh three-server cluster built in `beforeEach`:
 * a 'cloud' server plus 'detroit1' and 'chicago', each created with the example
 * Scout and with ['cloud'] passed as the third `.server()` argument
 * (presumably the list of peers to link to -- confirm against zetta-cluster).
 */
describe('Remote queries', function() {
  // Path and query string appended to a server base URL to stream the results
  // of `where type = "testdriver"` over a websocket.
  var QUERY_EVENTS_PATH = '/events?topic=query/where type = "testdriver"';

  var cluster = null;
  var detroit1 = null;
  var chicago = null;
  var cloud = null;
  var urlLocal = null;   // detroit1 addressed directly on its own port
  var urlProxied = null; // detroit1 addressed through the cloud server
  var urlRoot = null;    // the cloud server itself

  // Opens a websocket that subscribes to the testdriver query under `baseUrl`.
  function openQuerySocket(baseUrl) {
    return new WebSocket('ws://' + baseUrl + QUERY_EVENTS_PATH);
  }

  // Asserts that every link in a received message contains `baseUrl`.
  function assertLinksContain(json, baseUrl) {
    json.links.forEach(function(link) {
      assert(link.href.indexOf(baseUrl) > -1);
    });
  }

  // Registers one more ExampleDevice on the running detroit1 server.
  function discoverDeviceOnDetroit() {
    var scout = new FakeScout();
    scout.server = cluster.servers['detroit1'].runtime;
    scout.discover(ExampleDevice);
  }

  // Starts an additional in-memory zetta server named `name` linked to the cloud.
  function startLinkedPeer(name) {
    return zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() })
      .name(name)
      .use(Scout)
      .silent()
      .link('http://' + urlRoot)
      .listen(0);
  }

  beforeEach(function(done) {
    cluster = zettacluster({ zetta: zetta })
      .server('cloud', [Scout])
      .server('detroit1', [Scout], ['cloud'])
      .server('chicago', [Scout], ['cloud'])
      .on('ready', function() {
        urlRoot = 'localhost:' + cluster.servers['cloud']._testPort;
        urlProxied = 'localhost:' + cluster.servers['cloud']._testPort + '/servers/detroit1';
        urlLocal = 'localhost:' + cluster.servers['detroit1']._testPort + '/servers/detroit1';

        detroit1 = cluster.servers['detroit1'];
        chicago = cluster.servers['chicago'];
        cloud = cluster.servers['cloud'];
        done();
      })
      .run(function(err) {
        // Only a start-up failure is reported here; success is signalled by 'ready'.
        if (err) {
          return done(err);
        }
      });
  });

  afterEach(function(done) {
    cluster.stop();
    setTimeout(done, 10); // fix issues with server not being closed before a new one starts
  });

  describe('remote query events', function() {

    it('should fire a remote query event on detroit1 after peers connect', function(done) {
      var query = cloud.runtime.from('detroit1').where({type: 'testdriver'});
      cloud.runtime.observe([query], function(testdriver) {
      });
      // The first remote subscription key recorded for detroit1 is used as the
      // pubsub topic to listen on at the detroit1 side.
      var key = Object.keys(cloud.runtime._remoteSubscriptions['detroit1'])[0];
      detroit1.pubsub.subscribe(key, function() {
        done();
      });
    });

    it('should fire remote query for both server detroit1 and chicago', function(done) {
      var query1 = cloud.runtime.from('detroit1').where({type: 'testdriver'});
      var query2 = cloud.runtime.from('chicago').where({type: 'testdriver'});
      cloud.runtime.observe([query1, query2], function(d1, d2) {
        done();
      });
    });

    it('should return devices from both Z1 and Z2 after peers connects', function(done) {
      var query1 = cloud.runtime.from('Z1').where({type: 'testdriver'});
      var query2 = cloud.runtime.from('Z2').where({type: 'testdriver'});
      cloud.runtime.observe([query1, query2], function(d1, d2) {
        done();
      });

      // The peers are started only after the observer is registered.
      startLinkedPeer('Z1');
      startLinkedPeer('Z2');
    });

    it('should return all test devices when quering .from(\'*\')', function(done) {
      var query = cloud.runtime.from('*').where({type: 'testdriver'});
      var count = 0;
      cloud.runtime.observe(query, function(device) {
        count++;
        if (count === 2) {
          done();
        }
      });
    });

    it('should return all test devices from quering .from(\'*\') when a new peer connects', function(done) {
      var query = cloud.runtime.from('*').where({type: 'testdriver'});
      var count = 0;
      cloud.runtime.observe(query, function(device) {
        count++;
        if (count === 3) {
          done();
        }
      });

      startLinkedPeer('local');
    });

    it('adding a device on the remote server should add a device to app with star query', function(done) {
      var query = cloud.runtime.from('*').where({type: 'testdriver'});
      var recv = 0;
      cloud.runtime.observe([query], function(testdriver) {
        recv++;
      });

      discoverDeviceOnDetroit();

      // Give the new device time to propagate before checking the total.
      setTimeout(function() {
        assert.equal(recv, 3);
        done();
      }, 100);
    });

    it('should pass a remote query to peer socket through subscribe', function(done) {
      var query = cloud.runtime.from('detroit2').where({type: 'testdriver'});

      cloud.runtime.observe([query], function(testdriver) {
      });

      // Fake peer: the test passes once the runtime registers a listener whose
      // event name starts with 'query:' on it.
      var sock = {
        subscribe: function() {},
        on: function(ev, data) {
          if (ev.indexOf('query:') === 0) {
            done();
          }
        },
        name: 'detroit2'
      };

      cloud.pubsub.publish('_peer/connect', { peer: sock });
    });

    it('adding a device on the remote server should add a device to app', function(done) {
      var query = cloud.runtime.from('detroit1').where({type: 'testdriver'});
      var recv = 0;
      cloud.runtime.observe([query], function(testdriver) {
        recv++;
      });

      discoverDeviceOnDetroit();

      setTimeout(function() {
        assert.equal(recv, 2);
        done();
      }, 100);
    });

  });

  describe('Peer Reconnects', function() {

    it('runtime should only pass the device once to app', function(done) {
      var query = cloud.runtime.from('detroit1').where({type: 'testdriver'});
      var recv = 0;
      cloud.runtime.observe([query], function(testdriver) {
        recv++;
      });

      // Close the cloud-side peer socket for detroit1 after 100 ms; the test
      // then waits for the next '_peer/connect' event naming detroit1.
      var socket = cluster.servers['cloud'].httpServer.peers['detroit1'];
      setTimeout(function() {
        socket.close();
      }, 100);

      cloud.pubsub.subscribe('_peer/connect', function(ev, data) {
        if (data.peer.name === 'detroit1') {
          assert.equal(recv, 1);
          done();
        }
      });
    });

    it('runtime should ony pass the device once to app for each peer', function(done) {
      var query = cloud.runtime.from('*').where({type: 'testdriver'});
      var recv = 0;
      cloud.runtime.observe([query], function(testdriver) {
        recv++;
      });

      var socket = cluster.servers['cloud'].httpServer.peers['detroit1'];
      setTimeout(function() {
        socket.close();
      }, 100);

      cloud.pubsub.subscribe('_peer/connect', function(ev, data) {
        if (data.peer.name === 'detroit1') {
          assert.equal(recv, 2);
          done();
        }
      });
    });

    it('should send back 1 result for peer after a reconnet', function(done) {
      var socket = openQuerySocket(urlProxied);
      var recv = 0;

      var socketP = cluster.servers['cloud'].httpServer.peers['detroit1'];
      setTimeout(function() {
        socketP.close();
        cloud.pubsub.subscribe('_peer/connect', function(ev, data) {
          if (data.peer.name === 'detroit1') {
            // Wait a little after the reconnect so a duplicate message, if
            // any, would have been counted before the assertion runs.
            setTimeout(function() {
              assert.equal(recv, 1);
              socket.close(); // release the client socket before finishing
              done();
            }, 100);
          }
        });
      }, 100);

      socket.on('message', function(data) {
        var json = JSON.parse(data);
        // test links are properly set
        assertLinksContain(json, urlProxied);
        assert.equal(json.properties.type, 'testdriver');
        recv++;
      });
    });
  });

  // The "local" and "proxied" websocket suites run the same three scenarios and
  // differ only in the base URL used to reach detroit1. The URL is read lazily
  // because it is assigned in `beforeEach`.
  [
    { title: 'Websocket Local Queries', baseUrl: function() { return urlLocal; } },
    { title: 'Websocket Proxied Queries', baseUrl: function() { return urlProxied; } }
  ].forEach(function(suite) {

    describe(suite.title, function() {

      it('should send back 1 result for local device', function(done) {
        var baseUrl = suite.baseUrl();
        var socket = openQuerySocket(baseUrl);
        socket.on('open', function() {
          socket.on('message', function(data) {
            var json = JSON.parse(data);

            // test links are properly set
            assertLinksContain(json, baseUrl);

            assert.equal(json.properties.type, 'testdriver');
            socket.close(); // release the client socket before finishing
            done();
          });
        });
      });

      it('should send back 2 results for local device after a device is added', function(done) {
        var socket = openQuerySocket(suite.baseUrl());
        socket.on('open', function() {
          var recv = 0;

          // Add a second device shortly after the subscription is open.
          setTimeout(discoverDeviceOnDetroit, 50);

          socket.on('message', function(data) {
            var json = JSON.parse(data);
            assert.equal(json.properties.type, 'testdriver');
            recv++;

            if (recv === 2) {
              socket.close();
              done();
            }
          });
        });
      });

      it('reconnecting should only have 1 result', function(done) {
        var baseUrl = suite.baseUrl();
        var socket = openQuerySocket(baseUrl);
        socket.on('open', function() {
          socket.on('message', function(data) {
            var json = JSON.parse(data);
            assert.equal(json.properties.type, 'testdriver');
            socket.close();

            // Subscribe again on a fresh connection and expect a result.
            var socket2 = openQuerySocket(baseUrl);
            socket2.on('open', function() {
              socket2.on('message', function(data) {
                var json = JSON.parse(data);
                assert.equal(json.properties.type, 'testdriver');
                socket2.close();
                done();
              });
            });
          });
        });
      });

    });
  });

  describe('Websocket Cross-Server Queries', function() {

    it('should send back 2 results', function(done) {
      var socket = openQuerySocket(urlRoot);
      socket.on('open', function() {
        var count = 0;
        socket.on('message', function(data) {
          var json = JSON.parse(data);

          // test links are properly set
          assertLinksContain(json, urlRoot);

          assert.equal(json.properties.type, 'testdriver');
          count++;

          if (count === 2) {
            socket.close();
            done();
          }
        });
      });
    });

    it('should send back 3 results after a device is added', function(done) {
      var socket = openQuerySocket(urlRoot);
      socket.on('open', function() {
        var recv = 0;

        setTimeout(discoverDeviceOnDetroit, 50);

        socket.on('message', function(data) {
          var json = JSON.parse(data);
          assert.equal(json.properties.type, 'testdriver');
          recv++;

          if (recv === 3) {
            socket.close();
            done();
          }
        });
      });
    });
  });

});
468