UNPKG

11 kBJavaScriptView Raw
1//
2//
3//
4
5// Channel machinery.
6
7var defs = require('./defs');
8var closeMsg = require('./format').closeMessage;
9var inspect = require('./format').inspect;
10var methodName = require('./format').methodName;
11var when = require('when'), defer = when.defer;
12var deferSync = require('./sync_defer').defer;
13var assert = require('assert');
14var inherits = require('util').inherits;
15var EventEmitter = require('events').EventEmitter;
16var fmt = require('util').format;
17
// A channel on top of an AMQP connection. Construction asks the
// connection for a fresh channel (passing this object along) and keeps
// what comes back in `ch` -- presumably the channel number; confirm
// against the connection implementation.
function Channel(connection) {
  // %% move to open?
  this.ch = connection.freshChannel(this);
  this.connection = connection;
  // RPCs waiting for their turn, in the order they were requested
  this.pending = [];
  // the deferred for the one RPC currently awaiting its reply, if any
  this.reply = null;
  // current state of the message-frame state machine
  this.handleMessage = acceptDeliveryOrReturn;
}
inherits(Channel, EventEmitter);

module.exports.Channel = Channel;

// Shorthand used for attaching methods below.
var C = Channel.prototype;
32
// Incoming frames are either notifications of e.g., message delivery,
// or replies to something we've sent. In general I deal with the
// former by emitting an event, and with the latter by keeping track
// of what's expecting a reply.
//
// The AMQP specification implies that RPCs can't be pipelined; that
// is, you can have only one outstanding RPC on a channel at a
// time. Certainly that's what RabbitMQ and its clients assume. For
// this reason, I buffer RPCs if the channel is already waiting for a
// reply.
43
44
// Hand a method frame for this channel directly to the connection,
// without consulting the RPC queue. Returns whatever the connection's
// sendMethod returns.
C.sendImmediately = function(method, fields) {
  var connection = this.connection;
  return connection.sendMethod(this.ch, method, fields);
};
49
// Send an RPC now if none is outstanding, otherwise queue it.
//
// Invariant: !this.reply -> pending.length == 0. That is, whenever we
// clear a reply, we must send another RPC (and thereby fill
// this.reply) if there is one waiting. The invariant is relevant here
// and in `accept`.
//
// `reply` is the deferred to settle once the response frame arrives.
C.sendOrEnqueue = function(method, fields, reply) {
  if (this.reply) {
    // something is already in flight; wait in line behind it
    this.pending.push({method: method, fields: fields, reply: reply});
    return;
  }
  // nothing in flight, so (by the invariant) nothing may be queued
  assert(this.pending.length === 0);
  this.reply = reply;
  this.sendImmediately(method, fields);
};
66
// Publish a message: first the BasicPublish method frame, then the
// properties and content via the connection. Returns whatever the
// connection's sendContent returns.
C.sendMessage = function(fields, properties, content) {
  this.sendImmediately(defs.BasicPublish, fields);
  var connection = this.connection;
  return connection.sendContent(this.ch, defs.BasicProperties,
                                properties, content);
};
73
// Internal RPC built on the synchronous defer; the returned promise
// resolves with the whole reply frame.
//
// If the reply is not the `expect`ed method, the channel is closed
// with CHANNEL_ERROR and the promise is rejected. If the pending reply
// is rejected with an Error it is passed through unchanged; otherwise
// the rejection value is treated as a close frame and turned into an
// Error describing it.
C._rpc = function(method, fields, expect) {
  var channel = this;
  var awaited = deferSync();

  function onReply(frame) {
    if (frame.id !== expect) {
      var complaint = fmt("Expected %s; got %s",
                          methodName(expect), inspect(frame, false));
      channel.closeBecause(complaint, defs.constants.CHANNEL_ERROR);
      // throwing rejects the promise returned by then()
      throw new Error(complaint);
    }
    return frame;
  }

  function onFailure(close) {
    if (close instanceof Error) throw close;
    // Rebuild the id of the method the close frame blames, in the same
    // (classId << 16) + methodId form that `method` is compared in.
    var blamed = close.fields;
    var closeReason = (blamed.classId << 16) + blamed.methodId;
    var text;
    if (method === closeReason) {
      text = fmt("Operation failed: %s; %s",
                 methodName(method), closeMsg(close));
    }
    else {
      text = fmt("Channel closed by server: %s", closeMsg(close));
    }
    throw new Error(text);
  }

  this.sendOrEnqueue(method, fields, awaited);
  return awaited.promise.then(onReply, onFailure);
};
101
// Public-facing RPC: wraps the internal _rpc result with `when` so
// callers get a 'proper' promise, and resolves it with only the
// `fields` of the reply frame. Intended for implementing API
// procedures.
C.rpc = function(method, fields, expect) {
  var wholeFrame = this._rpc(method, fields, expect);
  var justFields = wholeFrame.then(function(frame) {
    return frame.fields;
  });
  return when(justFields);
};
109
// The channel open handshake: send ChannelOpen and expect
// ChannelOpenOk. Returns the promise from `rpc`.
C.open = function() {
  var openFields = {outOfBand: ""};
  return this.rpc(defs.ChannelOpen, openFields, defs.ChannelOpenOk);
};
115
// Shutdown protocol. There are three scenarios:
//
// 1. The application decides to shut the channel
// 2. The server decides to shut the channel, possibly because of
//    something the application did
// 3. The connection is closing, so there won't be any more frames
//    going back and forth.
//
// 1 and 2 involve an exchange of method frames (Close and CloseOk),
// while 3 doesn't; the connection simply says "shutdown" to the
// channel, which then acts as if it's closing, without going through
// the exchange.
128
// Move to entirely closed state. If err is provided, it was closed
// because there was an error; if not, it was all as intended.
//
// The sending and accepting methods are shadowed on this instance (not
// on the Channel constructor or the prototype, so other channels are
// unaffected) with the "closed" stand-ins. The channel is then
// released back to the connection, and 'error' (only when err is
// given) followed by 'close' are emitted.
C.toClosed = function(err) {
  this.sendImmediately = this.sendMessage = Channel.closedSend;
  this.sendOrEnqueue = Channel.closedEnqueue;
  this.accept = Channel.closedAccept;
  this.connection.releaseChannel(this.ch);
  if (err) this.emit('error', err);
  this.emit('close');
};
139
// Stand-in for the sending methods once a channel is closed: always
// throws.
Channel.closedSend = function() {
  var refusal = new Error("Channel is closed, do not use");
  throw refusal;
};
// Stand-in for `accept` once a channel is closed: any frame arriving
// is unexpected, so throw with a description of it.
Channel.closedAccept = function(f) {
  var description = inspect(f, false);
  var text = fmt("Channel is closed, not expecting frame: %s",
                 description);
  throw new Error(text);
};
// Stand-in for `sendOrEnqueue` once a channel is closed: nothing is
// sent; the caller's deferred is rejected instead.
Channel.closedEnqueue = function(_method, _fields, reply) {
  var closed = new Error("Channel is closed");
  reply.reject(closed);
};
150
// Stop being able to send and receive methods and content. Used when
// a channel close happens, and also by the connection when it closes.
//
// Emits 'end', then rejects the outstanding RPC (if any) and every
// queued RPC, and finally nulls `pending`.
C.stop = function() {
  // Emitted first so apps get a chance to prepare for pending RPCs
  // failing; but they can't respond by sending more.
  this.emit('end');

  function abandon(deferred) {
    deferred.reject(
      new Error("Channel ended, no reply will be forthcoming"));
  }

  if (this.reply) abandon(this.reply);
  // Re-read this.pending on every turn, taking entries off the front
  // until the queue yields nothing.
  for (var queued = this.pending.shift();
       queued;
       queued = this.pending.shift()) {
    abandon(queued.reply);
  }
  this.pending = null; // so pushes will break
};
166
// The API for closing a channel on the application's initiative:
// close with REPLY_SUCCESS and the text "Goodbye". Returns the
// promise from closeBecause.
C.close = function() {
  var success = defs.constants.REPLY_SUCCESS;
  return this.closeBecause("Goodbye", success);
};
171
// Start closing the channel from our side: send ChannelClose with the
// given reply text and code, swap `accept` for a handler that only
// cares about the close handshake, and stop the channel. Returns a
// promise resolved when the ChannelCloseOk arrives.
C.closeBecause = function(reason, code) {
  var closeFields = {
    replyText: reason,
    replyCode: code,
    methodId: 0, classId: 0
  };
  this.sendImmediately(defs.ChannelClose, closeFields);

  // Closing for any reason other than the application wanting to is
  // an error; it gets handed to toClosed once the handshake is done.
  var err = (code !== defs.constants.REPLY_SUCCESS)
    ? new Error(reason)
    : undefined;

  var closing = defer();
  var channel = this;
  this.accept = function(f) {
    switch (f.id) {
    case defs.ChannelCloseOk:
      closing.resolve();
      channel.toClosed(err);
      break;
    case defs.ChannelClose:
      // the other side is closing too; acknowledge and keep waiting
      this.sendImmediately(defs.ChannelCloseOk, {});
      break;
    default:
      // anything else is dropped
    }
  };
  this.stop();
  return closing.promise;
};
200
201// A trampolining state machine for message frames on a channel. A
202// message arrives in at least three frames: first, a method
203// announcing the message (either a BasicDeliver or BasicGetOk); then,
204// a message header with the message properties; then, one or more
205// content frames. Message frames may be interleaved with method
206// frames per channel, but they must not be interleaved with other
207// message frames.
208
// Feed a frame to the message state machine: the current
// `handleMessage` state is called with the frame and whatever it
// returns becomes the next state.
//
// The try/catch is kept localised here, in an attempt to avoid
// disabling optimisation elsewhere.
//
// Anything thrown while handling the frame closes the channel with
// UNEXPECTED_FRAME. The state functions throw plain strings, which are
// used as the close reason as-is; an Error (for instance one thrown by
// an event listener while a completed message is being emitted) is
// reduced to its message so that the reason is always a string.
C.acceptMessageFrame = function(f) {
  try {
    this.handleMessage = this.handleMessage(f);
  }
  catch (thrown) {
    var reason = (thrown instanceof Error)
      ? thrown.message
      : String(thrown);
    this.closeBecause(reason, defs.constants.UNEXPECTED_FRAME);
  }
};
219
// Initial state of the message state machine: kick off a message
// given a BasicDeliver or BasicReturn frame (BasicGet uses the RPC
// mechanism). Expects to be called with the channel as `this`.
//
// Returns the next state, which waits for the headers frame; once the
// message is complete it is emitted on the channel as 'delivery' or
// 'return', with the fields of this first frame attached. Any other
// frame makes it throw a string describing the problem.
function acceptDeliveryOrReturn(f) {
  var eventName;
  switch (f.id) {
  case defs.BasicDeliver:
    eventName = 'delivery';
    break;
  case defs.BasicReturn:
    eventName = 'return';
    break;
  default:
    throw fmt("Expected BasicDeliver or BasicReturn; got %s",
              inspect(f));
  }

  var channel = this;
  var methodFields = f.fields;
  return Channel.acceptMessage(function(message) {
    message.fields = methodFields;
    channel.emit(eventName, message);
  });
}
236
// Move to the state of waiting for message frames (headers, then
// content frames). Returns the state function expecting the headers
// frame.
//
// `continuation` is called with the assembled message -- an object
// with `fields` (left null here, for the caller to fill in),
// `properties` (the fields of the headers frame) and `content` (a
// Buffer) -- once the number of bytes announced by the headers frame
// has arrived. After that the state machine goes back to
// acceptDeliveryOrReturn.
//
// The state functions throw strings when a frame is not what they
// expect.
Channel.acceptMessage = function(continuation) {
  var totalSize = 0, remaining = 0;
  var buffers = null;

  var message = {
    fields: null,
    properties: null,
    content: null
  };

  return headers;

  // expect a headers frame
  function headers(f) {
    if (f.id !== defs.BasicProperties) {
      throw "Expected headers frame after delivery";
    }
    message.properties = f.fields;
    totalSize = remaining = f.size;
    if (totalSize === 0) {
      // A zero-length body is not followed by any content frames, so
      // waiting for one would stall until the next delivery showed up
      // and was rejected as unexpected. Complete the message now, with
      // an empty buffer (Buffer.concat of nothing is a zero-length
      // Buffer).
      message.content = Buffer.concat([]);
      continuation(message);
      return acceptDeliveryOrReturn;
    }
    return content;
  }

  // expect a content frame
  // %%% TODO cancelled messages (sent as zero-length content frame)
  function content(f) {
    if (!f.content) {
      throw "Expected content frame after headers";
    }
    remaining -= f.content.length;
    if (remaining === 0) {
      // that was the last piece
      if (buffers !== null) {
        buffers.push(f.content);
        message.content = Buffer.concat(buffers);
      }
      else {
        // the whole body came in one frame; no need to copy
        message.content = f.content;
      }
      continuation(message);
      return acceptDeliveryOrReturn;
    }
    else if (remaining < 0) {
      throw fmt("Too much content sent! Expected %d bytes",
                totalSize);
    }
    else {
      // more to come; hang on to this piece
      if (buffers !== null)
        buffers.push(f.content);
      else
        buffers = [f.content];
      return content;
    }
  }
};
295
// Dispatch an incoming frame for this channel (while it is open).
//
// Message frames go to the message state machine; acks and nacks are
// emitted as events; a ChannelClose from the server fails the
// outstanding RPC and closes the channel with an error; cancel and
// flow are answered by closing the channel with NOT_IMPLEMENTED.
// Anything else is taken to be the reply to the outstanding RPC. If
// there is no outstanding RPC, the frame is unexpected and the channel
// is closed with UNEXPECTED_FRAME.
C.accept = function(f) {
  var reply;

  switch (f.id) {

    // Message frames
  case undefined: // content frame!
  case defs.BasicDeliver:
  case defs.BasicReturn:
  case defs.BasicProperties:
    return this.acceptMessageFrame(f);

    // confirmations, need to do confirm.select first
  case defs.BasicAck:
    return this.emit('ack', f.fields);
  case defs.BasicNack:
    return this.emit('nack', f.fields);

  case defs.ChannelClose:
    // Any remote closure is an error to us.
    if (this.reply) {
      reply = this.reply; this.reply = null;
      // rejected with the close frame itself, not an Error
      reply.reject(f);
    }
    var e = new Error("Channel closed: " + closeMsg(f));
    this.stop();
    this.sendImmediately(defs.ChannelCloseOk, {});
    this.toClosed(e);
    return;

  case defs.BasicCancel:
    // The broker can send this if e.g., the queue is deleted.
    // TODO (also needs to send capability in client properties)
    return this.closeBecause("Cancel not implemented",
                             defs.constants.NOT_IMPLEMENTED);

    // AMQP 0-9-1 puts the flow method in the channel class, so
    // ChannelFlow is presumably the id that can actually arrive. The
    // BasicFlow label is kept for compatibility; if defs does not
    // define it, it is `undefined` and never reached, since the
    // `case undefined` above wins.
  case defs.BasicFlow:
  case defs.ChannelFlow:
    // RabbitMQ doesn't send this, it just blocks the TCP socket
    return this.closeBecause("Flow not implemented",
                             defs.constants.NOT_IMPLEMENTED);

  default: // assume all other things are replies
    reply = this.reply;
    if (!reply) {
      // Nothing is waiting for a reply, so there is nobody to hand
      // this frame to; treat it as a protocol violation.
      return this.closeBecause(
        fmt("Unexpected frame, no reply awaited: %s",
            inspect(f, false)),
        defs.constants.UNEXPECTED_FRAME);
    }
    // Resolving the reply may lead to another RPC; to make sure we
    // don't hold that up, clear this.reply
    this.reply = null;
    // however, maybe there's an RPC waiting to go? If so, that'll
    // fill this.reply again, restoring the invariant. This does rely
    // on any response being recv'ed after resolving the promise,
    // below; hence, I use synchronous defer.
    if (this.pending.length > 0) {
      var send = this.pending.shift();
      this.reply = send.reply;
      this.sendImmediately(send.method, send.fields);
    }
    return reply.resolve(f);
  }
};