UNPKG

3.32 kBPlain TextView Raw
1import { Channel, Connection, connect as amqpConnect } from "amqplib/callback_api";
2import { EventEmitter } from "events";
3import util = require('util');
4
5const MAX_LISTENERS = 10000;
6
7var debug = util.debuglog("amqptools");
8
9export class ChannelManager extends EventEmitter {
10 connectionURI: string;
11 channel: Channel;
12 channelPromise: Promise<Channel>;
13 connection: Connection;
14 maxReconnectionAttempts = 100;
15 randomReconnectionInterval = true;
16
17 eventListeners: Event
18
19 private connectCallbacks: ((err: Error, channel: Channel) => void)[] = [];
20 private connectInProgress: boolean;
21
22 constructor() {
23 super();
24 this.setMaxListeners(MAX_LISTENERS);
25 }
26
27 onConnectionClose = (error) => {
28 debug("amqp connection has been closed");
29 this.channel = null;
30 this.connection = null;
31 this.channelPromise = null;
32 var reconnections = 0;
33 var tryReconnect = () => {
34 debug("Reconnection attempt...");
35 this.connect((err) => {
36 reconnections++;
37 if (!err) {
38 this.emit("reconnect");
39 return debug("Connection has been restored");
40 }
41 if (reconnections >= this.maxReconnectionAttempts) {
42 throw new Error("Fail to establish a connection with rabbitmq");
43 }
44 var timeout = this.randomReconnectionInterval ? Math.floor(Math.random()*(10-1)) + 1 : 1;
45 debug("Next reconnect in %d seconds", timeout);
46 setTimeout(tryReconnect, timeout * 1000);
47 });
48 };
49 tryReconnect();
50 }
51
52 connect(cb) {
53 if (this.channel) {
54 return cb(null, this.channel);
55 }
56
57 this.connectCallbacks.push(cb);
58 if (this.connectInProgress) return;
59 this.connectInProgress = true;
60
61 amqpConnect(this.connectionURI, (err, connection) => {
62 if (err) {
63 return this.connectRespond(err, null);
64 }
65 this.connection = connection;
66 this.connection.on("close", this.onConnectionClose);
67 this.connection.createChannel((err, channel) => {
68 if (err) {
69 return this.connectRespond(err, null);
70 }
71 this.channel = channel;
72
73 this.channel.on('error', () => { this.reconnect() });
74
75 this.connectRespond(null, this.channel)
76 });
77 });
78 }
79
80 connectRespond(err, channel) {
81 this.connectInProgress = false;
82 if (err) {
83 debug("Fail to connect...", err);
84 }
85 else {
86 debug("Connected");
87 }
88 this.connectCallbacks.forEach((extraCb) => {
89 if (!extraCb) return;
90 extraCb(err, channel);
91 });
92 this.connectCallbacks = [];
93 }
94
95 getChannel(): Promise<Channel> {
96 return new Promise<Channel>((resolve, reject) => {
97 if (this.channel) {
98 return resolve(this.channel);
99 }
100 this.connect((err, channel) => {
101 if (err) return reject(err);
102 resolve(channel);
103 })
104 });
105 }
106
107 setConnectionURI(uri) {
108 this.connectionURI = uri;
109 }
110
111 disconnect(cb) {
112 if (!this.connection) {
113 return cb();
114 }
115 this.connection.removeListener("close", this.onConnectionClose);
116 this.connection.close(() => {
117 this.connection = null;
118 this.channel = null;
119 this.channelPromise = null;
120 cb();
121 });
122 }
123
124 reconnect(cb?) {
125 this.disconnect(() => {
126 this.connect(cb);
127 });
128 }
129}
130
131export var channelManager = new ChannelManager();
\No newline at end of file