1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 | var defs = require('./defs');
|
8 | var closeMsg = require('./format').closeMessage;
|
9 | var inspect = require('./format').inspect;
|
10 | var methodName = require('./format').methodName;
|
11 | var when = require('when'), defer = when.defer;
|
12 | var deferSync = require('./sync_defer').defer;
|
13 | var assert = require('assert');
|
14 | var inherits = require('util').inherits;
|
15 | var EventEmitter = require('events').EventEmitter;
|
16 | var fmt = require('util').format;
|
17 |
|
18 | function Channel(connection) {
|
19 | this.ch = connection.freshChannel(this);
|
20 | this.connection = connection;
|
21 |
|
22 | this.reply = null;
|
23 |
|
24 | this.pending = [];
|
25 | this.handleMessage = acceptDeliveryOrReturn;
|
26 | }
|
27 | inherits(Channel, EventEmitter);
|
28 |
|
29 | module.exports.Channel = Channel;
|
30 |
|
31 | var C = Channel.prototype;
|
32 |
|
33 |
|
34 |
|
35 |
|
36 |
|
37 |
|
38 |
|
39 |
|
40 |
|
41 |
|
42 |
|
43 |
|
44 |
|
45 |
|
46 | C.sendImmediately = function(method, fields) {
|
47 | return this.connection.sendMethod(this.ch, method, fields);
|
48 | };
|
49 |
|
50 |
|
51 |
|
52 |
|
53 |
|
54 | C.sendOrEnqueue = function(method, fields, reply) {
|
55 | if (!this.reply) {
|
56 | assert(this.pending.length === 0);
|
57 | this.reply = reply;
|
58 | this.sendImmediately(method, fields);
|
59 | }
|
60 | else {
|
61 | this.pending.push({method: method,
|
62 | fields: fields,
|
63 | reply: reply});
|
64 | }
|
65 | };
|
66 |
|
67 | C.sendMessage = function(fields, properties, content) {
|
68 | this.sendImmediately(defs.BasicPublish, fields);
|
69 | return this.connection.sendContent(this.ch,
|
70 | defs.BasicProperties, properties,
|
71 | content);
|
72 | };
|
73 |
|
74 |
|
75 |
|
76 | C._rpc = function(method, fields, expect) {
|
77 | var self = this;
|
78 | var reply = deferSync();
|
79 | this.sendOrEnqueue(method, fields, reply);
|
80 | return reply.promise.then(function(f) {
|
81 | if (f.id === expect)
|
82 | return f;
|
83 | else {
|
84 | var err = fmt("Expected %s; got %s",
|
85 | methodName(expect), inspect(f, false));
|
86 | self.closeBecause(err, defs.constants.CHANNEL_ERROR);
|
87 | throw new Error(err);
|
88 | }
|
89 | }, function(close) {
|
90 | if (close instanceof Error) throw close;
|
91 | var closeReason =
|
92 | (close.fields.classId << 16)
|
93 | + close.fields.methodId;
|
94 | var err = (method === closeReason)
|
95 | ? fmt("Operation failed: %s; %s",
|
96 | methodName(method), closeMsg(close))
|
97 | : fmt("Channel closed by server: %s", closeMsg(close));
|
98 | throw new Error(err);
|
99 | });
|
100 | };
|
101 |
|
102 |
|
103 |
|
104 |
|
105 | C.rpc = function(method, fields, expect) {
|
106 | function fields_(f) { return f.fields; }
|
107 | return when(this._rpc(method, fields, expect).then(fields_));
|
108 | };
|
109 |
|
110 |
|
111 | C.open = function() {
|
112 | return this.rpc(defs.ChannelOpen, {outOfBand: ""},
|
113 | defs.ChannelOpenOk);
|
114 | };
|
115 |
|
116 |
|
117 |
|
118 |
|
119 |
|
120 |
|
121 |
|
122 |
|
123 |
|
124 |
|
125 |
|
126 |
|
127 |
|
128 |
|
129 |
|
130 |
|
131 | C.toClosed = function(err) {
|
132 | this.sendImmediately = Channel.sendMessage = Channel.closedSend;
|
133 | this.sendOrEnqueue = Channel.closedEnqueue;
|
134 | this.accept = Channel.closedAccept;
|
135 | this.connection.releaseChannel(this.ch);
|
136 | if (err) this.emit('error', err);
|
137 | this.emit('close');
|
138 | };
|
139 |
|
140 | Channel.closedSend = function() {
|
141 | throw new Error("Channel is closed, do not use");
|
142 | };
|
143 | Channel.closedAccept = function(f) {
|
144 | throw new Error(fmt("Channel is closed, not expecting frame: %s",
|
145 | inspect(f, false)));
|
146 | };
|
147 | Channel.closedEnqueue = function(_method, _fields, reply) {
|
148 | reply.reject(new Error("Channel is closed"));
|
149 | };
|
150 |
|
151 |
|
152 |
|
153 | C.stop = function() {
|
154 |
|
155 |
|
156 | this.emit('end');
|
157 |
|
158 | function rej(r) {
|
159 | r.reject(new Error("Channel ended, no reply will be forthcoming"));
|
160 | }
|
161 | if (this.reply) rej(this.reply);
|
162 | var discard;
|
163 | while (discard = this.pending.shift()) rej(discard.reply);
|
164 | this.pending = null;
|
165 | };
|
166 |
|
167 |
|
168 | C.close = function() {
|
169 | return this.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS);
|
170 | };
|
171 |
|
172 | C.closeBecause = function(reason, code) {
|
173 | this.sendImmediately(defs.ChannelClose, {
|
174 | replyText: reason,
|
175 | replyCode: code,
|
176 | methodId:0, classId: 0
|
177 | });
|
178 |
|
179 |
|
180 |
|
181 | var err;
|
182 | if (code !== defs.constants.REPLY_SUCCESS)
|
183 | err = new Error(reason);
|
184 |
|
185 | var done = defer();
|
186 | var self = this;
|
187 | this.accept = function(f) {
|
188 | if (f.id === defs.ChannelCloseOk) {
|
189 | done.resolve();
|
190 | self.toClosed(err);
|
191 | }
|
192 | else if (f.id === defs.ChannelClose) {
|
193 | this.sendImmediately(defs.ChannelCloseOk, {});
|
194 | }
|
195 | else;
|
196 | };
|
197 | this.stop();
|
198 | return done.promise;
|
199 | };
|
200 |
|
201 |
|
202 |
|
203 |
|
204 |
|
205 |
|
206 |
|
207 |
|
208 |
|
209 |
|
210 |
|
211 | C.acceptMessageFrame = function(f) {
|
212 | try {
|
213 | this.handleMessage = this.handleMessage(f);
|
214 | }
|
215 | catch (msg) {
|
216 | this.closeBecause(msg, defs.constants.UNEXPECTED_FRAME);
|
217 | }
|
218 | };
|
219 |
|
220 |
|
221 |
|
222 | function acceptDeliveryOrReturn(f) {
|
223 | var event;
|
224 | if (f.id === defs.BasicDeliver) event = 'delivery';
|
225 | else if (f.id === defs.BasicReturn) event = 'return';
|
226 | else throw fmt("Expected BasicDeliver or BasicReturn; got %s",
|
227 | inspect(f));
|
228 |
|
229 | var self = this;
|
230 | var fields = f.fields;
|
231 | return Channel.acceptMessage(function(message) {
|
232 | message.fields = fields;
|
233 | self.emit(event, message);
|
234 | });
|
235 | }
|
236 |
|
237 |
|
238 |
|
239 | Channel.acceptMessage = function(continuation) {
|
240 | var totalSize = 0, remaining = 0;
|
241 | var buffers = null;
|
242 |
|
243 | var message = {
|
244 | fields: null,
|
245 | properties: null,
|
246 | content: null
|
247 | };
|
248 |
|
249 | return headers;
|
250 |
|
251 |
|
252 | function headers(f) {
|
253 | if (f.id === defs.BasicProperties) {
|
254 | message.properties = f.fields;
|
255 | totalSize = remaining = f.size;
|
256 | return content;
|
257 | }
|
258 | else {
|
259 | throw "Expected headers frame after delivery";
|
260 | }
|
261 | }
|
262 |
|
263 |
|
264 |
|
265 | function content(f) {
|
266 | if (f.content) {
|
267 | var size = f.content.length;
|
268 | remaining -= size;
|
269 | if (remaining === 0) {
|
270 | if (buffers !== null) {
|
271 | buffers.push(f.content);
|
272 | message.content = Buffer.concat(buffers);
|
273 | }
|
274 | else {
|
275 | message.content = f.content;
|
276 | }
|
277 | continuation(message);
|
278 | return acceptDeliveryOrReturn;
|
279 | }
|
280 | else if (remaining < 0) {
|
281 | throw fmt("Too much content sent! Expected %d bytes",
|
282 | totalSize);
|
283 | }
|
284 | else {
|
285 | if (buffers !== null)
|
286 | buffers.push(f.content);
|
287 | else
|
288 | buffers = [f.content];
|
289 | return content;
|
290 | }
|
291 | }
|
292 | else throw "Expected content frame after headers"
|
293 | }
|
294 | }
|
295 |
|
296 | C.accept = function(f) {
|
297 |
|
298 | switch (f.id) {
|
299 |
|
300 |
|
301 | case undefined:
|
302 | case defs.BasicDeliver:
|
303 | case defs.BasicReturn:
|
304 | case defs.BasicProperties:
|
305 | return this.acceptMessageFrame(f);
|
306 |
|
307 |
|
308 | case defs.BasicAck:
|
309 | return this.emit('ack', f.fields);
|
310 | case defs.BasicNack:
|
311 | return this.emit('nack', f.fields);
|
312 |
|
313 | case defs.ChannelClose:
|
314 |
|
315 | if (this.reply) {
|
316 | var reply = this.reply; this.reply = null;
|
317 | reply.reject(f);
|
318 | }
|
319 | var e = new Error("Channel closed: " + closeMsg(f));
|
320 | this.stop();
|
321 | this.sendImmediately(defs.ChannelCloseOk, {});
|
322 | this.toClosed(e);
|
323 | return;
|
324 |
|
325 | case defs.BasicCancel:
|
326 |
|
327 |
|
328 | return this.closeBecause("Cancel not implemented",
|
329 | defs.constants.NOT_IMPLEMENTED);
|
330 | case defs.BasicFlow:
|
331 |
|
332 | return this.closeBecause("Flow not implemented",
|
333 | defs.constants.NOT_IMPLEMENTED);
|
334 |
|
335 | default:
|
336 |
|
337 |
|
338 | var reply = this.reply; this.reply = null;
|
339 |
|
340 |
|
341 |
|
342 |
|
343 | if (this.pending.length > 0) {
|
344 | var send = this.pending.shift();
|
345 | this.reply = send.reply;
|
346 | this.sendImmediately(send.method, send.fields);
|
347 | }
|
348 | return reply.resolve(f);
|
349 | }
|
350 | };
|