1 | "use strict";
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | const prom_client_1 = require("prom-client");
|
4 | const NewrelicUtil_1 = require("../newrelic/NewrelicUtil");
|
5 | const LogManager_1 = require("../log/LogManager");
|
6 | const RabbitmqClient_1 = require("./RabbitmqClient");
|
7 | const RabbitmqConsumerHandler_1 = require("./RabbitmqConsumerHandler");
|
8 | const RabbitmqConsumerHandlerError_1 = require("./RabbitmqConsumerHandlerError");
|
9 | const logger = LogManager_1.LogManager.getLogger(__filename);
|
10 | const newrelic = NewrelicUtil_1.NewrelicUtil.getNewrelicIfAvailable();
|
11 | class NewrelichandlerWrapper extends RabbitmqConsumerHandler_1.RabbitmqConsumerHandler {
|
12 | constructor(baseHandler) {
|
13 | super();
|
14 | this.baseHandler = baseHandler;
|
15 | }
|
16 | async handle(message) {
|
17 | await newrelic.startBackgroundTransaction(this.baseHandler.constructor.name, 'RabbitMQConsumer', async () => {
|
18 | const transaction = newrelic.getTransaction();
|
19 | try {
|
20 | await this.baseHandler.handle(message);
|
21 | }
|
22 | finally {
|
23 | transaction.end();
|
24 | }
|
25 | });
|
26 | }
|
27 | }
|
28 | const rabbitmqConsumeTime = new prom_client_1.Histogram({
|
29 | name: 'rabbitmq_consume_time',
|
30 | help: 'Time required to consumer a messages from RabbitMQ',
|
31 | labelNames: ['name'],
|
32 | buckets: [0.003, 0.01, 0.05, 0.1, 0.3, 0.5, 1, 2, 5]
|
33 | });
|
34 | const rabbitmqConsumeErrorCounter = new prom_client_1.Counter({
|
35 | name: 'rabbitmq_consume_error_counter',
|
36 | help: 'Number of errors encountered when consuming messages',
|
37 | labelNames: ['name'],
|
38 | });
|
39 | const rabbitmqConsumeDLQCounter = new prom_client_1.Counter({
|
40 | name: 'rabbitmq_consume_to_dlq_counter',
|
41 | help: 'Number of messages sent to the DLQ',
|
42 | labelNames: ['name'],
|
43 | });
|
44 | const rabbitmqConsumeDelayedCounter = new prom_client_1.Counter({
|
45 | name: 'rabbitmq_consume_delayed_counter',
|
46 | help: 'Number of messages sent to the delay queue',
|
47 | labelNames: ['name'],
|
48 | });
|
49 | class RabbitmqConsumer extends RabbitmqClient_1.RabbitmqClient {
|
50 | constructor(clientConfig, name, consumerConfig, handler) {
|
51 | super(clientConfig, name);
|
52 | this.messageHandler = newrelic ? new NewrelichandlerWrapper(handler) : handler;
|
53 | this.consumerConfig = Object.assign({}, consumerConfig);
|
54 | this.consumerConfig.options = this.consumerConfig.options || {};
|
55 | this.logger = logger;
|
56 | this.consumeDurationHistogram = rabbitmqConsumeTime.labels(name);
|
57 | this.consumeFailures = rabbitmqConsumeErrorCounter.labels(name);
|
58 | this.consumeFailuresDLQ = rabbitmqConsumeDLQCounter.labels(name);
|
59 | this.consumeFailuresDelayed = rabbitmqConsumeDelayedCounter.labels(name);
|
60 | }
|
61 | async init() {
|
62 | try {
|
63 | await super.init();
|
64 | await this.subscribe(this.consumerConfig.appQueueName, this.consumerConfig);
|
65 | }
|
66 | catch (e) {
|
67 | const c = Object.assign({}, this.consumerConfig);
|
68 | this.logger.error(e, `failed to subscribe with config - ${c.toString()}`);
|
69 | NewrelicUtil_1.NewrelicUtil.noticeError(e, { config: c });
|
70 | throw e;
|
71 | }
|
72 | }
|
73 | |
74 |
|
75 |
|
76 | async subscribe(queueName, consumerConfig) {
|
77 | if (consumerConfig.prefetch && consumerConfig.prefetch > 0) {
|
78 | this.channel.prefetch(consumerConfig.prefetch);
|
79 | }
|
80 | return this.channel.consume(queueName, (message) => {
|
81 | this.handleMessage(message);
|
82 | }, consumerConfig.options);
|
83 | }
|
84 | async handleMessage(message) {
|
85 | const timer = this.consumeDurationHistogram.startTimer();
|
86 | try {
|
87 | await this.messageHandler.handle(message);
|
88 | this.channel.ack(message);
|
89 | }
|
90 | catch (e) {
|
91 | this.logger.error(e, 'failed to handle message');
|
92 | this.consumeFailures.inc();
|
93 | NewrelicUtil_1.NewrelicUtil.noticeError(e, message);
|
94 | if (message.properties.headers.retriesCount === undefined) {
|
95 | message.properties.headers.retriesCount = 0;
|
96 | }
|
97 | const retriesCount = ++message.properties.headers.retriesCount;
|
98 | if (e instanceof RabbitmqConsumerHandlerError_1.RabbitmqConsumerHandlerUnrecoverableError || !this.allowRetry(retriesCount)) {
|
99 |
|
100 | this.consumeFailuresDLQ.inc();
|
101 | try {
|
102 | this.sendMessageToDlq(message);
|
103 | this.channel.ack(message);
|
104 | }
|
105 | catch (err) {
|
106 | this.logger.error(err, 'failed to send message to dlq');
|
107 | NewrelicUtil_1.NewrelicUtil.noticeError(err, message);
|
108 | this.channel.nack(message);
|
109 | }
|
110 | }
|
111 | else {
|
112 | this.consumeFailuresDelayed.inc();
|
113 | try {
|
114 | this.sendMessageToDelayedQueue(message, retriesCount, e);
|
115 | this.channel.ack(message);
|
116 | }
|
117 | catch (error) {
|
118 |
|
119 | this.logger.error(error, 'failed to send message to delayed queue');
|
120 | NewrelicUtil_1.NewrelicUtil.noticeError(error, message);
|
121 | this.channel.nack(message);
|
122 | }
|
123 | }
|
124 | }
|
125 | finally {
|
126 | timer();
|
127 | }
|
128 | }
|
129 | sendMessageToDlq(message) {
|
130 | this.channel.sendToQueue(this.consumerConfig.dlqName, message.content);
|
131 | this.logger.info(this.stringyifyMessageContent(message), 'sent message to dlq');
|
132 | }
|
133 | sendMessageToDelayedQueue(message, retriesCount, e) {
|
134 | const ct = this.stringyifyMessageContent(message);
|
135 |
|
136 | const ttl = this.getTtl(retriesCount);
|
137 | const options = {
|
138 | expiration: ttl,
|
139 | headers: message.properties.headers,
|
140 | };
|
141 | this.channel.sendToQueue(this.consumerConfig.delayQueueName, message.content, options);
|
142 | const data = {
|
143 | queueName: this.consumerConfig.delayQueueName,
|
144 | messageContent: ct,
|
145 | options,
|
146 | };
|
147 | this.logger.info(data, `sent message to delayed queue`);
|
148 | }
|
149 | stringyifyMessageContent(message) {
|
150 | return message.content.toString();
|
151 | }
|
152 | |
153 |
|
154 |
|
155 |
|
156 |
|
157 | getTtl(retriesCount = 1) {
|
158 | if (this.allowRetry(retriesCount)) {
|
159 | return Math.pow(retriesCount, this.consumerConfig.retryDelayFactor)
|
160 | * this.consumerConfig.retryDelayInMinute * 60 * 1000;
|
161 | }
|
162 | return 0;
|
163 | }
|
164 | allowRetry(retriesCount) {
|
165 | return retriesCount && this.consumerConfig.maxRetries >= retriesCount;
|
166 | }
|
167 | async close() {
|
168 | await super.close();
|
169 | }
|
170 | }
|
171 | exports.RabbitmqConsumer = RabbitmqConsumer;
|
172 |
|
\ | No newline at end of file |