// Retrieved from the UNPKG file viewer (10.6 kB, JavaScript, raw view).

1var assert = require('assert');
2var http = require('http');
3var zetta = require('../zetta');
4var MemRegistry = require('./fixture/mem_registry');
5var MemPeerRegistry = require('./fixture/mem_peer_registry');
6var request = require('supertest');
7var PeerRegistry = require('../lib/peer_registry');
8var Query = require('calypso').Query;
9var querystring = require('querystring');
10
/**
 * Sends a fire-and-forget DELETE to the peer-management API on localhost.
 *
 * No response or error listener is attached, so the caller learns about the
 * outcome only through whatever side effects the server produces.
 *
 * @param {number} port - Port of the server to contact.
 * @param {string} connectionId - Identifier appended to `/peer-management/`.
 */
function deleteRequest(port, connectionId) {
  http.request({
    host: 'localhost',
    port: port,
    method: 'DELETE',
    path: '/peer-management/' + connectionId
  }).end();
}
22
/**
 * Sends a fire-and-forget PUT to the peer-management API on localhost with a
 * form-encoded `url` field as the request body.
 *
 * No response or error listener is attached, so the caller learns about the
 * outcome only through whatever side effects the server produces.
 *
 * @param {number} port - Port of the server to contact.
 * @param {string} connectionId - Identifier appended to `/peer-management/`.
 * @param {string} url - Value sent as the `url` form field.
 */
function putRequest(port, connectionId, url) {
  var body = querystring.stringify({ url: url });

  var req = http.request({
    host: 'localhost',
    port: port,
    method: 'PUT',
    path: '/peer-management/' + connectionId,
    headers: {
      // Declare the encoding of the payload, matching what the supertest
      // based PUT in this file sends.
      'Content-Type': 'application/x-www-form-urlencoded',
      // Content-Length is a byte count, not a character count.
      'Content-Length': Buffer.byteLength(body)
    }
  });

  req.write(body);
  req.end();
}
42
43
/**
 * Returns the `server` object held by the given app's `httpServer` wrapper.
 *
 * @param {Object} app - Object exposing `httpServer.server`.
 * @returns {*} Whatever `app.httpServer.server` holds.
 */
function getHttpServer(app) {
  var wrapper = app.httpServer;
  return wrapper.server;
}
47
/**
 * Wraps an assertion callback so it receives the JSON-decoded response body.
 *
 * The returned function reads `res.text`; when it is truthy it is parsed as
 * JSON, otherwise the body is the empty string. `fn` is then invoked as
 * `fn(res, body)`. Errors thrown by `fn` itself are not intercepted.
 *
 * @param {function(Object, *)} fn - Callback receiving the response and body.
 * @returns {function(Object)} Function taking a response object with `text`.
 * @throws {Error} 'Failed to parse json body: <reason>' when the text cannot
 *   be read or parsed; the underlying error is attached as `cause`.
 */
function getBody(fn) {
  return function(res) {
    var body = '';

    try {
      if (res.text) {
        body = JSON.parse(res.text);
      }
    } catch (err) {
      // Keep the underlying reason so a failing test shows what was wrong
      // with the payload instead of only a generic message.
      var failure = new Error('Failed to parse json body: ' + err.message);
      failure.cause = err;
      throw failure;
    }

    fn(res, body);
  };
}
63
64describe('Peer Connection API', function() {
// Verifies that peers stored in the peer registry are exposed by the
// /peer-management endpoints together with their actions.
describe('/peer-management embedded entities', function() {
  var peerRegistry = null;
  var app = null;

  // Every test gets fresh in-memory registries and a fresh zetta instance.
  beforeEach(function(done) {
    peerRegistry = new MemPeerRegistry();
    app = zetta({ registry: new MemRegistry(), peerRegistry: peerRegistry })
      .silent()
      .name('local')
      ._run(done);
  });

  // Stores the fixture peer, then runs `onSaved`. A truthy first callback
  // argument is reported to mocha instead of being ignored.
  // NOTE(review): assumes `save` uses a node-style error-first callback -
  // confirm against the registry fixture.
  function saveFixturePeer(done, onSaved) {
    peerRegistry.save({ id: 'foo', connectionId: '12345' }, function(err) {
      if (err) {
        return done(err);
      }
      onSaved();
    });
  }

  // Both representations must expose exactly two actions, each of which
  // points at the fixture peer's connection id.
  function assertPeerActions(actions) {
    assert.equal(actions.length, 2);
    actions.forEach(function(action) {
      assert.ok(action.href.indexOf('/peer-management/12345') !== -1);
    });
  }

  it('exposes actions on the embedded entity', function(done) {
    saveFixturePeer(done, function() {
      request(getHttpServer(app))
        .get('/peer-management')
        .expect(getBody(function(res, body) {
          assert.equal(body.entities.length, 1);
          assertPeerActions(body.entities[0].actions);
        }))
        .end(done);
    });
  });

  it('exposes actions on the full entity', function(done) {
    saveFixturePeer(done, function() {
      request(getHttpServer(app))
        .get('/peer-management/foo')
        .expect(getBody(function(res, body) {
          assertPeerActions(body.actions);
        }))
        .end(done);
    });
  });
});
108
// Verifies how a connected peer is represented in the root document of the
// server it linked to.
describe('Root API for peers', function() {
  var cloud = null;
  var cloudUrl = null;
  var cloudPort = null;
  // Peer instance started by a test; tracked so afterEach can close it.
  var local = null;

  // Starts a fresh "cloud" instance on an ephemeral port and records its
  // port and URL for the test to link against.
  beforeEach(function(done) {
    local = null;

    cloud = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
    cloud.name('cloud');
    cloud.silent();

    cloud.listen(0, function(err) {
      if (err) {
        return done(err);
      }

      cloudPort = cloud.httpServer.server.address().port;
      cloudUrl = 'http://localhost:' + cloudPort;
      done();
    });
  });

  // close() is not awaited, matching the cloud server's existing handling.
  afterEach(function(done) {
    cloud.httpServer.server.close();
    if (local) {
      local.httpServer.server.close();
    }
    done();
  });

  it('will have rel of server on peer', function(done) {
    this.timeout(10000);
    local = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
    local.name('local');

    cloud.pubsub.subscribe('_peer/connect', function() {
      // The root document is requested on a later turn of the event loop
      // than the connect notification.
      setImmediate(function() {
        request(getHttpServer(cloud))
          .get('/')
          .expect(getBody(function(res, body) {
            var peerLinks = body.links.filter(function(link) {
              return link.rel.indexOf('http://rels.zettajs.io/peer') !== -1;
            });

            // Fail with a readable message rather than a TypeError when the
            // root document carries no peer link at all.
            assert.ok(peerLinks.length > 0, 'expected at least one peer link in the root document');
            assert.ok(peerLinks[0].rel.indexOf('http://rels.zettajs.io/server') !== -1);
          }))
          .end(done);
      });
    });

    local.silent();
    local.link(cloudUrl);
    local.listen(0);
  });
});
166
// Verifies DELETE /peer-management/<connectionId>, issued against either
// side of a peer connection.
describe('/peer-management disconnection API', function() {
  var cloud = null;
  var cloudUrl = null;
  var cloudPort = null;
  // Peer instance started by a test; tracked so afterEach can close it.
  var local = null;

  // Starts a fresh "cloud" instance on an ephemeral port and records its
  // port and URL for the tests to link against.
  beforeEach(function(done) {
    local = null;

    cloud = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
    cloud.name('cloud');
    cloud.silent();

    cloud.listen(0, function(err) {
      if (err) {
        return done(err);
      }

      cloudPort = cloud.httpServer.server.address().port;
      cloudUrl = 'http://localhost:' + cloudPort;
      done();
    });
  });

  // close() is not awaited, matching the cloud server's existing handling.
  afterEach(function(done) {
    cloud.httpServer.server.close();
    if (local) {
      local.httpServer.server.close();
    }
    done();
  });

  it('will return 404 if connection does not exist', function(done) {
    request(getHttpServer(cloud))
      .del('/peer-management/1234')
      .expect(404, done);
  });

  it('will proxy a disconnection between two peers', function(done) {
    //Had to increase the timeout. The standard two seconds may not be long enough for a connection to be established.
    this.timeout(10000);
    var connected = false;
    var connectionId = null;

    local = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
    local.name('local');

    local.pubsub.subscribe('_peer/disconnect', function(topic, data) {
      assert.equal(connectionId, data.peer.connectionId);
      done();
    });

    // Once the cloud sees the peer, ask the cloud to drop the connection.
    cloud.pubsub.subscribe('_peer/connect', function(topic, data) {
      assert.equal(connected, true);
      connectionId = data.peer.connectionId;
      deleteRequest(cloudPort, connectionId);
    });

    local.pubsub.subscribe('_peer/connect', function() {
      connected = true;
    });

    local.silent();
    local.link(cloudUrl);
    local.listen(0);
  });

  it('will disconnect two peers', function(done) {
    this.timeout(10000);
    var connected = false;
    var localPort = null;
    var connectionId = null;

    local = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
    local.name('local');

    local.pubsub.subscribe('_peer/disconnect', function(topic, data) {
      assert.equal(connectionId, data.peer.connectionId);
      done();
    });

    // Once the cloud sees the peer, ask the local instance to drop the
    // connection.
    cloud.pubsub.subscribe('_peer/connect', function(topic, data) {
      assert.equal(connected, true);
      connectionId = data.peer.connectionId;
      deleteRequest(localPort, connectionId);
    });

    local.pubsub.subscribe('_peer/connect', function() {
      connected = true;
    });

    local.silent();
    local.link(cloudUrl);
    local.listen(0, function(err) {
      if (err) {
        // Stop here: without the return the failure would be followed by an
        // attempt to read the address of a server that never started.
        return done(err);
      }

      localPort = local.httpServer.server.address().port;
    });
  });
});
268
// Verifies PUT /peer-management/<connectionId>, which carries a new `url`
// for an existing peer connection.
describe('/peer-management update API', function() {
  var cloud = null;
  var localOne = null;
  // Second target instance started by the update tests; tracked so
  // afterEach can close it.
  var localTwo = null;
  var cloudPort = null;
  var localOnePort = null;
  var connectionId = null;

  // Starts "cloud" and "localOne", links localOne to cloud, and completes
  // once cloud reports the peer connection. The connection id from that
  // event is kept for the tests.
  beforeEach(function(done) {
    this.timeout(10000);
    localTwo = null;

    cloud = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
    cloud.name('cloud');
    cloud.silent();

    localOne = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
    localOne.name('localOne');
    localOne.silent();

    cloud.pubsub.subscribe('_peer/connect', function(topic, data) {
      connectionId = data.peer.connectionId;
      done();
    });

    cloud.listen(0, function(err) {
      if (err) {
        return done(err);
      }

      cloudPort = cloud.httpServer.server.address().port;

      localOne.link('http://localhost:' + cloudPort);
      localOne.listen(0, function(err) {
        if (err) {
          return done(err);
        }

        // Stored in the declared suite variable; the previous code assigned
        // an undeclared `localPort`, creating an implicit global.
        localOnePort = localOne.httpServer.server.address().port;
      });
    });
  });

  // close() is not awaited, matching the existing handling.
  afterEach(function(done) {
    cloud.httpServer.server.close();
    localOne.httpServer.server.close();
    if (localTwo) {
      localTwo.httpServer.server.close();
    }
    done();
  });

  // Starts "localTwo", then PUTs its URL for the existing connection to the
  // management API listening on `managementPort`. Completes when localTwo
  // reports a peer connection.
  function updateConnectionVia(managementPort, done) {
    localTwo = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
    localTwo.name('localTwo');
    localTwo.silent();

    cloud.pubsub.subscribe('_peer/disconnect', function(topic, data) {
      assert.equal(connectionId, data.peer.connectionId);
    });

    // Wrapped so the subscriber arguments are not passed to done().
    localTwo.pubsub.subscribe('_peer/connect', function() {
      done();
    });

    localTwo.listen(0, function(err) {
      if (err) {
        return done(err);
      }

      var localTwoPort = localTwo.httpServer.server.address().port;
      putRequest(managementPort, connectionId, 'http://localhost:' + localTwoPort);
    });
  }

  it('will return 404 if connection does not exist', function(done) {
    request(getHttpServer(cloud))
      .put('/peer-management/1234')
      .set('Content-Type', 'application/x-www-form-urlencoded')
      .send({ url: 'http://localhost:1234' })
      .expect(404, done);
  });

  it('will proxy a connection update between two peers', function(done) {
    this.timeout(10000);
    updateConnectionVia(cloudPort, done);
  });

  it('will update a connection between two peers', function(done) {
    this.timeout(10000);
    updateConnectionVia(localOnePort, done);
  });
});
382});