1 | "use strict";
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | exports.ClientGrpcProxy = void 0;
|
4 | const logger_service_1 = require("@nestjs/common/services/logger.service");
|
5 | const load_package_util_1 = require("@nestjs/common/utils/load-package.util");
|
6 | const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
|
7 | const rxjs_1 = require("rxjs");
|
8 | const constants_1 = require("../constants");
|
9 | const invalid_grpc_package_exception_1 = require("../errors/invalid-grpc-package.exception");
|
10 | const invalid_grpc_service_exception_1 = require("../errors/invalid-grpc-service.exception");
|
11 | const invalid_proto_definition_exception_1 = require("../errors/invalid-proto-definition.exception");
|
12 | const client_proxy_1 = require("./client-proxy");
|
13 | const constants_2 = require("./constants");
|
14 | let grpcPackage = {};
|
15 | let grpcProtoLoaderPackage = {};
|
16 |
|
17 |
|
18 |
|
/**
 * gRPC client proxy.
 *
 * On construction it loads the gRPC runtime and the proto loader, reads the
 * configured proto definition and resolves the configured package(s). Service
 * objects whose methods return RxJS Observables are obtained via `getService`.
 * The generic `connect` / `send` / `publish` / `dispatchEvent` entry points
 * are not supported in gRPC mode and always throw.
 */
class ClientGrpcProxy extends client_proxy_1.ClientProxy {
    /**
     * @param options gRPC client options. Keys read by this class: `url`,
     *   `protoLoader`, `package`, `protoPath`, `loader`, `packageDefinition`,
     *   `channelOptions`, `maxSendMessageLength`, `maxReceiveMessageLength`,
     *   `maxMetadataSize`, `keepalive`, `credentials`.
     * @throws InvalidProtoDefinitionException when the proto cannot be loaded.
     * @throws InvalidGrpcPackageException when a configured package is missing.
     */
    constructor(options) {
        super();
        this.options = options;
        this.logger = new logger_service_1.Logger(client_proxy_1.ClientProxy.name);
        // Instantiated service clients, keyed by service name.
        this.clients = new Map();
        // Resolved package namespace objects (NOT client instances).
        this.grpcClients = [];
        this.url = this.getOptionsProp(options, 'url') || constants_1.GRPC_DEFAULT_URL;
        const protoLoader = this.getOptionsProp(options, 'protoLoader') || constants_1.GRPC_DEFAULT_PROTO_LOADER;
        // The loaded packages are stored in module-level bindings shared by
        // every instance of this class.
        grpcPackage = (0, load_package_util_1.loadPackage)('@grpc/grpc-js', ClientGrpcProxy.name, () => require('@grpc/grpc-js'));
        grpcProtoLoaderPackage = (0, load_package_util_1.loadPackage)(protoLoader, ClientGrpcProxy.name, () => protoLoader === constants_1.GRPC_DEFAULT_PROTO_LOADER
            ? require('@grpc/proto-loader')
            : require(protoLoader));
        this.grpcClients = this.createClients();
    }
    /**
     * Builds a service facade: an object with one function per method found on
     * the service constructor's prototype, each returning an Observable.
     * A new underlying client is created on every call.
     *
     * @param name service name as declared in the proto package.
     * @returns plain object mapping method names to Observable-returning functions.
     * @throws InvalidGrpcServiceException when no loaded package owns `name`.
     */
    getService(name) {
        // Throws InvalidGrpcServiceException itself when the service is unknown.
        const grpcClient = this.createClientByServiceName(name);
        const clientRef = this.getClient(name);
        if (!clientRef) {
            throw new invalid_grpc_service_exception_1.InvalidGrpcServiceException(name);
        }
        const grpcService = {};
        for (const methodName of Object.keys(clientRef[name].prototype)) {
            grpcService[methodName] = this.createServiceMethod(grpcClient, methodName);
        }
        return grpcService;
    }
    /**
     * Returns the cached client for `name`, creating one when none is cached.
     *
     * @param name service name.
     * @throws InvalidGrpcServiceException when the service is unknown.
     */
    getClientByServiceName(name) {
        return this.clients.get(name) || this.createClientByServiceName(name);
    }
    /**
     * Instantiates a client for `name` and stores it in the client cache,
     * replacing any previous entry for the same name.
     *
     * @param name service name.
     * @returns the newly constructed client.
     * @throws InvalidGrpcServiceException when no loaded package owns `name`.
     */
    createClientByServiceName(name) {
        const clientRef = this.getClient(name);
        if (!clientRef) {
            throw new invalid_grpc_service_exception_1.InvalidGrpcServiceException(name);
        }
        const opts = this.options || {};
        // Work on a copy so the caller-supplied `channelOptions` object is
        // never mutated by the size-limit shortcuts below.
        const channelOptions = { ...opts.channelOptions };
        if (opts.maxSendMessageLength) {
            channelOptions['grpc.max_send_message_length'] = opts.maxSendMessageLength;
        }
        if (opts.maxReceiveMessageLength) {
            channelOptions['grpc.max_receive_message_length'] = opts.maxReceiveMessageLength;
        }
        if (opts.maxMetadataSize) {
            channelOptions['grpc.max_metadata_size'] = opts.maxMetadataSize;
        }
        // Keepalive settings are spread last, so they win on key collisions.
        const clientOptions = {
            ...channelOptions,
            ...this.getKeepaliveOptions(),
        };
        const credentials = opts.credentials || grpcPackage.credentials.createInsecure();
        const grpcClient = new clientRef[name](this.url, credentials, clientOptions);
        this.clients.set(name, grpcClient);
        return grpcClient;
    }
    /**
     * Translates the camelCase `keepalive` options into their `grpc.*` channel
     * argument names. Unknown keys are ignored.
     *
     * @returns object keyed by channel argument name; empty when `keepalive`
     *   is not an object.
     */
    getKeepaliveOptions() {
        const keepalive = this.options ? this.options.keepalive : undefined;
        if (!(0, shared_utils_1.isObject)(keepalive)) {
            return {};
        }
        const keepaliveKeys = {
            keepaliveTimeMs: 'grpc.keepalive_time_ms',
            keepaliveTimeoutMs: 'grpc.keepalive_timeout_ms',
            keepalivePermitWithoutCalls: 'grpc.keepalive_permit_without_calls',
            http2MaxPingsWithoutData: 'grpc.http2.max_pings_without_data',
            http2MinTimeBetweenPingsMs: 'grpc.http2.min_time_between_pings_ms',
            http2MinPingIntervalWithoutDataMs: 'grpc.http2.min_ping_interval_without_data_ms',
            http2MaxPingStrikes: 'grpc.http2.max_ping_strikes',
        };
        const keepaliveOptions = {};
        for (const [optionKey, optionValue] of Object.entries(keepalive)) {
            const key = keepaliveKeys[optionKey];
            if (key === undefined) {
                continue;
            }
            keepaliveOptions[key] = optionValue;
        }
        return keepaliveOptions;
    }
    /**
     * Picks the stream or unary wrapper depending on the method's
     * `responseStream` flag.
     *
     * @param client client instance owning the method.
     * @param methodName name of the method on `client`.
     * @returns function that performs the call and returns an Observable.
     */
    createServiceMethod(client, methodName) {
        return client[methodName].responseStream
            ? this.createStreamServiceMethod(client, methodName)
            : this.createUnaryServiceMethod(client, methodName);
    }
    /**
     * Wraps a response-streaming method. The call is started on subscription;
     * each `data` event is emitted, `end` completes, and unsubscribing cancels
     * a call that has not finished.
     *
     * @param client client instance owning the method.
     * @param methodName name of the method on `client`.
     * @returns `(...args) => Observable`; when the method is also
     *   request-streaming and `args[0]` has a `subscribe` function, its values
     *   are written to the call and `args[1]` is passed as the call argument.
     */
    createStreamServiceMethod(client, methodName) {
        return (...args) => {
            const isRequestStream = client[methodName].requestStream;
            const stream = new rxjs_1.Observable(observer => {
                let isClientCanceled = false;
                let upstreamSubscription;
                const upstreamSubjectOrData = args[0];
                const maybeMetadata = args[1];
                const isUpstreamSubject = upstreamSubjectOrData && (0, shared_utils_1.isFunction)(upstreamSubjectOrData.subscribe);
                const call = isRequestStream && isUpstreamSubject
                    ? client[methodName](maybeMetadata)
                    : client[methodName](...args);
                if (isRequestStream && isUpstreamSubject) {
                    upstreamSubscription = upstreamSubjectOrData.subscribe((val) => call.write(val), (err) => call.emit('error', err), () => call.end());
                }
                call.on('data', (data) => observer.next(data));
                call.on('error', (error) => {
                    // Recognise cancellation by details text or by status code
                    // 1 (CANCELLED), matching the unary request-stream path.
                    if (error.details === constants_2.GRPC_CANCELLED || error.code === 1) {
                        call.destroy();
                        if (isClientCanceled) {
                            // The subscriber asked for the cancel; not an error.
                            return;
                        }
                    }
                    observer.error(this.serializeError(error));
                });
                call.on('end', () => {
                    if (upstreamSubscription) {
                        upstreamSubscription.unsubscribe();
                        upstreamSubscription = null;
                    }
                    call.removeAllListeners();
                    observer.complete();
                });
                // Teardown: runs on unsubscribe, error and completion.
                return () => {
                    if (upstreamSubscription) {
                        upstreamSubscription.unsubscribe();
                        upstreamSubscription = null;
                    }
                    if (call.finished) {
                        return undefined;
                    }
                    isClientCanceled = true;
                    call.cancel();
                };
            });
            return stream;
        };
    }
    /**
     * Wraps a method with a single response. The Observable emits the response
     * once and completes, or errors with the serialized call error.
     * Unsubscribing cancels a call that has not finished.
     *
     * @param client client instance owning the method.
     * @param methodName name of the method on `client`.
     * @returns `(...args) => Observable`; when the method is request-streaming
     *   and `args[0]` has a `subscribe` function, its values are written to the
     *   call and `args[1]` (if truthy) is passed ahead of the callback.
     */
    createUnaryServiceMethod(client, methodName) {
        return (...args) => {
            const isRequestStream = client[methodName].requestStream;
            const upstreamSubjectOrData = args[0];
            const isUpstreamSubject = upstreamSubjectOrData && (0, shared_utils_1.isFunction)(upstreamSubjectOrData.subscribe);
            if (isRequestStream && isUpstreamSubject) {
                return new rxjs_1.Observable(observer => {
                    let isClientCanceled = false;
                    const callArgs = [
                        (error, data) => {
                            if (error) {
                                if (error.details === constants_2.GRPC_CANCELLED || error.code === 1) {
                                    call.destroy();
                                    if (isClientCanceled) {
                                        return;
                                    }
                                }
                                return observer.error(this.serializeError(error));
                            }
                            observer.next(data);
                            observer.complete();
                        },
                    ];
                    const maybeMetadata = args[1];
                    if (maybeMetadata) {
                        callArgs.unshift(maybeMetadata);
                    }
                    // `call` is referenced by the callback above, which only
                    // runs after this assignment has happened.
                    const call = client[methodName](...callArgs);
                    const upstreamSubscription = upstreamSubjectOrData.subscribe((val) => call.write(val), (err) => call.emit('error', err), () => call.end());
                    return () => {
                        upstreamSubscription.unsubscribe();
                        if (!call.finished) {
                            isClientCanceled = true;
                            call.cancel();
                        }
                    };
                });
            }
            return new rxjs_1.Observable(observer => {
                const call = client[methodName](...args, (error, data) => {
                    if (error) {
                        return observer.error(this.serializeError(error));
                    }
                    observer.next(data);
                    observer.complete();
                });
                return () => {
                    if (!call.finished) {
                        call.cancel();
                    }
                };
            });
        };
    }
    /**
     * Loads the proto definition and resolves every configured package.
     *
     * @returns array of resolved package namespace objects, in option order.
     * @throws InvalidGrpcPackageException (also logged) for the first package
     *   that cannot be resolved.
     */
    createClients() {
        const grpcContext = this.loadProto();
        const packageOption = this.getOptionsProp(this.options, 'package');
        const packageNames = Array.isArray(packageOption)
            ? packageOption
            : [packageOption];
        const grpcPackages = [];
        for (const packageName of packageNames) {
            const grpcPkg = this.lookupPackage(grpcContext, packageName);
            if (!grpcPkg) {
                const invalidPackageError = new invalid_grpc_package_exception_1.InvalidGrpcPackageException(packageName);
                this.logger.error(invalidPackageError.message, invalidPackageError.stack);
                throw invalidPackageError;
            }
            grpcPackages.push(grpcPkg);
        }
        return grpcPackages;
    }
    /**
     * Produces the gRPC package object from the `packageDefinition` option or,
     * when that is absent, by loading `protoPath` with the `loader` options.
     *
     * @returns result of `loadPackageDefinition`.
     * @throws InvalidProtoDefinitionException on any failure; the underlying
     *   error message (when present) is logged.
     */
    loadProto() {
        try {
            const file = this.getOptionsProp(this.options, 'protoPath');
            const loader = this.getOptionsProp(this.options, 'loader');
            const packageDefinition = this.getOptionsProp(this.options, 'packageDefinition') ||
                grpcProtoLoaderPackage.loadSync(file, loader);
            return grpcPackage.loadPackageDefinition(packageDefinition);
        }
        catch (err) {
            // Guard `err` before reading from it: anything may be thrown.
            const invalidProtoError = new invalid_proto_definition_exception_1.InvalidProtoDefinitionException(err && err.path);
            const message = err && err.message ? err.message : invalidProtoError.message;
            this.logger.error(message, invalidProtoError.stack);
            throw invalidProtoError;
        }
    }
    /**
     * Walks a dot-separated package name down from `root`.
     *
     * @param root object to start from.
     * @param packageName dot-separated path; when falsy, `root` is returned.
     * @returns the nested value, or `undefined` when any segment is missing.
     */
    lookupPackage(root, packageName) {
        let pkg = root;
        if (packageName) {
            for (const name of packageName.split('.')) {
                // Stop instead of dereferencing a missing intermediate segment,
                // so the caller can report an invalid package.
                if (pkg === undefined || pkg === null) {
                    return undefined;
                }
                pkg = pkg[name];
            }
        }
        return pkg;
    }
    /**
     * Closes every cached client that exposes a `close` function, then clears
     * the client cache and the resolved package list.
     */
    close() {
        // The instantiated clients live in `this.clients`; `this.grpcClients`
        // only holds package namespace objects.
        this.clients.forEach(client => {
            if (client && (0, shared_utils_1.isFunction)(client.close)) {
                client.close();
            }
        });
        this.clients.clear();
        this.grpcClients = [];
    }
    /** Not supported in gRPC mode; always rejects. */
    async connect() {
        throw new Error('The "connect()" method is not supported in gRPC mode.');
    }
    /** Not supported in gRPC mode; always throws. */
    send(pattern, data) {
        throw new Error('Method is not supported in gRPC mode. Use ClientGrpc instead (learn more in the documentation).');
    }
    /**
     * Finds the resolved package object that has `name` as an own property.
     *
     * @param name service name.
     * @returns the owning package object, or `undefined` when none matches.
     */
    getClient(name) {
        return this.grpcClients.find(client => Object.prototype.hasOwnProperty.call(client, name));
    }
    /** Not supported in gRPC mode; always throws. */
    publish(packet, callback) {
        throw new Error('Method is not supported in gRPC mode. Use ClientGrpc instead (learn more in the documentation).');
    }
    /** Not supported in gRPC mode; always rejects. */
    async dispatchEvent(packet) {
        throw new Error('Method is not supported in gRPC mode. Use ClientGrpc instead (learn more in the documentation).');
    }
}
|
275 | exports.ClientGrpcProxy = ClientGrpcProxy;
|