UNPKG

30.1 kBJavaScriptView Raw
1"use strict";
2/*
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18Object.defineProperty(exports, "__esModule", { value: true });
19exports.Http2CallStream = exports.InterceptingListenerImpl = exports.isInterceptingListener = void 0;
20const http2 = require("http2");
21const os = require("os");
22const constants_1 = require("./constants");
23const metadata_1 = require("./metadata");
24const stream_decoder_1 = require("./stream-decoder");
25const logging = require("./logging");
26const constants_2 = require("./constants");
27const error_1 = require("./error");
28const TRACER_NAME = 'call_stream';
29const { HTTP2_HEADER_STATUS, HTTP2_HEADER_CONTENT_TYPE, NGHTTP2_CANCEL, } = http2.constants;
/**
 * Maps a numeric system error code to its symbolic name (for example
 * `ECONNRESET`) by searching `os.constants.errno`. Should do approximately
 * the same thing as `util.getSystemErrorName`.
 *
 * The values in `os.constants.errno` are positive, while the `errno` property
 * of Node.js system errors is documented as a negative number, so the code is
 * compared both as given and negated. When several names share one number the
 * first one enumerated is returned.
 *
 * NOTE(review): the negated comparison relies on libuv error numbers being
 * `-errno`, which holds on POSIX platforms; on other platforms a lookup may
 * still miss and the fallback string is returned.
 *
 * @param errno Numeric error code, positive or negative.
 * @returns The matching error name, or `'Unknown system error ' + errno` when
 *     no entry matches (including when `errno` is not a number).
 */
function getSystemErrorName(errno) {
    for (const [name, num] of Object.entries(os.constants.errno)) {
        if (num === errno || num === -errno) {
            return name;
        }
    }
    return 'Unknown system error ' + errno;
}
/**
 * Returns the earliest deadline in `deadlineList`, in milliseconds.
 *
 * Each entry is either a `Date` (converted with `getTime()`) or a value that
 * is compared as-is. Entries that do not compare less than the running
 * minimum (for example `undefined`) are ignored.
 *
 * @param deadlineList Iterable of deadlines.
 * @returns The smallest value found, or `Infinity` if none is smaller.
 */
function getMinDeadline(deadlineList) {
    return [...deadlineList].reduce((earliest, candidate) => {
        const candidateMs = candidate instanceof Date ? candidate.getTime() : candidate;
        return candidateMs < earliest ? candidateMs : earliest;
    }, Infinity);
}
/**
 * Tells whether `listener` looks like an intercepting listener: it must have
 * an `onReceiveMetadata` member whose declared parameter count is exactly 1.
 *
 * @param listener Listener object to inspect.
 * @returns `true` when `onReceiveMetadata` is defined and has `length === 1`.
 */
function isInterceptingListener(listener) {
    const metadataHandler = listener.onReceiveMetadata;
    if (metadataHandler === undefined) {
        return false;
    }
    return metadataHandler.length === 1;
}
58exports.isInterceptingListener = isInterceptingListener;
/**
 * Listener adapter that passes every call event through `listener`, whose
 * handlers take the event plus a `next` callback, and forwards whatever is
 * handed to `next` on to `nextListener`.
 *
 * Because `listener` may invoke `next` asynchronously, this class buffers
 * events so that `nextListener` sees metadata first, then the message, and
 * the status last and exactly once.
 */
class InterceptingListenerImpl {
    /**
     * @param listener Object with `onReceiveMetadata(metadata, next)`,
     *     `onReceiveMessage(message, next)` and `onReceiveStatus(status, next)`.
     * @param nextListener Object with single-argument `onReceiveMetadata`,
     *     `onReceiveMessage` and `onReceiveStatus` handlers.
     */
    constructor(listener, nextListener) {
        this.listener = listener;
        this.nextListener = nextListener;
        // True while `listener` has been given metadata and has not yet
        // called `next`.
        this.processingMetadata = false;
        // A processed message waiting for metadata processing to finish.
        this.hasPendingMessage = false;
        this.pendingMessage = null;
        // True while `listener` has been given a message and has not yet
        // called `next`.
        this.processingMessage = false;
        // A processed status waiting for in-flight metadata/message work.
        this.pendingStatus = null;
    }
    /** Forwards the buffered message, if there is one, and clears the buffer. */
    processPendingMessage() {
        if (this.hasPendingMessage) {
            this.nextListener.onReceiveMessage(this.pendingMessage);
            this.pendingMessage = null;
            this.hasPendingMessage = false;
        }
    }
    /**
     * Forwards the buffered status, if there is one, once no metadata or
     * message is still being processed. The buffer is cleared before
     * forwarding so the status can never be delivered twice.
     */
    processPendingStatus() {
        if (!this.pendingStatus) {
            return;
        }
        if (this.processingMetadata || this.processingMessage) {
            /* Something that must precede the status is still in flight; the
             * callback that completes it will call this method again. */
            return;
        }
        const status = this.pendingStatus;
        this.pendingStatus = null;
        this.nextListener.onReceiveStatus(status);
    }
    /**
     * Runs `metadata` through the interceptor, then forwards the result
     * followed by any message and status that were buffered meanwhile.
     */
    onReceiveMetadata(metadata) {
        this.processingMetadata = true;
        this.listener.onReceiveMetadata(metadata, (processedMetadata) => {
            this.processingMetadata = false;
            this.nextListener.onReceiveMetadata(processedMetadata);
            this.processPendingMessage();
            this.processPendingStatus();
        });
    }
    /**
     * Runs `message` through the interceptor. The result is buffered if
     * metadata is still being processed, otherwise it is forwarded
     * immediately, followed by any buffered status.
     */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    onReceiveMessage(message) {
        /* If this listener processes messages asynchronously, the last message
         * may be reordered with respect to the status */
        this.processingMessage = true;
        this.listener.onReceiveMessage(message, (msg) => {
            this.processingMessage = false;
            if (this.processingMetadata) {
                this.pendingMessage = msg;
                this.hasPendingMessage = true;
            }
            else {
                this.nextListener.onReceiveMessage(msg);
                this.processPendingStatus();
            }
        });
    }
    /**
     * Runs `status` through the interceptor. The result is buffered while
     * metadata or a message is still being processed, otherwise it is
     * forwarded immediately.
     */
    onReceiveStatus(status) {
        this.listener.onReceiveStatus(status, (processedStatus) => {
            if (this.processingMetadata || this.processingMessage) {
                this.pendingStatus = processedStatus;
            }
            else {
                this.nextListener.onReceiveStatus(processedStatus);
            }
        });
    }
}
117exports.InterceptingListenerImpl = InterceptingListenerImpl;
/**
 * A single client call carried over one HTTP/2 stream.
 *
 * The object is created before any HTTP/2 stream exists; reads and writes
 * requested in the meantime are remembered and replayed when
 * `attachHttp2Stream` is called. Incoming data is decoded into messages,
 * passed through the filter stack, and handed to the listener one message per
 * `startRead()` call. The call ends exactly once, through `endCall`, and the
 * final status is reported to the listener by `outputStatus`.
 */
class Http2CallStream {
    /**
     * @param methodName Value returned by `getMethod()`.
     * @param channel Owning channel; `_startCallStream` and `getTarget` are
     *     called on it.
     * @param options Call options; `deadline`, `host`, `parentCall` and
     *     `flags` are read from it.
     * @param filterStackFactory Factory whose `createFilter(this)` result is
     *     used as this call's filter stack.
     * @param channelCallCredentials Channel-level call credentials; also the
     *     initial value returned by `getCredentials()`.
     * @param callNumber Identifier used as a prefix in trace output.
     */
    constructor(methodName, channel, options, filterStackFactory, channelCallCredentials, callNumber) {
        this.methodName = methodName;
        this.channel = channel;
        this.options = options;
        this.channelCallCredentials = channelCallCredentials;
        this.callNumber = callNumber;
        this.http2Stream = null;
        // Set when startRead() is called before a stream is attached.
        this.pendingRead = false;
        this.isWriteFilterPending = false;
        // A filtered write (and its callback) waiting for a stream.
        this.pendingWrite = null;
        this.pendingWriteCallback = null;
        this.writesClosed = false;
        this.decoder = new stream_decoder_1.StreamDecoder();
        this.isReadFilterPending = false;
        // True between startRead() and the next message push.
        this.canPush = false;
        /**
         * Indicates that an 'end' event has come from the http2 stream, so there
         * will be no more data events.
         */
        this.readsClosed = false;
        // True once the final status has been handed off for delivery.
        this.statusOutput = false;
        // Filtered messages waiting for a startRead() call.
        this.unpushedReadMessages = [];
        // Decoded messages waiting for the read filter to become free.
        this.unfilteredReadMessages = [];
        // Status code mapped from :status. To be used if grpc-status is not received
        this.mappedStatusCode = constants_1.Status.UNKNOWN;
        // This is populated (non-null) if and only if the call has ended
        this.finalStatus = null;
        this.subchannel = null;
        this.listener = null;
        // Last non-RST_STREAM error seen on the stream's 'error' event.
        this.internalError = null;
        this.configDeadline = Infinity;
        this.statusWatchers = [];
        this.streamEndWatchers = [];
        this.callStatsTracker = null;
        this.filterStack = filterStackFactory.createFilter(this);
        this.credentials = channelCallCredentials;
        this.disconnectListener = () => {
            this.endCall({
                code: constants_1.Status.UNAVAILABLE,
                details: 'Connection dropped',
                metadata: new metadata_1.Metadata(),
            });
        };
        if (this.options.parentCall &&
            this.options.flags & constants_1.Propagate.CANCELLATION) {
            this.options.parentCall.on('cancelled', () => {
                this.cancelWithStatus(constants_1.Status.CANCELLED, 'Cancelled by parent call');
            });
        }
    }
    /**
     * Passes the final status through the filter stack, notifies status
     * watchers, schedules delivery to the listener and releases the
     * subchannel. Does nothing if there is no listener yet or the status has
     * already been output.
     */
    outputStatus() {
        /* Precondition: this.finalStatus !== null */
        if (this.listener && !this.statusOutput) {
            this.statusOutput = true;
            const filteredStatus = this.filterStack.receiveTrailers(this.finalStatus);
            this.trace('ended with status: code=' +
                filteredStatus.code +
                ' details="' +
                filteredStatus.details +
                '"');
            this.statusWatchers.forEach(watcher => watcher(filteredStatus));
            /* We delay the actual action of bubbling up the status to insulate the
             * cleanup code in this class from any errors that may be thrown in the
             * upper layers as a result of bubbling up the status. In particular,
             * if the status is not OK, the "error" event may be emitted
             * synchronously at the top level, which will result in a thrown error if
             * the user does not handle that event. */
            process.nextTick(() => {
                if (this.listener !== null && this.listener !== undefined) {
                    this.listener.onReceiveStatus(filteredStatus);
                }
            });
            if (this.subchannel) {
                this.subchannel.callUnref();
                this.subchannel.removeDisconnectListener(this.disconnectListener);
            }
        }
    }
    /** Writes a DEBUG trace line prefixed with this call's number. */
    trace(text) {
        logging.trace(constants_2.LogVerbosity.DEBUG, TRACER_NAME, '[' + this.callNumber + '] ' + text);
    }
    /**
     * Records `status` as the final status (unless a non-OK final status is
     * already recorded), outputs it if possible, and closes the HTTP/2 stream.
     * @param status The status of the call.
     */
    endCall(status) {
        /* If the status is OK and a new status comes in (e.g. from a
         * deserialization failure), that new status takes priority */
        if (this.finalStatus === null || this.finalStatus.code === constants_1.Status.OK) {
            this.finalStatus = status;
            this.maybeOutputStatus();
        }
        this.destroyHttp2Stream();
    }
    /**
     * Outputs the final status if one exists and it is either an error or all
     * incoming data has been fully processed.
     */
    maybeOutputStatus() {
        if (this.finalStatus !== null) {
            /* The combination check of readsClosed and that the two message buffer
             * arrays are empty checks that all incoming data has been fully
             * processed */
            if (this.finalStatus.code !== constants_1.Status.OK ||
                (this.readsClosed &&
                    this.unpushedReadMessages.length === 0 &&
                    this.unfilteredReadMessages.length === 0 &&
                    !this.isReadFilterPending)) {
                this.outputStatus();
            }
        }
    }
    /**
     * Delivers one filtered message to the listener on the next tick and then
     * re-checks whether the final status can be output.
     */
    push(message) {
        this.trace('pushing to reader message of length ' +
            (message instanceof Buffer ? message.length : null));
        this.canPush = false;
        process.nextTick(() => {
            /* If we have already output the status any later messages should be
             * ignored, and can cause out-of-order operation errors higher up in the
             * stack. Checking as late as possible here to avoid any race conditions.
             */
            if (this.statusOutput) {
                return;
            }
            if (this.listener !== null && this.listener !== undefined) {
                this.listener.onReceiveMessage(message);
            }
            this.maybeOutputStatus();
        });
    }
    /** Ends the call with status INTERNAL and the filter error's message. */
    handleFilterError(error) {
        this.cancelWithStatus(constants_1.Status.INTERNAL, error.message);
    }
    /**
     * Called with the output of the read filter. Pushes the message if a read
     * is outstanding, otherwise queues it, then starts filtering the next
     * queued raw message.
     */
    handleFilteredRead(message) {
        /* If the call has already ended with an error, we don't want to do
         * anything with this message. Dropping it on the floor is correct
         * behavior */
        if (this.finalStatus !== null && this.finalStatus.code !== constants_1.Status.OK) {
            this.maybeOutputStatus();
            return;
        }
        this.isReadFilterPending = false;
        if (this.canPush) {
            this.http2Stream.pause();
            this.push(message);
        }
        else {
            this.trace('unpushedReadMessages.push message of length ' + message.length);
            this.unpushedReadMessages.push(message);
        }
        if (this.unfilteredReadMessages.length > 0) {
            /* nextMessage is guaranteed not to be undefined because
               unfilteredReadMessages is non-empty */
            const nextMessage = this.unfilteredReadMessages.shift();
            this.filterReceivedMessage(nextMessage);
        }
    }
    /**
     * Sends one decoded message through the filter stack; the result goes to
     * `handleFilteredRead`, a rejection to `handleFilterError`.
     */
    filterReceivedMessage(framedMessage) {
        /* If the call has already ended with an error, we don't want to do
         * anything with this message. Dropping it on the floor is correct
         * behavior */
        if (this.finalStatus !== null && this.finalStatus.code !== constants_1.Status.OK) {
            this.maybeOutputStatus();
            return;
        }
        this.trace('filterReceivedMessage of length ' + framedMessage.length);
        this.isReadFilterPending = true;
        this.filterStack
            .receiveMessage(Promise.resolve(framedMessage))
            .then(this.handleFilteredRead.bind(this), this.handleFilterError.bind(this));
    }
    /**
     * Filters a decoded message immediately, or queues it when another
     * message is still in the read filter.
     */
    tryPush(messageBytes) {
        if (this.isReadFilterPending) {
            this.trace('unfilteredReadMessages.push message of length ' +
                (messageBytes && messageBytes.length));
            this.unfilteredReadMessages.push(messageBytes);
        }
        else {
            this.filterReceivedMessage(messageBytes);
        }
    }
    /**
     * Builds the call's status from trailing headers and ends the call with
     * it. `grpc-status` is used only when the code mapped from `:status` is
     * still UNKNOWN; `grpc-message` supplies the details string.
     * @param headers HTTP/2 trailer (or trailers-only response) headers.
     */
    handleTrailers(headers) {
        this.streamEndWatchers.forEach(watcher => watcher(true));
        let headersString = '';
        for (const header of Object.keys(headers)) {
            headersString += '\t\t' + header + ': ' + headers[header] + '\n';
        }
        this.trace('Received server trailers:\n' + headersString);
        let metadata;
        try {
            metadata = metadata_1.Metadata.fromHttp2Headers(headers);
        }
        catch (e) {
            // Unparseable trailers are treated as empty metadata.
            metadata = new metadata_1.Metadata();
        }
        const metadataMap = metadata.getMap();
        let code = this.mappedStatusCode;
        if (code === constants_1.Status.UNKNOWN &&
            typeof metadataMap['grpc-status'] === 'string') {
            const receivedStatus = Number(metadataMap['grpc-status']);
            if (receivedStatus in constants_1.Status) {
                code = receivedStatus;
                this.trace('received status code ' + receivedStatus + ' from server');
            }
            metadata.remove('grpc-status');
        }
        let details = '';
        if (typeof metadataMap['grpc-message'] === 'string') {
            const rawMessage = metadataMap['grpc-message'];
            try {
                details = decodeURI(rawMessage);
            }
            catch (e) {
                /* decodeURI throws a URIError on malformed percent-encoding.
                 * The value comes from the server, and this method runs inside
                 * an HTTP/2 event handler, so the call must still end cleanly:
                 * fall back to the undecoded string. */
                details = rawMessage;
                this.trace('failed to decode grpc-message; using the raw value');
            }
            metadata.remove('grpc-message');
            this.trace('received status details string "' + details + '" from server');
        }
        const status = { code, details, metadata };
        // This is a no-op if the call was already ended when handling headers.
        this.endCall(status);
    }
    /** Counts the message with the stats tracker, if any, and writes it. */
    writeMessageToStream(message, callback) {
        if (this.callStatsTracker !== null && this.callStatsTracker !== undefined) {
            this.callStatsTracker.addMessageSent();
        }
        this.http2Stream.write(message, callback);
    }
    /**
     * Binds this call to an HTTP/2 stream: installs the stream event handlers,
     * replays a deferred write and half-close, and leaves the stream paused
     * unless a read was already requested. If the call has already ended the
     * stream is closed with NGHTTP2_CANCEL instead.
     * @param stream The HTTP/2 client stream.
     * @param subchannel Subchannel that owns the stream.
     * @param extraFilters Filters pushed onto this call's filter stack.
     * @param callStatsTracker Optional tracker notified of messages sent and
     *     received.
     */
    attachHttp2Stream(stream, subchannel, extraFilters, callStatsTracker) {
        this.filterStack.push(extraFilters);
        if (this.finalStatus !== null) {
            stream.close(NGHTTP2_CANCEL);
        }
        else {
            this.trace('attachHttp2Stream from subchannel ' + subchannel.getAddress());
            this.http2Stream = stream;
            this.subchannel = subchannel;
            this.callStatsTracker = callStatsTracker;
            subchannel.addDisconnectListener(this.disconnectListener);
            subchannel.callRef();
            stream.on('response', (headers, flags) => {
                let headersString = '';
                for (const header of Object.keys(headers)) {
                    headersString += '\t\t' + header + ': ' + headers[header] + '\n';
                }
                this.trace('Received server headers:\n' + headersString);
                switch (headers[':status']) {
                    // TODO(murgatroid99): handle 100 and 101
                    case 400:
                        this.mappedStatusCode = constants_1.Status.INTERNAL;
                        break;
                    case 401:
                        this.mappedStatusCode = constants_1.Status.UNAUTHENTICATED;
                        break;
                    case 403:
                        this.mappedStatusCode = constants_1.Status.PERMISSION_DENIED;
                        break;
                    case 404:
                        this.mappedStatusCode = constants_1.Status.UNIMPLEMENTED;
                        break;
                    case 429:
                    case 502:
                    case 503:
                    case 504:
                        this.mappedStatusCode = constants_1.Status.UNAVAILABLE;
                        break;
                    default:
                        this.mappedStatusCode = constants_1.Status.UNKNOWN;
                }
                if (flags & http2.constants.NGHTTP2_FLAG_END_STREAM) {
                    // Headers that also end the stream are handled as trailers.
                    this.handleTrailers(headers);
                }
                else {
                    let metadata;
                    try {
                        metadata = metadata_1.Metadata.fromHttp2Headers(headers);
                    }
                    catch (error) {
                        this.endCall({
                            code: constants_1.Status.UNKNOWN,
                            details: (0, error_1.getErrorMessage)(error),
                            metadata: new metadata_1.Metadata(),
                        });
                        return;
                    }
                    try {
                        const finalMetadata = this.filterStack.receiveMetadata(metadata);
                        if (this.listener !== null && this.listener !== undefined) {
                            this.listener.onReceiveMetadata(finalMetadata);
                        }
                    }
                    catch (error) {
                        this.endCall({
                            code: constants_1.Status.UNKNOWN,
                            details: (0, error_1.getErrorMessage)(error),
                            metadata: new metadata_1.Metadata(),
                        });
                    }
                }
            });
            stream.on('trailers', this.handleTrailers.bind(this));
            stream.on('data', (data) => {
                this.trace('receive HTTP/2 data frame of length ' + data.length);
                const messages = this.decoder.write(data);
                for (const message of messages) {
                    this.trace('parsed message of length ' + message.length);
                    /* The tracker is optional; guard it the same way
                     * writeMessageToStream does. */
                    if (this.callStatsTracker !== null && this.callStatsTracker !== undefined) {
                        this.callStatsTracker.addMessageReceived();
                    }
                    this.tryPush(message);
                }
            });
            stream.on('end', () => {
                this.readsClosed = true;
                this.maybeOutputStatus();
            });
            stream.on('close', () => {
                /* Use process.nextTick to ensure that this code happens after any
                 * "error" event that may be emitted at about the same time, so that
                 * we can bubble up the error message from that event. */
                process.nextTick(() => {
                    this.trace('HTTP/2 stream closed with code ' + stream.rstCode);
                    /* If we have a final status with an OK status code, that means that
                     * we have received all of the messages and we have processed the
                     * trailers and the call completed successfully, so it doesn't matter
                     * how the stream ends after that */
                    if (this.finalStatus !== null &&
                        this.finalStatus !== undefined &&
                        this.finalStatus.code === constants_1.Status.OK) {
                        return;
                    }
                    let code;
                    let details = '';
                    switch (stream.rstCode) {
                        case http2.constants.NGHTTP2_NO_ERROR:
                            /* If we get a NO_ERROR code and we already have a status, the
                             * stream completed properly and we just haven't fully processed
                             * it yet */
                            if (this.finalStatus !== null) {
                                return;
                            }
                            code = constants_1.Status.INTERNAL;
                            details = `Received RST_STREAM with code ${stream.rstCode}`;
                            break;
                        case http2.constants.NGHTTP2_REFUSED_STREAM:
                            code = constants_1.Status.UNAVAILABLE;
                            details = 'Stream refused by server';
                            break;
                        case http2.constants.NGHTTP2_CANCEL:
                            code = constants_1.Status.CANCELLED;
                            details = 'Call cancelled';
                            break;
                        case http2.constants.NGHTTP2_ENHANCE_YOUR_CALM:
                            code = constants_1.Status.RESOURCE_EXHAUSTED;
                            details = 'Bandwidth exhausted or memory limit exceeded';
                            break;
                        case http2.constants.NGHTTP2_INADEQUATE_SECURITY:
                            code = constants_1.Status.PERMISSION_DENIED;
                            details = 'Protocol not secure enough';
                            break;
                        case http2.constants.NGHTTP2_INTERNAL_ERROR:
                            code = constants_1.Status.INTERNAL;
                            if (this.internalError === null) {
                                /* This error code was previously handled in the default case, and
                                 * there are several instances of it online, so I wanted to
                                 * preserve the original error message so that people find existing
                                 * information in searches, but also include the more recognizable
                                 * "Internal server error" message. */
                                details = `Received RST_STREAM with code ${stream.rstCode} (Internal server error)`;
                            }
                            else {
                                if (this.internalError.code === 'ECONNRESET' || this.internalError.code === 'ETIMEDOUT') {
                                    code = constants_1.Status.UNAVAILABLE;
                                    details = this.internalError.message;
                                }
                                else {
                                    /* The "Received RST_STREAM with code ..." error is preserved
                                     * here for continuity with errors reported online, but the
                                     * error message at the end will probably be more relevant in
                                     * most cases. */
                                    details = `Received RST_STREAM with code ${stream.rstCode} triggered by internal client error: ${this.internalError.message}`;
                                }
                            }
                            break;
                        default:
                            code = constants_1.Status.INTERNAL;
                            details = `Received RST_STREAM with code ${stream.rstCode}`;
                    }
                    // This is a no-op if trailers were received at all.
                    // This is OK, because status codes emitted here correspond to more
                    // catastrophic issues that prevent us from receiving trailers in the
                    // first place.
                    this.endCall({ code, details, metadata: new metadata_1.Metadata() });
                });
            });
            stream.on('error', (err) => {
                /* We need an error handler here to stop "Uncaught Error" exceptions
                 * from bubbling up. However, errors here should all correspond to
                 * "close" events, where we will handle the error more granularly */
                /* Specifically looking for stream errors that were *not* constructed
                 * from a RST_STREAM response here:
                 * https://github.com/nodejs/node/blob/8b8620d580314050175983402dfddf2674e8e22a/lib/internal/http2/core.js#L2267
                 */
                if (err.code !== 'ERR_HTTP2_STREAM_ERROR') {
                    this.trace('Node error event: message=' +
                        err.message +
                        ' code=' +
                        err.code +
                        ' errno=' +
                        getSystemErrorName(err.errno) +
                        ' syscall=' +
                        err.syscall);
                    this.internalError = err;
                }
                this.streamEndWatchers.forEach(watcher => watcher(false));
            });
            if (!this.pendingRead) {
                stream.pause();
            }
            if (this.pendingWrite) {
                if (!this.pendingWriteCallback) {
                    throw new Error('Invalid state in write handling code');
                }
                this.trace('sending data chunk of length ' +
                    this.pendingWrite.length +
                    ' (deferred)');
                try {
                    this.writeMessageToStream(this.pendingWrite, this.pendingWriteCallback);
                }
                catch (error) {
                    this.endCall({
                        code: constants_1.Status.UNAVAILABLE,
                        details: `Write failed with error ${(0, error_1.getErrorMessage)(error)}`,
                        metadata: new metadata_1.Metadata()
                    });
                }
            }
            this.maybeCloseWrites();
        }
    }
    /**
     * Registers the listener and asks the channel to start this call with the
     * given metadata. If the call has already ended, the status is output.
     */
    start(metadata, listener) {
        this.trace('Sending metadata');
        this.listener = listener;
        this.channel._startCallStream(this, metadata);
        this.maybeOutputStatus();
    }
    /**
     * Closes the HTTP/2 stream if one is attached and not yet destroyed,
     * using NGHTTP2_NO_ERROR when the call ended OK and NGHTTP2_CANCEL
     * otherwise.
     */
    destroyHttp2Stream() {
        // The http2 stream could already have been destroyed if cancelWithStatus
        // is called in response to an internal http2 error.
        if (this.http2Stream !== null && !this.http2Stream.destroyed) {
            /* If the call has ended with an OK status, communicate that when closing
             * the stream, partly to avoid a situation in which we detect an error
             * RST_STREAM as a result after we have the status */
            let code;
            if (this.finalStatus !== null &&
                this.finalStatus !== undefined &&
                this.finalStatus.code === constants_1.Status.OK) {
                code = http2.constants.NGHTTP2_NO_ERROR;
            }
            else {
                code = http2.constants.NGHTTP2_CANCEL;
            }
            this.trace('close http2 stream with code ' + code);
            this.http2Stream.close(code);
        }
    }
    /** Ends the call with the given code and details and empty metadata. */
    cancelWithStatus(status, details) {
        this.trace('cancelWithStatus code: ' + status + ' details: "' + details + '"');
        this.endCall({ code: status, details, metadata: new metadata_1.Metadata() });
    }
    /**
     * Returns the earliest of the call's own deadline, the parent call's
     * deadline (when deadline propagation is flagged) and the config deadline
     * (when truthy), as computed by `getMinDeadline`.
     */
    getDeadline() {
        const deadlineList = [this.options.deadline];
        if (this.options.parentCall && this.options.flags & constants_1.Propagate.DEADLINE) {
            deadlineList.push(this.options.parentCall.getDeadline());
        }
        if (this.configDeadline) {
            deadlineList.push(this.configDeadline);
        }
        return getMinDeadline(deadlineList);
    }
    /** Returns the credentials currently in effect for this call. */
    getCredentials() {
        return this.credentials;
    }
    /** Composes the channel's call credentials with `credentials`. */
    setCredentials(credentials) {
        this.credentials = this.channelCallCredentials.compose(credentials);
    }
    /** Returns the final status, or `null` while the call has not ended. */
    getStatus() {
        return this.finalStatus;
    }
    /**
     * Returns the attached subchannel's address, or the channel's target when
     * there is no subchannel or its address is null/undefined.
     */
    getPeer() {
        const subchannel = this.subchannel;
        const address = subchannel === null || subchannel === undefined
            ? undefined
            : subchannel.getAddress();
        return address !== null && address !== undefined
            ? address
            : this.channel.getTarget();
    }
    /** Returns the method name given to the constructor. */
    getMethod() {
        return this.methodName;
    }
    /** Returns `options.host`. */
    getHost() {
        return this.options.host;
    }
    /** Sets the deadline that `getDeadline` combines with the others. */
    setConfigDeadline(configDeadline) {
        this.configDeadline = configDeadline;
    }
    /** Adds a callback invoked with the filtered final status. */
    addStatusWatcher(watcher) {
        this.statusWatchers.push(watcher);
    }
    /**
     * Adds a callback invoked with `true` when trailers are handled and with
     * `false` when the stream emits an error.
     */
    addStreamEndWatcher(watcher) {
        this.streamEndWatchers.push(watcher);
    }
    /** Pushes additional filters onto this call's filter stack. */
    addFilters(extraFilters) {
        this.filterStack.push(extraFilters);
    }
    /** Returns the call number given to the constructor. */
    getCallNumber() {
        return this.callNumber;
    }
    /**
     * Requests one message. Delivers a queued message if there is one,
     * otherwise resumes the HTTP/2 stream, or remembers the request when no
     * stream is attached yet.
     */
    startRead() {
        /* If the stream has ended with an error, we should not emit any more
         * messages and we should communicate that the stream has ended */
        if (this.finalStatus !== null && this.finalStatus.code !== constants_1.Status.OK) {
            this.readsClosed = true;
            this.maybeOutputStatus();
            return;
        }
        this.canPush = true;
        if (this.http2Stream === null) {
            this.pendingRead = true;
        }
        else {
            if (this.unpushedReadMessages.length > 0) {
                const nextMessage = this.unpushedReadMessages.shift();
                this.push(nextMessage);
                return;
            }
            /* Only resume reading from the http2Stream if we don't have any pending
             * messages to emit */
            this.http2Stream.resume();
        }
    }
    /**
     * Ends the writable side of the HTTP/2 stream once a half-close was
     * requested, no write is in the filter, and a stream is attached.
     */
    maybeCloseWrites() {
        if (this.writesClosed &&
            !this.isWriteFilterPending &&
            this.http2Stream !== null) {
            this.trace('calling end() on HTTP/2 stream');
            this.http2Stream.end();
        }
    }
    /**
     * Passes a message through the write filter and then writes it to the
     * stream, or defers it until a stream is attached.
     * @param context Object with optional `callback` and `flags`.
     * @param message The message to send.
     */
    sendMessageWithContext(context, message) {
        this.trace('write() called with message of length ' + message.length);
        const writeObj = {
            message,
            flags: context.flags,
        };
        const userCallback = context.callback;
        const cb = userCallback !== null && userCallback !== undefined
            ? userCallback
            : (() => { });
        this.isWriteFilterPending = true;
        this.filterStack.sendMessage(Promise.resolve(writeObj)).then((filtered) => {
            this.isWriteFilterPending = false;
            if (this.http2Stream === null) {
                this.trace('deferring writing data chunk of length ' + filtered.message.length);
                this.pendingWrite = filtered.message;
                this.pendingWriteCallback = cb;
            }
            else {
                this.trace('sending data chunk of length ' + filtered.message.length);
                try {
                    this.writeMessageToStream(filtered.message, cb);
                }
                catch (error) {
                    this.endCall({
                        code: constants_1.Status.UNAVAILABLE,
                        details: `Write failed with error ${(0, error_1.getErrorMessage)(error)}`,
                        metadata: new metadata_1.Metadata()
                    });
                }
                this.maybeCloseWrites();
            }
        }, this.handleFilterError.bind(this));
    }
    /** Marks writes as finished and ends the stream as soon as possible. */
    halfClose() {
        this.trace('end() called');
        this.writesClosed = true;
        this.maybeCloseWrites();
    }
}
685exports.Http2CallStream = Http2CallStream;
686//# sourceMappingURL=call-stream.js.map
\No newline at end of file