UNPKG

3.88 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3const callback_api_1 = require("amqplib/callback_api");
4const events_1 = require("events");
5const util = require("util");
// Listener cap applied to every ChannelManager via setMaxListeners(); raised
// far above Node's default so that many subscribers do not trigger the
// EventEmitter "possible memory leak" warning.
const MAX_LISTENERS = 10000;
// Debug logger for this module; util.debuglog only prints when the NODE_DEBUG
// environment variable includes "amqptools".
const debug = util.debuglog("amqptools");
/**
 * Owns one amqplib connection and one channel on it, and re-establishes both
 * when the connection closes.
 *
 * Events:
 *  - "reconnect": emitted after a closed connection has been restored.
 *  - "error": emitted when `maxReconnectionAttempts` consecutive reconnection
 *    attempts have failed. As with any EventEmitter, the error is thrown if no
 *    "error" listener is registered.
 *
 * Instance state written by this class: `connection`, `channel`,
 * `channelPromise` (only ever reset to null here), `connectionURI`,
 * `connectInProgress` and `connectCallbacks`.
 */
class ChannelManager extends events_1.EventEmitter {
    constructor() {
        super();
        // Number of failed reconnection attempts after which "error" is emitted.
        this.maxReconnectionAttempts = 100;
        // true: wait a random 1-9 seconds between attempts; false: always 1 second.
        this.randomReconnectionInterval = true;
        // Callbacks waiting for the connection attempt currently in flight.
        this.connectCallbacks = [];
        // Stored as an instance property (not a prototype method) so the very
        // same function reference can be attached in connect() and detached in
        // disconnect().
        this.onConnectionClose = () => {
            debug("amqp connection has been closed");
            this.channel = null;
            this.connection = null;
            this.channelPromise = null;
            let reconnections = 0;
            const tryReconnect = () => {
                debug("Reconnection attempt...");
                this.connect((err) => {
                    reconnections++;
                    if (!err) {
                        this.emit("reconnect");
                        debug("Connection has been restored");
                        return;
                    }
                    if (reconnections >= this.maxReconnectionAttempts) {
                        // This runs inside an asynchronous callback, so a plain
                        // `throw` could never be caught by the caller. Emitting
                        // "error" lets listeners handle the failure and still
                        // throws when nobody listens.
                        this.emit("error", new Error("Fail to establish a connection with rabbitmq"));
                        return;
                    }
                    const timeout = this.randomReconnectionInterval ? Math.floor(Math.random() * (10 - 1)) + 1 : 1;
                    debug("Next reconnect in %d seconds", timeout);
                    setTimeout(tryReconnect, timeout * 1000);
                });
            };
            tryReconnect();
        };
        this.setMaxListeners(MAX_LISTENERS);
    }
    /**
     * Provides the shared channel, opening the connection first if needed.
     * Calls made while an attempt is already in flight are queued and answered
     * together when that attempt finishes.
     *
     * @param {function(?Error, ?Object)} [cb] - receives `(err, channel)`.
     * @returns {*} whatever `cb` returns when a channel already exists,
     *   otherwise undefined.
     */
    connect(cb) {
        if (this.channel) {
            return typeof cb === "function" ? cb(null, this.channel) : undefined;
        }
        this.connectCallbacks.push(cb);
        if (this.connectInProgress) {
            return;
        }
        this.connectInProgress = true;
        callback_api_1.connect(this.connectionURI, (err, connection) => {
            if (err) {
                return this.connectRespond(err, null);
            }
            this.connection = connection;
            // amqplib emits "error" before "close" on abnormal shutdowns; with
            // no listener that event would be thrown as an uncaught exception
            // and the "close" handler below would never get to reconnect.
            connection.on("error", (connectionErr) => {
                debug("amqp connection error", connectionErr);
            });
            connection.on("close", this.onConnectionClose);
            connection.createChannel((channelErr, channel) => {
                if (channelErr) {
                    // Do not keep a connection that cannot give us a channel:
                    // it would be leaked by the next connect() call.
                    this.discardConnection(connection);
                    return this.connectRespond(channelErr, null);
                }
                this.channel = channel;
                channel.on("error", () => {
                    this.reconnect();
                });
                this.connectRespond(null, channel);
            });
        });
    }
    /**
     * Detaches and closes a connection that turned out to be unusable.
     * Best-effort: failures while closing are only logged.
     *
     * @param {Object} connection - the amqplib connection to drop.
     */
    discardConnection(connection) {
        connection.removeListener("close", this.onConnectionClose);
        if (this.connection === connection) {
            this.connection = null;
        }
        try {
            connection.close(() => { });
        }
        catch (closeErr) {
            // close() may throw when the connection is already gone.
            debug("Fail to close unusable connection", closeErr);
        }
    }
    /**
     * Finishes a connection attempt and notifies every queued callback.
     *
     * @param {?Error} err - failure reason, or null on success.
     * @param {?Object} channel - the opened channel, or null on failure.
     */
    connectRespond(err, channel) {
        this.connectInProgress = false;
        if (err) {
            debug("Fail to connect...", err);
        }
        else {
            debug("Connected");
        }
        // Swap the queue out BEFORE notifying: a callback may call connect()
        // again, and its new callback must land in a fresh queue instead of
        // being discarded when this round is over.
        const waiting = this.connectCallbacks;
        this.connectCallbacks = [];
        for (const waiter of waiting) {
            if (typeof waiter === "function") {
                waiter(err, channel);
            }
        }
    }
    /**
     * Promise flavour of connect().
     *
     * @returns {Promise<Object>} resolves with the channel, rejects with the
     *   connection error.
     */
    getChannel() {
        return new Promise((resolve, reject) => {
            if (this.channel) {
                return resolve(this.channel);
            }
            this.connect((err, channel) => {
                if (err) {
                    return reject(err);
                }
                resolve(channel);
            });
        });
    }
    /**
     * Sets the URI passed to amqplib on the next connection attempt.
     *
     * @param {string} uri - AMQP connection URI.
     */
    setConnectionURI(uri) {
        this.connectionURI = uri;
    }
    /**
     * Closes the current connection without triggering the automatic
     * reconnection logic.
     *
     * @param {function()} [cb] - called once the connection is closed, or
     *   immediately when there is no connection.
     * @returns {*} whatever `cb` returns when there is no connection,
     *   otherwise undefined.
     */
    disconnect(cb) {
        const done = typeof cb === "function" ? cb : () => { };
        if (!this.connection) {
            return done();
        }
        // Detach first so this deliberate close is not treated as a failure.
        this.connection.removeListener("close", this.onConnectionClose);
        this.connection.close(() => {
            this.connection = null;
            this.channel = null;
            this.channelPromise = null;
            done();
        });
    }
    /**
     * Drops the current connection (if any) and opens a new one.
     *
     * @param {function(?Error, ?Object)} [cb] - forwarded to connect().
     */
    reconnect(cb) {
        this.disconnect(() => {
            this.connect(cb);
        });
    }
}
// Export the class itself so callers can construct and configure their own manager.
112exports.ChannelManager = ChannelManager;
// Also export one instance, constructed when this module is first loaded
// (presumably meant as the shared default manager; confirm against callers).
113exports.channelManager = new ChannelManager();
114//# sourceMappingURL=ChannelManager.js.map
\No newline at end of file