// UNPKG - 38.5 kB - JavaScript (viewer page header captured with the file; not part of the module)
"use strict";
/*
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
// Mark the CommonJS export object as a transpiled ES module and declare the
// `Server` export up front; it is assigned once the class has been defined.
Object.defineProperty(exports, "__esModule", { value: true });
exports.Server = void 0;
// Node's built-in HTTP/2 implementation; every bound port is an http2 server.
const http2 = require("http2");
// Project-local modules (status/log constants, call streams, credentials,
// name resolution, logging, address helpers, URI parsing, channelz).
const constants_1 = require("./constants");
const server_call_1 = require("./server-call");
const server_credentials_1 = require("./server-credentials");
const resolver_1 = require("./resolver");
const logging = require("./logging");
const subchannel_address_1 = require("./subchannel-address");
const uri_parser_1 = require("./uri-parser");
const channelz_1 = require("./channelz");
// `~(1 << 31)` is 2147483647 (2^31 - 1), the largest positive 32-bit signed
// integer. The Server constructor uses it as the default for the
// max-connection-age options, and compares against it to mean "no limit".
const UNLIMITED_CONNECTION_AGE_MS = ~(1 << 31);
// Default interval between keepalive pings (same 2^31 - 1 value).
const KEEPALIVE_MAX_TIME_MS = ~(1 << 31);
// Default time to wait for a keepalive ping acknowledgement.
const KEEPALIVE_TIMEOUT_MS = 20000;
// Name of the HTTP/2 pseudo-header that carries the request path.
const { HTTP2_HEADER_PATH } = http2.constants;
// Tracer name passed to logging.trace() for messages from this module.
const TRACER_NAME = 'server';
/** Does nothing; used where an API requires a callback that is not needed. */
function noop() { }
/**
 * Build the status object reported when a method has no implementation.
 *
 * @param {string} methodName Method name or path to mention in the details.
 * @returns {{code: *, details: string}} Status with code
 *     `Status.UNIMPLEMENTED` and a human-readable details message.
 */
function getUnimplementedStatusResponse(methodName) {
    return {
        code: constants_1.Status.UNIMPLEMENTED,
        details: `The server does not implement the method ${methodName}`,
    };
}
/**
 * Create the placeholder handler used for a service method that the supplied
 * implementation object does not provide.
 *
 * Handlers that answer through a callback ('unary', 'clientStream') invoke it
 * with the UNIMPLEMENTED status and a null value; streaming-response handlers
 * ('serverStream', 'bidi') emit the status as an 'error' event on the call.
 *
 * @param {string} handlerType One of 'unary', 'clientStream', 'serverStream'
 *     or 'bidi'.
 * @param {string} methodName Name used in the status details.
 * @returns {Function} The placeholder handler.
 * @throws {Error} If `handlerType` is not one of the four known types.
 */
function getDefaultHandler(handlerType, methodName) {
    const unimplementedStatusResponse = getUnimplementedStatusResponse(methodName);
    switch (handlerType) {
        case 'unary':
        case 'clientStream':
            return (call, callback) => {
                callback(unimplementedStatusResponse, null);
            };
        case 'serverStream':
        case 'bidi':
            return (call) => {
                call.emit('error', unimplementedStatusResponse);
            };
        default:
            throw new Error(`Invalid handlerType ${handlerType}`);
    }
}
/**
 * gRPC server backed by Node's `http2` module.
 *
 * Lifecycle as implemented below: register handlers (`addService` /
 * `register`), bind one or more ports (`bindAsync`), then `start()`. Sessions
 * that connect before `start()` are destroyed immediately. `tryShutdown()`
 * closes listeners and sessions gracefully; `forceShutdown()` destroys them.
 *
 * When channelz is enabled (the default; disabled with the option
 * `'grpc.enable_channelz': 0`) the server records traces, call counters and
 * per-session statistics through the `./channelz` module.
 */
class Server {
    /**
     * @param {object} [options] Server options. Keys read here:
     *     'grpc.enable_channelz', 'grpc.max_connection_age_ms',
     *     'grpc.max_connection_age_grace_ms', 'grpc.keepalive_time_ms',
     *     'grpc.keepalive_timeout_ms'. Other keys are read by bindAsync() and
     *     forwarded to the resolver and to call streams.
     */
    constructor(options) {
        // Each entry is { server, channelzRef } for one bound http2 server.
        this.http2ServerList = [];
        // Method path -> { func, serialize, deserialize, type, path }.
        this.handlers = new Map();
        // Http2Session -> per-session channelz bookkeeping.
        this.sessions = new Map();
        this.started = false;
        this.serverAddressString = 'null';
        // Channelz Info
        this.channelzEnabled = true;
        this.channelzTrace = new channelz_1.ChannelzTrace();
        this.callTracker = new channelz_1.ChannelzCallTracker();
        this.listenerChildrenTracker = new channelz_1.ChannelzChildrenTracker();
        this.sessionChildrenTracker = new channelz_1.ChannelzChildrenTracker();
        this.options = options !== null && options !== undefined ? options : {};
        if (this.options['grpc.enable_channelz'] === 0) {
            this.channelzEnabled = false;
        }
        this.channelzRef = (0, channelz_1.registerChannelzServer)(() => this.getChannelzInfo(), this.channelzEnabled);
        if (this.channelzEnabled) {
            this.channelzTrace.addTrace('CT_INFO', 'Server created');
        }
        // Only null/undefined fall back to the default; 0 is a valid setting.
        const optionOrDefault = (name, fallback) => {
            const value = this.options[name];
            return value !== null && value !== undefined ? value : fallback;
        };
        this.maxConnectionAgeMs = optionOrDefault('grpc.max_connection_age_ms', UNLIMITED_CONNECTION_AGE_MS);
        this.maxConnectionAgeGraceMs = optionOrDefault('grpc.max_connection_age_grace_ms', UNLIMITED_CONNECTION_AGE_MS);
        this.keepaliveTimeMs = optionOrDefault('grpc.keepalive_time_ms', KEEPALIVE_MAX_TIME_MS);
        this.keepaliveTimeoutMs = optionOrDefault('grpc.keepalive_timeout_ms', KEEPALIVE_TIMEOUT_MS);
        this.trace('Server constructed');
    }
    /**
     * Snapshot of this server's channelz data; passed as the info getter when
     * the server is registered with channelz.
     */
    getChannelzInfo() {
        return {
            trace: this.channelzTrace,
            callTracker: this.callTracker,
            listenerChildren: this.listenerChildrenTracker.getChildLists(),
            sessionChildren: this.sessionChildrenTracker.getChildLists(),
        };
    }
    /**
     * Build the channelz socket-info getter for one HTTP/2 session.
     *
     * The returned function reads the session's socket addresses, TLS details
     * (when `session.encrypted`), the counters stored in `this.sessions`, and
     * the session's flow-control window sizes each time it is called.
     *
     * NOTE(review): the getter dereferences `this.sessions.get(session)`
     * without a guard, so it presumably must not be called after the session
     * has been removed from the map - confirm against the channelz module.
     *
     * @param session The HTTP/2 session to describe.
     * @returns {Function} Zero-argument getter returning the socket info.
     */
    getChannelzSessionInfoGetter(session) {
        return () => {
            const sessionInfo = this.sessions.get(session);
            const sessionSocket = session.socket;
            const remoteAddress = sessionSocket.remoteAddress
                ? (0, subchannel_address_1.stringToSubchannelAddress)(sessionSocket.remoteAddress, sessionSocket.remotePort)
                : null;
            const localAddress = sessionSocket.localAddress
                ? (0, subchannel_address_1.stringToSubchannelAddress)(sessionSocket.localAddress, sessionSocket.localPort)
                : null;
            let tlsInfo = null;
            if (session.encrypted) {
                const tlsSocket = sessionSocket;
                const cipherInfo = tlsSocket.getCipher();
                const certificate = tlsSocket.getCertificate();
                const peerCertificate = tlsSocket.getPeerCertificate();
                const standardName = cipherInfo.standardName;
                tlsInfo = {
                    cipherSuiteStandardName: standardName !== null && standardName !== undefined ? standardName : null,
                    // The non-standard name is only reported when no standard
                    // name is available.
                    cipherSuiteOtherName: standardName ? null : cipherInfo.name,
                    localCertificate: certificate && 'raw' in certificate ? certificate.raw : null,
                    remoteCertificate: peerCertificate && 'raw' in peerCertificate
                        ? peerCertificate.raw
                        : null,
                };
            }
            const localWindowSize = session.state.localWindowSize;
            const remoteWindowSize = session.state.remoteWindowSize;
            return {
                remoteAddress: remoteAddress,
                localAddress: localAddress,
                security: tlsInfo,
                remoteName: null,
                streamsStarted: sessionInfo.streamTracker.callsStarted,
                streamsSucceeded: sessionInfo.streamTracker.callsSucceeded,
                streamsFailed: sessionInfo.streamTracker.callsFailed,
                messagesSent: sessionInfo.messagesSent,
                messagesReceived: sessionInfo.messagesReceived,
                keepAlivesSent: 0,
                lastLocalStreamCreatedTimestamp: null,
                lastRemoteStreamCreatedTimestamp: sessionInfo.streamTracker.lastCallStartedTimestamp,
                lastMessageSentTimestamp: sessionInfo.lastMessageSentTimestamp,
                lastMessageReceivedTimestamp: sessionInfo.lastMessageReceivedTimestamp,
                localFlowControlWindow: localWindowSize !== null && localWindowSize !== undefined ? localWindowSize : null,
                remoteFlowControlWindow: remoteWindowSize !== null && remoteWindowSize !== undefined ? remoteWindowSize : null,
            };
        };
    }
    /** Log `text` at DEBUG verbosity, prefixed with this server's channelz id. */
    trace(text) {
        logging.trace(constants_1.LogVerbosity.DEBUG, TRACER_NAME, '(' + this.channelzRef.id + ') ' + text);
    }
    /** Not supported; always throws. Use addService() instead. */
    addProtoService() {
        throw new Error('Not implemented. Use addService() instead');
    }
    /**
     * Register every method of `service`, using the matching function from
     * `implementation` (looked up by method name, then by
     * `attrs.originalName`) or a default UNIMPLEMENTED handler.
     *
     * All methods are validated before any is registered, so a failure
     * (duplicate path, non-callable implementation) leaves the handler table
     * exactly as it was.
     *
     * @param {object} service Map of method name -> method definition with
     *     `path`, `requestStream`, `responseStream`, `responseSerialize`,
     *     `requestDeserialize` and optionally `originalName`.
     * @param {object} implementation Object holding the handler functions;
     *     handlers are bound to it.
     * @throws {Error} If either argument is not an object, the service has no
     *     methods, or a method path already has a handler.
     */
    addService(service, implementation) {
        if (service === null ||
            typeof service !== 'object' ||
            implementation === null ||
            typeof implementation !== 'object') {
            throw new Error('addService() requires two objects as arguments');
        }
        const serviceKeys = Object.keys(service);
        if (serviceKeys.length === 0) {
            throw new Error('Cannot add an empty service to a server');
        }
        // Phase 1: resolve every method without touching this.handlers.
        const pending = [];
        const pendingPaths = new Set();
        for (const name of serviceKeys) {
            const attrs = service[name];
            let methodType;
            if (attrs.requestStream) {
                methodType = attrs.responseStream ? 'bidi' : 'clientStream';
            }
            else {
                methodType = attrs.responseStream ? 'serverStream' : 'unary';
            }
            let implFn = implementation[name];
            if (implFn === undefined && typeof attrs.originalName === 'string') {
                implFn = implementation[attrs.originalName];
            }
            const impl = implFn !== undefined
                ? implFn.bind(implementation)
                : getDefaultHandler(methodType, name);
            if (this.handlers.has(attrs.path) || pendingPaths.has(attrs.path)) {
                throw new Error(`Method handler for ${attrs.path} already provided.`);
            }
            pendingPaths.add(attrs.path);
            pending.push({ attrs, impl, methodType });
        }
        // Phase 2: commit. register() is still checked in case it refuses.
        for (const { attrs, impl, methodType } of pending) {
            const success = this.register(attrs.path, impl, attrs.responseSerialize, attrs.requestDeserialize, methodType);
            if (success === false) {
                throw new Error(`Method handler for ${attrs.path} already provided.`);
            }
        }
    }
    /**
     * Unregister the handler for every method path in `service`.
     *
     * @param {object} service Map of method name -> definition with `path`.
     * @throws {Error} If `service` is not an object.
     */
    removeService(service) {
        if (service === null || typeof service !== 'object') {
            throw new Error('removeService() requires object as argument');
        }
        for (const name of Object.keys(service)) {
            this.unregister(service[name].path);
        }
    }
    /** Not supported; always throws. Use bindAsync() instead. */
    bind(port, creds) {
        throw new Error('Not implemented. Use bindAsync() instead');
    }
    /**
     * Resolve `port` to a list of addresses and start an http2 server
     * listening on each one.
     *
     * The callback is always invoked on a later tick with `(error, port)`:
     * `(null, boundPort)` if at least one address was bound, otherwise an
     * Error and port 0. When the first resolved TCP address has port 0, the
     * first successful bind picks the port and the remaining addresses are
     * bound to that same port.
     *
     * @param {string} port Address to bind, as a URI or host:port string.
     * @param creds A ServerCredentials instance.
     * @param {Function} callback Receives `(error, port)`.
     * @throws {Error|TypeError} Synchronously, if the server is already
     *     started, an argument has the wrong type, or `port` cannot be parsed
     *     or mapped to a default scheme.
     */
    bindAsync(port, creds, callback) {
        if (this.started === true) {
            throw new Error('server is already started');
        }
        if (typeof port !== 'string') {
            throw new TypeError('port must be a string');
        }
        if (creds === null || !(creds instanceof server_credentials_1.ServerCredentials)) {
            throw new TypeError('creds must be a ServerCredentials object');
        }
        if (typeof callback !== 'function') {
            throw new TypeError('callback must be a function');
        }
        const initialPortUri = (0, uri_parser_1.parseUri)(port);
        if (initialPortUri === null) {
            throw new Error(`Could not parse port "${port}"`);
        }
        const portUri = (0, resolver_1.mapUriDefaultScheme)(initialPortUri);
        if (portUri === null) {
            throw new Error(`Could not get a default scheme for port "${port}"`);
        }
        const serverOptions = {
            maxSendHeaderBlockLength: Number.MAX_SAFE_INTEGER,
        };
        if ('grpc-node.max_session_memory' in this.options) {
            serverOptions.maxSessionMemory =
                this.options['grpc-node.max_session_memory'];
        }
        else {
            /* By default, set a very large max session memory limit, to effectively
             * disable enforcement of the limit. Some testing indicates that Node's
             * behavior degrades badly when this limit is reached, so we solve that
             * by disabling the check entirely. */
            serverOptions.maxSessionMemory = Number.MAX_SAFE_INTEGER;
        }
        if ('grpc.max_concurrent_streams' in this.options) {
            serverOptions.settings = {
                maxConcurrentStreams: this.options['grpc.max_concurrent_streams'],
            };
        }
        // Never call the user's callback synchronously from inside a bind step.
        const deferredCallback = (error, boundPort) => {
            process.nextTick(() => callback(error, boundPort));
        };
        // Create one (not yet listening) http2 server with handlers attached.
        const setupServer = () => {
            let http2Server;
            if (creds._isSecure()) {
                const secureServerOptions = Object.assign(serverOptions, creds._getSettings());
                secureServerOptions.enableTrace =
                    this.options['grpc-node.tls_enable_trace'] === 1;
                http2Server = http2.createSecureServer(secureServerOptions);
                http2Server.on('secureConnection', (socket) => {
                    /* These errors need to be handled by the user of Http2SecureServer,
                     * according to https://github.com/nodejs/node/issues/35824 */
                    socket.on('error', (e) => {
                        this.trace('An incoming TLS connection closed with error: ' + e.message);
                    });
                });
            }
            else {
                http2Server = http2.createServer(serverOptions);
            }
            http2Server.setTimeout(0, noop);
            this._setupHandlers(http2Server);
            return http2Server;
        };
        // Record a freshly bound listener: channelz socket, server list, trace.
        const registerListener = (http2Server, boundSubchannelAddress) => {
            const channelzRef = (0, channelz_1.registerChannelzSocket)((0, subchannel_address_1.subchannelAddressToString)(boundSubchannelAddress), () => {
                // A listening socket carries no traffic of its own.
                return {
                    localAddress: boundSubchannelAddress,
                    remoteAddress: null,
                    security: null,
                    remoteName: null,
                    streamsStarted: 0,
                    streamsSucceeded: 0,
                    streamsFailed: 0,
                    messagesSent: 0,
                    messagesReceived: 0,
                    keepAlivesSent: 0,
                    lastLocalStreamCreatedTimestamp: null,
                    lastRemoteStreamCreatedTimestamp: null,
                    lastMessageSentTimestamp: null,
                    lastMessageReceivedTimestamp: null,
                    localFlowControlWindow: null,
                    remoteFlowControlWindow: null,
                };
            }, this.channelzEnabled);
            if (this.channelzEnabled) {
                this.listenerChildrenTracker.refChild(channelzRef);
            }
            this.http2ServerList.push({
                server: http2Server,
                channelzRef: channelzRef,
            });
            this.trace('Successfully bound ' +
                (0, subchannel_address_1.subchannelAddressToString)(boundSubchannelAddress));
        };
        const traceBindFailure = (address, err) => {
            this.trace('Failed to bind ' +
                (0, subchannel_address_1.subchannelAddressToString)(address) +
                ' with error ' +
                err.message);
        };
        // Bind every address to `portNum`. Individual failures resolve to the
        // Error (they do not reject), so one bad address cannot fail the rest.
        const bindSpecificPort = (addressList, portNum, previousCount) => {
            if (addressList.length === 0) {
                return Promise.resolve({ port: portNum, count: previousCount });
            }
            return Promise.all(addressList.map(address => {
                this.trace('Attempting to bind ' + (0, subchannel_address_1.subchannelAddressToString)(address));
                const addr = (0, subchannel_address_1.isTcpSubchannelAddress)(address)
                    ? { host: address.host, port: portNum }
                    : address;
                const http2Server = setupServer();
                return new Promise((resolve) => {
                    const onError = (err) => {
                        traceBindFailure(address, err);
                        resolve(err);
                    };
                    http2Server.once('error', onError);
                    http2Server.listen(addr, () => {
                        const boundAddress = http2Server.address();
                        // address() is a string for pipe/UDS listeners.
                        const boundSubchannelAddress = typeof boundAddress === 'string'
                            ? { path: boundAddress }
                            : { host: boundAddress.address, port: boundAddress.port };
                        registerListener(http2Server, boundSubchannelAddress);
                        resolve('port' in boundSubchannelAddress
                            ? boundSubchannelAddress.port
                            : portNum);
                        http2Server.removeListener('error', onError);
                    });
                });
            })).then(results => {
                let count = 0;
                for (const result of results) {
                    // Successful binds resolve to a port number, failures to an Error.
                    if (typeof result === 'number') {
                        count += 1;
                        if (result !== portNum) {
                            throw new Error('Invalid state: multiple port numbers added from single address');
                        }
                    }
                }
                return {
                    port: portNum,
                    count: count + previousCount,
                };
            });
        };
        // Try addresses in order until one binds with an OS-assigned port,
        // then bind the remaining addresses to that same port.
        const bindWildcardPort = (addressList) => {
            if (addressList.length === 0) {
                return Promise.resolve({ port: 0, count: 0 });
            }
            const address = addressList[0];
            const http2Server = setupServer();
            return new Promise((resolve) => {
                const onError = (err) => {
                    traceBindFailure(address, err);
                    resolve(bindWildcardPort(addressList.slice(1)));
                };
                http2Server.once('error', onError);
                http2Server.listen(address, () => {
                    const boundAddress = http2Server.address();
                    const boundSubchannelAddress = {
                        host: boundAddress.address,
                        port: boundAddress.port,
                    };
                    registerListener(http2Server, boundSubchannelAddress);
                    resolve(bindSpecificPort(addressList.slice(1), boundAddress.port, 1));
                    http2Server.removeListener('error', onError);
                });
            });
        };
        const resolverListener = {
            onSuccessfulResolution: (addressList, serviceConfig, serviceConfigError) => {
                // We only want one resolution result. Discard all future results
                resolverListener.onSuccessfulResolution = () => { };
                if (addressList.length === 0) {
                    deferredCallback(new Error(`No addresses resolved for port ${port}`), 0);
                    return;
                }
                let bindResultPromise;
                if ((0, subchannel_address_1.isTcpSubchannelAddress)(addressList[0])) {
                    if (addressList[0].port === 0) {
                        bindResultPromise = bindWildcardPort(addressList);
                    }
                    else {
                        bindResultPromise = bindSpecificPort(addressList, addressList[0].port, 0);
                    }
                }
                else {
                    // Use an arbitrary non-zero port for non-TCP addresses
                    bindResultPromise = bindSpecificPort(addressList, 1, 0);
                }
                bindResultPromise.then(bindResult => {
                    if (bindResult.count === 0) {
                        const errorString = `No address added out of total ${addressList.length} resolved`;
                        logging.log(constants_1.LogVerbosity.ERROR, errorString);
                        deferredCallback(new Error(errorString), 0);
                    }
                    else {
                        if (bindResult.count < addressList.length) {
                            logging.log(constants_1.LogVerbosity.INFO, `WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved`);
                        }
                        deferredCallback(null, bindResult.port);
                    }
                }, error => {
                    const errorString = `No address added out of total ${addressList.length} resolved`;
                    logging.log(constants_1.LogVerbosity.ERROR, errorString);
                    deferredCallback(new Error(errorString), 0);
                });
            },
            onError: error => {
                deferredCallback(new Error(error.details), 0);
            },
        };
        const resolver = (0, resolver_1.createResolver)(portUri, resolverListener, this.options);
        resolver.updateResolution();
    }
    /**
     * Stop immediately: close every listening http2 server, destroy every
     * tracked session with NGHTTP2_CANCEL, and unregister from channelz.
     */
    forceShutdown() {
        // Close the server if it is still running.
        for (const { server: http2Server, channelzRef: ref } of this.http2ServerList) {
            if (http2Server.listening) {
                http2Server.close(() => {
                    if (this.channelzEnabled) {
                        this.listenerChildrenTracker.unrefChild(ref);
                        (0, channelz_1.unregisterChannelzRef)(ref);
                    }
                });
            }
        }
        this.started = false;
        // Always destroy any available sessions. It's possible that one or more
        // tryShutdown() calls are in progress. Don't wait on them to finish.
        this.sessions.forEach((channelzInfo, session) => {
            session.destroy(http2.constants.NGHTTP2_CANCEL);
        });
        this.sessions.clear();
        if (this.channelzEnabled) {
            (0, channelz_1.unregisterChannelzRef)(this.channelzRef);
        }
    }
    /**
     * Register a handler for the method path `name`.
     *
     * @param {string} name Method path used as the lookup key.
     * @param {Function} handler Handler function.
     * @param serialize Response serializer stored with the handler.
     * @param deserialize Request deserializer stored with the handler.
     * @param {string} type 'unary', 'clientStream', 'serverStream' or 'bidi'.
     * @returns {boolean} false if a handler for `name` already exists
     *     (nothing is changed), true otherwise.
     */
    register(name, handler, serialize, deserialize, type) {
        if (this.handlers.has(name)) {
            return false;
        }
        this.handlers.set(name, {
            func: handler,
            serialize,
            deserialize,
            type,
            path: name,
        });
        return true;
    }
    /**
     * Remove the handler for the method path `name`.
     *
     * @returns {boolean} true if a handler was registered and removed.
     */
    unregister(name) {
        return this.handlers.delete(name);
    }
    /**
     * Mark the server as started so that incoming sessions are accepted.
     *
     * @throws {Error} If no bound http2 server is listening, or the server
     *     was already started.
     */
    start() {
        if (this.http2ServerList.length === 0 ||
            this.http2ServerList.every(({ server: http2Server }) => http2Server.listening !== true)) {
            throw new Error('server must be bound in order to start');
        }
        if (this.started === true) {
            throw new Error('server is already started');
        }
        if (this.channelzEnabled) {
            this.channelzTrace.addTrace('CT_INFO', 'Starting');
        }
        this.started = true;
    }
    /**
     * Shut down gracefully: close every listening http2 server and every
     * open session, then invoke `callback` once all of them have closed
     * (or immediately if there was nothing to close).
     *
     * @param {Function} callback Invoked with no error when shutdown is done.
     */
    tryShutdown(callback) {
        const wrappedCallback = (error) => {
            if (this.channelzEnabled) {
                (0, channelz_1.unregisterChannelzRef)(this.channelzRef);
            }
            callback(error);
        };
        // Number of close operations still outstanding.
        let pendingChecks = 0;
        const maybeCallback = () => {
            pendingChecks--;
            if (pendingChecks === 0) {
                wrappedCallback();
            }
        };
        // Close the server if necessary.
        this.started = false;
        for (const { server: http2Server, channelzRef: ref } of this.http2ServerList) {
            if (http2Server.listening) {
                pendingChecks++;
                http2Server.close(() => {
                    if (this.channelzEnabled) {
                        this.listenerChildrenTracker.unrefChild(ref);
                        (0, channelz_1.unregisterChannelzRef)(ref);
                    }
                    maybeCallback();
                });
            }
        }
        this.sessions.forEach((channelzInfo, session) => {
            if (!session.closed) {
                pendingChecks += 1;
                session.close(maybeCallback);
            }
        });
        if (pendingChecks === 0) {
            wrappedCallback();
        }
    }
    /** Not supported; always throws. */
    addHttp2Port() {
        throw new Error('Not yet implemented');
    }
    /**
     * Get the channelz reference object for this server. The returned value is
     * garbage if channelz is disabled for this server.
     * @returns
     */
    getChannelzRef() {
        return this.channelzRef;
    }
    /**
     * Check that the request's content-type starts with 'application/grpc'.
     * Otherwise respond with HTTP 415 (Unsupported Media Type) and end the
     * stream.
     *
     * @returns {boolean} true if the content type is acceptable.
     */
    _verifyContentType(stream, headers) {
        const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE];
        if (typeof contentType === 'string' &&
            contentType.startsWith('application/grpc')) {
            return true;
        }
        stream.respond({
            [http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE,
        }, { endStream: true });
        return false;
    }
    /**
     * Look up the handler registered for `path`, tracing the lookup.
     *
     * @returns The handler record, or null if none is registered.
     */
    _retrieveHandler(path) {
        this.trace('Received call to method ' +
            path +
            ' at address ' +
            this.serverAddressString);
        const handler = this.handlers.get(path);
        if (handler === undefined) {
            this.trace('No handler registered for method ' +
                path +
                '. Sending UNIMPLEMENTED status.');
            return null;
        }
        return handler;
    }
    /**
     * Send `err` as the status of `stream`, defaulting its code to INTERNAL,
     * and count the call as failed when channelz is enabled.
     *
     * @param err Status object; `err.code` is filled in if undefined.
     * @param stream The HTTP/2 stream to answer.
     * @param [channelzSessionInfo] Per-session bookkeeping, if any.
     */
    _respondWithError(err, stream, channelzSessionInfo = null) {
        const call = new server_call_1.Http2ServerCallStream(stream, null, this.options);
        if (err.code === undefined) {
            err.code = constants_1.Status.INTERNAL;
        }
        if (this.channelzEnabled) {
            this.callTracker.addCallFailed();
            if (channelzSessionInfo !== null && channelzSessionInfo !== undefined) {
                channelzSessionInfo.streamTracker.addCallFailed();
            }
        }
        call.sendError(err);
    }
    /**
     * 'stream' event handler used when channelz is enabled: same dispatch as
     * _streamHandler, plus server-level and per-session call/message counters.
     */
    _channelzHandler(stream, headers) {
        const channelzSessionInfo = this.sessions.get(stream.session);
        const hasSessionInfo = channelzSessionInfo !== null && channelzSessionInfo !== undefined;
        this.callTracker.addCallStarted();
        if (hasSessionInfo) {
            channelzSessionInfo.streamTracker.addCallStarted();
        }
        if (!this._verifyContentType(stream, headers)) {
            this.callTracker.addCallFailed();
            if (hasSessionInfo) {
                channelzSessionInfo.streamTracker.addCallFailed();
            }
            return;
        }
        const path = headers[HTTP2_HEADER_PATH];
        const handler = this._retrieveHandler(path);
        if (!handler) {
            this._respondWithError(getUnimplementedStatusResponse(path), stream, channelzSessionInfo);
            return;
        }
        const call = new server_call_1.Http2ServerCallStream(stream, handler, this.options);
        call.once('callEnd', (code) => {
            if (code === constants_1.Status.OK) {
                this.callTracker.addCallSucceeded();
            }
            else {
                this.callTracker.addCallFailed();
            }
        });
        if (hasSessionInfo) {
            call.once('streamEnd', (success) => {
                if (success) {
                    channelzSessionInfo.streamTracker.addCallSucceeded();
                }
                else {
                    channelzSessionInfo.streamTracker.addCallFailed();
                }
            });
            call.on('sendMessage', () => {
                channelzSessionInfo.messagesSent += 1;
                channelzSessionInfo.lastMessageSentTimestamp = new Date();
            });
            call.on('receiveMessage', () => {
                channelzSessionInfo.messagesReceived += 1;
                channelzSessionInfo.lastMessageReceivedTimestamp = new Date();
            });
        }
        if (!this._runHandlerForCall(call, handler, headers)) {
            this.callTracker.addCallFailed();
            if (hasSessionInfo) {
                channelzSessionInfo.streamTracker.addCallFailed();
            }
            call.sendError({
                code: constants_1.Status.INTERNAL,
                details: `Unknown handler type: ${handler.type}`,
            });
        }
    }
    /**
     * 'stream' event handler used when channelz is disabled: verify the
     * content type, find the handler for the request path, and run it.
     */
    _streamHandler(stream, headers) {
        if (this._verifyContentType(stream, headers) !== true) {
            return;
        }
        const path = headers[HTTP2_HEADER_PATH];
        const handler = this._retrieveHandler(path);
        if (!handler) {
            this._respondWithError(getUnimplementedStatusResponse(path), stream, null);
            return;
        }
        const call = new server_call_1.Http2ServerCallStream(stream, handler, this.options);
        if (!this._runHandlerForCall(call, handler, headers)) {
            call.sendError({
                code: constants_1.Status.INTERNAL,
                details: `Unknown handler type: ${handler.type}`,
            });
        }
    }
    /**
     * Extract metadata from the headers and dispatch the call to the
     * function matching `handler.type`.
     *
     * The 'grpc-encoding' entry is removed from the metadata and passed on
     * separately (defaulting to 'identity').
     *
     * @returns {boolean} false if `handler.type` is not a known type.
     */
    _runHandlerForCall(call, handler, headers) {
        const metadata = call.receiveMetadata(headers);
        const requestedEncoding = metadata.get('grpc-encoding')[0];
        const encoding = requestedEncoding !== null && requestedEncoding !== undefined
            ? requestedEncoding
            : 'identity';
        metadata.remove('grpc-encoding');
        switch (handler.type) {
            case 'unary':
                handleUnary(call, handler, metadata, encoding);
                return true;
            case 'clientStream':
                handleClientStreaming(call, handler, metadata, encoding);
                return true;
            case 'serverStream':
                handleServerStreaming(call, handler, metadata, encoding);
                return true;
            case 'bidi':
                handleBidiStreaming(call, handler, metadata, encoding);
                return true;
            default:
                return false;
        }
    }
    /**
     * Attach the 'stream' and 'session' listeners to an http2 server.
     *
     * Per session this registers channelz bookkeeping, an optional
     * max-connection-age timer (with +/-10% jitter and an optional grace
     * period before a forced destroy), and a keepalive ping interval whose
     * timeout closes the session if the ping is not acknowledged. All timers
     * are unref'd so they do not keep the process alive, and are cleared when
     * the session closes.
     *
     * NOTE(review): bindAsync() calls this before listen(), so
     * `http2Server.address()` has presumably not been assigned yet and
     * `serverAddressString` stays 'null' - confirm.
     *
     * @param http2Server The server to instrument; null is ignored.
     */
    _setupHandlers(http2Server) {
        if (http2Server === null) {
            return;
        }
        const serverAddress = http2Server.address();
        let serverAddressString = 'null';
        if (serverAddress) {
            serverAddressString = typeof serverAddress === 'string'
                ? serverAddress
                : serverAddress.address + ':' + serverAddress.port;
        }
        this.serverAddressString = serverAddressString;
        const handler = this.channelzEnabled
            ? this._channelzHandler
            : this._streamHandler;
        // Unref a timer where supported and hand back the handle itself, so it
        // can always be cleared later.
        const unrefTimer = (timer) => {
            if (typeof timer.unref === 'function') {
                timer.unref();
            }
            return timer;
        };
        http2Server.on('stream', handler.bind(this));
        http2Server.on('session', session => {
            if (!this.started) {
                session.destroy();
                return;
            }
            const clientAddress = session.socket.remoteAddress;
            const channelzRef = (0, channelz_1.registerChannelzSocket)(clientAddress !== null && clientAddress !== undefined ? clientAddress : 'unknown', this.getChannelzSessionInfoGetter(session), this.channelzEnabled);
            const channelzSessionInfo = {
                ref: channelzRef,
                streamTracker: new channelz_1.ChannelzCallTracker(),
                messagesSent: 0,
                messagesReceived: 0,
                lastMessageSentTimestamp: null,
                lastMessageReceivedTimestamp: null,
            };
            this.sessions.set(session, channelzSessionInfo);
            if (this.channelzEnabled) {
                this.channelzTrace.addTrace('CT_INFO', 'Connection established by client ' + clientAddress);
                this.sessionChildrenTracker.refChild(channelzRef);
            }
            let connectionAgeTimer = null;
            let connectionAgeGraceTimer = null;
            // Distinguishes server-initiated closes from client drops in traces.
            let sessionClosedByServer = false;
            if (this.maxConnectionAgeMs !== UNLIMITED_CONNECTION_AGE_MS) {
                // Apply a random jitter within a +/-10% range
                const jitterMagnitude = this.maxConnectionAgeMs / 10;
                const jitter = Math.random() * jitterMagnitude * 2 - jitterMagnitude;
                connectionAgeTimer = unrefTimer(setTimeout(() => {
                    sessionClosedByServer = true;
                    if (this.channelzEnabled) {
                        this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by max connection age from ' + clientAddress);
                    }
                    try {
                        session.goaway(http2.constants.NGHTTP2_NO_ERROR, ~(1 << 31), Buffer.from('max_age'));
                    }
                    catch (e) {
                        // The goaway can't be sent because the session is already closed
                        session.destroy();
                        return;
                    }
                    session.close();
                    /* Allow a grace period after sending the GOAWAY before forcibly
                     * closing the connection. */
                    if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) {
                        connectionAgeGraceTimer = unrefTimer(setTimeout(() => {
                            session.destroy();
                        }, this.maxConnectionAgeGraceMs));
                    }
                }, this.maxConnectionAgeMs + jitter));
            }
            const keepaliveTimeTimer = unrefTimer(setInterval(() => {
                // Armed before the ping; cancelled when the ping callback runs.
                const timeoutTimer = unrefTimer(setTimeout(() => {
                    sessionClosedByServer = true;
                    if (this.channelzEnabled) {
                        this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by keepalive timeout from ' + clientAddress);
                    }
                    session.close();
                }, this.keepaliveTimeoutMs));
                try {
                    session.ping((err, duration, payload) => {
                        clearTimeout(timeoutTimer);
                    });
                }
                catch (e) {
                    // The ping can't be sent because the session is already closed.
                    // No ping callback will ever run, so cancel the timeout here;
                    // otherwise it would fire later against the destroyed session.
                    clearTimeout(timeoutTimer);
                    session.destroy();
                }
            }, this.keepaliveTimeMs));
            session.on('close', () => {
                if (this.channelzEnabled) {
                    if (!sessionClosedByServer) {
                        this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by client ' + clientAddress);
                    }
                    this.sessionChildrenTracker.unrefChild(channelzRef);
                    (0, channelz_1.unregisterChannelzRef)(channelzRef);
                }
                if (connectionAgeTimer) {
                    clearTimeout(connectionAgeTimer);
                }
                if (connectionAgeGraceTimer) {
                    clearTimeout(connectionAgeGraceTimer);
                }
                if (keepaliveTimeTimer) {
                    clearInterval(keepaliveTimeTimer);
                }
                this.sessions.delete(session);
            });
        });
    }
}
// Publish the class on the CommonJS export object.
exports.Server = Server;
/**
 * Run a unary handler: read the single request message, invoke the handler,
 * and send its result as the single response.
 *
 * Nothing is invoked if no request arrives (`undefined`) or the call was
 * cancelled while waiting. Errors from reading the request, or thrown
 * synchronously by the handler, are reported through `call.sendError`.
 *
 * @param call The server call stream.
 * @param handler Handler record; `handler.func(emitter, callback)` is called.
 * @param metadata Request metadata handed to the handler's call object.
 * @param encoding Message encoding used to read the request.
 * @returns {Promise<void>}
 */
async function handleUnary(call, handler, metadata, encoding) {
    try {
        const request = await call.receiveUnaryMessage(encoding);
        if (request === undefined || call.cancelled) {
            return;
        }
        const emitter = new server_call_1.ServerUnaryCallImpl(call, metadata, request);
        const sendResponse = (err, value, trailer, flags) => {
            call.sendUnaryMessage(err, value, trailer, flags);
        };
        handler.func(emitter, sendResponse);
    }
    catch (err) {
        call.sendError(err);
    }
}
/**
 * Run a client-streaming handler: give it a readable request stream and a
 * callback through which it sends the single response.
 *
 * The same `respond` function is used for the handler's callback and for the
 * stream's 'error' event; either way the request stream is destroyed before
 * the response is sent. The handler is not invoked if the call is already
 * cancelled.
 *
 * @param call The server call stream.
 * @param handler Handler record; uses `handler.deserialize` and
 *     `handler.func(stream, callback)`.
 * @param metadata Request metadata handed to the stream object.
 * @param encoding Message encoding handed to the stream object.
 */
function handleClientStreaming(call, handler, metadata, encoding) {
    const stream = new server_call_1.ServerReadableStreamImpl(call, metadata, handler.deserialize, encoding);
    const respond = (err, value, trailer, flags) => {
        stream.destroy();
        call.sendUnaryMessage(err, value, trailer, flags);
    };
    if (call.cancelled) {
        return;
    }
    stream.on('error', respond);
    handler.func(stream, respond);
}
/**
 * Run a server-streaming handler: read the single request message, then
 * hand the handler a writable stream for its responses.
 *
 * Nothing is invoked if no request arrives (`undefined`) or the call was
 * cancelled while waiting. Errors from reading the request, or thrown
 * synchronously by the handler, are reported through `call.sendError`.
 *
 * @param call The server call stream.
 * @param handler Handler record; uses `handler.serialize` and
 *     `handler.func(stream)`.
 * @param metadata Request metadata handed to the stream object.
 * @param encoding Message encoding used to read the request.
 * @returns {Promise<void>}
 */
async function handleServerStreaming(call, handler, metadata, encoding) {
    try {
        const request = await call.receiveUnaryMessage(encoding);
        if (request === undefined || call.cancelled) {
            return;
        }
        const stream = new server_call_1.ServerWritableStreamImpl(call, metadata, handler.serialize, request);
        handler.func(stream);
    }
    catch (err) {
        call.sendError(err);
    }
}
/**
 * Run a bidirectional-streaming handler: hand it a duplex stream for
 * reading requests and writing responses.
 *
 * The stream object is constructed first; the handler is not invoked if the
 * call is already cancelled.
 *
 * @param call The server call stream.
 * @param handler Handler record; uses `handler.serialize`,
 *     `handler.deserialize` and `handler.func(stream)`.
 * @param metadata Request metadata handed to the stream object.
 * @param encoding Message encoding handed to the stream object.
 */
function handleBidiStreaming(call, handler, metadata, encoding) {
    const { serialize, deserialize } = handler;
    const stream = new server_call_1.ServerDuplexStreamImpl(call, metadata, serialize, deserialize, encoding);
    if (call.cancelled) {
        return;
    }
    handler.func(stream);
}
//# sourceMappingURL=server.js.map