UNPKG

11.7 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.ClientGrpcProxy = void 0;
4const logger_service_1 = require("@nestjs/common/services/logger.service");
5const load_package_util_1 = require("@nestjs/common/utils/load-package.util");
6const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
7const rxjs_1 = require("rxjs");
8const constants_1 = require("../constants");
9const invalid_grpc_package_exception_1 = require("../errors/invalid-grpc-package.exception");
10const invalid_grpc_service_exception_1 = require("../errors/invalid-grpc-service.exception");
11const invalid_proto_definition_exception_1 = require("../errors/invalid-proto-definition.exception");
12const client_proxy_1 = require("./client-proxy");
13const constants_2 = require("./constants");
14let grpcPackage = {};
15let grpcProtoLoaderPackage = {};
16class ClientGrpcProxy extends client_proxy_1.ClientProxy {
17 constructor(options) {
18 super();
19 this.options = options;
20 this.logger = new logger_service_1.Logger(client_proxy_1.ClientProxy.name);
21 this.clients = new Map();
22 this.grpcClients = [];
23 this.url = this.getOptionsProp(options, 'url') || constants_1.GRPC_DEFAULT_URL;
24 const protoLoader = this.getOptionsProp(options, 'protoLoader') || constants_1.GRPC_DEFAULT_PROTO_LOADER;
25 grpcPackage = (0, load_package_util_1.loadPackage)('@grpc/grpc-js', ClientGrpcProxy.name, () => require('@grpc/grpc-js'));
26 grpcProtoLoaderPackage = (0, load_package_util_1.loadPackage)(protoLoader, ClientGrpcProxy.name, () => protoLoader === constants_1.GRPC_DEFAULT_PROTO_LOADER
27 ? require('@grpc/proto-loader')
28 : require(protoLoader));
29 this.grpcClients = this.createClients();
30 }
31 getService(name) {
32 const grpcClient = this.createClientByServiceName(name);
33 const clientRef = this.getClient(name);
34 if (!clientRef) {
35 throw new invalid_grpc_service_exception_1.InvalidGrpcServiceException();
36 }
37 const protoMethods = Object.keys(clientRef[name].prototype);
38 const grpcService = {};
39 protoMethods.forEach(m => {
40 grpcService[m] = this.createServiceMethod(grpcClient, m);
41 });
42 return grpcService;
43 }
44 getClientByServiceName(name) {
45 return this.clients.get(name) || this.createClientByServiceName(name);
46 }
47 createClientByServiceName(name) {
48 const clientRef = this.getClient(name);
49 if (!clientRef) {
50 throw new invalid_grpc_service_exception_1.InvalidGrpcServiceException();
51 }
52 const channelOptions = this.options && this.options.channelOptions
53 ? this.options.channelOptions
54 : {};
55 if (this.options && this.options.maxSendMessageLength) {
56 channelOptions['grpc.max_send_message_length'] =
57 this.options.maxSendMessageLength;
58 }
59 if (this.options && this.options.maxReceiveMessageLength) {
60 channelOptions['grpc.max_receive_message_length'] =
61 this.options.maxReceiveMessageLength;
62 }
63 if (this.options && this.options.maxMetadataSize) {
64 channelOptions['grpc.max_metadata_size'] = this.options.maxMetadataSize;
65 }
66 const keepaliveOptions = this.getKeepaliveOptions();
67 const options = Object.assign(Object.assign({}, channelOptions), keepaliveOptions);
68 const credentials = this.options.credentials || grpcPackage.credentials.createInsecure();
69 const grpcClient = new clientRef[name](this.url, credentials, options);
70 this.clients.set(name, grpcClient);
71 return grpcClient;
72 }
73 getKeepaliveOptions() {
74 if (!(0, shared_utils_1.isObject)(this.options.keepalive)) {
75 return {};
76 }
77 const keepaliveKeys = {
78 keepaliveTimeMs: 'grpc.keepalive_time_ms',
79 keepaliveTimeoutMs: 'grpc.keepalive_timeout_ms',
80 keepalivePermitWithoutCalls: 'grpc.keepalive_permit_without_calls',
81 http2MaxPingsWithoutData: 'grpc.http2.max_pings_without_data',
82 http2MinTimeBetweenPingsMs: 'grpc.http2.min_time_between_pings_ms',
83 http2MinPingIntervalWithoutDataMs: 'grpc.http2.min_ping_interval_without_data_ms',
84 http2MaxPingStrikes: 'grpc.http2.max_ping_strikes',
85 };
86 const keepaliveOptions = {};
87 for (const [optionKey, optionValue] of Object.entries(this.options.keepalive)) {
88 const key = keepaliveKeys[optionKey];
89 if (key === undefined) {
90 continue;
91 }
92 keepaliveOptions[key] = optionValue;
93 }
94 return keepaliveOptions;
95 }
96 createServiceMethod(client, methodName) {
97 return client[methodName].responseStream
98 ? this.createStreamServiceMethod(client, methodName)
99 : this.createUnaryServiceMethod(client, methodName);
100 }
101 createStreamServiceMethod(client, methodName) {
102 return (...args) => {
103 const isRequestStream = client[methodName].requestStream;
104 const stream = new rxjs_1.Observable(observer => {
105 let isClientCanceled = false;
106 let upstreamSubscription;
107 const upstreamSubjectOrData = args[0];
108 const maybeMetadata = args[1];
109 const isUpstreamSubject = upstreamSubjectOrData && (0, shared_utils_1.isFunction)(upstreamSubjectOrData.subscribe);
110 const call = isRequestStream && isUpstreamSubject
111 ? client[methodName](maybeMetadata)
112 : client[methodName](...args);
113 if (isRequestStream && isUpstreamSubject) {
114 upstreamSubscription = upstreamSubjectOrData.subscribe((val) => call.write(val), (err) => call.emit('error', err), () => call.end());
115 }
116 call.on('data', (data) => observer.next(data));
117 call.on('error', (error) => {
118 if (error.details === constants_2.GRPC_CANCELLED) {
119 call.destroy();
120 if (isClientCanceled) {
121 return;
122 }
123 }
124 observer.error(error);
125 });
126 call.on('end', () => {
127 if (upstreamSubscription) {
128 upstreamSubscription.unsubscribe();
129 upstreamSubscription = null;
130 }
131 call.removeAllListeners();
132 observer.complete();
133 });
134 return () => {
135 if (upstreamSubscription) {
136 upstreamSubscription.unsubscribe();
137 upstreamSubscription = null;
138 }
139 if (call.finished) {
140 return undefined;
141 }
142 isClientCanceled = true;
143 call.cancel();
144 };
145 });
146 return stream;
147 };
148 }
149 createUnaryServiceMethod(client, methodName) {
150 return (...args) => {
151 const isRequestStream = client[methodName].requestStream;
152 const upstreamSubjectOrData = args[0];
153 const isUpstreamSubject = upstreamSubjectOrData && (0, shared_utils_1.isFunction)(upstreamSubjectOrData.subscribe);
154 if (isRequestStream && isUpstreamSubject) {
155 return new rxjs_1.Observable(observer => {
156 const callArgs = [
157 (error, data) => {
158 if (error) {
159 return observer.error(error);
160 }
161 observer.next(data);
162 observer.complete();
163 },
164 ];
165 const maybeMetadata = args[1];
166 if (maybeMetadata) {
167 callArgs.unshift(maybeMetadata);
168 }
169 const call = client[methodName](...callArgs);
170 const upstreamSubscription = upstreamSubjectOrData.subscribe((val) => call.write(val), (err) => call.emit('error', err), () => call.end());
171 return () => {
172 upstreamSubscription.unsubscribe();
173 };
174 });
175 }
176 return new rxjs_1.Observable(observer => {
177 client[methodName](...args, (error, data) => {
178 if (error) {
179 return observer.error(error);
180 }
181 observer.next(data);
182 observer.complete();
183 });
184 });
185 };
186 }
187 createClients() {
188 const grpcContext = this.loadProto();
189 const packageOption = this.getOptionsProp(this.options, 'package');
190 const grpcPackages = [];
191 const packageNames = Array.isArray(packageOption)
192 ? packageOption
193 : [packageOption];
194 for (const packageName of packageNames) {
195 const grpcPkg = this.lookupPackage(grpcContext, packageName);
196 if (!grpcPkg) {
197 const invalidPackageError = new invalid_grpc_package_exception_1.InvalidGrpcPackageException();
198 this.logger.error(invalidPackageError.message, invalidPackageError.stack);
199 throw invalidPackageError;
200 }
201 grpcPackages.push(grpcPkg);
202 }
203 return grpcPackages;
204 }
205 loadProto() {
206 try {
207 const file = this.getOptionsProp(this.options, 'protoPath');
208 const loader = this.getOptionsProp(this.options, 'loader');
209 const packageDefinition = this.getOptionsProp(this.options, 'packageDefinition') ||
210 grpcProtoLoaderPackage.loadSync(file, loader);
211 const packageObject = grpcPackage.loadPackageDefinition(packageDefinition);
212 return packageObject;
213 }
214 catch (err) {
215 const invalidProtoError = new invalid_proto_definition_exception_1.InvalidProtoDefinitionException();
216 const message = err && err.message ? err.message : invalidProtoError.message;
217 this.logger.error(message, invalidProtoError.stack);
218 throw invalidProtoError;
219 }
220 }
221 lookupPackage(root, packageName) {
222 /** Reference: https://github.com/kondi/rxjs-grpc */
223 let pkg = root;
224 if (packageName) {
225 for (const name of packageName.split('.')) {
226 pkg = pkg[name];
227 }
228 }
229 return pkg;
230 }
231 close() {
232 this.grpcClients
233 .filter(client => client && (0, shared_utils_1.isFunction)(client.close))
234 .forEach(client => client.close());
235 this.grpcClients = [];
236 }
237 async connect() {
238 throw new Error('The "connect()" method is not supported in gRPC mode.');
239 }
240 send(pattern, data) {
241 throw new Error('Method is not supported in gRPC mode. Use ClientGrpc instead (learn more in the documentation).');
242 }
243 getClient(name) {
244 return this.grpcClients.find(client => client.hasOwnProperty(name));
245 }
246 publish(packet, callback) {
247 throw new Error('Method is not supported in gRPC mode. Use ClientGrpc instead (learn more in the documentation).');
248 }
249 async dispatchEvent(packet) {
250 throw new Error('Method is not supported in gRPC mode. Use ClientGrpc instead (learn more in the documentation).');
251 }
252}
253exports.ClientGrpcProxy = ClientGrpcProxy;