UNPKG

3.79 kBPlain TextView Raw
1import { Channel, Connection, connect as amqpConnect } from "amqplib/callback_api";
2import { EventEmitter } from "events";
3import util = require('util');
4
/** Listener limit passed to `setMaxListeners` for every ChannelManager instance. */
const MAX_LISTENERS = 10000;

/**
 * Debug logger for this module, created with `util.debuglog("amqptools")`.
 * Node only prints its output when the NODE_DEBUG environment variable includes `amqptools`.
 */
const debug = util.debuglog("amqptools");
8
/**
 * Owns one AMQP connection and one channel on it, and hands that channel out to callers.
 *
 * Concurrent `connect` calls are coalesced into a single connection attempt, and an
 * unexpected connection close triggers automatic reconnection.
 *
 * Events emitted by this class:
 * - `"reconnect"`: the connection was re-established after an unexpected close.
 * - `"finalize"`: `finalize()` was called.
 */
export class ChannelManager extends EventEmitter {
  /** URI handed to amqplib's `connect`; set it with `setConnectionURI` before connecting. */
  connectionURI: string;
  /** The open channel, or `null` while disconnected. */
  channel: Channel;
  /** Never assigned a promise by this class; it is only reset to `null` together with the channel. */
  channelPromise: Promise<Channel>;
  /** The open connection, or `null` while disconnected. */
  connection: Connection;
  /** Number of failed reconnection attempts after which reconnection gives up by throwing. */
  maxReconnectionAttempts = 100;
  /** When true, the delay between reconnection attempts is a random 1-9 seconds; otherwise 1 second. */
  randomReconnectionInterval = true;

  // NOTE(review): declared but never read or written by this class; the intended type is
  // unclear from this file, so confirm before relying on it.
  eventListeners: Event;

  /** Callbacks waiting for the connection attempt that is currently in flight. */
  private connectCallbacks: ((err: Error, channel: Channel) => void)[] = [];
  /** True while an amqplib connect/createChannel sequence is outstanding. */
  private connectInProgress = false;

  constructor() {
    super();
    this.setMaxListeners(MAX_LISTENERS);
  }

  /**
   * "close" listener installed on the active connection.
   *
   * Clears the cached connection state and keeps retrying `connect` until it succeeds,
   * emitting `"reconnect"` on success. It is an arrow property so the very same function
   * reference can later be passed to `removeListener`.
   */
  onConnectionClose = (error) => {
    debug("amqp connection has been closed");
    this.clearConnectionState();
    let reconnections = 0;
    const tryReconnect = () => {
      debug("Reconnection attempt...");
      this.connect((err) => {
        reconnections++;
        if (!err) {
          this.emit("reconnect");
          return debug("Connection has been restored");
        }
        if (reconnections >= this.maxReconnectionAttempts) {
          // NOTE(review): this is thrown from inside an asynchronous callback, so no caller
          // can catch it; presumably the intent is to crash the process. Confirm.
          throw new Error("Fail to establish a connection with rabbitmq");
        }
        // Math.floor(Math.random() * 9) + 1 yields an integer from 1 to 9.
        const timeout = this.randomReconnectionInterval ? Math.floor(Math.random() * (10 - 1)) + 1 : 1;
        debug("Next reconnect in %d seconds", timeout);
        setTimeout(tryReconnect, timeout * 1000);
      });
    };
    tryReconnect();
  }

  /**
   * Opens the connection and channel if necessary and reports the channel to `cb`.
   *
   * If a channel already exists, `cb` is invoked synchronously with it. Otherwise `cb` is
   * queued; only the first queued caller starts a connection attempt and every queued
   * callback receives that attempt's outcome.
   *
   * @param cb Optional Node-style callback `(err, channel)`.
   * @returns Whatever `cb` returns when a channel already exists, otherwise `undefined`.
   */
  connect(cb?) {
    debug("Connecting to the rabbitmq server");
    if (this.channel) {
      return cb ? cb(null, this.channel) : undefined;
    }

    if (cb) this.connectCallbacks.push(cb);
    if (this.connectInProgress) return;
    this.connectInProgress = true;

    amqpConnect(this.connectionURI, (connectErr, connection) => {
      if (connectErr) {
        return this.connectRespond(connectErr, null);
      }
      this.connection = connection;
      connection.on("close", this.onConnectionClose);
      connection.on("error", (connectionErr) => {
        console.error("Received an error with connection to rabbitmq", connectionErr);
      });
      connection.createChannel((channelErr, channel) => {
        if (channelErr) {
          // Without this the half-initialised connection would stay open and be
          // overwritten (leaked) by the next connect() attempt.
          this.discardConnection(connection);
          return this.connectRespond(channelErr, null);
        }
        this.channel = channel;

        channel.on("error", (err) => {
          debug("Got error on channel: ", err.message, " trying to reconnect");
          this.reconnect();
        });

        this.connectRespond(null, channel);
      });
    });
  }

  /**
   * Finishes the current connection attempt and notifies every queued callback.
   *
   * @param err Error of the failed attempt, or `null` on success.
   * @param channel The new channel, or `null` on failure.
   */
  connectRespond(err, channel) {
    this.connectInProgress = false;
    if (err) {
      debug("Fail to connect...", err);
    } else {
      debug("Connected");
    }
    // Detach the queue before invoking anything: a callback may call connect() again
    // (for example to retry after a failure), and its registration must land in a fresh
    // queue instead of being wiped once this loop finishes.
    const pending = this.connectCallbacks;
    this.connectCallbacks = [];
    pending.forEach((pendingCb) => pendingCb(err, channel));
  }

  /**
   * Promise-based variant of `connect`.
   *
   * @returns A promise resolved with the channel, or rejected with the connection error.
   */
  getChannel(): Promise<Channel> {
    return new Promise<Channel>((resolve, reject) => {
      if (this.channel) {
        return resolve(this.channel);
      }
      this.connect((err, channel) => {
        if (err) return reject(err);
        resolve(channel);
      });
    });
  }

  /** Stores the URI used by subsequent connection attempts. */
  setConnectionURI(uri) {
    this.connectionURI = uri;
  }

  /**
   * Closes the connection without triggering automatic reconnection.
   *
   * Accepts `disconnect(cb)` as well as `disconnect(final, cb)`.
   *
   * @param final Currently unused; kept for interface compatibility.
   * @param cb Optional callback invoked once the connection state has been cleared
   *   (immediately when there is no connection).
   */
  disconnect(final?, cb?) {
    if (typeof final === "function") {
      cb = final;
      final = false;
    }
    debug("Disconnecting from the rabbitmq server");
    const done = typeof cb === "function" ? cb : () => undefined;
    const connection = this.connection;
    if (!connection) {
      return done();
    }
    // Drop the close listener first so this intentional close does not start reconnecting.
    connection.removeListener("close", this.onConnectionClose);

    let finished = false;
    const finish = () => {
      if (finished) return;
      finished = true;
      this.clearConnectionState();
      done();
    };
    try {
      connection.close(finish);
    } catch (closeErr) {
      // An exception raised by the callback itself must not be swallowed here.
      if (finished) throw closeErr;
      // close() throwing means the connection is already unusable. The close listener is
      // gone, so nothing else would clear the stale state; do it here and still report back.
      debug("Connection close failed: %s", closeErr instanceof Error ? closeErr.message : closeErr);
      finish();
    }
  }

  /** Emits the `"finalize"` event. */
  finalize() {
    this.emit("finalize");
  }

  /**
   * Disconnects and then connects again.
   *
   * @param cb Optional callback forwarded to `connect`.
   */
  reconnect(cb?) {
    this.disconnect(() => {
      this.connect(cb);
    });
  }

  /** Forgets the cached connection, channel and channel promise. */
  private clearConnectionState() {
    this.channel = null;
    this.connection = null;
    this.channelPromise = null;
  }

  /** Detaches and closes a connection whose channel could not be created. */
  private discardConnection(connection: Connection) {
    connection.removeListener("close", this.onConnectionClose);
    if (this.connection === connection) {
      this.connection = null;
    }
    try {
      connection.close(() => undefined);
    } catch (closeErr) {
      // The connection may already be closed; there is nothing left to release then.
      debug("Connection close failed: %s", closeErr instanceof Error ? closeErr.message : closeErr);
    }
  }
}
147
/** Shared ChannelManager instance exported by this module; it is never reassigned, hence `const`. */
export const channelManager = new ChannelManager();
\No newline at end of file