UNPKG

33.9 kBPlain TextView Raw
1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import * as http2 from 'http2';
19import { AddressInfo } from 'net';
20
21import { ServiceError } from './call';
22import { Status, LogVerbosity } from './constants';
23import { Deserialize, Serialize, ServiceDefinition } from './make-client';
24import { Metadata } from './metadata';
25import {
26 BidiStreamingHandler,
27 ClientStreamingHandler,
28 HandleCall,
29 Handler,
30 HandlerType,
31 Http2ServerCallStream,
32 sendUnaryData,
33 ServerDuplexStream,
34 ServerDuplexStreamImpl,
35 ServerReadableStream,
36 ServerReadableStreamImpl,
37 ServerStreamingHandler,
38 ServerUnaryCall,
39 ServerUnaryCallImpl,
40 ServerWritableStream,
41 ServerWritableStreamImpl,
42 UnaryHandler,
43 ServerErrorResponse,
44 ServerStatusResponse,
45} from './server-call';
46import { ServerCredentials } from './server-credentials';
47import { ChannelOptions } from './channel-options';
48import {
49 createResolver,
50 ResolverListener,
51 mapUriDefaultScheme,
52} from './resolver';
53import * as logging from './logging';
54import {
55 SubchannelAddress,
56 TcpSubchannelAddress,
57 isTcpSubchannelAddress,
58 subchannelAddressToString,
59 stringToSubchannelAddress,
60} from './subchannel-address';
61import { parseUri } from './uri-parser';
62import { ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzServer, registerChannelzSocket, ServerInfo, ServerRef, SocketInfo, SocketRef, TlsInfo, unregisterChannelzRef } from './channelz';
63import { CipherNameAndProtocol, TLSSocket } from 'tls';
64
// Name of the HTTP/2 pseudo-header that carries the request path.
const HTTP2_HEADER_PATH = http2.constants.HTTP2_HEADER_PATH;

// Tracer name passed to logging.trace() for messages emitted by this module.
const TRACER_NAME = 'server';
70
/**
 * Outcome of a bind attempt over a list of resolved addresses.
 */
interface BindResult {
  /** Port number the addresses were bound to. */
  port: number;
  /** Number of addresses that were bound successfully. */
  count: number;
}
75
/** Callback that intentionally does nothing. */
function noop(): void {
  return;
}
77
/**
 * Build the status object reported when a method has no implementation.
 *
 * @param methodName Name of the method, embedded in the `details` text.
 * @returns A partial service error with code `Status.UNIMPLEMENTED`.
 */
function getUnimplementedStatusResponse(
  methodName: string
): Partial<ServiceError> {
  const response: Partial<ServiceError> = {
    code: Status.UNIMPLEMENTED,
    details: 'The server does not implement the method ' + methodName,
  };
  return response;
}
86
/* eslint-disable @typescript-eslint/no-explicit-any */

// Handler record types with both the request and response type erased to `any`.
type UntypedHandler = Handler<any, any>;
type UntypedUnaryHandler = UnaryHandler<any, any>;
type UntypedClientStreamingHandler = ClientStreamingHandler<any, any>;
type UntypedServerStreamingHandler = ServerStreamingHandler<any, any>;
type UntypedBidiStreamingHandler = BidiStreamingHandler<any, any>;

/** A user-supplied call handler function with request/response types erased. */
export type UntypedHandleCall = HandleCall<any, any>;

/** Maps method names to the functions that implement them. */
export interface UntypedServiceImplementation {
  [name: string]: UntypedHandleCall;
}
97
/**
 * Create the fallback handler used for a service method that has no
 * user-provided implementation. Every fallback reports the same
 * UNIMPLEMENTED status: unary-response kinds pass it to their callback,
 * streaming-response kinds emit it as an 'error' event on the call.
 *
 * @param handlerType Kind of method the handler is for.
 * @param methodName Method name included in the status details.
 * @returns A handler function matching `handlerType`.
 * @throws Error if `handlerType` is not one of the four known kinds.
 */
function getDefaultHandler(handlerType: HandlerType, methodName: string) {
  const unimplemented = getUnimplementedStatusResponse(methodName);

  if (handlerType === 'unary') {
    return (
      call: ServerUnaryCall<any, any>,
      callback: sendUnaryData<any>
    ) => {
      callback(unimplemented as ServiceError, null);
    };
  }

  if (handlerType === 'clientStream') {
    return (
      call: ServerReadableStream<any, any>,
      callback: sendUnaryData<any>
    ) => {
      callback(unimplemented as ServiceError, null);
    };
  }

  if (handlerType === 'serverStream') {
    return (call: ServerWritableStream<any, any>) => {
      call.emit('error', unimplemented);
    };
  }

  if (handlerType === 'bidi') {
    return (call: ServerDuplexStream<any, any>) => {
      call.emit('error', unimplemented);
    };
  }

  throw new Error(`Invalid handlerType ${handlerType}`);
}
129
/**
 * Channelz bookkeeping kept for each HTTP/2 session accepted by the server.
 */
interface ChannelzSessionInfo {
  /** Channelz socket reference registered for the session. */
  ref: SocketRef;
  /** Tracks streams started, succeeded and failed on the session. */
  streamTracker: ChannelzCallTracker;
  /** Count of 'sendMessage' events observed on the session's calls. */
  messagesSent: number;
  /** Count of 'receiveMessage' events observed on the session's calls. */
  messagesReceived: number;
  /** Time of the most recent 'sendMessage' event, or null if none yet. */
  lastMessageSentTimestamp: Date | null;
  /** Time of the most recent 'receiveMessage' event, or null if none yet. */
  lastMessageReceivedTimestamp: Date | null;
}
138
/**
 * Channelz information associated with a listening socket.
 */
interface ChannelzListenerInfo {
  /** Channelz socket reference of the listener. */
  ref: SocketRef;
}
142
/**
 * gRPC server built on Node's `http2` module.
 *
 * Typical lifecycle: register handlers with `addService()`, bind one or more
 * ports with `bindAsync()`, call `start()` so incoming sessions are accepted,
 * and finally stop with `tryShutdown()` or `forceShutdown()`.
 */
export class Server {
  /** Every HTTP/2 server that bound successfully, with its channelz listener socket reference. */
  private http2ServerList: { server: (http2.Http2Server | http2.Http2SecureServer), channelzRef: SocketRef }[] = [];

  /** Registered method handlers, keyed by method path. */
  private handlers: Map<string, UntypedHandler> = new Map<
    string,
    UntypedHandler
  >();
  /** Sessions currently tracked by the server, with their channelz bookkeeping. */
  private sessions = new Map<http2.ServerHttp2Session, ChannelzSessionInfo>();
  /** Sessions are only accepted while this is true. */
  private started = false;
  private options: ChannelOptions;
  /**
   * Address text used in call traces. A single field is shared by all bound
   * HTTP/2 servers, so it holds the address of the most recent listener.
   */
  private serverAddressString: string = 'null';

  // Channelz Info
  private readonly channelzEnabled: boolean = true;
  private channelzRef: ServerRef;
  private channelzTrace = new ChannelzTrace();
  private callTracker = new ChannelzCallTracker();
  private listenerChildrenTracker = new ChannelzChildrenTracker();
  private sessionChildrenTracker = new ChannelzChildrenTracker();

  /**
   * @param options Channel options. Setting `grpc.enable_channelz` to 0
   *     disables channelz tracking for this server.
   */
  constructor(options?: ChannelOptions) {
    this.options = options ?? {};
    if (this.options['grpc.enable_channelz'] === 0) {
      this.channelzEnabled = false;
    }
    this.channelzRef = registerChannelzServer(() => this.getChannelzInfo(), this.channelzEnabled);
    if (this.channelzEnabled) {
      this.channelzTrace.addTrace('CT_INFO', 'Server created');
    }

    this.trace('Server constructed');
  }

  /** Snapshot of the server-level channelz data. */
  private getChannelzInfo(): ServerInfo {
    return {
      trace: this.channelzTrace,
      callTracker: this.callTracker,
      listenerChildren: this.listenerChildrenTracker.getChildLists(),
      sessionChildren: this.sessionChildrenTracker.getChildLists()
    };
  }

  /**
   * Build the function that produces channelz socket info for a session.
   * The returned getter looks the session up in `this.sessions` and assumes
   * it is still present there.
   */
  private getChannelzSessionInfoGetter(session: http2.ServerHttp2Session): () => SocketInfo {
    return () => {
      const sessionInfo = this.sessions.get(session)!;
      const sessionSocket = session.socket;
      const remoteAddress = sessionSocket.remoteAddress
        ? stringToSubchannelAddress(sessionSocket.remoteAddress, sessionSocket.remotePort)
        : null;
      const localAddress = sessionSocket.localAddress
        ? stringToSubchannelAddress(sessionSocket.localAddress, sessionSocket.localPort)
        : null;

      let tlsInfo: TlsInfo | null = null;
      if (session.encrypted) {
        const tlsSocket: TLSSocket = sessionSocket as TLSSocket;
        const cipherInfo: CipherNameAndProtocol & {standardName?: string} = tlsSocket.getCipher();
        const certificate = tlsSocket.getCertificate();
        const peerCertificate = tlsSocket.getPeerCertificate();
        tlsInfo = {
          cipherSuiteStandardName: cipherInfo.standardName ?? null,
          // Only report the other name when no standard name is available.
          cipherSuiteOtherName: cipherInfo.standardName ? null : cipherInfo.name,
          localCertificate: (certificate && 'raw' in certificate) ? certificate.raw : null,
          remoteCertificate: (peerCertificate && 'raw' in peerCertificate) ? peerCertificate.raw : null
        };
      }

      const socketInfo: SocketInfo = {
        remoteAddress: remoteAddress,
        localAddress: localAddress,
        security: tlsInfo,
        remoteName: null,
        streamsStarted: sessionInfo.streamTracker.callsStarted,
        streamsSucceeded: sessionInfo.streamTracker.callsSucceeded,
        streamsFailed: sessionInfo.streamTracker.callsFailed,
        messagesSent: sessionInfo.messagesSent,
        messagesReceived: sessionInfo.messagesReceived,
        keepAlivesSent: 0,
        lastLocalStreamCreatedTimestamp: null,
        lastRemoteStreamCreatedTimestamp: sessionInfo.streamTracker.lastCallStartedTimestamp,
        lastMessageSentTimestamp: sessionInfo.lastMessageSentTimestamp,
        lastMessageReceivedTimestamp: sessionInfo.lastMessageReceivedTimestamp,
        localFlowControlWindow: session.state.localWindowSize ?? null,
        remoteFlowControlWindow: session.state.remoteWindowSize ?? null
      };
      return socketInfo;
    };
  }

  /** Emit a DEBUG trace line prefixed with this server's channelz id. */
  private trace(text: string): void {
    logging.trace(LogVerbosity.DEBUG, TRACER_NAME, '(' + this.channelzRef.id + ') ' + text);
  }

  /** Not supported; always throws. Use `addService()` instead. */
  addProtoService(): never {
    throw new Error('Not implemented. Use addService() instead');
  }

  /**
   * Register a handler for every method of a service definition.
   *
   * For each method the implementation is looked up by the method's key and,
   * failing that, by its `originalName`. Methods with no implementation get
   * a default handler that reports UNIMPLEMENTED.
   *
   * @param service Service definition describing the methods.
   * @param implementation Object whose properties implement the methods.
   * @throws Error if either argument is not an object, if the service has no
   *     methods, or if a handler is already registered for a method path.
   */
  addService(
    service: ServiceDefinition,
    implementation: UntypedServiceImplementation
  ): void {
    if (
      service === null ||
      typeof service !== 'object' ||
      implementation === null ||
      typeof implementation !== 'object'
    ) {
      throw new Error('addService() requires two objects as arguments');
    }

    const serviceKeys = Object.keys(service);

    if (serviceKeys.length === 0) {
      throw new Error('Cannot add an empty service to a server');
    }

    serviceKeys.forEach((name) => {
      const attrs = service[name];
      let methodType: HandlerType;

      if (attrs.requestStream) {
        methodType = attrs.responseStream ? 'bidi' : 'clientStream';
      } else {
        methodType = attrs.responseStream ? 'serverStream' : 'unary';
      }

      let implFn = implementation[name];
      let impl;

      if (implFn === undefined && typeof attrs.originalName === 'string') {
        implFn = implementation[attrs.originalName];
      }

      if (implFn !== undefined) {
        // Bind so that handlers defined as methods keep their `this`.
        impl = implFn.bind(implementation);
      } else {
        impl = getDefaultHandler(methodType, name);
      }

      const success = this.register(
        attrs.path,
        impl as UntypedHandleCall,
        attrs.responseSerialize,
        attrs.requestDeserialize,
        methodType
      );

      if (success === false) {
        throw new Error(`Method handler for ${attrs.path} already provided.`);
      }
    });
  }

  /**
   * Unregister the handler of every method in a service definition.
   *
   * @throws Error if `service` is not an object.
   */
  removeService(service: ServiceDefinition): void {
    if (service === null || typeof service !== 'object') {
      throw new Error('removeService() requires object as argument');
    }

    for (const name of Object.keys(service)) {
      this.unregister(service[name].path);
    }
  }

  /** Not supported; always throws. Use `bindAsync()` instead. */
  bind(port: string, creds: ServerCredentials): never {
    throw new Error('Not implemented. Use bindAsync() instead');
  }

  /**
   * Resolve `port` to a list of addresses and bind an HTTP/2 server to each.
   *
   * @param port Address string to bind, parsed as a URI.
   * @param creds Credentials deciding between a secure and a plain server.
   * @param callback Invoked on a later tick with `(null, boundPort)` when at
   *     least one address was bound, otherwise with an error and port 0.
   * @throws Error if the server is already started or `port` cannot be parsed.
   * @throws TypeError if an argument has the wrong type.
   */
  bindAsync(
    port: string,
    creds: ServerCredentials,
    callback: (error: Error | null, port: number) => void
  ): void {
    if (this.started === true) {
      throw new Error('server is already started');
    }

    if (typeof port !== 'string') {
      throw new TypeError('port must be a string');
    }

    if (creds === null || !(creds instanceof ServerCredentials)) {
      throw new TypeError('creds must be a ServerCredentials object');
    }

    if (typeof callback !== 'function') {
      throw new TypeError('callback must be a function');
    }

    const initialPortUri = parseUri(port);
    if (initialPortUri === null) {
      throw new Error(`Could not parse port "${port}"`);
    }
    const portUri = mapUriDefaultScheme(initialPortUri);
    if (portUri === null) {
      throw new Error(`Could not get a default scheme for port "${port}"`);
    }

    const serverOptions: http2.ServerOptions = {
      maxSendHeaderBlockLength: Number.MAX_SAFE_INTEGER,
    };
    if ('grpc-node.max_session_memory' in this.options) {
      serverOptions.maxSessionMemory = this.options[
        'grpc-node.max_session_memory'
      ];
    } else {
      /* By default, set a very large max session memory limit, to effectively
       * disable enforcement of the limit. Some testing indicates that Node's
       * behavior degrades badly when this limit is reached, so we solve that
       * by disabling the check entirely. */
      serverOptions.maxSessionMemory = Number.MAX_SAFE_INTEGER;
    }
    if ('grpc.max_concurrent_streams' in this.options) {
      serverOptions.settings = {
        maxConcurrentStreams: this.options['grpc.max_concurrent_streams'],
      };
    }

    // Always report the result asynchronously, on a later tick.
    const deferredCallback = (error: Error | null, boundPort: number): void => {
      process.nextTick(() => callback(error, boundPort));
    };

    // Create a not-yet-listening HTTP/2 server wired to this Server's handlers.
    const setupServer = (): http2.Http2Server | http2.Http2SecureServer => {
      let http2Server: http2.Http2Server | http2.Http2SecureServer;
      if (creds._isSecure()) {
        // Copy into a fresh object so the shared serverOptions is not mutated.
        const secureServerOptions = Object.assign(
          {},
          serverOptions,
          creds._getSettings()!
        );
        http2Server = http2.createSecureServer(secureServerOptions);
        http2Server.on('secureConnection', (socket: TLSSocket) => {
          /* These errors need to be handled by the user of Http2SecureServer,
           * according to https://github.com/nodejs/node/issues/35824 */
          socket.on('error', (e: Error) => {
            this.trace('An incoming TLS connection closed with error: ' + e.message);
          });
        });
      } else {
        http2Server = http2.createServer(serverOptions);
      }

      http2Server.setTimeout(0, noop);
      this._setupHandlers(http2Server);
      return http2Server;
    };

    // Record a successfully bound server and register its channelz listener socket.
    const registerBoundServer = (
      http2Server: http2.Http2Server | http2.Http2SecureServer,
      boundSubchannelAddress: SubchannelAddress
    ): void => {
      const channelzRef = registerChannelzSocket(
        subchannelAddressToString(boundSubchannelAddress),
        () => {
          return {
            localAddress: boundSubchannelAddress,
            remoteAddress: null,
            security: null,
            remoteName: null,
            streamsStarted: 0,
            streamsSucceeded: 0,
            streamsFailed: 0,
            messagesSent: 0,
            messagesReceived: 0,
            keepAlivesSent: 0,
            lastLocalStreamCreatedTimestamp: null,
            lastRemoteStreamCreatedTimestamp: null,
            lastMessageSentTimestamp: null,
            lastMessageReceivedTimestamp: null,
            localFlowControlWindow: null,
            remoteFlowControlWindow: null
          };
        },
        this.channelzEnabled
      );
      if (this.channelzEnabled) {
        this.listenerChildrenTracker.refChild(channelzRef);
      }
      this.http2ServerList.push({server: http2Server, channelzRef: channelzRef});
      this.trace('Successfully bound ' + subchannelAddressToString(boundSubchannelAddress));
    };

    // Bind every address in the list, using portNum for the TCP ones.
    const bindSpecificPort = (
      addressList: SubchannelAddress[],
      portNum: number,
      previousCount: number
    ): Promise<BindResult> => {
      if (addressList.length === 0) {
        return Promise.resolve({ port: portNum, count: previousCount });
      }
      return Promise.all(
        addressList.map((address) => {
          this.trace('Attempting to bind ' + subchannelAddressToString(address));
          let addr: SubchannelAddress;
          if (isTcpSubchannelAddress(address)) {
            addr = {
              host: (address as TcpSubchannelAddress).host,
              port: portNum,
            };
          } else {
            addr = address;
          }

          const http2Server = setupServer();
          // A bind failure resolves with the Error instead of rejecting, so
          // one failing address does not abort the others.
          return new Promise<number | Error>((resolve) => {
            const onError = (err: Error): void => {
              this.trace('Failed to bind ' + subchannelAddressToString(address) + ' with error ' + err.message);
              resolve(err);
            };

            http2Server.once('error', onError);

            http2Server.listen(addr, () => {
              const boundAddress = http2Server.address()!;
              const boundSubchannelAddress: SubchannelAddress =
                typeof boundAddress === 'string'
                  ? { path: boundAddress }
                  : { host: boundAddress.address, port: boundAddress.port };
              registerBoundServer(http2Server, boundSubchannelAddress);
              resolve('port' in boundSubchannelAddress ? boundSubchannelAddress.port : portNum);
              http2Server.removeListener('error', onError);
            });
          });
        })
      ).then((results) => {
        let count = 0;
        for (const result of results) {
          if (typeof result === 'number') {
            count += 1;
            if (result !== portNum) {
              throw new Error(
                'Invalid state: multiple port numbers added from single address'
              );
            }
          }
        }
        return {
          port: portNum,
          count: count + previousCount,
        };
      });
    };

    // Try the addresses one at a time until one binds, then bind the
    // remaining addresses to the port that was actually assigned.
    const bindWildcardPort = (
      addressList: SubchannelAddress[]
    ): Promise<BindResult> => {
      if (addressList.length === 0) {
        return Promise.resolve<BindResult>({ port: 0, count: 0 });
      }
      const address = addressList[0];
      const http2Server = setupServer();
      return new Promise<BindResult>((resolve) => {
        const onError = (err: Error): void => {
          this.trace('Failed to bind ' + subchannelAddressToString(address) + ' with error ' + err.message);
          resolve(bindWildcardPort(addressList.slice(1)));
        };

        http2Server.once('error', onError);

        http2Server.listen(address, () => {
          const boundAddress = http2Server.address() as AddressInfo;
          const boundSubchannelAddress: SubchannelAddress = {
            host: boundAddress.address,
            port: boundAddress.port
          };
          registerBoundServer(http2Server, boundSubchannelAddress);
          resolve(
            bindSpecificPort(
              addressList.slice(1),
              boundAddress.port,
              1
            )
          );
          http2Server.removeListener('error', onError);
        });
      });
    };

    const resolverListener: ResolverListener = {
      onSuccessfulResolution: (addressList) => {
        // We only want one resolution result. Discard all future results
        resolverListener.onSuccessfulResolution = () => {};
        if (addressList.length === 0) {
          deferredCallback(new Error(`No addresses resolved for port ${port}`), 0);
          return;
        }
        let bindResultPromise: Promise<BindResult>;
        if (isTcpSubchannelAddress(addressList[0])) {
          if (addressList[0].port === 0) {
            bindResultPromise = bindWildcardPort(addressList);
          } else {
            bindResultPromise = bindSpecificPort(
              addressList,
              addressList[0].port,
              0
            );
          }
        } else {
          // Use an arbitrary non-zero port for non-TCP addresses
          bindResultPromise = bindSpecificPort(addressList, 1, 0);
        }
        bindResultPromise.then(
          (bindResult) => {
            if (bindResult.count === 0) {
              const errorString = `No address added out of total ${addressList.length} resolved`;
              logging.log(LogVerbosity.ERROR, errorString);
              deferredCallback(new Error(errorString), 0);
            } else {
              if (bindResult.count < addressList.length) {
                logging.log(
                  LogVerbosity.INFO,
                  `WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved`
                );
              }
              deferredCallback(null, bindResult.port);
            }
          },
          () => {
            const errorString = `No address added out of total ${addressList.length} resolved`;
            logging.log(LogVerbosity.ERROR, errorString);
            deferredCallback(new Error(errorString), 0);
          }
        );
      },
      onError: (error) => {
        deferredCallback(new Error(error.details), 0);
      },
    };

    const resolver = createResolver(portUri, resolverListener, this.options);
    resolver.updateResolution();
  }

  /**
   * Stop the server immediately: close every listening HTTP/2 server and
   * destroy all tracked sessions without waiting for them to finish.
   */
  forceShutdown(): void {
    // Close the server if it is still running.

    for (const {server: http2Server, channelzRef: ref} of this.http2ServerList) {
      if (http2Server.listening) {
        http2Server.close(() => {
          if (this.channelzEnabled) {
            this.listenerChildrenTracker.unrefChild(ref);
            unregisterChannelzRef(ref);
          }
        });
      }
    }

    this.started = false;

    // Always destroy any available sessions. It's possible that one or more
    // tryShutdown() calls are in progress. Don't wait on them to finish.
    this.sessions.forEach((channelzInfo, session) => {
      // Cast NGHTTP2_CANCEL to any because TypeScript doesn't seem to
      // recognize destroy(code) as a valid signature.
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      session.destroy(http2.constants.NGHTTP2_CANCEL as any);
    });
    this.sessions.clear();
    if (this.channelzEnabled) {
      unregisterChannelzRef(this.channelzRef);
    }
  }

  /**
   * Register a single method handler.
   *
   * @param name Method path the handler is keyed by.
   * @param handler Function that implements the method.
   * @param serialize Serializer for response messages.
   * @param deserialize Deserializer for request messages.
   * @param type Handler kind ('unary', 'clientStream', 'serverStream', 'bidi').
   * @returns false if a handler already exists for `name`, true otherwise.
   */
  register<RequestType, ResponseType>(
    name: string,
    handler: HandleCall<RequestType, ResponseType>,
    serialize: Serialize<ResponseType>,
    deserialize: Deserialize<RequestType>,
    type: string
  ): boolean {
    if (this.handlers.has(name)) {
      return false;
    }

    this.handlers.set(name, {
      func: handler,
      serialize,
      deserialize,
      type,
      path: name,
    } as UntypedHandler);
    return true;
  }

  /**
   * Remove the handler registered for a method path.
   *
   * @returns true if a handler was removed.
   */
  unregister(name: string): boolean {
    return this.handlers.delete(name);
  }

  /**
   * Start accepting sessions on the bound ports.
   *
   * @throws Error if no bound HTTP/2 server is listening, or if the server
   *     is already started.
   */
  start(): void {
    if (
      this.http2ServerList.length === 0 ||
      this.http2ServerList.every(
        ({server: http2Server}) => http2Server.listening !== true
      )
    ) {
      throw new Error('server must be bound in order to start');
    }

    if (this.started === true) {
      throw new Error('server is already started');
    }
    if (this.channelzEnabled) {
      this.channelzTrace.addTrace('CT_INFO', 'Starting');
    }
    this.started = true;
  }

  /**
   * Shut down gracefully: close every listening HTTP/2 server and every open
   * session, then invoke `callback` once all of them have closed.
   *
   * @param callback Invoked when shutdown completes.
   */
  tryShutdown(callback: (error?: Error) => void): void {
    const wrappedCallback = (error?: Error): void => {
      if (this.channelzEnabled) {
        unregisterChannelzRef(this.channelzRef);
      }
      callback(error);
    };
    let pendingChecks = 0;

    // Called once per closed server/session; fires the callback on the last.
    const maybeCallback = (): void => {
      pendingChecks--;

      if (pendingChecks === 0) {
        wrappedCallback();
      }
    };

    // Close the server if necessary.
    this.started = false;

    for (const {server: http2Server, channelzRef: ref} of this.http2ServerList) {
      if (http2Server.listening) {
        pendingChecks++;
        http2Server.close(() => {
          if (this.channelzEnabled) {
            this.listenerChildrenTracker.unrefChild(ref);
            unregisterChannelzRef(ref);
          }
          maybeCallback();
        });
      }
    }

    this.sessions.forEach((channelzInfo, session) => {
      if (!session.closed) {
        pendingChecks += 1;
        session.close(maybeCallback);
      }
    });
    // Nothing was open, so there is nothing to wait for.
    if (pendingChecks === 0) {
      wrappedCallback();
    }
  }

  /** Not supported; always throws. */
  addHttp2Port(): never {
    throw new Error('Not yet implemented');
  }

  /**
   * Get the channelz reference object for this server. The returned value is
   * garbage if channelz is disabled for this server.
   * @returns
   */
  getChannelzRef() {
    return this.channelzRef;
  }

  /**
   * Check that the request's content-type starts with 'application/grpc'.
   * If it does not, the stream is answered with HTTP 415 and ended.
   *
   * @returns true if the content type is acceptable.
   */
  private _verifyContentType(stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders): boolean {
    const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE];

    if (
      typeof contentType !== 'string' ||
      !contentType.startsWith('application/grpc')
    ) {
      stream.respond(
        {
          [http2.constants.HTTP2_HEADER_STATUS]:
            http2.constants.HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE,
        },
        { endStream: true }
      );
      return false;
    }

    return true;
  }

  /**
   * Look up the handler registered for the request's `:path` header.
   *
   * @throws An UNIMPLEMENTED status object (not an Error instance) when no
   *     handler is registered for the path.
   */
  private _retrieveHandler(headers: http2.IncomingHttpHeaders): Handler<any, any> {
    const path = headers[HTTP2_HEADER_PATH] as string;

    this.trace(
      'Received call to method ' +
        path +
        ' at address ' +
        this.serverAddressString
    );

    const handler = this.handlers.get(path);

    if (handler === undefined) {
      this.trace(
        'No handler registered for method ' +
          path +
          '. Sending UNIMPLEMENTED status.'
      );
      throw getUnimplementedStatusResponse(path);
    }

    return handler;
  }

  /**
   * Send an error status on a stream that has no usable handler. A missing
   * `code` defaults to INTERNAL, and the failure is counted in channelz
   * when channelz is enabled.
   */
  private _respondWithError<T extends Partial<ServiceError>>(
    err: T,
    stream: http2.ServerHttp2Stream,
    channelzSessionInfo: ChannelzSessionInfo | null = null
  ) {
    const call = new Http2ServerCallStream(stream, null!, this.options);

    if (err.code === undefined) {
      err.code = Status.INTERNAL;
    }

    if (this.channelzEnabled) {
      this.callTracker.addCallFailed();
      channelzSessionInfo?.streamTracker.addCallFailed();
    }

    call.sendError(err);
  }

  /** 'stream' event handler used when channelz is enabled. */
  private _channelzHandler(stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders) {
    const channelzSessionInfo = this.sessions.get(stream.session as http2.ServerHttp2Session);

    this.callTracker.addCallStarted();
    channelzSessionInfo?.streamTracker.addCallStarted();

    if (!this._verifyContentType(stream, headers)) {
      this.callTracker.addCallFailed();
      channelzSessionInfo?.streamTracker.addCallFailed();
      return;
    }

    let handler: Handler<any, any>;
    try {
      handler = this._retrieveHandler(headers);
    } catch (err) {
      // _retrieveHandler throws a status object; assert its shape so this
      // compiles whether the catch variable is typed `any` or `unknown`.
      this._respondWithError(err as Partial<ServiceError>, stream, channelzSessionInfo);
      return;
    }

    const call = new Http2ServerCallStream(stream, handler, this.options);

    call.once('callEnd', (code: Status) => {
      if (code === Status.OK) {
        this.callTracker.addCallSucceeded();
      } else {
        this.callTracker.addCallFailed();
      }
    });

    if (channelzSessionInfo) {
      call.once('streamEnd', (success: boolean) => {
        if (success) {
          channelzSessionInfo.streamTracker.addCallSucceeded();
        } else {
          channelzSessionInfo.streamTracker.addCallFailed();
        }
      });
      call.on('sendMessage', () => {
        channelzSessionInfo.messagesSent += 1;
        channelzSessionInfo.lastMessageSentTimestamp = new Date();
      });
      call.on('receiveMessage', () => {
        channelzSessionInfo.messagesReceived += 1;
        channelzSessionInfo.lastMessageReceivedTimestamp = new Date();
      });
    }

    if (!this._runHandlerForCall(call, handler, headers)) {
      this.callTracker.addCallFailed();
      channelzSessionInfo?.streamTracker.addCallFailed();

      call.sendError({
        code: Status.INTERNAL,
        details: `Unknown handler type: ${handler.type}`
      });
    }
  }

  /** 'stream' event handler used when channelz is disabled. */
  private _streamHandler(stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders) {
    if (this._verifyContentType(stream, headers) !== true) {
      return;
    }

    let handler: Handler<any, any>;
    try {
      handler = this._retrieveHandler(headers);
    } catch (err) {
      // See _channelzHandler: the thrown value is a status object.
      this._respondWithError(err as Partial<ServiceError>, stream, null);
      return;
    }

    const call = new Http2ServerCallStream(stream, handler, this.options);
    if (!this._runHandlerForCall(call, handler, headers)) {
      call.sendError({
        code: Status.INTERNAL,
        details: `Unknown handler type: ${handler.type}`
      });
    }
  }

  /**
   * Read the call's metadata and dispatch to the function that matches the
   * handler's type.
   *
   * @returns false if the handler type is not recognized, true otherwise.
   */
  private _runHandlerForCall(call: Http2ServerCallStream<any, any>, handler: Handler<any, any>, headers: http2.IncomingHttpHeaders): boolean {
    const metadata = call.receiveMetadata(headers);
    const encoding = (metadata.get('grpc-encoding')[0] as string | undefined) ?? 'identity';
    metadata.remove('grpc-encoding');

    const { type } = handler;
    if (type === 'unary') {
      handleUnary(call, handler as UntypedUnaryHandler, metadata, encoding);
    } else if (type === 'clientStream') {
      handleClientStreaming(
        call,
        handler as UntypedClientStreamingHandler,
        metadata,
        encoding
      );
    } else if (type === 'serverStream') {
      handleServerStreaming(
        call,
        handler as UntypedServerStreamingHandler,
        metadata,
        encoding
      );
    } else if (type === 'bidi') {
      handleBidiStreaming(
        call,
        handler as UntypedBidiStreamingHandler,
        metadata,
        encoding
      );
    } else {
      return false;
    }

    return true;
  }

  /**
   * Attach the 'stream' and 'session' listeners to a newly created HTTP/2
   * server and arrange for its address to be recorded for call traces.
   */
  private _setupHandlers(
    http2Server: http2.Http2Server | http2.Http2SecureServer
  ): void {
    if (http2Server === null) {
      return;
    }

    const recordServerAddress = (): void => {
      const serverAddress = http2Server.address();
      let serverAddressString = 'null';
      if (serverAddress) {
        if (typeof serverAddress === 'string') {
          serverAddressString = serverAddress;
        } else {
          serverAddressString =
            serverAddress.address + ':' + serverAddress.port;
        }
      }
      this.serverAddressString = serverAddressString;
    };

    // This method runs before listen() is called, and address() returns null
    // until the server is listening, so reading it here would always record
    // 'null'. Wait for the 'listening' event unless already listening.
    if (http2Server.listening) {
      recordServerAddress();
    } else {
      http2Server.once('listening', recordServerAddress);
    }

    const handler = this.channelzEnabled
      ? this._channelzHandler
      : this._streamHandler;

    http2Server.on('stream', handler.bind(this));
    http2Server.on('session', (session) => {
      // Connections that arrive while the server is not started are dropped.
      if (!this.started) {
        session.destroy();
        return;
      }

      const channelzRef = registerChannelzSocket(
        session.socket.remoteAddress ?? 'unknown',
        this.getChannelzSessionInfoGetter(session),
        this.channelzEnabled
      );

      const channelzSessionInfo: ChannelzSessionInfo = {
        ref: channelzRef,
        streamTracker: new ChannelzCallTracker(),
        messagesSent: 0,
        messagesReceived: 0,
        lastMessageSentTimestamp: null,
        lastMessageReceivedTimestamp: null
      };

      this.sessions.set(session, channelzSessionInfo);
      const clientAddress = session.socket.remoteAddress;
      if (this.channelzEnabled) {
        this.channelzTrace.addTrace('CT_INFO', 'Connection established by client ' + clientAddress);
        this.sessionChildrenTracker.refChild(channelzRef);
      }
      session.on('close', () => {
        if (this.channelzEnabled) {
          this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by client ' + clientAddress);
          this.sessionChildrenTracker.unrefChild(channelzRef);
          unregisterChannelzRef(channelzRef);
        }
        this.sessions.delete(session);
      });
    });
  }
}
977
/**
 * Run a unary handler: read the single request message, then invoke the
 * handler with a unary call object and a callback that sends the response.
 *
 * A receive error is reported via `call.sendError`. The handler is not
 * invoked if no request was received or the call was cancelled.
 */
function handleUnary<RequestType, ResponseType>(
  call: Http2ServerCallStream<RequestType, ResponseType>,
  handler: UnaryHandler<RequestType, ResponseType>,
  metadata: Metadata,
  encoding: string
): void {
  call.receiveUnaryMessage(encoding, (receiveError, request) => {
    if (receiveError) {
      call.sendError(receiveError);
      return;
    }

    const nothingToHandle = request === undefined || call.cancelled;
    if (nothingToHandle) {
      return;
    }

    const unaryCall = new ServerUnaryCallImpl<RequestType, ResponseType>(
      call,
      metadata,
      request
    );

    const sendResponse = (
      err: ServerErrorResponse | ServerStatusResponse | null,
      value?: ResponseType | null,
      trailer?: Metadata,
      flags?: number
    ) => {
      call.sendUnaryMessage(err, value, trailer, flags);
    };

    handler.func(unaryCall, sendResponse);
  });
}
1013
/**
 * Run a client-streaming handler: wrap the call in a readable stream and
 * hand it to the handler together with a callback for the single response.
 *
 * The same callback is attached as the stream's 'error' listener. It
 * destroys the stream before sending the response. Nothing is invoked if
 * the call is already cancelled.
 */
function handleClientStreaming<RequestType, ResponseType>(
  call: Http2ServerCallStream<RequestType, ResponseType>,
  handler: ClientStreamingHandler<RequestType, ResponseType>,
  metadata: Metadata,
  encoding: string
): void {
  const requestStream = new ServerReadableStreamImpl<RequestType, ResponseType>(
    call,
    metadata,
    handler.deserialize,
    encoding
  );

  const respond = (
    err: ServerErrorResponse | ServerStatusResponse | null,
    value?: ResponseType | null,
    trailer?: Metadata,
    flags?: number
  ) => {
    requestStream.destroy();
    call.sendUnaryMessage(err, value, trailer, flags);
  };

  if (!call.cancelled) {
    requestStream.on('error', respond);
    handler.func(requestStream, respond);
  }
}
1044
/**
 * Run a server-streaming handler: read the single request message, then
 * invoke the handler with a writable stream for the responses.
 *
 * A receive error is reported via `call.sendError`. The handler is not
 * invoked if no request was received or the call was cancelled.
 */
function handleServerStreaming<RequestType, ResponseType>(
  call: Http2ServerCallStream<RequestType, ResponseType>,
  handler: ServerStreamingHandler<RequestType, ResponseType>,
  metadata: Metadata,
  encoding: string
): void {
  call.receiveUnaryMessage(encoding, (receiveError, request) => {
    if (receiveError) {
      call.sendError(receiveError);
      return;
    }

    const nothingToHandle = request === undefined || call.cancelled;
    if (nothingToHandle) {
      return;
    }

    const responseStream = new ServerWritableStreamImpl<RequestType, ResponseType>(
      call,
      metadata,
      handler.serialize,
      request
    );

    handler.func(responseStream);
  });
}
1071
/**
 * Run a bidirectional-streaming handler: wrap the call in a duplex stream
 * and pass it to the handler, unless the call is already cancelled.
 */
function handleBidiStreaming<RequestType, ResponseType>(
  call: Http2ServerCallStream<RequestType, ResponseType>,
  handler: BidiStreamingHandler<RequestType, ResponseType>,
  metadata: Metadata,
  encoding: string
): void {
  const duplexStream = new ServerDuplexStreamImpl<RequestType, ResponseType>(
    call,
    metadata,
    handler.serialize,
    handler.deserialize,
    encoding
  );

  if (!call.cancelled) {
    handler.func(duplexStream);
  }
}