UNPKG

32.4 kBPlain TextView Raw
1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import * as http2 from 'http2';
19import { AddressInfo } from 'net';
20
21import { ServiceError } from './call';
22import { Status, LogVerbosity } from './constants';
23import { Deserialize, Serialize, ServiceDefinition } from './make-client';
24import { Metadata } from './metadata';
25import {
26 BidiStreamingHandler,
27 ClientStreamingHandler,
28 HandleCall,
29 Handler,
30 HandlerType,
31 Http2ServerCallStream,
32 sendUnaryData,
33 ServerDuplexStream,
34 ServerDuplexStreamImpl,
35 ServerReadableStream,
36 ServerReadableStreamImpl,
37 ServerStreamingHandler,
38 ServerUnaryCall,
39 ServerUnaryCallImpl,
40 ServerWritableStream,
41 ServerWritableStreamImpl,
42 UnaryHandler,
43 ServerErrorResponse,
44 ServerStatusResponse,
45} from './server-call';
46import { ServerCredentials } from './server-credentials';
47import { ChannelOptions } from './channel-options';
48import {
49 createResolver,
50 ResolverListener,
51 mapUriDefaultScheme,
52} from './resolver';
53import * as logging from './logging';
54import {
55 SubchannelAddress,
56 TcpSubchannelAddress,
57 isTcpSubchannelAddress,
58 subchannelAddressToString,
59 stringToSubchannelAddress,
60} from './subchannel-address';
61import { parseUri } from './uri-parser';
62import { ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzServer, registerChannelzSocket, ServerInfo, ServerRef, SocketInfo, SocketRef, TlsInfo, unregisterChannelzRef } from './channelz';
63import { CipherNameAndProtocol, TLSSocket } from 'tls';
64
65const TRACER_NAME = 'server';
66
67interface BindResult {
68 port: number;
69 count: number;
70}
71
72function noop(): void {}
73
74function getUnimplementedStatusResponse(
75 methodName: string
76): Partial<ServiceError> {
77 return {
78 code: Status.UNIMPLEMENTED,
79 details: `The server does not implement the method ${methodName}`,
80 metadata: new Metadata(),
81 };
82}
83
84/* eslint-disable @typescript-eslint/no-explicit-any */
85type UntypedUnaryHandler = UnaryHandler<any, any>;
86type UntypedClientStreamingHandler = ClientStreamingHandler<any, any>;
87type UntypedServerStreamingHandler = ServerStreamingHandler<any, any>;
88type UntypedBidiStreamingHandler = BidiStreamingHandler<any, any>;
89export type UntypedHandleCall = HandleCall<any, any>;
90type UntypedHandler = Handler<any, any>;
91export interface UntypedServiceImplementation {
92 [name: string]: UntypedHandleCall;
93}
94
95function getDefaultHandler(handlerType: HandlerType, methodName: string) {
96 const unimplementedStatusResponse = getUnimplementedStatusResponse(
97 methodName
98 );
99 switch (handlerType) {
100 case 'unary':
101 return (
102 call: ServerUnaryCall<any, any>,
103 callback: sendUnaryData<any>
104 ) => {
105 callback(unimplementedStatusResponse as ServiceError, null);
106 };
107 case 'clientStream':
108 return (
109 call: ServerReadableStream<any, any>,
110 callback: sendUnaryData<any>
111 ) => {
112 callback(unimplementedStatusResponse as ServiceError, null);
113 };
114 case 'serverStream':
115 return (call: ServerWritableStream<any, any>) => {
116 call.emit('error', unimplementedStatusResponse);
117 };
118 case 'bidi':
119 return (call: ServerDuplexStream<any, any>) => {
120 call.emit('error', unimplementedStatusResponse);
121 };
122 default:
123 throw new Error(`Invalid handlerType ${handlerType}`);
124 }
125}
126
127interface ChannelzSessionInfo {
128 ref: SocketRef;
129 streamTracker: ChannelzCallTracker;
130 messagesSent: number;
131 messagesReceived: number;
132 lastMessageSentTimestamp: Date | null;
133 lastMessageReceivedTimestamp: Date | null;
134}
135
136interface ChannelzListenerInfo {
137 ref: SocketRef;
138}
139
140export class Server {
141 private http2ServerList: { server: (http2.Http2Server | http2.Http2SecureServer), channelzRef: SocketRef }[] = [];
142
143 private handlers: Map<string, UntypedHandler> = new Map<
144 string,
145 UntypedHandler
146 >();
147 private sessions = new Map<http2.ServerHttp2Session, ChannelzSessionInfo>();
148 private started = false;
149 private options: ChannelOptions;
150
151 // Channelz Info
152 private readonly channelzEnabled: boolean = true;
153 private channelzRef: ServerRef;
154 private channelzTrace = new ChannelzTrace();
155 private callTracker = new ChannelzCallTracker();
156 private listenerChildrenTracker = new ChannelzChildrenTracker();
157 private sessionChildrenTracker = new ChannelzChildrenTracker();
158
159 constructor(options?: ChannelOptions) {
160 this.options = options ?? {};
161 if (this.options['grpc.enable_channelz'] === 0) {
162 this.channelzEnabled = false;
163 }
164 this.channelzRef = registerChannelzServer(() => this.getChannelzInfo(), this.channelzEnabled);
165 if (this.channelzEnabled) {
166 this.channelzTrace.addTrace('CT_INFO', 'Server created');
167 }
168 this.trace('Server constructed');
169 }
170
171 private getChannelzInfo(): ServerInfo {
172 return {
173 trace: this.channelzTrace,
174 callTracker: this.callTracker,
175 listenerChildren: this.listenerChildrenTracker.getChildLists(),
176 sessionChildren: this.sessionChildrenTracker.getChildLists()
177 };
178 }
179
180 private getChannelzSessionInfoGetter(session: http2.ServerHttp2Session): () => SocketInfo {
181 return () => {
182 const sessionInfo = this.sessions.get(session)!;
183 const sessionSocket = session.socket;
184 const remoteAddress = sessionSocket.remoteAddress ? stringToSubchannelAddress(sessionSocket.remoteAddress, sessionSocket.remotePort) : null;
185 const localAddress = sessionSocket.localAddress ? stringToSubchannelAddress(sessionSocket.localAddress!, sessionSocket.localPort) : null;
186 let tlsInfo: TlsInfo | null;
187 if (session.encrypted) {
188 const tlsSocket: TLSSocket = sessionSocket as TLSSocket;
189 const cipherInfo: CipherNameAndProtocol & {standardName?: string} = tlsSocket.getCipher();
190 const certificate = tlsSocket.getCertificate();
191 const peerCertificate = tlsSocket.getPeerCertificate();
192 tlsInfo = {
193 cipherSuiteStandardName: cipherInfo.standardName ?? null,
194 cipherSuiteOtherName: cipherInfo.standardName ? null : cipherInfo.name,
195 localCertificate: (certificate && 'raw' in certificate) ? certificate.raw : null,
196 remoteCertificate: (peerCertificate && 'raw' in peerCertificate) ? peerCertificate.raw : null
197 };
198 } else {
199 tlsInfo = null;
200 }
201 const socketInfo: SocketInfo = {
202 remoteAddress: remoteAddress,
203 localAddress: localAddress,
204 security: tlsInfo,
205 remoteName: null,
206 streamsStarted: sessionInfo.streamTracker.callsStarted,
207 streamsSucceeded: sessionInfo.streamTracker.callsSucceeded,
208 streamsFailed: sessionInfo.streamTracker.callsFailed,
209 messagesSent: sessionInfo.messagesSent,
210 messagesReceived: sessionInfo.messagesReceived,
211 keepAlivesSent: 0,
212 lastLocalStreamCreatedTimestamp: null,
213 lastRemoteStreamCreatedTimestamp: sessionInfo.streamTracker.lastCallStartedTimestamp,
214 lastMessageSentTimestamp: sessionInfo.lastMessageSentTimestamp,
215 lastMessageReceivedTimestamp: sessionInfo.lastMessageReceivedTimestamp,
216 localFlowControlWindow: session.state.localWindowSize ?? null,
217 remoteFlowControlWindow: session.state.remoteWindowSize ?? null
218 };
219 return socketInfo;
220 };
221 }
222
223 private trace(text: string): void {
224 logging.trace(LogVerbosity.DEBUG, TRACER_NAME, '(' + this.channelzRef.id + ') ' + text);
225 }
226
227
228 addProtoService(): never {
229 throw new Error('Not implemented. Use addService() instead');
230 }
231
232 addService(
233 service: ServiceDefinition,
234 implementation: UntypedServiceImplementation
235 ): void {
236 if (
237 service === null ||
238 typeof service !== 'object' ||
239 implementation === null ||
240 typeof implementation !== 'object'
241 ) {
242 throw new Error('addService() requires two objects as arguments');
243 }
244
245 const serviceKeys = Object.keys(service);
246
247 if (serviceKeys.length === 0) {
248 throw new Error('Cannot add an empty service to a server');
249 }
250
251 serviceKeys.forEach((name) => {
252 const attrs = service[name];
253 let methodType: HandlerType;
254
255 if (attrs.requestStream) {
256 if (attrs.responseStream) {
257 methodType = 'bidi';
258 } else {
259 methodType = 'clientStream';
260 }
261 } else {
262 if (attrs.responseStream) {
263 methodType = 'serverStream';
264 } else {
265 methodType = 'unary';
266 }
267 }
268
269 let implFn = implementation[name];
270 let impl;
271
272 if (implFn === undefined && typeof attrs.originalName === 'string') {
273 implFn = implementation[attrs.originalName];
274 }
275
276 if (implFn !== undefined) {
277 impl = implFn.bind(implementation);
278 } else {
279 impl = getDefaultHandler(methodType, name);
280 }
281
282 const success = this.register(
283 attrs.path,
284 impl as UntypedHandleCall,
285 attrs.responseSerialize,
286 attrs.requestDeserialize,
287 methodType
288 );
289
290 if (success === false) {
291 throw new Error(`Method handler for ${attrs.path} already provided.`);
292 }
293 });
294 }
295
296 removeService(service: ServiceDefinition): void {
297 if (service === null || typeof service !== 'object') {
298 throw new Error('removeService() requires object as argument');
299 }
300
301 const serviceKeys = Object.keys(service);
302 serviceKeys.forEach((name) => {
303 const attrs = service[name];
304 this.unregister(attrs.path);
305 });
306 }
307
308 bind(port: string, creds: ServerCredentials): never {
309 throw new Error('Not implemented. Use bindAsync() instead');
310 }
311
312 bindAsync(
313 port: string,
314 creds: ServerCredentials,
315 callback: (error: Error | null, port: number) => void
316 ): void {
317 if (this.started === true) {
318 throw new Error('server is already started');
319 }
320
321 if (typeof port !== 'string') {
322 throw new TypeError('port must be a string');
323 }
324
325 if (creds === null || !(creds instanceof ServerCredentials)) {
326 throw new TypeError('creds must be a ServerCredentials object');
327 }
328
329 if (typeof callback !== 'function') {
330 throw new TypeError('callback must be a function');
331 }
332
333 const initialPortUri = parseUri(port);
334 if (initialPortUri === null) {
335 throw new Error(`Could not parse port "${port}"`);
336 }
337 const portUri = mapUriDefaultScheme(initialPortUri);
338 if (portUri === null) {
339 throw new Error(`Could not get a default scheme for port "${port}"`);
340 }
341
342 const serverOptions: http2.ServerOptions = {
343 maxSendHeaderBlockLength: Number.MAX_SAFE_INTEGER,
344 };
345 if ('grpc-node.max_session_memory' in this.options) {
346 serverOptions.maxSessionMemory = this.options[
347 'grpc-node.max_session_memory'
348 ];
349 }
350 if ('grpc.max_concurrent_streams' in this.options) {
351 serverOptions.settings = {
352 maxConcurrentStreams: this.options['grpc.max_concurrent_streams'],
353 };
354 }
355
356 const deferredCallback = (error: Error | null, port: number) => {
357 process.nextTick(() => callback(error, port));
358 }
359
360 const setupServer = (): http2.Http2Server | http2.Http2SecureServer => {
361 let http2Server: http2.Http2Server | http2.Http2SecureServer;
362 if (creds._isSecure()) {
363 const secureServerOptions = Object.assign(
364 serverOptions,
365 creds._getSettings()!
366 );
367 http2Server = http2.createSecureServer(secureServerOptions);
368 http2Server.on('secureConnection', (socket: TLSSocket) => {
369 /* These errors need to be handled by the user of Http2SecureServer,
370 * according to https://github.com/nodejs/node/issues/35824 */
371 socket.on('error', (e: Error) => {
372 this.trace('An incoming TLS connection closed with error: ' + e.message);
373 });
374 });
375 } else {
376 http2Server = http2.createServer(serverOptions);
377 }
378
379 http2Server.setTimeout(0, noop);
380 this._setupHandlers(http2Server);
381 return http2Server;
382 };
383
384 const bindSpecificPort = (
385 addressList: SubchannelAddress[],
386 portNum: number,
387 previousCount: number
388 ): Promise<BindResult> => {
389 if (addressList.length === 0) {
390 return Promise.resolve({ port: portNum, count: previousCount });
391 }
392 return Promise.all(
393 addressList.map((address) => {
394 this.trace('Attempting to bind ' + subchannelAddressToString(address));
395 let addr: SubchannelAddress;
396 if (isTcpSubchannelAddress(address)) {
397 addr = {
398 host: (address as TcpSubchannelAddress).host,
399 port: portNum,
400 };
401 } else {
402 addr = address;
403 }
404
405 const http2Server = setupServer();
406 return new Promise<number | Error>((resolve, reject) => {
407 const onError = (err: Error) => {
408 this.trace('Failed to bind ' + subchannelAddressToString(address) + ' with error ' + err.message);
409 resolve(err);
410 }
411
412 http2Server.once('error', onError);
413
414 http2Server.listen(addr, () => {
415 const boundAddress = http2Server.address()!;
416 let boundSubchannelAddress: SubchannelAddress;
417 if (typeof boundAddress === 'string') {
418 boundSubchannelAddress = {
419 path: boundAddress
420 };
421 } else {
422 boundSubchannelAddress = {
423 host: boundAddress.address,
424 port: boundAddress.port
425 }
426 }
427 let channelzRef: SocketRef;
428 channelzRef = registerChannelzSocket(subchannelAddressToString(boundSubchannelAddress), () => {
429 return {
430 localAddress: boundSubchannelAddress,
431 remoteAddress: null,
432 security: null,
433 remoteName: null,
434 streamsStarted: 0,
435 streamsSucceeded: 0,
436 streamsFailed: 0,
437 messagesSent: 0,
438 messagesReceived: 0,
439 keepAlivesSent: 0,
440 lastLocalStreamCreatedTimestamp: null,
441 lastRemoteStreamCreatedTimestamp: null,
442 lastMessageSentTimestamp: null,
443 lastMessageReceivedTimestamp: null,
444 localFlowControlWindow: null,
445 remoteFlowControlWindow: null
446 };
447 }, this.channelzEnabled);
448 if (this.channelzEnabled) {
449 this.listenerChildrenTracker.refChild(channelzRef);
450 }
451 this.http2ServerList.push({server: http2Server, channelzRef: channelzRef});
452 this.trace('Successfully bound ' + subchannelAddressToString(boundSubchannelAddress));
453 resolve('port' in boundSubchannelAddress ? boundSubchannelAddress.port : portNum);
454 http2Server.removeListener('error', onError);
455 });
456 });
457 })
458 ).then((results) => {
459 let count = 0;
460 for (const result of results) {
461 if (typeof result === 'number') {
462 count += 1;
463 if (result !== portNum) {
464 throw new Error(
465 'Invalid state: multiple port numbers added from single address'
466 );
467 }
468 }
469 }
470 return {
471 port: portNum,
472 count: count + previousCount,
473 };
474 });
475 };
476
477 const bindWildcardPort = (
478 addressList: SubchannelAddress[]
479 ): Promise<BindResult> => {
480 if (addressList.length === 0) {
481 return Promise.resolve<BindResult>({ port: 0, count: 0 });
482 }
483 const address = addressList[0];
484 const http2Server = setupServer();
485 return new Promise<BindResult>((resolve, reject) => {
486 const onError = (err: Error) => {
487 this.trace('Failed to bind ' + subchannelAddressToString(address) + ' with error ' + err.message);
488 resolve(bindWildcardPort(addressList.slice(1)));
489 }
490
491 http2Server.once('error', onError);
492
493 http2Server.listen(address, () => {
494 const boundAddress = http2Server.address() as AddressInfo;
495 const boundSubchannelAddress: SubchannelAddress = {
496 host: boundAddress.address,
497 port: boundAddress.port
498 };
499 let channelzRef: SocketRef;
500 channelzRef = registerChannelzSocket(subchannelAddressToString(boundSubchannelAddress), () => {
501 return {
502 localAddress: boundSubchannelAddress,
503 remoteAddress: null,
504 security: null,
505 remoteName: null,
506 streamsStarted: 0,
507 streamsSucceeded: 0,
508 streamsFailed: 0,
509 messagesSent: 0,
510 messagesReceived: 0,
511 keepAlivesSent: 0,
512 lastLocalStreamCreatedTimestamp: null,
513 lastRemoteStreamCreatedTimestamp: null,
514 lastMessageSentTimestamp: null,
515 lastMessageReceivedTimestamp: null,
516 localFlowControlWindow: null,
517 remoteFlowControlWindow: null
518 };
519 }, this.channelzEnabled);
520 if (this.channelzEnabled) {
521 this.listenerChildrenTracker.refChild(channelzRef);
522 }
523 this.http2ServerList.push({server: http2Server, channelzRef: channelzRef});
524 this.trace('Successfully bound ' + subchannelAddressToString(boundSubchannelAddress));
525 resolve(
526 bindSpecificPort(
527 addressList.slice(1),
528 boundAddress.port,
529 1
530 )
531 );
532 http2Server.removeListener('error', onError);
533 });
534 });
535 };
536
537 const resolverListener: ResolverListener = {
538 onSuccessfulResolution: (
539 addressList,
540 serviceConfig,
541 serviceConfigError
542 ) => {
543 // We only want one resolution result. Discard all future results
544 resolverListener.onSuccessfulResolution = () => {};
545 if (addressList.length === 0) {
546 deferredCallback(new Error(`No addresses resolved for port ${port}`), 0);
547 return;
548 }
549 let bindResultPromise: Promise<BindResult>;
550 if (isTcpSubchannelAddress(addressList[0])) {
551 if (addressList[0].port === 0) {
552 bindResultPromise = bindWildcardPort(addressList);
553 } else {
554 bindResultPromise = bindSpecificPort(
555 addressList,
556 addressList[0].port,
557 0
558 );
559 }
560 } else {
561 // Use an arbitrary non-zero port for non-TCP addresses
562 bindResultPromise = bindSpecificPort(addressList, 1, 0);
563 }
564 bindResultPromise.then(
565 (bindResult) => {
566 if (bindResult.count === 0) {
567 const errorString = `No address added out of total ${addressList.length} resolved`;
568 logging.log(LogVerbosity.ERROR, errorString);
569 deferredCallback(new Error(errorString), 0);
570 } else {
571 if (bindResult.count < addressList.length) {
572 logging.log(
573 LogVerbosity.INFO,
574 `WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved`
575 );
576 }
577 deferredCallback(null, bindResult.port);
578 }
579 },
580 (error) => {
581 const errorString = `No address added out of total ${addressList.length} resolved`;
582 logging.log(LogVerbosity.ERROR, errorString);
583 deferredCallback(new Error(errorString), 0);
584 }
585 );
586 },
587 onError: (error) => {
588 deferredCallback(new Error(error.details), 0);
589 },
590 };
591
592 const resolver = createResolver(portUri, resolverListener, this.options);
593 resolver.updateResolution();
594 }
595
596 forceShutdown(): void {
597 // Close the server if it is still running.
598
599 for (const {server: http2Server, channelzRef: ref} of this.http2ServerList) {
600 if (http2Server.listening) {
601 http2Server.close(() => {
602 if (this.channelzEnabled) {
603 this.listenerChildrenTracker.unrefChild(ref);
604 unregisterChannelzRef(ref);
605 }
606 });
607 }
608 }
609
610 this.started = false;
611
612 // Always destroy any available sessions. It's possible that one or more
613 // tryShutdown() calls are in progress. Don't wait on them to finish.
614 this.sessions.forEach((channelzInfo, session) => {
615 // Cast NGHTTP2_CANCEL to any because TypeScript doesn't seem to
616 // recognize destroy(code) as a valid signature.
617 // eslint-disable-next-line @typescript-eslint/no-explicit-any
618 session.destroy(http2.constants.NGHTTP2_CANCEL as any);
619 });
620 this.sessions.clear();
621 if (this.channelzEnabled) {
622 unregisterChannelzRef(this.channelzRef);
623 }
624 }
625
626 register<RequestType, ResponseType>(
627 name: string,
628 handler: HandleCall<RequestType, ResponseType>,
629 serialize: Serialize<ResponseType>,
630 deserialize: Deserialize<RequestType>,
631 type: string
632 ): boolean {
633 if (this.handlers.has(name)) {
634 return false;
635 }
636
637 this.handlers.set(name, {
638 func: handler,
639 serialize,
640 deserialize,
641 type,
642 path: name,
643 } as UntypedHandler);
644 return true;
645 }
646
647 unregister(name: string): boolean {
648 return this.handlers.delete(name);
649 }
650
651 start(): void {
652 if (
653 this.http2ServerList.length === 0 ||
654 this.http2ServerList.every(
655 ({server: http2Server}) => http2Server.listening !== true
656 )
657 ) {
658 throw new Error('server must be bound in order to start');
659 }
660
661 if (this.started === true) {
662 throw new Error('server is already started');
663 }
664 if (this.channelzEnabled) {
665 this.channelzTrace.addTrace('CT_INFO', 'Starting');
666 }
667 this.started = true;
668 }
669
670 tryShutdown(callback: (error?: Error) => void): void {
671 const wrappedCallback = (error?: Error) => {
672 if (this.channelzEnabled) {
673 unregisterChannelzRef(this.channelzRef);
674 }
675 callback(error);
676 };
677 let pendingChecks = 0;
678
679 function maybeCallback(): void {
680 pendingChecks--;
681
682 if (pendingChecks === 0) {
683 wrappedCallback();
684 }
685 }
686
687 // Close the server if necessary.
688 this.started = false;
689
690 for (const {server: http2Server, channelzRef: ref} of this.http2ServerList) {
691 if (http2Server.listening) {
692 pendingChecks++;
693 http2Server.close(() => {
694 if (this.channelzEnabled) {
695 this.listenerChildrenTracker.unrefChild(ref);
696 unregisterChannelzRef(ref);
697 }
698 maybeCallback();
699 });
700 }
701 }
702
703 this.sessions.forEach((channelzInfo, session) => {
704 if (!session.closed) {
705 pendingChecks += 1;
706 session.close(maybeCallback);
707 }
708 });
709 if (pendingChecks === 0) {
710 wrappedCallback();
711 }
712 }
713
714 addHttp2Port(): never {
715 throw new Error('Not yet implemented');
716 }
717
718 /**
719 * Get the channelz reference object for this server. The returned value is
720 * garbage if channelz is disabled for this server.
721 * @returns
722 */
723 getChannelzRef() {
724 return this.channelzRef;
725 }
726
727 private _setupHandlers(
728 http2Server: http2.Http2Server | http2.Http2SecureServer
729 ): void {
730 if (http2Server === null) {
731 return;
732 }
733
734 http2Server.on(
735 'stream',
736 (stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders) => {
737 const channelzSessionInfo = this.sessions.get(stream.session as http2.ServerHttp2Session);
738 if (this.channelzEnabled) {
739 this.callTracker.addCallStarted();
740 channelzSessionInfo?.streamTracker.addCallStarted();
741 }
742 const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE];
743
744 if (
745 typeof contentType !== 'string' ||
746 !contentType.startsWith('application/grpc')
747 ) {
748 stream.respond(
749 {
750 [http2.constants.HTTP2_HEADER_STATUS]:
751 http2.constants.HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE,
752 },
753 { endStream: true }
754 );
755 this.callTracker.addCallFailed();
756 if (this.channelzEnabled) {
757 channelzSessionInfo?.streamTracker.addCallFailed();
758 }
759 return;
760 }
761
762 let call: Http2ServerCallStream<any, any> | null = null;
763
764 try {
765 const path = headers[http2.constants.HTTP2_HEADER_PATH] as string;
766 const serverAddress = http2Server.address();
767 let serverAddressString = 'null';
768 if (serverAddress) {
769 if (typeof serverAddress === 'string') {
770 serverAddressString = serverAddress;
771 } else {
772 serverAddressString =
773 serverAddress.address + ':' + serverAddress.port;
774 }
775 }
776 this.trace(
777 'Received call to method ' +
778 path +
779 ' at address ' +
780 serverAddressString
781 );
782 const handler = this.handlers.get(path);
783
784 if (handler === undefined) {
785 this.trace(
786 'No handler registered for method ' +
787 path +
788 '. Sending UNIMPLEMENTED status.'
789 );
790 throw getUnimplementedStatusResponse(path);
791 }
792
793 call = new Http2ServerCallStream(stream, handler, this.options);
794 call.once('callEnd', (code: Status) => {
795 if (code === Status.OK) {
796 this.callTracker.addCallSucceeded();
797 } else {
798 this.callTracker.addCallFailed();
799 }
800 });
801 if (this.channelzEnabled && channelzSessionInfo) {
802 call.once('streamEnd', (success: boolean) => {
803 if (success) {
804 channelzSessionInfo.streamTracker.addCallSucceeded();
805 } else {
806 channelzSessionInfo.streamTracker.addCallFailed();
807 }
808 });
809 call.on('sendMessage', () => {
810 channelzSessionInfo.messagesSent += 1;
811 channelzSessionInfo.lastMessageSentTimestamp = new Date();
812 });
813 call.on('receiveMessage', () => {
814 channelzSessionInfo.messagesReceived += 1;
815 channelzSessionInfo.lastMessageReceivedTimestamp = new Date();
816 });
817 }
818 const metadata = call.receiveMetadata(headers);
819 const encoding = (metadata.get('grpc-encoding')[0] as string | undefined) ?? 'identity';
820 metadata.remove('grpc-encoding');
821
822 switch (handler.type) {
823 case 'unary':
824 handleUnary(call, handler as UntypedUnaryHandler, metadata, encoding);
825 break;
826 case 'clientStream':
827 handleClientStreaming(
828 call,
829 handler as UntypedClientStreamingHandler,
830 metadata,
831 encoding
832 );
833 break;
834 case 'serverStream':
835 handleServerStreaming(
836 call,
837 handler as UntypedServerStreamingHandler,
838 metadata,
839 encoding
840 );
841 break;
842 case 'bidi':
843 handleBidiStreaming(
844 call,
845 handler as UntypedBidiStreamingHandler,
846 metadata,
847 encoding
848 );
849 break;
850 default:
851 throw new Error(`Unknown handler type: ${handler.type}`);
852 }
853 } catch (err) {
854 if (!call) {
855 call = new Http2ServerCallStream(stream, null!, this.options);
856 if (this.channelzEnabled) {
857 this.callTracker.addCallFailed();
858 channelzSessionInfo?.streamTracker.addCallFailed()
859 }
860 }
861
862 if (err.code === undefined) {
863 err.code = Status.INTERNAL;
864 }
865
866 call.sendError(err);
867 }
868 }
869 );
870
871 http2Server.on('session', (session) => {
872 if (!this.started) {
873 session.destroy();
874 return;
875 }
876
877 let channelzRef: SocketRef;
878 channelzRef = registerChannelzSocket(session.socket.remoteAddress ?? 'unknown', this.getChannelzSessionInfoGetter(session), this.channelzEnabled);
879
880 const channelzSessionInfo: ChannelzSessionInfo = {
881 ref: channelzRef,
882 streamTracker: new ChannelzCallTracker(),
883 messagesSent: 0,
884 messagesReceived: 0,
885 lastMessageSentTimestamp: null,
886 lastMessageReceivedTimestamp: null
887 };
888
889 this.sessions.set(session, channelzSessionInfo);
890 const clientAddress = session.socket.remoteAddress;
891 if (this.channelzEnabled) {
892 this.channelzTrace.addTrace('CT_INFO', 'Connection established by client ' + clientAddress);
893 this.sessionChildrenTracker.refChild(channelzRef);
894 }
895 session.on('close', () => {
896 if (this.channelzEnabled) {
897 this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by client ' + clientAddress);
898 this.sessionChildrenTracker.unrefChild(channelzRef);
899 unregisterChannelzRef(channelzRef);
900 }
901 this.sessions.delete(session);
902 });
903 });
904 }
905}
906
907async function handleUnary<RequestType, ResponseType>(
908 call: Http2ServerCallStream<RequestType, ResponseType>,
909 handler: UnaryHandler<RequestType, ResponseType>,
910 metadata: Metadata,
911 encoding: string
912): Promise<void> {
913 const request = await call.receiveUnaryMessage(encoding);
914
915 if (request === undefined || call.cancelled) {
916 return;
917 }
918
919 const emitter = new ServerUnaryCallImpl<RequestType, ResponseType>(
920 call,
921 metadata,
922 request
923 );
924
925 handler.func(
926 emitter,
927 (
928 err: ServerErrorResponse | ServerStatusResponse | null,
929 value?: ResponseType | null,
930 trailer?: Metadata,
931 flags?: number
932 ) => {
933 call.sendUnaryMessage(err, value, trailer, flags);
934 }
935 );
936}
937
938function handleClientStreaming<RequestType, ResponseType>(
939 call: Http2ServerCallStream<RequestType, ResponseType>,
940 handler: ClientStreamingHandler<RequestType, ResponseType>,
941 metadata: Metadata,
942 encoding: string
943): void {
944 const stream = new ServerReadableStreamImpl<RequestType, ResponseType>(
945 call,
946 metadata,
947 handler.deserialize,
948 encoding
949 );
950
951 function respond(
952 err: ServerErrorResponse | ServerStatusResponse | null,
953 value?: ResponseType | null,
954 trailer?: Metadata,
955 flags?: number
956 ) {
957 stream.destroy();
958 call.sendUnaryMessage(err, value, trailer, flags);
959 }
960
961 if (call.cancelled) {
962 return;
963 }
964
965 stream.on('error', respond);
966 handler.func(stream, respond);
967}
968
969async function handleServerStreaming<RequestType, ResponseType>(
970 call: Http2ServerCallStream<RequestType, ResponseType>,
971 handler: ServerStreamingHandler<RequestType, ResponseType>,
972 metadata: Metadata,
973 encoding: string
974): Promise<void> {
975 const request = await call.receiveUnaryMessage(encoding);
976
977 if (request === undefined || call.cancelled) {
978 return;
979 }
980
981 const stream = new ServerWritableStreamImpl<RequestType, ResponseType>(
982 call,
983 metadata,
984 handler.serialize,
985 request
986 );
987
988 handler.func(stream);
989}
990
991function handleBidiStreaming<RequestType, ResponseType>(
992 call: Http2ServerCallStream<RequestType, ResponseType>,
993 handler: BidiStreamingHandler<RequestType, ResponseType>,
994 metadata: Metadata,
995 encoding: string
996): void {
997 const stream = new ServerDuplexStreamImpl<RequestType, ResponseType>(
998 call,
999 metadata,
1000 handler.serialize,
1001 handler.deserialize,
1002 encoding
1003 );
1004
1005 if (call.cancelled) {
1006 return;
1007 }
1008
1009 handler.func(stream);
1010}