UNPKG

18.2 kBPlain TextView Raw
/*
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
17
18import * as http2 from 'http2';
19import * as os from 'os';
20
21import { Status } from './constants';
22import { Metadata } from './metadata';
23import { StreamDecoder } from './stream-decoder';
24import * as logging from './logging';
25import { LogVerbosity } from './constants';
26import { ServerSurfaceCall } from './server-call';
27import { Deadline } from './deadline';
28import { InterceptingListener, MessageContext, StatusObject, WriteCallback } from './call-interface';
29import { CallEventTracker, Transport } from './transport';
30
/** Tracer name passed to `logging.trace` for every trace line of this module. */
const TRACER_NAME = 'subchannel_call';

/* HTTP/2 constants pulled out of the `http2` module.
 * NOTE(review): the code visible in this file reads its constants through
 * `http2.constants.*` directly, so these three bindings look unused here;
 * confirm before removing them. */
const {
  HTTP2_HEADER_STATUS,
  HTTP2_HEADER_CONTENT_TYPE,
  NGHTTP2_CANCEL,
} = http2.constants;
38
/**
 * Shape of the system errors that Node.js emits, as described at
 * https://nodejs.org/api/errors.html#errors_class_systemerror
 *
 * In this file it types the argument of the HTTP/2 stream's 'error' event,
 * where `code`, `errno`, `syscall` and `message` are read for tracing.
 */
interface SystemError extends Error {
  /** Optional address associated with the failure. */
  address?: string;
  /** String error code, for example `'ECONNRESET'`. */
  code: string;
  /** Optional destination associated with the failure. */
  dest?: string;
  /** Numeric error number as reported by the platform. */
  errno: number;
  /** Optional extra details about the error condition. */
  info?: object;
  /** Human-readable description of the error. */
  message: string;
  /** Optional path associated with the failure. */
  path?: string;
  /** Optional port associated with the failure. */
  port?: number;
  /** Name of the system call that failed. */
  syscall: string;
}
53
/**
 * Look up the symbolic name (for example `ECONNRESET`) of a system error
 * number. Should do approximately the same thing as
 * `util.getSystemErrorName`, implemented locally on top of
 * `os.constants.errno`.
 *
 * Node.js reports `errno` on its system errors as a negative number, whereas
 * `os.constants.errno` stores the positive values, so the lookup compares
 * magnitudes and accepts either sign.
 * NOTE(review): this relies on the reported code being the negated errno
 * value, which may not hold on every platform; unmatched codes simply fall
 * through to the "Unknown system error" text.
 *
 * @param errno The error number to translate; may be positive or negative.
 * @returns The matching errno name, or `'Unknown system error <errno>'` when
 *     no entry of `os.constants.errno` matches.
 */
function getSystemErrorName(errno: number): string {
  const magnitude = Math.abs(errno);
  for (const [name, num] of Object.entries(os.constants.errno)) {
    if (num === magnitude) {
      return name;
    }
  }
  // Report the value exactly as it was passed in, sign included.
  return 'Unknown system error ' + errno;
}
68
/**
 * Operations that can be performed on a call running on a subchannel.
 */
export interface SubchannelCall {
  /**
   * End the call locally with the given status code and details text.
   * @param status Status code to end the call with.
   * @param details Human-readable explanation for the status.
   */
  cancelWithStatus(status: Status, details: string): void;
  /** Get the name of the peer this call is connected to. */
  getPeer(): string;
  /**
   * Send one message on the call.
   * @param context Per-message context for this write.
   * @param message The message bytes to write.
   */
  sendMessageWithContext(context: MessageContext, message: Buffer): void;
  /** Request delivery of the next incoming message. */
  startRead(): void;
  /** Signal that no more messages will be sent on this call. */
  halfClose(): void;
  /** Get the numeric identifier of this call. */
  getCallNumber(): number;
}
77
/**
 * A call status that may additionally carry the HTTP/2 RST_STREAM code the
 * underlying stream was closed with.
 */
export interface StatusObjectWithRstCode extends StatusObject {
  /** RST_STREAM code of the HTTP/2 stream, when one is known. */
  rstCode?: number;
}
81
/**
 * Listener for subchannel call events. It is an `InterceptingListener` whose
 * status callback accepts a status that may include the stream's RST_STREAM
 * code.
 */
export interface SubchannelCallInterceptingListener extends InterceptingListener {
  /**
   * Called with the final status of the call.
   * @param status Final status, optionally carrying `rstCode`.
   */
  onReceiveStatus(status: StatusObjectWithRstCode): void;
}
85
/**
 * A `SubchannelCall` that runs over a single client HTTP/2 stream.
 *
 * The constructor attaches handlers to the stream's 'response', 'trailers',
 * 'data', 'end', 'close' and 'error' events. Incoming data is decoded into
 * messages and handed to the listener one at a time (one message per
 * `startRead()` request); the final status is derived from the trailers, from
 * the stream's RST_STREAM code, or from a local cancellation, and is reported
 * to the listener exactly once.
 */
export class Http2SubchannelCall implements SubchannelCall {
  private decoder = new StreamDecoder();

  // NOTE(review): never set to true in this class as shown; it only takes
  // part in the "all incoming data processed" check in maybeOutputStatus.
  private isReadFilterPending = false;
  // True between scheduling a message delivery in push() and its nextTick
  // callback running.
  private isPushPending = false;
  // True when startRead() has asked for a message that has not yet been
  // delivered.
  private canPush = false;
  /**
   * Indicates that an 'end' event has come from the http2 stream, so there
   * will be no more data events.
   */
  private readsClosed = false;

  // True once outputStatus() has started reporting the final status.
  private statusOutput = false;

  // Decoded messages that arrived before a read was requested.
  private unpushedReadMessages: Buffer[] = [];

  // Status code mapped from :status. To be used if grpc-status is not received
  private mappedStatusCode: Status = Status.UNKNOWN;

  // This is populated (non-null) if and only if the call has ended
  private finalStatus: StatusObject | null = null;

  // Last non-RST_STREAM error reported by the stream's 'error' event, if any.
  private internalError: SystemError | null = null;

  /**
   * @param http2Stream The client HTTP/2 stream carrying this call.
   * @param callEventTracker Receives notifications about messages sent and
   *     received, stream end, and call end.
   * @param listener Receives metadata, messages and the final status.
   * @param transport Transport used to answer `getPeer()`.
   * @param callId Identifier used in trace output and returned by
   *     `getCallNumber()`.
   */
  constructor(
    private readonly http2Stream: http2.ClientHttp2Stream,
    private readonly callEventTracker: CallEventTracker,
    private readonly listener: SubchannelCallInterceptingListener,
    private readonly transport: Transport,
    private readonly callId: number
  ) {
    http2Stream.on('response', (headers, flags) => {
      this.trace(
        'Received server headers:\n' + Http2SubchannelCall.formatHeaders(headers)
      );
      switch (headers[':status']) {
        // TODO(murgatroid99): handle 100 and 101
        case 400:
          this.mappedStatusCode = Status.INTERNAL;
          break;
        case 401:
          this.mappedStatusCode = Status.UNAUTHENTICATED;
          break;
        case 403:
          this.mappedStatusCode = Status.PERMISSION_DENIED;
          break;
        case 404:
          this.mappedStatusCode = Status.UNIMPLEMENTED;
          break;
        case 429:
        case 502:
        case 503:
        case 504:
          this.mappedStatusCode = Status.UNAVAILABLE;
          break;
        default:
          this.mappedStatusCode = Status.UNKNOWN;
      }

      if (flags & http2.constants.NGHTTP2_FLAG_END_STREAM) {
        /* The headers frame also ends the stream, so these headers are
         * processed as the trailers. */
        this.handleTrailers(headers);
      } else {
        let metadata: Metadata;
        try {
          metadata = Metadata.fromHttp2Headers(headers);
        } catch (error) {
          this.endCall({
            code: Status.UNKNOWN,
            details: (error as Error).message,
            metadata: new Metadata(),
          });
          return;
        }
        this.listener.onReceiveMetadata(metadata);
      }
    });
    http2Stream.on('trailers', (headers: http2.IncomingHttpHeaders) => {
      this.handleTrailers(headers);
    });
    http2Stream.on('data', (data: Buffer) => {
      /* If the status has already been output, allow the http2 stream to
       * drain without processing the data. */
      if (this.statusOutput) {
        return;
      }
      this.trace('receive HTTP/2 data frame of length ' + data.length);
      const messages = this.decoder.write(data);

      for (const message of messages) {
        this.trace('parsed message of length ' + message.length);
        this.callEventTracker.addMessageReceived();
        this.tryPush(message);
      }
    });
    http2Stream.on('end', () => {
      this.readsClosed = true;
      this.maybeOutputStatus();
    });
    http2Stream.on('close', () => {
      /* Use process.nextTick to ensure that this code happens after any
       * "error" event that may be emitted at about the same time, so that
       * we can bubble up the error message from that event. */
      process.nextTick(() => {
        this.trace('HTTP/2 stream closed with code ' + http2Stream.rstCode);
        /* If we have a final status with an OK status code, that means that
         * we have received all of the messages and we have processed the
         * trailers and the call completed successfully, so it doesn't matter
         * how the stream ends after that */
        if (this.finalStatus?.code === Status.OK) {
          return;
        }
        let code: Status;
        let details = '';
        switch (http2Stream.rstCode) {
          case http2.constants.NGHTTP2_NO_ERROR:
            /* If we get a NO_ERROR code and we already have a status, the
             * stream completed properly and we just haven't fully processed
             * it yet */
            if (this.finalStatus !== null) {
              return;
            }
            code = Status.INTERNAL;
            details = `Received RST_STREAM with code ${http2Stream.rstCode}`;
            break;
          case http2.constants.NGHTTP2_REFUSED_STREAM:
            code = Status.UNAVAILABLE;
            details = 'Stream refused by server';
            break;
          case http2.constants.NGHTTP2_CANCEL:
            code = Status.CANCELLED;
            details = 'Call cancelled';
            break;
          case http2.constants.NGHTTP2_ENHANCE_YOUR_CALM:
            code = Status.RESOURCE_EXHAUSTED;
            details = 'Bandwidth exhausted or memory limit exceeded';
            break;
          case http2.constants.NGHTTP2_INADEQUATE_SECURITY:
            code = Status.PERMISSION_DENIED;
            details = 'Protocol not secure enough';
            break;
          case http2.constants.NGHTTP2_INTERNAL_ERROR:
            code = Status.INTERNAL;
            if (this.internalError === null) {
              /* This error code was previously handled in the default case, and
               * there are several instances of it online, so I wanted to
               * preserve the original error message so that people find existing
               * information in searches, but also include the more recognizable
               * "Internal server error" message. */
              details = `Received RST_STREAM with code ${http2Stream.rstCode} (Internal server error)`;
            } else {
              if (
                this.internalError.code === 'ECONNRESET' ||
                this.internalError.code === 'ETIMEDOUT'
              ) {
                code = Status.UNAVAILABLE;
                details = this.internalError.message;
              } else {
                /* The "Received RST_STREAM with code ..." error is preserved
                 * here for continuity with errors reported online, but the
                 * error message at the end will probably be more relevant in
                 * most cases. */
                details = `Received RST_STREAM with code ${http2Stream.rstCode} triggered by internal client error: ${this.internalError.message}`;
              }
            }
            break;
          default:
            code = Status.INTERNAL;
            details = `Received RST_STREAM with code ${http2Stream.rstCode}`;
        }
        // This is a no-op if trailers were received at all.
        // This is OK, because status codes emitted here correspond to more
        // catastrophic issues that prevent us from receiving trailers in the
        // first place.
        this.endCall({
          code,
          details,
          metadata: new Metadata(),
          rstCode: http2Stream.rstCode,
        });
      });
    });
    http2Stream.on('error', (err: SystemError) => {
      /* We need an error handler here to stop "Uncaught Error" exceptions
       * from bubbling up. However, errors here should all correspond to
       * "close" events, where we will handle the error more granularly */
      /* Specifically looking for stream errors that were *not* constructed
       * from a RST_STREAM response here:
       * https://github.com/nodejs/node/blob/8b8620d580314050175983402dfddf2674e8e22a/lib/internal/http2/core.js#L2267
       */
      if (err.code !== 'ERR_HTTP2_STREAM_ERROR') {
        this.trace(
          'Node error event: message=' +
            err.message +
            ' code=' +
            err.code +
            ' errno=' +
            getSystemErrorName(err.errno) +
            ' syscall=' +
            err.syscall
        );
        this.internalError = err;
      }
      this.callEventTracker.onStreamEnd(false);
    });
  }

  /**
   * Render a set of HTTP/2 headers as one tab-indented `name: value` line per
   * header, for inclusion in trace output.
   * @param headers The headers to format.
   * @returns The formatted text; empty when there are no headers.
   */
  private static formatHeaders(headers: http2.IncomingHttpHeaders): string {
    let headersString = '';
    for (const header of Object.keys(headers)) {
      headersString += '\t\t' + header + ': ' + headers[header] + '\n';
    }
    return headersString;
  }

  /**
   * End the call with an UNAVAILABLE status and the details
   * "Connection dropped".
   */
  public onDisconnect() {
    this.endCall({
      code: Status.UNAVAILABLE,
      details: 'Connection dropped',
      metadata: new Metadata(),
    });
  }

  /**
   * Report the final status: notify the call event tracker immediately, the
   * listener on the next tick, and leave the stream flowing. Only the first
   * invocation has any effect.
   */
  private outputStatus() {
    /* Precondition: this.finalStatus !== null */
    if (!this.statusOutput) {
      this.statusOutput = true;
      this.trace(
        'ended with status: code=' +
          this.finalStatus!.code +
          ' details="' +
          this.finalStatus!.details +
          '"'
      );
      this.callEventTracker.onCallEnd(this.finalStatus!);
      /* We delay the actual action of bubbling up the status to insulate the
       * cleanup code in this class from any errors that may be thrown in the
       * upper layers as a result of bubbling up the status. In particular,
       * if the status is not OK, the "error" event may be emitted
       * synchronously at the top level, which will result in a thrown error if
       * the user does not handle that event. */
      process.nextTick(() => {
        this.listener.onReceiveStatus(this.finalStatus!);
      });
      /* Leave the http2 stream in flowing state to drain incoming messages, to
       * ensure that the stream closure completes. The call stream already does
       * not push more messages after the status is output, so the messages go
       * nowhere either way. */
      this.http2Stream.resume();
    }
  }

  /**
   * Emit a DEBUG-level trace line for this call, prefixed with the call ID.
   * @param text The text to log.
   */
  private trace(text: string): void {
    logging.trace(
      LogVerbosity.DEBUG,
      TRACER_NAME,
      '[' + this.callId + '] ' + text
    );
  }

  /**
   * Record the status the call ends with and close the HTTP/2 stream. The
   * status is stored only if no status has been stored yet or the stored one
   * is OK; in every case the stream is closed if it is not already destroyed.
   * @param status The status of the call.
   */
  private endCall(status: StatusObjectWithRstCode): void {
    /* If the status is OK and a new status comes in (e.g. from a
     * deserialization failure), that new status takes priority */
    if (this.finalStatus === null || this.finalStatus.code === Status.OK) {
      this.finalStatus = status;
      this.maybeOutputStatus();
    }
    this.destroyHttp2Stream();
  }

  /**
   * Output the final status if there is one and it is ready to be reported:
   * a non-OK status is reported right away, an OK status only after all
   * incoming data has been delivered.
   */
  private maybeOutputStatus() {
    if (this.finalStatus !== null) {
      /* The combination check of readsClosed and that the message buffer
       * is empty and nothing is pending checks that all incoming data has
       * been fully processed */
      if (
        this.finalStatus.code !== Status.OK ||
        (this.readsClosed &&
          this.unpushedReadMessages.length === 0 &&
          !this.isReadFilterPending &&
          !this.isPushPending)
      ) {
        this.outputStatus();
      }
    }
  }

  /**
   * Deliver one message to the listener on the next tick, unless the status
   * has been output by then.
   * @param message The decoded message to deliver.
   */
  private push(message: Buffer): void {
    this.trace(
      'pushing to reader message of length ' +
        (message instanceof Buffer ? message.length : null)
    );
    this.canPush = false;
    this.isPushPending = true;
    process.nextTick(() => {
      this.isPushPending = false;
      /* If we have already output the status any later messages should be
       * ignored, and can cause out-of-order operation errors higher up in the
       * stack. Checking as late as possible here to avoid any race conditions.
       */
      if (this.statusOutput) {
        return;
      }
      this.listener.onReceiveMessage(message);
      this.maybeOutputStatus();
    });
  }

  /**
   * Deliver a decoded message if a read is outstanding (pausing the stream
   * first); otherwise queue it until `startRead()` asks for it.
   * @param messageBytes The decoded message.
   */
  private tryPush(messageBytes: Buffer): void {
    if (this.canPush) {
      this.http2Stream.pause();
      this.push(messageBytes);
    } else {
      this.trace(
        'unpushedReadMessages.push message of length ' + messageBytes.length
      );
      this.unpushedReadMessages.push(messageBytes);
    }
  }

  /**
   * Derive the call's final status from the trailers and end the call.
   *
   * A `grpc-status` trailer, when present and valid, determines the status
   * code; the code mapped from the HTTP `:status` is only the fallback for
   * responses without a usable `grpc-status`. `grpc-message`, when present,
   * becomes the status details. Both entries are removed from the metadata
   * that accompanies the status.
   * @param headers The trailing headers (or the initial headers of a
   *     response that ended the stream immediately).
   */
  private handleTrailers(headers: http2.IncomingHttpHeaders) {
    this.callEventTracker.onStreamEnd(true);
    this.trace(
      'Received server trailers:\n' + Http2SubchannelCall.formatHeaders(headers)
    );
    let metadata: Metadata;
    try {
      metadata = Metadata.fromHttp2Headers(headers);
    } catch (e) {
      // Unparseable trailers are treated as if no metadata was sent.
      metadata = new Metadata();
    }
    const metadataMap = metadata.getMap();
    let code: Status = this.mappedStatusCode;
    const rawStatus = metadataMap['grpc-status'];
    if (typeof rawStatus === 'string') {
      /* Only a plain decimal number is accepted. In particular this rejects
       * the empty string, which Number() would silently turn into 0 (OK). */
      if (/^\d+$/.test(rawStatus)) {
        const receivedStatus = Number(rawStatus);
        if (receivedStatus in Status) {
          code = receivedStatus;
          this.trace('received status code ' + receivedStatus + ' from server');
        }
      }
      metadata.remove('grpc-status');
    }
    let details = '';
    const rawMessage = metadataMap['grpc-message'];
    if (typeof rawMessage === 'string') {
      try {
        details = decodeURI(rawMessage);
      } catch (e) {
        // Not valid percent-encoding: pass the text through unchanged.
        details = rawMessage;
      }
      metadata.remove('grpc-message');
      this.trace(
        'received status details string "' + details + '" from server'
      );
    }
    const status: StatusObject = { code, details, metadata };
    // This is a no-op if the call was already ended when handling headers.
    this.endCall(status);
  }

  /**
   * Close the HTTP/2 stream if it has not been destroyed yet, using NO_ERROR
   * when the call ended with an OK status and CANCEL otherwise.
   */
  private destroyHttp2Stream() {
    // The http2 stream could already have been destroyed if cancelWithStatus
    // is called in response to an internal http2 error.
    if (!this.http2Stream.destroyed) {
      /* If the call has ended with an OK status, communicate that when closing
       * the stream, partly to avoid a situation in which we detect an error
       * RST_STREAM as a result after we have the status */
      let code: number;
      if (this.finalStatus?.code === Status.OK) {
        code = http2.constants.NGHTTP2_NO_ERROR;
      } else {
        code = http2.constants.NGHTTP2_CANCEL;
      }
      this.trace('close http2 stream with code ' + code);
      this.http2Stream.close(code);
    }
  }

  /**
   * End the call with the given status code and details, and empty metadata.
   * @param status Status code to end the call with.
   * @param details Details text for the status.
   */
  cancelWithStatus(status: Status, details: string): void {
    this.trace(
      'cancelWithStatus code: ' + status + ' details: "' + details + '"'
    );
    this.endCall({ code: status, details, metadata: new Metadata() });
  }

  /**
   * @returns The final status, or null if the call has not ended.
   */
  getStatus(): StatusObject | null {
    return this.finalStatus;
  }

  /**
   * @returns The peer name reported by the transport.
   */
  getPeer(): string {
    return this.transport.getPeerName();
  }

  /**
   * @returns The call ID given to the constructor.
   */
  getCallNumber(): number {
    return this.callId;
  }

  /**
   * Request one incoming message: deliver a queued message if there is one,
   * otherwise resume the HTTP/2 stream so that more data can arrive.
   */
  startRead() {
    /* If the stream has ended with an error, we should not emit any more
     * messages and we should communicate that the stream has ended */
    if (this.finalStatus !== null && this.finalStatus.code !== Status.OK) {
      this.readsClosed = true;
      this.maybeOutputStatus();
      return;
    }
    this.canPush = true;
    if (this.unpushedReadMessages.length > 0) {
      const nextMessage: Buffer = this.unpushedReadMessages.shift()!;
      this.push(nextMessage);
      return;
    }
    /* Only resume reading from the http2Stream if we don't have any pending
     * messages to emit */
    this.http2Stream.resume();
  }

  /**
   * Write one message to the HTTP/2 stream. A write error ends the call
   * (INTERNAL for a write after end, UNAVAILABLE otherwise); the context's
   * callback, if any, is invoked once the write callback has run.
   * @param context Per-message context; only its optional `callback` is used.
   * @param message The message bytes to write.
   */
  sendMessageWithContext(context: MessageContext, message: Buffer) {
    this.trace('write() called with message of length ' + message.length);
    const cb: WriteCallback = (error?: Error | null) => {
      let code: Status = Status.UNAVAILABLE;
      if (
        (error as NodeJS.ErrnoException)?.code === 'ERR_STREAM_WRITE_AFTER_END'
      ) {
        code = Status.INTERNAL;
      }
      if (error) {
        this.cancelWithStatus(code, `Write error: ${error.message}`);
      }
      context.callback?.();
    };
    this.trace('sending data chunk of length ' + message.length);
    this.callEventTracker.addMessageSent();
    try {
      this.http2Stream.write(message, cb);
    } catch (error) {
      // A synchronous throw from write() ends the call as UNAVAILABLE.
      this.endCall({
        code: Status.UNAVAILABLE,
        details: `Write failed with error ${(error as Error).message}`,
        metadata: new Metadata(),
      });
    }
  }

  /**
   * End the writable side of the HTTP/2 stream.
   */
  halfClose() {
    this.trace('end() called');
    this.trace('calling end() on HTTP/2 stream');
    this.http2Stream.end();
  }
}