1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | import {
|
19 | ClientDuplexStream,
|
20 | ClientDuplexStreamImpl,
|
21 | ClientReadableStream,
|
22 | ClientReadableStreamImpl,
|
23 | ClientUnaryCall,
|
24 | ClientUnaryCallImpl,
|
25 | ClientWritableStream,
|
26 | ClientWritableStreamImpl,
|
27 | ServiceError,
|
28 | callErrorFromStatus,
|
29 | SurfaceCall,
|
30 | } from './call';
|
31 | import { CallCredentials } from './call-credentials';
|
32 | import { Deadline, StatusObject } from './call-stream';
|
33 | import { Channel, ChannelImplementation } from './channel';
|
34 | import { ConnectivityState } from './connectivity-state';
|
35 | import { ChannelCredentials } from './channel-credentials';
|
36 | import { ChannelOptions } from './channel-options';
|
37 | import { Status } from './constants';
|
38 | import { Metadata } from './metadata';
|
39 | import { ClientMethodDefinition } from './make-client';
|
40 | import {
|
41 | getInterceptingCall,
|
42 | Interceptor,
|
43 | InterceptorProvider,
|
44 | InterceptorArguments,
|
45 | InterceptingCallInterface,
|
46 | } from './client-interceptors';
|
47 | import {
|
48 | ServerUnaryCall,
|
49 | ServerReadableStream,
|
50 | ServerWritableStream,
|
51 | ServerDuplexStream,
|
52 | } from './server-call';
|
53 |
|
/* Module-private property keys under which a Client instance stores its
 * channel, its interceptors, its interceptor providers and its optional call
 * invocation transformer. Each key is a distinct, description-less symbol. */
const CHANNEL_SYMBOL: unique symbol = Symbol();
const INTERCEPTOR_SYMBOL: unique symbol = Symbol();
const INTERCEPTOR_PROVIDER_SYMBOL: unique symbol = Symbol();
const CALL_INVOCATION_TRANSFORMER_SYMBOL: unique symbol = Symbol();
|
58 |
|
/**
 * Type guard used while sorting out optional trailing arguments: reports
 * whether `arg` is the callback rather than metadata, call options or
 * `undefined`.
 *
 * @param arg The argument to classify.
 * @returns `true` exactly when `typeof arg` is `'function'`.
 */
function isFunction<ResponseType>(
  arg: Metadata | CallOptions | UnaryCallback<ResponseType> | undefined
): arg is UnaryCallback<ResponseType> {
  const argKind = typeof arg;
  return argKind === 'function';
}
|
64 |
|
/**
 * Node-style completion callback for calls that produce a single response.
 *
 * It is invoked either with a `ServiceError` as the only argument, or with
 * `null` followed by the response value.
 */
export interface UnaryCallback<ResponseType> {
  (err: ServiceError | null, value?: ResponseType): void;
}
|
68 |
|
69 |
|
/**
 * Per-call options accepted by the request methods of `Client`.
 *
 * The whole object is forwarded to `getInterceptingCall`; the client itself
 * only reads `credentials`, `interceptors` and `interceptor_providers`.
 */
export interface CallOptions {
  /** Deadline for the call (forwarded with the options; not read in this file). */
  deadline?: Deadline;
  /** NOTE(review): presumably the host/authority to use for the call; confirm. */
  host?: string;
  /** NOTE(review): presumably the server call this client call is made on behalf of; confirm. */
  parent?:
    | ServerUnaryCall<any, any>
    | ServerReadableStream<any, any>
    | ServerWritableStream<any, any>
    | ServerDuplexStream<any, any>;
  /** NOTE(review): presumably controls what is propagated from `parent`; confirm. */
  propagate_flags?: number;
  /** When set, applied to the call with `setCredentials` before it is started. */
  credentials?: CallCredentials;
  /** Interceptors for this call only. */
  interceptors?: Interceptor[];
  /** Interceptor providers for this call only. */
  interceptor_providers?: InterceptorProvider[];
}
|
83 |
|
84 |
|
/**
 * Everything the client has assembled for one call, in the form handed to a
 * `CallInvocationTransformer` before the underlying call is created.
 */
export interface CallProperties<RequestType, ResponseType> {
  /** The request message; set only by the unary and server-streaming methods. */
  argument?: RequestType;
  /** Metadata the call is started with. */
  metadata: Metadata;
  /** The surface object that is returned to the caller of the request method. */
  call: SurfaceCall;
  /** Channel the call is created on. */
  channel: Channel;
  /** Path, streaming flags and (de)serializers of the invoked method. */
  methodDefinition: ClientMethodDefinition<RequestType, ResponseType>;
  /** The per-call options. */
  callOptions: CallOptions;
  /** Completion callback; set only by the unary and client-streaming methods. */
  callback?: UnaryCallback<ResponseType>;
}
|
94 |
|
/**
 * Hook that receives the properties of a call and returns the properties that
 * should actually be used. The client uses the returned object in place of
 * the one it passed in.
 */
export interface CallInvocationTransformer {
  (callProperties: CallProperties<any, any>): CallProperties<any, any>;
}
|
98 |
|
/**
 * Options accepted by the `Client` constructor: any subset of the channel
 * options plus the client-only settings below. The client-only settings
 * `interceptors`, `interceptor_providers`, `callInvocationTransformer` and
 * (when it is used) `channelFactoryOverride` are removed from the copy of the
 * options that is passed on to the channel.
 */
export type ClientOptions = Partial<ChannelOptions> & {
  /** An existing channel to use instead of creating one. Takes precedence over `channelFactoryOverride`. */
  channelOverride?: Channel;
  /** Factory called with the address, credentials and remaining options to create the channel. */
  channelFactoryOverride?: (
    address: string,
    credentials: ChannelCredentials,
    options: ClientOptions
  ) => Channel;
  /** Interceptors applied to every call; mutually exclusive with `interceptor_providers`. */
  interceptors?: Interceptor[];
  /** Interceptor providers applied to every call; mutually exclusive with `interceptors`. */
  interceptor_providers?: InterceptorProvider[];
  /** Hook run over the call properties of every call before the call is created. */
  callInvocationTransformer?: CallInvocationTransformer;
};
|
110 |
|
/**
 * Returns the stack of `error` with its first line removed, i.e. the frame
 * lines without the leading "Error: message" header.
 *
 * `Error.prototype.stack` is optional and is not guaranteed to be a string
 * (for instance when `Error.prepareStackTrace` has been customised), so
 * anything that is not a string yields an empty string instead of throwing.
 *
 * @param error The error whose stack frames are wanted.
 * @returns The newline-joined stack lines after the first, or `''`.
 */
function getErrorStackString(error: Error): string {
  const stack: unknown = error.stack;
  if (typeof stack !== 'string') {
    return '';
  }
  return stack.split('\n').slice(1).join('\n');
}
|
114 |
|
115 |
|
116 |
|
117 |
|
118 |
|
/**
 * Creates the intercepting call for one RPC, attaches it to the surface
 * object and applies per-call credentials.
 *
 * Kept as a module-level function (not a method) so that no additional
 * property names are introduced on `Client`.
 *
 * @param clientInterceptors Interceptors configured on the client.
 * @param clientInterceptorProviders Interceptor providers configured on the client.
 * @param callProperties The (possibly transformed) properties of the call.
 * @param surfaceCall The object that will be returned to the caller.
 * @returns The intercepting call, not yet started.
 */
function buildInterceptingCall<RequestType, ResponseType>(
  clientInterceptors: Interceptor[],
  clientInterceptorProviders: InterceptorProvider[],
  callProperties: CallProperties<RequestType, ResponseType>,
  surfaceCall: SurfaceCall
): InterceptingCallInterface {
  const interceptorArgs: InterceptorArguments = {
    clientInterceptors,
    clientInterceptorProviders,
    callInterceptors: callProperties.callOptions.interceptors ?? [],
    callInterceptorProviders:
      callProperties.callOptions.interceptor_providers ?? [],
  };
  const call: InterceptingCallInterface = getInterceptingCall(
    interceptorArgs,
    callProperties.methodDefinition,
    callProperties.callOptions,
    callProperties.channel
  );
  /* The surface object has to exist before the transformer runs, while the
   * call can only be created afterwards, so the two are linked here, before
   * the surface object is handed to anyone. */
  surfaceCall.call = call;
  if (callProperties.callOptions.credentials) {
    call.setCredentials(callProperties.callOptions.credentials);
  }
  return call;
}

/**
 * Starts `call` with a listener for RPCs that produce a single response
 * (unary and client-streaming).
 *
 * Metadata and status are re-emitted on `emitter`. The callback in
 * `callProperties` is invoked once, on the first status: with the response on
 * OK, or with a `ServiceError` otherwise (including an OK status that arrived
 * without any message). A second message cancels the call with INTERNAL.
 *
 * @param call The intercepting call to start.
 * @param emitter The surface object on which events are emitted.
 * @param callProperties Supplies the metadata to send and the callback.
 * @param callerStackError Error created in the request method; its stack is
 *     passed to `callErrorFromStatus` when an error is reported.
 */
function startUnaryResponseCall<RequestType, ResponseType>(
  call: InterceptingCallInterface,
  emitter: SurfaceCall,
  callProperties: CallProperties<RequestType, ResponseType>,
  callerStackError: Error
): void {
  let responseMessage: ResponseType | null = null;
  let receivedStatus = false;
  call.start(callProperties.metadata, {
    onReceiveMetadata: (metadata) => {
      emitter.emit('metadata', metadata);
    },
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    onReceiveMessage(message: any) {
      if (responseMessage !== null) {
        call.cancelWithStatus(Status.INTERNAL, 'Too many responses received');
      }
      responseMessage = message;
    },
    onReceiveStatus(status: StatusObject) {
      // Only the first status is reported.
      if (receivedStatus) {
        return;
      }
      receivedStatus = true;
      /* The callback is set by both request methods that use this listener;
       * a transformer that removed it would make the calls below throw. */
      if (status.code === Status.OK) {
        if (responseMessage === null) {
          const callerStack = getErrorStackString(callerStackError);
          callProperties.callback!(
            callErrorFromStatus(
              {
                code: Status.INTERNAL,
                details: 'No message received',
                metadata: status.metadata,
              },
              callerStack
            )
          );
        } else {
          callProperties.callback!(null, responseMessage);
        }
      } else {
        const callerStack = getErrorStackString(callerStackError);
        callProperties.callback!(callErrorFromStatus(status, callerStack));
      }
      emitter.emit('status', status);
    },
  });
}

/**
 * Starts `call` with a listener for RPCs that produce a stream of responses
 * (server-streaming and bidirectional).
 *
 * Every message is pushed onto `stream`. On the first status the stream is
 * ended with `push(null)`, an 'error' event is emitted for any non-OK status,
 * and the 'status' event is emitted last.
 *
 * @param call The intercepting call to start.
 * @param stream The readable surface object given to the caller.
 * @param metadata Metadata to start the call with.
 * @param callerStackError Error created in the request method; its stack is
 *     passed to `callErrorFromStatus` when an error is reported.
 */
function startStreamingResponseCall<ResponseType>(
  call: InterceptingCallInterface,
  stream: ClientReadableStream<ResponseType>,
  metadata: Metadata,
  callerStackError: Error
): void {
  let receivedStatus = false;
  call.start(metadata, {
    onReceiveMetadata(receivedMetadata: Metadata) {
      stream.emit('metadata', receivedMetadata);
    },
    onReceiveMessage(message: unknown) {
      stream.push(message);
    },
    onReceiveStatus(status: StatusObject) {
      // Only the first status is reported.
      if (receivedStatus) {
        return;
      }
      receivedStatus = true;
      stream.push(null);
      if (status.code !== Status.OK) {
        const callerStack = getErrorStackString(callerStackError);
        stream.emit('error', callErrorFromStatus(status, callerStack));
      }
      stream.emit('status', status);
    },
  });
}

/**
 * A generic client that issues calls over a single channel. It offers one
 * request method per call shape (unary, client-streaming, server-streaming
 * and bidirectional streaming).
 */
export class Client {
  /** Channel used for every call made by this client. */
  private readonly [CHANNEL_SYMBOL]: Channel;
  /** Client-level interceptors (empty when none were configured). */
  private readonly [INTERCEPTOR_SYMBOL]: Interceptor[];
  /** Client-level interceptor providers (empty when none were configured). */
  private readonly [INTERCEPTOR_PROVIDER_SYMBOL]: InterceptorProvider[];
  /** Optional hook run over the properties of every call. */
  private readonly [CALL_INVOCATION_TRANSFORMER_SYMBOL]?: CallInvocationTransformer;

  /**
   * @param address Address passed to the channel (or channel factory).
   * @param credentials Credentials passed to the channel (or channel factory).
   * @param options Client and channel options. The object is copied, so the
   *     caller's object is not modified.
   * @throws Error when both `interceptors` and `interceptor_providers` are
   *     non-empty.
   */
  constructor(
    address: string,
    credentials: ChannelCredentials,
    options: ClientOptions = {}
  ) {
    // Work on a shallow copy: client-only keys are deleted below.
    options = Object.assign({}, options);
    this[INTERCEPTOR_SYMBOL] = options.interceptors ?? [];
    delete options.interceptors;
    this[INTERCEPTOR_PROVIDER_SYMBOL] = options.interceptor_providers ?? [];
    delete options.interceptor_providers;
    if (
      this[INTERCEPTOR_SYMBOL].length > 0 &&
      this[INTERCEPTOR_PROVIDER_SYMBOL].length > 0
    ) {
      throw new Error(
        'Both interceptors and interceptor_providers were passed as options ' +
          'to the client constructor. Only one of these is allowed.'
      );
    }
    this[CALL_INVOCATION_TRANSFORMER_SYMBOL] =
      options.callInvocationTransformer;
    delete options.callInvocationTransformer;
    if (options.channelOverride) {
      this[CHANNEL_SYMBOL] = options.channelOverride;
    } else if (options.channelFactoryOverride) {
      const channelFactoryOverride = options.channelFactoryOverride;
      delete options.channelFactoryOverride;
      this[CHANNEL_SYMBOL] = channelFactoryOverride(
        address,
        credentials,
        options
      );
    } else {
      this[CHANNEL_SYMBOL] = new ChannelImplementation(
        address,
        credentials,
        options
      );
    }
  }

  /** Closes the channel used by this client. */
  close(): void {
    this[CHANNEL_SYMBOL].close();
  }

  /** Returns the channel used by this client. */
  getChannel(): Channel {
    return this[CHANNEL_SYMBOL];
  }

  /**
   * Waits for the channel to become READY.
   *
   * The first check runs on a later tick (`setImmediate`) and asks the channel
   * to connect. While the channel is not READY, the client watches for the
   * next connectivity change and checks again.
   *
   * @param deadline Passed to `watchConnectivityState` for each wait.
   * @param callback Called with no argument once the channel is READY, or
   *     with an Error when a watch reports an error or a channel method throws.
   */
  waitForReady(deadline: Deadline, callback: (error?: Error) => void): void {
    const checkState = (err?: Error): void => {
      if (err) {
        callback(new Error('Failed to connect before the deadline'));
        return;
      }
      let newState: ConnectivityState;
      try {
        newState = this[CHANNEL_SYMBOL].getConnectivityState(true);
      } catch {
        callback(new Error('The channel has been closed'));
        return;
      }
      if (newState === ConnectivityState.READY) {
        callback();
        return;
      }
      try {
        this[CHANNEL_SYMBOL].watchConnectivityState(
          newState,
          deadline,
          checkState
        );
      } catch {
        callback(new Error('The channel has been closed'));
      }
    };
    setImmediate(checkState);
  }

  /**
   * Resolves the optional trailing arguments of the single-response request
   * methods. The accepted forms are `(callback)`, `(metadata, callback)`,
   * `(options, callback)` and `(metadata, options, callback)`.
   *
   * @returns The metadata (a new empty `Metadata` if omitted), the options
   *     (`{}` if omitted) and the callback.
   * @throws Error('Incorrect arguments passed') for any other combination.
   */
  private checkOptionalUnaryResponseArguments<ResponseType>(
    arg1: Metadata | CallOptions | UnaryCallback<ResponseType>,
    arg2?: CallOptions | UnaryCallback<ResponseType>,
    arg3?: UnaryCallback<ResponseType>
  ): {
    metadata: Metadata;
    options: CallOptions;
    callback: UnaryCallback<ResponseType>;
  } {
    if (isFunction(arg1)) {
      return { metadata: new Metadata(), options: {}, callback: arg1 };
    }
    if (isFunction(arg2)) {
      if (arg1 instanceof Metadata) {
        return { metadata: arg1, options: {}, callback: arg2 };
      }
      return { metadata: new Metadata(), options: arg1, callback: arg2 };
    }
    if (
      !(
        arg1 instanceof Metadata &&
        arg2 instanceof Object &&
        isFunction(arg3)
      )
    ) {
      throw new Error('Incorrect arguments passed');
    }
    return { metadata: arg1, options: arg2, callback: arg3 };
  }

  /**
   * Makes a unary call: sends `argument`, half-closes, and reports the single
   * response (or an error) through `callback`.
   *
   * @param method Path of the method to call.
   * @param serialize Serializer for the request message.
   * @param deserialize Deserializer for the response message.
   * @param argument The request message.
   * @param metadata Optional metadata to send.
   * @param options Optional per-call options.
   * @param callback Receives the error or the response.
   * @returns The surface call object, which emits 'metadata' and 'status'.
   */
  makeUnaryRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    metadata: Metadata,
    options: CallOptions,
    callback: UnaryCallback<ResponseType>
  ): ClientUnaryCall;
  makeUnaryRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    metadata: Metadata,
    callback: UnaryCallback<ResponseType>
  ): ClientUnaryCall;
  makeUnaryRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    options: CallOptions,
    callback: UnaryCallback<ResponseType>
  ): ClientUnaryCall;
  makeUnaryRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    callback: UnaryCallback<ResponseType>
  ): ClientUnaryCall;
  makeUnaryRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    metadata: Metadata | CallOptions | UnaryCallback<ResponseType>,
    options?: CallOptions | UnaryCallback<ResponseType>,
    callback?: UnaryCallback<ResponseType>
  ): ClientUnaryCall {
    const checkedArguments = this.checkOptionalUnaryResponseArguments<ResponseType>(
      metadata,
      options,
      callback
    );
    const methodDefinition: ClientMethodDefinition<
      RequestType,
      ResponseType
    > = {
      path: method,
      requestStream: false,
      responseStream: false,
      requestSerialize: serialize,
      responseDeserialize: deserialize,
    };
    let callProperties: CallProperties<RequestType, ResponseType> = {
      argument,
      metadata: checkedArguments.metadata,
      call: new ClientUnaryCallImpl(),
      channel: this[CHANNEL_SYMBOL],
      methodDefinition,
      callOptions: checkedArguments.options,
      callback: checkedArguments.callback,
    };
    const transformer = this[CALL_INVOCATION_TRANSFORMER_SYMBOL];
    if (transformer) {
      // Invoked with the client as `this`, as a direct member call would be.
      callProperties = transformer.call(this, callProperties) as CallProperties<
        RequestType,
        ResponseType
      >;
    }
    const emitter: ClientUnaryCall = callProperties.call;
    const call = buildInterceptingCall(
      this[INTERCEPTOR_SYMBOL],
      this[INTERCEPTOR_PROVIDER_SYMBOL],
      callProperties,
      emitter
    );
    // Created here so the recorded stack is that of the request method's caller.
    const callerStackError = new Error();
    startUnaryResponseCall(call, emitter, callProperties, callerStackError);
    /* NOTE(review): the message sent is the `argument` parameter, not
     * `callProperties.argument`, so an argument replaced by the transformer
     * is not used. Confirm that this is intended. */
    call.sendMessage(argument);
    call.halfClose();
    return emitter;
  }

  /**
   * Makes a client-streaming call: the caller writes requests to the returned
   * stream and the single response (or an error) is reported through
   * `callback`.
   *
   * @param method Path of the method to call.
   * @param serialize Serializer for request messages.
   * @param deserialize Deserializer for the response message.
   * @param metadata Optional metadata to send.
   * @param options Optional per-call options.
   * @param callback Receives the error or the response.
   * @returns The writable surface stream, which emits 'metadata' and 'status'.
   */
  makeClientStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    metadata: Metadata,
    options: CallOptions,
    callback: UnaryCallback<ResponseType>
  ): ClientWritableStream<RequestType>;
  makeClientStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    metadata: Metadata,
    callback: UnaryCallback<ResponseType>
  ): ClientWritableStream<RequestType>;
  makeClientStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    options: CallOptions,
    callback: UnaryCallback<ResponseType>
  ): ClientWritableStream<RequestType>;
  makeClientStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    callback: UnaryCallback<ResponseType>
  ): ClientWritableStream<RequestType>;
  makeClientStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    metadata: Metadata | CallOptions | UnaryCallback<ResponseType>,
    options?: CallOptions | UnaryCallback<ResponseType>,
    callback?: UnaryCallback<ResponseType>
  ): ClientWritableStream<RequestType> {
    const checkedArguments = this.checkOptionalUnaryResponseArguments<ResponseType>(
      metadata,
      options,
      callback
    );
    const methodDefinition: ClientMethodDefinition<
      RequestType,
      ResponseType
    > = {
      path: method,
      requestStream: true,
      responseStream: false,
      requestSerialize: serialize,
      responseDeserialize: deserialize,
    };
    let callProperties: CallProperties<RequestType, ResponseType> = {
      metadata: checkedArguments.metadata,
      call: new ClientWritableStreamImpl<RequestType>(serialize),
      channel: this[CHANNEL_SYMBOL],
      methodDefinition,
      callOptions: checkedArguments.options,
      callback: checkedArguments.callback,
    };
    const transformer = this[CALL_INVOCATION_TRANSFORMER_SYMBOL];
    if (transformer) {
      // Invoked with the client as `this`, as a direct member call would be.
      callProperties = transformer.call(this, callProperties) as CallProperties<
        RequestType,
        ResponseType
      >;
    }
    const emitter: ClientWritableStream<RequestType> = callProperties.call as ClientWritableStream<RequestType>;
    const call = buildInterceptingCall(
      this[INTERCEPTOR_SYMBOL],
      this[INTERCEPTOR_PROVIDER_SYMBOL],
      callProperties,
      emitter
    );
    // Created here so the recorded stack is that of the request method's caller.
    const callerStackError = new Error();
    startUnaryResponseCall(call, emitter, callProperties, callerStackError);
    return emitter;
  }

  /**
   * Resolves the optional trailing arguments of the streaming-response
   * request methods: `()`, `(metadata)`, `(options)` or `(metadata, options)`.
   *
   * @returns The metadata (a new empty `Metadata` if omitted) and the options
   *     (`{}` if omitted or falsy).
   */
  private checkMetadataAndOptions(
    arg1?: Metadata | CallOptions,
    arg2?: CallOptions
  ): { metadata: Metadata; options: CallOptions } {
    if (arg1 instanceof Metadata) {
      return { metadata: arg1, options: arg2 ? arg2 : {} };
    }
    const options: CallOptions = arg1 ? arg1 : {};
    return { metadata: new Metadata(), options };
  }

  /**
   * Makes a server-streaming call: sends `argument`, half-closes, and delivers
   * the responses through the returned readable stream.
   *
   * @param method Path of the method to call.
   * @param serialize Serializer for the request message.
   * @param deserialize Deserializer for response messages.
   * @param argument The request message.
   * @param metadata Optional metadata to send.
   * @param options Optional per-call options.
   * @returns The readable surface stream; it emits 'metadata', 'status', and
   *     'error' for a non-OK status.
   */
  makeServerStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    metadata: Metadata,
    options?: CallOptions
  ): ClientReadableStream<ResponseType>;
  makeServerStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    options?: CallOptions
  ): ClientReadableStream<ResponseType>;
  makeServerStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    metadata?: Metadata | CallOptions,
    options?: CallOptions
  ): ClientReadableStream<ResponseType> {
    const checkedArguments = this.checkMetadataAndOptions(metadata, options);
    const methodDefinition: ClientMethodDefinition<
      RequestType,
      ResponseType
    > = {
      path: method,
      requestStream: false,
      responseStream: true,
      requestSerialize: serialize,
      responseDeserialize: deserialize,
    };
    let callProperties: CallProperties<RequestType, ResponseType> = {
      argument,
      metadata: checkedArguments.metadata,
      call: new ClientReadableStreamImpl<ResponseType>(deserialize),
      channel: this[CHANNEL_SYMBOL],
      methodDefinition,
      callOptions: checkedArguments.options,
    };
    const transformer = this[CALL_INVOCATION_TRANSFORMER_SYMBOL];
    if (transformer) {
      // Invoked with the client as `this`, as a direct member call would be.
      callProperties = transformer.call(this, callProperties) as CallProperties<
        RequestType,
        ResponseType
      >;
    }
    const stream: ClientReadableStream<ResponseType> = callProperties.call as ClientReadableStream<ResponseType>;
    const call = buildInterceptingCall(
      this[INTERCEPTOR_SYMBOL],
      this[INTERCEPTOR_PROVIDER_SYMBOL],
      callProperties,
      stream
    );
    // Created here so the recorded stack is that of the request method's caller.
    const callerStackError = new Error();
    startStreamingResponseCall(
      call,
      stream,
      callProperties.metadata,
      callerStackError
    );
    /* NOTE(review): as in makeUnaryRequest, the `argument` parameter is sent
     * rather than `callProperties.argument`. Confirm that this is intended. */
    call.sendMessage(argument);
    call.halfClose();
    return stream;
  }

  /**
   * Makes a bidirectional-streaming call: the caller writes requests to, and
   * reads responses from, the returned duplex stream.
   *
   * @param method Path of the method to call.
   * @param serialize Serializer for request messages.
   * @param deserialize Deserializer for response messages.
   * @param metadata Optional metadata to send.
   * @param options Optional per-call options.
   * @returns The duplex surface stream; it emits 'metadata', 'status', and
   *     'error' for a non-OK status.
   */
  makeBidiStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    metadata: Metadata,
    options?: CallOptions
  ): ClientDuplexStream<RequestType, ResponseType>;
  makeBidiStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    options?: CallOptions
  ): ClientDuplexStream<RequestType, ResponseType>;
  makeBidiStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    metadata?: Metadata | CallOptions,
    options?: CallOptions
  ): ClientDuplexStream<RequestType, ResponseType> {
    const checkedArguments = this.checkMetadataAndOptions(metadata, options);
    const methodDefinition: ClientMethodDefinition<
      RequestType,
      ResponseType
    > = {
      path: method,
      requestStream: true,
      responseStream: true,
      requestSerialize: serialize,
      responseDeserialize: deserialize,
    };
    let callProperties: CallProperties<RequestType, ResponseType> = {
      metadata: checkedArguments.metadata,
      call: new ClientDuplexStreamImpl<RequestType, ResponseType>(
        serialize,
        deserialize
      ),
      channel: this[CHANNEL_SYMBOL],
      methodDefinition,
      callOptions: checkedArguments.options,
    };
    const transformer = this[CALL_INVOCATION_TRANSFORMER_SYMBOL];
    if (transformer) {
      // Invoked with the client as `this`, as a direct member call would be.
      callProperties = transformer.call(this, callProperties) as CallProperties<
        RequestType,
        ResponseType
      >;
    }
    const stream: ClientDuplexStream<
      RequestType,
      ResponseType
    > = callProperties.call as ClientDuplexStream<RequestType, ResponseType>;
    const call = buildInterceptingCall(
      this[INTERCEPTOR_SYMBOL],
      this[INTERCEPTOR_PROVIDER_SYMBOL],
      callProperties,
      stream
    );
    // Created here so the recorded stack is that of the request method's caller.
    const callerStackError = new Error();
    startStreamingResponseCall(
      call,
      stream,
      callProperties.metadata,
      callerStackError
    );
    return stream;
  }
}
|