UNPKG

12.5 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.ClientGrpcProxy = void 0;
4const logger_service_1 = require("@nestjs/common/services/logger.service");
5const load_package_util_1 = require("@nestjs/common/utils/load-package.util");
6const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
7const rxjs_1 = require("rxjs");
8const constants_1 = require("../constants");
9const invalid_grpc_package_exception_1 = require("../errors/invalid-grpc-package.exception");
10const invalid_grpc_service_exception_1 = require("../errors/invalid-grpc-service.exception");
11const invalid_proto_definition_exception_1 = require("../errors/invalid-proto-definition.exception");
12const client_proxy_1 = require("./client-proxy");
13const constants_2 = require("./constants");
14let grpcPackage = {};
15let grpcProtoLoaderPackage = {};
16/**
17 * @publicApi
18 */
19class ClientGrpcProxy extends client_proxy_1.ClientProxy {
20 constructor(options) {
21 super();
22 this.options = options;
23 this.logger = new logger_service_1.Logger(client_proxy_1.ClientProxy.name);
24 this.clients = new Map();
25 this.grpcClients = [];
26 this.url = this.getOptionsProp(options, 'url') || constants_1.GRPC_DEFAULT_URL;
27 const protoLoader = this.getOptionsProp(options, 'protoLoader') || constants_1.GRPC_DEFAULT_PROTO_LOADER;
28 grpcPackage = (0, load_package_util_1.loadPackage)('@grpc/grpc-js', ClientGrpcProxy.name, () => require('@grpc/grpc-js'));
29 grpcProtoLoaderPackage = (0, load_package_util_1.loadPackage)(protoLoader, ClientGrpcProxy.name, () => protoLoader === constants_1.GRPC_DEFAULT_PROTO_LOADER
30 ? require('@grpc/proto-loader')
31 : require(protoLoader));
32 this.grpcClients = this.createClients();
33 }
34 getService(name) {
35 const grpcClient = this.createClientByServiceName(name);
36 const clientRef = this.getClient(name);
37 if (!clientRef) {
38 throw new invalid_grpc_service_exception_1.InvalidGrpcServiceException(name);
39 }
40 const protoMethods = Object.keys(clientRef[name].prototype);
41 const grpcService = {};
42 protoMethods.forEach(m => {
43 grpcService[m] = this.createServiceMethod(grpcClient, m);
44 });
45 return grpcService;
46 }
47 getClientByServiceName(name) {
48 return this.clients.get(name) || this.createClientByServiceName(name);
49 }
50 createClientByServiceName(name) {
51 const clientRef = this.getClient(name);
52 if (!clientRef) {
53 throw new invalid_grpc_service_exception_1.InvalidGrpcServiceException(name);
54 }
55 const channelOptions = this.options && this.options.channelOptions
56 ? this.options.channelOptions
57 : {};
58 if (this.options && this.options.maxSendMessageLength) {
59 channelOptions['grpc.max_send_message_length'] =
60 this.options.maxSendMessageLength;
61 }
62 if (this.options && this.options.maxReceiveMessageLength) {
63 channelOptions['grpc.max_receive_message_length'] =
64 this.options.maxReceiveMessageLength;
65 }
66 if (this.options && this.options.maxMetadataSize) {
67 channelOptions['grpc.max_metadata_size'] = this.options.maxMetadataSize;
68 }
69 const keepaliveOptions = this.getKeepaliveOptions();
70 const options = {
71 ...channelOptions,
72 ...keepaliveOptions,
73 };
74 const credentials = this.options.credentials || grpcPackage.credentials.createInsecure();
75 const grpcClient = new clientRef[name](this.url, credentials, options);
76 this.clients.set(name, grpcClient);
77 return grpcClient;
78 }
79 getKeepaliveOptions() {
80 if (!(0, shared_utils_1.isObject)(this.options.keepalive)) {
81 return {};
82 }
83 const keepaliveKeys = {
84 keepaliveTimeMs: 'grpc.keepalive_time_ms',
85 keepaliveTimeoutMs: 'grpc.keepalive_timeout_ms',
86 keepalivePermitWithoutCalls: 'grpc.keepalive_permit_without_calls',
87 http2MaxPingsWithoutData: 'grpc.http2.max_pings_without_data',
88 http2MinTimeBetweenPingsMs: 'grpc.http2.min_time_between_pings_ms',
89 http2MinPingIntervalWithoutDataMs: 'grpc.http2.min_ping_interval_without_data_ms',
90 http2MaxPingStrikes: 'grpc.http2.max_ping_strikes',
91 };
92 const keepaliveOptions = {};
93 for (const [optionKey, optionValue] of Object.entries(this.options.keepalive)) {
94 const key = keepaliveKeys[optionKey];
95 if (key === undefined) {
96 continue;
97 }
98 keepaliveOptions[key] = optionValue;
99 }
100 return keepaliveOptions;
101 }
102 createServiceMethod(client, methodName) {
103 return client[methodName].responseStream
104 ? this.createStreamServiceMethod(client, methodName)
105 : this.createUnaryServiceMethod(client, methodName);
106 }
107 createStreamServiceMethod(client, methodName) {
108 return (...args) => {
109 const isRequestStream = client[methodName].requestStream;
110 const stream = new rxjs_1.Observable(observer => {
111 let isClientCanceled = false;
112 let upstreamSubscription;
113 const upstreamSubjectOrData = args[0];
114 const maybeMetadata = args[1];
115 const isUpstreamSubject = upstreamSubjectOrData && (0, shared_utils_1.isFunction)(upstreamSubjectOrData.subscribe);
116 const call = isRequestStream && isUpstreamSubject
117 ? client[methodName](maybeMetadata)
118 : client[methodName](...args);
119 if (isRequestStream && isUpstreamSubject) {
120 upstreamSubscription = upstreamSubjectOrData.subscribe((val) => call.write(val), (err) => call.emit('error', err), () => call.end());
121 }
122 call.on('data', (data) => observer.next(data));
123 call.on('error', (error) => {
124 if (error.details === constants_2.GRPC_CANCELLED) {
125 call.destroy();
126 if (isClientCanceled) {
127 return;
128 }
129 }
130 observer.error(this.serializeError(error));
131 });
132 call.on('end', () => {
133 if (upstreamSubscription) {
134 upstreamSubscription.unsubscribe();
135 upstreamSubscription = null;
136 }
137 call.removeAllListeners();
138 observer.complete();
139 });
140 return () => {
141 if (upstreamSubscription) {
142 upstreamSubscription.unsubscribe();
143 upstreamSubscription = null;
144 }
145 if (call.finished) {
146 return undefined;
147 }
148 isClientCanceled = true;
149 call.cancel();
150 };
151 });
152 return stream;
153 };
154 }
155 createUnaryServiceMethod(client, methodName) {
156 return (...args) => {
157 const isRequestStream = client[methodName].requestStream;
158 const upstreamSubjectOrData = args[0];
159 const isUpstreamSubject = upstreamSubjectOrData && (0, shared_utils_1.isFunction)(upstreamSubjectOrData.subscribe);
160 if (isRequestStream && isUpstreamSubject) {
161 return new rxjs_1.Observable(observer => {
162 let isClientCanceled = false;
163 const callArgs = [
164 (error, data) => {
165 if (error) {
166 if (error.details === constants_2.GRPC_CANCELLED || error.code === 1) {
167 call.destroy();
168 if (isClientCanceled) {
169 return;
170 }
171 }
172 return observer.error(this.serializeError(error));
173 }
174 observer.next(data);
175 observer.complete();
176 },
177 ];
178 const maybeMetadata = args[1];
179 if (maybeMetadata) {
180 callArgs.unshift(maybeMetadata);
181 }
182 const call = client[methodName](...callArgs);
183 const upstreamSubscription = upstreamSubjectOrData.subscribe((val) => call.write(val), (err) => call.emit('error', err), () => call.end());
184 return () => {
185 upstreamSubscription.unsubscribe();
186 if (!call.finished) {
187 isClientCanceled = true;
188 call.cancel();
189 }
190 };
191 });
192 }
193 return new rxjs_1.Observable(observer => {
194 const call = client[methodName](...args, (error, data) => {
195 if (error) {
196 return observer.error(this.serializeError(error));
197 }
198 observer.next(data);
199 observer.complete();
200 });
201 return () => {
202 if (!call.finished) {
203 call.cancel();
204 }
205 };
206 });
207 };
208 }
209 createClients() {
210 const grpcContext = this.loadProto();
211 const packageOption = this.getOptionsProp(this.options, 'package');
212 const grpcPackages = [];
213 const packageNames = Array.isArray(packageOption)
214 ? packageOption
215 : [packageOption];
216 for (const packageName of packageNames) {
217 const grpcPkg = this.lookupPackage(grpcContext, packageName);
218 if (!grpcPkg) {
219 const invalidPackageError = new invalid_grpc_package_exception_1.InvalidGrpcPackageException(packageName);
220 this.logger.error(invalidPackageError.message, invalidPackageError.stack);
221 throw invalidPackageError;
222 }
223 grpcPackages.push(grpcPkg);
224 }
225 return grpcPackages;
226 }
227 loadProto() {
228 try {
229 const file = this.getOptionsProp(this.options, 'protoPath');
230 const loader = this.getOptionsProp(this.options, 'loader');
231 const packageDefinition = this.getOptionsProp(this.options, 'packageDefinition') ||
232 grpcProtoLoaderPackage.loadSync(file, loader);
233 const packageObject = grpcPackage.loadPackageDefinition(packageDefinition);
234 return packageObject;
235 }
236 catch (err) {
237 const invalidProtoError = new invalid_proto_definition_exception_1.InvalidProtoDefinitionException(err.path);
238 const message = err && err.message ? err.message : invalidProtoError.message;
239 this.logger.error(message, invalidProtoError.stack);
240 throw invalidProtoError;
241 }
242 }
243 lookupPackage(root, packageName) {
244 /** Reference: https://github.com/kondi/rxjs-grpc */
245 let pkg = root;
246 if (packageName) {
247 for (const name of packageName.split('.')) {
248 pkg = pkg[name];
249 }
250 }
251 return pkg;
252 }
253 close() {
254 this.grpcClients
255 .filter(client => client && (0, shared_utils_1.isFunction)(client.close))
256 .forEach(client => client.close());
257 this.grpcClients = [];
258 }
259 async connect() {
260 throw new Error('The "connect()" method is not supported in gRPC mode.');
261 }
262 send(pattern, data) {
263 throw new Error('Method is not supported in gRPC mode. Use ClientGrpc instead (learn more in the documentation).');
264 }
265 getClient(name) {
266 return this.grpcClients.find(client => client.hasOwnProperty(name));
267 }
268 publish(packet, callback) {
269 throw new Error('Method is not supported in gRPC mode. Use ClientGrpc instead (learn more in the documentation).');
270 }
271 async dispatchEvent(packet) {
272 throw new Error('Method is not supported in gRPC mode. Use ClientGrpc instead (learn more in the documentation).');
273 }
274}
275exports.ClientGrpcProxy = ClientGrpcProxy;