1 | "use strict";
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | exports.ClientNats = void 0;
|
4 | const tslib_1 = require("tslib");
|
5 | const logger_service_1 = require("@nestjs/common/services/logger.service");
|
6 | const load_package_util_1 = require("@nestjs/common/utils/load-package.util");
|
7 | const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
|
8 | const constants_1 = require("../constants");
|
9 | const nats_response_json_deserializer_1 = require("../deserializers/nats-response-json.deserializer");
|
10 | const empty_response_exception_1 = require("../errors/empty-response.exception");
|
11 | const nats_record_serializer_1 = require("../serializers/nats-record.serializer");
|
12 | const client_proxy_1 = require("./client-proxy");
|
13 | let natsPackage = {};
|
14 | class ClientNats extends client_proxy_1.ClientProxy {
|
15 | constructor(options) {
|
16 | super();
|
17 | this.options = options;
|
18 | this.logger = new logger_service_1.Logger(ClientNats.name);
|
19 | natsPackage = (0, load_package_util_1.loadPackage)('nats', ClientNats.name, () => require('nats'));
|
20 | this.initializeSerializer(options);
|
21 | this.initializeDeserializer(options);
|
22 | }
|
23 | async close() {
|
24 | var _a;
|
25 | await ((_a = this.natsClient) === null || _a === void 0 ? void 0 : _a.close());
|
26 | this.natsClient = null;
|
27 | }
|
28 | async connect() {
|
29 | if (this.natsClient) {
|
30 | return this.natsClient;
|
31 | }
|
32 | this.natsClient = await this.createClient();
|
33 | this.handleStatusUpdates(this.natsClient);
|
34 | return this.natsClient;
|
35 | }
|
36 | createClient() {
|
37 | const options = this.options || {};
|
38 | return natsPackage.connect(Object.assign({ servers: constants_1.NATS_DEFAULT_URL }, options));
|
39 | }
|
40 | async handleStatusUpdates(client) {
|
41 | var e_1, _a;
|
42 | try {
|
43 | for (var _b = tslib_1.__asyncValues(client.status()), _c; _c = await _b.next(), !_c.done;) {
|
44 | const status = _c.value;
|
45 | const data = status.data && (0, shared_utils_1.isObject)(status.data)
|
46 | ? JSON.stringify(status.data)
|
47 | : status.data;
|
48 | if (status.type === 'disconnect' || status.type === 'error') {
|
49 | this.logger.error(`NatsError: type: "${status.type}", data: "${data}".`);
|
50 | }
|
51 | else {
|
52 | const message = `NatsStatus: type: "${status.type}", data: "${data}".`;
|
53 | if (status.type === 'pingTimer') {
|
54 | this.logger.debug(message);
|
55 | }
|
56 | else {
|
57 | this.logger.log(message);
|
58 | }
|
59 | }
|
60 | }
|
61 | }
|
62 | catch (e_1_1) { e_1 = { error: e_1_1 }; }
|
63 | finally {
|
64 | try {
|
65 | if (_c && !_c.done && (_a = _b.return)) await _a.call(_b);
|
66 | }
|
67 | finally { if (e_1) throw e_1.error; }
|
68 | }
|
69 | }
|
70 | createSubscriptionHandler(packet, callback) {
|
71 | return async (error, natsMsg) => {
|
72 | if (error) {
|
73 | return callback({
|
74 | err: error,
|
75 | });
|
76 | }
|
77 | const rawPacket = natsMsg.data;
|
78 | if ((rawPacket === null || rawPacket === void 0 ? void 0 : rawPacket.length) === 0) {
|
79 | return callback({
|
80 | err: new empty_response_exception_1.EmptyResponseException(this.normalizePattern(packet.pattern)),
|
81 | isDisposed: true,
|
82 | });
|
83 | }
|
84 | const message = await this.deserializer.deserialize(rawPacket);
|
85 | if (message.id && message.id !== packet.id) {
|
86 | return undefined;
|
87 | }
|
88 | const { err, response, isDisposed } = message;
|
89 | if (isDisposed || err) {
|
90 | return callback({
|
91 | err,
|
92 | response,
|
93 | isDisposed: true,
|
94 | });
|
95 | }
|
96 | callback({
|
97 | err,
|
98 | response,
|
99 | });
|
100 | };
|
101 | }
|
102 | publish(partialPacket, callback) {
|
103 | try {
|
104 | const packet = this.assignPacketId(partialPacket);
|
105 | const channel = this.normalizePattern(partialPacket.pattern);
|
106 | const serializedPacket = this.serializer.serialize(packet);
|
107 | const inbox = natsPackage.createInbox();
|
108 | const subscriptionHandler = this.createSubscriptionHandler(packet, callback);
|
109 | const subscription = this.natsClient.subscribe(inbox, {
|
110 | callback: subscriptionHandler,
|
111 | });
|
112 | const headers = this.mergeHeaders(serializedPacket.headers);
|
113 | this.natsClient.publish(channel, serializedPacket.data, {
|
114 | reply: inbox,
|
115 | headers,
|
116 | });
|
117 | return () => subscription.unsubscribe();
|
118 | }
|
119 | catch (err) {
|
120 | callback({ err });
|
121 | }
|
122 | }
|
123 | dispatchEvent(packet) {
|
124 | const pattern = this.normalizePattern(packet.pattern);
|
125 | const serializedPacket = this.serializer.serialize(packet);
|
126 | const headers = this.mergeHeaders(serializedPacket.headers);
|
127 | return new Promise((resolve, reject) => {
|
128 | try {
|
129 | this.natsClient.publish(pattern, serializedPacket.data, {
|
130 | headers,
|
131 | });
|
132 | resolve();
|
133 | }
|
134 | catch (err) {
|
135 | reject(err);
|
136 | }
|
137 | });
|
138 | }
|
139 | initializeSerializer(options) {
|
140 | var _a;
|
141 | this.serializer = (_a = options === null || options === void 0 ? void 0 : options.serializer) !== null && _a !== void 0 ? _a : new nats_record_serializer_1.NatsRecordSerializer();
|
142 | }
|
143 | initializeDeserializer(options) {
|
144 | var _a;
|
145 | this.deserializer =
|
146 | (_a = options === null || options === void 0 ? void 0 : options.deserializer) !== null && _a !== void 0 ? _a : new nats_response_json_deserializer_1.NatsResponseJSONDeserializer();
|
147 | }
|
148 | mergeHeaders(requestHeaders) {
|
149 | var _a, _b;
|
150 | if (!requestHeaders && !((_a = this.options) === null || _a === void 0 ? void 0 : _a.headers)) {
|
151 | return undefined;
|
152 | }
|
153 | const headers = requestHeaders !== null && requestHeaders !== void 0 ? requestHeaders : natsPackage.headers();
|
154 | for (const [key, value] of Object.entries(((_b = this.options) === null || _b === void 0 ? void 0 : _b.headers) || {})) {
|
155 | if (!headers.has(key)) {
|
156 | headers.set(key, value);
|
157 | }
|
158 | }
|
159 | return headers;
|
160 | }
|
161 | }
|
162 | exports.ClientNats = ClientNats;
|