1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | import { Metadata } from './metadata';
|
19 | import {
|
20 | StatusObject,
|
21 | Listener,
|
22 | MetadataListener,
|
23 | MessageListener,
|
24 | StatusListener,
|
25 | FullListener,
|
26 | InterceptingListener,
|
27 | InterceptingListenerImpl,
|
28 | isInterceptingListener,
|
29 | MessageContext,
|
30 | Call,
|
31 | } from './call-interface';
|
32 | import { Status } from './constants';
|
33 | import { Channel } from './channel';
|
34 | import { CallOptions } from './client';
|
35 | import { ClientMethodDefinition } from './make-client';
|
36 | import { getErrorMessage } from './error';
|
37 |
|
38 |
|
39 |
|
40 |
|
41 |
|
/**
 * Error raised when interceptors are configured inconsistently, e.g. when
 * both interceptors and interceptor providers are supplied at the same level.
 */
export class InterceptorConfigurationError extends Error {
  /**
   * @param message Human-readable description of the configuration problem.
   */
  constructor(message: string) {
    super(message);
    // The name is assigned before the stack is captured so that the captured
    // stack header carries this class's name.
    this.name = 'InterceptorConfigurationError';
    // Exclude this constructor's own frame from the recorded stack trace.
    Error.captureStackTrace(this, InterceptorConfigurationError);
  }
}
49 |
|
/**
 * Interception hook for starting a call.
 *
 * Receives the outgoing metadata and the listener for inbound events, and
 * continues the operation by invoking `next` with the (possibly replaced)
 * metadata and listener.
 */
export interface MetadataRequester {
  (
    metadata: Metadata,
    listener: InterceptingListener,
    next: (metadata: Metadata, listener: InterceptingListener | Listener) => void
  ): void;
}
60 |
|
/**
 * Interception hook for an outgoing message. Continue the operation by
 * invoking `next` with the message that should be sent onward.
 */
export interface MessageRequester {
  // Messages are application-defined, so no narrower type is available here.
  (message: any, next: (message: any) => void): void;
}
65 |
|
/**
 * Interception hook for half-closing a call. Invoke `next` to let the
 * half-close proceed.
 */
export interface CloseRequester {
  (next: () => void): void;
}
69 |
|
/**
 * Interception hook for cancelling a call. Invoke `next` to let the
 * cancellation proceed.
 */
export interface CancelRequester {
  (next: () => void): void;
}
73 |
|
74 |
|
75 |
|
76 |
|
/**
 * A requester with a hook for every outbound operation an
 * `InterceptingCall` intercepts.
 */
export interface FullRequester {
  /** Hook invoked when the call is started. */
  start: MetadataRequester;
  /** Hook invoked for each outgoing message. */
  sendMessage: MessageRequester;
  /** Hook invoked when the call is half-closed. */
  halfClose: CloseRequester;
  /** Hook invoked when the call is cancelled. */
  cancel: CancelRequester;
}
83 |
|
/**
 * A requester in which every hook is optional; `InterceptingCall` substitutes
 * a pass-through default for each hook that is omitted.
 */
export type Requester = Partial<FullRequester>;
85 |
|
/**
 * Fluent builder for a `Listener`. Handlers that are never set are left
 * `undefined` in the built object.
 */
export class ListenerBuilder {
  private metadata: MetadataListener | undefined = undefined;
  private message: MessageListener | undefined = undefined;
  private status: StatusListener | undefined = undefined;

  /**
   * Sets the handler for received metadata.
   * @returns This builder, for chaining.
   */
  withOnReceiveMetadata(onReceiveMetadata: MetadataListener): this {
    this.metadata = onReceiveMetadata;
    return this;
  }

  /**
   * Sets the handler for received messages.
   * @returns This builder, for chaining.
   */
  withOnReceiveMessage(onReceiveMessage: MessageListener): this {
    this.message = onReceiveMessage;
    return this;
  }

  /**
   * Sets the handler for the received status.
   * @returns This builder, for chaining.
   */
  withOnReceiveStatus(onReceiveStatus: StatusListener): this {
    this.status = onReceiveStatus;
    return this;
  }

  /**
   * Produces a listener from the handlers configured so far.
   * @returns A new object holding the three handler slots.
   */
  build(): Listener {
    const { metadata, message, status } = this;
    return {
      onReceiveMetadata: metadata,
      onReceiveMessage: message,
      onReceiveStatus: status,
    };
  }
}
114 |
|
/**
 * Fluent builder for a `Requester`. Hooks that are never set are left
 * `undefined` in the built object.
 */
export class RequesterBuilder {
  private start: MetadataRequester | undefined = undefined;
  private message: MessageRequester | undefined = undefined;
  private halfClose: CloseRequester | undefined = undefined;
  private cancel: CancelRequester | undefined = undefined;

  /**
   * Sets the hook that intercepts starting the call.
   * @returns This builder, for chaining.
   */
  withStart(start: MetadataRequester): this {
    this.start = start;
    return this;
  }

  /**
   * Sets the hook that intercepts outgoing messages.
   * @returns This builder, for chaining.
   */
  withSendMessage(sendMessage: MessageRequester): this {
    this.message = sendMessage;
    return this;
  }

  /**
   * Sets the hook that intercepts half-closing the call.
   * @returns This builder, for chaining.
   */
  withHalfClose(halfClose: CloseRequester): this {
    this.halfClose = halfClose;
    return this;
  }

  /**
   * Sets the hook that intercepts cancelling the call.
   * @returns This builder, for chaining.
   */
  withCancel(cancel: CancelRequester): this {
    this.cancel = cancel;
    return this;
  }

  /**
   * Produces a requester from the hooks configured so far.
   * @returns A new object holding the four hook slots.
   */
  build(): Requester {
    const { start, message, halfClose, cancel } = this;
    return {
      start,
      sendMessage: message,
      halfClose,
      cancel,
    };
  }
}
150 |
|
151 |
|
152 |
|
153 |
|
154 |
|
/**
 * Pass-through listener: every handler forwards its argument to `next`
 * unchanged. Used to fill in handlers a partial listener leaves out.
 */
const defaultListener: FullListener = {
  onReceiveMetadata(metadata, next) {
    next(metadata);
  },
  onReceiveMessage(message, next) {
    next(message);
  },
  onReceiveStatus(status, next) {
    next(status);
  },
};
166 |
|
167 |
|
168 |
|
169 |
|
170 |
|
/**
 * Pass-through requester: every hook immediately invokes `next` with the
 * arguments it was given. Used to fill in hooks a partial requester leaves
 * out.
 */
const defaultRequester: FullRequester = {
  start(metadata, listener, next) {
    next(metadata, listener);
  },
  sendMessage(message, next) {
    next(message);
  },
  halfClose(next) {
    next();
  },
  cancel(next) {
    next();
  },
};
185 |
|
/**
 * Options handed to each interceptor: the caller's call options plus the
 * definition of the method being invoked.
 */
export interface InterceptorOptions extends CallOptions {
  /** Definition of the method this call is for. */
  // The request/response types vary per method, hence `any`.
  method_definition: ClientMethodDefinition<any, any>;
}
190 |
|
/**
 * The set of call operations available to an interceptor, and implemented by
 * every link in the interceptor chain.
 */
export interface InterceptingCallInterface {
  /** Cancels the call with the given status code and detail text. */
  cancelWithStatus(status: Status, details: string): void;
  /** Returns the peer string reported for this call. */
  getPeer(): string;
  /** Starts the call with the given metadata and an optional listener. */
  start(metadata: Metadata, listener?: Partial<InterceptingListener>): void;
  /** Sends a message together with its per-message context. */
  // Messages are application-defined, so no narrower type is available here.
  sendMessageWithContext(context: MessageContext, message: any): void;
  /** Sends a message. */
  sendMessage(message: any): void;
  /** Requests that reading begin. */
  startRead(): void;
  /** Half-closes the call. */
  halfClose(): void;
}
202 |
|
/**
 * A call that passes each outbound operation (start, sendMessage, halfClose,
 * cancel) through a requester before forwarding it to the next call in the
 * chain.
 *
 * Requester hooks may invoke `next` asynchronously. To keep the forwarded
 * operations in order (start, then the message, then the half-close), a
 * message that finishes interception while the start hook is still running
 * is buffered, and a half-close requested while either hook is still running
 * is deferred until both have completed.
 */
export class InterceptingCall implements InterceptingCallInterface {
  /** Hooks applied to outbound operations; omitted hooks are pass-through. */
  private requester: FullRequester;

  /** True from `start()` until the start hook invokes `next`. */
  private processingMetadata = false;

  /**
   * Context of the message buffered while the start hook was still running,
   * or `null` when nothing is buffered.
   */
  private pendingMessageContext: MessageContext | null = null;
  /** The already-intercepted message that belongs to `pendingMessageContext`. */
  // Messages are application-defined, so no narrower type is available here.
  private pendingMessage: any;

  /** True from `sendMessageWithContext()` until the message hook invokes `next`. */
  private processingMessage = false;

  /** True when a half-close has been requested but not yet forwarded. */
  private pendingHalfClose = false;

  /**
   * @param nextCall The call that receives each operation after interception.
   * @param requester Optional hooks; any hook left out uses the pass-through
   *     default.
   */
  constructor(
    private nextCall: InterceptingCallInterface,
    requester?: Requester
  ) {
    if (requester) {
      this.requester = {
        start: requester.start ?? defaultRequester.start,
        sendMessage: requester.sendMessage ?? defaultRequester.sendMessage,
        halfClose: requester.halfClose ?? defaultRequester.halfClose,
        cancel: requester.cancel ?? defaultRequester.cancel,
      };
    } else {
      this.requester = defaultRequester;
    }
  }

  /** Runs the cancel hook, then cancels the next call with the given status. */
  cancelWithStatus(status: Status, details: string): void {
    this.requester.cancel(() => {
      this.nextCall.cancelWithStatus(status, details);
    });
  }

  /** Returns the peer reported by the next call. */
  getPeer(): string {
    return this.nextCall.getPeer();
  }

  /** Forwards the buffered message, if there is one, and clears the buffer. */
  private processPendingMessage(): void {
    if (this.pendingMessageContext) {
      this.nextCall.sendMessageWithContext(
        this.pendingMessageContext,
        this.pendingMessage
      );
      this.pendingMessageContext = null;
      this.pendingMessage = null;
    }
  }

  /**
   * Forwards a deferred half-close once no start or message hook is still in
   * flight. The flag is cleared before forwarding so that the half-close is
   * delivered to the next call at most once.
   */
  private processPendingHalfClose(): void {
    if (
      this.pendingHalfClose &&
      !this.processingMetadata &&
      !this.processingMessage
    ) {
      this.pendingHalfClose = false;
      this.nextCall.halfClose();
    }
  }

  /**
   * Runs the start hook and, once it continues, starts the next call and
   * flushes any message and half-close that were deferred in the meantime.
   *
   * @param metadata Outgoing metadata handed to the start hook.
   * @param interceptingListener Optional listener; missing handlers are
   *     replaced with no-ops before the start hook sees the listener.
   */
  start(
    metadata: Metadata,
    interceptingListener?: Partial<InterceptingListener>
  ): void {
    const fullInterceptingListener: InterceptingListener = {
      onReceiveMetadata:
        interceptingListener?.onReceiveMetadata?.bind(interceptingListener) ??
        (metadata => {}),
      onReceiveMessage:
        interceptingListener?.onReceiveMessage?.bind(interceptingListener) ??
        (message => {}),
      onReceiveStatus:
        interceptingListener?.onReceiveStatus?.bind(interceptingListener) ??
        (status => {}),
    };
    this.processingMetadata = true;
    this.requester.start(metadata, fullInterceptingListener, (md, listener) => {
      this.processingMetadata = false;
      let finalInterceptingListener: InterceptingListener;
      if (isInterceptingListener(listener)) {
        finalInterceptingListener = listener;
      } else {
        // A plain listener's handlers take a `next` continuation; fill in the
        // missing ones and chain them in front of the caller's listener.
        const fullListener: FullListener = {
          onReceiveMetadata:
            listener.onReceiveMetadata ?? defaultListener.onReceiveMetadata,
          onReceiveMessage:
            listener.onReceiveMessage ?? defaultListener.onReceiveMessage,
          onReceiveStatus:
            listener.onReceiveStatus ?? defaultListener.onReceiveStatus,
        };
        finalInterceptingListener = new InterceptingListenerImpl(
          fullListener,
          fullInterceptingListener
        );
      }
      this.nextCall.start(md, finalInterceptingListener);
      this.processPendingMessage();
      this.processPendingHalfClose();
    });
  }

  /**
   * Runs the message hook and forwards the resulting message to the next
   * call, or buffers it if the start hook has not continued yet.
   */
  // Messages are application-defined, so no narrower type is available here.
  sendMessageWithContext(context: MessageContext, message: any): void {
    this.processingMessage = true;
    this.requester.sendMessage(message, finalMessage => {
      this.processingMessage = false;
      if (this.processingMetadata) {
        this.pendingMessageContext = context;
        // Buffer the hook's output, not its input; otherwise the
        // interceptor's transformation would be lost when the buffer is
        // flushed.
        this.pendingMessage = finalMessage;
      } else {
        this.nextCall.sendMessageWithContext(context, finalMessage);
        this.processPendingHalfClose();
      }
    });
  }

  /** Sends a message with an empty context. */
  sendMessage(message: any): void {
    this.sendMessageWithContext({}, message);
  }

  /** Forwards the read request to the next call without interception. */
  startRead(): void {
    this.nextCall.startRead();
  }

  /**
   * Runs the half-close hook, then half-closes the next call, deferring the
   * half-close while a start or message hook is still in flight.
   */
  halfClose(): void {
    this.requester.halfClose(() => {
      if (this.processingMetadata || this.processingMessage) {
        this.pendingHalfClose = true;
      } else {
        this.nextCall.halfClose();
      }
    });
  }
}
342 |
|
/**
 * Creates the underlying call on `channel` for the method at `path`.
 *
 * The deadline defaults to `Infinity` and the parent to `null` when the
 * options leave them unset. Credentials, when present, are attached to the
 * call before it is returned.
 *
 * @param channel Channel used to create the call.
 * @param path Path of the method being called.
 * @param options Call options supplying deadline, host, parent, propagation
 *     flags and credentials.
 * @returns The newly created call.
 */
function getCall(channel: Channel, path: string, options: CallOptions): Call {
  const call = channel.createCall(
    path,
    options.deadline ?? Infinity,
    options.host,
    options.parent ?? null,
    options.propagate_flags
  );
  const credentials = options.credentials;
  if (credentials) {
    call.setCredentials(credentials);
  }
  return call;
}
355 |
|
356 |
|
357 |
|
358 |
|
359 |
|
/**
 * Bottom link of an interceptor chain: adapts an underlying `Call` to
 * `InterceptingCallInterface`, serializing outgoing messages and
 * deserializing incoming ones with the method definition's functions.
 */
class BaseInterceptingCall implements InterceptingCallInterface {
  /**
   * @param call The underlying call that operations are delegated to.
   * @param methodDefinition Supplies the request serializer and response
   *     deserializer.
   */
  constructor(
    protected call: Call,
    // The request/response types vary per method, hence `any`.
    protected methodDefinition: ClientMethodDefinition<any, any>
  ) {}

  /** Cancels the underlying call. */
  cancelWithStatus(status: Status, details: string): void {
    this.call.cancelWithStatus(status, details);
  }

  /** Returns the underlying call's peer. */
  getPeer(): string {
    return this.call.getPeer();
  }

  /**
   * Serializes `message` and sends it on the underlying call. If
   * serialization throws, the call is cancelled with `Status.INTERNAL` and
   * nothing is sent.
   */
  sendMessageWithContext(context: MessageContext, message: any): void {
    let payload: Buffer;
    try {
      payload = this.methodDefinition.requestSerialize(message);
    } catch (err) {
      const details = `Request message serialization failure: ${getErrorMessage(err)}`;
      this.call.cancelWithStatus(Status.INTERNAL, details);
      return;
    }
    this.call.sendMessageWithContext(context, payload);
  }

  /** Sends a message with an empty context. */
  sendMessage(message: any) {
    this.sendMessageWithContext({}, message);
  }

  /**
   * Starts the underlying call with a listener that deserializes incoming
   * messages before passing them to `interceptingListener`.
   *
   * If deserialization throws, the call is cancelled with `Status.INTERNAL`
   * and the listener later receives that parsing error as the status in place
   * of whatever status the underlying call reports.
   */
  start(
    metadata: Metadata,
    interceptingListener?: Partial<InterceptingListener>
  ): void {
    // Set once a response fails to deserialize; overrides the final status.
    let readError: StatusObject | null = null;

    const handleMetadata = (received: Metadata): void => {
      interceptingListener?.onReceiveMetadata?.(received);
    };

    const handleMessage = (raw: any): void => {
      let parsed: any;
      try {
        parsed = this.methodDefinition.responseDeserialize(raw);
      } catch (err) {
        const failure: StatusObject = {
          code: Status.INTERNAL,
          details: `Response message parsing error: ${getErrorMessage(err)}`,
          metadata: new Metadata(),
        };
        readError = failure;
        this.call.cancelWithStatus(failure.code, failure.details);
        return;
      }
      interceptingListener?.onReceiveMessage?.(parsed);
    };

    const handleStatus = (status: StatusObject): void => {
      if (readError) {
        interceptingListener?.onReceiveStatus?.(readError);
      } else {
        interceptingListener?.onReceiveStatus?.(status);
      }
    };

    this.call.start(metadata, {
      onReceiveMetadata: handleMetadata,
      onReceiveMessage: handleMessage,
      onReceiveStatus: handleStatus,
    });
  }

  /** Requests that the underlying call begin reading. */
  startRead() {
    this.call.startRead();
  }

  /** Half-closes the underlying call. */
  halfClose(): void {
    this.call.halfClose();
  }
}
431 |
|
432 |
|
433 |
|
434 |
|
435 |
|
/**
 * Bottom link for methods whose response is not a stream.
 *
 * On start it immediately requests a read, and it guarantees that the
 * listener's message handler runs before the status handler: if the status
 * arrives before any message, the message handler is first invoked with
 * `null`.
 *
 * The constructor is inherited unchanged from `BaseInterceptingCall`.
 */
class BaseUnaryInterceptingCall
  extends BaseInterceptingCall
  implements InterceptingCallInterface
{
  /**
   * Starts the call with a wrapping listener and requests a read.
   *
   * @param metadata Outgoing metadata.
   * @param listener Optional listener for inbound events.
   */
  start(metadata: Metadata, listener?: Partial<InterceptingListener>): void {
    // Tracks whether the message handler has been given a real message.
    let sawMessage = false;

    const forwardMetadata =
      listener?.onReceiveMetadata?.bind(listener) ?? (metadata => {});

    const forwardMessage = (message: any): void => {
      sawMessage = true;
      listener?.onReceiveMessage?.(message);
    };

    const forwardStatus = (status: StatusObject): void => {
      if (!sawMessage) {
        listener?.onReceiveMessage?.(null);
      }
      listener?.onReceiveStatus?.(status);
    };

    const wrapperListener: InterceptingListener = {
      onReceiveMetadata: forwardMetadata,
      onReceiveMessage: forwardMessage,
      onReceiveStatus: forwardStatus,
    };
    super.start(metadata, wrapperListener);
    this.call.startRead();
  }
}
465 |
|
466 |
|
467 |
|
468 |
|
469 |
|
/**
 * Bottom link for methods whose response is a stream. It adds nothing to
 * `BaseInterceptingCall`; reads are left to the caller.
 */
class BaseStreamingInterceptingCall
  extends BaseInterceptingCall
  implements InterceptingCallInterface {}
473 |
|
/**
 * Creates the underlying call and wraps it in the bottom link of the
 * interceptor chain, chosen by whether the method's response is a stream.
 *
 * @param channel Channel used to create the underlying call.
 * @param options Options as they stand after passing through every
 *     interceptor.
 * @param methodDefinition Definition of the method being called.
 * @returns The streaming variant when `methodDefinition.responseStream` is
 *     truthy, otherwise the unary variant.
 */
function getBottomInterceptingCall(
  channel: Channel,
  options: InterceptorOptions,
  // The request/response types vary per method, hence `any`.
  methodDefinition: ClientMethodDefinition<any, any>
) {
  const underlyingCall = getCall(channel, methodDefinition.path, options);
  if (methodDefinition.responseStream) {
    return new BaseStreamingInterceptingCall(underlyingCall, methodDefinition);
  }
  return new BaseUnaryInterceptingCall(underlyingCall, methodDefinition);
}
487 |
|
/**
 * Continuation handed to an interceptor: given the (possibly modified)
 * options, it produces the next call in the chain.
 */
export interface NextCall {
  (options: InterceptorOptions): InterceptingCallInterface;
}
491 |
|
/**
 * A client interceptor: receives the call options and a continuation for
 * the rest of the chain, and returns the call that represents this link.
 */
export interface Interceptor {
  (options: InterceptorOptions, nextCall: NextCall): InterceptingCall;
}
495 |
|
/**
 * Factory that picks an interceptor for a given method. A falsy result is
 * discarded by `getInterceptingCall`.
 */
export interface InterceptorProvider {
  // The request/response types vary per method, hence `any`.
  (methodDefinition: ClientMethodDefinition<any, any>): Interceptor;
}
500 |
|
/**
 * The interceptor configuration gathered from the client and from the
 * individual call. Within each level, interceptors and interceptor providers
 * are mutually exclusive.
 */
export interface InterceptorArguments {
  /** Interceptors configured on the client. */
  clientInterceptors: Interceptor[];
  /** Interceptor providers configured on the client. */
  clientInterceptorProviders: InterceptorProvider[];
  /** Interceptors supplied with the call options. */
  callInterceptors: Interceptor[];
  /** Interceptor providers supplied with the call options. */
  callInterceptorProviders: InterceptorProvider[];
}
507 |
|
/**
 * Builds the interceptor chain for one call and returns its outermost link.
 *
 * Call-level interceptors (or providers) replace the client-level ones
 * whenever any are present. The first interceptor in the list is the
 * outermost link; the bottom of the chain creates the underlying call on
 * `channel`.
 *
 * @param interceptorArgs Client-level and call-level interceptor
 *     configuration.
 * @param methodDefinition Definition of the method being called; passed to
 *     every provider and exposed to interceptors as
 *     `options.method_definition`.
 * @param options The caller's call options.
 * @param channel Channel used for the underlying call.
 * @returns The call produced by the outermost interceptor, or the bottom
 *     call when there are no interceptors.
 * @throws InterceptorConfigurationError If both interceptors and interceptor
 *     providers are given at the client level, or both at the call level.
 */
export function getInterceptingCall(
  interceptorArgs: InterceptorArguments,
  // The request/response types vary per method, hence `any`.
  methodDefinition: ClientMethodDefinition<any, any>,
  options: CallOptions,
  channel: Channel
): InterceptingCallInterface {
  if (
    interceptorArgs.clientInterceptors.length > 0 &&
    interceptorArgs.clientInterceptorProviders.length > 0
  ) {
    throw new InterceptorConfigurationError(
      'Both interceptors and interceptor_providers were passed as options ' +
        'to the client constructor. Only one of these is allowed.'
    );
  }
  if (
    interceptorArgs.callInterceptors.length > 0 &&
    interceptorArgs.callInterceptorProviders.length > 0
  ) {
    throw new InterceptorConfigurationError(
      'Both interceptors and interceptor_providers were passed as call ' +
        'options. Only one of these is allowed.'
    );
  }

  // Merges explicit interceptors with those obtained from providers and
  // drops any falsy entry a provider may have returned.
  const collect = (
    explicit: Interceptor[],
    providers: InterceptorProvider[]
  ): Interceptor[] => {
    const provided = providers.map(provider => provider(methodDefinition));
    return ([] as Interceptor[])
      .concat(explicit, provided)
      .filter(interceptor => interceptor);
  };

  // Call-level configuration, when present, takes precedence over the
  // client-level configuration.
  const useCallLevel =
    interceptorArgs.callInterceptors.length > 0 ||
    interceptorArgs.callInterceptorProviders.length > 0;
  const interceptors: Interceptor[] = useCallLevel
    ? collect(
        interceptorArgs.callInterceptors,
        interceptorArgs.callInterceptorProviders
      )
    : collect(
        interceptorArgs.clientInterceptors,
        interceptorArgs.clientInterceptorProviders
      );

  const interceptorOptions = Object.assign({}, options, {
    method_definition: methodDefinition,
  });

  // Build the chain from the bottom up: start with the link that creates the
  // underlying call, then wrap it with each interceptor from last to first so
  // that the first interceptor ends up outermost.
  let chain: NextCall = (finalOptions: InterceptorOptions) =>
    getBottomInterceptingCall(channel, finalOptions, methodDefinition);
  for (let index = interceptors.length - 1; index >= 0; index--) {
    const interceptor = interceptors[index];
    const downstream = chain;
    chain = currentOptions => interceptor(currentOptions, downstream);
  }
  return chain(interceptorOptions);
}