UNPKG

62.3 kBJavaScriptView Raw
1var assert = require('assert');
2var WebSocket = require('ws');
3var zetta = require('./..');
4var zettacluster = require('zetta-cluster');
5var Driver = require('./fixture/example_driver');
6var MemRegistry = require('./fixture/mem_registry');
7var MemPeerRegistry = require('./fixture/mem_peer_registry');
8
9describe('Peering Event Streams', function() {
10 var cloud = null;
11 var cloudUrl = null;
12 var baseUrl = '/events';
13
14 beforeEach(function(done) {
15 cloud = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
16 cloud.silent();
17 cloud.listen(0, function(err) {
18 if(err) {
19 return done(err);
20 }
21 cloudUrl = 'http://localhost:' + cloud.httpServer.server.address().port;
22 done();
23 });
24 });
25
26 afterEach(function(done) {
27 cloud.httpServer.server.close();
28 done();
29 });
30
31 it('will receive a _peer/connect event when subscribed', function(done) {
32 var z = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
33 z.silent();
34 z.listen(0, function(err) {
35 if(err) {
36 return done(err);
37 }
38 var zPort = z.httpServer.server.address().port;
39 var endpoint = 'localhost:' + zPort;
40 var ws = new WebSocket('ws://' + endpoint + baseUrl);
41 ws.on('open', function() {
42 var msg = { type: 'subscribe', topic: '_peer/connect' };
43 ws.send(JSON.stringify(msg));
44 ws.on('message', function(buffer) {
45 var json = JSON.parse(buffer);
46 if(json.type === 'subscribe-ack') {
47 assert.equal(json.type, 'subscribe-ack');
48 assert(json.timestamp);
49 assert.equal(json.topic, '_peer/connect');
50 assert(json.subscriptionId);
51 } else if(json.type === 'event') {
52 assert.equal(json.topic, '_peer/connect');
53 done();
54 }
55 });
56 });
57 ws.on('error', done);
58 z.link(cloudUrl);
59 });
60 });
61
62 it('will receive a _peer/connect event when subscribed to **', function(done) {
63 var z = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
64 z.silent();
65 z.listen(0, function(err) {
66 if(err) {
67 return done(err);
68 }
69 var zPort = z.httpServer.server.address().port;
70 var endpoint = 'localhost:' + zPort;
71 var ws = new WebSocket('ws://' + endpoint + baseUrl);
72 ws.on('open', function() {
73 var msg = { type: 'subscribe', topic: '**' };
74 ws.send(JSON.stringify(msg));
75 ws.on('message', function(buffer) {
76 var json = JSON.parse(buffer);
77 if(json.type === 'subscribe-ack') {
78 assert.equal(json.type, 'subscribe-ack');
79 assert(json.timestamp);
80 assert.equal(json.topic, '**');
81 assert(json.subscriptionId);
82 } else if(json.type === 'event') {
83 assert.equal(json.topic, '_peer/connect');
84 done();
85 }
86 });
87 });
88 ws.on('error', done);
89 z.link(cloudUrl);
90 });
91 });
92
93
94 it('will receive a _peer/disconnect event when subscribed', function(done) {
95 var z = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
96 z.silent();
97 z.pubsub.subscribe('_peer/connect', function(topic, data) {
98 var peer = data.peer;
99 peer.close();
100 });
101 z.listen(0, function(err) {
102 if(err) {
103 return done(err);
104 }
105 var zPort = z.httpServer.server.address().port;
106 var endpoint = 'localhost:' + zPort;
107 var ws = new WebSocket('ws://' + endpoint + baseUrl);
108 ws.on('open', function() {
109 var msg = { type: 'subscribe', topic: '_peer/disconnect' };
110 ws.send(JSON.stringify(msg));
111 ws.on('message', function(buffer) {
112 var json = JSON.parse(buffer);
113 if(json.type === 'subscribe-ack') {
114 assert.equal(json.type, 'subscribe-ack');
115 assert(json.timestamp);
116 assert.equal(json.topic, '_peer/disconnect');
117 assert(json.subscriptionId);
118 } else if(json.type === 'event') {
119 assert.equal(json.topic, '_peer/disconnect');
120 done();
121 }
122 });
123 });
124 ws.on('error', done);
125 z.link(cloudUrl);
126 });
127 });
128
129 it('will receive a _peer/connect event when subscribed with wildcards', function(done) {
130 var z = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
131 z.silent();
132 z.pubsub.subscribe('_peer/connect', function(topic, data) {
133 var peer = data.peer;
134 });
135 z.listen(0, function(err) {
136 if(err) {
137 return done(err);
138 }
139 var zPort = z.httpServer.server.address().port;
140 var endpoint = 'localhost:' + zPort;
141 var ws = new WebSocket('ws://' + endpoint + baseUrl);
142 ws.on('open', function() {
143 var msg = { type: 'subscribe', topic: '_peer/*' };
144 ws.send(JSON.stringify(msg));
145 ws.on('message', function(buffer) {
146 var json = JSON.parse(buffer);
147 if(json.type === 'subscribe-ack') {
148 assert.equal(json.type, 'subscribe-ack');
149 assert(json.timestamp);
150 assert.equal(json.topic, '_peer/*');
151 assert(json.subscriptionId);
152 } else if(json.type === 'event') {
153 assert.equal(json.topic, '_peer/connect');
154 done();
155 }
156 });
157 });
158 ws.on('error', done);
159 z.link(cloudUrl);
160 });
161 });
162 it('will receive a _peer/connect and _peer/disconnect event when subscribed with wildcards', function(done) {
163 var z = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
164 z.silent();
165 z.pubsub.subscribe('_peer/connect', function(topic, data) {
166 var peer = data.peer;
167 peer.close();
168 });
169 var recv = 0;
170 z.listen(0, function(err) {
171 if(err) {
172 return done(err);
173 }
174 var zPort = z.httpServer.server.address().port;
175 var endpoint = 'localhost:' + zPort;
176 var ws = new WebSocket('ws://' + endpoint + baseUrl);
177 ws.on('open', function() {
178 var msg = { type: 'subscribe', topic: '_peer/*' };
179 ws.send(JSON.stringify(msg));
180 ws.on('message', function(buffer) {
181 var json = JSON.parse(buffer);
182 if(json.type === 'subscribe-ack') {
183 assert.equal(json.type, 'subscribe-ack');
184 assert(json.timestamp);
185 assert.equal(json.topic, '_peer/*');
186 assert(json.subscriptionId);
187 } else if(json.type === 'event') {
188 recv++;
189 if(recv == 1) {
190 assert.equal(json.topic, '_peer/connect');
191 } else if(recv == 2) {
192 assert.equal(json.topic, '_peer/disconnect');
193 done();
194 }
195
196 }
197 });
198 });
199 ws.on('error', done);
200 z.link(cloudUrl);
201 });
202 });
203});
204
205describe('Event Streams', function() {
206 var cluster = null;
207 var urls = [];
208 var baseUrl = '/events';
209 var devices = [];
210 var validTopics = [];
211
212 beforeEach(function(done) {
213 urls = [];
214 devices = [];
215 validTopics = [];
216 cluster = zettacluster({ zetta: zetta })
217 .server('cloud')
218 .server('hub', [Driver, Driver], ['cloud'])
219 .server('hub2', [Driver, Driver], ['cloud'])
220 .on('ready', function() {
221 app = cluster.servers['cloud'];
222 urls.push('localhost:' + cluster.servers['cloud']._testPort);
223 urls.push('localhost:' + cluster.servers['hub']._testPort);
224
225 ['hub', 'hub2'].forEach(function(hubname) {
226 Object.keys(cluster.servers[hubname].runtime._jsDevices).forEach(function(id) {
227 var device = cluster.servers[hubname].runtime._jsDevices[id];
228 devices.push(device);
229 validTopics.push(hubname + '/' + device.type + '/' + device.id + '/state');
230 });
231 })
232 done();
233 })
234 .run(function(err){
235 if (err) {
236 return done(err);
237 }
238 });
239 });
240
241 afterEach(function() {
242 cluster.stop();
243 });
244
245 describe('Websocket API', function() {
246 var itBoth = function(testMsg, test) {
247 it('for cloud, ' + testMsg, test.bind(null, 0));
248 it('for hub, ' + testMsg, test.bind(null, 1));
249 };
250
251 itBoth('subscribing to a topic receives a subscription-ack', function(idx, done) {
252 var endpoint = urls[idx];
253 var ws = new WebSocket('ws://' + endpoint + baseUrl);
254 ws.on('open', function() {
255 var msg = { type: 'subscribe', topic: 'hub/led/1234/state' };
256 ws.send(JSON.stringify(msg));
257 ws.on('message', function(buffer) {
258 var json = JSON.parse(buffer);
259 assert.equal(json.type, 'subscribe-ack');
260 assert(json.timestamp);
261 assert.equal(json.topic, 'hub/led/1234/state');
262 assert(json.subscriptionId);
263 done();
264 });
265 });
266 ws.on('error', done);
267 });
268
269 itBoth('sending ping request will return a pong response without data field', function(idx, done) {
270 var endpoint = urls[idx];
271 var ws = new WebSocket('ws://' + endpoint + baseUrl);
272 ws.on('open', function() {
273 var msg = { type: 'ping'};
274 ws.send(JSON.stringify(msg));
275 ws.on('message', function(buffer) {
276 var json = JSON.parse(buffer);
277 assert.equal(json.type, 'pong');
278 assert(json.timestamp);
279 assert.equal(json.data, undefined);
280 done();
281 });
282 });
283 ws.on('error', done);
284 });
285
286 itBoth('sending ping request will return a pong response with data field', function(idx, done) {
287 var endpoint = urls[idx];
288 var ws = new WebSocket('ws://' + endpoint + baseUrl);
289 ws.on('open', function() {
290 var msg = { type: 'ping', data: 'Application data'};
291 ws.send(JSON.stringify(msg));
292 ws.on('message', function(buffer) {
293 var json = JSON.parse(buffer);
294 assert.equal(json.type, 'pong');
295 assert(json.timestamp);
296 assert.equal(json.data, 'Application data');
297 done();
298 });
299 });
300 ws.on('error', done);
301 });
302
303 itBoth('unsubscribing to a topic receives a unsubscription-ack', function(idx, done) {
304 var endpoint = urls[idx];
305 var ws = new WebSocket('ws://' + endpoint + baseUrl);
306 ws.on('open', function() {
307 var msg = { type: 'subscribe', topic: 'hub/led/1234/state' };
308 ws.send(JSON.stringify(msg));
309 ws.once('message', function(buffer) {
310 var json = JSON.parse(buffer);
311 var msg = { type: 'unsubscribe', subscriptionId: json.subscriptionId };
312 ws.send(JSON.stringify(msg));
313 ws.on('message', function(buffer) {
314 var json2 = JSON.parse(buffer);
315 assert.equal(json2.type, 'unsubscribe-ack');
316 assert(json2.timestamp);
317 assert.equal(json2.subscriptionId, json.subscriptionId);
318 done();
319 });
320 });
321 });
322 ws.on('error', done);
323 });
324
325 itBoth('verify error message format', function(){});
326
327 itBoth('specific topic subscription only receives messages with that topic', function(idx, done) {
328 var endpoint = urls[idx];
329 var ws = new WebSocket('ws://' + endpoint + baseUrl);
330 var subscriptionId = null;
331 var topic = validTopics[0];
332 ws.on('open', function() {
333 var msg = { type: 'subscribe', topic: topic };
334 ws.send(JSON.stringify(msg));
335 ws.on('message', function(buffer) {
336 var json = JSON.parse(buffer);
337 if(json.type === 'subscribe-ack') {
338 assert.equal(json.type, 'subscribe-ack');
339 assert(json.timestamp);
340 assert.equal(json.topic, topic);
341 assert(json.subscriptionId);
342 subscriptionId = json.subscriptionId;
343
344 setTimeout(function() {
345 devices[1].call('change');
346 devices[0].call('change');
347 }, 50);
348 } else {
349 assert.equal(json.type, 'event');
350 assert(json.timestamp);
351 assert.equal(json.topic, topic);
352 assert.equal(json.subscriptionId, subscriptionId);
353 assert(json.data);
354 done();
355 }
356 });
357 });
358 ws.on('error', done);
359 });
360
361 itBoth('multiple clients specific topic subscription only receives messages with that topic', function(idx, done) {
362 var endpoint = urls[idx];
363 var topic = validTopics[0];
364
365 var connected = 0;
366 var recv = 0;
367
368 var ws1 = new WebSocket('ws://' + endpoint + baseUrl);
369 ws1.on('open', function() {
370 var msg = { type: 'subscribe', topic: topic };
371 ws1.send(JSON.stringify(msg));
372 var subscriptionId = null;
373 ws1.on('message', function(buffer) {
374 var json = JSON.parse(buffer);
375 if(json.type === 'subscribe-ack') {
376 connected++;
377 subscriptionId = json.subscriptionId;
378 if (connected === 2) {
379 setTimeout(function() {
380 devices[1].call('change');
381 devices[0].call('change');
382 }, 50);
383 }
384 } else {
385 assert.equal(json.topic, topic);
386 assert.equal(json.subscriptionId, subscriptionId);
387 recv++;
388 if (recv === 2) {
389 done();
390 }
391 }
392 });
393 });
394 ws1.on('error', done);
395
396 var ws2 = new WebSocket('ws://' + endpoint + baseUrl);
397 ws2.on('open', function() {
398 var msg = { type: 'subscribe', topic: topic };
399 ws2.send(JSON.stringify(msg));
400 var subscriptionId = null;
401 ws2.on('message', function(buffer) {
402 var json = JSON.parse(buffer);
403 if(json.type === 'subscribe-ack') {
404 subscriptionId = json.subscriptionId;
405 connected++;
406 if (connected === 2) {
407 setTimeout(function() {
408 devices[0].call('change');
409 }, 50);
410 }
411 } else {
412 assert.equal(json.topic, topic);
413 assert.equal(json.subscriptionId, subscriptionId);
414 recv++;
415 if (recv === 2) {
416 done();
417 }
418 }
419 });
420 });
421 ws2.on('error', done);
422 });
423
424 itBoth('multiple clients using different topic subscriptions only receive one message per event', function(idx, done) {
425 var endpoint = urls[idx];
426 var topic = validTopics[0];
427
428 var connected = 0;
429 var recv1 = 0;
430
431 var ws1 = new WebSocket('ws://' + endpoint + baseUrl);
432 ws1.on('open', function() {
433 var msg = { type: 'subscribe', topic: topic };
434 ws1.send(JSON.stringify(msg));
435 var subscriptionId = null;
436 ws1.on('message', function(buffer) {
437 var json = JSON.parse(buffer);
438 if(json.type === 'subscribe-ack') {
439 connected++;
440 subscriptionId = json.subscriptionId;
441 if (connected === 2) {
442 setTimeout(function() {
443 devices[0].call('change');
444 }, 50);
445 }
446 } else {
447 assert.equal(json.topic, topic);
448 assert.equal(json.subscriptionId, subscriptionId);
449 recv1++;
450 }
451 });
452 });
453 ws1.on('error', done);
454
455 var recv2 = 0;
456 var ws2 = new WebSocket('ws://' + endpoint + baseUrl);
457 ws2.on('open', function() {
458 var msg = { type: 'subscribe', topic: 'hub/testdriver/*/state' };
459 ws2.send(JSON.stringify(msg));
460 var subscriptionId = null;
461 ws2.on('message', function(buffer) {
462 var json = JSON.parse(buffer);
463 if(json.type === 'subscribe-ack') {
464 subscriptionId = json.subscriptionId;
465 connected++;
466 if (connected === 2) {
467 setTimeout(function() {
468 devices[0].call('change');
469 }, 50);
470 }
471 } else {
472 assert.equal(json.topic, topic);
473 assert.equal(json.subscriptionId, subscriptionId);
474 recv2++;
475 }
476 });
477 });
478 ws2.on('error', done);
479
480 setTimeout(function() {
481 assert.equal(recv1, 1);
482 assert.equal(recv2, 1);
483 done();
484 }, 250);
485 });
486
487 itBoth('wildcard server topic subscription only receives messages with that topic', function(idx, done) {
488 var endpoint = urls[idx];
489 var ws = new WebSocket('ws://' + endpoint + baseUrl);
490 var subscriptionId = null;
491 var topic = validTopics[0];
492 topic = topic.replace('hub', '*');
493 ws.on('open', function() {
494 var msg = { type: 'subscribe', topic: topic };
495 ws.send(JSON.stringify(msg));
496 ws.on('message', function(buffer) {
497 var json = JSON.parse(buffer);
498 if(json.type === 'subscribe-ack') {
499 assert.equal(json.type, 'subscribe-ack');
500 assert(json.timestamp);
501 assert.equal(json.topic, topic);
502 assert(json.subscriptionId);
503 subscriptionId = json.subscriptionId;
504
505 setTimeout(function() {
506 devices[1].call('change');
507 devices[0].call('change');
508 }, 50);
509 } else {
510 assert.equal(json.type, 'event');
511 assert(json.timestamp);
512 assert.equal(json.topic, validTopics[0]);
513 assert.equal(json.subscriptionId, subscriptionId);
514 assert(json.data);
515 done();
516 }
517 });
518 });
519 ws.on('error', done);
520 });
521
522 itBoth('wildcard topic and static topic subscription will receive messages for both subscriptions', function(idx, done) {
523 var endpoint = urls[idx];
524 var ws = new WebSocket('ws://' + endpoint + baseUrl);
525 var lastSubscriptionId = null;
526 var count = 0;
527 ws.on('open', function() {
528 var msg = { type: 'subscribe', topic: validTopics[0] };
529 ws.send(JSON.stringify(msg));
530 msg = { type: 'subscribe', topic: 'hub/testdriver/*/state' };
531 ws.send(JSON.stringify(msg));
532 ws.on('message', function(buffer) {
533 var json = JSON.parse(buffer);
534 if(json.type === 'subscribe-ack') {
535 assert.equal(json.type, 'subscribe-ack');
536 assert(json.timestamp);
537 assert(json.subscriptionId);
538 setTimeout(function() {
539 devices[0].call('change');
540 }, 50);
541 } else {
542 count++;
543 assert.notEqual(lastSubscriptionId, json.subscriptionId);
544 lastSubscriptionId = json.subscriptionId;
545 assert.equal(json.type, 'event');
546 assert(json.timestamp);
547 assert.equal(json.topic, validTopics[0]);
548 assert(json.data);
549 if (count === 2) {
550 done();
551 }
552 }
553 });
554 });
555 ws.on('error', done);
556 });
557
558 itBoth('wildcard device id topic subscription and cloud app query both will recieve data', function(idx, done) {
559 var endpoint = urls[idx];
560 var subscriptionId = null;
561 var topic = 'hub/testdriver/*/state';
562
563 var runtime = cluster.servers['cloud'].runtime;
564 var query = runtime.from('hub').where({ type: 'testdriver', id: devices[0].id });
565 runtime.observe(query, function(device) {
566 var ws = new WebSocket('ws://' + endpoint + baseUrl);
567 ws.on('open', function() {
568 var msg = { type: 'subscribe', topic: topic };
569 ws.send(JSON.stringify(msg));
570 ws.on('message', function(buffer) {
571 var json = JSON.parse(buffer);
572 if(json.type === 'subscribe-ack') {
573 assert.equal(json.type, 'subscribe-ack');
574 assert(json.timestamp);
575 assert.equal(json.topic, topic);
576 assert(json.subscriptionId);
577 subscriptionId = json.subscriptionId;
578
579 setTimeout(function() {
580 devices[0].call('change');
581 }, 50);
582 } else {
583 assert.equal(json.type, 'event');
584 assert(json.timestamp);
585 assert.equal(json.topic, validTopics[0]);
586 assert.equal(json.subscriptionId, subscriptionId);
587 assert(json.data);
588 done();
589 }
590 });
591 });
592 ws.on('error', done);
593 });
594 });
595
596 itBoth('wildcard device id topic subscription and hub app query both will recieve data', function(idx, done) {
597 var endpoint = urls[idx];
598 var subscriptionId = null;
599 var topic = 'hub/testdriver/*/state';
600
601 var runtime = cluster.servers['hub'].runtime;
602 var query = runtime.where({ type: 'testdriver', id: devices[0].id });
603 runtime.observe(query, function(device) {
604 var ws = new WebSocket('ws://' + endpoint + baseUrl);
605 ws.on('open', function() {
606 var msg = { type: 'subscribe', topic: topic };
607 ws.send(JSON.stringify(msg));
608 ws.on('message', function(buffer) {
609 var json = JSON.parse(buffer);
610 if(json.type === 'subscribe-ack') {
611 assert.equal(json.type, 'subscribe-ack');
612 assert(json.timestamp);
613 assert.equal(json.topic, topic);
614 assert(json.subscriptionId);
615 subscriptionId = json.subscriptionId;
616
617 setTimeout(function() {
618 devices[0].call('change');
619 }, 50);
620 } else {
621 assert.equal(json.type, 'event');
622 assert(json.timestamp);
623 assert.equal(json.topic, validTopics[0]);
624 assert.equal(json.subscriptionId, subscriptionId);
625 assert(json.data);
626 done();
627 }
628 });
629 });
630 ws.on('error', done);
631 });
632 });
633
634 it('wildcard server topic subscription receives messages from both hubs', function(done) {
635 var endpoint = urls[0];
636 var ws = new WebSocket('ws://' + endpoint + baseUrl);
637 var subscriptionId = null;
638 var topic = '*/testdriver/*/state';
639 ws.on('open', function() {
640 var msg = { type: 'subscribe', topic: topic };
641 ws.send(JSON.stringify(msg));
642 var recv = 0;
643 ws.on('message', function(buffer) {
644 var json = JSON.parse(buffer);
645 if(json.type === 'subscribe-ack') {
646 assert.equal(json.type, 'subscribe-ack');
647 assert(json.timestamp);
648 assert.equal(json.topic, topic);
649 assert(json.subscriptionId);
650 subscriptionId = json.subscriptionId;
651
652 setTimeout(function() {
653 devices[0].call('change');
654 devices[2].call('change');
655 }, 50);
656 } else {
657 recv++;
658 assert.equal(json.type, 'event');
659 assert(json.timestamp);
660 assert(json.topic);
661 assert.equal(json.subscriptionId, subscriptionId);
662 assert(json.data);
663 if (recv === 2) {
664 done();
665 }
666 }
667 });
668 });
669 ws.on('error', done);
670 });
671
672 it('wildcard topic ** will subscribe to all topics for both hubs', function(done) {
673 var endpoint = urls[0]; // cloud
674 var ws = new WebSocket('ws://' + endpoint + baseUrl);
675 var subscriptionId = null;
676 var topic = '**';
677
678 var neededTopics = [];
679 devices.forEach(function(device, idx) {
680 var server = (idx < 2) ? 'hub' : 'hub2';
681 neededTopics.push(server + '/' + device.type + '/' + device.id + '/' + 'state');
682 neededTopics.push(server + '/' + device.type + '/' + device.id + '/' + 'logs');
683 });
684
685 ws.on('open', function() {
686 var msg = { type: 'subscribe', topic: topic };
687 ws.send(JSON.stringify(msg));
688 ws.on('message', function(buffer) {
689 var json = JSON.parse(buffer);
690 if(json.type === 'subscribe-ack') {
691 assert.equal(json.type, 'subscribe-ack');
692 assert(json.timestamp);
693 assert.equal(json.topic, topic);
694 assert(json.subscriptionId);
695 subscriptionId = json.subscriptionId;
696
697 setTimeout(function() {
698 devices[0].call('change');
699 devices[1].call('change');
700 devices[2].call('change');
701 devices[3].call('change');
702 }, 250);
703 } else {
704 assert.equal(json.type, 'event');
705 assert(json.timestamp);
706 assert(json.topic);
707 assert.equal(json.subscriptionId, subscriptionId);
708 assert(json.data);
709 var idx = neededTopics.indexOf(json.topic);
710 assert.notEqual(idx, -1);
711 neededTopics.splice(idx, 1);
712 if (neededTopics.length === 0) {
713 done();
714 }
715 }
716 });
717 });
718 ws.on('error', done);
719 });
720
721 it('wildcard topic ** will subscribe to all topics for single hub', function(done) {
722 var endpoint = urls[1]; // hub
723 var ws = new WebSocket('ws://' + endpoint + baseUrl);
724 var subscriptionId = null;
725 var topic = '**';
726
727 var neededTopics = [];
728 for (var i=0; i<2; i++) {
729 var device = devices[i];
730 neededTopics.push('hub/' + device.type + '/' + device.id + '/' + 'state');
731 neededTopics.push('hub/' + device.type + '/' + device.id + '/' + 'logs');
732 }
733
734 ws.on('open', function() {
735 var msg = { type: 'subscribe', topic: topic };
736 ws.send(JSON.stringify(msg));
737 ws.on('message', function(buffer) {
738 var json = JSON.parse(buffer);
739 if(json.type === 'subscribe-ack') {
740 assert.equal(json.type, 'subscribe-ack');
741 assert(json.timestamp);
742 assert.equal(json.topic, topic);
743 assert(json.subscriptionId);
744 subscriptionId = json.subscriptionId;
745
746 setTimeout(function() {
747 devices[0].call('change');
748 devices[1].call('change');
749 devices[2].call('change');
750 devices[3].call('change');
751 }, 50);
752 } else {
753 assert.equal(json.type, 'event');
754 assert(json.timestamp);
755 assert(json.topic);
756 assert.equal(json.subscriptionId, subscriptionId);
757 assert(json.data);
758 var idx = neededTopics.indexOf(json.topic);
759 assert.notEqual(idx, -1);
760 neededTopics.splice(idx, 1);
761 if (neededTopics.length === 0) {
762 done();
763 }
764 }
765 });
766 });
767 ws.on('error', done);
768 });
769
770 itBoth('wildcard topic for single peer receives all messages for all topics', function(idx, done) {
771 var endpoint = urls[idx];
772 var ws = new WebSocket('ws://' + endpoint + baseUrl);
773 var subscriptionId = null;
774 var count = 0;
775 var topic = 'hub/testdriver/*/state';
776 var lastTopic = null;
777 ws.on('open', function() {
778 var msg = { type: 'subscribe', topic: topic };
779 ws.send(JSON.stringify(msg));
780 ws.on('message', function(buffer) {
781 var json = JSON.parse(buffer);
782 if(json.type === 'subscribe-ack') {
783 assert.equal(json.type, 'subscribe-ack');
784 assert(json.timestamp);
785 assert.equal(json.topic, topic);
786 assert(json.subscriptionId);
787 subscriptionId = json.subscriptionId;
788
789 setTimeout(function() {
790 devices[0].call('change');
791 devices[1].call('change');
792 }, 50);
793 } else {
794 assert.equal(json.type, 'event');
795 assert(json.timestamp);
796 assert(json.topic);
797 assert.notEqual(json.topic, lastTopic);
798 lastTopic = json.topic;
799 assert.equal(json.subscriptionId, subscriptionId);
800 assert(json.data);
801 count++;
802 if(count === 2) {
803 done();
804 }
805 }
806 });
807 });
808 ws.on('error', done);
809 });
810
811 itBoth('wildcard topic for device id and stream types receives all messages for all topics', function(idx, done) {
812 var endpoint = urls[idx];
813 var ws = new WebSocket('ws://' + endpoint + baseUrl);
814 var subscriptionId = null;
815 var count = 0;
816 var topic = 'hub/testdriver/**';
817 var lastTopic = null;
818 ws.on('open', function() {
819 var msg = { type: 'subscribe', topic: topic };
820 ws.send(JSON.stringify(msg));
821 ws.on('message', function(buffer) {
822 var json = JSON.parse(buffer);
823 if(json.type === 'subscribe-ack') {
824 assert.equal(json.type, 'subscribe-ack');
825 assert(json.timestamp);
826 assert.equal(json.topic, topic);
827 assert(json.subscriptionId);
828 subscriptionId = json.subscriptionId;
829
830 setTimeout(function() {
831 devices[0].call('change');
832 devices[1].call('change');
833 }, 50);
834 } else {
835 assert.equal(json.type, 'event');
836 assert(json.timestamp);
837 assert(json.topic);
838 assert.notEqual(json.topic, lastTopic);
839 lastTopic = json.topic;
840 assert.equal(json.subscriptionId, subscriptionId);
841 assert(json.data);
842 count++;
843 if(count === 4) {
844 done();
845 }
846 }
847 });
848 });
849 ws.on('error', done);
850 });
851
852 itBoth('**/led/<device_id>/state will match valid topic', function(idx, done) {
853 var endpoint = urls[idx];
854 var ws = new WebSocket('ws://' + endpoint + baseUrl);
855 var subscriptionId = null;
856 var topic = validTopics[0].replace('hub/', '**/');
857 ws.on('open', function() {
858 var msg = { type: 'subscribe', topic: topic };
859 ws.send(JSON.stringify(msg));
860 ws.on('message', function(buffer) {
861 var json = JSON.parse(buffer);
862 if(json.type === 'subscribe-ack') {
863 assert.equal(json.type, 'subscribe-ack');
864 assert(json.timestamp);
865 assert.equal(json.topic, topic);
866 assert(json.subscriptionId);
867 subscriptionId = json.subscriptionId;
868
869 setTimeout(function() {
870 devices[1].call('change');
871 devices[0].call('change');
872 }, 50);
873 } else {
874 assert.equal(json.type, 'event');
875 assert(json.timestamp);
876 assert.equal(json.topic, validTopics[0]);
877 assert.equal(json.subscriptionId, subscriptionId);
878 assert(json.data);
879 done();
880 }
881 });
882 });
883 ws.on('error', done);
884 });
885
886 itBoth('**/<device_id>/state will match valid topic from device', function(idx, done) {
887 var endpoint = urls[idx];
888 var ws = new WebSocket('ws://' + endpoint + baseUrl);
889 var subscriptionId = null;
890 var topic = validTopics[0].replace('hub/', '**/');
891 ws.on('open', function() {
892 var msg = { type: 'subscribe', topic: topic };
893 ws.send(JSON.stringify(msg));
894 ws.on('message', function(buffer) {
895 var json = JSON.parse(buffer);
896 if(json.type === 'subscribe-ack') {
897 assert.equal(json.type, 'subscribe-ack');
898 assert(json.timestamp);
899 assert.equal(json.topic, topic);
900 assert(json.subscriptionId);
901 subscriptionId = json.subscriptionId;
902
903 setTimeout(function() {
904 devices[1].call('change');
905 devices[0].call('change');
906 }, 50);
907 } else {
908 assert.equal(json.type, 'event');
909 assert(json.timestamp);
910 assert.equal(json.topic, validTopics[0]);
911 assert.equal(json.subscriptionId, subscriptionId);
912 assert(json.data);
913 done();
914 }
915 });
916 });
917 ws.on('error', done);
918 });
919
920 itBoth('**/state will match valid topic from device', function(idx, done) {
921 var endpoint = urls[idx];
922 var ws = new WebSocket('ws://' + endpoint + baseUrl);
923 var subscriptionId = null;
924 var topic = validTopics[0].replace('hub/', '**/');
925 ws.on('open', function() {
926 var msg = { type: 'subscribe', topic: topic };
927 ws.send(JSON.stringify(msg));
928 ws.on('message', function(buffer) {
929 var json = JSON.parse(buffer);
930 if(json.type === 'subscribe-ack') {
931 assert.equal(json.type, 'subscribe-ack');
932 assert(json.timestamp);
933 assert.equal(json.topic, topic);
934 assert(json.subscriptionId);
935 subscriptionId = json.subscriptionId;
936
937 setTimeout(function() {
938 devices[0].call('change');
939 }, 50);
940 } else {
941 assert.equal(json.type, 'event');
942 assert(json.timestamp);
943 assert.equal(json.topic, validTopics[0]);
944 assert.equal(json.subscriptionId, subscriptionId);
945 assert(json.data);
946 done();
947 }
948 });
949 });
950 ws.on('error', done);
951 });
952
953
954 itBoth('subscribing to logs topic on device will get properly formated response', function(idx, done) {
955 var endpoint = urls[idx];
956 var ws = new WebSocket('ws://' + endpoint + baseUrl);
957 var subscriptionId = null;
958 var topic = 'hub/testdriver/*/logs';
959 var lastTopic = null;
960 ws.on('open', function() {
961 var msg = { type: 'subscribe', topic: topic };
962 ws.send(JSON.stringify(msg));
963 ws.on('message', function(buffer) {
964 var json = JSON.parse(buffer);
965 if(json.type === 'subscribe-ack') {
966 assert.equal(json.type, 'subscribe-ack');
967 assert(json.timestamp);
968 assert.equal(json.topic, topic);
969 assert(json.subscriptionId);
970 subscriptionId = json.subscriptionId;
971 setTimeout(function() {
972 devices[0].call('change');
973 }, 50);
974 } else {
975 assert.equal(json.type, 'event');
976 assert(json.timestamp);
977 assert(json.topic);
978 assert.notEqual(json.topic, lastTopic);
979 lastTopic = json.topic;
980 assert.equal(json.subscriptionId, subscriptionId);
981 assert(json.data);
982 assert.equal(json.data.transition, 'change');
983 assert(!json.data.transitions);
984 assert.deepEqual(json.data.input, []);
985 assert(json.data.properties);
986 assert(json.data.actions);
987 done();
988 }
989 });
990 });
991 ws.on('error', done);
992 });
993
994
995 itBoth('topic that doesnt exist still opens stream', function(idx, done) {
996 var endpoint = urls[idx];
997 var ws = new WebSocket('ws://' + endpoint + baseUrl);
998 var topic = 'blah/foo/1/blah';
999 ws.on('open', function() {
1000 var msg = { type: 'subscribe', topic: topic };
1001 ws.send(JSON.stringify(msg));
1002 ws.on('message', function(buffer) {
1003 var json = JSON.parse(buffer);
1004 assert.equal(json.type, 'subscribe-ack');
1005 assert(json.timestamp);
1006 assert.equal(json.topic, topic);
1007 assert(json.subscriptionId);
1008 done();
1009 });
1010 });
1011 ws.on('error', done);
1012 });
1013
1014 it('subscription cloud will get _peer/connect events from hub', function(done) {
1015 var endpoint = urls[0];
1016 var topic = 'hub/**';
1017 var ws = new WebSocket('ws://' + endpoint + baseUrl);
1018 ws.on('open', function() {
1019 ws.send(JSON.stringify({ type: 'subscribe', topic: topic }));
1020
1021 ws.on('message', function(buffer) {
1022 var json = JSON.parse(buffer);
1023 if (json.type === 'subscribe-ack') {
1024 var z = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
1025 z.name('some-peer');
1026 z.link('http://' + urls[1]); // link to hub
1027 z.silent();
1028 z.listen(0);
1029 } else if (json.type === 'event'){
1030 assert.equal(json.topic, 'hub/_peer/connect');
1031 assert.equal(json.data.id, 'some-peer');
1032 done();
1033 }
1034 });
1035
1036 });
1037 });
1038
1039 itBoth('subscription to non existent hub does not return data for that subscriptionId', function(idx, done) {
1040 var endpoint = urls[idx];
1041 var ws = new WebSocket('ws://' + endpoint + baseUrl);
1042 var validTopic = validTopics[0];
1043 var invalidTopic = validTopic.replace('hub/', 'notahub/');
1044 var invalidSubscriptionId = null;
1045
1046 ws.on('open', function() {
1047 ws.send(JSON.stringify({ type: 'subscribe', topic: invalidTopic }));
1048 ws.send(JSON.stringify({ type: 'subscribe', topic: validTopic }));
1049
1050 ws.on('message', function(buffer) {
1051 var json = JSON.parse(buffer);
1052 if (json.type === 'subscribe-ack') {
1053 if (json.topic === invalidTopic) {
1054 invalidSubscriptionId = json.subscriptionId;
1055 }
1056 setTimeout(function() {
1057 devices[0].call('change');
1058 }, 50)
1059 } else {
1060 assert.notEqual(json.subscriptionId, invalidSubscriptionId);
1061 done();
1062 }
1063 });
1064 });
1065 ws.on('error', done);
1066 });
1067
1068 itBoth('wildcard and specific topic will each publish a message on a subscription', function(idx, done) {
1069 var endpoint = urls[idx];
1070 var ws = new WebSocket('ws://' + endpoint + baseUrl);
1071 var subscriptionId = null;
1072 var count = 0;
1073 var ackCount = 0;
1074 var topicOne = validTopics[0];
1075 var topicTwo = 'hub/testdriver/*/state';
1076 ws.on('open', function() {
1077 var msgOne = { type: 'subscribe', topic: topicOne };
1078 var msgTwo = { type: 'subscribe', topic: topicTwo };
1079 ws.send(JSON.stringify(msgOne));
1080 ws.send(JSON.stringify(msgTwo));
1081 ws.on('message', function(buffer) {
1082 var json = JSON.parse(buffer);
1083 if(json.type === 'subscribe-ack') {
1084 assert.equal(json.type, 'subscribe-ack');
1085 assert(json.timestamp);
1086 assert(json.topic);
1087 assert(json.subscriptionId);
1088 subscriptionId = json.subscriptionId;
1089 ackCount++;
1090 setTimeout(function() {
1091 for(var i=0; i<11; i++) {
1092 devices[0].call((i % 2 === 0) ? 'change' : 'prepare');
1093 }
1094 }, 50);
1095 } else {
1096 assert.equal(json.type, 'event');
1097 assert(json.timestamp);
1098 assert(json.topic);
1099 assert(json.subscriptionId);
1100 count++;
1101 if(count === 2) {
1102 assert.equal(ackCount, 2);
1103 done();
1104 }
1105 }
1106 });
1107 });
1108 ws.on('error', done);
1109 });
1110
1111 itBoth('adding limit to subscription should limit number of messages received', function(idx, done){
1112 var endpoint = urls[idx];
1113 var ws = new WebSocket('ws://' + endpoint + baseUrl);
1114 var subscriptionId = null;
1115 var count = 0;
1116 var topic = validTopics[0];
1117 var data = null;
1118 ws.on('open', function() {
1119 var msg = { type: 'subscribe', topic: topic, limit: 10 };
1120 ws.send(JSON.stringify(msg));
1121 ws.on('message', function(buffer) {
1122 var json = JSON.parse(buffer);
1123 if(json.type === 'subscribe-ack') {
1124 assert.equal(json.type, 'subscribe-ack');
1125 assert(json.timestamp);
1126 assert(json.topic);
1127 assert(json.subscriptionId);
1128 subscriptionId = json.subscriptionId;
1129
1130 setTimeout(function() {
1131 for(var i=0; i<15; i++) {
1132 devices[0].call((i % 2 === 0) ? 'change' : 'prepare');
1133 }
1134 }, 50);
1135 } else if (json.type !== 'unsubscribe-ack') {
1136 assert.equal(json.type, 'event');
1137 assert(json.timestamp);
1138 assert(json.topic);
1139 assert(json.subscriptionId, subscriptionId);
1140 assert(json.data);
1141
1142 count++;
1143 if(count === 10) {
1144 setTimeout(function() {
1145 assert.equal(count, 10);
1146 done();
1147 }, 200)
1148 }
1149 }
1150 });
1151 });
1152 ws.on('error', done);
1153 });
1154
1155 itBoth('when limit is reached a unsubscribe-ack should be received', function(idx, done){
1156 var endpoint = urls[idx];
1157 var ws = new WebSocket('ws://' + endpoint + baseUrl);
1158 var subscriptionId = null;
1159 var count = 0;
1160 var topic = validTopics[0];
1161 var data = null;
1162 ws.on('open', function() {
1163 var msg = { type: 'subscribe', topic: topic, limit: 10 };
1164 ws.send(JSON.stringify(msg));
1165 ws.on('message', function(buffer) {
1166 var json = JSON.parse(buffer);
1167 if(json.type === 'subscribe-ack') {
1168 assert.equal(json.type, 'subscribe-ack');
1169 assert(json.timestamp);
1170 assert(json.topic);
1171 assert(json.subscriptionId);
1172 subscriptionId = json.subscriptionId;
1173 setTimeout(function() {
1174 for(var i=0; i<11; i++) {
1175 devices[0].call((i % 2 === 0) ? 'change' : 'prepare');
1176 }
1177 }, 50);
1178 } else if(json.type === 'event') {
1179 assert.equal(json.type, 'event');
1180 assert(json.timestamp);
1181 assert(json.topic);
1182 assert(json.subscriptionId, subscriptionId);
1183 assert(json.data);
1184 count++;
1185 } else if(json.type === 'unsubscribe-ack') {
1186 assert.equal(json.type, 'unsubscribe-ack');
1187 assert(json.timestamp);
1188 assert.equal(json.subscriptionId, subscriptionId);
1189 assert.equal(count, 10);
1190 done();
1191 }
1192 });
1193 });
1194 ws.on('error', done);
1195 });
1196
1197 itBoth('when limit is reached with a query selector a unsubscribe-ack should be received', function(idx, done){
1198 var endpoint = urls[idx];
1199 var ws = new WebSocket('ws://' + endpoint + baseUrl);
1200 var subscriptionId = null;
1201 var count = 0;
1202 var topic = 'hub/testdriver/' + devices[0].id + '/bar?select data where data >= 5';
1203 var data = null;
1204 ws.on('open', function() {
1205 var msg = { type: 'subscribe', topic: topic, limit: 10 };
1206 ws.send(JSON.stringify(msg));
1207 ws.on('message', function(buffer) {
1208 var json = JSON.parse(buffer);
1209 if(json.type === 'subscribe-ack') {
1210 assert.equal(json.type, 'subscribe-ack');
1211 assert(json.timestamp);
1212 assert(json.topic);
1213 assert(json.subscriptionId);
1214 subscriptionId = json.subscriptionId;
1215 setTimeout(function() {
1216 for(var i=0; i<16; i++) {
1217 devices[0].incrementStreamValue();
1218 }
1219 }, 50);
1220 } else if(json.type === 'event') {
1221 assert.equal(json.type, 'event');
1222 assert(json.timestamp);
1223 assert(json.topic);
1224 assert(json.subscriptionId, subscriptionId);
1225 assert(json.data);
1226 count++;
1227 } else if(json.type === 'unsubscribe-ack') {
1228 assert.equal(json.type, 'unsubscribe-ack');
1229 assert(json.timestamp);
1230 assert.equal(json.subscriptionId, subscriptionId);
1231 assert.equal(count, 10);
1232 done();
1233 }
1234 });
1235 });
1236 ws.on('error', done);
1237 });
1238
1239 itBoth('query field selector should only return properties in selection', function(idx, done){
1240 var endpoint = urls[idx];
1241 var ws = new WebSocket('ws://' + endpoint + baseUrl);
1242 var subscriptionId = null;
1243 var count = 0;
1244 var topic = 'hub/testdriver/' + devices[0].id + '/bar?select data where data >= 1';
1245 ws.on('open', function() {
1246 var msg = { type: 'subscribe', topic: topic };
1247 ws.send(JSON.stringify(msg));
1248 ws.on('message', function(buffer) {
1249 var json = JSON.parse(buffer);
1250 if(json.type === 'subscribe-ack') {
1251 assert.equal(json.type, 'subscribe-ack');
1252 assert(json.timestamp);
1253 assert(json.topic);
1254 assert(json.subscriptionId);
1255 subscriptionId = json.subscriptionId;
1256 setTimeout(function() {
1257 devices[0].incrementStreamValue();
1258 }, 50);
1259 } else if(json.type === 'event') {
1260 assert.equal(json.type, 'event');
1261 assert(json.timestamp);
1262 assert(json.topic);
1263 assert(json.subscriptionId, subscriptionId);
1264 assert(json.data);
1265 done();
1266 }
1267 });
1268 });
1269 ws.on('error', done);
1270 });
1271
1272 itBoth('query field selector * should all properties in selection', function(idx, done){
1273 var endpoint = urls[idx];
1274 var ws = new WebSocket('ws://' + endpoint + baseUrl);
1275 var subscriptionId = null;
1276 var count = 0;
1277 var topic = 'hub/testdriver/' + devices[0].id + '/fooobject?select * where data.val >= 2';
1278 var data = { foo: 'bar', val: 2 };
1279 ws.on('open', function() {
1280 var msg = { type: 'subscribe', topic: topic };
1281 ws.send(JSON.stringify(msg));
1282 ws.on('message', function(buffer) {
1283 var json = JSON.parse(buffer);
1284 if(json.type === 'subscribe-ack') {
1285 assert.equal(json.type, 'subscribe-ack');
1286 assert(json.timestamp);
1287 assert(json.topic);
1288 assert(json.subscriptionId);
1289 subscriptionId = json.subscriptionId;
1290 setTimeout(function() {
1291 devices[0].publishStreamObject(data);
1292 }, 50);
1293 } else if(json.type === 'event') {
1294 assert(json.timestamp);
1295 assert(json.topic);
1296 assert(json.subscriptionId, subscriptionId);
1297 assert(json.data);
1298 assert.equal(json.data.val, 2);
1299 assert.equal(json.data.foo, 'bar');
1300 done();
1301 }
1302 });
1303 });
1304 ws.on('error', done);
1305 });
1306
1307 itBoth('query field selector should return only selected properties', function(idx, done){
1308 var endpoint = urls[idx];
1309 var ws = new WebSocket('ws://' + endpoint + baseUrl);
1310 var subscriptionId = null;
1311 var count = 0;
1312 var topic = 'hub/testdriver/' + devices[0].id + '/fooobject?select data.val';
1313 var data = { foo: 'bar', val: 2 };
1314 ws.on('open', function() {
1315 var msg = { type: 'subscribe', topic: topic };
1316 ws.send(JSON.stringify(msg));
1317 ws.on('message', function(buffer) {
1318 var json = JSON.parse(buffer);
1319 if(json.type === 'subscribe-ack') {
1320 assert.equal(json.type, 'subscribe-ack');
1321 assert(json.timestamp);
1322 assert(json.topic);
1323 assert(json.subscriptionId);
1324 subscriptionId = json.subscriptionId;
1325 setTimeout(function() {
1326 devices[0].publishStreamObject(data);
1327 }, 50);
1328 } else if(json.type === 'event') {
1329 assert(json.timestamp);
1330 assert(json.topic);
1331 assert(json.subscriptionId, subscriptionId);
1332 assert(json.data);
1333 assert.equal(json.data.val, 2);
1334 assert.equal(json.data.foo, undefined);
1335 done();
1336 }
1337 });
1338 });
1339 ws.on('error', done);
1340 });
1341
1342 itBoth('subscribing to all ** and then unsubscribing followed by a peer connecting wont crash zetta', function(idx, done){
1343 var endpoint = urls[idx];
1344 var ws = new WebSocket('ws://' + endpoint + baseUrl);
1345 var topic = '**';
1346 ws.on('open', function() {
1347 var msg = { type: 'subscribe', topic: topic };
1348 ws.send(JSON.stringify(msg));
1349 ws.on('message', function(buffer) {
1350 var json = JSON.parse(buffer);
1351 if(json.type === 'subscribe-ack') {
1352 assert.equal(json.type, 'subscribe-ack');
1353 assert(json.timestamp);
1354 assert(json.topic);
1355 assert(json.subscriptionId);
1356 var msg = { type: 'unsubscribe', subscriptionId: json.subscriptionId };
1357 ws.send(JSON.stringify(msg));
1358 } else if(json.type === 'unsubscribe-ack') {
1359 var z = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
1360 z.silent();
1361 z.name('some-new-peer')
1362 z.link('http://' + urls[0]);
1363 z.use(function(server) {
1364 server.pubsub.subscribe('_peer/connect', function(topic, data) {
1365 setTimeout(function() {
1366 done();
1367 }, 400);
1368 });
1369 });
1370 z.listen(0);
1371 }
1372 });
1373 });
1374 ws.on('error', done);
1375 });
1376
1377 itBoth('Passing filterMultiple options to ws only one data event will be sent', function(idx, done) {
1378 var endpoint = urls[idx];
1379 var ws = new WebSocket('ws://' + endpoint + baseUrl + '?filterMultiple=true');
1380 var topic = validTopics[0];
1381 ws.on('open', function() {
1382 var msg = { type: 'subscribe', topic: topic };
1383 var msg2 = { type: 'subscribe', topic: 'hub/testdriver/*/state' };
1384 ws.send(JSON.stringify(msg));
1385 ws.send(JSON.stringify(msg2));
1386 var subscriptions = [];
1387 ws.on('message', function(buffer) {
1388 var json = JSON.parse(buffer);
1389 if(json.type === 'subscribe-ack') {
1390 assert.equal(json.type, 'subscribe-ack');
1391 assert(json.timestamp);
1392 assert(json.subscriptionId);
1393 subscriptions.push(json.subscriptionId);
1394 if (subscriptions.length === 2) {
1395 setTimeout(function() {
1396 devices[0].call('change');
1397 }, 50);
1398 }
1399 } else {
1400 assert.equal(json.type, 'event');
1401 assert(json.timestamp);
1402 assert.equal(json.topic, topic);
1403 assert.equal(json.subscriptionId.length, subscriptions.length);
1404 subscriptions.forEach(function(id) {
1405 assert(json.subscriptionId.indexOf(id) >= -1);
1406 });
1407 assert(json.data);
1408 done();
1409 }
1410 });
1411 });
1412 ws.on('error', done);
1413 });
1414
1415 itBoth('Passing filterMultiple options to ws will apply limits for both topics', function(idx, done) {
1416 var endpoint = urls[idx];
1417 var ws = new WebSocket('ws://' + endpoint + baseUrl + '?filterMultiple=true');
1418 var topic = validTopics[0];
1419 var topic2 = 'hub/testdriver/*/state';
1420 ws.on('open', function() {
1421 var msg = { type: 'subscribe', topic: topic, limit: 2 };
1422 var msg2 = { type: 'subscribe', topic: topic2, limit: 3 };
1423 ws.send(JSON.stringify(msg));
1424 ws.send(JSON.stringify(msg2));
1425 var subscriptions = {};
1426
1427 ws.on('message', function(buffer) {
1428 var json = JSON.parse(buffer);
1429 if(json.type === 'subscribe-ack') {
1430 assert.equal(json.type, 'subscribe-ack');
1431 assert(json.timestamp);
1432 assert(json.subscriptionId);
1433 subscriptions[json.subscriptionId] = 0;
1434 if (Object.keys(subscriptions).length === 2) {
1435 setTimeout(function() {
1436 devices[0].call('change');
1437 devices[0].call('prepare');
1438 devices[0].call('change');
1439 }, 50);
1440 }
1441 } else if (json.type === 'event') {
1442 assert(json.timestamp);
1443 assert.equal(json.topic, topic);
1444 assert(json.data);
1445
1446 json.subscriptionId.forEach(function(id) {
1447 subscriptions[id]++;
1448 });
1449
1450 if (subscriptions[1] === 2 && subscriptions[2] === 3) {
1451 done();
1452 }
1453 }
1454 });
1455 });
1456 ws.on('error', done);
1457 });
1458
1459 itBoth('Passing filterMultiple options to ws will have no effect on topics with caql query', function(idx, done) {
1460 var endpoint = urls[idx];
1461 var ws = new WebSocket('ws://' + endpoint + baseUrl + '?filterMultiple=true');
1462 var topic = validTopics[0] + '?select *';
1463 var topic2 = 'hub/testdriver/*/state';
1464 ws.on('open', function() {
1465 var msg = { type: 'subscribe', topic: topic };
1466 var msg2 = { type: 'subscribe', topic: topic2 };
1467 ws.send(JSON.stringify(msg));
1468 ws.send(JSON.stringify(msg2));
1469 var received = 0;
1470
1471 ws.on('message', function(buffer) {
1472 var json = JSON.parse(buffer);
1473 if(json.type === 'subscribe-ack') {
1474 assert.equal(json.type, 'subscribe-ack');
1475 assert(json.timestamp);
1476 assert(json.subscriptionId);
1477 setTimeout(function() {
1478 devices[0].call('change');
1479 }, 50);
1480 } else if (json.type === 'event') {
1481 assert(json.timestamp);
1482 assert(json.data);
1483 assert.equal(json.subscriptionId.length, 1);
1484 received++;
1485
1486 if (received === 2) {
1487 done();
1488 }
1489 }
1490 });
1491 });
1492 ws.on('error', done);
1493 });
1494
1495
1496 itBoth('subscribing to a query with hub for hub will return all devices', function(idx, done) {
1497 var endpoint = urls[idx];
1498 var ws = new WebSocket('ws://' + endpoint + baseUrl);
1499 var subscriptionId = null;
1500 var topic = 'hub/query/where type is not missing';
1501 var count = 0;
1502 var expected = (idx === 1) ? 2 : 2;
1503 ws.on('open', function() {
1504 var msg = { type: 'subscribe', topic: topic };
1505 ws.send(JSON.stringify(msg));
1506 ws.on('message', function(buffer) {
1507 var json = JSON.parse(buffer);
1508 if(json.type === 'subscribe-ack') {
1509 assert.equal(json.type, 'subscribe-ack');
1510 assert(json.timestamp);
1511 assert.equal(json.topic, topic);
1512 assert(json.subscriptionId);
1513 subscriptionId = json.subscriptionId;
1514 } else {
1515 assert.equal(json.type, 'event');
1516 assert(json.timestamp);
1517 assert.equal(json.topic, topic);
1518 assert.equal(json.subscriptionId, subscriptionId);
1519 assert(json.data);
1520 count++;
1521 if (count === expected) {
1522 done();
1523 }
1524 }
1525 });
1526 });
1527 ws.on('error', done);
1528 });
1529
1530
1531 itBoth('subscribing to a query with * for hub will return all devices', function(idx, done) {
1532 var endpoint = urls[idx];
1533 var ws = new WebSocket('ws://' + endpoint + baseUrl);
1534 var subscriptionId = null;
1535 var topic = '*/query/where type is not missing';
1536 var count = 0;
1537 var expected = (idx === 1) ? 2 : 4; // cloud will have 4 devices
1538 ws.on('open', function() {
1539 var msg = { type: 'subscribe', topic: topic };
1540 ws.send(JSON.stringify(msg));
1541 ws.on('message', function(buffer) {
1542 var json = JSON.parse(buffer);
1543 if(json.type === 'subscribe-ack') {
1544 assert.equal(json.type, 'subscribe-ack');
1545 assert(json.timestamp);
1546 assert.equal(json.topic, topic);
1547 assert(json.subscriptionId);
1548 subscriptionId = json.subscriptionId;
1549 } else {
1550 assert.equal(json.type, 'event');
1551 assert(json.timestamp);
1552 assert.equal(json.topic, topic);
1553 assert.equal(json.subscriptionId, subscriptionId);
1554 assert(json.data);
1555 count++;
1556 if (count === expected) {
1557 done();
1558 }
1559 }
1560 });
1561 });
1562 ws.on('error', done);
1563 });
1564
1565 itBoth('when data is 0 value it should be formatted correctly', function(idx, done) {
1566 var endpoint = urls[idx];
1567 var ws = new WebSocket('ws://' + endpoint + baseUrl);
1568 var subscriptionId = null;
1569 var topic = 'hub/testdriver/' + devices[0].id + '/bar';
1570 ws.on('open', function() {
1571 var msg = { type: 'subscribe', topic: topic };
1572 ws.send(JSON.stringify(msg));
1573 ws.on('message', function(buffer) {
1574 var json = JSON.parse(buffer);
1575 if(json.type === 'subscribe-ack') {
1576 assert.equal(json.type, 'subscribe-ack');
1577 assert(json.timestamp);
1578 assert.equal(json.topic, topic);
1579 assert(json.subscriptionId);
1580 subscriptionId = json.subscriptionId;
1581
1582 setTimeout(function() {
1583 devices[0].bar = -1;
1584 devices[0].incrementStreamValue();
1585 }, 50);
1586 } else {
1587 assert.equal(json.type, 'event');
1588 assert(json.timestamp);
1589 assert.equal(json.topic, topic);
1590 assert.equal(json.subscriptionId, subscriptionId);
1591 assert.equal(json.data, 0);
1592 done();
1593 }
1594 });
1595 });
1596 ws.on('error', done);
1597 });
1598
1599 describe('Protocol Errors', function() {
1600
1601 var makeTopicStringErrorsTest = function(topic) {
1602 itBoth('invalid stream topic "' + topic + '" should result in a 400 error', function(idx, done){
1603 var endpoint = urls[idx];
1604 var ws = new WebSocket('ws://' + endpoint + baseUrl);
1605 ws.on('open', function() {
1606 var msg = { type: 'subscribe', topic: topic };
1607 ws.send(JSON.stringify(msg));
1608 ws.on('message', function(buffer) {
1609 var json = JSON.parse(buffer);
1610 assert(json.timestamp);
1611 assert.equal(json.topic, topic);
1612 assert.equal(json.code, 400);
1613 assert(json.message);
1614 done();
1615 });
1616 });
1617 ws.on('error', done);
1618 });
1619 };
1620
1621 makeTopicStringErrorsTest('*');
1622 makeTopicStringErrorsTest('hub');
1623 makeTopicStringErrorsTest('{hub.+}');
1624 makeTopicStringErrorsTest('*/');
1625 makeTopicStringErrorsTest('**/');
1626 makeTopicStringErrorsTest('hub/');
1627 makeTopicStringErrorsTest('{hub.+}/');
1628
1629 itBoth('invalid stream query should result in a 400 error', function(idx, done){
1630 var endpoint = urls[idx];
1631 var ws = new WebSocket('ws://' + endpoint + baseUrl);
1632 var subscriptionId = null;
1633 var count = 0;
1634 var topic = 'hub/testdriver/' + devices[0].id + '/fooobject?invalid stream query';
1635 var data = { foo: 'bar', val: 2 };
1636 ws.on('open', function() {
1637 var msg = { type: 'subscribe', topic: topic };
1638 ws.send(JSON.stringify(msg));
1639 ws.on('message', function(buffer) {
1640 var json = JSON.parse(buffer);
1641 assert(json.timestamp);
1642 assert.equal(json.topic, topic);
1643 assert.equal(json.code, 400);
1644 assert(json.message);
1645 done();
1646 });
1647 });
1648 ws.on('error', done);
1649 });
1650
1651 itBoth('invalid subscribe should result in a 400 error', function(idx, done){
1652 var endpoint = urls[idx];
1653 var ws = new WebSocket('ws://' + endpoint + baseUrl);
1654 var subscriptionId = null;
1655 var count = 0;
1656 var topic = 'hub/testdriver/' + devices[0].id + '/fooobject';
1657 ws.on('open', function() {
1658 var msg = { type: 'subscribe' };
1659 ws.send(JSON.stringify(msg));
1660 ws.on('message', function(buffer) {
1661 var json = JSON.parse(buffer);
1662 assert(json.timestamp);
1663 assert.equal(json.code, 400);
1664 assert(json.message);
1665 done();
1666 });
1667 });
1668 ws.on('error', done);
1669 });
1670
1671 itBoth('unsubscribing from an invalid subscriptionId should result in a 405 error', function(idx, done){
1672 var endpoint = urls[idx];
1673 var ws = new WebSocket('ws://' + endpoint + baseUrl);
1674 var subscriptionId = null;
1675 var count = 0;
1676 ws.on('open', function() {
1677 var msg = { type: 'unsubscribe', subscriptionId: 123 };
1678 ws.send(JSON.stringify(msg));
1679 ws.on('message', function(buffer) {
1680 var json = JSON.parse(buffer);
1681 assert(json.timestamp);
1682 assert.equal(json.code, 405);
1683 assert(json.message);
1684 done();
1685 });
1686 });
1687 ws.on('error', done);
1688 });
1689
1690 itBoth('invalid type should result in a 405 error', function(idx, done){
1691 var endpoint = urls[idx];
1692 var ws = new WebSocket('ws://' + endpoint + baseUrl);
1693 var subscriptionId = null;
1694 var count = 0;
1695 ws.on('open', function() {
1696 var msg = { type: 'not-a-type', topic: '**' };
1697 ws.send(JSON.stringify(msg));
1698 ws.on('message', function(buffer) {
1699 var json = JSON.parse(buffer);
1700 assert(json.timestamp);
1701 assert.equal(json.code, 405);
1702 assert(json.message);
1703 done();
1704 });
1705 });
1706 ws.on('error', done);
1707 });
1708
1709 itBoth('unsubscribing from a missing subscriptionId should result in a 400 error', function(idx, done){
1710 var endpoint = urls[idx];
1711 var ws = new WebSocket('ws://' + endpoint + baseUrl);
1712 var subscriptionId = null;
1713 var count = 0;
1714 ws.on('open', function() {
1715 var msg = { type: 'unsubscribe' };
1716 ws.send(JSON.stringify(msg));
1717 ws.on('message', function(buffer) {
1718 var json = JSON.parse(buffer);
1719 assert(json.timestamp);
1720 assert.equal(json.code, 400);
1721 assert(json.message);
1722 done();
1723 });
1724 });
1725 ws.on('error', done);
1726 });
1727
1728 itBoth('on invalid message should result in a 400 error', function(idx, done){
1729 var endpoint = urls[idx];
1730 var ws = new WebSocket('ws://' + endpoint + baseUrl);
1731 var subscriptionId = null;
1732 var count = 0;
1733 ws.on('open', function() {
1734 var msg = { test: 123 };
1735 ws.send(JSON.stringify(msg));
1736 ws.on('message', function(buffer) {
1737 var json = JSON.parse(buffer);
1738 assert(json.timestamp);
1739 assert.equal(json.code, 400);
1740 assert(json.message);
1741 done();
1742 });
1743 });
1744 ws.on('error', done);
1745 });
1746
1747
1748 })
1749
1750 });
1751
1752 describe('SPDY API', function() {
1753 });
1754
1755});