// Retrieved from UNPKG (2.53 kB, JavaScript, raw view).

//
//
//

// General-purpose API for glueing everything together.
6
7var URL = require('url');
8var QS = require('querystring');
9var defer = require('when').defer;
10var Connection = require('./connection').Connection;
11var clone = require('util')._extend.bind(null, {});
12
/**
 * Parse a URL to get the options used in the opening protocol
 * (the start-ok, tune-ok and open fields).
 *
 * @param {Object} parts - URL components; the `auth`, `pathname` and
 *   `query` properties are read. `query` is expected to already be an
 *   object (or absent).
 * @returns {Object} the handshake fields: `clientProperties`,
 *   `mechanism`, `response`, `locale`, `channelMax`, `frameMax`,
 *   `heartbeat`, `virtualHost`, `capabilities` and `insist`.
 */
function openOptionsFromURL(parts) {
  var user = 'guest';
  var passwd = 'guest';
  if (parts.auth) {
    // Split on the FIRST ':' only, so that a password which itself
    // contains ':' is kept whole rather than truncated.
    var auth = parts.auth;
    var sep = auth.indexOf(':');
    if (sep === -1) {
      user = auth;
      passwd = '';
    }
    else {
      user = auth.slice(0, sep);
      passwd = auth.slice(sep + 1);
    }
  }

  // No path at all means the default vhost '/'; otherwise drop the
  // leading '/' and unescape what remains (so a bare '/' path yields
  // the empty vhost name).
  var vhost = parts.pathname;
  if (!vhost)
    vhost = '/';
  else
    vhost = QS.unescape(vhost.substr(1));

  var q = parts.query || {};

  // Query values are parsed as decimal integers; a missing value
  // falls back to `def`.
  function intOrDefault(val, def) {
    return (val === undefined) ? def : parseInt(val, 10);
  }

  return {
    // start-ok
    'clientProperties': {},
    'mechanism': 'PLAIN',
    // NUL-separated: empty first field, then user, then password
    'response': Buffer.from(['', user, passwd].join(String.fromCharCode(0))),
    'locale': q.locale || 'en_US',

    // tune-ok
    'channelMax': intOrDefault(q.channelMax, 0),
    'frameMax': intOrDefault(q.frameMax, 0x1000),
    'heartbeat': intOrDefault(q.heartbeat, 0),

    // open
    'virtualHost': vhost,
    'capabilities': '',
    'insist': 0
  };
}
52
/**
 * Open a socket to the server named by `url` and, once the socket is
 * connected, open a `Connection` over it.
 *
 * @param {string} [url] - an `amqp:` (plain socket via `net`) or
 *   `amqps:` (via `tls`) URL; defaults to 'amqp://localhost'. The
 *   query string is parsed and handed to `openOptionsFromURL`.
 * @param {Object} [socketOptions] - options for `net.connect` /
 *   `tls.connect`. They are copied, and `host` and `port` on the copy
 *   are always set from the URL (port defaults to 5672 for `amqp:`,
 *   5671 otherwise).
 * @returns a promise resolved with the `Connection` once it has been
 *   opened, or rejected with the error from opening it or with a
 *   socket error emitted before the socket connected.
 * @throws {Error} synchronously, if the protocol is neither `amqp:`
 *   nor `amqps:`.
 */
function connect(url, socketOptions) {
  // tls.connect uses `util._extend()` on the options given it, which
  // copies only properties mentioned in `Object.keys()`, when
  // processing the options. So I have to make copies too, rather
  // than using `Object.create()`.
  var sockopts = clone(socketOptions || {});
  url = url || 'amqp://localhost';

  var parts = URL.parse(url, true); // yes, parse the query string
  var protocol = parts.protocol;

  var options = openOptionsFromURL(parts);
  var port = parts.port || ((protocol === 'amqp:') ? 5672 : 5671);
  sockopts.host = parts.hostname;
  sockopts.port = parseInt(port, 10);

  var result = defer();

  // Set once the socket has connected; socket errors after that point
  // are not reported through the returned promise.
  var sockok = false;
  var sock;

  function onConnect() {
    sockok = true;
    var c = new Connection(sock);
    c.open(options).then(function (_openok) { result.resolve(c); },
                         function (err) { result.reject(err); });
  }

  if (protocol === 'amqp:') {
    sock = require('net').connect(sockopts, onConnect);
  }
  else if (protocol === 'amqps:') {
    sock = require('tls').connect(sockopts, onConnect);
  }
  else {
    throw new Error("Expected amqp: or amqps: as the protocol; got " + protocol);
  }

  sock.once('error', function (err) {
    if (!sockok) result.reject(err);
  });

  return result.promise;
}
98
99module.exports.connect = connect;