UNPKG

23.2 kBJavaScriptView Raw
1"use strict";
2/*
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18Object.defineProperty(exports, "__esModule", { value: true });
19exports.Http2ServerCallStream = exports.ServerDuplexStreamImpl = exports.ServerWritableStreamImpl = exports.ServerReadableStreamImpl = exports.ServerUnaryCallImpl = void 0;
20const events_1 = require("events");
21const http2 = require("http2");
22const stream_1 = require("stream");
23const zlib = require("zlib");
24const constants_1 = require("./constants");
25const metadata_1 = require("./metadata");
26const stream_decoder_1 = require("./stream-decoder");
27const logging = require("./logging");
// Tracer name under which every message from this module is logged.
const TRACER_NAME = 'server_call';
/**
 * Forwards a message to the logging module's tracer at DEBUG verbosity,
 * tagged with this module's tracer name.
 *
 * @param text Message to record.
 */
function trace(text) {
    const { DEBUG } = constants_1.LogVerbosity;
    logging.trace(DEBUG, TRACER_NAME, text);
}
// Names of the gRPC-specific HTTP/2 headers read or written by this module.
const GRPC_ACCEPT_ENCODING_HEADER = 'grpc-accept-encoding';
const GRPC_ENCODING_HEADER = 'grpc-encoding';
const GRPC_MESSAGE_HEADER = 'grpc-message';
const GRPC_STATUS_HEADER = 'grpc-status';
const GRPC_TIMEOUT_HEADER = 'grpc-timeout';
// Matches a complete `grpc-timeout` value: 1 to 8 digits followed by a unit
// letter. Capture group 1 is the amount, group 2 the unit.
// The pattern is anchored on both ends so that a malformed value is rejected
// outright; unanchored, "123456789S" would match its "23456789S" suffix and
// be accepted with the wrong amount.
const DEADLINE_REGEX = /^(\d{1,8})\s*([HMSmun])$/;
// Multiplier that converts one unit of each `grpc-timeout` unit letter to
// milliseconds.
const deadlineUnitsToMs = {
    H: 3600000,
    M: 60000,
    S: 1000,
    m: 1,
    u: 0.001,
    n: 0.000001,
};
// Headers merged into every response before any custom metadata.
const defaultResponseHeaders = {
    // TODO(cjihrig): Remove these encoding headers from the default response
    // once compression is integrated.
    [GRPC_ACCEPT_ENCODING_HEADER]: 'identity,deflate,gzip',
    [GRPC_ENCODING_HEADER]: 'identity',
    [http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_OK,
    [http2.constants.HTTP2_HEADER_CONTENT_TYPE]: 'application/grpc+proto',
};
// Options passed to `stream.respond()`; `waitForTrailers` makes the stream
// emit 'wantTrailers' so that the status trailers can be sent.
const defaultResponseOptions = {
    waitForTrailers: true,
};
/**
 * Event-emitting call object that carries the request and its metadata and
 * delegates peer, deadline and response-metadata operations to the wrapped
 * call stream.
 */
class ServerUnaryCallImpl extends events_1.EventEmitter {
    /**
     * @param call     Underlying call stream; every method delegates to it.
     * @param metadata Metadata stored on the instance as `metadata`.
     * @param request  Request value stored on the instance as `request`.
     */
    constructor(call, metadata, request) {
        super();
        Object.assign(this, { call, metadata, request, cancelled: false });
        // Let the call stream wire its events onto this object.
        call.setupSurfaceCall(this);
    }
    /** Returns whatever deadline the underlying call reports. */
    getDeadline() {
        return this.call.getDeadline();
    }
    /** Returns the peer description reported by the underlying call. */
    getPeer() {
        return this.call.getPeer();
    }
    /** Hands the response metadata to the underlying call. */
    sendMetadata(responseMetadata) {
        this.call.sendMetadata(responseMetadata);
    }
}
76exports.ServerUnaryCallImpl = ServerUnaryCallImpl;
/**
 * Object-mode readable whose contents are supplied by the wrapped call
 * stream.
 */
class ServerReadableStreamImpl extends stream_1.Readable {
    /**
     * @param call        Underlying call stream.
     * @param metadata    Metadata stored on the instance as `metadata`.
     * @param deserialize Deserializer stored on the instance; not invoked here.
     * @param encoding    Passed through to `call.setupReadable`.
     */
    constructor(call, metadata, deserialize, encoding) {
        super({ objectMode: true });
        Object.assign(this, { call, metadata, deserialize, cancelled: false });
        call.setupSurfaceCall(this);
        call.setupReadable(this, encoding);
    }
    /**
     * Readable hook: first hands over messages the call has already queued,
     * and resumes the call only when the call reports it may keep pushing.
     */
    _read(size) {
        const mayKeepPushing = this.call.consumeUnpushedMessages(this);
        if (mayKeepPushing) {
            this.call.resume();
        }
    }
    /** Returns the peer description reported by the underlying call. */
    getPeer() {
        return this.call.getPeer();
    }
    /** Hands the response metadata to the underlying call. */
    sendMetadata(responseMetadata) {
        this.call.sendMetadata(responseMetadata);
    }
    /** Returns whatever deadline the underlying call reports. */
    getDeadline() {
        return this.call.getDeadline();
    }
}
103exports.ServerReadableStreamImpl = ServerReadableStreamImpl;
/**
 * Object-mode writable that serializes each written value through the
 * wrapped call stream and sends an OK status, with the trailing metadata,
 * when it is finalized.
 */
class ServerWritableStreamImpl extends stream_1.Writable {
    /**
     * @param call      Underlying call stream.
     * @param metadata  Metadata stored on the instance as `metadata`.
     * @param serialize Serializer stored on the instance; not invoked here.
     * @param request   Request value stored on the instance as `request`.
     */
    constructor(call, metadata, serialize, request) {
        super({ objectMode: true });
        this.call = call;
        this.metadata = metadata;
        this.serialize = serialize;
        this.request = request;
        this.cancelled = false;
        this.trailingMetadata = new metadata_1.Metadata();
        call.setupSurfaceCall(this);
        // An 'error' emitted on this stream is reported through the call and
        // then ends the stream.
        this.on('error', (failure) => {
            this.call.sendError(failure);
            this.end();
        });
    }
    /** Returns the peer description reported by the underlying call. */
    getPeer() {
        return this.call.getPeer();
    }
    /** Hands the response metadata to the underlying call. */
    sendMetadata(responseMetadata) {
        this.call.sendMetadata(responseMetadata);
    }
    /** Returns whatever deadline the underlying call reports. */
    getDeadline() {
        return this.call.getDeadline();
    }
    /**
     * Writable hook: serializes `chunk` and writes it to the call. When the
     * call's write returns a falsy value the callback is deferred to the
     * call's next 'drain' event. A thrown error is tagged INTERNAL and
     * emitted as 'error', after which the callback still runs.
     */
    _write(chunk, encoding, 
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    callback) {
        try {
            const frame = this.call.serializeMessage(chunk);
            const accepted = this.call.write(frame);
            if (!accepted) {
                this.call.once('drain', callback);
                return;
            }
        }
        catch (failure) {
            failure.code = constants_1.Status.INTERNAL;
            this.emit('error', failure);
        }
        callback();
    }
    /** Writable hook: sends the OK status with the trailing metadata. */
    _final(callback) {
        const okStatus = {
            code: constants_1.Status.OK,
            details: 'OK',
            metadata: this.trailingMetadata,
        };
        this.call.sendStatus(okStatus);
        callback(null);
    }
    /**
     * Ends the stream. A truthy `metadata` argument replaces the trailing
     * metadata that is sent with the final status.
     */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    end(metadata) {
        if (metadata) {
            this.trailingMetadata = metadata;
        }
        return super.end();
    }
}
160exports.ServerWritableStreamImpl = ServerWritableStreamImpl;
/**
 * Object-mode duplex stream backed by the wrapped call stream. The class
 * body defines no `_read`, `_write` or `_final` of its own.
 */
class ServerDuplexStreamImpl extends stream_1.Duplex {
    /**
     * @param call        Underlying call stream.
     * @param metadata    Metadata stored on the instance as `metadata`.
     * @param serialize   Serializer stored on the instance; not invoked here.
     * @param deserialize Deserializer stored on the instance; not invoked here.
     * @param encoding    Passed through to `call.setupReadable`.
     */
    constructor(call, metadata, serialize, deserialize, encoding) {
        super({ objectMode: true });
        this.call = call;
        this.metadata = metadata;
        this.serialize = serialize;
        this.deserialize = deserialize;
        this.cancelled = false;
        this.trailingMetadata = new metadata_1.Metadata();
        call.setupSurfaceCall(this);
        call.setupReadable(this, encoding);
        // An 'error' emitted on this stream is reported through the call and
        // then ends the writable side.
        this.on('error', (failure) => {
            this.call.sendError(failure);
            this.end();
        });
    }
    /** Returns the peer description reported by the underlying call. */
    getPeer() {
        return this.call.getPeer();
    }
    /** Hands the response metadata to the underlying call. */
    sendMetadata(responseMetadata) {
        this.call.sendMetadata(responseMetadata);
    }
    /** Returns whatever deadline the underlying call reports. */
    getDeadline() {
        return this.call.getDeadline();
    }
    /**
     * Ends the writable side. A truthy `metadata` argument replaces the
     * stored trailing metadata first.
     */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    end(metadata) {
        if (metadata) {
            this.trailingMetadata = metadata;
        }
        return super.end();
    }
}
194exports.ServerDuplexStreamImpl = ServerDuplexStreamImpl;
// The duplex stream reuses the readable stream's `_read` and the writable
// stream's `_write` / `_final` rather than defining its own.
Object.assign(ServerDuplexStreamImpl.prototype, {
    _read: ServerReadableStreamImpl.prototype._read,
    _write: ServerWritableStreamImpl.prototype._write,
    _final: ServerWritableStreamImpl.prototype._final,
});
/**
 * Internal class that wraps the HTTP2 request.
 *
 * It owns the HTTP/2 stream of one call: it parses request metadata
 * (including the deadline), reads and decompresses incoming messages, frames
 * outgoing messages, and sends the final status as trailers.
 *
 * Events emitted by this class: 'cancelled' (with a reason string), 'drain',
 * 'receiveMessage', 'sendMessage', 'callEnd' (with the status code) and
 * 'streamEnd' (with a boolean success flag).
 */
class Http2ServerCallStream extends events_1.EventEmitter {
    /**
     * @param stream  HTTP/2 stream of the call.
     * @param handler Method handler; `path`, `serialize` and `deserialize`
     *                are read from it.
     * @param options Options object; 'grpc.max_send_message_length' and
     *                'grpc.max_receive_message_length' override the default
     *                message size limits when present.
     */
    constructor(stream, handler, options) {
        super();
        this.stream = stream;
        this.handler = handler;
        this.options = options;
        this.cancelled = false;
        // Placeholder timer so the field always holds a timer handle; it is
        // cleared again at the end of the constructor.
        this.deadlineTimer = setTimeout(() => { }, 0);
        this.deadline = Infinity;
        this.wantTrailers = false;
        this.metadataSent = false;
        this.canPush = false;
        this.isPushPending = false;
        this.bufferedMessages = [];
        this.messagesToPush = [];
        this.maxSendMessageSize = constants_1.DEFAULT_MAX_SEND_MESSAGE_LENGTH;
        this.maxReceiveMessageSize = constants_1.DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH;
        this.stream.once('error', (err) => {
            /* We need an error handler to avoid uncaught error event exceptions, but
             * there is nothing we can reasonably do here. Any error event should
             * have a corresponding close event, which handles emitting the cancelled
             * event. And the stream is now in a bad state, so we can't reasonably
             * expect to be able to send an error over it. */
        });
        this.stream.once('close', () => {
            trace('Request to method ' + (this.handler == null ? undefined : this.handler.path) +
                ' stream closed with rstCode ' +
                this.stream.rstCode);
            this.cancelled = true;
            this.emit('cancelled', 'cancelled');
            this.emit('streamEnd', false);
            this.sendStatus({ code: constants_1.Status.CANCELLED, details: 'Cancelled by client', metadata: new metadata_1.Metadata() });
        });
        this.stream.on('drain', () => {
            this.emit('drain');
        });
        if ('grpc.max_send_message_length' in options) {
            this.maxSendMessageSize = options['grpc.max_send_message_length'];
        }
        if ('grpc.max_receive_message_length' in options) {
            this.maxReceiveMessageSize = options['grpc.max_receive_message_length'];
        }
        // Clear noop timer
        clearTimeout(this.deadlineTimer);
    }
    /**
     * Reports whether the call is cancelled, also marking it cancelled when
     * the stream is already destroyed or closed.
     */
    checkCancelled() {
        /* In some cases the stream can become destroyed before the close event
         * fires. That creates a race condition that this check works around */
        if (this.stream.destroyed || this.stream.closed) {
            this.cancelled = true;
        }
        return this.cancelled;
    }
    /**
     * Strips the 5-byte message prefix and decompresses the payload.
     *
     * @param message  Framed message (5-byte prefix followed by the payload).
     * @param encoding 'deflate', 'gzip' or 'identity'.
     * @returns Promise of the payload bytes. It resolves with `undefined`
     *          after an error status has been sent (unsupported encoding or
     *          failed decompression); it never rejects.
     */
    getDecompressedMessage(message, encoding) {
        switch (encoding) {
            case 'deflate': {
                return this.decompressWith(zlib.inflate, message.slice(5), encoding);
            }
            case 'gzip': {
                return this.decompressWith(zlib.unzip, message.slice(5), encoding);
            }
            case 'identity': {
                return Promise.resolve(message.slice(5));
            }
            default: {
                this.sendError({
                    code: constants_1.Status.UNIMPLEMENTED,
                    details: `Received message compressed with unsupported encoding "${encoding}"`,
                });
                return Promise.resolve();
            }
        }
    }
    /**
     * Helper for getDecompressedMessage: runs a callback-style zlib function
     * over `payload`. On failure an INTERNAL error status is sent and the
     * promise resolves with `undefined`.
     */
    decompressWith(decompress, payload, encoding) {
        return new Promise((resolve) => {
            decompress(payload, (err, output) => {
                if (err) {
                    this.sendError({
                        code: constants_1.Status.INTERNAL,
                        details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`,
                    });
                    resolve();
                }
                else {
                    resolve(output);
                }
            });
        });
    }
    /**
     * Sends the response headers once: the default headers overlaid with
     * `customMetadata` when given. Does nothing if the call is cancelled or
     * the headers were already sent.
     */
    sendMetadata(customMetadata) {
        if (this.checkCancelled()) {
            return;
        }
        if (this.metadataSent) {
            return;
        }
        this.metadataSent = true;
        const custom = customMetadata ? customMetadata.toHttp2Headers() : null;
        // TODO(cjihrig): Include compression headers.
        const headers = Object.assign({}, defaultResponseHeaders, custom);
        this.stream.respond(headers, defaultResponseOptions);
    }
    /**
     * Converts the request headers into a Metadata object, arms the deadline
     * timer when a `grpc-timeout` header is present, and removes headers that
     * should not reach the application.
     *
     * @param headers Incoming HTTP/2 headers.
     * @returns The resulting metadata. When the timeout header is malformed an
     *          OUT_OF_RANGE error is sent and the metadata is returned as
     *          parsed, without the header removal below.
     */
    receiveMetadata(headers) {
        const metadata = metadata_1.Metadata.fromHttp2Headers(headers);
        // TODO(cjihrig): Receive compression metadata.
        const timeoutHeader = metadata.get(GRPC_TIMEOUT_HEADER);
        if (timeoutHeader.length > 0) {
            const match = timeoutHeader[0].toString().match(DEADLINE_REGEX);
            if (match === null) {
                const err = new Error('Invalid deadline');
                err.code = constants_1.Status.OUT_OF_RANGE;
                this.sendError(err);
                return metadata;
            }
            // Math.trunc rather than `| 0`: the bitwise form wraps to a
            // negative number above 2^31 - 1 ms (about 24.8 days), which made
            // long deadlines expire immediately.
            const timeout = Math.trunc(+match[1] * deadlineUnitsToMs[match[2]]);
            // Plain epoch arithmetic; Date#setMilliseconds works on local time
            // fields and can be off across a DST change.
            this.deadline = Date.now() + timeout;
            this.armDeadlineTimer(timeout);
            metadata.remove(GRPC_TIMEOUT_HEADER);
        }
        // Remove several headers that should not be propagated to the application
        metadata.remove(http2.constants.HTTP2_HEADER_ACCEPT_ENCODING);
        metadata.remove(http2.constants.HTTP2_HEADER_TE);
        metadata.remove(http2.constants.HTTP2_HEADER_CONTENT_TYPE);
        metadata.remove('grpc-accept-encoding');
        return metadata;
    }
    /**
     * Helper for receiveMetadata: schedules the deadline expiry `delayMs`
     * from now. setTimeout treats delays above 2^31 - 1 ms as 1 ms, so a
     * longer delay is covered by re-arming intermediate timers until the
     * remainder fits.
     */
    armDeadlineTimer(delayMs) {
        const MAX_TIMER_DELAY_MS = 2147483647;
        if (delayMs > MAX_TIMER_DELAY_MS) {
            this.deadlineTimer = setTimeout(() => {
                this.armDeadlineTimer(this.deadline - Date.now());
            }, MAX_TIMER_DELAY_MS);
        }
        else {
            this.deadlineTimer = setTimeout(handleExpiredDeadline, delayMs, this);
        }
    }
    /**
     * Collects the whole request body, then decompresses and deserializes it
     * as a single message.
     *
     * @param encoding Encoding used when the message's compressed flag is set.
     * @returns Promise of the deserialized request. It resolves with
     *          `undefined` after an error status has been sent; it never
     *          rejects.
     */
    receiveUnaryMessage(encoding) {
        return new Promise((resolve, reject) => {
            const stream = this.stream;
            const chunks = [];
            let totalLength = 0;
            stream.on('data', (data) => {
                chunks.push(data);
                totalLength += data.byteLength;
            });
            stream.once('end', async () => {
                try {
                    const requestBytes = Buffer.concat(chunks, totalLength);
                    if (this.maxReceiveMessageSize !== -1 &&
                        requestBytes.length > this.maxReceiveMessageSize) {
                        this.sendError({
                            code: constants_1.Status.RESOURCE_EXHAUSTED,
                            details: `Received message larger than max (${requestBytes.length} vs. ${this.maxReceiveMessageSize})`,
                        });
                        resolve();
                        // Stop here: the oversized message must not be
                        // announced, decompressed or deserialized.
                        return;
                    }
                    this.emit('receiveMessage');
                    // First byte of the frame is the compressed flag.
                    const compressed = requestBytes.readUInt8(0) === 1;
                    const compressedMessageEncoding = compressed ? encoding : 'identity';
                    const decompressedMessage = await this.getDecompressedMessage(requestBytes, compressedMessageEncoding);
                    // Encountered an error with decompression; it'll already have been propagated back
                    // Just return early
                    if (!decompressedMessage) {
                        resolve();
                    }
                    else {
                        resolve(this.deserializeMessage(decompressedMessage));
                    }
                }
                catch (err) {
                    err.code = constants_1.Status.INTERNAL;
                    this.sendError(err);
                    resolve();
                }
            });
        });
    }
    /**
     * Serializes `value` with the handler and prepends the 5-byte prefix:
     * one flag byte (0, uncompressed) and the big-endian 32-bit length.
     */
    serializeMessage(value) {
        const messageBuffer = this.handler.serialize(value);
        // TODO(cjihrig): Call compression aware serializeMessage().
        const byteLength = messageBuffer.byteLength;
        const output = Buffer.allocUnsafe(byteLength + 5);
        output.writeUInt8(0, 0);
        output.writeUInt32BE(byteLength, 1);
        messageBuffer.copy(output, 5);
        return output;
    }
    /** Deserializes payload bytes with the handler's deserializer. */
    deserializeMessage(bytes) {
        return this.handler.deserialize(bytes);
    }
    /**
     * Completes a unary call: sends `err` as the error status, or serializes
     * and writes `value` followed by an OK status carrying `metadata`.
     * `flags` is accepted but not used.
     */
    async sendUnaryMessage(err, value, metadata, flags) {
        if (this.checkCancelled()) {
            return;
        }
        if (!metadata) {
            metadata = new metadata_1.Metadata();
        }
        if (err) {
            if (!Object.prototype.hasOwnProperty.call(err, 'metadata')) {
                err.metadata = metadata;
            }
            this.sendError(err);
            return;
        }
        try {
            const response = this.serializeMessage(value);
            this.write(response);
            this.sendStatus({ code: constants_1.Status.OK, details: 'OK', metadata });
        }
        catch (err) {
            err.code = constants_1.Status.INTERNAL;
            this.sendError(err);
        }
    }
    /**
     * Emits 'callEnd' / 'streamEnd' and, unless the call is cancelled, ends
     * the stream and sends `statusObj` as trailers when the stream asks for
     * them. The trailers are only set up on the first such call.
     *
     * @param statusObj Object with `code`, `details` and `metadata`.
     */
    sendStatus(statusObj) {
        // Once a status is being sent the deadline no longer matters. This is
        // cleared before the cancelled check so that a cancelled call does not
        // leave its deadline timer armed.
        clearTimeout(this.deadlineTimer);
        this.emit('callEnd', statusObj.code);
        this.emit('streamEnd', statusObj.code === constants_1.Status.OK);
        if (this.checkCancelled()) {
            return;
        }
        trace('Request to method ' + (this.handler == null ? undefined : this.handler.path) +
            ' ended with status code: ' +
            constants_1.Status[statusObj.code] +
            ' details: ' +
            statusObj.details);
        if (!this.wantTrailers) {
            this.wantTrailers = true;
            this.stream.once('wantTrailers', () => {
                const trailersToSend = Object.assign({
                    [GRPC_STATUS_HEADER]: statusObj.code,
                    [GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details),
                }, statusObj.metadata.toHttp2Headers());
                this.stream.sendTrailers(trailersToSend);
            });
            this.sendMetadata();
            this.stream.end();
        }
    }
    /**
     * Builds a status from an error-like object and sends it. The code
     * defaults to UNKNOWN; an integer `code` on the error replaces it, and
     * only then does a string `details` replace the message.
     */
    sendError(error) {
        const status = {
            code: constants_1.Status.UNKNOWN,
            details: 'message' in error ? error.message : 'Unknown Error',
            metadata: 'metadata' in error && error.metadata !== undefined
                ? error.metadata
                : new metadata_1.Metadata(),
        };
        if ('code' in error &&
            typeof error.code === 'number' &&
            Number.isInteger(error.code)) {
            status.code = error.code;
            if ('details' in error && typeof error.details === 'string') {
                status.details = error.details;
            }
        }
        this.sendStatus(status);
    }
    /**
     * Writes one framed message to the stream, sending the response headers
     * first if needed.
     *
     * @returns The result of `stream.write`, or `undefined` when nothing was
     *          written (call cancelled, or message over the send limit, in
     *          which case a RESOURCE_EXHAUSTED error is sent).
     */
    write(chunk) {
        if (this.checkCancelled()) {
            return;
        }
        if (this.maxSendMessageSize !== -1 &&
            chunk.length > this.maxSendMessageSize) {
            this.sendError({
                code: constants_1.Status.RESOURCE_EXHAUSTED,
                details: `Sent message larger than max (${chunk.length} vs. ${this.maxSendMessageSize})`,
            });
            return;
        }
        this.sendMetadata();
        this.emit('sendMessage');
        return this.stream.write(chunk);
    }
    /** Resumes the underlying stream. */
    resume() {
        this.stream.resume();
    }
    /**
     * Forwards the first 'cancelled' event to `call`: sets its `cancelled`
     * flag and re-emits the event with the same reason.
     */
    setupSurfaceCall(call) {
        this.once('cancelled', (reason) => {
            call.cancelled = true;
            call.emit('cancelled', reason);
        });
    }
    /**
     * Feeds incoming stream data through a decoder and pushes each decoded,
     * decompressed message to `readable`; pushes `null` once the stream has
     * ended and no message is still being processed.
     *
     * @param readable Stream that receives the messages.
     * @param encoding Encoding used for messages whose compressed flag is set.
     */
    setupReadable(readable, encoding) {
        const decoder = new stream_decoder_1.StreamDecoder();
        let readsDone = false;
        let pendingMessageProcessing = false;
        let pushedEnd = false;
        const maybePushEnd = () => {
            if (!pushedEnd && readsDone && !pendingMessageProcessing) {
                pushedEnd = true;
                this.pushOrBufferMessage(readable, null);
            }
        };
        this.stream.on('data', async (data) => {
            const messages = decoder.write(data);
            pendingMessageProcessing = true;
            // Hold further data while this batch is processed asynchronously.
            this.stream.pause();
            for (const message of messages) {
                if (this.maxReceiveMessageSize !== -1 &&
                    message.length > this.maxReceiveMessageSize) {
                    this.sendError({
                        code: constants_1.Status.RESOURCE_EXHAUSTED,
                        details: `Received message larger than max (${message.length} vs. ${this.maxReceiveMessageSize})`,
                    });
                    return;
                }
                this.emit('receiveMessage');
                const compressed = message.readUInt8(0) === 1;
                const compressedMessageEncoding = compressed ? encoding : 'identity';
                const decompressedMessage = await this.getDecompressedMessage(message, compressedMessageEncoding);
                // Encountered an error with decompression; it'll already have been propagated back
                // Just return early
                if (!decompressedMessage)
                    return;
                this.pushOrBufferMessage(readable, decompressedMessage);
            }
            pendingMessageProcessing = false;
            this.stream.resume();
            maybePushEnd();
        });
        this.stream.once('end', () => {
            readsDone = true;
            maybePushEnd();
        });
    }
    /**
     * Pushes queued messages to `readable` until the queue is empty, the
     * end marker (`null`) is pushed, or `push` reports backpressure.
     *
     * @returns Whether pushing may continue.
     */
    consumeUnpushedMessages(readable) {
        this.canPush = true;
        while (this.messagesToPush.length > 0) {
            const nextMessage = this.messagesToPush.shift();
            const canPush = readable.push(nextMessage);
            if (nextMessage === null || canPush === false) {
                this.canPush = false;
                break;
            }
        }
        return this.canPush;
    }
    /**
     * Pushes a message now, or buffers it while an earlier push is still in
     * progress so that messages stay in order.
     */
    pushOrBufferMessage(readable, messageBytes) {
        if (this.isPushPending) {
            this.bufferedMessages.push(messageBytes);
        }
        else {
            this.pushMessage(readable, messageBytes);
        }
    }
    /**
     * Deserializes one message and pushes it to `readable`, or queues it when
     * pushing is not currently allowed. `null` marks the end of the stream.
     * A deserialization failure is emitted as 'error' on `readable` and
     * drops the messages buffered behind it.
     */
    async pushMessage(readable, messageBytes) {
        if (messageBytes === null) {
            trace('Received end of stream');
            if (this.canPush) {
                readable.push(null);
            }
            else {
                this.messagesToPush.push(null);
            }
            return;
        }
        trace('Received message of length ' + messageBytes.length);
        this.isPushPending = true;
        try {
            const deserialized = await this.deserializeMessage(messageBytes);
            if (this.canPush) {
                if (!readable.push(deserialized)) {
                    this.canPush = false;
                    this.stream.pause();
                }
            }
            else {
                this.messagesToPush.push(deserialized);
            }
        }
        catch (error) {
            // Ignore any remaining messages when errors occur.
            this.bufferedMessages.length = 0;
            if (!('code' in error &&
                typeof error.code === 'number' &&
                Number.isInteger(error.code) &&
                error.code >= constants_1.Status.OK &&
                error.code <= constants_1.Status.UNAUTHENTICATED)) {
                // The error code is not a valid gRPC code so it is being overwritten.
                error.code = constants_1.Status.INTERNAL;
            }
            readable.emit('error', error);
        }
        this.isPushPending = false;
        if (this.bufferedMessages.length > 0) {
            this.pushMessage(readable, this.bufferedMessages.shift());
        }
    }
    /**
     * Describes the remote end of the connection.
     *
     * @returns "address:port", the address alone when no port is known, or
     *          'unknown' when no address is available.
     */
    getPeer() {
        // `stream.session` is undefined once the stream has been destroyed,
        // so the lookup is guarded instead of dereferenced blindly.
        const session = this.stream.session;
        const socket = session ? session.socket : undefined;
        if (socket && socket.remoteAddress) {
            if (socket.remotePort) {
                return `${socket.remoteAddress}:${socket.remotePort}`;
            }
            else {
                return socket.remoteAddress;
            }
        }
        else {
            return 'unknown';
        }
    }
    /**
     * Returns the deadline as a millisecond timestamp, or Infinity when no
     * timeout header has been received.
     */
    getDeadline() {
        return this.deadline;
    }
}
611exports.Http2ServerCallStream = Http2ServerCallStream;
/**
 * Timer callback for an expired deadline: sends a DEADLINE_EXCEEDED error on
 * the call, then marks the call cancelled and emits 'cancelled' with the
 * reason 'deadline'.
 *
 * @param call Call whose deadline expired.
 */
function handleExpiredDeadline(call) {
    const deadlineError = Object.assign(new Error('Deadline exceeded'), {
        code: constants_1.Status.DEADLINE_EXCEEDED,
    });
    call.sendError(deadlineError);
    call.cancelled = true;
    call.emit('cancelled', 'deadline');
}
619//# sourceMappingURL=server-call.js.map
\No newline at end of file