1 | "use strict";
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | const prom_client_1 = require("prom-client");
|
4 | const LogManager_1 = require("../log/LogManager");
|
5 | const RabbitmqConfig_1 = require("./RabbitmqConfig");
|
6 | const RabbitmqClient_1 = require("./RabbitmqClient");
|
7 | const logger = LogManager_1.LogManager.getLogger(__filename);
|
8 | const rabbitmqPublishTime = new prom_client_1.Histogram({
|
9 | name: 'rabbitmq_publish_time',
|
10 | help: 'Time required to publish messages to RabbitMQ',
|
11 | labelNames: ['name'],
|
12 | buckets: [0.001, 0.003, 0.005, 0.01, 0.05, 0.1, 0.3]
|
13 | });
|
14 | const rabbitmqPublishErrorCounter = new prom_client_1.Counter({
|
15 | name: 'rabbitmq_publish_error_counter',
|
16 | help: 'Number of errors encountered when publishing messages',
|
17 | labelNames: ['name'],
|
18 | });
|
19 | class RabbitmqProducer extends RabbitmqClient_1.RabbitmqClient {
|
20 | constructor(clientConfig, name, producerConfig) {
|
21 | super(clientConfig, name);
|
22 | this.producerConfig = Object.assign({}, producerConfig);
|
23 | this.producerConfig.backPressureStrategy = producerConfig.backPressureStrategy || RabbitmqConfig_1.RabbitmqBackPressureStrategy.ERROR;
|
24 | this.logger = logger;
|
25 | this.publishDurationHistogram = rabbitmqPublishTime.labels(name);
|
26 | this.publishFailures = rabbitmqPublishErrorCounter.labels(name);
|
27 | }
|
28 | |
29 |
|
30 |
|
31 |
|
32 |
|
33 | async publish(msg, routingKey, optionsPublish = {}) {
|
34 | if (this.closed) {
|
35 | throw new Error('Connection to RabbitMQ is closed, can\'t publish message');
|
36 | }
|
37 | if (!this.readyGate.isChannelReady()) {
|
38 | await this.readyGate.awaitChannelReady();
|
39 | }
|
40 | optionsPublish.headers = {
|
41 | retriesCount: 0,
|
42 | };
|
43 | const timer = this.publishDurationHistogram.startTimer();
|
44 | try {
|
45 | while (!this.channel.publish(this.producerConfig.exchangeName, routingKey, new Buffer(msg), optionsPublish)) {
|
46 | if (this.producerConfig.backPressureStrategy === RabbitmqConfig_1.RabbitmqBackPressureStrategy.ERROR) {
|
47 | throw new Error('Failed to publish message');
|
48 | }
|
49 | if (optionsPublish.headers.retriesCount >= 3) {
|
50 | throw new Error('Failed to publish message after 3 attempts');
|
51 | }
|
52 | this.publishFailures.inc();
|
53 | optionsPublish.headers.retriesCount++;
|
54 | this.logger.error(`publish failed. ====> ${msg}`);
|
55 | await new Promise((resolve, reject) => {
|
56 | this.channel.once('drain', function () {
|
57 | this.logger.info(`drain event received. ====> ${msg}`);
|
58 | resolve();
|
59 | });
|
60 | });
|
61 | }
|
62 | }
|
63 | finally {
|
64 | timer();
|
65 | }
|
66 | return true;
|
67 | }
|
68 | init() {
|
69 | return super.init();
|
70 | }
|
71 | close() {
|
72 | return super.close();
|
73 | }
|
74 | }
|
75 | exports.RabbitmqProducer = RabbitmqProducer;
|
76 |
|
\ | No newline at end of file |