// UNPKG
//
// 8.57 kB, JavaScript, View Raw
// Test the channel model API
2
3var assert = require('assert');
4var defer = require('when').defer;
5var Channel = require('../lib/channel').Channel;
6var Connection = require('../lib/connection').Connection;
7var mock = require('./mocknet');
8var succeed = mock.succeed, fail = mock.fail, latch = mock.latch;
9var defs = require('../lib/defs');
10var conn_handshake = require('./connection').connection_handshake;
11var OPEN_OPTS = require('./connection').OPEN_OPTS;
12
13var LOG_ERRORS = process.env.LOG_ERRORS;
14
// Build a mocha-style test body (a function taking `done`) that wires a
// client Connection to a scripted server over a mock socket pair.
//
//   client(connection, done)  - called once the connection has opened.
//   server(send, wait, done)  - called once the server side has completed
//                               the connection handshake. It may return a
//                               promise; a rejection fails the test.
//
// Anything thrown by either callback, and any rejection of a promise the
// server callback returns, is routed to `fail(done)` instead of being lost.
// This mirrors how channelTest already treats its own server callback.
function baseChannelTest(client, server) {
  return function(done) {
    var pair = mock.socketPair();
    var c = new Connection(pair.client);
    if (LOG_ERRORS) c.on('error', console.warn);
    c.open(OPEN_OPTS)
      .then(function() {
        client(c, done);
      })
      // chained (rather than passed as the sibling handler) so that an
      // exception thrown by `client` is reported as well
      .then(null, fail(done));

    pair.server.read(8); // discard the protocol header
    // The second callback parameter is named `wait`, not `await`: `await`
    // is a reserved word in ES modules and inside async functions.
    mock.runServer(pair.server, function(send, wait) {
      conn_handshake(send, wait)
        .then(function() {
          return server(send, wait, done);
        })
        .then(null, fail(done));
    });
  };
}
33
// Like baseChannelTest, but one level up: the client callback is handed a
// new Channel on the opened connection, and the server callback only runs
// after the channel-open exchange, receiving the channel number as a
// fourth argument.
//
//   client(channel, done)
//   server(send, wait, done, channelNumber) - may return a promise so that
//                                             its errors bubble out
function channelTest(client, server) {
  function clientSide(conn, done) {
    var channel = new Channel(conn);
    if (LOG_ERRORS) {
      channel.on('error', console.warn);
    }
    client(channel, done);
  }

  function serverSide(send, wait, done) {
    var opened = channel_handshake(send, wait);
    var served = opened.then(function(channelNumber) {
      return server(send, wait, done, channelNumber);
    });
    // catches both a failed handshake and a failure inside `server`
    served.then(null, fail(done));
  }

  return baseChannelTest(clientSide, serverSide);
}
50
// Server half of opening a channel: wait for the client's ChannelOpen,
// check that it is not on channel 0, reply with ChannelOpenOk on the same
// channel, and resolve with that channel number.
//
//   send - function(method, fields, channel) used to emit the reply
//   wait - function(method) returning a function that yields a promise for
//          the next frame (named `wait` because `await` is reserved in ES
//          modules and async functions)
//
// Returns a promise for the channel number; it rejects if the wait fails
// or the open arrives on channel 0.
function channel_handshake(send, wait) {
  return wait(defs.ChannelOpen)()
    .then(function(open) {
      assert.notEqual(open.channel, 0);
      // Buffer.from replaces the deprecated `new Buffer(string)` constructor
      send(defs.ChannelOpenOk, {channelId: Buffer.from('')}, open.channel);
      return open.channel;
    });
}
59
// One shared bag of method fields for the content-carrying methods the
// scripted server sends below (it is passed with both BasicDeliver and
// BasicReturn). It is a superset, so the same object can be reused for
// each; presumably fields a method does not define are ignored by the
// encoder -- confirm against lib/defs.
var DELIVER_FIELDS = {
  // delivery bookkeeping
  consumerTag: 'fake',
  deliveryTag: 1,
  redelivered: false,

  // routing
  exchange: 'foo',
  routingKey: 'bar',

  // reply details
  replyCode: defs.constants.NO_ROUTE,
  replyText: 'derp'
};
70
71suite("channel open and close", function() {
72
// Opening a channel succeeds when the server answers the open. channelTest
// performs that exchange itself, so the server callback has nothing to add.
test("open", channelTest(
  function(ch, done) {
    var opened = ch.open();
    opened.then(succeed(done), fail(done));
  },
  function(send, wait, done) {
    return true;
  }));
80
// The server answers ChannelOpen with the wrong method (ChannelCloseOk).
// Built on baseChannelTest so the test can script the open exchange itself.
test("bad server", baseChannelTest(
  function(conn, done) {
    var ch = new Channel(conn);
    // handlers are swapped on purpose: the open is expected to be rejected
    ch.open().then(fail(done), succeed(done));
  },
  function(send, wait, done) {
    function replyWithWrongMethod(open) {
      send(defs.ChannelCloseOk, {}, open.channel);
    }
    return wait(defs.ChannelOpen)().then(replyWithWrongMethod);
  }));
92
// Open a channel, close it from the client side, and have the server
// acknowledge the close.
test("open, close", channelTest(
  function(ch, done) {
    ch.open()
      .then(function() {
        // Hand back whatever close() returns. If that is a promise, the
        // test now waits for the close to finish (and fails if it is
        // rejected) instead of passing as soon as the close has started.
        return ch.close();
      })
      .then(succeed(done), fail(done));
  },
  // `wait` rather than `await`: the latter is reserved in ES modules and
  // async functions
  function(send, wait, done, ch) {
    return wait(defs.ChannelClose)()
      .then(function() {
        send(defs.ChannelCloseOk, {}, ch);
      });
  }));
107
// The server closes the channel on its own initiative. Both sides report
// in: the client when the channel emits 'error', the server when it has
// seen the ChannelCloseOk. The latch is created with a count of 2 and
// presumably calls done0 after two successful calls -- see mocknet.
test("server close", function(done0) {
  var bothSides = latch(2, done0);

  var run = channelTest(
    function(ch, done) {
      ch.on('error', succeed(done));
      ch.open();
    },
    function(send, wait, done, ch) {
      var forcedClose = {
        replyText: 'Forced close',
        replyCode: defs.constants.CHANNEL_ERROR,
        classId: 0, methodId: 0
      };
      send(defs.ChannelClose, forcedClose, ch);
      wait(defs.ChannelCloseOk)()
        .then(succeed(done), fail(done));
    });

  run(bothSides);
});
126
127}); //suite
128
129suite("channel machinery", function() {
130
// Issue three identical BasicQos RPCs back to back; the server answers
// each in turn, and all three must succeed before the test is done.
test("RPC", channelTest(
  function(ch, done) {
    ch.open().then(function() {
      var rpcLatch = latch(3, done);
      var whee = succeed(rpcLatch);
      var boom = fail(rpcLatch);
      var fields = {
        prefetchCount: 10,
        prefetchSize: 0,
        global: false
      };

      for (var i = 0; i < 3; i++) {
        ch.rpc(defs.BasicQos, fields, defs.BasicQosOk).then(whee, boom);
      }
    }).then(null, fail(done));
  },
  function(send, wait, done, ch) {
    function sendOk() {
      send(defs.BasicQosOk, {}, ch);
    }

    // first request/reply, then two more appended to the same chain
    var exchange = wait(defs.BasicQos)().then(sendOk);
    for (var i = 0; i < 2; i++) {
      exchange = exchange.then(wait(defs.BasicQos)).then(sendOk);
    }
    return exchange;
  }));
160
// The server answers an RPC with a method the client was not expecting.
// Two things must then happen on the client: the RPC is rejected, and the
// channel emits an error. The client also closes the channel, which the
// server acknowledges.
test("Bad RPC", channelTest(
  function(ch, done) {
    // one count for the channel error, one for the rejected RPC
    var errLatch = latch(2, done);
    ch.on('error', succeed(errLatch));

    function recover() {
      ch.rpc(defs.BasicRecover, {requeue: true}, defs.BasicRecoverOk)
        .then(fail(done), succeed(errLatch));
    }

    ch.open().then(recover, fail(done));
  },
  function(send, wait, done, ch) {
    function sendWrongReply() {
      send(defs.BasicGetEmpty, {clusterId: ''}, ch);
    }
    function acknowledgeClose() {
      send(defs.ChannelCloseOk, {}, ch);
    }

    // wait()() takes whatever frame arrives next, without naming a method
    return wait()()
      .then(sendWrongReply)
      // that reply was wrong, so expect the client to close the channel
      .then(wait(defs.ChannelClose))
      .then(acknowledgeClose);
  }));
184
// Two RPCs are issued straight after open() without waiting for it. The
// server answers the first with a ChannelClose, so the channel must emit
// an error and BOTH RPCs must be rejected.
test("RPC on closed channel", channelTest(
  function(ch, done) {
    ch.open();
    var close = defer(), fail1 = defer(), fail2 = defer();
    ch.on('error', close.resolve);
    // handlers are swapped: a rejected RPC resolves the deferred, a
    // fulfilled one rejects it
    ch.rpc(defs.BasicRecover, {requeue:true}, defs.BasicRecoverOk)
      .then(fail1.reject, fail1.resolve);
    ch.rpc(defs.BasicRecover, {requeue:true}, defs.BasicRecoverOk)
      .then(fail2.reject, fail2.resolve);

    close.promise
      .then(function() { return fail1.promise; })
      .then(function() { return fail2.promise; })
      .then(succeed(done), fail(done));
  },
  // `wait` rather than `await`: the latter is reserved in ES modules and
  // async functions
  function(send, wait, done, ch) {
    wait(defs.BasicRecover)()
      .then(function() {
        send(defs.ChannelClose, {
          replyText: 'Nuh-uh!',
          replyCode: defs.constants.CHANNEL_ERROR,
          methodId: 0, classId: 0
        }, ch);
        // wait(...) only builds the waiter; it has to be invoked to
        // actually wait for (and check) the ChannelCloseOk
        return wait(defs.ChannelCloseOk)();
      })
      .then(null, fail(done));
  }));
212
// The client publishes one message; the server expects the publish method,
// then the properties, then a content frame carrying the body.
test("publish", channelTest(
  function(ch, done) {
    ch.open()
      .then(function() {
        // Buffer.from replaces the deprecated `new Buffer(string)`
        ch.sendMessage({
          exchange: 'foo', routingKey: 'bar',
          mandatory: false, immediate: false, ticket: 0
        }, {}, Buffer.from('foobar'));
      })
      .then(null, fail(done));
  },
  // `wait` rather than `await`: the latter is reserved in ES modules and
  // async functions
  function(send, wait, done, ch) {
    wait(defs.BasicPublish)()
      .then(wait(defs.BasicProperties))
      .then(wait(undefined)) // content frame
      .then(function(f) {
        assert.equal('foobar', f.content.toString());
      }).then(succeed(done), fail(done));
  }));
232
// The server pushes a deliver followed by its content; the client must
// emit a 'delivery' event whose content is the body that was sent.
test("delivery", channelTest(
  function(ch, done) {
    ch.open();
    ch.on('delivery', function(m) {
      assert.equal('barfoo', m.content.toString());
      done();
    });
  },
  // `wait` rather than `await`: the latter is reserved in ES modules and
  // async functions
  function(send, wait, done, ch) {
    send(defs.BasicDeliver, DELIVER_FIELDS, ch);
    // no method for this send; the fourth argument carries the body.
    // Buffer.from replaces the deprecated `new Buffer(string)`.
    send(false, {}, ch, Buffer.from('barfoo'));
  }));
245
// The server sends a second deliver without ever sending content for the
// first. The client is expected to emit 'error' and close the channel,
// which the server then acknowledges.
test("bad delivery", channelTest(
  function(ch, done) {
    ch.on('error', succeed(done));
    ch.open();
  },
  function(send, wait, done, ch) {
    function acknowledgeClose() {
      send(defs.ChannelCloseOk, {}, ch);
    }

    // two delivers in a row, with no content in between
    for (var i = 0; i < 2; i++) {
      send(defs.BasicDeliver, DELIVER_FIELDS, ch);
    }
    return wait(defs.ChannelClose)().then(acknowledgeClose);
  }));
260
// The server sends a return followed by its content; the client must emit
// a 'return' event whose content is the body that was sent.
test("return", channelTest(
  function(ch, done) {
    ch.on('return', function(m) {
      assert.equal('barfoo', m.content.toString());
      done();
    });
    ch.open();
  },
  // `wait` rather than `await`: the latter is reserved in ES modules and
  // async functions
  function(send, wait, done, ch) {
    send(defs.BasicReturn, DELIVER_FIELDS, ch);
    // no method for this send; the fourth argument carries the body.
    // Buffer.from replaces the deprecated `new Buffer(string)`.
    send(null, {}, ch, Buffer.from('barfoo'));
  }));
273
// Register a test named 'confirm <variety>': the server sends `Method`
// with deliveryTag 1, and the channel must emit an event named `variety`
// carrying that delivery tag.
//
//   variety - event name to listen for on the channel; also used in the
//             test title
//   Method  - method definition the server sends
//
// Returns whatever test() returns.
function confirmTest(variety, Method) {
  var title = 'confirm ' + variety;

  function client(ch, done) {
    ch.on(variety, function(f) {
      assert.equal(1, f.deliveryTag);
      done();
    });
    ch.open();
  }

  function server(send, wait, done, ch) {
    var fields = {
      deliveryTag: 1,
      multiple: false
    };
    send(Method, fields, ch);
  }

  return test(title, channelTest(client, server));
}

confirmTest("ack", defs.BasicAck);
confirmTest("nack", defs.BasicNack);
293
// A delivery is interleaved with an RPC reply: the server sends the
// deliver method, then the RPC's ok, and only then the delivery's content.
// Both the RPC and the delivery must complete on the client.
test("interleaved RPC/delivery", channelTest(
  function(ch, done) {
    var both = latch(2, done);
    ch.on('delivery', succeed(both));

    ch.open().then(function() {
      ch.rpc(defs.BasicQos,
             { global: false, prefetchSize: 0, prefetchCount: 7},
             defs.BasicQosOk)
        .then(succeed(both), fail(both));
    }, fail(both));
  },
  // `wait` rather than `await`: the latter is reserved in ES modules and
  // async functions
  function(send, wait, done, ch) {
    return wait(defs.BasicQos)()
      .then(function() {
        send(defs.BasicDeliver, DELIVER_FIELDS, ch);
        send(defs.BasicQosOk, {}, ch);
        // Buffer.from replaces the deprecated `new Buffer(string)`
        send(false, {}, ch, Buffer.from('boofar'));
      });
  }));
314
315});