UNPKG

14.9 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.ServerGrpc = void 0;
4const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
5const rxjs_1 = require("rxjs");
6const operators_1 = require("rxjs/operators");
7const constants_1 = require("../constants");
8const decorators_1 = require("../decorators");
9const enums_1 = require("../enums");
10const invalid_grpc_package_exception_1 = require("../errors/invalid-grpc-package.exception");
11const invalid_proto_definition_exception_1 = require("../errors/invalid-proto-definition.exception");
12const server_1 = require("./server");
13let grpcPackage = {};
14let grpcProtoLoaderPackage = {};
15class ServerGrpc extends server_1.Server {
16 constructor(options) {
17 super();
18 this.options = options;
19 this.transportId = enums_1.Transport.GRPC;
20 this.url = this.getOptionsProp(options, 'url') || constants_1.GRPC_DEFAULT_URL;
21 const protoLoader = this.getOptionsProp(options, 'protoLoader') || constants_1.GRPC_DEFAULT_PROTO_LOADER;
22 grpcPackage = this.loadPackage('@grpc/grpc-js', ServerGrpc.name, () => require('@grpc/grpc-js'));
23 grpcProtoLoaderPackage = this.loadPackage(protoLoader, ServerGrpc.name, () => protoLoader === constants_1.GRPC_DEFAULT_PROTO_LOADER
24 ? require('@grpc/proto-loader')
25 : require(protoLoader));
26 }
27 async listen(callback) {
28 try {
29 this.grpcClient = await this.createClient();
30 await this.start(callback);
31 }
32 catch (err) {
33 callback(err);
34 }
35 }
36 async start(callback) {
37 await this.bindEvents();
38 this.grpcClient.start();
39 callback();
40 }
41 async bindEvents() {
42 const grpcContext = this.loadProto();
43 const packageOption = this.getOptionsProp(this.options, 'package');
44 const packageNames = Array.isArray(packageOption)
45 ? packageOption
46 : [packageOption];
47 for (const packageName of packageNames) {
48 const grpcPkg = this.lookupPackage(grpcContext, packageName);
49 await this.createServices(grpcPkg);
50 }
51 }
52 /**
53 * Will return all of the services along with their fully namespaced
54 * names as an array of objects.
55 * This method initiates recursive scan of grpcPkg object
56 */
57 getServiceNames(grpcPkg) {
58 // Define accumulator to collect all of the services available to load
59 const services = [];
60 // Initiate recursive services collector starting with empty name
61 this.collectDeepServices('', grpcPkg, services);
62 return services;
63 }
64 /**
65 * Will create service mapping from gRPC generated Object to handlers
66 * defined with @GrpcMethod or @GrpcStreamMethod annotations
67 *
68 * @param grpcService
69 * @param name
70 */
71 async createService(grpcService, name) {
72 const service = {};
73 for (const methodName in grpcService.prototype) {
74 let pattern = '';
75 let methodHandler = null;
76 let streamingType = decorators_1.GrpcMethodStreamingType.NO_STREAMING;
77 const methodFunction = grpcService.prototype[methodName];
78 const methodReqStreaming = methodFunction.requestStream;
79 if (!(0, shared_utils_1.isUndefined)(methodReqStreaming) && methodReqStreaming) {
80 // Try first pattern to be presented, RX streaming pattern would be
81 // a preferable pattern to select among a few defined
82 pattern = this.createPattern(name, methodName, decorators_1.GrpcMethodStreamingType.RX_STREAMING);
83 methodHandler = this.messageHandlers.get(pattern);
84 streamingType = decorators_1.GrpcMethodStreamingType.RX_STREAMING;
85 // If first pattern didn't match to any of handlers then try
86 // pass-through handler to be presented
87 if (!methodHandler) {
88 pattern = this.createPattern(name, methodName, decorators_1.GrpcMethodStreamingType.PT_STREAMING);
89 methodHandler = this.messageHandlers.get(pattern);
90 streamingType = decorators_1.GrpcMethodStreamingType.PT_STREAMING;
91 }
92 }
93 else {
94 pattern = this.createPattern(name, methodName, decorators_1.GrpcMethodStreamingType.NO_STREAMING);
95 // Select handler if any presented for No-Streaming pattern
96 methodHandler = this.messageHandlers.get(pattern);
97 streamingType = decorators_1.GrpcMethodStreamingType.NO_STREAMING;
98 }
99 if (!methodHandler) {
100 continue;
101 }
102 service[methodName] = await this.createServiceMethod(methodHandler, grpcService.prototype[methodName], streamingType);
103 }
104 return service;
105 }
106 /**
107 * Will create a string of a JSON serialized format
108 *
109 * @param service name of the service which should be a match to gRPC service definition name
110 * @param methodName name of the method which is coming after rpc keyword
111 * @param streaming GrpcMethodStreamingType parameter which should correspond to
112 * stream keyword in gRPC service request part
113 */
114 createPattern(service, methodName, streaming) {
115 return JSON.stringify({
116 service,
117 rpc: methodName,
118 streaming,
119 });
120 }
121 /**
122 * Will return async function which will handle gRPC call
123 * with Rx streams or as a direct call passthrough
124 *
125 * @param methodHandler
126 * @param protoNativeHandler
127 */
128 createServiceMethod(methodHandler, protoNativeHandler, streamType) {
129 // If proto handler has request stream as "true" then we expect it to have
130 // streaming from the side of requester
131 if (protoNativeHandler.requestStream) {
132 // If any handlers were defined with GrpcStreamMethod annotation use RX
133 if (streamType === decorators_1.GrpcMethodStreamingType.RX_STREAMING) {
134 return this.createRequestStreamMethod(methodHandler, protoNativeHandler.responseStream);
135 }
136 // If any handlers were defined with GrpcStreamCall annotation
137 else if (streamType === decorators_1.GrpcMethodStreamingType.PT_STREAMING) {
138 return this.createStreamCallMethod(methodHandler, protoNativeHandler.responseStream);
139 }
140 }
141 return protoNativeHandler.responseStream
142 ? this.createStreamServiceMethod(methodHandler)
143 : this.createUnaryServiceMethod(methodHandler);
144 }
145 createUnaryServiceMethod(methodHandler) {
146 return async (call, callback) => {
147 const handler = methodHandler(call.request, call.metadata, call);
148 this.transformToObservable(await handler).subscribe({
149 next: data => callback(null, data),
150 error: (err) => callback(err),
151 });
152 };
153 }
154 createStreamServiceMethod(methodHandler) {
155 return async (call, callback) => {
156 const handler = methodHandler(call.request, call.metadata, call);
157 const result$ = this.transformToObservable(await handler);
158 await result$
159 .pipe((0, operators_1.takeUntil)((0, rxjs_1.fromEvent)(call, constants_1.CANCEL_EVENT)), (0, operators_1.catchError)(err => {
160 call.emit('error', err);
161 return rxjs_1.EMPTY;
162 }))
163 .forEach(data => call.write(data));
164 call.end();
165 };
166 }
167 createRequestStreamMethod(methodHandler, isResponseStream) {
168 return async (call, callback) => {
169 const req = new rxjs_1.Subject();
170 call.on('data', (m) => req.next(m));
171 call.on('error', (e) => {
172 // Check if error means that stream ended on other end
173 const isCancelledError = String(e).toLowerCase().indexOf('cancelled');
174 if (isCancelledError) {
175 call.end();
176 return;
177 }
178 // If another error then just pass it along
179 req.error(e);
180 });
181 call.on('end', () => req.complete());
182 const handler = methodHandler(req.asObservable(), call.metadata, call);
183 const res = this.transformToObservable(await handler);
184 if (isResponseStream) {
185 await res
186 .pipe((0, operators_1.takeUntil)((0, rxjs_1.fromEvent)(call, constants_1.CANCEL_EVENT)), (0, operators_1.catchError)(err => {
187 call.emit('error', err);
188 return rxjs_1.EMPTY;
189 }))
190 .forEach(m => call.write(m));
191 call.end();
192 }
193 else {
194 const response = await (0, rxjs_1.lastValueFrom)(res.pipe((0, operators_1.takeUntil)((0, rxjs_1.fromEvent)(call, constants_1.CANCEL_EVENT)), (0, operators_1.catchError)(err => {
195 callback(err, null);
196 return rxjs_1.EMPTY;
197 })));
198 if (!(0, shared_utils_1.isUndefined)(response)) {
199 callback(null, response);
200 }
201 }
202 };
203 }
204 createStreamCallMethod(methodHandler, isResponseStream) {
205 return async (call, callback) => {
206 if (isResponseStream) {
207 methodHandler(call);
208 }
209 else {
210 methodHandler(call, callback);
211 }
212 };
213 }
214 close() {
215 this.grpcClient && this.grpcClient.forceShutdown();
216 this.grpcClient = null;
217 }
218 deserialize(obj) {
219 try {
220 return JSON.parse(obj);
221 }
222 catch (e) {
223 return obj;
224 }
225 }
226 addHandler(pattern, callback, isEventHandler = false) {
227 const route = (0, shared_utils_1.isString)(pattern) ? pattern : JSON.stringify(pattern);
228 callback.isEventHandler = isEventHandler;
229 this.messageHandlers.set(route, callback);
230 }
231 async createClient() {
232 const channelOptions = this.options && this.options.channelOptions
233 ? this.options.channelOptions
234 : {};
235 if (this.options && this.options.maxSendMessageLength) {
236 channelOptions['grpc.max_send_message_length'] =
237 this.options.maxSendMessageLength;
238 }
239 if (this.options && this.options.maxReceiveMessageLength) {
240 channelOptions['grpc.max_receive_message_length'] =
241 this.options.maxReceiveMessageLength;
242 }
243 if (this.options && this.options.maxMetadataSize) {
244 channelOptions['grpc.max_metadata_size'] = this.options.maxMetadataSize;
245 }
246 const server = new grpcPackage.Server(channelOptions);
247 const credentials = this.getOptionsProp(this.options, 'credentials');
248 await new Promise((resolve, reject) => {
249 server.bindAsync(this.url, credentials || grpcPackage.ServerCredentials.createInsecure(), (error, port) => error ? reject(error) : resolve(port));
250 });
251 return server;
252 }
253 lookupPackage(root, packageName) {
254 /** Reference: https://github.com/kondi/rxjs-grpc */
255 let pkg = root;
256 for (const name of packageName.split(/\./)) {
257 pkg = pkg[name];
258 }
259 return pkg;
260 }
261 loadProto() {
262 try {
263 const file = this.getOptionsProp(this.options, 'protoPath');
264 const loader = this.getOptionsProp(this.options, 'loader');
265 const packageDefinition = grpcProtoLoaderPackage.loadSync(file, loader);
266 const packageObject = grpcPackage.loadPackageDefinition(packageDefinition);
267 return packageObject;
268 }
269 catch (err) {
270 const invalidProtoError = new invalid_proto_definition_exception_1.InvalidProtoDefinitionException();
271 const message = err && err.message ? err.message : invalidProtoError.message;
272 this.logger.error(message, invalidProtoError.stack);
273 throw err;
274 }
275 }
276 /**
277 * Recursively fetch all of the service methods available on loaded
278 * protobuf descriptor object, and collect those as an objects with
279 * dot-syntax full-path names.
280 *
281 * Example:
282 * for proto package Bundle.FirstService with service Events { rpc...
283 * will be resolved to object of (while loaded for Bundle package):
284 * {
285 * name: "FirstService.Events",
286 * service: {Object}
287 * }
288 */
289 collectDeepServices(name, grpcDefinition, accumulator) {
290 if (!(0, shared_utils_1.isObject)(grpcDefinition)) {
291 return;
292 }
293 const keysToTraverse = Object.keys(grpcDefinition);
294 // Traverse definitions or namespace extensions
295 for (const key of keysToTraverse) {
296 const nameExtended = this.parseDeepServiceName(name, key);
297 const deepDefinition = grpcDefinition[key];
298 const isServiceDefined = deepDefinition && !(0, shared_utils_1.isUndefined)(deepDefinition.service);
299 const isServiceBoolean = isServiceDefined
300 ? deepDefinition.service !== false
301 : false;
302 if (isServiceDefined && isServiceBoolean) {
303 accumulator.push({
304 name: nameExtended,
305 service: deepDefinition,
306 });
307 }
308 // Continue recursion until objects end or service definition found
309 else {
310 this.collectDeepServices(nameExtended, deepDefinition, accumulator);
311 }
312 }
313 }
314 parseDeepServiceName(name, key) {
315 // If depth is zero then just return key
316 if (name.length === 0) {
317 return key;
318 }
319 // Otherwise add next through dot syntax
320 return name + '.' + key;
321 }
322 async createServices(grpcPkg) {
323 if (!grpcPkg) {
324 const invalidPackageError = new invalid_grpc_package_exception_1.InvalidGrpcPackageException();
325 this.logger.error(invalidPackageError.message, invalidPackageError.stack);
326 throw invalidPackageError;
327 }
328 // Take all of the services defined in grpcPkg and assign them to
329 // method handlers defined in Controllers
330 for (const definition of this.getServiceNames(grpcPkg)) {
331 this.grpcClient.addService(
332 // First parameter requires exact service definition from proto
333 definition.service.service,
334 // Here full proto definition required along with namespaced pattern name
335 await this.createService(definition.service, definition.name));
336 }
337 }
338}
339exports.ServerGrpc = ServerGrpc;