UNPKG

12 kBJavaScriptView Raw
1//
2//
3// Beating train rhythms
4
5var defs = require('./defs');
6var Frames = require('./frame');
7var HEARTBEAT = Frames.HEARTBEAT;
8var Heart = require('./heartbeat').Heart;
9var inspect = require('./format').inspect;
10var methodName = require('./format').methodName;
11var closeMsg = require('./format').closeMessage;
12var BitSet = require('./bitset').BitSet;
13var defer = require('when').defer;
14var inherits = require('util').inherits;
15var fmt = require('util').format;
16
17function Connection(underlying) {
18 Frames.call(this, underlying);
19 // `'close'` isn't emitted by all streams, but `'end'` should be; we
20 // want to know if the stream closes without a closing handshake
21 underlying.once(
22 'end', this.onSocketError.bind(this, new Error('Unexpected close')));
23 // or with an error
24 underlying.once('error', this.onSocketError.bind(this));
25 this.expectSocketClose = false;
26 this.freeChannels = new BitSet();
27 this.channels = [{accept: channel0(this)}];
28}
29inherits(Connection, Frames);
30
31var C = Connection.prototype;
32
33// Usual frame accept mode
34function mainAccept(frame) {
35 var channel = this.channels[frame.channel];
36 if (channel) { return channel.accept(frame); }
37 // NB CHANNEL_ERROR may not be right, but I don't know what is ..
38 else this.closeBecause(fmt("Frame on unknown channel: %s",
39 inspect(frame, false)),
40 defs.constants.CHANNEL_ERROR);
41};
42
43// Handle anything that comes through on channel 0, that's the
44// connection control channel. This is only used once mainAccept is
45// installed as the frame handler, after the opening handshake.
46function channel0(connection) {
47 return function(f) {
48 // Once we get a 'close', we know 1. we'll get no more frames, and
49 // 2. anything we send except close, or close-ok, will be
50 // ignored. If we already sent 'close', this won't be invoked since
51 // we're already in closing mode; if we didn't well we're not going
52 // to send it now are we.
53 if (f === HEARTBEAT); // ignore; it's already counted as activity
54 // on the socket, which is its purpose
55 else if (f.id === defs.ConnectionClose) {
56 // Oh. OK. I guess we're done here then.
57 connection.stop();
58 connection.sendMethod(0, defs.ConnectionCloseOk, {});
59 var err = new Error(fmt('Connection closed: %s', closeMsg(f)));
60 connection.toClosed(err);
61 }
62 else {
63 connection.closeBecause(fmt("Unexpected frame on channel 0: %s",
64 inspect(f, false)),
65 defs.constants.UNEXPECTED_FRAME);
66 }
67 };
68};
69
70
71// This changed between versions, as did the codec, methods, etc. AMQP
72// 0-9-1 is fairly similar to 0.8, but better, and nothing implements
73// 0.8 that doesn't implement 0-9-1. In other words, it doesn't make
74// much sense to generalise here.
75C.sendProtocolHeader = function() {
76 this.sendBytes("AMQP" + String.fromCharCode(0, 0, 9, 1));
77};
78
79/*
80 The frighteningly complicated opening protocol (spec section 2.2.4):
81
82 Client -> Server
83
84 protocol header ->
85 <- start
86 start-ok ->
87 .. next two zero or more times ..
88 <- secure
89 secure-ok ->
90 <- tune
91 tune-ok ->
92 open ->
93 <- open-ok
94
95If I'm only supporting SASL's PLAIN mechanism (which I am for the time
96being), it gets a bit easier since the server won't in general send
97back a `secure`, it'll just send `tune` after the `start-ok`.
98(SASL PLAIN: http://tools.ietf.org/html/rfc4616)
99
100*/
101
102C.open = function(allOptions) {
103 this.sendProtocolHeader();
104 var self = this;
105
106 // This is where we'll put our negotiated values
107 var tunedOptions = Object.create(allOptions);
108
109 function await() {
110 var reply = defer();
111 self.accept = function(frame) {
112 if (frame.channel !== 0)
113 reply.reject(
114 new Error(fmt("Frame on channel != 0 during handshake: %s",
115 inspect(frame, false))));
116 else
117 reply.resolve(frame);
118 };
119 self.step();
120 return reply.promise;
121 }
122
123 function expect(Method) {
124 return await().then(function(frame) {
125 if (frame.id === Method)
126 return frame;
127 else
128 throw new Error(fmt("Expected %s; got %s",
129 methodName(Method), inspect(frame, false)));
130 });
131 }
132
133 function send(Method) {
134 self.sendMethod(0, Method, tunedOptions);
135 }
136
137 function negotiate(server, client) {
138 // We get sent values for channelMax, frameMax and heartbeat,
139 // which we may accept or lower (subject to a minimum for
140 // frameMax, but we'll leave that to the server to enforce). In
141 // all cases, `0` really means `+infinity`, that is, no limit
142 // (except that of the encoded representation, e.g., unsigned
143 // short for channelMax). RabbitMQ allows all these figures to be
144 // negotiated downward, *including* to zero i.e., no limit.
145 return (server === 0) ? client : Math.min(server, client);
146 }
147
148 var opened = expect(defs.ConnectionStart)
149 .then(function(start) {
150 send(defs.ConnectionStartOk);
151 return await();
152 })
153 .then(function(reply) {
154 switch (reply.id) {
155 case defs.ConnectionSecure:
156 throw new Error(
157 "Wasn't expecting to have to go through secure");
158 case defs.ConnectionTune:
159 var fields = reply.fields;
160 tunedOptions.frameMax =
161 negotiate(fields.frameMax, allOptions.frameMax);
162 tunedOptions.channelMax =
163 negotiate(fields.channelMax, allOptions.channelMax);
164 tunedOptions.heartbeat =
165 negotiate(fields.heartbeat, allOptions.heartbeat);
166 send(defs.ConnectionTuneOk);
167 send(defs.ConnectionOpen);
168 return expect(defs.ConnectionOpenOk);
169 default:
170 throw new Error(
171 fmt("Expected secure or tune during handshake; got %s",
172 inspect(reply, false)));
173 }
174 })
175 .then(function(openOk) {
176 self.accept = mainAccept;
177 self.channelMax = tunedOptions.channelMax || 0xffff;
178 self.frameMax = tunedOptions.frameMax || 0xffffffff;
179 self.heartbeat = tunedOptions.heartbeat;
180 self.heartbeater = self.startHeartbeater();
181 self.run();
182 return openOk;
183 });
184
185 return opened;
186};
187
188// Closing things: AMQP has a closing handshake that applies to
189// closing both connects and channels. As the initiating party, I send
190// 'close', then ignore all frames until I see either 'close-ok' --
191// which signifies that the other party has seen the 'close' and shut
192// the connection or channel down, so it's fine to free resources; or
193// 'close', which means the other party also wanted to close the
194// whatever, and I should send 'close-ok' so it can free resources,
195// then go back to waiting for the 'close-ok'. If I receive a 'close'
196// out of the blue, I should throw away any unsent frames (they will
197// be ignored anyway) and send 'close-ok', then clean up resources. In
198// general, 'close' out of the blue signals an error (or a forced
199// closure, which may as well be an error).
200//
201// RUNNING [1] --- send Close ---> Closing [2] ---> recv Close --+
202// | | [3]
203// | +------ send CloseOk ------+
204// recv Close recv CloseOk
205// | |
206// V V
207// Ended [4] ---- send CloseOk ---> Closed [5]
208//
209// [1] All frames accepted; getting a Close frame from the server
210// moves to Ended; client may initiate a close by sending Close
211// itself.
212// [2] Client has initiated a close; only CloseOk or (simulataneously
213// sent) Close is accepted.
214// [3] Simutaneous close: signal 'end'.
215// [4] Server won't send any more frames; accept no more frames, send
216// CloseOk. Signal 'end'.
217// [5] Fully closed, client will send no more, server will send no
218// more. Signal 'close' or 'error'.
219//
220// There are two signalling mechanisms used in the API. The first is
221// that calling `close` or `closeBecause` will return a promise, that
222// will either resolve once the connection or channel is cleanly shut
223// down, or will reject if the shutdown times out.
224//
225// The second is the 'end', 'close' and 'error' events. These are
226// emitted as above. The events will fire *before* promises are
227// resolved.
228
229// Close the connection without even giving a reason. Typical.
230C.close = function() {
231 return this.closeBecause("Cheers", defs.constants.REPLY_SUCCESS);
232};
233
234// Close with a reason and a 'code'. I'm pretty sure RabbitMQ totally
235// ignores these; maybe it logs them. Returns a promise that will be
236// resolved when the CloseOk has been received; NB the 'close' event
237// will be emitted once the underlying stream is ended.
238C.closeBecause = function(reason, code) {
239 this.sendMethod(0, defs.ConnectionClose, {
240 replyText: reason,
241 replyCode: code,
242 methodId: 0, classId: 0
243 });
244
245 var err;
246 if (code !== defs.constants.REPLY_SUCCESS)
247 err = new Error(reason);
248
249 var self = this;
250 var done = defer();
251 this.accept = function(f) {
252 if (f.id === defs.ConnectionCloseOk) {
253 done.resolve();
254 self.toClosed(err);
255 }
256 else if (f.id === defs.ConnectionClose) {
257 self.sendMethod(0, defs.ConnectionCloseOk, {});
258 }
259 else;
260 };
261 this.stop();
262 return done.promise;
263};
264
265// A close has been initiated. Repeat: a close has been initiated.
266// This means we should not send more frames, anyway they will be
267// ignored. We also have to shut down all the channels.
268C.stop = function() {
269 for (var i = 1; i < this.channels.length; i++) {
270 var ch = this.channels[i];
271 if (ch) {
272 ch.stop();
273 ch.toClosed(); // %%% or with an error? not clear
274 }
275 }
276};
277
278function closedSend() {
279 throw new Error("Connection closed");
280}
281function closedAccept(f) {
282 throw new Error(fmt("Unexpected frame on closed connection: %s",
283 inspect(f, false)));
284}
285
286C.onSocketError = function(err) {
287 if (!this.expectSocketClose) {
288 // forestall any more calls to onSocketError, since we're signed
289 // up for `'error'` *and* `'end'`
290 this.expectSocketClose = true;
291 this.stop();
292 this.toClosed(err);
293 }
294};
295
296// A close has been confirmed. Cease all communication.
297C.toClosed = function(err) {
298 // Tidy up, invalidate enverything, dynamite the bridges.
299 this.sendMethod = this.sendContent = closedSend;
300 this.accept = closedAccept;
301 if (this.heartbeater) this.heartbeater.clear();
302 // This is certainly true now, if it wasn't before
303 this.expectSocketClose = true;
304 this.end();
305 if (err) this.emit('error', err);
306 this.emit('close');
307};
308
309C.startHeartbeater = function() {
310 if (this.heartbeat === 0) return null;
311 else {
312 var self = this;
313 var hb = new Heart(this.heartbeat,
314 this.checkSend.bind(this),
315 this.checkRecv.bind(this));
316 hb.on('timeout', function() {
317 self.toClosed(new Error("Heartbeat timeout"));
318 });
319 hb.on('beat', function() {
320 self.sendHeartbeat();
321 });
322 return hb;
323 }
324};
325
326// I use an array to keep track of the channels, rather than an
327// object. The channel identifiers are numbers, and allocated by the
328// connection. If I try to allocate low numbers when they are
329// available (which I do, by looking from the start of the bitset),
330// this ought to keep the array small, and out of 'sparse array
331// storage'. I also set entries to null, rather than deleting them, in
332// the expectation that the next channel allocation will fill the slot
333// again rather than growing the array. See
334// http://www.html5rocks.com/en/tutorials/speed/v8/
335C.freshChannel = function(channel) {
336 var next = this.freeChannels.nextClearBit(1);
337 if (next < 0 || next > this.channelMax)
338 throw new Error("No channels left to allocate");
339 this.freeChannels.set(next);
340 this.channels[next] = channel;
341 return next;
342};
343
344C.releaseChannel = function(channel) {
345 this.freeChannels.clear(channel);
346 this.channels[channel] = null;
347};
348
349module.exports.Connection = Connection;