1 | var assert = require('assert');
|
2 | var WebSocket = require('ws');
|
3 | var zetta = require('./..');
|
4 | var zettacluster = require('zetta-cluster');
|
5 | var Driver = require('./fixture/example_driver');
|
6 | var MemRegistry = require('./fixture/mem_registry');
|
7 | var MemPeerRegistry = require('./fixture/mem_peer_registry');
|
8 |
|
9 | describe('Peering Event Streams', function() {
|
// Fixture state shared by the peering tests. `cloud` and `cloudUrl` are
// assigned by the beforeEach hook for every test.
var baseUrl = '/events'; // path of the WebSocket event endpoint
var cloudUrl = null;     // 'http://localhost:<port>' of the cloud instance
var cloud = null;        // the cloud zetta instance
|
13 |
|
// Start a fresh, silenced cloud instance backed by in-memory registries before
// each test. It listens on port 0; the port actually bound is read back from
// the server address and stored in `cloudUrl`.
beforeEach(function(done) {
  var options = {
    registry: new MemRegistry(),
    peerRegistry: new MemPeerRegistry()
  };

  function onListening(listenError) {
    if (listenError) {
      return done(listenError);
    }
    var address = cloud.httpServer.server.address();
    cloudUrl = 'http://localhost:' + address.port;
    done();
  }

  cloud = zetta(options);
  cloud.silent();
  cloud.listen(0, onListening);
});
|
25 |
|
// Ask the cloud's HTTP server to close after each test. done() is called
// immediately; the hook does not wait for the close to finish.
afterEach(function(done) {
  var httpServer = cloud.httpServer.server;
  httpServer.close();
  done();
});
|
30 |
|
// A WebSocket client subscribes to '_peer/connect' on a locally started
// instance, which is then linked to the cloud. The test checks the
// subscribe-ack fields and finishes on the first event for that topic.
it('will receive a _peer/connect event when subscribed', function(done) {
  var subscribedTopic = '_peer/connect';
  var hub = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
  hub.silent();

  function handleMessage(raw) {
    var message = JSON.parse(raw);
    switch (message.type) {
      case 'subscribe-ack':
        assert(message.timestamp);
        assert.equal(message.topic, subscribedTopic);
        assert(message.subscriptionId);
        break;
      case 'event':
        assert.equal(message.topic, subscribedTopic);
        done();
        break;
    }
  }

  hub.listen(0, function(listenError) {
    if (listenError) {
      return done(listenError);
    }
    var port = hub.httpServer.server.address().port;
    var socket = new WebSocket('ws://localhost:' + port + baseUrl);
    socket.on('open', function() {
      socket.send(JSON.stringify({ type: 'subscribe', topic: subscribedTopic }));
      socket.on('message', handleMessage);
    });
    socket.on('error', done);
    // Linking to the cloud is what is expected to produce the peer event.
    hub.link(cloudUrl);
  });
});
|
61 |
|
// Same scenario as a direct '_peer/connect' subscription, but the client
// subscribes with the '**' pattern. The ack must echo '**'; the event that
// completes the test must carry the '_peer/connect' topic.
it('will receive a _peer/connect event when subscribed to **', function(done) {
  var wildcard = '**';
  var node = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
  node.silent();

  function onListening(listenError) {
    if (listenError) {
      return done(listenError);
    }

    var address = 'localhost:' + node.httpServer.server.address().port;
    var client = new WebSocket('ws://' + address + baseUrl);

    client.on('open', function() {
      client.send(JSON.stringify({ type: 'subscribe', topic: wildcard }));
      client.on('message', function(raw) {
        var frame = JSON.parse(raw);
        if (frame.type === 'subscribe-ack') {
          assert(frame.timestamp);
          assert.equal(frame.topic, wildcard);
          assert(frame.subscriptionId);
          return;
        }
        if (frame.type === 'event') {
          assert.equal(frame.topic, '_peer/connect');
          done();
        }
      });
    });
    client.on('error', done);
    node.link(cloudUrl);
  }

  node.listen(0, onListening);
});
|
92 |
|
93 |
|
// The instance closes each peer as soon as its own pubsub reports
// '_peer/connect'; a WebSocket client subscribed to '_peer/disconnect' must
// then be acked and receive an event for that topic.
it('will receive a _peer/disconnect event when subscribed', function(done) {
  var disconnectTopic = '_peer/disconnect';
  var hub = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
  hub.silent();

  // Drop the peer connection right after it is announced.
  hub.pubsub.subscribe('_peer/connect', function(topic, data) {
    data.peer.close();
  });

  function checkAck(ack) {
    assert(ack.timestamp);
    assert.equal(ack.topic, disconnectTopic);
    assert(ack.subscriptionId);
  }

  function checkEvent(event) {
    assert.equal(event.topic, disconnectTopic);
    done();
  }

  hub.listen(0, function(listenError) {
    if (listenError) {
      return done(listenError);
    }
    var hubPort = hub.httpServer.server.address().port;
    var socket = new WebSocket('ws://' + ('localhost:' + hubPort) + baseUrl);
    socket.on('open', function() {
      var request = JSON.stringify({ type: 'subscribe', topic: disconnectTopic });
      socket.send(request);
      socket.on('message', function(raw) {
        var parsed = JSON.parse(raw);
        if (parsed.type === 'subscribe-ack') {
          checkAck(parsed);
        } else if (parsed.type === 'event') {
          checkEvent(parsed);
        }
      });
    });
    socket.on('error', done);
    hub.link(cloudUrl);
  });
});
|
128 |
|
// A client subscribed to the single-level wildcard '_peer/*' must be acked
// with that pattern and then receive the '_peer/connect' event produced when
// the instance links to the cloud.
//
// The previous version also registered a '_peer/connect' pubsub listener whose
// only statement assigned an unused local; that dead listener is removed.
it('will receive a _peer/connect event when subscribed with wildcards', function(done) {
  var pattern = '_peer/*';
  var z = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
  z.silent();

  z.listen(0, function(err) {
    if (err) {
      return done(err);
    }

    var endpoint = 'localhost:' + z.httpServer.server.address().port;
    var ws = new WebSocket('ws://' + endpoint + baseUrl);

    ws.on('open', function() {
      ws.send(JSON.stringify({ type: 'subscribe', topic: pattern }));
      ws.on('message', function(buffer) {
        var json = JSON.parse(buffer);
        if (json.type === 'subscribe-ack') {
          assert(json.timestamp);
          assert.equal(json.topic, pattern);
          assert(json.subscriptionId);
        } else if (json.type === 'event') {
          // The event reports the concrete topic, not the subscribed pattern.
          assert.equal(json.topic, '_peer/connect');
          done();
        }
      });
    });
    ws.on('error', done);
    z.link(cloudUrl);
  });
});
|
// A client subscribed to '_peer/*' must see '_peer/connect' first and
// '_peer/disconnect' second. The disconnect is forced by closing the peer as
// soon as the instance's own pubsub announces the connection.
//
// The event counter is compared with strict equality (was loose `==`).
it('will receive a _peer/connect and _peer/disconnect event when subscribed with wildcards', function(done) {
  var pattern = '_peer/*';
  var z = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
  z.silent();
  z.pubsub.subscribe('_peer/connect', function(topic, data) {
    data.peer.close();
  });

  var recv = 0; // number of 'event' messages seen so far

  z.listen(0, function(err) {
    if (err) {
      return done(err);
    }

    var endpoint = 'localhost:' + z.httpServer.server.address().port;
    var ws = new WebSocket('ws://' + endpoint + baseUrl);

    ws.on('open', function() {
      ws.send(JSON.stringify({ type: 'subscribe', topic: pattern }));
      ws.on('message', function(buffer) {
        var json = JSON.parse(buffer);
        if (json.type === 'subscribe-ack') {
          assert(json.timestamp);
          assert.equal(json.topic, pattern);
          assert(json.subscriptionId);
        } else if (json.type === 'event') {
          recv++;
          if (recv === 1) {
            assert.equal(json.topic, '_peer/connect');
          } else if (recv === 2) {
            assert.equal(json.topic, '_peer/disconnect');
            done();
          }
        }
      });
    });
    ws.on('error', done);
    z.link(cloudUrl);
  });
});
|
203 | });
|
204 |
|
205 | describe('Event Streams', function() {
|
// Fixture state shared by the Event Streams tests; the beforeEach hook resets
// and repopulates it for every test.
var cluster = null;      // zetta-cluster instance
// Declared so that the `app = ...` assignment in beforeEach targets this
// binding instead of creating an implicit global.
var app = null;          // the 'cloud' server of the cluster
var urls = [];           // 'localhost:<port>' entries: [0] cloud, [1] hub
var baseUrl = '/events'; // path of the WebSocket event endpoint
var devices = [];        // device objects collected from 'hub' then 'hub2'
var validTopics = [];    // '<hub>/<type>/<id>/state', parallel to `devices`
|
211 |
|
// Before each test: clear the shared fixtures and start a cluster with a
// 'cloud' server plus 'hub' and 'hub2', each given two Driver entries and
// ['cloud'] as their last argument. On 'ready' the cloud and hub endpoints are
// recorded, every device found on the two hubs is collected together with its
// '<hub>/<type>/<id>/state' topic, and mocha is signalled.
beforeEach(function(done) {
  urls = [];
  devices = [];
  validTopics = [];

  function collectDevices(hubName) {
    var jsDevices = cluster.servers[hubName].runtime._jsDevices;
    Object.keys(jsDevices).forEach(function(deviceId) {
      var found = jsDevices[deviceId];
      devices.push(found);
      validTopics.push(hubName + '/' + found.type + '/' + found.id + '/state');
    });
  }

  function onReady() {
    // `app` is not declared inside this hook; the assignment targets whatever
    // binding of that name is in scope.
    app = cluster.servers['cloud'];
    ['cloud', 'hub'].forEach(function(serverName) {
      urls.push('localhost:' + cluster.servers[serverName]._testPort);
    });
    ['hub', 'hub2'].forEach(collectDevices);
    done();
  }

  function onRun(err) {
    if (err) {
      return done(err);
    }
  }

  var configured = zettacluster({ zetta: zetta })
    .server('cloud')
    .server('hub', [Driver, Driver], ['cloud'])
    .server('hub2', [Driver, Driver], ['cloud'])
    .on('ready', onReady);

  // `cluster` holds whatever run() returns, exactly as in the chained form.
  cluster = configured.run(onRun);
});
|
240 |
|
// Stop the cluster created by beforeEach once the test has finished.
afterEach(function() {
  var running = cluster;
  running.stop();
});
|
244 |
|
245 | describe('Websocket API', function() {
|
// Registers `test` twice: once prefixed 'for cloud, ' with 0 bound as its first
// argument and once prefixed 'for hub, ' with 1 bound. The tests use that
// number as an index into `urls`. bind() is used (rather than a wrapper) so the
// registered function keeps the remaining arity of `test`.
var itBoth = function(testMsg, test) {
  ['cloud', 'hub'].forEach(function(target, position) {
    it('for ' + target + ', ' + testMsg, test.bind(null, position));
  });
};
|
250 |
|
// Subscribing to a topic must be answered with a 'subscribe-ack' that echoes
// the topic and carries a timestamp and a subscriptionId. The first message
// received is expected to be that ack.
itBoth('subscribing to a topic receives a subscription-ack', function(idx, done) {
  var requestedTopic = 'hub/led/1234/state';
  var socket = new WebSocket('ws://' + urls[idx] + baseUrl);

  function verifyAck(raw) {
    var reply = JSON.parse(raw);
    assert.equal(reply.type, 'subscribe-ack');
    assert(reply.timestamp);
    assert.equal(reply.topic, requestedTopic);
    assert(reply.subscriptionId);
    done();
  }

  function sendSubscribe() {
    socket.send(JSON.stringify({ type: 'subscribe', topic: requestedTopic }));
    socket.on('message', verifyAck);
  }

  socket.on('open', sendSubscribe);
  socket.on('error', done);
});
|
268 |
|
// A 'ping' sent without a data field must be answered by a 'pong' that has a
// timestamp and no data.
itBoth('sending ping request will return a pong response without data field', function(idx, done) {
  var socket = new WebSocket('ws://' + urls[idx] + baseUrl);

  function verifyPong(raw) {
    var pong = JSON.parse(raw);
    assert.equal(pong.type, 'pong');
    assert(pong.timestamp);
    assert.equal(pong.data, undefined);
    done();
  }

  socket.on('open', function() {
    var ping = { type: 'ping'};
    socket.send(JSON.stringify(ping));
    socket.on('message', verifyPong);
  });
  socket.on('error', done);
});
|
285 |
|
// A 'ping' carrying application data must be answered by a 'pong' that has a
// timestamp and echoes the same data.
itBoth('sending ping request will return a pong response with data field', function(idx, done) {
  var payload = 'Application data';
  var socket = new WebSocket('ws://' + urls[idx] + baseUrl);

  function verifyPong(raw) {
    var pong = JSON.parse(raw);
    assert.equal(pong.type, 'pong');
    assert(pong.timestamp);
    assert.equal(pong.data, payload);
    done();
  }

  socket.on('open', function() {
    socket.send(JSON.stringify({ type: 'ping', data: payload }));
    socket.on('message', verifyPong);
  });
  socket.on('error', done);
});
|
302 |
|
// Subscribe, then unsubscribe using the subscriptionId from the first reply.
// The next message must be an 'unsubscribe-ack' for the same subscriptionId.
itBoth('unsubscribing to a topic receives a unsubscription-ack', function(idx, done) {
  var socket = new WebSocket('ws://' + urls[idx] + baseUrl);
  var activeId; // subscriptionId taken from the first reply

  function onUnsubscribeReply(raw) {
    var reply = JSON.parse(raw);
    assert.equal(reply.type, 'unsubscribe-ack');
    assert(reply.timestamp);
    assert.equal(reply.subscriptionId, activeId);
    done();
  }

  // Registered with once(): only the first message is treated as the
  // subscribe reply; later messages go to onUnsubscribeReply.
  function onSubscribeReply(raw) {
    activeId = JSON.parse(raw).subscriptionId;
    socket.send(JSON.stringify({ type: 'unsubscribe', subscriptionId: activeId }));
    socket.on('message', onUnsubscribeReply);
  }

  socket.on('open', function() {
    socket.send(JSON.stringify({ type: 'subscribe', topic: 'hub/led/1234/state' }));
    socket.once('message', onSubscribeReply);
  });
  socket.on('error', done);
});
|
324 |
|
// Placeholder: the body is empty, so nothing about error messages is asserted
// yet.
itBoth('verify error message format', function() {
});
|
326 |
|
// Subscribes to the state topic of devices[0], then changes devices[1] and
// devices[0]. The first non-ack message must be an event for the subscribed
// topic carrying the acked subscriptionId.
itBoth('specific topic subscription only receives messages with that topic', function(idx, done) {
  var socket = new WebSocket('ws://' + urls[idx] + baseUrl);
  var expectedTopic = validTopics[0];
  var ackedId = null;

  function triggerChanges() {
    devices[1].call('change');
    devices[0].call('change');
  }

  function onMessage(raw) {
    var message = JSON.parse(raw);
    if (message.type !== 'subscribe-ack') {
      assert.equal(message.type, 'event');
      assert(message.timestamp);
      assert.equal(message.topic, expectedTopic);
      assert.equal(message.subscriptionId, ackedId);
      assert(message.data);
      done();
      return;
    }

    assert(message.timestamp);
    assert.equal(message.topic, expectedTopic);
    assert(message.subscriptionId);
    ackedId = message.subscriptionId;
    // Short delay after the ack before the devices are changed.
    setTimeout(triggerChanges, 50);
  }

  socket.on('open', function() {
    socket.send(JSON.stringify({ type: 'subscribe', topic: expectedTopic }));
    socket.on('message', onMessage);
  });
  socket.on('error', done);
});
|
360 |
|
// Two clients subscribe to the same device topic. Whichever client receives
// the second subscribe-ack triggers the device change(s); the test finishes
// once two events in total have arrived, each matching the topic and the
// receiving client's own subscriptionId.
itBoth('multiple clients specific topic subscription only receives messages with that topic', function(idx, done) {
  var endpoint = urls[idx];
  var topic = validTopics[0];

  var connected = 0; // subscribe-acks seen across both clients
  var recv = 0;      // events seen across both clients

  // Opens one subscriber. `fireChanges` runs 50 ms after this client observes
  // the second ack.
  function openSubscriber(fireChanges) {
    var socket = new WebSocket('ws://' + endpoint + baseUrl);
    var ownSubscriptionId = null;

    socket.on('open', function() {
      socket.send(JSON.stringify({ type: 'subscribe', topic: topic }));
      socket.on('message', function(raw) {
        var message = JSON.parse(raw);
        if (message.type === 'subscribe-ack') {
          connected++;
          ownSubscriptionId = message.subscriptionId;
          if (connected === 2) {
            setTimeout(fireChanges, 50);
          }
          return;
        }
        assert.equal(message.topic, topic);
        assert.equal(message.subscriptionId, ownSubscriptionId);
        recv++;
        if (recv === 2) {
          done();
        }
      });
    });
    socket.on('error', done);
  }

  // First client: changes devices[1] and devices[0] if it sees the last ack.
  openSubscriber(function() {
    devices[1].call('change');
    devices[0].call('change');
  });

  // Second client: changes only devices[0] if it sees the last ack.
  openSubscriber(function() {
    devices[0].call('change');
  });
});
|
423 |
|
// One client subscribes to the exact device topic, a second to
// 'hub/testdriver/*/state'. After both are acked devices[0] is changed once;
// 250 ms after the test starts each client must have counted exactly one event.
itBoth('multiple clients using different topic subscriptions only receive one message per event', function(idx, done) {
  var endpoint = urls[idx];
  var topic = validTopics[0];

  var connected = 0;
  var received = { exact: 0, wildcard: 0 };

  function changeFirstDevice() {
    devices[0].call('change');
  }

  // Opens a client subscribed to `subscribeTopic`; its events are tallied
  // under received[counterKey].
  function openSubscriber(subscribeTopic, counterKey) {
    var socket = new WebSocket('ws://' + endpoint + baseUrl);
    var ownSubscriptionId = null;

    socket.on('open', function() {
      socket.send(JSON.stringify({ type: 'subscribe', topic: subscribeTopic }));
      socket.on('message', function(raw) {
        var message = JSON.parse(raw);
        if (message.type === 'subscribe-ack') {
          connected++;
          ownSubscriptionId = message.subscriptionId;
          if (connected === 2) {
            setTimeout(changeFirstDevice, 50);
          }
          return;
        }
        // Events always report the concrete device topic.
        assert.equal(message.topic, topic);
        assert.equal(message.subscriptionId, ownSubscriptionId);
        received[counterKey]++;
      });
    });
    socket.on('error', done);
  }

  openSubscriber(topic, 'exact');
  openSubscriber('hub/testdriver/*/state', 'wildcard');

  setTimeout(function() {
    assert.equal(received.exact, 1);
    assert.equal(received.wildcard, 1);
    done();
  }, 250);
});
|
486 |
|
// Replaces the first 'hub' in the device topic with '*' and subscribes to the
// result. After changing devices[1] and devices[0], the first event must carry
// the concrete topic of devices[0].
itBoth('wildcard server topic subscription only receives messages with that topic', function(idx, done) {
  var socket = new WebSocket('ws://' + urls[idx] + baseUrl);
  var ackedId = null;
  var pattern = validTopics[0].replace('hub', '*');

  function triggerChanges() {
    devices[1].call('change');
    devices[0].call('change');
  }

  function onAck(ack) {
    assert(ack.timestamp);
    assert.equal(ack.topic, pattern);
    assert(ack.subscriptionId);
    ackedId = ack.subscriptionId;
    setTimeout(triggerChanges, 50);
  }

  function onEvent(event) {
    assert.equal(event.type, 'event');
    assert(event.timestamp);
    assert.equal(event.topic, validTopics[0]);
    assert.equal(event.subscriptionId, ackedId);
    assert(event.data);
    done();
  }

  socket.on('open', function() {
    socket.send(JSON.stringify({ type: 'subscribe', topic: pattern }));
    socket.on('message', function(raw) {
      var message = JSON.parse(raw);
      if (message.type === 'subscribe-ack') {
        onAck(message);
      } else {
        onEvent(message);
      }
    });
  });
  socket.on('error', done);
});
|
521 |
|
// One socket holds two subscriptions that both match devices[0]'s state
// topic: the exact topic and 'hub/testdriver/*/state'. A single device change
// must produce two events, one per subscription (consecutive events must carry
// different subscriptionIds).
//
// The change is scheduled once, after BOTH subscriptions are acked. Scheduling
// it on every ack (as before) fired it twice and could fire the first one
// before the second subscription was confirmed.
itBoth('wildcard topic and static topic subscription will receive messages for both subscriptions', function(idx, done) {
  var endpoint = urls[idx];
  var ws = new WebSocket('ws://' + endpoint + baseUrl);
  var lastSubscriptionId = null;
  var acked = 0; // subscribe-acks received so far
  var count = 0; // events received so far

  ws.on('open', function() {
    ws.send(JSON.stringify({ type: 'subscribe', topic: validTopics[0] }));
    ws.send(JSON.stringify({ type: 'subscribe', topic: 'hub/testdriver/*/state' }));

    ws.on('message', function(buffer) {
      var json = JSON.parse(buffer);
      if (json.type === 'subscribe-ack') {
        assert(json.timestamp);
        assert(json.subscriptionId);
        acked++;
        if (acked === 2) {
          setTimeout(function() {
            devices[0].call('change');
          }, 50);
        }
        return;
      }

      count++;
      assert.notEqual(lastSubscriptionId, json.subscriptionId);
      lastSubscriptionId = json.subscriptionId;
      assert.equal(json.type, 'event');
      assert(json.timestamp);
      assert.equal(json.topic, validTopics[0]);
      assert(json.data);
      if (count === 2) {
        done();
      }
    });
  });
  ws.on('error', done);
});
|
557 |
|
// Registers a query on the cloud runtime for devices[0] (as seen from 'hub')
// and only opens the WebSocket from inside the observe callback. The
// 'hub/testdriver/*/state' subscription must then be acked and deliver the
// state event of devices[0].
itBoth('wildcard device id topic subscription and cloud app query both will recieve data', function(idx, done) {
  var target = urls[idx];
  var ackedId = null;
  var pattern = 'hub/testdriver/*/state';

  var cloudRuntime = cluster.servers['cloud'].runtime;
  var deviceQuery = cloudRuntime.from('hub').where({ type: 'testdriver', id: devices[0].id });

  function changeFirstDevice() {
    devices[0].call('change');
  }

  function onMessage(raw) {
    var message = JSON.parse(raw);
    if (message.type !== 'subscribe-ack') {
      assert.equal(message.type, 'event');
      assert(message.timestamp);
      assert.equal(message.topic, validTopics[0]);
      assert.equal(message.subscriptionId, ackedId);
      assert(message.data);
      done();
      return;
    }
    assert(message.timestamp);
    assert.equal(message.topic, pattern);
    assert(message.subscriptionId);
    ackedId = message.subscriptionId;
    setTimeout(changeFirstDevice, 50);
  }

  // The observed device itself is not used; the callback only marks the point
  // at which the socket is opened.
  function onObserved(device) {
    var socket = new WebSocket('ws://' + target + baseUrl);
    socket.on('open', function() {
      socket.send(JSON.stringify({ type: 'subscribe', topic: pattern }));
      socket.on('message', onMessage);
    });
    socket.on('error', done);
  }

  cloudRuntime.observe(deviceQuery, onObserved);
});
|
595 |
|
// Same as the cloud-query variant, but the query for devices[0] is registered
// on the hub's own runtime. The socket is opened from the observe callback and
// must receive the ack and then the state event of devices[0].
itBoth('wildcard device id topic subscription and hub app query both will recieve data', function(idx, done) {
  var target = urls[idx];
  var ackedId = null;
  var pattern = 'hub/testdriver/*/state';

  var hubRuntime = cluster.servers['hub'].runtime;
  var deviceQuery = hubRuntime.where({ type: 'testdriver', id: devices[0].id });

  function handleAck(ack) {
    assert(ack.timestamp);
    assert.equal(ack.topic, pattern);
    assert(ack.subscriptionId);
    ackedId = ack.subscriptionId;
    setTimeout(function() {
      devices[0].call('change');
    }, 50);
  }

  function handleEvent(event) {
    assert.equal(event.type, 'event');
    assert(event.timestamp);
    assert.equal(event.topic, validTopics[0]);
    assert.equal(event.subscriptionId, ackedId);
    assert(event.data);
    done();
  }

  hubRuntime.observe(deviceQuery, function(device) {
    var socket = new WebSocket('ws://' + target + baseUrl);
    socket.on('open', function() {
      socket.send(JSON.stringify({ type: 'subscribe', topic: pattern }));
      socket.on('message', function(raw) {
        var message = JSON.parse(raw);
        if (message.type === 'subscribe-ack') {
          handleAck(message);
        } else {
          handleEvent(message);
        }
      });
    });
    socket.on('error', done);
  });
});
|
633 |
|
// Against the cloud endpoint only: '*/testdriver/*/state' must deliver events
// after changing devices[0] and devices[2] (collected from 'hub' and 'hub2'
// respectively). Finishes after two events.
it('wildcard server topic subscription receives messages from both hubs', function(done) {
  var pattern = '*/testdriver/*/state';
  var socket = new WebSocket('ws://' + urls[0] + baseUrl);
  var ackedId = null;
  var eventCount = 0;

  function changeOneDevicePerHub() {
    devices[0].call('change');
    devices[2].call('change');
  }

  function onMessage(raw) {
    var message = JSON.parse(raw);
    if (message.type === 'subscribe-ack') {
      assert(message.timestamp);
      assert.equal(message.topic, pattern);
      assert(message.subscriptionId);
      ackedId = message.subscriptionId;
      setTimeout(changeOneDevicePerHub, 50);
      return;
    }

    eventCount++;
    assert.equal(message.type, 'event');
    assert(message.timestamp);
    assert(message.topic);
    assert.equal(message.subscriptionId, ackedId);
    assert(message.data);
    if (eventCount === 2) {
      done();
    }
  }

  socket.on('open', function() {
    socket.send(JSON.stringify({ type: 'subscribe', topic: pattern }));
    socket.on('message', onMessage);
  });
  socket.on('error', done);
});
|
671 |
|
// Against the cloud endpoint: a '**' subscription must deliver the 'state' and
// 'logs' topics of all collected devices (the first two are attributed to
// 'hub', the rest to 'hub2'). Each event must match a still-outstanding topic;
// the test finishes when none are left.
it('wildcard topic ** will subscribe to all topics for both hubs', function(done) {
  var pattern = '**';
  var socket = new WebSocket('ws://' + urls[0] + baseUrl);
  var ackedId = null;

  var outstanding = [];
  for (var i = 0; i < devices.length; i++) {
    var hubName = (i < 2) ? 'hub' : 'hub2';
    var prefix = hubName + '/' + devices[i].type + '/' + devices[i].id + '/';
    outstanding.push(prefix + 'state');
    outstanding.push(prefix + 'logs');
  }

  function changeAllDevices() {
    devices[0].call('change');
    devices[1].call('change');
    devices[2].call('change');
    devices[3].call('change');
  }

  function onMessage(raw) {
    var message = JSON.parse(raw);
    if (message.type === 'subscribe-ack') {
      assert(message.timestamp);
      assert.equal(message.topic, pattern);
      assert(message.subscriptionId);
      ackedId = message.subscriptionId;
      setTimeout(changeAllDevices, 250);
      return;
    }

    assert.equal(message.type, 'event');
    assert(message.timestamp);
    assert(message.topic);
    assert.equal(message.subscriptionId, ackedId);
    assert(message.data);
    // Every event must match one outstanding topic, which is then removed.
    var position = outstanding.indexOf(message.topic);
    assert.notEqual(position, -1);
    outstanding.splice(position, 1);
    if (outstanding.length === 0) {
      done();
    }
  }

  socket.on('open', function() {
    socket.send(JSON.stringify({ type: 'subscribe', topic: pattern }));
    socket.on('message', onMessage);
  });
  socket.on('error', done);
});
|
720 |
|
// Against the hub endpoint (urls[1]): a '**' subscription is expected to
// deliver only the 'state' and 'logs' topics of the first two devices, even
// though all four devices are changed. An event with any other topic fails
// the indexOf assertion.
it('wildcard topic ** will subscribe to all topics for single hub', function(done) {
  var pattern = '**';
  var socket = new WebSocket('ws://' + urls[1] + baseUrl);
  var ackedId = null;

  var outstanding = [];
  [0, 1].forEach(function(position) {
    var hubDevice = devices[position];
    var prefix = 'hub/' + hubDevice.type + '/' + hubDevice.id + '/';
    outstanding.push(prefix + 'state');
    outstanding.push(prefix + 'logs');
  });

  function changeAllDevices() {
    devices[0].call('change');
    devices[1].call('change');
    devices[2].call('change');
    devices[3].call('change');
  }

  function onAck(ack) {
    assert(ack.timestamp);
    assert.equal(ack.topic, pattern);
    assert(ack.subscriptionId);
    ackedId = ack.subscriptionId;
    setTimeout(changeAllDevices, 50);
  }

  function onEvent(event) {
    assert.equal(event.type, 'event');
    assert(event.timestamp);
    assert(event.topic);
    assert.equal(event.subscriptionId, ackedId);
    assert(event.data);
    var found = outstanding.indexOf(event.topic);
    assert.notEqual(found, -1);
    outstanding.splice(found, 1);
    if (outstanding.length === 0) {
      done();
    }
  }

  socket.on('open', function() {
    socket.send(JSON.stringify({ type: 'subscribe', topic: pattern }));
    socket.on('message', function(raw) {
      var message = JSON.parse(raw);
      if (message.type === 'subscribe-ack') {
        onAck(message);
      } else {
        onEvent(message);
      }
    });
  });
  socket.on('error', done);
});
|
769 |
|
// 'hub/testdriver/*/state' must deliver a state event for each of the two hub
// devices changed. Consecutive events must carry different topics; the test
// finishes after two events.
itBoth('wildcard topic for single peer receives all messages for all topics', function(idx, done) {
  var pattern = 'hub/testdriver/*/state';
  var socket = new WebSocket('ws://' + urls[idx] + baseUrl);
  var ackedId = null;
  var previousTopic = null;
  var eventCount = 0;

  function changeHubDevices() {
    devices[0].call('change');
    devices[1].call('change');
  }

  function onMessage(raw) {
    var message = JSON.parse(raw);
    if (message.type === 'subscribe-ack') {
      assert(message.timestamp);
      assert.equal(message.topic, pattern);
      assert(message.subscriptionId);
      ackedId = message.subscriptionId;
      setTimeout(changeHubDevices, 50);
      return;
    }

    assert.equal(message.type, 'event');
    assert(message.timestamp);
    assert(message.topic);
    assert.notEqual(message.topic, previousTopic);
    previousTopic = message.topic;
    assert.equal(message.subscriptionId, ackedId);
    assert(message.data);
    eventCount++;
    if (eventCount === 2) {
      done();
    }
  }

  socket.on('open', function() {
    socket.send(JSON.stringify({ type: 'subscribe', topic: pattern }));
    socket.on('message', onMessage);
  });
  socket.on('error', done);
});
|
810 |
|
// 'hub/testdriver/**' is expected to cover every stream of both hub devices:
// after changing devices[0] and devices[1] the test waits for four events,
// each with a topic different from the one immediately before it.
itBoth('wildcard topic for device id and stream types receives all messages for all topics', function(idx, done) {
  var pattern = 'hub/testdriver/**';
  var socket = new WebSocket('ws://' + urls[idx] + baseUrl);
  var ackedId = null;
  var previousTopic = null;
  var eventCount = 0;

  function onAck(ack) {
    assert(ack.timestamp);
    assert.equal(ack.topic, pattern);
    assert(ack.subscriptionId);
    ackedId = ack.subscriptionId;
    setTimeout(function() {
      devices[0].call('change');
      devices[1].call('change');
    }, 50);
  }

  function onEvent(event) {
    assert.equal(event.type, 'event');
    assert(event.timestamp);
    assert(event.topic);
    assert.notEqual(event.topic, previousTopic);
    previousTopic = event.topic;
    assert.equal(event.subscriptionId, ackedId);
    assert(event.data);
    eventCount++;
    if (eventCount === 4) {
      done();
    }
  }

  socket.on('open', function() {
    socket.send(JSON.stringify({ type: 'subscribe', topic: pattern }));
    socket.on('message', function(raw) {
      var message = JSON.parse(raw);
      if (message.type === 'subscribe-ack') {
        onAck(message);
      } else {
        onEvent(message);
      }
    });
  });
  socket.on('error', done);
});
|
851 |
|
// Replaces the leading 'hub/' of devices[0]'s state topic with '**/' and
// subscribes to that pattern. After changing devices[1] and devices[0], the
// first event must carry the full topic of devices[0].
itBoth('**/led/<device_id>/state will match valid topic', function(idx, done) {
  var socket = new WebSocket('ws://' + urls[idx] + baseUrl);
  var ackedId = null;
  var pattern = validTopics[0].replace('hub/', '**/');

  function triggerChanges() {
    devices[1].call('change');
    devices[0].call('change');
  }

  function onMessage(raw) {
    var message = JSON.parse(raw);
    if (message.type !== 'subscribe-ack') {
      assert.equal(message.type, 'event');
      assert(message.timestamp);
      assert.equal(message.topic, validTopics[0]);
      assert.equal(message.subscriptionId, ackedId);
      assert(message.data);
      done();
      return;
    }

    assert(message.timestamp);
    assert.equal(message.topic, pattern);
    assert(message.subscriptionId);
    ackedId = message.subscriptionId;
    setTimeout(triggerChanges, 50);
  }

  socket.on('open', function() {
    socket.send(JSON.stringify({ type: 'subscribe', topic: pattern }));
    socket.on('message', onMessage);
  });
  socket.on('error', done);
});
|
885 |
|
// Subscribes to '**/<device_id>/state' for devices[0], i.e. with the server
// AND the device type collapsed into '**', as the test title states. The body
// used to subscribe to '**/<type>/<device_id>/state', which merely duplicated
// the preceding test and never exercised this pattern.
//
// devices[1] is changed as well; only the event for devices[0] may arrive, and
// it must carry that device's full topic.
// NOTE(review): assumes the server accepts '**' standing in for more than one
// leading topic segment - confirm against the event-stream topic spec.
itBoth('**/<device_id>/state will match valid topic from device', function(idx, done) {
  var endpoint = urls[idx];
  var ws = new WebSocket('ws://' + endpoint + baseUrl);
  var subscriptionId = null;
  var topic = '**/' + devices[0].id + '/state';

  ws.on('open', function() {
    ws.send(JSON.stringify({ type: 'subscribe', topic: topic }));
    ws.on('message', function(buffer) {
      var json = JSON.parse(buffer);
      if (json.type === 'subscribe-ack') {
        assert(json.timestamp);
        assert.equal(json.topic, topic);
        assert(json.subscriptionId);
        subscriptionId = json.subscriptionId;

        setTimeout(function() {
          devices[1].call('change');
          devices[0].call('change');
        }, 50);
      } else {
        assert.equal(json.type, 'event');
        assert(json.timestamp);
        assert.equal(json.topic, validTopics[0]);
        assert.equal(json.subscriptionId, subscriptionId);
        assert(json.data);
        done();
      }
    });
  });
  ws.on('error', done);
});
|
919 |
|
// Subscribes to '**/state', i.e. the 'state' stream of any device on any
// server, as the test title states. The body used to subscribe to
// '**/<type>/<device_id>/state', which duplicated an earlier test and never
// exercised this pattern.
//
// Only devices[0] is changed, so the single expected event must carry that
// device's full state topic.
// NOTE(review): assumes the server accepts '**' standing in for all leading
// topic segments - confirm against the event-stream topic spec.
itBoth('**/state will match valid topic from device', function(idx, done) {
  var endpoint = urls[idx];
  var ws = new WebSocket('ws://' + endpoint + baseUrl);
  var subscriptionId = null;
  var topic = '**/state';

  ws.on('open', function() {
    ws.send(JSON.stringify({ type: 'subscribe', topic: topic }));
    ws.on('message', function(buffer) {
      var json = JSON.parse(buffer);
      if (json.type === 'subscribe-ack') {
        assert(json.timestamp);
        assert.equal(json.topic, topic);
        assert(json.subscriptionId);
        subscriptionId = json.subscriptionId;

        setTimeout(function() {
          devices[0].call('change');
        }, 50);
      } else {
        assert.equal(json.type, 'event');
        assert(json.timestamp);
        assert.equal(json.topic, validTopics[0]);
        assert.equal(json.subscriptionId, subscriptionId);
        assert(json.data);
        done();
      }
    });
  });
  ws.on('error', done);
});
|
952 |
|
953 |
|
// Subscribes to the wildcard logs topic and checks the shape of the log event
// produced by a 'change' transition on devices[0].
itBoth('subscribing to logs topic on device will get properly formated response', function(idx, done) {
  var socket = new WebSocket('ws://' + urls[idx] + baseUrl);
  var ackedId = null;
  var logsTopic = 'hub/testdriver/*/logs';
  var previousTopic = null;

  var checkAck = function(ack) {
    assert.equal(ack.type, 'subscribe-ack');
    assert(ack.timestamp);
    assert.equal(ack.topic, logsTopic);
    assert(ack.subscriptionId);
    ackedId = ack.subscriptionId;
    setTimeout(function() {
      devices[0].call('change');
    }, 50);
  };

  var checkLogEvent = function(evt) {
    assert.equal(evt.type, 'event');
    assert(evt.timestamp);
    assert(evt.topic);
    // Each event is expected to carry a topic different from the previous one.
    assert.notEqual(evt.topic, previousTopic);
    previousTopic = evt.topic;
    assert.equal(evt.subscriptionId, ackedId);

    var payload = evt.data;
    assert(payload);
    assert.equal(payload.transition, 'change');
    // Only the singular `transition` field may be present.
    assert(!payload.transitions);
    assert.deepEqual(payload.input, []);
    assert(payload.properties);
    assert(payload.actions);
    done();
  };

  socket.on('open', function() {
    socket.send(JSON.stringify({ type: 'subscribe', topic: logsTopic }));
    socket.on('message', function(buffer) {
      var reply = JSON.parse(buffer);
      if(reply.type === 'subscribe-ack') {
        checkAck(reply);
      } else {
        checkLogEvent(reply);
      }
    });
  });
  socket.on('error', done);
});
|
993 |
|
994 |
|
// A subscription to a topic that matches nothing must still be acknowledged.
itBoth('topic that doesnt exist still opens stream', function(idx, done) {
  var socket = new WebSocket('ws://' + urls[idx] + baseUrl);
  var unknownTopic = 'blah/foo/1/blah';

  // The first message received is validated as the subscribe-ack.
  var expectAck = function(buffer) {
    var ack = JSON.parse(buffer);
    assert.equal(ack.type, 'subscribe-ack');
    assert(ack.timestamp);
    assert.equal(ack.topic, unknownTopic);
    assert(ack.subscriptionId);
    done();
  };

  socket.on('open', function() {
    socket.send(JSON.stringify({ type: 'subscribe', topic: unknownTopic }));
    socket.on('message', expectAck);
  });
  socket.on('error', done);
});
|
1013 |
|
// Subscribes to 'hub/**' on urls[0], then links a new peer named 'some-peer'
// to urls[1] and expects a 'hub/_peer/connect' event describing that peer.
// NOTE(review): presumably urls[0] is the cloud and urls[1] the hub -- confirm
// against the suite setup.
it('subscription cloud will get _peer/connect events from hub', function(done) {
  var endpoint = urls[0];
  var topic = 'hub/**';
  var ws = new WebSocket('ws://' + endpoint + baseUrl);
  ws.on('open', function() {
    ws.send(JSON.stringify({ type: 'subscribe', topic: topic }));

    ws.on('message', function(buffer) {
      var json = JSON.parse(buffer);
      if (json.type === 'subscribe-ack') {
        // Only start the new peer once the subscription is acknowledged.
        var z = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
        z.name('some-peer');
        z.link('http://' + urls[1]);
        z.silent();
        z.listen(0);
      } else if (json.type === 'event') {
        assert.equal(json.topic, 'hub/_peer/connect');
        assert.equal(json.data.id, 'some-peer');
        done();
      }
    });

  });
  // Report socket failures through mocha instead of leaving the 'error'
  // event unhandled, as the other tests in this file do.
  ws.on('error', done);
});
|
1038 |
|
// Subscribes to the same device topic under a made-up hub name and under the
// real one, then checks that the event received does not carry the
// subscriptionId that was acknowledged for the made-up hub.
itBoth('subscription to non existent hub does not return data for that subscriptionId', function(idx, done) {
  var socket = new WebSocket('ws://' + urls[idx] + baseUrl);
  var realTopic = validTopics[0];
  var bogusTopic = realTopic.replace('hub/', 'notahub/');
  var bogusSubscriptionId = null;

  var triggerChange = function() {
    devices[0].call('change');
  };

  socket.on('open', function() {
    // The bogus subscription is sent first, then the real one.
    [bogusTopic, realTopic].forEach(function(topic) {
      socket.send(JSON.stringify({ type: 'subscribe', topic: topic }));
    });

    socket.on('message', function(buffer) {
      var reply = JSON.parse(buffer);

      if (reply.type !== 'subscribe-ack') {
        assert.notEqual(reply.subscriptionId, bogusSubscriptionId);
        done();
        return;
      }

      if (reply.topic === bogusTopic) {
        bogusSubscriptionId = reply.subscriptionId;
      }
      // A transition is scheduled after every ack, bogus or real.
      setTimeout(triggerChange, 50);
    });
  });
  socket.on('error', done);
});
|
1067 |
|
// Subscribes to a concrete topic and to a wildcard topic on the same socket,
// and finishes once two events have arrived; both acks must have been seen by
// then.
itBoth('wildcard and specific topic will each publish a message on a subscription', function(idx, done) {
  var endpoint = urls[idx];
  var ws = new WebSocket('ws://' + endpoint + baseUrl);
  var count = 0;
  var ackCount = 0;
  var topicOne = validTopics[0];
  var topicTwo = 'hub/testdriver/*/state';
  ws.on('open', function() {
    var msgOne = { type: 'subscribe', topic: topicOne };
    var msgTwo = { type: 'subscribe', topic: topicTwo };
    ws.send(JSON.stringify(msgOne));
    ws.send(JSON.stringify(msgTwo));
    ws.on('message', function(buffer) {
      var json = JSON.parse(buffer);
      if(json.type === 'subscribe-ack') {
        assert.equal(json.type, 'subscribe-ack');
        assert(json.timestamp);
        assert(json.topic);
        assert(json.subscriptionId);
        ackCount++;
        // Scheduled once per ack; alternates the two transition names.
        setTimeout(function() {
          for(var i=0; i<11; i++) {
            devices[0].call((i % 2 === 0) ? 'change' : 'prepare');
          }
        }, 50);
      } else {
        assert.equal(json.type, 'event');
        assert(json.timestamp);
        assert(json.topic);
        assert(json.subscriptionId);
        count++;
        // done() fires exactly once, on the second event.
        if(count === 2) {
          assert.equal(ackCount, 2);
          done();
        }
      }
    });
  });
  ws.on('error', done);
});
|
1110 |
|
// Subscribes with limit: 10, triggers 15 transitions, and checks that exactly
// 10 events are delivered.
itBoth('adding limit to subscription should limit number of messages received', function(idx, done){
  var endpoint = urls[idx];
  var ws = new WebSocket('ws://' + endpoint + baseUrl);
  var subscriptionId = null;
  var count = 0;
  var topic = validTopics[0];
  ws.on('open', function() {
    var msg = { type: 'subscribe', topic: topic, limit: 10 };
    ws.send(JSON.stringify(msg));
    ws.on('message', function(buffer) {
      var json = JSON.parse(buffer);
      if(json.type === 'subscribe-ack') {
        assert.equal(json.type, 'subscribe-ack');
        assert(json.timestamp);
        assert(json.topic);
        assert(json.subscriptionId);
        subscriptionId = json.subscriptionId;

        // More transitions than the limit, alternating the two names.
        setTimeout(function() {
          for(var i=0; i<15; i++) {
            devices[0].call((i % 2 === 0) ? 'change' : 'prepare');
          }
        }, 50);
      } else if (json.type !== 'unsubscribe-ack') {
        assert.equal(json.type, 'event');
        assert(json.timestamp);
        assert(json.topic);
        // Compare the ids; a two-argument assert() would only check
        // truthiness and treat the second argument as the failure message.
        assert.equal(json.subscriptionId, subscriptionId);
        assert(json.data);

        count++;
        if(count === 10) {
          // Wait to make sure no further events arrive past the limit.
          setTimeout(function() {
            assert.equal(count, 10);
            done();
          }, 200);
        }
      }
    });
  });
  ws.on('error', done);
});
|
1154 |
|
// Subscribes with limit: 10, triggers 11 transitions, and expects an
// unsubscribe-ack for the subscription after exactly 10 events.
itBoth('when limit is reached a unsubscribe-ack should be received', function(idx, done){
  var endpoint = urls[idx];
  var ws = new WebSocket('ws://' + endpoint + baseUrl);
  var subscriptionId = null;
  var count = 0;
  var topic = validTopics[0];
  ws.on('open', function() {
    var msg = { type: 'subscribe', topic: topic, limit: 10 };
    ws.send(JSON.stringify(msg));
    ws.on('message', function(buffer) {
      var json = JSON.parse(buffer);
      if(json.type === 'subscribe-ack') {
        assert.equal(json.type, 'subscribe-ack');
        assert(json.timestamp);
        assert(json.topic);
        assert(json.subscriptionId);
        subscriptionId = json.subscriptionId;
        // One more transition than the limit, alternating the two names.
        setTimeout(function() {
          for(var i=0; i<11; i++) {
            devices[0].call((i % 2 === 0) ? 'change' : 'prepare');
          }
        }, 50);
      } else if(json.type === 'event') {
        assert.equal(json.type, 'event');
        assert(json.timestamp);
        assert(json.topic);
        // Compare the ids; a two-argument assert() would only check
        // truthiness and treat the second argument as the failure message.
        assert.equal(json.subscriptionId, subscriptionId);
        assert(json.data);
        count++;
      } else if(json.type === 'unsubscribe-ack') {
        assert.equal(json.type, 'unsubscribe-ack');
        assert(json.timestamp);
        assert.equal(json.subscriptionId, subscriptionId);
        // The ack must arrive only after the full limit has been delivered.
        assert.equal(count, 10);
        done();
      }
    });
  });
  ws.on('error', done);
});
|
1196 |
|
// Same as the plain limit test, but the subscription topic carries a query
// ('select data where data >= 5') on the device's 'bar' stream.
itBoth('when limit is reached with a query selector a unsubscribe-ack should be received', function(idx, done){
  var endpoint = urls[idx];
  var ws = new WebSocket('ws://' + endpoint + baseUrl);
  var subscriptionId = null;
  var count = 0;
  var topic = 'hub/testdriver/' + devices[0].id + '/bar?select data where data >= 5';
  ws.on('open', function() {
    var msg = { type: 'subscribe', topic: topic, limit: 10 };
    ws.send(JSON.stringify(msg));
    ws.on('message', function(buffer) {
      var json = JSON.parse(buffer);
      if(json.type === 'subscribe-ack') {
        assert.equal(json.type, 'subscribe-ack');
        assert(json.timestamp);
        assert(json.topic);
        assert(json.subscriptionId);
        subscriptionId = json.subscriptionId;
        // NOTE(review): presumably 16 increments yield more than 10 values
        // satisfying 'data >= 5' -- confirm against the example driver.
        setTimeout(function() {
          for(var i=0; i<16; i++) {
            devices[0].incrementStreamValue();
          }
        }, 50);
      } else if(json.type === 'event') {
        assert.equal(json.type, 'event');
        assert(json.timestamp);
        assert(json.topic);
        // Compare the ids; a two-argument assert() would only check
        // truthiness and treat the second argument as the failure message.
        assert.equal(json.subscriptionId, subscriptionId);
        assert(json.data);
        count++;
      } else if(json.type === 'unsubscribe-ack') {
        assert.equal(json.type, 'unsubscribe-ack');
        assert(json.timestamp);
        assert.equal(json.subscriptionId, subscriptionId);
        // The ack must arrive only after the full limit has been delivered.
        assert.equal(count, 10);
        done();
      }
    });
  });
  ws.on('error', done);
});
|
1238 |
|
// Subscribes to the device's 'bar' stream with 'select data where data >= 1'
// and expects an event after a single stream increment.
itBoth('query field selector should only return properties in selection', function(idx, done){
  var endpoint = urls[idx];
  var ws = new WebSocket('ws://' + endpoint + baseUrl);
  var subscriptionId = null;
  var topic = 'hub/testdriver/' + devices[0].id + '/bar?select data where data >= 1';
  ws.on('open', function() {
    var msg = { type: 'subscribe', topic: topic };
    ws.send(JSON.stringify(msg));
    ws.on('message', function(buffer) {
      var json = JSON.parse(buffer);
      if(json.type === 'subscribe-ack') {
        assert.equal(json.type, 'subscribe-ack');
        assert(json.timestamp);
        assert(json.topic);
        assert(json.subscriptionId);
        subscriptionId = json.subscriptionId;
        setTimeout(function() {
          devices[0].incrementStreamValue();
        }, 50);
      } else if(json.type === 'event') {
        assert.equal(json.type, 'event');
        assert(json.timestamp);
        assert(json.topic);
        // Compare the ids; a two-argument assert() would only check
        // truthiness and treat the second argument as the failure message.
        assert.equal(json.subscriptionId, subscriptionId);
        assert(json.data);
        done();
      }
    });
  });
  ws.on('error', done);
});
|
1271 |
|
// Subscribes to the device's 'fooobject' stream with
// 'select * where data.val >= 2', publishes { foo: 'bar', val: 2 }, and
// expects both fields in the event data.
itBoth('query field selector * should all properties in selection', function(idx, done){
  var endpoint = urls[idx];
  var ws = new WebSocket('ws://' + endpoint + baseUrl);
  var subscriptionId = null;
  var topic = 'hub/testdriver/' + devices[0].id + '/fooobject?select * where data.val >= 2';
  var data = { foo: 'bar', val: 2 };
  ws.on('open', function() {
    var msg = { type: 'subscribe', topic: topic };
    ws.send(JSON.stringify(msg));
    ws.on('message', function(buffer) {
      var json = JSON.parse(buffer);
      if(json.type === 'subscribe-ack') {
        assert.equal(json.type, 'subscribe-ack');
        assert(json.timestamp);
        assert(json.topic);
        assert(json.subscriptionId);
        subscriptionId = json.subscriptionId;
        setTimeout(function() {
          devices[0].publishStreamObject(data);
        }, 50);
      } else if(json.type === 'event') {
        assert(json.timestamp);
        assert(json.topic);
        // Compare the ids; a two-argument assert() would only check
        // truthiness and treat the second argument as the failure message.
        assert.equal(json.subscriptionId, subscriptionId);
        assert(json.data);
        assert.equal(json.data.val, 2);
        assert.equal(json.data.foo, 'bar');
        done();
      }
    });
  });
  ws.on('error', done);
});
|
1306 |
|
// Subscribes to the device's 'fooobject' stream with 'select data.val',
// publishes { foo: 'bar', val: 2 }, and expects only `val` in the event data.
itBoth('query field selector should return only selected properties', function(idx, done){
  var endpoint = urls[idx];
  var ws = new WebSocket('ws://' + endpoint + baseUrl);
  var subscriptionId = null;
  var topic = 'hub/testdriver/' + devices[0].id + '/fooobject?select data.val';
  var data = { foo: 'bar', val: 2 };
  ws.on('open', function() {
    var msg = { type: 'subscribe', topic: topic };
    ws.send(JSON.stringify(msg));
    ws.on('message', function(buffer) {
      var json = JSON.parse(buffer);
      if(json.type === 'subscribe-ack') {
        assert.equal(json.type, 'subscribe-ack');
        assert(json.timestamp);
        assert(json.topic);
        assert(json.subscriptionId);
        subscriptionId = json.subscriptionId;
        setTimeout(function() {
          devices[0].publishStreamObject(data);
        }, 50);
      } else if(json.type === 'event') {
        assert(json.timestamp);
        assert(json.topic);
        // Compare the ids; a two-argument assert() would only check
        // truthiness and treat the second argument as the failure message.
        assert.equal(json.subscriptionId, subscriptionId);
        assert(json.data);
        assert.equal(json.data.val, 2);
        // The unselected field must have been dropped.
        assert.equal(json.data.foo, undefined);
        done();
      }
    });
  });
  ws.on('error', done);
});
|
1341 |
|
// Subscribes to '**', unsubscribes as soon as the ack arrives, then links a
// brand-new peer to urls[0]. The test completes 400ms after the new peer's
// own pubsub reports '_peer/connect'.
itBoth('subscribing to all ** and then unsubscribing followed by a peer connecting wont crash zetta', function(idx, done){
  var socket = new WebSocket('ws://' + urls[idx] + baseUrl);
  var everything = '**';

  // Starts the extra peer and arms the delayed completion.
  var connectNewPeer = function() {
    var peer = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
    peer.silent();
    peer.name('some-new-peer');
    peer.link('http://' + urls[0]);
    peer.use(function(server) {
      server.pubsub.subscribe('_peer/connect', function() {
        setTimeout(function() {
          done();
        }, 400);
      });
    });
    peer.listen(0);
  };

  socket.on('open', function() {
    socket.send(JSON.stringify({ type: 'subscribe', topic: everything }));

    socket.on('message', function(buffer) {
      var reply = JSON.parse(buffer);
      switch(reply.type) {
        case 'subscribe-ack':
          assert.equal(reply.type, 'subscribe-ack');
          assert(reply.timestamp);
          assert(reply.topic);
          assert(reply.subscriptionId);
          socket.send(JSON.stringify({ type: 'unsubscribe', subscriptionId: reply.subscriptionId }));
          break;
        case 'unsubscribe-ack':
          connectNewPeer();
          break;
      }
    });
  });
  socket.on('error', done);
});
|
1376 |
|
// Tests that malformed or unsupported messages are answered with an error
// reply carrying a timestamp, a numeric code and a message.
describe('Protocol Errors', function() {

  // Registers (via itBoth) a test that opens a socket to urls[idx], sends a
  // single message and validates the first reply as an error.
  //   description - test title, passed to itBoth unchanged
  //   buildMsg    - function returning the message to send; it is called when
  //                 the test runs, so it may read fixtures such as `devices`
  //   code        - expected value of the reply's `code` field
  //   checkTopic  - when truthy, the reply's `topic` must equal the sent topic
  var itRepliesWithError = function(description, buildMsg, code, checkTopic) {
    itBoth(description, function(idx, done){
      var endpoint = urls[idx];
      var ws = new WebSocket('ws://' + endpoint + baseUrl);
      var msg = buildMsg();
      ws.on('open', function() {
        ws.send(JSON.stringify(msg));
        ws.on('message', function(buffer) {
          var json = JSON.parse(buffer);
          assert(json.timestamp);
          if(checkTopic) {
            assert.equal(json.topic, msg.topic);
          }
          assert.equal(json.code, code);
          assert(json.message);
          done();
        });
      });
      ws.on('error', done);
    });
  };

  // Registers a 400 test for a topic string that should be rejected.
  var makeTopicStringErrorsTest = function(topic) {
    itRepliesWithError('invalid stream topic "' + topic + '" should result in a 400 error', function() {
      return { type: 'subscribe', topic: topic };
    }, 400, true);
  };

  makeTopicStringErrorsTest('*');
  makeTopicStringErrorsTest('hub');
  makeTopicStringErrorsTest('{hub.+}');
  makeTopicStringErrorsTest('*/');
  makeTopicStringErrorsTest('**/');
  makeTopicStringErrorsTest('hub/');
  makeTopicStringErrorsTest('{hub.+}/');

  itRepliesWithError('invalid stream query should result in a 400 error', function() {
    return { type: 'subscribe', topic: 'hub/testdriver/' + devices[0].id + '/fooobject?invalid stream query' };
  }, 400, true);

  // Subscribe message without a topic.
  itRepliesWithError('invalid subscribe should result in a 400 error', function() {
    return { type: 'subscribe' };
  }, 400, false);

  itRepliesWithError('unsubscribing from an invalid subscriptionId should result in a 405 error', function() {
    return { type: 'unsubscribe', subscriptionId: 123 };
  }, 405, false);

  itRepliesWithError('invalid type should result in a 405 error', function() {
    return { type: 'not-a-type', topic: '**' };
  }, 405, false);

  itRepliesWithError('unsubscribing from a missing subscriptionId should result in a 400 error', function() {
    return { type: 'unsubscribe' };
  }, 400, false);

  // Message with no `type` field at all.
  itRepliesWithError('on invalid message should result in a 400 error', function() {
    return { test: 123 };
  }, 400, false);

});
|
1527 |
|
1528 | });
|
1529 |
|
// Placeholder suite: no SPDY API tests are defined in it.
describe('SPDY API', function() {
});
|
1532 |
|
1533 | });
|