1 | "use strict";
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | const callback_api_1 = require("amqplib/callback_api");
|
4 | const events_1 = require("events");
|
5 | const util = require("util");
|
/**
 * Listener limit passed to `setMaxListeners` by the ChannelManager constructor.
 */
const MAX_LISTENERS = 10000;

/**
 * Debug logger for the "amqptools" section, created with `util.debuglog`.
 * Declared `const` because it is never reassigned.
 */
const debug = util.debuglog("amqptools");
|
/**
 * Keeps one AMQP connection and one channel on it, created lazily by
 * `connect()` / `getChannel()` from the URI given to `setConnectionURI()`.
 *
 * When the connection emits "close", the manager drops its cached state and
 * retries `connect()` until it succeeds (emitting "reconnect") or until
 * `maxReconnectionAttempts` failures have been counted.
 */
class ChannelManager extends events_1.EventEmitter {
    constructor() {
        super();
        // Failed reconnection attempts tolerated before the reconnect loop throws.
        this.maxReconnectionAttempts = 100;
        // true: wait a random whole number of seconds (1-9) between attempts;
        // false: always wait 1 second.
        this.randomReconnectionInterval = true;
        // Callbacks waiting for the outcome of the connection attempt in flight.
        this.connectCallbacks = [];
        /**
         * "close" handler for the current connection: forget the cached
         * connection/channel and start the reconnect loop.
         *
         * NOTE: once `maxReconnectionAttempts` is reached the Error below is
         * thrown from inside an asynchronous callback, so callers cannot catch
         * it with try/catch around `connect()`.
         */
        this.onConnectionClose = (error) => {
            debug("amqp connection has been closed");
            this.channel = null;
            this.connection = null;
            this.channelPromise = null;
            let reconnections = 0;
            const tryReconnect = () => {
                debug("Reconnection attempt...");
                this.connect((err) => {
                    reconnections++;
                    if (!err) {
                        this.emit("reconnect");
                        return debug("Connection has been restored");
                    }
                    if (reconnections >= this.maxReconnectionAttempts) {
                        throw new Error("Fail to establish a connection with rabbitmq");
                    }
                    // Math.floor(Math.random() * 9) + 1 yields an integer in [1, 9].
                    const timeout = this.randomReconnectionInterval
                        ? Math.floor(Math.random() * (10 - 1)) + 1
                        : 1;
                    debug("Next reconnect in %d seconds", timeout);
                    setTimeout(tryReconnect, timeout * 1000);
                });
            };
            tryReconnect();
        };
        this.setMaxListeners(MAX_LISTENERS);
    }

    /**
     * Ensure a channel exists and report it through `cb(err, channel)`.
     *
     * If a channel is already cached, `cb` is called synchronously. Otherwise
     * `cb` is queued; only the first caller starts a connection attempt and
     * every queued callback receives that attempt's outcome.
     *
     * @param {Function} [cb] Node-style callback `(err, channel)`. Optional,
     *   because the channel "error" handler calls `reconnect()` without one.
     */
    connect(cb) {
        if (this.channel) {
            return cb ? cb(null, this.channel) : undefined;
        }
        if (cb) {
            this.connectCallbacks.push(cb);
        }
        if (this.connectInProgress) {
            return;
        }
        this.connectInProgress = true;
        callback_api_1.connect(this.connectionURI, (err, connection) => {
            if (err) {
                return this.connectRespond(err, null);
            }
            this.connection = connection;
            // An "error" event with no listener is thrown by EventEmitter. Log it
            // here and let the "close" handler drive the recovery.
            connection.on("error", (connectionErr) => {
                debug("amqp connection error", connectionErr);
            });
            connection.on("close", this.onConnectionClose);
            connection.createChannel((channelErr, channel) => {
                if (channelErr) {
                    // Do not keep a connection that has no usable channel: the next
                    // attempt would overwrite the reference and leak it.
                    this.discardConnection(connection);
                    return this.connectRespond(channelErr, null);
                }
                this.channel = channel;
                channel.on('error', () => { this.reconnect(); });
                this.connectRespond(null, channel);
            });
        });
    }

    /**
     * Finish the connection attempt in flight and deliver its outcome to every
     * queued callback.
     *
     * The queue is swapped for a fresh array BEFORE the callbacks run, so a
     * callback that calls `connect()` again (for example to retry after a
     * failure) has its new callback kept for the next attempt, not wiped.
     * If a callback throws, the exception propagates and the remaining
     * callbacks of this batch are not invoked.
     *
     * @param {?Error} err Failure of the attempt, or null on success.
     * @param {?Object} channel The opened channel, or null on failure.
     */
    connectRespond(err, channel) {
        this.connectInProgress = false;
        if (err) {
            debug("Fail to connect...", err);
        }
        else {
            debug("Connected");
        }
        const pending = this.connectCallbacks;
        this.connectCallbacks = [];
        pending.forEach((pendingCb) => {
            if (!pendingCb) {
                return;
            }
            pendingCb(err, channel);
        });
    }

    /**
     * Promise flavour of `connect()`.
     *
     * @returns {Promise<Object>} Resolves with the channel, rejects with the
     *   connection error.
     */
    getChannel() {
        return new Promise((resolve, reject) => {
            if (this.channel) {
                return resolve(this.channel);
            }
            this.connect((err, channel) => {
                if (err) {
                    return reject(err);
                }
                resolve(channel);
            });
        });
    }

    /**
     * Set the URI used by subsequent connection attempts.
     *
     * @param {string} uri Connection URI handed to the AMQP client's `connect`.
     */
    setConnectionURI(uri) {
        this.connectionURI = uri;
    }

    /**
     * Close the current connection without triggering the reconnect loop.
     *
     * The "close" listener is removed first, then the connection is closed;
     * cached state is cleared once the close callback fires.
     *
     * @param {Function} [cb] Called with no arguments when done (immediately
     *   if there is no connection).
     */
    disconnect(cb) {
        const done = typeof cb === "function" ? cb : () => { };
        if (!this.connection) {
            return done();
        }
        this.connection.removeListener("close", this.onConnectionClose);
        this.connection.close(() => {
            this.connection = null;
            this.channel = null;
            this.channelPromise = null;
            done();
        });
    }

    /**
     * Disconnect, then connect again.
     *
     * @param {Function} [cb] Forwarded to `connect()`.
     */
    reconnect(cb) {
        this.disconnect(() => {
            this.connect(cb);
        });
    }

    /**
     * Internal helper: drop a connection on which no channel could be opened.
     * Detaches the reconnect handler, forgets the reference and closes the
     * connection on a best-effort basis.
     *
     * @param {Object} connection The connection to discard.
     */
    discardConnection(connection) {
        connection.removeListener("close", this.onConnectionClose);
        if (this.connection === connection) {
            this.connection = null;
        }
        try {
            connection.close(() => { });
        }
        catch (closeErr) {
            // Presumably the connection is already closed or closing; nothing
            // else to release.
            debug("Fail to close discarded connection", closeErr);
        }
    }
}
|
// Expose the class itself.
exports.ChannelManager = ChannelManager;
// Expose a ready-made instance, constructed when this module is evaluated.
exports.channelManager = new ChannelManager();
|
114 |
|
\ | No newline at end of file |