1 | import { Channel, Connection, connect as amqpConnect } from "amqplib/callback_api";
|
2 | import { EventEmitter } from "events";
|
3 | import util = require('util');
|
4 |
|
5 | const MAX_LISTENERS = 10000;
|
6 |
|
7 | var debug = util.debuglog("amqptools");
|
8 |
|
9 | export class ChannelManager extends EventEmitter {
|
10 | connectionURI: string;
|
11 | channel: Channel;
|
12 | channelPromise: Promise<Channel>;
|
13 | connection: Connection;
|
14 | maxReconnectionAttempts = 100;
|
15 | randomReconnectionInterval = true;
|
16 |
|
17 | eventListeners: Event
|
18 |
|
19 | private connectCallbacks: ((err: Error, channel: Channel) => void)[] = [];
|
20 | private connectInProgress: boolean;
|
21 |
|
22 | constructor() {
|
23 | super();
|
24 | this.setMaxListeners(MAX_LISTENERS);
|
25 | }
|
26 |
|
27 | onConnectionClose = (error) => {
|
28 | debug("amqp connection has been closed");
|
29 | this.channel = null;
|
30 | this.connection = null;
|
31 | this.channelPromise = null;
|
32 | var reconnections = 0;
|
33 | var tryReconnect = () => {
|
34 | debug("Reconnection attempt...");
|
35 | this.connect((err) => {
|
36 | reconnections++;
|
37 | if (!err) {
|
38 | this.emit("reconnect");
|
39 | return debug("Connection has been restored");
|
40 | }
|
41 | if (reconnections >= this.maxReconnectionAttempts) {
|
42 | throw new Error("Fail to establish a connection with rabbitmq");
|
43 | }
|
44 | var timeout = this.randomReconnectionInterval ? Math.floor(Math.random()*(10-1)) + 1 : 1;
|
45 | debug("Next reconnect in %d seconds", timeout);
|
46 | setTimeout(tryReconnect, timeout * 1000);
|
47 | });
|
48 | };
|
49 | tryReconnect();
|
50 | }
|
51 |
|
52 | connect(cb) {
|
53 | debug("Connecting to the rabbitmq server")
|
54 | if (this.channel) {
|
55 | return cb(null, this.channel);
|
56 | }
|
57 |
|
58 | this.connectCallbacks.push(cb);
|
59 | if (this.connectInProgress) return;
|
60 | this.connectInProgress = true;
|
61 |
|
62 | amqpConnect(this.connectionURI, (err, connection) => {
|
63 | if (err) {
|
64 | return this.connectRespond(err, null);
|
65 | }
|
66 | this.connection = connection;
|
67 | this.connection.on("close", this.onConnectionClose);
|
68 | this.connection.on("error", err => {
|
69 | console.error("Received an error with connection to rabbitmq", err)
|
70 | });
|
71 | this.connection.createChannel((err, channel) => {
|
72 | if (err) {
|
73 | return this.connectRespond(err, null);
|
74 | }
|
75 | this.channel = channel;
|
76 |
|
77 | this.channel.on('error', err => {
|
78 | debug("Got error on channel: ", err.message, " trying to reconnect")
|
79 | this.reconnect()
|
80 | });
|
81 |
|
82 | this.connectRespond(null, this.channel)
|
83 | });
|
84 | });
|
85 | }
|
86 |
|
87 | connectRespond(err, channel) {
|
88 | this.connectInProgress = false;
|
89 | if (err) {
|
90 | debug("Fail to connect...", err);
|
91 | }
|
92 | else {
|
93 | debug("Connected");
|
94 | }
|
95 | this.connectCallbacks.forEach((extraCb) => {
|
96 | if (!extraCb) return;
|
97 | extraCb(err, channel);
|
98 | });
|
99 | this.connectCallbacks = [];
|
100 | }
|
101 |
|
102 | getChannel(): Promise<Channel> {
|
103 | return new Promise<Channel>((resolve, reject) => {
|
104 | if (this.channel) {
|
105 | return resolve(this.channel);
|
106 | }
|
107 | this.connect((err, channel) => {
|
108 | if (err) return reject(err);
|
109 | resolve(channel);
|
110 | })
|
111 | });
|
112 | }
|
113 |
|
114 | setConnectionURI(uri) {
|
115 | this.connectionURI = uri;
|
116 | }
|
117 |
|
118 | disconnect(final, cb) {
|
119 | if (typeof final === 'function') {
|
120 | cb = final
|
121 | final = false
|
122 | }
|
123 | debug("Disconnecting from the rabbitmq server")
|
124 | if (!this.connection) {
|
125 | return cb();
|
126 | }
|
127 | this.connection.removeListener("close", this.onConnectionClose);
|
128 | this.connection.close(() => {
|
129 | this.connection = null;
|
130 | this.channel = null;
|
131 | this.channelPromise = null;
|
132 |
|
133 | cb();
|
134 | });
|
135 | }
|
136 |
|
137 | finalize() {
|
138 | this.emit("finalize");
|
139 | }
|
140 |
|
141 | reconnect(cb?) {
|
142 | this.disconnect(() => {
|
143 | this.connect(cb);
|
144 | });
|
145 | }
|
146 | }
|
147 |
|
148 | export var channelManager = new ChannelManager(); |
\ | No newline at end of file |