UNPKG

9.8 kBPlain TextView Raw
1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import { EventEmitter } from 'events';
19import { Duplex, Readable, Writable } from 'stream';
20
21import { Status } from './constants';
22import type { Deserialize, Serialize } from './make-client';
23import { Metadata } from './metadata';
24import type { ObjectReadable, ObjectWritable } from './object-stream';
25import type { StatusObject, PartialStatusObject } from './call-interface';
26import type { Deadline } from './deadline';
27import type { ServerInterceptingCallInterface } from './server-interceptors';
28
29export type ServerStatusResponse = Partial<StatusObject>;
30
31export type ServerErrorResponse = ServerStatusResponse & Error;
32
33export type ServerSurfaceCall = {
34 cancelled: boolean;
35 readonly metadata: Metadata;
36 getPeer(): string;
37 sendMetadata(responseMetadata: Metadata): void;
38 getDeadline(): Deadline;
39 getPath(): string;
40} & EventEmitter;
41
42export type ServerUnaryCall<RequestType, ResponseType> = ServerSurfaceCall & {
43 request: RequestType;
44};
45export type ServerReadableStream<RequestType, ResponseType> =
46 ServerSurfaceCall & ObjectReadable<RequestType>;
47export type ServerWritableStream<RequestType, ResponseType> =
48 ServerSurfaceCall &
49 ObjectWritable<ResponseType> & {
50 request: RequestType;
51 end: (metadata?: Metadata) => void;
52 };
53export type ServerDuplexStream<RequestType, ResponseType> = ServerSurfaceCall &
54 ObjectReadable<RequestType> &
55 ObjectWritable<ResponseType> & { end: (metadata?: Metadata) => void };
56
57export function serverErrorToStatus(
58 error: ServerErrorResponse | ServerStatusResponse,
59 overrideTrailers?: Metadata | undefined
60): PartialStatusObject {
61 const status: PartialStatusObject = {
62 code: Status.UNKNOWN,
63 details: 'message' in error ? error.message : 'Unknown Error',
64 metadata: overrideTrailers ?? error.metadata ?? null,
65 };
66
67 if (
68 'code' in error &&
69 typeof error.code === 'number' &&
70 Number.isInteger(error.code)
71 ) {
72 status.code = error.code;
73
74 if ('details' in error && typeof error.details === 'string') {
75 status.details = error.details!;
76 }
77 }
78 return status;
79}
80
81export class ServerUnaryCallImpl<RequestType, ResponseType>
82 extends EventEmitter
83 implements ServerUnaryCall<RequestType, ResponseType>
84{
85 cancelled: boolean;
86
87 constructor(
88 private path: string,
89 private call: ServerInterceptingCallInterface,
90 public metadata: Metadata,
91 public request: RequestType
92 ) {
93 super();
94 this.cancelled = false;
95 }
96
97 getPeer(): string {
98 return this.call.getPeer();
99 }
100
101 sendMetadata(responseMetadata: Metadata): void {
102 this.call.sendMetadata(responseMetadata);
103 }
104
105 getDeadline(): Deadline {
106 return this.call.getDeadline();
107 }
108
109 getPath(): string {
110 return this.path;
111 }
112}
113
114export class ServerReadableStreamImpl<RequestType, ResponseType>
115 extends Readable
116 implements ServerReadableStream<RequestType, ResponseType>
117{
118 cancelled: boolean;
119
120 constructor(
121 private path: string,
122 private call: ServerInterceptingCallInterface,
123 public metadata: Metadata
124 ) {
125 super({ objectMode: true });
126 this.cancelled = false;
127 }
128
129 _read(size: number) {
130 this.call.startRead();
131 }
132
133 getPeer(): string {
134 return this.call.getPeer();
135 }
136
137 sendMetadata(responseMetadata: Metadata): void {
138 this.call.sendMetadata(responseMetadata);
139 }
140
141 getDeadline(): Deadline {
142 return this.call.getDeadline();
143 }
144
145 getPath(): string {
146 return this.path;
147 }
148}
149
150export class ServerWritableStreamImpl<RequestType, ResponseType>
151 extends Writable
152 implements ServerWritableStream<RequestType, ResponseType>
153{
154 cancelled: boolean;
155 private trailingMetadata: Metadata;
156 private pendingStatus: PartialStatusObject = {
157 code: Status.OK,
158 details: 'OK',
159 };
160
161 constructor(
162 private path: string,
163 private call: ServerInterceptingCallInterface,
164 public metadata: Metadata,
165 public request: RequestType
166 ) {
167 super({ objectMode: true });
168 this.cancelled = false;
169 this.trailingMetadata = new Metadata();
170
171 this.on('error', err => {
172 this.pendingStatus = serverErrorToStatus(err);
173 this.end();
174 });
175 }
176
177 getPeer(): string {
178 return this.call.getPeer();
179 }
180
181 sendMetadata(responseMetadata: Metadata): void {
182 this.call.sendMetadata(responseMetadata);
183 }
184
185 getDeadline(): Deadline {
186 return this.call.getDeadline();
187 }
188
189 getPath(): string {
190 return this.path;
191 }
192
193 _write(
194 chunk: ResponseType,
195 encoding: string,
196 // eslint-disable-next-line @typescript-eslint/no-explicit-any
197 callback: (...args: any[]) => void
198 ) {
199 this.call.sendMessage(chunk, callback);
200 }
201
202 _final(callback: Function): void {
203 callback(null);
204 this.call.sendStatus({
205 ...this.pendingStatus,
206 metadata: this.pendingStatus.metadata ?? this.trailingMetadata,
207 });
208 }
209
210 // eslint-disable-next-line @typescript-eslint/no-explicit-any
211 end(metadata?: any) {
212 if (metadata) {
213 this.trailingMetadata = metadata;
214 }
215
216 return super.end();
217 }
218}
219
220export class ServerDuplexStreamImpl<RequestType, ResponseType>
221 extends Duplex
222 implements ServerDuplexStream<RequestType, ResponseType>
223{
224 cancelled: boolean;
225 private trailingMetadata: Metadata;
226 private pendingStatus: PartialStatusObject = {
227 code: Status.OK,
228 details: 'OK',
229 };
230
231 constructor(
232 private path: string,
233 private call: ServerInterceptingCallInterface,
234 public metadata: Metadata
235 ) {
236 super({ objectMode: true });
237 this.cancelled = false;
238 this.trailingMetadata = new Metadata();
239
240 this.on('error', err => {
241 this.pendingStatus = serverErrorToStatus(err);
242 this.end();
243 });
244 }
245
246 getPeer(): string {
247 return this.call.getPeer();
248 }
249
250 sendMetadata(responseMetadata: Metadata): void {
251 this.call.sendMetadata(responseMetadata);
252 }
253
254 getDeadline(): Deadline {
255 return this.call.getDeadline();
256 }
257
258 getPath(): string {
259 return this.path;
260 }
261
262 _read(size: number) {
263 this.call.startRead();
264 }
265
266 _write(
267 chunk: ResponseType,
268 encoding: string,
269 // eslint-disable-next-line @typescript-eslint/no-explicit-any
270 callback: (...args: any[]) => void
271 ) {
272 this.call.sendMessage(chunk, callback);
273 }
274
275 _final(callback: Function): void {
276 callback(null);
277 this.call.sendStatus({
278 ...this.pendingStatus,
279 metadata: this.pendingStatus.metadata ?? this.trailingMetadata,
280 });
281 }
282
283 // eslint-disable-next-line @typescript-eslint/no-explicit-any
284 end(metadata?: any) {
285 if (metadata) {
286 this.trailingMetadata = metadata;
287 }
288
289 return super.end();
290 }
291}
292
293// Unary response callback signature.
294export type sendUnaryData<ResponseType> = (
295 error: ServerErrorResponse | ServerStatusResponse | null,
296 value?: ResponseType | null,
297 trailer?: Metadata,
298 flags?: number
299) => void;
300
301// User provided handler for unary calls.
302export type handleUnaryCall<RequestType, ResponseType> = (
303 call: ServerUnaryCall<RequestType, ResponseType>,
304 callback: sendUnaryData<ResponseType>
305) => void;
306
307// User provided handler for client streaming calls.
308export type handleClientStreamingCall<RequestType, ResponseType> = (
309 call: ServerReadableStream<RequestType, ResponseType>,
310 callback: sendUnaryData<ResponseType>
311) => void;
312
313// User provided handler for server streaming calls.
314export type handleServerStreamingCall<RequestType, ResponseType> = (
315 call: ServerWritableStream<RequestType, ResponseType>
316) => void;
317
318// User provided handler for bidirectional streaming calls.
319export type handleBidiStreamingCall<RequestType, ResponseType> = (
320 call: ServerDuplexStream<RequestType, ResponseType>
321) => void;
322
323export type HandleCall<RequestType, ResponseType> =
324 | handleUnaryCall<RequestType, ResponseType>
325 | handleClientStreamingCall<RequestType, ResponseType>
326 | handleServerStreamingCall<RequestType, ResponseType>
327 | handleBidiStreamingCall<RequestType, ResponseType>;
328
329export interface UnaryHandler<RequestType, ResponseType> {
330 func: handleUnaryCall<RequestType, ResponseType>;
331 serialize: Serialize<ResponseType>;
332 deserialize: Deserialize<RequestType>;
333 type: 'unary';
334 path: string;
335}
336
337export interface ClientStreamingHandler<RequestType, ResponseType> {
338 func: handleClientStreamingCall<RequestType, ResponseType>;
339 serialize: Serialize<ResponseType>;
340 deserialize: Deserialize<RequestType>;
341 type: 'clientStream';
342 path: string;
343}
344
345export interface ServerStreamingHandler<RequestType, ResponseType> {
346 func: handleServerStreamingCall<RequestType, ResponseType>;
347 serialize: Serialize<ResponseType>;
348 deserialize: Deserialize<RequestType>;
349 type: 'serverStream';
350 path: string;
351}
352
353export interface BidiStreamingHandler<RequestType, ResponseType> {
354 func: handleBidiStreamingCall<RequestType, ResponseType>;
355 serialize: Serialize<ResponseType>;
356 deserialize: Deserialize<RequestType>;
357 type: 'bidi';
358 path: string;
359}
360
361export type Handler<RequestType, ResponseType> =
362 | UnaryHandler<RequestType, ResponseType>
363 | ClientStreamingHandler<RequestType, ResponseType>
364 | ServerStreamingHandler<RequestType, ResponseType>
365 | BidiStreamingHandler<RequestType, ResponseType>;
366
367export type HandlerType = 'bidi' | 'clientStream' | 'serverStream' | 'unary';