UNPKG

6.57 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.ServerRMQ = void 0;
4const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
5const constants_1 = require("../constants");
6const ctx_host_1 = require("../ctx-host");
7const enums_1 = require("../enums");
8const rmq_record_serializer_1 = require("../serializers/rmq-record.serializer");
9const server_1 = require("./server");
// Module-level slot for the `amqp-connection-manager` package. It starts as an
// empty object and is reassigned in the ServerRMQ constructor with whatever
// `loadPackage` returns; `createClient` then calls `rqmPackage.connect(...)`.
10let rqmPackage = {};
// Sentinel for the `maxConnectionAttempts` option: when the option resolves to
// this value, the connect-failed handler in `start` returns early and never
// counts attempts or gives up.
11const INFINITE_CONNECTION_ATTEMPTS = -1;
/**
 * RabbitMQ (AMQP) transport server.
 *
 * Connects through `amqp-connection-manager`, consumes a single queue and
 * dispatches every incoming message either to an event handler (packets
 * without an `id`) or to a request handler whose response is published to the
 * message's `replyTo` queue.
 */
class ServerRMQ extends server_1.Server {
    /**
     * Reads the transport options, falls back to the package defaults for any
     * option that is missing/falsy, and loads the AMQP packages.
     *
     * @param {object} options Transport options. Keys read by this class:
     *   `urls`, `queue`, `prefetchCount`, `isGlobalPrefetchCount`,
     *   `queueOptions`, `noAssert`, `noAck`, `socketOptions`,
     *   `maxConnectionAttempts`, `serializer` (the deserializer is set up by
     *   the inherited `initializeDeserializer`).
     */
    constructor(options) {
        super();
        this.options = options;
        this.transportId = enums_1.Transport.RMQ;
        this.server = null;
        this.channel = null;
        this.connectionAttempts = 0;
        this.urls = this.getOptionsProp(this.options, 'urls') || [constants_1.RQM_DEFAULT_URL];
        this.queue =
            this.getOptionsProp(this.options, 'queue') || constants_1.RQM_DEFAULT_QUEUE;
        this.prefetchCount =
            this.getOptionsProp(this.options, 'prefetchCount') ||
                constants_1.RQM_DEFAULT_PREFETCH_COUNT;
        this.isGlobalPrefetchCount =
            this.getOptionsProp(this.options, 'isGlobalPrefetchCount') ||
                constants_1.RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT;
        this.queueOptions =
            this.getOptionsProp(this.options, 'queueOptions') ||
                constants_1.RQM_DEFAULT_QUEUE_OPTIONS;
        this.noAssert =
            this.getOptionsProp(this.options, 'noAssert') || constants_1.RQM_DEFAULT_NO_ASSERT;
        // `amqplib` is loaded only for its presence check / side effect; the
        // connection manager is the package actually used by `createClient`.
        this.loadPackage('amqplib', ServerRMQ.name, () => require('amqplib'));
        rqmPackage = this.loadPackage('amqp-connection-manager', ServerRMQ.name, () => require('amqp-connection-manager'));
        this.initializeSerializer(options);
        this.initializeDeserializer(options);
    }

    /**
     * Starts the server. Any error thrown while starting is passed to
     * `callback` instead of being rethrown.
     *
     * @param {Function} callback Invoked with an error on failure; on success
     *   it is invoked (without arguments) from `setupChannel`.
     * @returns {Promise<void>}
     */
    async listen(callback) {
        try {
            await this.start(callback);
        }
        catch (err) {
            callback(err);
        }
    }

    /**
     * Closes the channel and the connection, if they were created.
     */
    close() {
        this.channel && this.channel.close();
        this.server && this.server.close();
    }

    /**
     * Creates the connection-manager client and wires its connect,
     * disconnect and connect-failed events.
     *
     * @param {Function} [callback] Forwarded to `setupChannel`; also invoked
     *   with an error once `maxConnectionAttempts` initial attempts failed.
     * @returns {Promise<void>}
     */
    async start(callback) {
        this.server = this.createClient();
        this.server.on(constants_1.CONNECT_EVENT, () => {
            // The channel wrapper is created once; later connect events reuse it.
            if (this.channel) {
                return;
            }
            this.channel = this.server.createChannel({
                json: false,
                setup: (channel) => this.setupChannel(channel, callback),
            });
        });
        const maxConnectionAttempts = this.getOptionsProp(this.options, 'maxConnectionAttempts', INFINITE_CONNECTION_ATTEMPTS);
        this.server.on(constants_1.DISCONNECT_EVENT, (err) => {
            this.logger.error(constants_1.DISCONNECTED_RMQ_MESSAGE);
            this.logger.error(err);
        });
        this.server.on(constants_1.CONNECT_FAILED_EVENT, (error) => {
            this.logger.error(constants_1.CONNECTION_FAILED_MESSAGE);
            // Resolve the underlying cause once, tolerating a missing event
            // payload, so the logging and the callback below agree on it.
            const cause = error == null ? undefined : error.err;
            if (cause) {
                this.logger.error(cause);
            }
            // An existing channel means an earlier connection succeeded, so
            // this failure is a reconnect and must not count as an attempt.
            const isReconnecting = !!this.channel;
            if (maxConnectionAttempts === INFINITE_CONNECTION_ATTEMPTS ||
                isReconnecting) {
                return;
            }
            if (++this.connectionAttempts === maxConnectionAttempts) {
                this.close();
                if (callback != null) {
                    callback(cause != null ? cause : new Error(constants_1.CONNECTION_FAILED_MESSAGE));
                }
            }
        });
    }

    /**
     * Opens a managed connection to `this.urls`, passing `socketOptions` as
     * the connection options together with its heartbeat/reconnect intervals.
     *
     * @returns {object} Whatever `amqp-connection-manager`'s `connect` returns.
     */
    createClient() {
        const socketOptions = this.getOptionsProp(this.options, 'socketOptions');
        const hasSocketOptions = socketOptions != null;
        return rqmPackage.connect(this.urls, {
            connectionOptions: socketOptions,
            heartbeatIntervalInSeconds: hasSocketOptions
                ? socketOptions.heartbeatIntervalInSeconds
                : undefined,
            reconnectTimeInSeconds: hasSocketOptions
                ? socketOptions.reconnectTimeInSeconds
                : undefined,
        });
    }

    /**
     * Channel setup hook: optionally asserts the queue, applies the prefetch
     * settings, starts consuming and finally signals readiness.
     *
     * @param {object} channel Channel handed to the `setup` hook.
     * @param {Function} callback Invoked without arguments once consuming
     *   has been requested.
     * @returns {Promise<void>}
     */
    async setupChannel(channel, callback) {
        const noAck = this.getOptionsProp(this.options, 'noAck', constants_1.RQM_DEFAULT_NOACK);
        // Honour the top-level `noAssert` option stored by the constructor as
        // well as the `queueOptions.noAssert` flag that was checked before;
        // previously the top-level option was read but never consulted.
        const skipAssert = this.noAssert || this.queueOptions.noAssert;
        if (!skipAssert) {
            await channel.assertQueue(this.queue, this.queueOptions);
        }
        await channel.prefetch(this.prefetchCount, this.isGlobalPrefetchCount);
        channel.consume(this.queue, (msg) => this.handleMessage(msg, channel), {
            noAck,
        });
        callback();
    }

    /**
     * Handles one consumed message.
     *
     * Nil messages are ignored. The JSON content is deserialized into a
     * packet; packets without an `id` are treated as events, the others as
     * requests whose result (or a "no handler" error packet) is published to
     * `properties.replyTo` with the original `correlationId`.
     *
     * @param {object} message Consumed message exposing `content` and `properties`.
     * @param {object} channel Channel the message was received on.
     * @returns {Promise<*>}
     */
    async handleMessage(message, channel) {
        if ((0, shared_utils_1.isNil)(message)) {
            return;
        }
        const { content, properties } = message;
        const rawMessage = JSON.parse(content.toString());
        const packet = await this.deserializer.deserialize(rawMessage, properties);
        // Handlers are looked up by string, so non-string patterns are stringified.
        const pattern = (0, shared_utils_1.isString)(packet.pattern)
            ? packet.pattern
            : JSON.stringify(packet.pattern);
        const rmqContext = new ctx_host_1.RmqContext([message, channel, pattern]);
        if ((0, shared_utils_1.isUndefined)(packet.id)) {
            return this.handleEvent(pattern, packet, rmqContext);
        }
        const handler = this.getHandlerByPattern(pattern);
        if (!handler) {
            const noHandlerPacket = {
                id: packet.id,
                err: constants_1.NO_MESSAGE_HANDLER,
                status: 'error',
            };
            return this.sendMessage(noHandlerPacket, properties.replyTo, properties.correlationId);
        }
        const response$ = this.transformToObservable(await handler(packet.data, rmqContext));
        const publish = (data) => this.sendMessage(data, properties.replyTo, properties.correlationId);
        response$ && this.send(response$, publish);
    }

    /**
     * Serializes `message` and publishes it to the `replyTo` queue.
     *
     * The serializer output's `options` property is removed from the payload
     * and merged into the publish options next to `correlationId`.
     *
     * @param {object} message Outgoing packet.
     * @param {string} replyTo Name of the queue to publish to.
     * @param {string} correlationId Correlation id copied from the request.
     */
    sendMessage(message, replyTo, correlationId) {
        const outgoingResponse = this.serializer.serialize(message);
        const options = outgoingResponse.options;
        delete outgoingResponse.options;
        const buffer = Buffer.from(JSON.stringify(outgoingResponse));
        this.channel.sendToQueue(replyTo, buffer, Object.assign({ correlationId }, options));
    }

    /**
     * Uses `options.serializer` when it is provided (non-nullish), otherwise
     * a new `RmqRecordSerializer`.
     *
     * @param {object} [options] Transport options.
     */
    initializeSerializer(options) {
        const customSerializer = options == null ? undefined : options.serializer;
        this.serializer = customSerializer != null
            ? customSerializer
            : new rmq_record_serializer_1.RmqRecordSerializer();
    }
}
143exports.ServerRMQ = ServerRMQ;