1 | "use strict";
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | exports.ServerNats = void 0;
|
4 | const tslib_1 = require("tslib");
|
5 | const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
|
6 | const constants_1 = require("../constants");
|
7 | const nats_context_1 = require("../ctx-host/nats.context");
|
8 | const nats_request_json_deserializer_1 = require("../deserializers/nats-request-json.deserializer");
|
9 | const enums_1 = require("../enums");
|
10 | const nats_record_serializer_1 = require("../serializers/nats-record.serializer");
|
11 | const server_1 = require("./server");
|
12 | let natsPackage = {};
|
/**
 * Microservice server that receives messages over NATS.
 *
 * Every pattern registered in `this.messageHandlers` is subscribed as a NATS
 * subject. Incoming packets that carry an `id` are treated as requests and
 * answered through the message's reply subject; packets without an `id` are
 * dispatched as events.
 */
class ServerNats extends server_1.Server {
    /**
     * @param {object} options - Transport options. The whole object is later
     *   merged into the argument of `connect()`; `queue`, `serializer` and
     *   `deserializer` are additionally read by this class.
     */
    constructor(options) {
        super();
        this.options = options;
        this.transportId = enums_1.Transport.NATS;
        // The `nats` package is resolved here, at construction time, through
        // the inherited `loadPackage` helper and stored at module level.
        natsPackage = this.loadPackage('nats', ServerNats.name, () => require('nats'));
        this.initializeSerializer(options);
        this.initializeDeserializer(options);
    }

    /**
     * Connects to NATS, starts status logging and subscribes all handlers.
     *
     * @param {Function} callback - Invoked without arguments once the
     *   subscriptions are bound, or with the error if anything in the
     *   start-up sequence throws.
     * @returns {Promise<void>}
     */
    async listen(callback) {
        try {
            this.natsClient = await this.createNatsClient();
            // Not awaited: listen() proceeds to start() while the status loop
            // keeps running. A rejection handler is attached so that a failing
            // status iterator is logged instead of becoming an unhandled
            // promise rejection. Promise.resolve() tolerates overrides that
            // do not return a promise.
            Promise.resolve(this.handleStatusUpdates(this.natsClient)).catch(statusError => this.logger.error(statusError));
            this.start(callback);
        }
        catch (err) {
            callback(err);
        }
    }

    /**
     * Binds the subscriptions on the current client and signals readiness.
     *
     * @param {Function} callback - Called with no arguments after binding.
     */
    start(callback) {
        this.bindEvents(this.natsClient);
        callback();
    }

    /**
     * Subscribes to one NATS subject per registered message pattern.
     *
     * @param {object} client - Connected NATS client exposing `subscribe()`.
     */
    bindEvents(client) {
        const queue = this.getOptionsProp(this.options, 'queue');
        // Snapshot the keys so the subscription loop works on a fixed list.
        const registeredPatterns = [...this.messageHandlers.keys()];
        for (const channel of registeredPatterns) {
            client.subscribe(channel, {
                queue,
                // bind() has no effect on the arrow function returned by this
                // class; it is kept so that an overriding implementation that
                // returns a regular function still receives `this`.
                callback: this.getMessageHandler(channel).bind(this),
            });
        }
    }

    /**
     * Closes the NATS connection, if there is one, and drops the reference.
     *
     * @returns {Promise<void>}
     */
    async close() {
        const client = this.natsClient;
        if (client !== null && client !== undefined) {
            await client.close();
        }
        this.natsClient = null;
    }

    /**
     * Opens a NATS connection.
     *
     * @returns {*} Whatever `natsPackage.connect()` returns; `listen()` awaits it.
     */
    createNatsClient() {
        const userOptions = this.options || {};
        // User options are applied last, so a user-supplied `servers` entry
        // replaces the default URL.
        const connectOptions = Object.assign({ servers: constants_1.NATS_DEFAULT_URL }, userOptions);
        return natsPackage.connect(connectOptions);
    }

    /**
     * Builds the subscription callback for a single subject.
     *
     * @param {string} channel - Subject / pattern the callback is bound to.
     * @returns {Function} Async `(error, message)` callback. Subscription
     *   errors and failures raised while handling a message are logged; the
     *   returned promise does not reject.
     */
    getMessageHandler(channel) {
        return async (error, message) => {
            if (error) {
                return this.logger.error(error);
            }
            try {
                return await this.handleMessage(channel, message);
            }
            catch (handlingError) {
                // Without this, a throwing deserializer or handler would
                // surface as an unhandled promise rejection, because the
                // promise returned from a subscription callback has no
                // consumer in this class.
                this.logger.error(handlingError);
            }
        };
    }

    /**
     * Deserializes a NATS message and routes it to the matching handler.
     *
     * @param {string} channel - Subject the subscription was created for.
     * @param {object} natsMsg - Incoming message; `subject`, `data`, `reply`
     *   and `headers` are read from it.
     * @returns {Promise<*>}
     */
    async handleMessage(channel, natsMsg) {
        const callerSubject = natsMsg.subject;
        const rawMessage = natsMsg.data;
        const replyTo = natsMsg.reply;
        const natsCtx = new nats_context_1.NatsContext([callerSubject, natsMsg.headers]);
        const message = await this.deserializer.deserialize(rawMessage, { channel, replyTo });

        // A packet without an id is an event: no response is produced.
        if ((0, shared_utils_1.isUndefined)(message.id)) {
            return this.handleEvent(channel, message, natsCtx);
        }

        const publish = this.getPublisher(natsMsg, message.id);
        const handler = this.getHandlerByPattern(channel);
        if (!handler) {
            // Tell the requester that nothing is registered for this pattern.
            return publish({
                id: message.id,
                status: 'error',
                err: constants_1.NO_MESSAGE_HANDLER,
            });
        }

        const handlerResult = await handler(message.data, natsCtx);
        const response$ = this.transformToObservable(handlerResult);
        if (response$) {
            this.send(response$, publish);
        }
    }

    /**
     * Creates the function used to send response packets for one request.
     *
     * @param {object} natsMsg - Request message; `reply` and `respond()` are used.
     * @param {*} id - Packet id copied onto every response.
     * @returns {Function} Publisher taking a response packet. When the request
     *   carries no reply subject, a no-op function is returned instead.
     */
    getPublisher(natsMsg, id) {
        if (!natsMsg.reply) {
            // No reply subject: there is nowhere to send a response.
            return () => { };
        }
        return (response) => {
            // The id is written onto the passed-in response object itself.
            Object.assign(response, { id });
            const outgoingResponse = this.serializer.serialize(response);
            return natsMsg.respond(outgoingResponse.data, {
                headers: outgoingResponse.headers,
            });
        };
    }

    /**
     * Logs every status notification yielded by `client.status()`.
     *
     * 'disconnect' and 'error' are logged as errors, 'pingTimer' at debug
     * level and everything else at log level.
     *
     * @param {object} client - NATS client exposing `status()`.
     * @returns {Promise<void>} Settles when the status iterator finishes;
     *   rejects with whatever the iteration threw.
     */
    async handleStatusUpdates(client) {
        let iterator;
        let step;
        let failure;
        try {
            iterator = tslib_1.__asyncValues(client.status());
            while (!(step = await iterator.next()).done) {
                const status = step.value;
                const data = status.data && (0, shared_utils_1.isObject)(status.data)
                    ? JSON.stringify(status.data)
                    : status.data;
                if (status.type === 'disconnect' || status.type === 'error') {
                    this.logger.error(`NatsError: type: "${status.type}", data: "${data}".`);
                    continue;
                }
                const message = `NatsStatus: type: "${status.type}", data: "${data}".`;
                if (status.type === 'pingTimer') {
                    this.logger.debug(message);
                }
                else {
                    this.logger.log(message);
                }
            }
        }
        catch (iterationError) {
            // Wrapped so that even a falsy thrown value is rethrown below.
            failure = { error: iterationError };
        }
        finally {
            try {
                // If the loop stopped before the iterator reported `done`,
                // give the iterator a chance to release its resources.
                if (step && !step.done) {
                    const finish = iterator.return;
                    if (finish) {
                        await finish.call(iterator);
                    }
                }
            }
            finally {
                if (failure) {
                    throw failure.error;
                }
            }
        }
    }

    /**
     * Selects the serializer for outgoing responses.
     *
     * @param {object} [options] - Transport options; `options.serializer`
     *   is used unless it is null or undefined, in which case a
     *   `NatsRecordSerializer` is created.
     */
    initializeSerializer(options) {
        const custom = options === null || options === undefined ? undefined : options.serializer;
        this.serializer = custom !== null && custom !== undefined
            ? custom
            : new nats_record_serializer_1.NatsRecordSerializer();
    }

    /**
     * Selects the deserializer for incoming messages.
     *
     * @param {object} [options] - Transport options; `options.deserializer`
     *   is used unless it is null or undefined, in which case a
     *   `NatsRequestJSONDeserializer` is created.
     */
    initializeDeserializer(options) {
        const custom = options === null || options === undefined ? undefined : options.deserializer;
        this.deserializer = custom !== null && custom !== undefined
            ? custom
            : new nats_request_json_deserializer_1.NatsRequestJSONDeserializer();
    }
}
|
143 | exports.ServerNats = ServerNats;
|