UNPKG

37.3 kBPlain TextView Raw
1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import * as http2 from 'http2';
19import { AddressInfo } from 'net';
20
21import { ServiceError } from './call';
22import { Status, LogVerbosity } from './constants';
23import { Deserialize, Serialize, ServiceDefinition } from './make-client';
24import { Metadata } from './metadata';
25import {
26 BidiStreamingHandler,
27 ClientStreamingHandler,
28 HandleCall,
29 Handler,
30 HandlerType,
31 Http2ServerCallStream,
32 sendUnaryData,
33 ServerDuplexStream,
34 ServerDuplexStreamImpl,
35 ServerReadableStream,
36 ServerReadableStreamImpl,
37 ServerStreamingHandler,
38 ServerUnaryCall,
39 ServerUnaryCallImpl,
40 ServerWritableStream,
41 ServerWritableStreamImpl,
42 UnaryHandler,
43 ServerErrorResponse,
44 ServerStatusResponse,
45} from './server-call';
46import { ServerCredentials } from './server-credentials';
47import { ChannelOptions } from './channel-options';
48import {
49 createResolver,
50 ResolverListener,
51 mapUriDefaultScheme,
52} from './resolver';
53import * as logging from './logging';
54import {
55 SubchannelAddress,
56 TcpSubchannelAddress,
57 isTcpSubchannelAddress,
58 subchannelAddressToString,
59 stringToSubchannelAddress,
60} from './subchannel-address';
61import { parseUri } from './uri-parser';
62import { ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzServer, registerChannelzSocket, ServerInfo, ServerRef, SocketInfo, SocketRef, TlsInfo, unregisterChannelzRef } from './channelz';
63import { CipherNameAndProtocol, TLSSocket } from 'tls';
64import { getErrorCode, getErrorMessage } from './error';
65
/** Largest signed 32-bit integer; compared against to detect "no connection age limit". */
const UNLIMITED_CONNECTION_AGE_MS = 0x7fffffff;
/** Default keepalive ping interval: the largest signed 32-bit integer, in milliseconds. */
const KEEPALIVE_MAX_TIME_MS = 0x7fffffff;
/** Default time to wait for a keepalive ping acknowledgement, in milliseconds. */
const KEEPALIVE_TIMEOUT_MS = 20000;

/** Name of the HTTP/2 pseudo-header carrying the request path. */
const HTTP2_HEADER_PATH = http2.constants.HTTP2_HEADER_PATH;

/** Tracer name under which this module emits trace logs. */
const TRACER_NAME = 'server';
75
/** Outcome of binding a list of resolved addresses. */
interface BindResult {
  /** Port number associated with the bind attempt. */
  port: number;
  /** How many addresses were bound successfully. */
  count: number;
}
80
/** Does nothing; used where a callback argument is required but no action is wanted. */
function noop(): void {
  /* intentionally empty */
}
82
/**
 * Build the status object reported for a method that has no implementation.
 *
 * @param methodName Name of the requested method; interpolated into `details`.
 * @returns A partial ServiceError carrying `Status.UNIMPLEMENTED` and a
 * human-readable `details` string.
 */
function getUnimplementedStatusResponse(
  methodName: string
): Partial<ServiceError> {
  const details = `The server does not implement the method ${methodName}`;
  return { code: Status.UNIMPLEMENTED, details };
}
91
/* eslint-disable @typescript-eslint/no-explicit-any */

/** Unary handler with request and response types erased. */
type UntypedUnaryHandler = UnaryHandler<any, any>;
/** Client-streaming handler with request and response types erased. */
type UntypedClientStreamingHandler = ClientStreamingHandler<any, any>;
/** Server-streaming handler with request and response types erased. */
type UntypedServerStreamingHandler = ServerStreamingHandler<any, any>;
/** Bidirectional-streaming handler with request and response types erased. */
type UntypedBidiStreamingHandler = BidiStreamingHandler<any, any>;
/** A user-supplied method implementation with request and response types erased. */
export type UntypedHandleCall = HandleCall<any, any>;
/** A registered handler record with request and response types erased. */
type UntypedHandler = Handler<any, any>;

/** Maps method names to their user-supplied implementations. */
export interface UntypedServiceImplementation {
  [name: string]: UntypedHandleCall;
}
102
/**
 * Create the fallback handler installed for a service method that the
 * implementation object does not provide.
 *
 * Handlers that receive a callback report the UNIMPLEMENTED status through
 * it; handlers that only receive a stream emit it as an 'error' event.
 *
 * @param handlerType Kind of RPC the handler has to serve.
 * @param methodName Method name used to build the UNIMPLEMENTED status.
 * @returns A handler function matching `handlerType`.
 * @throws Error if `handlerType` is not one of the four known kinds.
 */
function getDefaultHandler(handlerType: HandlerType, methodName: string) {
  const status = getUnimplementedStatusResponse(methodName);

  if (handlerType === 'unary') {
    return (
      call: ServerUnaryCall<any, any>,
      callback: sendUnaryData<any>
    ) => {
      callback(status as ServiceError, null);
    };
  }

  if (handlerType === 'clientStream') {
    return (
      call: ServerReadableStream<any, any>,
      callback: sendUnaryData<any>
    ) => {
      callback(status as ServiceError, null);
    };
  }

  if (handlerType === 'serverStream') {
    return (call: ServerWritableStream<any, any>) => {
      call.emit('error', status);
    };
  }

  if (handlerType === 'bidi') {
    return (call: ServerDuplexStream<any, any>) => {
      call.emit('error', status);
    };
  }

  throw new Error(`Invalid handlerType ${handlerType}`);
}
134
/** Per-session channelz bookkeeping kept for every open HTTP/2 session. */
interface ChannelzSessionInfo {
  /** Channelz socket reference registered for the session. */
  ref: SocketRef;
  /** Tracks streams started, succeeded and failed on the session. */
  streamTracker: ChannelzCallTracker;
  /** Number of messages sent on the session. */
  messagesSent: number;
  /** Number of messages received on the session. */
  messagesReceived: number;
  /** When the most recent message was sent, or null if none has been. */
  lastMessageSentTimestamp: Date | null;
  /** When the most recent message was received, or null if none has been. */
  lastMessageReceivedTimestamp: Date | null;
}
143
/** Holder for the channelz socket reference of a listener. */
interface ChannelzListenerInfo {
  /** Channelz socket reference. */
  ref: SocketRef;
}
147
148export class Server {
149 private http2ServerList: { server: (http2.Http2Server | http2.Http2SecureServer), channelzRef: SocketRef }[] = [];
150
151 private handlers: Map<string, UntypedHandler> = new Map<
152 string,
153 UntypedHandler
154 >();
155 private sessions = new Map<http2.ServerHttp2Session, ChannelzSessionInfo>();
156 private started = false;
157 private options: ChannelOptions;
158 private serverAddressString: string = 'null'
159
160 // Channelz Info
161 private readonly channelzEnabled: boolean = true;
162 private channelzRef: ServerRef;
163 private channelzTrace = new ChannelzTrace();
164 private callTracker = new ChannelzCallTracker();
165 private listenerChildrenTracker = new ChannelzChildrenTracker();
166 private sessionChildrenTracker = new ChannelzChildrenTracker();
167
168 private readonly maxConnectionAgeMs: number;
169 private readonly maxConnectionAgeGraceMs: number;
170
171 private readonly keepaliveTimeMs: number;
172 private readonly keepaliveTimeoutMs: number;
173
174 constructor(options?: ChannelOptions) {
175 this.options = options ?? {};
176 if (this.options['grpc.enable_channelz'] === 0) {
177 this.channelzEnabled = false;
178 }
179 this.channelzRef = registerChannelzServer(() => this.getChannelzInfo(), this.channelzEnabled);
180 if (this.channelzEnabled) {
181 this.channelzTrace.addTrace('CT_INFO', 'Server created');
182 }
183 this.maxConnectionAgeMs = this.options['grpc.max_connection_age_ms'] ?? UNLIMITED_CONNECTION_AGE_MS;
184 this.maxConnectionAgeGraceMs = this.options['grpc.max_connection_age_grace_ms'] ?? UNLIMITED_CONNECTION_AGE_MS;
185 this.keepaliveTimeMs = this.options['grpc.keepalive_time_ms'] ?? KEEPALIVE_MAX_TIME_MS;
186 this.keepaliveTimeoutMs = this.options['grpc.keepalive_timeout_ms'] ?? KEEPALIVE_TIMEOUT_MS;
187 this.trace('Server constructed');
188 }
189
190 private getChannelzInfo(): ServerInfo {
191 return {
192 trace: this.channelzTrace,
193 callTracker: this.callTracker,
194 listenerChildren: this.listenerChildrenTracker.getChildLists(),
195 sessionChildren: this.sessionChildrenTracker.getChildLists()
196 };
197 }
198
199 private getChannelzSessionInfoGetter(session: http2.ServerHttp2Session): () => SocketInfo {
200 return () => {
201 const sessionInfo = this.sessions.get(session)!;
202 const sessionSocket = session.socket;
203 const remoteAddress = sessionSocket.remoteAddress ? stringToSubchannelAddress(sessionSocket.remoteAddress, sessionSocket.remotePort) : null;
204 const localAddress = sessionSocket.localAddress ? stringToSubchannelAddress(sessionSocket.localAddress!, sessionSocket.localPort) : null;
205 let tlsInfo: TlsInfo | null;
206 if (session.encrypted) {
207 const tlsSocket: TLSSocket = sessionSocket as TLSSocket;
208 const cipherInfo: CipherNameAndProtocol & {standardName?: string} = tlsSocket.getCipher();
209 const certificate = tlsSocket.getCertificate();
210 const peerCertificate = tlsSocket.getPeerCertificate();
211 tlsInfo = {
212 cipherSuiteStandardName: cipherInfo.standardName ?? null,
213 cipherSuiteOtherName: cipherInfo.standardName ? null : cipherInfo.name,
214 localCertificate: (certificate && 'raw' in certificate) ? certificate.raw : null,
215 remoteCertificate: (peerCertificate && 'raw' in peerCertificate) ? peerCertificate.raw : null
216 };
217 } else {
218 tlsInfo = null;
219 }
220 const socketInfo: SocketInfo = {
221 remoteAddress: remoteAddress,
222 localAddress: localAddress,
223 security: tlsInfo,
224 remoteName: null,
225 streamsStarted: sessionInfo.streamTracker.callsStarted,
226 streamsSucceeded: sessionInfo.streamTracker.callsSucceeded,
227 streamsFailed: sessionInfo.streamTracker.callsFailed,
228 messagesSent: sessionInfo.messagesSent,
229 messagesReceived: sessionInfo.messagesReceived,
230 keepAlivesSent: 0,
231 lastLocalStreamCreatedTimestamp: null,
232 lastRemoteStreamCreatedTimestamp: sessionInfo.streamTracker.lastCallStartedTimestamp,
233 lastMessageSentTimestamp: sessionInfo.lastMessageSentTimestamp,
234 lastMessageReceivedTimestamp: sessionInfo.lastMessageReceivedTimestamp,
235 localFlowControlWindow: session.state.localWindowSize ?? null,
236 remoteFlowControlWindow: session.state.remoteWindowSize ?? null
237 };
238 return socketInfo;
239 };
240 }
241
242 private trace(text: string): void {
243 logging.trace(LogVerbosity.DEBUG, TRACER_NAME, '(' + this.channelzRef.id + ') ' + text);
244 }
245
246
247 addProtoService(): never {
248 throw new Error('Not implemented. Use addService() instead');
249 }
250
251 addService(
252 service: ServiceDefinition,
253 implementation: UntypedServiceImplementation
254 ): void {
255 if (
256 service === null ||
257 typeof service !== 'object' ||
258 implementation === null ||
259 typeof implementation !== 'object'
260 ) {
261 throw new Error('addService() requires two objects as arguments');
262 }
263
264 const serviceKeys = Object.keys(service);
265
266 if (serviceKeys.length === 0) {
267 throw new Error('Cannot add an empty service to a server');
268 }
269
270 serviceKeys.forEach((name) => {
271 const attrs = service[name];
272 let methodType: HandlerType;
273
274 if (attrs.requestStream) {
275 if (attrs.responseStream) {
276 methodType = 'bidi';
277 } else {
278 methodType = 'clientStream';
279 }
280 } else {
281 if (attrs.responseStream) {
282 methodType = 'serverStream';
283 } else {
284 methodType = 'unary';
285 }
286 }
287
288 let implFn = implementation[name];
289 let impl;
290
291 if (implFn === undefined && typeof attrs.originalName === 'string') {
292 implFn = implementation[attrs.originalName];
293 }
294
295 if (implFn !== undefined) {
296 impl = implFn.bind(implementation);
297 } else {
298 impl = getDefaultHandler(methodType, name);
299 }
300
301 const success = this.register(
302 attrs.path,
303 impl as UntypedHandleCall,
304 attrs.responseSerialize,
305 attrs.requestDeserialize,
306 methodType
307 );
308
309 if (success === false) {
310 throw new Error(`Method handler for ${attrs.path} already provided.`);
311 }
312 });
313 }
314
315 removeService(service: ServiceDefinition): void {
316 if (service === null || typeof service !== 'object') {
317 throw new Error('removeService() requires object as argument');
318 }
319
320 const serviceKeys = Object.keys(service);
321 serviceKeys.forEach((name) => {
322 const attrs = service[name];
323 this.unregister(attrs.path);
324 });
325 }
326
327 bind(port: string, creds: ServerCredentials): never {
328 throw new Error('Not implemented. Use bindAsync() instead');
329 }
330
331 bindAsync(
332 port: string,
333 creds: ServerCredentials,
334 callback: (error: Error | null, port: number) => void
335 ): void {
336 if (this.started === true) {
337 throw new Error('server is already started');
338 }
339
340 if (typeof port !== 'string') {
341 throw new TypeError('port must be a string');
342 }
343
344 if (creds === null || !(creds instanceof ServerCredentials)) {
345 throw new TypeError('creds must be a ServerCredentials object');
346 }
347
348 if (typeof callback !== 'function') {
349 throw new TypeError('callback must be a function');
350 }
351
352 const initialPortUri = parseUri(port);
353 if (initialPortUri === null) {
354 throw new Error(`Could not parse port "${port}"`);
355 }
356 const portUri = mapUriDefaultScheme(initialPortUri);
357 if (portUri === null) {
358 throw new Error(`Could not get a default scheme for port "${port}"`);
359 }
360
361 const serverOptions: http2.ServerOptions = {
362 maxSendHeaderBlockLength: Number.MAX_SAFE_INTEGER,
363 };
364 if ('grpc-node.max_session_memory' in this.options) {
365 serverOptions.maxSessionMemory = this.options[
366 'grpc-node.max_session_memory'
367 ];
368 } else {
369 /* By default, set a very large max session memory limit, to effectively
370 * disable enforcement of the limit. Some testing indicates that Node's
371 * behavior degrades badly when this limit is reached, so we solve that
372 * by disabling the check entirely. */
373 serverOptions.maxSessionMemory = Number.MAX_SAFE_INTEGER;
374 }
375 if ('grpc.max_concurrent_streams' in this.options) {
376 serverOptions.settings = {
377 maxConcurrentStreams: this.options['grpc.max_concurrent_streams'],
378 };
379 }
380
381 const deferredCallback = (error: Error | null, port: number) => {
382 process.nextTick(() => callback(error, port));
383 }
384
385 const setupServer = (): http2.Http2Server | http2.Http2SecureServer => {
386 let http2Server: http2.Http2Server | http2.Http2SecureServer;
387 if (creds._isSecure()) {
388 const secureServerOptions = Object.assign(
389 serverOptions,
390 creds._getSettings()!
391 );
392 http2Server = http2.createSecureServer(secureServerOptions);
393 http2Server.on('secureConnection', (socket: TLSSocket) => {
394 /* These errors need to be handled by the user of Http2SecureServer,
395 * according to https://github.com/nodejs/node/issues/35824 */
396 socket.on('error', (e: Error) => {
397 this.trace('An incoming TLS connection closed with error: ' + e.message);
398 });
399 });
400 } else {
401 http2Server = http2.createServer(serverOptions);
402 }
403
404 http2Server.setTimeout(0, noop);
405 this._setupHandlers(http2Server);
406 return http2Server;
407 };
408
409 const bindSpecificPort = (
410 addressList: SubchannelAddress[],
411 portNum: number,
412 previousCount: number
413 ): Promise<BindResult> => {
414 if (addressList.length === 0) {
415 return Promise.resolve({ port: portNum, count: previousCount });
416 }
417 return Promise.all(
418 addressList.map((address) => {
419 this.trace('Attempting to bind ' + subchannelAddressToString(address));
420 let addr: SubchannelAddress;
421 if (isTcpSubchannelAddress(address)) {
422 addr = {
423 host: (address as TcpSubchannelAddress).host,
424 port: portNum,
425 };
426 } else {
427 addr = address;
428 }
429
430 const http2Server = setupServer();
431 return new Promise<number | Error>((resolve, reject) => {
432 const onError = (err: Error) => {
433 this.trace('Failed to bind ' + subchannelAddressToString(address) + ' with error ' + err.message);
434 resolve(err);
435 }
436
437 http2Server.once('error', onError);
438
439 http2Server.listen(addr, () => {
440 const boundAddress = http2Server.address()!;
441 let boundSubchannelAddress: SubchannelAddress;
442 if (typeof boundAddress === 'string') {
443 boundSubchannelAddress = {
444 path: boundAddress
445 };
446 } else {
447 boundSubchannelAddress = {
448 host: boundAddress.address,
449 port: boundAddress.port
450 }
451 }
452 let channelzRef: SocketRef;
453 channelzRef = registerChannelzSocket(subchannelAddressToString(boundSubchannelAddress), () => {
454 return {
455 localAddress: boundSubchannelAddress,
456 remoteAddress: null,
457 security: null,
458 remoteName: null,
459 streamsStarted: 0,
460 streamsSucceeded: 0,
461 streamsFailed: 0,
462 messagesSent: 0,
463 messagesReceived: 0,
464 keepAlivesSent: 0,
465 lastLocalStreamCreatedTimestamp: null,
466 lastRemoteStreamCreatedTimestamp: null,
467 lastMessageSentTimestamp: null,
468 lastMessageReceivedTimestamp: null,
469 localFlowControlWindow: null,
470 remoteFlowControlWindow: null
471 };
472 }, this.channelzEnabled);
473 if (this.channelzEnabled) {
474 this.listenerChildrenTracker.refChild(channelzRef);
475 }
476 this.http2ServerList.push({server: http2Server, channelzRef: channelzRef});
477 this.trace('Successfully bound ' + subchannelAddressToString(boundSubchannelAddress));
478 resolve('port' in boundSubchannelAddress ? boundSubchannelAddress.port : portNum);
479 http2Server.removeListener('error', onError);
480 });
481 });
482 })
483 ).then((results) => {
484 let count = 0;
485 for (const result of results) {
486 if (typeof result === 'number') {
487 count += 1;
488 if (result !== portNum) {
489 throw new Error(
490 'Invalid state: multiple port numbers added from single address'
491 );
492 }
493 }
494 }
495 return {
496 port: portNum,
497 count: count + previousCount,
498 };
499 });
500 };
501
502 const bindWildcardPort = (
503 addressList: SubchannelAddress[]
504 ): Promise<BindResult> => {
505 if (addressList.length === 0) {
506 return Promise.resolve<BindResult>({ port: 0, count: 0 });
507 }
508 const address = addressList[0];
509 const http2Server = setupServer();
510 return new Promise<BindResult>((resolve, reject) => {
511 const onError = (err: Error) => {
512 this.trace('Failed to bind ' + subchannelAddressToString(address) + ' with error ' + err.message);
513 resolve(bindWildcardPort(addressList.slice(1)));
514 }
515
516 http2Server.once('error', onError);
517
518 http2Server.listen(address, () => {
519 const boundAddress = http2Server.address() as AddressInfo;
520 const boundSubchannelAddress: SubchannelAddress = {
521 host: boundAddress.address,
522 port: boundAddress.port
523 };
524 let channelzRef: SocketRef;
525 channelzRef = registerChannelzSocket(subchannelAddressToString(boundSubchannelAddress), () => {
526 return {
527 localAddress: boundSubchannelAddress,
528 remoteAddress: null,
529 security: null,
530 remoteName: null,
531 streamsStarted: 0,
532 streamsSucceeded: 0,
533 streamsFailed: 0,
534 messagesSent: 0,
535 messagesReceived: 0,
536 keepAlivesSent: 0,
537 lastLocalStreamCreatedTimestamp: null,
538 lastRemoteStreamCreatedTimestamp: null,
539 lastMessageSentTimestamp: null,
540 lastMessageReceivedTimestamp: null,
541 localFlowControlWindow: null,
542 remoteFlowControlWindow: null
543 };
544 }, this.channelzEnabled);
545 if (this.channelzEnabled) {
546 this.listenerChildrenTracker.refChild(channelzRef);
547 }
548 this.http2ServerList.push({server: http2Server, channelzRef: channelzRef});
549 this.trace('Successfully bound ' + subchannelAddressToString(boundSubchannelAddress));
550 resolve(
551 bindSpecificPort(
552 addressList.slice(1),
553 boundAddress.port,
554 1
555 )
556 );
557 http2Server.removeListener('error', onError);
558 });
559 });
560 };
561
562 const resolverListener: ResolverListener = {
563 onSuccessfulResolution: (
564 addressList,
565 serviceConfig,
566 serviceConfigError
567 ) => {
568 // We only want one resolution result. Discard all future results
569 resolverListener.onSuccessfulResolution = () => {};
570 if (addressList.length === 0) {
571 deferredCallback(new Error(`No addresses resolved for port ${port}`), 0);
572 return;
573 }
574 let bindResultPromise: Promise<BindResult>;
575 if (isTcpSubchannelAddress(addressList[0])) {
576 if (addressList[0].port === 0) {
577 bindResultPromise = bindWildcardPort(addressList);
578 } else {
579 bindResultPromise = bindSpecificPort(
580 addressList,
581 addressList[0].port,
582 0
583 );
584 }
585 } else {
586 // Use an arbitrary non-zero port for non-TCP addresses
587 bindResultPromise = bindSpecificPort(addressList, 1, 0);
588 }
589 bindResultPromise.then(
590 (bindResult) => {
591 if (bindResult.count === 0) {
592 const errorString = `No address added out of total ${addressList.length} resolved`;
593 logging.log(LogVerbosity.ERROR, errorString);
594 deferredCallback(new Error(errorString), 0);
595 } else {
596 if (bindResult.count < addressList.length) {
597 logging.log(
598 LogVerbosity.INFO,
599 `WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved`
600 );
601 }
602 deferredCallback(null, bindResult.port);
603 }
604 },
605 (error) => {
606 const errorString = `No address added out of total ${addressList.length} resolved`;
607 logging.log(LogVerbosity.ERROR, errorString);
608 deferredCallback(new Error(errorString), 0);
609 }
610 );
611 },
612 onError: (error) => {
613 deferredCallback(new Error(error.details), 0);
614 },
615 };
616
617 const resolver = createResolver(portUri, resolverListener, this.options);
618 resolver.updateResolution();
619 }
620
621 forceShutdown(): void {
622 // Close the server if it is still running.
623
624 for (const {server: http2Server, channelzRef: ref} of this.http2ServerList) {
625 if (http2Server.listening) {
626 http2Server.close(() => {
627 if (this.channelzEnabled) {
628 this.listenerChildrenTracker.unrefChild(ref);
629 unregisterChannelzRef(ref);
630 }
631 });
632 }
633 }
634
635 this.started = false;
636
637 // Always destroy any available sessions. It's possible that one or more
638 // tryShutdown() calls are in progress. Don't wait on them to finish.
639 this.sessions.forEach((channelzInfo, session) => {
640 // Cast NGHTTP2_CANCEL to any because TypeScript doesn't seem to
641 // recognize destroy(code) as a valid signature.
642 // eslint-disable-next-line @typescript-eslint/no-explicit-any
643 session.destroy(http2.constants.NGHTTP2_CANCEL as any);
644 });
645 this.sessions.clear();
646 if (this.channelzEnabled) {
647 unregisterChannelzRef(this.channelzRef);
648 }
649 }
650
651 register<RequestType, ResponseType>(
652 name: string,
653 handler: HandleCall<RequestType, ResponseType>,
654 serialize: Serialize<ResponseType>,
655 deserialize: Deserialize<RequestType>,
656 type: string
657 ): boolean {
658 if (this.handlers.has(name)) {
659 return false;
660 }
661
662 this.handlers.set(name, {
663 func: handler,
664 serialize,
665 deserialize,
666 type,
667 path: name,
668 } as UntypedHandler);
669 return true;
670 }
671
672 unregister(name: string): boolean {
673 return this.handlers.delete(name);
674 }
675
676 start(): void {
677 if (
678 this.http2ServerList.length === 0 ||
679 this.http2ServerList.every(
680 ({server: http2Server}) => http2Server.listening !== true
681 )
682 ) {
683 throw new Error('server must be bound in order to start');
684 }
685
686 if (this.started === true) {
687 throw new Error('server is already started');
688 }
689 if (this.channelzEnabled) {
690 this.channelzTrace.addTrace('CT_INFO', 'Starting');
691 }
692 this.started = true;
693 }
694
695 tryShutdown(callback: (error?: Error) => void): void {
696 const wrappedCallback = (error?: Error) => {
697 if (this.channelzEnabled) {
698 unregisterChannelzRef(this.channelzRef);
699 }
700 callback(error);
701 };
702 let pendingChecks = 0;
703
704 function maybeCallback(): void {
705 pendingChecks--;
706
707 if (pendingChecks === 0) {
708 wrappedCallback();
709 }
710 }
711
712 // Close the server if necessary.
713 this.started = false;
714
715 for (const {server: http2Server, channelzRef: ref} of this.http2ServerList) {
716 if (http2Server.listening) {
717 pendingChecks++;
718 http2Server.close(() => {
719 if (this.channelzEnabled) {
720 this.listenerChildrenTracker.unrefChild(ref);
721 unregisterChannelzRef(ref);
722 }
723 maybeCallback();
724 });
725 }
726 }
727
728 this.sessions.forEach((channelzInfo, session) => {
729 if (!session.closed) {
730 pendingChecks += 1;
731 session.close(maybeCallback);
732 }
733 });
734 if (pendingChecks === 0) {
735 wrappedCallback();
736 }
737 }
738
739 addHttp2Port(): never {
740 throw new Error('Not yet implemented');
741 }
742
743 /**
744 * Get the channelz reference object for this server. The returned value is
745 * garbage if channelz is disabled for this server.
746 * @returns
747 */
748 getChannelzRef() {
749 return this.channelzRef;
750 }
751
752 private _verifyContentType(stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders): boolean {
753 const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE];
754
755 if (
756 typeof contentType !== 'string' ||
757 !contentType.startsWith('application/grpc')
758 ) {
759 stream.respond(
760 {
761 [http2.constants.HTTP2_HEADER_STATUS]:
762 http2.constants.HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE,
763 },
764 { endStream: true }
765 );
766 return false
767 }
768
769 return true
770 }
771
772 private _retrieveHandler(headers: http2.IncomingHttpHeaders): Handler<any, any> {
773 const path = headers[HTTP2_HEADER_PATH] as string
774
775 this.trace(
776 'Received call to method ' +
777 path +
778 ' at address ' +
779 this.serverAddressString
780 );
781
782 const handler = this.handlers.get(path);
783
784 if (handler === undefined) {
785 this.trace(
786 'No handler registered for method ' +
787 path +
788 '. Sending UNIMPLEMENTED status.'
789 );
790 throw getUnimplementedStatusResponse(path);
791 }
792
793 return handler
794 }
795
796 private _respondWithError<T extends Partial<ServiceError>>(
797 err: T,
798 stream: http2.ServerHttp2Stream,
799 channelzSessionInfo: ChannelzSessionInfo | null = null
800 ) {
801 const call = new Http2ServerCallStream(stream, null!, this.options);
802
803 if (err.code === undefined) {
804 err.code = Status.INTERNAL;
805 }
806
807 if (this.channelzEnabled) {
808 this.callTracker.addCallFailed();
809 channelzSessionInfo?.streamTracker.addCallFailed()
810 }
811
812 call.sendError(err);
813 }
814
815 private _channelzHandler(stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders) {
816 const channelzSessionInfo = this.sessions.get(stream.session as http2.ServerHttp2Session);
817
818 this.callTracker.addCallStarted();
819 channelzSessionInfo?.streamTracker.addCallStarted();
820
821 if (!this._verifyContentType(stream, headers)) {
822 this.callTracker.addCallFailed();
823 channelzSessionInfo?.streamTracker.addCallFailed();
824 return
825 }
826
827 let handler: Handler<any, any>
828 try {
829 handler = this._retrieveHandler(headers)
830 } catch (err) {
831 this._respondWithError({
832 details: getErrorMessage(err),
833 code: getErrorCode(err) ?? undefined
834 }, stream, channelzSessionInfo)
835 return
836 }
837
838 const call = new Http2ServerCallStream(stream, handler, this.options);
839
840 call.once('callEnd', (code: Status) => {
841 if (code === Status.OK) {
842 this.callTracker.addCallSucceeded();
843 } else {
844 this.callTracker.addCallFailed();
845 }
846 });
847
848 if (channelzSessionInfo) {
849 call.once('streamEnd', (success: boolean) => {
850 if (success) {
851 channelzSessionInfo.streamTracker.addCallSucceeded();
852 } else {
853 channelzSessionInfo.streamTracker.addCallFailed();
854 }
855 });
856 call.on('sendMessage', () => {
857 channelzSessionInfo.messagesSent += 1;
858 channelzSessionInfo.lastMessageSentTimestamp = new Date();
859 });
860 call.on('receiveMessage', () => {
861 channelzSessionInfo.messagesReceived += 1;
862 channelzSessionInfo.lastMessageReceivedTimestamp = new Date();
863 });
864 }
865
866 if (!this._runHandlerForCall(call, handler, headers)) {
867 this.callTracker.addCallFailed();
868 channelzSessionInfo?.streamTracker.addCallFailed()
869
870 call.sendError({
871 code: Status.INTERNAL,
872 details: `Unknown handler type: ${handler.type}`
873 });
874 }
875 }
876
877 private _streamHandler(stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders) {
878 if (this._verifyContentType(stream, headers) !== true) {
879 return
880 }
881
882 let handler: Handler<any, any>
883 try {
884 handler = this._retrieveHandler(headers)
885 } catch (err) {
886 this._respondWithError({
887 details: getErrorMessage(err),
888 code: getErrorCode(err) ?? undefined
889 }, stream, null)
890 return
891 }
892
893 const call = new Http2ServerCallStream(stream, handler, this.options)
894 if (!this._runHandlerForCall(call, handler, headers)) {
895 call.sendError({
896 code: Status.INTERNAL,
897 details: `Unknown handler type: ${handler.type}`
898 });
899 }
900 }
901
902 private _runHandlerForCall(call: Http2ServerCallStream<any, any>, handler: Handler<any, any>, headers: http2.IncomingHttpHeaders): boolean {
903 const metadata = call.receiveMetadata(headers);
904 const encoding = (metadata.get('grpc-encoding')[0] as string | undefined) ?? 'identity';
905 metadata.remove('grpc-encoding');
906
907 const { type } = handler
908 if (type === 'unary') {
909 handleUnary(call, handler as UntypedUnaryHandler, metadata, encoding);
910 } else if (type === 'clientStream') {
911 handleClientStreaming(
912 call,
913 handler as UntypedClientStreamingHandler,
914 metadata,
915 encoding
916 );
917 } else if (type === 'serverStream') {
918 handleServerStreaming(
919 call,
920 handler as UntypedServerStreamingHandler,
921 metadata,
922 encoding
923 );
924 } else if (type === 'bidi') {
925 handleBidiStreaming(
926 call,
927 handler as UntypedBidiStreamingHandler,
928 metadata,
929 encoding
930 );
931 } else {
932 return false
933 }
934
935 return true
936 }
937
938 private _setupHandlers(
939 http2Server: http2.Http2Server | http2.Http2SecureServer
940 ): void {
941 if (http2Server === null) {
942 return;
943 }
944
945 const serverAddress = http2Server.address();
946 let serverAddressString = 'null'
947 if (serverAddress) {
948 if (typeof serverAddress === 'string') {
949 serverAddressString = serverAddress
950 } else {
951 serverAddressString =
952 serverAddress.address + ':' + serverAddress.port
953 }
954 }
955 this.serverAddressString = serverAddressString
956
957 const handler = this.channelzEnabled
958 ? this._channelzHandler
959 : this._streamHandler
960
961 http2Server.on('stream', handler.bind(this))
962 http2Server.on('session', (session) => {
963 if (!this.started) {
964 session.destroy();
965 return;
966 }
967
968 let channelzRef: SocketRef;
969 channelzRef = registerChannelzSocket(session.socket.remoteAddress ?? 'unknown', this.getChannelzSessionInfoGetter(session), this.channelzEnabled);
970
971 const channelzSessionInfo: ChannelzSessionInfo = {
972 ref: channelzRef,
973 streamTracker: new ChannelzCallTracker(),
974 messagesSent: 0,
975 messagesReceived: 0,
976 lastMessageSentTimestamp: null,
977 lastMessageReceivedTimestamp: null
978 };
979
980 this.sessions.set(session, channelzSessionInfo);
981 const clientAddress = session.socket.remoteAddress;
982 if (this.channelzEnabled) {
983 this.channelzTrace.addTrace('CT_INFO', 'Connection established by client ' + clientAddress);
984 this.sessionChildrenTracker.refChild(channelzRef);
985 }
986 let connectionAgeTimer: NodeJS.Timer | null = null;
987 let connectionAgeGraceTimer: NodeJS.Timer | null = null;
988 let sessionClosedByServer = false;
989 if (this.maxConnectionAgeMs !== UNLIMITED_CONNECTION_AGE_MS) {
990 // Apply a random jitter within a +/-10% range
991 const jitterMagnitude = this.maxConnectionAgeMs / 10;
992 const jitter = Math.random() * jitterMagnitude * 2 - jitterMagnitude;
993 connectionAgeTimer = setTimeout(() => {
994 sessionClosedByServer = true;
995 if (this.channelzEnabled) {
996 this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by max connection age from ' + clientAddress);
997 }
998 try {
999 session.goaway(http2.constants.NGHTTP2_NO_ERROR, ~(1<<31), Buffer.from('max_age'));
1000 } catch (e) {
1001 // The goaway can't be sent because the session is already closed
1002 session.destroy();
1003 return;
1004 }
1005 session.close();
1006 /* Allow a grace period after sending the GOAWAY before forcibly
1007 * closing the connection. */
1008 if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) {
1009 connectionAgeGraceTimer = setTimeout(() => {
1010 session.destroy();
1011 }, this.maxConnectionAgeGraceMs).unref?.();
1012 }
1013 }, this.maxConnectionAgeMs + jitter).unref?.();
1014 }
1015 const keeapliveTimeTimer: NodeJS.Timer | null = setInterval(() => {
1016 const timeoutTImer = setTimeout(() => {
1017 sessionClosedByServer = true;
1018 if (this.channelzEnabled) {
1019 this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by keepalive timeout from ' + clientAddress);
1020 }
1021 session.close();
1022 }, this.keepaliveTimeoutMs).unref?.();
1023 try {
1024 session.ping((err: Error | null, duration: number, payload: Buffer) => {
1025 clearTimeout(timeoutTImer);
1026 });
1027 } catch (e) {
1028 // The ping can't be sent because the session is already closed
1029 session.destroy();
1030 }
1031 }, this.keepaliveTimeMs).unref?.();
1032 session.on('close', () => {
1033 if (this.channelzEnabled) {
1034 if (!sessionClosedByServer) {
1035 this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by client ' + clientAddress);
1036 }
1037 this.sessionChildrenTracker.unrefChild(channelzRef);
1038 unregisterChannelzRef(channelzRef);
1039 }
1040 if (connectionAgeTimer) {
1041 clearTimeout(connectionAgeTimer);
1042 }
1043 if (connectionAgeGraceTimer) {
1044 clearTimeout(connectionAgeGraceTimer);
1045 }
1046 if (keeapliveTimeTimer) {
1047 clearTimeout(keeapliveTimeTimer);
1048 }
1049 this.sessions.delete(session);
1050 });
1051 });
1052 }
1053}
1054
/**
 * Serve a unary call: read the single request message, then invoke the
 * handler with a unary call object and a callback that sends the response.
 *
 * The handler is not invoked if receiving fails (the error is sent back), if
 * no request was delivered, or if the call has been cancelled.
 *
 * @param call Call stream to read from and respond on.
 * @param handler Registered unary handler.
 * @param metadata Request metadata passed to the handler.
 * @param encoding Message encoding used when receiving the request.
 */
function handleUnary<RequestType, ResponseType>(
  call: Http2ServerCallStream<RequestType, ResponseType>,
  handler: UnaryHandler<RequestType, ResponseType>,
  metadata: Metadata,
  encoding: string
): void {
  call.receiveUnaryMessage(encoding, (receiveError, request) => {
    if (receiveError) {
      call.sendError(receiveError);
      return;
    }

    if (request === undefined || call.cancelled) {
      return;
    }

    const unaryCall = new ServerUnaryCallImpl<RequestType, ResponseType>(
      call,
      metadata,
      request
    );

    const sendResponse = (
      err: ServerErrorResponse | ServerStatusResponse | null,
      value?: ResponseType | null,
      trailer?: Metadata,
      flags?: number
    ) => {
      call.sendUnaryMessage(err, value, trailer, flags);
    };

    handler.func(unaryCall, sendResponse);
  });
}
1090
/**
 * Serve a client-streaming call: wrap the call in a readable stream and hand
 * it to the handler with a callback that sends the single response.
 *
 * Responding, through the callback or through a stream 'error' event,
 * destroys the readable stream before the response is sent. The handler is
 * not invoked if the call is already cancelled.
 *
 * @param call Call stream to read from and respond on.
 * @param handler Registered client-streaming handler.
 * @param metadata Request metadata passed to the stream.
 * @param encoding Message encoding passed to the stream.
 */
function handleClientStreaming<RequestType, ResponseType>(
  call: Http2ServerCallStream<RequestType, ResponseType>,
  handler: ClientStreamingHandler<RequestType, ResponseType>,
  metadata: Metadata,
  encoding: string
): void {
  const requestStream = new ServerReadableStreamImpl<RequestType, ResponseType>(
    call,
    metadata,
    handler.deserialize,
    encoding
  );

  const respond = (
    err: ServerErrorResponse | ServerStatusResponse | null,
    value?: ResponseType | null,
    trailer?: Metadata,
    flags?: number
  ) => {
    requestStream.destroy();
    call.sendUnaryMessage(err, value, trailer, flags);
  };

  if (call.cancelled) {
    return;
  }

  requestStream.on('error', respond);
  handler.func(requestStream, respond);
}
1121
/**
 * Serve a server-streaming call: read the single request message, then
 * invoke the handler with a writable stream for the responses.
 *
 * The handler is not invoked if receiving fails (the error is sent back), if
 * no request was delivered, or if the call has been cancelled.
 *
 * @param call Call stream to read from and respond on.
 * @param handler Registered server-streaming handler.
 * @param metadata Request metadata passed to the stream.
 * @param encoding Message encoding used when receiving the request.
 */
function handleServerStreaming<RequestType, ResponseType>(
  call: Http2ServerCallStream<RequestType, ResponseType>,
  handler: ServerStreamingHandler<RequestType, ResponseType>,
  metadata: Metadata,
  encoding: string
): void {
  call.receiveUnaryMessage(encoding, (receiveError, request) => {
    if (receiveError) {
      call.sendError(receiveError);
      return;
    }

    if (request === undefined || call.cancelled) {
      return;
    }

    const responseStream = new ServerWritableStreamImpl<RequestType, ResponseType>(
      call,
      metadata,
      handler.serialize,
      request
    );

    handler.func(responseStream);
  });
}
1148
/**
 * Serve a bidirectional-streaming call: wrap the call in a duplex stream and
 * hand it to the handler, unless the call is already cancelled.
 *
 * @param call Call stream to read from and write to.
 * @param handler Registered bidirectional-streaming handler.
 * @param metadata Request metadata passed to the stream.
 * @param encoding Message encoding passed to the stream.
 */
function handleBidiStreaming<RequestType, ResponseType>(
  call: Http2ServerCallStream<RequestType, ResponseType>,
  handler: BidiStreamingHandler<RequestType, ResponseType>,
  metadata: Metadata,
  encoding: string
): void {
  const duplexStream = new ServerDuplexStreamImpl<RequestType, ResponseType>(
    call,
    metadata,
    handler.serialize,
    handler.deserialize,
    encoding
  );

  if (!call.cancelled) {
    handler.func(duplexStream);
  }
}
1166
1167 handler.func(stream);
1168}