// UNPKG
//
// 5.36 kB · JavaScript · View Raw
"use strict";
// Mark the CommonJS exports object as a transpiled ES module.
Object.defineProperty(exports, "__esModule", { value: true });
// Placeholder export; the real binding is assigned after the class definition.
exports.ServerRMQ = void 0;
const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
const constants_1 = require("../constants");
const ctx_host_1 = require("../ctx-host");
const enums_1 = require("../enums");
const rmq_record_serializer_1 = require("../serializers/rmq-record.serializer");
const server_1 = require("./server");
// Holds the `amqp-connection-manager` module once the ServerRMQ constructor
// has loaded it; starts out as an empty object.
let rqmPackage = {};
/**
 * RabbitMQ transport server.
 *
 * Connects through `amqp-connection-manager`, consumes packets from a single
 * queue and dispatches them to the handlers registered on the base `Server`.
 */
class ServerRMQ extends server_1.Server {
    /**
     * @param {object} options - Transport options. The keys read by this class
     *   are `urls`, `queue`, `prefetchCount`, `isGlobalPrefetchCount`,
     *   `queueOptions`, `socketOptions`, `noAck` and `serializer`.
     */
    constructor(options) {
        super();
        this.options = options;
        this.transportId = enums_1.Transport.RMQ;
        this.server = null;
        this.channel = null;
        this.urls = this.getOptionsProp(this.options, 'urls') || [constants_1.RQM_DEFAULT_URL];
        this.queue =
            this.getOptionsProp(this.options, 'queue') || constants_1.RQM_DEFAULT_QUEUE;
        this.prefetchCount =
            this.getOptionsProp(this.options, 'prefetchCount') ||
                constants_1.RQM_DEFAULT_PREFETCH_COUNT;
        this.isGlobalPrefetchCount =
            this.getOptionsProp(this.options, 'isGlobalPrefetchCount') ||
                constants_1.RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT;
        this.queueOptions =
            this.getOptionsProp(this.options, 'queueOptions') ||
                constants_1.RQM_DEFAULT_QUEUE_OPTIONS;
        // The return value of this first call is intentionally unused.
        // NOTE(review): presumably `loadPackage` reports a missing peer
        // dependency — confirm in the base Server class.
        this.loadPackage('amqplib', ServerRMQ.name, () => require('amqplib'));
        rqmPackage = this.loadPackage('amqp-connection-manager', ServerRMQ.name, () => require('amqp-connection-manager'));
        this.initializeSerializer(options);
        this.initializeDeserializer(options);
    }

    /**
     * Starts the server; any error thrown while starting is handed to
     * `callback` instead of being rethrown.
     *
     * @param {Function} callback - Invoked with the error on failure, or with
     *   no arguments by `setupChannel` once the channel has been set up.
     * @returns {Promise<void>}
     */
    async listen(callback) {
        try {
            await this.start(callback);
        }
        catch (err) {
            callback(err);
        }
    }

    /**
     * Closes the channel and the connection, when they exist.
     */
    close() {
        this.channel && this.channel.close();
        this.server && this.server.close();
    }

    /**
     * Creates the connection and wires up its connect/disconnect listeners.
     *
     * @param {Function} callback - Forwarded to `setupChannel`.
     * @returns {Promise<void>}
     */
    async start(callback) {
        this.server = this.createClient();
        this.server.on(constants_1.CONNECT_EVENT, () => {
            // Create the channel wrapper only once, even if the connect event
            // fires again.
            if (this.channel) {
                return;
            }
            this.channel = this.server.createChannel({
                json: false,
                setup: (channel) => this.setupChannel(channel, callback),
            });
        });
        this.server.on(constants_1.DISCONNECT_EVENT, (err) => {
            this.logger.error(constants_1.DISCONNECTED_RMQ_MESSAGE);
            this.logger.error(err);
        });
    }

    /**
     * Opens a managed connection to the configured URLs.
     *
     * @returns {object} The value returned by the connection manager's
     *   `connect` function.
     */
    createClient() {
        const socketOptions = this.getOptionsProp(this.options, 'socketOptions');
        // `socketOptions` is optional: both timing settings stay undefined
        // when it is missing.
        const hasSocketOptions = socketOptions !== null && socketOptions !== undefined;
        return rqmPackage.connect(this.urls, {
            connectionOptions: socketOptions,
            heartbeatIntervalInSeconds: hasSocketOptions
                ? socketOptions.heartbeatIntervalInSeconds
                : undefined,
            reconnectTimeInSeconds: hasSocketOptions
                ? socketOptions.reconnectTimeInSeconds
                : undefined,
        });
    }

    /**
     * Asserts the queue, applies the prefetch settings, starts consuming and
     * then signals readiness through `callback`.
     *
     * @param {object} channel - Channel passed to the `setup` hook.
     * @param {Function} callback - Called with no arguments after the consumer
     *   has been requested.
     * @returns {Promise<void>}
     */
    async setupChannel(channel, callback) {
        const noAck = this.getOptionsProp(this.options, 'noAck', constants_1.RQM_DEFAULT_NOACK);
        await channel.assertQueue(this.queue, this.queueOptions);
        await channel.prefetch(this.prefetchCount, this.isGlobalPrefetchCount);
        channel.consume(this.queue, (msg) => this.handleMessage(msg, channel), {
            noAck,
        });
        callback();
    }

    /**
     * Handles one delivered message: routes it to an event handler when the
     * packet has no `id`, otherwise to a request handler whose response is
     * published to the message's `replyTo` queue.
     *
     * @param {object} message - Delivered message (`content`, `properties`);
     *   nil messages are ignored.
     * @param {object} channel - Channel the message was delivered on; exposed
     *   to handlers through the `RmqContext`.
     * @returns {Promise<*>}
     */
    async handleMessage(message, channel) {
        if (shared_utils_1.isNil(message)) {
            return;
        }
        const { content, properties } = message;
        const rawMessage = this.parseMessageContent(content);
        const packet = await this.deserializer.deserialize(rawMessage);
        const pattern = shared_utils_1.isString(packet.pattern)
            ? packet.pattern
            : JSON.stringify(packet.pattern);
        const rmqContext = new ctx_host_1.RmqContext([message, channel, pattern]);
        // A packet without an id is an event: there is no reply to publish.
        if (shared_utils_1.isUndefined(packet.id)) {
            return this.handleEvent(pattern, packet, rmqContext);
        }
        const handler = this.getHandlerByPattern(pattern);
        if (!handler) {
            const status = 'error';
            const noHandlerPacket = {
                id: packet.id,
                err: constants_1.NO_MESSAGE_HANDLER,
                status,
            };
            return this.sendMessage(noHandlerPacket, properties.replyTo, properties.correlationId);
        }
        const response$ = this.transformToObservable(await handler(packet.data, rmqContext));
        const publish = (data) => this.sendMessage(data, properties.replyTo, properties.correlationId);
        response$ && this.send(response$, publish);
    }

    /**
     * Decodes a message body.
     *
     * The consumer callback's promise is not awaited by anyone, so a
     * `JSON.parse` error here would surface as an unhandled rejection. A
     * payload that is not valid JSON is therefore passed on as its raw string
     * and left to the deserializer.
     *
     * @param {Buffer} content - Raw message body.
     * @returns {*} The parsed JSON value, or the raw string when the body is
     *   not valid JSON.
     */
    parseMessageContent(content) {
        const text = content.toString();
        try {
            return JSON.parse(text);
        }
        catch (err) {
            // Deliberate fallback, not a swallowed failure: see the note above.
            return text;
        }
    }

    /**
     * Serializes `message` and sends it to the `replyTo` queue.
     *
     * @param {object} message - Outgoing packet.
     * @param {string} replyTo - Name of the queue to send the reply to.
     * @param {string} correlationId - Correlation id copied into the publish
     *   options.
     */
    sendMessage(message, replyTo, correlationId) {
        const outgoingResponse = this.serializer.serialize(message);
        // `options` carries publish options, not payload: strip it from the
        // body and merge it into the sendToQueue options instead.
        const options = outgoingResponse.options;
        delete outgoingResponse.options;
        const buffer = Buffer.from(JSON.stringify(outgoingResponse));
        this.channel.sendToQueue(replyTo, buffer, Object.assign({ correlationId }, options));
    }

    /**
     * Uses the serializer supplied in `options`, or a new
     * `RmqRecordSerializer` when none (or a null one) is given.
     *
     * @param {object} [options] - Transport options.
     */
    initializeSerializer(options) {
        const customSerializer = options !== null && options !== undefined
            ? options.serializer
            : undefined;
        this.serializer = customSerializer !== null && customSerializer !== undefined
            ? customSerializer
            : new rmq_record_serializer_1.RmqRecordSerializer();
    }
}
// Public CommonJS export of the RabbitMQ server transport.
exports.ServerRMQ = ServerRMQ;