UNPKG

20.1 kBJavaScriptView Raw
1"use strict";
2/*
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18Object.defineProperty(exports, "__esModule", { value: true });
19exports.Http2SubchannelCall = void 0;
20const http2 = require("http2");
21const os = require("os");
22const constants_1 = require("./constants");
23const metadata_1 = require("./metadata");
24const stream_decoder_1 = require("./stream-decoder");
25const logging = require("./logging");
26const constants_2 = require("./constants");
27const TRACER_NAME = 'subchannel_call';
28const { HTTP2_HEADER_STATUS, HTTP2_HEADER_CONTENT_TYPE, NGHTTP2_CANCEL, } = http2.constants;
29/**
30 * Should do approximately the same thing as util.getSystemErrorName but the
31 * TypeScript types don't have that function for some reason so I just made my
32 * own.
33 * @param errno
34 */
35function getSystemErrorName(errno) {
36 for (const [name, num] of Object.entries(os.constants.errno)) {
37 if (num === errno) {
38 return name;
39 }
40 }
41 return 'Unknown system error ' + errno;
42}
43class Http2SubchannelCall {
44 constructor(http2Stream, callStatsTracker, listener, subchannel, callId) {
45 this.http2Stream = http2Stream;
46 this.callStatsTracker = callStatsTracker;
47 this.listener = listener;
48 this.subchannel = subchannel;
49 this.callId = callId;
50 this.decoder = new stream_decoder_1.StreamDecoder();
51 this.isReadFilterPending = false;
52 this.canPush = false;
53 /**
54 * Indicates that an 'end' event has come from the http2 stream, so there
55 * will be no more data events.
56 */
57 this.readsClosed = false;
58 this.statusOutput = false;
59 this.unpushedReadMessages = [];
60 // Status code mapped from :status. To be used if grpc-status is not received
61 this.mappedStatusCode = constants_1.Status.UNKNOWN;
62 // This is populated (non-null) if and only if the call has ended
63 this.finalStatus = null;
64 this.internalError = null;
65 this.disconnectListener = () => {
66 this.endCall({
67 code: constants_1.Status.UNAVAILABLE,
68 details: 'Connection dropped',
69 metadata: new metadata_1.Metadata(),
70 });
71 };
72 subchannel.addDisconnectListener(this.disconnectListener);
73 subchannel.callRef();
74 http2Stream.on('response', (headers, flags) => {
75 let headersString = '';
76 for (const header of Object.keys(headers)) {
77 headersString += '\t\t' + header + ': ' + headers[header] + '\n';
78 }
79 this.trace('Received server headers:\n' + headersString);
80 switch (headers[':status']) {
81 // TODO(murgatroid99): handle 100 and 101
82 case 400:
83 this.mappedStatusCode = constants_1.Status.INTERNAL;
84 break;
85 case 401:
86 this.mappedStatusCode = constants_1.Status.UNAUTHENTICATED;
87 break;
88 case 403:
89 this.mappedStatusCode = constants_1.Status.PERMISSION_DENIED;
90 break;
91 case 404:
92 this.mappedStatusCode = constants_1.Status.UNIMPLEMENTED;
93 break;
94 case 429:
95 case 502:
96 case 503:
97 case 504:
98 this.mappedStatusCode = constants_1.Status.UNAVAILABLE;
99 break;
100 default:
101 this.mappedStatusCode = constants_1.Status.UNKNOWN;
102 }
103 if (flags & http2.constants.NGHTTP2_FLAG_END_STREAM) {
104 this.handleTrailers(headers);
105 }
106 else {
107 let metadata;
108 try {
109 metadata = metadata_1.Metadata.fromHttp2Headers(headers);
110 }
111 catch (error) {
112 this.endCall({
113 code: constants_1.Status.UNKNOWN,
114 details: error.message,
115 metadata: new metadata_1.Metadata(),
116 });
117 return;
118 }
119 this.listener.onReceiveMetadata(metadata);
120 }
121 });
122 http2Stream.on('trailers', (headers) => {
123 this.handleTrailers(headers);
124 });
125 http2Stream.on('data', (data) => {
126 /* If the status has already been output, allow the http2 stream to
127 * drain without processing the data. */
128 if (this.statusOutput) {
129 return;
130 }
131 this.trace('receive HTTP/2 data frame of length ' + data.length);
132 const messages = this.decoder.write(data);
133 for (const message of messages) {
134 this.trace('parsed message of length ' + message.length);
135 this.callStatsTracker.addMessageReceived();
136 this.tryPush(message);
137 }
138 });
139 http2Stream.on('end', () => {
140 this.readsClosed = true;
141 this.maybeOutputStatus();
142 });
143 http2Stream.on('close', () => {
144 /* Use process.next tick to ensure that this code happens after any
145 * "error" event that may be emitted at about the same time, so that
146 * we can bubble up the error message from that event. */
147 process.nextTick(() => {
148 var _a;
149 this.trace('HTTP/2 stream closed with code ' + http2Stream.rstCode);
150 /* If we have a final status with an OK status code, that means that
151 * we have received all of the messages and we have processed the
152 * trailers and the call completed successfully, so it doesn't matter
153 * how the stream ends after that */
154 if (((_a = this.finalStatus) === null || _a === void 0 ? void 0 : _a.code) === constants_1.Status.OK) {
155 return;
156 }
157 let code;
158 let details = '';
159 switch (http2Stream.rstCode) {
160 case http2.constants.NGHTTP2_NO_ERROR:
161 /* If we get a NO_ERROR code and we already have a status, the
162 * stream completed properly and we just haven't fully processed
163 * it yet */
164 if (this.finalStatus !== null) {
165 return;
166 }
167 code = constants_1.Status.INTERNAL;
168 details = `Received RST_STREAM with code ${http2Stream.rstCode}`;
169 break;
170 case http2.constants.NGHTTP2_REFUSED_STREAM:
171 code = constants_1.Status.UNAVAILABLE;
172 details = 'Stream refused by server';
173 break;
174 case http2.constants.NGHTTP2_CANCEL:
175 code = constants_1.Status.CANCELLED;
176 details = 'Call cancelled';
177 break;
178 case http2.constants.NGHTTP2_ENHANCE_YOUR_CALM:
179 code = constants_1.Status.RESOURCE_EXHAUSTED;
180 details = 'Bandwidth exhausted or memory limit exceeded';
181 break;
182 case http2.constants.NGHTTP2_INADEQUATE_SECURITY:
183 code = constants_1.Status.PERMISSION_DENIED;
184 details = 'Protocol not secure enough';
185 break;
186 case http2.constants.NGHTTP2_INTERNAL_ERROR:
187 code = constants_1.Status.INTERNAL;
188 if (this.internalError === null) {
189 /* This error code was previously handled in the default case, and
190 * there are several instances of it online, so I wanted to
191 * preserve the original error message so that people find existing
192 * information in searches, but also include the more recognizable
193 * "Internal server error" message. */
194 details = `Received RST_STREAM with code ${http2Stream.rstCode} (Internal server error)`;
195 }
196 else {
197 if (this.internalError.code === 'ECONNRESET' || this.internalError.code === 'ETIMEDOUT') {
198 code = constants_1.Status.UNAVAILABLE;
199 details = this.internalError.message;
200 }
201 else {
202 /* The "Received RST_STREAM with code ..." error is preserved
203 * here for continuity with errors reported online, but the
204 * error message at the end will probably be more relevant in
205 * most cases. */
206 details = `Received RST_STREAM with code ${http2Stream.rstCode} triggered by internal client error: ${this.internalError.message}`;
207 }
208 }
209 break;
210 default:
211 code = constants_1.Status.INTERNAL;
212 details = `Received RST_STREAM with code ${http2Stream.rstCode}`;
213 }
214 // This is a no-op if trailers were received at all.
215 // This is OK, because status codes emitted here correspond to more
216 // catastrophic issues that prevent us from receiving trailers in the
217 // first place.
218 this.endCall({ code, details, metadata: new metadata_1.Metadata(), rstCode: http2Stream.rstCode });
219 });
220 });
221 http2Stream.on('error', (err) => {
222 /* We need an error handler here to stop "Uncaught Error" exceptions
223 * from bubbling up. However, errors here should all correspond to
224 * "close" events, where we will handle the error more granularly */
225 /* Specifically looking for stream errors that were *not* constructed
226 * from a RST_STREAM response here:
227 * https://github.com/nodejs/node/blob/8b8620d580314050175983402dfddf2674e8e22a/lib/internal/http2/core.js#L2267
228 */
229 if (err.code !== 'ERR_HTTP2_STREAM_ERROR') {
230 this.trace('Node error event: message=' +
231 err.message +
232 ' code=' +
233 err.code +
234 ' errno=' +
235 getSystemErrorName(err.errno) +
236 ' syscall=' +
237 err.syscall);
238 this.internalError = err;
239 }
240 this.callStatsTracker.onStreamEnd(false);
241 });
242 }
243 outputStatus() {
244 /* Precondition: this.finalStatus !== null */
245 if (!this.statusOutput) {
246 this.statusOutput = true;
247 this.trace('ended with status: code=' +
248 this.finalStatus.code +
249 ' details="' +
250 this.finalStatus.details +
251 '"');
252 this.callStatsTracker.onCallEnd(this.finalStatus);
253 /* We delay the actual action of bubbling up the status to insulate the
254 * cleanup code in this class from any errors that may be thrown in the
255 * upper layers as a result of bubbling up the status. In particular,
256 * if the status is not OK, the "error" event may be emitted
257 * synchronously at the top level, which will result in a thrown error if
258 * the user does not handle that event. */
259 process.nextTick(() => {
260 this.listener.onReceiveStatus(this.finalStatus);
261 });
262 /* Leave the http2 stream in flowing state to drain incoming messages, to
263 * ensure that the stream closure completes. The call stream already does
264 * not push more messages after the status is output, so the messages go
265 * nowhere either way. */
266 this.http2Stream.resume();
267 this.subchannel.callUnref();
268 this.subchannel.removeDisconnectListener(this.disconnectListener);
269 }
270 }
271 trace(text) {
272 logging.trace(constants_2.LogVerbosity.DEBUG, TRACER_NAME, '[' + this.callId + '] ' + text);
273 }
274 /**
275 * On first call, emits a 'status' event with the given StatusObject.
276 * Subsequent calls are no-ops.
277 * @param status The status of the call.
278 */
279 endCall(status) {
280 /* If the status is OK and a new status comes in (e.g. from a
281 * deserialization failure), that new status takes priority */
282 if (this.finalStatus === null || this.finalStatus.code === constants_1.Status.OK) {
283 this.finalStatus = status;
284 this.maybeOutputStatus();
285 }
286 this.destroyHttp2Stream();
287 }
288 maybeOutputStatus() {
289 if (this.finalStatus !== null) {
290 /* The combination check of readsClosed and that the two message buffer
291 * arrays are empty checks that there all incoming data has been fully
292 * processed */
293 if (this.finalStatus.code !== constants_1.Status.OK ||
294 (this.readsClosed &&
295 this.unpushedReadMessages.length === 0 &&
296 !this.isReadFilterPending)) {
297 this.outputStatus();
298 }
299 }
300 }
301 push(message) {
302 this.trace('pushing to reader message of length ' +
303 (message instanceof Buffer ? message.length : null));
304 this.canPush = false;
305 process.nextTick(() => {
306 /* If we have already output the status any later messages should be
307 * ignored, and can cause out-of-order operation errors higher up in the
308 * stack. Checking as late as possible here to avoid any race conditions.
309 */
310 if (this.statusOutput) {
311 return;
312 }
313 this.listener.onReceiveMessage(message);
314 this.maybeOutputStatus();
315 });
316 }
317 tryPush(messageBytes) {
318 if (this.canPush) {
319 this.http2Stream.pause();
320 this.push(messageBytes);
321 }
322 else {
323 this.trace('unpushedReadMessages.push message of length ' + messageBytes.length);
324 this.unpushedReadMessages.push(messageBytes);
325 }
326 }
327 handleTrailers(headers) {
328 this.callStatsTracker.onStreamEnd(true);
329 let headersString = '';
330 for (const header of Object.keys(headers)) {
331 headersString += '\t\t' + header + ': ' + headers[header] + '\n';
332 }
333 this.trace('Received server trailers:\n' + headersString);
334 let metadata;
335 try {
336 metadata = metadata_1.Metadata.fromHttp2Headers(headers);
337 }
338 catch (e) {
339 metadata = new metadata_1.Metadata();
340 }
341 const metadataMap = metadata.getMap();
342 let code = this.mappedStatusCode;
343 if (code === constants_1.Status.UNKNOWN &&
344 typeof metadataMap['grpc-status'] === 'string') {
345 const receivedStatus = Number(metadataMap['grpc-status']);
346 if (receivedStatus in constants_1.Status) {
347 code = receivedStatus;
348 this.trace('received status code ' + receivedStatus + ' from server');
349 }
350 metadata.remove('grpc-status');
351 }
352 let details = '';
353 if (typeof metadataMap['grpc-message'] === 'string') {
354 try {
355 details = decodeURI(metadataMap['grpc-message']);
356 }
357 catch (e) {
358 details = metadataMap['grpc-message'];
359 }
360 metadata.remove('grpc-message');
361 this.trace('received status details string "' + details + '" from server');
362 }
363 const status = { code, details, metadata };
364 // This is a no-op if the call was already ended when handling headers.
365 this.endCall(status);
366 }
367 destroyHttp2Stream() {
368 var _a;
369 // The http2 stream could already have been destroyed if cancelWithStatus
370 // is called in response to an internal http2 error.
371 if (!this.http2Stream.destroyed) {
372 /* If the call has ended with an OK status, communicate that when closing
373 * the stream, partly to avoid a situation in which we detect an error
374 * RST_STREAM as a result after we have the status */
375 let code;
376 if (((_a = this.finalStatus) === null || _a === void 0 ? void 0 : _a.code) === constants_1.Status.OK) {
377 code = http2.constants.NGHTTP2_NO_ERROR;
378 }
379 else {
380 code = http2.constants.NGHTTP2_CANCEL;
381 }
382 this.trace('close http2 stream with code ' + code);
383 this.http2Stream.close(code);
384 }
385 }
386 cancelWithStatus(status, details) {
387 this.trace('cancelWithStatus code: ' + status + ' details: "' + details + '"');
388 this.endCall({ code: status, details, metadata: new metadata_1.Metadata() });
389 }
390 getStatus() {
391 return this.finalStatus;
392 }
393 getPeer() {
394 return this.subchannel.getAddress();
395 }
396 getCallNumber() {
397 return this.callId;
398 }
399 startRead() {
400 /* If the stream has ended with an error, we should not emit any more
401 * messages and we should communicate that the stream has ended */
402 if (this.finalStatus !== null && this.finalStatus.code !== constants_1.Status.OK) {
403 this.readsClosed = true;
404 this.maybeOutputStatus();
405 return;
406 }
407 this.canPush = true;
408 if (this.unpushedReadMessages.length > 0) {
409 const nextMessage = this.unpushedReadMessages.shift();
410 this.push(nextMessage);
411 return;
412 }
413 /* Only resume reading from the http2Stream if we don't have any pending
414 * messages to emit */
415 this.http2Stream.resume();
416 }
417 sendMessageWithContext(context, message) {
418 this.trace('write() called with message of length ' + message.length);
419 const cb = (error) => {
420 var _a, _b;
421 let code = constants_1.Status.UNAVAILABLE;
422 if (((_a = error) === null || _a === void 0 ? void 0 : _a.code) === 'ERR_STREAM_WRITE_AFTER_END') {
423 code = constants_1.Status.INTERNAL;
424 }
425 if (error) {
426 this.cancelWithStatus(code, `Write error: ${error.message}`);
427 }
428 (_b = context.callback) === null || _b === void 0 ? void 0 : _b.call(context);
429 };
430 this.trace('sending data chunk of length ' + message.length);
431 this.callStatsTracker.addMessageSent();
432 try {
433 this.http2Stream.write(message, cb);
434 }
435 catch (error) {
436 this.endCall({
437 code: constants_1.Status.UNAVAILABLE,
438 details: `Write failed with error ${error.message}`,
439 metadata: new metadata_1.Metadata()
440 });
441 }
442 }
443 halfClose() {
444 this.trace('end() called');
445 this.trace('calling end() on HTTP/2 stream');
446 this.http2Stream.end();
447 }
448}
449exports.Http2SubchannelCall = Http2SubchannelCall;
450//# sourceMappingURL=subchannel-call.js.map
\No newline at end of file