UNPKG

38.4 kBPlain TextView Raw
1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import * as http2 from 'http2';
19import { AddressInfo } from 'net';
20
21import { ServiceError } from './call';
22import { Status, LogVerbosity } from './constants';
23import { Deserialize, Serialize, ServiceDefinition } from './make-client';
24import { Metadata } from './metadata';
25import {
26 BidiStreamingHandler,
27 ClientStreamingHandler,
28 HandleCall,
29 Handler,
30 HandlerType,
31 Http2ServerCallStream,
32 sendUnaryData,
33 ServerDuplexStream,
34 ServerDuplexStreamImpl,
35 ServerReadableStream,
36 ServerReadableStreamImpl,
37 ServerStreamingHandler,
38 ServerUnaryCall,
39 ServerUnaryCallImpl,
40 ServerWritableStream,
41 ServerWritableStreamImpl,
42 UnaryHandler,
43 ServerErrorResponse,
44 ServerStatusResponse,
45} from './server-call';
46import { ServerCredentials } from './server-credentials';
47import { ChannelOptions } from './channel-options';
48import {
49 createResolver,
50 ResolverListener,
51 mapUriDefaultScheme,
52} from './resolver';
53import * as logging from './logging';
54import {
55 SubchannelAddress,
56 TcpSubchannelAddress,
57 isTcpSubchannelAddress,
58 subchannelAddressToString,
59 stringToSubchannelAddress,
60} from './subchannel-address';
61import { parseUri } from './uri-parser';
62import {
63 ChannelzCallTracker,
64 ChannelzChildrenTracker,
65 ChannelzTrace,
66 registerChannelzServer,
67 registerChannelzSocket,
68 ServerInfo,
69 ServerRef,
70 SocketInfo,
71 SocketRef,
72 TlsInfo,
73 unregisterChannelzRef,
74} from './channelz';
75import { CipherNameAndProtocol, TLSSocket } from 'tls';
76
77const UNLIMITED_CONNECTION_AGE_MS = ~(1 << 31);
78const KEEPALIVE_MAX_TIME_MS = ~(1 << 31);
79const KEEPALIVE_TIMEOUT_MS = 20000;
80
81const { HTTP2_HEADER_PATH } = http2.constants;
82
83const TRACER_NAME = 'server';
84
85interface BindResult {
86 port: number;
87 count: number;
88}
89
90function noop(): void {}
91
92function getUnimplementedStatusResponse(
93 methodName: string
94): Partial<ServiceError> {
95 return {
96 code: Status.UNIMPLEMENTED,
97 details: `The server does not implement the method ${methodName}`,
98 };
99}
100
101/* eslint-disable @typescript-eslint/no-explicit-any */
102type UntypedUnaryHandler = UnaryHandler<any, any>;
103type UntypedClientStreamingHandler = ClientStreamingHandler<any, any>;
104type UntypedServerStreamingHandler = ServerStreamingHandler<any, any>;
105type UntypedBidiStreamingHandler = BidiStreamingHandler<any, any>;
106export type UntypedHandleCall = HandleCall<any, any>;
107type UntypedHandler = Handler<any, any>;
108export interface UntypedServiceImplementation {
109 [name: string]: UntypedHandleCall;
110}
111
112function getDefaultHandler(handlerType: HandlerType, methodName: string) {
113 const unimplementedStatusResponse =
114 getUnimplementedStatusResponse(methodName);
115 switch (handlerType) {
116 case 'unary':
117 return (
118 call: ServerUnaryCall<any, any>,
119 callback: sendUnaryData<any>
120 ) => {
121 callback(unimplementedStatusResponse as ServiceError, null);
122 };
123 case 'clientStream':
124 return (
125 call: ServerReadableStream<any, any>,
126 callback: sendUnaryData<any>
127 ) => {
128 callback(unimplementedStatusResponse as ServiceError, null);
129 };
130 case 'serverStream':
131 return (call: ServerWritableStream<any, any>) => {
132 call.emit('error', unimplementedStatusResponse);
133 };
134 case 'bidi':
135 return (call: ServerDuplexStream<any, any>) => {
136 call.emit('error', unimplementedStatusResponse);
137 };
138 default:
139 throw new Error(`Invalid handlerType ${handlerType}`);
140 }
141}
142
143interface ChannelzSessionInfo {
144 ref: SocketRef;
145 streamTracker: ChannelzCallTracker;
146 messagesSent: number;
147 messagesReceived: number;
148 lastMessageSentTimestamp: Date | null;
149 lastMessageReceivedTimestamp: Date | null;
150}
151
152export class Server {
153 private http2ServerList: {
154 server: http2.Http2Server | http2.Http2SecureServer;
155 channelzRef: SocketRef;
156 }[] = [];
157
158 private handlers: Map<string, UntypedHandler> = new Map<
159 string,
160 UntypedHandler
161 >();
162 private sessions = new Map<http2.ServerHttp2Session, ChannelzSessionInfo>();
163 private started = false;
164 private options: ChannelOptions;
165 private serverAddressString = 'null';
166
167 // Channelz Info
168 private readonly channelzEnabled: boolean = true;
169 private channelzRef: ServerRef;
170 private channelzTrace = new ChannelzTrace();
171 private callTracker = new ChannelzCallTracker();
172 private listenerChildrenTracker = new ChannelzChildrenTracker();
173 private sessionChildrenTracker = new ChannelzChildrenTracker();
174
175 private readonly maxConnectionAgeMs: number;
176 private readonly maxConnectionAgeGraceMs: number;
177
178 private readonly keepaliveTimeMs: number;
179 private readonly keepaliveTimeoutMs: number;
180
181 constructor(options?: ChannelOptions) {
182 this.options = options ?? {};
183 if (this.options['grpc.enable_channelz'] === 0) {
184 this.channelzEnabled = false;
185 }
186 this.channelzRef = registerChannelzServer(
187 () => this.getChannelzInfo(),
188 this.channelzEnabled
189 );
190 if (this.channelzEnabled) {
191 this.channelzTrace.addTrace('CT_INFO', 'Server created');
192 }
193 this.maxConnectionAgeMs =
194 this.options['grpc.max_connection_age_ms'] ?? UNLIMITED_CONNECTION_AGE_MS;
195 this.maxConnectionAgeGraceMs =
196 this.options['grpc.max_connection_age_grace_ms'] ??
197 UNLIMITED_CONNECTION_AGE_MS;
198 this.keepaliveTimeMs =
199 this.options['grpc.keepalive_time_ms'] ?? KEEPALIVE_MAX_TIME_MS;
200 this.keepaliveTimeoutMs =
201 this.options['grpc.keepalive_timeout_ms'] ?? KEEPALIVE_TIMEOUT_MS;
202 this.trace('Server constructed');
203 }
204
205 private getChannelzInfo(): ServerInfo {
206 return {
207 trace: this.channelzTrace,
208 callTracker: this.callTracker,
209 listenerChildren: this.listenerChildrenTracker.getChildLists(),
210 sessionChildren: this.sessionChildrenTracker.getChildLists(),
211 };
212 }
213
214 private getChannelzSessionInfoGetter(
215 session: http2.ServerHttp2Session
216 ): () => SocketInfo {
217 return () => {
218 const sessionInfo = this.sessions.get(session)!;
219 const sessionSocket = session.socket;
220 const remoteAddress = sessionSocket.remoteAddress
221 ? stringToSubchannelAddress(
222 sessionSocket.remoteAddress,
223 sessionSocket.remotePort
224 )
225 : null;
226 const localAddress = sessionSocket.localAddress
227 ? stringToSubchannelAddress(
228 sessionSocket.localAddress!,
229 sessionSocket.localPort
230 )
231 : null;
232 let tlsInfo: TlsInfo | null;
233 if (session.encrypted) {
234 const tlsSocket: TLSSocket = sessionSocket as TLSSocket;
235 const cipherInfo: CipherNameAndProtocol & { standardName?: string } =
236 tlsSocket.getCipher();
237 const certificate = tlsSocket.getCertificate();
238 const peerCertificate = tlsSocket.getPeerCertificate();
239 tlsInfo = {
240 cipherSuiteStandardName: cipherInfo.standardName ?? null,
241 cipherSuiteOtherName: cipherInfo.standardName
242 ? null
243 : cipherInfo.name,
244 localCertificate:
245 certificate && 'raw' in certificate ? certificate.raw : null,
246 remoteCertificate:
247 peerCertificate && 'raw' in peerCertificate
248 ? peerCertificate.raw
249 : null,
250 };
251 } else {
252 tlsInfo = null;
253 }
254 const socketInfo: SocketInfo = {
255 remoteAddress: remoteAddress,
256 localAddress: localAddress,
257 security: tlsInfo,
258 remoteName: null,
259 streamsStarted: sessionInfo.streamTracker.callsStarted,
260 streamsSucceeded: sessionInfo.streamTracker.callsSucceeded,
261 streamsFailed: sessionInfo.streamTracker.callsFailed,
262 messagesSent: sessionInfo.messagesSent,
263 messagesReceived: sessionInfo.messagesReceived,
264 keepAlivesSent: 0,
265 lastLocalStreamCreatedTimestamp: null,
266 lastRemoteStreamCreatedTimestamp:
267 sessionInfo.streamTracker.lastCallStartedTimestamp,
268 lastMessageSentTimestamp: sessionInfo.lastMessageSentTimestamp,
269 lastMessageReceivedTimestamp: sessionInfo.lastMessageReceivedTimestamp,
270 localFlowControlWindow: session.state.localWindowSize ?? null,
271 remoteFlowControlWindow: session.state.remoteWindowSize ?? null,
272 };
273 return socketInfo;
274 };
275 }
276
277 private trace(text: string): void {
278 logging.trace(
279 LogVerbosity.DEBUG,
280 TRACER_NAME,
281 '(' + this.channelzRef.id + ') ' + text
282 );
283 }
284
285 addProtoService(): never {
286 throw new Error('Not implemented. Use addService() instead');
287 }
288
289 addService(
290 service: ServiceDefinition,
291 implementation: UntypedServiceImplementation
292 ): void {
293 if (
294 service === null ||
295 typeof service !== 'object' ||
296 implementation === null ||
297 typeof implementation !== 'object'
298 ) {
299 throw new Error('addService() requires two objects as arguments');
300 }
301
302 const serviceKeys = Object.keys(service);
303
304 if (serviceKeys.length === 0) {
305 throw new Error('Cannot add an empty service to a server');
306 }
307
308 serviceKeys.forEach(name => {
309 const attrs = service[name];
310 let methodType: HandlerType;
311
312 if (attrs.requestStream) {
313 if (attrs.responseStream) {
314 methodType = 'bidi';
315 } else {
316 methodType = 'clientStream';
317 }
318 } else {
319 if (attrs.responseStream) {
320 methodType = 'serverStream';
321 } else {
322 methodType = 'unary';
323 }
324 }
325
326 let implFn = implementation[name];
327 let impl;
328
329 if (implFn === undefined && typeof attrs.originalName === 'string') {
330 implFn = implementation[attrs.originalName];
331 }
332
333 if (implFn !== undefined) {
334 impl = implFn.bind(implementation);
335 } else {
336 impl = getDefaultHandler(methodType, name);
337 }
338
339 const success = this.register(
340 attrs.path,
341 impl as UntypedHandleCall,
342 attrs.responseSerialize,
343 attrs.requestDeserialize,
344 methodType
345 );
346
347 if (success === false) {
348 throw new Error(`Method handler for ${attrs.path} already provided.`);
349 }
350 });
351 }
352
353 removeService(service: ServiceDefinition): void {
354 if (service === null || typeof service !== 'object') {
355 throw new Error('removeService() requires object as argument');
356 }
357
358 const serviceKeys = Object.keys(service);
359 serviceKeys.forEach(name => {
360 const attrs = service[name];
361 this.unregister(attrs.path);
362 });
363 }
364
365 bind(port: string, creds: ServerCredentials): never {
366 throw new Error('Not implemented. Use bindAsync() instead');
367 }
368
369 bindAsync(
370 port: string,
371 creds: ServerCredentials,
372 callback: (error: Error | null, port: number) => void
373 ): void {
374 if (this.started === true) {
375 throw new Error('server is already started');
376 }
377
378 if (typeof port !== 'string') {
379 throw new TypeError('port must be a string');
380 }
381
382 if (creds === null || !(creds instanceof ServerCredentials)) {
383 throw new TypeError('creds must be a ServerCredentials object');
384 }
385
386 if (typeof callback !== 'function') {
387 throw new TypeError('callback must be a function');
388 }
389
390 const initialPortUri = parseUri(port);
391 if (initialPortUri === null) {
392 throw new Error(`Could not parse port "${port}"`);
393 }
394 const portUri = mapUriDefaultScheme(initialPortUri);
395 if (portUri === null) {
396 throw new Error(`Could not get a default scheme for port "${port}"`);
397 }
398
399 const serverOptions: http2.ServerOptions = {
400 maxSendHeaderBlockLength: Number.MAX_SAFE_INTEGER,
401 };
402 if ('grpc-node.max_session_memory' in this.options) {
403 serverOptions.maxSessionMemory =
404 this.options['grpc-node.max_session_memory'];
405 } else {
406 /* By default, set a very large max session memory limit, to effectively
407 * disable enforcement of the limit. Some testing indicates that Node's
408 * behavior degrades badly when this limit is reached, so we solve that
409 * by disabling the check entirely. */
410 serverOptions.maxSessionMemory = Number.MAX_SAFE_INTEGER;
411 }
412 if ('grpc.max_concurrent_streams' in this.options) {
413 serverOptions.settings = {
414 maxConcurrentStreams: this.options['grpc.max_concurrent_streams'],
415 };
416 }
417
418 const deferredCallback = (error: Error | null, port: number) => {
419 process.nextTick(() => callback(error, port));
420 };
421
422 const setupServer = (): http2.Http2Server | http2.Http2SecureServer => {
423 let http2Server: http2.Http2Server | http2.Http2SecureServer;
424 if (creds._isSecure()) {
425 const secureServerOptions = Object.assign(
426 serverOptions,
427 creds._getSettings()!
428 );
429 secureServerOptions.enableTrace =
430 this.options['grpc-node.tls_enable_trace'] === 1;
431 http2Server = http2.createSecureServer(secureServerOptions);
432 http2Server.on('secureConnection', (socket: TLSSocket) => {
433 /* These errors need to be handled by the user of Http2SecureServer,
434 * according to https://github.com/nodejs/node/issues/35824 */
435 socket.on('error', (e: Error) => {
436 this.trace(
437 'An incoming TLS connection closed with error: ' + e.message
438 );
439 });
440 });
441 } else {
442 http2Server = http2.createServer(serverOptions);
443 }
444
445 http2Server.setTimeout(0, noop);
446 this._setupHandlers(http2Server);
447 return http2Server;
448 };
449
450 const bindSpecificPort = (
451 addressList: SubchannelAddress[],
452 portNum: number,
453 previousCount: number
454 ): Promise<BindResult> => {
455 if (addressList.length === 0) {
456 return Promise.resolve({ port: portNum, count: previousCount });
457 }
458 return Promise.all(
459 addressList.map(address => {
460 this.trace(
461 'Attempting to bind ' + subchannelAddressToString(address)
462 );
463 let addr: SubchannelAddress;
464 if (isTcpSubchannelAddress(address)) {
465 addr = {
466 host: (address as TcpSubchannelAddress).host,
467 port: portNum,
468 };
469 } else {
470 addr = address;
471 }
472
473 const http2Server = setupServer();
474 return new Promise<number | Error>((resolve, reject) => {
475 const onError = (err: Error) => {
476 this.trace(
477 'Failed to bind ' +
478 subchannelAddressToString(address) +
479 ' with error ' +
480 err.message
481 );
482 resolve(err);
483 };
484
485 http2Server.once('error', onError);
486
487 http2Server.listen(addr, () => {
488 const boundAddress = http2Server.address()!;
489 let boundSubchannelAddress: SubchannelAddress;
490 if (typeof boundAddress === 'string') {
491 boundSubchannelAddress = {
492 path: boundAddress,
493 };
494 } else {
495 boundSubchannelAddress = {
496 host: boundAddress.address,
497 port: boundAddress.port,
498 };
499 }
500
501 const channelzRef = registerChannelzSocket(
502 subchannelAddressToString(boundSubchannelAddress),
503 () => {
504 return {
505 localAddress: boundSubchannelAddress,
506 remoteAddress: null,
507 security: null,
508 remoteName: null,
509 streamsStarted: 0,
510 streamsSucceeded: 0,
511 streamsFailed: 0,
512 messagesSent: 0,
513 messagesReceived: 0,
514 keepAlivesSent: 0,
515 lastLocalStreamCreatedTimestamp: null,
516 lastRemoteStreamCreatedTimestamp: null,
517 lastMessageSentTimestamp: null,
518 lastMessageReceivedTimestamp: null,
519 localFlowControlWindow: null,
520 remoteFlowControlWindow: null,
521 };
522 },
523 this.channelzEnabled
524 );
525 if (this.channelzEnabled) {
526 this.listenerChildrenTracker.refChild(channelzRef);
527 }
528 this.http2ServerList.push({
529 server: http2Server,
530 channelzRef: channelzRef,
531 });
532 this.trace(
533 'Successfully bound ' +
534 subchannelAddressToString(boundSubchannelAddress)
535 );
536 resolve(
537 'port' in boundSubchannelAddress
538 ? boundSubchannelAddress.port
539 : portNum
540 );
541 http2Server.removeListener('error', onError);
542 });
543 });
544 })
545 ).then(results => {
546 let count = 0;
547 for (const result of results) {
548 if (typeof result === 'number') {
549 count += 1;
550 if (result !== portNum) {
551 throw new Error(
552 'Invalid state: multiple port numbers added from single address'
553 );
554 }
555 }
556 }
557 return {
558 port: portNum,
559 count: count + previousCount,
560 };
561 });
562 };
563
564 const bindWildcardPort = (
565 addressList: SubchannelAddress[]
566 ): Promise<BindResult> => {
567 if (addressList.length === 0) {
568 return Promise.resolve<BindResult>({ port: 0, count: 0 });
569 }
570 const address = addressList[0];
571 const http2Server = setupServer();
572 return new Promise<BindResult>((resolve, reject) => {
573 const onError = (err: Error) => {
574 this.trace(
575 'Failed to bind ' +
576 subchannelAddressToString(address) +
577 ' with error ' +
578 err.message
579 );
580 resolve(bindWildcardPort(addressList.slice(1)));
581 };
582
583 http2Server.once('error', onError);
584
585 http2Server.listen(address, () => {
586 const boundAddress = http2Server.address() as AddressInfo;
587 const boundSubchannelAddress: SubchannelAddress = {
588 host: boundAddress.address,
589 port: boundAddress.port,
590 };
591 const channelzRef = registerChannelzSocket(
592 subchannelAddressToString(boundSubchannelAddress),
593 () => {
594 return {
595 localAddress: boundSubchannelAddress,
596 remoteAddress: null,
597 security: null,
598 remoteName: null,
599 streamsStarted: 0,
600 streamsSucceeded: 0,
601 streamsFailed: 0,
602 messagesSent: 0,
603 messagesReceived: 0,
604 keepAlivesSent: 0,
605 lastLocalStreamCreatedTimestamp: null,
606 lastRemoteStreamCreatedTimestamp: null,
607 lastMessageSentTimestamp: null,
608 lastMessageReceivedTimestamp: null,
609 localFlowControlWindow: null,
610 remoteFlowControlWindow: null,
611 };
612 },
613 this.channelzEnabled
614 );
615 if (this.channelzEnabled) {
616 this.listenerChildrenTracker.refChild(channelzRef);
617 }
618 this.http2ServerList.push({
619 server: http2Server,
620 channelzRef: channelzRef,
621 });
622 this.trace(
623 'Successfully bound ' +
624 subchannelAddressToString(boundSubchannelAddress)
625 );
626 resolve(bindSpecificPort(addressList.slice(1), boundAddress.port, 1));
627 http2Server.removeListener('error', onError);
628 });
629 });
630 };
631
632 const resolverListener: ResolverListener = {
633 onSuccessfulResolution: (
634 addressList,
635 serviceConfig,
636 serviceConfigError
637 ) => {
638 // We only want one resolution result. Discard all future results
639 resolverListener.onSuccessfulResolution = () => {};
640 if (addressList.length === 0) {
641 deferredCallback(
642 new Error(`No addresses resolved for port ${port}`),
643 0
644 );
645 return;
646 }
647 let bindResultPromise: Promise<BindResult>;
648 if (isTcpSubchannelAddress(addressList[0])) {
649 if (addressList[0].port === 0) {
650 bindResultPromise = bindWildcardPort(addressList);
651 } else {
652 bindResultPromise = bindSpecificPort(
653 addressList,
654 addressList[0].port,
655 0
656 );
657 }
658 } else {
659 // Use an arbitrary non-zero port for non-TCP addresses
660 bindResultPromise = bindSpecificPort(addressList, 1, 0);
661 }
662 bindResultPromise.then(
663 bindResult => {
664 if (bindResult.count === 0) {
665 const errorString = `No address added out of total ${addressList.length} resolved`;
666 logging.log(LogVerbosity.ERROR, errorString);
667 deferredCallback(new Error(errorString), 0);
668 } else {
669 if (bindResult.count < addressList.length) {
670 logging.log(
671 LogVerbosity.INFO,
672 `WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved`
673 );
674 }
675 deferredCallback(null, bindResult.port);
676 }
677 },
678 error => {
679 const errorString = `No address added out of total ${addressList.length} resolved`;
680 logging.log(LogVerbosity.ERROR, errorString);
681 deferredCallback(new Error(errorString), 0);
682 }
683 );
684 },
685 onError: error => {
686 deferredCallback(new Error(error.details), 0);
687 },
688 };
689
690 const resolver = createResolver(portUri, resolverListener, this.options);
691 resolver.updateResolution();
692 }
693
694 forceShutdown(): void {
695 // Close the server if it is still running.
696
697 for (const { server: http2Server, channelzRef: ref } of this
698 .http2ServerList) {
699 if (http2Server.listening) {
700 http2Server.close(() => {
701 if (this.channelzEnabled) {
702 this.listenerChildrenTracker.unrefChild(ref);
703 unregisterChannelzRef(ref);
704 }
705 });
706 }
707 }
708
709 this.started = false;
710
711 // Always destroy any available sessions. It's possible that one or more
712 // tryShutdown() calls are in progress. Don't wait on them to finish.
713 this.sessions.forEach((channelzInfo, session) => {
714 // Cast NGHTTP2_CANCEL to any because TypeScript doesn't seem to
715 // recognize destroy(code) as a valid signature.
716 // eslint-disable-next-line @typescript-eslint/no-explicit-any
717 session.destroy(http2.constants.NGHTTP2_CANCEL as any);
718 });
719 this.sessions.clear();
720 if (this.channelzEnabled) {
721 unregisterChannelzRef(this.channelzRef);
722 }
723 }
724
725 register<RequestType, ResponseType>(
726 name: string,
727 handler: HandleCall<RequestType, ResponseType>,
728 serialize: Serialize<ResponseType>,
729 deserialize: Deserialize<RequestType>,
730 type: string
731 ): boolean {
732 if (this.handlers.has(name)) {
733 return false;
734 }
735
736 this.handlers.set(name, {
737 func: handler,
738 serialize,
739 deserialize,
740 type,
741 path: name,
742 } as UntypedHandler);
743 return true;
744 }
745
746 unregister(name: string): boolean {
747 return this.handlers.delete(name);
748 }
749
750 start(): void {
751 if (
752 this.http2ServerList.length === 0 ||
753 this.http2ServerList.every(
754 ({ server: http2Server }) => http2Server.listening !== true
755 )
756 ) {
757 throw new Error('server must be bound in order to start');
758 }
759
760 if (this.started === true) {
761 throw new Error('server is already started');
762 }
763 if (this.channelzEnabled) {
764 this.channelzTrace.addTrace('CT_INFO', 'Starting');
765 }
766 this.started = true;
767 }
768
769 tryShutdown(callback: (error?: Error) => void): void {
770 const wrappedCallback = (error?: Error) => {
771 if (this.channelzEnabled) {
772 unregisterChannelzRef(this.channelzRef);
773 }
774 callback(error);
775 };
776 let pendingChecks = 0;
777
778 function maybeCallback(): void {
779 pendingChecks--;
780
781 if (pendingChecks === 0) {
782 wrappedCallback();
783 }
784 }
785
786 // Close the server if necessary.
787 this.started = false;
788
789 for (const { server: http2Server, channelzRef: ref } of this
790 .http2ServerList) {
791 if (http2Server.listening) {
792 pendingChecks++;
793 http2Server.close(() => {
794 if (this.channelzEnabled) {
795 this.listenerChildrenTracker.unrefChild(ref);
796 unregisterChannelzRef(ref);
797 }
798 maybeCallback();
799 });
800 }
801 }
802
803 this.sessions.forEach((channelzInfo, session) => {
804 if (!session.closed) {
805 pendingChecks += 1;
806 session.close(maybeCallback);
807 }
808 });
809 if (pendingChecks === 0) {
810 wrappedCallback();
811 }
812 }
813
814 addHttp2Port(): never {
815 throw new Error('Not yet implemented');
816 }
817
818 /**
819 * Get the channelz reference object for this server. The returned value is
820 * garbage if channelz is disabled for this server.
821 * @returns
822 */
823 getChannelzRef() {
824 return this.channelzRef;
825 }
826
827 private _verifyContentType(
828 stream: http2.ServerHttp2Stream,
829 headers: http2.IncomingHttpHeaders
830 ): boolean {
831 const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE];
832
833 if (
834 typeof contentType !== 'string' ||
835 !contentType.startsWith('application/grpc')
836 ) {
837 stream.respond(
838 {
839 [http2.constants.HTTP2_HEADER_STATUS]:
840 http2.constants.HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE,
841 },
842 { endStream: true }
843 );
844 return false;
845 }
846
847 return true;
848 }
849
850 private _retrieveHandler(path: string): Handler<any, any> | null {
851 this.trace(
852 'Received call to method ' +
853 path +
854 ' at address ' +
855 this.serverAddressString
856 );
857
858 const handler = this.handlers.get(path);
859
860 if (handler === undefined) {
861 this.trace(
862 'No handler registered for method ' +
863 path +
864 '. Sending UNIMPLEMENTED status.'
865 );
866 return null;
867 }
868
869 return handler;
870 }
871
872 private _respondWithError<T extends Partial<ServiceError>>(
873 err: T,
874 stream: http2.ServerHttp2Stream,
875 channelzSessionInfo: ChannelzSessionInfo | null = null
876 ) {
877 const call = new Http2ServerCallStream(stream, null!, this.options);
878
879 if (err.code === undefined) {
880 err.code = Status.INTERNAL;
881 }
882
883 if (this.channelzEnabled) {
884 this.callTracker.addCallFailed();
885 channelzSessionInfo?.streamTracker.addCallFailed();
886 }
887
888 call.sendError(err);
889 }
890
891 private _channelzHandler(
892 stream: http2.ServerHttp2Stream,
893 headers: http2.IncomingHttpHeaders
894 ) {
895 const channelzSessionInfo = this.sessions.get(
896 stream.session as http2.ServerHttp2Session
897 );
898
899 this.callTracker.addCallStarted();
900 channelzSessionInfo?.streamTracker.addCallStarted();
901
902 if (!this._verifyContentType(stream, headers)) {
903 this.callTracker.addCallFailed();
904 channelzSessionInfo?.streamTracker.addCallFailed();
905 return;
906 }
907
908 const path = headers[HTTP2_HEADER_PATH] as string;
909
910 const handler = this._retrieveHandler(path);
911 if (!handler) {
912 this._respondWithError(
913 getUnimplementedStatusResponse(path),
914 stream,
915 channelzSessionInfo
916 );
917 return;
918 }
919
920 const call = new Http2ServerCallStream(stream, handler, this.options);
921
922 call.once('callEnd', (code: Status) => {
923 if (code === Status.OK) {
924 this.callTracker.addCallSucceeded();
925 } else {
926 this.callTracker.addCallFailed();
927 }
928 });
929
930 if (channelzSessionInfo) {
931 call.once('streamEnd', (success: boolean) => {
932 if (success) {
933 channelzSessionInfo.streamTracker.addCallSucceeded();
934 } else {
935 channelzSessionInfo.streamTracker.addCallFailed();
936 }
937 });
938 call.on('sendMessage', () => {
939 channelzSessionInfo.messagesSent += 1;
940 channelzSessionInfo.lastMessageSentTimestamp = new Date();
941 });
942 call.on('receiveMessage', () => {
943 channelzSessionInfo.messagesReceived += 1;
944 channelzSessionInfo.lastMessageReceivedTimestamp = new Date();
945 });
946 }
947
948 if (!this._runHandlerForCall(call, handler, headers)) {
949 this.callTracker.addCallFailed();
950 channelzSessionInfo?.streamTracker.addCallFailed();
951
952 call.sendError({
953 code: Status.INTERNAL,
954 details: `Unknown handler type: ${handler.type}`,
955 });
956 }
957 }
958
959 private _streamHandler(
960 stream: http2.ServerHttp2Stream,
961 headers: http2.IncomingHttpHeaders
962 ) {
963 if (this._verifyContentType(stream, headers) !== true) {
964 return;
965 }
966
967 const path = headers[HTTP2_HEADER_PATH] as string;
968
969 const handler = this._retrieveHandler(path);
970 if (!handler) {
971 this._respondWithError(
972 getUnimplementedStatusResponse(path),
973 stream,
974 null
975 );
976 return;
977 }
978
979 const call = new Http2ServerCallStream(stream, handler, this.options);
980 if (!this._runHandlerForCall(call, handler, headers)) {
981 call.sendError({
982 code: Status.INTERNAL,
983 details: `Unknown handler type: ${handler.type}`,
984 });
985 }
986 }
987
988 private _runHandlerForCall(
989 call: Http2ServerCallStream<any, any>,
990 handler: Handler<any, any>,
991 headers: http2.IncomingHttpHeaders
992 ): boolean {
993 const metadata = call.receiveMetadata(headers);
994 const encoding =
995 (metadata.get('grpc-encoding')[0] as string | undefined) ?? 'identity';
996 metadata.remove('grpc-encoding');
997
998 const { type } = handler;
999 if (type === 'unary') {
1000 handleUnary(call, handler as UntypedUnaryHandler, metadata, encoding);
1001 } else if (type === 'clientStream') {
1002 handleClientStreaming(
1003 call,
1004 handler as UntypedClientStreamingHandler,
1005 metadata,
1006 encoding
1007 );
1008 } else if (type === 'serverStream') {
1009 handleServerStreaming(
1010 call,
1011 handler as UntypedServerStreamingHandler,
1012 metadata,
1013 encoding
1014 );
1015 } else if (type === 'bidi') {
1016 handleBidiStreaming(
1017 call,
1018 handler as UntypedBidiStreamingHandler,
1019 metadata,
1020 encoding
1021 );
1022 } else {
1023 return false;
1024 }
1025
1026 return true;
1027 }
1028
1029 private _setupHandlers(
1030 http2Server: http2.Http2Server | http2.Http2SecureServer
1031 ): void {
1032 if (http2Server === null) {
1033 return;
1034 }
1035
1036 const serverAddress = http2Server.address();
1037 let serverAddressString = 'null';
1038 if (serverAddress) {
1039 if (typeof serverAddress === 'string') {
1040 serverAddressString = serverAddress;
1041 } else {
1042 serverAddressString = serverAddress.address + ':' + serverAddress.port;
1043 }
1044 }
1045 this.serverAddressString = serverAddressString;
1046
1047 const handler = this.channelzEnabled
1048 ? this._channelzHandler
1049 : this._streamHandler;
1050
1051 http2Server.on('stream', handler.bind(this));
1052 http2Server.on('session', session => {
1053 if (!this.started) {
1054 session.destroy();
1055 return;
1056 }
1057
1058 const channelzRef = registerChannelzSocket(
1059 session.socket.remoteAddress ?? 'unknown',
1060 this.getChannelzSessionInfoGetter(session),
1061 this.channelzEnabled
1062 );
1063
1064 const channelzSessionInfo: ChannelzSessionInfo = {
1065 ref: channelzRef,
1066 streamTracker: new ChannelzCallTracker(),
1067 messagesSent: 0,
1068 messagesReceived: 0,
1069 lastMessageSentTimestamp: null,
1070 lastMessageReceivedTimestamp: null,
1071 };
1072
1073 this.sessions.set(session, channelzSessionInfo);
1074 const clientAddress = session.socket.remoteAddress;
1075 if (this.channelzEnabled) {
1076 this.channelzTrace.addTrace(
1077 'CT_INFO',
1078 'Connection established by client ' + clientAddress
1079 );
1080 this.sessionChildrenTracker.refChild(channelzRef);
1081 }
1082 let connectionAgeTimer: NodeJS.Timeout | null = null;
1083 let connectionAgeGraceTimer: NodeJS.Timeout | null = null;
1084 let sessionClosedByServer = false;
1085 if (this.maxConnectionAgeMs !== UNLIMITED_CONNECTION_AGE_MS) {
1086 // Apply a random jitter within a +/-10% range
1087 const jitterMagnitude = this.maxConnectionAgeMs / 10;
1088 const jitter = Math.random() * jitterMagnitude * 2 - jitterMagnitude;
1089 connectionAgeTimer = setTimeout(() => {
1090 sessionClosedByServer = true;
1091 if (this.channelzEnabled) {
1092 this.channelzTrace.addTrace(
1093 'CT_INFO',
1094 'Connection dropped by max connection age from ' + clientAddress
1095 );
1096 }
1097 try {
1098 session.goaway(
1099 http2.constants.NGHTTP2_NO_ERROR,
1100 ~(1 << 31),
1101 Buffer.from('max_age')
1102 );
1103 } catch (e) {
1104 // The goaway can't be sent because the session is already closed
1105 session.destroy();
1106 return;
1107 }
1108 session.close();
1109 /* Allow a grace period after sending the GOAWAY before forcibly
1110 * closing the connection. */
1111 if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) {
1112 connectionAgeGraceTimer = setTimeout(() => {
1113 session.destroy();
1114 }, this.maxConnectionAgeGraceMs).unref?.();
1115 }
1116 }, this.maxConnectionAgeMs + jitter).unref?.();
1117 }
1118 const keeapliveTimeTimer: NodeJS.Timeout | null = setInterval(() => {
1119 const timeoutTImer = setTimeout(() => {
1120 sessionClosedByServer = true;
1121 if (this.channelzEnabled) {
1122 this.channelzTrace.addTrace(
1123 'CT_INFO',
1124 'Connection dropped by keepalive timeout from ' + clientAddress
1125 );
1126 }
1127 session.close();
1128 }, this.keepaliveTimeoutMs).unref?.();
1129 try {
1130 session.ping(
1131 (err: Error | null, duration: number, payload: Buffer) => {
1132 clearTimeout(timeoutTImer);
1133 }
1134 );
1135 } catch (e) {
1136 // The ping can't be sent because the session is already closed
1137 session.destroy();
1138 }
1139 }, this.keepaliveTimeMs).unref?.();
1140 session.on('close', () => {
1141 if (this.channelzEnabled) {
1142 if (!sessionClosedByServer) {
1143 this.channelzTrace.addTrace(
1144 'CT_INFO',
1145 'Connection dropped by client ' + clientAddress
1146 );
1147 }
1148 this.sessionChildrenTracker.unrefChild(channelzRef);
1149 unregisterChannelzRef(channelzRef);
1150 }
1151 if (connectionAgeTimer) {
1152 clearTimeout(connectionAgeTimer);
1153 }
1154 if (connectionAgeGraceTimer) {
1155 clearTimeout(connectionAgeGraceTimer);
1156 }
1157 if (keeapliveTimeTimer) {
1158 clearTimeout(keeapliveTimeTimer);
1159 }
1160 this.sessions.delete(session);
1161 });
1162 });
1163 }
1164}
1165
1166async function handleUnary<RequestType, ResponseType>(
1167 call: Http2ServerCallStream<RequestType, ResponseType>,
1168 handler: UnaryHandler<RequestType, ResponseType>,
1169 metadata: Metadata,
1170 encoding: string
1171): Promise<void> {
1172 try {
1173 const request = await call.receiveUnaryMessage(encoding);
1174
1175 if (request === undefined || call.cancelled) {
1176 return;
1177 }
1178
1179 const emitter = new ServerUnaryCallImpl<RequestType, ResponseType>(
1180 call,
1181 metadata,
1182 request
1183 );
1184
1185 handler.func(
1186 emitter,
1187 (
1188 err: ServerErrorResponse | ServerStatusResponse | null,
1189 value?: ResponseType | null,
1190 trailer?: Metadata,
1191 flags?: number
1192 ) => {
1193 call.sendUnaryMessage(err, value, trailer, flags);
1194 }
1195 );
1196 } catch (err) {
1197 call.sendError(err as ServerErrorResponse);
1198 }
1199}
1200
1201function handleClientStreaming<RequestType, ResponseType>(
1202 call: Http2ServerCallStream<RequestType, ResponseType>,
1203 handler: ClientStreamingHandler<RequestType, ResponseType>,
1204 metadata: Metadata,
1205 encoding: string
1206): void {
1207 const stream = new ServerReadableStreamImpl<RequestType, ResponseType>(
1208 call,
1209 metadata,
1210 handler.deserialize,
1211 encoding
1212 );
1213
1214 function respond(
1215 err: ServerErrorResponse | ServerStatusResponse | null,
1216 value?: ResponseType | null,
1217 trailer?: Metadata,
1218 flags?: number
1219 ) {
1220 stream.destroy();
1221 call.sendUnaryMessage(err, value, trailer, flags);
1222 }
1223
1224 if (call.cancelled) {
1225 return;
1226 }
1227
1228 stream.on('error', respond);
1229 handler.func(stream, respond);
1230}
1231
1232async function handleServerStreaming<RequestType, ResponseType>(
1233 call: Http2ServerCallStream<RequestType, ResponseType>,
1234 handler: ServerStreamingHandler<RequestType, ResponseType>,
1235 metadata: Metadata,
1236 encoding: string
1237): Promise<void> {
1238 try {
1239 const request = await call.receiveUnaryMessage(encoding);
1240
1241 if (request === undefined || call.cancelled) {
1242 return;
1243 }
1244
1245 const stream = new ServerWritableStreamImpl<RequestType, ResponseType>(
1246 call,
1247 metadata,
1248 handler.serialize,
1249 request
1250 );
1251
1252 handler.func(stream);
1253 } catch (err) {
1254 call.sendError(err as ServerErrorResponse);
1255 }
1256}
1257
1258function handleBidiStreaming<RequestType, ResponseType>(
1259 call: Http2ServerCallStream<RequestType, ResponseType>,
1260 handler: BidiStreamingHandler<RequestType, ResponseType>,
1261 metadata: Metadata,
1262 encoding: string
1263): void {
1264 const stream = new ServerDuplexStreamImpl<RequestType, ResponseType>(
1265 call,
1266 metadata,
1267 handler.serialize,
1268 handler.deserialize,
1269 encoding
1270 );
1271
1272 if (call.cancelled) {
1273 return;
1274 }
1275
1276 handler.func(stream);
1277}