UNPKG

24.3 kBJavaScriptView Raw
1"use strict";
2/*
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18Object.defineProperty(exports, "__esModule", { value: true });
19exports.Http2ServerCallStream = exports.ServerDuplexStreamImpl = exports.ServerWritableStreamImpl = exports.ServerReadableStreamImpl = exports.ServerUnaryCallImpl = void 0;
20const events_1 = require("events");
21const http2 = require("http2");
22const stream_1 = require("stream");
23const zlib = require("zlib");
24const util_1 = require("util");
25const constants_1 = require("./constants");
26const metadata_1 = require("./metadata");
27const stream_decoder_1 = require("./stream-decoder");
28const logging = require("./logging");
29const error_1 = require("./error");
30const TRACER_NAME = 'server_call';
31const unzip = (0, util_1.promisify)(zlib.unzip);
32const inflate = (0, util_1.promisify)(zlib.inflate);
33function trace(text) {
34 logging.trace(constants_1.LogVerbosity.DEBUG, TRACER_NAME, text);
35}
36const GRPC_ACCEPT_ENCODING_HEADER = 'grpc-accept-encoding';
37const GRPC_ENCODING_HEADER = 'grpc-encoding';
38const GRPC_MESSAGE_HEADER = 'grpc-message';
39const GRPC_STATUS_HEADER = 'grpc-status';
40const GRPC_TIMEOUT_HEADER = 'grpc-timeout';
41const DEADLINE_REGEX = /(\d{1,8})\s*([HMSmun])/;
42const deadlineUnitsToMs = {
43 H: 3600000,
44 M: 60000,
45 S: 1000,
46 m: 1,
47 u: 0.001,
48 n: 0.000001,
49};
50const defaultCompressionHeaders = {
51 // TODO(cjihrig): Remove these encoding headers from the default response
52 // once compression is integrated.
53 [GRPC_ACCEPT_ENCODING_HEADER]: 'identity,deflate,gzip',
54 [GRPC_ENCODING_HEADER]: 'identity',
55};
56const defaultResponseHeaders = {
57 [http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_OK,
58 [http2.constants.HTTP2_HEADER_CONTENT_TYPE]: 'application/grpc+proto',
59};
60const defaultResponseOptions = {
61 waitForTrailers: true,
62};
63class ServerUnaryCallImpl extends events_1.EventEmitter {
64 constructor(call, metadata, request) {
65 super();
66 this.call = call;
67 this.metadata = metadata;
68 this.request = request;
69 this.cancelled = false;
70 this.call.setupSurfaceCall(this);
71 }
72 getPeer() {
73 return this.call.getPeer();
74 }
75 sendMetadata(responseMetadata) {
76 this.call.sendMetadata(responseMetadata);
77 }
78 getDeadline() {
79 return this.call.getDeadline();
80 }
81 getPath() {
82 return this.call.getPath();
83 }
84}
85exports.ServerUnaryCallImpl = ServerUnaryCallImpl;
86class ServerReadableStreamImpl extends stream_1.Readable {
87 constructor(call, metadata, deserialize, encoding) {
88 super({ objectMode: true });
89 this.call = call;
90 this.metadata = metadata;
91 this.deserialize = deserialize;
92 this.cancelled = false;
93 this.call.setupSurfaceCall(this);
94 this.call.setupReadable(this, encoding);
95 }
96 _read(size) {
97 if (!this.call.consumeUnpushedMessages(this)) {
98 return;
99 }
100 this.call.resume();
101 }
102 getPeer() {
103 return this.call.getPeer();
104 }
105 sendMetadata(responseMetadata) {
106 this.call.sendMetadata(responseMetadata);
107 }
108 getDeadline() {
109 return this.call.getDeadline();
110 }
111 getPath() {
112 return this.call.getPath();
113 }
114}
115exports.ServerReadableStreamImpl = ServerReadableStreamImpl;
116class ServerWritableStreamImpl extends stream_1.Writable {
117 constructor(call, metadata, serialize, request) {
118 super({ objectMode: true });
119 this.call = call;
120 this.metadata = metadata;
121 this.serialize = serialize;
122 this.request = request;
123 this.cancelled = false;
124 this.trailingMetadata = new metadata_1.Metadata();
125 this.call.setupSurfaceCall(this);
126 this.on('error', (err) => {
127 this.call.sendError(err);
128 this.end();
129 });
130 }
131 getPeer() {
132 return this.call.getPeer();
133 }
134 sendMetadata(responseMetadata) {
135 this.call.sendMetadata(responseMetadata);
136 }
137 getDeadline() {
138 return this.call.getDeadline();
139 }
140 getPath() {
141 return this.call.getPath();
142 }
143 _write(chunk, encoding,
144 // eslint-disable-next-line @typescript-eslint/no-explicit-any
145 callback) {
146 try {
147 const response = this.call.serializeMessage(chunk);
148 if (!this.call.write(response)) {
149 this.call.once('drain', callback);
150 return;
151 }
152 }
153 catch (err) {
154 this.emit('error', {
155 details: (0, error_1.getErrorMessage)(err),
156 code: constants_1.Status.INTERNAL
157 });
158 }
159 callback();
160 }
161 _final(callback) {
162 this.call.sendStatus({
163 code: constants_1.Status.OK,
164 details: 'OK',
165 metadata: this.trailingMetadata,
166 });
167 callback(null);
168 }
169 // eslint-disable-next-line @typescript-eslint/no-explicit-any
170 end(metadata) {
171 if (metadata) {
172 this.trailingMetadata = metadata;
173 }
174 return super.end();
175 }
176}
177exports.ServerWritableStreamImpl = ServerWritableStreamImpl;
178class ServerDuplexStreamImpl extends stream_1.Duplex {
179 constructor(call, metadata, serialize, deserialize, encoding) {
180 super({ objectMode: true });
181 this.call = call;
182 this.metadata = metadata;
183 this.serialize = serialize;
184 this.deserialize = deserialize;
185 this.cancelled = false;
186 this.trailingMetadata = new metadata_1.Metadata();
187 this.call.setupSurfaceCall(this);
188 this.call.setupReadable(this, encoding);
189 this.on('error', (err) => {
190 this.call.sendError(err);
191 this.end();
192 });
193 }
194 getPeer() {
195 return this.call.getPeer();
196 }
197 sendMetadata(responseMetadata) {
198 this.call.sendMetadata(responseMetadata);
199 }
200 getDeadline() {
201 return this.call.getDeadline();
202 }
203 getPath() {
204 return this.call.getPath();
205 }
206 // eslint-disable-next-line @typescript-eslint/no-explicit-any
207 end(metadata) {
208 if (metadata) {
209 this.trailingMetadata = metadata;
210 }
211 return super.end();
212 }
213}
214exports.ServerDuplexStreamImpl = ServerDuplexStreamImpl;
215ServerDuplexStreamImpl.prototype._read =
216 ServerReadableStreamImpl.prototype._read;
217ServerDuplexStreamImpl.prototype._write =
218 ServerWritableStreamImpl.prototype._write;
219ServerDuplexStreamImpl.prototype._final =
220 ServerWritableStreamImpl.prototype._final;
221// Internal class that wraps the HTTP2 request.
222class Http2ServerCallStream extends events_1.EventEmitter {
223 constructor(stream, handler, options) {
224 super();
225 this.stream = stream;
226 this.handler = handler;
227 this.options = options;
228 this.cancelled = false;
229 this.deadlineTimer = null;
230 this.statusSent = false;
231 this.deadline = Infinity;
232 this.wantTrailers = false;
233 this.metadataSent = false;
234 this.canPush = false;
235 this.isPushPending = false;
236 this.bufferedMessages = [];
237 this.messagesToPush = [];
238 this.maxSendMessageSize = constants_1.DEFAULT_MAX_SEND_MESSAGE_LENGTH;
239 this.maxReceiveMessageSize = constants_1.DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH;
240 this.stream.once('error', (err) => {
241 /* We need an error handler to avoid uncaught error event exceptions, but
242 * there is nothing we can reasonably do here. Any error event should
243 * have a corresponding close event, which handles emitting the cancelled
244 * event. And the stream is now in a bad state, so we can't reasonably
245 * expect to be able to send an error over it. */
246 });
247 this.stream.once('close', () => {
248 var _a;
249 trace('Request to method ' +
250 ((_a = this.handler) === null || _a === void 0 ? void 0 : _a.path) +
251 ' stream closed with rstCode ' +
252 this.stream.rstCode);
253 if (!this.statusSent) {
254 this.cancelled = true;
255 this.emit('cancelled', 'cancelled');
256 this.emit('streamEnd', false);
257 this.sendStatus({
258 code: constants_1.Status.CANCELLED,
259 details: 'Cancelled by client',
260 metadata: null,
261 });
262 }
263 });
264 this.stream.on('drain', () => {
265 this.emit('drain');
266 });
267 if ('grpc.max_send_message_length' in options) {
268 this.maxSendMessageSize = options['grpc.max_send_message_length'];
269 }
270 if ('grpc.max_receive_message_length' in options) {
271 this.maxReceiveMessageSize = options['grpc.max_receive_message_length'];
272 }
273 }
274 checkCancelled() {
275 /* In some cases the stream can become destroyed before the close event
276 * fires. That creates a race condition that this check works around */
277 if (this.stream.destroyed || this.stream.closed) {
278 this.cancelled = true;
279 }
280 return this.cancelled;
281 }
282 getDecompressedMessage(message, encoding) {
283 if (encoding === 'deflate') {
284 return inflate(message.subarray(5));
285 }
286 else if (encoding === 'gzip') {
287 return unzip(message.subarray(5));
288 }
289 else if (encoding === 'identity') {
290 return message.subarray(5);
291 }
292 return Promise.reject({
293 code: constants_1.Status.UNIMPLEMENTED,
294 details: `Received message compressed with unsupported encoding "${encoding}"`,
295 });
296 }
297 sendMetadata(customMetadata) {
298 if (this.checkCancelled()) {
299 return;
300 }
301 if (this.metadataSent) {
302 return;
303 }
304 this.metadataSent = true;
305 const custom = customMetadata ? customMetadata.toHttp2Headers() : null;
306 // TODO(cjihrig): Include compression headers.
307 const headers = Object.assign(Object.assign(Object.assign({}, defaultResponseHeaders), defaultCompressionHeaders), custom);
308 this.stream.respond(headers, defaultResponseOptions);
309 }
310 receiveMetadata(headers) {
311 const metadata = metadata_1.Metadata.fromHttp2Headers(headers);
312 if (logging.isTracerEnabled(TRACER_NAME)) {
313 trace('Request to ' +
314 this.handler.path +
315 ' received headers ' +
316 JSON.stringify(metadata.toJSON()));
317 }
318 // TODO(cjihrig): Receive compression metadata.
319 const timeoutHeader = metadata.get(GRPC_TIMEOUT_HEADER);
320 if (timeoutHeader.length > 0) {
321 const match = timeoutHeader[0].toString().match(DEADLINE_REGEX);
322 if (match === null) {
323 const err = new Error('Invalid deadline');
324 err.code = constants_1.Status.OUT_OF_RANGE;
325 this.sendError(err);
326 return metadata;
327 }
328 const timeout = (+match[1] * deadlineUnitsToMs[match[2]]) | 0;
329 const now = new Date();
330 this.deadline = now.setMilliseconds(now.getMilliseconds() + timeout);
331 this.deadlineTimer = setTimeout(handleExpiredDeadline, timeout, this);
332 metadata.remove(GRPC_TIMEOUT_HEADER);
333 }
334 // Remove several headers that should not be propagated to the application
335 metadata.remove(http2.constants.HTTP2_HEADER_ACCEPT_ENCODING);
336 metadata.remove(http2.constants.HTTP2_HEADER_TE);
337 metadata.remove(http2.constants.HTTP2_HEADER_CONTENT_TYPE);
338 metadata.remove('grpc-accept-encoding');
339 return metadata;
340 }
341 receiveUnaryMessage(encoding, next) {
342 const { stream } = this;
343 let receivedLength = 0;
344 const call = this;
345 const body = [];
346 const limit = this.maxReceiveMessageSize;
347 stream.on('data', onData);
348 stream.on('end', onEnd);
349 stream.on('error', onEnd);
350 function onData(chunk) {
351 receivedLength += chunk.byteLength;
352 if (limit !== -1 && receivedLength > limit) {
353 stream.removeListener('data', onData);
354 stream.removeListener('end', onEnd);
355 stream.removeListener('error', onEnd);
356 next({
357 code: constants_1.Status.RESOURCE_EXHAUSTED,
358 details: `Received message larger than max (${receivedLength} vs. ${limit})`,
359 });
360 return;
361 }
362 body.push(chunk);
363 }
364 function onEnd(err) {
365 stream.removeListener('data', onData);
366 stream.removeListener('end', onEnd);
367 stream.removeListener('error', onEnd);
368 if (err !== undefined) {
369 next({ code: constants_1.Status.INTERNAL, details: err.message });
370 return;
371 }
372 if (receivedLength === 0) {
373 next({ code: constants_1.Status.INTERNAL, details: 'received empty unary message' });
374 return;
375 }
376 call.emit('receiveMessage');
377 const requestBytes = Buffer.concat(body, receivedLength);
378 const compressed = requestBytes.readUInt8(0) === 1;
379 const compressedMessageEncoding = compressed ? encoding : 'identity';
380 const decompressedMessage = call.getDecompressedMessage(requestBytes, compressedMessageEncoding);
381 if (Buffer.isBuffer(decompressedMessage)) {
382 call.safeDeserializeMessage(decompressedMessage, next);
383 return;
384 }
385 decompressedMessage.then((decompressed) => call.safeDeserializeMessage(decompressed, next), (err) => next(err.code
386 ? err
387 : {
388 code: constants_1.Status.INTERNAL,
389 details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`,
390 }));
391 }
392 }
393 safeDeserializeMessage(buffer, next) {
394 try {
395 next(null, this.deserializeMessage(buffer));
396 }
397 catch (err) {
398 next({
399 details: (0, error_1.getErrorMessage)(err),
400 code: constants_1.Status.INTERNAL
401 });
402 }
403 }
404 serializeMessage(value) {
405 const messageBuffer = this.handler.serialize(value);
406 // TODO(cjihrig): Call compression aware serializeMessage().
407 const byteLength = messageBuffer.byteLength;
408 const output = Buffer.allocUnsafe(byteLength + 5);
409 output.writeUInt8(0, 0);
410 output.writeUInt32BE(byteLength, 1);
411 messageBuffer.copy(output, 5);
412 return output;
413 }
414 deserializeMessage(bytes) {
415 return this.handler.deserialize(bytes);
416 }
417 async sendUnaryMessage(err, value, metadata, flags) {
418 if (this.checkCancelled()) {
419 return;
420 }
421 if (metadata === undefined) {
422 metadata = null;
423 }
424 if (err) {
425 if (!Object.prototype.hasOwnProperty.call(err, 'metadata') && metadata) {
426 err.metadata = metadata;
427 }
428 this.sendError(err);
429 return;
430 }
431 try {
432 const response = this.serializeMessage(value);
433 this.write(response);
434 this.sendStatus({ code: constants_1.Status.OK, details: 'OK', metadata });
435 }
436 catch (err) {
437 this.sendError({
438 details: (0, error_1.getErrorMessage)(err),
439 code: constants_1.Status.INTERNAL
440 });
441 }
442 }
443 sendStatus(statusObj) {
444 var _a, _b;
445 this.emit('callEnd', statusObj.code);
446 this.emit('streamEnd', statusObj.code === constants_1.Status.OK);
447 if (this.checkCancelled()) {
448 return;
449 }
450 trace('Request to method ' +
451 ((_a = this.handler) === null || _a === void 0 ? void 0 : _a.path) +
452 ' ended with status code: ' +
453 constants_1.Status[statusObj.code] +
454 ' details: ' +
455 statusObj.details);
456 if (this.deadlineTimer)
457 clearTimeout(this.deadlineTimer);
458 if (this.stream.headersSent) {
459 if (!this.wantTrailers) {
460 this.wantTrailers = true;
461 this.stream.once('wantTrailers', () => {
462 var _a;
463 const trailersToSend = Object.assign({ [GRPC_STATUS_HEADER]: statusObj.code, [GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details) }, (_a = statusObj.metadata) === null || _a === void 0 ? void 0 : _a.toHttp2Headers());
464 this.stream.sendTrailers(trailersToSend);
465 this.statusSent = true;
466 });
467 this.stream.end();
468 }
469 }
470 else {
471 // Trailers-only response
472 const trailersToSend = Object.assign(Object.assign({ [GRPC_STATUS_HEADER]: statusObj.code, [GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details) }, defaultResponseHeaders), (_b = statusObj.metadata) === null || _b === void 0 ? void 0 : _b.toHttp2Headers());
473 this.stream.respond(trailersToSend, { endStream: true });
474 this.statusSent = true;
475 }
476 }
477 sendError(error) {
478 const status = {
479 code: constants_1.Status.UNKNOWN,
480 details: 'message' in error ? error.message : 'Unknown Error',
481 metadata: 'metadata' in error && error.metadata !== undefined
482 ? error.metadata
483 : null,
484 };
485 if ('code' in error &&
486 typeof error.code === 'number' &&
487 Number.isInteger(error.code)) {
488 status.code = error.code;
489 if ('details' in error && typeof error.details === 'string') {
490 status.details = error.details;
491 }
492 }
493 this.sendStatus(status);
494 }
495 write(chunk) {
496 if (this.checkCancelled()) {
497 return;
498 }
499 if (this.maxSendMessageSize !== -1 &&
500 chunk.length > this.maxSendMessageSize) {
501 this.sendError({
502 code: constants_1.Status.RESOURCE_EXHAUSTED,
503 details: `Sent message larger than max (${chunk.length} vs. ${this.maxSendMessageSize})`,
504 });
505 return;
506 }
507 this.sendMetadata();
508 this.emit('sendMessage');
509 return this.stream.write(chunk);
510 }
511 resume() {
512 this.stream.resume();
513 }
514 setupSurfaceCall(call) {
515 this.once('cancelled', (reason) => {
516 call.cancelled = true;
517 call.emit('cancelled', reason);
518 });
519 this.once('callEnd', (status) => call.emit('callEnd', status));
520 }
521 setupReadable(readable, encoding) {
522 const decoder = new stream_decoder_1.StreamDecoder();
523 let readsDone = false;
524 let pendingMessageProcessing = false;
525 let pushedEnd = false;
526 const maybePushEnd = async () => {
527 if (!pushedEnd && readsDone && !pendingMessageProcessing) {
528 pushedEnd = true;
529 await this.pushOrBufferMessage(readable, null);
530 }
531 };
532 this.stream.on('data', async (data) => {
533 const messages = decoder.write(data);
534 pendingMessageProcessing = true;
535 this.stream.pause();
536 for (const message of messages) {
537 if (this.maxReceiveMessageSize !== -1 &&
538 message.length > this.maxReceiveMessageSize) {
539 this.sendError({
540 code: constants_1.Status.RESOURCE_EXHAUSTED,
541 details: `Received message larger than max (${message.length} vs. ${this.maxReceiveMessageSize})`,
542 });
543 return;
544 }
545 this.emit('receiveMessage');
546 const compressed = message.readUInt8(0) === 1;
547 const compressedMessageEncoding = compressed ? encoding : 'identity';
548 const decompressedMessage = await this.getDecompressedMessage(message, compressedMessageEncoding);
549 // Encountered an error with decompression; it'll already have been propogated back
550 // Just return early
551 if (!decompressedMessage)
552 return;
553 await this.pushOrBufferMessage(readable, decompressedMessage);
554 }
555 pendingMessageProcessing = false;
556 this.stream.resume();
557 await maybePushEnd();
558 });
559 this.stream.once('end', async () => {
560 readsDone = true;
561 await maybePushEnd();
562 });
563 }
564 consumeUnpushedMessages(readable) {
565 this.canPush = true;
566 while (this.messagesToPush.length > 0) {
567 const nextMessage = this.messagesToPush.shift();
568 const canPush = readable.push(nextMessage);
569 if (nextMessage === null || canPush === false) {
570 this.canPush = false;
571 break;
572 }
573 }
574 return this.canPush;
575 }
576 async pushOrBufferMessage(readable, messageBytes) {
577 if (this.isPushPending) {
578 this.bufferedMessages.push(messageBytes);
579 }
580 else {
581 await this.pushMessage(readable, messageBytes);
582 }
583 }
584 async pushMessage(readable, messageBytes) {
585 if (messageBytes === null) {
586 trace('Received end of stream');
587 if (this.canPush) {
588 readable.push(null);
589 }
590 else {
591 this.messagesToPush.push(null);
592 }
593 return;
594 }
595 trace('Received message of length ' + messageBytes.length);
596 this.isPushPending = true;
597 try {
598 const deserialized = await this.deserializeMessage(messageBytes);
599 if (this.canPush) {
600 if (!readable.push(deserialized)) {
601 this.canPush = false;
602 this.stream.pause();
603 }
604 }
605 else {
606 this.messagesToPush.push(deserialized);
607 }
608 }
609 catch (error) {
610 // Ignore any remaining messages when errors occur.
611 this.bufferedMessages.length = 0;
612 let code = (0, error_1.getErrorCode)(error);
613 if (code === null || code < constants_1.Status.OK || code > constants_1.Status.UNAUTHENTICATED) {
614 code = constants_1.Status.INTERNAL;
615 }
616 readable.emit('error', {
617 details: (0, error_1.getErrorMessage)(error),
618 code: code
619 });
620 }
621 this.isPushPending = false;
622 if (this.bufferedMessages.length > 0) {
623 await this.pushMessage(readable, this.bufferedMessages.shift());
624 }
625 }
626 getPeer() {
627 const socket = this.stream.session.socket;
628 if (socket.remoteAddress) {
629 if (socket.remotePort) {
630 return `${socket.remoteAddress}:${socket.remotePort}`;
631 }
632 else {
633 return socket.remoteAddress;
634 }
635 }
636 else {
637 return 'unknown';
638 }
639 }
640 getDeadline() {
641 return this.deadline;
642 }
643 getPath() {
644 return this.handler.path;
645 }
646}
647exports.Http2ServerCallStream = Http2ServerCallStream;
648function handleExpiredDeadline(call) {
649 const err = new Error('Deadline exceeded');
650 err.code = constants_1.Status.DEADLINE_EXCEEDED;
651 call.sendError(err);
652 call.cancelled = true;
653 call.emit('cancelled', 'deadline');
654}
655//# sourceMappingURL=server-call.js.map
\No newline at end of file