1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | import * as http2 from 'http2';
|
19 | import * as util from 'util';
|
20 |
|
21 | import { ServiceError } from './call';
|
22 | import { Status, LogVerbosity } from './constants';
|
23 | import { Deserialize, Serialize, ServiceDefinition } from './make-client';
|
24 | import { Metadata } from './metadata';
|
25 | import {
|
26 | BidiStreamingHandler,
|
27 | ClientStreamingHandler,
|
28 | HandleCall,
|
29 | Handler,
|
30 | HandlerType,
|
31 | sendUnaryData,
|
32 | ServerDuplexStream,
|
33 | ServerDuplexStreamImpl,
|
34 | ServerReadableStream,
|
35 | ServerStreamingHandler,
|
36 | ServerUnaryCall,
|
37 | ServerWritableStream,
|
38 | ServerWritableStreamImpl,
|
39 | UnaryHandler,
|
40 | ServerErrorResponse,
|
41 | ServerStatusResponse,
|
42 | serverErrorToStatus,
|
43 | } from './server-call';
|
44 | import { ServerCredentials } from './server-credentials';
|
45 | import { ChannelOptions } from './channel-options';
|
46 | import {
|
47 | createResolver,
|
48 | ResolverListener,
|
49 | mapUriDefaultScheme,
|
50 | } from './resolver';
|
51 | import * as logging from './logging';
|
52 | import {
|
53 | SubchannelAddress,
|
54 | isTcpSubchannelAddress,
|
55 | subchannelAddressToString,
|
56 | stringToSubchannelAddress,
|
57 | } from './subchannel-address';
|
58 | import {
|
59 | GrpcUri,
|
60 | combineHostPort,
|
61 | parseUri,
|
62 | splitHostPort,
|
63 | uriToString,
|
64 | } from './uri-parser';
|
65 | import {
|
66 | ChannelzCallTracker,
|
67 | ChannelzCallTrackerStub,
|
68 | ChannelzChildrenTracker,
|
69 | ChannelzChildrenTrackerStub,
|
70 | ChannelzTrace,
|
71 | ChannelzTraceStub,
|
72 | registerChannelzServer,
|
73 | registerChannelzSocket,
|
74 | ServerInfo,
|
75 | ServerRef,
|
76 | SocketInfo,
|
77 | SocketRef,
|
78 | TlsInfo,
|
79 | unregisterChannelzRef,
|
80 | } from './channelz';
|
81 | import { CipherNameAndProtocol, TLSSocket } from 'tls';
|
82 | import {
|
83 | ServerInterceptingCallInterface,
|
84 | ServerInterceptor,
|
85 | getServerInterceptingCall,
|
86 | } from './server-interceptors';
|
87 | import { PartialStatusObject } from './call-interface';
|
88 | import { CallEventTracker } from './transport';
|
89 |
|
90 | const UNLIMITED_CONNECTION_AGE_MS = ~(1 << 31);
|
91 | const KEEPALIVE_MAX_TIME_MS = ~(1 << 31);
|
92 | const KEEPALIVE_TIMEOUT_MS = 20000;
|
93 | const MAX_CONNECTION_IDLE_MS = ~(1 << 31);
|
94 |
|
95 | const { HTTP2_HEADER_PATH } = http2.constants;
|
96 |
|
97 | const TRACER_NAME = 'server';
|
98 | const kMaxAge = Buffer.from('max_age');
|
99 |
|
100 | type AnyHttp2Server = http2.Http2Server | http2.Http2SecureServer;
|
101 |
|
102 | interface BindResult {
|
103 | port: number;
|
104 | count: number;
|
105 | errors: string[];
|
106 | }
|
107 |
|
108 | interface SingleAddressBindResult {
|
109 | port: number;
|
110 | error?: string;
|
111 | }
|
112 |
|
113 | function noop(): void {}
|
114 |
|
115 |
|
116 |
|
117 |
|
118 |
|
119 |
|
120 | function deprecate(message: string) {
|
121 | return function <This, Args extends any[], Return>(
|
122 | target: (this: This, ...args: Args) => Return,
|
123 | context: ClassMethodDecoratorContext<
|
124 | This,
|
125 | (this: This, ...args: Args) => Return
|
126 | >
|
127 | ) {
|
128 | return util.deprecate(target, message);
|
129 | };
|
130 | }
|
131 |
|
132 | function getUnimplementedStatusResponse(
|
133 | methodName: string
|
134 | ): PartialStatusObject {
|
135 | return {
|
136 | code: Status.UNIMPLEMENTED,
|
137 | details: `The server does not implement the method ${methodName}`,
|
138 | };
|
139 | }
|
140 |
|
141 |
|
142 | type UntypedUnaryHandler = UnaryHandler<any, any>;
|
143 | type UntypedClientStreamingHandler = ClientStreamingHandler<any, any>;
|
144 | type UntypedServerStreamingHandler = ServerStreamingHandler<any, any>;
|
145 | type UntypedBidiStreamingHandler = BidiStreamingHandler<any, any>;
|
146 | export type UntypedHandleCall = HandleCall<any, any>;
|
147 | type UntypedHandler = Handler<any, any>;
|
148 | export interface UntypedServiceImplementation {
|
149 | [name: string]: UntypedHandleCall;
|
150 | }
|
151 |
|
152 | function getDefaultHandler(handlerType: HandlerType, methodName: string) {
|
153 | const unimplementedStatusResponse =
|
154 | getUnimplementedStatusResponse(methodName);
|
155 | switch (handlerType) {
|
156 | case 'unary':
|
157 | return (
|
158 | call: ServerUnaryCall<any, any>,
|
159 | callback: sendUnaryData<any>
|
160 | ) => {
|
161 | callback(unimplementedStatusResponse as ServiceError, null);
|
162 | };
|
163 | case 'clientStream':
|
164 | return (
|
165 | call: ServerReadableStream<any, any>,
|
166 | callback: sendUnaryData<any>
|
167 | ) => {
|
168 | callback(unimplementedStatusResponse as ServiceError, null);
|
169 | };
|
170 | case 'serverStream':
|
171 | return (call: ServerWritableStream<any, any>) => {
|
172 | call.emit('error', unimplementedStatusResponse);
|
173 | };
|
174 | case 'bidi':
|
175 | return (call: ServerDuplexStream<any, any>) => {
|
176 | call.emit('error', unimplementedStatusResponse);
|
177 | };
|
178 | default:
|
179 | throw new Error(`Invalid handlerType ${handlerType}`);
|
180 | }
|
181 | }
|
182 |
|
183 | interface ChannelzSessionInfo {
|
184 | ref: SocketRef;
|
185 | streamTracker: ChannelzCallTracker | ChannelzCallTrackerStub;
|
186 | messagesSent: number;
|
187 | messagesReceived: number;
|
188 | keepAlivesSent: number;
|
189 | lastMessageSentTimestamp: Date | null;
|
190 | lastMessageReceivedTimestamp: Date | null;
|
191 | }
|
192 |
|
193 |
|
194 |
|
195 |
|
196 |
|
197 |
|
198 |
|
199 |
|
200 | interface BoundPort {
|
201 | |
202 |
|
203 |
|
204 | mapKey: string;
|
205 | |
206 |
|
207 |
|
208 |
|
209 | originalUri: GrpcUri;
|
210 | |
211 |
|
212 |
|
213 |
|
214 |
|
215 | completionPromise: Promise<number> | null;
|
216 | |
217 |
|
218 |
|
219 |
|
220 | portNumber: number;
|
221 | |
222 |
|
223 |
|
224 | cancelled: boolean;
|
225 | |
226 |
|
227 |
|
228 | credentials: ServerCredentials;
|
229 | |
230 |
|
231 |
|
232 |
|
233 |
|
234 | listeningServers: Set<AnyHttp2Server>;
|
235 | }
|
236 |
|
237 |
|
238 |
|
239 |
|
240 | interface Http2ServerInfo {
|
241 | channelzRef: SocketRef;
|
242 | sessions: Set<http2.ServerHttp2Session>;
|
243 | }
|
244 |
|
245 | interface SessionIdleTimeoutTracker {
|
246 | activeStreams: number;
|
247 | lastIdle: number;
|
248 | timeout: NodeJS.Timeout;
|
249 | onClose: (session: http2.ServerHttp2Session) => void | null;
|
250 | }
|
251 |
|
252 | export interface ServerOptions extends ChannelOptions {
|
253 | interceptors?: ServerInterceptor[];
|
254 | }
|
255 |
|
256 | export class Server {
|
257 | private boundPorts: Map<string, BoundPort> = new Map();
|
258 | private http2Servers: Map<AnyHttp2Server, Http2ServerInfo> = new Map();
|
259 | private sessionIdleTimeouts = new Map<
|
260 | http2.ServerHttp2Session,
|
261 | SessionIdleTimeoutTracker
|
262 | >();
|
263 |
|
264 | private handlers: Map<string, UntypedHandler> = new Map<
|
265 | string,
|
266 | UntypedHandler
|
267 | >();
|
268 | private sessions = new Map<http2.ServerHttp2Session, ChannelzSessionInfo>();
|
269 | |
270 |
|
271 |
|
272 |
|
273 | private started = false;
|
274 | private shutdown = false;
|
275 | private options: ServerOptions;
|
276 | private serverAddressString = 'null';
|
277 |
|
278 |
|
279 | private readonly channelzEnabled: boolean = true;
|
280 | private channelzRef: ServerRef;
|
281 | private channelzTrace: ChannelzTrace | ChannelzTraceStub;
|
282 | private callTracker: ChannelzCallTracker | ChannelzCallTrackerStub;
|
283 | private listenerChildrenTracker:
|
284 | | ChannelzChildrenTracker
|
285 | | ChannelzChildrenTrackerStub;
|
286 | private sessionChildrenTracker:
|
287 | | ChannelzChildrenTracker
|
288 | | ChannelzChildrenTrackerStub;
|
289 |
|
290 | private readonly maxConnectionAgeMs: number;
|
291 | private readonly maxConnectionAgeGraceMs: number;
|
292 |
|
293 | private readonly keepaliveTimeMs: number;
|
294 | private readonly keepaliveTimeoutMs: number;
|
295 |
|
296 | private readonly sessionIdleTimeout: number;
|
297 |
|
298 | private readonly interceptors: ServerInterceptor[];
|
299 |
|
300 | |
301 |
|
302 |
|
303 |
|
304 | private commonServerOptions: http2.ServerOptions;
|
305 |
|
306 | constructor(options?: ServerOptions) {
|
307 | this.options = options ?? {};
|
308 | if (this.options['grpc.enable_channelz'] === 0) {
|
309 | this.channelzEnabled = false;
|
310 | this.channelzTrace = new ChannelzTraceStub();
|
311 | this.callTracker = new ChannelzCallTrackerStub();
|
312 | this.listenerChildrenTracker = new ChannelzChildrenTrackerStub();
|
313 | this.sessionChildrenTracker = new ChannelzChildrenTrackerStub();
|
314 | } else {
|
315 | this.channelzTrace = new ChannelzTrace();
|
316 | this.callTracker = new ChannelzCallTracker();
|
317 | this.listenerChildrenTracker = new ChannelzChildrenTracker();
|
318 | this.sessionChildrenTracker = new ChannelzChildrenTracker();
|
319 | }
|
320 |
|
321 | this.channelzRef = registerChannelzServer(
|
322 | 'server',
|
323 | () => this.getChannelzInfo(),
|
324 | this.channelzEnabled
|
325 | );
|
326 |
|
327 | this.channelzTrace.addTrace('CT_INFO', 'Server created');
|
328 | this.maxConnectionAgeMs =
|
329 | this.options['grpc.max_connection_age_ms'] ?? UNLIMITED_CONNECTION_AGE_MS;
|
330 | this.maxConnectionAgeGraceMs =
|
331 | this.options['grpc.max_connection_age_grace_ms'] ??
|
332 | UNLIMITED_CONNECTION_AGE_MS;
|
333 | this.keepaliveTimeMs =
|
334 | this.options['grpc.keepalive_time_ms'] ?? KEEPALIVE_MAX_TIME_MS;
|
335 | this.keepaliveTimeoutMs =
|
336 | this.options['grpc.keepalive_timeout_ms'] ?? KEEPALIVE_TIMEOUT_MS;
|
337 | this.sessionIdleTimeout =
|
338 | this.options['grpc.max_connection_idle_ms'] ?? MAX_CONNECTION_IDLE_MS;
|
339 |
|
340 | this.commonServerOptions = {
|
341 | maxSendHeaderBlockLength: Number.MAX_SAFE_INTEGER,
|
342 | };
|
343 | if ('grpc-node.max_session_memory' in this.options) {
|
344 | this.commonServerOptions.maxSessionMemory =
|
345 | this.options['grpc-node.max_session_memory'];
|
346 | } else {
|
347 | |
348 |
|
349 |
|
350 |
|
351 | this.commonServerOptions.maxSessionMemory = Number.MAX_SAFE_INTEGER;
|
352 | }
|
353 | if ('grpc.max_concurrent_streams' in this.options) {
|
354 | this.commonServerOptions.settings = {
|
355 | maxConcurrentStreams: this.options['grpc.max_concurrent_streams'],
|
356 | };
|
357 | }
|
358 | this.interceptors = this.options.interceptors ?? [];
|
359 | this.trace('Server constructed');
|
360 | }
|
361 |
|
362 | private getChannelzInfo(): ServerInfo {
|
363 | return {
|
364 | trace: this.channelzTrace,
|
365 | callTracker: this.callTracker,
|
366 | listenerChildren: this.listenerChildrenTracker.getChildLists(),
|
367 | sessionChildren: this.sessionChildrenTracker.getChildLists(),
|
368 | };
|
369 | }
|
370 |
|
371 | private getChannelzSessionInfo(
|
372 | session: http2.ServerHttp2Session
|
373 | ): SocketInfo {
|
374 | const sessionInfo = this.sessions.get(session)!;
|
375 | const sessionSocket = session.socket;
|
376 | const remoteAddress = sessionSocket.remoteAddress
|
377 | ? stringToSubchannelAddress(
|
378 | sessionSocket.remoteAddress,
|
379 | sessionSocket.remotePort
|
380 | )
|
381 | : null;
|
382 | const localAddress = sessionSocket.localAddress
|
383 | ? stringToSubchannelAddress(
|
384 | sessionSocket.localAddress!,
|
385 | sessionSocket.localPort
|
386 | )
|
387 | : null;
|
388 | let tlsInfo: TlsInfo | null;
|
389 | if (session.encrypted) {
|
390 | const tlsSocket: TLSSocket = sessionSocket as TLSSocket;
|
391 | const cipherInfo: CipherNameAndProtocol & { standardName?: string } =
|
392 | tlsSocket.getCipher();
|
393 | const certificate = tlsSocket.getCertificate();
|
394 | const peerCertificate = tlsSocket.getPeerCertificate();
|
395 | tlsInfo = {
|
396 | cipherSuiteStandardName: cipherInfo.standardName ?? null,
|
397 | cipherSuiteOtherName: cipherInfo.standardName ? null : cipherInfo.name,
|
398 | localCertificate:
|
399 | certificate && 'raw' in certificate ? certificate.raw : null,
|
400 | remoteCertificate:
|
401 | peerCertificate && 'raw' in peerCertificate
|
402 | ? peerCertificate.raw
|
403 | : null,
|
404 | };
|
405 | } else {
|
406 | tlsInfo = null;
|
407 | }
|
408 | const socketInfo: SocketInfo = {
|
409 | remoteAddress: remoteAddress,
|
410 | localAddress: localAddress,
|
411 | security: tlsInfo,
|
412 | remoteName: null,
|
413 | streamsStarted: sessionInfo.streamTracker.callsStarted,
|
414 | streamsSucceeded: sessionInfo.streamTracker.callsSucceeded,
|
415 | streamsFailed: sessionInfo.streamTracker.callsFailed,
|
416 | messagesSent: sessionInfo.messagesSent,
|
417 | messagesReceived: sessionInfo.messagesReceived,
|
418 | keepAlivesSent: sessionInfo.keepAlivesSent,
|
419 | lastLocalStreamCreatedTimestamp: null,
|
420 | lastRemoteStreamCreatedTimestamp:
|
421 | sessionInfo.streamTracker.lastCallStartedTimestamp,
|
422 | lastMessageSentTimestamp: sessionInfo.lastMessageSentTimestamp,
|
423 | lastMessageReceivedTimestamp: sessionInfo.lastMessageReceivedTimestamp,
|
424 | localFlowControlWindow: session.state.localWindowSize ?? null,
|
425 | remoteFlowControlWindow: session.state.remoteWindowSize ?? null,
|
426 | };
|
427 | return socketInfo;
|
428 | }
|
429 |
|
430 | private trace(text: string): void {
|
431 | logging.trace(
|
432 | LogVerbosity.DEBUG,
|
433 | TRACER_NAME,
|
434 | '(' + this.channelzRef.id + ') ' + text
|
435 | );
|
436 | }
|
437 |
|
438 | addProtoService(): never {
|
439 | throw new Error('Not implemented. Use addService() instead');
|
440 | }
|
441 |
|
442 | addService(
|
443 | service: ServiceDefinition,
|
444 | implementation: UntypedServiceImplementation
|
445 | ): void {
|
446 | if (
|
447 | service === null ||
|
448 | typeof service !== 'object' ||
|
449 | implementation === null ||
|
450 | typeof implementation !== 'object'
|
451 | ) {
|
452 | throw new Error('addService() requires two objects as arguments');
|
453 | }
|
454 |
|
455 | const serviceKeys = Object.keys(service);
|
456 |
|
457 | if (serviceKeys.length === 0) {
|
458 | throw new Error('Cannot add an empty service to a server');
|
459 | }
|
460 |
|
461 | serviceKeys.forEach(name => {
|
462 | const attrs = service[name];
|
463 | let methodType: HandlerType;
|
464 |
|
465 | if (attrs.requestStream) {
|
466 | if (attrs.responseStream) {
|
467 | methodType = 'bidi';
|
468 | } else {
|
469 | methodType = 'clientStream';
|
470 | }
|
471 | } else {
|
472 | if (attrs.responseStream) {
|
473 | methodType = 'serverStream';
|
474 | } else {
|
475 | methodType = 'unary';
|
476 | }
|
477 | }
|
478 |
|
479 | let implFn = implementation[name];
|
480 | let impl;
|
481 |
|
482 | if (implFn === undefined && typeof attrs.originalName === 'string') {
|
483 | implFn = implementation[attrs.originalName];
|
484 | }
|
485 |
|
486 | if (implFn !== undefined) {
|
487 | impl = implFn.bind(implementation);
|
488 | } else {
|
489 | impl = getDefaultHandler(methodType, name);
|
490 | }
|
491 |
|
492 | const success = this.register(
|
493 | attrs.path,
|
494 | impl as UntypedHandleCall,
|
495 | attrs.responseSerialize,
|
496 | attrs.requestDeserialize,
|
497 | methodType
|
498 | );
|
499 |
|
500 | if (success === false) {
|
501 | throw new Error(`Method handler for ${attrs.path} already provided.`);
|
502 | }
|
503 | });
|
504 | }
|
505 |
|
506 | removeService(service: ServiceDefinition): void {
|
507 | if (service === null || typeof service !== 'object') {
|
508 | throw new Error('removeService() requires object as argument');
|
509 | }
|
510 |
|
511 | const serviceKeys = Object.keys(service);
|
512 | serviceKeys.forEach(name => {
|
513 | const attrs = service[name];
|
514 | this.unregister(attrs.path);
|
515 | });
|
516 | }
|
517 |
|
518 | bind(port: string, creds: ServerCredentials): never {
|
519 | throw new Error('Not implemented. Use bindAsync() instead');
|
520 | }
|
521 |
|
522 | private registerListenerToChannelz(boundAddress: SubchannelAddress) {
|
523 | return registerChannelzSocket(
|
524 | subchannelAddressToString(boundAddress),
|
525 | () => {
|
526 | return {
|
527 | localAddress: boundAddress,
|
528 | remoteAddress: null,
|
529 | security: null,
|
530 | remoteName: null,
|
531 | streamsStarted: 0,
|
532 | streamsSucceeded: 0,
|
533 | streamsFailed: 0,
|
534 | messagesSent: 0,
|
535 | messagesReceived: 0,
|
536 | keepAlivesSent: 0,
|
537 | lastLocalStreamCreatedTimestamp: null,
|
538 | lastRemoteStreamCreatedTimestamp: null,
|
539 | lastMessageSentTimestamp: null,
|
540 | lastMessageReceivedTimestamp: null,
|
541 | localFlowControlWindow: null,
|
542 | remoteFlowControlWindow: null,
|
543 | };
|
544 | },
|
545 | this.channelzEnabled
|
546 | );
|
547 | }
|
548 |
|
549 | private createHttp2Server(credentials: ServerCredentials) {
|
550 | let http2Server: http2.Http2Server | http2.Http2SecureServer;
|
551 | if (credentials._isSecure()) {
|
552 | const secureServerOptions = Object.assign(
|
553 | this.commonServerOptions,
|
554 | credentials._getSettings()!
|
555 | );
|
556 | secureServerOptions.enableTrace =
|
557 | this.options['grpc-node.tls_enable_trace'] === 1;
|
558 | http2Server = http2.createSecureServer(secureServerOptions);
|
559 | http2Server.on('secureConnection', (socket: TLSSocket) => {
|
560 | |
561 |
|
562 | socket.on('error', (e: Error) => {
|
563 | this.trace(
|
564 | 'An incoming TLS connection closed with error: ' + e.message
|
565 | );
|
566 | });
|
567 | });
|
568 | } else {
|
569 | http2Server = http2.createServer(this.commonServerOptions);
|
570 | }
|
571 |
|
572 | http2Server.setTimeout(0, noop);
|
573 | this._setupHandlers(http2Server);
|
574 | return http2Server;
|
575 | }
|
576 |
|
577 | private bindOneAddress(
|
578 | address: SubchannelAddress,
|
579 | boundPortObject: BoundPort
|
580 | ): Promise<SingleAddressBindResult> {
|
581 | this.trace('Attempting to bind ' + subchannelAddressToString(address));
|
582 | const http2Server = this.createHttp2Server(boundPortObject.credentials);
|
583 | return new Promise<SingleAddressBindResult>((resolve, reject) => {
|
584 | const onError = (err: Error) => {
|
585 | this.trace(
|
586 | 'Failed to bind ' +
|
587 | subchannelAddressToString(address) +
|
588 | ' with error ' +
|
589 | err.message
|
590 | );
|
591 | resolve({
|
592 | port: 'port' in address ? address.port : 1,
|
593 | error: err.message,
|
594 | });
|
595 | };
|
596 |
|
597 | http2Server.once('error', onError);
|
598 |
|
599 | http2Server.listen(address, () => {
|
600 | const boundAddress = http2Server.address()!;
|
601 | let boundSubchannelAddress: SubchannelAddress;
|
602 | if (typeof boundAddress === 'string') {
|
603 | boundSubchannelAddress = {
|
604 | path: boundAddress,
|
605 | };
|
606 | } else {
|
607 | boundSubchannelAddress = {
|
608 | host: boundAddress.address,
|
609 | port: boundAddress.port,
|
610 | };
|
611 | }
|
612 |
|
613 | const channelzRef = this.registerListenerToChannelz(
|
614 | boundSubchannelAddress
|
615 | );
|
616 | this.listenerChildrenTracker.refChild(channelzRef);
|
617 |
|
618 | this.http2Servers.set(http2Server, {
|
619 | channelzRef: channelzRef,
|
620 | sessions: new Set(),
|
621 | });
|
622 | boundPortObject.listeningServers.add(http2Server);
|
623 | this.trace(
|
624 | 'Successfully bound ' +
|
625 | subchannelAddressToString(boundSubchannelAddress)
|
626 | );
|
627 | resolve({
|
628 | port:
|
629 | 'port' in boundSubchannelAddress ? boundSubchannelAddress.port : 1,
|
630 | });
|
631 | http2Server.removeListener('error', onError);
|
632 | });
|
633 | });
|
634 | }
|
635 |
|
636 | private async bindManyPorts(
|
637 | addressList: SubchannelAddress[],
|
638 | boundPortObject: BoundPort
|
639 | ): Promise<BindResult> {
|
640 | if (addressList.length === 0) {
|
641 | return {
|
642 | count: 0,
|
643 | port: 0,
|
644 | errors: [],
|
645 | };
|
646 | }
|
647 | if (isTcpSubchannelAddress(addressList[0]) && addressList[0].port === 0) {
|
648 | /* If binding to port 0, first try to bind the first address, then bind
|
649 | * the rest of the address list to the specific port that it binds. */
|
650 | const firstAddressResult = await this.bindOneAddress(
|
651 | addressList[0],
|
652 | boundPortObject
|
653 | );
|
654 | if (firstAddressResult.error) {
|
655 | /* If the first address fails to bind, try the same operation starting
|
656 | * from the second item in the list. */
|
657 | const restAddressResult = await this.bindManyPorts(
|
658 | addressList.slice(1),
|
659 | boundPortObject
|
660 | );
|
661 | return {
|
662 | ...restAddressResult,
|
663 | errors: [firstAddressResult.error, ...restAddressResult.errors],
|
664 | };
|
665 | } else {
|
666 | const restAddresses = addressList
|
667 | .slice(1)
|
668 | .map(address =>
|
669 | isTcpSubchannelAddress(address)
|
670 | ? { host: address.host, port: firstAddressResult.port }
|
671 | : address
|
672 | );
|
673 | const restAddressResult = await Promise.all(
|
674 | restAddresses.map(address =>
|
675 | this.bindOneAddress(address, boundPortObject)
|
676 | )
|
677 | );
|
678 | const allResults = [firstAddressResult, ...restAddressResult];
|
679 | return {
|
680 | count: allResults.filter(result => result.error === undefined).length,
|
681 | port: firstAddressResult.port,
|
682 | errors: allResults
|
683 | .filter(result => result.error)
|
684 | .map(result => result.error!),
|
685 | };
|
686 | }
|
687 | } else {
|
688 | const allResults = await Promise.all(
|
689 | addressList.map(address =>
|
690 | this.bindOneAddress(address, boundPortObject)
|
691 | )
|
692 | );
|
693 | return {
|
694 | count: allResults.filter(result => result.error === undefined).length,
|
695 | port: allResults[0].port,
|
696 | errors: allResults
|
697 | .filter(result => result.error)
|
698 | .map(result => result.error!),
|
699 | };
|
700 | }
|
701 | }
|
702 |
|
703 | private async bindAddressList(
|
704 | addressList: SubchannelAddress[],
|
705 | boundPortObject: BoundPort
|
706 | ): Promise<number> {
|
707 | const bindResult = await this.bindManyPorts(addressList, boundPortObject);
|
708 | if (bindResult.count > 0) {
|
709 | if (bindResult.count < addressList.length) {
|
710 | logging.log(
|
711 | LogVerbosity.INFO,
|
712 | `WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved`
|
713 | );
|
714 | }
|
715 | return bindResult.port;
|
716 | } else {
|
717 | const errorString = `No address added out of total ${addressList.length} resolved`;
|
718 | logging.log(LogVerbosity.ERROR, errorString);
|
719 | throw new Error(
|
720 | `${errorString} errors: [${bindResult.errors.join(',')}]`
|
721 | );
|
722 | }
|
723 | }
|
724 |
|
725 | private resolvePort(port: GrpcUri): Promise<SubchannelAddress[]> {
|
726 | return new Promise<SubchannelAddress[]>((resolve, reject) => {
|
727 | const resolverListener: ResolverListener = {
|
728 | onSuccessfulResolution: (
|
729 | endpointList,
|
730 | serviceConfig,
|
731 | serviceConfigError
|
732 | ) => {
|
733 |
|
734 | resolverListener.onSuccessfulResolution = () => {};
|
735 | const addressList = ([] as SubchannelAddress[]).concat(
|
736 | ...endpointList.map(endpoint => endpoint.addresses)
|
737 | );
|
738 | if (addressList.length === 0) {
|
739 | reject(new Error(`No addresses resolved for port ${port}`));
|
740 | return;
|
741 | }
|
742 | resolve(addressList);
|
743 | },
|
744 | onError: error => {
|
745 | reject(new Error(error.details));
|
746 | },
|
747 | };
|
748 | const resolver = createResolver(port, resolverListener, this.options);
|
749 | resolver.updateResolution();
|
750 | });
|
751 | }
|
752 |
|
753 | private async bindPort(
|
754 | port: GrpcUri,
|
755 | boundPortObject: BoundPort
|
756 | ): Promise<number> {
|
757 | const addressList = await this.resolvePort(port);
|
758 | if (boundPortObject.cancelled) {
|
759 | this.completeUnbind(boundPortObject);
|
760 | throw new Error('bindAsync operation cancelled by unbind call');
|
761 | }
|
762 | const portNumber = await this.bindAddressList(addressList, boundPortObject);
|
763 | if (boundPortObject.cancelled) {
|
764 | this.completeUnbind(boundPortObject);
|
765 | throw new Error('bindAsync operation cancelled by unbind call');
|
766 | }
|
767 | return portNumber;
|
768 | }
|
769 |
|
770 | private normalizePort(port: string): GrpcUri {
|
771 | const initialPortUri = parseUri(port);
|
772 | if (initialPortUri === null) {
|
773 | throw new Error(`Could not parse port "${port}"`);
|
774 | }
|
775 | const portUri = mapUriDefaultScheme(initialPortUri);
|
776 | if (portUri === null) {
|
777 | throw new Error(`Could not get a default scheme for port "${port}"`);
|
778 | }
|
779 | return portUri;
|
780 | }
|
781 |
|
782 | bindAsync(
|
783 | port: string,
|
784 | creds: ServerCredentials,
|
785 | callback: (error: Error | null, port: number) => void
|
786 | ): void {
|
787 | if (this.shutdown) {
|
788 | throw new Error('bindAsync called after shutdown');
|
789 | }
|
790 | if (typeof port !== 'string') {
|
791 | throw new TypeError('port must be a string');
|
792 | }
|
793 |
|
794 | if (creds === null || !(creds instanceof ServerCredentials)) {
|
795 | throw new TypeError('creds must be a ServerCredentials object');
|
796 | }
|
797 |
|
798 | if (typeof callback !== 'function') {
|
799 | throw new TypeError('callback must be a function');
|
800 | }
|
801 |
|
802 | this.trace('bindAsync port=' + port);
|
803 |
|
804 | const portUri = this.normalizePort(port);
|
805 |
|
806 | const deferredCallback = (error: Error | null, port: number) => {
|
807 | process.nextTick(() => callback(error, port));
|
808 | };
|
809 |
|
810 | |
811 |
|
812 | let boundPortObject = this.boundPorts.get(uriToString(portUri));
|
813 | if (boundPortObject) {
|
814 | if (!creds._equals(boundPortObject.credentials)) {
|
815 | deferredCallback(
|
816 | new Error(`${port} already bound with incompatible credentials`),
|
817 | 0
|
818 | );
|
819 | return;
|
820 | }
|
821 | |
822 |
|
823 | boundPortObject.cancelled = false;
|
824 | if (boundPortObject.completionPromise) {
|
825 | boundPortObject.completionPromise.then(
|
826 | portNum => callback(null, portNum),
|
827 | error => callback(error as Error, 0)
|
828 | );
|
829 | } else {
|
830 | deferredCallback(null, boundPortObject.portNumber);
|
831 | }
|
832 | return;
|
833 | }
|
834 | boundPortObject = {
|
835 | mapKey: uriToString(portUri),
|
836 | originalUri: portUri,
|
837 | completionPromise: null,
|
838 | cancelled: false,
|
839 | portNumber: 0,
|
840 | credentials: creds,
|
841 | listeningServers: new Set(),
|
842 | };
|
843 | const splitPort = splitHostPort(portUri.path);
|
844 | const completionPromise = this.bindPort(portUri, boundPortObject);
|
845 | boundPortObject.completionPromise = completionPromise;
|
846 | |
847 |
|
848 |
|
849 | if (splitPort?.port === 0) {
|
850 | completionPromise.then(
|
851 | portNum => {
|
852 | const finalUri: GrpcUri = {
|
853 | scheme: portUri.scheme,
|
854 | authority: portUri.authority,
|
855 | path: combineHostPort({ host: splitPort.host, port: portNum }),
|
856 | };
|
857 | boundPortObject!.mapKey = uriToString(finalUri);
|
858 | boundPortObject!.completionPromise = null;
|
859 | boundPortObject!.portNumber = portNum;
|
860 | this.boundPorts.set(boundPortObject!.mapKey, boundPortObject!);
|
861 | callback(null, portNum);
|
862 | },
|
863 | error => {
|
864 | callback(error, 0);
|
865 | }
|
866 | );
|
867 | } else {
|
868 | this.boundPorts.set(boundPortObject.mapKey, boundPortObject);
|
869 | completionPromise.then(
|
870 | portNum => {
|
871 | boundPortObject!.completionPromise = null;
|
872 | boundPortObject!.portNumber = portNum;
|
873 | callback(null, portNum);
|
874 | },
|
875 | error => {
|
876 | callback(error, 0);
|
877 | }
|
878 | );
|
879 | }
|
880 | }
|
881 |
|
882 | private closeServer(server: AnyHttp2Server, callback?: () => void) {
|
883 | this.trace(
|
884 | 'Closing server with address ' + JSON.stringify(server.address())
|
885 | );
|
886 | const serverInfo = this.http2Servers.get(server);
|
887 | server.close(() => {
|
888 | if (serverInfo) {
|
889 | this.listenerChildrenTracker.unrefChild(serverInfo.channelzRef);
|
890 | unregisterChannelzRef(serverInfo.channelzRef);
|
891 | }
|
892 | this.http2Servers.delete(server);
|
893 | callback?.();
|
894 | });
|
895 | }
|
896 |
|
897 | private closeSession(
|
898 | session: http2.ServerHttp2Session,
|
899 | callback?: () => void
|
900 | ) {
|
901 | this.trace('Closing session initiated by ' + session.socket?.remoteAddress);
|
902 | const sessionInfo = this.sessions.get(session);
|
903 | const closeCallback = () => {
|
904 | if (sessionInfo) {
|
905 | this.sessionChildrenTracker.unrefChild(sessionInfo.ref);
|
906 | unregisterChannelzRef(sessionInfo.ref);
|
907 | }
|
908 | callback?.();
|
909 | };
|
910 | if (session.closed) {
|
911 | queueMicrotask(closeCallback);
|
912 | } else {
|
913 | session.close(closeCallback);
|
914 | }
|
915 | }
|
916 |
|
917 | private completeUnbind(boundPortObject: BoundPort) {
|
918 | for (const server of boundPortObject.listeningServers) {
|
919 | const serverInfo = this.http2Servers.get(server);
|
920 | this.closeServer(server, () => {
|
921 | boundPortObject.listeningServers.delete(server);
|
922 | });
|
923 | if (serverInfo) {
|
924 | for (const session of serverInfo.sessions) {
|
925 | this.closeSession(session);
|
926 | }
|
927 | }
|
928 | }
|
929 | this.boundPorts.delete(boundPortObject.mapKey);
|
930 | }
|
931 |
|
932 | |
933 |
|
934 |
|
935 |
|
936 |
|
937 |
|
938 |
|
939 | unbind(port: string): void {
|
940 | this.trace('unbind port=' + port);
|
941 | const portUri = this.normalizePort(port);
|
942 | const splitPort = splitHostPort(portUri.path);
|
943 | if (splitPort?.port === 0) {
|
944 | throw new Error('Cannot unbind port 0');
|
945 | }
|
946 | const boundPortObject = this.boundPorts.get(uriToString(portUri));
|
947 | if (boundPortObject) {
|
948 | this.trace(
|
949 | 'unbinding ' +
|
950 | boundPortObject.mapKey +
|
951 | ' originally bound as ' +
|
952 | uriToString(boundPortObject.originalUri)
|
953 | );
|
954 | |
955 |
|
956 | if (boundPortObject.completionPromise) {
|
957 | boundPortObject.cancelled = true;
|
958 | } else {
|
959 | this.completeUnbind(boundPortObject);
|
960 | }
|
961 | }
|
962 | }
|
963 |
|
964 | |
965 |
|
966 |
|
967 |
|
968 |
|
969 |
|
970 |
|
971 |
|
972 |
|
973 |
|
974 |
|
975 | drain(port: string, graceTimeMs: number): void {
|
976 | this.trace('drain port=' + port + ' graceTimeMs=' + graceTimeMs);
|
977 | const portUri = this.normalizePort(port);
|
978 | const splitPort = splitHostPort(portUri.path);
|
979 | if (splitPort?.port === 0) {
|
980 | throw new Error('Cannot drain port 0');
|
981 | }
|
982 | const boundPortObject = this.boundPorts.get(uriToString(portUri));
|
983 | if (!boundPortObject) {
|
984 | return;
|
985 | }
|
986 | const allSessions: Set<http2.Http2Session> = new Set();
|
987 | for (const http2Server of boundPortObject.listeningServers) {
|
988 | const serverEntry = this.http2Servers.get(http2Server);
|
989 | if (serverEntry) {
|
990 | for (const session of serverEntry.sessions) {
|
991 | allSessions.add(session);
|
992 | this.closeSession(session, () => {
|
993 | allSessions.delete(session);
|
994 | });
|
995 | }
|
996 | }
|
997 | }
|
998 | |
999 |
|
1000 | setTimeout(() => {
|
1001 | for (const session of allSessions) {
|
1002 | session.destroy(http2.constants.NGHTTP2_CANCEL as any);
|
1003 | }
|
1004 | }, graceTimeMs).unref?.();
|
1005 | }
|
1006 |
|
1007 | forceShutdown(): void {
|
1008 | for (const boundPortObject of this.boundPorts.values()) {
|
1009 | boundPortObject.cancelled = true;
|
1010 | }
|
1011 | this.boundPorts.clear();
|
1012 |
|
1013 | for (const server of this.http2Servers.keys()) {
|
1014 | this.closeServer(server);
|
1015 | }
|
1016 |
|
1017 |
|
1018 |
|
1019 | this.sessions.forEach((channelzInfo, session) => {
|
1020 | this.closeSession(session);
|
1021 |
|
1022 |
|
1023 |
|
1024 | session.destroy(http2.constants.NGHTTP2_CANCEL as any);
|
1025 | });
|
1026 | this.sessions.clear();
|
1027 | unregisterChannelzRef(this.channelzRef);
|
1028 |
|
1029 | this.shutdown = true;
|
1030 | }
|
1031 |
|
1032 | register<RequestType, ResponseType>(
|
1033 | name: string,
|
1034 | handler: HandleCall<RequestType, ResponseType>,
|
1035 | serialize: Serialize<ResponseType>,
|
1036 | deserialize: Deserialize<RequestType>,
|
1037 | type: string
|
1038 | ): boolean {
|
1039 | if (this.handlers.has(name)) {
|
1040 | return false;
|
1041 | }
|
1042 |
|
1043 | this.handlers.set(name, {
|
1044 | func: handler,
|
1045 | serialize,
|
1046 | deserialize,
|
1047 | type,
|
1048 | path: name,
|
1049 | } as UntypedHandler);
|
1050 | return true;
|
1051 | }
|
1052 |
|
1053 | unregister(name: string): boolean {
|
1054 | return this.handlers.delete(name);
|
1055 | }
|
1056 |
|
1057 | |
1058 |
|
1059 |
|
1060 | @deprecate(
|
1061 | 'Calling start() is no longer necessary. It can be safely omitted.'
|
1062 | )
|
1063 | start(): void {
|
1064 | if (
|
1065 | this.http2Servers.size === 0 ||
|
1066 | [...this.http2Servers.keys()].every(server => !server.listening)
|
1067 | ) {
|
1068 | throw new Error('server must be bound in order to start');
|
1069 | }
|
1070 |
|
1071 | if (this.started === true) {
|
1072 | throw new Error('server is already started');
|
1073 | }
|
1074 | this.started = true;
|
1075 | }
|
1076 |
|
1077 | tryShutdown(callback: (error?: Error) => void): void {
|
1078 | const wrappedCallback = (error?: Error) => {
|
1079 | unregisterChannelzRef(this.channelzRef);
|
1080 | callback(error);
|
1081 | };
|
1082 | let pendingChecks = 0;
|
1083 |
|
1084 | function maybeCallback(): void {
|
1085 | pendingChecks--;
|
1086 |
|
1087 | if (pendingChecks === 0) {
|
1088 | wrappedCallback();
|
1089 | }
|
1090 | }
|
1091 | this.shutdown = true;
|
1092 |
|
1093 | for (const [serverKey, server] of this.http2Servers.entries()) {
|
1094 | pendingChecks++;
|
1095 | const serverString = server.channelzRef.name;
|
1096 | this.trace('Waiting for server ' + serverString + ' to close');
|
1097 | this.closeServer(serverKey, () => {
|
1098 | this.trace('Server ' + serverString + ' finished closing');
|
1099 | maybeCallback();
|
1100 | });
|
1101 |
|
1102 | for (const session of server.sessions.keys()) {
|
1103 | pendingChecks++;
|
1104 | const sessionString = session.socket?.remoteAddress;
|
1105 | this.trace('Waiting for session ' + sessionString + ' to close');
|
1106 | this.closeSession(session, () => {
|
1107 | this.trace('Session ' + sessionString + ' finished closing');
|
1108 | maybeCallback();
|
1109 | });
|
1110 | }
|
1111 | }
|
1112 |
|
1113 | if (pendingChecks === 0) {
|
1114 | wrappedCallback();
|
1115 | }
|
1116 | }
|
1117 |
|
1118 | addHttp2Port(): never {
|
1119 | throw new Error('Not yet implemented');
|
1120 | }
|
1121 |
|
1122 | |
1123 |
|
1124 |
|
1125 |
|
1126 |
|
1127 | getChannelzRef() {
|
1128 | return this.channelzRef;
|
1129 | }
|
1130 |
|
1131 | private _verifyContentType(
|
1132 | stream: http2.ServerHttp2Stream,
|
1133 | headers: http2.IncomingHttpHeaders
|
1134 | ): boolean {
|
1135 | const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE];
|
1136 |
|
1137 | if (
|
1138 | typeof contentType !== 'string' ||
|
1139 | !contentType.startsWith('application/grpc')
|
1140 | ) {
|
1141 | stream.respond(
|
1142 | {
|
1143 | [http2.constants.HTTP2_HEADER_STATUS]:
|
1144 | http2.constants.HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE,
|
1145 | },
|
1146 | { endStream: true }
|
1147 | );
|
1148 | return false;
|
1149 | }
|
1150 |
|
1151 | return true;
|
1152 | }
|
1153 |
|
1154 | private _retrieveHandler(path: string): Handler<any, any> | null {
|
1155 | this.trace(
|
1156 | 'Received call to method ' +
|
1157 | path +
|
1158 | ' at address ' +
|
1159 | this.serverAddressString
|
1160 | );
|
1161 |
|
1162 | const handler = this.handlers.get(path);
|
1163 |
|
1164 | if (handler === undefined) {
|
1165 | this.trace(
|
1166 | 'No handler registered for method ' +
|
1167 | path +
|
1168 | '. Sending UNIMPLEMENTED status.'
|
1169 | );
|
1170 | return null;
|
1171 | }
|
1172 |
|
1173 | return handler;
|
1174 | }
|
1175 |
|
1176 | private _respondWithError(
|
1177 | err: PartialStatusObject,
|
1178 | stream: http2.ServerHttp2Stream,
|
1179 | channelzSessionInfo: ChannelzSessionInfo | null = null
|
1180 | ) {
|
1181 | const trailersToSend = {
|
1182 | 'grpc-status': err.code ?? Status.INTERNAL,
|
1183 | 'grpc-message': err.details,
|
1184 | [http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_OK,
|
1185 | [http2.constants.HTTP2_HEADER_CONTENT_TYPE]: 'application/grpc+proto',
|
1186 | ...err.metadata?.toHttp2Headers(),
|
1187 | };
|
1188 | stream.respond(trailersToSend, { endStream: true });
|
1189 |
|
1190 | this.callTracker.addCallFailed();
|
1191 | channelzSessionInfo?.streamTracker.addCallFailed();
|
1192 | }
|
1193 |
|
1194 | private _channelzHandler(
|
1195 | stream: http2.ServerHttp2Stream,
|
1196 | headers: http2.IncomingHttpHeaders
|
1197 | ) {
|
1198 |
|
1199 | this.onStreamOpened(stream);
|
1200 |
|
1201 | const channelzSessionInfo = this.sessions.get(
|
1202 | stream.session as http2.ServerHttp2Session
|
1203 | );
|
1204 |
|
1205 | this.callTracker.addCallStarted();
|
1206 | channelzSessionInfo?.streamTracker.addCallStarted();
|
1207 |
|
1208 | if (!this._verifyContentType(stream, headers)) {
|
1209 | this.callTracker.addCallFailed();
|
1210 | channelzSessionInfo?.streamTracker.addCallFailed();
|
1211 | return;
|
1212 | }
|
1213 |
|
1214 | const path = headers[HTTP2_HEADER_PATH] as string;
|
1215 |
|
1216 | const handler = this._retrieveHandler(path);
|
1217 | if (!handler) {
|
1218 | this._respondWithError(
|
1219 | getUnimplementedStatusResponse(path),
|
1220 | stream,
|
1221 | channelzSessionInfo
|
1222 | );
|
1223 | return;
|
1224 | }
|
1225 |
|
1226 | const callEventTracker: CallEventTracker = {
|
1227 | addMessageSent: () => {
|
1228 | if (channelzSessionInfo) {
|
1229 | channelzSessionInfo.messagesSent += 1;
|
1230 | channelzSessionInfo.lastMessageSentTimestamp = new Date();
|
1231 | }
|
1232 | },
|
1233 | addMessageReceived: () => {
|
1234 | if (channelzSessionInfo) {
|
1235 | channelzSessionInfo.messagesReceived += 1;
|
1236 | channelzSessionInfo.lastMessageReceivedTimestamp = new Date();
|
1237 | }
|
1238 | },
|
1239 | onCallEnd: status => {
|
1240 | if (status.code === Status.OK) {
|
1241 | this.callTracker.addCallSucceeded();
|
1242 | } else {
|
1243 | this.callTracker.addCallFailed();
|
1244 | }
|
1245 | },
|
1246 | onStreamEnd: success => {
|
1247 | if (channelzSessionInfo) {
|
1248 | if (success) {
|
1249 | channelzSessionInfo.streamTracker.addCallSucceeded();
|
1250 | } else {
|
1251 | channelzSessionInfo.streamTracker.addCallFailed();
|
1252 | }
|
1253 | }
|
1254 | },
|
1255 | };
|
1256 |
|
1257 | const call = getServerInterceptingCall(
|
1258 | this.interceptors,
|
1259 | stream,
|
1260 | headers,
|
1261 | callEventTracker,
|
1262 | handler,
|
1263 | this.options
|
1264 | );
|
1265 |
|
1266 | if (!this._runHandlerForCall(call, handler)) {
|
1267 | this.callTracker.addCallFailed();
|
1268 | channelzSessionInfo?.streamTracker.addCallFailed();
|
1269 |
|
1270 | call.sendStatus({
|
1271 | code: Status.INTERNAL,
|
1272 | details: `Unknown handler type: ${handler.type}`,
|
1273 | });
|
1274 | }
|
1275 | }
|
1276 |
|
1277 | private _streamHandler(
|
1278 | stream: http2.ServerHttp2Stream,
|
1279 | headers: http2.IncomingHttpHeaders
|
1280 | ) {
|
1281 |
|
1282 | this.onStreamOpened(stream);
|
1283 |
|
1284 | if (this._verifyContentType(stream, headers) !== true) {
|
1285 | return;
|
1286 | }
|
1287 |
|
1288 | const path = headers[HTTP2_HEADER_PATH] as string;
|
1289 |
|
1290 | const handler = this._retrieveHandler(path);
|
1291 | if (!handler) {
|
1292 | this._respondWithError(
|
1293 | getUnimplementedStatusResponse(path),
|
1294 | stream,
|
1295 | null
|
1296 | );
|
1297 | return;
|
1298 | }
|
1299 |
|
1300 | const call = getServerInterceptingCall(
|
1301 | this.interceptors,
|
1302 | stream,
|
1303 | headers,
|
1304 | null,
|
1305 | handler,
|
1306 | this.options
|
1307 | );
|
1308 |
|
1309 | if (!this._runHandlerForCall(call, handler)) {
|
1310 | call.sendStatus({
|
1311 | code: Status.INTERNAL,
|
1312 | details: `Unknown handler type: ${handler.type}`,
|
1313 | });
|
1314 | }
|
1315 | }
|
1316 |
|
1317 | private _runHandlerForCall(
|
1318 | call: ServerInterceptingCallInterface,
|
1319 | handler:
|
1320 | | UntypedUnaryHandler
|
1321 | | UntypedClientStreamingHandler
|
1322 | | UntypedServerStreamingHandler
|
1323 | | UntypedBidiStreamingHandler
|
1324 | ): boolean {
|
1325 | const { type } = handler;
|
1326 | if (type === 'unary') {
|
1327 | handleUnary(call, handler);
|
1328 | } else if (type === 'clientStream') {
|
1329 | handleClientStreaming(call, handler);
|
1330 | } else if (type === 'serverStream') {
|
1331 | handleServerStreaming(call, handler);
|
1332 | } else if (type === 'bidi') {
|
1333 | handleBidiStreaming(call, handler);
|
1334 | } else {
|
1335 | return false;
|
1336 | }
|
1337 |
|
1338 | return true;
|
1339 | }
|
1340 |
|
1341 | private _setupHandlers(
|
1342 | http2Server: http2.Http2Server | http2.Http2SecureServer
|
1343 | ): void {
|
1344 | if (http2Server === null) {
|
1345 | return;
|
1346 | }
|
1347 |
|
1348 | const serverAddress = http2Server.address();
|
1349 | let serverAddressString = 'null';
|
1350 | if (serverAddress) {
|
1351 | if (typeof serverAddress === 'string') {
|
1352 | serverAddressString = serverAddress;
|
1353 | } else {
|
1354 | serverAddressString = serverAddress.address + ':' + serverAddress.port;
|
1355 | }
|
1356 | }
|
1357 | this.serverAddressString = serverAddressString;
|
1358 |
|
1359 | const handler = this.channelzEnabled
|
1360 | ? this._channelzHandler
|
1361 | : this._streamHandler;
|
1362 |
|
1363 | const sessionHandler = this.channelzEnabled
|
1364 | ? this._channelzSessionHandler(http2Server)
|
1365 | : this._sessionHandler(http2Server);
|
1366 |
|
1367 | http2Server.on('stream', handler.bind(this));
|
1368 | http2Server.on('session', sessionHandler);
|
1369 | }
|
1370 |
|
1371 | private _sessionHandler(
|
1372 | http2Server: http2.Http2Server | http2.Http2SecureServer
|
1373 | ) {
|
1374 | return (session: http2.ServerHttp2Session) => {
|
1375 | this.http2Servers.get(http2Server)?.sessions.add(session);
|
1376 |
|
1377 | let connectionAgeTimer: NodeJS.Timeout | null = null;
|
1378 | let connectionAgeGraceTimer: NodeJS.Timeout | null = null;
|
1379 | let keeapliveTimeTimer: NodeJS.Timeout | null = null;
|
1380 | let keepaliveTimeoutTimer: NodeJS.Timeout | null = null;
|
1381 | let sessionClosedByServer = false;
|
1382 |
|
1383 | const idleTimeoutObj = this.enableIdleTimeout(session);
|
1384 |
|
1385 | if (this.maxConnectionAgeMs !== UNLIMITED_CONNECTION_AGE_MS) {
|
1386 |
|
1387 | const jitterMagnitude = this.maxConnectionAgeMs / 10;
|
1388 | const jitter = Math.random() * jitterMagnitude * 2 - jitterMagnitude;
|
1389 |
|
1390 | connectionAgeTimer = setTimeout(() => {
|
1391 | sessionClosedByServer = true;
|
1392 |
|
1393 | this.trace(
|
1394 | 'Connection dropped by max connection age: ' +
|
1395 | session.socket?.remoteAddress
|
1396 | );
|
1397 |
|
1398 | try {
|
1399 | session.goaway(
|
1400 | http2.constants.NGHTTP2_NO_ERROR,
|
1401 | ~(1 << 31),
|
1402 | kMaxAge
|
1403 | );
|
1404 | } catch (e) {
|
1405 |
|
1406 | session.destroy();
|
1407 | return;
|
1408 | }
|
1409 | session.close();
|
1410 |
|
1411 | |
1412 |
|
1413 | if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) {
|
1414 | connectionAgeGraceTimer = setTimeout(() => {
|
1415 | session.destroy();
|
1416 | }, this.maxConnectionAgeGraceMs);
|
1417 | connectionAgeGraceTimer.unref?.();
|
1418 | }
|
1419 | }, this.maxConnectionAgeMs + jitter);
|
1420 | connectionAgeTimer.unref?.();
|
1421 | }
|
1422 |
|
1423 | if (this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS) {
|
1424 | keeapliveTimeTimer = setInterval(() => {
|
1425 | keepaliveTimeoutTimer = setTimeout(() => {
|
1426 | sessionClosedByServer = true;
|
1427 | session.close();
|
1428 | }, this.keepaliveTimeoutMs);
|
1429 | keepaliveTimeoutTimer.unref?.();
|
1430 |
|
1431 | try {
|
1432 | session.ping(
|
1433 | (err: Error | null, duration: number, payload: Buffer) => {
|
1434 | if (keepaliveTimeoutTimer) {
|
1435 | clearTimeout(keepaliveTimeoutTimer);
|
1436 | }
|
1437 |
|
1438 | if (err) {
|
1439 | sessionClosedByServer = true;
|
1440 | this.trace(
|
1441 | 'Connection dropped due to error of a ping frame ' +
|
1442 | err.message +
|
1443 | ' return in ' +
|
1444 | duration
|
1445 | );
|
1446 | session.close();
|
1447 | }
|
1448 | }
|
1449 | );
|
1450 | } catch (e) {
|
1451 | clearTimeout(keepaliveTimeoutTimer);
|
1452 |
|
1453 | session.destroy();
|
1454 | }
|
1455 | }, this.keepaliveTimeMs);
|
1456 | keeapliveTimeTimer.unref?.();
|
1457 | }
|
1458 |
|
1459 | session.on('close', () => {
|
1460 | if (!sessionClosedByServer) {
|
1461 | this.trace(
|
1462 | `Connection dropped by client ${session.socket?.remoteAddress}`
|
1463 | );
|
1464 | }
|
1465 |
|
1466 | if (connectionAgeTimer) {
|
1467 | clearTimeout(connectionAgeTimer);
|
1468 | }
|
1469 |
|
1470 | if (connectionAgeGraceTimer) {
|
1471 | clearTimeout(connectionAgeGraceTimer);
|
1472 | }
|
1473 |
|
1474 | if (keeapliveTimeTimer) {
|
1475 | clearInterval(keeapliveTimeTimer);
|
1476 | if (keepaliveTimeoutTimer) {
|
1477 | clearTimeout(keepaliveTimeoutTimer);
|
1478 | }
|
1479 | }
|
1480 |
|
1481 | if (idleTimeoutObj !== null) {
|
1482 | clearTimeout(idleTimeoutObj.timeout);
|
1483 | this.sessionIdleTimeouts.delete(session);
|
1484 | }
|
1485 |
|
1486 | this.http2Servers.get(http2Server)?.sessions.delete(session);
|
1487 | });
|
1488 | };
|
1489 | }
|
1490 |
|
1491 | private _channelzSessionHandler(
|
1492 | http2Server: http2.Http2Server | http2.Http2SecureServer
|
1493 | ) {
|
1494 | return (session: http2.ServerHttp2Session) => {
|
1495 | const channelzRef = registerChannelzSocket(
|
1496 | session.socket?.remoteAddress ?? 'unknown',
|
1497 | this.getChannelzSessionInfo.bind(this, session),
|
1498 | this.channelzEnabled
|
1499 | );
|
1500 |
|
1501 | const channelzSessionInfo: ChannelzSessionInfo = {
|
1502 | ref: channelzRef,
|
1503 | streamTracker: new ChannelzCallTracker(),
|
1504 | messagesSent: 0,
|
1505 | messagesReceived: 0,
|
1506 | keepAlivesSent: 0,
|
1507 | lastMessageSentTimestamp: null,
|
1508 | lastMessageReceivedTimestamp: null,
|
1509 | };
|
1510 |
|
1511 | this.http2Servers.get(http2Server)?.sessions.add(session);
|
1512 | this.sessions.set(session, channelzSessionInfo);
|
1513 | const clientAddress = `${session.socket.remoteAddress}:${session.socket.remotePort}`;
|
1514 |
|
1515 | this.channelzTrace.addTrace(
|
1516 | 'CT_INFO',
|
1517 | 'Connection established by client ' + clientAddress
|
1518 | );
|
1519 | this.trace('Connection established by client ' + clientAddress);
|
1520 | this.sessionChildrenTracker.refChild(channelzRef);
|
1521 |
|
1522 | let connectionAgeTimer: NodeJS.Timeout | null = null;
|
1523 | let connectionAgeGraceTimer: NodeJS.Timeout | null = null;
|
1524 | let keeapliveTimeTimer: NodeJS.Timeout | null = null;
|
1525 | let keepaliveTimeoutTimer: NodeJS.Timeout | null = null;
|
1526 | let sessionClosedByServer = false;
|
1527 |
|
1528 | const idleTimeoutObj = this.enableIdleTimeout(session);
|
1529 |
|
1530 | if (this.maxConnectionAgeMs !== UNLIMITED_CONNECTION_AGE_MS) {
|
1531 |
|
1532 | const jitterMagnitude = this.maxConnectionAgeMs / 10;
|
1533 | const jitter = Math.random() * jitterMagnitude * 2 - jitterMagnitude;
|
1534 |
|
1535 | connectionAgeTimer = setTimeout(() => {
|
1536 | sessionClosedByServer = true;
|
1537 | this.channelzTrace.addTrace(
|
1538 | 'CT_INFO',
|
1539 | 'Connection dropped by max connection age from ' + clientAddress
|
1540 | );
|
1541 |
|
1542 | try {
|
1543 | session.goaway(
|
1544 | http2.constants.NGHTTP2_NO_ERROR,
|
1545 | ~(1 << 31),
|
1546 | kMaxAge
|
1547 | );
|
1548 | } catch (e) {
|
1549 |
|
1550 | session.destroy();
|
1551 | return;
|
1552 | }
|
1553 | session.close();
|
1554 |
|
1555 | |
1556 |
|
1557 | if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) {
|
1558 | connectionAgeGraceTimer = setTimeout(() => {
|
1559 | session.destroy();
|
1560 | }, this.maxConnectionAgeGraceMs);
|
1561 | connectionAgeGraceTimer.unref?.();
|
1562 | }
|
1563 | }, this.maxConnectionAgeMs + jitter);
|
1564 | connectionAgeTimer.unref?.();
|
1565 | }
|
1566 |
|
1567 | if (this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS) {
|
1568 | keeapliveTimeTimer = setInterval(() => {
|
1569 | keepaliveTimeoutTimer = setTimeout(() => {
|
1570 | sessionClosedByServer = true;
|
1571 | this.channelzTrace.addTrace(
|
1572 | 'CT_INFO',
|
1573 | 'Connection dropped by keepalive timeout from ' + clientAddress
|
1574 | );
|
1575 |
|
1576 | session.close();
|
1577 | }, this.keepaliveTimeoutMs);
|
1578 | keepaliveTimeoutTimer.unref?.();
|
1579 |
|
1580 | try {
|
1581 | session.ping(
|
1582 | (err: Error | null, duration: number, payload: Buffer) => {
|
1583 | if (keepaliveTimeoutTimer) {
|
1584 | clearTimeout(keepaliveTimeoutTimer);
|
1585 | }
|
1586 |
|
1587 | if (err) {
|
1588 | sessionClosedByServer = true;
|
1589 | this.channelzTrace.addTrace(
|
1590 | 'CT_INFO',
|
1591 | 'Connection dropped due to error of a ping frame ' +
|
1592 | err.message +
|
1593 | ' return in ' +
|
1594 | duration
|
1595 | );
|
1596 |
|
1597 | session.close();
|
1598 | }
|
1599 | }
|
1600 | );
|
1601 | channelzSessionInfo.keepAlivesSent += 1;
|
1602 | } catch (e) {
|
1603 | clearTimeout(keepaliveTimeoutTimer);
|
1604 |
|
1605 | session.destroy();
|
1606 | }
|
1607 | }, this.keepaliveTimeMs);
|
1608 | keeapliveTimeTimer.unref?.();
|
1609 | }
|
1610 |
|
1611 | session.on('close', () => {
|
1612 | if (!sessionClosedByServer) {
|
1613 | this.channelzTrace.addTrace(
|
1614 | 'CT_INFO',
|
1615 | 'Connection dropped by client ' + clientAddress
|
1616 | );
|
1617 | }
|
1618 |
|
1619 | this.sessionChildrenTracker.unrefChild(channelzRef);
|
1620 | unregisterChannelzRef(channelzRef);
|
1621 |
|
1622 | if (connectionAgeTimer) {
|
1623 | clearTimeout(connectionAgeTimer);
|
1624 | }
|
1625 |
|
1626 | if (connectionAgeGraceTimer) {
|
1627 | clearTimeout(connectionAgeGraceTimer);
|
1628 | }
|
1629 |
|
1630 | if (keeapliveTimeTimer) {
|
1631 | clearInterval(keeapliveTimeTimer);
|
1632 | if (keepaliveTimeoutTimer) {
|
1633 | clearTimeout(keepaliveTimeoutTimer);
|
1634 | }
|
1635 | }
|
1636 |
|
1637 | if (idleTimeoutObj !== null) {
|
1638 | clearTimeout(idleTimeoutObj.timeout);
|
1639 | this.sessionIdleTimeouts.delete(session);
|
1640 | }
|
1641 |
|
1642 | this.http2Servers.get(http2Server)?.sessions.delete(session);
|
1643 | this.sessions.delete(session);
|
1644 | });
|
1645 | };
|
1646 | }
|
1647 |
|
1648 | private enableIdleTimeout(
|
1649 | session: http2.ServerHttp2Session
|
1650 | ): SessionIdleTimeoutTracker | null {
|
1651 | if (this.sessionIdleTimeout >= MAX_CONNECTION_IDLE_MS) {
|
1652 | return null;
|
1653 | }
|
1654 |
|
1655 | const idleTimeoutObj: SessionIdleTimeoutTracker = {
|
1656 | activeStreams: 0,
|
1657 | lastIdle: Date.now(),
|
1658 | onClose: this.onStreamClose.bind(this, session),
|
1659 | timeout: setTimeout(
|
1660 | this.onIdleTimeout,
|
1661 | this.sessionIdleTimeout,
|
1662 | this,
|
1663 | session
|
1664 | ),
|
1665 | };
|
1666 | idleTimeoutObj.timeout.unref?.();
|
1667 | this.sessionIdleTimeouts.set(session, idleTimeoutObj);
|
1668 |
|
1669 | const { socket } = session;
|
1670 | this.trace(
|
1671 | 'Enable idle timeout for ' +
|
1672 | socket.remoteAddress +
|
1673 | ':' +
|
1674 | socket.remotePort
|
1675 | );
|
1676 |
|
1677 | return idleTimeoutObj;
|
1678 | }
|
1679 |
|
1680 | private onIdleTimeout(
|
1681 | this: undefined,
|
1682 | ctx: Server,
|
1683 | session: http2.ServerHttp2Session
|
1684 | ) {
|
1685 | const { socket } = session;
|
1686 | const sessionInfo = ctx.sessionIdleTimeouts.get(session);
|
1687 |
|
1688 |
|
1689 |
|
1690 |
|
1691 |
|
1692 | if (
|
1693 | sessionInfo !== undefined &&
|
1694 | sessionInfo.activeStreams === 0 &&
|
1695 | Date.now() - sessionInfo.lastIdle >= ctx.sessionIdleTimeout
|
1696 | ) {
|
1697 | ctx.trace(
|
1698 | 'Session idle timeout triggered for ' +
|
1699 | socket?.remoteAddress +
|
1700 | ':' +
|
1701 | socket?.remotePort +
|
1702 | ' last idle at ' +
|
1703 | sessionInfo.lastIdle
|
1704 | );
|
1705 |
|
1706 | ctx.closeSession(session);
|
1707 | }
|
1708 | }
|
1709 |
|
1710 | private onStreamOpened(stream: http2.ServerHttp2Stream) {
|
1711 | const session = stream.session as http2.ServerHttp2Session;
|
1712 |
|
1713 | const idleTimeoutObj = this.sessionIdleTimeouts.get(session);
|
1714 | if (idleTimeoutObj) {
|
1715 | idleTimeoutObj.activeStreams += 1;
|
1716 | stream.once('close', idleTimeoutObj.onClose);
|
1717 | }
|
1718 | }
|
1719 |
|
1720 | private onStreamClose(session: http2.ServerHttp2Session) {
|
1721 | const idleTimeoutObj = this.sessionIdleTimeouts.get(session);
|
1722 |
|
1723 | if (idleTimeoutObj) {
|
1724 | idleTimeoutObj.activeStreams -= 1;
|
1725 | if (idleTimeoutObj.activeStreams === 0) {
|
1726 | idleTimeoutObj.lastIdle = Date.now();
|
1727 | idleTimeoutObj.timeout.refresh();
|
1728 |
|
1729 | this.trace(
|
1730 | 'Session onStreamClose' +
|
1731 | session.socket?.remoteAddress +
|
1732 | ':' +
|
1733 | session.socket?.remotePort +
|
1734 | ' at ' +
|
1735 | idleTimeoutObj.lastIdle
|
1736 | );
|
1737 | }
|
1738 | }
|
1739 | }
|
1740 | }
|
1741 |
|
1742 | async function handleUnary<RequestType, ResponseType>(
|
1743 | call: ServerInterceptingCallInterface,
|
1744 | handler: UnaryHandler<RequestType, ResponseType>
|
1745 | ): Promise<void> {
|
1746 | let stream: ServerUnaryCall<RequestType, ResponseType>;
|
1747 |
|
1748 | function respond(
|
1749 | err: ServerErrorResponse | ServerStatusResponse | null,
|
1750 | value?: ResponseType | null,
|
1751 | trailer?: Metadata,
|
1752 | flags?: number
|
1753 | ) {
|
1754 | if (err) {
|
1755 | call.sendStatus(serverErrorToStatus(err, trailer));
|
1756 | return;
|
1757 | }
|
1758 | call.sendMessage(value, () => {
|
1759 | call.sendStatus({
|
1760 | code: Status.OK,
|
1761 | details: 'OK',
|
1762 | metadata: trailer ?? null,
|
1763 | });
|
1764 | });
|
1765 | }
|
1766 |
|
1767 | let requestMetadata: Metadata;
|
1768 | let requestMessage: RequestType | null = null;
|
1769 | call.start({
|
1770 | onReceiveMetadata(metadata) {
|
1771 | requestMetadata = metadata;
|
1772 | call.startRead();
|
1773 | },
|
1774 | onReceiveMessage(message) {
|
1775 | if (requestMessage) {
|
1776 | call.sendStatus({
|
1777 | code: Status.UNIMPLEMENTED,
|
1778 | details: `Received a second request message for server streaming method ${handler.path}`,
|
1779 | metadata: null,
|
1780 | });
|
1781 | return;
|
1782 | }
|
1783 | requestMessage = message;
|
1784 | call.startRead();
|
1785 | },
|
1786 | onReceiveHalfClose() {
|
1787 | if (!requestMessage) {
|
1788 | call.sendStatus({
|
1789 | code: Status.UNIMPLEMENTED,
|
1790 | details: `Received no request message for server streaming method ${handler.path}`,
|
1791 | metadata: null,
|
1792 | });
|
1793 | return;
|
1794 | }
|
1795 | stream = new ServerWritableStreamImpl(
|
1796 | handler.path,
|
1797 | call,
|
1798 | requestMetadata,
|
1799 | requestMessage
|
1800 | );
|
1801 | try {
|
1802 | handler.func(stream, respond);
|
1803 | } catch (err) {
|
1804 | call.sendStatus({
|
1805 | code: Status.UNKNOWN,
|
1806 | details: `Server method handler threw error ${
|
1807 | (err as Error).message
|
1808 | }`,
|
1809 | metadata: null,
|
1810 | });
|
1811 | }
|
1812 | },
|
1813 | onCancel() {
|
1814 | if (stream) {
|
1815 | stream.cancelled = true;
|
1816 | stream.emit('cancelled', 'cancelled');
|
1817 | }
|
1818 | },
|
1819 | });
|
1820 | }
|
1821 |
|
1822 | function handleClientStreaming<RequestType, ResponseType>(
|
1823 | call: ServerInterceptingCallInterface,
|
1824 | handler: ClientStreamingHandler<RequestType, ResponseType>
|
1825 | ): void {
|
1826 | let stream: ServerReadableStream<RequestType, ResponseType>;
|
1827 |
|
1828 | function respond(
|
1829 | err: ServerErrorResponse | ServerStatusResponse | null,
|
1830 | value?: ResponseType | null,
|
1831 | trailer?: Metadata,
|
1832 | flags?: number
|
1833 | ) {
|
1834 | if (err) {
|
1835 | call.sendStatus(serverErrorToStatus(err, trailer));
|
1836 | return;
|
1837 | }
|
1838 | call.sendMessage(value, () => {
|
1839 | call.sendStatus({
|
1840 | code: Status.OK,
|
1841 | details: 'OK',
|
1842 | metadata: trailer ?? null,
|
1843 | });
|
1844 | });
|
1845 | }
|
1846 |
|
1847 | call.start({
|
1848 | onReceiveMetadata(metadata) {
|
1849 | stream = new ServerDuplexStreamImpl(handler.path, call, metadata);
|
1850 | try {
|
1851 | handler.func(stream, respond);
|
1852 | } catch (err) {
|
1853 | call.sendStatus({
|
1854 | code: Status.UNKNOWN,
|
1855 | details: `Server method handler threw error ${
|
1856 | (err as Error).message
|
1857 | }`,
|
1858 | metadata: null,
|
1859 | });
|
1860 | }
|
1861 | },
|
1862 | onReceiveMessage(message) {
|
1863 | stream.push(message);
|
1864 | },
|
1865 | onReceiveHalfClose() {
|
1866 | stream.push(null);
|
1867 | },
|
1868 | onCancel() {
|
1869 | if (stream) {
|
1870 | stream.cancelled = true;
|
1871 | stream.emit('cancelled', 'cancelled');
|
1872 | stream.destroy();
|
1873 | }
|
1874 | },
|
1875 | });
|
1876 | }
|
1877 |
|
1878 | function handleServerStreaming<RequestType, ResponseType>(
|
1879 | call: ServerInterceptingCallInterface,
|
1880 | handler: ServerStreamingHandler<RequestType, ResponseType>
|
1881 | ): void {
|
1882 | let stream: ServerWritableStream<RequestType, ResponseType>;
|
1883 |
|
1884 | let requestMetadata: Metadata;
|
1885 | let requestMessage: RequestType | null = null;
|
1886 | call.start({
|
1887 | onReceiveMetadata(metadata) {
|
1888 | requestMetadata = metadata;
|
1889 | call.startRead();
|
1890 | },
|
1891 | onReceiveMessage(message) {
|
1892 | if (requestMessage) {
|
1893 | call.sendStatus({
|
1894 | code: Status.UNIMPLEMENTED,
|
1895 | details: `Received a second request message for server streaming method ${handler.path}`,
|
1896 | metadata: null,
|
1897 | });
|
1898 | return;
|
1899 | }
|
1900 | requestMessage = message;
|
1901 | call.startRead();
|
1902 | },
|
1903 | onReceiveHalfClose() {
|
1904 | if (!requestMessage) {
|
1905 | call.sendStatus({
|
1906 | code: Status.UNIMPLEMENTED,
|
1907 | details: `Received no request message for server streaming method ${handler.path}`,
|
1908 | metadata: null,
|
1909 | });
|
1910 | return;
|
1911 | }
|
1912 | stream = new ServerWritableStreamImpl(
|
1913 | handler.path,
|
1914 | call,
|
1915 | requestMetadata,
|
1916 | requestMessage
|
1917 | );
|
1918 | try {
|
1919 | handler.func(stream);
|
1920 | } catch (err) {
|
1921 | call.sendStatus({
|
1922 | code: Status.UNKNOWN,
|
1923 | details: `Server method handler threw error ${
|
1924 | (err as Error).message
|
1925 | }`,
|
1926 | metadata: null,
|
1927 | });
|
1928 | }
|
1929 | },
|
1930 | onCancel() {
|
1931 | if (stream) {
|
1932 | stream.cancelled = true;
|
1933 | stream.emit('cancelled', 'cancelled');
|
1934 | stream.destroy();
|
1935 | }
|
1936 | },
|
1937 | });
|
1938 | }
|
1939 |
|
1940 | function handleBidiStreaming<RequestType, ResponseType>(
|
1941 | call: ServerInterceptingCallInterface,
|
1942 | handler: BidiStreamingHandler<RequestType, ResponseType>
|
1943 | ): void {
|
1944 | let stream: ServerDuplexStream<RequestType, ResponseType>;
|
1945 |
|
1946 | call.start({
|
1947 | onReceiveMetadata(metadata) {
|
1948 | stream = new ServerDuplexStreamImpl(handler.path, call, metadata);
|
1949 | try {
|
1950 | handler.func(stream);
|
1951 | } catch (err) {
|
1952 | call.sendStatus({
|
1953 | code: Status.UNKNOWN,
|
1954 | details: `Server method handler threw error ${
|
1955 | (err as Error).message
|
1956 | }`,
|
1957 | metadata: null,
|
1958 | });
|
1959 | }
|
1960 | },
|
1961 | onReceiveMessage(message) {
|
1962 | stream.push(message);
|
1963 | },
|
1964 | onReceiveHalfClose() {
|
1965 | stream.push(null);
|
1966 | },
|
1967 | onCancel() {
|
1968 | if (stream) {
|
1969 | stream.cancelled = true;
|
1970 | stream.emit('cancelled', 'cancelled');
|
1971 | stream.destroy();
|
1972 | }
|
1973 | },
|
1974 | });
|
1975 | }
|