1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | import * as http2 from 'http2';
|
19 | import { AddressInfo } from 'net';
|
20 |
|
21 | import { ServiceError } from './call';
|
22 | import { Status, LogVerbosity } from './constants';
|
23 | import { Deserialize, Serialize, ServiceDefinition } from './make-client';
|
24 | import { Metadata } from './metadata';
|
25 | import {
|
26 | BidiStreamingHandler,
|
27 | ClientStreamingHandler,
|
28 | HandleCall,
|
29 | Handler,
|
30 | HandlerType,
|
31 | Http2ServerCallStream,
|
32 | sendUnaryData,
|
33 | ServerDuplexStream,
|
34 | ServerDuplexStreamImpl,
|
35 | ServerReadableStream,
|
36 | ServerReadableStreamImpl,
|
37 | ServerStreamingHandler,
|
38 | ServerUnaryCall,
|
39 | ServerUnaryCallImpl,
|
40 | ServerWritableStream,
|
41 | ServerWritableStreamImpl,
|
42 | UnaryHandler,
|
43 | ServerErrorResponse,
|
44 | ServerStatusResponse,
|
45 | } from './server-call';
|
46 | import { ServerCredentials } from './server-credentials';
|
47 | import { ChannelOptions } from './channel-options';
|
48 | import {
|
49 | createResolver,
|
50 | ResolverListener,
|
51 | mapUriDefaultScheme,
|
52 | } from './resolver';
|
53 | import * as logging from './logging';
|
54 | import {
|
55 | SubchannelAddress,
|
56 | TcpSubchannelAddress,
|
57 | isTcpSubchannelAddress,
|
58 | subchannelAddressToString,
|
59 | stringToSubchannelAddress,
|
60 | } from './subchannel-address';
|
61 | import { parseUri } from './uri-parser';
|
62 | import { ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzServer, registerChannelzSocket, ServerInfo, ServerRef, SocketInfo, SocketRef, TlsInfo, unregisterChannelzRef } from './channelz';
|
63 | import { CipherNameAndProtocol, TLSSocket } from 'tls';
|
64 |
|
65 | const TRACER_NAME = 'server';
|
66 |
|
/** Outcome of binding a list of resolved addresses. */
interface BindResult {
  /** Port number that the addresses were bound to. */
  port: number;
  /** Number of addresses that were bound successfully. */
  count: number;
}
|
71 |
|
/** Does nothing; used where a callback argument is required but no action is wanted. */
function noop(): void {
  return;
}
|
73 |
|
/**
 * Builds the status reported for a method the server does not implement.
 *
 * @param methodName Method name (or path) to embed in the details text.
 * @returns A partial ServiceError carrying the UNIMPLEMENTED code, a details
 *     message naming the method, and a new empty Metadata instance.
 */
function getUnimplementedStatusResponse(
  methodName: string
): Partial<ServiceError> {
  const details = `The server does not implement the method ${methodName}`;
  const response: Partial<ServiceError> = {
    code: Status.UNIMPLEMENTED,
    details,
    metadata: new Metadata(),
  };
  return response;
}
|
83 |
|
84 |
|
// Handler types with the request/response type parameters erased to `any`,
// so handlers for arbitrary message types can be stored and dispatched together.
type UntypedUnaryHandler = UnaryHandler<any, any>;
type UntypedClientStreamingHandler = ClientStreamingHandler<any, any>;
type UntypedServerStreamingHandler = ServerStreamingHandler<any, any>;
type UntypedBidiStreamingHandler = BidiStreamingHandler<any, any>;
export type UntypedHandleCall = HandleCall<any, any>;
type UntypedHandler = Handler<any, any>;
/** Object mapping method names to the functions that implement them. */
export interface UntypedServiceImplementation {
  [name: string]: UntypedHandleCall;
}
|
94 |
|
/**
 * Creates the fallback handler used when a service method has no
 * user-supplied implementation. Every fallback reports the same
 * UNIMPLEMENTED status; only the reporting mechanism depends on the type.
 *
 * @param handlerType Kind of RPC ('unary', 'clientStream', 'serverStream' or 'bidi').
 * @param methodName Method name used in the status details.
 * @returns A handler that passes the status to its callback (unary and
 *     client-streaming) or emits it as an 'error' event on the call
 *     (server-streaming and bidi).
 * @throws Error if handlerType is none of the four known kinds.
 */
function getDefaultHandler(handlerType: HandlerType, methodName: string) {
  const status = getUnimplementedStatusResponse(methodName);

  if (handlerType === 'unary') {
    return (
      call: ServerUnaryCall<any, any>,
      callback: sendUnaryData<any>
    ) => {
      callback(status as ServiceError, null);
    };
  }

  if (handlerType === 'clientStream') {
    return (
      call: ServerReadableStream<any, any>,
      callback: sendUnaryData<any>
    ) => {
      callback(status as ServiceError, null);
    };
  }

  if (handlerType === 'serverStream') {
    return (call: ServerWritableStream<any, any>) => {
      call.emit('error', status);
    };
  }

  if (handlerType === 'bidi') {
    return (call: ServerDuplexStream<any, any>) => {
      call.emit('error', status);
    };
  }

  throw new Error(`Invalid handlerType ${handlerType}`);
}
|
126 |
|
/** Per-session bookkeeping that the server reports through channelz. */
interface ChannelzSessionInfo {
  /** Channelz socket reference registered for the session. */
  ref: SocketRef;
  /** Tracks streams started, succeeded and failed on the session. */
  streamTracker: ChannelzCallTracker;
  /** Count of 'sendMessage' events seen on the session's calls. */
  messagesSent: number;
  /** Count of 'receiveMessage' events seen on the session's calls. */
  messagesReceived: number;
  /** Time of the most recent 'sendMessage' event, or null if none yet. */
  lastMessageSentTimestamp: Date | null;
  /** Time of the most recent 'receiveMessage' event, or null if none yet. */
  lastMessageReceivedTimestamp: Date | null;
}
|
135 |
|
/**
 * Channelz bookkeeping for a listening socket.
 * NOTE(review): not referenced anywhere in the visible part of this file.
 */
interface ChannelzListenerInfo {
  /** Channelz socket reference for the listener. */
  ref: SocketRef;
}
|
139 |
|
140 | export class Server {
|
// Listeners that bindAsync() bound successfully, each paired with the
// channelz socket reference registered for it.
private http2ServerList: { server: (http2.Http2Server | http2.Http2SecureServer), channelzRef: SocketRef }[] = [];

// Registered method handlers, keyed by the name passed to register().
private handlers: Map<string, UntypedHandler> = new Map<
  string,
  UntypedHandler
>();
// Sessions accepted by the 'session' listener, removed again on 'close'.
private sessions = new Map<http2.ServerHttp2Session, ChannelzSessionInfo>();
// Set by start(), cleared by tryShutdown()/forceShutdown(). Sessions that
// arrive while this is false are destroyed immediately.
private started = false;
// Options given to the constructor ({} when omitted).
private options: ChannelOptions;

// Switched to false by the constructor when 'grpc.enable_channelz' is 0.
private readonly channelzEnabled: boolean = true;
// Channelz reference for this server; its id prefixes every trace line.
private channelzRef: ServerRef;
private channelzTrace = new ChannelzTrace();
private callTracker = new ChannelzCallTracker();
private listenerChildrenTracker = new ChannelzChildrenTracker();
private sessionChildrenTracker = new ChannelzChildrenTracker();
|
158 |
|
/**
 * Creates a server and registers it with channelz.
 *
 * @param options Channel options; an empty object is used when omitted.
 *     Setting 'grpc.enable_channelz' to 0 disables channelz tracking.
 */
constructor(options?: ChannelOptions) {
  this.options = options ?? {};

  const channelzDisabled = this.options['grpc.enable_channelz'] === 0;
  if (channelzDisabled) {
    this.channelzEnabled = false;
  }

  // Registration happens in either case; the enabled flag is passed along.
  this.channelzRef = registerChannelzServer(
    () => this.getChannelzInfo(),
    this.channelzEnabled
  );

  if (this.channelzEnabled) {
    this.channelzTrace.addTrace('CT_INFO', 'Server created');
  }
  this.trace('Server constructed');
}
|
170 |
|
/**
 * Collects the data channelz reports for this server.
 *
 * @returns The server's trace, call tracker, and the current child lists of
 *     the listener and session trackers.
 */
private getChannelzInfo(): ServerInfo {
  const info: ServerInfo = {
    trace: this.channelzTrace,
    callTracker: this.callTracker,
    listenerChildren: this.listenerChildrenTracker.getChildLists(),
    sessionChildren: this.sessionChildrenTracker.getChildLists()
  };
  return info;
}
|
179 |
|
/**
 * Creates the callback that channelz invokes to read the current state of
 * one HTTP/2 session.
 *
 * @param session Session to report on. It is expected to still be present
 *     in `this.sessions` whenever the returned function runs.
 * @returns A function producing a fresh SocketInfo snapshot on every call.
 */
private getChannelzSessionInfoGetter(session: http2.ServerHttp2Session): () => SocketInfo {
  return (): SocketInfo => {
    const tracked = this.sessions.get(session)!;
    const socket = session.socket;

    // Addresses are null when the socket does not report them.
    const remoteAddress = socket.remoteAddress
      ? stringToSubchannelAddress(socket.remoteAddress, socket.remotePort)
      : null;
    const localAddress = socket.localAddress
      ? stringToSubchannelAddress(socket.localAddress!, socket.localPort)
      : null;

    // TLS details are only available on encrypted sessions.
    let tlsInfo: TlsInfo | null = null;
    if (session.encrypted) {
      const tlsSocket: TLSSocket = socket as TLSSocket;
      const cipherInfo: CipherNameAndProtocol & {standardName?: string} = tlsSocket.getCipher();
      const ownCert = tlsSocket.getCertificate();
      const peerCert = tlsSocket.getPeerCertificate();
      tlsInfo = {
        cipherSuiteStandardName: cipherInfo.standardName ?? null,
        // The non-standard name is reported only when no standard name exists.
        cipherSuiteOtherName: cipherInfo.standardName ? null : cipherInfo.name,
        localCertificate: (ownCert && 'raw' in ownCert) ? ownCert.raw : null,
        remoteCertificate: (peerCert && 'raw' in peerCert) ? peerCert.raw : null
      };
    }

    const streams = tracked.streamTracker;
    const snapshot: SocketInfo = {
      remoteAddress: remoteAddress,
      localAddress: localAddress,
      security: tlsInfo,
      remoteName: null,
      streamsStarted: streams.callsStarted,
      streamsSucceeded: streams.callsSucceeded,
      streamsFailed: streams.callsFailed,
      messagesSent: tracked.messagesSent,
      messagesReceived: tracked.messagesReceived,
      keepAlivesSent: 0,
      lastLocalStreamCreatedTimestamp: null,
      lastRemoteStreamCreatedTimestamp: streams.lastCallStartedTimestamp,
      lastMessageSentTimestamp: tracked.lastMessageSentTimestamp,
      lastMessageReceivedTimestamp: tracked.lastMessageReceivedTimestamp,
      localFlowControlWindow: session.state.localWindowSize ?? null,
      remoteFlowControlWindow: session.state.remoteWindowSize ?? null
    };
    return snapshot;
  };
}
|
222 |
|
/**
 * Writes a DEBUG-level trace line under this file's tracer name, prefixed
 * with the server's channelz id in parentheses.
 *
 * @param text Message to log.
 */
private trace(text: string): void {
  const prefix = '(' + this.channelzRef.id + ') ';
  logging.trace(LogVerbosity.DEBUG, TRACER_NAME, prefix + text);
}
|
226 |
|
227 |
|
/**
 * Unsupported; always throws.
 *
 * @throws Error directing the caller to addService().
 */
addProtoService(): never {
  const unsupported = new Error('Not implemented. Use addService() instead');
  throw unsupported;
}
|
231 |
|
/**
 * Registers a handler for every method in a service definition.
 *
 * For each method the implementation is looked up by the definition's key
 * and, failing that, by the method's `originalName`. Methods with no
 * implementation get a default handler that reports UNIMPLEMENTED.
 *
 * @param service Service definition keyed by method name.
 * @param implementation Object holding the handler functions; handlers are
 *     bound to this object before being registered.
 * @throws Error if either argument is not a non-null object, if the service
 *     has no methods, or if a method's path already has a handler. Methods
 *     registered before the failing one stay registered.
 */
addService(
  service: ServiceDefinition,
  implementation: UntypedServiceImplementation
): void {
  const bothAreObjects =
    service !== null &&
    typeof service === 'object' &&
    implementation !== null &&
    typeof implementation === 'object';
  if (!bothAreObjects) {
    throw new Error('addService() requires two objects as arguments');
  }

  const methodNames = Object.keys(service);
  if (methodNames.length === 0) {
    throw new Error('Cannot add an empty service to a server');
  }

  for (const name of methodNames) {
    const attrs = service[name];

    // Derive the RPC kind from the two streaming flags.
    let methodType: HandlerType;
    if (attrs.requestStream) {
      methodType = attrs.responseStream ? 'bidi' : 'clientStream';
    } else {
      methodType = attrs.responseStream ? 'serverStream' : 'unary';
    }

    let implFn = implementation[name];
    if (implFn === undefined && typeof attrs.originalName === 'string') {
      implFn = implementation[attrs.originalName];
    }

    const impl =
      implFn !== undefined
        ? implFn.bind(implementation)
        : getDefaultHandler(methodType, name);

    const registered = this.register(
      attrs.path,
      impl as UntypedHandleCall,
      attrs.responseSerialize,
      attrs.requestDeserialize,
      methodType
    );
    if (registered === false) {
      throw new Error(`Method handler for ${attrs.path} already provided.`);
    }
  }
}
|
295 |
|
/**
 * Unregisters the handler of every method in a service definition.
 *
 * @param service Service definition whose method paths should be removed.
 * @throws Error if `service` is null or not an object.
 */
removeService(service: ServiceDefinition): void {
  const isObject = service !== null && typeof service === 'object';
  if (!isObject) {
    throw new Error('removeService() requires object as argument');
  }

  for (const name of Object.keys(service)) {
    this.unregister(service[name].path);
  }
}
|
307 |
|
/**
 * Unsupported synchronous bind; always throws.
 *
 * @param port Ignored.
 * @param creds Ignored.
 * @throws Error directing the caller to bindAsync().
 */
bind(port: string, creds: ServerCredentials): never {
  const unsupported = new Error('Not implemented. Use bindAsync() instead');
  throw unsupported;
}
|
311 |
|
/**
 * Resolves `port` to a list of addresses and binds an HTTP/2 listener to
 * each one, reporting the result through `callback`.
 *
 * When the first resolved address is a TCP address with port 0, one address
 * is bound first to obtain a port number and the remaining addresses are
 * then bound to that same port. Otherwise all addresses are bound in
 * parallel.
 *
 * @param port Address string to bind, parsed as a URI.
 * @param creds Server credentials; secure credentials produce TLS listeners.
 * @param callback Invoked on a later tick with `(null, boundPort)` when at
 *     least one address was bound, or with `(error, 0)` otherwise.
 * @throws Error if the server is already started or `port` cannot be parsed
 *     or mapped to a default scheme.
 * @throws TypeError if an argument has the wrong type.
 */
bindAsync(
  port: string,
  creds: ServerCredentials,
  callback: (error: Error | null, port: number) => void
): void {
  if (this.started === true) {
    throw new Error('server is already started');
  }
  if (typeof port !== 'string') {
    throw new TypeError('port must be a string');
  }
  if (creds === null || !(creds instanceof ServerCredentials)) {
    throw new TypeError('creds must be a ServerCredentials object');
  }
  if (typeof callback !== 'function') {
    throw new TypeError('callback must be a function');
  }

  const initialPortUri = parseUri(port);
  if (initialPortUri === null) {
    throw new Error(`Could not parse port "${port}"`);
  }
  const portUri = mapUriDefaultScheme(initialPortUri);
  if (portUri === null) {
    throw new Error(`Could not get a default scheme for port "${port}"`);
  }

  const serverOptions: http2.ServerOptions = {
    maxSendHeaderBlockLength: Number.MAX_SAFE_INTEGER,
  };
  if ('grpc-node.max_session_memory' in this.options) {
    serverOptions.maxSessionMemory = this.options[
      'grpc-node.max_session_memory'
    ];
  }
  if ('grpc.max_concurrent_streams' in this.options) {
    serverOptions.settings = {
      maxConcurrentStreams: this.options['grpc.max_concurrent_streams'],
    };
  }

  // The user callback always runs on a later tick.
  const deferredCallback = (error: Error | null, boundPort: number): void => {
    process.nextTick(() => callback(error, boundPort));
  };

  // Creates one HTTP/2 server (TLS or plaintext) with handlers attached.
  const setupServer = (): http2.Http2Server | http2.Http2SecureServer => {
    let http2Server: http2.Http2Server | http2.Http2SecureServer;
    if (creds._isSecure()) {
      // Note: this merges the TLS settings into the shared serverOptions object.
      const secureServerOptions = Object.assign(
        serverOptions,
        creds._getSettings()!
      );
      http2Server = http2.createSecureServer(secureServerOptions);
      http2Server.on('secureConnection', (socket: TLSSocket) => {
        // Errors on an individual TLS socket are traced rather than thrown.
        socket.on('error', (e: Error) => {
          this.trace('An incoming TLS connection closed with error: ' + e.message);
        });
      });
    } else {
      http2Server = http2.createServer(serverOptions);
    }

    http2Server.setTimeout(0, noop);
    this._setupHandlers(http2Server);
    return http2Server;
  };

  // Records a freshly bound listener: channelz socket, server list, trace line.
  const registerListener = (
    http2Server: http2.Http2Server | http2.Http2SecureServer,
    boundSubchannelAddress: SubchannelAddress
  ): void => {
    const listenerRef: SocketRef = registerChannelzSocket(
      subchannelAddressToString(boundSubchannelAddress),
      () => {
        // A listening socket carries no traffic, so all counters stay at zero.
        return {
          localAddress: boundSubchannelAddress,
          remoteAddress: null,
          security: null,
          remoteName: null,
          streamsStarted: 0,
          streamsSucceeded: 0,
          streamsFailed: 0,
          messagesSent: 0,
          messagesReceived: 0,
          keepAlivesSent: 0,
          lastLocalStreamCreatedTimestamp: null,
          lastRemoteStreamCreatedTimestamp: null,
          lastMessageSentTimestamp: null,
          lastMessageReceivedTimestamp: null,
          localFlowControlWindow: null,
          remoteFlowControlWindow: null
        };
      },
      this.channelzEnabled
    );
    if (this.channelzEnabled) {
      this.listenerChildrenTracker.refChild(listenerRef);
    }
    this.http2ServerList.push({server: http2Server, channelzRef: listenerRef});
    this.trace('Successfully bound ' + subchannelAddressToString(boundSubchannelAddress));
  };

  // Binds every address in parallel, forcing TCP addresses onto portNum.
  const bindSpecificPort = (
    addressList: SubchannelAddress[],
    portNum: number,
    previousCount: number
  ): Promise<BindResult> => {
    if (addressList.length === 0) {
      return Promise.resolve({ port: portNum, count: previousCount });
    }

    const attempts = addressList.map((address) => {
      this.trace('Attempting to bind ' + subchannelAddressToString(address));
      const addr: SubchannelAddress = isTcpSubchannelAddress(address)
        ? { host: (address as TcpSubchannelAddress).host, port: portNum }
        : address;

      const http2Server = setupServer();
      // A failed bind resolves with the Error instead of rejecting, so one
      // failure does not abort the other attempts.
      return new Promise<number | Error>((resolve) => {
        const onError = (err: Error) => {
          this.trace('Failed to bind ' + subchannelAddressToString(address) + ' with error ' + err.message);
          resolve(err);
        };

        http2Server.once('error', onError);

        http2Server.listen(addr, () => {
          const boundAddress = http2Server.address()!;
          const boundSubchannelAddress: SubchannelAddress =
            typeof boundAddress === 'string'
              ? { path: boundAddress }
              : { host: boundAddress.address, port: boundAddress.port };
          registerListener(http2Server, boundSubchannelAddress);
          resolve('port' in boundSubchannelAddress ? boundSubchannelAddress.port : portNum);
          http2Server.removeListener('error', onError);
        });
      });
    });

    return Promise.all(attempts).then((results) => {
      let bound = 0;
      for (const result of results) {
        if (typeof result !== 'number') {
          continue;
        }
        bound += 1;
        if (result !== portNum) {
          throw new Error(
            'Invalid state: multiple port numbers added from single address'
          );
        }
      }
      return {
        port: portNum,
        count: bound + previousCount,
      };
    });
  };

  // Tries addresses one at a time until one binds; the port it was given is
  // then used for all remaining addresses.
  const bindWildcardPort = (
    addressList: SubchannelAddress[]
  ): Promise<BindResult> => {
    if (addressList.length === 0) {
      return Promise.resolve<BindResult>({ port: 0, count: 0 });
    }
    const address = addressList[0];
    const http2Server = setupServer();
    return new Promise<BindResult>((resolve) => {
      const onError = (err: Error) => {
        this.trace('Failed to bind ' + subchannelAddressToString(address) + ' with error ' + err.message);
        resolve(bindWildcardPort(addressList.slice(1)));
      };

      http2Server.once('error', onError);

      http2Server.listen(address, () => {
        const boundAddress = http2Server.address() as AddressInfo;
        const boundSubchannelAddress: SubchannelAddress = {
          host: boundAddress.address,
          port: boundAddress.port
        };
        registerListener(http2Server, boundSubchannelAddress);
        resolve(
          bindSpecificPort(
            addressList.slice(1),
            boundAddress.port,
            1
          )
        );
        http2Server.removeListener('error', onError);
      });
    });
  };

  const resolverListener: ResolverListener = {
    onSuccessfulResolution: (
      addressList,
      serviceConfig,
      serviceConfigError
    ) => {
      // Only the first resolution result is used; later ones are ignored.
      resolverListener.onSuccessfulResolution = () => {};
      if (addressList.length === 0) {
        deferredCallback(new Error(`No addresses resolved for port ${port}`), 0);
        return;
      }

      // Shared failure report for "nothing could be bound".
      const reportNothingBound = (): void => {
        const errorString = `No address added out of total ${addressList.length} resolved`;
        logging.log(LogVerbosity.ERROR, errorString);
        deferredCallback(new Error(errorString), 0);
      };

      const firstAddress = addressList[0];
      let bindResultPromise: Promise<BindResult>;
      if (!isTcpSubchannelAddress(firstAddress)) {
        // Non-TCP addresses are bound as-is; the 1 only serves as the port
        // value reported back when the bound address has no port.
        bindResultPromise = bindSpecificPort(addressList, 1, 0);
      } else if (firstAddress.port === 0) {
        bindResultPromise = bindWildcardPort(addressList);
      } else {
        bindResultPromise = bindSpecificPort(
          addressList,
          firstAddress.port,
          0
        );
      }

      bindResultPromise.then(
        (bindResult) => {
          if (bindResult.count === 0) {
            reportNothingBound();
            return;
          }
          if (bindResult.count < addressList.length) {
            logging.log(
              LogVerbosity.INFO,
              `WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved`
            );
          }
          deferredCallback(null, bindResult.port);
        },
        (error) => {
          reportNothingBound();
        }
      );
    },
    onError: (error) => {
      deferredCallback(new Error(error.details), 0);
    },
  };

  const resolver = createResolver(portUri, resolverListener, this.options);
  resolver.updateResolution();
}
|
595 |
|
/**
 * Shuts the server down immediately: closes every listening HTTP/2 server,
 * destroys all open sessions with the NGHTTP2_CANCEL code, and unregisters
 * the server from channelz when channelz is enabled.
 */
forceShutdown(): void {
  // Close listeners; their channelz refs are released once each close completes.
  for (const {server, channelzRef} of this.http2ServerList) {
    if (!server.listening) {
      continue;
    }
    server.close(() => {
      if (this.channelzEnabled) {
        this.listenerChildrenTracker.unrefChild(channelzRef);
        unregisterChannelzRef(channelzRef);
      }
    });
  }

  this.started = false;

  // Tear down every live session without waiting for in-flight calls.
  this.sessions.forEach((_sessionInfo, session) => {
    session.destroy(http2.constants.NGHTTP2_CANCEL as any);
  });
  this.sessions.clear();

  if (this.channelzEnabled) {
    unregisterChannelzRef(this.channelzRef);
  }
}
|
625 |
|
/**
 * Registers a handler under a method name, unless one already exists.
 *
 * @param name Key to register under; also stored as the handler's `path`.
 * @param handler Function that processes calls to the method.
 * @param serialize Serializer for response messages.
 * @param deserialize Deserializer for request messages.
 * @param type Handler kind, stored on the handler record as-is.
 * @returns true if the handler was added, false if `name` was already
 *     registered (the existing handler is left in place).
 */
register<RequestType, ResponseType>(
  name: string,
  handler: HandleCall<RequestType, ResponseType>,
  serialize: Serialize<ResponseType>,
  deserialize: Deserialize<RequestType>,
  type: string
): boolean {
  const alreadyRegistered = this.handlers.has(name);
  if (!alreadyRegistered) {
    const entry = {
      func: handler,
      serialize,
      deserialize,
      type,
      path: name,
    } as UntypedHandler;
    this.handlers.set(name, entry);
  }
  return !alreadyRegistered;
}
|
646 |
|
/**
 * Removes the handler registered under `name`.
 *
 * @param name Key the handler was registered under.
 * @returns true if a handler existed and was removed, false otherwise.
 */
unregister(name: string): boolean {
  const removed = this.handlers.delete(name);
  return removed;
}
|
650 |
|
/**
 * Marks the server as started so that incoming sessions are accepted.
 *
 * @throws Error if no bound HTTP/2 server is currently listening, or if the
 *     server has already been started.
 */
start(): void {
  // An empty list has no listening server either, so one check covers both cases.
  const anyListening = this.http2ServerList.some(
    ({server}) => server.listening === true
  );
  if (!anyListening) {
    throw new Error('server must be bound in order to start');
  }

  if (this.started === true) {
    throw new Error('server is already started');
  }

  if (this.channelzEnabled) {
    this.channelzTrace.addTrace('CT_INFO', 'Starting');
  }
  this.started = true;
}
|
669 |
|
/**
 * Shuts the server down gracefully: closes every listening HTTP/2 server
 * and asks every open session to close, then invokes `callback` once all
 * of them have finished closing.
 *
 * @param callback Called with no argument when shutdown completes. If
 *     nothing needed closing it is called synchronously. The server's
 *     channelz reference is unregistered just before the call when
 *     channelz is enabled.
 */
tryShutdown(callback: (error?: Error) => void): void {
  // Number of close operations still outstanding.
  let pendingChecks = 0;

  const finish = (error?: Error): void => {
    if (this.channelzEnabled) {
      unregisterChannelzRef(this.channelzRef);
    }
    callback(error);
  };

  const onCheckDone = (): void => {
    pendingChecks -= 1;
    if (pendingChecks === 0) {
      finish();
    }
  };

  // Sessions arriving from now on are rejected by the 'session' listener.
  this.started = false;

  for (const {server, channelzRef} of this.http2ServerList) {
    if (!server.listening) {
      continue;
    }
    pendingChecks += 1;
    server.close(() => {
      if (this.channelzEnabled) {
        this.listenerChildrenTracker.unrefChild(channelzRef);
        unregisterChannelzRef(channelzRef);
      }
      onCheckDone();
    });
  }

  this.sessions.forEach((_sessionInfo, session) => {
    if (session.closed) {
      return;
    }
    pendingChecks += 1;
    session.close(onCheckDone);
  });

  if (pendingChecks === 0) {
    finish();
  }
}
|
713 |
|
/**
 * Unsupported; always throws.
 *
 * @throws Error stating that the method is not yet implemented.
 */
addHttp2Port(): never {
  const unsupported = new Error('Not yet implemented');
  throw unsupported;
}
|
717 |
|
718 | |
719 |
|
720 |
|
721 |
|
722 |
|
/**
 * @returns The channelz reference that was registered for this server in
 *     the constructor.
 */
getChannelzRef() {
  const ref = this.channelzRef;
  return ref;
}
|
726 |
|
/**
 * Attaches the 'stream' and 'session' listeners that make an HTTP/2 server
 * serve gRPC calls.
 *
 * The 'stream' listener rejects non-gRPC content types with HTTP 415, looks
 * up the handler registered for the request path, and dispatches to the
 * matching handle* function. Any failure on the way is reported to the
 * client through `sendError`. The 'session' listener tracks each session
 * (and its channelz data) until it closes, and destroys sessions that
 * arrive while the server is not started.
 *
 * @param http2Server Server to attach to; nothing happens if it is null.
 */
private _setupHandlers(
  http2Server: http2.Http2Server | http2.Http2SecureServer
): void {
  if (http2Server === null) {
    return;
  }

  http2Server.on(
    'stream',
    (stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders) => {
      const sessionInfo = this.sessions.get(stream.session as http2.ServerHttp2Session);
      if (this.channelzEnabled) {
        this.callTracker.addCallStarted();
        sessionInfo?.streamTracker.addCallStarted();
      }

      const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE];
      const isGrpcRequest =
        typeof contentType === 'string' &&
        contentType.startsWith('application/grpc');
      if (!isGrpcRequest) {
        // Not a gRPC request: answer at the HTTP level and stop.
        stream.respond(
          {
            [http2.constants.HTTP2_HEADER_STATUS]:
              http2.constants.HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE,
          },
          { endStream: true }
        );
        this.callTracker.addCallFailed();
        if (this.channelzEnabled) {
          sessionInfo?.streamTracker.addCallFailed();
        }
        return;
      }

      let call: Http2ServerCallStream<any, any> | null = null;

      try {
        const path = headers[http2.constants.HTTP2_HEADER_PATH] as string;

        // Describe the listening address for the trace line.
        const serverAddress = http2Server.address();
        let serverAddressString = 'null';
        if (serverAddress) {
          serverAddressString =
            typeof serverAddress === 'string'
              ? serverAddress
              : serverAddress.address + ':' + serverAddress.port;
        }
        this.trace(
          'Received call to method ' + path + ' at address ' + serverAddressString
        );

        const handler = this.handlers.get(path);
        if (handler === undefined) {
          this.trace(
            'No handler registered for method ' + path + '. Sending UNIMPLEMENTED status.'
          );
          throw getUnimplementedStatusResponse(path);
        }

        call = new Http2ServerCallStream(stream, handler, this.options);
        call.once('callEnd', (code: Status) => {
          if (code === Status.OK) {
            this.callTracker.addCallSucceeded();
          } else {
            this.callTracker.addCallFailed();
          }
        });

        if (this.channelzEnabled && sessionInfo) {
          call.once('streamEnd', (success: boolean) => {
            if (success) {
              sessionInfo.streamTracker.addCallSucceeded();
            } else {
              sessionInfo.streamTracker.addCallFailed();
            }
          });
          call.on('sendMessage', () => {
            sessionInfo.messagesSent += 1;
            sessionInfo.lastMessageSentTimestamp = new Date();
          });
          call.on('receiveMessage', () => {
            sessionInfo.messagesReceived += 1;
            sessionInfo.lastMessageReceivedTimestamp = new Date();
          });
        }

        const metadata = call.receiveMetadata(headers);
        // The encoding is consumed here and removed from the metadata
        // before the metadata is handed to the handler.
        const encoding = (metadata.get('grpc-encoding')[0] as string | undefined) ?? 'identity';
        metadata.remove('grpc-encoding');

        switch (handler.type) {
          case 'unary':
            handleUnary(call, handler as UntypedUnaryHandler, metadata, encoding);
            break;
          case 'clientStream':
            handleClientStreaming(
              call,
              handler as UntypedClientStreamingHandler,
              metadata,
              encoding
            );
            break;
          case 'serverStream':
            handleServerStreaming(
              call,
              handler as UntypedServerStreamingHandler,
              metadata,
              encoding
            );
            break;
          case 'bidi':
            handleBidiStreaming(
              call,
              handler as UntypedBidiStreamingHandler,
              metadata,
              encoding
            );
            break;
          default:
            throw new Error(`Unknown handler type: ${handler.type}`);
        }
      } catch (err) {
        if (!call) {
          // The failure happened before a call object existed; create a
          // handler-less one just to deliver the error.
          call = new Http2ServerCallStream(stream, null!, this.options);
          if (this.channelzEnabled) {
            this.callTracker.addCallFailed();
            sessionInfo?.streamTracker.addCallFailed();
          }
        }

        if (err.code === undefined) {
          err.code = Status.INTERNAL;
        }

        call.sendError(err);
      }
    }
  );

  http2Server.on('session', (session) => {
    if (!this.started) {
      session.destroy();
      return;
    }

    const sessionRef: SocketRef = registerChannelzSocket(
      session.socket.remoteAddress ?? 'unknown',
      this.getChannelzSessionInfoGetter(session),
      this.channelzEnabled
    );

    const sessionInfo: ChannelzSessionInfo = {
      ref: sessionRef,
      streamTracker: new ChannelzCallTracker(),
      messagesSent: 0,
      messagesReceived: 0,
      lastMessageSentTimestamp: null,
      lastMessageReceivedTimestamp: null
    };
    this.sessions.set(session, sessionInfo);

    // Read once so the 'close' trace reports the same address.
    const clientAddress = session.socket.remoteAddress;
    if (this.channelzEnabled) {
      this.channelzTrace.addTrace('CT_INFO', 'Connection established by client ' + clientAddress);
      this.sessionChildrenTracker.refChild(sessionRef);
    }

    session.on('close', () => {
      if (this.channelzEnabled) {
        this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by client ' + clientAddress);
        this.sessionChildrenTracker.unrefChild(sessionRef);
        unregisterChannelzRef(sessionRef);
      }
      this.sessions.delete(session);
    });
  });
}
|
905 | }
|
906 |
|
/**
 * Serves a unary call: reads the single request message, then invokes the
 * user handler with a call object and a callback that sends the response.
 *
 * The caller does not await the returned promise, so every failure is
 * handled here. A rejection from `receiveUnaryMessage` or a synchronous
 * throw from the handler is reported to the client via `call.sendError`
 * instead of surfacing as an unhandled promise rejection that leaves the
 * call without a status.
 *
 * @param call Server-side call stream.
 * @param handler Registered unary handler.
 * @param metadata Metadata received from the client.
 * @param encoding Value of the request's `grpc-encoding` ('identity' if absent).
 * @returns A promise that settles once the handler has been invoked, the
 *     call was skipped (no request or cancelled), or the error was reported.
 */
async function handleUnary<RequestType, ResponseType>(
  call: Http2ServerCallStream<RequestType, ResponseType>,
  handler: UnaryHandler<RequestType, ResponseType>,
  metadata: Metadata,
  encoding: string
): Promise<void> {
  try {
    const request = await call.receiveUnaryMessage(encoding);

    // Nothing to do when no message arrived or the client already cancelled.
    if (request === undefined || call.cancelled) {
      return;
    }

    const emitter = new ServerUnaryCallImpl<RequestType, ResponseType>(
      call,
      metadata,
      request
    );

    handler.func(
      emitter,
      (
        err: ServerErrorResponse | ServerStatusResponse | null,
        value?: ResponseType | null,
        trailer?: Metadata,
        flags?: number
      ) => {
        call.sendUnaryMessage(err, value, trailer, flags);
      }
    );
  } catch (err) {
    call.sendError(err as ServerErrorResponse);
  }
}
|
937 |
|
/**
 * Serves a client-streaming call: wraps the call in a readable stream of
 * requests and gives the user handler that stream together with a callback
 * for the single response.
 *
 * @param call Server-side call stream.
 * @param handler Registered client-streaming handler.
 * @param metadata Metadata received from the client.
 * @param encoding Value of the request's `grpc-encoding` ('identity' if absent).
 */
function handleClientStreaming<RequestType, ResponseType>(
  call: Http2ServerCallStream<RequestType, ResponseType>,
  handler: ClientStreamingHandler<RequestType, ResponseType>,
  metadata: Metadata,
  encoding: string
): void {
  const requestStream = new ServerReadableStreamImpl<RequestType, ResponseType>(
    call,
    metadata,
    handler.deserialize,
    encoding
  );

  // Ends the request stream and sends the response (or error) to the client.
  const respond = (
    err: ServerErrorResponse | ServerStatusResponse | null,
    value?: ResponseType | null,
    trailer?: Metadata,
    flags?: number
  ): void => {
    requestStream.destroy();
    call.sendUnaryMessage(err, value, trailer, flags);
  };

  if (call.cancelled) {
    return;
  }

  // A stream error is reported to the client the same way as a handler error.
  requestStream.on('error', respond);
  handler.func(requestStream, respond);
}
|
968 |
|
/**
 * Serves a server-streaming call: reads the single request message, then
 * hands the user handler a writable stream for its responses.
 *
 * The caller does not await the returned promise, so every failure is
 * handled here. A rejection from `receiveUnaryMessage` or a synchronous
 * throw from the handler is reported to the client via `call.sendError`
 * instead of surfacing as an unhandled promise rejection that leaves the
 * call without a status.
 *
 * @param call Server-side call stream.
 * @param handler Registered server-streaming handler.
 * @param metadata Metadata received from the client.
 * @param encoding Value of the request's `grpc-encoding` ('identity' if absent).
 * @returns A promise that settles once the handler has been invoked, the
 *     call was skipped (no request or cancelled), or the error was reported.
 */
async function handleServerStreaming<RequestType, ResponseType>(
  call: Http2ServerCallStream<RequestType, ResponseType>,
  handler: ServerStreamingHandler<RequestType, ResponseType>,
  metadata: Metadata,
  encoding: string
): Promise<void> {
  try {
    const request = await call.receiveUnaryMessage(encoding);

    // Nothing to do when no message arrived or the client already cancelled.
    if (request === undefined || call.cancelled) {
      return;
    }

    const stream = new ServerWritableStreamImpl<RequestType, ResponseType>(
      call,
      metadata,
      handler.serialize,
      request
    );

    handler.func(stream);
  } catch (err) {
    call.sendError(err as ServerErrorResponse);
  }
}
|
990 |
|
/**
 * Serves a bidirectional-streaming call: wraps the call in a duplex stream
 * and passes it to the user handler, unless the call is already cancelled.
 *
 * @param call Server-side call stream.
 * @param handler Registered bidi-streaming handler.
 * @param metadata Metadata received from the client.
 * @param encoding Value of the request's `grpc-encoding` ('identity' if absent).
 */
function handleBidiStreaming<RequestType, ResponseType>(
  call: Http2ServerCallStream<RequestType, ResponseType>,
  handler: BidiStreamingHandler<RequestType, ResponseType>,
  metadata: Metadata,
  encoding: string
): void {
  // The stream is built before the cancellation check, as in the other handlers.
  const duplex = new ServerDuplexStreamImpl<RequestType, ResponseType>(
    call,
    metadata,
    handler.serialize,
    handler.deserialize,
    encoding
  );

  if (!call.cancelled) {
    handler.func(duplex);
  }
}
|