UNPKG

6.64 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.ClientNats = void 0;
4const tslib_1 = require("tslib");
5const logger_service_1 = require("@nestjs/common/services/logger.service");
6const load_package_util_1 = require("@nestjs/common/utils/load-package.util");
7const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
8const constants_1 = require("../constants");
9const nats_response_json_deserializer_1 = require("../deserializers/nats-response-json.deserializer");
10const empty_response_exception_1 = require("../errors/empty-response.exception");
11const nats_record_serializer_1 = require("../serializers/nats-record.serializer");
12const client_proxy_1 = require("./client-proxy");
13let natsPackage = {};
14class ClientNats extends client_proxy_1.ClientProxy {
15 constructor(options) {
16 super();
17 this.options = options;
18 this.logger = new logger_service_1.Logger(ClientNats.name);
19 natsPackage = (0, load_package_util_1.loadPackage)('nats', ClientNats.name, () => require('nats'));
20 this.initializeSerializer(options);
21 this.initializeDeserializer(options);
22 }
23 async close() {
24 var _a;
25 await ((_a = this.natsClient) === null || _a === void 0 ? void 0 : _a.close());
26 this.natsClient = null;
27 }
28 async connect() {
29 if (this.natsClient) {
30 return this.natsClient;
31 }
32 this.natsClient = await this.createClient();
33 this.handleStatusUpdates(this.natsClient);
34 return this.natsClient;
35 }
36 createClient() {
37 const options = this.options || {};
38 return natsPackage.connect(Object.assign({ servers: constants_1.NATS_DEFAULT_URL }, options));
39 }
40 async handleStatusUpdates(client) {
41 var e_1, _a;
42 try {
43 for (var _b = tslib_1.__asyncValues(client.status()), _c; _c = await _b.next(), !_c.done;) {
44 const status = _c.value;
45 const data = status.data && (0, shared_utils_1.isObject)(status.data)
46 ? JSON.stringify(status.data)
47 : status.data;
48 if (status.type === 'disconnect' || status.type === 'error') {
49 this.logger.error(`NatsError: type: "${status.type}", data: "${data}".`);
50 }
51 else {
52 const message = `NatsStatus: type: "${status.type}", data: "${data}".`;
53 if (status.type === 'pingTimer') {
54 this.logger.debug(message);
55 }
56 else {
57 this.logger.log(message);
58 }
59 }
60 }
61 }
62 catch (e_1_1) { e_1 = { error: e_1_1 }; }
63 finally {
64 try {
65 if (_c && !_c.done && (_a = _b.return)) await _a.call(_b);
66 }
67 finally { if (e_1) throw e_1.error; }
68 }
69 }
70 createSubscriptionHandler(packet, callback) {
71 return async (error, natsMsg) => {
72 if (error) {
73 return callback({
74 err: error,
75 });
76 }
77 const rawPacket = natsMsg.data;
78 if ((rawPacket === null || rawPacket === void 0 ? void 0 : rawPacket.length) === 0) {
79 return callback({
80 err: new empty_response_exception_1.EmptyResponseException(this.normalizePattern(packet.pattern)),
81 isDisposed: true,
82 });
83 }
84 const message = await this.deserializer.deserialize(rawPacket);
85 if (message.id && message.id !== packet.id) {
86 return undefined;
87 }
88 const { err, response, isDisposed } = message;
89 if (isDisposed || err) {
90 return callback({
91 err,
92 response,
93 isDisposed: true,
94 });
95 }
96 callback({
97 err,
98 response,
99 });
100 };
101 }
102 publish(partialPacket, callback) {
103 try {
104 const packet = this.assignPacketId(partialPacket);
105 const channel = this.normalizePattern(partialPacket.pattern);
106 const serializedPacket = this.serializer.serialize(packet);
107 const inbox = natsPackage.createInbox();
108 const subscriptionHandler = this.createSubscriptionHandler(packet, callback);
109 const subscription = this.natsClient.subscribe(inbox, {
110 callback: subscriptionHandler,
111 });
112 const headers = this.mergeHeaders(serializedPacket.headers);
113 this.natsClient.publish(channel, serializedPacket.data, {
114 reply: inbox,
115 headers,
116 });
117 return () => subscription.unsubscribe();
118 }
119 catch (err) {
120 callback({ err });
121 }
122 }
123 dispatchEvent(packet) {
124 const pattern = this.normalizePattern(packet.pattern);
125 const serializedPacket = this.serializer.serialize(packet);
126 const headers = this.mergeHeaders(serializedPacket.headers);
127 return new Promise((resolve, reject) => {
128 try {
129 this.natsClient.publish(pattern, serializedPacket.data, {
130 headers,
131 });
132 resolve();
133 }
134 catch (err) {
135 reject(err);
136 }
137 });
138 }
139 initializeSerializer(options) {
140 var _a;
141 this.serializer = (_a = options === null || options === void 0 ? void 0 : options.serializer) !== null && _a !== void 0 ? _a : new nats_record_serializer_1.NatsRecordSerializer();
142 }
143 initializeDeserializer(options) {
144 var _a;
145 this.deserializer =
146 (_a = options === null || options === void 0 ? void 0 : options.deserializer) !== null && _a !== void 0 ? _a : new nats_response_json_deserializer_1.NatsResponseJSONDeserializer();
147 }
148 mergeHeaders(requestHeaders) {
149 var _a, _b;
150 if (!requestHeaders && !((_a = this.options) === null || _a === void 0 ? void 0 : _a.headers)) {
151 return undefined;
152 }
153 const headers = requestHeaders !== null && requestHeaders !== void 0 ? requestHeaders : natsPackage.headers();
154 for (const [key, value] of Object.entries(((_b = this.options) === null || _b === void 0 ? void 0 : _b.headers) || {})) {
155 if (!headers.has(key)) {
156 headers.set(key, value);
157 }
158 }
159 return headers;
160 }
161}
162exports.ClientNats = ClientNats;