"use strict";

// Flag this CommonJS module as a transpiled ES module for interop purposes.
Object.defineProperty(exports, "__esModule", { value: true });
// Reserve the export slot up front; the class is assigned to it at the end of the file.
exports.ServerNats = void 0;

// Shared type-check helpers (`isUndefined`, `isObject`).
const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
// Transport constants (`NATS_DEFAULT_URL`, `NO_MESSAGE_HANDLER`).
const constants_1 = require("../constants");
// Per-message context object handed to handlers.
const nats_context_1 = require("../ctx-host/nats.context");
// Default inbound deserializer.
const nats_request_json_deserializer_1 = require("../deserializers/nats-request-json.deserializer");
// `Transport` enum used for the transport id.
const enums_1 = require("../enums");
// Default outbound serializer.
const nats_record_serializer_1 = require("../serializers/nats-record.serializer");
// Base `Server` class that ServerNats extends.
const server_1 = require("./server");
|
// Holder for the `nats` module; replaced by the value returned from
// `loadPackage` each time a ServerNats instance is constructed.
let natsPackage = {};

/**
 * Microservice transport server that receives messages over a NATS connection.
 *
 * Several members used below (`loadPackage`, `getOptionsProp`, `messageHandlers`,
 * `getHandlerByPattern`, `handleEvent`, `transformToObservable`, `send`, `logger`)
 * are not defined in this file; they are presumably inherited from the base
 * `Server` class - confirm against `./server`.
 */
class ServerNats extends server_1.Server {
    /**
     * @param {object} [options] Transport options. The whole object is spread
     *   into the `connect()` call; `queue`, `debug`, `serializer` and
     *   `deserializer` are additionally read by this class.
     */
    constructor(options) {
        super();
        this.options = options;
        this.transportId = enums_1.Transport.NATS;
        natsPackage = this.loadPackage('nats', ServerNats.name, () => require('nats'));
        this.initializeSerializer(options);
        this.initializeDeserializer(options);
    }

    /**
     * Connects to NATS, starts status logging and subscribes all registered
     * patterns.
     *
     * @param {Function} callback Invoked with no arguments once subscriptions
     *   are bound, or with the error if anything inside the `try` block throws.
     * @returns {Promise<void>}
     */
    async listen(callback) {
        try {
            this.natsClient = await this.createNatsClient();
            // The status loop keeps running while the client yields statuses, so
            // it cannot be awaited here. Attach a rejection handler so a failure
            // inside the loop is logged instead of becoming an unhandled rejection.
            Promise.resolve(this.handleStatusUpdates(this.natsClient)).catch((err) => this.logger.error(err));
            this.start(callback);
        }
        catch (err) {
            callback(err);
        }
    }

    /**
     * Binds subscriptions on the current client and then signals readiness.
     *
     * @param {Function} callback Invoked with no arguments after binding.
     */
    start(callback) {
        this.bindEvents(this.natsClient);
        callback();
    }

    /**
     * Subscribes the client to every pattern currently registered in
     * `this.messageHandlers`, using the configured `queue` option.
     *
     * @param {object} client Connected NATS client exposing `subscribe()`.
     */
    bindEvents(client) {
        const queue = this.getOptionsProp(this.options, 'queue');
        const subscribe = (channel) => client.subscribe(channel, {
            queue,
            callback: this.getMessageHandler(channel).bind(this),
        });
        for (const channel of this.messageHandlers.keys()) {
            subscribe(channel);
        }
    }

    /**
     * Closes the NATS client (if one exists) and drops the reference.
     *
     * @returns {Promise<void>}
     */
    async close() {
        await this.natsClient?.close();
        this.natsClient = null;
    }

    /**
     * Opens a connection via the loaded `nats` package.
     *
     * `servers` defaults to `NATS_DEFAULT_URL`; any key present in the options
     * object overrides it because the options are spread last.
     *
     * @returns {*} Whatever `natsPackage.connect()` returns (awaited by `listen`).
     */
    createNatsClient() {
        const options = this.options || {};
        return natsPackage.connect({
            servers: constants_1.NATS_DEFAULT_URL,
            ...options,
        });
    }

    /**
     * Builds the subscription callback for one channel.
     *
     * @param {string} channel Pattern the subscription was created for.
     * @returns {Function} `(error, message)` callback: logs `error` when it is
     *   set, otherwise forwards the message to `handleMessage`.
     */
    getMessageHandler(channel) {
        return async (error, message) => {
            if (error) {
                return this.logger.error(error);
            }
            return this.handleMessage(channel, message);
        };
    }

    /**
     * Deserializes an incoming NATS message and dispatches it.
     *
     * A packet without an `id` is routed to `handleEvent`. A packet with an
     * `id` is treated as a request: the handler for `channel` is invoked and
     * its result is sent through the publisher; when no handler is registered,
     * an error packet carrying `NO_MESSAGE_HANDLER` is published instead.
     *
     * @param {string} channel Pattern the message arrived on.
     * @param {object} natsMsg Raw message; `subject`, `data`, `reply` and
     *   `headers` are read from it.
     * @returns {Promise<*>}
     */
    async handleMessage(channel, natsMsg) {
        const callerSubject = natsMsg.subject;
        const rawMessage = natsMsg.data;
        const replyTo = natsMsg.reply;
        const natsCtx = new nats_context_1.NatsContext([callerSubject, natsMsg.headers]);
        const message = await this.deserializer.deserialize(rawMessage, {
            channel,
            replyTo,
        });
        if ((0, shared_utils_1.isUndefined)(message.id)) {
            return this.handleEvent(channel, message, natsCtx);
        }
        const publish = this.getPublisher(natsMsg, message.id);
        const handler = this.getHandlerByPattern(channel);
        if (!handler) {
            const status = 'error';
            const noHandlerPacket = {
                id: message.id,
                status,
                err: constants_1.NO_MESSAGE_HANDLER,
            };
            return publish(noHandlerPacket);
        }
        const response$ = this.transformToObservable(await handler(message.data, natsCtx));
        response$ && this.send(response$, publish);
    }

    /**
     * Creates the function used to publish a response for one request.
     *
     * @param {object} natsMsg Incoming message; `reply` and `respond()` are used.
     * @param {*} id Request id stamped onto every outgoing packet.
     * @returns {Function} When `natsMsg.reply` is set: a function that assigns
     *   `id` onto the passed response object (mutating it), serializes it and
     *   calls `natsMsg.respond()`. Otherwise a no-op.
     */
    getPublisher(natsMsg, id) {
        if (natsMsg.reply) {
            return (response) => {
                Object.assign(response, { id });
                const outgoingResponse = this.serializer.serialize(response);
                return natsMsg.respond(outgoingResponse.data, {
                    headers: outgoingResponse.headers,
                });
            };
        }
        // No reply subject on the incoming message: there is nowhere to send a
        // response, so hand back a publisher that does nothing.
        return () => { };
    }

    /**
     * Logs every status emitted by `client.status()` until that async iterator
     * ends.
     *
     * `error` and `disconnect` are logged as errors, `pingTimer` is logged at
     * debug level only when the `debug` option is truthy, and everything else
     * is logged at the default level. Object payloads are JSON-stringified.
     *
     * @param {object} client Connected NATS client exposing `status()`.
     * @returns {Promise<void>}
     */
    async handleStatusUpdates(client) {
        for await (const status of client.status()) {
            const data = status.data && (0, shared_utils_1.isObject)(status.data)
                ? JSON.stringify(status.data)
                : status.data;
            switch (status.type) {
                case 'error':
                case 'disconnect':
                    this.logger.error(`NatsError: type: "${status.type}", data: "${data}".`);
                    break;
                case 'pingTimer':
                    // Options may be absent (the rest of the class tolerates that),
                    // so guard the lookup instead of throwing inside the loop.
                    if (this.options?.debug) {
                        this.logger.debug(`NatsStatus: type: "${status.type}", data: "${data}".`);
                    }
                    break;
                default:
                    this.logger.log(`NatsStatus: type: "${status.type}", data: "${data}".`);
                    break;
            }
        }
    }

    /**
     * Sets `this.serializer` to `options.serializer`, falling back to a new
     * `NatsRecordSerializer` when it is null or undefined.
     *
     * @param {object} [options] Transport options.
     */
    initializeSerializer(options) {
        this.serializer = options?.serializer ?? new nats_record_serializer_1.NatsRecordSerializer();
    }

    /**
     * Sets `this.deserializer` to `options.deserializer`, falling back to a new
     * `NatsRequestJSONDeserializer` when it is null or undefined.
     *
     * @param {object} [options] Transport options.
     */
    initializeDeserializer(options) {
        this.deserializer =
            options?.deserializer ?? new nats_request_json_deserializer_1.NatsRequestJSONDeserializer();
    }
}
exports.ServerNats = ServerNats;
|