UNPKG

3.1 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3const prom_client_1 = require("prom-client");
4const LogManager_1 = require("../log/LogManager");
5const RabbitmqConfig_1 = require("./RabbitmqConfig");
6const RabbitmqClient_1 = require("./RabbitmqClient");
// Module-level logger obtained from LogManager for this file; every producer
// instance created below shares it.
const logger = LogManager_1.LogManager.getLogger(__filename);

// Histogram observed once per publish() call, labelled by producer name.
// Bucket bounds run from 0.001 to 0.3 (presumably seconds, as reported by
// prom-client timers — confirm against the prom-client version in use).
const rabbitmqPublishTime = new prom_client_1.Histogram({
    help: 'Time required to publish messages to RabbitMQ',
    name: 'rabbitmq_publish_time',
    buckets: [
        0.001,
        0.003,
        0.005,
        0.01,
        0.05,
        0.1,
        0.3,
    ],
    labelNames: ['name'],
});

// Counter incremented by publish() each time it schedules a retry,
// labelled by producer name.
const rabbitmqPublishErrorCounter = new prom_client_1.Counter({
    help: 'Number of errors encountered when publishing messages',
    name: 'rabbitmq_publish_error_counter',
    labelNames: ['name'],
});
/**
 * RabbitMQ client specialised for publishing messages to the exchange named in
 * its producer configuration. Publish latency and retry counts are recorded in
 * the module-level Prometheus metrics, labelled with the producer name.
 */
class RabbitmqProducer extends RabbitmqClient_1.RabbitmqClient {
    /**
     * @param clientConfig   forwarded unchanged to the RabbitmqClient constructor
     * @param name           forwarded to the RabbitmqClient constructor and used as the metrics label
     * @param producerConfig producer settings (reads `exchangeName` and `backPressureStrategy`);
     *                       shallow-copied, with `backPressureStrategy` defaulting to ERROR
     */
    constructor(clientConfig, name, producerConfig) {
        super(clientConfig, name);
        this.producerConfig = Object.assign({}, producerConfig);
        // Read the strategy from the copy so an omitted producerConfig does not throw here.
        this.producerConfig.backPressureStrategy = this.producerConfig.backPressureStrategy || RabbitmqConfig_1.RabbitmqBackPressureStrategy.ERROR;
        this.logger = logger;
        this.publishDurationHistogram = rabbitmqPublishTime.labels(name);
        this.publishFailures = rabbitmqPublishErrorCounter.labels(name);
    }
    /**
     * Publish msg with routing key.
     *
     * When the channel rejects the write and the back-pressure strategy is not
     * ERROR, waits for the channel's 'drain' event and tries again, giving up
     * once `retriesCount` has reached 3.
     *
     * @param msg            payload; converted with Buffer.from
     * @param routingKey     routing key handed to the channel
     * @param optionsPublish publish options handed to the channel; not mutated. Caller-supplied
     *                       headers are kept and a `retriesCount` header (starting at 0) is added.
     * @returns {Promise<boolean>} resolves to true once the channel accepts the message
     * @throws {Error} if the connection is closed, if the channel rejects the write under the
     *                 ERROR strategy, or if the retry limit is reached
     */
    async publish(msg, routingKey, optionsPublish = {}) {
        if (this.closed) {
            throw new Error('Connection to RabbitMQ is closed, can\'t publish message');
        }
        if (!this.readyGate.isChannelReady()) {
            await this.readyGate.awaitChannelReady();
        }
        // Work on a copy: the caller's options object is left untouched and any
        // headers it carries are preserved next to the retry counter.
        const options = Object.assign({}, optionsPublish);
        options.headers = Object.assign({}, options.headers, { retriesCount: 0 });
        const timer = this.publishDurationHistogram.startTimer();
        try {
            // Buffer.from replaces the deprecated `new Buffer(...)`; the payload is
            // the same for every attempt, so it is built once.
            const payload = Buffer.from(msg);
            // NOTE(review): if this.channel is an amqplib channel, a false return
            // signals back-pressure while the message is still buffered, so retrying
            // may publish duplicates — confirm against the channel implementation.
            while (!this.channel.publish(this.producerConfig.exchangeName, routingKey, payload, options)) {
                if (this.producerConfig.backPressureStrategy === RabbitmqConfig_1.RabbitmqBackPressureStrategy.ERROR) {
                    throw new Error('Failed to publish message');
                }
                if (options.headers.retriesCount >= 3) {
                    throw new Error('Failed to publish message after 3 attempts');
                }
                this.publishFailures.inc();
                options.headers.retriesCount++;
                this.logger.error(`publish failed. ====> ${msg}`);
                await new Promise((resolve) => {
                    // Arrow function keeps `this` bound to the producer; a plain
                    // function would look up `logger` on the listener's own `this`
                    // and could throw before resolve() is reached.
                    this.channel.once('drain', () => {
                        this.logger.info(`drain event received. ====> ${msg}`);
                        resolve();
                    });
                });
            }
        }
        finally {
            // Record the elapsed time whether the publish succeeded or threw.
            timer();
        }
        return true;
    }
    /**
     * Delegates to RabbitmqClient#init.
     */
    init() {
        return super.init();
    }
    /**
     * Delegates to RabbitmqClient#close.
     */
    close() {
        return super.close();
    }
}
75exports.RabbitmqProducer = RabbitmqProducer;
76//# sourceMappingURL=RabbitmqProducer.js.map
\No newline at end of file