UNPKG

6.84 kB · JavaScript · View Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.ServerRMQ = void 0;
4const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
5const constants_1 = require("../constants");
6const ctx_host_1 = require("../ctx-host");
7const enums_1 = require("../enums");
8const rmq_record_serializer_1 = require("../serializers/rmq-record.serializer");
9const server_1 = require("./server");
// Module-level handle to the `amqp-connection-manager` package. It starts as an
// empty placeholder and is replaced by the ServerRMQ constructor with whatever
// `loadPackage` returns.
let rqmPackage = {};
// Value of the `maxConnectionAttempts` option that disables the attempt limit
// in ServerRMQ#start.
const INFINITE_CONNECTION_ATTEMPTS = -1;
12class ServerRMQ extends server_1.Server {
13 constructor(options) {
14 super();
15 this.options = options;
16 this.transportId = enums_1.Transport.RMQ;
17 this.server = null;
18 this.channel = null;
19 this.connectionAttempts = 0;
20 this.urls = this.getOptionsProp(this.options, 'urls') || [constants_1.RQM_DEFAULT_URL];
21 this.queue =
22 this.getOptionsProp(this.options, 'queue') || constants_1.RQM_DEFAULT_QUEUE;
23 this.prefetchCount =
24 this.getOptionsProp(this.options, 'prefetchCount') ||
25 constants_1.RQM_DEFAULT_PREFETCH_COUNT;
26 this.noAck = this.getOptionsProp(this.options, 'noAck', constants_1.RQM_DEFAULT_NOACK);
27 this.isGlobalPrefetchCount =
28 this.getOptionsProp(this.options, 'isGlobalPrefetchCount') ||
29 constants_1.RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT;
30 this.queueOptions =
31 this.getOptionsProp(this.options, 'queueOptions') ||
32 constants_1.RQM_DEFAULT_QUEUE_OPTIONS;
33 this.noAssert =
34 this.getOptionsProp(this.options, 'noAssert') || constants_1.RQM_DEFAULT_NO_ASSERT;
35 this.loadPackage('amqplib', ServerRMQ.name, () => require('amqplib'));
36 rqmPackage = this.loadPackage('amqp-connection-manager', ServerRMQ.name, () => require('amqp-connection-manager'));
37 this.initializeSerializer(options);
38 this.initializeDeserializer(options);
39 }
40 async listen(callback) {
41 try {
42 await this.start(callback);
43 }
44 catch (err) {
45 callback(err);
46 }
47 }
48 close() {
49 this.channel && this.channel.close();
50 this.server && this.server.close();
51 }
52 async start(callback) {
53 this.server = this.createClient();
54 this.server.on(constants_1.CONNECT_EVENT, () => {
55 if (this.channel) {
56 return;
57 }
58 this.channel = this.server.createChannel({
59 json: false,
60 setup: (channel) => this.setupChannel(channel, callback),
61 });
62 });
63 const maxConnectionAttempts = this.getOptionsProp(this.options, 'maxConnectionAttempts', INFINITE_CONNECTION_ATTEMPTS);
64 this.server.on(constants_1.DISCONNECT_EVENT, (err) => {
65 this.logger.error(constants_1.DISCONNECTED_RMQ_MESSAGE);
66 this.logger.error(err);
67 });
68 this.server.on(constants_1.CONNECT_FAILED_EVENT, (error) => {
69 this.logger.error(constants_1.CONNECTION_FAILED_MESSAGE);
70 if (error?.err) {
71 this.logger.error(error.err);
72 }
73 const isReconnecting = !!this.channel;
74 if (maxConnectionAttempts === INFINITE_CONNECTION_ATTEMPTS ||
75 isReconnecting) {
76 return;
77 }
78 if (++this.connectionAttempts === maxConnectionAttempts) {
79 this.close();
80 callback?.(error.err ?? new Error(constants_1.CONNECTION_FAILED_MESSAGE));
81 }
82 });
83 }
84 createClient() {
85 const socketOptions = this.getOptionsProp(this.options, 'socketOptions');
86 return rqmPackage.connect(this.urls, {
87 connectionOptions: socketOptions,
88 heartbeatIntervalInSeconds: socketOptions?.heartbeatIntervalInSeconds,
89 reconnectTimeInSeconds: socketOptions?.reconnectTimeInSeconds,
90 });
91 }
92 async setupChannel(channel, callback) {
93 if (!this.queueOptions.noAssert) {
94 await channel.assertQueue(this.queue, this.queueOptions);
95 }
96 await channel.prefetch(this.prefetchCount, this.isGlobalPrefetchCount);
97 channel.consume(this.queue, (msg) => this.handleMessage(msg, channel), {
98 noAck: this.noAck,
99 consumerTag: this.getOptionsProp(this.options, 'consumerTag', undefined)
100 });
101 callback();
102 }
103 async handleMessage(message, channel) {
104 if ((0, shared_utils_1.isNil)(message)) {
105 return;
106 }
107 const { content, properties } = message;
108 const rawMessage = this.parseMessageContent(content);
109 const packet = await this.deserializer.deserialize(rawMessage, properties);
110 const pattern = (0, shared_utils_1.isString)(packet.pattern)
111 ? packet.pattern
112 : JSON.stringify(packet.pattern);
113 const rmqContext = new ctx_host_1.RmqContext([message, channel, pattern]);
114 if ((0, shared_utils_1.isUndefined)(packet.id)) {
115 return this.handleEvent(pattern, packet, rmqContext);
116 }
117 const handler = this.getHandlerByPattern(pattern);
118 if (!handler) {
119 const status = 'error';
120 const noHandlerPacket = {
121 id: packet.id,
122 err: constants_1.NO_MESSAGE_HANDLER,
123 status,
124 };
125 return this.sendMessage(noHandlerPacket, properties.replyTo, properties.correlationId);
126 }
127 const response$ = this.transformToObservable(await handler(packet.data, rmqContext));
128 const publish = (data) => this.sendMessage(data, properties.replyTo, properties.correlationId);
129 response$ && this.send(response$, publish);
130 }
131 async handleEvent(pattern, packet, context) {
132 const handler = this.getHandlerByPattern(pattern);
133 if (!handler && !this.noAck) {
134 this.channel.nack(context.getMessage(), false, false);
135 return this.logger.warn((0, constants_1.RQM_NO_EVENT_HANDLER) `${pattern}`);
136 }
137 return super.handleEvent(pattern, packet, context);
138 }
139 sendMessage(message, replyTo, correlationId) {
140 const outgoingResponse = this.serializer.serialize(message);
141 const options = outgoingResponse.options;
142 delete outgoingResponse.options;
143 const buffer = Buffer.from(JSON.stringify(outgoingResponse));
144 this.channel.sendToQueue(replyTo, buffer, { correlationId, ...options });
145 }
146 initializeSerializer(options) {
147 this.serializer = options?.serializer ?? new rmq_record_serializer_1.RmqRecordSerializer();
148 }
149 parseMessageContent(content) {
150 try {
151 return JSON.parse(content.toString());
152 }
153 catch {
154 return content.toString();
155 }
156 }
157}
158exports.ServerRMQ = ServerRMQ;