1 | var assert = require('assert');
|
2 | var http = require('http');
|
3 | var zetta = require('../zetta');
|
4 | var MemRegistry = require('./fixture/mem_registry');
|
5 | var MemPeerRegistry = require('./fixture/mem_peer_registry');
|
6 | var request = require('supertest');
|
7 | var PeerRegistry = require('../lib/peer_registry');
|
8 | var Query = require('calypso').Query;
|
9 | var querystring = require('querystring');
|
10 |
|
11 | function deleteRequest(port, connectionId) {
|
12 | var opts = {
|
13 | host: 'localhost',
|
14 | port: port,
|
15 | method: 'DELETE',
|
16 | path: '/peer-management/' + connectionId
|
17 | }
|
18 |
|
19 | var req = http.request(opts);
|
20 | req.end();
|
21 | }
|
22 |
|
23 | function putRequest(port, connectionId, url) {
|
24 | var qs = {
|
25 | url: url
|
26 | };
|
27 | var string = querystring.stringify(qs);
|
28 | var opts = {
|
29 | host: 'localhost',
|
30 | port: port,
|
31 | method: 'PUT',
|
32 | path: '/peer-management/' + connectionId,
|
33 | headers: {
|
34 | 'Content-Length': string.length
|
35 | }
|
36 | };
|
37 |
|
38 | var req = http.request(opts);
|
39 | req.write(string);
|
40 | req.end();
|
41 | }
|
42 |
|
43 |
|
44 | function getHttpServer(app) {
|
45 | return app.httpServer.server;
|
46 | }
|
47 |
|
48 | function getBody(fn) {
|
49 | return function(res) {
|
50 | try {
|
51 | if(res.text) {
|
52 | var body = JSON.parse(res.text);
|
53 | } else {
|
54 | var body = '';
|
55 | }
|
56 | } catch(err) {
|
57 | throw new Error('Failed to parse json body');
|
58 | }
|
59 |
|
60 | fn(res, body);
|
61 | }
|
62 | }
|
63 |
|
64 | describe('Peer Connection API', function() {
|
65 | describe('/peer-management embedded entities', function() {
|
66 | var peerRegistry = null;
|
67 | var app = null;
|
68 |
|
69 | beforeEach(function(done) {
|
70 | peerRegistry = new MemPeerRegistry();
|
71 | app = zetta({ registry: new MemRegistry(), peerRegistry: peerRegistry })
|
72 | .silent()
|
73 | .name('local')
|
74 | ._run(done);
|
75 | });
|
76 |
|
77 | checkPeersActionsForState('initiator', 'connected', 2);
|
78 | checkPeersActionsForState('initiator', 'disconnected', 2);
|
79 | checkPeersActionsForState('acceptor', 'connected', 2);
|
80 | checkPeersActionsForState('acceptor', 'disconnected', 0);
|
81 |
|
82 | function checkPeersActionsForState(direction, status, numberOfActionsExpected) {
|
83 | it('exposes ' + numberOfActionsExpected + ' actions on the embedded entity when ' + status + ' and ' + direction, function(done) {
|
84 |
|
85 | var peer = {
|
86 | id:'foo',
|
87 | connectionId:'12345',
|
88 | direction: direction,
|
89 | status: status
|
90 | };
|
91 |
|
92 | peerRegistry.save(peer, function() {
|
93 | var url = '/peer-management';
|
94 | request(getHttpServer(app))
|
95 | .get(url)
|
96 | .expect(getBody(function(res, body) {
|
97 | assert.equal(body.entities.length, 1);
|
98 | assert.equal(body.entities[0].actions.length, numberOfActionsExpected);
|
99 | body.entities[0].actions.forEach(function(action) {
|
100 | assert.ok(action.href.indexOf('/peer-management/12345') !== -1);
|
101 | })
|
102 | }))
|
103 | .end(done);
|
104 | });
|
105 | });
|
106 | }
|
107 |
|
108 | checkPeersActionsOnFullEntity('initiator', 'connected', 2);
|
109 | checkPeersActionsOnFullEntity('initiator', 'disconnected', 2);
|
110 | checkPeersActionsOnFullEntity('acceptor', 'connected', 2);
|
111 | checkPeersActionsOnFullEntity('acceptor', 'disconnected', 0);
|
112 |
|
113 | function checkPeersActionsOnFullEntity(direction, status, numberOfActionsExpected) {
|
114 | it('when ' + direction + ' exposes ' + numberOfActionsExpected + ' actions on the full entity when ' + status, function(done) {
|
115 | var peer = {
|
116 | id:'foo',
|
117 | connectionId:'12345',
|
118 | direction: direction,
|
119 | status: status
|
120 | };
|
121 | peerRegistry.save(peer, function() {
|
122 | var url = '/peer-management/foo';
|
123 | request(getHttpServer(app))
|
124 | .get(url)
|
125 | .expect(getBody(function(res, body) {
|
126 | assert.equal(body.actions.length, numberOfActionsExpected);
|
127 | body.actions.forEach(function(action) {
|
128 | assert.ok(action.href.indexOf('/peer-management/12345') !== -1);
|
129 | });
|
130 | }))
|
131 | .end(done);
|
132 | });
|
133 | });
|
134 | }
|
135 | });
|
136 |
|
137 | describe('Root API for peers', function() {
|
138 | var cloud = null;
|
139 | var cloudUrl = null;
|
140 | var cloudPort = null;
|
141 | var db1 = null;
|
142 | var db2 = null;
|
143 |
|
144 | beforeEach(function(done) {
|
145 |
|
146 | cloud = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
|
147 | cloud.name('cloud');
|
148 | cloud.silent();
|
149 |
|
150 | cloud.listen(0, function(err) {
|
151 | if(err) {
|
152 | return done(err);
|
153 | }
|
154 |
|
155 | cloudPort = cloud.httpServer.server.address().port;
|
156 | cloudUrl = 'http://localhost:' + cloudPort;
|
157 | done();
|
158 | });
|
159 | });
|
160 |
|
161 | afterEach(function(done) {
|
162 | cloud.httpServer.server.close();
|
163 | done();
|
164 | });
|
165 |
|
166 | it('will have rel of server on peer', function(done) {
|
167 | this.timeout(10000);
|
168 | var connected = false;
|
169 | var z = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
|
170 | z.name('local');
|
171 |
|
172 | cloud.pubsub.subscribe('_peer/connect', function(topic, data) {
|
173 | setImmediate(function() {
|
174 | var url = '/';
|
175 | request(getHttpServer(cloud))
|
176 | .get(url)
|
177 | .expect(getBody(function(res, body) {
|
178 | var peerLinks = body.links.filter(function(link) { return link.rel.indexOf('http://rels.zettajs.io/peer') !== -1; });
|
179 | var peerLink = peerLinks[0];
|
180 | assert.ok(peerLink.rel.indexOf('http://rels.zettajs.io/server') !== -1);
|
181 | }))
|
182 | .end(done);
|
183 | });
|
184 | });
|
185 |
|
186 | z.silent();
|
187 | z.link(cloudUrl);
|
188 | z.listen(0);
|
189 |
|
190 |
|
191 | });
|
192 |
|
193 | });
|
194 |
|
195 | describe('/peer-management disconnection API', function() {
|
196 | var cloud = null;
|
197 | var cloudUrl = null;
|
198 | var cloudPort = null;
|
199 | var db1 = null;
|
200 | var db2 = null;
|
201 |
|
202 | beforeEach(function(done) {
|
203 |
|
204 | cloud = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
|
205 | cloud.name('cloud');
|
206 | cloud.silent();
|
207 |
|
208 | cloud.listen(0, function(err) {
|
209 | if(err) {
|
210 | return done(err);
|
211 | }
|
212 |
|
213 | cloudPort = cloud.httpServer.server.address().port;
|
214 | cloudUrl = 'http://localhost:' + cloudPort;
|
215 | done();
|
216 | });
|
217 | });
|
218 |
|
219 | afterEach(function(done) {
|
220 | cloud.httpServer.server.close();
|
221 | done();
|
222 | });
|
223 |
|
224 | it('will return 404 if connection does not exist', function(done) {
|
225 | var url = '/peer-management/1234';
|
226 | request(getHttpServer(cloud))
|
227 | .del(url)
|
228 | .expect(404, done);
|
229 | });
|
230 |
|
231 | it('will proxy a disconnection between two peers', function(done) {
|
232 |
|
233 | this.timeout(10000);
|
234 | var connected = false;
|
235 | var z = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
|
236 | z.name('local');
|
237 | var connectionId = null;
|
238 |
|
239 | z.pubsub.subscribe('_peer/disconnect', function(topic, data) {
|
240 | assert.equal(connectionId, data.peer.connectionId);
|
241 | done();
|
242 | });
|
243 |
|
244 | cloud.pubsub.subscribe('_peer/connect', function(topic, data) {
|
245 | assert.equal(connected, true);
|
246 | connectionId = data.peer.connectionId;
|
247 | deleteRequest(cloudPort, connectionId);
|
248 | });
|
249 | z.pubsub.subscribe('_peer/connect', function(topic, data) {
|
250 | connected = true;
|
251 | });
|
252 |
|
253 | z.silent();
|
254 | z.link(cloudUrl);
|
255 | z.listen(0);
|
256 |
|
257 |
|
258 | });
|
259 |
|
260 | it('will disconnect two peers', function(done) {
|
261 | this.timeout(10000);
|
262 | var connected = false;
|
263 | var localPort = null;
|
264 | var z = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
|
265 | z.name('local');
|
266 |
|
267 | var connectionId = null;
|
268 |
|
269 | z.pubsub.subscribe('_peer/disconnect', function(topic, data) {
|
270 | assert.equal(connectionId, data.peer.connectionId);
|
271 | done();
|
272 | });
|
273 |
|
274 | cloud.pubsub.subscribe('_peer/connect', function(topic, data) {
|
275 | assert.equal(connected, true);
|
276 | connectionId = data.peer.connectionId;
|
277 | deleteRequest(localPort, connectionId);
|
278 | });
|
279 |
|
280 | z.pubsub.subscribe('_peer/connect', function(topic, data) {
|
281 | connected = true;
|
282 | });
|
283 |
|
284 | z.silent();
|
285 | z.link(cloudUrl);
|
286 | z.listen(0, function(err) {
|
287 | if(err) {
|
288 | done(err);
|
289 | }
|
290 |
|
291 | localPort = z.httpServer.server.address().port;
|
292 | });
|
293 |
|
294 | });
|
295 | });
|
296 |
|
297 | describe('/peer-management update API', function() {
|
298 | var cloud = null;
|
299 | var localOne = null;
|
300 | var cloudPort = null;
|
301 | var localOnePort = null;
|
302 | var connectionId = null;
|
303 |
|
304 |
|
305 | beforeEach(function(done) {
|
306 | this.timeout(10000);
|
307 | cloud = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
|
308 | cloud.name('cloud');
|
309 | cloud.silent();
|
310 |
|
311 | localOne = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
|
312 | localOne.name('localOne');
|
313 | localOne.silent();
|
314 |
|
315 | cloud.pubsub.subscribe('_peer/connect', function(topic, data) {
|
316 | connectionId = data.peer.connectionId;
|
317 | done();
|
318 | });
|
319 |
|
320 | cloud.listen(0, function(err) {
|
321 | if(err) {
|
322 | return done(err);
|
323 | }
|
324 |
|
325 | cloudPort = cloud.httpServer.server.address().port;
|
326 | var cloudUrl = 'http://localhost:' + cloudPort;
|
327 |
|
328 | localOne.link(cloudUrl);
|
329 | localOne.listen(0, function(err) {
|
330 | if(err) {
|
331 | done(err);
|
332 | }
|
333 |
|
334 | localPort = localOne.httpServer.server.address().port;
|
335 | });
|
336 | });
|
337 | });
|
338 |
|
339 | afterEach(function(done) {
|
340 | cloud.httpServer.server.close();
|
341 | localOne.httpServer.server.close();
|
342 | done();
|
343 | });
|
344 |
|
345 | it('will return 404 if connection does not exist', function(done) {
|
346 | var url = '/peer-management/1234';
|
347 | request(getHttpServer(cloud))
|
348 | .put(url)
|
349 | .set('Content-Type', 'application/x-www-form-urlencoded')
|
350 | .send({ url: 'http://localhost:1234' })
|
351 | .expect(404, done);
|
352 | });
|
353 |
|
354 | it('will proxy a connection update between two peers', function(done) {
|
355 | this.timeout(10000);
|
356 | var localTwoPort = null;
|
357 | var localTwo = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
|
358 | localTwo.name('localTwo');
|
359 | localTwo.silent();
|
360 |
|
361 | var url = 'http://localhost:';
|
362 |
|
363 | cloud.pubsub.subscribe('_peer/disconnect', function(topic, data) {
|
364 | assert.equal(connectionId, data.peer.connectionId);
|
365 | });
|
366 |
|
367 | localTwo.pubsub.subscribe('_peer/connect', function(topic, data) {
|
368 | done();
|
369 | });
|
370 |
|
371 | localTwo.listen(0, function(err) {
|
372 | if(err) {
|
373 | return done(err);
|
374 | }
|
375 |
|
376 | localTwoPort = localTwo.httpServer.server.address().port;
|
377 | var serverUrl = url + localTwoPort;
|
378 | putRequest(cloudPort, connectionId, serverUrl);
|
379 | });
|
380 | });
|
381 |
|
382 | it('will update a connection between two peers', function(done) {
|
383 | this.timeout(10000);
|
384 | var localTwoPort = null;
|
385 | var localTwo = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
|
386 | localTwo.name('localTwo');
|
387 | localTwo.silent();
|
388 |
|
389 | var url = 'http://localhost:';
|
390 | var serverUrl = null;
|
391 | cloud.pubsub.subscribe('_peer/disconnect', function(topic, data) {
|
392 | assert.equal(connectionId, data.peer.connectionId);
|
393 | });
|
394 |
|
395 | localTwo.pubsub.subscribe('_peer/connect', function(topic, data) {
|
396 |
|
397 |
|
398 | request(getHttpServer(localOne))
|
399 | .get('/peer-management')
|
400 | .expect(200)
|
401 | .expect(getBody(function(res, body) {
|
402 | assert.equal(body.entities.length, 1);
|
403 |
|
404 | assert.equal(body.entities[0].properties.url, serverUrl);
|
405 | }))
|
406 | .end(done);
|
407 | });
|
408 |
|
409 | localTwo.listen(0, function(err) {
|
410 | if(err) {
|
411 | done(err);
|
412 | }
|
413 |
|
414 | localTwoPort = localTwo.httpServer.server.address().port;
|
415 | serverUrl = url + localTwoPort;
|
416 | putRequest(localPort, connectionId, serverUrl);
|
417 | });
|
418 | });
|
419 | });
|
420 | });
|