UNPKG

7.1 kBJavaScriptView Raw
1var assert = require('assert');
2var defs = require('../lib/defs');
3var Connection = require('../lib/connection').Connection;
4var HEARTBEAT = require('../lib/frame').HEARTBEAT;
5var HB_BUF = require('../lib/frame').HEARTBEAT_BUF;
6var mock = require('./mocknet');
7var succeed = mock.succeed, fail = mock.fail, latch = mock.latch;
8
9var LOG_ERRORS = process.env.LOG_ERRORS;
10
// Default fields handed to Connection#open by the tests in this file. The
// groups mirror the three client replies of the connection handshake.
var OPEN_OPTS = {
  // start-ok
  'clientProperties': {},
  'mechanism': 'PLAIN',
  // SASL PLAIN response: NUL-separated authzid (empty), username, password.
  // Buffer.from replaces the deprecated `new Buffer(string)` constructor.
  'response': Buffer.from(['', 'guest', 'guest'].join('\u0000')),
  'locale': 'en_US',

  // tune-ok
  'channelMax': 0,
  'frameMax': 0,
  'heartbeat': 0,

  // open
  'virtualHost': '/',
  'capabilities': '',
  'insist': 0
};
28module.exports.OPEN_OPTS = OPEN_OPTS;
29
/**
 * Plays the server side of a successful connection handshake.
 *
 * connection.start is sent straight away; after the client's start-ok the
 * server sends tune, then waits for tune-ok and open before answering with
 * open-ok.
 *
 * @param {Function} send - send(method, fields) writes a method frame to
 *   the client.
 * @param {Function} wait - wait(method) returns a function which, when
 *   called, returns a promise for the next frame from the client
 *   (presumably rejecting if it is not `method` -- confirm in mocknet).
 *   Named `wait` rather than `await`, which is a reserved word in ES
 *   modules and async functions.
 * @returns a promise that is fulfilled once open-ok has been sent and is
 *   rejected if any step of the exchange fails.
 */
function happy_open(send, wait) {
  // kick it off
  send(defs.ConnectionStart, {
    versionMajor: 0,
    versionMinor: 9,
    serverProperties: {},
    mechanisms: Buffer.from('PLAIN'),
    locales: Buffer.from('en_US')
  });
  return wait(defs.ConnectionStartOk)()
    .then(function() {
      send(defs.ConnectionTune, {
        channelMax: 0,
        heartbeat: 0,
        frameMax: 0
      });
    })
    .then(wait(defs.ConnectionTuneOk))
    .then(wait(defs.ConnectionOpen))
    .then(function() {
      send(defs.ConnectionOpenOk, {knownHosts: ''});
    });
}
52module.exports.connection_handshake = happy_open;
53
/**
 * Builds a mocha test body that runs a client script and a server script
 * against the two ends of a mock socket pair.
 *
 * @param {Function} client - client(connection, done): drives a Connection
 *   constructed on the client end of the pair.
 * @param {Function} server - server(send, wait, done, socket): scripted
 *   peer, run through mock.runServer on the server end; `socket` is the raw
 *   server end of the pair.
 * @returns {Function} function(done) suitable for passing to test().
 */
function connectionTest(client, server) {
  return function(done) {
    var pair = mock.socketPair();
    var c = new Connection(pair.client);
    if (LOG_ERRORS) c.on('error', console.warn);
    client(c, done);

    // NB only not a race here because the writes are synchronous
    var protocolHeader = pair.server.read(8);
    // "AMQP" followed by the bytes 0, 0, 9, 1.
    var expectedHeader = Buffer.from("AMQP" + String.fromCharCode(0, 0, 9, 1));
    assert.deepEqual(protocolHeader, expectedHeader);

    mock.runServer(pair.server, function(send, wait) {
      server(send, wait, done, pair.server);
    });
  };
}
71
// Outcome of Connection#open against a scripted mock server.
suite("Connection open", function() {

  // The server follows the handshake, so open() has to be fulfilled.
  test("happy", connectionTest(
    function openAsClient(conn, finished) {
      var opening = conn.open(OPEN_OPTS);
      opening.then(succeed(finished), fail(finished));
    },
    function serveHandshake(send, wait, finished) {
      var handshake = happy_open(send, wait);
      handshake.then(null, fail(finished));
    }));

  // The server opens with the wrong method, so open() has to be rejected:
  // note that the success/failure callbacks are swapped here.
  test("wrong first frame", connectionTest(
    function openAsClient(conn, finished) {
      var opening = conn.open(OPEN_OPTS);
      opening.then(fail(finished), succeed(finished));
    },
    function serveTuneFirst(send, wait, finished) {
      // bad server! bad! whatever were you thinking?
      var tuneFields = {channelMax: 0, heartbeat: 0, frameMax: 0};
      send(defs.ConnectionTune, tuneFields);
    }));

});
95
// Protocol violations and transport failure on an already-open connection.
suite("Connection running", function() {

  // Client side shared by the bad-frame tests: the test passes as soon as
  // the connection emits 'error'.
  function expectError(c, done) {
    c.on('error', succeed(done));
    c.open(OPEN_OPTS);
  }

  /**
   * Builds a server script that completes the handshake, sends an
   * unsolicited channel.open-ok on `channel`, then waits for the client to
   * send connection.close and acknowledges it with close-ok on channel 0.
   * Any failure along the way is reported through `done`.
   */
  function strayOpenOkOn(channel) {
    return function(send, wait, done) {
      happy_open(send, wait)
        .then(function() {
          // there's actually nothing that would plausibly be sent to a
          // just opened connection, so this is violating more than one
          // rule. Nonetheless.
          send(defs.ChannelOpenOk, {channelId: Buffer.from('')}, channel);
        })
        .then(wait(defs.ConnectionClose))
        .then(function() {
          send(defs.ConnectionCloseOk, {}, 0);
        })
        .then(null, fail(done));
    };
  }

  test("wrong frame on channel 0",
       connectionTest(expectError, strayOpenOkOn(0)));

  // Channel 3 has never been opened by the client.
  test("unopened channel",
       connectionTest(expectError, strayOpenOkOn(3)));

  test("Unexpected socket close", connectionTest(
    function(c, done) {
      // Both an 'error' and a 'close' event are required before finishing.
      var errorAndClosed = latch(2, done);
      c.on('error', succeed(errorAndClosed));
      c.on('close', succeed(errorAndClosed));
      c.open(OPEN_OPTS);
    },
    function(send, wait, done, socket) {
      happy_open(send, wait)
        .then(function() {
          socket.end();
        })
        // Report a broken handshake instead of letting the test time out.
        .then(null, fail(done));
    }));

});
151
// Orderly shutdown, whether started by the client or by the server.
suite("Connection close", function() {

  // Client script used by the first two tests: open, then close. Two
  // signals are fed to the latch -- the 'close' event and the settling of
  // close() -- before `finished` is expected to fire (latch semantics live
  // in mocknet; presumably it fires after the second call).
  function openThenClose(conn, finished) {
    var bothSeen = latch(2, finished);
    conn.on('close', bothSeen);
    var opening = conn.open(OPEN_OPTS);
    opening.then(function() {
      var closing = conn.close();
      closing.then(succeed(bothSeen), fail(bothSeen));
    });
  }

  test("happy", connectionTest(
    openThenClose,
    function serve(send, wait, finished) {
      function acknowledge() {
        send(defs.ConnectionCloseOk, {});
      }
      happy_open(send, wait)
        .then(wait(defs.ConnectionClose))
        .then(acknowledge)
        .then(null, fail(finished));
    }));

  // The server answers the client's close with a close of its own before
  // acknowledging, so both close exchanges overlap.
  test("interleaved close frames", connectionTest(
    openThenClose,
    function serve(send, wait, finished) {
      function closeFromServer() {
        send(defs.ConnectionClose, {
          replyText: "Ha!",
          replyCode: defs.constants.REPLY_SUCCESS,
          methodId: 0, classId: 0
        });
      }
      function acknowledge() {
        send(defs.ConnectionCloseOk, {});
      }
      happy_open(send, wait)
        .then(wait(defs.ConnectionClose))
        .then(closeFromServer)
        .then(wait(defs.ConnectionCloseOk))
        .then(acknowledge)
        .then(null, fail(finished));
    }));

  // The server closes with an error code; the client must emit both
  // 'close' and 'error' and reply with close-ok.
  test("server-initiated close", connectionTest(
    function client(conn, finished) {
      var bothSeen = latch(2, finished);
      conn.on('close', succeed(bothSeen));
      conn.on('error', succeed(bothSeen));
      conn.open(OPEN_OPTS);
    },
    function serve(send, wait, finished) {
      function dismissClient() {
        send(defs.ConnectionClose, {
          replyText: "Begone",
          replyCode: defs.constants.INTERNAL_ERROR,
          methodId: 0, classId: 0
        });
      }
      happy_open(send, wait)
        .then(dismissClient)
        .then(wait(defs.ConnectionCloseOk))
        .then(null, fail(finished));
    }));
});
216
// Heartbeat sending and heartbeat-timeout detection. The heartbeat time
// unit is shortened for the duration of each test so they run quickly.
suite("heartbeats", function() {

  var heartbeat = require('../lib/heartbeat');
  // Value of UNITS_TO_MS before a test shortened it; restored in teardown
  // rather than assuming a particular default.
  var savedUnitsToMs;

  setup(function() {
    savedUnitsToMs = heartbeat.UNITS_TO_MS;
    heartbeat.UNITS_TO_MS = 20;
  });

  teardown(function() {
    heartbeat.UNITS_TO_MS = savedUnitsToMs;
  });

  test("send heartbeat after open", connectionTest(
    function(c, done) {
      // Inherit the defaults, overriding only the heartbeat interval.
      var opts = Object.create(OPEN_OPTS);
      opts.heartbeat = 1;
      // Don't leave the error waiting to happen for the next test, this
      // confuses mocha awfully
      c.on('error', function() {});
      c.open(opts);
    },
    function(send, wait, done, socket) {
      var timer;
      happy_open(send, wait)
        .then(function() {
          // Keep feeding heartbeats to the client while waiting for the
          // client's own heartbeat to arrive.
          timer = setInterval(function() {
            socket.write(HB_BUF);
          }, heartbeat.UNITS_TO_MS);
        })
        .then(wait())
        .then(function(hb) {
          clearInterval(timer);
          if (hb === HEARTBEAT) done();
          else done(new Error("Next frame after silence not a heartbeat"));
        }, function(err) {
          // The handshake or the wait failed: stop the interval so it does
          // not outlive the test, and report the failure.
          clearInterval(timer);
          fail(done)(err);
        });
    }));

  test("detect lack of heartbeats", connectionTest(
    function(c, done) {
      var opts = Object.create(OPEN_OPTS);
      opts.heartbeat = 1;
      c.on('error', succeed(done));
      c.open(opts);
    },
    function(send, wait, done, socket) {
      // A failed handshake must fail the test; otherwise any client error
      // would be taken for the expected heartbeat timeout.
      happy_open(send, wait).then(null, fail(done));
      // conspicuously not sending anything ...
    }));

});