1 | "use strict";
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | exports.ServerGrpc = void 0;
|
4 | const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
|
5 | const rxjs_1 = require("rxjs");
|
6 | const operators_1 = require("rxjs/operators");
|
7 | const constants_1 = require("../constants");
|
8 | const decorators_1 = require("../decorators");
|
9 | const enums_1 = require("../enums");
|
10 | const invalid_grpc_package_exception_1 = require("../errors/invalid-grpc-package.exception");
|
11 | const invalid_proto_definition_exception_1 = require("../errors/invalid-proto-definition.exception");
|
12 | const server_1 = require("./server");
|
13 | let grpcPackage = {};
|
14 | let grpcProtoLoaderPackage = {};
|
15 | class ServerGrpc extends server_1.Server {
|
16 | constructor(options) {
|
17 | super();
|
18 | this.options = options;
|
19 | this.transportId = enums_1.Transport.GRPC;
|
20 | this.url = this.getOptionsProp(options, 'url') || constants_1.GRPC_DEFAULT_URL;
|
21 | const protoLoader = this.getOptionsProp(options, 'protoLoader') || constants_1.GRPC_DEFAULT_PROTO_LOADER;
|
22 | grpcPackage = this.loadPackage('@grpc/grpc-js', ServerGrpc.name, () => require('@grpc/grpc-js'));
|
23 | grpcProtoLoaderPackage = this.loadPackage(protoLoader, ServerGrpc.name, () => protoLoader === constants_1.GRPC_DEFAULT_PROTO_LOADER
|
24 | ? require('@grpc/proto-loader')
|
25 | : require(protoLoader));
|
26 | }
|
27 | async listen(callback) {
|
28 | try {
|
29 | this.grpcClient = await this.createClient();
|
30 | await this.start(callback);
|
31 | }
|
32 | catch (err) {
|
33 | callback(err);
|
34 | }
|
35 | }
|
36 | async start(callback) {
|
37 | await this.bindEvents();
|
38 | this.grpcClient.start();
|
39 | callback();
|
40 | }
|
41 | async bindEvents() {
|
42 | const grpcContext = this.loadProto();
|
43 | const packageOption = this.getOptionsProp(this.options, 'package');
|
44 | const packageNames = Array.isArray(packageOption)
|
45 | ? packageOption
|
46 | : [packageOption];
|
47 | for (const packageName of packageNames) {
|
48 | const grpcPkg = this.lookupPackage(grpcContext, packageName);
|
49 | await this.createServices(grpcPkg, packageName);
|
50 | }
|
51 | }
|
52 | |
53 |
|
54 |
|
55 |
|
56 |
|
57 | getServiceNames(grpcPkg) {
|
58 |
|
59 | const services = [];
|
60 |
|
61 | this.collectDeepServices('', grpcPkg, services);
|
62 | return services;
|
63 | }
|
64 | |
65 |
|
66 |
|
67 |
|
68 |
|
69 |
|
70 |
|
71 | async createService(grpcService, name) {
|
72 | const service = {};
|
73 | for (const methodName in grpcService.prototype) {
|
74 | let pattern = '';
|
75 | let methodHandler = null;
|
76 | let streamingType = decorators_1.GrpcMethodStreamingType.NO_STREAMING;
|
77 | const methodFunction = grpcService.prototype[methodName];
|
78 | const methodReqStreaming = methodFunction.requestStream;
|
79 | if (!(0, shared_utils_1.isUndefined)(methodReqStreaming) && methodReqStreaming) {
|
80 |
|
81 |
|
82 | pattern = this.createPattern(name, methodName, decorators_1.GrpcMethodStreamingType.RX_STREAMING);
|
83 | methodHandler = this.messageHandlers.get(pattern);
|
84 | streamingType = decorators_1.GrpcMethodStreamingType.RX_STREAMING;
|
85 |
|
86 |
|
87 | if (!methodHandler) {
|
88 | pattern = this.createPattern(name, methodName, decorators_1.GrpcMethodStreamingType.PT_STREAMING);
|
89 | methodHandler = this.messageHandlers.get(pattern);
|
90 | streamingType = decorators_1.GrpcMethodStreamingType.PT_STREAMING;
|
91 | }
|
92 | }
|
93 | else {
|
94 | pattern = this.createPattern(name, methodName, decorators_1.GrpcMethodStreamingType.NO_STREAMING);
|
95 |
|
96 | methodHandler = this.messageHandlers.get(pattern);
|
97 | streamingType = decorators_1.GrpcMethodStreamingType.NO_STREAMING;
|
98 | }
|
99 | if (!methodHandler) {
|
100 | continue;
|
101 | }
|
102 | service[methodName] = await this.createServiceMethod(methodHandler, grpcService.prototype[methodName], streamingType);
|
103 | }
|
104 | return service;
|
105 | }
|
106 | |
107 |
|
108 |
|
109 |
|
110 |
|
111 |
|
112 |
|
113 |
|
114 | createPattern(service, methodName, streaming) {
|
115 | return JSON.stringify({
|
116 | service,
|
117 | rpc: methodName,
|
118 | streaming,
|
119 | });
|
120 | }
|
121 | |
122 |
|
123 |
|
124 |
|
125 |
|
126 |
|
127 |
|
128 | createServiceMethod(methodHandler, protoNativeHandler, streamType) {
|
129 |
|
130 |
|
131 | if (protoNativeHandler.requestStream) {
|
132 |
|
133 | if (streamType === decorators_1.GrpcMethodStreamingType.RX_STREAMING) {
|
134 | return this.createRequestStreamMethod(methodHandler, protoNativeHandler.responseStream);
|
135 | }
|
136 |
|
137 | else if (streamType === decorators_1.GrpcMethodStreamingType.PT_STREAMING) {
|
138 | return this.createStreamCallMethod(methodHandler, protoNativeHandler.responseStream);
|
139 | }
|
140 | }
|
141 | return protoNativeHandler.responseStream
|
142 | ? this.createStreamServiceMethod(methodHandler)
|
143 | : this.createUnaryServiceMethod(methodHandler);
|
144 | }
|
145 | createUnaryServiceMethod(methodHandler) {
|
146 | return async (call, callback) => {
|
147 | const handler = methodHandler(call.request, call.metadata, call);
|
148 | this.transformToObservable(await handler).subscribe({
|
149 | next: async (data) => callback(null, await data),
|
150 | error: (err) => callback(err),
|
151 | });
|
152 | };
|
153 | }
|
154 | createStreamServiceMethod(methodHandler) {
|
155 | return async (call, callback) => {
|
156 | const handler = methodHandler(call.request, call.metadata, call);
|
157 | const result$ = this.transformToObservable(await handler);
|
158 | try {
|
159 | await this.writeObservableToGrpc(result$, call);
|
160 | }
|
161 | catch (err) {
|
162 | call.emit('error', err);
|
163 | return;
|
164 | }
|
165 | };
|
166 | }
|
167 | |
168 |
|
169 |
|
170 |
|
171 |
|
172 |
|
173 |
|
174 |
|
175 |
|
176 |
|
177 | writeObservableToGrpc(source, call) {
|
178 | return new Promise((resolve, reject) => {
|
179 | const valuesWaitingToBeDrained = [];
|
180 | let shouldErrorAfterDraining = false;
|
181 | let error;
|
182 | let shouldResolveAfterDraining = false;
|
183 | let writing = true;
|
184 |
|
185 | const subscription = new rxjs_1.Subscription();
|
186 |
|
187 | const cancelHandler = () => {
|
188 | subscription.unsubscribe();
|
189 |
|
190 |
|
191 |
|
192 |
|
193 |
|
194 | resolve();
|
195 | };
|
196 | call.on(constants_1.CANCEL_EVENT, cancelHandler);
|
197 | subscription.add(() => call.off(constants_1.CANCEL_EVENT, cancelHandler));
|
198 |
|
199 | subscription.add(() => call.end());
|
200 | const drain = () => {
|
201 | writing = true;
|
202 | while (valuesWaitingToBeDrained.length > 0) {
|
203 | const value = valuesWaitingToBeDrained.shift();
|
204 | if (writing) {
|
205 |
|
206 |
|
207 | writing = call.write(value);
|
208 | if (!writing) {
|
209 |
|
210 | return;
|
211 | }
|
212 | }
|
213 | }
|
214 | if (shouldResolveAfterDraining) {
|
215 | subscription.unsubscribe();
|
216 | resolve();
|
217 | }
|
218 | else if (shouldErrorAfterDraining) {
|
219 | subscription.unsubscribe();
|
220 | reject(error);
|
221 | }
|
222 | };
|
223 | call.on('drain', drain);
|
224 | subscription.add(() => call.off('drain', drain));
|
225 | subscription.add(source.subscribe({
|
226 | next(value) {
|
227 | if (writing) {
|
228 | writing = call.write(value);
|
229 | }
|
230 | else {
|
231 |
|
232 |
|
233 |
|
234 | valuesWaitingToBeDrained.push(value);
|
235 | }
|
236 | },
|
237 | error(err) {
|
238 | if (valuesWaitingToBeDrained.length === 0) {
|
239 |
|
240 |
|
241 | subscription.unsubscribe();
|
242 | reject(err);
|
243 | }
|
244 | else {
|
245 |
|
246 |
|
247 | shouldErrorAfterDraining = true;
|
248 | error = err;
|
249 | }
|
250 | },
|
251 | complete() {
|
252 | if (valuesWaitingToBeDrained.length === 0) {
|
253 |
|
254 |
|
255 | subscription.unsubscribe();
|
256 | resolve();
|
257 | }
|
258 | else {
|
259 | shouldResolveAfterDraining = true;
|
260 | }
|
261 | },
|
262 | }));
|
263 | });
|
264 | }
|
265 | createRequestStreamMethod(methodHandler, isResponseStream) {
|
266 | return async (call, callback) => {
|
267 | const req = new rxjs_1.Subject();
|
268 | call.on('data', (m) => req.next(m));
|
269 | call.on('error', (e) => {
|
270 |
|
271 | const isCancelledError = String(e).toLowerCase().indexOf('cancelled');
|
272 | if (isCancelledError) {
|
273 | call.end();
|
274 | return;
|
275 | }
|
276 |
|
277 | req.error(e);
|
278 | });
|
279 | call.on('end', () => req.complete());
|
280 | const handler = methodHandler(req.asObservable(), call.metadata, call);
|
281 | const res = this.transformToObservable(await handler);
|
282 | if (isResponseStream) {
|
283 | try {
|
284 | await this.writeObservableToGrpc(res, call);
|
285 | }
|
286 | catch (err) {
|
287 | call.emit('error', err);
|
288 | return;
|
289 | }
|
290 | }
|
291 | else {
|
292 | const response = await (0, rxjs_1.lastValueFrom)(res.pipe((0, operators_1.takeUntil)((0, rxjs_1.fromEvent)(call, constants_1.CANCEL_EVENT)), (0, operators_1.catchError)(err => {
|
293 | callback(err, null);
|
294 | return rxjs_1.EMPTY;
|
295 | }), (0, rxjs_1.defaultIfEmpty)(undefined)));
|
296 | if (!(0, shared_utils_1.isUndefined)(response)) {
|
297 | callback(null, response);
|
298 | }
|
299 | }
|
300 | };
|
301 | }
|
302 | createStreamCallMethod(methodHandler, isResponseStream) {
|
303 | return async (call, callback) => {
|
304 | if (isResponseStream) {
|
305 | methodHandler(call);
|
306 | }
|
307 | else {
|
308 | methodHandler(call, callback);
|
309 | }
|
310 | };
|
311 | }
|
312 | async close() {
|
313 | if (this.grpcClient) {
|
314 | const graceful = this.getOptionsProp(this.options, 'gracefulShutdown');
|
315 | if (graceful) {
|
316 | await new Promise((resolve, reject) => {
|
317 | this.grpcClient.tryShutdown((error) => {
|
318 | if (error)
|
319 | reject(error);
|
320 | else
|
321 | resolve();
|
322 | });
|
323 | });
|
324 | }
|
325 | else {
|
326 | this.grpcClient.forceShutdown();
|
327 | }
|
328 | }
|
329 | this.grpcClient = null;
|
330 | }
|
331 | deserialize(obj) {
|
332 | try {
|
333 | return JSON.parse(obj);
|
334 | }
|
335 | catch (e) {
|
336 | return obj;
|
337 | }
|
338 | }
|
339 | addHandler(pattern, callback, isEventHandler = false) {
|
340 | const route = (0, shared_utils_1.isString)(pattern) ? pattern : JSON.stringify(pattern);
|
341 | callback.isEventHandler = isEventHandler;
|
342 | this.messageHandlers.set(route, callback);
|
343 | }
|
344 | async createClient() {
|
345 | const channelOptions = this.options && this.options.channelOptions
|
346 | ? this.options.channelOptions
|
347 | : {};
|
348 | if (this.options && this.options.maxSendMessageLength) {
|
349 | channelOptions['grpc.max_send_message_length'] =
|
350 | this.options.maxSendMessageLength;
|
351 | }
|
352 | if (this.options && this.options.maxReceiveMessageLength) {
|
353 | channelOptions['grpc.max_receive_message_length'] =
|
354 | this.options.maxReceiveMessageLength;
|
355 | }
|
356 | if (this.options && this.options.maxMetadataSize) {
|
357 | channelOptions['grpc.max_metadata_size'] = this.options.maxMetadataSize;
|
358 | }
|
359 | const server = new grpcPackage.Server(channelOptions);
|
360 | const credentials = this.getOptionsProp(this.options, 'credentials');
|
361 | await new Promise((resolve, reject) => {
|
362 | server.bindAsync(this.url, credentials || grpcPackage.ServerCredentials.createInsecure(), (error, port) => error ? reject(error) : resolve(port));
|
363 | });
|
364 | return server;
|
365 | }
|
366 | lookupPackage(root, packageName) {
|
367 |
|
368 | let pkg = root;
|
369 | for (const name of packageName.split(/\./)) {
|
370 | pkg = pkg[name];
|
371 | }
|
372 | return pkg;
|
373 | }
|
374 | loadProto() {
|
375 | try {
|
376 | const file = this.getOptionsProp(this.options, 'protoPath');
|
377 | const loader = this.getOptionsProp(this.options, 'loader');
|
378 | const packageDefinition = grpcProtoLoaderPackage.loadSync(file, loader);
|
379 | const packageObject = grpcPackage.loadPackageDefinition(packageDefinition);
|
380 | return packageObject;
|
381 | }
|
382 | catch (err) {
|
383 | const invalidProtoError = new invalid_proto_definition_exception_1.InvalidProtoDefinitionException(err.path);
|
384 | const message = err && err.message ? err.message : invalidProtoError.message;
|
385 | this.logger.error(message, invalidProtoError.stack);
|
386 | throw err;
|
387 | }
|
388 | }
|
389 | |
390 |
|
391 |
|
392 |
|
393 |
|
394 |
|
395 |
|
396 |
|
397 |
|
398 |
|
399 |
|
400 |
|
401 |
|
402 | collectDeepServices(name, grpcDefinition, accumulator) {
|
403 | if (!(0, shared_utils_1.isObject)(grpcDefinition)) {
|
404 | return;
|
405 | }
|
406 | const keysToTraverse = Object.keys(grpcDefinition);
|
407 |
|
408 | for (const key of keysToTraverse) {
|
409 | const nameExtended = this.parseDeepServiceName(name, key);
|
410 | const deepDefinition = grpcDefinition[key];
|
411 | const isServiceDefined = deepDefinition && !(0, shared_utils_1.isUndefined)(deepDefinition.service);
|
412 | const isServiceBoolean = isServiceDefined
|
413 | ? deepDefinition.service !== false
|
414 | : false;
|
415 | if (isServiceDefined && isServiceBoolean) {
|
416 | accumulator.push({
|
417 | name: nameExtended,
|
418 | service: deepDefinition,
|
419 | });
|
420 | }
|
421 |
|
422 | else {
|
423 | this.collectDeepServices(nameExtended, deepDefinition, accumulator);
|
424 | }
|
425 | }
|
426 | }
|
427 | parseDeepServiceName(name, key) {
|
428 |
|
429 | if (name.length === 0) {
|
430 | return key;
|
431 | }
|
432 |
|
433 | return name + '.' + key;
|
434 | }
|
435 | async createServices(grpcPkg, packageName) {
|
436 | if (!grpcPkg) {
|
437 | const invalidPackageError = new invalid_grpc_package_exception_1.InvalidGrpcPackageException(packageName);
|
438 | this.logger.error(invalidPackageError.message, invalidPackageError.stack);
|
439 | throw invalidPackageError;
|
440 | }
|
441 |
|
442 |
|
443 | for (const definition of this.getServiceNames(grpcPkg)) {
|
444 | this.grpcClient.addService(
|
445 |
|
446 | definition.service.service,
|
447 |
|
448 | await this.createService(definition.service, definition.name));
|
449 | }
|
450 | }
|
451 | }
|
452 | exports.ServerGrpc = ServerGrpc;
|