1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 | var URL = require('url');
|
8 | var QS = require('querystring');
|
9 | var defer = require('when').defer;
|
10 | var Connection = require('./connection').Connection;
|
/**
 * Make a shallow copy of an object's own enumerable properties.
 *
 * A brand-new object is created on every call. (Binding `util._extend` to a
 * single `{}` literal would reuse that one object as the target of every
 * copy, so properties from earlier calls would leak into later ones.)
 *
 * @param {Object} [obj] - Object to copy; non-objects yield an empty copy.
 * @returns {Object} A fresh object holding obj's own enumerable properties.
 */
function clone(obj) {
  var copy = {};
  if (obj === null || typeof obj !== 'object') {
    return copy;
  }
  Object.keys(obj).forEach(function (key) {
    copy[key] = obj[key];
  });
  return copy;
}
|
12 |
|
13 |
|
/**
 * Build the fields used to open an AMQP connection from a parsed URL.
 *
 * @param {Object} parts - Parsed URL; the fields read are `auth`
 *   ("user" or "user:password"), `pathname` and `query` (an object of
 *   already-parsed query parameters).
 * @returns {Object} Open options: `clientProperties`, `mechanism`,
 *   `response`, `locale`, `channelMax`, `frameMax`, `heartbeat`,
 *   `virtualHost`, `capabilities` and `insist`.
 */
function openOptionsFromURL(parts) {
  var user = 'guest';
  var passwd = 'guest';
  if (parts.auth) {
    // Split on the FIRST colon only, so a password that itself contains
    // ':' is kept whole. A missing password becomes the empty string.
    var sep = parts.auth.indexOf(':');
    if (sep === -1) {
      user = parts.auth;
      passwd = '';
    }
    else {
      user = parts.auth.slice(0, sep);
      passwd = parts.auth.slice(sep + 1);
    }
  }

  // No path at all selects the default vhost '/'; otherwise drop the
  // leading slash and percent-decode what remains.
  var vhost = parts.pathname;
  if (!vhost) {
    vhost = '/';
  }
  else {
    vhost = QS.unescape(vhost.slice(1));
  }

  var q = parts.query || {};

  // Parse a decimal integer query value; use the default when the value is
  // absent or is not a number (so NaN never reaches the returned options).
  function intOrDefault(val, def) {
    if (val === undefined) {
      return def;
    }
    var parsed = parseInt(val, 10);
    return isNaN(parsed) ? def : parsed;
  }

  // PLAIN response layout: NUL, user name, NUL, password.
  var credentials = ['', user, passwd].join(String.fromCharCode(0));
  // Prefer Buffer.from (the Buffer constructor is deprecated). Some old
  // runtimes only inherit Uint8Array.from under that name, which does not
  // encode strings, so fall back to the constructor there.
  var response =
    (typeof Buffer.from === 'function' && Buffer.from !== Uint8Array.from)
      ? Buffer.from(credentials)
      : new Buffer(credentials);

  return {
    // start-ok
    'clientProperties': {},
    'mechanism': 'PLAIN',
    'response': response,
    'locale': q.locale || 'en_US',

    // tune-ok
    'channelMax': intOrDefault(q.channelMax, 0),
    'frameMax': intOrDefault(q.frameMax, 0x1000),
    'heartbeat': intOrDefault(q.heartbeat, 0),

    // open
    'virtualHost': vhost,
    'capabilities': '',
    'insist': 0
  };
}
|
52 |
|
/**
 * Connect to an AMQP broker and open the connection.
 *
 * @param {string} [url='amqp://localhost'] - Broker URL using the `amqp:`
 *   or `amqps:` protocol. The port defaults to 5672 for `amqp:` and 5671
 *   otherwise; credentials, path and query are interpreted by
 *   openOptionsFromURL.
 * @param {Object} [socketOptions] - Extra options handed to
 *   `net.connect` / `tls.connect`. They are copied first; `host` and `port`
 *   on the copy are always overwritten with values from the URL.
 * @returns {Promise} Resolved with the Connection once `open` succeeds;
 *   rejected with the `open` failure, or with a socket error emitted before
 *   the socket has connected.
 * @throws {Error} Synchronously, when the protocol is neither `amqp:` nor
 *   `amqps:`.
 */
function connect(url, socketOptions) {
  var sockopts = clone(socketOptions || {});
  url = url || 'amqp://localhost';

  var parts = URL.parse(url, true);
  var protocol = parts.protocol;

  var options = openOptionsFromURL(parts);
  var port = parts.port || ((protocol === 'amqp:') ? 5672 : 5671);
  sockopts.host = parts.hostname;
  sockopts.port = parseInt(port, 10);

  var result = defer();

  // Flipped once the socket has connected; from then on socket errors are
  // no longer routed to the returned promise by this function.
  var sockok = false;
  var sock;

  function onConnect() {
    sockok = true;
    var c = new Connection(sock);
    c.open(options).then(function (_openok) { result.resolve(c); },
                         function (err) { result.reject(err); });
  }

  if (protocol === 'amqp:') {
    sock = require('net').connect(sockopts, onConnect);
  }
  else if (protocol === 'amqps:') {
    sock = require('tls').connect(sockopts, onConnect);
  }
  else {
    throw new Error("Expected amqp: or amqps: as the protocol; got " + protocol);
  }

  // Only the first error is observed here, and it only rejects the promise
  // if the socket never got as far as connecting.
  sock.once('error', function (err) {
    if (!sockok) result.reject(err);
  });

  return result.promise;
}
|
98 |
|
// Expose `connect` to consumers of this module.
module.exports.connect = connect;
|