UNPKG

18.4 kBPlain TextView Raw
1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import * as http2 from 'http2';
19import * as os from 'os';
20
21import { Status } from './constants';
22import { Metadata } from './metadata';
23import { StreamDecoder } from './stream-decoder';
24import * as logging from './logging';
25import { LogVerbosity } from './constants';
26import {
27 InterceptingListener,
28 MessageContext,
29 StatusObject,
30 WriteCallback,
31} from './call-interface';
32import { CallEventTracker, Transport } from './transport';
33
34const TRACER_NAME = 'subchannel_call';
35
36/**
37 * https://nodejs.org/api/errors.html#errors_class_systemerror
38 */
39interface SystemError extends Error {
40 address?: string;
41 code: string;
42 dest?: string;
43 errno: number;
44 info?: object;
45 message: string;
46 path?: string;
47 port?: number;
48 syscall: string;
49}
50
51/**
52 * Should do approximately the same thing as util.getSystemErrorName but the
53 * TypeScript types don't have that function for some reason so I just made my
54 * own.
55 * @param errno
56 */
57function getSystemErrorName(errno: number): string {
58 for (const [name, num] of Object.entries(os.constants.errno)) {
59 if (num === errno) {
60 return name;
61 }
62 }
63 return 'Unknown system error ' + errno;
64}
65
66export interface SubchannelCall {
67 cancelWithStatus(status: Status, details: string): void;
68 getPeer(): string;
69 sendMessageWithContext(context: MessageContext, message: Buffer): void;
70 startRead(): void;
71 halfClose(): void;
72 getCallNumber(): number;
73}
74
75export interface StatusObjectWithRstCode extends StatusObject {
76 rstCode?: number;
77}
78
79export interface SubchannelCallInterceptingListener
80 extends InterceptingListener {
81 onReceiveStatus(status: StatusObjectWithRstCode): void;
82}
83
84export class Http2SubchannelCall implements SubchannelCall {
85 private decoder = new StreamDecoder();
86
87 private isReadFilterPending = false;
88 private isPushPending = false;
89 private canPush = false;
90 /**
91 * Indicates that an 'end' event has come from the http2 stream, so there
92 * will be no more data events.
93 */
94 private readsClosed = false;
95
96 private statusOutput = false;
97
98 private unpushedReadMessages: Buffer[] = [];
99
100 // Status code mapped from :status. To be used if grpc-status is not received
101 private mappedStatusCode: Status = Status.UNKNOWN;
102
103 // This is populated (non-null) if and only if the call has ended
104 private finalStatus: StatusObject | null = null;
105
106 private internalError: SystemError | null = null;
107
108 constructor(
109 private readonly http2Stream: http2.ClientHttp2Stream,
110 private readonly callEventTracker: CallEventTracker,
111 private readonly listener: SubchannelCallInterceptingListener,
112 private readonly transport: Transport,
113 private readonly callId: number
114 ) {
115 http2Stream.on('response', (headers, flags) => {
116 let headersString = '';
117 for (const header of Object.keys(headers)) {
118 headersString += '\t\t' + header + ': ' + headers[header] + '\n';
119 }
120 this.trace('Received server headers:\n' + headersString);
121 switch (headers[':status']) {
122 // TODO(murgatroid99): handle 100 and 101
123 case 400:
124 this.mappedStatusCode = Status.INTERNAL;
125 break;
126 case 401:
127 this.mappedStatusCode = Status.UNAUTHENTICATED;
128 break;
129 case 403:
130 this.mappedStatusCode = Status.PERMISSION_DENIED;
131 break;
132 case 404:
133 this.mappedStatusCode = Status.UNIMPLEMENTED;
134 break;
135 case 429:
136 case 502:
137 case 503:
138 case 504:
139 this.mappedStatusCode = Status.UNAVAILABLE;
140 break;
141 default:
142 this.mappedStatusCode = Status.UNKNOWN;
143 }
144
145 if (flags & http2.constants.NGHTTP2_FLAG_END_STREAM) {
146 this.handleTrailers(headers);
147 } else {
148 let metadata: Metadata;
149 try {
150 metadata = Metadata.fromHttp2Headers(headers);
151 } catch (error) {
152 this.endCall({
153 code: Status.UNKNOWN,
154 details: (error as Error).message,
155 metadata: new Metadata(),
156 });
157 return;
158 }
159 this.listener.onReceiveMetadata(metadata);
160 }
161 });
162 http2Stream.on('trailers', (headers: http2.IncomingHttpHeaders) => {
163 this.handleTrailers(headers);
164 });
165 http2Stream.on('data', (data: Buffer) => {
166 /* If the status has already been output, allow the http2 stream to
167 * drain without processing the data. */
168 if (this.statusOutput) {
169 return;
170 }
171 this.trace('receive HTTP/2 data frame of length ' + data.length);
172 const messages = this.decoder.write(data);
173
174 for (const message of messages) {
175 this.trace('parsed message of length ' + message.length);
176 this.callEventTracker!.addMessageReceived();
177 this.tryPush(message);
178 }
179 });
180 http2Stream.on('end', () => {
181 this.readsClosed = true;
182 this.maybeOutputStatus();
183 });
184 http2Stream.on('close', () => {
185 /* Use process.next tick to ensure that this code happens after any
186 * "error" event that may be emitted at about the same time, so that
187 * we can bubble up the error message from that event. */
188 process.nextTick(() => {
189 this.trace('HTTP/2 stream closed with code ' + http2Stream.rstCode);
190 /* If we have a final status with an OK status code, that means that
191 * we have received all of the messages and we have processed the
192 * trailers and the call completed successfully, so it doesn't matter
193 * how the stream ends after that */
194 if (this.finalStatus?.code === Status.OK) {
195 return;
196 }
197 let code: Status;
198 let details = '';
199 switch (http2Stream.rstCode) {
200 case http2.constants.NGHTTP2_NO_ERROR:
201 /* If we get a NO_ERROR code and we already have a status, the
202 * stream completed properly and we just haven't fully processed
203 * it yet */
204 if (this.finalStatus !== null) {
205 return;
206 }
207 code = Status.INTERNAL;
208 details = `Received RST_STREAM with code ${http2Stream.rstCode}`;
209 break;
210 case http2.constants.NGHTTP2_REFUSED_STREAM:
211 code = Status.UNAVAILABLE;
212 details = 'Stream refused by server';
213 break;
214 case http2.constants.NGHTTP2_CANCEL:
215 code = Status.CANCELLED;
216 details = 'Call cancelled';
217 break;
218 case http2.constants.NGHTTP2_ENHANCE_YOUR_CALM:
219 code = Status.RESOURCE_EXHAUSTED;
220 details = 'Bandwidth exhausted or memory limit exceeded';
221 break;
222 case http2.constants.NGHTTP2_INADEQUATE_SECURITY:
223 code = Status.PERMISSION_DENIED;
224 details = 'Protocol not secure enough';
225 break;
226 case http2.constants.NGHTTP2_INTERNAL_ERROR:
227 code = Status.INTERNAL;
228 if (this.internalError === null) {
229 /* This error code was previously handled in the default case, and
230 * there are several instances of it online, so I wanted to
231 * preserve the original error message so that people find existing
232 * information in searches, but also include the more recognizable
233 * "Internal server error" message. */
234 details = `Received RST_STREAM with code ${http2Stream.rstCode} (Internal server error)`;
235 } else {
236 if (
237 this.internalError.code === 'ECONNRESET' ||
238 this.internalError.code === 'ETIMEDOUT'
239 ) {
240 code = Status.UNAVAILABLE;
241 details = this.internalError.message;
242 } else {
243 /* The "Received RST_STREAM with code ..." error is preserved
244 * here for continuity with errors reported online, but the
245 * error message at the end will probably be more relevant in
246 * most cases. */
247 details = `Received RST_STREAM with code ${http2Stream.rstCode} triggered by internal client error: ${this.internalError.message}`;
248 }
249 }
250 break;
251 default:
252 code = Status.INTERNAL;
253 details = `Received RST_STREAM with code ${http2Stream.rstCode}`;
254 }
255 // This is a no-op if trailers were received at all.
256 // This is OK, because status codes emitted here correspond to more
257 // catastrophic issues that prevent us from receiving trailers in the
258 // first place.
259 this.endCall({
260 code,
261 details,
262 metadata: new Metadata(),
263 rstCode: http2Stream.rstCode,
264 });
265 });
266 });
267 http2Stream.on('error', (err: SystemError) => {
268 /* We need an error handler here to stop "Uncaught Error" exceptions
269 * from bubbling up. However, errors here should all correspond to
270 * "close" events, where we will handle the error more granularly */
271 /* Specifically looking for stream errors that were *not* constructed
272 * from a RST_STREAM response here:
273 * https://github.com/nodejs/node/blob/8b8620d580314050175983402dfddf2674e8e22a/lib/internal/http2/core.js#L2267
274 */
275 if (err.code !== 'ERR_HTTP2_STREAM_ERROR') {
276 this.trace(
277 'Node error event: message=' +
278 err.message +
279 ' code=' +
280 err.code +
281 ' errno=' +
282 getSystemErrorName(err.errno) +
283 ' syscall=' +
284 err.syscall
285 );
286 this.internalError = err;
287 }
288 this.callEventTracker.onStreamEnd(false);
289 });
290 }
291
292 public onDisconnect() {
293 this.endCall({
294 code: Status.UNAVAILABLE,
295 details: 'Connection dropped',
296 metadata: new Metadata(),
297 });
298 }
299
300 private outputStatus() {
301 /* Precondition: this.finalStatus !== null */
302 if (!this.statusOutput) {
303 this.statusOutput = true;
304 this.trace(
305 'ended with status: code=' +
306 this.finalStatus!.code +
307 ' details="' +
308 this.finalStatus!.details +
309 '"'
310 );
311 this.callEventTracker.onCallEnd(this.finalStatus!);
312 /* We delay the actual action of bubbling up the status to insulate the
313 * cleanup code in this class from any errors that may be thrown in the
314 * upper layers as a result of bubbling up the status. In particular,
315 * if the status is not OK, the "error" event may be emitted
316 * synchronously at the top level, which will result in a thrown error if
317 * the user does not handle that event. */
318 process.nextTick(() => {
319 this.listener.onReceiveStatus(this.finalStatus!);
320 });
321 /* Leave the http2 stream in flowing state to drain incoming messages, to
322 * ensure that the stream closure completes. The call stream already does
323 * not push more messages after the status is output, so the messages go
324 * nowhere either way. */
325 this.http2Stream.resume();
326 }
327 }
328
329 private trace(text: string): void {
330 logging.trace(
331 LogVerbosity.DEBUG,
332 TRACER_NAME,
333 '[' + this.callId + '] ' + text
334 );
335 }
336
337 /**
338 * On first call, emits a 'status' event with the given StatusObject.
339 * Subsequent calls are no-ops.
340 * @param status The status of the call.
341 */
342 private endCall(status: StatusObjectWithRstCode): void {
343 /* If the status is OK and a new status comes in (e.g. from a
344 * deserialization failure), that new status takes priority */
345 if (this.finalStatus === null || this.finalStatus.code === Status.OK) {
346 this.finalStatus = status;
347 this.maybeOutputStatus();
348 }
349 this.destroyHttp2Stream();
350 }
351
352 private maybeOutputStatus() {
353 if (this.finalStatus !== null) {
354 /* The combination check of readsClosed and that the two message buffer
355 * arrays are empty checks that there all incoming data has been fully
356 * processed */
357 if (
358 this.finalStatus.code !== Status.OK ||
359 (this.readsClosed &&
360 this.unpushedReadMessages.length === 0 &&
361 !this.isReadFilterPending &&
362 !this.isPushPending)
363 ) {
364 this.outputStatus();
365 }
366 }
367 }
368
369 private push(message: Buffer): void {
370 this.trace(
371 'pushing to reader message of length ' +
372 (message instanceof Buffer ? message.length : null)
373 );
374 this.canPush = false;
375 this.isPushPending = true;
376 process.nextTick(() => {
377 this.isPushPending = false;
378 /* If we have already output the status any later messages should be
379 * ignored, and can cause out-of-order operation errors higher up in the
380 * stack. Checking as late as possible here to avoid any race conditions.
381 */
382 if (this.statusOutput) {
383 return;
384 }
385 this.listener.onReceiveMessage(message);
386 this.maybeOutputStatus();
387 });
388 }
389
390 private tryPush(messageBytes: Buffer): void {
391 if (this.canPush) {
392 this.http2Stream!.pause();
393 this.push(messageBytes);
394 } else {
395 this.trace(
396 'unpushedReadMessages.push message of length ' + messageBytes.length
397 );
398 this.unpushedReadMessages.push(messageBytes);
399 }
400 }
401
402 private handleTrailers(headers: http2.IncomingHttpHeaders) {
403 this.callEventTracker.onStreamEnd(true);
404 let headersString = '';
405 for (const header of Object.keys(headers)) {
406 headersString += '\t\t' + header + ': ' + headers[header] + '\n';
407 }
408 this.trace('Received server trailers:\n' + headersString);
409 let metadata: Metadata;
410 try {
411 metadata = Metadata.fromHttp2Headers(headers);
412 } catch (e) {
413 metadata = new Metadata();
414 }
415 const metadataMap = metadata.getMap();
416 let code: Status = this.mappedStatusCode;
417 if (
418 code === Status.UNKNOWN &&
419 typeof metadataMap['grpc-status'] === 'string'
420 ) {
421 const receivedStatus = Number(metadataMap['grpc-status']);
422 if (receivedStatus in Status) {
423 code = receivedStatus;
424 this.trace('received status code ' + receivedStatus + ' from server');
425 }
426 metadata.remove('grpc-status');
427 }
428 let details = '';
429 if (typeof metadataMap['grpc-message'] === 'string') {
430 try {
431 details = decodeURI(metadataMap['grpc-message']);
432 } catch (e) {
433 details = metadataMap['grpc-message'];
434 }
435 metadata.remove('grpc-message');
436 this.trace(
437 'received status details string "' + details + '" from server'
438 );
439 }
440 const status: StatusObject = { code, details, metadata };
441 // This is a no-op if the call was already ended when handling headers.
442 this.endCall(status);
443 }
444
445 private destroyHttp2Stream() {
446 // The http2 stream could already have been destroyed if cancelWithStatus
447 // is called in response to an internal http2 error.
448 if (!this.http2Stream.destroyed) {
449 /* If the call has ended with an OK status, communicate that when closing
450 * the stream, partly to avoid a situation in which we detect an error
451 * RST_STREAM as a result after we have the status */
452 let code: number;
453 if (this.finalStatus?.code === Status.OK) {
454 code = http2.constants.NGHTTP2_NO_ERROR;
455 } else {
456 code = http2.constants.NGHTTP2_CANCEL;
457 }
458 this.trace('close http2 stream with code ' + code);
459 this.http2Stream.close(code);
460 }
461 }
462
463 cancelWithStatus(status: Status, details: string): void {
464 this.trace(
465 'cancelWithStatus code: ' + status + ' details: "' + details + '"'
466 );
467 this.endCall({ code: status, details, metadata: new Metadata() });
468 }
469
470 getStatus(): StatusObject | null {
471 return this.finalStatus;
472 }
473
474 getPeer(): string {
475 return this.transport.getPeerName();
476 }
477
478 getCallNumber(): number {
479 return this.callId;
480 }
481
482 startRead() {
483 /* If the stream has ended with an error, we should not emit any more
484 * messages and we should communicate that the stream has ended */
485 if (this.finalStatus !== null && this.finalStatus.code !== Status.OK) {
486 this.readsClosed = true;
487 this.maybeOutputStatus();
488 return;
489 }
490 this.canPush = true;
491 if (this.unpushedReadMessages.length > 0) {
492 const nextMessage: Buffer = this.unpushedReadMessages.shift()!;
493 this.push(nextMessage);
494 return;
495 }
496 /* Only resume reading from the http2Stream if we don't have any pending
497 * messages to emit */
498 this.http2Stream.resume();
499 }
500
501 sendMessageWithContext(context: MessageContext, message: Buffer) {
502 this.trace('write() called with message of length ' + message.length);
503 const cb: WriteCallback = (error?: Error | null) => {
504 /* nextTick here ensures that no stream action can be taken in the call
505 * stack of the write callback, in order to hopefully work around
506 * https://github.com/nodejs/node/issues/49147 */
507 process.nextTick(() => {
508 let code: Status = Status.UNAVAILABLE;
509 if (
510 (error as NodeJS.ErrnoException)?.code ===
511 'ERR_STREAM_WRITE_AFTER_END'
512 ) {
513 code = Status.INTERNAL;
514 }
515 if (error) {
516 this.cancelWithStatus(code, `Write error: ${error.message}`);
517 }
518 context.callback?.();
519 });
520 };
521 this.trace('sending data chunk of length ' + message.length);
522 this.callEventTracker.addMessageSent();
523 try {
524 this.http2Stream!.write(message, cb);
525 } catch (error) {
526 this.endCall({
527 code: Status.UNAVAILABLE,
528 details: `Write failed with error ${(error as Error).message}`,
529 metadata: new Metadata(),
530 });
531 }
532 }
533
534 halfClose() {
535 this.trace('end() called');
536 this.trace('calling end() on HTTP/2 stream');
537 this.http2Stream.end();
538 }
539}