UNPKG

19.6 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.ServerGrpc = void 0;
4const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
5const rxjs_1 = require("rxjs");
6const operators_1 = require("rxjs/operators");
7const constants_1 = require("../constants");
8const decorators_1 = require("../decorators");
9const enums_1 = require("../enums");
10const invalid_grpc_package_exception_1 = require("../errors/invalid-grpc-package.exception");
11const invalid_proto_definition_exception_1 = require("../errors/invalid-proto-definition.exception");
12const server_1 = require("./server");
// Module-level holders for the dynamically loaded gRPC packages. Both start as
// empty objects and are reassigned in the ServerGrpc constructor with whatever
// loadPackage() returns, so they are shared by every ServerGrpc instance
// created from this module.
13let grpcPackage = {};
14let grpcProtoLoaderPackage = {};
/**
 * Microservice server for the gRPC transport.
 *
 * Loads a proto definition, looks up the configured package(s) in it, and
 * registers every discovered service on a `@grpc/grpc-js` server, routing each
 * rpc to the message handler registered under the matching JSON pattern
 * (see `createPattern`).
 */
class ServerGrpc extends server_1.Server {
    /**
     * Stores the options, resolves the bind url and loads the gRPC packages.
     *
     * @param options transport options; the keys read by this class are `url`,
     * `protoLoader`, `package`, `protoPath`, `loader`, `credentials`,
     * `gracefulShutdown`, `channelOptions`, `maxSendMessageLength`,
     * `maxReceiveMessageLength` and `maxMetadataSize`
     */
    constructor(options) {
        super();
        this.options = options;
        this.transportId = enums_1.Transport.GRPC;
        this.url = this.getOptionsProp(options, 'url') || constants_1.GRPC_DEFAULT_URL;
        const protoLoader = this.getOptionsProp(options, 'protoLoader') || constants_1.GRPC_DEFAULT_PROTO_LOADER;
        grpcPackage = this.loadPackage('@grpc/grpc-js', ServerGrpc.name, () => require('@grpc/grpc-js'));
        grpcProtoLoaderPackage = this.loadPackage(protoLoader, ServerGrpc.name, () => protoLoader === constants_1.GRPC_DEFAULT_PROTO_LOADER
            ? require('@grpc/proto-loader')
            : require(protoLoader));
    }
    /**
     * Creates the underlying gRPC server and starts it.
     *
     * Any error raised while creating or starting the server is handed to
     * `callback` instead of being thrown.
     *
     * @param callback invoked without arguments on success, or with the error
     */
    async listen(callback) {
        try {
            this.grpcClient = await this.createClient();
            await this.start(callback);
        }
        catch (err) {
            callback(err);
        }
    }
    /**
     * Registers all services, starts the gRPC server and then calls `callback`.
     *
     * @param callback invoked without arguments once the server was started
     */
    async start(callback) {
        await this.bindEvents();
        this.grpcClient.start();
        callback();
    }
    /**
     * Loads the proto definition and registers the services of every package
     * named in the `package` option (a single name or an array of names).
     */
    async bindEvents() {
        const grpcContext = this.loadProto();
        const packageOption = this.getOptionsProp(this.options, 'package');
        const packageNames = Array.isArray(packageOption)
            ? packageOption
            : [packageOption];
        for (const packageName of packageNames) {
            const grpcPkg = this.lookupPackage(grpcContext, packageName);
            await this.createServices(grpcPkg, packageName);
        }
    }
    /**
     * Will return all of the services along with their fully namespaced
     * names as an array of objects.
     * This method initiates recursive scan of grpcPkg object
     *
     * @param grpcPkg package object to scan
     * @returns array of `{ name, service }` entries
     */
    getServiceNames(grpcPkg) {
        // Define accumulator to collect all of the services available to load
        const services = [];
        // Initiate recursive services collector starting with empty name
        this.collectDeepServices('', grpcPkg, services);
        return services;
    }
    /**
     * Will create service mapping from gRPC generated Object to handlers
     * defined with @GrpcMethod or @GrpcStreamMethod annotations
     *
     * Methods without a registered handler are left out of the mapping.
     *
     * @param grpcService
     * @param name
     * @returns object mapping rpc method names to gRPC call handlers
     */
    async createService(grpcService, name) {
        const service = {};
        // `for...in` is intentional: it walks every enumerable key reachable
        // through the generated prototype, exactly like the original loop.
        for (const methodName in grpcService.prototype) {
            const methodFunction = grpcService.prototype[methodName];
            let streamingType;
            let methodHandler;
            if (methodFunction.requestStream) {
                // Try first pattern to be presented, RX streaming pattern would be
                // a preferable pattern to select among a few defined
                streamingType = decorators_1.GrpcMethodStreamingType.RX_STREAMING;
                methodHandler = this.messageHandlers.get(this.createPattern(name, methodName, streamingType));
                // If first pattern didn't match to any of handlers then try
                // pass-through handler to be presented
                if (!methodHandler) {
                    streamingType = decorators_1.GrpcMethodStreamingType.PT_STREAMING;
                    methodHandler = this.messageHandlers.get(this.createPattern(name, methodName, streamingType));
                }
            }
            else {
                // Select handler if any presented for No-Streaming pattern
                streamingType = decorators_1.GrpcMethodStreamingType.NO_STREAMING;
                methodHandler = this.messageHandlers.get(this.createPattern(name, methodName, streamingType));
            }
            if (!methodHandler) {
                continue;
            }
            service[methodName] = await this.createServiceMethod(methodHandler, methodFunction, streamingType);
        }
        return service;
    }
    /**
     * Will create a string of a JSON serialized format
     *
     * @param service name of the service which should be a match to gRPC service definition name
     * @param methodName name of the method which is coming after rpc keyword
     * @param streaming GrpcMethodStreamingType parameter which should correspond to
     * stream keyword in gRPC service request part
     * @returns JSON string with the keys `service`, `rpc` and `streaming`
     */
    createPattern(service, methodName, streaming) {
        return JSON.stringify({
            service,
            rpc: methodName,
            streaming,
        });
    }
    /**
     * Will return async function which will handle gRPC call
     * with Rx streams or as a direct call passthrough
     *
     * @param methodHandler
     * @param protoNativeHandler
     * @param streamType GrpcMethodStreamingType the handler was registered with
     */
    createServiceMethod(methodHandler, protoNativeHandler, streamType) {
        // If proto handler has request stream as "true" then we expect it to have
        // streaming from the side of requester
        if (protoNativeHandler.requestStream) {
            // If any handlers were defined with GrpcStreamMethod annotation use RX
            if (streamType === decorators_1.GrpcMethodStreamingType.RX_STREAMING) {
                return this.createRequestStreamMethod(methodHandler, protoNativeHandler.responseStream);
            }
            // If any handlers were defined with GrpcStreamCall annotation
            else if (streamType === decorators_1.GrpcMethodStreamingType.PT_STREAMING) {
                return this.createStreamCallMethod(methodHandler, protoNativeHandler.responseStream);
            }
        }
        return protoNativeHandler.responseStream
            ? this.createStreamServiceMethod(methodHandler)
            : this.createUnaryServiceMethod(methodHandler);
    }
    /**
     * Builds the call handler for an rpc without request or response streaming.
     *
     * The handler result is converted to an observable; each emitted value is
     * awaited and passed to the gRPC callback, errors are passed as the
     * callback's first argument.
     *
     * @param methodHandler handler invoked with (request, metadata, call)
     */
    createUnaryServiceMethod(methodHandler) {
        return async (call, callback) => {
            const handler = methodHandler(call.request, call.metadata, call);
            this.transformToObservable(await handler).subscribe({
                next: async (data) => callback(null, await data),
                error: (err) => callback(err),
            });
        };
    }
    /**
     * Builds the call handler for an rpc with a streamed response only.
     *
     * The handler result is converted to an observable and written to the
     * call; a failure while writing is emitted on the call as an 'error' event.
     *
     * @param methodHandler handler invoked with (request, metadata, call)
     */
    createStreamServiceMethod(methodHandler) {
        return async (call, callback) => {
            const handler = methodHandler(call.request, call.metadata, call);
            const result$ = this.transformToObservable(await handler);
            try {
                await this.writeObservableToGrpc(result$, call);
            }
            catch (err) {
                call.emit('error', err);
            }
        };
    }
    /**
     * Writes an observable to a GRPC call.
     *
     * This function will ensure that backpressure is managed while writing values
     * that come from an observable to a GRPC call.
     *
     * @param source The observable we want to write out to the GRPC call.
     * @param call The GRPC call we want to write to.
     * @returns A promise that resolves when we're done writing to the call.
     */
    writeObservableToGrpc(source, call) {
        return new Promise((resolve, reject) => {
            const valuesWaitingToBeDrained = [];
            let shouldErrorAfterDraining = false;
            let error;
            let shouldResolveAfterDraining = false;
            let writing = true;
            // Used to manage finalization
            const subscription = new rxjs_1.Subscription();
            // If the call is cancelled, unsubscribe from the source
            const cancelHandler = () => {
                subscription.unsubscribe();
                // The call has been cancelled, so we need to either resolve
                // or reject the promise. We're resolving in this case because
                // rejection is noisy. If at any point in the future, we need to
                // know that cancellation happened, we can either reject or
                // start resolving with some sort of outcome value.
                resolve();
            };
            call.on(constants_1.CANCEL_EVENT, cancelHandler);
            subscription.add(() => call.off(constants_1.CANCEL_EVENT, cancelHandler));
            // In all cases, when we finalize, end the writable stream
            subscription.add(() => call.end());
            const drain = () => {
                writing = true;
                while (valuesWaitingToBeDrained.length > 0) {
                    const value = valuesWaitingToBeDrained.shift();
                    if (writing) {
                        // The first time `call.write` returns false, we need to stop.
                        // It wrote the value, but it won't write anything else.
                        writing = call.write(value);
                        if (!writing) {
                            // We can't write anymore so we need to wait for the drain event
                            return;
                        }
                    }
                }
                // The buffer is empty: settle the promise if the source already
                // finished (or failed) while values were still queued.
                if (shouldResolveAfterDraining) {
                    subscription.unsubscribe();
                    resolve();
                }
                else if (shouldErrorAfterDraining) {
                    subscription.unsubscribe();
                    reject(error);
                }
            };
            call.on('drain', drain);
            subscription.add(() => call.off('drain', drain));
            subscription.add(source.subscribe({
                next(value) {
                    if (writing) {
                        writing = call.write(value);
                    }
                    else {
                        // If we can't write, that's because we need to
                        // wait for the drain event before we can write again
                        // buffer the value and wait for the drain event
                        valuesWaitingToBeDrained.push(value);
                    }
                },
                error(err) {
                    if (valuesWaitingToBeDrained.length === 0) {
                        // We're not waiting for a drain event, so we can just
                        // reject and teardown.
                        subscription.unsubscribe();
                        reject(err);
                    }
                    else {
                        // We're waiting for a drain event, record the
                        // error so it can be handled after everything is drained.
                        shouldErrorAfterDraining = true;
                        error = err;
                    }
                },
                complete() {
                    if (valuesWaitingToBeDrained.length === 0) {
                        // We're not waiting for a drain event, so we can just
                        // resolve and teardown.
                        subscription.unsubscribe();
                        resolve();
                    }
                    else {
                        // Values are still buffered: resolve once drained.
                        shouldResolveAfterDraining = true;
                    }
                },
            }));
        });
    }
    /**
     * Builds the call handler for an rpc with a streamed request that is
     * exposed to the method handler as an observable.
     *
     * Incoming 'data' events are pushed into a Subject, 'end' completes it.
     * An 'error' event whose text contains "cancelled" ends the call; any
     * other error is forwarded to the request observable.
     *
     * @param methodHandler handler invoked with (request$, metadata, call)
     * @param isResponseStream whether the response is streamed as well
     */
    createRequestStreamMethod(methodHandler, isResponseStream) {
        return async (call, callback) => {
            const req = new rxjs_1.Subject();
            call.on('data', (m) => req.next(m));
            call.on('error', (e) => {
                // Check if error means that stream ended on other end.
                // `includes` yields a real boolean; using the raw `indexOf`
                // result here would be truthy (-1) for every unrelated error.
                const isCancelledError = String(e).toLowerCase().includes('cancelled');
                if (isCancelledError) {
                    call.end();
                    return;
                }
                // If another error then just pass it along
                req.error(e);
            });
            call.on('end', () => req.complete());
            const handler = methodHandler(req.asObservable(), call.metadata, call);
            const res = this.transformToObservable(await handler);
            if (isResponseStream) {
                try {
                    await this.writeObservableToGrpc(res, call);
                }
                catch (err) {
                    call.emit('error', err);
                    return;
                }
            }
            else {
                // Single response: take the last emitted value, stop on
                // cancellation, and report handler errors through the callback.
                const response = await (0, rxjs_1.lastValueFrom)(res.pipe((0, operators_1.takeUntil)((0, rxjs_1.fromEvent)(call, constants_1.CANCEL_EVENT)), (0, operators_1.catchError)(err => {
                    callback(err, null);
                    return rxjs_1.EMPTY;
                }), (0, rxjs_1.defaultIfEmpty)(undefined)));
                if (!(0, shared_utils_1.isUndefined)(response)) {
                    callback(null, response);
                }
            }
        };
    }
    /**
     * Builds the pass-through call handler: the method handler receives the
     * raw call, plus the gRPC callback when the response is not streamed.
     *
     * @param methodHandler handler invoked with (call) or (call, callback)
     * @param isResponseStream whether the response is streamed
     */
    createStreamCallMethod(methodHandler, isResponseStream) {
        return async (call, callback) => {
            if (isResponseStream) {
                methodHandler(call);
            }
            else {
                methodHandler(call, callback);
            }
        };
    }
    /**
     * Shuts the gRPC server down and clears the reference to it.
     *
     * Uses `tryShutdown` when the `gracefulShutdown` option is truthy
     * (rejecting if it reports an error), otherwise `forceShutdown`.
     */
    async close() {
        if (this.grpcClient) {
            const graceful = this.getOptionsProp(this.options, 'gracefulShutdown');
            if (graceful) {
                await new Promise((resolve, reject) => {
                    this.grpcClient.tryShutdown((error) => {
                        if (error)
                            reject(error);
                        else
                            resolve();
                    });
                });
            }
            else {
                this.grpcClient.forceShutdown();
            }
        }
        this.grpcClient = null;
    }
    /**
     * Parses `obj` as JSON; returns `obj` unchanged when parsing fails.
     *
     * @param obj value to parse
     */
    deserialize(obj) {
        try {
            return JSON.parse(obj);
        }
        catch (e) {
            return obj;
        }
    }
    /**
     * Registers a handler under its pattern; non-string patterns are
     * serialized with JSON.stringify to form the lookup key.
     *
     * @param pattern string or object pattern
     * @param callback handler function; gets `isEventHandler` set on it
     * @param isEventHandler flag stored on the callback (default false)
     */
    addHandler(pattern, callback, isEventHandler = false) {
        const route = (0, shared_utils_1.isString)(pattern) ? pattern : JSON.stringify(pattern);
        callback.isEventHandler = isEventHandler;
        this.messageHandlers.set(route, callback);
    }
    /**
     * Creates the gRPC server and binds it to `this.url`.
     *
     * Channel options are taken from `options.channelOptions`; the
     * `maxSendMessageLength`, `maxReceiveMessageLength` and `maxMetadataSize`
     * options, when truthy, are added under their `grpc.*` keys.
     *
     * @returns the bound (not yet started) server
     */
    async createClient() {
        const options = this.options || {};
        // Work on a shallow copy so the caller's `channelOptions` object is
        // not mutated when the `grpc.*` keys below are added.
        const channelOptions = Object.assign({}, options.channelOptions);
        if (options.maxSendMessageLength) {
            channelOptions['grpc.max_send_message_length'] =
                options.maxSendMessageLength;
        }
        if (options.maxReceiveMessageLength) {
            channelOptions['grpc.max_receive_message_length'] =
                options.maxReceiveMessageLength;
        }
        if (options.maxMetadataSize) {
            channelOptions['grpc.max_metadata_size'] = options.maxMetadataSize;
        }
        const server = new grpcPackage.Server(channelOptions);
        const credentials = this.getOptionsProp(this.options, 'credentials');
        await new Promise((resolve, reject) => {
            server.bindAsync(this.url, credentials || grpcPackage.ServerCredentials.createInsecure(), (error, port) => error ? reject(error) : resolve(port));
        });
        return server;
    }
    /**
     * Resolves a dot-separated package name against the loaded proto object.
     *
     * @param root object returned by `loadProto`
     * @param packageName dot-separated package name, e.g. "a.b.c"
     * @returns the nested package object, or `undefined` when any segment of
     * the path is missing (so the caller can report an invalid package)
     */
    lookupPackage(root, packageName) {
        /** Reference: https://github.com/kondi/rxjs-grpc */
        let pkg = root;
        for (const name of packageName.split(/\./)) {
            // A missing intermediate namespace means the package does not
            // exist; stop here instead of throwing a TypeError on `pkg[name]`.
            if (pkg === undefined || pkg === null) {
                return undefined;
            }
            pkg = pkg[name];
        }
        return pkg;
    }
    /**
     * Loads the proto file named by the `protoPath` option (with the `loader`
     * option passed to the proto loader) and converts it to a package object.
     *
     * On failure the error is logged and the original error is rethrown.
     *
     * @returns the object produced by `loadPackageDefinition`
     */
    loadProto() {
        try {
            const file = this.getOptionsProp(this.options, 'protoPath');
            const loader = this.getOptionsProp(this.options, 'loader');
            const packageDefinition = grpcProtoLoaderPackage.loadSync(file, loader);
            const packageObject = grpcPackage.loadPackageDefinition(packageDefinition);
            return packageObject;
        }
        catch (err) {
            // Guard `err` the same way for `path` as for `message`, so a
            // nullish thrown value cannot fail inside this handler.
            const invalidProtoError = new invalid_proto_definition_exception_1.InvalidProtoDefinitionException(err && err.path);
            const message = err && err.message ? err.message : invalidProtoError.message;
            this.logger.error(message, invalidProtoError.stack);
            throw err;
        }
    }
    /**
     * Recursively fetch all of the service methods available on loaded
     * protobuf descriptor object, and collect those as an objects with
     * dot-syntax full-path names.
     *
     * Example:
     * for proto package Bundle.FirstService with service Events { rpc...
     * will be resolved to object of (while loaded for Bundle package):
     * {
     * name: "FirstService.Events",
     * service: {Object}
     * }
     *
     * @param name dot-syntax path accumulated so far ('' at the top level)
     * @param grpcDefinition object to scan; non-objects are ignored
     * @param accumulator array that receives the `{ name, service }` entries
     */
    collectDeepServices(name, grpcDefinition, accumulator) {
        if (!(0, shared_utils_1.isObject)(grpcDefinition)) {
            return;
        }
        const keysToTraverse = Object.keys(grpcDefinition);
        // Traverse definitions or namespace extensions
        for (const key of keysToTraverse) {
            const nameExtended = this.parseDeepServiceName(name, key);
            const deepDefinition = grpcDefinition[key];
            // An entry counts as a service when it has a `service` property
            // that is neither undefined nor `false`.
            const isService = Boolean(deepDefinition) &&
                !(0, shared_utils_1.isUndefined)(deepDefinition.service) &&
                deepDefinition.service !== false;
            if (isService) {
                accumulator.push({
                    name: nameExtended,
                    service: deepDefinition,
                });
            }
            // Continue recursion until objects end or service definition found
            else {
                this.collectDeepServices(nameExtended, deepDefinition, accumulator);
            }
        }
    }
    /**
     * Appends `key` to the dot-syntax path `name`.
     *
     * @param name path accumulated so far
     * @param key next path segment
     * @returns `key` when `name` is empty, otherwise `name.key`
     */
    parseDeepServiceName(name, key) {
        // If depth is zero then just return key
        if (name.length === 0) {
            return key;
        }
        // Otherwise add next through dot syntax
        return `${name}.${key}`;
    }
    /**
     * Registers every service found in `grpcPkg` on the gRPC server.
     *
     * @param grpcPkg package object returned by `lookupPackage`
     * @param packageName name used in the error when `grpcPkg` is falsy
     * @throws InvalidGrpcPackageException (after logging it) when `grpcPkg`
     * is falsy
     */
    async createServices(grpcPkg, packageName) {
        if (!grpcPkg) {
            const invalidPackageError = new invalid_grpc_package_exception_1.InvalidGrpcPackageException(packageName);
            this.logger.error(invalidPackageError.message, invalidPackageError.stack);
            throw invalidPackageError;
        }
        // Take all of the services defined in grpcPkg and assign them to
        // method handlers defined in Controllers
        for (const definition of this.getServiceNames(grpcPkg)) {
            this.grpcClient.addService(
            // First parameter requires exact service definition from proto
            definition.service.service, 
            // Here full proto definition required along with namespaced pattern name
            await this.createService(definition.service, definition.name));
        }
    }
}
452exports.ServerGrpc = ServerGrpc;