1 | import { Channel, Connection, connect as amqpConnect } from "amqplib/callback_api";
|
2 | import { EventEmitter } from "events";
|
3 | import util = require('util');
|
4 |
|
// Listener cap applied to every ChannelManager through setMaxListeners().
const MAX_LISTENERS = 10000;

// Module logger; util.debuglog only prints when NODE_DEBUG includes "amqptools".
const debug = util.debuglog("amqptools");

/**
 * Owns a single amqplib connection and a single channel on top of it.
 *
 * Callers obtain the channel through `connect(cb)` or `getChannel()`.
 * Concurrent requests made while a connection attempt is running are queued
 * and all answered by that one attempt. When the connection emits "close",
 * the manager retries until it succeeds (emitting "reconnect") or until
 * `maxReconnectionAttempts` attempts have failed.
 */
export class ChannelManager extends EventEmitter {
  /** URI handed to amqplib's `connect`; set it with `setConnectionURI`. */
  connectionURI: string;
  /** Cached channel, or null while disconnected. */
  channel: Channel;
  /** Only ever reset to null in this class; never assigned a promise here. */
  channelPromise: Promise<Channel>;
  /** Current connection, or null while disconnected. */
  connection: Connection;
  /** Number of failed reconnection attempts tolerated after a "close" event. */
  maxReconnectionAttempts = 100;
  /** When true the retry delay is a random 1-9 seconds, otherwise 1 second. */
  randomReconnectionInterval = true;

  /** Declared for compatibility; not read or written anywhere in this class. */
  eventListeners: Event;

  /** Callbacks waiting for the connection attempt that is in flight. */
  private connectCallbacks: ((err: Error, channel: Channel) => void)[] = [];
  /** True between the start of an attempt and its `connectRespond` call. */
  private connectInProgress = false;

  constructor() {
    super();
    this.setMaxListeners(MAX_LISTENERS);
  }

  /**
   * Listener for the connection's "close" event.
   *
   * Clears the cached connection state and then keeps calling `connect`
   * until it succeeds, at which point "reconnect" is emitted.
   *
   * @throws Error from inside the connect callback once
   *   `maxReconnectionAttempts` attempts have failed. Nothing in this class
   *   catches it, so it is deliberately fail-fast.
   */
  onConnectionClose = (error) => {
    debug("amqp connection has been closed");
    this.channel = null;
    this.connection = null;
    this.channelPromise = null;

    let reconnections = 0;
    const tryReconnect = () => {
      debug("Reconnection attempt...");
      this.connect((err) => {
        reconnections++;
        if (!err) {
          this.emit("reconnect");
          debug("Connection has been restored");
          return;
        }
        if (reconnections >= this.maxReconnectionAttempts) {
          throw new Error("Fail to establish a connection with rabbitmq");
        }
        // Random delay is an integer in [1, 9] seconds.
        const timeout = this.randomReconnectionInterval
          ? Math.floor(Math.random() * (10 - 1)) + 1
          : 1;
        debug("Next reconnect in %d seconds", timeout);
        setTimeout(tryReconnect, timeout * 1000);
      });
    };
    tryReconnect();
  };

  /**
   * Listener for the connection's "error" event.
   *
   * An EventEmitter that emits "error" with no listener throws, which would
   * bring the process down before the "close"-driven reconnect logic gets a
   * chance to run. Logging here is enough: recovery happens in
   * `onConnectionClose`.
   */
  private onConnectionError = (error) => {
    debug("amqp connection error", error);
  };

  /**
   * Provides the channel, connecting first if necessary.
   *
   * @param cb Optional `(err, channel)` callback. It is invoked synchronously
   *   when a channel is already cached; otherwise it is queued and invoked
   *   when the in-flight (or newly started) attempt finishes.
   * @returns Whatever `cb` returns on the cached path, otherwise undefined.
   */
  connect(cb?) {
    if (this.channel) {
      // reconnect() may call us without a callback; nothing to report then.
      return cb ? cb(null, this.channel) : undefined;
    }

    if (cb) {
      this.connectCallbacks.push(cb);
    }
    if (this.connectInProgress) return;
    this.connectInProgress = true;

    amqpConnect(this.connectionURI, (err, connection) => {
      if (err) {
        return this.connectRespond(err, null);
      }
      this.connection = connection;
      connection.on("error", this.onConnectionError);
      connection.on("close", this.onConnectionClose);
      connection.createChannel((channelErr, channel) => {
        if (channelErr) {
          // Without a channel the connection is useless: drop it so it does
          // not leak and so its later "close" cannot wipe a newer connection.
          this.discardConnection(connection);
          return this.connectRespond(channelErr, null);
        }
        this.channel = channel;

        channel.on("error", () => {
          this.reconnect();
        });

        this.connectRespond(null, channel);
      });
    });
  }

  /**
   * Finishes a connection attempt and notifies every queued callback.
   *
   * The queue is detached before delivery, so a callback that calls
   * `connect` again registers for the next attempt instead of being erased,
   * and no callback can ever be delivered twice. A callback that throws
   * still aborts delivery to the callbacks queued after it.
   *
   * @param err Connection error, or null on success.
   * @param channel The channel on success, or null on failure.
   */
  connectRespond(err, channel) {
    this.connectInProgress = false;
    if (err) {
      debug("Fail to connect...", err);
    } else {
      debug("Connected");
    }

    const pending = this.connectCallbacks;
    this.connectCallbacks = [];
    for (const queuedCb of pending) {
      if (queuedCb) {
        queuedCb(err, channel);
      }
    }
  }

  /**
   * Promise flavour of `connect`.
   *
   * @returns A promise resolved with the channel or rejected with the
   *   connection error.
   */
  getChannel(): Promise<Channel> {
    return new Promise<Channel>((resolve, reject) => {
      // connect() answers synchronously when a channel is already cached.
      this.connect((err, channel) => {
        if (err) return reject(err);
        resolve(channel);
      });
    });
  }

  /** Stores the URI used by subsequent connection attempts. */
  setConnectionURI(uri) {
    this.connectionURI = uri;
  }

  /**
   * Closes the current connection without triggering the automatic
   * reconnect, then clears the cached state.
   *
   * @param cb Invoked with no arguments once the connection is closed, or
   *   immediately when there is no connection.
   */
  disconnect(cb) {
    if (!this.connection) {
      return cb();
    }
    // Detach first so this intentional close is not treated as a failure.
    this.connection.removeListener("close", this.onConnectionClose);
    this.connection.close(() => {
      this.connection = null;
      this.channel = null;
      this.channelPromise = null;
      cb();
    });
  }

  /**
   * Disconnects and then connects again.
   *
   * @param cb Optional `(err, channel)` callback forwarded to `connect`.
   */
  reconnect(cb?) {
    this.disconnect(() => {
      this.connect(cb);
    });
  }

  /** Detaches and closes a connection that never produced a usable channel. */
  private discardConnection(connection: Connection) {
    connection.removeListener("close", this.onConnectionClose);
    if (this.connection === connection) {
      this.connection = null;
    }
    try {
      connection.close();
    } catch (closeError) {
      // close() throws when the connection is already closed or closing.
      debug("Ignoring error while closing unusable connection", closeError);
    }
  }
}
|
130 |
|
/** Shared module-level ChannelManager instance; exported as a constant so the binding cannot be reassigned. */
export const channelManager = new ChannelManager();
\ | No newline at end of file |