1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | import * as http2 from 'http2';
|
19 | import * as os from 'os';
|
20 |
|
21 | import { Status } from './constants';
|
22 | import { Metadata } from './metadata';
|
23 | import { StreamDecoder } from './stream-decoder';
|
24 | import * as logging from './logging';
|
25 | import { LogVerbosity } from './constants';
|
26 | import {
|
27 | InterceptingListener,
|
28 | MessageContext,
|
29 | StatusObject,
|
30 | WriteCallback,
|
31 | } from './call-interface';
|
32 | import { CallEventTracker, Transport } from './transport';
|
33 |
|
// Tracer name passed to logging.trace() for every debug message emitted by
// the subchannel call implementation in this file.
const TRACER_NAME = 'subchannel_call';
35 |
|
36 |
|
37 |
|
38 |
|
/**
 * Local declaration of the error shape Node.js uses for system-level
 * failures (see the "Class: SystemError" section of the Node.js errors
 * documentation). Only `code`, `errno`, `syscall` and `message` are read in
 * this file; the remaining optional members are declared for completeness.
 */
interface SystemError extends Error {
  /** String error code, e.g. 'ECONNRESET'. */
  code: string;
  /** Numeric error code supplied by the system. */
  errno: number;
  /** Name of the system call that produced the error. */
  syscall: string;
  /** Human-readable description of the error. */
  message: string;

  /** Address involved in a failed network operation, when reported. */
  address?: string;
  /** Port involved in a failed network operation, when reported. */
  port?: number;
  /** File path involved in a failed file system operation, when reported. */
  path?: string;
  /** Destination path of a failed file system operation, when reported. */
  dest?: string;
  /** Extra details about the error condition, when reported. */
  info?: object;
}
50 |
|
51 |
|
52 |
|
53 |
|
54 |
|
55 |
|
56 |
|
/**
 * Look up the symbolic name (for example `ECONNRESET`) of a system error
 * number by searching `os.constants.errno`.
 *
 * Node.js documents `SystemError.errno` as a negative number, whereas the
 * values in `os.constants.errno` are positive, so the lookup compares
 * magnitudes. Both `-N` and `N` therefore resolve to the same name. Numbers
 * that have no entry in the table for the current platform fall through to
 * the "Unknown system error" text.
 *
 * @param errno Error number, typically taken from a `SystemError`.
 * @returns The matching constant name, or `'Unknown system error ' + errno`
 *     when no constant has that value.
 */
function getSystemErrorName(errno: number): string {
  // A non-numeric input yields NaN here, which never matches and falls
  // through to the unknown-error text below.
  const magnitude = Math.abs(errno);
  for (const [name, num] of Object.entries(os.constants.errno)) {
    if (num === magnitude) {
      return name;
    }
  }
  return 'Unknown system error ' + errno;
}
65 |
|
/**
 * Operations available on a call that has been started on a subchannel.
 * Implemented in this file by Http2SubchannelCall.
 */
export interface SubchannelCall {
  // --- Identification ---

  /** Returns the numeric identifier of this call. */
  getCallNumber(): number;
  /** Returns a string naming the remote peer of this call. */
  getPeer(): string;

  // --- Outgoing direction ---

  /**
   * Sends one message on the call.
   * @param context Per-message context; may carry a completion callback.
   * @param message The message bytes to write.
   */
  sendMessageWithContext(context: MessageContext, message: Buffer): void;
  /** Signals that no further messages will be sent. */
  halfClose(): void;

  // --- Incoming direction ---

  /** Requests delivery of the next incoming message. */
  startRead(): void;

  // --- Termination ---

  /**
   * Ends the call with the given status.
   * @param status Status code to end the call with.
   * @param details Human-readable explanation accompanying the status.
   */
  cancelWithStatus(status: Status, details: string): void;
}
74 |
|
/**
 * A StatusObject that may additionally carry an HTTP/2 RST_STREAM code.
 * Http2SubchannelCall fills in `rstCode` from the stream's `rstCode` when it
 * builds a status in its 'close' handler.
 */
export interface StatusObjectWithRstCode extends StatusObject {
  rstCode?: number;
}
78 |
|
/**
 * Listener type used by subchannel calls: an InterceptingListener whose
 * status callback is declared to receive a StatusObjectWithRstCode.
 */
export interface SubchannelCallInterceptingListener
  extends InterceptingListener {
  onReceiveStatus(status: StatusObjectWithRstCode): void;
}
83 |
|
/**
 * A single call carried on one HTTP/2 client stream.
 *
 * The constructor subscribes to the stream's 'response', 'trailers', 'data',
 * 'end', 'close' and 'error' events and translates them into listener
 * callbacks (`onReceiveMetadata`, `onReceiveMessage`, `onReceiveStatus`).
 * Incoming messages are delivered one at a time: a message is only pushed to
 * the listener after `startRead()` has been called, and the stream is paused
 * while a push is in flight. The final status is reported at most once.
 */
export class Http2SubchannelCall implements SubchannelCall {
  private decoder = new StreamDecoder();

  // Never set to true within this class; still consulted by
  // maybeOutputStatus() as part of the "all input processed" check.
  private isReadFilterPending = false;
  // True between scheduling a message push and the tick that delivers it.
  private isPushPending = false;
  // True once startRead() has asked for a message and none has been pushed yet.
  private canPush = false;

  // Set when the stream emits 'end' (or when startRead() observes a non-OK
  // final status); no further messages will be delivered after that.
  private readsClosed = false;

  // True once the final status has been handed off for delivery.
  private statusOutput = false;

  // Decoded messages waiting for a startRead() call.
  private unpushedReadMessages: Buffer[] = [];

  // Status derived from the HTTP :status header. handleTrailers() only looks
  // at grpc-status when this is still UNKNOWN.
  private mappedStatusCode: Status = Status.UNKNOWN;

  // Non-null once the call has ended (see endCall()).
  private finalStatus: StatusObject | null = null;

  // Last stream error that was not a plain ERR_HTTP2_STREAM_ERROR; used to
  // enrich the status produced for an INTERNAL_ERROR stream close.
  private internalError: SystemError | null = null;

  /**
   * @param http2Stream The HTTP/2 stream carrying this call.
   * @param callEventTracker Receives stream/call/message accounting events.
   * @param listener Receives metadata, messages and the final status.
   * @param transport Used to look up the peer name.
   * @param callId Identifier included in trace output and returned by
   *     getCallNumber().
   */
  constructor(
    private readonly http2Stream: http2.ClientHttp2Stream,
    private readonly callEventTracker: CallEventTracker,
    private readonly listener: SubchannelCallInterceptingListener,
    private readonly transport: Transport,
    private readonly callId: number
  ) {
    http2Stream.on('response', (headers, flags) => {
      this.trace(
        'Received server headers:\n' +
          Http2SubchannelCall.formatHeaders(headers)
      );
      // Map selected HTTP status codes to a fallback call status.
      switch (headers[':status']) {
        case 400:
          this.mappedStatusCode = Status.INTERNAL;
          break;
        case 401:
          this.mappedStatusCode = Status.UNAUTHENTICATED;
          break;
        case 403:
          this.mappedStatusCode = Status.PERMISSION_DENIED;
          break;
        case 404:
          this.mappedStatusCode = Status.UNIMPLEMENTED;
          break;
        case 429:
        case 502:
        case 503:
        case 504:
          this.mappedStatusCode = Status.UNAVAILABLE;
          break;
        default:
          this.mappedStatusCode = Status.UNKNOWN;
      }

      if (flags & http2.constants.NGHTTP2_FLAG_END_STREAM) {
        // Headers-only response: the headers double as trailers.
        this.handleTrailers(headers);
      } else {
        let metadata: Metadata;
        try {
          metadata = Metadata.fromHttp2Headers(headers);
        } catch (error) {
          this.endCall({
            code: Status.UNKNOWN,
            details: (error as Error).message,
            metadata: new Metadata(),
          });
          return;
        }
        this.listener.onReceiveMetadata(metadata);
      }
    });
    http2Stream.on('trailers', (headers: http2.IncomingHttpHeaders) => {
      this.handleTrailers(headers);
    });
    http2Stream.on('data', (data: Buffer) => {
      // After the status has been output the stream is left flowing (see
      // outputStatus()); any data arriving then is discarded.
      if (this.statusOutput) {
        return;
      }
      this.trace('receive HTTP/2 data frame of length ' + data.length);
      const messages = this.decoder.write(data);

      for (const message of messages) {
        this.trace('parsed message of length ' + message.length);
        this.callEventTracker.addMessageReceived();
        this.tryPush(message);
      }
    });
    http2Stream.on('end', () => {
      this.readsClosed = true;
      this.maybeOutputStatus();
    });
    http2Stream.on('close', () => {
      // Deferred by one tick so that an 'error' event emitted around the same
      // time has already populated this.internalError when we build the status.
      process.nextTick(() => {
        this.trace('HTTP/2 stream closed with code ' + http2Stream.rstCode);
        // An OK final status means the call already completed successfully;
        // how the stream closes afterwards is irrelevant.
        if (this.finalStatus?.code === Status.OK) {
          return;
        }
        let code: Status;
        let details = '';
        switch (http2Stream.rstCode) {
          case http2.constants.NGHTTP2_NO_ERROR:
            // A clean close with a status already recorded needs no action.
            if (this.finalStatus !== null) {
              return;
            }
            code = Status.INTERNAL;
            details = `Received RST_STREAM with code ${http2Stream.rstCode}`;
            break;
          case http2.constants.NGHTTP2_REFUSED_STREAM:
            code = Status.UNAVAILABLE;
            details = 'Stream refused by server';
            break;
          case http2.constants.NGHTTP2_CANCEL:
            code = Status.CANCELLED;
            details = 'Call cancelled';
            break;
          case http2.constants.NGHTTP2_ENHANCE_YOUR_CALM:
            code = Status.RESOURCE_EXHAUSTED;
            details = 'Bandwidth exhausted or memory limit exceeded';
            break;
          case http2.constants.NGHTTP2_INADEQUATE_SECURITY:
            code = Status.PERMISSION_DENIED;
            details = 'Protocol not secure enough';
            break;
          case http2.constants.NGHTTP2_INTERNAL_ERROR:
            code = Status.INTERNAL;
            if (this.internalError === null) {
              details = `Received RST_STREAM with code ${http2Stream.rstCode} (Internal server error)`;
            } else {
              if (
                this.internalError.code === 'ECONNRESET' ||
                this.internalError.code === 'ETIMEDOUT'
              ) {
                // Connection-level failures are reported as UNAVAILABLE
                // rather than INTERNAL.
                code = Status.UNAVAILABLE;
                details = this.internalError.message;
              } else {
                details = `Received RST_STREAM with code ${http2Stream.rstCode} triggered by internal client error: ${this.internalError.message}`;
              }
            }
            break;
          default:
            code = Status.INTERNAL;
            details = `Received RST_STREAM with code ${http2Stream.rstCode}`;
        }
        // endCall() keeps an existing non-OK status, so this only takes
        // effect when no status was recorded before the close.
        this.endCall({
          code,
          details,
          metadata: new Metadata(),
          rstCode: http2Stream.rstCode,
        });
      });
    });
    http2Stream.on('error', (err: SystemError) => {
      // Errors other than a generic stream error are remembered so the
      // 'close' handler can include their message in the final status.
      if (err.code !== 'ERR_HTTP2_STREAM_ERROR') {
        this.trace(
          'Node error event: message=' +
            err.message +
            ' code=' +
            err.code +
            ' errno=' +
            getSystemErrorName(err.errno) +
            ' syscall=' +
            err.syscall
        );
        this.internalError = err;
      }
      this.callEventTracker.onStreamEnd(false);
    });
  }

  /**
   * Renders HTTP/2 headers as one tab-indented `name: value` line per header
   * for trace output.
   */
  private static formatHeaders(headers: http2.IncomingHttpHeaders): string {
    let headersString = '';
    for (const header of Object.keys(headers)) {
      headersString += '\t\t' + header + ': ' + headers[header] + '\n';
    }
    return headersString;
  }

  /** Ends the call with an UNAVAILABLE status ('Connection dropped'). */
  public onDisconnect() {
    this.endCall({
      code: Status.UNAVAILABLE,
      details: 'Connection dropped',
      metadata: new Metadata(),
    });
  }

  /**
   * Reports the final status exactly once.
   * Precondition: this.finalStatus is non-null.
   */
  private outputStatus() {
    if (!this.statusOutput) {
      this.statusOutput = true;
      this.trace(
        'ended with status: code=' +
          this.finalStatus!.code +
          ' details="' +
          this.finalStatus!.details +
          '"'
      );
      this.callEventTracker.onCallEnd(this.finalStatus!);
      // The listener is notified on a later tick, so anything it throws
      // cannot interrupt the remaining work in this call stack.
      process.nextTick(() => {
        this.listener.onReceiveStatus(this.finalStatus!);
      });
      // Keep the stream flowing; the 'data' handler discards whatever still
      // arrives once statusOutput is set.
      this.http2Stream.resume();
    }
  }

  /** Emits a debug trace line prefixed with this call's id. */
  private trace(text: string): void {
    logging.trace(
      LogVerbosity.DEBUG,
      TRACER_NAME,
      '[' + this.callId + '] ' + text
    );
  }

  /**
   * Records the final status of the call and closes the HTTP/2 stream.
   * A status is only recorded when none exists yet or when the existing one
   * is OK; an existing non-OK status is never replaced.
   * @param status The status to end the call with.
   */
  private endCall(status: StatusObjectWithRstCode): void {
    if (this.finalStatus === null || this.finalStatus.code === Status.OK) {
      this.finalStatus = status;
      this.maybeOutputStatus();
    }
    this.destroyHttp2Stream();
  }

  /**
   * Outputs the final status if one exists and it is either non-OK, or all
   * incoming data has been fully delivered (reads closed, no queued
   * messages, no push in flight).
   */
  private maybeOutputStatus() {
    if (this.finalStatus !== null) {
      if (
        this.finalStatus.code !== Status.OK ||
        (this.readsClosed &&
          this.unpushedReadMessages.length === 0 &&
          !this.isReadFilterPending &&
          !this.isPushPending)
      ) {
        this.outputStatus();
      }
    }
  }

  /**
   * Delivers one message to the listener on the next tick, unless the status
   * has been output by then.
   */
  private push(message: Buffer): void {
    this.trace(
      'pushing to reader message of length ' +
        (message instanceof Buffer ? message.length : null)
    );
    this.canPush = false;
    this.isPushPending = true;
    process.nextTick(() => {
      this.isPushPending = false;
      // Checked at delivery time: once the status is out, no message may
      // follow it.
      if (this.statusOutput) {
        return;
      }
      this.listener.onReceiveMessage(message);
      this.maybeOutputStatus();
    });
  }

  /**
   * Pushes the message immediately if a read is outstanding (pausing the
   * stream while it is delivered); otherwise queues it for a later
   * startRead().
   */
  private tryPush(messageBytes: Buffer): void {
    if (this.canPush) {
      this.http2Stream.pause();
      this.push(messageBytes);
    } else {
      this.trace(
        'unpushedReadMessages.push message of length ' + messageBytes.length
      );
      this.unpushedReadMessages.push(messageBytes);
    }
  }

  /**
   * Builds the final status from trailers (or a headers-only response) and
   * ends the call with it. `grpc-status` and `grpc-message` are consumed
   * and removed from the resulting metadata.
   */
  private handleTrailers(headers: http2.IncomingHttpHeaders) {
    this.callEventTracker.onStreamEnd(true);
    this.trace(
      'Received server trailers:\n' +
        Http2SubchannelCall.formatHeaders(headers)
    );
    let metadata: Metadata;
    try {
      metadata = Metadata.fromHttp2Headers(headers);
    } catch (e) {
      // Unparseable trailers are treated as empty metadata; the status then
      // falls back to the code mapped from the HTTP :status header.
      metadata = new Metadata();
    }
    const metadataMap = metadata.getMap();
    let code: Status = this.mappedStatusCode;
    const rawStatus = metadataMap['grpc-status'];
    if (code === Status.UNKNOWN && typeof rawStatus === 'string') {
      // Accept only a plain decimal code. Number() alone would coerce values
      // such as '' or ' ' to 0 and report a malformed status as OK.
      if (/^\d+$/.test(rawStatus)) {
        const receivedStatus = Number(rawStatus);
        if (receivedStatus in Status) {
          code = receivedStatus;
          this.trace('received status code ' + receivedStatus + ' from server');
        }
      }
      metadata.remove('grpc-status');
    }
    let details = '';
    const rawMessage = metadataMap['grpc-message'];
    if (typeof rawMessage === 'string') {
      try {
        details = decodeURI(rawMessage);
      } catch (e) {
        // Malformed percent-encoding: fall back to the raw text.
        details = rawMessage;
      }
      metadata.remove('grpc-message');
      this.trace(
        'received status details string "' + details + '" from server'
      );
    }
    const status: StatusObject = { code, details, metadata };
    this.endCall(status);
  }

  /**
   * Closes the HTTP/2 stream if it has not been destroyed yet, using
   * NO_ERROR when the call ended with an OK status and CANCEL otherwise.
   */
  private destroyHttp2Stream() {
    if (!this.http2Stream.destroyed) {
      let code: number;
      if (this.finalStatus?.code === Status.OK) {
        code = http2.constants.NGHTTP2_NO_ERROR;
      } else {
        code = http2.constants.NGHTTP2_CANCEL;
      }
      this.trace('close http2 stream with code ' + code);
      this.http2Stream.close(code);
    }
  }

  /**
   * Ends the call with the given status and details (empty metadata).
   * @param status Status code to end the call with.
   * @param details Human-readable explanation.
   */
  cancelWithStatus(status: Status, details: string): void {
    this.trace(
      'cancelWithStatus code: ' + status + ' details: "' + details + '"'
    );
    this.endCall({ code: status, details, metadata: new Metadata() });
  }

  /** Returns the final status, or null while the call has not ended. */
  getStatus(): StatusObject | null {
    return this.finalStatus;
  }

  /** Returns the peer name reported by the transport. */
  getPeer(): string {
    return this.transport.getPeerName();
  }

  /** Returns the call id supplied at construction. */
  getCallNumber(): number {
    return this.callId;
  }

  /**
   * Requests the next incoming message. A queued message is pushed right
   * away; otherwise the stream is resumed so more data can arrive. If the
   * call already ended with a non-OK status, no message is delivered.
   */
  startRead() {
    if (this.finalStatus !== null && this.finalStatus.code !== Status.OK) {
      this.readsClosed = true;
      this.maybeOutputStatus();
      return;
    }
    this.canPush = true;
    if (this.unpushedReadMessages.length > 0) {
      const nextMessage: Buffer = this.unpushedReadMessages.shift()!;
      this.push(nextMessage);
      return;
    }
    // Nothing queued: let the stream produce more data.
    this.http2Stream.resume();
  }

  /**
   * Writes one message to the HTTP/2 stream.
   * On an asynchronous write error the call is cancelled (INTERNAL for
   * ERR_STREAM_WRITE_AFTER_END, UNAVAILABLE otherwise) and
   * `context.callback` is still invoked. If `write` throws synchronously the
   * call ends with UNAVAILABLE and the callback is not invoked.
   * @param context Per-message context; its optional callback is invoked
   *     after the write callback fires.
   * @param message The message bytes to write.
   */
  sendMessageWithContext(context: MessageContext, message: Buffer) {
    this.trace('write() called with message of length ' + message.length);
    const cb: WriteCallback = (error?: Error | null) => {
      // Deferred so that no stream operation runs inside the write
      // callback's own call stack.
      process.nextTick(() => {
        let code: Status = Status.UNAVAILABLE;
        if (
          (error as NodeJS.ErrnoException)?.code ===
          'ERR_STREAM_WRITE_AFTER_END'
        ) {
          code = Status.INTERNAL;
        }
        if (error) {
          this.cancelWithStatus(code, `Write error: ${error.message}`);
        }
        context.callback?.();
      });
    };
    this.trace('sending data chunk of length ' + message.length);
    this.callEventTracker.addMessageSent();
    try {
      this.http2Stream.write(message, cb);
    } catch (error) {
      this.endCall({
        code: Status.UNAVAILABLE,
        details: `Write failed with error ${(error as Error).message}`,
        metadata: new Metadata(),
      });
    }
  }

  /** Ends the writable side of the HTTP/2 stream. */
  halfClose() {
    this.trace('end() called');
    this.trace('calling end() on HTTP/2 stream');
    this.http2Stream.end();
  }
}