UNPKG

24.6 kBPlain TextView Raw
1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import {
19 ClientDuplexStream,
20 ClientDuplexStreamImpl,
21 ClientReadableStream,
22 ClientReadableStreamImpl,
23 ClientUnaryCall,
24 ClientUnaryCallImpl,
25 ClientWritableStream,
26 ClientWritableStreamImpl,
27 ServiceError,
28 callErrorFromStatus,
29 SurfaceCall,
30} from './call';
31import { CallCredentials } from './call-credentials';
32import { Deadline, StatusObject } from './call-stream';
33import { Channel, ChannelImplementation } from './channel';
34import { ConnectivityState } from './connectivity-state';
35import { ChannelCredentials } from './channel-credentials';
36import { ChannelOptions } from './channel-options';
37import { Status } from './constants';
38import { Metadata } from './metadata';
39import { ClientMethodDefinition } from './make-client';
40import {
41 getInterceptingCall,
42 Interceptor,
43 InterceptorProvider,
44 InterceptorArguments,
45 InterceptingCallInterface,
46} from './client-interceptors';
47import {
48 ServerUnaryCall,
49 ServerReadableStream,
50 ServerWritableStream,
51 ServerDuplexStream,
52} from './server-call';
53
/* Module-private symbols used as the property keys of the Client class's
 * private fields (channel, interceptors, interceptor providers and call
 * invocation transformer). They are not exported from this module. */
54const CHANNEL_SYMBOL = Symbol();
55const INTERCEPTOR_SYMBOL = Symbol();
56const INTERCEPTOR_PROVIDER_SYMBOL = Symbol();
57const CALL_INVOCATION_TRANSFORMER_SYMBOL = Symbol();
58
/**
 * Type guard that tells a response callback apart from the other values
 * (metadata, call options or nothing) accepted in the same argument position.
 *
 * @param arg The value to inspect.
 * @returns `true` when `arg` is a function.
 */
function isFunction<ResponseType>(
  arg: Metadata | CallOptions | UnaryCallback<ResponseType> | undefined
): arg is UnaryCallback<ResponseType> {
  const argKind = typeof arg;
  return argKind === 'function';
}
64
/**
 * Callback used by the request methods that produce a single response.
 * Within this file it is invoked either with `null` and the response message,
 * or with a `ServiceError` as the only argument.
 */
65export interface UnaryCallback<ResponseType> {
66 (err: ServiceError | null, value?: ResponseType): void;
67}
68
69/* eslint-disable @typescript-eslint/no-explicit-any */
/**
 * Options that can be supplied for an individual call. The whole object is
 * handed to `getInterceptingCall` when the call is created; the fields this
 * file reads directly are noted below.
 */
70export interface CallOptions {
71 deadline?: Deadline;
72 host?: string;
73 parent?:
74 | ServerUnaryCall<any, any>
75 | ServerReadableStream<any, any>
76 | ServerWritableStream<any, any>
77 | ServerDuplexStream<any, any>;
78 propagate_flags?: number;
// When set, applied to the created call with `setCredentials`.
79 credentials?: CallCredentials;
// Passed to `getInterceptingCall` as the call-level interceptors and
// interceptor providers (an empty array is used when either is absent).
80 interceptors?: Interceptor[];
81 interceptor_providers?: InterceptorProvider[];
82}
83/* eslint-enable @typescript-eslint/no-explicit-any */
84
/**
 * Everything the client has assembled for one call before the underlying
 * call object is created. This is the value handed to, and returned from, a
 * `CallInvocationTransformer`.
 */
85export interface CallProperties<RequestType, ResponseType> {
// Request message; only populated by the request methods that take a single
// request argument.
86 argument?: RequestType;
87 metadata: Metadata;
// The emitter/stream object that is returned to the caller of the request method.
88 call: SurfaceCall;
89 channel: Channel;
90 methodDefinition: ClientMethodDefinition<RequestType, ResponseType>;
91 callOptions: CallOptions;
// Response callback; only populated by the request methods that take one.
92 callback?: UnaryCallback<ResponseType>;
93}
94
/**
 * Hook that receives the properties assembled for a call and returns the
 * properties that the client should actually use to create the call.
 */
95export interface CallInvocationTransformer {
96 (callProperties: CallProperties<any, any>): CallProperties<any, any>; // eslint-disable-line @typescript-eslint/no-explicit-any
97}
98
/**
 * Options accepted by the `Client` constructor: any subset of the channel
 * options plus the client-specific settings below. The client-specific
 * interceptor and transformer settings are removed from the options object
 * before it is passed on to a channel or channel factory.
 */
99export type ClientOptions = Partial<ChannelOptions> & {
// Use this existing channel instead of creating one.
100 channelOverride?: Channel;
// Called to create the channel when no `channelOverride` is given.
101 channelFactoryOverride?: (
102 address: string,
103 credentials: ChannelCredentials,
104 options: ClientOptions
105 ) => Channel;
// Client-level interceptors / interceptor providers. The constructor throws
// when both are non-empty.
106 interceptors?: Interceptor[];
107 interceptor_providers?: InterceptorProvider[];
108 callInvocationTransformer?: CallInvocationTransformer;
109};
110
/**
 * Returns the stack trace of `error` without its first line (the
 * "Name: message" header), i.e. only the frame lines.
 *
 * `Error.prototype.stack` is optional: it can be missing or cleared on some
 * error objects. In that case, or when the stack consists of a single line,
 * an empty string is returned instead of throwing.
 *
 * @param error The error whose stack frames are wanted.
 * @returns The stack text following the first line break, or `''`.
 */
function getErrorStackString(error: Error): string {
  const stack = error.stack;
  if (typeof stack !== 'string') {
    return '';
  }
  const headerEnd = stack.indexOf('\n');
  // No line break means there is nothing after the header line.
  return headerEnd === -1 ? '' : stack.slice(headerEnd + 1);
}
114
115/**
116 * A generic gRPC client. Primarily useful as a base class for all generated
117 * clients.
118 */
119export class Client {
/** Channel used for every call made through this client. */
private readonly [CHANNEL_SYMBOL]: Channel;
/** Client-level interceptors (an empty array when none were supplied). */
private readonly [INTERCEPTOR_SYMBOL]: Interceptor[];
/** Client-level interceptor providers (an empty array when none were supplied). */
private readonly [INTERCEPTOR_PROVIDER_SYMBOL]: InterceptorProvider[];
/** Optional hook applied to the call properties of each request. */
private readonly [CALL_INVOCATION_TRANSFORMER_SYMBOL]?: CallInvocationTransformer;

/**
 * Creates a client bound to a channel.
 *
 * The channel is, in order of precedence: `options.channelOverride`, the
 * result of `options.channelFactoryOverride`, or a new
 * `ChannelImplementation`. The client-specific options (interceptors,
 * interceptor providers, call invocation transformer and the factory itself)
 * are stripped from the options handed to the factory or channel.
 *
 * @param address Target address passed to the channel or channel factory.
 * @param credentials Channel credentials passed to the channel or factory.
 * @param options Channel options plus client-specific settings.
 * @throws Error when both `interceptors` and `interceptor_providers` are
 *     non-empty.
 */
constructor(
  address: string,
  credentials: ChannelCredentials,
  options: ClientOptions = {}
) {
  // All deletions below happen on a shallow copy, never on the caller's object.
  const channelOptions: ClientOptions = Object.assign({}, options);
  const {
    interceptors,
    interceptor_providers: interceptorProviders,
    callInvocationTransformer,
  } = channelOptions;
  delete channelOptions.interceptors;
  delete channelOptions.interceptor_providers;

  this[INTERCEPTOR_SYMBOL] = interceptors ?? [];
  this[INTERCEPTOR_PROVIDER_SYMBOL] = interceptorProviders ?? [];
  if (
    this[INTERCEPTOR_SYMBOL].length > 0 &&
    this[INTERCEPTOR_PROVIDER_SYMBOL].length > 0
  ) {
    throw new Error(
      'Both interceptors and interceptor_providers were passed as options ' +
        'to the client constructor. Only one of these is allowed.'
    );
  }

  this[CALL_INVOCATION_TRANSFORMER_SYMBOL] = callInvocationTransformer;
  delete channelOptions.callInvocationTransformer;

  const { channelOverride, channelFactoryOverride } = channelOptions;
  if (channelOverride) {
    this[CHANNEL_SYMBOL] = channelOverride;
  } else if (channelFactoryOverride) {
    // The factory must not receive itself as part of the options.
    delete channelOptions.channelFactoryOverride;
    this[CHANNEL_SYMBOL] = channelFactoryOverride(
      address,
      credentials,
      channelOptions
    );
  } else {
    this[CHANNEL_SYMBOL] = new ChannelImplementation(
      address,
      credentials,
      channelOptions
    );
  }
}
164
/**
 * Closes the channel this client is using.
 */
close(): void {
  const channel = this[CHANNEL_SYMBOL];
  channel.close();
}
168
/**
 * Gives access to the channel this client is using.
 *
 * @returns The channel selected or created by the constructor.
 */
getChannel(): Channel {
  const channel = this[CHANNEL_SYMBOL];
  return channel;
}
172
/**
 * Waits until the channel reports the READY connectivity state.
 *
 * The first check is scheduled with `setImmediate`, so `callback` is never
 * invoked synchronously. Each check asks the channel for its state
 * (requesting a connection attempt) and, if it is not READY, watches for the
 * next state change until `deadline`.
 *
 * @param deadline Deadline handed to `watchConnectivityState`.
 * @param callback Called with no argument once the channel is READY, or with
 *     an `Error` when the watch reports an error or the channel throws.
 */
waitForReady(deadline: Deadline, callback: (error?: Error) => void): void {
  const channel = this[CHANNEL_SYMBOL];
  const pollState = (watchError?: Error): void => {
    if (watchError) {
      callback(new Error('Failed to connect before the deadline'));
      return;
    }

    let currentState;
    try {
      currentState = channel.getConnectivityState(true);
    } catch (e) {
      callback(new Error('The channel has been closed'));
      return;
    }

    if (currentState === ConnectivityState.READY) {
      callback();
      return;
    }

    // Not ready yet: run this check again on the next state change.
    try {
      channel.watchConnectivityState(currentState, deadline, pollState);
    } catch (e) {
      callback(new Error('The channel has been closed'));
    }
  };
  setImmediate(pollState);
}
202
/**
 * Normalises the trailing arguments of the callback-style request methods,
 * which may be given as `(callback)`, `(metadata, callback)`,
 * `(options, callback)` or `(metadata, options, callback)`.
 *
 * @returns The metadata (a fresh `Metadata` when omitted), the call options
 *     (`{}` when omitted) and the callback.
 * @throws Error when the arguments match none of the accepted forms.
 */
private checkOptionalUnaryResponseArguments<ResponseType>(
  arg1: Metadata | CallOptions | UnaryCallback<ResponseType>,
  arg2?: CallOptions | UnaryCallback<ResponseType>,
  arg3?: UnaryCallback<ResponseType>
): {
  metadata: Metadata;
  options: CallOptions;
  callback: UnaryCallback<ResponseType>;
} {
  // Form: (callback)
  if (isFunction(arg1)) {
    return { metadata: new Metadata(), options: {}, callback: arg1 };
  }

  // Forms: (metadata, callback) and (options, callback)
  if (isFunction(arg2)) {
    return arg1 instanceof Metadata
      ? { metadata: arg1, options: {}, callback: arg2 }
      : { metadata: new Metadata(), options: arg1, callback: arg2 };
  }

  // Form: (metadata, options, callback); anything else is rejected.
  if (
    !(arg1 instanceof Metadata) ||
    !(arg2 instanceof Object) ||
    !isFunction(arg3)
  ) {
    throw new Error('Incorrect arguments passed');
  }
  return { metadata: arg1, options: arg2, callback: arg3 };
}
233
/**
 * Starts a call with a single request message and a single response.
 *
 * The request is sent and the call half-closed before this method returns.
 * When the call's status arrives the callback is invoked exactly once: with
 * the response on an OK status, with an INTERNAL "No message received" error
 * on an OK status without a response, or with an error built from the status
 * otherwise. A second response message cancels the call with an INTERNAL
 * status. The returned object emits 'metadata' and 'status' events.
 *
 * @param method Path of the method, used as the method definition's `path`.
 * @param serialize Serializer for the request message.
 * @param deserialize Deserializer for the response message.
 * @param argument The request message.
 * @param metadata Optional request metadata.
 * @param options Optional call options.
 * @param callback Receives the error or the response.
 * @returns The call object for this request.
 * @throws Error when the trailing arguments are not in an accepted form.
 */
makeUnaryRequest<RequestType, ResponseType>(
  method: string,
  serialize: (value: RequestType) => Buffer,
  deserialize: (value: Buffer) => ResponseType,
  argument: RequestType,
  metadata: Metadata,
  options: CallOptions,
  callback: UnaryCallback<ResponseType>
): ClientUnaryCall;
makeUnaryRequest<RequestType, ResponseType>(
  method: string,
  serialize: (value: RequestType) => Buffer,
  deserialize: (value: Buffer) => ResponseType,
  argument: RequestType,
  metadata: Metadata,
  callback: UnaryCallback<ResponseType>
): ClientUnaryCall;
makeUnaryRequest<RequestType, ResponseType>(
  method: string,
  serialize: (value: RequestType) => Buffer,
  deserialize: (value: Buffer) => ResponseType,
  argument: RequestType,
  options: CallOptions,
  callback: UnaryCallback<ResponseType>
): ClientUnaryCall;
makeUnaryRequest<RequestType, ResponseType>(
  method: string,
  serialize: (value: RequestType) => Buffer,
  deserialize: (value: Buffer) => ResponseType,
  argument: RequestType,
  callback: UnaryCallback<ResponseType>
): ClientUnaryCall;
makeUnaryRequest<RequestType, ResponseType>(
  method: string,
  serialize: (value: RequestType) => Buffer,
  deserialize: (value: Buffer) => ResponseType,
  argument: RequestType,
  metadata: Metadata | CallOptions | UnaryCallback<ResponseType>,
  options?: CallOptions | UnaryCallback<ResponseType>,
  callback?: UnaryCallback<ResponseType>
): ClientUnaryCall {
  const resolved = this.checkOptionalUnaryResponseArguments<ResponseType>(
    metadata,
    options,
    callback
  );
  const methodDefinition: ClientMethodDefinition<RequestType, ResponseType> = {
    path: method,
    requestStream: false,
    responseStream: false,
    requestSerialize: serialize,
    responseDeserialize: deserialize,
  };
  let callProperties: CallProperties<RequestType, ResponseType> = {
    argument,
    metadata: resolved.metadata,
    call: new ClientUnaryCallImpl(),
    channel: this[CHANNEL_SYMBOL],
    methodDefinition,
    callOptions: resolved.options,
    callback: resolved.callback,
  };
  if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) {
    // Invoked as a property of the client so `this` inside the transformer
    // stays the client instance.
    callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(
      callProperties
    ) as CallProperties<RequestType, ResponseType>;
  }

  const emitter: ClientUnaryCall = callProperties.call;
  const callOptions = callProperties.callOptions;
  const interceptorArgs: InterceptorArguments = {
    clientInterceptors: this[INTERCEPTOR_SYMBOL],
    clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
    callInterceptors: callOptions.interceptors ?? [],
    callInterceptorProviders: callOptions.interceptor_providers ?? [],
  };
  const call: InterceptingCallInterface = getInterceptingCall(
    interceptorArgs,
    callProperties.methodDefinition,
    callOptions,
    callProperties.channel
  );
  /* The emitter has to exist before the CallInvocationTransformer runs,
   * while the call can only be created afterwards, so the two are linked
   * here. Nothing may use the emitter before this assignment; the type
   * system cannot enforce that. */
  emitter.call = call;
  if (callOptions.credentials) {
    call.setCredentials(callOptions.credentials);
  }

  let responseMessage: ResponseType | null = null;
  let statusSeen = false;
  // Captured now so that errors reported later can include the caller's stack.
  const callerStackError = new Error();
  call.start(callProperties.metadata, {
    onReceiveMetadata: (receivedMetadata) => {
      emitter.emit('metadata', receivedMetadata);
    },
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    onReceiveMessage: (message: any) => {
      if (responseMessage !== null) {
        call.cancelWithStatus(Status.INTERNAL, 'Too many responses received');
      }
      responseMessage = message;
    },
    onReceiveStatus: (status: StatusObject) => {
      // Only the first status is reported.
      if (statusSeen) {
        return;
      }
      statusSeen = true;
      if (status.code !== Status.OK) {
        const callerStack = getErrorStackString(callerStackError);
        callProperties.callback!(callErrorFromStatus(status, callerStack));
      } else if (responseMessage !== null) {
        callProperties.callback!(null, responseMessage);
      } else {
        // An OK status without any response message is reported as an error.
        const callerStack = getErrorStackString(callerStackError);
        callProperties.callback!(
          callErrorFromStatus(
            {
              code: Status.INTERNAL,
              details: 'No message received',
              metadata: status.metadata,
            },
            callerStack
          )
        );
      }
      emitter.emit('status', status);
    },
  });
  call.sendMessage(argument);
  call.halfClose();
  return emitter;
}
367
/**
 * Starts a call with a stream of request messages and a single response.
 *
 * This method itself sends no message and does not half-close the call; it
 * only starts the call and returns the writable stream object. When the
 * call's status arrives the callback is invoked exactly once: with the
 * response on an OK status, with an INTERNAL "No message received" error on
 * an OK status without a response, or with an error built from the status
 * otherwise. A second response message cancels the call with an INTERNAL
 * status. The returned stream emits 'metadata' and 'status' events.
 *
 * @param method Path of the method, used as the method definition's `path`.
 * @param serialize Serializer for request messages.
 * @param deserialize Deserializer for the response message.
 * @param metadata Optional request metadata.
 * @param options Optional call options.
 * @param callback Receives the error or the response.
 * @returns The writable stream object for this call.
 * @throws Error when the trailing arguments are not in an accepted form.
 */
makeClientStreamRequest<RequestType, ResponseType>(
  method: string,
  serialize: (value: RequestType) => Buffer,
  deserialize: (value: Buffer) => ResponseType,
  metadata: Metadata,
  options: CallOptions,
  callback: UnaryCallback<ResponseType>
): ClientWritableStream<RequestType>;
makeClientStreamRequest<RequestType, ResponseType>(
  method: string,
  serialize: (value: RequestType) => Buffer,
  deserialize: (value: Buffer) => ResponseType,
  metadata: Metadata,
  callback: UnaryCallback<ResponseType>
): ClientWritableStream<RequestType>;
makeClientStreamRequest<RequestType, ResponseType>(
  method: string,
  serialize: (value: RequestType) => Buffer,
  deserialize: (value: Buffer) => ResponseType,
  options: CallOptions,
  callback: UnaryCallback<ResponseType>
): ClientWritableStream<RequestType>;
makeClientStreamRequest<RequestType, ResponseType>(
  method: string,
  serialize: (value: RequestType) => Buffer,
  deserialize: (value: Buffer) => ResponseType,
  callback: UnaryCallback<ResponseType>
): ClientWritableStream<RequestType>;
makeClientStreamRequest<RequestType, ResponseType>(
  method: string,
  serialize: (value: RequestType) => Buffer,
  deserialize: (value: Buffer) => ResponseType,
  metadata: Metadata | CallOptions | UnaryCallback<ResponseType>,
  options?: CallOptions | UnaryCallback<ResponseType>,
  callback?: UnaryCallback<ResponseType>
): ClientWritableStream<RequestType> {
  const resolved = this.checkOptionalUnaryResponseArguments<ResponseType>(
    metadata,
    options,
    callback
  );
  const methodDefinition: ClientMethodDefinition<RequestType, ResponseType> = {
    path: method,
    requestStream: true,
    responseStream: false,
    requestSerialize: serialize,
    responseDeserialize: deserialize,
  };
  let callProperties: CallProperties<RequestType, ResponseType> = {
    metadata: resolved.metadata,
    call: new ClientWritableStreamImpl<RequestType>(serialize),
    channel: this[CHANNEL_SYMBOL],
    methodDefinition,
    callOptions: resolved.options,
    callback: resolved.callback,
  };
  if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) {
    // Invoked as a property of the client so `this` inside the transformer
    // stays the client instance.
    callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(
      callProperties
    ) as CallProperties<RequestType, ResponseType>;
  }

  const emitter: ClientWritableStream<RequestType> =
    callProperties.call as ClientWritableStream<RequestType>;
  const callOptions = callProperties.callOptions;
  const interceptorArgs: InterceptorArguments = {
    clientInterceptors: this[INTERCEPTOR_SYMBOL],
    clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
    callInterceptors: callOptions.interceptors ?? [],
    callInterceptorProviders: callOptions.interceptor_providers ?? [],
  };
  const call: InterceptingCallInterface = getInterceptingCall(
    interceptorArgs,
    callProperties.methodDefinition,
    callOptions,
    callProperties.channel
  );
  /* The emitter has to exist before the CallInvocationTransformer runs,
   * while the call can only be created afterwards, so the two are linked
   * here. Nothing may use the emitter before this assignment; the type
   * system cannot enforce that. */
  emitter.call = call;
  if (callOptions.credentials) {
    call.setCredentials(callOptions.credentials);
  }

  let responseMessage: ResponseType | null = null;
  let statusSeen = false;
  // Captured now so that errors reported later can include the caller's stack.
  const callerStackError = new Error();
  call.start(callProperties.metadata, {
    onReceiveMetadata: (receivedMetadata) => {
      emitter.emit('metadata', receivedMetadata);
    },
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    onReceiveMessage: (message: any) => {
      if (responseMessage !== null) {
        call.cancelWithStatus(Status.INTERNAL, 'Too many responses received');
      }
      responseMessage = message;
    },
    onReceiveStatus: (status: StatusObject) => {
      // Only the first status is reported.
      if (statusSeen) {
        return;
      }
      statusSeen = true;
      if (status.code !== Status.OK) {
        const callerStack = getErrorStackString(callerStackError);
        callProperties.callback!(callErrorFromStatus(status, callerStack));
      } else if (responseMessage !== null) {
        callProperties.callback!(null, responseMessage);
      } else {
        // An OK status without any response message is reported as an error.
        const callerStack = getErrorStackString(callerStackError);
        callProperties.callback!(
          callErrorFromStatus(
            {
              code: Status.INTERNAL,
              details: 'No message received',
              metadata: status.metadata,
            },
            callerStack
          )
        );
      }
      emitter.emit('status', status);
    },
  });
  return emitter;
}
493
/**
 * Normalises the optional trailing arguments of the response-streaming
 * request methods, which may be given as `()`, `(metadata)`, `(options)` or
 * `(metadata, options)`.
 *
 * @param arg1 Metadata, call options, or nothing.
 * @param arg2 Call options; only used when `arg1` is a `Metadata`.
 * @returns The metadata (a fresh `Metadata` when omitted) and the call
 *     options (`{}` when omitted).
 */
private checkMetadataAndOptions(
  arg1?: Metadata | CallOptions,
  arg2?: CallOptions
): { metadata: Metadata; options: CallOptions } {
  if (arg1 instanceof Metadata) {
    return { metadata: arg1, options: arg2 || {} };
  }
  // The first argument is either the call options or absent.
  return { metadata: new Metadata(), options: arg1 || {} };
}
517
/**
 * Starts a call with a single request message and a stream of responses.
 *
 * The request is sent and the call half-closed before this method returns.
 * Each received message is pushed onto the returned stream. When the call's
 * status arrives the stream is ended (`push(null)`), an 'error' event is
 * emitted if the status is not OK, and a 'status' event is emitted. The
 * stream also emits a 'metadata' event when response metadata is received.
 *
 * @param method Path of the method, used as the method definition's `path`.
 * @param serialize Serializer for the request message.
 * @param deserialize Deserializer for response messages.
 * @param argument The request message.
 * @param metadata Optional request metadata.
 * @param options Optional call options.
 * @returns The readable stream object for this call.
 */
makeServerStreamRequest<RequestType, ResponseType>(
  method: string,
  serialize: (value: RequestType) => Buffer,
  deserialize: (value: Buffer) => ResponseType,
  argument: RequestType,
  metadata: Metadata,
  options?: CallOptions
): ClientReadableStream<ResponseType>;
makeServerStreamRequest<RequestType, ResponseType>(
  method: string,
  serialize: (value: RequestType) => Buffer,
  deserialize: (value: Buffer) => ResponseType,
  argument: RequestType,
  options?: CallOptions
): ClientReadableStream<ResponseType>;
makeServerStreamRequest<RequestType, ResponseType>(
  method: string,
  serialize: (value: RequestType) => Buffer,
  deserialize: (value: Buffer) => ResponseType,
  argument: RequestType,
  metadata?: Metadata | CallOptions,
  options?: CallOptions
): ClientReadableStream<ResponseType> {
  const resolved = this.checkMetadataAndOptions(metadata, options);
  const methodDefinition: ClientMethodDefinition<RequestType, ResponseType> = {
    path: method,
    requestStream: false,
    responseStream: true,
    requestSerialize: serialize,
    responseDeserialize: deserialize,
  };
  let callProperties: CallProperties<RequestType, ResponseType> = {
    argument,
    metadata: resolved.metadata,
    call: new ClientReadableStreamImpl<ResponseType>(deserialize),
    channel: this[CHANNEL_SYMBOL],
    methodDefinition,
    callOptions: resolved.options,
  };
  if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) {
    // Invoked as a property of the client so `this` inside the transformer
    // stays the client instance.
    callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(
      callProperties
    ) as CallProperties<RequestType, ResponseType>;
  }

  const stream: ClientReadableStream<ResponseType> =
    callProperties.call as ClientReadableStream<ResponseType>;
  const callOptions = callProperties.callOptions;
  const interceptorArgs: InterceptorArguments = {
    clientInterceptors: this[INTERCEPTOR_SYMBOL],
    clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
    callInterceptors: callOptions.interceptors ?? [],
    callInterceptorProviders: callOptions.interceptor_providers ?? [],
  };
  const call: InterceptingCallInterface = getInterceptingCall(
    interceptorArgs,
    callProperties.methodDefinition,
    callOptions,
    callProperties.channel
  );
  /* The stream object has to exist before the CallInvocationTransformer
   * runs, while the call can only be created afterwards, so the two are
   * linked here. Nothing may use the stream before this assignment; the type
   * system cannot enforce that. */
  stream.call = call;
  if (callOptions.credentials) {
    call.setCredentials(callOptions.credentials);
  }

  let statusSeen = false;
  // Captured now so that errors reported later can include the caller's stack.
  const callerStackError = new Error();
  call.start(callProperties.metadata, {
    onReceiveMetadata: (receivedMetadata: Metadata) => {
      stream.emit('metadata', receivedMetadata);
    },
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    onReceiveMessage: (message: any) => {
      stream.push(message);
    },
    onReceiveStatus: (status: StatusObject) => {
      // Only the first status is reported.
      if (statusSeen) {
        return;
      }
      statusSeen = true;
      // End the readable side before reporting the outcome.
      stream.push(null);
      const failed = status.code !== Status.OK;
      if (failed) {
        const callerStack = getErrorStackString(callerStackError);
        stream.emit('error', callErrorFromStatus(status, callerStack));
      }
      stream.emit('status', status);
    },
  });
  call.sendMessage(argument);
  call.halfClose();
  return stream;
}
614
/**
 * Starts a call with a stream of request messages and a stream of responses.
 *
 * This method itself sends no message and does not half-close the call; it
 * only starts the call and returns the duplex stream object. Each received
 * message is pushed onto the returned stream. When the call's status arrives
 * the readable side is ended (`push(null)`), an 'error' event is emitted if
 * the status is not OK, and a 'status' event is emitted. The stream also
 * emits a 'metadata' event when response metadata is received.
 *
 * @param method Path of the method, used as the method definition's `path`.
 * @param serialize Serializer for request messages.
 * @param deserialize Deserializer for response messages.
 * @param metadata Optional request metadata.
 * @param options Optional call options.
 * @returns The duplex stream object for this call.
 */
makeBidiStreamRequest<RequestType, ResponseType>(
  method: string,
  serialize: (value: RequestType) => Buffer,
  deserialize: (value: Buffer) => ResponseType,
  metadata: Metadata,
  options?: CallOptions
): ClientDuplexStream<RequestType, ResponseType>;
makeBidiStreamRequest<RequestType, ResponseType>(
  method: string,
  serialize: (value: RequestType) => Buffer,
  deserialize: (value: Buffer) => ResponseType,
  options?: CallOptions
): ClientDuplexStream<RequestType, ResponseType>;
makeBidiStreamRequest<RequestType, ResponseType>(
  method: string,
  serialize: (value: RequestType) => Buffer,
  deserialize: (value: Buffer) => ResponseType,
  metadata?: Metadata | CallOptions,
  options?: CallOptions
): ClientDuplexStream<RequestType, ResponseType> {
  const resolved = this.checkMetadataAndOptions(metadata, options);
  const methodDefinition: ClientMethodDefinition<RequestType, ResponseType> = {
    path: method,
    requestStream: true,
    responseStream: true,
    requestSerialize: serialize,
    responseDeserialize: deserialize,
  };
  let callProperties: CallProperties<RequestType, ResponseType> = {
    metadata: resolved.metadata,
    call: new ClientDuplexStreamImpl<RequestType, ResponseType>(
      serialize,
      deserialize
    ),
    channel: this[CHANNEL_SYMBOL],
    methodDefinition,
    callOptions: resolved.options,
  };
  if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) {
    // Invoked as a property of the client so `this` inside the transformer
    // stays the client instance.
    callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(
      callProperties
    ) as CallProperties<RequestType, ResponseType>;
  }

  const stream: ClientDuplexStream<RequestType, ResponseType> =
    callProperties.call as ClientDuplexStream<RequestType, ResponseType>;
  const callOptions = callProperties.callOptions;
  const interceptorArgs: InterceptorArguments = {
    clientInterceptors: this[INTERCEPTOR_SYMBOL],
    clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
    callInterceptors: callOptions.interceptors ?? [],
    callInterceptorProviders: callOptions.interceptor_providers ?? [],
  };
  const call: InterceptingCallInterface = getInterceptingCall(
    interceptorArgs,
    callProperties.methodDefinition,
    callOptions,
    callProperties.channel
  );
  /* The stream object has to exist before the CallInvocationTransformer
   * runs, while the call can only be created afterwards, so the two are
   * linked here. Nothing may use the stream before this assignment; the type
   * system cannot enforce that. */
  stream.call = call;
  if (callOptions.credentials) {
    call.setCredentials(callOptions.credentials);
  }

  let statusSeen = false;
  // Captured now so that errors reported later can include the caller's stack.
  const callerStackError = new Error();
  call.start(callProperties.metadata, {
    onReceiveMetadata: (receivedMetadata: Metadata) => {
      stream.emit('metadata', receivedMetadata);
    },
    onReceiveMessage: (message: Buffer) => {
      stream.push(message);
    },
    onReceiveStatus: (status: StatusObject) => {
      // Only the first status is reported.
      if (statusSeen) {
        return;
      }
      statusSeen = true;
      // End the readable side before reporting the outcome.
      stream.push(null);
      const failed = status.code !== Status.OK;
      if (failed) {
        const callerStack = getErrorStackString(callerStackError);
        stream.emit('error', callErrorFromStatus(status, callerStack));
      }
      stream.emit('status', status);
    },
  });
  return stream;
}
710}