UNPKG

38.3 kBJavaScriptView Raw
1"use strict";
2/*
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18Object.defineProperty(exports, "__esModule", { value: true });
19exports.Server = void 0;
20const http2 = require("http2");
21const constants_1 = require("./constants");
22const server_call_1 = require("./server-call");
23const server_credentials_1 = require("./server-credentials");
24const resolver_1 = require("./resolver");
25const logging = require("./logging");
26const subchannel_address_1 = require("./subchannel-address");
27const uri_parser_1 = require("./uri-parser");
28const channelz_1 = require("./channelz");
29const error_1 = require("./error");
/* Sentinel meaning "no connection age limit": the largest signed 32-bit
 * integer (2147483647). */
const UNLIMITED_CONNECTION_AGE_MS = 0x7fffffff;
/* Default keepalive ping period; the same maximum 32-bit value. */
const KEEPALIVE_MAX_TIME_MS = 0x7fffffff;
/* Default time to wait for a keepalive ping acknowledgement. */
const KEEPALIVE_TIMEOUT_MS = 20 * 1000;
const HTTP2_HEADER_PATH = http2.constants.HTTP2_HEADER_PATH;
/* Tracer name passed to logging.trace for every message from this module. */
const TRACER_NAME = 'server';
/** Callback that intentionally does nothing. */
function noop() {
    // Intentionally empty.
}
/**
 * Build the status object reported when a client calls a method that has no
 * registered implementation.
 *
 * @param methodName Name (or path) of the method that was requested.
 * @returns An object with `code` set to Status.UNIMPLEMENTED and a `details`
 *   message naming the method.
 */
function getUnimplementedStatusResponse(methodName) {
    const code = constants_1.Status.UNIMPLEMENTED;
    const details = `The server does not implement the method ${methodName}`;
    return { code, details };
}
/**
 * Create the placeholder handler used for a service method that has no
 * user-supplied implementation. The handler always reports UNIMPLEMENTED.
 *
 * @param handlerType One of 'unary', 'clientStream', 'serverStream', 'bidi'.
 * @param methodName Method name included in the status details.
 * @returns A handler function with the call signature of that method type.
 * @throws Error if handlerType is not one of the four known types.
 */
function getDefaultHandler(handlerType, methodName) {
    const status = getUnimplementedStatusResponse(methodName);
    // Methods with a single response report the status through the callback.
    const failThroughCallback = (call, callback) => {
        callback(status, null);
    };
    // Methods with a streamed response report it as an 'error' event.
    const failThroughErrorEvent = (call) => {
        call.emit('error', status);
    };
    switch (handlerType) {
        case 'unary':
        case 'clientStream':
            return failThroughCallback;
        case 'serverStream':
        case 'bidi':
            return failThroughErrorEvent;
        default:
            throw new Error(`Invalid handlerType ${handlerType}`);
    }
}
/**
 * Return `value` unless it is null or undefined, in which case return
 * `fallback`.
 */
function optionOrDefault(value, fallback) {
    return value !== null && value !== undefined ? value : fallback;
}
/**
 * Call `unref()` on a timer handle when the runtime provides it, and always
 * return the handle itself so that it can still be cleared later.
 */
function unrefTimer(timer) {
    if (timer && typeof timer.unref === 'function') {
        timer.unref();
    }
    return timer;
}
/**
 * gRPC server built on Node's http2 module. Holds the registered method
 * handlers, the bound http2 servers and the currently open sessions, and
 * reports its state to channelz unless that is disabled through the
 * 'grpc.enable_channelz' option.
 */
class Server {
    /**
     * @param options Optional channel options object. The keys read here are
     *   'grpc.enable_channelz', 'grpc.max_connection_age_ms',
     *   'grpc.max_connection_age_grace_ms', 'grpc.keepalive_time_ms' and
     *   'grpc.keepalive_timeout_ms'.
     */
    constructor(options) {
        this.http2ServerList = [];
        this.handlers = new Map();
        this.sessions = new Map();
        this.started = false;
        this.serverAddressString = 'null';
        // Channelz Info
        this.channelzEnabled = true;
        this.channelzTrace = new channelz_1.ChannelzTrace();
        this.callTracker = new channelz_1.ChannelzCallTracker();
        this.listenerChildrenTracker = new channelz_1.ChannelzChildrenTracker();
        this.sessionChildrenTracker = new channelz_1.ChannelzChildrenTracker();
        this.options = optionOrDefault(options, {});
        if (this.options['grpc.enable_channelz'] === 0) {
            this.channelzEnabled = false;
        }
        this.channelzRef = (0, channelz_1.registerChannelzServer)(() => this.getChannelzInfo(), this.channelzEnabled);
        if (this.channelzEnabled) {
            this.channelzTrace.addTrace('CT_INFO', 'Server created');
        }
        this.maxConnectionAgeMs = optionOrDefault(this.options['grpc.max_connection_age_ms'], UNLIMITED_CONNECTION_AGE_MS);
        this.maxConnectionAgeGraceMs = optionOrDefault(this.options['grpc.max_connection_age_grace_ms'], UNLIMITED_CONNECTION_AGE_MS);
        this.keepaliveTimeMs = optionOrDefault(this.options['grpc.keepalive_time_ms'], KEEPALIVE_MAX_TIME_MS);
        this.keepaliveTimeoutMs = optionOrDefault(this.options['grpc.keepalive_timeout_ms'], KEEPALIVE_TIMEOUT_MS);
        this.trace('Server constructed');
    }
    /** Snapshot of this server's channelz state (trace, call counters, children). */
    getChannelzInfo() {
        return {
            trace: this.channelzTrace,
            callTracker: this.callTracker,
            listenerChildren: this.listenerChildrenTracker.getChildLists(),
            sessionChildren: this.sessionChildrenTracker.getChildLists()
        };
    }
    /**
     * Create the function that channelz calls to obtain socket information
     * for one session.
     *
     * @param session The http2 session to describe. It is expected to still
     *   be present in `this.sessions` when the returned function is called.
     * @returns A zero-argument function producing the socket info object.
     */
    getChannelzSessionInfoGetter(session) {
        return () => {
            const sessionInfo = this.sessions.get(session);
            const sessionSocket = session.socket;
            const remoteAddress = sessionSocket.remoteAddress
                ? (0, subchannel_address_1.stringToSubchannelAddress)(sessionSocket.remoteAddress, sessionSocket.remotePort)
                : null;
            const localAddress = sessionSocket.localAddress
                ? (0, subchannel_address_1.stringToSubchannelAddress)(sessionSocket.localAddress, sessionSocket.localPort)
                : null;
            let tlsInfo = null;
            if (session.encrypted) {
                const tlsSocket = sessionSocket;
                const cipherInfo = tlsSocket.getCipher();
                const certificate = tlsSocket.getCertificate();
                const peerCertificate = tlsSocket.getPeerCertificate();
                tlsInfo = {
                    cipherSuiteStandardName: optionOrDefault(cipherInfo.standardName, null),
                    // Only report the other name when no standard name exists.
                    cipherSuiteOtherName: cipherInfo.standardName ? null : cipherInfo.name,
                    localCertificate: (certificate && 'raw' in certificate) ? certificate.raw : null,
                    remoteCertificate: (peerCertificate && 'raw' in peerCertificate) ? peerCertificate.raw : null
                };
            }
            return {
                remoteAddress: remoteAddress,
                localAddress: localAddress,
                security: tlsInfo,
                remoteName: null,
                streamsStarted: sessionInfo.streamTracker.callsStarted,
                streamsSucceeded: sessionInfo.streamTracker.callsSucceeded,
                streamsFailed: sessionInfo.streamTracker.callsFailed,
                messagesSent: sessionInfo.messagesSent,
                messagesReceived: sessionInfo.messagesReceived,
                keepAlivesSent: 0,
                lastLocalStreamCreatedTimestamp: null,
                lastRemoteStreamCreatedTimestamp: sessionInfo.streamTracker.lastCallStartedTimestamp,
                lastMessageSentTimestamp: sessionInfo.lastMessageSentTimestamp,
                lastMessageReceivedTimestamp: sessionInfo.lastMessageReceivedTimestamp,
                localFlowControlWindow: optionOrDefault(session.state.localWindowSize, null),
                remoteFlowControlWindow: optionOrDefault(session.state.remoteWindowSize, null)
            };
        };
    }
    /** Emit a DEBUG trace line prefixed with this server's channelz id. */
    trace(text) {
        logging.trace(constants_1.LogVerbosity.DEBUG, TRACER_NAME, '(' + this.channelzRef.id + ') ' + text);
    }
    /** Not supported; always throws. Use addService() instead. */
    addProtoService() {
        throw new Error('Not implemented. Use addService() instead');
    }
    /**
     * Register a handler for every method of a service definition.
     *
     * @param service Object mapping method names to method attributes
     *   (`path`, `requestStream`, `responseStream`, `responseSerialize`,
     *   `requestDeserialize` and optionally `originalName`).
     * @param implementation Object whose functions implement the methods,
     *   looked up by method name and then by `originalName`. Methods with no
     *   entry get a default handler.
     * @throws Error if either argument is not an object, the service has no
     *   methods, or a method path already has a handler.
     * @throws TypeError if an implementation entry exists but is not a function.
     */
    addService(service, implementation) {
        if (service === null ||
            typeof service !== 'object' ||
            implementation === null ||
            typeof implementation !== 'object') {
            throw new Error('addService() requires two objects as arguments');
        }
        const serviceKeys = Object.keys(service);
        if (serviceKeys.length === 0) {
            throw new Error('Cannot add an empty service to a server');
        }
        serviceKeys.forEach((name) => {
            const attrs = service[name];
            let methodType;
            if (attrs.requestStream) {
                methodType = attrs.responseStream ? 'bidi' : 'clientStream';
            }
            else {
                methodType = attrs.responseStream ? 'serverStream' : 'unary';
            }
            let implFn = implementation[name];
            if (implFn === undefined && typeof attrs.originalName === 'string') {
                implFn = implementation[attrs.originalName];
            }
            let impl;
            if (implFn === undefined) {
                impl = getDefaultHandler(methodType, name);
            }
            else if (typeof implFn === 'function') {
                impl = implFn.bind(implementation);
            }
            else {
                // Fail with a clear message instead of an opaque ".bind" error.
                throw new TypeError(`Implementation of method ${name} must be a function`);
            }
            const success = this.register(attrs.path, impl, attrs.responseSerialize, attrs.requestDeserialize, methodType);
            if (success === false) {
                throw new Error(`Method handler for ${attrs.path} already provided.`);
            }
        });
    }
    /**
     * Unregister the handler of every method in a service definition.
     *
     * @throws Error if `service` is not an object.
     */
    removeService(service) {
        if (service === null || typeof service !== 'object') {
            throw new Error('removeService() requires object as argument');
        }
        Object.keys(service).forEach((name) => {
            this.unregister(service[name].path);
        });
    }
    /** Not supported; always throws. Use bindAsync() instead. */
    bind(port, creds) {
        throw new Error('Not implemented. Use bindAsync() instead');
    }
    /**
     * Resolve `port` to a list of addresses and start an http2 server
     * listening on each one.
     *
     * @param port Address string, parsed with parseUri and given a default
     *   scheme with mapUriDefaultScheme.
     * @param creds ServerCredentials instance; decides between a secure and a
     *   plain http2 server.
     * @param callback Called as `callback(error, port)` on a later tick. On
     *   failure `error` is an Error and `port` is 0; on success `error` is
     *   null and `port` is the bound port number.
     * @throws Error if the server is already started or `port` cannot be
     *   parsed or mapped to a scheme.
     * @throws TypeError if an argument has the wrong type.
     */
    bindAsync(port, creds, callback) {
        if (this.started === true) {
            throw new Error('server is already started');
        }
        if (typeof port !== 'string') {
            throw new TypeError('port must be a string');
        }
        if (creds === null || !(creds instanceof server_credentials_1.ServerCredentials)) {
            throw new TypeError('creds must be a ServerCredentials object');
        }
        if (typeof callback !== 'function') {
            throw new TypeError('callback must be a function');
        }
        const initialPortUri = (0, uri_parser_1.parseUri)(port);
        if (initialPortUri === null) {
            throw new Error(`Could not parse port "${port}"`);
        }
        const portUri = (0, resolver_1.mapUriDefaultScheme)(initialPortUri);
        if (portUri === null) {
            throw new Error(`Could not get a default scheme for port "${port}"`);
        }
        const serverOptions = {
            maxSendHeaderBlockLength: Number.MAX_SAFE_INTEGER,
        };
        if ('grpc-node.max_session_memory' in this.options) {
            serverOptions.maxSessionMemory = this.options['grpc-node.max_session_memory'];
        }
        else {
            /* By default, set a very large max session memory limit, to effectively
             * disable enforcement of the limit. Some testing indicates that Node's
             * behavior degrades badly when this limit is reached, so we solve that
             * by disabling the check entirely. */
            serverOptions.maxSessionMemory = Number.MAX_SAFE_INTEGER;
        }
        if ('grpc.max_concurrent_streams' in this.options) {
            serverOptions.settings = {
                maxConcurrentStreams: this.options['grpc.max_concurrent_streams'],
            };
        }
        // Always report the result asynchronously, on a later tick.
        const deferredCallback = (error, boundPort) => {
            process.nextTick(() => callback(error, boundPort));
        };
        // Create an http2 server (not yet listening) with handlers attached.
        const setupServer = () => {
            let http2Server;
            if (creds._isSecure()) {
                const secureServerOptions = Object.assign(serverOptions, creds._getSettings());
                http2Server = http2.createSecureServer(secureServerOptions);
                http2Server.on('secureConnection', (socket) => {
                    /* These errors need to be handled by the user of Http2SecureServer,
                     * according to https://github.com/nodejs/node/issues/35824 */
                    socket.on('error', (e) => {
                        this.trace('An incoming TLS connection closed with error: ' + e.message);
                    });
                });
            }
            else {
                http2Server = http2.createServer(serverOptions);
            }
            http2Server.setTimeout(0, noop);
            this._setupHandlers(http2Server);
            return http2Server;
        };
        // Record a server that is now listening: channelz socket, server list, trace.
        const registerBoundServer = (http2Server, boundSubchannelAddress) => {
            const channelzRef = (0, channelz_1.registerChannelzSocket)((0, subchannel_address_1.subchannelAddressToString)(boundSubchannelAddress), () => {
                return {
                    localAddress: boundSubchannelAddress,
                    remoteAddress: null,
                    security: null,
                    remoteName: null,
                    streamsStarted: 0,
                    streamsSucceeded: 0,
                    streamsFailed: 0,
                    messagesSent: 0,
                    messagesReceived: 0,
                    keepAlivesSent: 0,
                    lastLocalStreamCreatedTimestamp: null,
                    lastRemoteStreamCreatedTimestamp: null,
                    lastMessageSentTimestamp: null,
                    lastMessageReceivedTimestamp: null,
                    localFlowControlWindow: null,
                    remoteFlowControlWindow: null
                };
            }, this.channelzEnabled);
            if (this.channelzEnabled) {
                this.listenerChildrenTracker.refChild(channelzRef);
            }
            this.http2ServerList.push({ server: http2Server, channelzRef: channelzRef });
            this.trace('Successfully bound ' + (0, subchannel_address_1.subchannelAddressToString)(boundSubchannelAddress));
        };
        // Bind every address on the given port. Each attempt resolves to the
        // bound port number on success or to the Error on failure.
        const bindSpecificPort = (addressList, portNum, previousCount) => {
            if (addressList.length === 0) {
                return Promise.resolve({ port: portNum, count: previousCount });
            }
            return Promise.all(addressList.map((address) => {
                this.trace('Attempting to bind ' + (0, subchannel_address_1.subchannelAddressToString)(address));
                const addr = (0, subchannel_address_1.isTcpSubchannelAddress)(address)
                    ? { host: address.host, port: portNum }
                    : address;
                const http2Server = setupServer();
                return new Promise((resolve) => {
                    const onError = (err) => {
                        this.trace('Failed to bind ' + (0, subchannel_address_1.subchannelAddressToString)(address) + ' with error ' + err.message);
                        resolve(err);
                    };
                    http2Server.once('error', onError);
                    http2Server.listen(addr, () => {
                        const boundAddress = http2Server.address();
                        const boundSubchannelAddress = typeof boundAddress === 'string'
                            ? { path: boundAddress }
                            : { host: boundAddress.address, port: boundAddress.port };
                        registerBoundServer(http2Server, boundSubchannelAddress);
                        resolve('port' in boundSubchannelAddress ? boundSubchannelAddress.port : portNum);
                        http2Server.removeListener('error', onError);
                    });
                });
            })).then((results) => {
                let count = 0;
                for (const result of results) {
                    if (typeof result === 'number') {
                        count += 1;
                        if (result !== portNum) {
                            throw new Error('Invalid state: multiple port numbers added from single address');
                        }
                    }
                }
                return {
                    port: portNum,
                    count: count + previousCount,
                };
            });
        };
        // Port 0 was requested: try addresses in order until one binds, then
        // bind the remaining addresses on the port that was assigned.
        const bindWildcardPort = (addressList) => {
            if (addressList.length === 0) {
                return Promise.resolve({ port: 0, count: 0 });
            }
            const address = addressList[0];
            const http2Server = setupServer();
            return new Promise((resolve) => {
                const onError = (err) => {
                    this.trace('Failed to bind ' + (0, subchannel_address_1.subchannelAddressToString)(address) + ' with error ' + err.message);
                    resolve(bindWildcardPort(addressList.slice(1)));
                };
                http2Server.once('error', onError);
                http2Server.listen(address, () => {
                    const boundAddress = http2Server.address();
                    const boundSubchannelAddress = {
                        host: boundAddress.address,
                        port: boundAddress.port
                    };
                    registerBoundServer(http2Server, boundSubchannelAddress);
                    resolve(bindSpecificPort(addressList.slice(1), boundAddress.port, 1));
                    http2Server.removeListener('error', onError);
                });
            });
        };
        const resolverListener = {
            onSuccessfulResolution: (addressList, serviceConfig, serviceConfigError) => {
                // We only want one resolution result. Discard all future results
                resolverListener.onSuccessfulResolution = () => { };
                if (addressList.length === 0) {
                    deferredCallback(new Error(`No addresses resolved for port ${port}`), 0);
                    return;
                }
                let bindResultPromise;
                if ((0, subchannel_address_1.isTcpSubchannelAddress)(addressList[0])) {
                    if (addressList[0].port === 0) {
                        bindResultPromise = bindWildcardPort(addressList);
                    }
                    else {
                        bindResultPromise = bindSpecificPort(addressList, addressList[0].port, 0);
                    }
                }
                else {
                    // Use an arbitrary non-zero port for non-TCP addresses
                    bindResultPromise = bindSpecificPort(addressList, 1, 0);
                }
                bindResultPromise.then((bindResult) => {
                    if (bindResult.count === 0) {
                        const errorString = `No address added out of total ${addressList.length} resolved`;
                        logging.log(constants_1.LogVerbosity.ERROR, errorString);
                        deferredCallback(new Error(errorString), 0);
                    }
                    else {
                        if (bindResult.count < addressList.length) {
                            logging.log(constants_1.LogVerbosity.INFO, `WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved`);
                        }
                        deferredCallback(null, bindResult.port);
                    }
                }, (error) => {
                    const errorString = `No address added out of total ${addressList.length} resolved`;
                    // Log the underlying reason too, so it is not lost; the error
                    // passed to the callback is unchanged.
                    const reason = error instanceof Error ? error.message : String(error);
                    logging.log(constants_1.LogVerbosity.ERROR, errorString + ': ' + reason);
                    deferredCallback(new Error(errorString), 0);
                });
            },
            onError: (error) => {
                deferredCallback(new Error(error.details), 0);
            },
        };
        const resolver = (0, resolver_1.createResolver)(portUri, resolverListener, this.options);
        resolver.updateResolution();
    }
    /**
     * Stop the server immediately: close every listening http2 server,
     * destroy every tracked session with NGHTTP2_CANCEL and unregister the
     * server's channelz reference.
     */
    forceShutdown() {
        // Close the server if it is still running.
        for (const { server: http2Server, channelzRef: ref } of this.http2ServerList) {
            if (http2Server.listening) {
                http2Server.close(() => {
                    if (this.channelzEnabled) {
                        this.listenerChildrenTracker.unrefChild(ref);
                        (0, channelz_1.unregisterChannelzRef)(ref);
                    }
                });
            }
        }
        this.started = false;
        // Always destroy any available sessions. It's possible that one or more
        // tryShutdown() calls are in progress. Don't wait on them to finish.
        this.sessions.forEach((channelzInfo, session) => {
            session.destroy(http2.constants.NGHTTP2_CANCEL);
        });
        this.sessions.clear();
        if (this.channelzEnabled) {
            (0, channelz_1.unregisterChannelzRef)(this.channelzRef);
        }
    }
    /**
     * Register a handler for a method path.
     *
     * @returns false if the path already has a handler, true otherwise.
     */
    register(name, handler, serialize, deserialize, type) {
        if (this.handlers.has(name)) {
            return false;
        }
        this.handlers.set(name, {
            func: handler,
            serialize,
            deserialize,
            type,
            path: name,
        });
        return true;
    }
    /**
     * Remove the handler for a method path.
     *
     * @returns true if a handler existed and was removed.
     */
    unregister(name) {
        return this.handlers.delete(name);
    }
    /**
     * Mark the server as started so that incoming sessions are accepted.
     *
     * @throws Error if no bound http2 server is listening, or if the server
     *   was already started.
     */
    start() {
        if (this.http2ServerList.length === 0 ||
            this.http2ServerList.every(({ server: http2Server }) => http2Server.listening !== true)) {
            throw new Error('server must be bound in order to start');
        }
        if (this.started === true) {
            throw new Error('server is already started');
        }
        if (this.channelzEnabled) {
            this.channelzTrace.addTrace('CT_INFO', 'Starting');
        }
        this.started = true;
    }
    /**
     * Shut down gracefully: stop listening, close every open session, and
     * call `callback` once all of those close operations have completed
     * (immediately if there was nothing to close).
     */
    tryShutdown(callback) {
        const wrappedCallback = (error) => {
            if (this.channelzEnabled) {
                (0, channelz_1.unregisterChannelzRef)(this.channelzRef);
            }
            callback(error);
        };
        // Number of close operations still outstanding.
        let pendingChecks = 0;
        const maybeCallback = () => {
            pendingChecks--;
            if (pendingChecks === 0) {
                wrappedCallback();
            }
        };
        // Close the server if necessary.
        this.started = false;
        for (const { server: http2Server, channelzRef: ref } of this.http2ServerList) {
            if (http2Server.listening) {
                pendingChecks++;
                http2Server.close(() => {
                    if (this.channelzEnabled) {
                        this.listenerChildrenTracker.unrefChild(ref);
                        (0, channelz_1.unregisterChannelzRef)(ref);
                    }
                    maybeCallback();
                });
            }
        }
        this.sessions.forEach((channelzInfo, session) => {
            if (!session.closed) {
                pendingChecks += 1;
                session.close(maybeCallback);
            }
        });
        if (pendingChecks === 0) {
            wrappedCallback();
        }
    }
    /** Not supported; always throws. */
    addHttp2Port() {
        throw new Error('Not yet implemented');
    }
    /**
     * Get the channelz reference object for this server. The returned value is
     * garbage if channelz is disabled for this server.
     * @returns
     */
    getChannelzRef() {
        return this.channelzRef;
    }
    /**
     * Check that the request content type starts with 'application/grpc'.
     * If it does not, respond with HTTP status "Unsupported Media Type" and
     * end the stream.
     *
     * @returns true if the content type is acceptable, false otherwise.
     */
    _verifyContentType(stream, headers) {
        const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE];
        if (typeof contentType !== 'string' ||
            !contentType.startsWith('application/grpc')) {
            stream.respond({
                [http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE,
            }, { endStream: true });
            return false;
        }
        return true;
    }
    /**
     * Look up the handler registered for the request's `:path` header.
     *
     * @throws The UNIMPLEMENTED status object (not an Error instance) if no
     *   handler is registered for the path.
     */
    _retrieveHandler(headers) {
        const path = headers[HTTP2_HEADER_PATH];
        this.trace('Received call to method ' +
            path +
            ' at address ' +
            this.serverAddressString);
        const handler = this.handlers.get(path);
        if (handler === undefined) {
            this.trace('No handler registered for method ' +
                path +
                '. Sending UNIMPLEMENTED status.');
            throw getUnimplementedStatusResponse(path);
        }
        return handler;
    }
    /**
     * Send an error status on a stream that has no handler, counting the call
     * as failed in channelz when enabled. `err.code` defaults to INTERNAL.
     */
    _respondWithError(err, stream, channelzSessionInfo = null) {
        const call = new server_call_1.Http2ServerCallStream(stream, null, this.options);
        if (err.code === undefined) {
            err.code = constants_1.Status.INTERNAL;
        }
        if (this.channelzEnabled) {
            this.callTracker.addCallFailed();
            if (channelzSessionInfo !== null && channelzSessionInfo !== undefined) {
                channelzSessionInfo.streamTracker.addCallFailed();
            }
        }
        call.sendError(err);
    }
    /**
     * 'stream' event handler used when channelz is enabled. Does the same
     * dispatch as _streamHandler and also updates the server-level and
     * session-level call and message counters.
     */
    _channelzHandler(stream, headers) {
        const channelzSessionInfo = this.sessions.get(stream.session);
        const hasSessionInfo = channelzSessionInfo !== null && channelzSessionInfo !== undefined;
        this.callTracker.addCallStarted();
        if (hasSessionInfo) {
            channelzSessionInfo.streamTracker.addCallStarted();
        }
        if (!this._verifyContentType(stream, headers)) {
            this.callTracker.addCallFailed();
            if (hasSessionInfo) {
                channelzSessionInfo.streamTracker.addCallFailed();
            }
            return;
        }
        let handler;
        try {
            handler = this._retrieveHandler(headers);
        }
        catch (err) {
            this._respondWithError({
                details: (0, error_1.getErrorMessage)(err),
                code: optionOrDefault((0, error_1.getErrorCode)(err), undefined)
            }, stream, channelzSessionInfo);
            return;
        }
        const call = new server_call_1.Http2ServerCallStream(stream, handler, this.options);
        call.once('callEnd', (code) => {
            if (code === constants_1.Status.OK) {
                this.callTracker.addCallSucceeded();
            }
            else {
                this.callTracker.addCallFailed();
            }
        });
        if (hasSessionInfo) {
            call.once('streamEnd', (success) => {
                if (success) {
                    channelzSessionInfo.streamTracker.addCallSucceeded();
                }
                else {
                    channelzSessionInfo.streamTracker.addCallFailed();
                }
            });
            call.on('sendMessage', () => {
                channelzSessionInfo.messagesSent += 1;
                channelzSessionInfo.lastMessageSentTimestamp = new Date();
            });
            call.on('receiveMessage', () => {
                channelzSessionInfo.messagesReceived += 1;
                channelzSessionInfo.lastMessageReceivedTimestamp = new Date();
            });
        }
        if (!this._runHandlerForCall(call, handler, headers)) {
            this.callTracker.addCallFailed();
            if (hasSessionInfo) {
                channelzSessionInfo.streamTracker.addCallFailed();
            }
            call.sendError({
                code: constants_1.Status.INTERNAL,
                details: `Unknown handler type: ${handler.type}`
            });
        }
    }
    /**
     * 'stream' event handler used when channelz is disabled: verify the
     * content type, find the handler for the path and run it.
     */
    _streamHandler(stream, headers) {
        if (this._verifyContentType(stream, headers) !== true) {
            return;
        }
        let handler;
        try {
            handler = this._retrieveHandler(headers);
        }
        catch (err) {
            this._respondWithError({
                details: (0, error_1.getErrorMessage)(err),
                code: optionOrDefault((0, error_1.getErrorCode)(err), undefined)
            }, stream, null);
            return;
        }
        const call = new server_call_1.Http2ServerCallStream(stream, handler, this.options);
        if (!this._runHandlerForCall(call, handler, headers)) {
            call.sendError({
                code: constants_1.Status.INTERNAL,
                details: `Unknown handler type: ${handler.type}`
            });
        }
    }
    /**
     * Read the call metadata, take the 'grpc-encoding' entry out of it
     * (defaulting to 'identity'), and dispatch to the function matching the
     * handler's type.
     *
     * @returns false if the handler type is not recognized, true otherwise.
     */
    _runHandlerForCall(call, handler, headers) {
        const metadata = call.receiveMetadata(headers);
        const encoding = optionOrDefault(metadata.get('grpc-encoding')[0], 'identity');
        metadata.remove('grpc-encoding');
        switch (handler.type) {
            case 'unary':
                handleUnary(call, handler, metadata, encoding);
                return true;
            case 'clientStream':
                handleClientStreaming(call, handler, metadata, encoding);
                return true;
            case 'serverStream':
                handleServerStreaming(call, handler, metadata, encoding);
                return true;
            case 'bidi':
                handleBidiStreaming(call, handler, metadata, encoding);
                return true;
            default:
                return false;
        }
    }
    /**
     * Attach the 'stream' and 'session' listeners to an http2 server and
     * record its address string (or 'null' when address() returns nothing).
     */
    _setupHandlers(http2Server) {
        if (http2Server === null) {
            return;
        }
        const serverAddress = http2Server.address();
        let serverAddressString = 'null';
        if (serverAddress) {
            serverAddressString = typeof serverAddress === 'string'
                ? serverAddress
                : serverAddress.address + ':' + serverAddress.port;
        }
        this.serverAddressString = serverAddressString;
        const handler = this.channelzEnabled
            ? this._channelzHandler
            : this._streamHandler;
        http2Server.on('stream', handler.bind(this));
        http2Server.on('session', (session) => {
            this._handleSession(session);
        });
    }
    /**
     * Track a newly connected session: register it with channelz, arm the
     * max-connection-age and keepalive timers, and release everything again
     * when the session closes. Sessions that arrive while the server is not
     * started are destroyed immediately.
     */
    _handleSession(session) {
        if (!this.started) {
            session.destroy();
            return;
        }
        const clientAddress = session.socket.remoteAddress;
        const channelzRef = (0, channelz_1.registerChannelzSocket)(optionOrDefault(clientAddress, 'unknown'), this.getChannelzSessionInfoGetter(session), this.channelzEnabled);
        const channelzSessionInfo = {
            ref: channelzRef,
            streamTracker: new channelz_1.ChannelzCallTracker(),
            messagesSent: 0,
            messagesReceived: 0,
            lastMessageSentTimestamp: null,
            lastMessageReceivedTimestamp: null
        };
        this.sessions.set(session, channelzSessionInfo);
        if (this.channelzEnabled) {
            this.channelzTrace.addTrace('CT_INFO', 'Connection established by client ' + clientAddress);
            this.sessionChildrenTracker.refChild(channelzRef);
        }
        let connectionAgeTimer = null;
        let connectionAgeGraceTimer = null;
        let sessionClosedByServer = false;
        // Keepalive timeouts that are armed and still waiting for a ping ack.
        const pendingKeepaliveTimeouts = new Set();
        if (this.maxConnectionAgeMs !== UNLIMITED_CONNECTION_AGE_MS) {
            // Apply a random jitter within a +/-10% range
            const jitterMagnitude = this.maxConnectionAgeMs / 10;
            const jitter = Math.random() * jitterMagnitude * 2 - jitterMagnitude;
            connectionAgeTimer = unrefTimer(setTimeout(() => {
                sessionClosedByServer = true;
                if (this.channelzEnabled) {
                    this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by max connection age from ' + clientAddress);
                }
                try {
                    session.goaway(http2.constants.NGHTTP2_NO_ERROR, ~(1 << 31), Buffer.from('max_age'));
                }
                catch (e) {
                    // The goaway can't be sent because the session is already closed
                    session.destroy();
                    return;
                }
                session.close();
                /* Allow a grace period after sending the GOAWAY before forcibly
                 * closing the connection. */
                if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) {
                    connectionAgeGraceTimer = unrefTimer(setTimeout(() => {
                        session.destroy();
                    }, this.maxConnectionAgeGraceMs));
                }
            }, this.maxConnectionAgeMs + jitter));
        }
        const keepaliveIntervalTimer = unrefTimer(setInterval(() => {
            const keepaliveTimeoutTimer = unrefTimer(setTimeout(() => {
                pendingKeepaliveTimeouts.delete(keepaliveTimeoutTimer);
                sessionClosedByServer = true;
                if (this.channelzEnabled) {
                    this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by keepalive timeout from ' + clientAddress);
                }
                session.close();
            }, this.keepaliveTimeoutMs));
            pendingKeepaliveTimeouts.add(keepaliveTimeoutTimer);
            const cancelKeepaliveTimeout = () => {
                clearTimeout(keepaliveTimeoutTimer);
                pendingKeepaliveTimeouts.delete(keepaliveTimeoutTimer);
            };
            try {
                // Any ping completion cancels the timeout.
                session.ping(cancelKeepaliveTimeout);
            }
            catch (e) {
                // The ping can't be sent because the session is already closed.
                // Cancel the timeout so it cannot fire for a dead session.
                cancelKeepaliveTimeout();
                session.destroy();
            }
        }, this.keepaliveTimeMs));
        session.on('close', () => {
            if (this.channelzEnabled) {
                if (!sessionClosedByServer) {
                    this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by client ' + clientAddress);
                }
                this.sessionChildrenTracker.unrefChild(channelzRef);
                (0, channelz_1.unregisterChannelzRef)(channelzRef);
            }
            if (connectionAgeTimer) {
                clearTimeout(connectionAgeTimer);
            }
            if (connectionAgeGraceTimer) {
                clearTimeout(connectionAgeGraceTimer);
            }
            if (keepaliveIntervalTimer) {
                clearInterval(keepaliveIntervalTimer);
            }
            // Drop keepalive timeouts still waiting for an ack so they cannot
            // record a misleading trace after the session is gone.
            pendingKeepaliveTimeouts.forEach((timer) => {
                clearTimeout(timer);
            });
            pendingKeepaliveTimeouts.clear();
            this.sessions.delete(session);
        });
    }
}
810exports.Server = Server;
/**
 * Run a unary handler: read the single request message, then invoke the
 * user function with a ServerUnaryCallImpl and a callback that sends the
 * single response.
 *
 * A receive error is sent back to the client. Nothing further happens if no
 * request was received or the call has been cancelled.
 */
function handleUnary(call, handler, metadata, encoding) {
    const onRequest = (err, request) => {
        if (err) {
            call.sendError(err);
            return;
        }
        const nothingToHandle = request === undefined || call.cancelled;
        if (nothingToHandle) {
            return;
        }
        const unaryCall = new server_call_1.ServerUnaryCallImpl(call, metadata, request);
        const sendResponse = (err, value, trailer, flags) => {
            call.sendUnaryMessage(err, value, trailer, flags);
        };
        handler.func(unaryCall, sendResponse);
    };
    call.receiveUnaryMessage(encoding, onRequest);
}
/**
 * Run a client-streaming handler: wrap the call in a readable stream and
 * invoke the user function with it and a callback for the single response.
 *
 * The response callback is also the stream's 'error' listener. Whenever it
 * runs it destroys the stream before sending. The user function is not
 * invoked if the call is already cancelled.
 */
function handleClientStreaming(call, handler, metadata, encoding) {
    const requestStream = new server_call_1.ServerReadableStreamImpl(call, metadata, handler.deserialize, encoding);
    const respond = (err, value, trailer, flags) => {
        requestStream.destroy();
        call.sendUnaryMessage(err, value, trailer, flags);
    };
    if (!call.cancelled) {
        requestStream.on('error', respond);
        handler.func(requestStream, respond);
    }
}
/**
 * Run a server-streaming handler: read the single request message, then
 * invoke the user function with a writable stream for the responses.
 *
 * A receive error is sent back to the client. Nothing further happens if no
 * request was received or the call has been cancelled.
 */
function handleServerStreaming(call, handler, metadata, encoding) {
    const onRequest = (err, request) => {
        if (err) {
            call.sendError(err);
            return;
        }
        const nothingToHandle = request === undefined || call.cancelled;
        if (nothingToHandle) {
            return;
        }
        const responseStream = new server_call_1.ServerWritableStreamImpl(call, metadata, handler.serialize, request);
        handler.func(responseStream);
    };
    call.receiveUnaryMessage(encoding, onRequest);
}
/**
 * Run a bidirectional-streaming handler: wrap the call in a duplex stream
 * and pass it to the user function, unless the call is already cancelled.
 */
function handleBidiStreaming(call, handler, metadata, encoding) {
    const { serialize, deserialize } = handler;
    const duplexStream = new server_call_1.ServerDuplexStreamImpl(call, metadata, serialize, deserialize, encoding);
    if (!call.cancelled) {
        handler.func(duplexStream);
    }
}
858//# sourceMappingURL=server.js.map
\No newline at end of file