UNPKG

5.82 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.ClientNats = void 0;
4const logger_service_1 = require("@nestjs/common/services/logger.service");
5const load_package_util_1 = require("@nestjs/common/utils/load-package.util");
6const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
7const constants_1 = require("../constants");
8const nats_response_json_deserializer_1 = require("../deserializers/nats-response-json.deserializer");
9const empty_response_exception_1 = require("../errors/empty-response.exception");
10const nats_record_serializer_1 = require("../serializers/nats-record.serializer");
11const client_proxy_1 = require("./client-proxy");
12let natsPackage = {};
13/**
14 * @publicApi
15 */
16class ClientNats extends client_proxy_1.ClientProxy {
17 constructor(options) {
18 super();
19 this.options = options;
20 this.logger = new logger_service_1.Logger(ClientNats.name);
21 natsPackage = (0, load_package_util_1.loadPackage)('nats', ClientNats.name, () => require('nats'));
22 this.initializeSerializer(options);
23 this.initializeDeserializer(options);
24 }
25 async close() {
26 await this.natsClient?.close();
27 this.natsClient = null;
28 }
29 async connect() {
30 if (this.natsClient) {
31 return this.natsClient;
32 }
33 this.natsClient = await this.createClient();
34 this.handleStatusUpdates(this.natsClient);
35 return this.natsClient;
36 }
37 createClient() {
38 const options = this.options || {};
39 return natsPackage.connect({
40 servers: constants_1.NATS_DEFAULT_URL,
41 ...options,
42 });
43 }
44 async handleStatusUpdates(client) {
45 for await (const status of client.status()) {
46 const data = status.data && (0, shared_utils_1.isObject)(status.data)
47 ? JSON.stringify(status.data)
48 : status.data;
49 switch (status.type) {
50 case 'error':
51 case 'disconnect':
52 this.logger.error(`NatsError: type: "${status.type}", data: "${data}".`);
53 break;
54 case 'pingTimer':
55 if (this.options.debug) {
56 this.logger.debug(`NatsStatus: type: "${status.type}", data: "${data}".`);
57 }
58 break;
59 default:
60 this.logger.log(`NatsStatus: type: "${status.type}", data: "${data}".`);
61 break;
62 }
63 }
64 }
65 createSubscriptionHandler(packet, callback) {
66 return async (error, natsMsg) => {
67 if (error) {
68 return callback({
69 err: error,
70 });
71 }
72 const rawPacket = natsMsg.data;
73 if (rawPacket?.length === 0) {
74 return callback({
75 err: new empty_response_exception_1.EmptyResponseException(this.normalizePattern(packet.pattern)),
76 isDisposed: true,
77 });
78 }
79 const message = await this.deserializer.deserialize(rawPacket);
80 if (message.id && message.id !== packet.id) {
81 return undefined;
82 }
83 const { err, response, isDisposed } = message;
84 if (isDisposed || err) {
85 return callback({
86 err,
87 response,
88 isDisposed: true,
89 });
90 }
91 callback({
92 err,
93 response,
94 });
95 };
96 }
97 publish(partialPacket, callback) {
98 try {
99 const packet = this.assignPacketId(partialPacket);
100 const channel = this.normalizePattern(partialPacket.pattern);
101 const serializedPacket = this.serializer.serialize(packet);
102 const inbox = natsPackage.createInbox();
103 const subscriptionHandler = this.createSubscriptionHandler(packet, callback);
104 const subscription = this.natsClient.subscribe(inbox, {
105 callback: subscriptionHandler,
106 });
107 const headers = this.mergeHeaders(serializedPacket.headers);
108 this.natsClient.publish(channel, serializedPacket.data, {
109 reply: inbox,
110 headers,
111 });
112 return () => subscription.unsubscribe();
113 }
114 catch (err) {
115 callback({ err });
116 }
117 }
118 dispatchEvent(packet) {
119 const pattern = this.normalizePattern(packet.pattern);
120 const serializedPacket = this.serializer.serialize(packet);
121 const headers = this.mergeHeaders(serializedPacket.headers);
122 return new Promise((resolve, reject) => {
123 try {
124 this.natsClient.publish(pattern, serializedPacket.data, {
125 headers,
126 });
127 resolve();
128 }
129 catch (err) {
130 reject(err);
131 }
132 });
133 }
134 initializeSerializer(options) {
135 this.serializer = options?.serializer ?? new nats_record_serializer_1.NatsRecordSerializer();
136 }
137 initializeDeserializer(options) {
138 this.deserializer =
139 options?.deserializer ?? new nats_response_json_deserializer_1.NatsResponseJSONDeserializer();
140 }
141 mergeHeaders(requestHeaders) {
142 if (!requestHeaders && !this.options?.headers) {
143 return undefined;
144 }
145 const headers = requestHeaders ?? natsPackage.headers();
146 for (const [key, value] of Object.entries(this.options?.headers || {})) {
147 if (!headers.has(key)) {
148 headers.set(key, value);
149 }
150 }
151 return headers;
152 }
153}
154exports.ClientNats = ClientNats;