UNPKG

8.2 kBJavaScriptView Raw
1var assert = require('assert');
2var http = require('http');
3var util = require('util');
4var net = require('net');
5var EventEmitter = require('events').EventEmitter;
6var zetta = require('../zetta');
7var MemRegistry = require('./fixture/mem_registry');
8var MemPeerRegistry = require('./fixture/mem_peer_registry');
9var PeerSocket = require('../lib/peer_socket');
10var PeerClient = require('../lib/peer_client');
11
/**
 * Minimal fake WebSocket used by the tests in this file to build a
 * PeerSocket without opening a real connection.
 *
 * It is an EventEmitter carrying:
 *  - `_socket`: a fresh, unconnected `net.Socket`
 *  - `upgradeReq.url`: a fixed `/peers/<id>?connectionId=<id>` URL
 *    (presumably parsed by PeerSocket for the connection id -- confirm
 *    against lib/peer_socket).
 */
var Ws = function() {
  EventEmitter.call(this);
  this._socket = new net.Socket();
  this.upgradeReq = { url: '/peers/0ac7e9c2-f03f-478c-95f5-2028fc9c2b6e?connectionId=46f466b0-1017-430b-8993-d7a8c896e014'};
};
util.inherits(Ws, EventEmitter);

/**
 * No-op: there is no real connection to tear down.
 */
Ws.prototype.close = function() {};

/**
 * Does not transmit anything; instead re-emits the call as an 'onsend'
 * event so a test can observe what would have been sent.
 *
 * @param {*} data - payload passed through to 'onsend' listeners.
 * @param {*} options - send options passed through unchanged.
 * @param {Function} cb - completion callback passed through unchanged
 *   (it is NOT invoked here).
 */
Ws.prototype.send = function(data, options, cb) {
  this.emit('onsend', data, options, cb);
};
22
23
24describe('Peer Connection Logic', function() {
25 var cloud = null;
26 var cloudUrl = null;
// Before every test: boot a fresh, silenced zetta instance backed by
// in-memory registries on an OS-assigned port (0), and record its
// websocket URL in `cloudUrl` for the tests to link against.
beforeEach(function(done) {
  var onListening = function(err) {
    if (err) {
      return done(err);
    }

    var port = cloud.httpServer.server.address().port;
    cloudUrl = 'ws://localhost:' + port;
    done();
  };

  cloud = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() });
  cloud.silent();
  cloud.listen(0, onListening);
});
39
// After every test: ask the cloud's underlying server to close.
// The hook completes immediately; it does not wait for the close
// callback.
afterEach(function(done) {
  var server = cloud.httpServer.server;
  server.close();
  done();
});
44
describe('#link', function() {
  /**
   * Subscribe to `hub`'s '_peer/connect' topic and finish the test when a
   * peer connects: success if the peer's url starts with the current
   * `cloudUrl`, failure otherwise.
   *
   * @param {Object} hub - zetta instance whose pubsub is observed.
   * @param {Function} done - Mocha completion callback.
   */
  function expectConnectToCloud(hub, done) {
    hub.pubsub.subscribe('_peer/connect', function(topic, data) {
      if (data.peer.url.indexOf(cloudUrl) === 0) {
        done();
      } else {
        done(new Error('Peer connected to another url then expected'));
      }
    });
  }

  /**
   * Build a silenced zetta instance backed by in-memory registries.
   */
  function newHub() {
    return zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() })
      .silent();
  }

  it('should work before .listen is ran', function(done) {
    // .link() is called first, .listen() afterwards.
    var z = newHub()
      .link(cloudUrl)
      .listen(0);

    expectConnectToCloud(z, done);
  });

  it('should work after .listen is ran', function(done) {
    // .link() is only called from inside the listen callback.
    var z = newHub()
      .listen(0, function() {
        z.link(cloudUrl);
      });

    expectConnectToCloud(z, done);
  });

  it('should wire up request extensions', function(done) {
    // The mapped function runs per request envelope, so make sure done()
    // is only ever invoked once.
    var called = false;
    newHub()
      .use(function(server) {
        server.onPeerRequest(function(client) {
          client
            .use(function(handle) {
              handle('request', function(pipeline) {
                return pipeline.map(function(env) {
                  assert(env.request);
                  if (!called) {
                    called = true;
                    done();
                  }
                  return env;
                });
              });
            });
        });
      })
      .link(cloudUrl)
      .listen(0);
  });

  it('should wire up response extensions', function(done) {
    // Same guard as the request-extension test: the mapped function runs
    // for every envelope that flows through, and calling done() twice
    // makes Mocha fail the test.
    var called = false;
    newHub()
      .use(function(server) {
        server.onPeerResponse(function(request) {
          return request
            .map(function(env) {
              assert(env.request);
              assert(env.response);
              assert(env.upgrade);
              if (!called) {
                called = true;
                done();
              }
              return env;
            });
        });
      })
      .link(cloudUrl)
      .listen(0);
  });
});
121
describe('Handle spdy agent errors', function() {
  // An 'error' emitted on the PeerSocket's agent must surface as an
  // 'error' event on the PeerSocket itself.
  it('should catch error event', function(done) {
    var fakeWs = new Ws();
    var peerSocket = new PeerSocket(fakeWs, 'some-peer', new MemPeerRegistry());

    var onSocketError = function(err) {
      // Only the error injected below completes the test; anything else
      // is ignored.
      if (err.message === 'spdy-error') {
        done();
      }
    };

    peerSocket.on('error', onSocketError);
    peerSocket.agent.emit('error', new Error('spdy-error'));
  });
});
134
describe('Peer_socket error events', function() {

  /**
   * Shared scenario for both tests below.
   *
   * Links a hub named 'test-peer' to the cloud, waits for the cloud's
   * first '_peer/connect', then lets `provoke` fire events on the
   * cloud-side peer object. The test completes on the first
   * '_peer/disconnect', which must report the DISCONNECTED state.
   * Both subscriptions are removed as soon as they fire, so a repeated
   * event cannot call done() twice.
   *
   * @param {Function} done - Mocha completion callback.
   * @param {Function} provoke - called with the cloud-side peer; emits
   *   the events under test on it.
   */
  function expectDisconnectAfter(done, provoke) {
    zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() })
      .name('test-peer')
      .silent()
      .link(cloudUrl)
      .listen(0);

    var onDisconnect = function(topic, data) {
      cloud.pubsub.unsubscribe('_peer/disconnect', onDisconnect);
      assert.equal(data.peer.state, PeerSocket.DISCONNECTED);
      done();
    };

    var onConnect = function() {
      cloud.pubsub.unsubscribe('_peer/connect', onConnect);
      var peer = cloud.httpServer.peers['test-peer'];
      assert(peer);
      cloud.pubsub.subscribe('_peer/disconnect', onDisconnect);
      provoke(peer);
    };

    cloud.pubsub.subscribe('_peer/connect', onConnect);
  }

  it('http-server should handle multiple error events', function(done) {
    expectDisconnectAfter(done, function(peer) {
      peer.emit('error', new Error('some error'));
      peer.emit('error', new Error('some error'));
    });
  });

  it('http-server should handle multiple end events', function(done) {
    expectDisconnectAfter(done, function(peer) {
      peer.emit('end');
      peer.emit('end');
    });
  });

});
189
describe('Handle timings with ws connects vs actual peer connects', function() {
  // Hub named 'peer-1'; each test connects it to the cloud through an
  // explicitly constructed PeerClient.
  var hub = null;

  beforeEach(function(done) {
    hub = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() })
      .name('peer-1')
      .silent()
      .listen(0, done);
  });

  // Requests the hub's server to close; does not wait for completion.
  afterEach(function(done) {
    hub.httpServer.server.close();
    done();
  });

  it('peer connects should be the same peer object on the cloud', function(done) {
    var client = new PeerClient(cloudUrl, hub);

    cloud.pubsub.subscribe('_peer/connect', function(topic, data) {
      // The published peer must be the very object the cloud keeps in
      // its peers table.
      assert(data.peer === cloud.httpServer.peers['peer-1']);
      done();
    });

    client.start();
  });

  it('peer connects should be the same peer object on the cloud with reconnect', function(done) {
    var client = new PeerClient(cloudUrl, hub);

    var count = 0;
    cloud.pubsub.subscribe('_peer/connect', function(topic, data) {
      count++;
      assert(data.peer === cloud.httpServer.peers['peer-1']);
      if (count === 2) {
        return done();
      }
      // First connect: close the cloud-side peer and wait for the second
      // '_peer/connect'.
      cloud.httpServer.peers['peer-1'].close();
    });
    client.start();
  });

  it('peer connects should be the same peer object on the cloud with reconnect with timing issue', function(done) {
    this.timeout(5000);
    var client = new PeerClient(cloudUrl, hub);

    var lastPeer = null;
    var count = 0;
    cloud.pubsub.subscribe('_peer/connect', function(topic, data) {
      count++;
      assert(data.peer === cloud.httpServer.peers['peer-1']);
      if (count === 1) {
        lastPeer = data.peer;
        cloud.httpServer.peers['peer-1'].close();

        // On the client's next 'connecting' event, replace its 'request'
        // handler with a one-shot that closes the client's websocket
        // instead of serving the request.
        client.once('connecting', function() {
          client.server.removeListener('request', client.onRequest);
          client.onRequest = function(req, res) {
            client.ws.close();
          };
          client.server.once('request', client.onRequest.bind(client));
        });

      } else if (count === 2) {
        assert(data.peer === lastPeer, 'a new PeerSocket on the cloud was created instead of a reuse');
        done();
      }
    });
    client.start();
  });

});
262
263})