1 | "use strict";
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | const amqplib_1 = require("amqplib");
|
4 | const ReadyGate_1 = require("../util/ReadyGate");
|
5 | const PromiseUtil_1 = require("../util/PromiseUtil");
|
6 | const RabbitmqConfig_1 = require("./RabbitmqConfig");
|
7 | class RabbitmqClient {
|
8 | constructor(clientConfig, name) {
|
9 | |
10 |
|
11 |
|
12 |
|
13 | this.closed = false;
|
14 | this.reconnecting = false;
|
15 | this.readyGate = new ReadyGate_1.ReadyGate();
|
16 | this.connectFunction = amqplib_1.connect;
|
17 | this.clientConfig = Object.assign({ protocol: 'amqp', maxConnectionAttempts: RabbitmqConfig_1.DEFAULT_MAX_CONNECTION_ATTEMPTS }, clientConfig);
|
18 | this.name = name;
|
19 | this.readyGate.channelNotReady();
|
20 | }
|
21 | async init() {
|
22 | await this.connect();
|
23 | }
|
24 | |
25 |
|
26 |
|
27 | async connect() {
|
28 | await this.createConnection();
|
29 | await this.createChannel();
|
30 | this.reconnecting = false;
|
31 | this.readyGate.channelReady();
|
32 | }
|
33 | async createConnection() {
|
34 | const newConnection = await this.connectFunction(this.clientConfig);
|
35 | newConnection.on('close', (err) => { this.handleErrorOrClose('Connection Closed', err); });
|
36 | newConnection.on('error', (err) => { this.handleErrorOrClose('Connection Error', err); });
|
37 | this.connection = newConnection;
|
38 | this.logger.info('Connection established');
|
39 | }
|
40 | async createChannel() {
|
41 | const newChannel = await this.connection.createChannel();
|
42 | newChannel.on('close', (err) => { this.handleErrorOrClose('Channel closed unexpectedly', err); });
|
43 | newChannel.on('error', (err) => { this.handleErrorOrClose('Channel error', err); });
|
44 | this.channel = newChannel;
|
45 | this.logger.info('Channel opened');
|
46 | }
|
47 | handleErrorOrClose(cause, err) {
|
48 | if (err) {
|
49 | this.logger.error(err, this.debugMsg(`${cause}.`));
|
50 | }
|
51 | else {
|
52 | this.logger.error(this.debugMsg(`${cause}.`));
|
53 | }
|
54 | this.closeAllAndScheduleReconnection();
|
55 | }
|
56 | async closeAllAndScheduleReconnection() {
|
57 | if (!this.reconnecting) {
|
58 | this.readyGate.channelNotReady();
|
59 | this.reconnecting = true;
|
60 | if (this.channel) {
|
61 | try {
|
62 | await this.closeChannel();
|
63 | }
|
64 | catch (e) {
|
65 |
|
66 | }
|
67 | }
|
68 | if (this.connection) {
|
69 | try {
|
70 | await this.closeConnection();
|
71 | }
|
72 | catch (e) {
|
73 |
|
74 |
|
75 | }
|
76 | }
|
77 | await this.attemptReconnection();
|
78 | }
|
79 | else {
|
80 | this.logger.warn(this.debugMsg('already reconnecting'));
|
81 | }
|
82 | }
|
83 | backoffWait(tryNum) {
|
84 |
|
85 | const waitBase = Math.min(Math.pow(5, Math.max(0, tryNum - 1)), 30) * 1000;
|
86 | const waitMillis = waitBase + (Math.round(Math.random() * 800));
|
87 | this.logger.warn(this.debugMsg(`Waiting for attempt #${tryNum} - ${waitMillis} ms`));
|
88 | return PromiseUtil_1.PromiseUtil.sleepPromise(waitMillis, null);
|
89 | }
|
90 | async attemptReconnection() {
|
91 | this.logger.warn(this.debugMsg(`reconnecting... max attempts ${this.clientConfig.maxConnectionAttempts}`));
|
92 | let attempts = 0;
|
93 | while (attempts < this.clientConfig.maxConnectionAttempts) {
|
94 | attempts++;
|
95 | await this.backoffWait(attempts);
|
96 | try {
|
97 | this.logger.warn(this.debugMsg(`initialising attempt #${attempts}`));
|
98 | await this.init();
|
99 | this.logger.warn(this.debugMsg(`attempt #${attempts} is successful.`));
|
100 | return;
|
101 | }
|
102 | catch (e) {
|
103 | this.logger.warn(e, this.debugMsg(`Failed reconnection attempt #${attempts}`));
|
104 | }
|
105 | }
|
106 | this.logger.error(this.debugMsg(`Couldn't reconnect after ${this.clientConfig.maxConnectionAttempts} attempts`));
|
107 | this.reconnecting = false;
|
108 |
|
109 | if (this.clientConfig.exitOnIrrecoverableReconnect !== false) {
|
110 | this.logger.error(this.debugMsg('Cowardly refusing to continue. Calling shutdown function'));
|
111 | this.shutdownFunction();
|
112 | }
|
113 | }
|
114 | async close() {
|
115 | this.closed = true;
|
116 | await this.closeChannel();
|
117 | await this.closeConnection();
|
118 | }
|
119 | async closeChannel() {
|
120 | if (this.channel) {
|
121 | this.channel.removeAllListeners();
|
122 | await this.channel.close();
|
123 | this.channel = undefined;
|
124 | }
|
125 | }
|
126 | async closeConnection() {
|
127 | if (this.connection) {
|
128 | this.connection.removeAllListeners();
|
129 | await this.connection.close();
|
130 | this.connection = undefined;
|
131 | }
|
132 | }
|
133 | debugMsg(str) {
|
134 | return `${this.name}: ${str}`;
|
135 | }
|
136 | }
|
137 | exports.RabbitmqClient = RabbitmqClient;
|
138 |
|
\ | No newline at end of file |