1 | "use strict";
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | exports.ServerRMQ = void 0;
|
4 | const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
|
5 | const constants_1 = require("../constants");
|
6 | const ctx_host_1 = require("../ctx-host");
|
7 | const enums_1 = require("../enums");
|
8 | const rmq_record_serializer_1 = require("../serializers/rmq-record.serializer");
|
9 | const server_1 = require("./server");
|
// Holder for the `amqp-connection-manager` module. It is assigned in the
// ServerRMQ constructor (via loadPackage) and read by createClient().
let rqmPackage = {};
/**
 * Microservice server for the RMQ transport.
 *
 * Connects through `amqp-connection-manager`, consumes messages from a single
 * queue, dispatches them to the registered handlers and publishes responses
 * to the queue named by the incoming message's `replyTo` property.
 */
class ServerRMQ extends server_1.Server {
    /**
     * @param options Transport options. The keys read here are `urls`,
     *   `queue`, `prefetchCount`, `isGlobalPrefetchCount` and `queueOptions`;
     *   each falls back to its constant from `../constants` when falsy.
     *   `serializer` is read by initializeSerializer(); the whole object is
     *   also handed to the inherited initializeDeserializer().
     */
    constructor(options) {
        super();
        this.options = options;
        this.transportId = enums_1.Transport.RMQ;
        // Both are created lazily in start(), once a connection is available.
        this.server = null;
        this.channel = null;
        this.urls = this.getOptionsProp(this.options, 'urls') || [constants_1.RQM_DEFAULT_URL];
        this.queue =
            this.getOptionsProp(this.options, 'queue') || constants_1.RQM_DEFAULT_QUEUE;
        this.prefetchCount =
            this.getOptionsProp(this.options, 'prefetchCount') ||
                constants_1.RQM_DEFAULT_PREFETCH_COUNT;
        this.isGlobalPrefetchCount =
            this.getOptionsProp(this.options, 'isGlobalPrefetchCount') ||
                constants_1.RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT;
        this.queueOptions =
            this.getOptionsProp(this.options, 'queueOptions') ||
                constants_1.RQM_DEFAULT_QUEUE_OPTIONS;
        // The return value of the `amqplib` load is not used; presumably the
        // call only verifies that the peer dependency is installed.
        this.loadPackage('amqplib', ServerRMQ.name, () => require('amqplib'));
        rqmPackage = this.loadPackage('amqp-connection-manager', ServerRMQ.name, () => require('amqp-connection-manager'));
        this.initializeSerializer(options);
        this.initializeDeserializer(options);
    }
    /**
     * Starts the server.
     *
     * @param callback Invoked without arguments once the channel is set up,
     *   or with the error if start() throws.
     */
    async listen(callback) {
        try {
            await this.start(callback);
        }
        catch (err) {
            callback(err);
        }
    }
    /**
     * Closes the channel and the connection, when they exist.
     */
    close() {
        this.channel && this.channel.close();
        this.server && this.server.close();
    }
    /**
     * Creates the connection and registers its lifecycle listeners.
     *
     * @param callback Forwarded to setupChannel(), which calls it once the
     *   consumer has been registered.
     */
    async start(callback) {
        this.server = this.createClient();
        this.server.on(constants_1.CONNECT_EVENT, () => {
            // Keep the existing channel if a connect event fires again.
            if (this.channel) {
                return;
            }
            this.channel = this.server.createChannel({
                json: false,
                setup: (channel) => this.setupChannel(channel, callback),
            });
        });
        this.server.on(constants_1.DISCONNECT_EVENT, (err) => {
            this.logger.error(constants_1.DISCONNECTED_RMQ_MESSAGE);
            this.logger.error(err);
        });
        this.server.on(constants_1.CONNECT_FAILED_EVENT, (err) => {
            this.logger.error(constants_1.CONNECTION_FAILED_MESSAGE);
            this.logger.error(err);
        });
    }
    /**
     * Opens a managed connection to the configured URLs.
     *
     * The `socketOptions` option is passed as `connectionOptions`; its
     * `heartbeatIntervalInSeconds` and `reconnectTimeInSeconds` members are
     * forwarded as well (undefined when `socketOptions` is not set).
     *
     * @returns The value returned by `amqp-connection-manager`'s `connect`.
     */
    createClient() {
        const socketOptions = this.getOptionsProp(this.options, 'socketOptions');
        const hasSocketOptions = socketOptions !== null && socketOptions !== undefined;
        return rqmPackage.connect(this.urls, {
            connectionOptions: socketOptions,
            heartbeatIntervalInSeconds: hasSocketOptions
                ? socketOptions.heartbeatIntervalInSeconds
                : undefined,
            reconnectTimeInSeconds: hasSocketOptions
                ? socketOptions.reconnectTimeInSeconds
                : undefined,
        });
    }
    /**
     * Asserts the queue, applies the prefetch settings and starts consuming.
     *
     * @param channel Channel handed to the `setup` hook by createChannel().
     * @param callback Called with no arguments after the consumer has been
     *   requested.
     */
    async setupChannel(channel, callback) {
        const noAck = this.getOptionsProp(this.options, 'noAck', constants_1.RQM_DEFAULT_NOACK);
        await channel.assertQueue(this.queue, this.queueOptions);
        await channel.prefetch(this.prefetchCount, this.isGlobalPrefetchCount);
        // The consumer callback's result is not observed by the caller, so a
        // rejection from handleMessage() would surface as an unhandled promise
        // rejection. Report it through the logger instead.
        const onMessage = async (msg) => {
            try {
                await this.handleMessage(msg, channel);
            }
            catch (err) {
                this.logger.error(err);
            }
        };
        // As in the original implementation, the consume() result is not awaited.
        channel.consume(this.queue, onMessage, {
            noAck,
        });
        callback();
    }
    /**
     * Handles one consumed message.
     *
     * Nil messages are ignored. A packet without an `id` is dispatched as an
     * event through handleEvent(). Otherwise the packet is treated as a
     * request: the matching handler is invoked and its result is published to
     * `properties.replyTo` with `properties.correlationId`, or an error packet
     * carrying NO_MESSAGE_HANDLER is published when no handler is registered.
     *
     * @param message Raw message with `content` and `properties` members.
     * @param channel Channel the message arrived on; exposed via RmqContext.
     */
    async handleMessage(message, channel) {
        if ((0, shared_utils_1.isNil)(message)) {
            return;
        }
        const { content, properties } = message;
        const rawMessage = this.parseMessageContent(content);
        const packet = await this.deserializer.deserialize(rawMessage);
        // Non-string patterns are matched by their JSON representation.
        const pattern = (0, shared_utils_1.isString)(packet.pattern)
            ? packet.pattern
            : JSON.stringify(packet.pattern);
        const rmqContext = new ctx_host_1.RmqContext([message, channel, pattern]);
        if ((0, shared_utils_1.isUndefined)(packet.id)) {
            return this.handleEvent(pattern, packet, rmqContext);
        }
        const handler = this.getHandlerByPattern(pattern);
        if (!handler) {
            const status = 'error';
            const noHandlerPacket = {
                id: packet.id,
                err: constants_1.NO_MESSAGE_HANDLER,
                status,
            };
            return this.sendMessage(noHandlerPacket, properties.replyTo, properties.correlationId);
        }
        const response$ = this.transformToObservable(await handler(packet.data, rmqContext));
        const publish = (data) => this.sendMessage(data, properties.replyTo, properties.correlationId);
        response$ && this.send(response$, publish);
    }
    /**
     * Decodes a message body.
     *
     * @param content Message body; only its `toString()` is used.
     * @returns The parsed JSON value, or the raw text when the body is not
     *   valid JSON, so that a malformed payload reaches the deserializer
     *   instead of rejecting handleMessage().
     */
    parseMessageContent(content) {
        const text = content.toString();
        try {
            return JSON.parse(text);
        }
        catch (err) {
            return text;
        }
    }
    /**
     * Serializes a packet and publishes it to the reply queue.
     *
     * @param message Packet to send.
     * @param replyTo Name of the queue to publish to.
     * @param correlationId Correlation id copied into the publish options.
     */
    sendMessage(message, replyTo, correlationId) {
        const outgoingResponse = this.serializer.serialize(message);
        // `options` travels as publish options, not as part of the body, so it
        // is detached from the serialized record before stringifying.
        const options = outgoingResponse.options;
        delete outgoingResponse.options;
        const buffer = Buffer.from(JSON.stringify(outgoingResponse));
        this.channel.sendToQueue(replyTo, buffer, Object.assign({ correlationId }, options));
    }
    /**
     * Selects the serializer: `options.serializer` when it is neither null
     * nor undefined, otherwise a new RmqRecordSerializer.
     *
     * @param options Transport options; may itself be null or undefined.
     */
    initializeSerializer(options) {
        const custom = options === null || options === undefined
            ? undefined
            : options.serializer;
        this.serializer = custom !== null && custom !== undefined
            ? custom
            : new rmq_record_serializer_1.RmqRecordSerializer();
    }
}
exports.ServerRMQ = ServerRMQ;
|