1 |
|
2 |
|
3 | var assert = require('assert');
|
4 | var defer = require('when').defer;
|
5 | var Channel = require('../lib/channel').Channel;
|
6 | var Connection = require('../lib/connection').Connection;
|
7 | var mock = require('./mocknet');
|
8 | var succeed = mock.succeed, fail = mock.fail, latch = mock.latch;
|
9 | var defs = require('../lib/defs');
|
10 | var conn_handshake = require('./connection').connection_handshake;
|
11 | var OPEN_OPTS = require('./connection').OPEN_OPTS;
|
12 |
|
13 | var LOG_ERRORS = process.env.LOG_ERRORS;
|
14 |
|
15 | function baseChannelTest(client, server) {
|
16 | return function(done) {
|
17 | var pair = mock.socketPair();
|
18 | var c = new Connection(pair.client);
|
19 | if (LOG_ERRORS) c.on('error', console.warn);
|
20 | c.open(OPEN_OPTS).then(function() {
|
21 | client(c, done);
|
22 | }, fail(done));
|
23 |
|
24 | pair.server.read(8);
|
25 | var s = mock.runServer(pair.server, function(send, await) {
|
26 | conn_handshake(send, await)
|
27 | .then(function() {
|
28 | server(send, await, done);
|
29 | }, fail(done));
|
30 | });
|
31 | };
|
32 | }
|
33 |
|
34 | function channelTest(client, server) {
|
35 | return baseChannelTest(
|
36 | function(conn, done) {
|
37 | var ch = new Channel(conn);
|
38 | if (LOG_ERRORS) ch.on('error', console.warn);
|
39 | client(ch, done);
|
40 | },
|
41 | function(send, await, done) {
|
42 | channel_handshake(send, await)
|
43 | .then(function(ch) {
|
44 | return server(send, await, done, ch);
|
45 | }).then(null, fail(done));
|
46 |
|
47 | }
|
48 | );
|
49 | };
|
50 |
|
51 | function channel_handshake(send, await) {
|
52 | return await(defs.ChannelOpen)()
|
53 | .then(function(open) {
|
54 | assert.notEqual(0, open.channel);
|
55 | send(defs.ChannelOpenOk, {channelId: new Buffer('')}, open.channel);
|
56 | return open.channel;
|
57 | });
|
58 | }
|
59 |
|
60 |
|
61 | var DELIVER_FIELDS = {
|
62 | consumerTag: 'fake',
|
63 | deliveryTag: 1,
|
64 | redelivered: false,
|
65 | exchange: 'foo',
|
66 | routingKey: 'bar',
|
67 | replyCode: defs.constants.NO_ROUTE,
|
68 | replyText: 'derp',
|
69 | };
|
70 |
|
71 | suite("channel open and close", function() {
|
72 |
|
73 | test("open", channelTest(
|
74 | function(ch, done) {
|
75 | ch.open().then(succeed(done), fail(done));
|
76 | },
|
77 | function(send, await, done) {
|
78 | return true;
|
79 | }));
|
80 |
|
81 | test("bad server", baseChannelTest(
|
82 | function(c, done) {
|
83 | var ch = new Channel(c);
|
84 | ch.open().then(fail(done), succeed(done));
|
85 | },
|
86 | function(send, await, done) {
|
87 | return await(defs.ChannelOpen)()
|
88 | .then(function(open) {
|
89 | send(defs.ChannelCloseOk, {}, open.channel);
|
90 | });
|
91 | }));
|
92 |
|
93 | test("open, close", channelTest(
|
94 | function(ch, done) {
|
95 | ch.open()
|
96 | .then(function() {
|
97 | ch.close();
|
98 | })
|
99 | .then(succeed(done), fail(done));
|
100 | },
|
101 | function(send, await, done, ch) {
|
102 | return await(defs.ChannelClose)()
|
103 | .then(function(close) {
|
104 | send(defs.ChannelCloseOk, {}, ch);
|
105 | });
|
106 | }));
|
107 |
|
108 | test("server close", function(done0) {
|
109 | var doneLatch = latch(2, done0);
|
110 |
|
111 | channelTest(
|
112 | function(ch, done) {
|
113 | ch.on('error', succeed(done));
|
114 | ch.open();
|
115 | },
|
116 | function(send, await, done, ch) {
|
117 | send(defs.ChannelClose, {
|
118 | replyText: 'Forced close',
|
119 | replyCode: defs.constants.CHANNEL_ERROR,
|
120 | classId: 0, methodId: 0
|
121 | }, ch);
|
122 | await(defs.ChannelCloseOk)()
|
123 | .then(succeed(done), fail(done));
|
124 | })(doneLatch);
|
125 | });
|
126 |
|
127 | });
|
128 |
|
129 | suite("channel machinery", function() {
|
130 |
|
131 | test("RPC", channelTest(
|
132 | function(ch, done) {
|
133 | ch.open().then(function() {
|
134 | var rpcLatch = latch(3, done);
|
135 | var whee = succeed(rpcLatch);
|
136 | var boom = fail(rpcLatch);
|
137 | var fields = {
|
138 | prefetchCount: 10,
|
139 | prefetchSize: 0,
|
140 | global: false
|
141 | };
|
142 |
|
143 | ch.rpc(defs.BasicQos, fields, defs.BasicQosOk).then(whee, boom);
|
144 | ch.rpc(defs.BasicQos, fields, defs.BasicQosOk).then(whee, boom);
|
145 | ch.rpc(defs.BasicQos, fields, defs.BasicQosOk).then(whee, boom);
|
146 | }).then(null, fail(done));
|
147 | },
|
148 | function(send, await, done, ch) {
|
149 | function sendOk(f) {
|
150 | send(defs.BasicQosOk, {}, ch);
|
151 | }
|
152 |
|
153 | return await(defs.BasicQos)()
|
154 | .then(sendOk)
|
155 | .then(await(defs.BasicQos))
|
156 | .then(sendOk)
|
157 | .then(await(defs.BasicQos))
|
158 | .then(sendOk);
|
159 | }));
|
160 |
|
161 | test("Bad RPC", channelTest(
|
162 | function(ch, done) {
|
163 |
|
164 |
|
165 | var errLatch = latch(2, done);
|
166 | ch.on('error', succeed(errLatch));
|
167 |
|
168 | ch.open()
|
169 | .then(function() {
|
170 | ch.rpc(defs.BasicRecover, {requeue: true}, defs.BasicRecoverOk)
|
171 | .then(fail(done), succeed(errLatch));
|
172 | }, fail(done));
|
173 | },
|
174 | function(send, await, done, ch) {
|
175 | return await()()
|
176 | .then(function() {
|
177 | send(defs.BasicGetEmpty, {clusterId: ''}, ch);
|
178 | })
|
179 | .then(await(defs.ChannelClose))
|
180 | .then(function() {
|
181 | send(defs.ChannelCloseOk, {}, ch);
|
182 | });
|
183 | }));
|
184 |
|
185 | test("RPC on closed channel", channelTest(
|
186 | function(ch, done) {
|
187 | ch.open();
|
188 | var close = defer(), fail1 = defer(), fail2 = defer();
|
189 | ch.on('error', close.resolve);
|
190 | ch.rpc(defs.BasicRecover, {requeue:true}, defs.BasicRecoverOk)
|
191 | .then(fail1.reject, fail1.resolve);
|
192 | ch.rpc(defs.BasicRecover, {requeue:true}, defs.BasicRecoverOk)
|
193 | .then(fail2.reject, fail2.resolve);
|
194 |
|
195 | close.promise
|
196 | .then(function(){ return fail1.promise; })
|
197 | .then(function() { return fail2.promise; })
|
198 | .then(succeed(done), fail(done));
|
199 | },
|
200 | function(send, await, done, ch) {
|
201 | await(defs.BasicRecover)()
|
202 | .then(function() {
|
203 | send(defs.ChannelClose, {
|
204 | replyText: 'Nuh-uh!',
|
205 | replyCode: defs.constants.CHANNEL_ERROR,
|
206 | methodId: 0, classId: 0
|
207 | }, ch);
|
208 | return await(defs.ChannelCloseOk);
|
209 | })
|
210 | .then(null, fail(done));
|
211 | }));
|
212 |
|
213 | test("publish", channelTest(
|
214 | function(ch, done) {
|
215 | ch.open()
|
216 | .then(function() {
|
217 | ch.sendMessage({
|
218 | exchange: 'foo', routingKey: 'bar',
|
219 | mandatory: false, immediate: false, ticket: 0
|
220 | }, {}, new Buffer('foobar'));
|
221 | })
|
222 | .then(null, fail(done));
|
223 | },
|
224 | function(send, await, done, ch) {
|
225 | await(defs.BasicPublish)()
|
226 | .then(await(defs.BasicProperties))
|
227 | .then(await(undefined))
|
228 | .then(function(f) {
|
229 | assert.equal('foobar', f.content.toString());
|
230 | }).then(succeed(done), fail(done));
|
231 | }));
|
232 |
|
233 | test("delivery", channelTest(
|
234 | function(ch, done) {
|
235 | ch.open();
|
236 | ch.on('delivery', function(m) {
|
237 | assert.equal('barfoo', m.content.toString());
|
238 | done();
|
239 | });
|
240 | },
|
241 | function(send, await, done, ch) {
|
242 | send(defs.BasicDeliver, DELIVER_FIELDS, ch);
|
243 | send(false, {}, ch, new Buffer('barfoo'));
|
244 | }));
|
245 |
|
246 | test("bad delivery", channelTest(
|
247 | function(ch, done) {
|
248 | ch.on('error', succeed(done));
|
249 | ch.open();
|
250 | },
|
251 | function(send, await, done, ch) {
|
252 | send(defs.BasicDeliver, DELIVER_FIELDS, ch);
|
253 |
|
254 | send(defs.BasicDeliver, DELIVER_FIELDS, ch);
|
255 | return await(defs.ChannelClose)()
|
256 | .then(function() {
|
257 | send(defs.ChannelCloseOk, {}, ch);
|
258 | });
|
259 | }));
|
260 |
|
261 | test("return", channelTest(
|
262 | function(ch, done) {
|
263 | ch.on('return', function(m) {
|
264 | assert.equal('barfoo', m.content.toString());
|
265 | done();
|
266 | });
|
267 | ch.open();
|
268 | },
|
269 | function(send, await, done, ch) {
|
270 | send(defs.BasicReturn, DELIVER_FIELDS, ch);
|
271 | send(null, {}, ch, new Buffer('barfoo'));
|
272 | }));
|
273 |
|
274 | function confirmTest(variety, Method) {
|
275 | return test('confirm ' + variety, channelTest(
|
276 | function(ch, done) {
|
277 | ch.on(variety, function(f) {
|
278 | assert.equal(1, f.deliveryTag);
|
279 | done();
|
280 | });
|
281 | ch.open();
|
282 | },
|
283 | function(send, await, done, ch) {
|
284 | send(Method, {
|
285 | deliveryTag: 1,
|
286 | multiple: false
|
287 | }, ch);
|
288 | }));
|
289 | }
|
290 |
|
291 | confirmTest("ack", defs.BasicAck);
|
292 | confirmTest("nack", defs.BasicNack);
|
293 |
|
294 | test("interleaved RPC/delivery", channelTest(
|
295 | function(ch, done) {
|
296 | var both = latch(2, done);
|
297 | ch.on('delivery', succeed(both));
|
298 |
|
299 | ch.open().then(function() {
|
300 | ch.rpc(defs.BasicQos,
|
301 | { global: false, prefetchSize: 0, prefetchCount: 7},
|
302 | defs.BasicQosOk)
|
303 | .then(succeed(both), fail(both));
|
304 | }, fail(both));
|
305 | },
|
306 | function(send, await, done, ch) {
|
307 | return await(defs.BasicQos)()
|
308 | .then(function() {
|
309 | send(defs.BasicDeliver, DELIVER_FIELDS, ch);
|
310 | send(defs.BasicQosOk, {}, ch);
|
311 | send(false, {}, ch, new Buffer('boofar'));
|
312 | });
|
313 | }));
|
314 |
|
315 | });
|