1 | "use strict";
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | exports.ClientNats = void 0;
|
4 | const logger_service_1 = require("@nestjs/common/services/logger.service");
|
5 | const load_package_util_1 = require("@nestjs/common/utils/load-package.util");
|
6 | const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
|
7 | const constants_1 = require("../constants");
|
8 | const nats_response_json_deserializer_1 = require("../deserializers/nats-response-json.deserializer");
|
9 | const empty_response_exception_1 = require("../errors/empty-response.exception");
|
10 | const nats_record_serializer_1 = require("../serializers/nats-record.serializer");
|
11 | const client_proxy_1 = require("./client-proxy");
|
12 | let natsPackage = {};
|
13 |
|
14 |
|
15 |
|
/**
 * Client proxy backed by a NATS connection.
 *
 * Request/response messages go through `publish`, which subscribes to a
 * freshly created inbox subject and sends the request with that inbox as the
 * reply subject. Events go through `dispatchEvent`, which publishes without a
 * reply subject.
 */
class ClientNats extends client_proxy_1.ClientProxy {
    /**
     * @param options Client options. Spread into the `connect` call of the
     *   `nats` package; `serializer`, `deserializer`, `headers` and `debug`
     *   are additionally read by this class. May be omitted.
     */
    constructor(options) {
        super();
        this.options = options;
        this.logger = new logger_service_1.Logger(ClientNats.name);
        // In-flight connection attempt shared by concurrent `connect()` calls.
        this.connectionPromise = null;
        natsPackage = (0, load_package_util_1.loadPackage)('nats', ClientNats.name, () => require('nats'));
        this.initializeSerializer(options);
        this.initializeDeserializer(options);
    }
    /**
     * Closes the current connection (if any) and forgets it, so that the next
     * `connect()` call opens a new one.
     */
    async close() {
        await this.natsClient?.close();
        this.natsClient = null;
        this.connectionPromise = null;
    }
    /**
     * Returns the connected client, opening the connection on first use.
     *
     * Calls made while a connection attempt is still pending share that
     * attempt instead of each opening (and leaking) their own connection.
     * A failed attempt is not cached, so a later call retries.
     *
     * @returns Promise resolving to the value produced by `createClient()`.
     */
    async connect() {
        if (this.natsClient) {
            return this.natsClient;
        }
        if (!this.connectionPromise) {
            const pending = (async () => {
                const client = await this.createClient();
                this.natsClient = client;
                // The status loop runs for as long as the iterator yields and
                // is intentionally not awaited; failures are logged rather
                // than surfacing as an unhandled rejection.
                Promise.resolve(this.handleStatusUpdates(client)).catch(err => {
                    this.logger.error(`NatsError: status listener failed: "${err}".`);
                });
                return client;
            })();
            this.connectionPromise = pending;
            // Once settled, `natsClient` is the cache (success) or a retry
            // must be possible (failure). The identity check avoids clearing
            // a newer attempt started after `close()`.
            const release = () => {
                if (this.connectionPromise === pending) {
                    this.connectionPromise = null;
                }
            };
            pending.then(release, release);
        }
        return this.connectionPromise;
    }
    /**
     * Calls `connect` from the `nats` package with `NATS_DEFAULT_URL` as the
     * default `servers` value; any key in the client options overrides it.
     */
    createClient() {
        const options = this.options || {};
        return natsPackage.connect({
            servers: constants_1.NATS_DEFAULT_URL,
            ...options,
        });
    }
    /**
     * Logs every status event yielded by `client.status()`.
     *
     * `error` and `disconnect` are logged as errors, `pingTimer` is logged at
     * debug level only when the `debug` option is set, everything else is
     * logged at the default level. Object payloads are JSON-stringified.
     *
     * @param client Connected client exposing an async-iterable `status()`.
     */
    async handleStatusUpdates(client) {
        for await (const status of client.status()) {
            const data = status.data && (0, shared_utils_1.isObject)(status.data)
                ? JSON.stringify(status.data)
                : status.data;
            switch (status.type) {
                case 'error':
                case 'disconnect':
                    this.logger.error(`NatsError: type: "${status.type}", data: "${data}".`);
                    break;
                case 'pingTimer':
                    // Options are optional everywhere else in this class, so
                    // guard the access here as well.
                    if (this.options?.debug) {
                        this.logger.debug(`NatsStatus: type: "${status.type}", data: "${data}".`);
                    }
                    break;
                default:
                    this.logger.log(`NatsStatus: type: "${status.type}", data: "${data}".`);
                    break;
            }
        }
    }
    /**
     * Builds the subscription callback that forwards replies for `packet` to
     * `callback`.
     *
     * - A subscription error is forwarded as `{ err }`.
     * - A zero-length payload is reported as an `EmptyResponseException` with
     *   `isDisposed: true`.
     * - A deserialized message carrying an `id` different from `packet.id` is
     *   ignored.
     * - A message with `err` or `isDisposed` set is forwarded with
     *   `isDisposed: true`; any other message is forwarded as
     *   `{ err, response }`.
     *
     * @param packet Outgoing packet (its `id` and `pattern` are read).
     * @param callback Receives `{ err?, response?, isDisposed? }`.
     * @returns Async `(error, natsMsg)` handler.
     */
    createSubscriptionHandler(packet, callback) {
        return async (error, natsMsg) => {
            if (error) {
                return callback({
                    err: error,
                });
            }
            const rawPacket = natsMsg.data;
            if (rawPacket?.length === 0) {
                return callback({
                    err: new empty_response_exception_1.EmptyResponseException(this.normalizePattern(packet.pattern)),
                    isDisposed: true,
                });
            }
            const message = await this.deserializer.deserialize(rawPacket);
            if (message.id && message.id !== packet.id) {
                return undefined;
            }
            const { err, response, isDisposed } = message;
            if (isDisposed || err) {
                return callback({
                    err,
                    response,
                    isDisposed: true,
                });
            }
            callback({
                err,
                response,
            });
        };
    }
    /**
     * Sends a request and wires replies to `callback`.
     *
     * Subscribes to a new inbox subject, then publishes the serialized packet
     * with that inbox as the reply subject.
     *
     * @param partialPacket Packet without an id (`pattern` is read here).
     * @param callback Receives replies, or `{ err }` if sending failed.
     * @returns Teardown function that unsubscribes from the inbox, or
     *   `undefined` when sending failed.
     */
    publish(partialPacket, callback) {
        let subscription;
        try {
            const packet = this.assignPacketId(partialPacket);
            const channel = this.normalizePattern(partialPacket.pattern);
            const serializedPacket = this.serializer.serialize(packet);
            const inbox = natsPackage.createInbox();
            const subscriptionHandler = this.createSubscriptionHandler(packet, callback);
            subscription = this.natsClient.subscribe(inbox, {
                callback: subscriptionHandler,
            });
            const headers = this.mergeHeaders(serializedPacket.headers);
            this.natsClient.publish(channel, serializedPacket.data, {
                reply: inbox,
                headers,
            });
            return () => subscription.unsubscribe();
        }
        catch (err) {
            callback({ err });
            // No teardown is returned on this path, so release the inbox
            // subscription here if it was already created.
            subscription?.unsubscribe();
        }
    }
    /**
     * Publishes an event (no reply subject) on the normalized pattern.
     *
     * @param packet Event packet (`pattern` is read here; the whole packet is
     *   passed to the serializer).
     * @returns Promise that resolves once `natsClient.publish` has returned
     *   and rejects if it throws.
     */
    dispatchEvent(packet) {
        const pattern = this.normalizePattern(packet.pattern);
        const serializedPacket = this.serializer.serialize(packet);
        const headers = this.mergeHeaders(serializedPacket.headers);
        return new Promise((resolve, reject) => {
            try {
                this.natsClient.publish(pattern, serializedPacket.data, {
                    headers,
                });
                resolve();
            }
            catch (err) {
                reject(err);
            }
        });
    }
    /** Uses `options.serializer` when provided, else a `NatsRecordSerializer`. */
    initializeSerializer(options) {
        this.serializer = options?.serializer ?? new nats_record_serializer_1.NatsRecordSerializer();
    }
    /** Uses `options.deserializer` when provided, else a `NatsResponseJSONDeserializer`. */
    initializeDeserializer(options) {
        this.deserializer =
            options?.deserializer ?? new nats_response_json_deserializer_1.NatsResponseJSONDeserializer();
    }
    /**
     * Combines per-request headers with the client-level `options.headers`.
     *
     * Client-level entries are only added for keys the request headers do not
     * already have. When `requestHeaders` is given it is updated in place and
     * returned; otherwise a new headers object is created via the `nats`
     * package.
     *
     * @param requestHeaders Headers attached to the serialized packet, if any.
     * @returns The merged headers, or `undefined` when neither source exists.
     */
    mergeHeaders(requestHeaders) {
        if (!requestHeaders && !this.options?.headers) {
            return undefined;
        }
        const headers = requestHeaders ?? natsPackage.headers();
        for (const [key, value] of Object.entries(this.options?.headers || {})) {
            if (!headers.has(key)) {
                headers.set(key, value);
            }
        }
        return headers;
    }
}
|
154 | exports.ClientNats = ClientNats;
|