1 | "use strict";
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | Object.defineProperty(exports, "__esModule", { value: true });
|
19 | exports.Http2SubchannelCall = void 0;
|
20 | const http2 = require("http2");
|
21 | const os = require("os");
|
22 | const constants_1 = require("./constants");
|
23 | const metadata_1 = require("./metadata");
|
24 | const stream_decoder_1 = require("./stream-decoder");
|
25 | const logging = require("./logging");
|
26 | const constants_2 = require("./constants");
|
27 | const TRACER_NAME = 'subchannel_call';
|
28 | const { HTTP2_HEADER_STATUS, HTTP2_HEADER_CONTENT_TYPE, NGHTTP2_CANCEL, } = http2.constants;
|
29 |
|
30 |
|
31 |
|
32 |
|
33 |
|
34 |
|
/**
 * Looks up the symbolic name (for example `ECONNRESET`) of a system error
 * number by searching `os.constants.errno`.
 *
 * Node.js documents `error.errno` as a negative number, while the values in
 * `os.constants.errno` are positive. Numeric input is therefore matched on
 * its magnitude, so both `-104` and `104` resolve to the same name.
 * Non-numeric input is compared as-is and falls through to the fallback.
 *
 * @param {number} errno - System error number, positive or negative.
 * @returns {string} The first key of `os.constants.errno` whose value
 *     matches, or `'Unknown system error <errno>'` when nothing matches.
 */
function getSystemErrorName(errno) {
    // Only normalise real numbers; undefined/strings keep the old behaviour.
    const wanted = typeof errno === 'number' ? Math.abs(errno) : errno;
    for (const [name, num] of Object.entries(os.constants.errno)) {
        if (num === wanted) {
            return name;
        }
    }
    return 'Unknown system error ' + errno;
}
|
/**
 * Drives a single call over one HTTP/2 stream.
 *
 * The constructor subscribes to the stream's `response`, `trailers`, `data`,
 * `end`, `close` and `error` events. Those events are folded into one final
 * status object, and metadata, decoded messages and that status are forwarded
 * to `listener`.
 */
class Http2SubchannelCall {
    /**
     * @param http2Stream - Stream whose events are observed and which is
     *     written to, paused, resumed and closed by this object.
     * @param callStatsTracker - Receives `addMessageSent`,
     *     `addMessageReceived`, `onStreamEnd` and `onCallEnd` notifications.
     * @param listener - Receives `onReceiveMetadata`, `onReceiveMessage` and
     *     `onReceiveStatus`.
     * @param subchannel - Ref-counted via `callRef`/`callUnref`; also the
     *     source of disconnect notifications and of the peer address.
     * @param callId - Identifier prefixed to every trace line.
     */
    constructor(http2Stream, callStatsTracker, listener, subchannel, callId) {
        this.http2Stream = http2Stream;
        this.callStatsTracker = callStatsTracker;
        this.listener = listener;
        this.subchannel = subchannel;
        this.callId = callId;
        this.decoder = new stream_decoder_1.StreamDecoder();
        // Only read (never set to true) inside this class.
        this.isReadFilterPending = false;
        // Set by startRead(), cleared again by push().
        this.canPush = false;
        this.readsClosed = false;
        this.statusOutput = false;
        // Decoded messages waiting for the next startRead().
        this.unpushedReadMessages = [];
        // Status derived from the HTTP :status header of the response.
        this.mappedStatusCode = constants_1.Status.UNKNOWN;
        this.finalStatus = null;
        // Last non-stream error reported by the 'error' event, if any.
        this.internalError = null;
        this.disconnectListener = () => {
            this.endCall({
                code: constants_1.Status.UNAVAILABLE,
                details: 'Connection dropped',
                metadata: new metadata_1.Metadata(),
            });
        };
        subchannel.addDisconnectListener(this.disconnectListener);
        subchannel.callRef();

        http2Stream.on('response', (headers, flags) => {
            const headerDump = Object.keys(headers)
                .map((name) => `\t\t${name}: ${headers[name]}\n`)
                .join('');
            this.trace(`Received server headers:\n${headerDump}`);

            // Translate the HTTP status into the status code used later by
            // handleTrailers() as its starting point.
            const httpStatus = headers[':status'];
            if (httpStatus === 400) {
                this.mappedStatusCode = constants_1.Status.INTERNAL;
            }
            else if (httpStatus === 401) {
                this.mappedStatusCode = constants_1.Status.UNAUTHENTICATED;
            }
            else if (httpStatus === 403) {
                this.mappedStatusCode = constants_1.Status.PERMISSION_DENIED;
            }
            else if (httpStatus === 404) {
                this.mappedStatusCode = constants_1.Status.UNIMPLEMENTED;
            }
            else if (httpStatus === 429 ||
                httpStatus === 502 ||
                httpStatus === 503 ||
                httpStatus === 504) {
                this.mappedStatusCode = constants_1.Status.UNAVAILABLE;
            }
            else {
                this.mappedStatusCode = constants_1.Status.UNKNOWN;
            }

            // A response that also ends the stream is handled as trailers.
            if (flags & http2.constants.NGHTTP2_FLAG_END_STREAM) {
                this.handleTrailers(headers);
                return;
            }
            let initialMetadata;
            try {
                initialMetadata = metadata_1.Metadata.fromHttp2Headers(headers);
            }
            catch (error) {
                this.endCall({
                    code: constants_1.Status.UNKNOWN,
                    details: error.message,
                    metadata: new metadata_1.Metadata(),
                });
                return;
            }
            this.listener.onReceiveMetadata(initialMetadata);
        });

        http2Stream.on('trailers', (headers) => {
            this.handleTrailers(headers);
        });

        http2Stream.on('data', (data) => {
            // Once the status has been reported, incoming data is dropped.
            if (this.statusOutput) {
                return;
            }
            this.trace(`receive HTTP/2 data frame of length ${data.length}`);
            for (const message of this.decoder.write(data)) {
                this.trace(`parsed message of length ${message.length}`);
                this.callStatsTracker.addMessageReceived();
                this.tryPush(message);
            }
        });

        http2Stream.on('end', () => {
            this.readsClosed = true;
            this.maybeOutputStatus();
        });

        http2Stream.on('close', () => {
            // The close is processed on the next tick rather than inline.
            process.nextTick(() => {
                const rstCode = http2Stream.rstCode;
                this.trace(`HTTP/2 stream closed with code ${rstCode}`);
                // An OK status that is already recorded is left alone.
                const recorded = this.finalStatus;
                if (recorded != null && recorded.code === constants_1.Status.OK) {
                    return;
                }
                let code;
                let details = '';
                switch (rstCode) {
                    case http2.constants.NGHTTP2_NO_ERROR:
                        // With a status already recorded there is nothing
                        // more to report for a clean close.
                        if (this.finalStatus !== null) {
                            return;
                        }
                        code = constants_1.Status.INTERNAL;
                        details = `Received RST_STREAM with code ${rstCode}`;
                        break;
                    case http2.constants.NGHTTP2_REFUSED_STREAM:
                        code = constants_1.Status.UNAVAILABLE;
                        details = 'Stream refused by server';
                        break;
                    case http2.constants.NGHTTP2_CANCEL:
                        code = constants_1.Status.CANCELLED;
                        details = 'Call cancelled';
                        break;
                    case http2.constants.NGHTTP2_ENHANCE_YOUR_CALM:
                        code = constants_1.Status.RESOURCE_EXHAUSTED;
                        details = 'Bandwidth exhausted or memory limit exceeded';
                        break;
                    case http2.constants.NGHTTP2_INADEQUATE_SECURITY:
                        code = constants_1.Status.PERMISSION_DENIED;
                        details = 'Protocol not secure enough';
                        break;
                    case http2.constants.NGHTTP2_INTERNAL_ERROR: {
                        const cause = this.internalError;
                        if (cause === null) {
                            // No local error was captured by the 'error' handler.
                            code = constants_1.Status.INTERNAL;
                            details = `Received RST_STREAM with code ${rstCode} (Internal server error)`;
                        }
                        else if (cause.code === 'ECONNRESET' || cause.code === 'ETIMEDOUT') {
                            code = constants_1.Status.UNAVAILABLE;
                            details = cause.message;
                        }
                        else {
                            code = constants_1.Status.INTERNAL;
                            details = `Received RST_STREAM with code ${rstCode} triggered by internal client error: ${cause.message}`;
                        }
                        break;
                    }
                    default:
                        code = constants_1.Status.INTERNAL;
                        details = `Received RST_STREAM with code ${rstCode}`;
                }
                this.endCall({
                    code,
                    details,
                    metadata: new metadata_1.Metadata(),
                    rstCode,
                });
            });
        });

        http2Stream.on('error', (err) => {
            // ERR_HTTP2_STREAM_ERROR is not recorded here; every other error
            // is kept so the 'close' handler can use it when building the
            // final status.
            if (err.code !== 'ERR_HTTP2_STREAM_ERROR') {
                this.trace(`Node error event: message=${err.message} code=${err.code} errno=${getSystemErrorName(err.errno)} syscall=${err.syscall}`);
                this.internalError = err;
            }
            this.callStatsTracker.onStreamEnd(false);
        });
    }

    /**
     * Reports `finalStatus` to the stats tracker and the listener, at most
     * once, and releases this call's hold on the subchannel.
     */
    outputStatus() {
        if (this.statusOutput) {
            return;
        }
        this.statusOutput = true;
        this.trace(`ended with status: code=${this.finalStatus.code} details="${this.finalStatus.details}"`);
        this.callStatsTracker.onCallEnd(this.finalStatus);
        // The listener is notified on the next tick; finalStatus is read at
        // that time, not captured here.
        process.nextTick(() => {
            this.listener.onReceiveStatus(this.finalStatus);
        });
        // Let any remaining data flow; the 'data' handler discards it now
        // that statusOutput is set.
        this.http2Stream.resume();
        this.subchannel.callUnref();
        this.subchannel.removeDisconnectListener(this.disconnectListener);
    }

    /**
     * Emits a DEBUG trace line prefixed with this call's id.
     * @param text - Message to log.
     */
    trace(text) {
        logging.trace(constants_2.LogVerbosity.DEBUG, TRACER_NAME, `[${this.callId}] ${text}`);
    }

    /**
     * Records `status` as the final status when none is set yet or the
     * current one is OK, then closes the HTTP/2 stream.
     * @param status - Object with `code`, `details` and `metadata`
     *     (optionally `rstCode`).
     */
    endCall(status) {
        const current = this.finalStatus;
        // An existing OK status may be replaced by a later one.
        if (current === null || current.code === constants_1.Status.OK) {
            this.finalStatus = status;
            this.maybeOutputStatus();
        }
        this.destroyHttp2Stream();
    }

    /**
     * Outputs the final status if one exists and it is either non-OK or all
     * received messages have been delivered.
     */
    maybeOutputStatus() {
        const status = this.finalStatus;
        if (status === null) {
            return;
        }
        const failed = status.code !== constants_1.Status.OK;
        const readsDrained = this.readsClosed &&
            this.unpushedReadMessages.length === 0 &&
            !this.isReadFilterPending;
        if (failed || readsDrained) {
            this.outputStatus();
        }
    }

    /**
     * Delivers one message to the listener on the next tick, unless the
     * status has been output by then.
     * @param message - Decoded message to deliver.
     */
    push(message) {
        const tracedLength = message instanceof Buffer ? message.length : null;
        this.trace(`pushing to reader message of length ${tracedLength}`);
        this.canPush = false;
        process.nextTick(() => {
            if (this.statusOutput) {
                return;
            }
            this.listener.onReceiveMessage(message);
            this.maybeOutputStatus();
        });
    }

    /**
     * Pushes the message immediately when a read is outstanding (pausing
     * the stream first); otherwise queues it for a later startRead().
     * @param messageBytes - Decoded message.
     */
    tryPush(messageBytes) {
        if (!this.canPush) {
            this.trace(`unpushedReadMessages.push message of length ${messageBytes.length}`);
            this.unpushedReadMessages.push(messageBytes);
            return;
        }
        this.http2Stream.pause();
        this.push(messageBytes);
    }

    /**
     * Builds the call status from trailing headers (`grpc-status`,
     * `grpc-message`) and ends the call with it.
     * @param headers - HTTP/2 header object.
     */
    handleTrailers(headers) {
        this.callStatsTracker.onStreamEnd(true);
        const trailerDump = Object.keys(headers)
            .map((name) => `\t\t${name}: ${headers[name]}\n`)
            .join('');
        this.trace(`Received server trailers:\n${trailerDump}`);

        let metadata;
        try {
            metadata = metadata_1.Metadata.fromHttp2Headers(headers);
        }
        catch (e) {
            // Unparseable trailers are treated as empty metadata.
            metadata = new metadata_1.Metadata();
        }
        const metadataMap = metadata.getMap();

        // grpc-status is only consulted when the HTTP status mapped to UNKNOWN.
        let code = this.mappedStatusCode;
        const rawStatus = metadataMap['grpc-status'];
        if (code === constants_1.Status.UNKNOWN && typeof rawStatus === 'string') {
            const receivedStatus = Number(rawStatus);
            if (receivedStatus in constants_1.Status) {
                code = receivedStatus;
                this.trace(`received status code ${receivedStatus} from server`);
            }
            metadata.remove('grpc-status');
        }

        let details = '';
        const rawMessage = metadataMap['grpc-message'];
        if (typeof rawMessage === 'string') {
            try {
                details = decodeURI(rawMessage);
            }
            catch (e) {
                // Fall back to the raw text when it cannot be decoded.
                details = rawMessage;
            }
            metadata.remove('grpc-message');
            this.trace(`received status details string "${details}" from server`);
        }

        this.endCall({ code, details, metadata });
    }

    /**
     * Closes the HTTP/2 stream if it is not already destroyed, using
     * NO_ERROR for an OK final status and CANCEL otherwise.
     */
    destroyHttp2Stream() {
        if (this.http2Stream.destroyed) {
            return;
        }
        const status = this.finalStatus;
        const endedOk = status != null && status.code === constants_1.Status.OK;
        const code = endedOk
            ? http2.constants.NGHTTP2_NO_ERROR
            : http2.constants.NGHTTP2_CANCEL;
        this.trace(`close http2 stream with code ${code}`);
        this.http2Stream.close(code);
    }

    /**
     * Ends the call with the given status code and details and empty metadata.
     * @param status - Status code.
     * @param details - Human-readable description.
     */
    cancelWithStatus(status, details) {
        this.trace(`cancelWithStatus code: ${status} details: "${details}"`);
        this.endCall({ code: status, details, metadata: new metadata_1.Metadata() });
    }

    /** @returns The final status object, or `null` if none is recorded yet. */
    getStatus() {
        return this.finalStatus;
    }

    /** @returns Whatever `subchannel.getAddress()` returns. */
    getPeer() {
        return this.subchannel.getAddress();
    }

    /** @returns The call id passed to the constructor. */
    getCallNumber() {
        return this.callId;
    }

    /**
     * Requests the next message: delivers a queued one if available,
     * otherwise resumes the stream so more data can arrive.
     */
    startRead() {
        const status = this.finalStatus;
        // After a non-OK status no more messages are delivered.
        if (status !== null && status.code !== constants_1.Status.OK) {
            this.readsClosed = true;
            this.maybeOutputStatus();
            return;
        }
        this.canPush = true;
        if (this.unpushedReadMessages.length > 0) {
            this.push(this.unpushedReadMessages.shift());
            return;
        }
        this.http2Stream.resume();
    }

    /**
     * Writes one message to the stream. A write error cancels the call;
     * `context.callback`, if present, is invoked once the write callback fires.
     * @param context - Object that may carry a `callback` function.
     * @param message - Data to write; its `length` is traced.
     */
    sendMessageWithContext(context, message) {
        this.trace(`write() called with message of length ${message.length}`);
        const onWriteDone = (error) => {
            if (error) {
                const code = error.code === 'ERR_STREAM_WRITE_AFTER_END'
                    ? constants_1.Status.INTERNAL
                    : constants_1.Status.UNAVAILABLE;
                this.cancelWithStatus(code, `Write error: ${error.message}`);
            }
            const notify = context.callback;
            if (notify != null) {
                notify.call(context);
            }
        };
        this.trace(`sending data chunk of length ${message.length}`);
        this.callStatsTracker.addMessageSent();
        try {
            this.http2Stream.write(message, onWriteDone);
        }
        catch (error) {
            // A synchronous throw from write() ends the call as UNAVAILABLE.
            this.endCall({
                code: constants_1.Status.UNAVAILABLE,
                details: `Write failed with error ${error.message}`,
                metadata: new metadata_1.Metadata(),
            });
        }
    }

    /** Ends the writable side of the HTTP/2 stream. */
    halfClose() {
        this.trace('end() called');
        this.trace('calling end() on HTTP/2 stream');
        this.http2Stream.end();
    }
}
|
449 | exports.Http2SubchannelCall = Http2SubchannelCall;
|
450 |
|
\ | No newline at end of file |