UNPKG

24.9 kBJavaScriptView Raw
1"use strict";
2/*
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18Object.defineProperty(exports, "__esModule", { value: true });
19exports.Http2ServerCallStream = exports.ServerDuplexStreamImpl = exports.ServerWritableStreamImpl = exports.ServerReadableStreamImpl = exports.ServerUnaryCallImpl = void 0;
20const events_1 = require("events");
21const http2 = require("http2");
22const stream_1 = require("stream");
23const zlib = require("zlib");
24const util_1 = require("util");
25const constants_1 = require("./constants");
26const metadata_1 = require("./metadata");
27const stream_decoder_1 = require("./stream-decoder");
28const logging = require("./logging");
29const error_1 = require("./error");
// Name under which this module's trace output is registered.
const TRACER_NAME = 'server_call';
// Promise-returning wrappers around the callback-style zlib decompressors.
const unzip = util_1.promisify(zlib.unzip);
const inflate = util_1.promisify(zlib.inflate);
/**
 * Forward a message to the logging module at DEBUG verbosity, tagged with
 * this module's tracer name.
 *
 * @param {string} text - Message to trace.
 */
function trace(text) {
    const { DEBUG } = constants_1.LogVerbosity;
    logging.trace(DEBUG, TRACER_NAME, text);
}
// Names of the gRPC-specific HTTP/2 headers read or written by this module.
const GRPC_ACCEPT_ENCODING_HEADER = 'grpc-accept-encoding';
const GRPC_ENCODING_HEADER = 'grpc-encoding';
const GRPC_MESSAGE_HEADER = 'grpc-message';
const GRPC_STATUS_HEADER = 'grpc-status';
const GRPC_TIMEOUT_HEADER = 'grpc-timeout';
// Matches a complete grpc-timeout value: 1 to 8 ASCII digits followed by a
// one-letter unit. The pattern is anchored at both ends so that a value with
// too many digits (e.g. "123456789S") or with surrounding junk is rejected
// instead of being matched on a substring and silently misread.
const DEADLINE_REGEX = /^(\d{1,8})\s*([HMSmun])$/;
// Multiplier converting each timeout unit letter into milliseconds.
const deadlineUnitsToMs = {
    H: 3600000,
    M: 60000,
    S: 1000,
    m: 1,
    u: 0.001,
    n: 0.000001,
};
const defaultCompressionHeaders = {
    // TODO(cjihrig): Remove these encoding headers from the default response
    // once compression is integrated.
    [GRPC_ACCEPT_ENCODING_HEADER]: 'identity,deflate,gzip',
    [GRPC_ENCODING_HEADER]: 'identity',
};
// Headers included in every response.
const defaultResponseHeaders = {
    [http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_OK,
    [http2.constants.HTTP2_HEADER_CONTENT_TYPE]: 'application/grpc+proto',
};
// Options passed to stream.respond() when initial metadata is sent; the
// 'wantTrailers' event is used later to send the final status.
const defaultResponseOptions = {
    waitForTrailers: true,
};
/**
 * Handler-facing object for a unary call. Holds the request and its metadata
 * and delegates peer, deadline, path and metadata operations to the wrapped
 * call object.
 */
class ServerUnaryCallImpl extends events_1.EventEmitter {
    /**
     * @param call - Underlying call object that all accessors delegate to.
     * @param metadata - Metadata received with the request.
     * @param request - The request message.
     */
    constructor(call, metadata, request) {
        super();
        Object.assign(this, { call, metadata, request, cancelled: false });
        // Let the underlying call forward its events to this surface object.
        call.setupSurfaceCall(this);
    }
    /** @returns whatever the underlying call reports as its peer. */
    getPeer() {
        const { call } = this;
        return call.getPeer();
    }
    /** Hand the response metadata to the underlying call for sending. */
    sendMetadata(responseMetadata) {
        const { call } = this;
        call.sendMetadata(responseMetadata);
    }
    /** @returns the deadline reported by the underlying call. */
    getDeadline() {
        const { call } = this;
        return call.getDeadline();
    }
    /** @returns the path reported by the underlying call. */
    getPath() {
        const { call } = this;
        return call.getPath();
    }
}
85exports.ServerUnaryCallImpl = ServerUnaryCallImpl;
/**
 * Handler-facing object-mode Readable for calls that receive a stream of
 * request messages. Reading is driven by the wrapped call object.
 */
class ServerReadableStreamImpl extends stream_1.Readable {
    /**
     * @param call - Underlying call object that feeds this stream.
     * @param metadata - Metadata received with the request.
     * @param deserialize - Deserializer stored on the instance.
     * @param encoding - Passed through to call.setupReadable().
     */
    constructor(call, metadata, deserialize, encoding) {
        super({ objectMode: true });
        Object.assign(this, { call, metadata, deserialize, cancelled: false });
        call.setupSurfaceCall(this);
        call.setupReadable(this, encoding);
    }
    /**
     * Readable hook: first drain messages the call has queued, and only ask
     * the call to resume when it reports that more can still be pushed.
     * The size hint is ignored.
     */
    _read(size) {
        const { call } = this;
        if (call.consumeUnpushedMessages(this)) {
            call.resume();
        }
    }
    /** @returns whatever the underlying call reports as its peer. */
    getPeer() {
        const { call } = this;
        return call.getPeer();
    }
    /** Hand the response metadata to the underlying call for sending. */
    sendMetadata(responseMetadata) {
        const { call } = this;
        call.sendMetadata(responseMetadata);
    }
    /** @returns the deadline reported by the underlying call. */
    getDeadline() {
        const { call } = this;
        return call.getDeadline();
    }
    /** @returns the path reported by the underlying call. */
    getPath() {
        const { call } = this;
        return call.getPath();
    }
}
115exports.ServerReadableStreamImpl = ServerReadableStreamImpl;
/**
 * Handler-facing object-mode Writable for calls that send a stream of
 * response messages. Each written value is serialized and written through
 * the wrapped call object; finishing the stream sends an OK status.
 */
class ServerWritableStreamImpl extends stream_1.Writable {
    /**
     * @param call - Underlying call object that performs the actual writes.
     * @param metadata - Metadata received with the request.
     * @param serialize - Serializer stored on the instance.
     * @param request - The request message.
     */
    constructor(call, metadata, serialize, request) {
        super({ objectMode: true });
        Object.assign(this, { call, metadata, serialize, request, cancelled: false });
        this.trailingMetadata = new metadata_1.Metadata();
        call.setupSurfaceCall(this);
        // Any error emitted on this stream is reported through the call, and
        // the stream is then ended.
        this.on('error', err => {
            this.call.sendError(err);
            this.end();
        });
    }
    /** @returns whatever the underlying call reports as its peer. */
    getPeer() {
        const { call } = this;
        return call.getPeer();
    }
    /** Hand the response metadata to the underlying call for sending. */
    sendMetadata(responseMetadata) {
        const { call } = this;
        call.sendMetadata(responseMetadata);
    }
    /** @returns the deadline reported by the underlying call. */
    getDeadline() {
        const { call } = this;
        return call.getDeadline();
    }
    /** @returns the path reported by the underlying call. */
    getPath() {
        const { call } = this;
        return call.getPath();
    }
    /**
     * Writable hook: serialize one message and write it through the call.
     * When the call's write() returns a falsy value the callback is deferred
     * until the call emits 'drain'. A thrown error is emitted on this stream
     * as an INTERNAL status object, after which the callback still runs.
     */
    _write(chunk, encoding,
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    callback) {
        let waitingForDrain = false;
        try {
            const frame = this.call.serializeMessage(chunk);
            const accepted = this.call.write(frame);
            if (!accepted) {
                this.call.once('drain', callback);
                waitingForDrain = true;
            }
        }
        catch (err) {
            const failure = {
                details: (0, error_1.getErrorMessage)(err),
                code: constants_1.Status.INTERNAL,
            };
            this.emit('error', failure);
        }
        if (!waitingForDrain) {
            callback();
        }
    }
    /**
     * Writable hook run when the stream finishes: send an OK status carrying
     * the current trailing metadata.
     */
    _final(callback) {
        const okStatus = {
            code: constants_1.Status.OK,
            details: 'OK',
            metadata: this.trailingMetadata,
        };
        this.call.sendStatus(okStatus);
        callback(null);
    }
    /**
     * End the stream. A truthy argument replaces the trailing metadata that
     * _final() will send; it is not forwarded to Writable#end().
     */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    end(metadata) {
        if (metadata) {
            this.trailingMetadata = metadata;
        }
        return super.end();
    }
}
177exports.ServerWritableStreamImpl = ServerWritableStreamImpl;
/**
 * Handler-facing object-mode Duplex for calls that both receive and send a
 * stream of messages. The class body only wires the stream to the wrapped
 * call object and delegates the accessors.
 */
class ServerDuplexStreamImpl extends stream_1.Duplex {
    /**
     * @param call - Underlying call object.
     * @param metadata - Metadata received with the request.
     * @param serialize - Serializer stored on the instance.
     * @param deserialize - Deserializer stored on the instance.
     * @param encoding - Passed through to call.setupReadable().
     */
    constructor(call, metadata, serialize, deserialize, encoding) {
        super({ objectMode: true });
        Object.assign(this, { call, metadata, serialize, deserialize, cancelled: false });
        this.trailingMetadata = new metadata_1.Metadata();
        call.setupSurfaceCall(this);
        call.setupReadable(this, encoding);
        // Any error emitted on this stream is reported through the call, and
        // the stream is then ended.
        this.on('error', err => {
            this.call.sendError(err);
            this.end();
        });
    }
    /** @returns whatever the underlying call reports as its peer. */
    getPeer() {
        const { call } = this;
        return call.getPeer();
    }
    /** Hand the response metadata to the underlying call for sending. */
    sendMetadata(responseMetadata) {
        const { call } = this;
        call.sendMetadata(responseMetadata);
    }
    /** @returns the deadline reported by the underlying call. */
    getDeadline() {
        const { call } = this;
        return call.getDeadline();
    }
    /** @returns the path reported by the underlying call. */
    getPath() {
        const { call } = this;
        return call.getPath();
    }
    /**
     * End the writable side. A truthy argument replaces the stored trailing
     * metadata; it is not forwarded to Duplex#end().
     */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    end(metadata) {
        if (metadata) {
            this.trailingMetadata = metadata;
        }
        return super.end();
    }
}
214exports.ServerDuplexStreamImpl = ServerDuplexStreamImpl;
// The duplex stream reuses the stream hooks of the one-directional classes:
// _read from the readable implementation, _write and _final from the
// writable implementation.
Object.assign(ServerDuplexStreamImpl.prototype, {
    _read: ServerReadableStreamImpl.prototype._read,
    _write: ServerWritableStreamImpl.prototype._write,
    _final: ServerWritableStreamImpl.prototype._final,
});
// Largest delay (in ms) that Node.js timers honor. setTimeout() coerces any
// larger delay to 1 ms, so longer deadlines are armed in several steps.
const MAX_TIMER_DELAY_MS = 2147483647;
/**
 * Turn a decompression failure into a gRPC status-like object.
 *
 * Rejections that already carry a numeric status code (such as the
 * UNIMPLEMENTED rejection for an unsupported encoding) pass through
 * unchanged. Anything else, including zlib errors whose `code` is a string
 * such as 'Z_DATA_ERROR', is reported as INTERNAL.
 *
 * @param err - The rejection value.
 * @param {string} encoding - Value of the request's grpc-encoding header.
 * @returns An object with at least `code` and `details`.
 */
function normalizeDecompressionError(err, encoding) {
    if (err !== null && typeof err === 'object' && typeof err.code === 'number') {
        return err;
    }
    return {
        code: constants_1.Status.INTERNAL,
        details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`,
    };
}
/**
 * Internal class that wraps the HTTP2 request.
 *
 * It owns the HTTP/2 stream of one call: it parses request metadata and the
 * deadline, decodes and decompresses incoming messages, frames outgoing
 * messages, and sends the final status as trailers (or as a trailers-only
 * response when no headers were sent yet).
 *
 * Events emitted by this object, as visible in this class:
 * 'cancelled' (reason), 'callEnd' (status code), 'streamEnd' (boolean),
 * 'drain', 'sendMessage', 'receiveMessage'.
 */
class Http2ServerCallStream extends events_1.EventEmitter {
    /**
     * @param stream - The HTTP/2 stream of the call.
     * @param handler - Method handler; `path`, `serialize` and `deserialize`
     *     are read from it.
     * @param options - Channel options; 'grpc.max_send_message_length' and
     *     'grpc.max_receive_message_length' override the default limits.
     */
    constructor(stream, handler, options) {
        super();
        this.stream = stream;
        this.handler = handler;
        this.cancelled = false;
        this.deadlineTimer = null;
        this.statusSent = false;
        this.deadline = Infinity;
        this.wantTrailers = false;
        this.metadataSent = false;
        this.canPush = false;
        this.isPushPending = false;
        this.bufferedMessages = [];
        this.messagesToPush = [];
        this.maxSendMessageSize = constants_1.DEFAULT_MAX_SEND_MESSAGE_LENGTH;
        this.maxReceiveMessageSize = constants_1.DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH;
        this.stream.once('error', (err) => {
            /* We need an error handler to avoid uncaught error event exceptions, but
             * there is nothing we can reasonably do here. Any error event should
             * have a corresponding close event, which handles emitting the cancelled
             * event. And the stream is now in a bad state, so we can't reasonably
             * expect to be able to send an error over it. */
        });
        this.stream.once('close', () => {
            trace('Request to method ' +
                (this.handler == null ? undefined : this.handler.path) +
                ' stream closed with rstCode ' +
                this.stream.rstCode);
            // A close before the status went out is treated as a cancellation.
            if (!this.statusSent) {
                this.cancelled = true;
                this.emit('cancelled', 'cancelled');
                this.emit('streamEnd', false);
                this.sendStatus({
                    code: constants_1.Status.CANCELLED,
                    details: 'Cancelled by client',
                    metadata: null,
                });
                // sendStatus() returns before clearing the timer when the
                // call is cancelled, so clear it here.
                if (this.deadlineTimer) {
                    clearTimeout(this.deadlineTimer);
                }
            }
        });
        this.stream.on('drain', () => {
            this.emit('drain');
        });
        if ('grpc.max_send_message_length' in options) {
            this.maxSendMessageSize = options['grpc.max_send_message_length'];
        }
        if ('grpc.max_receive_message_length' in options) {
            this.maxReceiveMessageSize = options['grpc.max_receive_message_length'];
        }
    }
    /**
     * @returns {boolean} true once the call is cancelled. Also marks the call
     *     cancelled when the stream is already destroyed or closed.
     */
    checkCancelled() {
        /* In some cases the stream can become destroyed before the close event
         * fires. That creates a race condition that this check works around */
        if (this.stream.destroyed || this.stream.closed) {
            this.cancelled = true;
        }
        return this.cancelled;
    }
    /**
     * Strip the 5-byte message prefix and decompress the payload.
     *
     * @param {Buffer} message - Framed message including the 5-byte prefix.
     * @param {string} encoding - 'deflate', 'gzip' or 'identity'.
     * @returns {Buffer|Promise<Buffer>} The payload directly for 'identity',
     *     otherwise a promise. Unsupported encodings yield a promise rejected
     *     with an UNIMPLEMENTED status object.
     */
    getDecompressedMessage(message, encoding) {
        if (encoding === 'deflate') {
            return inflate(message.subarray(5));
        }
        else if (encoding === 'gzip') {
            return unzip(message.subarray(5));
        }
        else if (encoding === 'identity') {
            return message.subarray(5);
        }
        return Promise.reject({
            code: constants_1.Status.UNIMPLEMENTED,
            details: `Received message compressed with unsupported encoding "${encoding}"`,
        });
    }
    /**
     * Send the response headers once. Does nothing when the call is cancelled
     * or headers were already sent through this method.
     *
     * @param [customMetadata] - Metadata whose toHttp2Headers() output is
     *     merged over the default response and compression headers.
     */
    sendMetadata(customMetadata) {
        if (this.checkCancelled()) {
            return;
        }
        if (this.metadataSent) {
            return;
        }
        this.metadataSent = true;
        const custom = customMetadata ? customMetadata.toHttp2Headers() : null;
        // TODO(cjihrig): Include compression headers.
        const headers = Object.assign(Object.assign(Object.assign({}, defaultResponseHeaders), defaultCompressionHeaders), custom);
        this.stream.respond(headers, defaultResponseOptions);
    }
    /**
     * Convert request headers into metadata, arm the deadline timer when a
     * grpc-timeout header is present, and strip headers that should not reach
     * the application.
     *
     * An unparseable grpc-timeout sends an OUT_OF_RANGE error and returns the
     * metadata without the header clean-up below.
     *
     * @param headers - Incoming HTTP/2 headers.
     * @returns The metadata object built from the headers.
     */
    receiveMetadata(headers) {
        const metadata = metadata_1.Metadata.fromHttp2Headers(headers);
        if (logging.isTracerEnabled(TRACER_NAME)) {
            trace('Request to ' +
                this.handler.path +
                ' received headers ' +
                JSON.stringify(metadata.toJSON()));
        }
        // TODO(cjihrig): Receive compression metadata.
        const timeoutHeader = metadata.get(GRPC_TIMEOUT_HEADER);
        if (timeoutHeader.length > 0) {
            const match = timeoutHeader[0].toString().match(DEADLINE_REGEX);
            if (match === null) {
                const err = new Error('Invalid deadline');
                err.code = constants_1.Status.OUT_OF_RANGE;
                this.sendError(err);
                return metadata;
            }
            // Math.trunc (not `| 0`) so that timeouts above 2^31-1 ms do not
            // wrap around to a negative number.
            const timeout = Math.trunc(+match[1] * deadlineUnitsToMs[match[2]]);
            // Plain epoch arithmetic; unaffected by local-time (DST) changes.
            this.deadline = Date.now() + timeout;
            this.scheduleDeadlineTimer(timeout);
            metadata.remove(GRPC_TIMEOUT_HEADER);
        }
        // Remove several headers that should not be propagated to the application
        metadata.remove(http2.constants.HTTP2_HEADER_ACCEPT_ENCODING);
        metadata.remove(http2.constants.HTTP2_HEADER_TE);
        metadata.remove(http2.constants.HTTP2_HEADER_CONTENT_TYPE);
        metadata.remove('grpc-accept-encoding');
        return metadata;
    }
    /**
     * Arm this.deadlineTimer so that handleExpiredDeadline runs after
     * `delayMs`. Delays beyond what a single timer supports are covered by
     * re-arming from an intermediate timer until this.deadline is in range.
     *
     * @param {number} delayMs - Milliseconds until the deadline.
     */
    scheduleDeadlineTimer(delayMs) {
        if (delayMs <= MAX_TIMER_DELAY_MS) {
            this.deadlineTimer = setTimeout(handleExpiredDeadline, delayMs, this);
            return;
        }
        this.deadlineTimer = setTimeout(() => {
            this.scheduleDeadlineTimer(Math.max(this.deadline - Date.now(), 0));
        }, MAX_TIMER_DELAY_MS);
    }
    /**
     * Read the whole request body of a unary call, decompress it and
     * deserialize it.
     *
     * @param {string} encoding - Encoding applied when the message's
     *     compressed flag (first byte) is 1.
     * @returns {Promise} Resolves with the deserialized request. Rejects with
     *     a status-like object: RESOURCE_EXHAUSTED when the body exceeds the
     *     receive limit, INTERNAL for stream errors, an empty body, failed
     *     decompression or failed deserialization, or the status carried by
     *     the decompression rejection (e.g. UNIMPLEMENTED).
     */
    receiveUnaryMessage(encoding) {
        return new Promise((resolve, reject) => {
            const { stream } = this;
            let receivedLength = 0;
            // eslint-disable-next-line @typescript-eslint/no-this-alias
            const call = this;
            const body = [];
            const limit = this.maxReceiveMessageSize;
            this.stream.on('data', onData);
            this.stream.on('end', onEnd);
            this.stream.on('error', onEnd);
            function detach() {
                stream.removeListener('data', onData);
                stream.removeListener('end', onEnd);
                stream.removeListener('error', onEnd);
            }
            function onData(chunk) {
                receivedLength += chunk.byteLength;
                // A limit of -1 means "no limit".
                if (limit !== -1 && receivedLength > limit) {
                    detach();
                    reject({
                        code: constants_1.Status.RESOURCE_EXHAUSTED,
                        details: `Received message larger than max (${receivedLength} vs. ${limit})`,
                    });
                    return;
                }
                body.push(chunk);
            }
            // Shared by 'end' (no argument) and 'error' (the error).
            function onEnd(err) {
                detach();
                if (err !== undefined) {
                    reject({ code: constants_1.Status.INTERNAL, details: err.message });
                    return;
                }
                if (receivedLength === 0) {
                    reject({
                        code: constants_1.Status.INTERNAL,
                        details: 'received empty unary message',
                    });
                    return;
                }
                call.emit('receiveMessage');
                const requestBytes = Buffer.concat(body, receivedLength);
                const compressed = requestBytes.readUInt8(0) === 1;
                const compressedMessageEncoding = compressed ? encoding : 'identity';
                const decompressedMessage = call.getDecompressedMessage(requestBytes, compressedMessageEncoding);
                if (Buffer.isBuffer(decompressedMessage)) {
                    resolve(call.deserializeMessageWithInternalError(decompressedMessage));
                    return;
                }
                decompressedMessage.then(decompressed => resolve(call.deserializeMessageWithInternalError(decompressed)), (decompressionError) => reject(normalizeDecompressionError(decompressionError, encoding)));
            }
        });
    }
    /**
     * Deserialize a message, converting any thrown error into an INTERNAL
     * status object.
     *
     * @param {Buffer} buffer - Decompressed message payload.
     * @returns {Promise} Resolves with the deserialized message.
     */
    async deserializeMessageWithInternalError(buffer) {
        try {
            return this.deserializeMessage(buffer);
        }
        catch (err) {
            throw {
                details: (0, error_1.getErrorMessage)(err),
                code: constants_1.Status.INTERNAL,
            };
        }
    }
    /**
     * Serialize a response with the handler's serializer and prepend the
     * 5-byte prefix: a zero "compressed" flag followed by the payload length
     * as a big-endian uint32.
     *
     * @param value - Response message.
     * @returns {Buffer} The framed message.
     */
    serializeMessage(value) {
        const messageBuffer = this.handler.serialize(value);
        // TODO(cjihrig): Call compression aware serializeMessage().
        const byteLength = messageBuffer.byteLength;
        const output = Buffer.allocUnsafe(byteLength + 5);
        output.writeUInt8(0, 0);
        output.writeUInt32BE(byteLength, 1);
        messageBuffer.copy(output, 5);
        return output;
    }
    /** Deserialize a payload with the handler's deserializer. */
    deserializeMessage(bytes) {
        return this.handler.deserialize(bytes);
    }
    /**
     * Finish a unary call: send either the error or the serialized response
     * followed by an OK status. Does nothing when the call is cancelled.
     *
     * @param err - Falsy on success; otherwise the error to report.
     * @param value - Response message (used when there is no error).
     * @param [metadata] - Trailing metadata. Attached to an object error that
     *     has no own `metadata` property.
     * @param [flags] - Unused here.
     */
    async sendUnaryMessage(err, value, metadata, flags) {
        if (this.checkCancelled()) {
            return;
        }
        if (metadata === undefined) {
            metadata = null;
        }
        if (err) {
            // Assigning a property to a primitive (e.g. a string passed as the
            // error) throws in strict mode, so only decorate objects.
            const canCarryMetadata = typeof err === 'object' || typeof err === 'function';
            if (canCarryMetadata &&
                !Object.prototype.hasOwnProperty.call(err, 'metadata') &&
                metadata) {
                err.metadata = metadata;
            }
            this.sendError(err);
            return;
        }
        try {
            const response = this.serializeMessage(value);
            this.write(response);
            this.sendStatus({ code: constants_1.Status.OK, details: 'OK', metadata });
        }
        catch (err) {
            this.sendError({
                details: (0, error_1.getErrorMessage)(err),
                code: constants_1.Status.INTERNAL,
            });
        }
    }
    /**
     * Send the final status of the call.
     *
     * 'callEnd' and 'streamEnd' are emitted before the cancellation check, so
     * they fire even when nothing is sent. When headers were already sent the
     * status goes out as trailers once the stream asks for them; otherwise a
     * trailers-only response is sent immediately.
     *
     * @param statusObj - Object with `code`, `details` and optional `metadata`.
     */
    sendStatus(statusObj) {
        this.emit('callEnd', statusObj.code);
        this.emit('streamEnd', statusObj.code === constants_1.Status.OK);
        if (this.checkCancelled()) {
            return;
        }
        trace('Request to method ' +
            (this.handler == null ? undefined : this.handler.path) +
            ' ended with status code: ' +
            constants_1.Status[statusObj.code] +
            ' details: ' +
            statusObj.details);
        if (this.deadlineTimer) {
            clearTimeout(this.deadlineTimer);
        }
        if (this.stream.headersSent) {
            // Only the first status registers a trailers listener.
            if (!this.wantTrailers) {
                this.wantTrailers = true;
                this.stream.once('wantTrailers', () => {
                    const trailersToSend = Object.assign({
                        [GRPC_STATUS_HEADER]: statusObj.code,
                        [GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details),
                    }, statusObj.metadata == null ? undefined : statusObj.metadata.toHttp2Headers());
                    this.stream.sendTrailers(trailersToSend);
                    this.statusSent = true;
                });
                this.stream.end();
            }
        }
        else {
            // Trailers-only response
            const trailersToSend = Object.assign(Object.assign({
                [GRPC_STATUS_HEADER]: statusObj.code,
                [GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details),
            }, defaultResponseHeaders), statusObj.metadata == null ? undefined : statusObj.metadata.toHttp2Headers());
            this.stream.respond(trailersToSend, { endStream: true });
            this.statusSent = true;
        }
    }
    /**
     * Convert an error into a status and send it.
     *
     * The status defaults to UNKNOWN with the error's `message` (or
     * 'Unknown Error'). An integer `code` replaces the status code, and in
     * that case a string `details` replaces the message. Values that are not
     * objects (for example a string passed as the error) are reported as
     * UNKNOWN / 'Unknown Error' instead of throwing on the `in` operator.
     *
     * @param error - Error, status-like object, or arbitrary value.
     */
    sendError(error) {
        const source = error !== null && (typeof error === 'object' || typeof error === 'function')
            ? error
            : {};
        const status = {
            code: constants_1.Status.UNKNOWN,
            details: 'message' in source ? source.message : 'Unknown Error',
            metadata: 'metadata' in source && source.metadata !== undefined
                ? source.metadata
                : null,
        };
        if ('code' in source &&
            typeof source.code === 'number' &&
            Number.isInteger(source.code)) {
            status.code = source.code;
            if ('details' in source && typeof source.details === 'string') {
                status.details = source.details;
            }
        }
        this.sendStatus(status);
    }
    /**
     * Write one framed message to the stream, sending response headers first
     * if needed.
     *
     * @param {Buffer} chunk - Framed message.
     * @returns The result of stream.write(), or undefined when the call is
     *     cancelled or the message exceeds the send limit (in which case a
     *     RESOURCE_EXHAUSTED error is sent instead).
     */
    write(chunk) {
        if (this.checkCancelled()) {
            return;
        }
        if (this.maxSendMessageSize !== -1 &&
            chunk.length > this.maxSendMessageSize) {
            this.sendError({
                code: constants_1.Status.RESOURCE_EXHAUSTED,
                details: `Sent message larger than max (${chunk.length} vs. ${this.maxSendMessageSize})`,
            });
            return;
        }
        this.sendMetadata();
        this.emit('sendMessage');
        return this.stream.write(chunk);
    }
    /** Resume reading from the underlying stream. */
    resume() {
        this.stream.resume();
    }
    /**
     * Forward the first 'cancelled' and 'callEnd' events of this call to the
     * handler-facing call object, marking it cancelled on cancellation.
     */
    setupSurfaceCall(call) {
        this.once('cancelled', reason => {
            call.cancelled = true;
            call.emit('cancelled', reason);
        });
        this.once('callEnd', status => call.emit('callEnd', status));
    }
    /**
     * Feed a readable surface stream from the HTTP/2 stream: decode frames,
     * enforce the receive size limit, decompress and push each message, then
     * push end-of-stream once the stream ended and no message is in flight.
     *
     * @param readable - Surface stream that receives the messages.
     * @param {string} encoding - Encoding applied to messages whose
     *     compressed flag (first byte) is 1.
     */
    setupReadable(readable, encoding) {
        const decoder = new stream_decoder_1.StreamDecoder();
        let readsDone = false;
        let pendingMessageProcessing = false;
        let pushedEnd = false;
        const maybePushEnd = async () => {
            if (!pushedEnd && readsDone && !pendingMessageProcessing) {
                pushedEnd = true;
                await this.pushOrBufferMessage(readable, null);
            }
        };
        this.stream.on('data', async (data) => {
            const messages = decoder.write(data);
            pendingMessageProcessing = true;
            // Stop the flow while this batch is processed asynchronously.
            this.stream.pause();
            for (const message of messages) {
                if (this.maxReceiveMessageSize !== -1 &&
                    message.length > this.maxReceiveMessageSize) {
                    this.sendError({
                        code: constants_1.Status.RESOURCE_EXHAUSTED,
                        details: `Received message larger than max (${message.length} vs. ${this.maxReceiveMessageSize})`,
                    });
                    return;
                }
                this.emit('receiveMessage');
                const compressed = message.readUInt8(0) === 1;
                const compressedMessageEncoding = compressed ? encoding : 'identity';
                let decompressedMessage;
                try {
                    decompressedMessage = await this.getDecompressedMessage(message, compressedMessageEncoding);
                }
                catch (err) {
                    // Nothing awaits this listener, so a rejection here would
                    // be unhandled. Report it to the client and stop reading.
                    this.sendError(normalizeDecompressionError(err, encoding));
                    return;
                }
                if (!decompressedMessage) {
                    return;
                }
                await this.pushOrBufferMessage(readable, decompressedMessage);
            }
            pendingMessageProcessing = false;
            this.stream.resume();
            await maybePushEnd();
        });
        this.stream.once('end', async () => {
            readsDone = true;
            await maybePushEnd();
        });
    }
    /**
     * Push messages queued in messagesToPush into the readable until it
     * signals backpressure or the end marker (null) is reached.
     *
     * @returns {boolean} Whether further pushes are currently allowed.
     */
    consumeUnpushedMessages(readable) {
        this.canPush = true;
        while (this.messagesToPush.length > 0) {
            const nextMessage = this.messagesToPush.shift();
            const canPush = readable.push(nextMessage);
            if (nextMessage === null || canPush === false) {
                this.canPush = false;
                break;
            }
        }
        return this.canPush;
    }
    /**
     * Push a message now, or queue it when another push is still in
     * progress so that messages keep their order.
     */
    async pushOrBufferMessage(readable, messageBytes) {
        if (this.isPushPending) {
            this.bufferedMessages.push(messageBytes);
        }
        else {
            await this.pushMessage(readable, messageBytes);
        }
    }
    /**
     * Deserialize one message and push it (or queue it in messagesToPush
     * when pushing is not currently allowed). A null message marks the end of
     * the stream. Deserialization failures drop the remaining buffered
     * messages and are emitted as an 'error' on the readable.
     */
    async pushMessage(readable, messageBytes) {
        if (messageBytes === null) {
            trace('Received end of stream');
            if (this.canPush) {
                readable.push(null);
            }
            else {
                this.messagesToPush.push(null);
            }
            return;
        }
        trace('Received message of length ' + messageBytes.length);
        this.isPushPending = true;
        try {
            const deserialized = await this.deserializeMessage(messageBytes);
            if (this.canPush) {
                if (!readable.push(deserialized)) {
                    this.canPush = false;
                    this.stream.pause();
                }
            }
            else {
                this.messagesToPush.push(deserialized);
            }
        }
        catch (error) {
            // Ignore any remaining messages when errors occur.
            this.bufferedMessages.length = 0;
            let code = (0, error_1.getErrorCode)(error);
            // Codes outside the OK..UNAUTHENTICATED range become INTERNAL.
            if (code === null || code < constants_1.Status.OK || code > constants_1.Status.UNAUTHENTICATED) {
                code = constants_1.Status.INTERNAL;
            }
            readable.emit('error', {
                details: (0, error_1.getErrorMessage)(error),
                code: code,
            });
        }
        this.isPushPending = false;
        if (this.bufferedMessages.length > 0) {
            await this.pushMessage(readable, this.bufferedMessages.shift());
        }
    }
    /**
     * @returns {string} "address:port" of the remote socket, just the address
     *     when no port is set, or 'unknown' when no remote address is
     *     available.
     */
    getPeer() {
        const session = this.stream.session;
        const socket = session == null ? undefined : session.socket;
        if (socket != null && socket.remoteAddress) {
            if (socket.remotePort) {
                return `${socket.remoteAddress}:${socket.remotePort}`;
            }
            else {
                return socket.remoteAddress;
            }
        }
        else {
            return 'unknown';
        }
    }
    /** @returns {number} Deadline in epoch milliseconds, or Infinity when none was set. */
    getDeadline() {
        return this.deadline;
    }
    /** @returns The handler's path. */
    getPath() {
        return this.handler.path;
    }
}
655exports.Http2ServerCallStream = Http2ServerCallStream;
/**
 * Timer callback for an expired deadline: report DEADLINE_EXCEEDED through
 * the call, then mark the call cancelled and emit 'cancelled' with the
 * reason 'deadline'.
 *
 * @param call - The call whose deadline expired.
 */
function handleExpiredDeadline(call) {
    const deadlineError = Object.assign(new Error('Deadline exceeded'), {
        code: constants_1.Status.DEADLINE_EXCEEDED,
    });
    // The error is sent before the call is flagged as cancelled.
    call.sendError(deadlineError);
    call.cancelled = true;
    call.emit('cancelled', 'deadline');
}
663//# sourceMappingURL=server-call.js.map
\No newline at end of file