1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | import { EventEmitter } from 'events';
|
19 | import { Duplex, Readable, Writable } from 'stream';
|
20 |
|
21 | import { Status } from './constants';
|
22 | import type { Deserialize, Serialize } from './make-client';
|
23 | import { Metadata } from './metadata';
|
24 | import type { ObjectReadable, ObjectWritable } from './object-stream';
|
25 | import type { StatusObject, PartialStatusObject } from './call-interface';
|
26 | import type { Deadline } from './deadline';
|
27 | import type { ServerInterceptingCallInterface } from './server-interceptors';
|
28 |
|
/**
 * A status in which every field of `StatusObject` is optional; handlers may
 * supply any subset of the fields.
 */
export type ServerStatusResponse = Partial<StatusObject>;
|
30 |
|
/**
 * An `Error` that may additionally carry any of the optional status fields
 * of `ServerStatusResponse`.
 */
export type ServerErrorResponse = ServerStatusResponse & Error;
|
32 |
|
/**
 * Members shared by every server-side call object. Each call object is also
 * an `EventEmitter`.
 */
export type ServerSurfaceCall = {
  /** Whether the call has been cancelled. */
  cancelled: boolean;

  /** Metadata associated with the call. */
  readonly metadata: Metadata;

  /** Returns a string describing the peer of this call. */
  getPeer(): string;

  /** Sends `responseMetadata` as the response metadata of this call. */
  sendMetadata(responseMetadata: Metadata): void;

  /** Returns the deadline of this call. */
  getDeadline(): Deadline;

  /** Returns the path of this call. */
  getPath(): string;
} & EventEmitter;
|
41 |
|
/**
 * Call object for unary methods: the shared call surface plus the single
 * request message.
 */
export type ServerUnaryCall<RequestType, ResponseType> = ServerSurfaceCall & {
  /** The request message of the call. */
  request: RequestType;
};
|
/**
 * Call object for client-streaming methods: the shared call surface combined
 * with a readable object stream of request messages.
 */
export type ServerReadableStream<RequestType, ResponseType> =
  ServerSurfaceCall & ObjectReadable<RequestType>;
|
/**
 * Call object for server-streaming methods: the shared call surface combined
 * with a writable object stream of response messages and the single request
 * message.
 */
export type ServerWritableStream<RequestType, ResponseType> =
  ServerSurfaceCall &
    ObjectWritable<ResponseType> & {
      /** The request message of the call. */
      request: RequestType;

      /** Ends the stream, optionally supplying metadata. */
      end: (metadata?: Metadata) => void;
    };
|
/**
 * Call object for bidirectional-streaming methods: the shared call surface
 * combined with a readable object stream of requests and a writable object
 * stream of responses.
 */
export type ServerDuplexStream<RequestType, ResponseType> = ServerSurfaceCall &
  ObjectReadable<RequestType> &
  ObjectWritable<ResponseType> & {
    /** Ends the stream, optionally supplying metadata. */
    end: (metadata?: Metadata) => void;
  };
|
56 |
|
/**
 * Builds the status object for an error or partial status.
 *
 * The result starts with code `Status.UNKNOWN` and, as details, the error's
 * `message` (or `'Unknown Error'` when the value has no `message` property).
 * Only when `error.code` is an integer number is the code copied from the
 * error; in that case a string `error.details` also replaces the details.
 *
 * @param error The error or partial status to convert.
 * @param overrideTrailers Metadata that takes precedence over
 *     `error.metadata` when it is not null or undefined.
 * @returns The resulting status; its `metadata` is `null` when neither
 *     `overrideTrailers` nor `error.metadata` provides a value.
 */
export function serverErrorToStatus(
  error: ServerErrorResponse | ServerStatusResponse,
  overrideTrailers?: Metadata | undefined
): PartialStatusObject {
  let fallbackDetails = 'Unknown Error';
  if ('message' in error) {
    fallbackDetails = error.message;
  }

  const status: PartialStatusObject = {
    code: Status.UNKNOWN,
    details: fallbackDetails,
    metadata: overrideTrailers ?? error.metadata ?? null,
  };

  // Without a usable integer code the error's own details are ignored too.
  if (
    !('code' in error) ||
    typeof error.code !== 'number' ||
    !Number.isInteger(error.code)
  ) {
    return status;
  }

  status.code = error.code;
  if ('details' in error && typeof error.details === 'string') {
    status.details = error.details;
  }
  return status;
}
|
80 |
|
/**
 * `ServerUnaryCall` implementation: an `EventEmitter` that stores the path,
 * request metadata and request message, and forwards the remaining
 * operations to the underlying intercepting call.
 */
export class ServerUnaryCallImpl<RequestType, ResponseType>
  extends EventEmitter
  implements ServerUnaryCall<RequestType, ResponseType>
{
  /** Starts out `false`; nothing in this class changes it. */
  cancelled = false;

  /**
   * @param path Value returned by `getPath()`.
   * @param call Underlying call that the other methods delegate to.
   * @param metadata Metadata exposed as `metadata`.
   * @param request Request message exposed as `request`.
   */
  constructor(
    private path: string,
    private call: ServerInterceptingCallInterface,
    public metadata: Metadata,
    public request: RequestType
  ) {
    super();
  }

  /** Returns the path given at construction. */
  getPath(): string {
    return this.path;
  }

  /** Returns the peer reported by the underlying call. */
  getPeer(): string {
    return this.call.getPeer();
  }

  /** Returns the deadline reported by the underlying call. */
  getDeadline(): Deadline {
    return this.call.getDeadline();
  }

  /** Passes `responseMetadata` to the underlying call's `sendMetadata`. */
  sendMetadata(responseMetadata: Metadata): void {
    this.call.sendMetadata(responseMetadata);
  }
}
|
113 |
|
/**
 * `ServerReadableStream` implementation: an object-mode `Readable` that
 * stores the path and request metadata and forwards the remaining operations
 * to the underlying intercepting call.
 */
export class ServerReadableStreamImpl<RequestType, ResponseType>
  extends Readable
  implements ServerReadableStream<RequestType, ResponseType>
{
  /** Starts out `false`; nothing in this class changes it. */
  cancelled = false;

  /**
   * @param path Value returned by `getPath()`.
   * @param call Underlying call that the other methods delegate to.
   * @param metadata Metadata exposed as `metadata`.
   */
  constructor(
    private path: string,
    private call: ServerInterceptingCallInterface,
    public metadata: Metadata
  ) {
    super({ objectMode: true });
  }

  /**
   * `Readable` hook: asks the underlying call to start a read. The requested
   * size is not used.
   */
  _read(_size: number): void {
    this.call.startRead();
  }

  /** Returns the path given at construction. */
  getPath(): string {
    return this.path;
  }

  /** Returns the peer reported by the underlying call. */
  getPeer(): string {
    return this.call.getPeer();
  }

  /** Returns the deadline reported by the underlying call. */
  getDeadline(): Deadline {
    return this.call.getDeadline();
  }

  /** Passes `responseMetadata` to the underlying call's `sendMetadata`. */
  sendMetadata(responseMetadata: Metadata): void {
    this.call.sendMetadata(responseMetadata);
  }
}
|
149 |
|
/**
 * `ServerWritableStream` implementation: an object-mode `Writable` whose
 * written chunks are sent as messages on the underlying intercepting call.
 *
 * When the stream finishes, the pending status is sent on the call. That
 * status is OK unless an `'error'` event replaced it with the status derived
 * from the error.
 */
export class ServerWritableStreamImpl<RequestType, ResponseType>
  extends Writable
  implements ServerWritableStream<RequestType, ResponseType>
{
  /** Initialised to `false`; nothing in this class changes it. */
  cancelled: boolean;
  /** Metadata sent with the final status when the status carries none. */
  private trailingMetadata: Metadata;
  /** Status sent by `_final`; replaced when an `'error'` event is emitted. */
  private pendingStatus: PartialStatusObject = {
    code: Status.OK,
    details: 'OK',
  };

  /**
   * @param path Value returned by `getPath()`.
   * @param call Underlying call that messages and status are sent on.
   * @param metadata Metadata exposed as `metadata`.
   * @param request Request message exposed as `request`.
   */
  constructor(
    private path: string,
    private call: ServerInterceptingCallInterface,
    public metadata: Metadata,
    public request: RequestType
  ) {
    super({ objectMode: true });
    this.cancelled = false;
    this.trailingMetadata = new Metadata();

    // An emitted error becomes the status to send, and the stream is ended
    // so that `_final` delivers it.
    this.on('error', err => {
      this.pendingStatus = serverErrorToStatus(err);
      this.end();
    });
  }

  /** Returns the peer reported by the underlying call. */
  getPeer(): string {
    return this.call.getPeer();
  }

  /** Passes `responseMetadata` to the underlying call's `sendMetadata`. */
  sendMetadata(responseMetadata: Metadata): void {
    this.call.sendMetadata(responseMetadata);
  }

  /** Returns the deadline reported by the underlying call. */
  getDeadline(): Deadline {
    return this.call.getDeadline();
  }

  /** Returns the path given at construction. */
  getPath(): string {
    return this.path;
  }

  /**
   * `Writable` hook: sends `chunk` as a message on the underlying call and
   * hands it the stream callback. The encoding is not used.
   */
  _write(
    chunk: ResponseType,
    encoding: string,
    // The callback is passed straight through to `sendMessage`, so its
    // arguments are left open.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    callback: (...args: any[]) => void
  ) {
    this.call.sendMessage(chunk, callback);
  }

  /**
   * `Writable` hook run when the stream is ending. The callback is invoked
   * first, then the pending status is sent, using the trailing metadata when
   * the status has no metadata of its own.
   */
  _final(callback: (error?: Error | null) => void): void {
    callback(null);
    this.call.sendStatus({
      ...this.pendingStatus,
      metadata: this.pendingStatus.metadata ?? this.trailingMetadata,
    });
  }

  /**
   * Ends the stream. A truthy `metadata` argument replaces the trailing
   * metadata; no argument is forwarded to the base `end()`.
   */
  // `any` keeps this override compatible with the base `end` overloads.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  end(metadata?: any) {
    if (metadata) {
      this.trailingMetadata = metadata;
    }

    return super.end();
  }
}
|
219 |
|
/**
 * `ServerDuplexStream` implementation: an object-mode `Duplex` whose reads
 * are requested from, and whose written chunks are sent on, the underlying
 * intercepting call.
 *
 * When the writable side finishes, the pending status is sent on the call.
 * That status is OK unless an `'error'` event replaced it with the status
 * derived from the error.
 */
export class ServerDuplexStreamImpl<RequestType, ResponseType>
  extends Duplex
  implements ServerDuplexStream<RequestType, ResponseType>
{
  /** Initialised to `false`; nothing in this class changes it. */
  cancelled: boolean;
  /** Metadata sent with the final status when the status carries none. */
  private trailingMetadata: Metadata;
  /** Status sent by `_final`; replaced when an `'error'` event is emitted. */
  private pendingStatus: PartialStatusObject = {
    code: Status.OK,
    details: 'OK',
  };

  /**
   * @param path Value returned by `getPath()`.
   * @param call Underlying call used for reads, messages and status.
   * @param metadata Metadata exposed as `metadata`.
   */
  constructor(
    private path: string,
    private call: ServerInterceptingCallInterface,
    public metadata: Metadata
  ) {
    super({ objectMode: true });
    this.cancelled = false;
    this.trailingMetadata = new Metadata();

    // An emitted error becomes the status to send, and the stream is ended
    // so that `_final` delivers it.
    this.on('error', err => {
      this.pendingStatus = serverErrorToStatus(err);
      this.end();
    });
  }

  /** Returns the peer reported by the underlying call. */
  getPeer(): string {
    return this.call.getPeer();
  }

  /** Passes `responseMetadata` to the underlying call's `sendMetadata`. */
  sendMetadata(responseMetadata: Metadata): void {
    this.call.sendMetadata(responseMetadata);
  }

  /** Returns the deadline reported by the underlying call. */
  getDeadline(): Deadline {
    return this.call.getDeadline();
  }

  /** Returns the path given at construction. */
  getPath(): string {
    return this.path;
  }

  /**
   * Readable-side hook: asks the underlying call to start a read. The
   * requested size is not used.
   */
  _read(size: number) {
    this.call.startRead();
  }

  /**
   * Writable-side hook: sends `chunk` as a message on the underlying call and
   * hands it the stream callback. The encoding is not used.
   */
  _write(
    chunk: ResponseType,
    encoding: string,
    // The callback is passed straight through to `sendMessage`, so its
    // arguments are left open.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    callback: (...args: any[]) => void
  ) {
    this.call.sendMessage(chunk, callback);
  }

  /**
   * Writable-side hook run when the stream is ending. The callback is invoked
   * first, then the pending status is sent, using the trailing metadata when
   * the status has no metadata of its own.
   */
  _final(callback: (error?: Error | null) => void): void {
    callback(null);
    this.call.sendStatus({
      ...this.pendingStatus,
      metadata: this.pendingStatus.metadata ?? this.trailingMetadata,
    });
  }

  /**
   * Ends the writable side. A truthy `metadata` argument replaces the
   * trailing metadata; no argument is forwarded to the base `end()`.
   */
  // `any` keeps this override compatible with the base `end` overloads.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  end(metadata?: any) {
    if (metadata) {
      this.trailingMetadata = metadata;
    }

    return super.end();
  }
}
|
292 |
|
293 |
|
/**
 * Signature of the callback passed to unary and client-streaming handlers.
 *
 * @param error An error or partial status, or `null`.
 * @param value The response value, if any.
 * @param trailer Optional metadata.
 * @param flags Optional numeric flags.
 */
export type sendUnaryData<ResponseType> = (
  error: ServerErrorResponse | ServerStatusResponse | null,
  value?: ResponseType | null,
  trailer?: Metadata,
  flags?: number
) => void;
|
300 |
|
301 |
|
/**
 * Handler function for unary calls: receives the call object and a
 * `sendUnaryData` callback.
 */
export type handleUnaryCall<RequestType, ResponseType> = (
  call: ServerUnaryCall<RequestType, ResponseType>,
  callback: sendUnaryData<ResponseType>
) => void;
|
306 |
|
307 |
|
/**
 * Handler function for client-streaming calls: receives the readable call
 * object and a `sendUnaryData` callback.
 */
export type handleClientStreamingCall<RequestType, ResponseType> = (
  call: ServerReadableStream<RequestType, ResponseType>,
  callback: sendUnaryData<ResponseType>
) => void;
|
312 |
|
313 |
|
/**
 * Handler function for server-streaming calls: receives only the writable
 * call object.
 */
export type handleServerStreamingCall<RequestType, ResponseType> = (
  call: ServerWritableStream<RequestType, ResponseType>
) => void;
|
317 |
|
318 |
|
/**
 * Handler function for bidirectional-streaming calls: receives only the
 * duplex call object.
 */
export type handleBidiStreamingCall<RequestType, ResponseType> = (
  call: ServerDuplexStream<RequestType, ResponseType>
) => void;
|
322 |
|
/** Any of the four handler function shapes. */
export type HandleCall<RequestType, ResponseType> =
  | handleUnaryCall<RequestType, ResponseType>
  | handleClientStreamingCall<RequestType, ResponseType>
  | handleServerStreamingCall<RequestType, ResponseType>
  | handleBidiStreamingCall<RequestType, ResponseType>;
|
328 |
|
/** Handler record for a unary method, tagged with `type: 'unary'`. */
export interface UnaryHandler<RequestType, ResponseType> {
  /** Discriminant identifying this handler kind. */
  type: 'unary';
  /** Path of the method. */
  path: string;
  /** The handler function. */
  func: handleUnaryCall<RequestType, ResponseType>;
  /** Serializer for response values. */
  serialize: Serialize<ResponseType>;
  /** Deserializer for request values. */
  deserialize: Deserialize<RequestType>;
}
|
336 |
|
/**
 * Handler record for a client-streaming method, tagged with
 * `type: 'clientStream'`.
 */
export interface ClientStreamingHandler<RequestType, ResponseType> {
  /** Discriminant identifying this handler kind. */
  type: 'clientStream';
  /** Path of the method. */
  path: string;
  /** The handler function. */
  func: handleClientStreamingCall<RequestType, ResponseType>;
  /** Serializer for response values. */
  serialize: Serialize<ResponseType>;
  /** Deserializer for request values. */
  deserialize: Deserialize<RequestType>;
}
|
344 |
|
/**
 * Handler record for a server-streaming method, tagged with
 * `type: 'serverStream'`.
 */
export interface ServerStreamingHandler<RequestType, ResponseType> {
  /** Discriminant identifying this handler kind. */
  type: 'serverStream';
  /** Path of the method. */
  path: string;
  /** The handler function. */
  func: handleServerStreamingCall<RequestType, ResponseType>;
  /** Serializer for response values. */
  serialize: Serialize<ResponseType>;
  /** Deserializer for request values. */
  deserialize: Deserialize<RequestType>;
}
|
352 |
|
/**
 * Handler record for a bidirectional-streaming method, tagged with
 * `type: 'bidi'`.
 */
export interface BidiStreamingHandler<RequestType, ResponseType> {
  /** Discriminant identifying this handler kind. */
  type: 'bidi';
  /** Path of the method. */
  path: string;
  /** The handler function. */
  func: handleBidiStreamingCall<RequestType, ResponseType>;
  /** Serializer for response values. */
  serialize: Serialize<ResponseType>;
  /** Deserializer for request values. */
  deserialize: Deserialize<RequestType>;
}
|
360 |
|
/**
 * Any of the four handler records; the `type` field tells the variants
 * apart.
 */
export type Handler<RequestType, ResponseType> =
  | UnaryHandler<RequestType, ResponseType>
  | ClientStreamingHandler<RequestType, ResponseType>
  | ServerStreamingHandler<RequestType, ResponseType>
  | BidiStreamingHandler<RequestType, ResponseType>;
|
366 |
|
/** The `type` tags used by the handler records. */
export type HandlerType = 'unary' | 'clientStream' | 'serverStream' | 'bidi';
|