UNPKG

5.13 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.ServerNats = void 0;
4const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
5const constants_1 = require("../constants");
6const nats_context_1 = require("../ctx-host/nats.context");
7const nats_request_json_deserializer_1 = require("../deserializers/nats-request-json.deserializer");
8const enums_1 = require("../enums");
9const nats_record_serializer_1 = require("../serializers/nats-record.serializer");
10const server_1 = require("./server");
11let natsPackage = {};
12class ServerNats extends server_1.Server {
13 constructor(options) {
14 super();
15 this.options = options;
16 this.transportId = enums_1.Transport.NATS;
17 natsPackage = this.loadPackage('nats', ServerNats.name, () => require('nats'));
18 this.initializeSerializer(options);
19 this.initializeDeserializer(options);
20 }
21 async listen(callback) {
22 try {
23 this.natsClient = await this.createNatsClient();
24 this.handleStatusUpdates(this.natsClient);
25 this.start(callback);
26 }
27 catch (err) {
28 callback(err);
29 }
30 }
31 start(callback) {
32 this.bindEvents(this.natsClient);
33 callback();
34 }
35 bindEvents(client) {
36 const queue = this.getOptionsProp(this.options, 'queue');
37 const subscribe = (channel) => client.subscribe(channel, {
38 queue,
39 callback: this.getMessageHandler(channel).bind(this),
40 });
41 const registeredPatterns = [...this.messageHandlers.keys()];
42 registeredPatterns.forEach(channel => subscribe(channel));
43 }
44 async close() {
45 await this.natsClient?.close();
46 this.natsClient = null;
47 }
48 createNatsClient() {
49 const options = this.options || {};
50 return natsPackage.connect({
51 servers: constants_1.NATS_DEFAULT_URL,
52 ...options,
53 });
54 }
55 getMessageHandler(channel) {
56 return async (error, message) => {
57 if (error) {
58 return this.logger.error(error);
59 }
60 return this.handleMessage(channel, message);
61 };
62 }
63 async handleMessage(channel, natsMsg) {
64 const callerSubject = natsMsg.subject;
65 const rawMessage = natsMsg.data;
66 const replyTo = natsMsg.reply;
67 const natsCtx = new nats_context_1.NatsContext([callerSubject, natsMsg.headers]);
68 const message = await this.deserializer.deserialize(rawMessage, {
69 channel,
70 replyTo,
71 });
72 if ((0, shared_utils_1.isUndefined)(message.id)) {
73 return this.handleEvent(channel, message, natsCtx);
74 }
75 const publish = this.getPublisher(natsMsg, message.id);
76 const handler = this.getHandlerByPattern(channel);
77 if (!handler) {
78 const status = 'error';
79 const noHandlerPacket = {
80 id: message.id,
81 status,
82 err: constants_1.NO_MESSAGE_HANDLER,
83 };
84 return publish(noHandlerPacket);
85 }
86 const response$ = this.transformToObservable(await handler(message.data, natsCtx));
87 response$ && this.send(response$, publish);
88 }
89 getPublisher(natsMsg, id) {
90 if (natsMsg.reply) {
91 return (response) => {
92 Object.assign(response, { id });
93 const outgoingResponse = this.serializer.serialize(response);
94 return natsMsg.respond(outgoingResponse.data, {
95 headers: outgoingResponse.headers,
96 });
97 };
98 }
99 // In case the "reply" topic is not provided, there's no need for a reply.
100 // Method returns a noop function instead
101 // eslint-disable-next-line @typescript-eslint/no-empty-function
102 return () => { };
103 }
104 async handleStatusUpdates(client) {
105 for await (const status of client.status()) {
106 const data = status.data && (0, shared_utils_1.isObject)(status.data)
107 ? JSON.stringify(status.data)
108 : status.data;
109 switch (status.type) {
110 case 'error':
111 case 'disconnect':
112 this.logger.error(`NatsError: type: "${status.type}", data: "${data}".`);
113 break;
114 case 'pingTimer':
115 if (this.options.debug) {
116 this.logger.debug(`NatsStatus: type: "${status.type}", data: "${data}".`);
117 }
118 break;
119 default:
120 this.logger.log(`NatsStatus: type: "${status.type}", data: "${data}".`);
121 break;
122 }
123 }
124 }
125 initializeSerializer(options) {
126 this.serializer = options?.serializer ?? new nats_record_serializer_1.NatsRecordSerializer();
127 }
128 initializeDeserializer(options) {
129 this.deserializer =
130 options?.deserializer ?? new nats_request_json_deserializer_1.NatsRequestJSONDeserializer();
131 }
132}
133exports.ServerNats = ServerNats;