// Retrieved from UNPKG (6.24 kB, JavaScript, raw view).
"use strict";
// Mark this CommonJS module as a transpiled ES module for interop-aware importers.
Object.defineProperty(exports, "__esModule", { value: true });
// Placeholder for the named export; the real class is assigned at the end of the file.
exports.ClientRMQ = void 0;
const logger_service_1 = require("@nestjs/common/services/logger.service");
const load_package_util_1 = require("@nestjs/common/utils/load-package.util");
const random_string_generator_util_1 = require("@nestjs/common/utils/random-string-generator.util");
const events_1 = require("events");
const rxjs_1 = require("rxjs");
const operators_1 = require("rxjs/operators");
const constants_1 = require("../constants");
const client_proxy_1 = require("./client-proxy");
// Holds the lazily loaded `amqp-connection-manager` module. It starts as an
// empty object and is reassigned inside the ClientRMQ constructor.
let rqmPackage = {};
// Reply queue used when the `replyQueue` option is not supplied
// (RabbitMQ's direct reply-to pseudo-queue name).
const REPLY_QUEUE = 'amq.rabbitmq.reply-to';
/**
 * Client proxy that talks to a RabbitMQ queue through `amqp-connection-manager`.
 *
 * Requests are written to `this.queue`; replies are consumed from
 * `this.replyQueue` and routed back to the caller through an EventEmitter
 * keyed by the message correlation id.
 */
class ClientRMQ extends client_proxy_1.ClientProxy {
    /**
     * @param {object} options Transport options. The keys read here are
     *   `urls`, `queue`, `queueOptions`, `replyQueue` and `persistent`; other
     *   keys (`noAck`, `socketOptions`, `prefetchCount`,
     *   `isGlobalPrefetchCount`) are read later by the methods that need them.
     */
    constructor(options) {
        super();
        this.options = options;
        this.logger = new logger_service_1.Logger(client_proxy_1.ClientProxy.name);
        this.client = null;
        this.channel = null;
        // Each option falls back to the matching default when it is missing or falsy.
        this.urls = this.getOptionsProp(this.options, 'urls') || [constants_1.RQM_DEFAULT_URL];
        this.queue =
            this.getOptionsProp(this.options, 'queue') || constants_1.RQM_DEFAULT_QUEUE;
        this.queueOptions =
            this.getOptionsProp(this.options, 'queueOptions') ||
                constants_1.RQM_DEFAULT_QUEUE_OPTIONS;
        this.replyQueue =
            this.getOptionsProp(this.options, 'replyQueue') || REPLY_QUEUE;
        this.persistent =
            this.getOptionsProp(this.options, 'persistent') || constants_1.RQM_DEFAULT_PERSISTENT;
        // Both packages are loaded through loadPackage; only the connection
        // manager is kept, in the module-level `rqmPackage` binding.
        load_package_util_1.loadPackage('amqplib', ClientRMQ.name, () => require('amqplib'));
        rqmPackage = load_package_util_1.loadPackage('amqp-connection-manager', ClientRMQ.name, () => require('amqp-connection-manager'));
        this.initializeSerializer(options);
        this.initializeDeserializer(options);
    }
    /**
     * Closes the channel and the client when they exist, then clears both
     * references so that a later `connect()` creates a fresh client.
     */
    close() {
        if (this.channel) {
            this.channel.close();
        }
        if (this.client) {
            this.client.close();
        }
        this.channel = null;
        this.client = null;
    }
    /**
     * Registers a channel setup step that consumes the reply queue and
     * re-emits every message on `responseEmitter` under its correlation id.
     */
    consumeChannel() {
        const noAck = this.getOptionsProp(this.options, 'noAck', constants_1.RQM_DEFAULT_NOACK);
        this.channel.addSetup((channel) => channel.consume(this.replyQueue, (msg) => this.responseEmitter.emit(msg.properties.correlationId, msg), {
            noAck,
        }));
    }
    /**
     * Creates the client and channel on first use.
     *
     * @returns {Promise<void>} The cached connection promise. When a client
     *   already exists the previously stored promise is returned unchanged.
     */
    connect() {
        if (this.client) {
            return this.connection;
        }
        this.client = this.createClient();
        this.handleError(this.client);
        this.handleDisconnectError(this.client);
        const connect$ = this.connect$(this.client);
        this.connection = rxjs_1.lastValueFrom(this.mergeDisconnectEvent(this.client, connect$).pipe(operators_1.switchMap(() => this.createChannel()), operators_1.share())).catch(err => {
            // An EmptyError (stream finished without a value) is treated as a
            // non-failure: the promise resolves with undefined instead.
            if (err instanceof rxjs_1.EmptyError) {
                return;
            }
            throw err;
        });
        return this.connection;
    }
    /**
     * Creates the channel wrapper on the current client.
     *
     * @returns {Promise<void>} Resolves once `setupChannel` has finished.
     */
    createChannel() {
        return new Promise(resolve => {
            this.channel = this.client.createChannel({
                json: false,
                setup: (channel) => this.setupChannel(channel, resolve),
            });
        });
    }
    /**
     * Opens a connection-manager client for the configured URLs, passing the
     * `socketOptions` option through as `connectionOptions`.
     */
    createClient() {
        const socketOptions = this.getOptionsProp(this.options, 'socketOptions');
        return rqmPackage.connect(this.urls, {
            connectionOptions: socketOptions,
        });
    }
    /**
     * Races `source$` against the instance's disconnect event.
     *
     * @param {object} instance Event source to watch for DISCONNECT_EVENT.
     * @param {object} source$ Observable to merge with the disconnect stream.
     * @returns {object} Observable that yields the first emission of either
     *   stream; a disconnect event surfaces as an error.
     */
    mergeDisconnectEvent(instance, source$) {
        const close$ = rxjs_1.fromEvent(instance, constants_1.DISCONNECT_EVENT).pipe(operators_1.map((err) => {
            throw err;
        }));
        return rxjs_1.merge(source$, close$).pipe(operators_1.first());
    }
    /**
     * Channel setup callback: asserts the queue, applies prefetch settings,
     * creates the reply emitter, starts consuming replies and then signals
     * completion.
     *
     * @param {object} channel Channel handed over by the channel wrapper.
     * @param {Function} resolve Called with no arguments once setup is done.
     */
    async setupChannel(channel, resolve) {
        const prefetchCount = this.getOptionsProp(this.options, 'prefetchCount') ||
            constants_1.RQM_DEFAULT_PREFETCH_COUNT;
        const isGlobalPrefetchCount = this.getOptionsProp(this.options, 'isGlobalPrefetchCount') ||
            constants_1.RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT;
        await channel.assertQueue(this.queue, this.queueOptions);
        await channel.prefetch(prefetchCount, isGlobalPrefetchCount);
        this.responseEmitter = new events_1.EventEmitter();
        // One listener is attached per in-flight request, so the listener
        // limit is disabled (0 means unlimited).
        this.responseEmitter.setMaxListeners(0);
        this.consumeChannel();
        resolve();
    }
    /**
     * Logs every ERROR_EVENT emitted by the client.
     *
     * @param {object} client Connection-manager client.
     */
    handleError(client) {
        client.addListener(constants_1.ERROR_EVENT, (err) => this.logger.error(err));
    }
    /**
     * On DISCONNECT_EVENT, logs the disconnect message and the error, then
     * closes this proxy.
     *
     * @param {object} client Connection-manager client.
     */
    handleDisconnectError(client) {
        client.addListener(constants_1.DISCONNECT_EVENT, (err) => {
            this.logger.error(constants_1.DISCONNECTED_RMQ_MESSAGE);
            this.logger.error(err);
            this.close();
        });
    }
    /**
     * Deserializes a reply packet and forwards it to `callback` exactly once.
     *
     * @param {*} packet Parsed reply payload.
     * @param {Function} callback Receives `{ err, response, isDisposed? }`.
     */
    async handleMessage(packet, callback) {
        const { err, response, isDisposed } = await this.deserializer.deserialize(packet);
        if (isDisposed || err) {
            callback({
                err,
                response,
                isDisposed: true,
            });
            // Stop here: without this return the callback was also invoked a
            // second time below with a non-disposed packet.
            return;
        }
        callback({
            err,
            response,
        });
    }
    /**
     * Sends a request to the queue and subscribes `callback` to its reply.
     *
     * @param {object} message Outgoing packet; an `id` property holding the
     *   generated correlation id is assigned onto it.
     * @param {Function} callback Receives reply packets, or `{ err }` when
     *   sending throws synchronously.
     * @returns {Function|undefined} Teardown that removes the reply listener,
     *   or undefined when sending failed.
     */
    publish(message, callback) {
        let correlationId;
        let listener;
        try {
            correlationId = random_string_generator_util_1.randomStringGenerator();
            listener = ({ content }) => this.handleMessage(JSON.parse(content.toString()), callback);
            Object.assign(message, { id: correlationId });
            const serializedPacket = this.serializer.serialize(message);
            this.responseEmitter.on(correlationId, listener);
            this.channel.sendToQueue(this.queue, Buffer.from(JSON.stringify(serializedPacket)), {
                replyTo: this.replyQueue,
                correlationId,
                persistent: this.persistent,
            });
            return () => this.responseEmitter.removeListener(correlationId, listener);
        }
        catch (err) {
            // Drop the reply listener if it was registered before the failure;
            // nothing else would ever remove it.
            if (listener && this.responseEmitter) {
                this.responseEmitter.removeListener(correlationId, listener);
            }
            callback({ err });
        }
    }
    /**
     * Sends an event packet to the queue without expecting a reply.
     *
     * @param {object} packet Event packet to serialize and send.
     * @returns {Promise<void>} Resolves when the send callback reports no
     *   error, rejects with the reported error otherwise.
     */
    dispatchEvent(packet) {
        const serializedPacket = this.serializer.serialize(packet);
        return new Promise((resolve, reject) => this.channel.sendToQueue(this.queue, Buffer.from(JSON.stringify(serializedPacket)), {
            persistent: this.persistent,
        }, (err) => (err ? reject(err) : resolve())));
    }
}
exports.ClientRMQ = ClientRMQ;