UNPKG

5.75 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.ServerNats = void 0;
4const tslib_1 = require("tslib");
5const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
6const constants_1 = require("../constants");
7const nats_context_1 = require("../ctx-host/nats.context");
8const nats_request_json_deserializer_1 = require("../deserializers/nats-request-json.deserializer");
9const enums_1 = require("../enums");
10const nats_record_serializer_1 = require("../serializers/nats-record.serializer");
11const server_1 = require("./server");
12let natsPackage = {};
13class ServerNats extends server_1.Server {
14 constructor(options) {
15 super();
16 this.options = options;
17 this.transportId = enums_1.Transport.NATS;
18 natsPackage = this.loadPackage('nats', ServerNats.name, () => require('nats'));
19 this.initializeSerializer(options);
20 this.initializeDeserializer(options);
21 }
22 async listen(callback) {
23 try {
24 this.natsClient = await this.createNatsClient();
25 this.handleStatusUpdates(this.natsClient);
26 this.start(callback);
27 }
28 catch (err) {
29 callback(err);
30 }
31 }
32 start(callback) {
33 this.bindEvents(this.natsClient);
34 callback();
35 }
36 bindEvents(client) {
37 const queue = this.getOptionsProp(this.options, 'queue');
38 const subscribe = (channel) => client.subscribe(channel, {
39 queue,
40 callback: this.getMessageHandler(channel).bind(this),
41 });
42 const registeredPatterns = [...this.messageHandlers.keys()];
43 registeredPatterns.forEach(channel => subscribe(channel));
44 }
45 async close() {
46 var _a;
47 await ((_a = this.natsClient) === null || _a === void 0 ? void 0 : _a.close());
48 this.natsClient = null;
49 }
50 createNatsClient() {
51 const options = this.options || {};
52 return natsPackage.connect(Object.assign({ servers: constants_1.NATS_DEFAULT_URL }, options));
53 }
54 getMessageHandler(channel) {
55 return async (error, message) => {
56 if (error) {
57 return this.logger.error(error);
58 }
59 return this.handleMessage(channel, message);
60 };
61 }
62 async handleMessage(channel, natsMsg) {
63 const callerSubject = natsMsg.subject;
64 const rawMessage = natsMsg.data;
65 const replyTo = natsMsg.reply;
66 const natsCtx = new nats_context_1.NatsContext([callerSubject, natsMsg.headers]);
67 const message = await this.deserializer.deserialize(rawMessage, {
68 channel,
69 replyTo,
70 });
71 if ((0, shared_utils_1.isUndefined)(message.id)) {
72 return this.handleEvent(channel, message, natsCtx);
73 }
74 const publish = this.getPublisher(natsMsg, message.id);
75 const handler = this.getHandlerByPattern(channel);
76 if (!handler) {
77 const status = 'error';
78 const noHandlerPacket = {
79 id: message.id,
80 status,
81 err: constants_1.NO_MESSAGE_HANDLER,
82 };
83 return publish(noHandlerPacket);
84 }
85 const response$ = this.transformToObservable(await handler(message.data, natsCtx));
86 response$ && this.send(response$, publish);
87 }
88 getPublisher(natsMsg, id) {
89 if (natsMsg.reply) {
90 return (response) => {
91 Object.assign(response, { id });
92 const outgoingResponse = this.serializer.serialize(response);
93 return natsMsg.respond(outgoingResponse.data, {
94 headers: outgoingResponse.headers,
95 });
96 };
97 }
98 // In case the "reply" topic is not provided, there's no need for a reply.
99 // Method returns a noop function instead
100 // eslint-disable-next-line @typescript-eslint/no-empty-function
101 return () => { };
102 }
103 async handleStatusUpdates(client) {
104 var e_1, _a;
105 try {
106 for (var _b = tslib_1.__asyncValues(client.status()), _c; _c = await _b.next(), !_c.done;) {
107 const status = _c.value;
108 const data = status.data && (0, shared_utils_1.isObject)(status.data)
109 ? JSON.stringify(status.data)
110 : status.data;
111 if (status.type === 'disconnect' || status.type === 'error') {
112 this.logger.error(`NatsError: type: "${status.type}", data: "${data}".`);
113 }
114 else {
115 const message = `NatsStatus: type: "${status.type}", data: "${data}".`;
116 if (status.type === 'pingTimer') {
117 this.logger.debug(message);
118 }
119 else {
120 this.logger.log(message);
121 }
122 }
123 }
124 }
125 catch (e_1_1) { e_1 = { error: e_1_1 }; }
126 finally {
127 try {
128 if (_c && !_c.done && (_a = _b.return)) await _a.call(_b);
129 }
130 finally { if (e_1) throw e_1.error; }
131 }
132 }
133 initializeSerializer(options) {
134 var _a;
135 this.serializer = (_a = options === null || options === void 0 ? void 0 : options.serializer) !== null && _a !== void 0 ? _a : new nats_record_serializer_1.NatsRecordSerializer();
136 }
137 initializeDeserializer(options) {
138 var _a;
139 this.deserializer =
140 (_a = options === null || options === void 0 ? void 0 : options.deserializer) !== null && _a !== void 0 ? _a : new nats_request_json_deserializer_1.NatsRequestJSONDeserializer();
141 }
142}
143exports.ServerNats = ServerNats;