UNPKG

18.9 kBPlain TextView Raw
1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import * as http2 from 'http2';
19import * as os from 'os';
20
21import { Status } from './constants';
22import { Metadata } from './metadata';
23import { StreamDecoder } from './stream-decoder';
24import * as logging from './logging';
25import { LogVerbosity } from './constants';
26import {
27 InterceptingListener,
28 MessageContext,
29 StatusObject,
30 WriteCallback,
31} from './call-interface';
32import { CallEventTracker, Transport } from './transport';
33
// Tracer name handed to logging.trace for every trace line emitted by the
// call class in this file.
const TRACER_NAME = 'subchannel_call';
35
/**
 * Shape of the errors Node.js raises for failed system calls.
 *
 * See https://nodejs.org/api/errors.html#errors_class_systemerror
 */
interface SystemError extends Error {
  // Fields this file treats as always present.
  code: string;
  errno: number;
  message: string;
  syscall: string;

  // Fields that are only present for some kinds of failure.
  address?: string;
  dest?: string;
  info?: object;
  path?: string;
  port?: number;
}
50
/**
 * Looks up the symbolic name (for example `ECONNRESET`) of a system error
 * number. This approximates `util.getSystemErrorName`.
 *
 * Node.js reports `error.errno` as a negative number, whereas the table in
 * `os.constants.errno` holds positive values, so the lookup compares the
 * magnitude of `errno`. Both `104` and `-104` therefore resolve to the same
 * name. When several names share a value, the first one in the table wins.
 *
 * @param errno The error number, in either sign convention.
 * @returns The matching constant name, or `'Unknown system error <errno>'`
 *     when the table has no entry with that magnitude.
 */
function getSystemErrorName(errno: number): string {
  const magnitude = Math.abs(errno);
  for (const [name, num] of Object.entries(os.constants.errno)) {
    if (num === magnitude) {
      return name;
    }
  }
  return 'Unknown system error ' + errno;
}
65
/**
 * Operations that can be performed on a single call running on a subchannel.
 * `Http2SubchannelCall` in this file is an implementation.
 */
export interface SubchannelCall {
  /** Returns the numeric identifier of this call. */
  getCallNumber(): number;
  /** Returns a description of the remote peer. */
  getPeer(): string;
  /** Returns informational strings to include in deadline-related output. */
  getDeadlineInfo(): string[];
  /** Requests delivery of a received message. */
  startRead(): void;
  /** Sends one message, with its associated context. */
  sendMessageWithContext(context: MessageContext, message: Buffer): void;
  /** Signals that no more messages will be sent. */
  halfClose(): void;
  /** Ends the call with the given status code and details. */
  cancelWithStatus(status: Status, details: string): void;
}
75
/**
 * A call status that may additionally carry the HTTP/2 RST_STREAM code the
 * stream closed with.
 */
export interface StatusObjectWithRstCode extends StatusObject {
  /** The stream's RST_STREAM code; absent when the status has none. */
  rstCode?: number;
}
79
/**
 * An intercepting listener whose status callback accepts a status that may
 * carry an RST_STREAM code.
 */
export interface SubchannelCallInterceptingListener extends InterceptingListener {
  /** Invoked with the final status of the call. */
  onReceiveStatus(status: StatusObjectWithRstCode): void;
}
84
/**
 * A single call carried over one client HTTP/2 stream.
 *
 * The constructor subscribes to the stream's `response`, `trailers`, `data`,
 * `end`, `close` and `error` events. Received messages are handed to the
 * listener one at a time, each in response to a `startRead()` call, and the
 * final status is reported to the listener exactly once.
 */
export class Http2SubchannelCall implements SubchannelCall {
  private decoder = new StreamDecoder();

  /* Never set to true within this class; only consulted when deciding whether
   * all incoming data has been processed. */
  private isReadFilterPending = false;
  /** True while a message handed to push() is waiting for its nextTick. */
  private isPushPending = false;
  /** True when startRead() has been called and no message was delivered yet. */
  private canPush = false;
  /**
   * Indicates that an 'end' event has come from the http2 stream, so there
   * will be no more data events.
   */
  private readsClosed = false;

  /** True once the final status has been scheduled for the listener. */
  private statusOutput = false;

  /** Decoded messages that arrived while no read was outstanding. */
  private unpushedReadMessages: Buffer[] = [];

  // Status code mapped from :status. Used only if no valid grpc-status is
  // received.
  private mappedStatusCode: Status = Status.UNKNOWN;

  // This is populated (non-null) if and only if the call has ended
  private finalStatus: StatusObject | null = null;

  /** Last stream error that was not an ERR_HTTP2_STREAM_ERROR, if any. */
  private internalError: SystemError | null = null;

  /** When true, the stream is finished with end() instead of RST_STREAM. */
  private serverEndedCall = false;

  /**
   * @param http2Stream The HTTP/2 stream carrying this call.
   * @param callEventTracker Notified of sent/received messages, stream end
   *     and call end.
   * @param listener Receives metadata, messages and the final status.
   * @param transport Used to obtain the peer name.
   * @param callId Identifier used in trace output and by getCallNumber().
   */
  constructor(
    private readonly http2Stream: http2.ClientHttp2Stream,
    private readonly callEventTracker: CallEventTracker,
    private readonly listener: SubchannelCallInterceptingListener,
    private readonly transport: Transport,
    private readonly callId: number
  ) {
    http2Stream.on('response', (headers, flags) => {
      this.traceHeaders('Received server headers:\n', headers);
      switch (headers[':status']) {
        // TODO(murgatroid99): handle 100 and 101
        case 400:
          this.mappedStatusCode = Status.INTERNAL;
          break;
        case 401:
          this.mappedStatusCode = Status.UNAUTHENTICATED;
          break;
        case 403:
          this.mappedStatusCode = Status.PERMISSION_DENIED;
          break;
        case 404:
          this.mappedStatusCode = Status.UNIMPLEMENTED;
          break;
        case 429:
        case 502:
        case 503:
        case 504:
          this.mappedStatusCode = Status.UNAVAILABLE;
          break;
        default:
          this.mappedStatusCode = Status.UNKNOWN;
      }

      if (flags & http2.constants.NGHTTP2_FLAG_END_STREAM) {
        /* The headers frame also ended the stream, so these headers double
         * as the trailers. */
        this.handleTrailers(headers);
      } else {
        let metadata: Metadata;
        try {
          metadata = Metadata.fromHttp2Headers(headers);
        } catch (error) {
          this.endCall({
            code: Status.UNKNOWN,
            details: (error as Error).message,
            metadata: new Metadata(),
          });
          return;
        }
        this.listener.onReceiveMetadata(metadata);
      }
    });
    http2Stream.on('trailers', (headers: http2.IncomingHttpHeaders) => {
      this.handleTrailers(headers);
    });
    http2Stream.on('data', (data: Buffer) => {
      /* If the status has already been output, allow the http2 stream to
       * drain without processing the data. */
      if (this.statusOutput) {
        return;
      }
      this.trace('receive HTTP/2 data frame of length ' + data.length);
      const messages = this.decoder.write(data);

      for (const message of messages) {
        this.trace('parsed message of length ' + message.length);
        this.callEventTracker.addMessageReceived();
        this.tryPush(message);
      }
    });
    http2Stream.on('end', () => {
      this.readsClosed = true;
      this.maybeOutputStatus();
    });
    http2Stream.on('close', () => {
      this.serverEndedCall = true;
      /* Use process.nextTick to ensure that this code happens after any
       * "error" event that may be emitted at about the same time, so that
       * we can bubble up the error message from that event. */
      process.nextTick(() => {
        this.trace('HTTP/2 stream closed with code ' + http2Stream.rstCode);
        /* If we have a final status with an OK status code, that means that
         * we have received all of the messages and we have processed the
         * trailers and the call completed successfully, so it doesn't matter
         * how the stream ends after that */
        if (this.finalStatus?.code === Status.OK) {
          return;
        }
        let code: Status;
        let details = '';
        switch (http2Stream.rstCode) {
          case http2.constants.NGHTTP2_NO_ERROR:
            /* If we get a NO_ERROR code and we already have a status, the
             * stream completed properly and we just haven't fully processed
             * it yet */
            if (this.finalStatus !== null) {
              return;
            }
            code = Status.INTERNAL;
            details = `Received RST_STREAM with code ${http2Stream.rstCode}`;
            break;
          case http2.constants.NGHTTP2_REFUSED_STREAM:
            code = Status.UNAVAILABLE;
            details = 'Stream refused by server';
            break;
          case http2.constants.NGHTTP2_CANCEL:
            code = Status.CANCELLED;
            details = 'Call cancelled';
            break;
          case http2.constants.NGHTTP2_ENHANCE_YOUR_CALM:
            code = Status.RESOURCE_EXHAUSTED;
            details = 'Bandwidth exhausted or memory limit exceeded';
            break;
          case http2.constants.NGHTTP2_INADEQUATE_SECURITY:
            code = Status.PERMISSION_DENIED;
            details = 'Protocol not secure enough';
            break;
          case http2.constants.NGHTTP2_INTERNAL_ERROR:
            code = Status.INTERNAL;
            if (this.internalError === null) {
              /* This error code was previously handled in the default case, and
               * there are several instances of it online, so I wanted to
               * preserve the original error message so that people find existing
               * information in searches, but also include the more recognizable
               * "Internal server error" message. */
              details = `Received RST_STREAM with code ${http2Stream.rstCode} (Internal server error)`;
            } else {
              if (
                this.internalError.code === 'ECONNRESET' ||
                this.internalError.code === 'ETIMEDOUT'
              ) {
                code = Status.UNAVAILABLE;
                details = this.internalError.message;
              } else {
                /* The "Received RST_STREAM with code ..." error is preserved
                 * here for continuity with errors reported online, but the
                 * error message at the end will probably be more relevant in
                 * most cases. */
                details = `Received RST_STREAM with code ${http2Stream.rstCode} triggered by internal client error: ${this.internalError.message}`;
              }
            }
            break;
          default:
            code = Status.INTERNAL;
            details = `Received RST_STREAM with code ${http2Stream.rstCode}`;
        }
        // This is a no-op if trailers were received at all.
        // This is OK, because status codes emitted here correspond to more
        // catastrophic issues that prevent us from receiving trailers in the
        // first place.
        this.endCall({
          code,
          details,
          metadata: new Metadata(),
          rstCode: http2Stream.rstCode,
        });
      });
    });
    http2Stream.on('error', (err: SystemError) => {
      /* We need an error handler here to stop "Uncaught Error" exceptions
       * from bubbling up. However, errors here should all correspond to
       * "close" events, where we will handle the error more granularly */
      /* Specifically looking for stream errors that were *not* constructed
       * from a RST_STREAM response here:
       * https://github.com/nodejs/node/blob/8b8620d580314050175983402dfddf2674e8e22a/lib/internal/http2/core.js#L2267
       */
      if (err.code !== 'ERR_HTTP2_STREAM_ERROR') {
        this.trace(
          'Node error event: message=' +
            err.message +
            ' code=' +
            err.code +
            ' errno=' +
            getSystemErrorName(err.errno) +
            ' syscall=' +
            err.syscall
        );
        this.internalError = err;
      }
      this.callEventTracker.onStreamEnd(false);
    });
  }

  /** Returns a one-element list naming the remote address of this call. */
  getDeadlineInfo(): string[] {
    return [`remote_addr=${this.getPeer()}`];
  }

  /** Ends the call with UNAVAILABLE because the connection was dropped. */
  public onDisconnect() {
    this.endCall({
      code: Status.UNAVAILABLE,
      details: 'Connection dropped',
      metadata: new Metadata(),
    });
  }

  /**
   * Reports the final status to the call event tracker and, on the next tick,
   * to the listener. Only the first invocation has any effect.
   */
  private outputStatus() {
    /* Precondition: this.finalStatus !== null */
    if (!this.statusOutput) {
      this.statusOutput = true;
      this.trace(
        'ended with status: code=' +
          this.finalStatus!.code +
          ' details="' +
          this.finalStatus!.details +
          '"'
      );
      this.callEventTracker.onCallEnd(this.finalStatus!);
      /* We delay the actual action of bubbling up the status to insulate the
       * cleanup code in this class from any errors that may be thrown in the
       * upper layers as a result of bubbling up the status. In particular,
       * if the status is not OK, the "error" event may be emitted
       * synchronously at the top level, which will result in a thrown error if
       * the user does not handle that event. */
      process.nextTick(() => {
        this.listener.onReceiveStatus(this.finalStatus!);
      });
      /* Leave the http2 stream in flowing state to drain incoming messages, to
       * ensure that the stream closure completes. The call stream already does
       * not push more messages after the status is output, so the messages go
       * nowhere either way. */
      this.http2Stream.resume();
    }
  }

  /** Emits a DEBUG trace line prefixed with this call's id. */
  private trace(text: string): void {
    logging.trace(
      LogVerbosity.DEBUG,
      TRACER_NAME,
      '[' + this.callId + '] ' + text
    );
  }

  /**
   * Traces `prefix` followed by one tab-indented `name: value` line per
   * header.
   * @param prefix Text placed before the header listing.
   * @param headers The headers to list.
   */
  private traceHeaders(
    prefix: string,
    headers: http2.IncomingHttpHeaders
  ): void {
    let headersString = '';
    for (const header of Object.keys(headers)) {
      headersString += '\t\t' + header + ': ' + headers[header] + '\n';
    }
    this.trace(prefix + headersString);
  }

  /**
   * Records the final status of the call and tears down the HTTP/2 stream.
   * The status is stored only if none is recorded yet or the recorded one is
   * OK; otherwise the earlier status is kept. The stream teardown is
   * attempted on every call.
   * @param status The status of the call.
   */
  private endCall(status: StatusObjectWithRstCode): void {
    /* If the status is OK and a new status comes in (e.g. from a
     * deserialization failure), that new status takes priority */
    if (this.finalStatus === null || this.finalStatus.code === Status.OK) {
      this.finalStatus = status;
      this.maybeOutputStatus();
    }
    this.destroyHttp2Stream();
  }

  /**
   * Outputs the final status if there is one and it is ready to be reported:
   * a non-OK status immediately, an OK status only after every received
   * message has been delivered.
   */
  private maybeOutputStatus() {
    if (this.finalStatus !== null) {
      /* The combined check of readsClosed, an empty message buffer and no
       * pending filter or push verifies that all incoming data has been
       * fully processed */
      if (
        this.finalStatus.code !== Status.OK ||
        (this.readsClosed &&
          this.unpushedReadMessages.length === 0 &&
          !this.isReadFilterPending &&
          !this.isPushPending)
      ) {
        this.outputStatus();
      }
    }
  }

  /**
   * Delivers one message to the listener on the next tick, consuming the
   * outstanding read.
   */
  private push(message: Buffer): void {
    this.trace(
      'pushing to reader message of length ' +
        (message instanceof Buffer ? message.length : null)
    );
    this.canPush = false;
    this.isPushPending = true;
    process.nextTick(() => {
      this.isPushPending = false;
      /* If we have already output the status any later messages should be
       * ignored, and can cause out-of-order operation errors higher up in the
       * stack. Checking as late as possible here to avoid any race conditions.
       */
      if (this.statusOutput) {
        return;
      }
      this.listener.onReceiveMessage(message);
      this.maybeOutputStatus();
    });
  }

  /**
   * Delivers the message now if a read is outstanding (pausing the stream
   * first); otherwise queues it until the next startRead().
   */
  private tryPush(messageBytes: Buffer): void {
    if (this.canPush) {
      this.http2Stream.pause();
      this.push(messageBytes);
    } else {
      this.trace(
        'unpushedReadMessages.push message of length ' + messageBytes.length
      );
      this.unpushedReadMessages.push(messageBytes);
    }
  }

  /**
   * Derives the call status from the trailers and ends the call with it.
   *
   * A valid `grpc-status` value always determines the status code. The code
   * mapped from the HTTP `:status` header is only a fallback for responses
   * that carry no usable `grpc-status`. `grpc-status` and `grpc-message` are
   * removed from the metadata that accompanies the status.
   * @param headers The trailing headers (or the initial headers when they
   *     also ended the stream).
   */
  private handleTrailers(headers: http2.IncomingHttpHeaders) {
    this.serverEndedCall = true;
    this.callEventTracker.onStreamEnd(true);
    this.traceHeaders('Received server trailers:\n', headers);
    let metadata: Metadata;
    try {
      metadata = Metadata.fromHttp2Headers(headers);
    } catch (e) {
      // Unparseable trailers are treated as if no trailers were sent.
      metadata = new Metadata();
    }
    const metadataMap = metadata.getMap();
    let code: Status = this.mappedStatusCode;
    const rawGrpcStatus = metadataMap['grpc-status'];
    if (typeof rawGrpcStatus === 'string') {
      const receivedStatus = Number(rawGrpcStatus);
      // Values that are not known status codes leave the fallback in place.
      if (receivedStatus in Status) {
        code = receivedStatus;
        this.trace('received status code ' + receivedStatus + ' from server');
      }
      metadata.remove('grpc-status');
    }
    let details = '';
    if (typeof metadataMap['grpc-message'] === 'string') {
      try {
        details = decodeURI(metadataMap['grpc-message']);
      } catch (e) {
        // Fall back to the raw text when it is not validly percent-encoded.
        details = metadataMap['grpc-message'];
      }
      metadata.remove('grpc-message');
      this.trace(
        'received status details string "' + details + '" from server'
      );
    }
    const status: StatusObject = { code, details, metadata };
    // endCall keeps an already recorded non-OK status instead of this one.
    this.endCall(status);
  }

  /**
   * Finishes the HTTP/2 stream if it has not been destroyed yet: by
   * half-closing when the server already ended the call, otherwise by closing
   * it with NO_ERROR (final status OK) or CANCEL.
   */
  private destroyHttp2Stream() {
    // The http2 stream could already have been destroyed if cancelWithStatus
    // is called in response to an internal http2 error.
    if (this.http2Stream.destroyed) {
      return;
    }
    /* If the server ended the call, sending an RST_STREAM is redundant, so we
     * just half close on the client side instead to finish closing the stream.
     */
    if (this.serverEndedCall) {
      this.http2Stream.end();
    } else {
      /* If the call has ended with an OK status, communicate that when closing
       * the stream, partly to avoid a situation in which we detect an error
       * RST_STREAM as a result after we have the status */
      let code: number;
      if (this.finalStatus?.code === Status.OK) {
        code = http2.constants.NGHTTP2_NO_ERROR;
      } else {
        code = http2.constants.NGHTTP2_CANCEL;
      }
      this.trace('close http2 stream with code ' + code);
      this.http2Stream.close(code);
    }
  }

  /**
   * Ends the call with the given status code and details and empty metadata.
   */
  cancelWithStatus(status: Status, details: string): void {
    this.trace(
      'cancelWithStatus code: ' + status + ' details: "' + details + '"'
    );
    this.endCall({ code: status, details, metadata: new Metadata() });
  }

  /** Returns the final status, or null if the call has not ended. */
  getStatus(): StatusObject | null {
    return this.finalStatus;
  }

  /** Returns the peer name reported by the transport. */
  getPeer(): string {
    return this.transport.getPeerName();
  }

  /** Returns the id this call was constructed with. */
  getCallNumber(): number {
    return this.callId;
  }

  /**
   * Requests one message. A queued message is delivered if there is one;
   * otherwise the HTTP/2 stream is resumed so more data can arrive.
   */
  startRead() {
    /* If the stream has ended with an error, we should not emit any more
     * messages and we should communicate that the stream has ended */
    if (this.finalStatus !== null && this.finalStatus.code !== Status.OK) {
      this.readsClosed = true;
      this.maybeOutputStatus();
      return;
    }
    this.canPush = true;
    if (this.unpushedReadMessages.length > 0) {
      const nextMessage: Buffer = this.unpushedReadMessages.shift()!;
      this.push(nextMessage);
      return;
    }
    /* Only resume reading from the http2Stream if we don't have any pending
     * messages to emit */
    this.http2Stream.resume();
  }

  /**
   * Writes one message to the HTTP/2 stream. After the write completes,
   * `context.callback` (if any) is invoked; a write error first cancels the
   * call. A synchronous exception from the write ends the call with
   * UNAVAILABLE.
   */
  sendMessageWithContext(context: MessageContext, message: Buffer) {
    this.trace('write() called with message of length ' + message.length);
    const cb: WriteCallback = (error?: Error | null) => {
      /* nextTick here ensures that no stream action can be taken in the call
       * stack of the write callback, in order to hopefully work around
       * https://github.com/nodejs/node/issues/49147 */
      process.nextTick(() => {
        let code: Status = Status.UNAVAILABLE;
        if (
          (error as NodeJS.ErrnoException)?.code ===
          'ERR_STREAM_WRITE_AFTER_END'
        ) {
          code = Status.INTERNAL;
        }
        if (error) {
          this.cancelWithStatus(code, `Write error: ${error.message}`);
        }
        context.callback?.();
      });
    };
    this.trace('sending data chunk of length ' + message.length);
    this.callEventTracker.addMessageSent();
    try {
      this.http2Stream.write(message, cb);
    } catch (error) {
      this.endCall({
        code: Status.UNAVAILABLE,
        details: `Write failed with error ${(error as Error).message}`,
        metadata: new Metadata(),
      });
    }
  }

  /** Half-closes the call by ending the writable side of the stream. */
  halfClose() {
    this.trace('end() called');
    this.trace('calling end() on HTTP/2 stream');
    this.http2Stream.end();
  }
}