UNPKG

6.95 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3const prom_client_1 = require("prom-client");
4const NewrelicUtil_1 = require("../newrelic/NewrelicUtil");
5const LogManager_1 = require("../log/LogManager");
6const RabbitmqClient_1 = require("./RabbitmqClient");
7const RabbitmqConsumerHandler_1 = require("./RabbitmqConsumerHandler");
8const RabbitmqConsumerHandlerError_1 = require("./RabbitmqConsumerHandlerError");
9const logger = LogManager_1.LogManager.getLogger(__filename);
10const newrelic = NewrelicUtil_1.NewrelicUtil.getNewrelicIfAvailable();
11class NewrelichandlerWrapper extends RabbitmqConsumerHandler_1.RabbitmqConsumerHandler {
12 constructor(baseHandler) {
13 super();
14 this.baseHandler = baseHandler;
15 }
16 async handle(message) {
17 await newrelic.startBackgroundTransaction(this.baseHandler.constructor.name, 'RabbitMQConsumer', async () => {
18 const transaction = newrelic.getTransaction();
19 try {
20 await this.baseHandler.handle(message);
21 }
22 finally {
23 transaction.end();
24 }
25 });
26 }
27}
28const rabbitmqConsumeTime = new prom_client_1.Histogram({
29 name: 'rabbitmq_consume_time',
30 help: 'Time required to consumer a messages from RabbitMQ',
31 labelNames: ['name'],
32 buckets: [0.003, 0.01, 0.05, 0.1, 0.3, 0.5, 1, 2, 5]
33});
34const rabbitmqConsumeErrorCounter = new prom_client_1.Counter({
35 name: 'rabbitmq_consume_error_counter',
36 help: 'Number of errors encountered when consuming messages',
37 labelNames: ['name'],
38});
39const rabbitmqConsumeDLQCounter = new prom_client_1.Counter({
40 name: 'rabbitmq_consume_to_dlq_counter',
41 help: 'Number of messages sent to the DLQ',
42 labelNames: ['name'],
43});
44const rabbitmqConsumeDelayedCounter = new prom_client_1.Counter({
45 name: 'rabbitmq_consume_delayed_counter',
46 help: 'Number of messages sent to the delay queue',
47 labelNames: ['name'],
48});
49class RabbitmqConsumer extends RabbitmqClient_1.RabbitmqClient {
50 constructor(clientConfig, name, consumerConfig, handler) {
51 super(clientConfig, name);
52 this.messageHandler = newrelic ? new NewrelichandlerWrapper(handler) : handler;
53 this.consumerConfig = Object.assign({}, consumerConfig);
54 this.consumerConfig.options = this.consumerConfig.options || {};
55 this.logger = logger;
56 this.consumeDurationHistogram = rabbitmqConsumeTime.labels(name);
57 this.consumeFailures = rabbitmqConsumeErrorCounter.labels(name);
58 this.consumeFailuresDLQ = rabbitmqConsumeDLQCounter.labels(name);
59 this.consumeFailuresDelayed = rabbitmqConsumeDelayedCounter.labels(name);
60 }
61 async init() {
62 try {
63 await super.init();
64 await this.subscribe(this.consumerConfig.appQueueName, this.consumerConfig);
65 }
66 catch (e) {
67 const c = Object.assign({}, this.consumerConfig);
68 this.logger.error(e, `failed to subscribe with config - ${c.toString()}`);
69 NewrelicUtil_1.NewrelicUtil.noticeError(e, { config: c });
70 throw e;
71 }
72 }
73 /**
74 * Subscribe to a queue
75 */
76 async subscribe(queueName, consumerConfig) {
77 if (consumerConfig.prefetch && consumerConfig.prefetch > 0) {
78 this.channel.prefetch(consumerConfig.prefetch);
79 }
80 return this.channel.consume(queueName, (message) => {
81 this.handleMessage(message);
82 }, consumerConfig.options);
83 }
84 async handleMessage(message) {
85 const timer = this.consumeDurationHistogram.startTimer();
86 try {
87 await this.messageHandler.handle(message);
88 this.channel.ack(message);
89 }
90 catch (e) {
91 this.logger.error(e, 'failed to handle message');
92 this.consumeFailures.inc();
93 NewrelicUtil_1.NewrelicUtil.noticeError(e, message);
94 if (message.properties.headers.retriesCount === undefined) {
95 message.properties.headers.retriesCount = 0;
96 }
97 const retriesCount = ++message.properties.headers.retriesCount;
98 if (e instanceof RabbitmqConsumerHandlerError_1.RabbitmqConsumerHandlerUnrecoverableError || !this.allowRetry(retriesCount)) {
99 // add to dlq
100 this.consumeFailuresDLQ.inc();
101 try {
102 this.sendMessageToDlq(message);
103 this.channel.ack(message);
104 }
105 catch (err) {
106 this.logger.error(err, 'failed to send message to dlq');
107 NewrelicUtil_1.NewrelicUtil.noticeError(err, message);
108 this.channel.nack(message);
109 }
110 }
111 else {
112 this.consumeFailuresDelayed.inc();
113 try {
114 this.sendMessageToDelayedQueue(message, retriesCount, e);
115 this.channel.ack(message);
116 }
117 catch (error) {
118 // put message back to rabbitmq
119 this.logger.error(error, 'failed to send message to delayed queue');
120 NewrelicUtil_1.NewrelicUtil.noticeError(error, message);
121 this.channel.nack(message);
122 }
123 }
124 }
125 finally {
126 timer();
127 }
128 }
129 sendMessageToDlq(message) {
130 this.channel.sendToQueue(this.consumerConfig.dlqName, message.content);
131 this.logger.info(this.stringyifyMessageContent(message), 'sent message to dlq');
132 }
133 sendMessageToDelayedQueue(message, retriesCount, e) {
134 const ct = this.stringyifyMessageContent(message);
135 // depending on retries config, retry
136 const ttl = this.getTtl(retriesCount);
137 const options = {
138 expiration: ttl,
139 headers: message.properties.headers,
140 };
141 this.channel.sendToQueue(this.consumerConfig.delayQueueName, message.content, options);
142 const data = {
143 queueName: this.consumerConfig.delayQueueName,
144 messageContent: ct,
145 options,
146 };
147 this.logger.info(data, `sent message to delayed queue`);
148 }
149 stringyifyMessageContent(message) {
150 return message.content.toString();
151 }
152 /**
153 *
154 * @param retriesCount number
155 * @reutrn number in milliseconds
156 */
157 getTtl(retriesCount = 1) {
158 if (this.allowRetry(retriesCount)) {
159 return Math.pow(retriesCount, this.consumerConfig.retryDelayFactor)
160 * this.consumerConfig.retryDelayInMinute * 60 * 1000;
161 }
162 return 0;
163 }
164 allowRetry(retriesCount) {
165 return retriesCount && this.consumerConfig.maxRetries >= retriesCount;
166 }
167 async close() {
168 await super.close();
169 }
170}
171exports.RabbitmqConsumer = RabbitmqConsumer;
172//# sourceMappingURL=RabbitmqConsumer.js.map
\No newline at end of file