// Retrieved from the UNPKG file viewer (31.7 kB, JavaScript).
1"use strict";
/*
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
// Flag the CommonJS exports object as coming from a transpiled ES module.
Object.defineProperty(exports, "__esModule", { value: true });
// Declare the export up front; the real value is assigned after the class
// definition at the bottom of this file.
exports.Server = void 0;
// Node's built-in HTTP/2 implementation; the remaining requires are
// project-local modules.
const http2 = require("http2");
const constants_1 = require("./constants");
const metadata_1 = require("./metadata");
const server_call_1 = require("./server-call");
const server_credentials_1 = require("./server-credentials");
const resolver_1 = require("./resolver");
const logging = require("./logging");
const subchannel_address_1 = require("./subchannel-address");
const uri_parser_1 = require("./uri-parser");
const channelz_1 = require("./channelz");
// Tracer name handed to logging.trace() by Server#trace().
const TRACER_NAME = 'server';
/**
 * Placeholder callback that deliberately does nothing. It is passed to
 * `http2Server.setTimeout(0, noop)` when a listener is created.
 */
function noop() {
    return undefined;
}
/**
 * Build the status object reported when a method has no implementation.
 *
 * @param methodName Name (or path) of the method, interpolated into the
 *     `details` message.
 * @returns An object with `code` (Status.UNIMPLEMENTED), `details` and a
 *     freshly constructed, empty `metadata` instance.
 */
function getUnimplementedStatusResponse(methodName) {
    const code = constants_1.Status.UNIMPLEMENTED;
    const details = `The server does not implement the method ${methodName}`;
    const metadata = new metadata_1.Metadata();
    return { code, details, metadata };
}
/**
 * Create the fallback handler used for a service method that the supplied
 * implementation object does not provide.
 *
 * Handlers for 'unary' and 'clientStream' methods report the UNIMPLEMENTED
 * status through their callback; handlers for 'serverStream' and 'bidi'
 * methods emit it as an 'error' event on the call object. The status object
 * is created once per handler and reused for every invocation.
 *
 * @param handlerType One of 'unary', 'clientStream', 'serverStream', 'bidi'.
 * @param methodName Method name used in the status details message.
 * @returns The fallback handler function.
 * @throws {Error} If `handlerType` is not one of the four known types.
 */
function getDefaultHandler(handlerType, methodName) {
    const status = getUnimplementedStatusResponse(methodName);
    if (handlerType === 'unary' || handlerType === 'clientStream') {
        return (call, callback) => {
            callback(status, null);
        };
    }
    if (handlerType === 'serverStream' || handlerType === 'bidi') {
        return (call) => {
            call.emit('error', status);
        };
    }
    throw new Error(`Invalid handlerType ${handlerType}`);
}
/**
 * gRPC server implemented on top of Node's `http2` module.
 *
 * Method handlers are stored by path in `this.handlers`, bound listeners in
 * `this.http2ServerList`, and live HTTP/2 sessions (with their per-session
 * channelz counters) in `this.sessions`.
 */
class Server {
    /**
     * @param options Optional map of channel options. Options read by this
     *     class: 'grpc.enable_channelz' (0 disables channelz bookkeeping),
     *     'grpc-node.max_session_memory' and 'grpc.max_concurrent_streams'.
     */
    constructor(options) {
        this.http2ServerList = [];
        this.handlers = new Map();
        this.sessions = new Map();
        this.started = false;
        // Channelz Info
        this.channelzEnabled = true;
        this.channelzTrace = new channelz_1.ChannelzTrace();
        this.callTracker = new channelz_1.ChannelzCallTracker();
        this.listenerChildrenTracker = new channelz_1.ChannelzChildrenTracker();
        this.sessionChildrenTracker = new channelz_1.ChannelzChildrenTracker();
        this.options = options !== null && options !== undefined ? options : {};
        if (this.options['grpc.enable_channelz'] === 0) {
            this.channelzEnabled = false;
        }
        if (this.channelzEnabled) {
            this.channelzRef = channelz_1.registerChannelzServer(() => this.getChannelzInfo());
            this.channelzTrace.addTrace('CT_INFO', 'Server created');
            this.trace('Server constructed');
        }
        else {
            // Dummy channelz ref that will never be used
            this.channelzRef = {
                kind: 'server',
                id: -1
            };
        }
    }
    /**
     * Snapshot of this server's channelz data, used as the info getter that
     * is registered with the channelz module in the constructor.
     */
    getChannelzInfo() {
        return {
            trace: this.channelzTrace,
            callTracker: this.callTracker,
            listenerChildren: this.listenerChildrenTracker.getChildLists(),
            sessionChildren: this.sessionChildrenTracker.getChildLists()
        };
    }
    /**
     * Create the channelz info getter for one HTTP/2 session.
     *
     * @param session The session to describe. It must have an entry in
     *     `this.sessions` when the returned function is invoked.
     * @returns A function producing the socket info record for the session
     *     (addresses, TLS details when the session is encrypted, stream and
     *     message counters, flow control windows).
     */
    getChannelzSessionInfoGetter(session) {
        return () => {
            let standardName;
            let localWindowSize;
            let remoteWindowSize;
            const sessionInfo = this.sessions.get(session);
            const sessionSocket = session.socket;
            const remoteAddress = sessionSocket.remoteAddress
                ? subchannel_address_1.stringToSubchannelAddress(sessionSocket.remoteAddress, sessionSocket.remotePort)
                : null;
            const localAddress = sessionSocket.localAddress
                ? subchannel_address_1.stringToSubchannelAddress(sessionSocket.localAddress, sessionSocket.localPort)
                : null;
            let tlsInfo;
            if (session.encrypted) {
                const tlsSocket = sessionSocket;
                const cipherInfo = tlsSocket.getCipher();
                const certificate = tlsSocket.getCertificate();
                const peerCertificate = tlsSocket.getPeerCertificate();
                tlsInfo = {
                    cipherSuiteStandardName: (standardName = cipherInfo.standardName) !== null && standardName !== undefined ? standardName : null,
                    // Only report the non-standard name when no standard name exists.
                    cipherSuiteOtherName: cipherInfo.standardName ? null : cipherInfo.name,
                    localCertificate: (certificate && 'raw' in certificate) ? certificate.raw : null,
                    remoteCertificate: (peerCertificate && 'raw' in peerCertificate) ? peerCertificate.raw : null
                };
            }
            else {
                tlsInfo = null;
            }
            const socketInfo = {
                remoteAddress: remoteAddress,
                localAddress: localAddress,
                security: tlsInfo,
                remoteName: null,
                streamsStarted: sessionInfo.streamTracker.callsStarted,
                streamsSucceeded: sessionInfo.streamTracker.callsSucceeded,
                streamsFailed: sessionInfo.streamTracker.callsFailed,
                messagesSent: sessionInfo.messagesSent,
                messagesReceived: sessionInfo.messagesReceived,
                keepAlivesSent: 0,
                lastLocalStreamCreatedTimestamp: null,
                lastRemoteStreamCreatedTimestamp: sessionInfo.streamTracker.lastCallStartedTimestamp,
                lastMessageSentTimestamp: sessionInfo.lastMessageSentTimestamp,
                lastMessageReceivedTimestamp: sessionInfo.lastMessageReceivedTimestamp,
                localFlowControlWindow: (localWindowSize = session.state.localWindowSize) !== null && localWindowSize !== undefined ? localWindowSize : null,
                remoteFlowControlWindow: (remoteWindowSize = session.state.remoteWindowSize) !== null && remoteWindowSize !== undefined ? remoteWindowSize : null
            };
            return socketInfo;
        };
    }
    /**
     * Emit a DEBUG-level trace line prefixed with this server's channelz id.
     * @param text Message to log.
     */
    trace(text) {
        logging.trace(constants_1.LogVerbosity.DEBUG, TRACER_NAME, '(' + this.channelzRef.id + ') ' + text);
    }
    /** Not supported; always throws. Use addService() instead. */
    addProtoService() {
        throw new Error('Not implemented. Use addService() instead');
    }
    /**
     * Register a handler for every method of a service definition.
     *
     * For each key of `service` the implementation is looked up first under
     * that key and then under the method's `originalName`; a method with no
     * implementation gets a default handler that reports UNIMPLEMENTED.
     *
     * @param service Map of method name to method attributes (`path`,
     *     `requestStream`, `responseStream`, `responseSerialize`,
     *     `requestDeserialize`, optional `originalName`).
     * @param implementation Object whose functions implement the methods;
     *     each function is bound to this object.
     * @throws {Error} If either argument is not an object, the service has no
     *     methods, or a handler for a method's path is already registered.
     */
    addService(service, implementation) {
        if (service === null ||
            typeof service !== 'object' ||
            implementation === null ||
            typeof implementation !== 'object') {
            throw new Error('addService() requires two objects as arguments');
        }
        const serviceKeys = Object.keys(service);
        if (serviceKeys.length === 0) {
            throw new Error('Cannot add an empty service to a server');
        }
        serviceKeys.forEach((name) => {
            const attrs = service[name];
            let methodType;
            if (attrs.requestStream) {
                methodType = attrs.responseStream ? 'bidi' : 'clientStream';
            }
            else {
                methodType = attrs.responseStream ? 'serverStream' : 'unary';
            }
            let implFn = implementation[name];
            let impl;
            if (implFn === undefined && typeof attrs.originalName === 'string') {
                implFn = implementation[attrs.originalName];
            }
            if (implFn !== undefined) {
                impl = implFn.bind(implementation);
            }
            else {
                impl = getDefaultHandler(methodType, name);
            }
            const success = this.register(attrs.path, impl, attrs.responseSerialize, attrs.requestDeserialize, methodType);
            if (success === false) {
                throw new Error(`Method handler for ${attrs.path} already provided.`);
            }
        });
    }
    /**
     * Unregister the handlers for every method path of a service definition.
     * @param service Map of method name to method attributes (only `path`
     *     is read).
     * @throws {Error} If `service` is not an object.
     */
    removeService(service) {
        if (service === null || typeof service !== 'object') {
            throw new Error('removeService() requires object as argument');
        }
        const serviceKeys = Object.keys(service);
        serviceKeys.forEach((name) => {
            const attrs = service[name];
            this.unregister(attrs.path);
        });
    }
    /** Not supported; always throws. Use bindAsync() instead. */
    bind(port, creds) {
        throw new Error('Not implemented. Use bindAsync() instead');
    }
    /**
     * Resolve `port` to one or more addresses and start listening on them.
     *
     * @param port Address string to bind; it is parsed as a URI and given a
     *     default scheme before being resolved.
     * @param creds ServerCredentials instance; secure credentials create a
     *     TLS HTTP/2 server, otherwise a plaintext one.
     * @param callback Invoked on a later tick as `(error, port)`: `(null,
     *     boundPort)` when at least one address was bound, otherwise
     *     `(Error, 0)`.
     * @throws {Error|TypeError} Synchronously when the server is already
     *     started, an argument has the wrong type, or `port` cannot be
     *     parsed or mapped to a default scheme.
     */
    bindAsync(port, creds, callback) {
        if (this.started === true) {
            throw new Error('server is already started');
        }
        if (typeof port !== 'string') {
            throw new TypeError('port must be a string');
        }
        if (creds === null || !(creds instanceof server_credentials_1.ServerCredentials)) {
            throw new TypeError('creds must be a ServerCredentials object');
        }
        if (typeof callback !== 'function') {
            throw new TypeError('callback must be a function');
        }
        const initialPortUri = uri_parser_1.parseUri(port);
        if (initialPortUri === null) {
            throw new Error(`Could not parse port "${port}"`);
        }
        const portUri = resolver_1.mapUriDefaultScheme(initialPortUri);
        if (portUri === null) {
            throw new Error(`Could not get a default scheme for port "${port}"`);
        }
        const serverOptions = {
            maxSendHeaderBlockLength: Number.MAX_SAFE_INTEGER,
        };
        if ('grpc-node.max_session_memory' in this.options) {
            serverOptions.maxSessionMemory = this.options['grpc-node.max_session_memory'];
        }
        if ('grpc.max_concurrent_streams' in this.options) {
            serverOptions.settings = {
                maxConcurrentStreams: this.options['grpc.max_concurrent_streams'],
            };
        }
        // Always report the result on a later tick so the callback never runs
        // re-entrantly inside bindAsync() or a listener callback.
        const deferredCallback = (error, port) => {
            process.nextTick(() => callback(error, port));
        };
        // Create one (not yet listening) HTTP/2 server wired to this Server.
        const setupServer = () => {
            let http2Server;
            if (creds._isSecure()) {
                const secureServerOptions = Object.assign(serverOptions, creds._getSettings());
                http2Server = http2.createSecureServer(secureServerOptions);
                http2Server.on('secureConnection', (socket) => {
                    /* These errors need to be handled by the user of Http2SecureServer,
                     * according to https://github.com/nodejs/node/issues/35824 */
                    socket.on('error', (e) => {
                        this.trace('An incoming TLS connection closed with error: ' + e.message);
                    });
                });
            }
            else {
                http2Server = http2.createServer(serverOptions);
            }
            http2Server.setTimeout(0, noop);
            this._setupHandlers(http2Server);
            return http2Server;
        };
        // Record a successfully bound listener: register its channelz socket,
        // track it as a listener child and remember the server for shutdown.
        const registerBoundServer = (http2Server, boundSubchannelAddress) => {
            const channelzRef = channelz_1.registerChannelzSocket(subchannel_address_1.subchannelAddressToString(boundSubchannelAddress), () => {
                return {
                    localAddress: boundSubchannelAddress,
                    remoteAddress: null,
                    security: null,
                    remoteName: null,
                    streamsStarted: 0,
                    streamsSucceeded: 0,
                    streamsFailed: 0,
                    messagesSent: 0,
                    messagesReceived: 0,
                    keepAlivesSent: 0,
                    lastLocalStreamCreatedTimestamp: null,
                    lastRemoteStreamCreatedTimestamp: null,
                    lastMessageSentTimestamp: null,
                    lastMessageReceivedTimestamp: null,
                    localFlowControlWindow: null,
                    remoteFlowControlWindow: null
                };
            });
            this.listenerChildrenTracker.refChild(channelzRef);
            this.http2ServerList.push({ server: http2Server, channelzRef: channelzRef });
            this.trace('Successfully bound ' + subchannel_address_1.subchannelAddressToString(boundSubchannelAddress));
        };
        // Bind every address in the list using the given port number. Resolves
        // with { port, count } where count is previousCount plus the number of
        // addresses that were bound successfully.
        const bindSpecificPort = (addressList, portNum, previousCount) => {
            if (addressList.length === 0) {
                return Promise.resolve({ port: portNum, count: previousCount });
            }
            return Promise.all(addressList.map((address) => {
                this.trace('Attempting to bind ' + subchannel_address_1.subchannelAddressToString(address));
                let addr;
                if (subchannel_address_1.isTcpSubchannelAddress(address)) {
                    addr = {
                        host: address.host,
                        port: portNum,
                    };
                }
                else {
                    addr = address;
                }
                const http2Server = setupServer();
                return new Promise((resolve, reject) => {
                    // A bind failure resolves with the error object (not a
                    // number) so that the other addresses are still attempted.
                    const onError = (err) => {
                        this.trace('Failed to bind ' + subchannel_address_1.subchannelAddressToString(address) + ' with error ' + err.message);
                        resolve(err);
                    };
                    http2Server.once('error', onError);
                    http2Server.listen(addr, () => {
                        const boundAddress = http2Server.address();
                        let boundSubchannelAddress;
                        if (typeof boundAddress === 'string') {
                            boundSubchannelAddress = {
                                path: boundAddress
                            };
                        }
                        else {
                            boundSubchannelAddress = {
                                host: boundAddress.address,
                                port: boundAddress.port
                            };
                        }
                        registerBoundServer(http2Server, boundSubchannelAddress);
                        resolve('port' in boundSubchannelAddress ? boundSubchannelAddress.port : portNum);
                        http2Server.removeListener('error', onError);
                    });
                });
            })).then((results) => {
                let count = 0;
                for (const result of results) {
                    // Successful binds resolved with a port number.
                    if (typeof result === 'number') {
                        count += 1;
                        if (result !== portNum) {
                            throw new Error('Invalid state: multiple port numbers added from single address');
                        }
                    }
                }
                return {
                    port: portNum,
                    count: count + previousCount,
                };
            });
        };
        // Bind the first address that accepts a wildcard (0) port, then reuse
        // the port number that was assigned for all remaining addresses.
        const bindWildcardPort = (addressList) => {
            if (addressList.length === 0) {
                return Promise.resolve({ port: 0, count: 0 });
            }
            const address = addressList[0];
            const http2Server = setupServer();
            return new Promise((resolve, reject) => {
                const onError = (err) => {
                    this.trace('Failed to bind ' + subchannel_address_1.subchannelAddressToString(address) + ' with error ' + err.message);
                    resolve(bindWildcardPort(addressList.slice(1)));
                };
                http2Server.once('error', onError);
                http2Server.listen(address, () => {
                    const boundAddress = http2Server.address();
                    const boundSubchannelAddress = {
                        host: boundAddress.address,
                        port: boundAddress.port
                    };
                    registerBoundServer(http2Server, boundSubchannelAddress);
                    resolve(bindSpecificPort(addressList.slice(1), boundAddress.port, 1));
                    http2Server.removeListener('error', onError);
                });
            });
        };
        const resolverListener = {
            onSuccessfulResolution: (addressList, serviceConfig, serviceConfigError) => {
                // We only want one resolution result. Discard all future results
                resolverListener.onSuccessfulResolution = () => { };
                if (addressList.length === 0) {
                    deferredCallback(new Error(`No addresses resolved for port ${port}`), 0);
                    return;
                }
                let bindResultPromise;
                if (subchannel_address_1.isTcpSubchannelAddress(addressList[0])) {
                    if (addressList[0].port === 0) {
                        bindResultPromise = bindWildcardPort(addressList);
                    }
                    else {
                        bindResultPromise = bindSpecificPort(addressList, addressList[0].port, 0);
                    }
                }
                else {
                    // Use an arbitrary non-zero port for non-TCP addresses
                    bindResultPromise = bindSpecificPort(addressList, 1, 0);
                }
                bindResultPromise.then((bindResult) => {
                    if (bindResult.count === 0) {
                        const errorString = `No address added out of total ${addressList.length} resolved`;
                        logging.log(constants_1.LogVerbosity.ERROR, errorString);
                        deferredCallback(new Error(errorString), 0);
                    }
                    else {
                        if (bindResult.count < addressList.length) {
                            logging.log(constants_1.LogVerbosity.INFO, `WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved`);
                        }
                        deferredCallback(null, bindResult.port);
                    }
                }, (error) => {
                    const errorString = `No address added out of total ${addressList.length} resolved`;
                    logging.log(constants_1.LogVerbosity.ERROR, errorString);
                    deferredCallback(new Error(errorString), 0);
                });
            },
            onError: (error) => {
                deferredCallback(new Error(error.details), 0);
            },
        };
        const resolver = resolver_1.createResolver(portUri, resolverListener, this.options);
        resolver.updateResolution();
    }
    /**
     * Shut down immediately: stop every listening HTTP/2 server, destroy all
     * tracked sessions with NGHTTP2_CANCEL and unregister the server's
     * channelz ref.
     */
    forceShutdown() {
        // Close the server if it is still running.
        for (const { server: http2Server, channelzRef: ref } of this.http2ServerList) {
            if (http2Server.listening) {
                http2Server.close(() => {
                    this.listenerChildrenTracker.unrefChild(ref);
                    channelz_1.unregisterChannelzRef(ref);
                });
            }
        }
        this.started = false;
        // Always destroy any available sessions. It's possible that one or more
        // tryShutdown() calls are in progress. Don't wait on them to finish.
        this.sessions.forEach((channelzInfo, session) => {
            // Cast NGHTTP2_CANCEL to any because TypeScript doesn't seem to
            // recognize destroy(code) as a valid signature.
            // eslint-disable-next-line @typescript-eslint/no-explicit-any
            session.destroy(http2.constants.NGHTTP2_CANCEL);
        });
        this.sessions.clear();
        channelz_1.unregisterChannelzRef(this.channelzRef);
    }
    /**
     * Register a method handler.
     *
     * @param name Method path used as the lookup key.
     * @param handler Function invoked for calls to the method.
     * @param serialize Response serializer stored with the handler.
     * @param deserialize Request deserializer stored with the handler.
     * @param type 'unary', 'clientStream', 'serverStream' or 'bidi'.
     * @returns false when a handler for `name` already exists (nothing is
     *     changed), true otherwise.
     */
    register(name, handler, serialize, deserialize, type) {
        if (this.handlers.has(name)) {
            return false;
        }
        this.handlers.set(name, {
            func: handler,
            serialize,
            deserialize,
            type,
            path: name,
        });
        return true;
    }
    /**
     * Remove the handler registered for a method path.
     * @returns true if a handler existed and was removed.
     */
    unregister(name) {
        return this.handlers.delete(name);
    }
    /**
     * Mark the server as started so that incoming sessions are accepted.
     * @throws {Error} If no bound HTTP/2 server is listening, or the server
     *     is already started.
     */
    start() {
        if (this.http2ServerList.length === 0 ||
            this.http2ServerList.every(({ server: http2Server }) => http2Server.listening !== true)) {
            throw new Error('server must be bound in order to start');
        }
        if (this.started === true) {
            throw new Error('server is already started');
        }
        if (this.channelzEnabled) {
            this.channelzTrace.addTrace('CT_INFO', 'Starting');
        }
        this.started = true;
    }
    /**
     * Shut down gracefully: stop listening, close every open session and call
     * `callback` once all of them have finished closing (immediately if there
     * is nothing to close).
     *
     * @param callback Invoked with no error argument when shutdown completes.
     */
    tryShutdown(callback) {
        const wrappedCallback = (error) => {
            channelz_1.unregisterChannelzRef(this.channelzRef);
            callback(error);
        };
        // Number of close operations still outstanding.
        let pendingChecks = 0;
        function maybeCallback() {
            pendingChecks--;
            if (pendingChecks === 0) {
                wrappedCallback();
            }
        }
        // Close the server if necessary.
        this.started = false;
        for (const { server: http2Server, channelzRef: ref } of this.http2ServerList) {
            if (http2Server.listening) {
                pendingChecks++;
                http2Server.close(() => {
                    this.listenerChildrenTracker.unrefChild(ref);
                    channelz_1.unregisterChannelzRef(ref);
                    maybeCallback();
                });
            }
        }
        this.sessions.forEach((channelzInfo, session) => {
            if (!session.closed) {
                pendingChecks += 1;
                session.close(maybeCallback);
            }
        });
        if (pendingChecks === 0) {
            wrappedCallback();
        }
    }
    /** Not supported; always throws. */
    addHttp2Port() {
        throw new Error('Not yet implemented');
    }
    /**
     * Get the channelz reference object for this server. The returned value is
     * garbage if channelz is disabled for this server.
     * @returns
     */
    getChannelzRef() {
        return this.channelzRef;
    }
    /**
     * Attach the 'stream' and 'session' listeners that drive gRPC call
     * handling to an HTTP/2 server. Does nothing when `http2Server` is null.
     *
     * @param http2Server Server created by bindAsync().
     */
    _setupHandlers(http2Server) {
        if (http2Server === null) {
            return;
        }
        http2Server.on('stream', (stream, headers) => {
            const channelzSessionInfo = this.sessions.get(stream.session);
            const hasSessionInfo = channelzSessionInfo !== null && channelzSessionInfo !== undefined;
            this.callTracker.addCallStarted();
            if (hasSessionInfo) {
                channelzSessionInfo.streamTracker.addCallStarted();
            }
            const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE];
            if (typeof contentType !== 'string' ||
                !contentType.startsWith('application/grpc')) {
                // Not a gRPC request: answer at the HTTP level and stop.
                stream.respond({
                    [http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE,
                }, { endStream: true });
                this.callTracker.addCallFailed();
                if (hasSessionInfo) {
                    channelzSessionInfo.streamTracker.addCallFailed();
                }
                return;
            }
            let call = null;
            try {
                const path = headers[http2.constants.HTTP2_HEADER_PATH];
                const serverAddress = http2Server.address();
                let serverAddressString = 'null';
                if (serverAddress) {
                    if (typeof serverAddress === 'string') {
                        serverAddressString = serverAddress;
                    }
                    else {
                        serverAddressString =
                            serverAddress.address + ':' + serverAddress.port;
                    }
                }
                this.trace('Received call to method ' +
                    path +
                    ' at address ' +
                    serverAddressString);
                const handler = this.handlers.get(path);
                if (handler === undefined) {
                    this.trace('No handler registered for method ' +
                        path +
                        '. Sending UNIMPLEMENTED status.');
                    throw getUnimplementedStatusResponse(path);
                }
                call = new server_call_1.Http2ServerCallStream(stream, handler, this.options);
                call.once('callEnd', (code) => {
                    if (code === constants_1.Status.OK) {
                        this.callTracker.addCallSucceeded();
                    }
                    else {
                        this.callTracker.addCallFailed();
                    }
                });
                if (channelzSessionInfo) {
                    call.once('streamEnd', (success) => {
                        if (success) {
                            channelzSessionInfo.streamTracker.addCallSucceeded();
                        }
                        else {
                            channelzSessionInfo.streamTracker.addCallFailed();
                        }
                    });
                    call.on('sendMessage', () => {
                        channelzSessionInfo.messagesSent += 1;
                        channelzSessionInfo.lastMessageSentTimestamp = new Date();
                    });
                    call.on('receiveMessage', () => {
                        channelzSessionInfo.messagesReceived += 1;
                        channelzSessionInfo.lastMessageReceivedTimestamp = new Date();
                    });
                }
                const metadata = call.receiveMetadata(headers);
                const firstEncoding = metadata.get('grpc-encoding')[0];
                const encoding = firstEncoding !== null && firstEncoding !== undefined ? firstEncoding : 'identity';
                metadata.remove('grpc-encoding');
                switch (handler.type) {
                    case 'unary':
                        handleUnary(call, handler, metadata, encoding);
                        break;
                    case 'clientStream':
                        handleClientStreaming(call, handler, metadata, encoding);
                        break;
                    case 'serverStream':
                        handleServerStreaming(call, handler, metadata, encoding);
                        break;
                    case 'bidi':
                        handleBidiStreaming(call, handler, metadata, encoding);
                        break;
                    default:
                        throw new Error(`Unknown handler type: ${handler.type}`);
                }
            }
            catch (err) {
                if (!call) {
                    // The failure happened before a call object existed (for
                    // example, no handler was found), so no 'callEnd' or
                    // 'streamEnd' listener will count it: count it here.
                    call = new server_call_1.Http2ServerCallStream(stream, null, this.options);
                    this.callTracker.addCallFailed();
                    if (hasSessionInfo) {
                        channelzSessionInfo.streamTracker.addCallFailed();
                    }
                }
                if (err.code === undefined) {
                    err.code = constants_1.Status.INTERNAL;
                }
                call.sendError(err);
            }
        });
        http2Server.on('session', (session) => {
            if (!this.started) {
                session.destroy();
                return;
            }
            const remoteAddress = session.socket.remoteAddress;
            const channelzRef = channelz_1.registerChannelzSocket(remoteAddress !== null && remoteAddress !== undefined ? remoteAddress : 'unknown', this.getChannelzSessionInfoGetter(session));
            const channelzSessionInfo = {
                ref: channelzRef,
                streamTracker: new channelz_1.ChannelzCallTracker(),
                messagesSent: 0,
                messagesReceived: 0,
                lastMessageSentTimestamp: null,
                lastMessageReceivedTimestamp: null
            };
            this.sessions.set(session, channelzSessionInfo);
            const clientAddress = session.socket.remoteAddress;
            if (this.channelzEnabled) {
                this.channelzTrace.addTrace('CT_INFO', 'Connection established by client ' + clientAddress);
                this.sessionChildrenTracker.refChild(channelzRef);
            }
            session.on('close', () => {
                if (this.channelzEnabled) {
                    this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by client ' + clientAddress);
                    this.sessionChildrenTracker.unrefChild(channelzRef);
                }
                // The socket ref above is registered whether or not channelz is
                // enabled, so it must always be unregistered; otherwise every
                // closed connection leaves its registration (and the info
                // getter that captures the session) behind.
                channelz_1.unregisterChannelzRef(channelzRef);
                this.sessions.delete(session);
            });
        });
    }
}
667exports.Server = Server;
/**
 * Drive a unary call: read the single request message, then invoke the
 * user's handler with a unary call object and a completion callback.
 *
 * This function is invoked without `await`, so any failure here (a rejected
 * `receiveUnaryMessage` or a handler that throws synchronously) is caught
 * and reported to the client through `call.sendError` instead of surfacing
 * as an unhandled promise rejection. Errors raised later inside the
 * handler's own asynchronous work are outside the scope of this guard.
 *
 * @param call Server call stream for this request.
 * @param handler Registered handler record; `handler.func` is invoked.
 * @param metadata Request metadata passed to the call object.
 * @param encoding Message encoding passed to `receiveUnaryMessage`.
 * @returns A promise that resolves once the handler has been invoked (or
 *     the call was skipped or failed).
 */
async function handleUnary(call, handler, metadata, encoding) {
    try {
        const request = await call.receiveUnaryMessage(encoding);
        // No request message, or the client cancelled while we were reading.
        if (request === undefined || call.cancelled) {
            return;
        }
        const emitter = new server_call_1.ServerUnaryCallImpl(call, metadata, request);
        handler.func(emitter, (err, value, trailer, flags) => {
            call.sendUnaryMessage(err, value, trailer, flags);
        });
    }
    catch (err) {
        call.sendError(err);
    }
}
/**
 * Drive a client-streaming call: wrap the call in a readable request stream
 * and hand it to the user's handler together with a completion callback.
 *
 * The completion callback destroys the request stream and then sends the
 * single response (or error) via `call.sendUnaryMessage`; it is also
 * installed as the stream's 'error' listener. Nothing is wired up when the
 * call is already cancelled.
 *
 * @param call Server call stream for this request.
 * @param handler Registered handler record (`func`, `deserialize`).
 * @param metadata Request metadata passed to the stream object.
 * @param encoding Message encoding passed to the stream object.
 */
function handleClientStreaming(call, handler, metadata, encoding) {
    const requestStream = new server_call_1.ServerReadableStreamImpl(call, metadata, handler.deserialize, encoding);
    const finish = (err, value, trailer, flags) => {
        requestStream.destroy();
        call.sendUnaryMessage(err, value, trailer, flags);
    };
    if (!call.cancelled) {
        requestStream.on('error', finish);
        handler.func(requestStream, finish);
    }
}
/**
 * Drive a server-streaming call: read the single request message, then
 * invoke the user's handler with a writable response stream.
 *
 * This function is invoked without `await`, so any failure here (a rejected
 * `receiveUnaryMessage` or a handler that throws synchronously) is caught
 * and reported to the client through `call.sendError` instead of surfacing
 * as an unhandled promise rejection.
 *
 * @param call Server call stream for this request.
 * @param handler Registered handler record (`func`, `serialize`).
 * @param metadata Request metadata passed to the stream object.
 * @param encoding Message encoding passed to `receiveUnaryMessage`.
 * @returns A promise that resolves once the handler has been invoked (or
 *     the call was skipped or failed).
 */
async function handleServerStreaming(call, handler, metadata, encoding) {
    try {
        const request = await call.receiveUnaryMessage(encoding);
        // No request message, or the client cancelled while we were reading.
        if (request === undefined || call.cancelled) {
            return;
        }
        const stream = new server_call_1.ServerWritableStreamImpl(call, metadata, handler.serialize, request);
        handler.func(stream);
    }
    catch (err) {
        call.sendError(err);
    }
}
/**
 * Drive a bidirectional-streaming call: wrap the call in a duplex stream and
 * pass it to the user's handler, unless the call is already cancelled.
 *
 * @param call Server call stream for this request.
 * @param handler Registered handler record (`func`, `serialize`,
 *     `deserialize`).
 * @param metadata Request metadata passed to the stream object.
 * @param encoding Message encoding passed to the stream object.
 */
function handleBidiStreaming(call, handler, metadata, encoding) {
    const { serialize, deserialize } = handler;
    const duplex = new server_call_1.ServerDuplexStreamImpl(call, metadata, serialize, deserialize, encoding);
    if (!call.cancelled) {
        handler.func(duplex);
    }
}
//# sourceMappingURL=server.js.map