/*
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

import { EventEmitter } from 'events';
import { Duplex, Readable, Writable } from 'stream';

import { Status } from './constants';
import type { Deserialize, Serialize } from './make-client';
import { Metadata } from './metadata';
import type { ObjectReadable, ObjectWritable } from './object-stream';
import type { StatusObject, PartialStatusObject } from './call-interface';
import type { Deadline } from './deadline';
import type { ServerInterceptingCallInterface } from './server-interceptors';

/** A status in which every field is optional. */
export type ServerStatusResponse = Partial<StatusObject>;

/** An `Error` that may additionally carry any subset of the status fields. */
export type ServerErrorResponse = ServerStatusResponse & Error;

/**
 * Members shared by every server-side call object handed to a user handler.
 */
export type ServerSurfaceCall = {
  cancelled: boolean;
  readonly metadata: Metadata;
  getPeer(): string;
  sendMetadata(responseMetadata: Metadata): void;
  getDeadline(): Deadline;
  getPath(): string;
} & EventEmitter;

/**
 * Call object for a unary method: the surface call plus the single request.
 *
 * `ResponseType` is not referenced by the shape itself; it is kept so that
 * all four call types share the same `<RequestType, ResponseType>` parameter
 * list.
 */
// eslint-disable-next-line @typescript-eslint/no-unused-vars
export type ServerUnaryCall<RequestType, ResponseType> = ServerSurfaceCall & {
  request: RequestType;
};

/** Call object for a client-streaming method: requests are read as objects. */
// eslint-disable-next-line @typescript-eslint/no-unused-vars
export type ServerReadableStream<RequestType, ResponseType> =
  ServerSurfaceCall & ObjectReadable<RequestType>;

/**
 * Call object for a server-streaming method: one request, responses are
 * written as objects, and `end` optionally takes trailing metadata.
 */
export type ServerWritableStream<RequestType, ResponseType> =
  ServerSurfaceCall &
    ObjectWritable<ResponseType> & {
      request: RequestType;
      end: (metadata?: Metadata) => void;
    };

/**
 * Call object for a bidirectional-streaming method: requests are read and
 * responses are written as objects; `end` optionally takes trailing metadata.
 */
export type ServerDuplexStream<RequestType, ResponseType> = ServerSurfaceCall &
  ObjectReadable<RequestType> &
  ObjectWritable<ResponseType> & { end: (metadata?: Metadata) => void };

/**
 * Converts an error or partial status supplied by a handler into a
 * `PartialStatusObject`.
 *
 * Defaults: `code` is `Status.UNKNOWN`; `details` is `error.message` when the
 * value has a `message` property and `'Unknown Error'` otherwise; `metadata`
 * is the first non-nullish of `overrideTrailers`, `error.metadata`, `null`.
 *
 * When `error.code` is an integer number it replaces the default code, and
 * only in that case a string `error.details` replaces the default details.
 *
 * @param error The error or partial status to convert.
 * @param overrideTrailers Metadata that takes precedence over
 *     `error.metadata` when provided.
 * @returns The resulting status.
 */
export function serverErrorToStatus(
  error: ServerErrorResponse | ServerStatusResponse,
  overrideTrailers?: Metadata | undefined
): PartialStatusObject {
  const status: PartialStatusObject = {
    code: Status.UNKNOWN,
    details: 'message' in error ? error.message : 'Unknown Error',
    metadata: overrideTrailers ?? error.metadata ?? null,
  };

  if (
    'code' in error &&
    typeof error.code === 'number' &&
    Number.isInteger(error.code)
  ) {
    status.code = error.code;

    // `details` is honored only alongside a valid code; without one the
    // message-derived default above is kept.
    if ('details' in error && typeof error.details === 'string') {
      status.details = error.details;
    }
  }
  return status;
}

/**
 * Call object passed to unary handlers. Peer, deadline and metadata
 * operations are delegated to the underlying intercepting call.
 */
export class ServerUnaryCallImpl<RequestType, ResponseType>
  extends EventEmitter
  implements ServerUnaryCall<RequestType, ResponseType>
{
  // Initialized to false here; nothing in this file sets it to true.
  cancelled: boolean;

  /**
   * @param path Value returned by `getPath()`.
   * @param call Underlying call that the delegating methods forward to.
   * @param metadata Metadata exposed on the call object.
   * @param request The request exposed on the call object.
   */
  constructor(
    private path: string,
    private call: ServerInterceptingCallInterface,
    public metadata: Metadata,
    public request: RequestType
  ) {
    super();
    this.cancelled = false;
  }

  /** Returns the result of `getPeer()` on the underlying call. */
  getPeer(): string {
    return this.call.getPeer();
  }

  /** Forwards `responseMetadata` to `sendMetadata` on the underlying call. */
  sendMetadata(responseMetadata: Metadata): void {
    this.call.sendMetadata(responseMetadata);
  }

  /** Returns the result of `getDeadline()` on the underlying call. */
  getDeadline(): Deadline {
    return this.call.getDeadline();
  }

  /** Returns the path supplied at construction. */
  getPath(): string {
    return this.path;
  }
}

/**
 * Call object passed to client-streaming handlers: an object-mode `Readable`
 * whose reads are driven by the underlying intercepting call.
 */
export class ServerReadableStreamImpl<RequestType, ResponseType>
  extends Readable
  implements ServerReadableStream<RequestType, ResponseType>
{
  // Initialized to false here; nothing in this file sets it to true.
  cancelled: boolean;

  /**
   * @param path Value returned by `getPath()`.
   * @param call Underlying call that the delegating methods forward to.
   * @param metadata Metadata exposed on the call object.
   */
  constructor(
    private path: string,
    private call: ServerInterceptingCallInterface,
    public metadata: Metadata
  ) {
    super({ objectMode: true });
    this.cancelled = false;
  }

  /**
   * Stream read hook: calls `startRead()` on the underlying call. The `size`
   * hint is not used.
   */
  _read(size: number) {
    this.call.startRead();
  }

  /** Returns the result of `getPeer()` on the underlying call. */
  getPeer(): string {
    return this.call.getPeer();
  }

  /** Forwards `responseMetadata` to `sendMetadata` on the underlying call. */
  sendMetadata(responseMetadata: Metadata): void {
    this.call.sendMetadata(responseMetadata);
  }

  /** Returns the result of `getDeadline()` on the underlying call. */
  getDeadline(): Deadline {
    return this.call.getDeadline();
  }

  /** Returns the path supplied at construction. */
  getPath(): string {
    return this.path;
  }
}

/**
 * Call object passed to server-streaming handlers: an object-mode `Writable`
 * whose writes become messages on the underlying call and whose completion
 * sends the final status.
 */
export class ServerWritableStreamImpl<RequestType, ResponseType>
  extends Writable
  implements ServerWritableStream<RequestType, ResponseType>
{
  // Initialized to false here; nothing in this file sets it to true.
  cancelled: boolean;
  // Used as the status metadata when `pendingStatus` carries none.
  private trailingMetadata: Metadata;
  // Status sent by `_final`; stays OK unless an 'error' event replaces it.
  private pendingStatus: PartialStatusObject = {
    code: Status.OK,
    details: 'OK',
  };

  /**
   * @param path Value returned by `getPath()`.
   * @param call Underlying call that messages and status are sent on.
   * @param metadata Metadata exposed on the call object.
   * @param request The request exposed on the call object.
   */
  constructor(
    private path: string,
    private call: ServerInterceptingCallInterface,
    public metadata: Metadata,
    public request: RequestType
  ) {
    super({ objectMode: true });
    this.cancelled = false;
    this.trailingMetadata = new Metadata();

    // An emitted error becomes the status that `_final` sends once the
    // stream is ended.
    this.on('error', err => {
      this.pendingStatus = serverErrorToStatus(err);
      this.end();
    });
  }

  /** Returns the result of `getPeer()` on the underlying call. */
  getPeer(): string {
    return this.call.getPeer();
  }

  /** Forwards `responseMetadata` to `sendMetadata` on the underlying call. */
  sendMetadata(responseMetadata: Metadata): void {
    this.call.sendMetadata(responseMetadata);
  }

  /** Returns the result of `getDeadline()` on the underlying call. */
  getDeadline(): Deadline {
    return this.call.getDeadline();
  }

  /** Returns the path supplied at construction. */
  getPath(): string {
    return this.path;
  }

  /**
   * Stream write hook: passes `chunk` and the stream's `callback` to
   * `sendMessage` on the underlying call. `encoding` is not used.
   */
  _write(
    chunk: ResponseType,
    encoding: string,
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    callback: (...args: any[]) => void
  ) {
    this.call.sendMessage(chunk, callback);
  }

  /**
   * Stream finish hook: invokes `callback` first, then sends `pendingStatus`
   * on the underlying call, using `trailingMetadata` when the pending status
   * has no metadata of its own.
   */
  _final(callback: (error?: Error | null) => void): void {
    callback(null);
    this.call.sendStatus({
      ...this.pendingStatus,
      metadata: this.pendingStatus.metadata ?? this.trailingMetadata,
    });
  }

  /**
   * Ends the stream. A truthy `metadata` argument replaces the trailing
   * metadata; the base `end` is then called without arguments.
   *
   * NOTE(review): the parameter is typed `any`, presumably so the override
   * stays compatible with the base stream's `end` overloads — confirm before
   * narrowing it to `Metadata`.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  end(metadata?: any) {
    if (metadata) {
      this.trailingMetadata = metadata;
    }

    return super.end();
  }
}

/**
 * Call object passed to bidirectional-streaming handlers: an object-mode
 * `Duplex` whose reads are driven by the underlying call, whose writes become
 * messages on it, and whose completion sends the final status.
 */
export class ServerDuplexStreamImpl<RequestType, ResponseType>
  extends Duplex
  implements ServerDuplexStream<RequestType, ResponseType>
{
  // Initialized to false here; nothing in this file sets it to true.
  cancelled: boolean;
  // Used as the status metadata when `pendingStatus` carries none.
  private trailingMetadata: Metadata;
  // Status sent by `_final`; stays OK unless an 'error' event replaces it.
  private pendingStatus: PartialStatusObject = {
    code: Status.OK,
    details: 'OK',
  };

  /**
   * @param path Value returned by `getPath()`.
   * @param call Underlying call that messages and status are sent on.
   * @param metadata Metadata exposed on the call object.
   */
  constructor(
    private path: string,
    private call: ServerInterceptingCallInterface,
    public metadata: Metadata
  ) {
    super({ objectMode: true });
    this.cancelled = false;
    this.trailingMetadata = new Metadata();

    // An emitted error becomes the status that `_final` sends once the
    // stream is ended.
    this.on('error', err => {
      this.pendingStatus = serverErrorToStatus(err);
      this.end();
    });
  }

  /** Returns the result of `getPeer()` on the underlying call. */
  getPeer(): string {
    return this.call.getPeer();
  }

  /** Forwards `responseMetadata` to `sendMetadata` on the underlying call. */
  sendMetadata(responseMetadata: Metadata): void {
    this.call.sendMetadata(responseMetadata);
  }

  /** Returns the result of `getDeadline()` on the underlying call. */
  getDeadline(): Deadline {
    return this.call.getDeadline();
  }

  /** Returns the path supplied at construction. */
  getPath(): string {
    return this.path;
  }

  /**
   * Stream read hook: calls `startRead()` on the underlying call. The `size`
   * hint is not used.
   */
  _read(size: number) {
    this.call.startRead();
  }

  /**
   * Stream write hook: passes `chunk` and the stream's `callback` to
   * `sendMessage` on the underlying call. `encoding` is not used.
   */
  _write(
    chunk: ResponseType,
    encoding: string,
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    callback: (...args: any[]) => void
  ) {
    this.call.sendMessage(chunk, callback);
  }

  /**
   * Stream finish hook: invokes `callback` first, then sends `pendingStatus`
   * on the underlying call, using `trailingMetadata` when the pending status
   * has no metadata of its own.
   */
  _final(callback: (error?: Error | null) => void): void {
    callback(null);
    this.call.sendStatus({
      ...this.pendingStatus,
      metadata: this.pendingStatus.metadata ?? this.trailingMetadata,
    });
  }

  /**
   * Ends the writable side. A truthy `metadata` argument replaces the
   * trailing metadata; the base `end` is then called without arguments.
   *
   * NOTE(review): the parameter is typed `any`, presumably so the override
   * stays compatible with the base stream's `end` overloads — confirm before
   * narrowing it to `Metadata`.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  end(metadata?: any) {
    if (metadata) {
      this.trailingMetadata = metadata;
    }

    return super.end();
  }
}

/** Unary response callback signature. */
export type sendUnaryData<ResponseType> = (
  error: ServerErrorResponse | ServerStatusResponse | null,
  value?: ResponseType | null,
  trailer?: Metadata,
  flags?: number
) => void;

/** User provided handler for unary calls. */
export type handleUnaryCall<RequestType, ResponseType> = (
  call: ServerUnaryCall<RequestType, ResponseType>,
  callback: sendUnaryData<ResponseType>
) => void;

/** User provided handler for client streaming calls. */
export type handleClientStreamingCall<RequestType, ResponseType> = (
  call: ServerReadableStream<RequestType, ResponseType>,
  callback: sendUnaryData<ResponseType>
) => void;

/** User provided handler for server streaming calls. */
export type handleServerStreamingCall<RequestType, ResponseType> = (
  call: ServerWritableStream<RequestType, ResponseType>
) => void;

/** User provided handler for bidirectional streaming calls. */
export type handleBidiStreamingCall<RequestType, ResponseType> = (
  call: ServerDuplexStream<RequestType, ResponseType>
) => void;

/** Any of the four user handler signatures. */
export type HandleCall<RequestType, ResponseType> =
  | handleUnaryCall<RequestType, ResponseType>
  | handleClientStreamingCall<RequestType, ResponseType>
  | handleServerStreamingCall<RequestType, ResponseType>
  | handleBidiStreamingCall<RequestType, ResponseType>;

/** Handler record for a unary method, tagged with `type: 'unary'`. */
export interface UnaryHandler<RequestType, ResponseType> {
  func: handleUnaryCall<RequestType, ResponseType>;
  serialize: Serialize<ResponseType>;
  deserialize: Deserialize<RequestType>;
  type: 'unary';
  path: string;
}

/**
 * Handler record for a client-streaming method, tagged with
 * `type: 'clientStream'`.
 */
export interface ClientStreamingHandler<RequestType, ResponseType> {
  func: handleClientStreamingCall<RequestType, ResponseType>;
  serialize: Serialize<ResponseType>;
  deserialize: Deserialize<RequestType>;
  type: 'clientStream';
  path: string;
}

/**
 * Handler record for a server-streaming method, tagged with
 * `type: 'serverStream'`.
 */
export interface ServerStreamingHandler<RequestType, ResponseType> {
  func: handleServerStreamingCall<RequestType, ResponseType>;
  serialize: Serialize<ResponseType>;
  deserialize: Deserialize<RequestType>;
  type: 'serverStream';
  path: string;
}

/**
 * Handler record for a bidirectional-streaming method, tagged with
 * `type: 'bidi'`.
 */
export interface BidiStreamingHandler<RequestType, ResponseType> {
  func: handleBidiStreamingCall<RequestType, ResponseType>;
  serialize: Serialize<ResponseType>;
  deserialize: Deserialize<RequestType>;
  type: 'bidi';
  path: string;
}

/** Union of the handler records, discriminated by the `type` field. */
export type Handler<RequestType, ResponseType> =
  | UnaryHandler<RequestType, ResponseType>
  | ClientStreamingHandler<RequestType, ResponseType>
  | ServerStreamingHandler<RequestType, ResponseType>
  | BidiStreamingHandler<RequestType, ResponseType>;

/** The `type` discriminants used by the handler records. */
export type HandlerType = 'bidi' | 'clientStream' | 'serverStream' | 'unary';