UNPKG

12.3 kBJavaScriptView Raw
1var assert = require('assert');
2var http = require('http');
3var zetta = require('../zetta');
4var MemRegistry = require('./fixture/mem_registry');
5var MemPeerRegistry = require('./fixture/mem_peer_registry');
6var request = require('supertest');
7var PeerRegistry = require('../lib/peer_registry');
8var Query = require('calypso').Query;
9var querystring = require('querystring');
10
11function deleteRequest(port, connectionId) {
12 var opts = {
13 host: 'localhost',
14 port: port,
15 method: 'DELETE',
16 path: '/peer-management/' + connectionId
17 }
18
19 var req = http.request(opts);
20 req.end();
21}
22
23function putRequest(port, connectionId, url) {
24 var qs = {
25 url: url
26 };
27 var string = querystring.stringify(qs);
28 var opts = {
29 host: 'localhost',
30 port: port,
31 method: 'PUT',
32 path: '/peer-management/' + connectionId,
33 headers: {
34 'Content-Length': string.length
35 }
36 };
37
38 var req = http.request(opts);
39 req.write(string);
40 req.end();
41}
42
43
44function getHttpServer(app) {
45 return app.httpServer.server;
46}
47
48function getBody(fn) {
49 return function(res) {
50 try {
51 if(res.text) {
52 var body = JSON.parse(res.text);
53 } else {
54 var body = '';
55 }
56 } catch(err) {
57 throw new Error('Failed to parse json body');
58 }
59
60 fn(res, body);
61 }
62}
63
64describe('Peer Connection API', function() {
65 describe('/peer-management embedded entities', function() {
66 var peerRegistry = null;
67 var app = null;
68
69 beforeEach(function(done) {
70 peerRegistry = new MemPeerRegistry();
71 app = zetta({ registry: new MemRegistry(), peerRegistry: peerRegistry })
72 .silent()
73 .name('local')
74 ._run(done);
75 });
76
77 checkPeersActionsForState('initiator', 'connected', 2);
78 checkPeersActionsForState('initiator', 'disconnected', 2);
79 checkPeersActionsForState('acceptor', 'connected', 2);
80 checkPeersActionsForState('acceptor', 'disconnected', 0);
81
82 function checkPeersActionsForState(direction, status, numberOfActionsExpected) {
83 it('exposes ' + numberOfActionsExpected + ' actions on the embedded entity when ' + status + ' and ' + direction, function(done) {
84
85 var peer = {
86 id:'foo',
87 connectionId:'12345',
88 direction: direction,
89 status: status
90 };
91
92 peerRegistry.save(peer, function() {
93 var url = '/peer-management';
94 request(getHttpServer(app))
95 .get(url)
96 .expect(getBody(function(res, body) {
97 assert.equal(body.entities.length, 1);
98 assert.equal(body.entities[0].actions.length, numberOfActionsExpected);
99 body.entities[0].actions.forEach(function(action) {
100 assert.ok(action.href.indexOf('/peer-management/12345') !== -1);
101 })
102 }))
103 .end(done);
104 });
105 });
106 }
107
108 checkPeersActionsOnFullEntity('initiator', 'connected', 2);
109 checkPeersActionsOnFullEntity('initiator', 'disconnected', 2);
110 checkPeersActionsOnFullEntity('acceptor', 'connected', 2);
111 checkPeersActionsOnFullEntity('acceptor', 'disconnected', 0);
112
113 function checkPeersActionsOnFullEntity(direction, status, numberOfActionsExpected) {
114 it('when ' + direction + ' exposes ' + numberOfActionsExpected + ' actions on the full entity when ' + status, function(done) {
115 var peer = {
116 id:'foo',
117 connectionId:'12345',
118 direction: direction,
119 status: status
120 };
121 peerRegistry.save(peer, function() {
122 var url = '/peer-management/foo';
123 request(getHttpServer(app))
124 .get(url)
125 .expect(getBody(function(res, body) {
126 assert.equal(body.actions.length, numberOfActionsExpected);
127 body.actions.forEach(function(action) {
128 assert.ok(action.href.indexOf('/peer-management/12345') !== -1);
129 });
130 }))
131 .end(done);
132 });
133 });
134 }
135 });
136
137 describe('Root API for peers', function() {
138 var cloud = null;
139 var cloudUrl = null;
140 var cloudPort = null;
141 var db1 = null;
142 var db2 = null;
143
144 beforeEach(function(done) {
145
146 cloud = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
147 cloud.name('cloud');
148 cloud.silent();
149
150 cloud.listen(0, function(err) {
151 if(err) {
152 return done(err);
153 }
154
155 cloudPort = cloud.httpServer.server.address().port;
156 cloudUrl = 'http://localhost:' + cloudPort;
157 done();
158 });
159 });
160
161 afterEach(function(done) {
162 cloud.httpServer.server.close();
163 done();
164 });
165
166 it('will have rel of server on peer', function(done) {
167 this.timeout(10000);
168 var connected = false;
169 var z = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
170 z.name('local');
171
172 cloud.pubsub.subscribe('_peer/connect', function(topic, data) {
173 setImmediate(function() {
174 var url = '/';
175 request(getHttpServer(cloud))
176 .get(url)
177 .expect(getBody(function(res, body) {
178 var peerLinks = body.links.filter(function(link) { return link.rel.indexOf('http://rels.zettajs.io/peer') !== -1; });
179 var peerLink = peerLinks[0];
180 assert.ok(peerLink.rel.indexOf('http://rels.zettajs.io/server') !== -1);
181 }))
182 .end(done);
183 });
184 });
185
186 z.silent();
187 z.link(cloudUrl);
188 z.listen(0);
189
190
191 });
192
193 });
194
195 describe('/peer-management disconnection API', function() {
196 var cloud = null;
197 var cloudUrl = null;
198 var cloudPort = null;
199 var db1 = null;
200 var db2 = null;
201
202 beforeEach(function(done) {
203
204 cloud = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
205 cloud.name('cloud');
206 cloud.silent();
207
208 cloud.listen(0, function(err) {
209 if(err) {
210 return done(err);
211 }
212
213 cloudPort = cloud.httpServer.server.address().port;
214 cloudUrl = 'http://localhost:' + cloudPort;
215 done();
216 });
217 });
218
219 afterEach(function(done) {
220 cloud.httpServer.server.close();
221 done();
222 });
223
224 it('will return 404 if connection does not exist', function(done) {
225 var url = '/peer-management/1234';
226 request(getHttpServer(cloud))
227 .del(url)
228 .expect(404, done);
229 });
230
231 it('will proxy a disconnection between two peers', function(done) {
232 //Had to increase the timeout. The standard two seconds may not be long enough for a connection to be established.
233 this.timeout(10000);
234 var connected = false;
235 var z = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
236 z.name('local');
237 var connectionId = null;
238
239 z.pubsub.subscribe('_peer/disconnect', function(topic, data) {
240 assert.equal(connectionId, data.peer.connectionId);
241 done();
242 });
243
244 cloud.pubsub.subscribe('_peer/connect', function(topic, data) {
245 assert.equal(connected, true);
246 connectionId = data.peer.connectionId;
247 deleteRequest(cloudPort, connectionId);
248 });
249 z.pubsub.subscribe('_peer/connect', function(topic, data) {
250 connected = true;
251 });
252
253 z.silent();
254 z.link(cloudUrl);
255 z.listen(0);
256
257
258 });
259
260 it('will disconnect two peers', function(done) {
261 this.timeout(10000);
262 var connected = false;
263 var localPort = null;
264 var z = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
265 z.name('local');
266
267 var connectionId = null;
268
269 z.pubsub.subscribe('_peer/disconnect', function(topic, data) {
270 assert.equal(connectionId, data.peer.connectionId);
271 done();
272 });
273
274 cloud.pubsub.subscribe('_peer/connect', function(topic, data) {
275 assert.equal(connected, true);
276 connectionId = data.peer.connectionId;
277 deleteRequest(localPort, connectionId);
278 });
279
280 z.pubsub.subscribe('_peer/connect', function(topic, data) {
281 connected = true;
282 });
283
284 z.silent();
285 z.link(cloudUrl);
286 z.listen(0, function(err) {
287 if(err) {
288 done(err);
289 }
290
291 localPort = z.httpServer.server.address().port;
292 });
293
294 });
295 });
296
297 describe('/peer-management update API', function() {
298 var cloud = null;
299 var localOne = null;
300 var cloudPort = null;
301 var localOnePort = null;
302 var connectionId = null;
303
304
305 beforeEach(function(done) {
306 this.timeout(10000);
307 cloud = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
308 cloud.name('cloud');
309 cloud.silent();
310
311 localOne = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
312 localOne.name('localOne');
313 localOne.silent();
314
315 cloud.pubsub.subscribe('_peer/connect', function(topic, data) {
316 connectionId = data.peer.connectionId;
317 done();
318 });
319
320 cloud.listen(0, function(err) {
321 if(err) {
322 return done(err);
323 }
324
325 cloudPort = cloud.httpServer.server.address().port;
326 var cloudUrl = 'http://localhost:' + cloudPort;
327
328 localOne.link(cloudUrl);
329 localOne.listen(0, function(err) {
330 if(err) {
331 done(err);
332 }
333
334 localPort = localOne.httpServer.server.address().port;
335 });
336 });
337 });
338
339 afterEach(function(done) {
340 cloud.httpServer.server.close();
341 localOne.httpServer.server.close();
342 done();
343 });
344
345 it('will return 404 if connection does not exist', function(done) {
346 var url = '/peer-management/1234';
347 request(getHttpServer(cloud))
348 .put(url)
349 .set('Content-Type', 'application/x-www-form-urlencoded')
350 .send({ url: 'http://localhost:1234' })
351 .expect(404, done);
352 });
353
354 it('will proxy a connection update between two peers', function(done) {
355 this.timeout(10000);
356 var localTwoPort = null;
357 var localTwo = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
358 localTwo.name('localTwo');
359 localTwo.silent();
360
361 var url = 'http://localhost:';
362
363 cloud.pubsub.subscribe('_peer/disconnect', function(topic, data) {
364 assert.equal(connectionId, data.peer.connectionId);
365 });
366
367 localTwo.pubsub.subscribe('_peer/connect', function(topic, data) {
368 done();
369 });
370
371 localTwo.listen(0, function(err) {
372 if(err) {
373 return done(err);
374 }
375
376 localTwoPort = localTwo.httpServer.server.address().port;
377 var serverUrl = url + localTwoPort;
378 putRequest(cloudPort, connectionId, serverUrl);
379 });
380 });
381
382 it('will update a connection between two peers', function(done) {
383 this.timeout(10000);
384 var localTwoPort = null;
385 var localTwo = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
386 localTwo.name('localTwo');
387 localTwo.silent();
388
389 var url = 'http://localhost:';
390 var serverUrl = null;
391 cloud.pubsub.subscribe('_peer/disconnect', function(topic, data) {
392 assert.equal(connectionId, data.peer.connectionId);
393 });
394
395 localTwo.pubsub.subscribe('_peer/connect', function(topic, data) {
396 // make sure there is only one peer in /peer-management list
397 // PUT Should not add a new peer
398 request(getHttpServer(localOne))
399 .get('/peer-management')
400 .expect(200)
401 .expect(getBody(function(res, body) {
402 assert.equal(body.entities.length, 1);
403 // url should be updated to new peer
404 assert.equal(body.entities[0].properties.url, serverUrl);
405 }))
406 .end(done);
407 });
408
409 localTwo.listen(0, function(err) {
410 if(err) {
411 done(err);
412 }
413
414 localTwoPort = localTwo.httpServer.server.address().port;
415 serverUrl = url + localTwoPort;
416 putRequest(localPort, connectionId, serverUrl);
417 });
418 });
419 });
420});