1 | var assert = require('assert');
|
2 | var defs = require('../lib/defs');
|
3 | var Connection = require('../lib/connection').Connection;
|
4 | var HEARTBEAT = require('../lib/frame').HEARTBEAT;
|
5 | var HB_BUF = require('../lib/frame').HEARTBEAT_BUF;
|
6 | var mock = require('./mocknet');
|
7 | var succeed = mock.succeed, fail = mock.fail, latch = mock.latch;
|
8 |
|
9 | var LOG_ERRORS = process.env.LOG_ERRORS;
|
10 |
|
// Options passed to Connection#open by the tests in this file. The values
// line up with what the mock server in happy_open offers: the PLAIN
// mechanism, the en_US locale, and 0 for channelMax, frameMax and heartbeat.
var OPEN_OPTS = {
  'clientProperties': {},
  'mechanism': 'PLAIN',
  // Three NUL-separated fields: an empty first field, then 'guest' and
  // 'guest' (presumably username and password for PLAIN -- confirm against
  // the broker configuration). Buffer.from replaces the deprecated
  // `new Buffer(string)` constructor.
  'response': Buffer.from(['', 'guest', 'guest'].join(String.fromCharCode(0))),
  'locale': 'en_US',

  'channelMax': 0,
  'frameMax': 0,
  'heartbeat': 0,

  'virtualHost': '/',
  'capabilities': '',
  'insist': 0
};
|
28 | module.exports.OPEN_OPTS = OPEN_OPTS;
|
29 |
|
/**
 * Plays the server side of a successful connection-opening handshake.
 *
 * Sequence: send ConnectionStart, wait for ConnectionStartOk, send
 * ConnectionTune, wait for ConnectionTuneOk and then ConnectionOpen, and
 * finally send ConnectionOpenOk.
 *
 * The second parameter is named `wait` rather than `await`: `await` is a
 * reserved word in ES modules and inside async functions. The rename is
 * positional, so existing callers are unaffected.
 *
 * @param {Function} send - called as send(method, fields) to emit a frame.
 * @param {Function} wait - called as wait(method); must return a function
 *   that returns a promise for the next frame received.
 * @returns {Promise} settles once ConnectionOpenOk has been sent, or
 *   rejects if any wait step rejects.
 */
function happy_open(send, wait) {
  // Buffer.from replaces the deprecated `new Buffer(string)` constructor.
  send(defs.ConnectionStart,
       {versionMajor: 0,
        versionMinor: 9,
        serverProperties: {},
        mechanisms: Buffer.from('PLAIN'),
        locales: Buffer.from('en_US')});
  // The first waiter is invoked immediately; the later ones are created
  // here but only invoked as the promise chain advances.
  return wait(defs.ConnectionStartOk)()
    .then(function() {
      send(defs.ConnectionTune,
           {channelMax: 0,
            heartbeat: 0,
            frameMax: 0});
    })
    .then(wait(defs.ConnectionTuneOk))
    .then(wait(defs.ConnectionOpen))
    .then(function() {
      send(defs.ConnectionOpenOk,
           {knownHosts: ''});
    });
}
|
52 | module.exports.connection_handshake = happy_open;
|
53 |
|
/**
 * Builds a test body that wires a Connection to a mock server over an
 * in-memory socket pair.
 *
 * @param {Function} client - called as client(connection, done) with a new
 *   Connection built on the client end of the pair.
 * @param {Function} server - called as server(send, wait, done, socket)
 *   from inside mock.runServer, where socket is the server end of the pair.
 * @returns {Function} a function taking the test's `done` callback.
 */
function connectionTest(client, server) {
  return function(done) {
    var pair = mock.socketPair();
    var c = new Connection(pair.client);
    if (LOG_ERRORS) c.on('error', console.warn);
    client(c, done);

    // The first 8 bytes on the server side must be the protocol header
    // ("AMQP" followed by bytes 0, 0, 9, 1). They are consumed here, before
    // the mock server starts reading frames. Buffer.from replaces the
    // deprecated `new Buffer(string)` constructor.
    var protocolHeader = pair.server.read(8);
    assert.deepEqual(protocolHeader,
                     Buffer.from("AMQP" + String.fromCharCode(0, 0, 9, 1)));

    // The callback's second parameter is named `wait`, not `await`, because
    // `await` is reserved in ES modules and async functions.
    mock.runServer(pair.server, function(send, wait) {
      server(send, wait, done, pair.server);
    });
  };
}
|
71 |
|
// Handshake tests: how Connection#open behaves when the server follows, or
// breaks, the opening sequence. Server callbacks name their second
// parameter `wait` rather than `await`, since `await` is a reserved word in
// ES modules and async functions.
suite("Connection open", function() {

  // The server completes the whole handshake, so open() must resolve.
  test("happy", connectionTest(
    function(c, done) {
      c.open(OPEN_OPTS).then(succeed(done), fail(done));
    },
    function(send, wait, done) {
      happy_open(send, wait).then(null, fail(done));
    }));

  // The server's first frame is ConnectionTune instead of ConnectionStart,
  // so open() must reject (note the swapped succeed/fail handlers).
  test("wrong first frame", connectionTest(
    function(c, done) {
      c.open(OPEN_OPTS).then(fail(done), succeed(done));
    },
    function(send, wait, done) {
      send(defs.ConnectionTune,
           {channelMax: 0,
            heartbeat: 0,
            frameMax: 0});
    }));

});
|
95 |
|
// Tests for an opened connection that receives something it should not.
// Server callbacks name their second parameter `wait` rather than `await`,
// since `await` is a reserved word in ES modules and async functions.
suite("Connection running", function() {

  test("wrong frame on channel 0", connectionTest(
    function(c, done) {
      c.on('error', succeed(done));
      c.open(OPEN_OPTS);
    },
    function(send, wait, done) {
      happy_open(send, wait)
        .then(function() {
          // Send a channel-level method on channel 0. The client is
          // expected to emit 'error' and answer with ConnectionClose.
          // Buffer.alloc(0) replaces the deprecated `new Buffer('')`.
          send(defs.ChannelOpenOk, {channelId: Buffer.alloc(0)}, 0);
        })
        .then(wait(defs.ConnectionClose))
        .then(function(close) {
          send(defs.ConnectionCloseOk, {}, 0);
        }).then(null, fail(done));
    }));

  test("unopened channel", connectionTest(
    function(c, done) {
      c.on('error', succeed(done));
      c.open(OPEN_OPTS);
    },
    function(send, wait, done) {
      happy_open(send, wait)
        .then(function() {
          // Send a frame on channel 3, which the client never opened. The
          // client is expected to emit 'error' and answer with
          // ConnectionClose.
          send(defs.ChannelOpenOk, {channelId: Buffer.alloc(0)}, 3);
        })
        .then(wait(defs.ConnectionClose))
        .then(function(close) {
          send(defs.ConnectionCloseOk, {}, 0);
        }).then(null, fail(done));
    }));

  test("Unexpected socket close", connectionTest(
    function(c, done) {
      // Both an 'error' and a 'close' event must fire before the test ends.
      var errorAndClosed = latch(2, done);
      c.on('error', succeed(errorAndClosed));
      c.on('close', succeed(errorAndClosed));
      c.open(OPEN_OPTS);
    },
    function(send, wait, done, socket) {
      happy_open(send, wait)
        .then(function() {
          // End the socket without any close handshake.
          socket.end();
        })
        // Report a failed handshake through done instead of leaving the
        // rejection unhandled and the test to time out.
        .then(null, fail(done));
    }));

});
|
151 |
|
// Tests for the close handshake, whether started by the client or by the
// server. Server callbacks name their second parameter `wait` rather than
// `await`, since `await` is a reserved word in ES modules and async
// functions.
suite("Connection close", function() {

  test("happy", connectionTest(
    function(c, done0) {
      // Finish only after both the 'close' event and close() settling.
      var done = latch(2, done0);
      c.on('close', done);
      c.open(OPEN_OPTS).then(function(_ok) {
        c.close().then(succeed(done), fail(done));
      });
    },
    function(send, wait, done) {
      happy_open(send, wait)
        .then(wait(defs.ConnectionClose))
        .then(function(close) {
          send(defs.ConnectionCloseOk, {});
        })
        .then(null, fail(done));
    }));

  test("interleaved close frames", connectionTest(
    function(c, done0) {
      // Finish only after both the 'close' event and close() settling.
      var done = latch(2, done0);
      c.on('close', done);
      c.open(OPEN_OPTS).then(function(_ok) {
        c.close().then(succeed(done), fail(done));
      });
    },
    function(send, wait, done) {
      happy_open(send, wait)
        .then(wait(defs.ConnectionClose))
        .then(function(f) {
          // Answer the client's ConnectionClose with a ConnectionClose of
          // our own rather than a CloseOk, so the two handshakes overlap.
          send(defs.ConnectionClose, {
            replyText: "Ha!",
            replyCode: defs.constants.REPLY_SUCCESS,
            methodId: 0, classId: 0
          });
        })
        .then(wait(defs.ConnectionCloseOk))
        .then(function(f) {
          send(defs.ConnectionCloseOk, {});
        })
        .then(null, fail(done));
    }));

  test("server-initiated close", connectionTest(
    function(c, done0) {
      // Both a 'close' and an 'error' event are expected.
      var done = latch(2, done0);
      c.on('close', succeed(done));
      c.on('error', succeed(done));
      c.open(OPEN_OPTS);
    },
    function(send, wait, done) {
      happy_open(send, wait)
        .then(function(f) {
          send(defs.ConnectionClose, {
            replyText: "Begone",
            replyCode: defs.constants.INTERNAL_ERROR,
            methodId: 0, classId: 0
          });
        })
        .then(wait(defs.ConnectionCloseOk))
        .then(null, fail(done));
    }));
});
|
216 |
|
// Heartbeat tests. Server callbacks name their second parameter `wait`
// rather than `await`, since `await` is a reserved word in ES modules and
// async functions.
suite("heartbeats", function() {

  var heartbeat = require('../lib/heartbeat');

  // Lower UNITS_TO_MS for the duration of each test and put 1000 back
  // afterwards (presumably the milliseconds per heartbeat unit, so that a
  // heartbeat of 1 elapses quickly -- confirm in lib/heartbeat).
  setup(function() {
    heartbeat.UNITS_TO_MS = 20;
  });

  teardown(function() {
    heartbeat.UNITS_TO_MS = 1000;
  });

  test("send heartbeat after open", connectionTest(
    function(c, done) {
      // Inherit every option from OPEN_OPTS, overriding only the heartbeat.
      var opts = Object.create(OPEN_OPTS);
      opts.heartbeat = 1;
      // Ignore connection errors; this test only looks at what the server
      // receives.
      c.on('error', function() {});
      c.open(opts);
    },
    function(send, wait, done, socket) {
      var timer;
      happy_open(send, wait)
        .then(function() {
          // Keep writing heartbeats to the client from the server side.
          timer = setInterval(function() {
            socket.write(HB_BUF);
          }, heartbeat.UNITS_TO_MS);
        })
        // wait() with no argument takes whatever frame arrives next.
        .then(wait())
        .then(function(hb) {
          clearInterval(timer);
          if (hb === HEARTBEAT) done();
          else done(new Error("Next frame after silence not a heartbeat"));
        }, function(err) {
          // A failed handshake or wait: stop the interval so it does not
          // leak, and report the failure instead of timing out.
          clearInterval(timer);
          fail(done)(err);
        });
    }));

  test("detect lack of heartbeats", connectionTest(
    function(c, done) {
      var opts = Object.create(OPEN_OPTS);
      opts.heartbeat = 1;
      c.on('error', succeed(done));
      c.open(opts);
    },
    function(send, wait, done, socket) {
      // Complete the handshake, then send nothing more; the client is
      // expected to emit 'error'. A failed handshake is reported through
      // done rather than left as an unhandled rejection.
      happy_open(send, wait).then(null, fail(done));
    }));

});
|