UNPKG

14.5 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.ServerGrpc = void 0;
4const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
5const rxjs_1 = require("rxjs");
6const operators_1 = require("rxjs/operators");
7const constants_1 = require("../constants");
8const decorators_1 = require("../decorators");
9const enums_1 = require("../enums");
10const invalid_grpc_package_exception_1 = require("../errors/invalid-grpc-package.exception");
11const invalid_proto_definition_exception_1 = require("../errors/invalid-proto-definition.exception");
12const server_1 = require("./server");
13let grpcPackage = {};
14let grpcProtoLoaderPackage = {};
15class ServerGrpc extends server_1.Server {
16 constructor(options) {
17 super();
18 this.options = options;
19 this.transportId = enums_1.Transport.GRPC;
20 this.url = this.getOptionsProp(options, 'url') || constants_1.GRPC_DEFAULT_URL;
21 const protoLoader = this.getOptionsProp(options, 'protoLoader') || constants_1.GRPC_DEFAULT_PROTO_LOADER;
22 grpcPackage = this.loadPackage('@grpc/grpc-js', ServerGrpc.name, () => require('@grpc/grpc-js'));
23 grpcProtoLoaderPackage = this.loadPackage(protoLoader, ServerGrpc.name);
24 }
25 async listen(callback) {
26 try {
27 this.grpcClient = await this.createClient();
28 await this.start(callback);
29 }
30 catch (err) {
31 callback(err);
32 }
33 }
34 async start(callback) {
35 await this.bindEvents();
36 this.grpcClient.start();
37 callback();
38 }
39 async bindEvents() {
40 const grpcContext = this.loadProto();
41 const packageOption = this.getOptionsProp(this.options, 'package');
42 const packageNames = Array.isArray(packageOption)
43 ? packageOption
44 : [packageOption];
45 for (const packageName of packageNames) {
46 const grpcPkg = this.lookupPackage(grpcContext, packageName);
47 await this.createServices(grpcPkg);
48 }
49 }
50 /**
51 * Will return all of the services along with their fully namespaced
52 * names as an array of objects.
53 * This method initiates recursive scan of grpcPkg object
54 */
55 getServiceNames(grpcPkg) {
56 // Define accumulator to collect all of the services available to load
57 const services = [];
58 // Initiate recursive services collector starting with empty name
59 this.collectDeepServices('', grpcPkg, services);
60 return services;
61 }
62 /**
63 * Will create service mapping from gRPC generated Object to handlers
64 * defined with @GrpcMethod or @GrpcStreamMethod annotations
65 *
66 * @param grpcService
67 * @param name
68 */
69 async createService(grpcService, name) {
70 const service = {};
71 for (const methodName in grpcService.prototype) {
72 let pattern = '';
73 let methodHandler = null;
74 let streamingType = decorators_1.GrpcMethodStreamingType.NO_STREAMING;
75 const methodFunction = grpcService.prototype[methodName];
76 const methodReqStreaming = methodFunction.requestStream;
77 if (!shared_utils_1.isUndefined(methodReqStreaming) && methodReqStreaming) {
78 // Try first pattern to be presented, RX streaming pattern would be
79 // a preferable pattern to select among a few defined
80 pattern = this.createPattern(name, methodName, decorators_1.GrpcMethodStreamingType.RX_STREAMING);
81 methodHandler = this.messageHandlers.get(pattern);
82 streamingType = decorators_1.GrpcMethodStreamingType.RX_STREAMING;
83 // If first pattern didn't match to any of handlers then try
84 // pass-through handler to be presented
85 if (!methodHandler) {
86 pattern = this.createPattern(name, methodName, decorators_1.GrpcMethodStreamingType.PT_STREAMING);
87 methodHandler = this.messageHandlers.get(pattern);
88 streamingType = decorators_1.GrpcMethodStreamingType.PT_STREAMING;
89 }
90 }
91 else {
92 pattern = this.createPattern(name, methodName, decorators_1.GrpcMethodStreamingType.NO_STREAMING);
93 // Select handler if any presented for No-Streaming pattern
94 methodHandler = this.messageHandlers.get(pattern);
95 streamingType = decorators_1.GrpcMethodStreamingType.NO_STREAMING;
96 }
97 if (!methodHandler) {
98 continue;
99 }
100 service[methodName] = await this.createServiceMethod(methodHandler, grpcService.prototype[methodName], streamingType);
101 }
102 return service;
103 }
104 /**
105 * Will create a string of a JSON serialized format
106 *
107 * @param service name of the service which should be a match to gRPC service definition name
108 * @param methodName name of the method which is coming after rpc keyword
109 * @param streaming GrpcMethodStreamingType parameter which should correspond to
110 * stream keyword in gRPC service request part
111 */
112 createPattern(service, methodName, streaming) {
113 return JSON.stringify({
114 service,
115 rpc: methodName,
116 streaming,
117 });
118 }
119 /**
120 * Will return async function which will handle gRPC call
121 * with Rx streams or as a direct call passthrough
122 *
123 * @param methodHandler
124 * @param protoNativeHandler
125 */
126 createServiceMethod(methodHandler, protoNativeHandler, streamType) {
127 // If proto handler has request stream as "true" then we expect it to have
128 // streaming from the side of requester
129 if (protoNativeHandler.requestStream) {
130 // If any handlers were defined with GrpcStreamMethod annotation use RX
131 if (streamType === decorators_1.GrpcMethodStreamingType.RX_STREAMING) {
132 return this.createRequestStreamMethod(methodHandler, protoNativeHandler.responseStream);
133 }
134 // If any handlers were defined with GrpcStreamCall annotation
135 else if (streamType === decorators_1.GrpcMethodStreamingType.PT_STREAMING) {
136 return this.createStreamCallMethod(methodHandler, protoNativeHandler.responseStream);
137 }
138 }
139 return protoNativeHandler.responseStream
140 ? this.createStreamServiceMethod(methodHandler)
141 : this.createUnaryServiceMethod(methodHandler);
142 }
143 createUnaryServiceMethod(methodHandler) {
144 return async (call, callback) => {
145 const handler = methodHandler(call.request, call.metadata, call);
146 this.transformToObservable(await handler).subscribe(data => callback(null, data), (err) => callback(err));
147 };
148 }
149 createStreamServiceMethod(methodHandler) {
150 return async (call, callback) => {
151 const handler = methodHandler(call.request, call.metadata, call);
152 const result$ = this.transformToObservable(await handler);
153 await result$
154 .pipe(operators_1.takeUntil(rxjs_1.fromEvent(call, constants_1.CANCEL_EVENT)), operators_1.catchError(err => {
155 call.emit('error', err);
156 return rxjs_1.EMPTY;
157 }))
158 .forEach(data => call.write(data));
159 call.end();
160 };
161 }
162 createRequestStreamMethod(methodHandler, isResponseStream) {
163 return async (call, callback) => {
164 const req = new rxjs_1.Subject();
165 call.on('data', (m) => req.next(m));
166 call.on('error', (e) => {
167 // Check if error means that stream ended on other end
168 const isCancelledError = String(e).toLowerCase().indexOf('cancelled');
169 if (isCancelledError) {
170 call.end();
171 return;
172 }
173 // If another error then just pass it along
174 req.error(e);
175 });
176 call.on('end', () => req.complete());
177 const handler = methodHandler(req.asObservable(), call.metadata, call);
178 const res = this.transformToObservable(await handler);
179 if (isResponseStream) {
180 await res
181 .pipe(operators_1.takeUntil(rxjs_1.fromEvent(call, constants_1.CANCEL_EVENT)), operators_1.catchError(err => {
182 call.emit('error', err);
183 return rxjs_1.EMPTY;
184 }))
185 .forEach(m => call.write(m));
186 call.end();
187 }
188 else {
189 const response = await rxjs_1.lastValueFrom(res.pipe(operators_1.takeUntil(rxjs_1.fromEvent(call, constants_1.CANCEL_EVENT)), operators_1.catchError(err => {
190 callback(err, null);
191 return rxjs_1.EMPTY;
192 })));
193 if (typeof response !== 'undefined') {
194 callback(null, response);
195 }
196 }
197 };
198 }
199 createStreamCallMethod(methodHandler, isResponseStream) {
200 return async (call, callback) => {
201 if (isResponseStream) {
202 methodHandler(call);
203 }
204 else {
205 methodHandler(call, callback);
206 }
207 };
208 }
209 close() {
210 this.grpcClient && this.grpcClient.forceShutdown();
211 this.grpcClient = null;
212 }
213 deserialize(obj) {
214 try {
215 return JSON.parse(obj);
216 }
217 catch (e) {
218 return obj;
219 }
220 }
221 addHandler(pattern, callback, isEventHandler = false) {
222 const route = shared_utils_1.isString(pattern) ? pattern : JSON.stringify(pattern);
223 callback.isEventHandler = isEventHandler;
224 this.messageHandlers.set(route, callback);
225 }
226 async createClient() {
227 const grpcOptions = {
228 'grpc.max_send_message_length': this.getOptionsProp(this.options, 'maxSendMessageLength', constants_1.GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH),
229 'grpc.max_receive_message_length': this.getOptionsProp(this.options, 'maxReceiveMessageLength', constants_1.GRPC_DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH),
230 };
231 const maxMetadataSize = this.getOptionsProp(this.options, 'maxMetadataSize', -1);
232 if (maxMetadataSize > 0) {
233 grpcOptions['grpc.max_metadata_size'] = maxMetadataSize;
234 }
235 const server = new grpcPackage.Server(grpcOptions);
236 const credentials = this.getOptionsProp(this.options, 'credentials');
237 await new Promise((resolve, reject) => {
238 server.bindAsync(this.url, credentials || grpcPackage.ServerCredentials.createInsecure(), (error, port) => error ? reject(error) : resolve(port));
239 });
240 return server;
241 }
242 lookupPackage(root, packageName) {
243 /** Reference: https://github.com/kondi/rxjs-grpc */
244 let pkg = root;
245 for (const name of packageName.split(/\./)) {
246 pkg = pkg[name];
247 }
248 return pkg;
249 }
250 loadProto() {
251 try {
252 const file = this.getOptionsProp(this.options, 'protoPath');
253 const loader = this.getOptionsProp(this.options, 'loader');
254 const packageDefinition = grpcProtoLoaderPackage.loadSync(file, loader);
255 const packageObject = grpcPackage.loadPackageDefinition(packageDefinition);
256 return packageObject;
257 }
258 catch (err) {
259 const invalidProtoError = new invalid_proto_definition_exception_1.InvalidProtoDefinitionException();
260 const message = err && err.message ? err.message : invalidProtoError.message;
261 this.logger.error(message, invalidProtoError.stack);
262 throw err;
263 }
264 }
265 /**
266 * Recursively fetch all of the service methods available on loaded
267 * protobuf descriptor object, and collect those as an objects with
268 * dot-syntax full-path names.
269 *
270 * Example:
271 * for proto package Bundle.FirstService with service Events { rpc...
272 * will be resolved to object of (while loaded for Bundle package):
273 * {
274 * name: "FirstService.Events",
275 * service: {Object}
276 * }
277 */
278 collectDeepServices(name, grpcDefinition, accumulator) {
279 if (!shared_utils_1.isObject(grpcDefinition)) {
280 return;
281 }
282 const keysToTraverse = Object.keys(grpcDefinition);
283 // Traverse definitions or namespace extensions
284 for (const key of keysToTraverse) {
285 const nameExtended = this.parseDeepServiceName(name, key);
286 const deepDefinition = grpcDefinition[key];
287 const isServiceDefined = deepDefinition && !shared_utils_1.isUndefined(deepDefinition.service);
288 const isServiceBoolean = isServiceDefined
289 ? deepDefinition.service !== false
290 : false;
291 if (isServiceDefined && isServiceBoolean) {
292 accumulator.push({
293 name: nameExtended,
294 service: deepDefinition,
295 });
296 }
297 // Continue recursion until objects end or service definition found
298 else {
299 this.collectDeepServices(nameExtended, deepDefinition, accumulator);
300 }
301 }
302 }
303 parseDeepServiceName(name, key) {
304 // If depth is zero then just return key
305 if (name.length === 0) {
306 return key;
307 }
308 // Otherwise add next through dot syntax
309 return name + '.' + key;
310 }
311 async createServices(grpcPkg) {
312 if (!grpcPkg) {
313 const invalidPackageError = new invalid_grpc_package_exception_1.InvalidGrpcPackageException();
314 this.logger.error(invalidPackageError.message, invalidPackageError.stack);
315 throw invalidPackageError;
316 }
317 // Take all of the services defined in grpcPkg and assign them to
318 // method handlers defined in Controllers
319 for (const definition of this.getServiceNames(grpcPkg)) {
320 this.grpcClient.addService(
321 // First parameter requires exact service definition from proto
322 definition.service.service,
323 // Here full proto definition required along with namespaced pattern name
324 await this.createService(definition.service, definition.name));
325 }
326 }
327}
328exports.ServerGrpc = ServerGrpc;