UNPKG

5.46 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3const amqplib_1 = require("amqplib");
4const ReadyGate_1 = require("../util/ReadyGate");
5const PromiseUtil_1 = require("../util/PromiseUtil");
6const RabbitmqConfig_1 = require("./RabbitmqConfig");
7class RabbitmqClient {
8 constructor(clientConfig, name) {
9 /**
10 * Whether the connection should be closed. This is not a statud indicator that show whether the connection is closed.
11 * It is a flag that indicates whether the client has been purposely closed by code, as opposed to being closed because of an error.
12 */
13 this.closed = false;
14 this.reconnecting = false;
15 this.readyGate = new ReadyGate_1.ReadyGate();
16 this.connectFunction = amqplib_1.connect;
17 this.clientConfig = Object.assign({ protocol: 'amqp', maxConnectionAttempts: RabbitmqConfig_1.DEFAULT_MAX_CONNECTION_ATTEMPTS }, clientConfig);
18 this.name = name;
19 this.readyGate.channelNotReady();
20 }
21 async init() {
22 await this.connect();
23 }
24 /**
25 * Connect to RabbitMQ broker
26 */
27 async connect() {
28 await this.createConnection();
29 await this.createChannel();
30 this.reconnecting = false;
31 this.readyGate.channelReady();
32 }
33 async createConnection() {
34 const newConnection = await this.connectFunction(this.clientConfig);
35 newConnection.on('close', (err) => { this.handleErrorOrClose('Connection Closed', err); });
36 newConnection.on('error', (err) => { this.handleErrorOrClose('Connection Error', err); });
37 this.connection = newConnection;
38 this.logger.info('Connection established');
39 }
40 async createChannel() {
41 const newChannel = await this.connection.createChannel();
42 newChannel.on('close', (err) => { this.handleErrorOrClose('Channel closed unexpectedly', err); });
43 newChannel.on('error', (err) => { this.handleErrorOrClose('Channel error', err); });
44 this.channel = newChannel;
45 this.logger.info('Channel opened');
46 }
47 handleErrorOrClose(cause, err) {
48 if (err) {
49 this.logger.error(err, this.debugMsg(`${cause}.`));
50 }
51 else {
52 this.logger.error(this.debugMsg(`${cause}.`));
53 }
54 this.closeAllAndScheduleReconnection();
55 }
56 async closeAllAndScheduleReconnection() {
57 if (!this.reconnecting) {
58 this.readyGate.channelNotReady();
59 this.reconnecting = true;
60 if (this.channel) {
61 try {
62 await this.closeChannel();
63 }
64 catch (e) {
65 // Do nothing... we tried to play nice
66 }
67 }
68 if (this.connection) {
69 try {
70 await this.closeConnection();
71 }
72 catch (e) {
73 // Do nothing... we tried to play nice
74 // this.logger.error(e);
75 }
76 }
77 await this.attemptReconnection();
78 }
79 else {
80 this.logger.warn(this.debugMsg('already reconnecting'));
81 }
82 }
83 backoffWait(tryNum) {
84 // 1 second, 5 seconds, 25 seconds, 30 seconds, 30 seconds, ....
85 const waitBase = Math.min(Math.pow(5, Math.max(0, tryNum - 1)), 30) * 1000;
86 const waitMillis = waitBase + (Math.round(Math.random() * 800));
87 this.logger.warn(this.debugMsg(`Waiting for attempt #${tryNum} - ${waitMillis} ms`));
88 return PromiseUtil_1.PromiseUtil.sleepPromise(waitMillis, null);
89 }
90 async attemptReconnection() {
91 this.logger.warn(this.debugMsg(`reconnecting... max attempts ${this.clientConfig.maxConnectionAttempts}`));
92 let attempts = 0;
93 while (attempts < this.clientConfig.maxConnectionAttempts) {
94 attempts++;
95 await this.backoffWait(attempts);
96 try {
97 this.logger.warn(this.debugMsg(`initialising attempt #${attempts}`));
98 await this.init();
99 this.logger.warn(this.debugMsg(`attempt #${attempts} is successful.`));
100 return;
101 }
102 catch (e) {
103 this.logger.warn(e, this.debugMsg(`Failed reconnection attempt #${attempts}`));
104 }
105 }
106 this.logger.error(this.debugMsg(`Couldn't reconnect after ${this.clientConfig.maxConnectionAttempts} attempts`));
107 this.reconnecting = false;
108 // tslint:disable-next-line
109 if (this.clientConfig.exitOnIrrecoverableReconnect !== false) {
110 this.logger.error(this.debugMsg('Cowardly refusing to continue. Calling shutdown function'));
111 this.shutdownFunction();
112 }
113 }
114 async close() {
115 this.closed = true;
116 await this.closeChannel();
117 await this.closeConnection();
118 }
119 async closeChannel() {
120 if (this.channel) {
121 this.channel.removeAllListeners();
122 await this.channel.close();
123 this.channel = undefined;
124 }
125 }
126 async closeConnection() {
127 if (this.connection) {
128 this.connection.removeAllListeners();
129 await this.connection.close();
130 this.connection = undefined;
131 }
132 }
133 debugMsg(str) {
134 return `${this.name}: ${str}`;
135 }
136}
137exports.RabbitmqClient = RabbitmqClient;
138//# sourceMappingURL=RabbitmqClient.js.map
\No newline at end of file