/*
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

import * as http2 from 'http2';
import * as util from 'util';

import { ServiceError } from './call';
import { Status, LogVerbosity } from './constants';
import { Deserialize, Serialize, ServiceDefinition } from './make-client';
import { Metadata } from './metadata';
import {
  BidiStreamingHandler,
  ClientStreamingHandler,
  HandleCall,
  Handler,
  HandlerType,
  sendUnaryData,
  ServerDuplexStream,
  ServerDuplexStreamImpl,
  ServerReadableStream,
  ServerStreamingHandler,
  ServerUnaryCall,
  ServerWritableStream,
  ServerWritableStreamImpl,
  UnaryHandler,
  ServerErrorResponse,
  ServerStatusResponse,
  serverErrorToStatus,
} from './server-call';
import { ServerCredentials } from './server-credentials';
import { ChannelOptions } from './channel-options';
import {
  createResolver,
  ResolverListener,
  mapUriDefaultScheme,
} from './resolver';
import * as logging from './logging';
import {
  SubchannelAddress,
  isTcpSubchannelAddress,
  subchannelAddressToString,
  stringToSubchannelAddress,
} from './subchannel-address';
import {
  GrpcUri,
  combineHostPort,
  parseUri,
  splitHostPort,
  uriToString,
} from './uri-parser';
import {
  ChannelzCallTracker,
  ChannelzCallTrackerStub,
  ChannelzChildrenTracker,
  ChannelzChildrenTrackerStub,
  ChannelzTrace,
  ChannelzTraceStub,
  registerChannelzServer,
  registerChannelzSocket,
  ServerInfo,
  ServerRef,
  SocketInfo,
  SocketRef,
  TlsInfo,
  unregisterChannelzRef,
} from './channelz';
import { CipherNameAndProtocol, TLSSocket } from 'tls';
import {
  ServerInterceptingCallInterface,
  ServerInterceptor,
  getServerInterceptingCall,
} from './server-interceptors';
import { PartialStatusObject } from './call-interface';
import { CallEventTracker } from './transport';

/* ~(1 << 31) is 2^31 - 1, the largest signed 32-bit integer. It is used below
 * as the "effectively unlimited" default for the time-based options. */
const UNLIMITED_CONNECTION_AGE_MS = ~(1 << 31);
const KEEPALIVE_MAX_TIME_MS = ~(1 << 31);
const KEEPALIVE_TIMEOUT_MS = 20000;
const MAX_CONNECTION_IDLE_MS = ~(1 << 31);

const { HTTP2_HEADER_PATH } = http2.constants;

const TRACER_NAME = 'server';
const kMaxAge = Buffer.from('max_age');

type AnyHttp2Server = http2.Http2Server | http2.Http2SecureServer;

/**
 * Aggregate outcome of binding a list of addresses.
 */
interface BindResult {
  /** The port number reported for the bind operation. */
  port: number;
  /** How many addresses were bound without an error. */
  count: number;
  /** Error messages from the addresses that failed to bind. */
  errors: string[];
}

/**
 * Outcome of binding one address. `error` is set only on failure.
 */
interface SingleAddressBindResult {
  port: number;
  error?: string;
}

/** Callback that intentionally does nothing. */
function noop(): void {}

/**
 * Decorator to wrap a class method with util.deprecate
 * @param message The message to output if the deprecated method is called
 * @returns A method decorator that replaces the method with its
 * util.deprecate-wrapped version
 */
function deprecate(message: string) {
  return function <This, Args extends any[], Return>(
    target: (this: This, ...args: Args) => Return,
    context: ClassMethodDecoratorContext<
      This,
      (this: This, ...args: Args) => Return
    >
  ) {
    return util.deprecate(target, message);
  };
}

/**
 * Build the UNIMPLEMENTED status reported for a method that has no handler.
 * @param methodName The method name to include in the status details
 */
function getUnimplementedStatusResponse(
  methodName: string
): PartialStatusObject {
  return {
    code: Status.UNIMPLEMENTED,
    details: `The server does not implement the method ${methodName}`,
  };
}

/* eslint-disable @typescript-eslint/no-explicit-any */
type UntypedUnaryHandler = UnaryHandler<any, any>;
type UntypedClientStreamingHandler = ClientStreamingHandler<any, any>;
type UntypedServerStreamingHandler = ServerStreamingHandler<any, any>;
type UntypedBidiStreamingHandler = BidiStreamingHandler<any, any>;
export type UntypedHandleCall = HandleCall<any, any>;
type UntypedHandler = Handler<any, any>;
export interface UntypedServiceImplementation {
  [name: string]: UntypedHandleCall;
}

/**
 * Create the fallback handler used when a service implementation does not
 * provide a method. Handlers that take a callback report the UNIMPLEMENTED
 * status through it; streaming-response handlers emit it as an 'error' event
 * on the call.
 * @param handlerType The kind of method to create a handler for
 * @param methodName The method name to include in the status details
 * @throws Error if handlerType is not one of the four known handler types
 */
function getDefaultHandler(handlerType: HandlerType, methodName: string) {
  const unimplementedStatusResponse =
    getUnimplementedStatusResponse(methodName);
  switch (handlerType) {
    case 'unary':
      return (
        call: ServerUnaryCall<any, any>,
        callback: sendUnaryData<any>
      ) => {
        callback(unimplementedStatusResponse as ServiceError, null);
      };
    case 'clientStream':
      return (
        call: ServerReadableStream<any, any>,
        callback: sendUnaryData<any>
      ) => {
        callback(unimplementedStatusResponse as ServiceError, null);
      };
    case 'serverStream':
      return (call: ServerWritableStream<any, any>) => {
        call.emit('error', unimplementedStatusResponse);
      };
    case 'bidi':
      return (call: ServerDuplexStream<any, any>) => {
        call.emit('error', unimplementedStatusResponse);
      };
    default:
      throw new Error(`Invalid handlerType ${handlerType}`);
  }
}

/**
 * Per-session counters and timestamps reported through channelz.
 */
interface ChannelzSessionInfo {
  ref: SocketRef;
  streamTracker: ChannelzCallTracker | ChannelzCallTrackerStub;
  messagesSent: number;
  messagesReceived: number;
  keepAlivesSent: number;
  lastMessageSentTimestamp: Date | null;
  lastMessageReceivedTimestamp: Date | null;
}

/**
 * Information related to a single invocation of bindAsync. This should be
 * tracked in a map keyed by target string, normalized with a pass through
 * parseUri -> mapUriDefaultScheme -> uriToString. If the target has a port
 * number and the port number is 0, the target string is modified with the
 * concrete bound port.
 */
interface BoundPort {
  /**
   * The key used to refer to this object in the boundPorts map.
   */
  mapKey: string;
  /**
   * The target string, passed through parseUri -> mapUriDefaultScheme. Used
   * to determine the final key when the port number is 0.
   */
  originalUri: GrpcUri;
  /**
   * If there is a pending bindAsync operation, this is a promise that resolves
   * with the port number when that operation succeeds. If there is no such
   * operation pending, this is null.
   */
  completionPromise: Promise<number> | null;
  /**
   * The port number that was actually bound. Populated only after
   * completionPromise resolves.
   */
  portNumber: number;
  /**
   * Set by unbind if called while pending is true.
   */
  cancelled: boolean;
  /**
   * The credentials object passed to the original bindAsync call.
   */
  credentials: ServerCredentials;
  /**
   * The set of servers associated with this listening port. A target string
   * that expands to multiple addresses will result in multiple listening
   * servers.
   */
  listeningServers: Set<AnyHttp2Server>;
}

/**
 * Should be in a map keyed by AnyHttp2Server.
 */
interface Http2ServerInfo {
  channelzRef: SocketRef;
  sessions: Set<http2.ServerHttp2Session>;
}

interface SessionIdleTimeoutTracker {
  activeStreams: number;
  lastIdle: number;
  timeout: NodeJS.Timeout;
  onClose: (session: http2.ServerHttp2Session) => void | null;
}

export interface ServerOptions extends ChannelOptions {
  interceptors?: ServerInterceptor[];
}

export class Server {
  private boundPorts: Map<string, BoundPort> = new Map();
  private http2Servers: Map<AnyHttp2Server, Http2ServerInfo> = new Map();
  private sessionIdleTimeouts = new Map<
    http2.ServerHttp2Session,
    SessionIdleTimeoutTracker
  >();

  private handlers: Map<string, UntypedHandler> = new Map<
    string,
    UntypedHandler
  >();
  private sessions = new Map<http2.ServerHttp2Session, ChannelzSessionInfo>();
  /**
   * This field only exists to ensure that the start method throws an error if
   * it is called twice, as it did previously.
   */
  private started = false;
  private shutdown = false;
  private options: ServerOptions;
  private serverAddressString = 'null';

  // Channelz Info
  private readonly channelzEnabled: boolean = true;
  private channelzRef: ServerRef;
  private channelzTrace: ChannelzTrace | ChannelzTraceStub;
  private callTracker: ChannelzCallTracker | ChannelzCallTrackerStub;
  private listenerChildrenTracker:
    | ChannelzChildrenTracker
    | ChannelzChildrenTrackerStub;
  private sessionChildrenTracker:
    | ChannelzChildrenTracker
    | ChannelzChildrenTrackerStub;

  private readonly maxConnectionAgeMs: number;
  private readonly maxConnectionAgeGraceMs: number;

  private readonly keepaliveTimeMs: number;
  private readonly keepaliveTimeoutMs: number;

  private readonly sessionIdleTimeout: number;

  private readonly interceptors: ServerInterceptor[];

  /**
   * Options that will be used to construct all Http2Server instances for this
   * Server.
   */
  private commonServerOptions: http2.ServerOptions;

  /**
   * @param options Server options. When 'grpc.enable_channelz' is 0 the
   * channelz trackers are replaced with stubs; unset time-based options fall
   * back to the module-level defaults.
   */
  constructor(options?: ServerOptions) {
    this.options = options ?? {};
    if (this.options['grpc.enable_channelz'] === 0) {
      this.channelzEnabled = false;
      this.channelzTrace = new ChannelzTraceStub();
      this.callTracker = new ChannelzCallTrackerStub();
      this.listenerChildrenTracker = new ChannelzChildrenTrackerStub();
      this.sessionChildrenTracker = new ChannelzChildrenTrackerStub();
    } else {
      this.channelzTrace = new ChannelzTrace();
      this.callTracker = new ChannelzCallTracker();
      this.listenerChildrenTracker = new ChannelzChildrenTracker();
      this.sessionChildrenTracker = new ChannelzChildrenTracker();
    }

    this.channelzRef = registerChannelzServer(
      'server',
      () => this.getChannelzInfo(),
      this.channelzEnabled
    );

    this.channelzTrace.addTrace('CT_INFO', 'Server created');
    this.maxConnectionAgeMs =
      this.options['grpc.max_connection_age_ms'] ?? UNLIMITED_CONNECTION_AGE_MS;
    this.maxConnectionAgeGraceMs =
      this.options['grpc.max_connection_age_grace_ms'] ??
      UNLIMITED_CONNECTION_AGE_MS;
    this.keepaliveTimeMs =
      this.options['grpc.keepalive_time_ms'] ?? KEEPALIVE_MAX_TIME_MS;
    this.keepaliveTimeoutMs =
      this.options['grpc.keepalive_timeout_ms'] ?? KEEPALIVE_TIMEOUT_MS;
    this.sessionIdleTimeout =
      this.options['grpc.max_connection_idle_ms'] ?? MAX_CONNECTION_IDLE_MS;

    this.commonServerOptions = {
      maxSendHeaderBlockLength: Number.MAX_SAFE_INTEGER,
    };
    if ('grpc-node.max_session_memory' in this.options) {
      this.commonServerOptions.maxSessionMemory =
        this.options['grpc-node.max_session_memory'];
    } else {
      /* By default, set a very large max session memory limit, to effectively
       * disable enforcement of the limit. Some testing indicates that Node's
       * behavior degrades badly when this limit is reached, so we solve that
       * by disabling the check entirely. */
      this.commonServerOptions.maxSessionMemory = Number.MAX_SAFE_INTEGER;
    }
    if ('grpc.max_concurrent_streams' in this.options) {
      this.commonServerOptions.settings = {
        maxConcurrentStreams: this.options['grpc.max_concurrent_streams'],
      };
    }
    this.interceptors = this.options.interceptors ?? [];
    this.trace('Server constructed');
  }

  /**
   * Snapshot of this server's channelz state; registered as the info getter
   * for the server's channelz ref in the constructor.
   */
  private getChannelzInfo(): ServerInfo {
    return {
      trace: this.channelzTrace,
      callTracker: this.callTracker,
      listenerChildren: this.listenerChildrenTracker.getChildLists(),
      sessionChildren: this.sessionChildrenTracker.getChildLists(),
    };
  }

  /**
   * Build the channelz socket description for a session from its socket
   * addresses, TLS state (if encrypted) and tracked counters.
   * @param session A session that has an entry in this.sessions
   */
  private getChannelzSessionInfo(
    session: http2.ServerHttp2Session
  ): SocketInfo {
    const sessionInfo = this.sessions.get(session)!;
    const sessionSocket = session.socket;
    const remoteAddress = sessionSocket.remoteAddress
      ? stringToSubchannelAddress(
          sessionSocket.remoteAddress,
          sessionSocket.remotePort
        )
      : null;
    const localAddress = sessionSocket.localAddress
      ? stringToSubchannelAddress(
          sessionSocket.localAddress!,
          sessionSocket.localPort
        )
      : null;
    let tlsInfo: TlsInfo | null;
    if (session.encrypted) {
      const tlsSocket: TLSSocket = sessionSocket as TLSSocket;
      const cipherInfo: CipherNameAndProtocol & { standardName?: string } =
        tlsSocket.getCipher();
      const certificate = tlsSocket.getCertificate();
      const peerCertificate = tlsSocket.getPeerCertificate();
      tlsInfo = {
        cipherSuiteStandardName: cipherInfo.standardName ?? null,
        // Only report the non-standard name when no standard name exists.
        cipherSuiteOtherName: cipherInfo.standardName ? null : cipherInfo.name,
        localCertificate:
          certificate && 'raw' in certificate ? certificate.raw : null,
        remoteCertificate:
          peerCertificate && 'raw' in peerCertificate
            ? peerCertificate.raw
            : null,
      };
    } else {
      tlsInfo = null;
    }
    const socketInfo: SocketInfo = {
      remoteAddress: remoteAddress,
      localAddress: localAddress,
      security: tlsInfo,
      remoteName: null,
      streamsStarted: sessionInfo.streamTracker.callsStarted,
      streamsSucceeded: sessionInfo.streamTracker.callsSucceeded,
      streamsFailed: sessionInfo.streamTracker.callsFailed,
      messagesSent: sessionInfo.messagesSent,
      messagesReceived: sessionInfo.messagesReceived,
      keepAlivesSent: sessionInfo.keepAlivesSent,
      lastLocalStreamCreatedTimestamp: null,
      lastRemoteStreamCreatedTimestamp:
        sessionInfo.streamTracker.lastCallStartedTimestamp,
      lastMessageSentTimestamp: sessionInfo.lastMessageSentTimestamp,
      lastMessageReceivedTimestamp: sessionInfo.lastMessageReceivedTimestamp,
      localFlowControlWindow: session.state.localWindowSize ?? null,
      remoteFlowControlWindow: session.state.remoteWindowSize ?? null,
    };
    return socketInfo;
  }

  /**
   * Emit a DEBUG trace line under the 'server' tracer, prefixed with this
   * server's channelz id.
   */
  private trace(text: string): void {
    logging.trace(
      LogVerbosity.DEBUG,
      TRACER_NAME,
      '(' + this.channelzRef.id + ') ' + text
    );
  }

  /**
   * Always throws; use addService() instead.
   */
  addProtoService(): never {
    throw new Error('Not implemented. Use addService() instead');
  }

  /**
   * Register a handler for every method in a service definition. A method
   * with no implementation (looked up by name, then by originalName) gets a
   * default handler that reports UNIMPLEMENTED.
   * @param service The service definition, keyed by method name
   * @param implementation Object providing the method implementations
   * @throws Error if either argument is not an object, if the service has no
   * methods, or if a handler is already registered for a method's path
   */
  addService(
    service: ServiceDefinition,
    implementation: UntypedServiceImplementation
  ): void {
    if (
      service === null ||
      typeof service !== 'object' ||
      implementation === null ||
      typeof implementation !== 'object'
    ) {
      throw new Error('addService() requires two objects as arguments');
    }

    const serviceKeys = Object.keys(service);

    if (serviceKeys.length === 0) {
      throw new Error('Cannot add an empty service to a server');
    }

    serviceKeys.forEach(name => {
      const attrs = service[name];
      let methodType: HandlerType;

      if (attrs.requestStream) {
        if (attrs.responseStream) {
          methodType = 'bidi';
        } else {
          methodType = 'clientStream';
        }
      } else {
        if (attrs.responseStream) {
          methodType = 'serverStream';
        } else {
          methodType = 'unary';
        }
      }

      let implFn = implementation[name];
      let impl;

      if (implFn === undefined && typeof attrs.originalName === 'string') {
        implFn = implementation[attrs.originalName];
      }

      if (implFn !== undefined) {
        impl = implFn.bind(implementation);
      } else {
        impl = getDefaultHandler(methodType, name);
      }

      const success = this.register(
        attrs.path,
        impl as UntypedHandleCall,
        attrs.responseSerialize,
        attrs.requestDeserialize,
        methodType
      );

      if (success === false) {
        throw new Error(`Method handler for ${attrs.path} already provided.`);
      }
    });
  }

  /**
   * Unregister the handler for every method path in a service definition.
   * @param service The service definition, keyed by method name
   * @throws Error if service is not an object
   */
  removeService(service: ServiceDefinition): void {
    if (service === null || typeof service !== 'object') {
      throw new Error('removeService() requires object as argument');
    }

    const serviceKeys = Object.keys(service);
    serviceKeys.forEach(name => {
      const attrs = service[name];
      this.unregister(attrs.path);
    });
  }

  /**
   * Always throws; use bindAsync() instead.
   */
  bind(port: string, creds: ServerCredentials): never {
    throw new Error('Not implemented. Use bindAsync() instead');
  }

  /**
   * Register a channelz socket entry for a listening address. All counters of
   * a listener entry are reported as zero/null; only localAddress is set.
   * @param boundAddress The address the listener is bound to
   */
  private registerListenerToChannelz(boundAddress: SubchannelAddress) {
    return registerChannelzSocket(
      subchannelAddressToString(boundAddress),
      () => {
        return {
          localAddress: boundAddress,
          remoteAddress: null,
          security: null,
          remoteName: null,
          streamsStarted: 0,
          streamsSucceeded: 0,
          streamsFailed: 0,
          messagesSent: 0,
          messagesReceived: 0,
          keepAlivesSent: 0,
          lastLocalStreamCreatedTimestamp: null,
          lastRemoteStreamCreatedTimestamp: null,
          lastMessageSentTimestamp: null,
          lastMessageReceivedTimestamp: null,
          localFlowControlWindow: null,
          remoteFlowControlWindow: null,
        };
      },
      this.channelzEnabled
    );
  }

  /**
   * Create an HTTP/2 server (secure or plaintext, depending on the
   * credentials), disable its socket timeout and attach the request handlers.
   * @param credentials Credentials that decide the server type and supply the
   * secure server settings
   */
  private createHttp2Server(credentials: ServerCredentials) {
    let http2Server: http2.Http2Server | http2.Http2SecureServer;
    if (credentials._isSecure()) {
      /* Merge into a fresh object. Assigning onto this.commonServerOptions
       * would leak this credentials object's settings (and enableTrace) into
       * every server created afterwards, including plaintext ones. */
      const secureServerOptions = Object.assign(
        {},
        this.commonServerOptions,
        credentials._getSettings()!
      );
      secureServerOptions.enableTrace =
        this.options['grpc-node.tls_enable_trace'] === 1;
      http2Server = http2.createSecureServer(secureServerOptions);
      http2Server.on('secureConnection', (socket: TLSSocket) => {
        /* These errors need to be handled by the user of Http2SecureServer,
         * according to https://github.com/nodejs/node/issues/35824 */
        socket.on('error', (e: Error) => {
          this.trace(
            'An incoming TLS connection closed with error: ' + e.message
          );
        });
      });
    } else {
      http2Server = http2.createServer(this.commonServerOptions);
    }

    http2Server.setTimeout(0, noop);
    this._setupHandlers(http2Server);
    return http2Server;
  }

  /**
   * Create a server and try to listen on one address. The returned promise
   * never rejects: a listen failure resolves with `error` set. On success the
   * server is registered with channelz and added to the bound port's
   * listeningServers. The reported port is 1 for addresses without a port.
   */
  private bindOneAddress(
    address: SubchannelAddress,
    boundPortObject: BoundPort
  ): Promise<SingleAddressBindResult> {
    this.trace('Attempting to bind ' + subchannelAddressToString(address));
    const http2Server = this.createHttp2Server(boundPortObject.credentials);
    return new Promise<SingleAddressBindResult>((resolve, reject) => {
      const onError = (err: Error) => {
        this.trace(
          'Failed to bind ' +
            subchannelAddressToString(address) +
            ' with error ' +
            err.message
        );
        resolve({
          port: 'port' in address ? address.port : 1,
          error: err.message,
        });
      };

      http2Server.once('error', onError);

      http2Server.listen(address, () => {
        const boundAddress = http2Server.address()!;
        let boundSubchannelAddress: SubchannelAddress;
        if (typeof boundAddress === 'string') {
          boundSubchannelAddress = {
            path: boundAddress,
          };
        } else {
          boundSubchannelAddress = {
            host: boundAddress.address,
            port: boundAddress.port,
          };
        }

        const channelzRef = this.registerListenerToChannelz(
          boundSubchannelAddress
        );
        this.listenerChildrenTracker.refChild(channelzRef);

        this.http2Servers.set(http2Server, {
          channelzRef: channelzRef,
          sessions: new Set(),
        });
        boundPortObject.listeningServers.add(http2Server);
        this.trace(
          'Successfully bound ' +
            subchannelAddressToString(boundSubchannelAddress)
        );
        resolve({
          port:
            'port' in boundSubchannelAddress ? boundSubchannelAddress.port : 1,
        });
        // The bind-time error listener is no longer needed once listening.
        http2Server.removeListener('error', onError);
      });
    });
  }

  /**
   * Bind every address in the list and summarize the results. An empty list
   * yields a zero count. When the first address is a TCP address with port 0,
   * it is bound first and the remaining TCP addresses reuse the port it got.
   */
  private async bindManyPorts(
    addressList: SubchannelAddress[],
    boundPortObject: BoundPort
  ): Promise<BindResult> {
    if (addressList.length === 0) {
      return {
        count: 0,
        port: 0,
        errors: [],
      };
    }
    if (isTcpSubchannelAddress(addressList[0]) && addressList[0].port === 0) {
      /* If binding to port 0, first try to bind the first address, then bind
       * the rest of the address list to the specific port that it binds. */
      const firstAddressResult = await this.bindOneAddress(
        addressList[0],
        boundPortObject
      );
      if (firstAddressResult.error) {
        /* If the first address fails to bind, try the same operation starting
         * from the second item in the list. */
        const restAddressResult = await this.bindManyPorts(
          addressList.slice(1),
          boundPortObject
        );
        return {
          ...restAddressResult,
          errors: [firstAddressResult.error, ...restAddressResult.errors],
        };
      } else {
        const restAddresses = addressList
          .slice(1)
          .map(address =>
            isTcpSubchannelAddress(address)
              ? { host: address.host, port: firstAddressResult.port }
              : address
          );
        const restAddressResult = await Promise.all(
          restAddresses.map(address =>
            this.bindOneAddress(address, boundPortObject)
          )
        );
        const allResults = [firstAddressResult, ...restAddressResult];
        return {
          count: allResults.filter(result => result.error === undefined).length,
          port: firstAddressResult.port,
          errors: allResults
            .filter(result => result.error)
            .map(result => result.error!),
        };
      }
    } else {
      const allResults = await Promise.all(
        addressList.map(address =>
          this.bindOneAddress(address, boundPortObject)
        )
      );
      return {
        count: allResults.filter(result => result.error === undefined).length,
        port: allResults[0].port,
        errors: allResults
          .filter(result => result.error)
          .map(result => result.error!),
      };
    }
  }

  /**
   * Bind an address list and return the bound port. A partial success is
   * logged; the promise rejects only when no address could be bound.
   */
  private async bindAddressList(
    addressList: SubchannelAddress[],
    boundPortObject: BoundPort
  ): Promise<number> {
    const bindResult = await this.bindManyPorts(addressList, boundPortObject);
    if (bindResult.count > 0) {
      if (bindResult.count < addressList.length) {
        logging.log(
          LogVerbosity.INFO,
          `WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved`
        );
      }
      return bindResult.port;
    } else {
      const errorString = `No address added out of total ${addressList.length} resolved`;
      logging.log(LogVerbosity.ERROR, errorString);
      throw new Error(
        `${errorString} errors: [${bindResult.errors.join(',')}]`
      );
    }
  }

  /**
   * Resolve a target URI to the flattened list of addresses from the first
   * successful resolution. Rejects on a resolver error or an empty result.
   */
  private resolvePort(port: GrpcUri): Promise<SubchannelAddress[]> {
    return new Promise<SubchannelAddress[]>((resolve, reject) => {
      const resolverListener: ResolverListener = {
        onSuccessfulResolution: (
          endpointList,
          serviceConfig,
          serviceConfigError
        ) => {
          // We only want one resolution result. Discard all future results
          resolverListener.onSuccessfulResolution = () => {};
          const addressList = ([] as SubchannelAddress[]).concat(
            ...endpointList.map(endpoint => endpoint.addresses)
          );
          if (addressList.length === 0) {
            /* port is a GrpcUri object; format it explicitly so the message
             * does not read "[object Object]". */
            reject(
              new Error(`No addresses resolved for port ${uriToString(port)}`)
            );
            return;
          }
          resolve(addressList);
        },
        onError: error => {
          reject(new Error(error.details));
        },
      };
      const resolver = createResolver(port, resolverListener, this.options);
      resolver.updateResolution();
    });
  }

  /**
   * Resolve and bind a target. The cancelled flag is checked after each
   * asynchronous step; if unbind was called in the meantime, whatever was
   * bound is torn down and the promise rejects.
   */
  private async bindPort(
    port: GrpcUri,
    boundPortObject: BoundPort
  ): Promise<number> {
    const addressList = await this.resolvePort(port);
    if (boundPortObject.cancelled) {
      this.completeUnbind(boundPortObject);
      throw new Error('bindAsync operation cancelled by unbind call');
    }
    const portNumber = await this.bindAddressList(addressList, boundPortObject);
    if (boundPortObject.cancelled) {
      this.completeUnbind(boundPortObject);
      throw new Error('bindAsync operation cancelled by unbind call');
    }
    return portNumber;
  }

  /**
   * Parse a target string and apply the default scheme.
   * @throws Error if the string cannot be parsed or no default scheme applies
   */
  private normalizePort(port: string): GrpcUri {
    const initialPortUri = parseUri(port);
    if (initialPortUri === null) {
      throw new Error(`Could not parse port "${port}"`);
    }
    const portUri = mapUriDefaultScheme(initialPortUri);
    if (portUri === null) {
      throw new Error(`Could not get a default scheme for port "${port}"`);
    }
    return portUri;
  }

  /**
   * Bind the server to a target. If the target is already bound, or a bind
   * for it is in progress, that result is reused provided the credentials
   * match.
   * @param port The target string to bind
   * @param creds The credentials to use for the listening servers
   * @param callback Called with the bound port number, or with an error and
   * port 0 on failure
   * @throws Error if called after shutdown or if the target cannot be
   * normalized
   * @throws TypeError if an argument has the wrong type
   */
  bindAsync(
    port: string,
    creds: ServerCredentials,
    callback: (error: Error | null, port: number) => void
  ): void {
    if (this.shutdown) {
      throw new Error('bindAsync called after shutdown');
    }
    if (typeof port !== 'string') {
      throw new TypeError('port must be a string');
    }

    if (creds === null || !(creds instanceof ServerCredentials)) {
      throw new TypeError('creds must be a ServerCredentials object');
    }

    if (typeof callback !== 'function') {
      throw new TypeError('callback must be a function');
    }

    this.trace('bindAsync port=' + port);

    const portUri = this.normalizePort(port);

    // Used on paths with an immediate result so the callback stays async.
    const deferredCallback = (error: Error | null, port: number) => {
      process.nextTick(() => callback(error, port));
    };

    /* First, if this port is already bound or that bind operation is in
     * progress, use that result. */
    let boundPortObject = this.boundPorts.get(uriToString(portUri));
    if (boundPortObject) {
      if (!creds._equals(boundPortObject.credentials)) {
        deferredCallback(
          new Error(`${port} already bound with incompatible credentials`),
          0
        );
        return;
      }
      /* If that operation has previously been cancelled by an unbind call,
       * uncancel it. */
      boundPortObject.cancelled = false;
      if (boundPortObject.completionPromise) {
        boundPortObject.completionPromise.then(
          portNum => callback(null, portNum),
          error => callback(error as Error, 0)
        );
      } else {
        deferredCallback(null, boundPortObject.portNumber);
      }
      return;
    }
    boundPortObject = {
      mapKey: uriToString(portUri),
      originalUri: portUri,
      completionPromise: null,
      cancelled: false,
      portNumber: 0,
      credentials: creds,
      listeningServers: new Set(),
    };
    const splitPort = splitHostPort(portUri.path);
    const completionPromise = this.bindPort(portUri, boundPortObject);
    boundPortObject.completionPromise = completionPromise;
    /* If the port number is 0, defer populating the map entry until after the
     * bind operation completes and we have a specific port number. Otherwise,
     * populate it immediately. */
    if (splitPort?.port === 0) {
      completionPromise.then(
        portNum => {
          const finalUri: GrpcUri = {
            scheme: portUri.scheme,
            authority: portUri.authority,
            path: combineHostPort({ host: splitPort.host, port: portNum }),
          };
          boundPortObject!.mapKey = uriToString(finalUri);
          boundPortObject!.completionPromise = null;
          boundPortObject!.portNumber = portNum;
          this.boundPorts.set(boundPortObject!.mapKey, boundPortObject!);
          callback(null, portNum);
        },
        error => {
          callback(error, 0);
        }
      );
    } else {
      this.boundPorts.set(boundPortObject.mapKey, boundPortObject);
      completionPromise.then(
        portNum => {
          boundPortObject!.completionPromise = null;
          boundPortObject!.portNumber = portNum;
          callback(null, portNum);
        },
        error => {
          callback(error, 0);
        }
      );
    }
  }

  /**
   * Close a listening server. Once it has closed, its channelz entry is
   * released, it is removed from http2Servers and the callback is invoked.
   */
  private closeServer(server: AnyHttp2Server, callback?: () => void) {
    this.trace(
      'Closing server with address ' + JSON.stringify(server.address())
    );
    // Look the entry up before closing; it is deleted in the close callback.
    const serverInfo = this.http2Servers.get(server);
    server.close(() => {
      if (serverInfo) {
        this.listenerChildrenTracker.unrefChild(serverInfo.channelzRef);
        unregisterChannelzRef(serverInfo.channelzRef);
      }
      this.http2Servers.delete(server);
      callback?.();
    });
  }

  /**
   * Close a session and release its channelz entry. If the session is already
   * closed, the cleanup runs in a microtask so the callback is still
   * asynchronous.
   */
  private closeSession(
    session: http2.ServerHttp2Session,
    callback?: () => void
  ) {
    this.trace('Closing session initiated by ' + session.socket?.remoteAddress);
    const sessionInfo = this.sessions.get(session);
    const closeCallback = () => {
      if (sessionInfo) {
        this.sessionChildrenTracker.unrefChild(sessionInfo.ref);
        unregisterChannelzRef(sessionInfo.ref);
      }
      callback?.();
    };
    if (session.closed) {
      queueMicrotask(closeCallback);
    } else {
      session.close(closeCallback);
    }
  }

  /**
   * Close every listening server of a bound port, together with the sessions
   * tracked for those servers, and remove the port from boundPorts.
   */
  private completeUnbind(boundPortObject: BoundPort) {
    for (const server of boundPortObject.listeningServers) {
      const serverInfo = this.http2Servers.get(server);
      this.closeServer(server, () => {
        boundPortObject.listeningServers.delete(server);
      });
      if (serverInfo) {
        for (const session of serverInfo.sessions) {
          this.closeSession(session);
        }
      }
    }
    this.boundPorts.delete(boundPortObject.mapKey);
  }

  /**
   * Unbind a previously bound port, or cancel an in-progress bindAsync
   * operation. If port 0 was bound, only the actual bound port can be
   * unbound. For example, if bindAsync was called with "localhost:0" and the
   * bound port result was 54321, it can be unbound as "localhost:54321".
   * @param port The target string of the port to unbind
   * @throws Error if the target's port is 0 or the target cannot be
   * normalized
   */
  unbind(port: string): void {
    this.trace('unbind port=' + port);
    const portUri = this.normalizePort(port);
    const splitPort = splitHostPort(portUri.path);
    if (splitPort?.port === 0) {
      throw new Error('Cannot unbind port 0');
    }
    const boundPortObject = this.boundPorts.get(uriToString(portUri));
    if (boundPortObject) {
      this.trace(
        'unbinding ' +
          boundPortObject.mapKey +
          ' originally bound as ' +
          uriToString(boundPortObject.originalUri)
      );
      /* If the bind operation is pending, the cancelled flag will trigger
       * the unbind operation later. */
      if (boundPortObject.completionPromise) {
        boundPortObject.cancelled = true;
      } else {
        this.completeUnbind(boundPortObject);
      }
    }
  }

  /**
   * Gracefully close all connections associated with a previously bound port.
   * After the grace time, forcefully close all remaining open connections.
   *
   * If port 0 was bound, only the actual bound port can be
   * drained. For example, if bindAsync was called with "localhost:0" and the
   * bound port result was 54321, it can be drained as "localhost:54321".
   * @param port The target string of the port to drain
   * @param graceTimeMs How long to wait before destroying the sessions that
   * are still open
   * @throws Error if the target's port is 0 or the target cannot be
   * normalized
   */
  drain(port: string, graceTimeMs: number): void {
    this.trace('drain port=' + port + ' graceTimeMs=' + graceTimeMs);
    const portUri = this.normalizePort(port);
    const splitPort = splitHostPort(portUri.path);
    if (splitPort?.port === 0) {
      throw new Error('Cannot drain port 0');
    }
    const boundPortObject = this.boundPorts.get(uriToString(portUri));
    if (!boundPortObject) {
      return;
    }
    // Sessions that have not finished closing yet.
    const allSessions: Set<http2.ServerHttp2Session> = new Set();
    for (const http2Server of boundPortObject.listeningServers) {
      const serverEntry = this.http2Servers.get(http2Server);
      if (serverEntry) {
        for (const session of serverEntry.sessions) {
          allSessions.add(session);
          this.closeSession(session, () => {
            allSessions.delete(session);
          });
        }
      }
    }
    /* After the grace time ends, send another goaway to all remaining sessions
     * with the CANCEL code. */
    setTimeout(() => {
      for (const session of allSessions) {
        session.destroy(http2.constants.NGHTTP2_CANCEL as any);
      }
    }, graceTimeMs).unref?.();
  }

  /**
   * Shut down immediately: cancel pending binds, close every listening server
   * and destroy every tracked session without waiting for them to finish.
   */
  forceShutdown(): void {
    for (const boundPortObject of this.boundPorts.values()) {
      boundPortObject.cancelled = true;
    }
    this.boundPorts.clear();
    // Close the server if it is still running.
    for (const server of this.http2Servers.keys()) {
      this.closeServer(server);
    }

    // Always destroy any available sessions. It's possible that one or more
    // tryShutdown() calls are in progress. Don't wait on them to finish.
    this.sessions.forEach((channelzInfo, session) => {
      this.closeSession(session);
      // Cast NGHTTP2_CANCEL to any because TypeScript doesn't seem to
      // recognize destroy(code) as a valid signature.
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      session.destroy(http2.constants.NGHTTP2_CANCEL as any);
    });
    this.sessions.clear();
    unregisterChannelzRef(this.channelzRef);

    this.shutdown = true;
  }

  /**
   * Register a handler for a method path.
   * @param name The method path used as the handler key
   * @param handler The function that handles calls to the method
   * @param serialize Serializer stored with the handler
   * @param deserialize Deserializer stored with the handler
   * @param type The handler type stored with the handler
   * @returns false if a handler is already registered under name, otherwise
   * true
   */
  register<RequestType, ResponseType>(
    name: string,
    handler: HandleCall<RequestType, ResponseType>,
    serialize: Serialize<ResponseType>,
    deserialize: Deserialize<RequestType>,
    type: string
  ): boolean {
    if (this.handlers.has(name)) {
      return false;
    }

    this.handlers.set(name, {
      func: handler,
      serialize,
      deserialize,
      type,
      path: name,
    } as UntypedHandler);
    return true;
  }

  /**
   * Remove the handler registered for a method path.
   * @returns true if a handler was removed, false if none was registered
   */
  unregister(name: string): boolean {
    return this.handlers.delete(name);
  }

  /**
   * @deprecated No longer needed as of version 1.10.x
   */
  @deprecate(
    'Calling start() is no longer necessary. It can be safely omitted.'
  )
  start(): void {
    if (
      this.http2Servers.size === 0 ||
      [...this.http2Servers.keys()].every(server => !server.listening)
    ) {
      throw new Error('server must be bound in order to start');
    }

    if (this.started === true) {
      throw new Error('server is already started');
    }
    this.started = true;
  }

  /**
   * Shut down gracefully: close every listening server and its sessions, then
   * invoke the callback once all of them have finished closing. Marks the
   * server as shut down immediately.
   * @param callback Called after everything has closed, or right away when
   * there is nothing to close
   */
  tryShutdown(callback: (error?: Error) => void): void {
    const wrappedCallback = (error?: Error) => {
      unregisterChannelzRef(this.channelzRef);
      callback(error);
    };
    // Number of servers and sessions that have not finished closing.
    let pendingChecks = 0;

    function maybeCallback(): void {
      pendingChecks--;

      if (pendingChecks === 0) {
        wrappedCallback();
      }
    }
    this.shutdown = true;

    for (const [serverKey, server] of this.http2Servers.entries()) {
      pendingChecks++;
      const serverString = server.channelzRef.name;
      this.trace('Waiting for server ' + serverString + ' to close');
      this.closeServer(serverKey, () => {
        this.trace('Server ' + serverString + ' finished closing');
        maybeCallback();
      });

      for (const session of server.sessions.keys()) {
        pendingChecks++;
        const sessionString = session.socket?.remoteAddress;
        this.trace('Waiting for session ' + sessionString + ' to close');
        this.closeSession(session, () => {
          this.trace('Session ' + sessionString + ' finished closing');
          maybeCallback();
        });
      }
    }

    if (pendingChecks === 0) {
      wrappedCallback();
    }
  }

  addHttp2Port(): never {
    throw new
Error('Not yet implemented'); } /** * Get the channelz reference object for this server. The returned value is * garbage if channelz is disabled for this server. * @returns */ getChannelzRef() { return this.channelzRef; } private _verifyContentType( stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders ): boolean { const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE]; if ( typeof contentType !== 'string' || !contentType.startsWith('application/grpc') ) { stream.respond( { [http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE, }, { endStream: true } ); return false; } return true; } private _retrieveHandler(path: string): Handler | null { this.trace( 'Received call to method ' + path + ' at address ' + this.serverAddressString ); const handler = this.handlers.get(path); if (handler === undefined) { this.trace( 'No handler registered for method ' + path + '. Sending UNIMPLEMENTED status.' ); return null; } return handler; } private _respondWithError( err: PartialStatusObject, stream: http2.ServerHttp2Stream, channelzSessionInfo: ChannelzSessionInfo | null = null ) { const trailersToSend = { 'grpc-status': err.code ?? 
Status.INTERNAL, 'grpc-message': err.details, [http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_OK, [http2.constants.HTTP2_HEADER_CONTENT_TYPE]: 'application/grpc+proto', ...err.metadata?.toHttp2Headers(), }; stream.respond(trailersToSend, { endStream: true }); this.callTracker.addCallFailed(); channelzSessionInfo?.streamTracker.addCallFailed(); } private _channelzHandler( stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders ) { // for handling idle timeout this.onStreamOpened(stream); const channelzSessionInfo = this.sessions.get( stream.session as http2.ServerHttp2Session ); this.callTracker.addCallStarted(); channelzSessionInfo?.streamTracker.addCallStarted(); if (!this._verifyContentType(stream, headers)) { this.callTracker.addCallFailed(); channelzSessionInfo?.streamTracker.addCallFailed(); return; } const path = headers[HTTP2_HEADER_PATH] as string; const handler = this._retrieveHandler(path); if (!handler) { this._respondWithError( getUnimplementedStatusResponse(path), stream, channelzSessionInfo ); return; } const callEventTracker: CallEventTracker = { addMessageSent: () => { if (channelzSessionInfo) { channelzSessionInfo.messagesSent += 1; channelzSessionInfo.lastMessageSentTimestamp = new Date(); } }, addMessageReceived: () => { if (channelzSessionInfo) { channelzSessionInfo.messagesReceived += 1; channelzSessionInfo.lastMessageReceivedTimestamp = new Date(); } }, onCallEnd: status => { if (status.code === Status.OK) { this.callTracker.addCallSucceeded(); } else { this.callTracker.addCallFailed(); } }, onStreamEnd: success => { if (channelzSessionInfo) { if (success) { channelzSessionInfo.streamTracker.addCallSucceeded(); } else { channelzSessionInfo.streamTracker.addCallFailed(); } } }, }; const call = getServerInterceptingCall( this.interceptors, stream, headers, callEventTracker, handler, this.options ); if (!this._runHandlerForCall(call, handler)) { this.callTracker.addCallFailed(); 
channelzSessionInfo?.streamTracker.addCallFailed(); call.sendStatus({ code: Status.INTERNAL, details: `Unknown handler type: ${handler.type}`, }); } } private _streamHandler( stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders ) { // for handling idle timeout this.onStreamOpened(stream); if (this._verifyContentType(stream, headers) !== true) { return; } const path = headers[HTTP2_HEADER_PATH] as string; const handler = this._retrieveHandler(path); if (!handler) { this._respondWithError( getUnimplementedStatusResponse(path), stream, null ); return; } const call = getServerInterceptingCall( this.interceptors, stream, headers, null, handler, this.options ); if (!this._runHandlerForCall(call, handler)) { call.sendStatus({ code: Status.INTERNAL, details: `Unknown handler type: ${handler.type}`, }); } } private _runHandlerForCall( call: ServerInterceptingCallInterface, handler: | UntypedUnaryHandler | UntypedClientStreamingHandler | UntypedServerStreamingHandler | UntypedBidiStreamingHandler ): boolean { const { type } = handler; if (type === 'unary') { handleUnary(call, handler); } else if (type === 'clientStream') { handleClientStreaming(call, handler); } else if (type === 'serverStream') { handleServerStreaming(call, handler); } else if (type === 'bidi') { handleBidiStreaming(call, handler); } else { return false; } return true; } private _setupHandlers( http2Server: http2.Http2Server | http2.Http2SecureServer ): void { if (http2Server === null) { return; } const serverAddress = http2Server.address(); let serverAddressString = 'null'; if (serverAddress) { if (typeof serverAddress === 'string') { serverAddressString = serverAddress; } else { serverAddressString = serverAddress.address + ':' + serverAddress.port; } } this.serverAddressString = serverAddressString; const handler = this.channelzEnabled ? this._channelzHandler : this._streamHandler; const sessionHandler = this.channelzEnabled ? 
this._channelzSessionHandler(http2Server) : this._sessionHandler(http2Server); http2Server.on('stream', handler.bind(this)); http2Server.on('session', sessionHandler); } private _sessionHandler( http2Server: http2.Http2Server | http2.Http2SecureServer ) { return (session: http2.ServerHttp2Session) => { this.http2Servers.get(http2Server)?.sessions.add(session); let connectionAgeTimer: NodeJS.Timeout | null = null; let connectionAgeGraceTimer: NodeJS.Timeout | null = null; let keeapliveTimeTimer: NodeJS.Timeout | null = null; let keepaliveTimeoutTimer: NodeJS.Timeout | null = null; let sessionClosedByServer = false; const idleTimeoutObj = this.enableIdleTimeout(session); if (this.maxConnectionAgeMs !== UNLIMITED_CONNECTION_AGE_MS) { // Apply a random jitter within a +/-10% range const jitterMagnitude = this.maxConnectionAgeMs / 10; const jitter = Math.random() * jitterMagnitude * 2 - jitterMagnitude; connectionAgeTimer = setTimeout(() => { sessionClosedByServer = true; this.trace( 'Connection dropped by max connection age: ' + session.socket?.remoteAddress ); try { session.goaway( http2.constants.NGHTTP2_NO_ERROR, ~(1 << 31), kMaxAge ); } catch (e) { // The goaway can't be sent because the session is already closed session.destroy(); return; } session.close(); /* Allow a grace period after sending the GOAWAY before forcibly * closing the connection. 
*/ if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) { connectionAgeGraceTimer = setTimeout(() => { session.destroy(); }, this.maxConnectionAgeGraceMs); connectionAgeGraceTimer.unref?.(); } }, this.maxConnectionAgeMs + jitter); connectionAgeTimer.unref?.(); } if (this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS) { keeapliveTimeTimer = setInterval(() => { keepaliveTimeoutTimer = setTimeout(() => { sessionClosedByServer = true; session.close(); }, this.keepaliveTimeoutMs); keepaliveTimeoutTimer.unref?.(); try { session.ping( (err: Error | null, duration: number, payload: Buffer) => { if (keepaliveTimeoutTimer) { clearTimeout(keepaliveTimeoutTimer); } if (err) { sessionClosedByServer = true; this.trace( 'Connection dropped due to error of a ping frame ' + err.message + ' return in ' + duration ); session.close(); } } ); } catch (e) { clearTimeout(keepaliveTimeoutTimer); // The ping can't be sent because the session is already closed session.destroy(); } }, this.keepaliveTimeMs); keeapliveTimeTimer.unref?.(); } session.on('close', () => { if (!sessionClosedByServer) { this.trace( `Connection dropped by client ${session.socket?.remoteAddress}` ); } if (connectionAgeTimer) { clearTimeout(connectionAgeTimer); } if (connectionAgeGraceTimer) { clearTimeout(connectionAgeGraceTimer); } if (keeapliveTimeTimer) { clearInterval(keeapliveTimeTimer); if (keepaliveTimeoutTimer) { clearTimeout(keepaliveTimeoutTimer); } } if (idleTimeoutObj !== null) { clearTimeout(idleTimeoutObj.timeout); this.sessionIdleTimeouts.delete(session); } this.http2Servers.get(http2Server)?.sessions.delete(session); }); }; } private _channelzSessionHandler( http2Server: http2.Http2Server | http2.Http2SecureServer ) { return (session: http2.ServerHttp2Session) => { const channelzRef = registerChannelzSocket( session.socket?.remoteAddress ?? 
'unknown', this.getChannelzSessionInfo.bind(this, session), this.channelzEnabled ); const channelzSessionInfo: ChannelzSessionInfo = { ref: channelzRef, streamTracker: new ChannelzCallTracker(), messagesSent: 0, messagesReceived: 0, keepAlivesSent: 0, lastMessageSentTimestamp: null, lastMessageReceivedTimestamp: null, }; this.http2Servers.get(http2Server)?.sessions.add(session); this.sessions.set(session, channelzSessionInfo); const clientAddress = `${session.socket.remoteAddress}:${session.socket.remotePort}`; this.channelzTrace.addTrace( 'CT_INFO', 'Connection established by client ' + clientAddress ); this.trace('Connection established by client ' + clientAddress); this.sessionChildrenTracker.refChild(channelzRef); let connectionAgeTimer: NodeJS.Timeout | null = null; let connectionAgeGraceTimer: NodeJS.Timeout | null = null; let keeapliveTimeTimer: NodeJS.Timeout | null = null; let keepaliveTimeoutTimer: NodeJS.Timeout | null = null; let sessionClosedByServer = false; const idleTimeoutObj = this.enableIdleTimeout(session); if (this.maxConnectionAgeMs !== UNLIMITED_CONNECTION_AGE_MS) { // Apply a random jitter within a +/-10% range const jitterMagnitude = this.maxConnectionAgeMs / 10; const jitter = Math.random() * jitterMagnitude * 2 - jitterMagnitude; connectionAgeTimer = setTimeout(() => { sessionClosedByServer = true; this.channelzTrace.addTrace( 'CT_INFO', 'Connection dropped by max connection age from ' + clientAddress ); try { session.goaway( http2.constants.NGHTTP2_NO_ERROR, ~(1 << 31), kMaxAge ); } catch (e) { // The goaway can't be sent because the session is already closed session.destroy(); return; } session.close(); /* Allow a grace period after sending the GOAWAY before forcibly * closing the connection. 
*/ if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) { connectionAgeGraceTimer = setTimeout(() => { session.destroy(); }, this.maxConnectionAgeGraceMs); connectionAgeGraceTimer.unref?.(); } }, this.maxConnectionAgeMs + jitter); connectionAgeTimer.unref?.(); } if (this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS) { keeapliveTimeTimer = setInterval(() => { keepaliveTimeoutTimer = setTimeout(() => { sessionClosedByServer = true; this.channelzTrace.addTrace( 'CT_INFO', 'Connection dropped by keepalive timeout from ' + clientAddress ); session.close(); }, this.keepaliveTimeoutMs); keepaliveTimeoutTimer.unref?.(); try { session.ping( (err: Error | null, duration: number, payload: Buffer) => { if (keepaliveTimeoutTimer) { clearTimeout(keepaliveTimeoutTimer); } if (err) { sessionClosedByServer = true; this.channelzTrace.addTrace( 'CT_INFO', 'Connection dropped due to error of a ping frame ' + err.message + ' return in ' + duration ); session.close(); } } ); channelzSessionInfo.keepAlivesSent += 1; } catch (e) { clearTimeout(keepaliveTimeoutTimer); // The ping can't be sent because the session is already closed session.destroy(); } }, this.keepaliveTimeMs); keeapliveTimeTimer.unref?.(); } session.on('close', () => { if (!sessionClosedByServer) { this.channelzTrace.addTrace( 'CT_INFO', 'Connection dropped by client ' + clientAddress ); } this.sessionChildrenTracker.unrefChild(channelzRef); unregisterChannelzRef(channelzRef); if (connectionAgeTimer) { clearTimeout(connectionAgeTimer); } if (connectionAgeGraceTimer) { clearTimeout(connectionAgeGraceTimer); } if (keeapliveTimeTimer) { clearInterval(keeapliveTimeTimer); if (keepaliveTimeoutTimer) { clearTimeout(keepaliveTimeoutTimer); } } if (idleTimeoutObj !== null) { clearTimeout(idleTimeoutObj.timeout); this.sessionIdleTimeouts.delete(session); } this.http2Servers.get(http2Server)?.sessions.delete(session); this.sessions.delete(session); }); }; } private enableIdleTimeout( session: http2.ServerHttp2Session ): 
SessionIdleTimeoutTracker | null { if (this.sessionIdleTimeout >= MAX_CONNECTION_IDLE_MS) { return null; } const idleTimeoutObj: SessionIdleTimeoutTracker = { activeStreams: 0, lastIdle: Date.now(), onClose: this.onStreamClose.bind(this, session), timeout: setTimeout( this.onIdleTimeout, this.sessionIdleTimeout, this, session ), }; idleTimeoutObj.timeout.unref?.(); this.sessionIdleTimeouts.set(session, idleTimeoutObj); const { socket } = session; this.trace( 'Enable idle timeout for ' + socket.remoteAddress + ':' + socket.remotePort ); return idleTimeoutObj; } private onIdleTimeout( this: undefined, ctx: Server, session: http2.ServerHttp2Session ) { const { socket } = session; const sessionInfo = ctx.sessionIdleTimeouts.get(session); // if it is called while we have activeStreams - timer will not be rescheduled // until last active stream is closed, then it will call .refresh() on the timer // important part is to not clearTimeout(timer) or it becomes unusable // for future refreshes if ( sessionInfo !== undefined && sessionInfo.activeStreams === 0 && Date.now() - sessionInfo.lastIdle >= ctx.sessionIdleTimeout ) { ctx.trace( 'Session idle timeout triggered for ' + socket?.remoteAddress + ':' + socket?.remotePort + ' last idle at ' + sessionInfo.lastIdle ); ctx.closeSession(session); } } private onStreamOpened(stream: http2.ServerHttp2Stream) { const session = stream.session as http2.ServerHttp2Session; const idleTimeoutObj = this.sessionIdleTimeouts.get(session); if (idleTimeoutObj) { idleTimeoutObj.activeStreams += 1; stream.once('close', idleTimeoutObj.onClose); } } private onStreamClose(session: http2.ServerHttp2Session) { const idleTimeoutObj = this.sessionIdleTimeouts.get(session); if (idleTimeoutObj) { idleTimeoutObj.activeStreams -= 1; if (idleTimeoutObj.activeStreams === 0) { idleTimeoutObj.lastIdle = Date.now(); idleTimeoutObj.timeout.refresh(); this.trace( 'Session onStreamClose' + session.socket?.remoteAddress + ':' + session.socket?.remotePort + ' at ' 
+ idleTimeoutObj.lastIdle ); } } } } async function handleUnary( call: ServerInterceptingCallInterface, handler: UnaryHandler ): Promise { let stream: ServerUnaryCall; function respond( err: ServerErrorResponse | ServerStatusResponse | null, value?: ResponseType | null, trailer?: Metadata, flags?: number ) { if (err) { call.sendStatus(serverErrorToStatus(err, trailer)); return; } call.sendMessage(value, () => { call.sendStatus({ code: Status.OK, details: 'OK', metadata: trailer ?? null, }); }); } let requestMetadata: Metadata; let requestMessage: RequestType | null = null; call.start({ onReceiveMetadata(metadata) { requestMetadata = metadata; call.startRead(); }, onReceiveMessage(message) { if (requestMessage) { call.sendStatus({ code: Status.UNIMPLEMENTED, details: `Received a second request message for server streaming method ${handler.path}`, metadata: null, }); return; } requestMessage = message; call.startRead(); }, onReceiveHalfClose() { if (!requestMessage) { call.sendStatus({ code: Status.UNIMPLEMENTED, details: `Received no request message for server streaming method ${handler.path}`, metadata: null, }); return; } stream = new ServerWritableStreamImpl( handler.path, call, requestMetadata, requestMessage ); try { handler.func(stream, respond); } catch (err) { call.sendStatus({ code: Status.UNKNOWN, details: `Server method handler threw error ${ (err as Error).message }`, metadata: null, }); } }, onCancel() { if (stream) { stream.cancelled = true; stream.emit('cancelled', 'cancelled'); } }, }); } function handleClientStreaming( call: ServerInterceptingCallInterface, handler: ClientStreamingHandler ): void { let stream: ServerReadableStream; function respond( err: ServerErrorResponse | ServerStatusResponse | null, value?: ResponseType | null, trailer?: Metadata, flags?: number ) { if (err) { call.sendStatus(serverErrorToStatus(err, trailer)); return; } call.sendMessage(value, () => { call.sendStatus({ code: Status.OK, details: 'OK', metadata: trailer ?? 
null, }); }); } call.start({ onReceiveMetadata(metadata) { stream = new ServerDuplexStreamImpl(handler.path, call, metadata); try { handler.func(stream, respond); } catch (err) { call.sendStatus({ code: Status.UNKNOWN, details: `Server method handler threw error ${ (err as Error).message }`, metadata: null, }); } }, onReceiveMessage(message) { stream.push(message); }, onReceiveHalfClose() { stream.push(null); }, onCancel() { if (stream) { stream.cancelled = true; stream.emit('cancelled', 'cancelled'); stream.destroy(); } }, }); } function handleServerStreaming( call: ServerInterceptingCallInterface, handler: ServerStreamingHandler ): void { let stream: ServerWritableStream; let requestMetadata: Metadata; let requestMessage: RequestType | null = null; call.start({ onReceiveMetadata(metadata) { requestMetadata = metadata; call.startRead(); }, onReceiveMessage(message) { if (requestMessage) { call.sendStatus({ code: Status.UNIMPLEMENTED, details: `Received a second request message for server streaming method ${handler.path}`, metadata: null, }); return; } requestMessage = message; call.startRead(); }, onReceiveHalfClose() { if (!requestMessage) { call.sendStatus({ code: Status.UNIMPLEMENTED, details: `Received no request message for server streaming method ${handler.path}`, metadata: null, }); return; } stream = new ServerWritableStreamImpl( handler.path, call, requestMetadata, requestMessage ); try { handler.func(stream); } catch (err) { call.sendStatus({ code: Status.UNKNOWN, details: `Server method handler threw error ${ (err as Error).message }`, metadata: null, }); } }, onCancel() { if (stream) { stream.cancelled = true; stream.emit('cancelled', 'cancelled'); stream.destroy(); } }, }); } function handleBidiStreaming( call: ServerInterceptingCallInterface, handler: BidiStreamingHandler ): void { let stream: ServerDuplexStream; call.start({ onReceiveMetadata(metadata) { stream = new ServerDuplexStreamImpl(handler.path, call, metadata); try { handler.func(stream); } 
catch (err) { call.sendStatus({ code: Status.UNKNOWN, details: `Server method handler threw error ${ (err as Error).message }`, metadata: null, }); } }, onReceiveMessage(message) { stream.push(message); }, onReceiveHalfClose() { stream.push(null); }, onCancel() { if (stream) { stream.cancelled = true; stream.emit('cancelled', 'cancelled'); stream.destroy(); } }, }); }