1 | "use strict";
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | Object.defineProperty(exports, "__esModule", { value: true });
|
19 | exports.Http2CallStream = exports.InterceptingListenerImpl = exports.isInterceptingListener = void 0;
|
20 | const http2 = require("http2");
|
21 | const os = require("os");
|
22 | const constants_1 = require("./constants");
|
23 | const metadata_1 = require("./metadata");
|
24 | const stream_decoder_1 = require("./stream-decoder");
|
25 | const logging = require("./logging");
|
26 | const constants_2 = require("./constants");
|
27 | const error_1 = require("./error");
|
28 | const TRACER_NAME = 'call_stream';
|
29 | const { HTTP2_HEADER_STATUS, HTTP2_HEADER_CONTENT_TYPE, NGHTTP2_CANCEL, } = http2.constants;
|
30 |
|
31 |
|
32 |
|
33 |
|
34 |
|
35 |
|
36 | function getSystemErrorName(errno) {
|
37 | for (const [name, num] of Object.entries(os.constants.errno)) {
|
38 | if (num === errno) {
|
39 | return name;
|
40 | }
|
41 | }
|
42 | return 'Unknown system error ' + errno;
|
43 | }
|
44 | function getMinDeadline(deadlineList) {
|
45 | let minValue = Infinity;
|
46 | for (const deadline of deadlineList) {
|
47 | const deadlineMsecs = deadline instanceof Date ? deadline.getTime() : deadline;
|
48 | if (deadlineMsecs < minValue) {
|
49 | minValue = deadlineMsecs;
|
50 | }
|
51 | }
|
52 | return minValue;
|
53 | }
|
54 | function isInterceptingListener(listener) {
|
55 | return (listener.onReceiveMetadata !== undefined &&
|
56 | listener.onReceiveMetadata.length === 1);
|
57 | }
|
58 | exports.isInterceptingListener = isInterceptingListener;
|
59 | class InterceptingListenerImpl {
|
60 | constructor(listener, nextListener) {
|
61 | this.listener = listener;
|
62 | this.nextListener = nextListener;
|
63 | this.processingMetadata = false;
|
64 | this.hasPendingMessage = false;
|
65 | this.processingMessage = false;
|
66 | this.pendingStatus = null;
|
67 | }
|
68 | processPendingMessage() {
|
69 | if (this.hasPendingMessage) {
|
70 | this.nextListener.onReceiveMessage(this.pendingMessage);
|
71 | this.pendingMessage = null;
|
72 | this.hasPendingMessage = false;
|
73 | }
|
74 | }
|
75 | processPendingStatus() {
|
76 | if (this.pendingStatus) {
|
77 | this.nextListener.onReceiveStatus(this.pendingStatus);
|
78 | }
|
79 | }
|
80 | onReceiveMetadata(metadata) {
|
81 | this.processingMetadata = true;
|
82 | this.listener.onReceiveMetadata(metadata, (metadata) => {
|
83 | this.processingMetadata = false;
|
84 | this.nextListener.onReceiveMetadata(metadata);
|
85 | this.processPendingMessage();
|
86 | this.processPendingStatus();
|
87 | });
|
88 | }
|
89 |
|
90 | onReceiveMessage(message) {
|
91 | |
92 |
|
93 | this.processingMessage = true;
|
94 | this.listener.onReceiveMessage(message, (msg) => {
|
95 | this.processingMessage = false;
|
96 | if (this.processingMetadata) {
|
97 | this.pendingMessage = msg;
|
98 | this.hasPendingMessage = true;
|
99 | }
|
100 | else {
|
101 | this.nextListener.onReceiveMessage(msg);
|
102 | this.processPendingStatus();
|
103 | }
|
104 | });
|
105 | }
|
106 | onReceiveStatus(status) {
|
107 | this.listener.onReceiveStatus(status, (processedStatus) => {
|
108 | if (this.processingMetadata || this.processingMessage) {
|
109 | this.pendingStatus = processedStatus;
|
110 | }
|
111 | else {
|
112 | this.nextListener.onReceiveStatus(processedStatus);
|
113 | }
|
114 | });
|
115 | }
|
116 | }
|
117 | exports.InterceptingListenerImpl = InterceptingListenerImpl;
|
118 | class Http2CallStream {
|
119 | constructor(methodName, channel, options, filterStackFactory, channelCallCredentials, callNumber) {
|
120 | this.methodName = methodName;
|
121 | this.channel = channel;
|
122 | this.options = options;
|
123 | this.channelCallCredentials = channelCallCredentials;
|
124 | this.callNumber = callNumber;
|
125 | this.http2Stream = null;
|
126 | this.pendingRead = false;
|
127 | this.isWriteFilterPending = false;
|
128 | this.pendingWrite = null;
|
129 | this.pendingWriteCallback = null;
|
130 | this.writesClosed = false;
|
131 | this.decoder = new stream_decoder_1.StreamDecoder();
|
132 | this.isReadFilterPending = false;
|
133 | this.canPush = false;
|
134 | |
135 |
|
136 |
|
137 |
|
138 | this.readsClosed = false;
|
139 | this.statusOutput = false;
|
140 | this.unpushedReadMessages = [];
|
141 | this.unfilteredReadMessages = [];
|
142 |
|
143 | this.mappedStatusCode = constants_1.Status.UNKNOWN;
|
144 |
|
145 | this.finalStatus = null;
|
146 | this.subchannel = null;
|
147 | this.listener = null;
|
148 | this.internalError = null;
|
149 | this.configDeadline = Infinity;
|
150 | this.statusWatchers = [];
|
151 | this.streamEndWatchers = [];
|
152 | this.callStatsTracker = null;
|
153 | this.filterStack = filterStackFactory.createFilter(this);
|
154 | this.credentials = channelCallCredentials;
|
155 | this.disconnectListener = () => {
|
156 | this.endCall({
|
157 | code: constants_1.Status.UNAVAILABLE,
|
158 | details: 'Connection dropped',
|
159 | metadata: new metadata_1.Metadata(),
|
160 | });
|
161 | };
|
162 | if (this.options.parentCall &&
|
163 | this.options.flags & constants_1.Propagate.CANCELLATION) {
|
164 | this.options.parentCall.on('cancelled', () => {
|
165 | this.cancelWithStatus(constants_1.Status.CANCELLED, 'Cancelled by parent call');
|
166 | });
|
167 | }
|
168 | }
|
169 | outputStatus() {
|
170 |
|
171 | if (this.listener && !this.statusOutput) {
|
172 | this.statusOutput = true;
|
173 | const filteredStatus = this.filterStack.receiveTrailers(this.finalStatus);
|
174 | this.trace('ended with status: code=' +
|
175 | filteredStatus.code +
|
176 | ' details="' +
|
177 | filteredStatus.details +
|
178 | '"');
|
179 | this.statusWatchers.forEach(watcher => watcher(filteredStatus));
|
180 | |
181 |
|
182 |
|
183 |
|
184 |
|
185 |
|
186 | process.nextTick(() => {
|
187 | var _a;
|
188 | (_a = this.listener) === null || _a === void 0 ? void 0 : _a.onReceiveStatus(filteredStatus);
|
189 | });
|
190 | if (this.subchannel) {
|
191 | this.subchannel.callUnref();
|
192 | this.subchannel.removeDisconnectListener(this.disconnectListener);
|
193 | }
|
194 | }
|
195 | }
|
196 | trace(text) {
|
197 | logging.trace(constants_2.LogVerbosity.DEBUG, TRACER_NAME, '[' + this.callNumber + '] ' + text);
|
198 | }
|
199 | |
200 |
|
201 |
|
202 |
|
203 |
|
204 | endCall(status) {
|
205 | |
206 |
|
207 | if (this.finalStatus === null || this.finalStatus.code === constants_1.Status.OK) {
|
208 | this.finalStatus = status;
|
209 | this.maybeOutputStatus();
|
210 | }
|
211 | this.destroyHttp2Stream();
|
212 | }
|
213 | maybeOutputStatus() {
|
214 | if (this.finalStatus !== null) {
|
215 | |
216 |
|
217 |
|
218 | if (this.finalStatus.code !== constants_1.Status.OK ||
|
219 | (this.readsClosed &&
|
220 | this.unpushedReadMessages.length === 0 &&
|
221 | this.unfilteredReadMessages.length === 0 &&
|
222 | !this.isReadFilterPending)) {
|
223 | this.outputStatus();
|
224 | }
|
225 | }
|
226 | }
|
227 | push(message) {
|
228 | this.trace('pushing to reader message of length ' +
|
229 | (message instanceof Buffer ? message.length : null));
|
230 | this.canPush = false;
|
231 | process.nextTick(() => {
|
232 | var _a;
|
233 | |
234 |
|
235 |
|
236 |
|
237 | if (this.statusOutput) {
|
238 | return;
|
239 | }
|
240 | (_a = this.listener) === null || _a === void 0 ? void 0 : _a.onReceiveMessage(message);
|
241 | this.maybeOutputStatus();
|
242 | });
|
243 | }
|
244 | handleFilterError(error) {
|
245 | this.cancelWithStatus(constants_1.Status.INTERNAL, error.message);
|
246 | }
|
247 | handleFilteredRead(message) {
|
248 | |
249 |
|
250 |
|
251 | if (this.finalStatus !== null && this.finalStatus.code !== constants_1.Status.OK) {
|
252 | this.maybeOutputStatus();
|
253 | return;
|
254 | }
|
255 | this.isReadFilterPending = false;
|
256 | if (this.canPush) {
|
257 | this.http2Stream.pause();
|
258 | this.push(message);
|
259 | }
|
260 | else {
|
261 | this.trace('unpushedReadMessages.push message of length ' + message.length);
|
262 | this.unpushedReadMessages.push(message);
|
263 | }
|
264 | if (this.unfilteredReadMessages.length > 0) {
|
265 | |
266 |
|
267 | const nextMessage = this.unfilteredReadMessages.shift();
|
268 | this.filterReceivedMessage(nextMessage);
|
269 | }
|
270 | }
|
271 | filterReceivedMessage(framedMessage) {
|
272 | |
273 |
|
274 |
|
275 | if (this.finalStatus !== null && this.finalStatus.code !== constants_1.Status.OK) {
|
276 | this.maybeOutputStatus();
|
277 | return;
|
278 | }
|
279 | this.trace('filterReceivedMessage of length ' + framedMessage.length);
|
280 | this.isReadFilterPending = true;
|
281 | this.filterStack
|
282 | .receiveMessage(Promise.resolve(framedMessage))
|
283 | .then(this.handleFilteredRead.bind(this), this.handleFilterError.bind(this));
|
284 | }
|
285 | tryPush(messageBytes) {
|
286 | if (this.isReadFilterPending) {
|
287 | this.trace('unfilteredReadMessages.push message of length ' +
|
288 | (messageBytes && messageBytes.length));
|
289 | this.unfilteredReadMessages.push(messageBytes);
|
290 | }
|
291 | else {
|
292 | this.filterReceivedMessage(messageBytes);
|
293 | }
|
294 | }
|
295 | handleTrailers(headers) {
|
296 | this.streamEndWatchers.forEach(watcher => watcher(true));
|
297 | let headersString = '';
|
298 | for (const header of Object.keys(headers)) {
|
299 | headersString += '\t\t' + header + ': ' + headers[header] + '\n';
|
300 | }
|
301 | this.trace('Received server trailers:\n' + headersString);
|
302 | let metadata;
|
303 | try {
|
304 | metadata = metadata_1.Metadata.fromHttp2Headers(headers);
|
305 | }
|
306 | catch (e) {
|
307 | metadata = new metadata_1.Metadata();
|
308 | }
|
309 | const metadataMap = metadata.getMap();
|
310 | let code = this.mappedStatusCode;
|
311 | if (code === constants_1.Status.UNKNOWN &&
|
312 | typeof metadataMap['grpc-status'] === 'string') {
|
313 | const receivedStatus = Number(metadataMap['grpc-status']);
|
314 | if (receivedStatus in constants_1.Status) {
|
315 | code = receivedStatus;
|
316 | this.trace('received status code ' + receivedStatus + ' from server');
|
317 | }
|
318 | metadata.remove('grpc-status');
|
319 | }
|
320 | let details = '';
|
321 | if (typeof metadataMap['grpc-message'] === 'string') {
|
322 | details = decodeURI(metadataMap['grpc-message']);
|
323 | metadata.remove('grpc-message');
|
324 | this.trace('received status details string "' + details + '" from server');
|
325 | }
|
326 | const status = { code, details, metadata };
|
327 |
|
328 | this.endCall(status);
|
329 | }
|
330 | writeMessageToStream(message, callback) {
|
331 | var _a;
|
332 | (_a = this.callStatsTracker) === null || _a === void 0 ? void 0 : _a.addMessageSent();
|
333 | this.http2Stream.write(message, callback);
|
334 | }
|
335 | attachHttp2Stream(stream, subchannel, extraFilters, callStatsTracker) {
|
336 | this.filterStack.push(extraFilters);
|
337 | if (this.finalStatus !== null) {
|
338 | stream.close(NGHTTP2_CANCEL);
|
339 | }
|
340 | else {
|
341 | this.trace('attachHttp2Stream from subchannel ' + subchannel.getAddress());
|
342 | this.http2Stream = stream;
|
343 | this.subchannel = subchannel;
|
344 | this.callStatsTracker = callStatsTracker;
|
345 | subchannel.addDisconnectListener(this.disconnectListener);
|
346 | subchannel.callRef();
|
347 | stream.on('response', (headers, flags) => {
|
348 | var _a;
|
349 | let headersString = '';
|
350 | for (const header of Object.keys(headers)) {
|
351 | headersString += '\t\t' + header + ': ' + headers[header] + '\n';
|
352 | }
|
353 | this.trace('Received server headers:\n' + headersString);
|
354 | switch (headers[':status']) {
|
355 |
|
356 | case 400:
|
357 | this.mappedStatusCode = constants_1.Status.INTERNAL;
|
358 | break;
|
359 | case 401:
|
360 | this.mappedStatusCode = constants_1.Status.UNAUTHENTICATED;
|
361 | break;
|
362 | case 403:
|
363 | this.mappedStatusCode = constants_1.Status.PERMISSION_DENIED;
|
364 | break;
|
365 | case 404:
|
366 | this.mappedStatusCode = constants_1.Status.UNIMPLEMENTED;
|
367 | break;
|
368 | case 429:
|
369 | case 502:
|
370 | case 503:
|
371 | case 504:
|
372 | this.mappedStatusCode = constants_1.Status.UNAVAILABLE;
|
373 | break;
|
374 | default:
|
375 | this.mappedStatusCode = constants_1.Status.UNKNOWN;
|
376 | }
|
377 | if (flags & http2.constants.NGHTTP2_FLAG_END_STREAM) {
|
378 | this.handleTrailers(headers);
|
379 | }
|
380 | else {
|
381 | let metadata;
|
382 | try {
|
383 | metadata = metadata_1.Metadata.fromHttp2Headers(headers);
|
384 | }
|
385 | catch (error) {
|
386 | this.endCall({
|
387 | code: constants_1.Status.UNKNOWN,
|
388 | details: (0, error_1.getErrorMessage)(error),
|
389 | metadata: new metadata_1.Metadata(),
|
390 | });
|
391 | return;
|
392 | }
|
393 | try {
|
394 | const finalMetadata = this.filterStack.receiveMetadata(metadata);
|
395 | (_a = this.listener) === null || _a === void 0 ? void 0 : _a.onReceiveMetadata(finalMetadata);
|
396 | }
|
397 | catch (error) {
|
398 | this.endCall({
|
399 | code: constants_1.Status.UNKNOWN,
|
400 | details: (0, error_1.getErrorMessage)(error),
|
401 | metadata: new metadata_1.Metadata(),
|
402 | });
|
403 | }
|
404 | }
|
405 | });
|
406 | stream.on('trailers', this.handleTrailers.bind(this));
|
407 | stream.on('data', (data) => {
|
408 | this.trace('receive HTTP/2 data frame of length ' + data.length);
|
409 | const messages = this.decoder.write(data);
|
410 | for (const message of messages) {
|
411 | this.trace('parsed message of length ' + message.length);
|
412 | this.callStatsTracker.addMessageReceived();
|
413 | this.tryPush(message);
|
414 | }
|
415 | });
|
416 | stream.on('end', () => {
|
417 | this.readsClosed = true;
|
418 | this.maybeOutputStatus();
|
419 | });
|
420 | stream.on('close', () => {
|
421 | |
422 |
|
423 |
|
424 | process.nextTick(() => {
|
425 | var _a;
|
426 | this.trace('HTTP/2 stream closed with code ' + stream.rstCode);
|
427 | |
428 |
|
429 |
|
430 |
|
431 | if (((_a = this.finalStatus) === null || _a === void 0 ? void 0 : _a.code) === constants_1.Status.OK) {
|
432 | return;
|
433 | }
|
434 | let code;
|
435 | let details = '';
|
436 | switch (stream.rstCode) {
|
437 | case http2.constants.NGHTTP2_NO_ERROR:
|
438 | |
439 |
|
440 |
|
441 | if (this.finalStatus !== null) {
|
442 | return;
|
443 | }
|
444 | code = constants_1.Status.INTERNAL;
|
445 | details = `Received RST_STREAM with code ${stream.rstCode}`;
|
446 | break;
|
447 | case http2.constants.NGHTTP2_REFUSED_STREAM:
|
448 | code = constants_1.Status.UNAVAILABLE;
|
449 | details = 'Stream refused by server';
|
450 | break;
|
451 | case http2.constants.NGHTTP2_CANCEL:
|
452 | code = constants_1.Status.CANCELLED;
|
453 | details = 'Call cancelled';
|
454 | break;
|
455 | case http2.constants.NGHTTP2_ENHANCE_YOUR_CALM:
|
456 | code = constants_1.Status.RESOURCE_EXHAUSTED;
|
457 | details = 'Bandwidth exhausted or memory limit exceeded';
|
458 | break;
|
459 | case http2.constants.NGHTTP2_INADEQUATE_SECURITY:
|
460 | code = constants_1.Status.PERMISSION_DENIED;
|
461 | details = 'Protocol not secure enough';
|
462 | break;
|
463 | case http2.constants.NGHTTP2_INTERNAL_ERROR:
|
464 | code = constants_1.Status.INTERNAL;
|
465 | if (this.internalError === null) {
|
466 | |
467 |
|
468 |
|
469 |
|
470 |
|
471 | details = `Received RST_STREAM with code ${stream.rstCode} (Internal server error)`;
|
472 | }
|
473 | else {
|
474 | if (this.internalError.code === 'ECONNRESET' || this.internalError.code === 'ETIMEDOUT') {
|
475 | code = constants_1.Status.UNAVAILABLE;
|
476 | details = this.internalError.message;
|
477 | }
|
478 | else {
|
479 | |
480 |
|
481 |
|
482 |
|
483 | details = `Received RST_STREAM with code ${stream.rstCode} triggered by internal client error: ${this.internalError.message}`;
|
484 | }
|
485 | }
|
486 | break;
|
487 | default:
|
488 | code = constants_1.Status.INTERNAL;
|
489 | details = `Received RST_STREAM with code ${stream.rstCode}`;
|
490 | }
|
491 |
|
492 |
|
493 |
|
494 |
|
495 | this.endCall({ code, details, metadata: new metadata_1.Metadata() });
|
496 | });
|
497 | });
|
498 | stream.on('error', (err) => {
|
499 | |
500 |
|
501 |
|
502 | |
503 |
|
504 |
|
505 |
|
506 | if (err.code !== 'ERR_HTTP2_STREAM_ERROR') {
|
507 | this.trace('Node error event: message=' +
|
508 | err.message +
|
509 | ' code=' +
|
510 | err.code +
|
511 | ' errno=' +
|
512 | getSystemErrorName(err.errno) +
|
513 | ' syscall=' +
|
514 | err.syscall);
|
515 | this.internalError = err;
|
516 | }
|
517 | this.streamEndWatchers.forEach(watcher => watcher(false));
|
518 | });
|
519 | if (!this.pendingRead) {
|
520 | stream.pause();
|
521 | }
|
522 | if (this.pendingWrite) {
|
523 | if (!this.pendingWriteCallback) {
|
524 | throw new Error('Invalid state in write handling code');
|
525 | }
|
526 | this.trace('sending data chunk of length ' +
|
527 | this.pendingWrite.length +
|
528 | ' (deferred)');
|
529 | try {
|
530 | this.writeMessageToStream(this.pendingWrite, this.pendingWriteCallback);
|
531 | }
|
532 | catch (error) {
|
533 | this.endCall({
|
534 | code: constants_1.Status.UNAVAILABLE,
|
535 | details: `Write failed with error ${(0, error_1.getErrorMessage)(error)}`,
|
536 | metadata: new metadata_1.Metadata()
|
537 | });
|
538 | }
|
539 | }
|
540 | this.maybeCloseWrites();
|
541 | }
|
542 | }
|
543 | start(metadata, listener) {
|
544 | this.trace('Sending metadata');
|
545 | this.listener = listener;
|
546 | this.channel._startCallStream(this, metadata);
|
547 | this.maybeOutputStatus();
|
548 | }
|
549 | destroyHttp2Stream() {
|
550 | var _a;
|
551 |
|
552 |
|
553 | if (this.http2Stream !== null && !this.http2Stream.destroyed) {
|
554 | |
555 |
|
556 |
|
557 | let code;
|
558 | if (((_a = this.finalStatus) === null || _a === void 0 ? void 0 : _a.code) === constants_1.Status.OK) {
|
559 | code = http2.constants.NGHTTP2_NO_ERROR;
|
560 | }
|
561 | else {
|
562 | code = http2.constants.NGHTTP2_CANCEL;
|
563 | }
|
564 | this.trace('close http2 stream with code ' + code);
|
565 | this.http2Stream.close(code);
|
566 | }
|
567 | }
|
568 | cancelWithStatus(status, details) {
|
569 | this.trace('cancelWithStatus code: ' + status + ' details: "' + details + '"');
|
570 | this.endCall({ code: status, details, metadata: new metadata_1.Metadata() });
|
571 | }
|
572 | getDeadline() {
|
573 | const deadlineList = [this.options.deadline];
|
574 | if (this.options.parentCall && this.options.flags & constants_1.Propagate.DEADLINE) {
|
575 | deadlineList.push(this.options.parentCall.getDeadline());
|
576 | }
|
577 | if (this.configDeadline) {
|
578 | deadlineList.push(this.configDeadline);
|
579 | }
|
580 | return getMinDeadline(deadlineList);
|
581 | }
|
582 | getCredentials() {
|
583 | return this.credentials;
|
584 | }
|
585 | setCredentials(credentials) {
|
586 | this.credentials = this.channelCallCredentials.compose(credentials);
|
587 | }
|
588 | getStatus() {
|
589 | return this.finalStatus;
|
590 | }
|
591 | getPeer() {
|
592 | var _a, _b;
|
593 | return (_b = (_a = this.subchannel) === null || _a === void 0 ? void 0 : _a.getAddress()) !== null && _b !== void 0 ? _b : this.channel.getTarget();
|
594 | }
|
595 | getMethod() {
|
596 | return this.methodName;
|
597 | }
|
598 | getHost() {
|
599 | return this.options.host;
|
600 | }
|
601 | setConfigDeadline(configDeadline) {
|
602 | this.configDeadline = configDeadline;
|
603 | }
|
604 | addStatusWatcher(watcher) {
|
605 | this.statusWatchers.push(watcher);
|
606 | }
|
607 | addStreamEndWatcher(watcher) {
|
608 | this.streamEndWatchers.push(watcher);
|
609 | }
|
610 | addFilters(extraFilters) {
|
611 | this.filterStack.push(extraFilters);
|
612 | }
|
613 | getCallNumber() {
|
614 | return this.callNumber;
|
615 | }
|
616 | startRead() {
|
617 | |
618 |
|
619 | if (this.finalStatus !== null && this.finalStatus.code !== constants_1.Status.OK) {
|
620 | this.readsClosed = true;
|
621 | this.maybeOutputStatus();
|
622 | return;
|
623 | }
|
624 | this.canPush = true;
|
625 | if (this.http2Stream === null) {
|
626 | this.pendingRead = true;
|
627 | }
|
628 | else {
|
629 | if (this.unpushedReadMessages.length > 0) {
|
630 | const nextMessage = this.unpushedReadMessages.shift();
|
631 | this.push(nextMessage);
|
632 | return;
|
633 | }
|
634 | |
635 |
|
636 | this.http2Stream.resume();
|
637 | }
|
638 | }
|
639 | maybeCloseWrites() {
|
640 | if (this.writesClosed &&
|
641 | !this.isWriteFilterPending &&
|
642 | this.http2Stream !== null) {
|
643 | this.trace('calling end() on HTTP/2 stream');
|
644 | this.http2Stream.end();
|
645 | }
|
646 | }
|
647 | sendMessageWithContext(context, message) {
|
648 | var _a;
|
649 | this.trace('write() called with message of length ' + message.length);
|
650 | const writeObj = {
|
651 | message,
|
652 | flags: context.flags,
|
653 | };
|
654 | const cb = (_a = context.callback) !== null && _a !== void 0 ? _a : (() => { });
|
655 | this.isWriteFilterPending = true;
|
656 | this.filterStack.sendMessage(Promise.resolve(writeObj)).then((message) => {
|
657 | this.isWriteFilterPending = false;
|
658 | if (this.http2Stream === null) {
|
659 | this.trace('deferring writing data chunk of length ' + message.message.length);
|
660 | this.pendingWrite = message.message;
|
661 | this.pendingWriteCallback = cb;
|
662 | }
|
663 | else {
|
664 | this.trace('sending data chunk of length ' + message.message.length);
|
665 | try {
|
666 | this.writeMessageToStream(message.message, cb);
|
667 | }
|
668 | catch (error) {
|
669 | this.endCall({
|
670 | code: constants_1.Status.UNAVAILABLE,
|
671 | details: `Write failed with error ${(0, error_1.getErrorMessage)(error)}`,
|
672 | metadata: new metadata_1.Metadata()
|
673 | });
|
674 | }
|
675 | this.maybeCloseWrites();
|
676 | }
|
677 | }, this.handleFilterError.bind(this));
|
678 | }
|
679 | halfClose() {
|
680 | this.trace('end() called');
|
681 | this.writesClosed = true;
|
682 | this.maybeCloseWrites();
|
683 | }
|
684 | }
|
685 | exports.Http2CallStream = Http2CallStream;
|
686 |
|
\ | No newline at end of file |