UNPKG

12.2 kBJavaScriptView Raw
1var assert = require('assert');
2var http = require('http');
3var urlParse = require('url').parse;
4var WebSocket = require('ws');
5var WebSocketServer = WebSocket.Server;
6var request = require('supertest');
7var util = require('util');
8var Scout = require('../zetta_runtime').Scout;
9var zetta = require('../zetta');
10var mocks = require('./fixture/scout_test_mocks');
11var MockRegistry = require('./fixture/mem_registry');
12var PeerRegistry = require('./fixture/mem_peer_registry');
13var GoodDevice = require('./fixture/example_driver');
14
/**
 * Scout fixture used by the event websocket tests.
 *
 * On init it asks the server for records of type 'test' with vendorId
 * '1234567' and, when at least one comes back, provisions the first result
 * with GoodDevice.
 */
var GoodScout = module.exports = function() {
  this.count = 0;
  this.interval = 5000;
  Scout.call(this);
};
util.inherits(GoodScout, Scout);

/**
 * Starts the device lookup and then signals completion.
 *
 * @param {Function} cb - called with no arguments right after the lookup has
 *   been started; it is not deferred until the lookup callback runs.
 */
GoodScout.prototype.init = function(cb) {
  var scout = this;
  var deviceQuery = scout.server.where({type: 'test', vendorId: '1234567'});

  scout.server.find(deviceQuery, function(err, results) {
    // Nothing to provision on a failed lookup or an empty result set.
    if (err || !results.length) {
      return;
    }
    scout.provision(results[0], GoodDevice);
  });

  cb();
};
34
35describe('Event Websocket', function() {
36 var peerRegistry = null;
37 var registry = null;
38 var app = null;
39 var deviceUrl = null;
40 var deviceUrlHttp = null;
41 var device = null;
42 var port = null;
43
// Builds a fresh zetta app for every test: stores one 'test' device record
// in the registry, starts the server on an ephemeral port (listen(0)) and
// derives the URLs and the device handle the tests use.
beforeEach(function(done) {
  var deviceId = 'BC2832FD-9437-4473-A4A8-AC1D56B12C6F';
  var serverId = 'BC2832FD-9437-4473-A4A8-AC1D56B12C61';
  var deviceRecord = {
    id: deviceId,
    type: 'test',
    vendorId: '1234567',
    foo: 'foo',
    bar: 'bar',
    name: 'Test Device'
  };

  peerRegistry = new PeerRegistry();
  registry = new MockRegistry();
  registry.db.put(deviceId, deviceRecord, {valueEncoding: 'json'}, function(err) {
    if (err) {
      done(err);
      return;
    }
    app = zetta({registry: registry, peerRegistry: peerRegistry});
    app.silent();
    app.id = serverId;
    app.use(GoodScout);
    app.listen(0, function(err) {
      // Report a failed listen as-is. Reading the address of a server that
      // is not listening would throw and hide the original error.
      if (err) {
        done(err);
        return;
      }
      var serverBase = 'localhost:' + app.httpServer.server.address().port + '/servers/' + serverId;
      port = app.httpServer.server.address().port;
      deviceUrl = serverBase + '/events?topic=testdriver/' + deviceId;
      deviceUrlHttp = serverBase + '/devices/' + deviceId;
      device = app.runtime._jsDevices[deviceId];
      done();
    });
  });
});
65
// Asks the app's HTTP server to close and signals completion immediately,
// without waiting for the close to finish.
afterEach(function(done) {
  var httpServer = app.httpServer.server;
  httpServer.close();
  done();
});
70
71
72 describe('Basic Connection', function() {
73
// The device resource must answer a plain HTTP GET with status 200.
it('http resource should exist with statusCode 200', function(done) {
  http.get('http://' + deviceUrlHttp, function(res) {
    // The body is not needed; drain it so the response stream can end
    // instead of staying paused with unread data.
    res.resume();
    assert.equal(res.statusCode, 200);
    done();
  }).on('error', done);
});
80
// Opens a websocket on the device's 'bar' stream topic and checks, 20 ms
// later, that it is open and that no error event was emitted.
it('websocket should connect', function(done) {
  var socket = new WebSocket('ws://' + deviceUrl + '/bar');
  var errorCount = 0;
  var isOpen = false;

  socket.on('open', function() {
    isOpen = true;
  });
  socket.on('close', function() {
    isOpen = false;
  });
  socket.on('error', function() {
    errorCount += 1;
  });

  setTimeout(function() {
    socket.close();
    assert.equal(errorCount, 0);
    assert.equal(isOpen, true, 'ws should be opened');
    done();
  }, 20);
});
103
104 });
105
// Checks that an extension passed to app.use() can attach its own
// WebSocketServer (path '/foo') to the app's underlying HTTP server and
// exchange messages with a client.
describe('Embedding a websocket server', function() {
  var app = null;
  var port = null;
  var wss = null;

  beforeEach(function(done) {
    var peerRegistry = new PeerRegistry();
    var registry = new MockRegistry();
    app = zetta({registry: registry, peerRegistry: peerRegistry});
    app.silent();
    app.use(function(server) {
      // Distinct name: the raw HTTP server must not shadow the `server`
      // argument handed to the extension.
      var httpServer = server.httpServer.server;
      wss = new WebSocketServer({server: httpServer, path: '/foo'});
    });
    app.listen(0, function(err) {
      // Surface a failed listen directly instead of failing on address().
      if (err) {
        done(err);
        return;
      }
      port = app.httpServer.server.address().port;
      done();
    });
  });

  // Requests the server to close; completion is signalled without waiting.
  afterEach(function(done) {
    app.httpServer.server.close();
    done();
  });

  it('can connect to the custom server', function(done) {
    var client = new WebSocket('ws://localhost:' + port + '/foo');
    // Without a listener an 'error' event would be thrown as uncaught.
    client.on('error', done);
    client.on('open', function open() {
      done();
    });
  });

  it('will fire the connection event on the server', function(done) {
    var client = new WebSocket('ws://localhost:' + port + '/foo');
    client.on('error', done);
    wss.on('connection', function() {
      done();
    });
  });

  it('can send data down the server websocket', function(done) {
    var client = new WebSocket('ws://localhost:' + port + '/foo');
    client.on('error', done);
    client.on('message', function() {
      done();
    });
    wss.on('connection', function(serverSocket) {
      serverSocket.send('foo');
    });
  });

  it('can send data up the server websocket', function(done) {
    var client = new WebSocket('ws://localhost:' + port + '/foo');
    client.on('error', done);
    wss.on('connection', function(serverSocket) {
      serverSocket.on('message', function() {
        done();
      });
    });

    client.on('open', function open() {
      client.send('foo');
    });
  });
});
173
174 describe('Receive json messages', function() {
175
// Opens and closes one socket, then connects a second one to the same
// topic and checks that a single stream update arrives exactly once.
it('websocket should recv only one set of messages when reconnecting', function(done) {
  var url = 'ws://' + deviceUrl + '/bar';

  // Opens a socket, closes it as soon as it is open and calls cb once the
  // 'close' event has fired.
  function openAndClose(cb) {
    var s1 = new WebSocket(url);
    s1.on('open', function() {
      s1.on('close', function() {
        cb();
      });
      s1.close();
    });
  }

  openAndClose(function() {
    var s2 = new WebSocket(url);
    s2.on('open', function() {
      var count = 0;
      s2.on('message', function() {
        count++;
      });

      // Trigger the update 20 ms after open (presumably to let the
      // server-side subscription settle; not verifiable from this file).
      setTimeout(function() {
        device.incrementStreamValue();
      }, 20);

      setTimeout(function() {
        // Close the socket in both outcomes and report a failure through
        // done() rather than throwing from inside the timer callback.
        s2.close();
        if (count === 1) {
          done();
        } else {
          done(new Error('Should have only recieved one message. ' + count));
        }
      }, 100);
    });
  });
});
212
213
// Subscribes to the 'bar' stream, triggers three increments and expects
// three JSON messages whose `data` counts 1, 2, 3.
it('websocket should connect and recv data in json form', function(done) {
  var url = 'ws://' + deviceUrl + '/bar';
  var error = 0;
  var open = false;
  var socket = new WebSocket(url);
  socket.on('open', function() {
    open = true;
  });
  socket.on('close', function() {
    open = false;
  });
  socket.on('error', function() {
    error++;
  });

  setTimeout(function() {
    assert.equal(error, 0);
    assert.equal(open, true, 'ws should be opened');

    var recv = 0;
    var timer = null;
    socket.on('message', function(buf) {
      var msg = JSON.parse(buf);
      recv++;
      assert(msg.timestamp);
      assert(msg.topic);
      assert.equal(msg.data, recv);
      if (recv === 3) {
        clearTimeout(timer);
        socket.close();
        done();
      }
    });

    device.incrementStreamValue();
    device.incrementStreamValue();
    device.incrementStreamValue();

    // Deadline: this timer is cleared once the third message arrives, so it
    // only fires when messages are missing. Close the socket first, then
    // hand the assertion result to done() instead of throwing from the
    // timer callback.
    timer = setTimeout(function() {
      socket.close();
      try {
        assert.equal(recv, 3, 'should have received 3 messages');
        done();
      } catch (err) {
        done(err);
      }
    }, 100);
  }, 20);
});
261
// Subscribes to the device 'logs' topic, updates the device properties via
// an HTTP PUT and expects a 'zetta-properties-update' log message carrying
// the new properties.
it('websocket should connect and recv device log events from property API updates', function(done) {
  var url = 'ws://' + deviceUrl + '/logs';
  var error = 0;
  var open = false;
  var socket = new WebSocket(url);
  socket.on('open', function() {
    open = true;
  });
  socket.on('close', function() {
    open = false;
  });
  socket.on('error', function() {
    error++;
  });

  setTimeout(function() {
    assert.equal(error, 0);
    assert.equal(open, true, 'ws should be opened');

    // Build the absolute URL in a local so the suite-level deviceUrlHttp is
    // not overwritten by this test.
    var parsed = urlParse('http://' + deviceUrlHttp);
    var reqOpts = {
      hostname: 'localhost',
      port: parseInt(parsed.port, 10),
      method: 'PUT',
      path: parsed.path,
      headers: {
        'Content-Type': 'application/json'
      }
    };

    var req = http.request(reqOpts);
    // A failed request fails the test instead of raising an uncaught error.
    req.on('error', done);
    req.write(JSON.stringify({ fu: 'bar' }));
    req.end();

    var recv = 0;
    socket.on('message', function(buf) {
      var msg = JSON.parse(buf);
      recv++;
      assert(msg.timestamp);
      assert(msg.topic);
      assert.equal(msg.transition, 'zetta-properties-update');
      assert.equal(msg.properties.fu, 'bar');
      assert.equal(msg.properties.foo, 0);

      // Finish on the first message only; later ones must not call done again.
      if (recv === 1) {
        socket.close();
        done();
      }
    });
  }, 20);
});
316
// Subscribes to the device 'logs' topic, calls the 'change' transition and
// expects one log message that lists a 'prepare' action and whose first
// action href points at the device resource.
it('websocket should connect and recv device log events', function(done) {
  var url = 'ws://' + deviceUrl + '/logs';
  var error = 0;
  var open = false;
  var socket = new WebSocket(url);
  socket.on('open', function() {
    open = true;
  });
  socket.on('close', function() {
    open = false;
  });
  socket.on('error', function() {
    error++;
  });

  setTimeout(function() {
    assert.equal(error, 0);
    assert.equal(open, true, 'ws should be opened');

    var recv = 0;
    var timer = null;
    socket.on('message', function(buf) {
      var msg = JSON.parse(buf);
      recv++;
      assert(msg.timestamp);
      assert(msg.topic);
      assert(msg.actions.filter(function(action) {
        return action.name === 'prepare';
      }).length > 0);

      assert.equal(msg.actions[0].href.replace('http://',''), deviceUrlHttp);

      if (recv === 1) {
        clearTimeout(timer);
        socket.close();
        done();
      }
    });

    device.call('change');

    // Deadline: this timer is cleared once the first message arrives, so it
    // only fires when nothing was received. Close the socket first, then
    // hand the assertion result to done() instead of throwing from the
    // timer callback.
    timer = setTimeout(function() {
      socket.close();
      try {
        assert.equal(recv, 1, 'should have received 1 message');
        done();
      } catch (err) {
        done(err);
      }
    }, 100);
  }, 20);
});
365
// Connects to /peer-management, publishes a '_peer/connect' event and then
// a '_peer/disconnect' event on the app's pubsub, and expects the socket to
// deliver them in that order.
it('websocket should recv connect and disconnect message for /peer-management', function(done) {
  var socket = new WebSocket('ws://localhost:' + port + '/peer-management');

  // Second message: must be the disconnect notification; ends the test.
  function onDisconnectMessage(raw) {
    var parsed = JSON.parse(raw);
    assert.equal(parsed.topic, 'disconnect');
    done();
  }

  // First message: must be the connect notification; then trigger the
  // disconnect event and wait for its message.
  function onConnectMessage(raw) {
    var parsed = JSON.parse(raw);
    assert.equal(parsed.topic, 'connect');
    socket.once('message', onDisconnectMessage);
    app.pubsub.publish('_peer/disconnect', { topic: 'disconnect'});
  }

  socket.on('error', done);
  socket.on('open', function() {
    socket.once('message', onConnectMessage);
    app.pubsub.publish('_peer/connect', { topic: 'connect'});
  });
});
385
386 });
387
388
389
390
391
392
393 describe('Receive binary messages', function() {
394
// Subscribes to the 'foobar' stream, triggers three increments and expects
// three binary messages whose first byte counts 1, 2, 3.
it('websocket should connect and recv data in binary form', function(done) {
  var url = 'ws://' + deviceUrl + '/foobar';
  var error = 0;
  var open = false;
  var socket = new WebSocket(url);
  socket.on('open', function() {
    open = true;
  });
  socket.on('close', function() {
    open = false;
  });
  socket.on('error', function() {
    error++;
  });

  setTimeout(function() {
    assert.equal(error, 0);
    assert.equal(open, true, 'ws should be opened');

    var recv = 0;
    var timer = null;
    socket.on('message', function(buf, flags) {
      assert(Buffer.isBuffer(buf));
      assert(flags.binary);
      recv++;
      assert.equal(buf[0], recv);
      if (recv === 3) {
        clearTimeout(timer);
        socket.close();
        done();
      }
    });

    device.incrementFooBar();
    device.incrementFooBar();
    device.incrementFooBar();

    // Deadline: this timer is cleared once the third message arrives, so it
    // only fires when messages are missing. Close the socket first, then
    // hand the assertion result to done() instead of throwing from the
    // timer callback.
    timer = setTimeout(function() {
      socket.close();
      try {
        assert.equal(recv, 3, 'should have received 3 messages');
        done();
      } catch (err) {
        done(err);
      }
    }, 100);
  }, 20);
});
441
442 });
443
444
445
446});