1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | import * as http2 from 'http2';
|
19 | import { AddressInfo } from 'net';
|
20 |
|
21 | import { ServiceError } from './call';
|
22 | import { Status, LogVerbosity } from './constants';
|
23 | import { Deserialize, Serialize, ServiceDefinition } from './make-client';
|
24 | import { Metadata } from './metadata';
|
25 | import {
|
26 | BidiStreamingHandler,
|
27 | ClientStreamingHandler,
|
28 | HandleCall,
|
29 | Handler,
|
30 | HandlerType,
|
31 | Http2ServerCallStream,
|
32 | sendUnaryData,
|
33 | ServerDuplexStream,
|
34 | ServerDuplexStreamImpl,
|
35 | ServerReadableStream,
|
36 | ServerReadableStreamImpl,
|
37 | ServerStreamingHandler,
|
38 | ServerUnaryCall,
|
39 | ServerUnaryCallImpl,
|
40 | ServerWritableStream,
|
41 | ServerWritableStreamImpl,
|
42 | UnaryHandler,
|
43 | ServerErrorResponse,
|
44 | ServerStatusResponse,
|
45 | } from './server-call';
|
46 | import { ServerCredentials } from './server-credentials';
|
47 | import { ChannelOptions } from './channel-options';
|
48 | import {
|
49 | createResolver,
|
50 | ResolverListener,
|
51 | mapUriDefaultScheme,
|
52 | } from './resolver';
|
53 | import * as logging from './logging';
|
54 | import {
|
55 | SubchannelAddress,
|
56 | TcpSubchannelAddress,
|
57 | isTcpSubchannelAddress,
|
58 | subchannelAddressToString,
|
59 | stringToSubchannelAddress,
|
60 | } from './subchannel-address';
|
61 | import { parseUri } from './uri-parser';
|
62 | import {
|
63 | ChannelzCallTracker,
|
64 | ChannelzChildrenTracker,
|
65 | ChannelzTrace,
|
66 | registerChannelzServer,
|
67 | registerChannelzSocket,
|
68 | ServerInfo,
|
69 | ServerRef,
|
70 | SocketInfo,
|
71 | SocketRef,
|
72 | TlsInfo,
|
73 | unregisterChannelzRef,
|
74 | } from './channelz';
|
75 | import { CipherNameAndProtocol, TLSSocket } from 'tls';
|
76 |
|
77 | const UNLIMITED_CONNECTION_AGE_MS = ~(1 << 31);
|
78 | const KEEPALIVE_MAX_TIME_MS = ~(1 << 31);
|
79 | const KEEPALIVE_TIMEOUT_MS = 20000;
|
80 |
|
81 | const { HTTP2_HEADER_PATH } = http2.constants;
|
82 |
|
83 | const TRACER_NAME = 'server';
|
84 |
|
85 | interface BindResult {
|
86 | port: number;
|
87 | count: number;
|
88 | }
|
89 |
|
90 | function noop(): void {}
|
91 |
|
92 | function getUnimplementedStatusResponse(
|
93 | methodName: string
|
94 | ): Partial<ServiceError> {
|
95 | return {
|
96 | code: Status.UNIMPLEMENTED,
|
97 | details: `The server does not implement the method ${methodName}`,
|
98 | };
|
99 | }
|
100 |
|
101 |
|
102 | type UntypedUnaryHandler = UnaryHandler<any, any>;
|
103 | type UntypedClientStreamingHandler = ClientStreamingHandler<any, any>;
|
104 | type UntypedServerStreamingHandler = ServerStreamingHandler<any, any>;
|
105 | type UntypedBidiStreamingHandler = BidiStreamingHandler<any, any>;
|
106 | export type UntypedHandleCall = HandleCall<any, any>;
|
107 | type UntypedHandler = Handler<any, any>;
|
108 | export interface UntypedServiceImplementation {
|
109 | [name: string]: UntypedHandleCall;
|
110 | }
|
111 |
|
112 | function getDefaultHandler(handlerType: HandlerType, methodName: string) {
|
113 | const unimplementedStatusResponse =
|
114 | getUnimplementedStatusResponse(methodName);
|
115 | switch (handlerType) {
|
116 | case 'unary':
|
117 | return (
|
118 | call: ServerUnaryCall<any, any>,
|
119 | callback: sendUnaryData<any>
|
120 | ) => {
|
121 | callback(unimplementedStatusResponse as ServiceError, null);
|
122 | };
|
123 | case 'clientStream':
|
124 | return (
|
125 | call: ServerReadableStream<any, any>,
|
126 | callback: sendUnaryData<any>
|
127 | ) => {
|
128 | callback(unimplementedStatusResponse as ServiceError, null);
|
129 | };
|
130 | case 'serverStream':
|
131 | return (call: ServerWritableStream<any, any>) => {
|
132 | call.emit('error', unimplementedStatusResponse);
|
133 | };
|
134 | case 'bidi':
|
135 | return (call: ServerDuplexStream<any, any>) => {
|
136 | call.emit('error', unimplementedStatusResponse);
|
137 | };
|
138 | default:
|
139 | throw new Error(`Invalid handlerType ${handlerType}`);
|
140 | }
|
141 | }
|
142 |
|
143 | interface ChannelzSessionInfo {
|
144 | ref: SocketRef;
|
145 | streamTracker: ChannelzCallTracker;
|
146 | messagesSent: number;
|
147 | messagesReceived: number;
|
148 | lastMessageSentTimestamp: Date | null;
|
149 | lastMessageReceivedTimestamp: Date | null;
|
150 | }
|
151 |
|
152 | export class Server {
|
153 | private http2ServerList: {
|
154 | server: http2.Http2Server | http2.Http2SecureServer;
|
155 | channelzRef: SocketRef;
|
156 | }[] = [];
|
157 |
|
158 | private handlers: Map<string, UntypedHandler> = new Map<
|
159 | string,
|
160 | UntypedHandler
|
161 | >();
|
162 | private sessions = new Map<http2.ServerHttp2Session, ChannelzSessionInfo>();
|
163 | private started = false;
|
164 | private options: ChannelOptions;
|
165 | private serverAddressString = 'null';
|
166 |
|
167 |
|
168 | private readonly channelzEnabled: boolean = true;
|
169 | private channelzRef: ServerRef;
|
170 | private channelzTrace = new ChannelzTrace();
|
171 | private callTracker = new ChannelzCallTracker();
|
172 | private listenerChildrenTracker = new ChannelzChildrenTracker();
|
173 | private sessionChildrenTracker = new ChannelzChildrenTracker();
|
174 |
|
175 | private readonly maxConnectionAgeMs: number;
|
176 | private readonly maxConnectionAgeGraceMs: number;
|
177 |
|
178 | private readonly keepaliveTimeMs: number;
|
179 | private readonly keepaliveTimeoutMs: number;
|
180 |
|
181 | constructor(options?: ChannelOptions) {
|
182 | this.options = options ?? {};
|
183 | if (this.options['grpc.enable_channelz'] === 0) {
|
184 | this.channelzEnabled = false;
|
185 | }
|
186 | this.channelzRef = registerChannelzServer(
|
187 | () => this.getChannelzInfo(),
|
188 | this.channelzEnabled
|
189 | );
|
190 | if (this.channelzEnabled) {
|
191 | this.channelzTrace.addTrace('CT_INFO', 'Server created');
|
192 | }
|
193 | this.maxConnectionAgeMs =
|
194 | this.options['grpc.max_connection_age_ms'] ?? UNLIMITED_CONNECTION_AGE_MS;
|
195 | this.maxConnectionAgeGraceMs =
|
196 | this.options['grpc.max_connection_age_grace_ms'] ??
|
197 | UNLIMITED_CONNECTION_AGE_MS;
|
198 | this.keepaliveTimeMs =
|
199 | this.options['grpc.keepalive_time_ms'] ?? KEEPALIVE_MAX_TIME_MS;
|
200 | this.keepaliveTimeoutMs =
|
201 | this.options['grpc.keepalive_timeout_ms'] ?? KEEPALIVE_TIMEOUT_MS;
|
202 | this.trace('Server constructed');
|
203 | }
|
204 |
|
205 | private getChannelzInfo(): ServerInfo {
|
206 | return {
|
207 | trace: this.channelzTrace,
|
208 | callTracker: this.callTracker,
|
209 | listenerChildren: this.listenerChildrenTracker.getChildLists(),
|
210 | sessionChildren: this.sessionChildrenTracker.getChildLists(),
|
211 | };
|
212 | }
|
213 |
|
214 | private getChannelzSessionInfoGetter(
|
215 | session: http2.ServerHttp2Session
|
216 | ): () => SocketInfo {
|
217 | return () => {
|
218 | const sessionInfo = this.sessions.get(session)!;
|
219 | const sessionSocket = session.socket;
|
220 | const remoteAddress = sessionSocket.remoteAddress
|
221 | ? stringToSubchannelAddress(
|
222 | sessionSocket.remoteAddress,
|
223 | sessionSocket.remotePort
|
224 | )
|
225 | : null;
|
226 | const localAddress = sessionSocket.localAddress
|
227 | ? stringToSubchannelAddress(
|
228 | sessionSocket.localAddress!,
|
229 | sessionSocket.localPort
|
230 | )
|
231 | : null;
|
232 | let tlsInfo: TlsInfo | null;
|
233 | if (session.encrypted) {
|
234 | const tlsSocket: TLSSocket = sessionSocket as TLSSocket;
|
235 | const cipherInfo: CipherNameAndProtocol & { standardName?: string } =
|
236 | tlsSocket.getCipher();
|
237 | const certificate = tlsSocket.getCertificate();
|
238 | const peerCertificate = tlsSocket.getPeerCertificate();
|
239 | tlsInfo = {
|
240 | cipherSuiteStandardName: cipherInfo.standardName ?? null,
|
241 | cipherSuiteOtherName: cipherInfo.standardName
|
242 | ? null
|
243 | : cipherInfo.name,
|
244 | localCertificate:
|
245 | certificate && 'raw' in certificate ? certificate.raw : null,
|
246 | remoteCertificate:
|
247 | peerCertificate && 'raw' in peerCertificate
|
248 | ? peerCertificate.raw
|
249 | : null,
|
250 | };
|
251 | } else {
|
252 | tlsInfo = null;
|
253 | }
|
254 | const socketInfo: SocketInfo = {
|
255 | remoteAddress: remoteAddress,
|
256 | localAddress: localAddress,
|
257 | security: tlsInfo,
|
258 | remoteName: null,
|
259 | streamsStarted: sessionInfo.streamTracker.callsStarted,
|
260 | streamsSucceeded: sessionInfo.streamTracker.callsSucceeded,
|
261 | streamsFailed: sessionInfo.streamTracker.callsFailed,
|
262 | messagesSent: sessionInfo.messagesSent,
|
263 | messagesReceived: sessionInfo.messagesReceived,
|
264 | keepAlivesSent: 0,
|
265 | lastLocalStreamCreatedTimestamp: null,
|
266 | lastRemoteStreamCreatedTimestamp:
|
267 | sessionInfo.streamTracker.lastCallStartedTimestamp,
|
268 | lastMessageSentTimestamp: sessionInfo.lastMessageSentTimestamp,
|
269 | lastMessageReceivedTimestamp: sessionInfo.lastMessageReceivedTimestamp,
|
270 | localFlowControlWindow: session.state.localWindowSize ?? null,
|
271 | remoteFlowControlWindow: session.state.remoteWindowSize ?? null,
|
272 | };
|
273 | return socketInfo;
|
274 | };
|
275 | }
|
276 |
|
277 | private trace(text: string): void {
|
278 | logging.trace(
|
279 | LogVerbosity.DEBUG,
|
280 | TRACER_NAME,
|
281 | '(' + this.channelzRef.id + ') ' + text
|
282 | );
|
283 | }
|
284 |
|
285 | addProtoService(): never {
|
286 | throw new Error('Not implemented. Use addService() instead');
|
287 | }
|
288 |
|
289 | addService(
|
290 | service: ServiceDefinition,
|
291 | implementation: UntypedServiceImplementation
|
292 | ): void {
|
293 | if (
|
294 | service === null ||
|
295 | typeof service !== 'object' ||
|
296 | implementation === null ||
|
297 | typeof implementation !== 'object'
|
298 | ) {
|
299 | throw new Error('addService() requires two objects as arguments');
|
300 | }
|
301 |
|
302 | const serviceKeys = Object.keys(service);
|
303 |
|
304 | if (serviceKeys.length === 0) {
|
305 | throw new Error('Cannot add an empty service to a server');
|
306 | }
|
307 |
|
308 | serviceKeys.forEach(name => {
|
309 | const attrs = service[name];
|
310 | let methodType: HandlerType;
|
311 |
|
312 | if (attrs.requestStream) {
|
313 | if (attrs.responseStream) {
|
314 | methodType = 'bidi';
|
315 | } else {
|
316 | methodType = 'clientStream';
|
317 | }
|
318 | } else {
|
319 | if (attrs.responseStream) {
|
320 | methodType = 'serverStream';
|
321 | } else {
|
322 | methodType = 'unary';
|
323 | }
|
324 | }
|
325 |
|
326 | let implFn = implementation[name];
|
327 | let impl;
|
328 |
|
329 | if (implFn === undefined && typeof attrs.originalName === 'string') {
|
330 | implFn = implementation[attrs.originalName];
|
331 | }
|
332 |
|
333 | if (implFn !== undefined) {
|
334 | impl = implFn.bind(implementation);
|
335 | } else {
|
336 | impl = getDefaultHandler(methodType, name);
|
337 | }
|
338 |
|
339 | const success = this.register(
|
340 | attrs.path,
|
341 | impl as UntypedHandleCall,
|
342 | attrs.responseSerialize,
|
343 | attrs.requestDeserialize,
|
344 | methodType
|
345 | );
|
346 |
|
347 | if (success === false) {
|
348 | throw new Error(`Method handler for ${attrs.path} already provided.`);
|
349 | }
|
350 | });
|
351 | }
|
352 |
|
353 | removeService(service: ServiceDefinition): void {
|
354 | if (service === null || typeof service !== 'object') {
|
355 | throw new Error('removeService() requires object as argument');
|
356 | }
|
357 |
|
358 | const serviceKeys = Object.keys(service);
|
359 | serviceKeys.forEach(name => {
|
360 | const attrs = service[name];
|
361 | this.unregister(attrs.path);
|
362 | });
|
363 | }
|
364 |
|
365 | bind(port: string, creds: ServerCredentials): never {
|
366 | throw new Error('Not implemented. Use bindAsync() instead');
|
367 | }
|
368 |
|
369 | bindAsync(
|
370 | port: string,
|
371 | creds: ServerCredentials,
|
372 | callback: (error: Error | null, port: number) => void
|
373 | ): void {
|
374 | if (this.started === true) {
|
375 | throw new Error('server is already started');
|
376 | }
|
377 |
|
378 | if (typeof port !== 'string') {
|
379 | throw new TypeError('port must be a string');
|
380 | }
|
381 |
|
382 | if (creds === null || !(creds instanceof ServerCredentials)) {
|
383 | throw new TypeError('creds must be a ServerCredentials object');
|
384 | }
|
385 |
|
386 | if (typeof callback !== 'function') {
|
387 | throw new TypeError('callback must be a function');
|
388 | }
|
389 |
|
390 | const initialPortUri = parseUri(port);
|
391 | if (initialPortUri === null) {
|
392 | throw new Error(`Could not parse port "${port}"`);
|
393 | }
|
394 | const portUri = mapUriDefaultScheme(initialPortUri);
|
395 | if (portUri === null) {
|
396 | throw new Error(`Could not get a default scheme for port "${port}"`);
|
397 | }
|
398 |
|
399 | const serverOptions: http2.ServerOptions = {
|
400 | maxSendHeaderBlockLength: Number.MAX_SAFE_INTEGER,
|
401 | };
|
402 | if ('grpc-node.max_session_memory' in this.options) {
|
403 | serverOptions.maxSessionMemory =
|
404 | this.options['grpc-node.max_session_memory'];
|
405 | } else {
|
406 | |
407 |
|
408 |
|
409 |
|
410 | serverOptions.maxSessionMemory = Number.MAX_SAFE_INTEGER;
|
411 | }
|
412 | if ('grpc.max_concurrent_streams' in this.options) {
|
413 | serverOptions.settings = {
|
414 | maxConcurrentStreams: this.options['grpc.max_concurrent_streams'],
|
415 | };
|
416 | }
|
417 |
|
418 | const deferredCallback = (error: Error | null, port: number) => {
|
419 | process.nextTick(() => callback(error, port));
|
420 | };
|
421 |
|
422 | const setupServer = (): http2.Http2Server | http2.Http2SecureServer => {
|
423 | let http2Server: http2.Http2Server | http2.Http2SecureServer;
|
424 | if (creds._isSecure()) {
|
425 | const secureServerOptions = Object.assign(
|
426 | serverOptions,
|
427 | creds._getSettings()!
|
428 | );
|
429 | secureServerOptions.enableTrace =
|
430 | this.options['grpc-node.tls_enable_trace'] === 1;
|
431 | http2Server = http2.createSecureServer(secureServerOptions);
|
432 | http2Server.on('secureConnection', (socket: TLSSocket) => {
|
433 | |
434 |
|
435 | socket.on('error', (e: Error) => {
|
436 | this.trace(
|
437 | 'An incoming TLS connection closed with error: ' + e.message
|
438 | );
|
439 | });
|
440 | });
|
441 | } else {
|
442 | http2Server = http2.createServer(serverOptions);
|
443 | }
|
444 |
|
445 | http2Server.setTimeout(0, noop);
|
446 | this._setupHandlers(http2Server);
|
447 | return http2Server;
|
448 | };
|
449 |
|
450 | const bindSpecificPort = (
|
451 | addressList: SubchannelAddress[],
|
452 | portNum: number,
|
453 | previousCount: number
|
454 | ): Promise<BindResult> => {
|
455 | if (addressList.length === 0) {
|
456 | return Promise.resolve({ port: portNum, count: previousCount });
|
457 | }
|
458 | return Promise.all(
|
459 | addressList.map(address => {
|
460 | this.trace(
|
461 | 'Attempting to bind ' + subchannelAddressToString(address)
|
462 | );
|
463 | let addr: SubchannelAddress;
|
464 | if (isTcpSubchannelAddress(address)) {
|
465 | addr = {
|
466 | host: (address as TcpSubchannelAddress).host,
|
467 | port: portNum,
|
468 | };
|
469 | } else {
|
470 | addr = address;
|
471 | }
|
472 |
|
473 | const http2Server = setupServer();
|
474 | return new Promise<number | Error>((resolve, reject) => {
|
475 | const onError = (err: Error) => {
|
476 | this.trace(
|
477 | 'Failed to bind ' +
|
478 | subchannelAddressToString(address) +
|
479 | ' with error ' +
|
480 | err.message
|
481 | );
|
482 | resolve(err);
|
483 | };
|
484 |
|
485 | http2Server.once('error', onError);
|
486 |
|
487 | http2Server.listen(addr, () => {
|
488 | const boundAddress = http2Server.address()!;
|
489 | let boundSubchannelAddress: SubchannelAddress;
|
490 | if (typeof boundAddress === 'string') {
|
491 | boundSubchannelAddress = {
|
492 | path: boundAddress,
|
493 | };
|
494 | } else {
|
495 | boundSubchannelAddress = {
|
496 | host: boundAddress.address,
|
497 | port: boundAddress.port,
|
498 | };
|
499 | }
|
500 |
|
501 | const channelzRef = registerChannelzSocket(
|
502 | subchannelAddressToString(boundSubchannelAddress),
|
503 | () => {
|
504 | return {
|
505 | localAddress: boundSubchannelAddress,
|
506 | remoteAddress: null,
|
507 | security: null,
|
508 | remoteName: null,
|
509 | streamsStarted: 0,
|
510 | streamsSucceeded: 0,
|
511 | streamsFailed: 0,
|
512 | messagesSent: 0,
|
513 | messagesReceived: 0,
|
514 | keepAlivesSent: 0,
|
515 | lastLocalStreamCreatedTimestamp: null,
|
516 | lastRemoteStreamCreatedTimestamp: null,
|
517 | lastMessageSentTimestamp: null,
|
518 | lastMessageReceivedTimestamp: null,
|
519 | localFlowControlWindow: null,
|
520 | remoteFlowControlWindow: null,
|
521 | };
|
522 | },
|
523 | this.channelzEnabled
|
524 | );
|
525 | if (this.channelzEnabled) {
|
526 | this.listenerChildrenTracker.refChild(channelzRef);
|
527 | }
|
528 | this.http2ServerList.push({
|
529 | server: http2Server,
|
530 | channelzRef: channelzRef,
|
531 | });
|
532 | this.trace(
|
533 | 'Successfully bound ' +
|
534 | subchannelAddressToString(boundSubchannelAddress)
|
535 | );
|
536 | resolve(
|
537 | 'port' in boundSubchannelAddress
|
538 | ? boundSubchannelAddress.port
|
539 | : portNum
|
540 | );
|
541 | http2Server.removeListener('error', onError);
|
542 | });
|
543 | });
|
544 | })
|
545 | ).then(results => {
|
546 | let count = 0;
|
547 | for (const result of results) {
|
548 | if (typeof result === 'number') {
|
549 | count += 1;
|
550 | if (result !== portNum) {
|
551 | throw new Error(
|
552 | 'Invalid state: multiple port numbers added from single address'
|
553 | );
|
554 | }
|
555 | }
|
556 | }
|
557 | return {
|
558 | port: portNum,
|
559 | count: count + previousCount,
|
560 | };
|
561 | });
|
562 | };
|
563 |
|
564 | const bindWildcardPort = (
|
565 | addressList: SubchannelAddress[]
|
566 | ): Promise<BindResult> => {
|
567 | if (addressList.length === 0) {
|
568 | return Promise.resolve<BindResult>({ port: 0, count: 0 });
|
569 | }
|
570 | const address = addressList[0];
|
571 | const http2Server = setupServer();
|
572 | return new Promise<BindResult>((resolve, reject) => {
|
573 | const onError = (err: Error) => {
|
574 | this.trace(
|
575 | 'Failed to bind ' +
|
576 | subchannelAddressToString(address) +
|
577 | ' with error ' +
|
578 | err.message
|
579 | );
|
580 | resolve(bindWildcardPort(addressList.slice(1)));
|
581 | };
|
582 |
|
583 | http2Server.once('error', onError);
|
584 |
|
585 | http2Server.listen(address, () => {
|
586 | const boundAddress = http2Server.address() as AddressInfo;
|
587 | const boundSubchannelAddress: SubchannelAddress = {
|
588 | host: boundAddress.address,
|
589 | port: boundAddress.port,
|
590 | };
|
591 | const channelzRef = registerChannelzSocket(
|
592 | subchannelAddressToString(boundSubchannelAddress),
|
593 | () => {
|
594 | return {
|
595 | localAddress: boundSubchannelAddress,
|
596 | remoteAddress: null,
|
597 | security: null,
|
598 | remoteName: null,
|
599 | streamsStarted: 0,
|
600 | streamsSucceeded: 0,
|
601 | streamsFailed: 0,
|
602 | messagesSent: 0,
|
603 | messagesReceived: 0,
|
604 | keepAlivesSent: 0,
|
605 | lastLocalStreamCreatedTimestamp: null,
|
606 | lastRemoteStreamCreatedTimestamp: null,
|
607 | lastMessageSentTimestamp: null,
|
608 | lastMessageReceivedTimestamp: null,
|
609 | localFlowControlWindow: null,
|
610 | remoteFlowControlWindow: null,
|
611 | };
|
612 | },
|
613 | this.channelzEnabled
|
614 | );
|
615 | if (this.channelzEnabled) {
|
616 | this.listenerChildrenTracker.refChild(channelzRef);
|
617 | }
|
618 | this.http2ServerList.push({
|
619 | server: http2Server,
|
620 | channelzRef: channelzRef,
|
621 | });
|
622 | this.trace(
|
623 | 'Successfully bound ' +
|
624 | subchannelAddressToString(boundSubchannelAddress)
|
625 | );
|
626 | resolve(bindSpecificPort(addressList.slice(1), boundAddress.port, 1));
|
627 | http2Server.removeListener('error', onError);
|
628 | });
|
629 | });
|
630 | };
|
631 |
|
632 | const resolverListener: ResolverListener = {
|
633 | onSuccessfulResolution: (
|
634 | addressList,
|
635 | serviceConfig,
|
636 | serviceConfigError
|
637 | ) => {
|
638 |
|
639 | resolverListener.onSuccessfulResolution = () => {};
|
640 | if (addressList.length === 0) {
|
641 | deferredCallback(
|
642 | new Error(`No addresses resolved for port ${port}`),
|
643 | 0
|
644 | );
|
645 | return;
|
646 | }
|
647 | let bindResultPromise: Promise<BindResult>;
|
648 | if (isTcpSubchannelAddress(addressList[0])) {
|
649 | if (addressList[0].port === 0) {
|
650 | bindResultPromise = bindWildcardPort(addressList);
|
651 | } else {
|
652 | bindResultPromise = bindSpecificPort(
|
653 | addressList,
|
654 | addressList[0].port,
|
655 | 0
|
656 | );
|
657 | }
|
658 | } else {
|
659 |
|
660 | bindResultPromise = bindSpecificPort(addressList, 1, 0);
|
661 | }
|
662 | bindResultPromise.then(
|
663 | bindResult => {
|
664 | if (bindResult.count === 0) {
|
665 | const errorString = `No address added out of total ${addressList.length} resolved`;
|
666 | logging.log(LogVerbosity.ERROR, errorString);
|
667 | deferredCallback(new Error(errorString), 0);
|
668 | } else {
|
669 | if (bindResult.count < addressList.length) {
|
670 | logging.log(
|
671 | LogVerbosity.INFO,
|
672 | `WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved`
|
673 | );
|
674 | }
|
675 | deferredCallback(null, bindResult.port);
|
676 | }
|
677 | },
|
678 | error => {
|
679 | const errorString = `No address added out of total ${addressList.length} resolved`;
|
680 | logging.log(LogVerbosity.ERROR, errorString);
|
681 | deferredCallback(new Error(errorString), 0);
|
682 | }
|
683 | );
|
684 | },
|
685 | onError: error => {
|
686 | deferredCallback(new Error(error.details), 0);
|
687 | },
|
688 | };
|
689 |
|
690 | const resolver = createResolver(portUri, resolverListener, this.options);
|
691 | resolver.updateResolution();
|
692 | }
|
693 |
|
694 | forceShutdown(): void {
|
695 |
|
696 |
|
697 | for (const { server: http2Server, channelzRef: ref } of this
|
698 | .http2ServerList) {
|
699 | if (http2Server.listening) {
|
700 | http2Server.close(() => {
|
701 | if (this.channelzEnabled) {
|
702 | this.listenerChildrenTracker.unrefChild(ref);
|
703 | unregisterChannelzRef(ref);
|
704 | }
|
705 | });
|
706 | }
|
707 | }
|
708 |
|
709 | this.started = false;
|
710 |
|
711 |
|
712 |
|
713 | this.sessions.forEach((channelzInfo, session) => {
|
714 |
|
715 |
|
716 |
|
717 | session.destroy(http2.constants.NGHTTP2_CANCEL as any);
|
718 | });
|
719 | this.sessions.clear();
|
720 | if (this.channelzEnabled) {
|
721 | unregisterChannelzRef(this.channelzRef);
|
722 | }
|
723 | }
|
724 |
|
725 | register<RequestType, ResponseType>(
|
726 | name: string,
|
727 | handler: HandleCall<RequestType, ResponseType>,
|
728 | serialize: Serialize<ResponseType>,
|
729 | deserialize: Deserialize<RequestType>,
|
730 | type: string
|
731 | ): boolean {
|
732 | if (this.handlers.has(name)) {
|
733 | return false;
|
734 | }
|
735 |
|
736 | this.handlers.set(name, {
|
737 | func: handler,
|
738 | serialize,
|
739 | deserialize,
|
740 | type,
|
741 | path: name,
|
742 | } as UntypedHandler);
|
743 | return true;
|
744 | }
|
745 |
|
746 | unregister(name: string): boolean {
|
747 | return this.handlers.delete(name);
|
748 | }
|
749 |
|
750 | start(): void {
|
751 | if (
|
752 | this.http2ServerList.length === 0 ||
|
753 | this.http2ServerList.every(
|
754 | ({ server: http2Server }) => http2Server.listening !== true
|
755 | )
|
756 | ) {
|
757 | throw new Error('server must be bound in order to start');
|
758 | }
|
759 |
|
760 | if (this.started === true) {
|
761 | throw new Error('server is already started');
|
762 | }
|
763 | if (this.channelzEnabled) {
|
764 | this.channelzTrace.addTrace('CT_INFO', 'Starting');
|
765 | }
|
766 | this.started = true;
|
767 | }
|
768 |
|
769 | tryShutdown(callback: (error?: Error) => void): void {
|
770 | const wrappedCallback = (error?: Error) => {
|
771 | if (this.channelzEnabled) {
|
772 | unregisterChannelzRef(this.channelzRef);
|
773 | }
|
774 | callback(error);
|
775 | };
|
776 | let pendingChecks = 0;
|
777 |
|
778 | function maybeCallback(): void {
|
779 | pendingChecks--;
|
780 |
|
781 | if (pendingChecks === 0) {
|
782 | wrappedCallback();
|
783 | }
|
784 | }
|
785 |
|
786 |
|
787 | this.started = false;
|
788 |
|
789 | for (const { server: http2Server, channelzRef: ref } of this
|
790 | .http2ServerList) {
|
791 | if (http2Server.listening) {
|
792 | pendingChecks++;
|
793 | http2Server.close(() => {
|
794 | if (this.channelzEnabled) {
|
795 | this.listenerChildrenTracker.unrefChild(ref);
|
796 | unregisterChannelzRef(ref);
|
797 | }
|
798 | maybeCallback();
|
799 | });
|
800 | }
|
801 | }
|
802 |
|
803 | this.sessions.forEach((channelzInfo, session) => {
|
804 | if (!session.closed) {
|
805 | pendingChecks += 1;
|
806 | session.close(maybeCallback);
|
807 | }
|
808 | });
|
809 | if (pendingChecks === 0) {
|
810 | wrappedCallback();
|
811 | }
|
812 | }
|
813 |
|
814 | addHttp2Port(): never {
|
815 | throw new Error('Not yet implemented');
|
816 | }
|
817 |
|
818 | |
819 |
|
820 |
|
821 |
|
822 |
|
823 | getChannelzRef() {
|
824 | return this.channelzRef;
|
825 | }
|
826 |
|
827 | private _verifyContentType(
|
828 | stream: http2.ServerHttp2Stream,
|
829 | headers: http2.IncomingHttpHeaders
|
830 | ): boolean {
|
831 | const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE];
|
832 |
|
833 | if (
|
834 | typeof contentType !== 'string' ||
|
835 | !contentType.startsWith('application/grpc')
|
836 | ) {
|
837 | stream.respond(
|
838 | {
|
839 | [http2.constants.HTTP2_HEADER_STATUS]:
|
840 | http2.constants.HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE,
|
841 | },
|
842 | { endStream: true }
|
843 | );
|
844 | return false;
|
845 | }
|
846 |
|
847 | return true;
|
848 | }
|
849 |
|
850 | private _retrieveHandler(path: string): Handler<any, any> | null {
|
851 | this.trace(
|
852 | 'Received call to method ' +
|
853 | path +
|
854 | ' at address ' +
|
855 | this.serverAddressString
|
856 | );
|
857 |
|
858 | const handler = this.handlers.get(path);
|
859 |
|
860 | if (handler === undefined) {
|
861 | this.trace(
|
862 | 'No handler registered for method ' +
|
863 | path +
|
864 | '. Sending UNIMPLEMENTED status.'
|
865 | );
|
866 | return null;
|
867 | }
|
868 |
|
869 | return handler;
|
870 | }
|
871 |
|
872 | private _respondWithError<T extends Partial<ServiceError>>(
|
873 | err: T,
|
874 | stream: http2.ServerHttp2Stream,
|
875 | channelzSessionInfo: ChannelzSessionInfo | null = null
|
876 | ) {
|
877 | const call = new Http2ServerCallStream(stream, null!, this.options);
|
878 |
|
879 | if (err.code === undefined) {
|
880 | err.code = Status.INTERNAL;
|
881 | }
|
882 |
|
883 | if (this.channelzEnabled) {
|
884 | this.callTracker.addCallFailed();
|
885 | channelzSessionInfo?.streamTracker.addCallFailed();
|
886 | }
|
887 |
|
888 | call.sendError(err);
|
889 | }
|
890 |
|
891 | private _channelzHandler(
|
892 | stream: http2.ServerHttp2Stream,
|
893 | headers: http2.IncomingHttpHeaders
|
894 | ) {
|
895 | const channelzSessionInfo = this.sessions.get(
|
896 | stream.session as http2.ServerHttp2Session
|
897 | );
|
898 |
|
899 | this.callTracker.addCallStarted();
|
900 | channelzSessionInfo?.streamTracker.addCallStarted();
|
901 |
|
902 | if (!this._verifyContentType(stream, headers)) {
|
903 | this.callTracker.addCallFailed();
|
904 | channelzSessionInfo?.streamTracker.addCallFailed();
|
905 | return;
|
906 | }
|
907 |
|
908 | const path = headers[HTTP2_HEADER_PATH] as string;
|
909 |
|
910 | const handler = this._retrieveHandler(path);
|
911 | if (!handler) {
|
912 | this._respondWithError(
|
913 | getUnimplementedStatusResponse(path),
|
914 | stream,
|
915 | channelzSessionInfo
|
916 | );
|
917 | return;
|
918 | }
|
919 |
|
920 | const call = new Http2ServerCallStream(stream, handler, this.options);
|
921 |
|
922 | call.once('callEnd', (code: Status) => {
|
923 | if (code === Status.OK) {
|
924 | this.callTracker.addCallSucceeded();
|
925 | } else {
|
926 | this.callTracker.addCallFailed();
|
927 | }
|
928 | });
|
929 |
|
930 | if (channelzSessionInfo) {
|
931 | call.once('streamEnd', (success: boolean) => {
|
932 | if (success) {
|
933 | channelzSessionInfo.streamTracker.addCallSucceeded();
|
934 | } else {
|
935 | channelzSessionInfo.streamTracker.addCallFailed();
|
936 | }
|
937 | });
|
938 | call.on('sendMessage', () => {
|
939 | channelzSessionInfo.messagesSent += 1;
|
940 | channelzSessionInfo.lastMessageSentTimestamp = new Date();
|
941 | });
|
942 | call.on('receiveMessage', () => {
|
943 | channelzSessionInfo.messagesReceived += 1;
|
944 | channelzSessionInfo.lastMessageReceivedTimestamp = new Date();
|
945 | });
|
946 | }
|
947 |
|
948 | if (!this._runHandlerForCall(call, handler, headers)) {
|
949 | this.callTracker.addCallFailed();
|
950 | channelzSessionInfo?.streamTracker.addCallFailed();
|
951 |
|
952 | call.sendError({
|
953 | code: Status.INTERNAL,
|
954 | details: `Unknown handler type: ${handler.type}`,
|
955 | });
|
956 | }
|
957 | }
|
958 |
|
959 | private _streamHandler(
|
960 | stream: http2.ServerHttp2Stream,
|
961 | headers: http2.IncomingHttpHeaders
|
962 | ) {
|
963 | if (this._verifyContentType(stream, headers) !== true) {
|
964 | return;
|
965 | }
|
966 |
|
967 | const path = headers[HTTP2_HEADER_PATH] as string;
|
968 |
|
969 | const handler = this._retrieveHandler(path);
|
970 | if (!handler) {
|
971 | this._respondWithError(
|
972 | getUnimplementedStatusResponse(path),
|
973 | stream,
|
974 | null
|
975 | );
|
976 | return;
|
977 | }
|
978 |
|
979 | const call = new Http2ServerCallStream(stream, handler, this.options);
|
980 | if (!this._runHandlerForCall(call, handler, headers)) {
|
981 | call.sendError({
|
982 | code: Status.INTERNAL,
|
983 | details: `Unknown handler type: ${handler.type}`,
|
984 | });
|
985 | }
|
986 | }
|
987 |
|
988 | private _runHandlerForCall(
|
989 | call: Http2ServerCallStream<any, any>,
|
990 | handler: Handler<any, any>,
|
991 | headers: http2.IncomingHttpHeaders
|
992 | ): boolean {
|
993 | const metadata = call.receiveMetadata(headers);
|
994 | const encoding =
|
995 | (metadata.get('grpc-encoding')[0] as string | undefined) ?? 'identity';
|
996 | metadata.remove('grpc-encoding');
|
997 |
|
998 | const { type } = handler;
|
999 | if (type === 'unary') {
|
1000 | handleUnary(call, handler as UntypedUnaryHandler, metadata, encoding);
|
1001 | } else if (type === 'clientStream') {
|
1002 | handleClientStreaming(
|
1003 | call,
|
1004 | handler as UntypedClientStreamingHandler,
|
1005 | metadata,
|
1006 | encoding
|
1007 | );
|
1008 | } else if (type === 'serverStream') {
|
1009 | handleServerStreaming(
|
1010 | call,
|
1011 | handler as UntypedServerStreamingHandler,
|
1012 | metadata,
|
1013 | encoding
|
1014 | );
|
1015 | } else if (type === 'bidi') {
|
1016 | handleBidiStreaming(
|
1017 | call,
|
1018 | handler as UntypedBidiStreamingHandler,
|
1019 | metadata,
|
1020 | encoding
|
1021 | );
|
1022 | } else {
|
1023 | return false;
|
1024 | }
|
1025 |
|
1026 | return true;
|
1027 | }
|
1028 |
|
1029 | private _setupHandlers(
|
1030 | http2Server: http2.Http2Server | http2.Http2SecureServer
|
1031 | ): void {
|
1032 | if (http2Server === null) {
|
1033 | return;
|
1034 | }
|
1035 |
|
1036 | const serverAddress = http2Server.address();
|
1037 | let serverAddressString = 'null';
|
1038 | if (serverAddress) {
|
1039 | if (typeof serverAddress === 'string') {
|
1040 | serverAddressString = serverAddress;
|
1041 | } else {
|
1042 | serverAddressString = serverAddress.address + ':' + serverAddress.port;
|
1043 | }
|
1044 | }
|
1045 | this.serverAddressString = serverAddressString;
|
1046 |
|
1047 | const handler = this.channelzEnabled
|
1048 | ? this._channelzHandler
|
1049 | : this._streamHandler;
|
1050 |
|
1051 | http2Server.on('stream', handler.bind(this));
|
1052 | http2Server.on('session', session => {
|
1053 | if (!this.started) {
|
1054 | session.destroy();
|
1055 | return;
|
1056 | }
|
1057 |
|
1058 | const channelzRef = registerChannelzSocket(
|
1059 | session.socket.remoteAddress ?? 'unknown',
|
1060 | this.getChannelzSessionInfoGetter(session),
|
1061 | this.channelzEnabled
|
1062 | );
|
1063 |
|
1064 | const channelzSessionInfo: ChannelzSessionInfo = {
|
1065 | ref: channelzRef,
|
1066 | streamTracker: new ChannelzCallTracker(),
|
1067 | messagesSent: 0,
|
1068 | messagesReceived: 0,
|
1069 | lastMessageSentTimestamp: null,
|
1070 | lastMessageReceivedTimestamp: null,
|
1071 | };
|
1072 |
|
1073 | this.sessions.set(session, channelzSessionInfo);
|
1074 | const clientAddress = session.socket.remoteAddress;
|
1075 | if (this.channelzEnabled) {
|
1076 | this.channelzTrace.addTrace(
|
1077 | 'CT_INFO',
|
1078 | 'Connection established by client ' + clientAddress
|
1079 | );
|
1080 | this.sessionChildrenTracker.refChild(channelzRef);
|
1081 | }
|
1082 | let connectionAgeTimer: NodeJS.Timeout | null = null;
|
1083 | let connectionAgeGraceTimer: NodeJS.Timeout | null = null;
|
1084 | let sessionClosedByServer = false;
|
1085 | if (this.maxConnectionAgeMs !== UNLIMITED_CONNECTION_AGE_MS) {
|
1086 |
|
1087 | const jitterMagnitude = this.maxConnectionAgeMs / 10;
|
1088 | const jitter = Math.random() * jitterMagnitude * 2 - jitterMagnitude;
|
1089 | connectionAgeTimer = setTimeout(() => {
|
1090 | sessionClosedByServer = true;
|
1091 | if (this.channelzEnabled) {
|
1092 | this.channelzTrace.addTrace(
|
1093 | 'CT_INFO',
|
1094 | 'Connection dropped by max connection age from ' + clientAddress
|
1095 | );
|
1096 | }
|
1097 | try {
|
1098 | session.goaway(
|
1099 | http2.constants.NGHTTP2_NO_ERROR,
|
1100 | ~(1 << 31),
|
1101 | Buffer.from('max_age')
|
1102 | );
|
1103 | } catch (e) {
|
1104 |
|
1105 | session.destroy();
|
1106 | return;
|
1107 | }
|
1108 | session.close();
|
1109 | |
1110 |
|
1111 | if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) {
|
1112 | connectionAgeGraceTimer = setTimeout(() => {
|
1113 | session.destroy();
|
1114 | }, this.maxConnectionAgeGraceMs).unref?.();
|
1115 | }
|
1116 | }, this.maxConnectionAgeMs + jitter).unref?.();
|
1117 | }
|
1118 | const keeapliveTimeTimer: NodeJS.Timeout | null = setInterval(() => {
|
1119 | const timeoutTImer = setTimeout(() => {
|
1120 | sessionClosedByServer = true;
|
1121 | if (this.channelzEnabled) {
|
1122 | this.channelzTrace.addTrace(
|
1123 | 'CT_INFO',
|
1124 | 'Connection dropped by keepalive timeout from ' + clientAddress
|
1125 | );
|
1126 | }
|
1127 | session.close();
|
1128 | }, this.keepaliveTimeoutMs).unref?.();
|
1129 | try {
|
1130 | session.ping(
|
1131 | (err: Error | null, duration: number, payload: Buffer) => {
|
1132 | clearTimeout(timeoutTImer);
|
1133 | }
|
1134 | );
|
1135 | } catch (e) {
|
1136 |
|
1137 | session.destroy();
|
1138 | }
|
1139 | }, this.keepaliveTimeMs).unref?.();
|
1140 | session.on('close', () => {
|
1141 | if (this.channelzEnabled) {
|
1142 | if (!sessionClosedByServer) {
|
1143 | this.channelzTrace.addTrace(
|
1144 | 'CT_INFO',
|
1145 | 'Connection dropped by client ' + clientAddress
|
1146 | );
|
1147 | }
|
1148 | this.sessionChildrenTracker.unrefChild(channelzRef);
|
1149 | unregisterChannelzRef(channelzRef);
|
1150 | }
|
1151 | if (connectionAgeTimer) {
|
1152 | clearTimeout(connectionAgeTimer);
|
1153 | }
|
1154 | if (connectionAgeGraceTimer) {
|
1155 | clearTimeout(connectionAgeGraceTimer);
|
1156 | }
|
1157 | if (keeapliveTimeTimer) {
|
1158 | clearTimeout(keeapliveTimeTimer);
|
1159 | }
|
1160 | this.sessions.delete(session);
|
1161 | });
|
1162 | });
|
1163 | }
|
1164 | }
|
1165 |
|
1166 | async function handleUnary<RequestType, ResponseType>(
|
1167 | call: Http2ServerCallStream<RequestType, ResponseType>,
|
1168 | handler: UnaryHandler<RequestType, ResponseType>,
|
1169 | metadata: Metadata,
|
1170 | encoding: string
|
1171 | ): Promise<void> {
|
1172 | try {
|
1173 | const request = await call.receiveUnaryMessage(encoding);
|
1174 |
|
1175 | if (request === undefined || call.cancelled) {
|
1176 | return;
|
1177 | }
|
1178 |
|
1179 | const emitter = new ServerUnaryCallImpl<RequestType, ResponseType>(
|
1180 | call,
|
1181 | metadata,
|
1182 | request
|
1183 | );
|
1184 |
|
1185 | handler.func(
|
1186 | emitter,
|
1187 | (
|
1188 | err: ServerErrorResponse | ServerStatusResponse | null,
|
1189 | value?: ResponseType | null,
|
1190 | trailer?: Metadata,
|
1191 | flags?: number
|
1192 | ) => {
|
1193 | call.sendUnaryMessage(err, value, trailer, flags);
|
1194 | }
|
1195 | );
|
1196 | } catch (err) {
|
1197 | call.sendError(err as ServerErrorResponse);
|
1198 | }
|
1199 | }
|
1200 |
|
1201 | function handleClientStreaming<RequestType, ResponseType>(
|
1202 | call: Http2ServerCallStream<RequestType, ResponseType>,
|
1203 | handler: ClientStreamingHandler<RequestType, ResponseType>,
|
1204 | metadata: Metadata,
|
1205 | encoding: string
|
1206 | ): void {
|
1207 | const stream = new ServerReadableStreamImpl<RequestType, ResponseType>(
|
1208 | call,
|
1209 | metadata,
|
1210 | handler.deserialize,
|
1211 | encoding
|
1212 | );
|
1213 |
|
1214 | function respond(
|
1215 | err: ServerErrorResponse | ServerStatusResponse | null,
|
1216 | value?: ResponseType | null,
|
1217 | trailer?: Metadata,
|
1218 | flags?: number
|
1219 | ) {
|
1220 | stream.destroy();
|
1221 | call.sendUnaryMessage(err, value, trailer, flags);
|
1222 | }
|
1223 |
|
1224 | if (call.cancelled) {
|
1225 | return;
|
1226 | }
|
1227 |
|
1228 | stream.on('error', respond);
|
1229 | handler.func(stream, respond);
|
1230 | }
|
1231 |
|
1232 | async function handleServerStreaming<RequestType, ResponseType>(
|
1233 | call: Http2ServerCallStream<RequestType, ResponseType>,
|
1234 | handler: ServerStreamingHandler<RequestType, ResponseType>,
|
1235 | metadata: Metadata,
|
1236 | encoding: string
|
1237 | ): Promise<void> {
|
1238 | try {
|
1239 | const request = await call.receiveUnaryMessage(encoding);
|
1240 |
|
1241 | if (request === undefined || call.cancelled) {
|
1242 | return;
|
1243 | }
|
1244 |
|
1245 | const stream = new ServerWritableStreamImpl<RequestType, ResponseType>(
|
1246 | call,
|
1247 | metadata,
|
1248 | handler.serialize,
|
1249 | request
|
1250 | );
|
1251 |
|
1252 | handler.func(stream);
|
1253 | } catch (err) {
|
1254 | call.sendError(err as ServerErrorResponse);
|
1255 | }
|
1256 | }
|
1257 |
|
1258 | function handleBidiStreaming<RequestType, ResponseType>(
|
1259 | call: Http2ServerCallStream<RequestType, ResponseType>,
|
1260 | handler: BidiStreamingHandler<RequestType, ResponseType>,
|
1261 | metadata: Metadata,
|
1262 | encoding: string
|
1263 | ): void {
|
1264 | const stream = new ServerDuplexStreamImpl<RequestType, ResponseType>(
|
1265 | call,
|
1266 | metadata,
|
1267 | handler.serialize,
|
1268 | handler.deserialize,
|
1269 | encoding
|
1270 | );
|
1271 |
|
1272 | if (call.cancelled) {
|
1273 | return;
|
1274 | }
|
1275 |
|
1276 | handler.func(stream);
|
1277 | }
|