UNPKG

30.4 kBJavaScriptView Raw
1"use strict";
2/*
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18Object.defineProperty(exports, "__esModule", { value: true });
19exports.Http2CallStream = exports.InterceptingListenerImpl = exports.isInterceptingListener = void 0;
20const http2 = require("http2");
21const os = require("os");
22const constants_1 = require("./constants");
23const metadata_1 = require("./metadata");
24const stream_decoder_1 = require("./stream-decoder");
25const logging = require("./logging");
26const constants_2 = require("./constants");
27const TRACER_NAME = 'call_stream';
28const { HTTP2_HEADER_STATUS, HTTP2_HEADER_CONTENT_TYPE, NGHTTP2_CANCEL, } = http2.constants;
29/**
30 * Should do approximately the same thing as util.getSystemErrorName but the
31 * TypeScript types don't have that function for some reason so I just made my
32 * own.
33 * @param errno
34 */
35function getSystemErrorName(errno) {
36 for (const [name, num] of Object.entries(os.constants.errno)) {
37 if (num === errno) {
38 return name;
39 }
40 }
41 return 'Unknown system error ' + errno;
42}
43function getMinDeadline(deadlineList) {
44 let minValue = Infinity;
45 for (const deadline of deadlineList) {
46 const deadlineMsecs = deadline instanceof Date ? deadline.getTime() : deadline;
47 if (deadlineMsecs < minValue) {
48 minValue = deadlineMsecs;
49 }
50 }
51 return minValue;
52}
53function isInterceptingListener(listener) {
54 return (listener.onReceiveMetadata !== undefined &&
55 listener.onReceiveMetadata.length === 1);
56}
57exports.isInterceptingListener = isInterceptingListener;
58class InterceptingListenerImpl {
59 constructor(listener, nextListener) {
60 this.listener = listener;
61 this.nextListener = nextListener;
62 this.processingMetadata = false;
63 this.hasPendingMessage = false;
64 this.processingMessage = false;
65 this.pendingStatus = null;
66 }
67 processPendingMessage() {
68 if (this.hasPendingMessage) {
69 this.nextListener.onReceiveMessage(this.pendingMessage);
70 this.pendingMessage = null;
71 this.hasPendingMessage = false;
72 }
73 }
74 processPendingStatus() {
75 if (this.pendingStatus) {
76 this.nextListener.onReceiveStatus(this.pendingStatus);
77 }
78 }
79 onReceiveMetadata(metadata) {
80 this.processingMetadata = true;
81 this.listener.onReceiveMetadata(metadata, (metadata) => {
82 this.processingMetadata = false;
83 this.nextListener.onReceiveMetadata(metadata);
84 this.processPendingMessage();
85 this.processPendingStatus();
86 });
87 }
88 // eslint-disable-next-line @typescript-eslint/no-explicit-any
89 onReceiveMessage(message) {
90 /* If this listener processes messages asynchronously, the last message may
91 * be reordered with respect to the status */
92 this.processingMessage = true;
93 this.listener.onReceiveMessage(message, (msg) => {
94 this.processingMessage = false;
95 if (this.processingMetadata) {
96 this.pendingMessage = msg;
97 this.hasPendingMessage = true;
98 }
99 else {
100 this.nextListener.onReceiveMessage(msg);
101 this.processPendingStatus();
102 }
103 });
104 }
105 onReceiveStatus(status) {
106 this.listener.onReceiveStatus(status, (processedStatus) => {
107 if (this.processingMetadata || this.processingMessage) {
108 this.pendingStatus = processedStatus;
109 }
110 else {
111 this.nextListener.onReceiveStatus(processedStatus);
112 }
113 });
114 }
115}
116exports.InterceptingListenerImpl = InterceptingListenerImpl;
117class Http2CallStream {
118 constructor(methodName, channel, options, filterStackFactory, channelCallCredentials, callNumber) {
119 this.methodName = methodName;
120 this.channel = channel;
121 this.options = options;
122 this.channelCallCredentials = channelCallCredentials;
123 this.callNumber = callNumber;
124 this.http2Stream = null;
125 this.pendingRead = false;
126 this.isWriteFilterPending = false;
127 this.pendingWrite = null;
128 this.pendingWriteCallback = null;
129 this.writesClosed = false;
130 this.decoder = new stream_decoder_1.StreamDecoder();
131 this.isReadFilterPending = false;
132 this.canPush = false;
133 /**
134 * Indicates that an 'end' event has come from the http2 stream, so there
135 * will be no more data events.
136 */
137 this.readsClosed = false;
138 this.statusOutput = false;
139 this.unpushedReadMessages = [];
140 this.unfilteredReadMessages = [];
141 // Status code mapped from :status. To be used if grpc-status is not received
142 this.mappedStatusCode = constants_1.Status.UNKNOWN;
143 // This is populated (non-null) if and only if the call has ended
144 this.finalStatus = null;
145 this.subchannel = null;
146 this.listener = null;
147 this.internalError = null;
148 this.configDeadline = Infinity;
149 this.statusWatchers = [];
150 this.streamEndWatchers = [];
151 this.callStatsTracker = null;
152 this.filterStack = filterStackFactory.createFilter(this);
153 this.credentials = channelCallCredentials;
154 this.disconnectListener = () => {
155 this.endCall({
156 code: constants_1.Status.UNAVAILABLE,
157 details: 'Connection dropped',
158 metadata: new metadata_1.Metadata(),
159 });
160 };
161 if (this.options.parentCall &&
162 this.options.flags & constants_1.Propagate.CANCELLATION) {
163 this.options.parentCall.on('cancelled', () => {
164 this.cancelWithStatus(constants_1.Status.CANCELLED, 'Cancelled by parent call');
165 });
166 }
167 }
168 outputStatus() {
169 /* Precondition: this.finalStatus !== null */
170 if (this.listener && !this.statusOutput) {
171 this.statusOutput = true;
172 const filteredStatus = this.filterStack.receiveTrailers(this.finalStatus);
173 this.trace('ended with status: code=' +
174 filteredStatus.code +
175 ' details="' +
176 filteredStatus.details +
177 '"');
178 this.statusWatchers.forEach(watcher => watcher(filteredStatus));
179 /* We delay the actual action of bubbling up the status to insulate the
180 * cleanup code in this class from any errors that may be thrown in the
181 * upper layers as a result of bubbling up the status. In particular,
182 * if the status is not OK, the "error" event may be emitted
183 * synchronously at the top level, which will result in a thrown error if
184 * the user does not handle that event. */
185 process.nextTick(() => {
186 var _a;
187 (_a = this.listener) === null || _a === void 0 ? void 0 : _a.onReceiveStatus(filteredStatus);
188 });
189 if (this.subchannel) {
190 this.subchannel.callUnref();
191 this.subchannel.removeDisconnectListener(this.disconnectListener);
192 }
193 }
194 }
195 trace(text) {
196 logging.trace(constants_2.LogVerbosity.DEBUG, TRACER_NAME, '[' + this.callNumber + '] ' + text);
197 }
198 /**
199 * On first call, emits a 'status' event with the given StatusObject.
200 * Subsequent calls are no-ops.
201 * @param status The status of the call.
202 */
203 endCall(status) {
204 /* If the status is OK and a new status comes in (e.g. from a
205 * deserialization failure), that new status takes priority */
206 if (this.finalStatus === null || this.finalStatus.code === constants_1.Status.OK) {
207 this.finalStatus = status;
208 this.maybeOutputStatus();
209 }
210 this.destroyHttp2Stream();
211 }
212 maybeOutputStatus() {
213 if (this.finalStatus !== null) {
214 /* The combination check of readsClosed and that the two message buffer
215 * arrays are empty checks that there all incoming data has been fully
216 * processed */
217 if (this.finalStatus.code !== constants_1.Status.OK ||
218 (this.readsClosed &&
219 this.unpushedReadMessages.length === 0 &&
220 this.unfilteredReadMessages.length === 0 &&
221 !this.isReadFilterPending)) {
222 this.outputStatus();
223 }
224 }
225 }
226 push(message) {
227 this.trace('pushing to reader message of length ' +
228 (message instanceof Buffer ? message.length : null));
229 this.canPush = false;
230 process.nextTick(() => {
231 var _a;
232 /* If we have already output the status any later messages should be
233 * ignored, and can cause out-of-order operation errors higher up in the
234 * stack. Checking as late as possible here to avoid any race conditions.
235 */
236 if (this.statusOutput) {
237 return;
238 }
239 (_a = this.listener) === null || _a === void 0 ? void 0 : _a.onReceiveMessage(message);
240 this.maybeOutputStatus();
241 });
242 }
243 handleFilterError(error) {
244 this.cancelWithStatus(constants_1.Status.INTERNAL, error.message);
245 }
246 handleFilteredRead(message) {
247 /* If we the call has already ended with an error, we don't want to do
248 * anything with this message. Dropping it on the floor is correct
249 * behavior */
250 if (this.finalStatus !== null && this.finalStatus.code !== constants_1.Status.OK) {
251 this.maybeOutputStatus();
252 return;
253 }
254 this.isReadFilterPending = false;
255 if (this.canPush) {
256 this.http2Stream.pause();
257 this.push(message);
258 }
259 else {
260 this.trace('unpushedReadMessages.push message of length ' + message.length);
261 this.unpushedReadMessages.push(message);
262 }
263 if (this.unfilteredReadMessages.length > 0) {
264 /* nextMessage is guaranteed not to be undefined because
265 unfilteredReadMessages is non-empty */
266 const nextMessage = this.unfilteredReadMessages.shift();
267 this.filterReceivedMessage(nextMessage);
268 }
269 }
270 filterReceivedMessage(framedMessage) {
271 /* If we the call has already ended with an error, we don't want to do
272 * anything with this message. Dropping it on the floor is correct
273 * behavior */
274 if (this.finalStatus !== null && this.finalStatus.code !== constants_1.Status.OK) {
275 this.maybeOutputStatus();
276 return;
277 }
278 this.trace('filterReceivedMessage of length ' + framedMessage.length);
279 this.isReadFilterPending = true;
280 this.filterStack
281 .receiveMessage(Promise.resolve(framedMessage))
282 .then(this.handleFilteredRead.bind(this), this.handleFilterError.bind(this));
283 }
284 tryPush(messageBytes) {
285 if (this.isReadFilterPending) {
286 this.trace('unfilteredReadMessages.push message of length ' +
287 (messageBytes && messageBytes.length));
288 this.unfilteredReadMessages.push(messageBytes);
289 }
290 else {
291 this.filterReceivedMessage(messageBytes);
292 }
293 }
294 handleTrailers(headers) {
295 this.streamEndWatchers.forEach(watcher => watcher(true));
296 let headersString = '';
297 for (const header of Object.keys(headers)) {
298 headersString += '\t\t' + header + ': ' + headers[header] + '\n';
299 }
300 this.trace('Received server trailers:\n' + headersString);
301 let metadata;
302 try {
303 metadata = metadata_1.Metadata.fromHttp2Headers(headers);
304 }
305 catch (e) {
306 metadata = new metadata_1.Metadata();
307 }
308 const metadataMap = metadata.getMap();
309 let code = this.mappedStatusCode;
310 if (code === constants_1.Status.UNKNOWN &&
311 typeof metadataMap['grpc-status'] === 'string') {
312 const receivedStatus = Number(metadataMap['grpc-status']);
313 if (receivedStatus in constants_1.Status) {
314 code = receivedStatus;
315 this.trace('received status code ' + receivedStatus + ' from server');
316 }
317 metadata.remove('grpc-status');
318 }
319 let details = '';
320 if (typeof metadataMap['grpc-message'] === 'string') {
321 details = decodeURI(metadataMap['grpc-message']);
322 metadata.remove('grpc-message');
323 this.trace('received status details string "' + details + '" from server');
324 }
325 const status = { code, details, metadata };
326 // This is a no-op if the call was already ended when handling headers.
327 this.endCall(status);
328 }
329 writeMessageToStream(message, callback) {
330 var _a;
331 (_a = this.callStatsTracker) === null || _a === void 0 ? void 0 : _a.addMessageSent();
332 this.http2Stream.write(message, callback);
333 }
334 attachHttp2Stream(stream, subchannel, extraFilters, callStatsTracker) {
335 this.filterStack.push(extraFilters);
336 if (this.finalStatus !== null) {
337 stream.close(NGHTTP2_CANCEL);
338 }
339 else {
340 this.trace('attachHttp2Stream from subchannel ' + subchannel.getAddress());
341 this.http2Stream = stream;
342 this.subchannel = subchannel;
343 this.callStatsTracker = callStatsTracker;
344 subchannel.addDisconnectListener(this.disconnectListener);
345 subchannel.callRef();
346 stream.on('response', (headers, flags) => {
347 var _a;
348 let headersString = '';
349 for (const header of Object.keys(headers)) {
350 headersString += '\t\t' + header + ': ' + headers[header] + '\n';
351 }
352 this.trace('Received server headers:\n' + headersString);
353 switch (headers[':status']) {
354 // TODO(murgatroid99): handle 100 and 101
355 case 400:
356 this.mappedStatusCode = constants_1.Status.INTERNAL;
357 break;
358 case 401:
359 this.mappedStatusCode = constants_1.Status.UNAUTHENTICATED;
360 break;
361 case 403:
362 this.mappedStatusCode = constants_1.Status.PERMISSION_DENIED;
363 break;
364 case 404:
365 this.mappedStatusCode = constants_1.Status.UNIMPLEMENTED;
366 break;
367 case 429:
368 case 502:
369 case 503:
370 case 504:
371 this.mappedStatusCode = constants_1.Status.UNAVAILABLE;
372 break;
373 default:
374 this.mappedStatusCode = constants_1.Status.UNKNOWN;
375 }
376 if (flags & http2.constants.NGHTTP2_FLAG_END_STREAM) {
377 this.handleTrailers(headers);
378 }
379 else {
380 let metadata;
381 try {
382 metadata = metadata_1.Metadata.fromHttp2Headers(headers);
383 }
384 catch (error) {
385 this.endCall({
386 code: constants_1.Status.UNKNOWN,
387 details: error.message,
388 metadata: new metadata_1.Metadata(),
389 });
390 return;
391 }
392 try {
393 const finalMetadata = this.filterStack.receiveMetadata(metadata);
394 (_a = this.listener) === null || _a === void 0 ? void 0 : _a.onReceiveMetadata(finalMetadata);
395 }
396 catch (error) {
397 this.endCall({
398 code: constants_1.Status.UNKNOWN,
399 details: error.message,
400 metadata: new metadata_1.Metadata(),
401 });
402 }
403 }
404 });
405 stream.on('trailers', this.handleTrailers.bind(this));
406 stream.on('data', (data) => {
407 this.trace('receive HTTP/2 data frame of length ' + data.length);
408 const messages = this.decoder.write(data);
409 for (const message of messages) {
410 this.trace('parsed message of length ' + message.length);
411 this.callStatsTracker.addMessageReceived();
412 this.tryPush(message);
413 }
414 });
415 stream.on('end', () => {
416 this.readsClosed = true;
417 this.maybeOutputStatus();
418 });
419 stream.on('close', () => {
420 /* Use process.next tick to ensure that this code happens after any
421 * "error" event that may be emitted at about the same time, so that
422 * we can bubble up the error message from that event. */
423 process.nextTick(() => {
424 var _a;
425 this.trace('HTTP/2 stream closed with code ' + stream.rstCode);
426 /* If we have a final status with an OK status code, that means that
427 * we have received all of the messages and we have processed the
428 * trailers and the call completed successfully, so it doesn't matter
429 * how the stream ends after that */
430 if (((_a = this.finalStatus) === null || _a === void 0 ? void 0 : _a.code) === constants_1.Status.OK) {
431 return;
432 }
433 let code;
434 let details = '';
435 switch (stream.rstCode) {
436 case http2.constants.NGHTTP2_NO_ERROR:
437 /* If we get a NO_ERROR code and we already have a status, the
438 * stream completed properly and we just haven't fully processed
439 * it yet */
440 if (this.finalStatus !== null) {
441 return;
442 }
443 code = constants_1.Status.INTERNAL;
444 details = `Received RST_STREAM with code ${stream.rstCode}`;
445 break;
446 case http2.constants.NGHTTP2_REFUSED_STREAM:
447 code = constants_1.Status.UNAVAILABLE;
448 details = 'Stream refused by server';
449 break;
450 case http2.constants.NGHTTP2_CANCEL:
451 code = constants_1.Status.CANCELLED;
452 details = 'Call cancelled';
453 break;
454 case http2.constants.NGHTTP2_ENHANCE_YOUR_CALM:
455 code = constants_1.Status.RESOURCE_EXHAUSTED;
456 details = 'Bandwidth exhausted or memory limit exceeded';
457 break;
458 case http2.constants.NGHTTP2_INADEQUATE_SECURITY:
459 code = constants_1.Status.PERMISSION_DENIED;
460 details = 'Protocol not secure enough';
461 break;
462 case http2.constants.NGHTTP2_INTERNAL_ERROR:
463 code = constants_1.Status.INTERNAL;
464 if (this.internalError === null) {
465 /* This error code was previously handled in the default case, and
466 * there are several instances of it online, so I wanted to
467 * preserve the original error message so that people find existing
468 * information in searches, but also include the more recognizable
469 * "Internal server error" message. */
470 details = `Received RST_STREAM with code ${stream.rstCode} (Internal server error)`;
471 }
472 else {
473 if (this.internalError.code === 'ECONNRESET' || this.internalError.code === 'ETIMEDOUT') {
474 code = constants_1.Status.UNAVAILABLE;
475 details = this.internalError.message;
476 }
477 else {
478 /* The "Received RST_STREAM with code ..." error is preserved
479 * here for continuity with errors reported online, but the
480 * error message at the end will probably be more relevant in
481 * most cases. */
482 details = `Received RST_STREAM with code ${stream.rstCode} triggered by internal client error: ${this.internalError.message}`;
483 }
484 }
485 break;
486 default:
487 code = constants_1.Status.INTERNAL;
488 details = `Received RST_STREAM with code ${stream.rstCode}`;
489 }
490 // This is a no-op if trailers were received at all.
491 // This is OK, because status codes emitted here correspond to more
492 // catastrophic issues that prevent us from receiving trailers in the
493 // first place.
494 this.endCall({ code, details, metadata: new metadata_1.Metadata() });
495 });
496 });
497 stream.on('error', (err) => {
498 /* We need an error handler here to stop "Uncaught Error" exceptions
499 * from bubbling up. However, errors here should all correspond to
500 * "close" events, where we will handle the error more granularly */
501 /* Specifically looking for stream errors that were *not* constructed
502 * from a RST_STREAM response here:
503 * https://github.com/nodejs/node/blob/8b8620d580314050175983402dfddf2674e8e22a/lib/internal/http2/core.js#L2267
504 */
505 if (err.code !== 'ERR_HTTP2_STREAM_ERROR') {
506 this.trace('Node error event: message=' +
507 err.message +
508 ' code=' +
509 err.code +
510 ' errno=' +
511 getSystemErrorName(err.errno) +
512 ' syscall=' +
513 err.syscall);
514 this.internalError = err;
515 }
516 this.streamEndWatchers.forEach(watcher => watcher(false));
517 });
518 if (!this.pendingRead) {
519 stream.pause();
520 }
521 if (this.pendingWrite) {
522 if (!this.pendingWriteCallback) {
523 throw new Error('Invalid state in write handling code');
524 }
525 this.trace('sending data chunk of length ' +
526 this.pendingWrite.length +
527 ' (deferred)');
528 try {
529 this.writeMessageToStream(this.pendingWrite, this.pendingWriteCallback);
530 }
531 catch (error) {
532 this.endCall({
533 code: constants_1.Status.UNAVAILABLE,
534 details: `Write failed with error ${error.message}`,
535 metadata: new metadata_1.Metadata()
536 });
537 }
538 }
539 this.maybeCloseWrites();
540 }
541 }
542 start(metadata, listener) {
543 this.trace('Sending metadata');
544 this.listener = listener;
545 this.channel._startCallStream(this, metadata);
546 this.maybeOutputStatus();
547 }
548 destroyHttp2Stream() {
549 var _a;
550 // The http2 stream could already have been destroyed if cancelWithStatus
551 // is called in response to an internal http2 error.
552 if (this.http2Stream !== null && !this.http2Stream.destroyed) {
553 /* If the call has ended with an OK status, communicate that when closing
554 * the stream, partly to avoid a situation in which we detect an error
555 * RST_STREAM as a result after we have the status */
556 let code;
557 if (((_a = this.finalStatus) === null || _a === void 0 ? void 0 : _a.code) === constants_1.Status.OK) {
558 code = http2.constants.NGHTTP2_NO_ERROR;
559 }
560 else {
561 code = http2.constants.NGHTTP2_CANCEL;
562 }
563 this.trace('close http2 stream with code ' + code);
564 this.http2Stream.close(code);
565 }
566 }
567 cancelWithStatus(status, details) {
568 this.trace('cancelWithStatus code: ' + status + ' details: "' + details + '"');
569 this.endCall({ code: status, details, metadata: new metadata_1.Metadata() });
570 }
571 getDeadline() {
572 const deadlineList = [this.options.deadline];
573 if (this.options.parentCall && this.options.flags & constants_1.Propagate.DEADLINE) {
574 deadlineList.push(this.options.parentCall.getDeadline());
575 }
576 if (this.configDeadline) {
577 deadlineList.push(this.configDeadline);
578 }
579 return getMinDeadline(deadlineList);
580 }
581 getCredentials() {
582 return this.credentials;
583 }
584 setCredentials(credentials) {
585 this.credentials = this.channelCallCredentials.compose(credentials);
586 }
587 getStatus() {
588 return this.finalStatus;
589 }
590 getPeer() {
591 var _a, _b;
592 return (_b = (_a = this.subchannel) === null || _a === void 0 ? void 0 : _a.getAddress()) !== null && _b !== void 0 ? _b : this.channel.getTarget();
593 }
594 getMethod() {
595 return this.methodName;
596 }
597 getHost() {
598 return this.options.host;
599 }
600 setConfigDeadline(configDeadline) {
601 this.configDeadline = configDeadline;
602 }
603 addStatusWatcher(watcher) {
604 this.statusWatchers.push(watcher);
605 }
606 addStreamEndWatcher(watcher) {
607 this.streamEndWatchers.push(watcher);
608 }
609 addFilters(extraFilters) {
610 this.filterStack.push(extraFilters);
611 }
612 getCallNumber() {
613 return this.callNumber;
614 }
615 startRead() {
616 /* If the stream has ended with an error, we should not emit any more
617 * messages and we should communicate that the stream has ended */
618 if (this.finalStatus !== null && this.finalStatus.code !== constants_1.Status.OK) {
619 this.readsClosed = true;
620 this.maybeOutputStatus();
621 return;
622 }
623 this.canPush = true;
624 if (this.http2Stream === null) {
625 this.pendingRead = true;
626 }
627 else {
628 if (this.unpushedReadMessages.length > 0) {
629 const nextMessage = this.unpushedReadMessages.shift();
630 this.push(nextMessage);
631 return;
632 }
633 /* Only resume reading from the http2Stream if we don't have any pending
634 * messages to emit */
635 this.http2Stream.resume();
636 }
637 }
638 maybeCloseWrites() {
639 if (this.writesClosed &&
640 !this.isWriteFilterPending &&
641 this.http2Stream !== null) {
642 this.trace('calling end() on HTTP/2 stream');
643 this.http2Stream.end();
644 }
645 }
646 sendMessageWithContext(context, message) {
647 this.trace('write() called with message of length ' + message.length);
648 const writeObj = {
649 message,
650 flags: context.flags,
651 };
652 const cb = (error) => {
653 var _a, _b;
654 let code = constants_1.Status.UNAVAILABLE;
655 if (((_a = error) === null || _a === void 0 ? void 0 : _a.code) === 'ERR_STREAM_WRITE_AFTER_END') {
656 code = constants_1.Status.INTERNAL;
657 }
658 if (error) {
659 this.cancelWithStatus(code, `Write error: ${error.message}`);
660 }
661 (_b = context.callback) === null || _b === void 0 ? void 0 : _b.call(context);
662 };
663 this.isWriteFilterPending = true;
664 this.filterStack.sendMessage(Promise.resolve(writeObj)).then((message) => {
665 this.isWriteFilterPending = false;
666 if (this.http2Stream === null) {
667 this.trace('deferring writing data chunk of length ' + message.message.length);
668 this.pendingWrite = message.message;
669 this.pendingWriteCallback = cb;
670 }
671 else {
672 this.trace('sending data chunk of length ' + message.message.length);
673 try {
674 this.writeMessageToStream(message.message, cb);
675 }
676 catch (error) {
677 this.endCall({
678 code: constants_1.Status.UNAVAILABLE,
679 details: `Write failed with error ${error.message}`,
680 metadata: new metadata_1.Metadata()
681 });
682 }
683 this.maybeCloseWrites();
684 }
685 }, this.handleFilterError.bind(this));
686 }
687 halfClose() {
688 this.trace('end() called');
689 this.writesClosed = true;
690 this.maybeCloseWrites();
691 }
692}
693exports.Http2CallStream = Http2CallStream;
694//# sourceMappingURL=call-stream.js.map
\No newline at end of file