UNPKG

59.5 kBPlain TextView Raw
1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import * as http2 from 'http2';
19import * as util from 'util';
20
21import { ServiceError } from './call';
22import { Status, LogVerbosity } from './constants';
23import { Deserialize, Serialize, ServiceDefinition } from './make-client';
24import { Metadata } from './metadata';
25import {
26 BidiStreamingHandler,
27 ClientStreamingHandler,
28 HandleCall,
29 Handler,
30 HandlerType,
31 sendUnaryData,
32 ServerDuplexStream,
33 ServerDuplexStreamImpl,
34 ServerReadableStream,
35 ServerStreamingHandler,
36 ServerUnaryCall,
37 ServerWritableStream,
38 ServerWritableStreamImpl,
39 UnaryHandler,
40 ServerErrorResponse,
41 ServerStatusResponse,
42 serverErrorToStatus,
43} from './server-call';
44import { ServerCredentials } from './server-credentials';
45import { ChannelOptions } from './channel-options';
46import {
47 createResolver,
48 ResolverListener,
49 mapUriDefaultScheme,
50} from './resolver';
51import * as logging from './logging';
52import {
53 SubchannelAddress,
54 isTcpSubchannelAddress,
55 subchannelAddressToString,
56 stringToSubchannelAddress,
57} from './subchannel-address';
58import {
59 GrpcUri,
60 combineHostPort,
61 parseUri,
62 splitHostPort,
63 uriToString,
64} from './uri-parser';
65import {
66 ChannelzCallTracker,
67 ChannelzCallTrackerStub,
68 ChannelzChildrenTracker,
69 ChannelzChildrenTrackerStub,
70 ChannelzTrace,
71 ChannelzTraceStub,
72 registerChannelzServer,
73 registerChannelzSocket,
74 ServerInfo,
75 ServerRef,
76 SocketInfo,
77 SocketRef,
78 TlsInfo,
79 unregisterChannelzRef,
80} from './channelz';
81import { CipherNameAndProtocol, TLSSocket } from 'tls';
82import {
83 ServerInterceptingCallInterface,
84 ServerInterceptor,
85 getServerInterceptingCall,
86} from './server-interceptors';
87import { PartialStatusObject } from './call-interface';
88import { CallEventTracker } from './transport';
89
90const UNLIMITED_CONNECTION_AGE_MS = ~(1 << 31);
91const KEEPALIVE_MAX_TIME_MS = ~(1 << 31);
92const KEEPALIVE_TIMEOUT_MS = 20000;
93const MAX_CONNECTION_IDLE_MS = ~(1 << 31);
94
95const { HTTP2_HEADER_PATH } = http2.constants;
96
97const TRACER_NAME = 'server';
98const kMaxAge = Buffer.from('max_age');
99
100type AnyHttp2Server = http2.Http2Server | http2.Http2SecureServer;
101
102interface BindResult {
103 port: number;
104 count: number;
105 errors: string[];
106}
107
108interface SingleAddressBindResult {
109 port: number;
110 error?: string;
111}
112
113function noop(): void {}
114
115/**
116 * Decorator to wrap a class method with util.deprecate
117 * @param message The message to output if the deprecated method is called
118 * @returns
119 */
120function deprecate(message: string) {
121 return function <This, Args extends any[], Return>(
122 target: (this: This, ...args: Args) => Return,
123 context: ClassMethodDecoratorContext<
124 This,
125 (this: This, ...args: Args) => Return
126 >
127 ) {
128 return util.deprecate(target, message);
129 };
130}
131
132function getUnimplementedStatusResponse(
133 methodName: string
134): PartialStatusObject {
135 return {
136 code: Status.UNIMPLEMENTED,
137 details: `The server does not implement the method ${methodName}`,
138 };
139}
140
141/* eslint-disable @typescript-eslint/no-explicit-any */
142type UntypedUnaryHandler = UnaryHandler<any, any>;
143type UntypedClientStreamingHandler = ClientStreamingHandler<any, any>;
144type UntypedServerStreamingHandler = ServerStreamingHandler<any, any>;
145type UntypedBidiStreamingHandler = BidiStreamingHandler<any, any>;
146export type UntypedHandleCall = HandleCall<any, any>;
147type UntypedHandler = Handler<any, any>;
148export interface UntypedServiceImplementation {
149 [name: string]: UntypedHandleCall;
150}
151
152function getDefaultHandler(handlerType: HandlerType, methodName: string) {
153 const unimplementedStatusResponse =
154 getUnimplementedStatusResponse(methodName);
155 switch (handlerType) {
156 case 'unary':
157 return (
158 call: ServerUnaryCall<any, any>,
159 callback: sendUnaryData<any>
160 ) => {
161 callback(unimplementedStatusResponse as ServiceError, null);
162 };
163 case 'clientStream':
164 return (
165 call: ServerReadableStream<any, any>,
166 callback: sendUnaryData<any>
167 ) => {
168 callback(unimplementedStatusResponse as ServiceError, null);
169 };
170 case 'serverStream':
171 return (call: ServerWritableStream<any, any>) => {
172 call.emit('error', unimplementedStatusResponse);
173 };
174 case 'bidi':
175 return (call: ServerDuplexStream<any, any>) => {
176 call.emit('error', unimplementedStatusResponse);
177 };
178 default:
179 throw new Error(`Invalid handlerType ${handlerType}`);
180 }
181}
182
183interface ChannelzSessionInfo {
184 ref: SocketRef;
185 streamTracker: ChannelzCallTracker | ChannelzCallTrackerStub;
186 messagesSent: number;
187 messagesReceived: number;
188 keepAlivesSent: number;
189 lastMessageSentTimestamp: Date | null;
190 lastMessageReceivedTimestamp: Date | null;
191}
192
193/**
194 * Information related to a single invocation of bindAsync. This should be
195 * tracked in a map keyed by target string, normalized with a pass through
196 * parseUri -> mapUriDefaultScheme -> uriToString. If the target has a port
197 * number and the port number is 0, the target string is modified with the
198 * concrete bound port.
199 */
200interface BoundPort {
201 /**
202 * The key used to refer to this object in the boundPorts map.
203 */
204 mapKey: string;
205 /**
206 * The target string, passed through parseUri -> mapUriDefaultScheme. Used
207 * to determine the final key when the port number is 0.
208 */
209 originalUri: GrpcUri;
210 /**
211 * If there is a pending bindAsync operation, this is a promise that resolves
212 * with the port number when that operation succeeds. If there is no such
213 * operation pending, this is null.
214 */
215 completionPromise: Promise<number> | null;
216 /**
217 * The port number that was actually bound. Populated only after
218 * completionPromise resolves.
219 */
220 portNumber: number;
221 /**
222 * Set by unbind if called while pending is true.
223 */
224 cancelled: boolean;
225 /**
226 * The credentials object passed to the original bindAsync call.
227 */
228 credentials: ServerCredentials;
229 /**
230 * The set of servers associated with this listening port. A target string
231 * that expands to multiple addresses will result in multiple listening
232 * servers.
233 */
234 listeningServers: Set<AnyHttp2Server>;
235}
236
237/**
238 * Should be in a map keyed by AnyHttp2Server.
239 */
240interface Http2ServerInfo {
241 channelzRef: SocketRef;
242 sessions: Set<http2.ServerHttp2Session>;
243}
244
245interface SessionIdleTimeoutTracker {
246 activeStreams: number;
247 lastIdle: number;
248 timeout: NodeJS.Timeout;
249 onClose: (session: http2.ServerHttp2Session) => void | null;
250}
251
252export interface ServerOptions extends ChannelOptions {
253 interceptors?: ServerInterceptor[];
254}
255
256export class Server {
257 private boundPorts: Map<string, BoundPort> = new Map();
258 private http2Servers: Map<AnyHttp2Server, Http2ServerInfo> = new Map();
259 private sessionIdleTimeouts = new Map<
260 http2.ServerHttp2Session,
261 SessionIdleTimeoutTracker
262 >();
263
264 private handlers: Map<string, UntypedHandler> = new Map<
265 string,
266 UntypedHandler
267 >();
268 private sessions = new Map<http2.ServerHttp2Session, ChannelzSessionInfo>();
269 /**
270 * This field only exists to ensure that the start method throws an error if
271 * it is called twice, as it did previously.
272 */
273 private started = false;
274 private shutdown = false;
275 private options: ServerOptions;
276 private serverAddressString = 'null';
277
278 // Channelz Info
279 private readonly channelzEnabled: boolean = true;
280 private channelzRef: ServerRef;
281 private channelzTrace: ChannelzTrace | ChannelzTraceStub;
282 private callTracker: ChannelzCallTracker | ChannelzCallTrackerStub;
283 private listenerChildrenTracker:
284 | ChannelzChildrenTracker
285 | ChannelzChildrenTrackerStub;
286 private sessionChildrenTracker:
287 | ChannelzChildrenTracker
288 | ChannelzChildrenTrackerStub;
289
290 private readonly maxConnectionAgeMs: number;
291 private readonly maxConnectionAgeGraceMs: number;
292
293 private readonly keepaliveTimeMs: number;
294 private readonly keepaliveTimeoutMs: number;
295
296 private readonly sessionIdleTimeout: number;
297
298 private readonly interceptors: ServerInterceptor[];
299
300 /**
301 * Options that will be used to construct all Http2Server instances for this
302 * Server.
303 */
304 private commonServerOptions: http2.ServerOptions;
305
306 constructor(options?: ServerOptions) {
307 this.options = options ?? {};
308 if (this.options['grpc.enable_channelz'] === 0) {
309 this.channelzEnabled = false;
310 this.channelzTrace = new ChannelzTraceStub();
311 this.callTracker = new ChannelzCallTrackerStub();
312 this.listenerChildrenTracker = new ChannelzChildrenTrackerStub();
313 this.sessionChildrenTracker = new ChannelzChildrenTrackerStub();
314 } else {
315 this.channelzTrace = new ChannelzTrace();
316 this.callTracker = new ChannelzCallTracker();
317 this.listenerChildrenTracker = new ChannelzChildrenTracker();
318 this.sessionChildrenTracker = new ChannelzChildrenTracker();
319 }
320
321 this.channelzRef = registerChannelzServer(
322 'server',
323 () => this.getChannelzInfo(),
324 this.channelzEnabled
325 );
326
327 this.channelzTrace.addTrace('CT_INFO', 'Server created');
328 this.maxConnectionAgeMs =
329 this.options['grpc.max_connection_age_ms'] ?? UNLIMITED_CONNECTION_AGE_MS;
330 this.maxConnectionAgeGraceMs =
331 this.options['grpc.max_connection_age_grace_ms'] ??
332 UNLIMITED_CONNECTION_AGE_MS;
333 this.keepaliveTimeMs =
334 this.options['grpc.keepalive_time_ms'] ?? KEEPALIVE_MAX_TIME_MS;
335 this.keepaliveTimeoutMs =
336 this.options['grpc.keepalive_timeout_ms'] ?? KEEPALIVE_TIMEOUT_MS;
337 this.sessionIdleTimeout =
338 this.options['grpc.max_connection_idle_ms'] ?? MAX_CONNECTION_IDLE_MS;
339
340 this.commonServerOptions = {
341 maxSendHeaderBlockLength: Number.MAX_SAFE_INTEGER,
342 };
343 if ('grpc-node.max_session_memory' in this.options) {
344 this.commonServerOptions.maxSessionMemory =
345 this.options['grpc-node.max_session_memory'];
346 } else {
347 /* By default, set a very large max session memory limit, to effectively
348 * disable enforcement of the limit. Some testing indicates that Node's
349 * behavior degrades badly when this limit is reached, so we solve that
350 * by disabling the check entirely. */
351 this.commonServerOptions.maxSessionMemory = Number.MAX_SAFE_INTEGER;
352 }
353 if ('grpc.max_concurrent_streams' in this.options) {
354 this.commonServerOptions.settings = {
355 maxConcurrentStreams: this.options['grpc.max_concurrent_streams'],
356 };
357 }
358 this.interceptors = this.options.interceptors ?? [];
359 this.trace('Server constructed');
360 }
361
362 private getChannelzInfo(): ServerInfo {
363 return {
364 trace: this.channelzTrace,
365 callTracker: this.callTracker,
366 listenerChildren: this.listenerChildrenTracker.getChildLists(),
367 sessionChildren: this.sessionChildrenTracker.getChildLists(),
368 };
369 }
370
371 private getChannelzSessionInfo(
372 session: http2.ServerHttp2Session
373 ): SocketInfo {
374 const sessionInfo = this.sessions.get(session)!;
375 const sessionSocket = session.socket;
376 const remoteAddress = sessionSocket.remoteAddress
377 ? stringToSubchannelAddress(
378 sessionSocket.remoteAddress,
379 sessionSocket.remotePort
380 )
381 : null;
382 const localAddress = sessionSocket.localAddress
383 ? stringToSubchannelAddress(
384 sessionSocket.localAddress!,
385 sessionSocket.localPort
386 )
387 : null;
388 let tlsInfo: TlsInfo | null;
389 if (session.encrypted) {
390 const tlsSocket: TLSSocket = sessionSocket as TLSSocket;
391 const cipherInfo: CipherNameAndProtocol & { standardName?: string } =
392 tlsSocket.getCipher();
393 const certificate = tlsSocket.getCertificate();
394 const peerCertificate = tlsSocket.getPeerCertificate();
395 tlsInfo = {
396 cipherSuiteStandardName: cipherInfo.standardName ?? null,
397 cipherSuiteOtherName: cipherInfo.standardName ? null : cipherInfo.name,
398 localCertificate:
399 certificate && 'raw' in certificate ? certificate.raw : null,
400 remoteCertificate:
401 peerCertificate && 'raw' in peerCertificate
402 ? peerCertificate.raw
403 : null,
404 };
405 } else {
406 tlsInfo = null;
407 }
408 const socketInfo: SocketInfo = {
409 remoteAddress: remoteAddress,
410 localAddress: localAddress,
411 security: tlsInfo,
412 remoteName: null,
413 streamsStarted: sessionInfo.streamTracker.callsStarted,
414 streamsSucceeded: sessionInfo.streamTracker.callsSucceeded,
415 streamsFailed: sessionInfo.streamTracker.callsFailed,
416 messagesSent: sessionInfo.messagesSent,
417 messagesReceived: sessionInfo.messagesReceived,
418 keepAlivesSent: sessionInfo.keepAlivesSent,
419 lastLocalStreamCreatedTimestamp: null,
420 lastRemoteStreamCreatedTimestamp:
421 sessionInfo.streamTracker.lastCallStartedTimestamp,
422 lastMessageSentTimestamp: sessionInfo.lastMessageSentTimestamp,
423 lastMessageReceivedTimestamp: sessionInfo.lastMessageReceivedTimestamp,
424 localFlowControlWindow: session.state.localWindowSize ?? null,
425 remoteFlowControlWindow: session.state.remoteWindowSize ?? null,
426 };
427 return socketInfo;
428 }
429
430 private trace(text: string): void {
431 logging.trace(
432 LogVerbosity.DEBUG,
433 TRACER_NAME,
434 '(' + this.channelzRef.id + ') ' + text
435 );
436 }
437
438 addProtoService(): never {
439 throw new Error('Not implemented. Use addService() instead');
440 }
441
442 addService(
443 service: ServiceDefinition,
444 implementation: UntypedServiceImplementation
445 ): void {
446 if (
447 service === null ||
448 typeof service !== 'object' ||
449 implementation === null ||
450 typeof implementation !== 'object'
451 ) {
452 throw new Error('addService() requires two objects as arguments');
453 }
454
455 const serviceKeys = Object.keys(service);
456
457 if (serviceKeys.length === 0) {
458 throw new Error('Cannot add an empty service to a server');
459 }
460
461 serviceKeys.forEach(name => {
462 const attrs = service[name];
463 let methodType: HandlerType;
464
465 if (attrs.requestStream) {
466 if (attrs.responseStream) {
467 methodType = 'bidi';
468 } else {
469 methodType = 'clientStream';
470 }
471 } else {
472 if (attrs.responseStream) {
473 methodType = 'serverStream';
474 } else {
475 methodType = 'unary';
476 }
477 }
478
479 let implFn = implementation[name];
480 let impl;
481
482 if (implFn === undefined && typeof attrs.originalName === 'string') {
483 implFn = implementation[attrs.originalName];
484 }
485
486 if (implFn !== undefined) {
487 impl = implFn.bind(implementation);
488 } else {
489 impl = getDefaultHandler(methodType, name);
490 }
491
492 const success = this.register(
493 attrs.path,
494 impl as UntypedHandleCall,
495 attrs.responseSerialize,
496 attrs.requestDeserialize,
497 methodType
498 );
499
500 if (success === false) {
501 throw new Error(`Method handler for ${attrs.path} already provided.`);
502 }
503 });
504 }
505
506 removeService(service: ServiceDefinition): void {
507 if (service === null || typeof service !== 'object') {
508 throw new Error('removeService() requires object as argument');
509 }
510
511 const serviceKeys = Object.keys(service);
512 serviceKeys.forEach(name => {
513 const attrs = service[name];
514 this.unregister(attrs.path);
515 });
516 }
517
518 bind(port: string, creds: ServerCredentials): never {
519 throw new Error('Not implemented. Use bindAsync() instead');
520 }
521
522 private registerListenerToChannelz(boundAddress: SubchannelAddress) {
523 return registerChannelzSocket(
524 subchannelAddressToString(boundAddress),
525 () => {
526 return {
527 localAddress: boundAddress,
528 remoteAddress: null,
529 security: null,
530 remoteName: null,
531 streamsStarted: 0,
532 streamsSucceeded: 0,
533 streamsFailed: 0,
534 messagesSent: 0,
535 messagesReceived: 0,
536 keepAlivesSent: 0,
537 lastLocalStreamCreatedTimestamp: null,
538 lastRemoteStreamCreatedTimestamp: null,
539 lastMessageSentTimestamp: null,
540 lastMessageReceivedTimestamp: null,
541 localFlowControlWindow: null,
542 remoteFlowControlWindow: null,
543 };
544 },
545 this.channelzEnabled
546 );
547 }
548
549 private createHttp2Server(credentials: ServerCredentials) {
550 let http2Server: http2.Http2Server | http2.Http2SecureServer;
551 if (credentials._isSecure()) {
552 const secureServerOptions = Object.assign(
553 this.commonServerOptions,
554 credentials._getSettings()!
555 );
556 secureServerOptions.enableTrace =
557 this.options['grpc-node.tls_enable_trace'] === 1;
558 http2Server = http2.createSecureServer(secureServerOptions);
559 http2Server.on('secureConnection', (socket: TLSSocket) => {
560 /* These errors need to be handled by the user of Http2SecureServer,
561 * according to https://github.com/nodejs/node/issues/35824 */
562 socket.on('error', (e: Error) => {
563 this.trace(
564 'An incoming TLS connection closed with error: ' + e.message
565 );
566 });
567 });
568 } else {
569 http2Server = http2.createServer(this.commonServerOptions);
570 }
571
572 http2Server.setTimeout(0, noop);
573 this._setupHandlers(http2Server);
574 return http2Server;
575 }
576
577 private bindOneAddress(
578 address: SubchannelAddress,
579 boundPortObject: BoundPort
580 ): Promise<SingleAddressBindResult> {
581 this.trace('Attempting to bind ' + subchannelAddressToString(address));
582 const http2Server = this.createHttp2Server(boundPortObject.credentials);
583 return new Promise<SingleAddressBindResult>((resolve, reject) => {
584 const onError = (err: Error) => {
585 this.trace(
586 'Failed to bind ' +
587 subchannelAddressToString(address) +
588 ' with error ' +
589 err.message
590 );
591 resolve({
592 port: 'port' in address ? address.port : 1,
593 error: err.message,
594 });
595 };
596
597 http2Server.once('error', onError);
598
599 http2Server.listen(address, () => {
600 const boundAddress = http2Server.address()!;
601 let boundSubchannelAddress: SubchannelAddress;
602 if (typeof boundAddress === 'string') {
603 boundSubchannelAddress = {
604 path: boundAddress,
605 };
606 } else {
607 boundSubchannelAddress = {
608 host: boundAddress.address,
609 port: boundAddress.port,
610 };
611 }
612
613 const channelzRef = this.registerListenerToChannelz(
614 boundSubchannelAddress
615 );
616 this.listenerChildrenTracker.refChild(channelzRef);
617
618 this.http2Servers.set(http2Server, {
619 channelzRef: channelzRef,
620 sessions: new Set(),
621 });
622 boundPortObject.listeningServers.add(http2Server);
623 this.trace(
624 'Successfully bound ' +
625 subchannelAddressToString(boundSubchannelAddress)
626 );
627 resolve({
628 port:
629 'port' in boundSubchannelAddress ? boundSubchannelAddress.port : 1,
630 });
631 http2Server.removeListener('error', onError);
632 });
633 });
634 }
635
636 private async bindManyPorts(
637 addressList: SubchannelAddress[],
638 boundPortObject: BoundPort
639 ): Promise<BindResult> {
640 if (addressList.length === 0) {
641 return {
642 count: 0,
643 port: 0,
644 errors: [],
645 };
646 }
647 if (isTcpSubchannelAddress(addressList[0]) && addressList[0].port === 0) {
648 /* If binding to port 0, first try to bind the first address, then bind
649 * the rest of the address list to the specific port that it binds. */
650 const firstAddressResult = await this.bindOneAddress(
651 addressList[0],
652 boundPortObject
653 );
654 if (firstAddressResult.error) {
655 /* If the first address fails to bind, try the same operation starting
656 * from the second item in the list. */
657 const restAddressResult = await this.bindManyPorts(
658 addressList.slice(1),
659 boundPortObject
660 );
661 return {
662 ...restAddressResult,
663 errors: [firstAddressResult.error, ...restAddressResult.errors],
664 };
665 } else {
666 const restAddresses = addressList
667 .slice(1)
668 .map(address =>
669 isTcpSubchannelAddress(address)
670 ? { host: address.host, port: firstAddressResult.port }
671 : address
672 );
673 const restAddressResult = await Promise.all(
674 restAddresses.map(address =>
675 this.bindOneAddress(address, boundPortObject)
676 )
677 );
678 const allResults = [firstAddressResult, ...restAddressResult];
679 return {
680 count: allResults.filter(result => result.error === undefined).length,
681 port: firstAddressResult.port,
682 errors: allResults
683 .filter(result => result.error)
684 .map(result => result.error!),
685 };
686 }
687 } else {
688 const allResults = await Promise.all(
689 addressList.map(address =>
690 this.bindOneAddress(address, boundPortObject)
691 )
692 );
693 return {
694 count: allResults.filter(result => result.error === undefined).length,
695 port: allResults[0].port,
696 errors: allResults
697 .filter(result => result.error)
698 .map(result => result.error!),
699 };
700 }
701 }
702
703 private async bindAddressList(
704 addressList: SubchannelAddress[],
705 boundPortObject: BoundPort
706 ): Promise<number> {
707 const bindResult = await this.bindManyPorts(addressList, boundPortObject);
708 if (bindResult.count > 0) {
709 if (bindResult.count < addressList.length) {
710 logging.log(
711 LogVerbosity.INFO,
712 `WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved`
713 );
714 }
715 return bindResult.port;
716 } else {
717 const errorString = `No address added out of total ${addressList.length} resolved`;
718 logging.log(LogVerbosity.ERROR, errorString);
719 throw new Error(
720 `${errorString} errors: [${bindResult.errors.join(',')}]`
721 );
722 }
723 }
724
725 private resolvePort(port: GrpcUri): Promise<SubchannelAddress[]> {
726 return new Promise<SubchannelAddress[]>((resolve, reject) => {
727 const resolverListener: ResolverListener = {
728 onSuccessfulResolution: (
729 endpointList,
730 serviceConfig,
731 serviceConfigError
732 ) => {
733 // We only want one resolution result. Discard all future results
734 resolverListener.onSuccessfulResolution = () => {};
735 const addressList = ([] as SubchannelAddress[]).concat(
736 ...endpointList.map(endpoint => endpoint.addresses)
737 );
738 if (addressList.length === 0) {
739 reject(new Error(`No addresses resolved for port ${port}`));
740 return;
741 }
742 resolve(addressList);
743 },
744 onError: error => {
745 reject(new Error(error.details));
746 },
747 };
748 const resolver = createResolver(port, resolverListener, this.options);
749 resolver.updateResolution();
750 });
751 }
752
753 private async bindPort(
754 port: GrpcUri,
755 boundPortObject: BoundPort
756 ): Promise<number> {
757 const addressList = await this.resolvePort(port);
758 if (boundPortObject.cancelled) {
759 this.completeUnbind(boundPortObject);
760 throw new Error('bindAsync operation cancelled by unbind call');
761 }
762 const portNumber = await this.bindAddressList(addressList, boundPortObject);
763 if (boundPortObject.cancelled) {
764 this.completeUnbind(boundPortObject);
765 throw new Error('bindAsync operation cancelled by unbind call');
766 }
767 return portNumber;
768 }
769
770 private normalizePort(port: string): GrpcUri {
771 const initialPortUri = parseUri(port);
772 if (initialPortUri === null) {
773 throw new Error(`Could not parse port "${port}"`);
774 }
775 const portUri = mapUriDefaultScheme(initialPortUri);
776 if (portUri === null) {
777 throw new Error(`Could not get a default scheme for port "${port}"`);
778 }
779 return portUri;
780 }
781
782 bindAsync(
783 port: string,
784 creds: ServerCredentials,
785 callback: (error: Error | null, port: number) => void
786 ): void {
787 if (this.shutdown) {
788 throw new Error('bindAsync called after shutdown');
789 }
790 if (typeof port !== 'string') {
791 throw new TypeError('port must be a string');
792 }
793
794 if (creds === null || !(creds instanceof ServerCredentials)) {
795 throw new TypeError('creds must be a ServerCredentials object');
796 }
797
798 if (typeof callback !== 'function') {
799 throw new TypeError('callback must be a function');
800 }
801
802 this.trace('bindAsync port=' + port);
803
804 const portUri = this.normalizePort(port);
805
806 const deferredCallback = (error: Error | null, port: number) => {
807 process.nextTick(() => callback(error, port));
808 };
809
810 /* First, if this port is already bound or that bind operation is in
811 * progress, use that result. */
812 let boundPortObject = this.boundPorts.get(uriToString(portUri));
813 if (boundPortObject) {
814 if (!creds._equals(boundPortObject.credentials)) {
815 deferredCallback(
816 new Error(`${port} already bound with incompatible credentials`),
817 0
818 );
819 return;
820 }
821 /* If that operation has previously been cancelled by an unbind call,
822 * uncancel it. */
823 boundPortObject.cancelled = false;
824 if (boundPortObject.completionPromise) {
825 boundPortObject.completionPromise.then(
826 portNum => callback(null, portNum),
827 error => callback(error as Error, 0)
828 );
829 } else {
830 deferredCallback(null, boundPortObject.portNumber);
831 }
832 return;
833 }
834 boundPortObject = {
835 mapKey: uriToString(portUri),
836 originalUri: portUri,
837 completionPromise: null,
838 cancelled: false,
839 portNumber: 0,
840 credentials: creds,
841 listeningServers: new Set(),
842 };
843 const splitPort = splitHostPort(portUri.path);
844 const completionPromise = this.bindPort(portUri, boundPortObject);
845 boundPortObject.completionPromise = completionPromise;
846 /* If the port number is 0, defer populating the map entry until after the
847 * bind operation completes and we have a specific port number. Otherwise,
848 * populate it immediately. */
849 if (splitPort?.port === 0) {
850 completionPromise.then(
851 portNum => {
852 const finalUri: GrpcUri = {
853 scheme: portUri.scheme,
854 authority: portUri.authority,
855 path: combineHostPort({ host: splitPort.host, port: portNum }),
856 };
857 boundPortObject!.mapKey = uriToString(finalUri);
858 boundPortObject!.completionPromise = null;
859 boundPortObject!.portNumber = portNum;
860 this.boundPorts.set(boundPortObject!.mapKey, boundPortObject!);
861 callback(null, portNum);
862 },
863 error => {
864 callback(error, 0);
865 }
866 );
867 } else {
868 this.boundPorts.set(boundPortObject.mapKey, boundPortObject);
869 completionPromise.then(
870 portNum => {
871 boundPortObject!.completionPromise = null;
872 boundPortObject!.portNumber = portNum;
873 callback(null, portNum);
874 },
875 error => {
876 callback(error, 0);
877 }
878 );
879 }
880 }
881
882 private closeServer(server: AnyHttp2Server, callback?: () => void) {
883 this.trace(
884 'Closing server with address ' + JSON.stringify(server.address())
885 );
886 const serverInfo = this.http2Servers.get(server);
887 server.close(() => {
888 if (serverInfo) {
889 this.listenerChildrenTracker.unrefChild(serverInfo.channelzRef);
890 unregisterChannelzRef(serverInfo.channelzRef);
891 }
892 this.http2Servers.delete(server);
893 callback?.();
894 });
895 }
896
897 private closeSession(
898 session: http2.ServerHttp2Session,
899 callback?: () => void
900 ) {
901 this.trace('Closing session initiated by ' + session.socket?.remoteAddress);
902 const sessionInfo = this.sessions.get(session);
903 const closeCallback = () => {
904 if (sessionInfo) {
905 this.sessionChildrenTracker.unrefChild(sessionInfo.ref);
906 unregisterChannelzRef(sessionInfo.ref);
907 }
908 callback?.();
909 };
910 if (session.closed) {
911 queueMicrotask(closeCallback);
912 } else {
913 session.close(closeCallback);
914 }
915 }
916
917 private completeUnbind(boundPortObject: BoundPort) {
918 for (const server of boundPortObject.listeningServers) {
919 const serverInfo = this.http2Servers.get(server);
920 this.closeServer(server, () => {
921 boundPortObject.listeningServers.delete(server);
922 });
923 if (serverInfo) {
924 for (const session of serverInfo.sessions) {
925 this.closeSession(session);
926 }
927 }
928 }
929 this.boundPorts.delete(boundPortObject.mapKey);
930 }
931
932 /**
933 * Unbind a previously bound port, or cancel an in-progress bindAsync
934 * operation. If port 0 was bound, only the actual bound port can be
935 * unbound. For example, if bindAsync was called with "localhost:0" and the
936 * bound port result was 54321, it can be unbound as "localhost:54321".
937 * @param port
938 */
939 unbind(port: string): void {
940 this.trace('unbind port=' + port);
941 const portUri = this.normalizePort(port);
942 const splitPort = splitHostPort(portUri.path);
943 if (splitPort?.port === 0) {
944 throw new Error('Cannot unbind port 0');
945 }
946 const boundPortObject = this.boundPorts.get(uriToString(portUri));
947 if (boundPortObject) {
948 this.trace(
949 'unbinding ' +
950 boundPortObject.mapKey +
951 ' originally bound as ' +
952 uriToString(boundPortObject.originalUri)
953 );
954 /* If the bind operation is pending, the cancelled flag will trigger
955 * the unbind operation later. */
956 if (boundPortObject.completionPromise) {
957 boundPortObject.cancelled = true;
958 } else {
959 this.completeUnbind(boundPortObject);
960 }
961 }
962 }
963
964 /**
965 * Gracefully close all connections associated with a previously bound port.
966 * After the grace time, forcefully close all remaining open connections.
967 *
968 * If port 0 was bound, only the actual bound port can be
969 * drained. For example, if bindAsync was called with "localhost:0" and the
970 * bound port result was 54321, it can be drained as "localhost:54321".
971 * @param port
972 * @param graceTimeMs
973 * @returns
974 */
975 drain(port: string, graceTimeMs: number): void {
976 this.trace('drain port=' + port + ' graceTimeMs=' + graceTimeMs);
977 const portUri = this.normalizePort(port);
978 const splitPort = splitHostPort(portUri.path);
979 if (splitPort?.port === 0) {
980 throw new Error('Cannot drain port 0');
981 }
982 const boundPortObject = this.boundPorts.get(uriToString(portUri));
983 if (!boundPortObject) {
984 return;
985 }
986 const allSessions: Set<http2.Http2Session> = new Set();
987 for (const http2Server of boundPortObject.listeningServers) {
988 const serverEntry = this.http2Servers.get(http2Server);
989 if (serverEntry) {
990 for (const session of serverEntry.sessions) {
991 allSessions.add(session);
992 this.closeSession(session, () => {
993 allSessions.delete(session);
994 });
995 }
996 }
997 }
998 /* After the grace time ends, send another goaway to all remaining sessions
999 * with the CANCEL code. */
1000 setTimeout(() => {
1001 for (const session of allSessions) {
1002 session.destroy(http2.constants.NGHTTP2_CANCEL as any);
1003 }
1004 }, graceTimeMs).unref?.();
1005 }
1006
1007 forceShutdown(): void {
1008 for (const boundPortObject of this.boundPorts.values()) {
1009 boundPortObject.cancelled = true;
1010 }
1011 this.boundPorts.clear();
1012 // Close the server if it is still running.
1013 for (const server of this.http2Servers.keys()) {
1014 this.closeServer(server);
1015 }
1016
1017 // Always destroy any available sessions. It's possible that one or more
1018 // tryShutdown() calls are in progress. Don't wait on them to finish.
1019 this.sessions.forEach((channelzInfo, session) => {
1020 this.closeSession(session);
1021 // Cast NGHTTP2_CANCEL to any because TypeScript doesn't seem to
1022 // recognize destroy(code) as a valid signature.
1023 // eslint-disable-next-line @typescript-eslint/no-explicit-any
1024 session.destroy(http2.constants.NGHTTP2_CANCEL as any);
1025 });
1026 this.sessions.clear();
1027 unregisterChannelzRef(this.channelzRef);
1028
1029 this.shutdown = true;
1030 }
1031
1032 register<RequestType, ResponseType>(
1033 name: string,
1034 handler: HandleCall<RequestType, ResponseType>,
1035 serialize: Serialize<ResponseType>,
1036 deserialize: Deserialize<RequestType>,
1037 type: string
1038 ): boolean {
1039 if (this.handlers.has(name)) {
1040 return false;
1041 }
1042
1043 this.handlers.set(name, {
1044 func: handler,
1045 serialize,
1046 deserialize,
1047 type,
1048 path: name,
1049 } as UntypedHandler);
1050 return true;
1051 }
1052
1053 unregister(name: string): boolean {
1054 return this.handlers.delete(name);
1055 }
1056
1057 /**
1058 * @deprecated No longer needed as of version 1.10.x
1059 */
1060 @deprecate(
1061 'Calling start() is no longer necessary. It can be safely omitted.'
1062 )
1063 start(): void {
1064 if (
1065 this.http2Servers.size === 0 ||
1066 [...this.http2Servers.keys()].every(server => !server.listening)
1067 ) {
1068 throw new Error('server must be bound in order to start');
1069 }
1070
1071 if (this.started === true) {
1072 throw new Error('server is already started');
1073 }
1074 this.started = true;
1075 }
1076
1077 tryShutdown(callback: (error?: Error) => void): void {
1078 const wrappedCallback = (error?: Error) => {
1079 unregisterChannelzRef(this.channelzRef);
1080 callback(error);
1081 };
1082 let pendingChecks = 0;
1083
1084 function maybeCallback(): void {
1085 pendingChecks--;
1086
1087 if (pendingChecks === 0) {
1088 wrappedCallback();
1089 }
1090 }
1091 this.shutdown = true;
1092
1093 for (const [serverKey, server] of this.http2Servers.entries()) {
1094 pendingChecks++;
1095 const serverString = server.channelzRef.name;
1096 this.trace('Waiting for server ' + serverString + ' to close');
1097 this.closeServer(serverKey, () => {
1098 this.trace('Server ' + serverString + ' finished closing');
1099 maybeCallback();
1100 });
1101
1102 for (const session of server.sessions.keys()) {
1103 pendingChecks++;
1104 const sessionString = session.socket?.remoteAddress;
1105 this.trace('Waiting for session ' + sessionString + ' to close');
1106 this.closeSession(session, () => {
1107 this.trace('Session ' + sessionString + ' finished closing');
1108 maybeCallback();
1109 });
1110 }
1111 }
1112
1113 if (pendingChecks === 0) {
1114 wrappedCallback();
1115 }
1116 }
1117
1118 addHttp2Port(): never {
1119 throw new Error('Not yet implemented');
1120 }
1121
1122 /**
1123 * Get the channelz reference object for this server. The returned value is
1124 * garbage if channelz is disabled for this server.
1125 * @returns
1126 */
1127 getChannelzRef() {
1128 return this.channelzRef;
1129 }
1130
1131 private _verifyContentType(
1132 stream: http2.ServerHttp2Stream,
1133 headers: http2.IncomingHttpHeaders
1134 ): boolean {
1135 const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE];
1136
1137 if (
1138 typeof contentType !== 'string' ||
1139 !contentType.startsWith('application/grpc')
1140 ) {
1141 stream.respond(
1142 {
1143 [http2.constants.HTTP2_HEADER_STATUS]:
1144 http2.constants.HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE,
1145 },
1146 { endStream: true }
1147 );
1148 return false;
1149 }
1150
1151 return true;
1152 }
1153
1154 private _retrieveHandler(path: string): Handler<any, any> | null {
1155 this.trace(
1156 'Received call to method ' +
1157 path +
1158 ' at address ' +
1159 this.serverAddressString
1160 );
1161
1162 const handler = this.handlers.get(path);
1163
1164 if (handler === undefined) {
1165 this.trace(
1166 'No handler registered for method ' +
1167 path +
1168 '. Sending UNIMPLEMENTED status.'
1169 );
1170 return null;
1171 }
1172
1173 return handler;
1174 }
1175
1176 private _respondWithError(
1177 err: PartialStatusObject,
1178 stream: http2.ServerHttp2Stream,
1179 channelzSessionInfo: ChannelzSessionInfo | null = null
1180 ) {
1181 const trailersToSend = {
1182 'grpc-status': err.code ?? Status.INTERNAL,
1183 'grpc-message': err.details,
1184 [http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_OK,
1185 [http2.constants.HTTP2_HEADER_CONTENT_TYPE]: 'application/grpc+proto',
1186 ...err.metadata?.toHttp2Headers(),
1187 };
1188 stream.respond(trailersToSend, { endStream: true });
1189
1190 this.callTracker.addCallFailed();
1191 channelzSessionInfo?.streamTracker.addCallFailed();
1192 }
1193
1194 private _channelzHandler(
1195 stream: http2.ServerHttp2Stream,
1196 headers: http2.IncomingHttpHeaders
1197 ) {
1198 // for handling idle timeout
1199 this.onStreamOpened(stream);
1200
1201 const channelzSessionInfo = this.sessions.get(
1202 stream.session as http2.ServerHttp2Session
1203 );
1204
1205 this.callTracker.addCallStarted();
1206 channelzSessionInfo?.streamTracker.addCallStarted();
1207
1208 if (!this._verifyContentType(stream, headers)) {
1209 this.callTracker.addCallFailed();
1210 channelzSessionInfo?.streamTracker.addCallFailed();
1211 return;
1212 }
1213
1214 const path = headers[HTTP2_HEADER_PATH] as string;
1215
1216 const handler = this._retrieveHandler(path);
1217 if (!handler) {
1218 this._respondWithError(
1219 getUnimplementedStatusResponse(path),
1220 stream,
1221 channelzSessionInfo
1222 );
1223 return;
1224 }
1225
1226 const callEventTracker: CallEventTracker = {
1227 addMessageSent: () => {
1228 if (channelzSessionInfo) {
1229 channelzSessionInfo.messagesSent += 1;
1230 channelzSessionInfo.lastMessageSentTimestamp = new Date();
1231 }
1232 },
1233 addMessageReceived: () => {
1234 if (channelzSessionInfo) {
1235 channelzSessionInfo.messagesReceived += 1;
1236 channelzSessionInfo.lastMessageReceivedTimestamp = new Date();
1237 }
1238 },
1239 onCallEnd: status => {
1240 if (status.code === Status.OK) {
1241 this.callTracker.addCallSucceeded();
1242 } else {
1243 this.callTracker.addCallFailed();
1244 }
1245 },
1246 onStreamEnd: success => {
1247 if (channelzSessionInfo) {
1248 if (success) {
1249 channelzSessionInfo.streamTracker.addCallSucceeded();
1250 } else {
1251 channelzSessionInfo.streamTracker.addCallFailed();
1252 }
1253 }
1254 },
1255 };
1256
1257 const call = getServerInterceptingCall(
1258 this.interceptors,
1259 stream,
1260 headers,
1261 callEventTracker,
1262 handler,
1263 this.options
1264 );
1265
1266 if (!this._runHandlerForCall(call, handler)) {
1267 this.callTracker.addCallFailed();
1268 channelzSessionInfo?.streamTracker.addCallFailed();
1269
1270 call.sendStatus({
1271 code: Status.INTERNAL,
1272 details: `Unknown handler type: ${handler.type}`,
1273 });
1274 }
1275 }
1276
1277 private _streamHandler(
1278 stream: http2.ServerHttp2Stream,
1279 headers: http2.IncomingHttpHeaders
1280 ) {
1281 // for handling idle timeout
1282 this.onStreamOpened(stream);
1283
1284 if (this._verifyContentType(stream, headers) !== true) {
1285 return;
1286 }
1287
1288 const path = headers[HTTP2_HEADER_PATH] as string;
1289
1290 const handler = this._retrieveHandler(path);
1291 if (!handler) {
1292 this._respondWithError(
1293 getUnimplementedStatusResponse(path),
1294 stream,
1295 null
1296 );
1297 return;
1298 }
1299
1300 const call = getServerInterceptingCall(
1301 this.interceptors,
1302 stream,
1303 headers,
1304 null,
1305 handler,
1306 this.options
1307 );
1308
1309 if (!this._runHandlerForCall(call, handler)) {
1310 call.sendStatus({
1311 code: Status.INTERNAL,
1312 details: `Unknown handler type: ${handler.type}`,
1313 });
1314 }
1315 }
1316
1317 private _runHandlerForCall(
1318 call: ServerInterceptingCallInterface,
1319 handler:
1320 | UntypedUnaryHandler
1321 | UntypedClientStreamingHandler
1322 | UntypedServerStreamingHandler
1323 | UntypedBidiStreamingHandler
1324 ): boolean {
1325 const { type } = handler;
1326 if (type === 'unary') {
1327 handleUnary(call, handler);
1328 } else if (type === 'clientStream') {
1329 handleClientStreaming(call, handler);
1330 } else if (type === 'serverStream') {
1331 handleServerStreaming(call, handler);
1332 } else if (type === 'bidi') {
1333 handleBidiStreaming(call, handler);
1334 } else {
1335 return false;
1336 }
1337
1338 return true;
1339 }
1340
1341 private _setupHandlers(
1342 http2Server: http2.Http2Server | http2.Http2SecureServer
1343 ): void {
1344 if (http2Server === null) {
1345 return;
1346 }
1347
1348 const serverAddress = http2Server.address();
1349 let serverAddressString = 'null';
1350 if (serverAddress) {
1351 if (typeof serverAddress === 'string') {
1352 serverAddressString = serverAddress;
1353 } else {
1354 serverAddressString = serverAddress.address + ':' + serverAddress.port;
1355 }
1356 }
1357 this.serverAddressString = serverAddressString;
1358
1359 const handler = this.channelzEnabled
1360 ? this._channelzHandler
1361 : this._streamHandler;
1362
1363 const sessionHandler = this.channelzEnabled
1364 ? this._channelzSessionHandler(http2Server)
1365 : this._sessionHandler(http2Server);
1366
1367 http2Server.on('stream', handler.bind(this));
1368 http2Server.on('session', sessionHandler);
1369 }
1370
1371 private _sessionHandler(
1372 http2Server: http2.Http2Server | http2.Http2SecureServer
1373 ) {
1374 return (session: http2.ServerHttp2Session) => {
1375 this.http2Servers.get(http2Server)?.sessions.add(session);
1376
1377 let connectionAgeTimer: NodeJS.Timeout | null = null;
1378 let connectionAgeGraceTimer: NodeJS.Timeout | null = null;
1379 let keeapliveTimeTimer: NodeJS.Timeout | null = null;
1380 let keepaliveTimeoutTimer: NodeJS.Timeout | null = null;
1381 let sessionClosedByServer = false;
1382
1383 const idleTimeoutObj = this.enableIdleTimeout(session);
1384
1385 if (this.maxConnectionAgeMs !== UNLIMITED_CONNECTION_AGE_MS) {
1386 // Apply a random jitter within a +/-10% range
1387 const jitterMagnitude = this.maxConnectionAgeMs / 10;
1388 const jitter = Math.random() * jitterMagnitude * 2 - jitterMagnitude;
1389
1390 connectionAgeTimer = setTimeout(() => {
1391 sessionClosedByServer = true;
1392
1393 this.trace(
1394 'Connection dropped by max connection age: ' +
1395 session.socket?.remoteAddress
1396 );
1397
1398 try {
1399 session.goaway(
1400 http2.constants.NGHTTP2_NO_ERROR,
1401 ~(1 << 31),
1402 kMaxAge
1403 );
1404 } catch (e) {
1405 // The goaway can't be sent because the session is already closed
1406 session.destroy();
1407 return;
1408 }
1409 session.close();
1410
1411 /* Allow a grace period after sending the GOAWAY before forcibly
1412 * closing the connection. */
1413 if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) {
1414 connectionAgeGraceTimer = setTimeout(() => {
1415 session.destroy();
1416 }, this.maxConnectionAgeGraceMs);
1417 connectionAgeGraceTimer.unref?.();
1418 }
1419 }, this.maxConnectionAgeMs + jitter);
1420 connectionAgeTimer.unref?.();
1421 }
1422
1423 if (this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS) {
1424 keeapliveTimeTimer = setInterval(() => {
1425 keepaliveTimeoutTimer = setTimeout(() => {
1426 sessionClosedByServer = true;
1427 session.close();
1428 }, this.keepaliveTimeoutMs);
1429 keepaliveTimeoutTimer.unref?.();
1430
1431 try {
1432 session.ping(
1433 (err: Error | null, duration: number, payload: Buffer) => {
1434 if (keepaliveTimeoutTimer) {
1435 clearTimeout(keepaliveTimeoutTimer);
1436 }
1437
1438 if (err) {
1439 sessionClosedByServer = true;
1440 this.trace(
1441 'Connection dropped due to error of a ping frame ' +
1442 err.message +
1443 ' return in ' +
1444 duration
1445 );
1446 session.close();
1447 }
1448 }
1449 );
1450 } catch (e) {
1451 clearTimeout(keepaliveTimeoutTimer);
1452 // The ping can't be sent because the session is already closed
1453 session.destroy();
1454 }
1455 }, this.keepaliveTimeMs);
1456 keeapliveTimeTimer.unref?.();
1457 }
1458
1459 session.on('close', () => {
1460 if (!sessionClosedByServer) {
1461 this.trace(
1462 `Connection dropped by client ${session.socket?.remoteAddress}`
1463 );
1464 }
1465
1466 if (connectionAgeTimer) {
1467 clearTimeout(connectionAgeTimer);
1468 }
1469
1470 if (connectionAgeGraceTimer) {
1471 clearTimeout(connectionAgeGraceTimer);
1472 }
1473
1474 if (keeapliveTimeTimer) {
1475 clearInterval(keeapliveTimeTimer);
1476 if (keepaliveTimeoutTimer) {
1477 clearTimeout(keepaliveTimeoutTimer);
1478 }
1479 }
1480
1481 if (idleTimeoutObj !== null) {
1482 clearTimeout(idleTimeoutObj.timeout);
1483 this.sessionIdleTimeouts.delete(session);
1484 }
1485
1486 this.http2Servers.get(http2Server)?.sessions.delete(session);
1487 });
1488 };
1489 }
1490
1491 private _channelzSessionHandler(
1492 http2Server: http2.Http2Server | http2.Http2SecureServer
1493 ) {
1494 return (session: http2.ServerHttp2Session) => {
1495 const channelzRef = registerChannelzSocket(
1496 session.socket?.remoteAddress ?? 'unknown',
1497 this.getChannelzSessionInfo.bind(this, session),
1498 this.channelzEnabled
1499 );
1500
1501 const channelzSessionInfo: ChannelzSessionInfo = {
1502 ref: channelzRef,
1503 streamTracker: new ChannelzCallTracker(),
1504 messagesSent: 0,
1505 messagesReceived: 0,
1506 keepAlivesSent: 0,
1507 lastMessageSentTimestamp: null,
1508 lastMessageReceivedTimestamp: null,
1509 };
1510
1511 this.http2Servers.get(http2Server)?.sessions.add(session);
1512 this.sessions.set(session, channelzSessionInfo);
1513 const clientAddress = `${session.socket.remoteAddress}:${session.socket.remotePort}`;
1514
1515 this.channelzTrace.addTrace(
1516 'CT_INFO',
1517 'Connection established by client ' + clientAddress
1518 );
1519 this.trace('Connection established by client ' + clientAddress);
1520 this.sessionChildrenTracker.refChild(channelzRef);
1521
1522 let connectionAgeTimer: NodeJS.Timeout | null = null;
1523 let connectionAgeGraceTimer: NodeJS.Timeout | null = null;
1524 let keeapliveTimeTimer: NodeJS.Timeout | null = null;
1525 let keepaliveTimeoutTimer: NodeJS.Timeout | null = null;
1526 let sessionClosedByServer = false;
1527
1528 const idleTimeoutObj = this.enableIdleTimeout(session);
1529
1530 if (this.maxConnectionAgeMs !== UNLIMITED_CONNECTION_AGE_MS) {
1531 // Apply a random jitter within a +/-10% range
1532 const jitterMagnitude = this.maxConnectionAgeMs / 10;
1533 const jitter = Math.random() * jitterMagnitude * 2 - jitterMagnitude;
1534
1535 connectionAgeTimer = setTimeout(() => {
1536 sessionClosedByServer = true;
1537 this.channelzTrace.addTrace(
1538 'CT_INFO',
1539 'Connection dropped by max connection age from ' + clientAddress
1540 );
1541
1542 try {
1543 session.goaway(
1544 http2.constants.NGHTTP2_NO_ERROR,
1545 ~(1 << 31),
1546 kMaxAge
1547 );
1548 } catch (e) {
1549 // The goaway can't be sent because the session is already closed
1550 session.destroy();
1551 return;
1552 }
1553 session.close();
1554
1555 /* Allow a grace period after sending the GOAWAY before forcibly
1556 * closing the connection. */
1557 if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) {
1558 connectionAgeGraceTimer = setTimeout(() => {
1559 session.destroy();
1560 }, this.maxConnectionAgeGraceMs);
1561 connectionAgeGraceTimer.unref?.();
1562 }
1563 }, this.maxConnectionAgeMs + jitter);
1564 connectionAgeTimer.unref?.();
1565 }
1566
1567 if (this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS) {
1568 keeapliveTimeTimer = setInterval(() => {
1569 keepaliveTimeoutTimer = setTimeout(() => {
1570 sessionClosedByServer = true;
1571 this.channelzTrace.addTrace(
1572 'CT_INFO',
1573 'Connection dropped by keepalive timeout from ' + clientAddress
1574 );
1575
1576 session.close();
1577 }, this.keepaliveTimeoutMs);
1578 keepaliveTimeoutTimer.unref?.();
1579
1580 try {
1581 session.ping(
1582 (err: Error | null, duration: number, payload: Buffer) => {
1583 if (keepaliveTimeoutTimer) {
1584 clearTimeout(keepaliveTimeoutTimer);
1585 }
1586
1587 if (err) {
1588 sessionClosedByServer = true;
1589 this.channelzTrace.addTrace(
1590 'CT_INFO',
1591 'Connection dropped due to error of a ping frame ' +
1592 err.message +
1593 ' return in ' +
1594 duration
1595 );
1596
1597 session.close();
1598 }
1599 }
1600 );
1601 channelzSessionInfo.keepAlivesSent += 1;
1602 } catch (e) {
1603 clearTimeout(keepaliveTimeoutTimer);
1604 // The ping can't be sent because the session is already closed
1605 session.destroy();
1606 }
1607 }, this.keepaliveTimeMs);
1608 keeapliveTimeTimer.unref?.();
1609 }
1610
1611 session.on('close', () => {
1612 if (!sessionClosedByServer) {
1613 this.channelzTrace.addTrace(
1614 'CT_INFO',
1615 'Connection dropped by client ' + clientAddress
1616 );
1617 }
1618
1619 this.sessionChildrenTracker.unrefChild(channelzRef);
1620 unregisterChannelzRef(channelzRef);
1621
1622 if (connectionAgeTimer) {
1623 clearTimeout(connectionAgeTimer);
1624 }
1625
1626 if (connectionAgeGraceTimer) {
1627 clearTimeout(connectionAgeGraceTimer);
1628 }
1629
1630 if (keeapliveTimeTimer) {
1631 clearInterval(keeapliveTimeTimer);
1632 if (keepaliveTimeoutTimer) {
1633 clearTimeout(keepaliveTimeoutTimer);
1634 }
1635 }
1636
1637 if (idleTimeoutObj !== null) {
1638 clearTimeout(idleTimeoutObj.timeout);
1639 this.sessionIdleTimeouts.delete(session);
1640 }
1641
1642 this.http2Servers.get(http2Server)?.sessions.delete(session);
1643 this.sessions.delete(session);
1644 });
1645 };
1646 }
1647
1648 private enableIdleTimeout(
1649 session: http2.ServerHttp2Session
1650 ): SessionIdleTimeoutTracker | null {
1651 if (this.sessionIdleTimeout >= MAX_CONNECTION_IDLE_MS) {
1652 return null;
1653 }
1654
1655 const idleTimeoutObj: SessionIdleTimeoutTracker = {
1656 activeStreams: 0,
1657 lastIdle: Date.now(),
1658 onClose: this.onStreamClose.bind(this, session),
1659 timeout: setTimeout(
1660 this.onIdleTimeout,
1661 this.sessionIdleTimeout,
1662 this,
1663 session
1664 ),
1665 };
1666 idleTimeoutObj.timeout.unref?.();
1667 this.sessionIdleTimeouts.set(session, idleTimeoutObj);
1668
1669 const { socket } = session;
1670 this.trace(
1671 'Enable idle timeout for ' +
1672 socket.remoteAddress +
1673 ':' +
1674 socket.remotePort
1675 );
1676
1677 return idleTimeoutObj;
1678 }
1679
1680 private onIdleTimeout(
1681 this: undefined,
1682 ctx: Server,
1683 session: http2.ServerHttp2Session
1684 ) {
1685 const { socket } = session;
1686 const sessionInfo = ctx.sessionIdleTimeouts.get(session);
1687
1688 // if it is called while we have activeStreams - timer will not be rescheduled
1689 // until last active stream is closed, then it will call .refresh() on the timer
1690 // important part is to not clearTimeout(timer) or it becomes unusable
1691 // for future refreshes
1692 if (
1693 sessionInfo !== undefined &&
1694 sessionInfo.activeStreams === 0 &&
1695 Date.now() - sessionInfo.lastIdle >= ctx.sessionIdleTimeout
1696 ) {
1697 ctx.trace(
1698 'Session idle timeout triggered for ' +
1699 socket?.remoteAddress +
1700 ':' +
1701 socket?.remotePort +
1702 ' last idle at ' +
1703 sessionInfo.lastIdle
1704 );
1705
1706 ctx.closeSession(session);
1707 }
1708 }
1709
1710 private onStreamOpened(stream: http2.ServerHttp2Stream) {
1711 const session = stream.session as http2.ServerHttp2Session;
1712
1713 const idleTimeoutObj = this.sessionIdleTimeouts.get(session);
1714 if (idleTimeoutObj) {
1715 idleTimeoutObj.activeStreams += 1;
1716 stream.once('close', idleTimeoutObj.onClose);
1717 }
1718 }
1719
1720 private onStreamClose(session: http2.ServerHttp2Session) {
1721 const idleTimeoutObj = this.sessionIdleTimeouts.get(session);
1722
1723 if (idleTimeoutObj) {
1724 idleTimeoutObj.activeStreams -= 1;
1725 if (idleTimeoutObj.activeStreams === 0) {
1726 idleTimeoutObj.lastIdle = Date.now();
1727 idleTimeoutObj.timeout.refresh();
1728
1729 this.trace(
1730 'Session onStreamClose' +
1731 session.socket?.remoteAddress +
1732 ':' +
1733 session.socket?.remotePort +
1734 ' at ' +
1735 idleTimeoutObj.lastIdle
1736 );
1737 }
1738 }
1739 }
1740}
1741
1742async function handleUnary<RequestType, ResponseType>(
1743 call: ServerInterceptingCallInterface,
1744 handler: UnaryHandler<RequestType, ResponseType>
1745): Promise<void> {
1746 let stream: ServerUnaryCall<RequestType, ResponseType>;
1747
1748 function respond(
1749 err: ServerErrorResponse | ServerStatusResponse | null,
1750 value?: ResponseType | null,
1751 trailer?: Metadata,
1752 flags?: number
1753 ) {
1754 if (err) {
1755 call.sendStatus(serverErrorToStatus(err, trailer));
1756 return;
1757 }
1758 call.sendMessage(value, () => {
1759 call.sendStatus({
1760 code: Status.OK,
1761 details: 'OK',
1762 metadata: trailer ?? null,
1763 });
1764 });
1765 }
1766
1767 let requestMetadata: Metadata;
1768 let requestMessage: RequestType | null = null;
1769 call.start({
1770 onReceiveMetadata(metadata) {
1771 requestMetadata = metadata;
1772 call.startRead();
1773 },
1774 onReceiveMessage(message) {
1775 if (requestMessage) {
1776 call.sendStatus({
1777 code: Status.UNIMPLEMENTED,
1778 details: `Received a second request message for server streaming method ${handler.path}`,
1779 metadata: null,
1780 });
1781 return;
1782 }
1783 requestMessage = message;
1784 call.startRead();
1785 },
1786 onReceiveHalfClose() {
1787 if (!requestMessage) {
1788 call.sendStatus({
1789 code: Status.UNIMPLEMENTED,
1790 details: `Received no request message for server streaming method ${handler.path}`,
1791 metadata: null,
1792 });
1793 return;
1794 }
1795 stream = new ServerWritableStreamImpl(
1796 handler.path,
1797 call,
1798 requestMetadata,
1799 requestMessage
1800 );
1801 try {
1802 handler.func(stream, respond);
1803 } catch (err) {
1804 call.sendStatus({
1805 code: Status.UNKNOWN,
1806 details: `Server method handler threw error ${
1807 (err as Error).message
1808 }`,
1809 metadata: null,
1810 });
1811 }
1812 },
1813 onCancel() {
1814 if (stream) {
1815 stream.cancelled = true;
1816 stream.emit('cancelled', 'cancelled');
1817 }
1818 },
1819 });
1820}
1821
1822function handleClientStreaming<RequestType, ResponseType>(
1823 call: ServerInterceptingCallInterface,
1824 handler: ClientStreamingHandler<RequestType, ResponseType>
1825): void {
1826 let stream: ServerReadableStream<RequestType, ResponseType>;
1827
1828 function respond(
1829 err: ServerErrorResponse | ServerStatusResponse | null,
1830 value?: ResponseType | null,
1831 trailer?: Metadata,
1832 flags?: number
1833 ) {
1834 if (err) {
1835 call.sendStatus(serverErrorToStatus(err, trailer));
1836 return;
1837 }
1838 call.sendMessage(value, () => {
1839 call.sendStatus({
1840 code: Status.OK,
1841 details: 'OK',
1842 metadata: trailer ?? null,
1843 });
1844 });
1845 }
1846
1847 call.start({
1848 onReceiveMetadata(metadata) {
1849 stream = new ServerDuplexStreamImpl(handler.path, call, metadata);
1850 try {
1851 handler.func(stream, respond);
1852 } catch (err) {
1853 call.sendStatus({
1854 code: Status.UNKNOWN,
1855 details: `Server method handler threw error ${
1856 (err as Error).message
1857 }`,
1858 metadata: null,
1859 });
1860 }
1861 },
1862 onReceiveMessage(message) {
1863 stream.push(message);
1864 },
1865 onReceiveHalfClose() {
1866 stream.push(null);
1867 },
1868 onCancel() {
1869 if (stream) {
1870 stream.cancelled = true;
1871 stream.emit('cancelled', 'cancelled');
1872 stream.destroy();
1873 }
1874 },
1875 });
1876}
1877
1878function handleServerStreaming<RequestType, ResponseType>(
1879 call: ServerInterceptingCallInterface,
1880 handler: ServerStreamingHandler<RequestType, ResponseType>
1881): void {
1882 let stream: ServerWritableStream<RequestType, ResponseType>;
1883
1884 let requestMetadata: Metadata;
1885 let requestMessage: RequestType | null = null;
1886 call.start({
1887 onReceiveMetadata(metadata) {
1888 requestMetadata = metadata;
1889 call.startRead();
1890 },
1891 onReceiveMessage(message) {
1892 if (requestMessage) {
1893 call.sendStatus({
1894 code: Status.UNIMPLEMENTED,
1895 details: `Received a second request message for server streaming method ${handler.path}`,
1896 metadata: null,
1897 });
1898 return;
1899 }
1900 requestMessage = message;
1901 call.startRead();
1902 },
1903 onReceiveHalfClose() {
1904 if (!requestMessage) {
1905 call.sendStatus({
1906 code: Status.UNIMPLEMENTED,
1907 details: `Received no request message for server streaming method ${handler.path}`,
1908 metadata: null,
1909 });
1910 return;
1911 }
1912 stream = new ServerWritableStreamImpl(
1913 handler.path,
1914 call,
1915 requestMetadata,
1916 requestMessage
1917 );
1918 try {
1919 handler.func(stream);
1920 } catch (err) {
1921 call.sendStatus({
1922 code: Status.UNKNOWN,
1923 details: `Server method handler threw error ${
1924 (err as Error).message
1925 }`,
1926 metadata: null,
1927 });
1928 }
1929 },
1930 onCancel() {
1931 if (stream) {
1932 stream.cancelled = true;
1933 stream.emit('cancelled', 'cancelled');
1934 stream.destroy();
1935 }
1936 },
1937 });
1938}
1939
1940function handleBidiStreaming<RequestType, ResponseType>(
1941 call: ServerInterceptingCallInterface,
1942 handler: BidiStreamingHandler<RequestType, ResponseType>
1943): void {
1944 let stream: ServerDuplexStream<RequestType, ResponseType>;
1945
1946 call.start({
1947 onReceiveMetadata(metadata) {
1948 stream = new ServerDuplexStreamImpl(handler.path, call, metadata);
1949 try {
1950 handler.func(stream);
1951 } catch (err) {
1952 call.sendStatus({
1953 code: Status.UNKNOWN,
1954 details: `Server method handler threw error ${
1955 (err as Error).message
1956 }`,
1957 metadata: null,
1958 });
1959 }
1960 },
1961 onReceiveMessage(message) {
1962 stream.push(message);
1963 },
1964 onReceiveHalfClose() {
1965 stream.push(null);
1966 },
1967 onCancel() {
1968 if (stream) {
1969 stream.cancelled = true;
1970 stream.emit('cancelled', 'cancelled');
1971 stream.destroy();
1972 }
1973 },
1974 });
1975}