UNPKG

32.3 kBJavaScriptView Raw
1"use strict";
2/*
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18Object.defineProperty(exports, "__esModule", { value: true });
19exports.Server = void 0;
20const http2 = require("http2");
21const constants_1 = require("./constants");
22const metadata_1 = require("./metadata");
23const server_call_1 = require("./server-call");
24const server_credentials_1 = require("./server-credentials");
25const resolver_1 = require("./resolver");
26const logging = require("./logging");
27const subchannel_address_1 = require("./subchannel-address");
28const uri_parser_1 = require("./uri-parser");
29const channelz_1 = require("./channelz");
30const TRACER_NAME = 'server';
31function noop() { }
32function getUnimplementedStatusResponse(methodName) {
33 return {
34 code: constants_1.Status.UNIMPLEMENTED,
35 details: `The server does not implement the method ${methodName}`,
36 metadata: new metadata_1.Metadata(),
37 };
38}
39function getDefaultHandler(handlerType, methodName) {
40 const unimplementedStatusResponse = getUnimplementedStatusResponse(methodName);
41 switch (handlerType) {
42 case 'unary':
43 return (call, callback) => {
44 callback(unimplementedStatusResponse, null);
45 };
46 case 'clientStream':
47 return (call, callback) => {
48 callback(unimplementedStatusResponse, null);
49 };
50 case 'serverStream':
51 return (call) => {
52 call.emit('error', unimplementedStatusResponse);
53 };
54 case 'bidi':
55 return (call) => {
56 call.emit('error', unimplementedStatusResponse);
57 };
58 default:
59 throw new Error(`Invalid handlerType ${handlerType}`);
60 }
61}
62class Server {
63 constructor(options) {
64 this.http2ServerList = [];
65 this.handlers = new Map();
66 this.sessions = new Map();
67 this.started = false;
68 // Channelz Info
69 this.channelzEnabled = true;
70 this.channelzTrace = new channelz_1.ChannelzTrace();
71 this.callTracker = new channelz_1.ChannelzCallTracker();
72 this.listenerChildrenTracker = new channelz_1.ChannelzChildrenTracker();
73 this.sessionChildrenTracker = new channelz_1.ChannelzChildrenTracker();
74 this.options = options !== null && options !== void 0 ? options : {};
75 if (this.options['grpc.enable_channelz'] === 0) {
76 this.channelzEnabled = false;
77 }
78 this.channelzRef = channelz_1.registerChannelzServer(() => this.getChannelzInfo(), this.channelzEnabled);
79 if (this.channelzEnabled) {
80 this.channelzTrace.addTrace('CT_INFO', 'Server created');
81 }
82 this.trace('Server constructed');
83 }
84 getChannelzInfo() {
85 return {
86 trace: this.channelzTrace,
87 callTracker: this.callTracker,
88 listenerChildren: this.listenerChildrenTracker.getChildLists(),
89 sessionChildren: this.sessionChildrenTracker.getChildLists()
90 };
91 }
92 getChannelzSessionInfoGetter(session) {
93 return () => {
94 var _a, _b, _c;
95 const sessionInfo = this.sessions.get(session);
96 const sessionSocket = session.socket;
97 const remoteAddress = sessionSocket.remoteAddress ? subchannel_address_1.stringToSubchannelAddress(sessionSocket.remoteAddress, sessionSocket.remotePort) : null;
98 const localAddress = sessionSocket.localAddress ? subchannel_address_1.stringToSubchannelAddress(sessionSocket.localAddress, sessionSocket.localPort) : null;
99 let tlsInfo;
100 if (session.encrypted) {
101 const tlsSocket = sessionSocket;
102 const cipherInfo = tlsSocket.getCipher();
103 const certificate = tlsSocket.getCertificate();
104 const peerCertificate = tlsSocket.getPeerCertificate();
105 tlsInfo = {
106 cipherSuiteStandardName: (_a = cipherInfo.standardName) !== null && _a !== void 0 ? _a : null,
107 cipherSuiteOtherName: cipherInfo.standardName ? null : cipherInfo.name,
108 localCertificate: (certificate && 'raw' in certificate) ? certificate.raw : null,
109 remoteCertificate: (peerCertificate && 'raw' in peerCertificate) ? peerCertificate.raw : null
110 };
111 }
112 else {
113 tlsInfo = null;
114 }
115 const socketInfo = {
116 remoteAddress: remoteAddress,
117 localAddress: localAddress,
118 security: tlsInfo,
119 remoteName: null,
120 streamsStarted: sessionInfo.streamTracker.callsStarted,
121 streamsSucceeded: sessionInfo.streamTracker.callsSucceeded,
122 streamsFailed: sessionInfo.streamTracker.callsFailed,
123 messagesSent: sessionInfo.messagesSent,
124 messagesReceived: sessionInfo.messagesReceived,
125 keepAlivesSent: 0,
126 lastLocalStreamCreatedTimestamp: null,
127 lastRemoteStreamCreatedTimestamp: sessionInfo.streamTracker.lastCallStartedTimestamp,
128 lastMessageSentTimestamp: sessionInfo.lastMessageSentTimestamp,
129 lastMessageReceivedTimestamp: sessionInfo.lastMessageReceivedTimestamp,
130 localFlowControlWindow: (_b = session.state.localWindowSize) !== null && _b !== void 0 ? _b : null,
131 remoteFlowControlWindow: (_c = session.state.remoteWindowSize) !== null && _c !== void 0 ? _c : null
132 };
133 return socketInfo;
134 };
135 }
136 trace(text) {
137 logging.trace(constants_1.LogVerbosity.DEBUG, TRACER_NAME, '(' + this.channelzRef.id + ') ' + text);
138 }
139 addProtoService() {
140 throw new Error('Not implemented. Use addService() instead');
141 }
142 addService(service, implementation) {
143 if (service === null ||
144 typeof service !== 'object' ||
145 implementation === null ||
146 typeof implementation !== 'object') {
147 throw new Error('addService() requires two objects as arguments');
148 }
149 const serviceKeys = Object.keys(service);
150 if (serviceKeys.length === 0) {
151 throw new Error('Cannot add an empty service to a server');
152 }
153 serviceKeys.forEach((name) => {
154 const attrs = service[name];
155 let methodType;
156 if (attrs.requestStream) {
157 if (attrs.responseStream) {
158 methodType = 'bidi';
159 }
160 else {
161 methodType = 'clientStream';
162 }
163 }
164 else {
165 if (attrs.responseStream) {
166 methodType = 'serverStream';
167 }
168 else {
169 methodType = 'unary';
170 }
171 }
172 let implFn = implementation[name];
173 let impl;
174 if (implFn === undefined && typeof attrs.originalName === 'string') {
175 implFn = implementation[attrs.originalName];
176 }
177 if (implFn !== undefined) {
178 impl = implFn.bind(implementation);
179 }
180 else {
181 impl = getDefaultHandler(methodType, name);
182 }
183 const success = this.register(attrs.path, impl, attrs.responseSerialize, attrs.requestDeserialize, methodType);
184 if (success === false) {
185 throw new Error(`Method handler for ${attrs.path} already provided.`);
186 }
187 });
188 }
189 removeService(service) {
190 if (service === null || typeof service !== 'object') {
191 throw new Error('removeService() requires object as argument');
192 }
193 const serviceKeys = Object.keys(service);
194 serviceKeys.forEach((name) => {
195 const attrs = service[name];
196 this.unregister(attrs.path);
197 });
198 }
199 bind(port, creds) {
200 throw new Error('Not implemented. Use bindAsync() instead');
201 }
202 bindAsync(port, creds, callback) {
203 if (this.started === true) {
204 throw new Error('server is already started');
205 }
206 if (typeof port !== 'string') {
207 throw new TypeError('port must be a string');
208 }
209 if (creds === null || !(creds instanceof server_credentials_1.ServerCredentials)) {
210 throw new TypeError('creds must be a ServerCredentials object');
211 }
212 if (typeof callback !== 'function') {
213 throw new TypeError('callback must be a function');
214 }
215 const initialPortUri = uri_parser_1.parseUri(port);
216 if (initialPortUri === null) {
217 throw new Error(`Could not parse port "${port}"`);
218 }
219 const portUri = resolver_1.mapUriDefaultScheme(initialPortUri);
220 if (portUri === null) {
221 throw new Error(`Could not get a default scheme for port "${port}"`);
222 }
223 const serverOptions = {
224 maxSendHeaderBlockLength: Number.MAX_SAFE_INTEGER,
225 };
226 if ('grpc-node.max_session_memory' in this.options) {
227 serverOptions.maxSessionMemory = this.options['grpc-node.max_session_memory'];
228 }
229 if ('grpc.max_concurrent_streams' in this.options) {
230 serverOptions.settings = {
231 maxConcurrentStreams: this.options['grpc.max_concurrent_streams'],
232 };
233 }
234 const deferredCallback = (error, port) => {
235 process.nextTick(() => callback(error, port));
236 };
237 const setupServer = () => {
238 let http2Server;
239 if (creds._isSecure()) {
240 const secureServerOptions = Object.assign(serverOptions, creds._getSettings());
241 http2Server = http2.createSecureServer(secureServerOptions);
242 http2Server.on('secureConnection', (socket) => {
243 /* These errors need to be handled by the user of Http2SecureServer,
244 * according to https://github.com/nodejs/node/issues/35824 */
245 socket.on('error', (e) => {
246 this.trace('An incoming TLS connection closed with error: ' + e.message);
247 });
248 });
249 }
250 else {
251 http2Server = http2.createServer(serverOptions);
252 }
253 http2Server.setTimeout(0, noop);
254 this._setupHandlers(http2Server);
255 return http2Server;
256 };
257 const bindSpecificPort = (addressList, portNum, previousCount) => {
258 if (addressList.length === 0) {
259 return Promise.resolve({ port: portNum, count: previousCount });
260 }
261 return Promise.all(addressList.map((address) => {
262 this.trace('Attempting to bind ' + subchannel_address_1.subchannelAddressToString(address));
263 let addr;
264 if (subchannel_address_1.isTcpSubchannelAddress(address)) {
265 addr = {
266 host: address.host,
267 port: portNum,
268 };
269 }
270 else {
271 addr = address;
272 }
273 const http2Server = setupServer();
274 return new Promise((resolve, reject) => {
275 const onError = (err) => {
276 this.trace('Failed to bind ' + subchannel_address_1.subchannelAddressToString(address) + ' with error ' + err.message);
277 resolve(err);
278 };
279 http2Server.once('error', onError);
280 http2Server.listen(addr, () => {
281 const boundAddress = http2Server.address();
282 let boundSubchannelAddress;
283 if (typeof boundAddress === 'string') {
284 boundSubchannelAddress = {
285 path: boundAddress
286 };
287 }
288 else {
289 boundSubchannelAddress = {
290 host: boundAddress.address,
291 port: boundAddress.port
292 };
293 }
294 let channelzRef;
295 channelzRef = channelz_1.registerChannelzSocket(subchannel_address_1.subchannelAddressToString(boundSubchannelAddress), () => {
296 return {
297 localAddress: boundSubchannelAddress,
298 remoteAddress: null,
299 security: null,
300 remoteName: null,
301 streamsStarted: 0,
302 streamsSucceeded: 0,
303 streamsFailed: 0,
304 messagesSent: 0,
305 messagesReceived: 0,
306 keepAlivesSent: 0,
307 lastLocalStreamCreatedTimestamp: null,
308 lastRemoteStreamCreatedTimestamp: null,
309 lastMessageSentTimestamp: null,
310 lastMessageReceivedTimestamp: null,
311 localFlowControlWindow: null,
312 remoteFlowControlWindow: null
313 };
314 }, this.channelzEnabled);
315 if (this.channelzEnabled) {
316 this.listenerChildrenTracker.refChild(channelzRef);
317 }
318 this.http2ServerList.push({ server: http2Server, channelzRef: channelzRef });
319 this.trace('Successfully bound ' + subchannel_address_1.subchannelAddressToString(boundSubchannelAddress));
320 resolve('port' in boundSubchannelAddress ? boundSubchannelAddress.port : portNum);
321 http2Server.removeListener('error', onError);
322 });
323 });
324 })).then((results) => {
325 let count = 0;
326 for (const result of results) {
327 if (typeof result === 'number') {
328 count += 1;
329 if (result !== portNum) {
330 throw new Error('Invalid state: multiple port numbers added from single address');
331 }
332 }
333 }
334 return {
335 port: portNum,
336 count: count + previousCount,
337 };
338 });
339 };
340 const bindWildcardPort = (addressList) => {
341 if (addressList.length === 0) {
342 return Promise.resolve({ port: 0, count: 0 });
343 }
344 const address = addressList[0];
345 const http2Server = setupServer();
346 return new Promise((resolve, reject) => {
347 const onError = (err) => {
348 this.trace('Failed to bind ' + subchannel_address_1.subchannelAddressToString(address) + ' with error ' + err.message);
349 resolve(bindWildcardPort(addressList.slice(1)));
350 };
351 http2Server.once('error', onError);
352 http2Server.listen(address, () => {
353 const boundAddress = http2Server.address();
354 const boundSubchannelAddress = {
355 host: boundAddress.address,
356 port: boundAddress.port
357 };
358 let channelzRef;
359 channelzRef = channelz_1.registerChannelzSocket(subchannel_address_1.subchannelAddressToString(boundSubchannelAddress), () => {
360 return {
361 localAddress: boundSubchannelAddress,
362 remoteAddress: null,
363 security: null,
364 remoteName: null,
365 streamsStarted: 0,
366 streamsSucceeded: 0,
367 streamsFailed: 0,
368 messagesSent: 0,
369 messagesReceived: 0,
370 keepAlivesSent: 0,
371 lastLocalStreamCreatedTimestamp: null,
372 lastRemoteStreamCreatedTimestamp: null,
373 lastMessageSentTimestamp: null,
374 lastMessageReceivedTimestamp: null,
375 localFlowControlWindow: null,
376 remoteFlowControlWindow: null
377 };
378 }, this.channelzEnabled);
379 if (this.channelzEnabled) {
380 this.listenerChildrenTracker.refChild(channelzRef);
381 }
382 this.http2ServerList.push({ server: http2Server, channelzRef: channelzRef });
383 this.trace('Successfully bound ' + subchannel_address_1.subchannelAddressToString(boundSubchannelAddress));
384 resolve(bindSpecificPort(addressList.slice(1), boundAddress.port, 1));
385 http2Server.removeListener('error', onError);
386 });
387 });
388 };
389 const resolverListener = {
390 onSuccessfulResolution: (addressList, serviceConfig, serviceConfigError) => {
391 // We only want one resolution result. Discard all future results
392 resolverListener.onSuccessfulResolution = () => { };
393 if (addressList.length === 0) {
394 deferredCallback(new Error(`No addresses resolved for port ${port}`), 0);
395 return;
396 }
397 let bindResultPromise;
398 if (subchannel_address_1.isTcpSubchannelAddress(addressList[0])) {
399 if (addressList[0].port === 0) {
400 bindResultPromise = bindWildcardPort(addressList);
401 }
402 else {
403 bindResultPromise = bindSpecificPort(addressList, addressList[0].port, 0);
404 }
405 }
406 else {
407 // Use an arbitrary non-zero port for non-TCP addresses
408 bindResultPromise = bindSpecificPort(addressList, 1, 0);
409 }
410 bindResultPromise.then((bindResult) => {
411 if (bindResult.count === 0) {
412 const errorString = `No address added out of total ${addressList.length} resolved`;
413 logging.log(constants_1.LogVerbosity.ERROR, errorString);
414 deferredCallback(new Error(errorString), 0);
415 }
416 else {
417 if (bindResult.count < addressList.length) {
418 logging.log(constants_1.LogVerbosity.INFO, `WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved`);
419 }
420 deferredCallback(null, bindResult.port);
421 }
422 }, (error) => {
423 const errorString = `No address added out of total ${addressList.length} resolved`;
424 logging.log(constants_1.LogVerbosity.ERROR, errorString);
425 deferredCallback(new Error(errorString), 0);
426 });
427 },
428 onError: (error) => {
429 deferredCallback(new Error(error.details), 0);
430 },
431 };
432 const resolver = resolver_1.createResolver(portUri, resolverListener, this.options);
433 resolver.updateResolution();
434 }
435 forceShutdown() {
436 // Close the server if it is still running.
437 for (const { server: http2Server, channelzRef: ref } of this.http2ServerList) {
438 if (http2Server.listening) {
439 http2Server.close(() => {
440 if (this.channelzEnabled) {
441 this.listenerChildrenTracker.unrefChild(ref);
442 channelz_1.unregisterChannelzRef(ref);
443 }
444 });
445 }
446 }
447 this.started = false;
448 // Always destroy any available sessions. It's possible that one or more
449 // tryShutdown() calls are in progress. Don't wait on them to finish.
450 this.sessions.forEach((channelzInfo, session) => {
451 // Cast NGHTTP2_CANCEL to any because TypeScript doesn't seem to
452 // recognize destroy(code) as a valid signature.
453 // eslint-disable-next-line @typescript-eslint/no-explicit-any
454 session.destroy(http2.constants.NGHTTP2_CANCEL);
455 });
456 this.sessions.clear();
457 if (this.channelzEnabled) {
458 channelz_1.unregisterChannelzRef(this.channelzRef);
459 }
460 }
461 register(name, handler, serialize, deserialize, type) {
462 if (this.handlers.has(name)) {
463 return false;
464 }
465 this.handlers.set(name, {
466 func: handler,
467 serialize,
468 deserialize,
469 type,
470 path: name,
471 });
472 return true;
473 }
474 unregister(name) {
475 return this.handlers.delete(name);
476 }
477 start() {
478 if (this.http2ServerList.length === 0 ||
479 this.http2ServerList.every(({ server: http2Server }) => http2Server.listening !== true)) {
480 throw new Error('server must be bound in order to start');
481 }
482 if (this.started === true) {
483 throw new Error('server is already started');
484 }
485 if (this.channelzEnabled) {
486 this.channelzTrace.addTrace('CT_INFO', 'Starting');
487 }
488 this.started = true;
489 }
490 tryShutdown(callback) {
491 const wrappedCallback = (error) => {
492 if (this.channelzEnabled) {
493 channelz_1.unregisterChannelzRef(this.channelzRef);
494 }
495 callback(error);
496 };
497 let pendingChecks = 0;
498 function maybeCallback() {
499 pendingChecks--;
500 if (pendingChecks === 0) {
501 wrappedCallback();
502 }
503 }
504 // Close the server if necessary.
505 this.started = false;
506 for (const { server: http2Server, channelzRef: ref } of this.http2ServerList) {
507 if (http2Server.listening) {
508 pendingChecks++;
509 http2Server.close(() => {
510 if (this.channelzEnabled) {
511 this.listenerChildrenTracker.unrefChild(ref);
512 channelz_1.unregisterChannelzRef(ref);
513 }
514 maybeCallback();
515 });
516 }
517 }
518 this.sessions.forEach((channelzInfo, session) => {
519 if (!session.closed) {
520 pendingChecks += 1;
521 session.close(maybeCallback);
522 }
523 });
524 if (pendingChecks === 0) {
525 wrappedCallback();
526 }
527 }
528 addHttp2Port() {
529 throw new Error('Not yet implemented');
530 }
531 /**
532 * Get the channelz reference object for this server. The returned value is
533 * garbage if channelz is disabled for this server.
534 * @returns
535 */
536 getChannelzRef() {
537 return this.channelzRef;
538 }
539 _setupHandlers(http2Server) {
540 if (http2Server === null) {
541 return;
542 }
543 http2Server.on('stream', (stream, headers) => {
544 var _a;
545 const channelzSessionInfo = this.sessions.get(stream.session);
546 if (this.channelzEnabled) {
547 this.callTracker.addCallStarted();
548 channelzSessionInfo === null || channelzSessionInfo === void 0 ? void 0 : channelzSessionInfo.streamTracker.addCallStarted();
549 }
550 const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE];
551 if (typeof contentType !== 'string' ||
552 !contentType.startsWith('application/grpc')) {
553 stream.respond({
554 [http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE,
555 }, { endStream: true });
556 this.callTracker.addCallFailed();
557 if (this.channelzEnabled) {
558 channelzSessionInfo === null || channelzSessionInfo === void 0 ? void 0 : channelzSessionInfo.streamTracker.addCallFailed();
559 }
560 return;
561 }
562 let call = null;
563 try {
564 const path = headers[http2.constants.HTTP2_HEADER_PATH];
565 const serverAddress = http2Server.address();
566 let serverAddressString = 'null';
567 if (serverAddress) {
568 if (typeof serverAddress === 'string') {
569 serverAddressString = serverAddress;
570 }
571 else {
572 serverAddressString =
573 serverAddress.address + ':' + serverAddress.port;
574 }
575 }
576 this.trace('Received call to method ' +
577 path +
578 ' at address ' +
579 serverAddressString);
580 const handler = this.handlers.get(path);
581 if (handler === undefined) {
582 this.trace('No handler registered for method ' +
583 path +
584 '. Sending UNIMPLEMENTED status.');
585 throw getUnimplementedStatusResponse(path);
586 }
587 call = new server_call_1.Http2ServerCallStream(stream, handler, this.options);
588 call.once('callEnd', (code) => {
589 if (code === constants_1.Status.OK) {
590 this.callTracker.addCallSucceeded();
591 }
592 else {
593 this.callTracker.addCallFailed();
594 }
595 });
596 if (this.channelzEnabled && channelzSessionInfo) {
597 call.once('streamEnd', (success) => {
598 if (success) {
599 channelzSessionInfo.streamTracker.addCallSucceeded();
600 }
601 else {
602 channelzSessionInfo.streamTracker.addCallFailed();
603 }
604 });
605 call.on('sendMessage', () => {
606 channelzSessionInfo.messagesSent += 1;
607 channelzSessionInfo.lastMessageSentTimestamp = new Date();
608 });
609 call.on('receiveMessage', () => {
610 channelzSessionInfo.messagesReceived += 1;
611 channelzSessionInfo.lastMessageReceivedTimestamp = new Date();
612 });
613 }
614 const metadata = call.receiveMetadata(headers);
615 const encoding = (_a = metadata.get('grpc-encoding')[0]) !== null && _a !== void 0 ? _a : 'identity';
616 metadata.remove('grpc-encoding');
617 switch (handler.type) {
618 case 'unary':
619 handleUnary(call, handler, metadata, encoding);
620 break;
621 case 'clientStream':
622 handleClientStreaming(call, handler, metadata, encoding);
623 break;
624 case 'serverStream':
625 handleServerStreaming(call, handler, metadata, encoding);
626 break;
627 case 'bidi':
628 handleBidiStreaming(call, handler, metadata, encoding);
629 break;
630 default:
631 throw new Error(`Unknown handler type: ${handler.type}`);
632 }
633 }
634 catch (err) {
635 if (!call) {
636 call = new server_call_1.Http2ServerCallStream(stream, null, this.options);
637 if (this.channelzEnabled) {
638 this.callTracker.addCallFailed();
639 channelzSessionInfo === null || channelzSessionInfo === void 0 ? void 0 : channelzSessionInfo.streamTracker.addCallFailed();
640 }
641 }
642 if (err.code === undefined) {
643 err.code = constants_1.Status.INTERNAL;
644 }
645 call.sendError(err);
646 }
647 });
648 http2Server.on('session', (session) => {
649 var _a;
650 if (!this.started) {
651 session.destroy();
652 return;
653 }
654 let channelzRef;
655 channelzRef = channelz_1.registerChannelzSocket((_a = session.socket.remoteAddress) !== null && _a !== void 0 ? _a : 'unknown', this.getChannelzSessionInfoGetter(session), this.channelzEnabled);
656 const channelzSessionInfo = {
657 ref: channelzRef,
658 streamTracker: new channelz_1.ChannelzCallTracker(),
659 messagesSent: 0,
660 messagesReceived: 0,
661 lastMessageSentTimestamp: null,
662 lastMessageReceivedTimestamp: null
663 };
664 this.sessions.set(session, channelzSessionInfo);
665 const clientAddress = session.socket.remoteAddress;
666 if (this.channelzEnabled) {
667 this.channelzTrace.addTrace('CT_INFO', 'Connection established by client ' + clientAddress);
668 this.sessionChildrenTracker.refChild(channelzRef);
669 }
670 session.on('close', () => {
671 if (this.channelzEnabled) {
672 this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by client ' + clientAddress);
673 this.sessionChildrenTracker.unrefChild(channelzRef);
674 channelz_1.unregisterChannelzRef(channelzRef);
675 }
676 this.sessions.delete(session);
677 });
678 });
679 }
680}
681exports.Server = Server;
682async function handleUnary(call, handler, metadata, encoding) {
683 const request = await call.receiveUnaryMessage(encoding);
684 if (request === undefined || call.cancelled) {
685 return;
686 }
687 const emitter = new server_call_1.ServerUnaryCallImpl(call, metadata, request);
688 handler.func(emitter, (err, value, trailer, flags) => {
689 call.sendUnaryMessage(err, value, trailer, flags);
690 });
691}
692function handleClientStreaming(call, handler, metadata, encoding) {
693 const stream = new server_call_1.ServerReadableStreamImpl(call, metadata, handler.deserialize, encoding);
694 function respond(err, value, trailer, flags) {
695 stream.destroy();
696 call.sendUnaryMessage(err, value, trailer, flags);
697 }
698 if (call.cancelled) {
699 return;
700 }
701 stream.on('error', respond);
702 handler.func(stream, respond);
703}
704async function handleServerStreaming(call, handler, metadata, encoding) {
705 const request = await call.receiveUnaryMessage(encoding);
706 if (request === undefined || call.cancelled) {
707 return;
708 }
709 const stream = new server_call_1.ServerWritableStreamImpl(call, metadata, handler.serialize, request);
710 handler.func(stream);
711}
712function handleBidiStreaming(call, handler, metadata, encoding) {
713 const stream = new server_call_1.ServerDuplexStreamImpl(call, metadata, handler.serialize, handler.deserialize, encoding);
714 if (call.cancelled) {
715 return;
716 }
717 handler.func(stream);
718}
719//# sourceMappingURL=server.js.map
\No newline at end of file