UNPKG

11.4 kBJavaScriptView Raw
1var assert = require('assert');
2var http = require('http');
3var urlParse = require('url').parse;
4var WebSocket = require('ws');
5var WebSocketServer = WebSocket.Server;
6var request = require('supertest');
7var util = require('util');
8var Scout = require('../zetta_runtime').Scout;
9var zetta = require('../zetta');
10var mocks = require('./fixture/scout_test_mocks');
11var MockRegistry = require('./fixture/mem_registry');
12var PeerRegistry = require('./fixture/mem_peer_registry');
13var GoodDevice = require('./fixture/example_driver');
14
/**
 * Scout fixture exported by this module and registered on the zetta
 * instance under test via `app.use(GoodScout)`.
 *
 * Sets `count` and `interval` on the instance, then runs the base Scout
 * constructor.
 */
var GoodScout = module.exports = function() {
  this.count = 0;
  this.interval = 5000;
  Scout.call(this);
};

// Prototype chain: GoodScout -> Scout.
util.inherits(GoodScout, Scout);
21
/**
 * Scout initialisation hook.
 *
 * Asks the server for stored devices with type 'test' and vendorId
 * '1234567' and, when the lookup succeeds with at least one result,
 * provisions the first result with GoodDevice.
 *
 * `cb` is invoked right after the lookup has been issued; it does not
 * wait for the lookup callback.
 *
 * @param {Function} cb - completion callback, called with no arguments.
 */
GoodScout.prototype.init = function(cb) {
  var deviceQuery = this.server.where({type: 'test', vendorId: '1234567'});
  var scout = this;

  this.server.find(deviceQuery, function(err, matches) {
    // Lookup errors and empty result sets both mean nothing is provisioned.
    if (err || !matches.length) {
      return;
    }
    scout.provision(matches[0], GoodDevice);
  });

  cb();
};
34
35describe('Event Websocket', function() {
36 var peerRegistry = null;
37 var registry = null;
38 var app = null;
39 var deviceUrl = null;
40 var deviceUrlHttp = null;
41 var device = null;
42 var port = null;
43
// Per-test setup: seeds the in-memory registry with one stored device,
// starts a zetta instance on an ephemeral port with GoodScout installed,
// and publishes the resulting port, URLs and device object through the
// suite-level variables.
beforeEach(function(done) {
  var serverName = 'BC2832FD-9437-4473-A4A8-AC1D56B12C61';
  var deviceId = 'BC2832FD-9437-4473-A4A8-AC1D56B12C6F';
  var storedDevice = {id: deviceId, type: 'test', vendorId: '1234567', foo: 'foo', bar: 'bar', name: 'Test Device'};

  peerRegistry = new PeerRegistry();
  registry = new MockRegistry();
  registry.db.put(deviceId, storedDevice, {valueEncoding: 'json'}, function(err) {
    if (err) {
      done(err);
      return;
    }

    app = zetta({registry: registry, peerRegistry: peerRegistry});
    app.silent();
    app.name(serverName);
    app.use(GoodScout);
    app.listen(0, function(listenErr) {
      // Report a failed listen as-is instead of dereferencing the address
      // of a server that never started.
      if (listenErr) {
        done(listenErr);
        return;
      }

      port = app.httpServer.server.address().port;
      var serverBase = 'localhost:' + port + '/servers/' + serverName;
      // Scheme-less on purpose: tests prepend 'ws://' or 'http://'.
      deviceUrl = serverBase + '/events?topic=testdriver/' + deviceId;
      deviceUrlHttp = serverBase + '/devices/' + deviceId;
      device = app.runtime._jsDevices[deviceId];
      done();
    });
  });
});
65
// Per-test teardown: asks the HTTP server started in beforeEach to close.
// Completion of close() is not awaited; done() is signalled immediately.
afterEach(function(done) {
  var httpServer = app.httpServer.server;
  httpServer.close();
  done();
});
70
71
describe('Basic Connection', function() {
  this.timeout(6000);

  // Opens a websocket to `path` on the server under test and passes only
  // when the connection attempt fails with the 404 handshake error.
  function expectNotFound(path, done) {
    var socket = new WebSocket('ws://localhost:' + port + path);

    socket.on('open', function() {
      done(new Error('Should not be open.'));
    });
    socket.on('error', function(err) {
      assert.equal(err.message, 'unexpected server response (404)');
      done();
    });
  }

  it('http resource should exist with statusCode 200', function(done) {
    var req = http.get('http://' + deviceUrlHttp, function(res) {
      assert.equal(res.statusCode, 200);
      done();
    });
    req.on('error', done);
  });

  it('websocket should connect', function(done) {
    var socket = new WebSocket('ws://' + deviceUrl + '/bar');

    socket.on('open', function() {
      socket.close();
      done();
    });
    socket.on('error', done);
  });

  it('will return a 404 on non ws urls', function(done) {
    expectNotFound('/not-a-endpoint', done);
  });

  it('will return a 404 on non ws urls for /events123123', function(done) {
    expectNotFound('/events123123', done);
  });
});
118
// Verifies that an app-provided WebSocketServer attached to zetta's HTTP
// server on path '/foo' works alongside zetta's own websocket handling.
describe('Embedding a websocket server', function() {
  this.timeout(6000);

  // These intentionally shadow the outer suite's `app` and `port`: this
  // suite runs its own zetta instance.
  var app = null;
  var port = null;
  var wss = null;

  beforeEach(function(done) {
    var peerRegistry = new PeerRegistry();
    var registry = new MockRegistry();
    app = zetta({registry: registry, peerRegistry: peerRegistry});
    app.silent();
    app.use(function(server) {
      // Distinct local name: the original redeclared its own `server` parameter.
      var httpServer = server.httpServer.server;
      wss = new WebSocketServer({server: httpServer, path: '/foo'});
    });
    app.listen(0, function(err) {
      // Surface a failed listen before touching the server address.
      if (err) {
        done(err);
        return;
      }
      port = app.httpServer.server.address().port;
      done();
    });
  });

  afterEach(function(done) {
    app.httpServer.server.close();
    done();
  });

  it('can connect to the custom server', function(done) {
    var client = new WebSocket('ws://localhost:' + port + '/foo');
    client.on('error', done);
    client.on('open', function open() {
      done();
    });
  });

  it('will fire the connection event on the server', function(done) {
    var client = new WebSocket('ws://localhost:' + port + '/foo');
    client.on('error', done);
    wss.on('connection', function() {
      done();
    });
  });

  it('can send data down the server websocket', function(done) {
    var client = new WebSocket('ws://localhost:' + port + '/foo');
    client.on('error', done);
    client.on('message', function() {
      done();
    });
    wss.on('connection', function(serverSocket) {
      serverSocket.send('foo');
    });
  });

  it('can send data up the server websocket', function(done) {
    var client = new WebSocket('ws://localhost:' + port + '/foo');
    client.on('error', done);
    wss.on('connection', function(serverSocket) {
      serverSocket.on('message', function() {
        done();
      });
    });

    client.on('open', function open() {
      client.send('foo');
    });
  });

  it('will return a 404 on non ws urls', function(done) {
    var url = 'ws://localhost:' + port + '/not-a-endpoint';
    var socket = new WebSocket(url);
    socket.on('open', function() {
      done(new Error('Should not be open.'));
    });
    socket.on('error', function(err) {
      assert.equal(err.message, 'unexpected server response (404)');
      done();
    });
  });
});
199
200 describe('Receive json messages', function() {
201
// Connects and disconnects once, then reconnects on the same topic and
// triggers a single stream update.
it('websocket should recv only one set of messages when reconnecting', function(done) {
  var url = 'ws://' + deviceUrl + '/bar';

  // Opens a socket, closes it as soon as it is open, and calls `cb` once
  // the close event has fired.
  function openAndClose(cb) {
    var first = new WebSocket(url);
    first.on('error', done);
    first.on('open', function() {
      // Listen for 'close' before requesting it.
      first.on('close', function() {
        cb();
      });
      first.close();
    });
  }

  openAndClose(function() {
    var second = new WebSocket(url);
    second.on('error', done);
    second.on('open', function() {
      // done() runs once per received message and the socket is left open
      // on purpose. NOTE(review): presumably a duplicate delivery would
      // call done() twice, which Mocha reports as a failure -- confirm.
      second.on('message', function() {
        done();
      });

      setTimeout(function() {
        device.incrementStreamValue();
      }, 20);
    });
  });
});
229
230
// Subscribes to the '/bar' topic, triggers three stream increments and
// checks that each JSON message carries a timestamp, a topic and the
// running count as its data.
it('websocket should connect and recv data in json form', function(done) {
  var expectedMessages = 3;
  var socket = new WebSocket('ws://' + deviceUrl + '/bar');

  socket.on('error', done);
  socket.on('open', function() {
    var received = 0;

    socket.on('message', function(raw) {
      var msg = JSON.parse(raw);
      received += 1;
      assert(msg.timestamp);
      assert(msg.topic);
      assert.equal(msg.data, received);

      if (received !== expectedMessages) {
        return;
      }
      socket.close();
      done();
    });

    for (var i = 0; i < expectedMessages; i += 1) {
      device.incrementStreamValue();
    }
  });
});
255
// Subscribes to the device's '/logs' topic, updates a property through an
// HTTP PUT on the device resource, and checks the resulting log event.
it('websocket should connect and recv device log events from property API updates', function(done) {
  var url = 'ws://' + deviceUrl + '/logs';
  var socket = new WebSocket(url);

  socket.on('error', done);
  socket.on('open', function() {
    // Build the PUT target in a local; the suite-level deviceUrlHttp must
    // stay scheme-less because other tests compare against it.
    var parsed = urlParse('http://' + deviceUrlHttp);
    var reqOpts = {
      hostname: 'localhost',
      port: parseInt(parsed.port, 10),
      method: 'PUT',
      path: parsed.path,
      headers: {
        'Content-Type': 'application/json'
      }
    };
    var recv = 0;

    // Listen before sending the request so the log event cannot be missed.
    socket.on('message', function(buf) {
      var msg = JSON.parse(buf);
      recv++;
      assert(msg.timestamp);
      assert(msg.topic);
      assert.equal(msg.transition, 'zetta-properties-update');
      assert.equal(msg.properties.fu, 'bar');
      assert.equal(msg.properties.foo, 0);

      if (recv === 1) {
        socket.close();
        done();
      }
    });

    var req = http.request(reqOpts);
    // Fail the test on a transport error instead of throwing an
    // unhandled 'error' event.
    req.on('error', done);
    req.write(JSON.stringify({ fu: 'bar' }));
    req.end();
  });
});
293
// Subscribes to the device's '/logs' topic, calls the 'change' transition
// on the device and checks the first log event that arrives.
it('websocket should connect and recv device log events', function(done) {
  var url = 'ws://' + deviceUrl + '/logs';
  var socket = new WebSocket(url);

  // Connection failures fail the test directly, as in the sibling tests.
  socket.on('error', done);
  socket.on('open', function() {
    var recv = 0;

    socket.on('message', function(buf) {
      var msg = JSON.parse(buf);
      recv++;

      assert(msg.timestamp);
      assert(msg.topic);

      // The event must advertise at least one action named 'prepare'.
      var hasPrepare = msg.actions.some(function(action) {
        return action.name === 'prepare';
      });
      assert(hasPrepare);

      // Action hrefs are absolute; deviceUrlHttp is stored without a scheme.
      assert.equal(msg.actions[0].href.replace('http://', ''), deviceUrlHttp);

      if (recv === 1) {
        socket.close();
        done();
      }
    });

    device.call('change');
  });
});
321
// Subscribes to '/peer-management', links a second zetta instance named
// 'some-peer' to the server under test, and expects a '_peer/connect'
// event followed, after the peer client is closed, by '_peer/disconnect'.
it('websocket should recv connect and disconnect message for /peer-management', function(done) {
  var url = 'ws://localhost:' + port + '/peer-management';
  var socket = new WebSocket(url);
  var peer = null;

  // Parses one frame and checks it is a three-key peer event for
  // 'some-peer' on the given topic.
  function assertPeerEvent(buf, topic) {
    var msg = JSON.parse(buf);
    assert.equal(msg.topic, topic);
    assert(msg.timestamp);
    assert.equal(msg.data.id, 'some-peer');
    assert(msg.data.connectionId);
    assert.equal(Object.keys(msg).length, 3);
  }

  socket.on('open', function() {
    socket.once('message', function(buf) {
      assertPeerEvent(buf, '_peer/connect');

      socket.once('message', function(buf) {
        assertPeerEvent(buf, '_peer/disconnect');
        // Stop the peer's own HTTP server so it does not outlive the test.
        peer.httpServer.server.close();
        done();
      });

      // disconnect
      peer._peerClients[0].close();
    });

    peer = zetta({registry: new MockRegistry(), peerRegistry: new PeerRegistry() });
    peer.name('some-peer');
    peer.silent();
    peer.link('http://localhost:' + port);
    peer.listen(0);
  });
  socket.on('error', done);
});
357 });
358
359
360
361
362
363
describe('Receive binary messages', function() {

  // Subscribes to the '/foobar' topic, triggers three increments and
  // checks that each message is a binary Buffer whose first byte equals
  // the running count.
  it('websocket should connect and recv data in binary form', function(done) {
    var expectedFrames = 3;
    var socket = new WebSocket('ws://' + deviceUrl + '/foobar');

    socket.on('error', done);
    socket.on('open', function() {
      var received = 0;

      socket.on('message', function(frame, flags) {
        assert(Buffer.isBuffer(frame));
        assert(flags.binary);
        received += 1;
        assert.equal(frame[0], received);

        if (received !== expectedFrames) {
          return;
        }
        socket.close();
        done();
      });

      for (var i = 0; i < expectedFrames; i += 1) {
        device.incrementFooBar();
      }
    });
  });

});
390
391
392
393});