1 | //
|
2 | //
|
3 | // Beating train rhythms
|
4 |
|
5 | var defs = require('./defs');
|
6 | var Frames = require('./frame');
|
7 | var HEARTBEAT = Frames.HEARTBEAT;
|
8 | var Heart = require('./heartbeat').Heart;
|
9 | var inspect = require('./format').inspect;
|
10 | var methodName = require('./format').methodName;
|
11 | var closeMsg = require('./format').closeMessage;
|
12 | var BitSet = require('./bitset').BitSet;
|
13 | var defer = require('when').defer;
|
14 | var inherits = require('util').inherits;
|
15 | var fmt = require('util').format;
|
16 |
|
// An AMQP connection layered on top of `underlying`, a stream that is
// handed straight to the `Frames` constructor.
//
// State set up here:
//  - expectSocketClose: false until a close or a socket failure has
//    been dealt with; guards against handling both 'end' and 'error'.
//  - freeChannels: bitset used by freshChannel/releaseChannel to track
//    channel numbers.
//  - channels: array indexed by channel number; slot 0 is the
//    connection control channel.
function Connection(underlying) {
  Frames.call(this, underlying);

  this.expectSocketClose = false;
  this.freeChannels = new BitSet();
  this.channels = [{accept: channel0(this)}];

  // `'close'` isn't emitted by all streams, but `'end'` should be; we
  // want to know if the stream closes without a closing handshake ..
  var unexpectedClose = new Error('Unexpected close');
  var onSocketError = this.onSocketError;
  underlying.once('end', onSocketError.bind(this, unexpectedClose));
  // .. or with an error
  underlying.once('error', onSocketError.bind(this));
}
|
29 | inherits(Connection, Frames);
|
30 |
|
31 | var C = Connection.prototype;
|
32 |
|
// Usual frame accept mode: hand the frame to whatever is registered in
// `this.channels` under the frame's channel number and return that
// handler's result. Invoked with the connection as `this`. A frame for
// a channel with no entry starts a close of the whole connection and
// yields undefined.
function mainAccept(frame) {
  var target = this.channels[frame.channel];
  if (!target) {
    // NB CHANNEL_ERROR may not be right, but I don't know what is ..
    var complaint = fmt("Frame on unknown channel: %s",
                        inspect(frame, false));
    this.closeBecause(complaint, defs.constants.CHANNEL_ERROR);
    return;
  }
  return target.accept(frame);
}
|
42 |
|
43 | // Handle anything that comes through on channel 0, that's the
|
44 | // connection control channel. This is only used once mainAccept is
|
45 | // installed as the frame handler, after the opening handshake.
|
// Build the frame handler for channel 0 of `connection`. The returned
// function takes one frame `f` and:
//  - does nothing for the HEARTBEAT marker;
//  - on a connection close from the peer: stops the channels, replies
//    with close-ok, and moves the connection to closed with an error
//    describing the close;
//  - for anything else, starts closing the connection with
//    UNEXPECTED_FRAME.
function channel0(connection) {
  return function(f) {
    // Heartbeats have already been counted as activity on the socket,
    // which is their whole purpose; nothing more to do.
    if (f === HEARTBEAT) {
      return;
    }

    // Once we get a 'close', we know 1. we'll get no more frames, and
    // 2. anything we send except close, or close-ok, will be ignored.
    // If we had already sent 'close' ourselves this handler would not
    // be installed any more; and if we hadn't, there's no point
    // sending it now.
    if (f.id === defs.ConnectionClose) {
      connection.stop();
      connection.sendMethod(0, defs.ConnectionCloseOk, {});
      connection.toClosed(
        new Error(fmt('Connection closed: %s', closeMsg(f))));
      return;
    }

    connection.closeBecause(
      fmt("Unexpected frame on channel 0: %s", inspect(f, false)),
      defs.constants.UNEXPECTED_FRAME);
  };
}
|
69 |
|
70 |
|
71 | // This changed between versions, as did the codec, methods, etc. AMQP
|
72 | // 0-9-1 is fairly similar to 0.8, but better, and nothing implements
|
73 | // 0.8 that doesn't implement 0-9-1. In other words, it doesn't make
|
74 | // much sense to generalise here.
|
// Write the protocol header that opens the conversation: the literal
// "AMQP" followed by the four characters with codes 0, 0, 9, 1
// (i.e. version 0-9-1), passed to `sendBytes` as a single string.
C.sendProtocolHeader = function() {
  var version = String.fromCharCode(0, 0, 9, 1);
  this.sendBytes("AMQP" + version);
};
|
78 |
|
79 | /*
|
80 | The frighteningly complicated opening protocol (spec section 2.2.4):
|
81 |
|
82 | Client -> Server
|
83 |
|
84 | protocol header ->
|
85 | <- start
|
86 | start-ok ->
|
87 | .. next two zero or more times ..
|
88 | <- secure
|
89 | secure-ok ->
|
90 | <- tune
|
91 | tune-ok ->
|
92 | open ->
|
93 | <- open-ok
|
94 |
|
95 | If I'm only supporting SASL's PLAIN mechanism (which I am for the time
|
96 | being), it gets a bit easier since the server won't in general send
|
97 | back a `secure`, it'll just send `tune` after the `start-ok`.
|
98 | (SASL PLAIN: http://tools.ietf.org/html/rfc4616)
|
99 |
|
100 | */
|
101 |
|
// Run the client side of the opening handshake.
//
// `allOptions` supplies the fields for every method sent during the
// handshake; its `frameMax`, `channelMax` and `heartbeat` are the
// client's side of the tune negotiation.
//
// Returns a promise that resolves with the open-ok frame once the
// connection is in its normal running state, and rejects if a frame
// arrives on a channel other than 0, if the server asks for `secure`,
// or if any other unexpected method turns up.
C.open = function(allOptions) {
  this.sendProtocolHeader();
  var self = this;

  // This is where we'll put our negotiated values; anything not
  // negotiated is inherited from allOptions through the prototype.
  var tunedOptions = Object.create(allOptions);

  // Promise the next frame to arrive: install a one-shot accept
  // handler, then ask for a single step of frame processing.
  // NB deliberately not called `await`, which is a reserved word in ES
  // modules and async functions and would stop this file parsing
  // there.
  function nextFrame() {
    var reply = defer();
    self.accept = function(frame) {
      if (frame.channel !== 0) {
        reply.reject(
          new Error(fmt("Frame on channel != 0 during handshake: %s",
                        inspect(frame, false))));
      }
      else {
        reply.resolve(frame);
      }
    };
    self.step();
    return reply.promise;
  }

  // Promise the next frame, insisting that it is the given method.
  function expect(Method) {
    return nextFrame().then(function(frame) {
      if (frame.id === Method) {
        return frame;
      }
      throw new Error(fmt("Expected %s; got %s",
                          methodName(Method), inspect(frame, false)));
    });
  }

  // Send a method on channel 0, using the (possibly tuned) options as
  // its fields.
  function send(Method) {
    self.sendMethod(0, Method, tunedOptions);
  }

  function negotiate(server, client) {
    // We get sent values for channelMax, frameMax and heartbeat,
    // which we may accept or lower (subject to a minimum for
    // frameMax, but we'll leave that to the server to enforce). In
    // all cases, `0` really means `+infinity`, that is, no limit
    // (except that of the encoded representation, e.g., unsigned
    // short for channelMax). RabbitMQ allows all these figures to be
    // negotiated downward, *including* to zero i.e., no limit.
    return (server === 0) ? client : Math.min(server, client);
  }

  var opened = expect(defs.ConnectionStart)
    .then(function(start) {
      send(defs.ConnectionStartOk);
      return nextFrame();
    })
    .then(function(reply) {
      switch (reply.id) {
      case defs.ConnectionSecure:
        throw new Error(
          "Wasn't expecting to have to go through secure");
      case defs.ConnectionTune:
        var fields = reply.fields;
        tunedOptions.frameMax =
          negotiate(fields.frameMax, allOptions.frameMax);
        tunedOptions.channelMax =
          negotiate(fields.channelMax, allOptions.channelMax);
        tunedOptions.heartbeat =
          negotiate(fields.heartbeat, allOptions.heartbeat);
        send(defs.ConnectionTuneOk);
        send(defs.ConnectionOpen);
        return expect(defs.ConnectionOpenOk);
      default:
        throw new Error(
          fmt("Expected secure or tune during handshake; got %s",
              inspect(reply, false)));
      }
    })
    .then(function(openOk) {
      // Handshake done: switch to normal frame dispatch and record
      // the negotiated limits; a negotiated 0 ("no limit") becomes the
      // largest value used for that field.
      self.accept = mainAccept;
      self.channelMax = tunedOptions.channelMax || 0xffff;
      self.frameMax = tunedOptions.frameMax || 0xffffffff;
      self.heartbeat = tunedOptions.heartbeat;
      self.heartbeater = self.startHeartbeater();
      self.run();
      return openOk;
    });

  return opened;
};
|
187 |
|
188 | // Closing things: AMQP has a closing handshake that applies to
|
189 | // closing both connections and channels. As the initiating party, I send
|
190 | // 'close', then ignore all frames until I see either 'close-ok' --
|
191 | // which signifies that the other party has seen the 'close' and shut
|
192 | // the connection or channel down, so it's fine to free resources; or
|
193 | // 'close', which means the other party also wanted to close the
|
194 | // whatever, and I should send 'close-ok' so it can free resources,
|
195 | // then go back to waiting for the 'close-ok'. If I receive a 'close'
|
196 | // out of the blue, I should throw away any unsent frames (they will
|
197 | // be ignored anyway) and send 'close-ok', then clean up resources. In
|
198 | // general, 'close' out of the blue signals an error (or a forced
|
199 | // closure, which may as well be an error).
|
200 | //
|
201 | // RUNNING [1] --- send Close ---> Closing [2] ---> recv Close --+
|
202 | // | | [3]
|
203 | // | +------ send CloseOk ------+
|
204 | // recv Close recv CloseOk
|
205 | // | |
|
206 | // V V
|
207 | // Ended [4] ---- send CloseOk ---> Closed [5]
|
208 | //
|
209 | // [1] All frames accepted; getting a Close frame from the server
|
210 | // moves to Ended; client may initiate a close by sending Close
|
211 | // itself.
|
212 | // [2] Client has initiated a close; only CloseOk or (simultaneously
|
213 | // sent) Close is accepted.
|
214 | // [3] Simultaneous close: signal 'end'.
|
215 | // [4] Server won't send any more frames; accept no more frames, send
|
216 | // CloseOk. Signal 'end'.
|
217 | // [5] Fully closed, client will send no more, server will send no
|
218 | // more. Signal 'close' or 'error'.
|
219 | //
|
220 | // There are two signalling mechanisms used in the API. The first is
|
221 | // that calling `close` or `closeBecause` will return a promise, that
|
222 | // will either resolve once the connection or channel is cleanly shut
|
223 | // down, or will reject if the shutdown times out.
|
224 | //
|
225 | // The second is the 'end', 'close' and 'error' events. These are
|
226 | // emitted as above. The events will fire *before* promises are
|
227 | // resolved.
|
228 |
|
// Close the connection without even giving a reason. Typical.
// Shorthand for `closeBecause` with the text "Cheers" and the
// REPLY_SUCCESS code; returns whatever `closeBecause` returns.
C.close = function() {
  var success = defs.constants.REPLY_SUCCESS;
  return this.closeBecause("Cheers", success);
};
|
233 |
|
234 | // Close with a reason and a 'code'. I'm pretty sure RabbitMQ totally
|
235 | // ignores these; maybe it logs them. Returns a promise that will be
|
236 | // resolved when the CloseOk has been received; NB the 'close' event
|
237 | // will be emitted once the underlying stream is ended.
|
// Start the closing handshake: send a connection close carrying
// `reason` as the reply text and `code` as the reply code, swap in a
// frame handler that only reacts to close-ok and close, and stop all
// the channels.
//
// Returns a promise resolved when close-ok arrives. If `code` is
// anything other than REPLY_SUCCESS, the connection is moved to closed
// with an Error built from `reason`; otherwise with no error.
C.closeBecause = function(reason, code) {
  var connection = this;

  connection.sendMethod(0, defs.ConnectionClose, {
    replyText: reason,
    replyCode: code,
    methodId: 0, classId: 0
  });

  var failure = (code === defs.constants.REPLY_SUCCESS)
    ? undefined
    : new Error(reason);
  var closing = defer();

  connection.accept = function(f) {
    switch (f.id) {
    case defs.ConnectionCloseOk:
      // The other side has seen our close; we're finished.
      closing.resolve();
      connection.toClosed(failure);
      break;
    case defs.ConnectionClose:
      // Simultaneous close: acknowledge theirs, keep waiting for the
      // close-ok to ours.
      connection.sendMethod(0, defs.ConnectionCloseOk, {});
      break;
    default:
      // Every other frame is dropped while closing.
      break;
    }
  };

  connection.stop();
  return closing.promise;
};
|
264 |
|
265 | // A close has been initiated. Repeat: a close has been initiated.
|
266 | // This means we should not send more frames, anyway they will be
|
267 | // ignored. We also have to shut down all the channels.
|
// Shut down every channel except channel 0: each non-empty slot in
// `this.channels` from index 1 upward is stopped and then moved to
// closed. Empty (null) slots are skipped.
C.stop = function() {
  var index = 1;
  while (index < this.channels.length) {
    var channel = this.channels[index++];
    if (!channel) {
      continue;
    }
    channel.stop();
    channel.toClosed(); // %%% or with an error? not clear
  }
};
|
277 |
|
// Stand-in for the send functions once the connection is closed (see
// toClosed): any attempt to send throws.
function closedSend() {
  var failure = new Error("Connection closed");
  throw failure;
}
|
// Stand-in for the frame handler once the connection is closed (see
// toClosed): any frame `f` that still arrives causes a throw that
// describes the frame.
function closedAccept(f) {
  var description = inspect(f, false);
  var message = fmt("Unexpected frame on closed connection: %s",
                    description);
  throw new Error(message);
}
|
285 |
|
// Handler for the underlying stream ending or failing. The first call
// stops the channels and moves the connection to closed with `err`;
// it does nothing once a socket close is already expected.
C.onSocketError = function(err) {
  if (this.expectSocketClose) {
    return;
  }
  // forestall any more calls to onSocketError, since we're signed
  // up for `'error'` *and* `'end'`
  this.expectSocketClose = true;
  this.stop();
  this.toClosed(err);
};
|
295 |
|
// A close has been confirmed. Cease all communication.
//
// Replaces the send functions and the frame handler with versions that
// throw, clears the heartbeater if there is one, ends the connection's
// stream side via `this.end()`, then emits 'error' (only when `err` is
// given) followed by 'close'.
C.toClosed = function(err) {
  // Tidy up, invalidate everything, dynamite the bridges.
  this.sendContent = closedSend;
  this.sendMethod = closedSend;
  this.accept = closedAccept;

  var heart = this.heartbeater;
  if (heart) {
    heart.clear();
  }

  // This is certainly true now, if it wasn't before
  this.expectSocketClose = true;
  this.end();

  if (err) {
    this.emit('error', err);
  }
  this.emit('close');
};
|
308 |
|
// Create the heartbeat monitor for this connection, based on the
// negotiated `this.heartbeat` value.
//
// Returns null when the heartbeat is 0 (no heartbeating); otherwise a
// `Heart` constructed with the interval and the connection's
// `checkSend` / `checkRecv` as its two callbacks. On 'beat' a
// heartbeat frame is sent. On 'timeout' the channels are stopped and
// the connection is moved to closed with a "Heartbeat timeout" error.
C.startHeartbeater = function() {
  if (this.heartbeat === 0) {
    return null;
  }

  var self = this;
  var hb = new Heart(this.heartbeat,
                     this.checkSend.bind(this),
                     this.checkRecv.bind(this));
  hb.on('timeout', function() {
    // Shut the channels down before closing, as every other path into
    // toClosed does; otherwise they are left believing the connection
    // is still usable.
    self.stop();
    self.toClosed(new Error("Heartbeat timeout"));
  });
  hb.on('beat', function() {
    self.sendHeartbeat();
  });
  return hb;
};
|
325 |
|
326 | // I use an array to keep track of the channels, rather than an
|
327 | // object. The channel identifiers are numbers, and allocated by the
|
328 | // connection. If I try to allocate low numbers when they are
|
329 | // available (which I do, by looking from the start of the bitset),
|
330 | // this ought to keep the array small, and out of 'sparse array
|
331 | // storage'. I also set entries to null, rather than deleting them, in
|
332 | // the expectation that the next channel allocation will fill the slot
|
333 | // again rather than growing the array. See
|
334 | // http://www.html5rocks.com/en/tutorials/speed/v8/
|
// Allocate a channel number for `channel` and register it in
// `this.channels` under that number. The number comes from
// `freeChannels.nextClearBit(1)` (presumably the lowest unused number
// from 1 upward -- see BitSet), so despite its name a *set* bit in
// `freeChannels` marks a number that is taken.
//
// Returns the allocated number; throws if the bitset reports nothing
// available (a negative result) or the number exceeds `channelMax`.
C.freshChannel = function(channel) {
  var slot = this.freeChannels.nextClearBit(1);
  var exhausted = slot < 0 || slot > this.channelMax;
  if (exhausted) {
    throw new Error("No channels left to allocate");
  }
  this.freeChannels.set(slot);
  this.channels[slot] = channel;
  return slot;
};
|
343 |
|
// Give back channel number `channel`: clear its bit so the number can
// be allocated again, and null out its slot in `this.channels` (rather
// than deleting it, so the array keeps its shape).
C.releaseChannel = function(channel) {
  var allocated = this.freeChannels;
  allocated.clear(channel);
  this.channels[channel] = null;
};
|
348 |
|
349 | module.exports.Connection = Connection;
|