UNPKG

24.2 kBPlain TextView Raw
1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import {
19 ClientDuplexStream,
20 ClientDuplexStreamImpl,
21 ClientReadableStream,
22 ClientReadableStreamImpl,
23 ClientUnaryCall,
24 ClientUnaryCallImpl,
25 ClientWritableStream,
26 ClientWritableStreamImpl,
27 ServiceError,
28 callErrorFromStatus,
29 SurfaceCall,
30} from './call';
31import { CallCredentials } from './call-credentials';
32import { Deadline, StatusObject } from './call-stream';
33import { Channel, ChannelImplementation } from './channel';
34import { ConnectivityState } from './connectivity-state';
35import { ChannelCredentials } from './channel-credentials';
36import { ChannelOptions } from './channel-options';
37import { Status } from './constants';
38import { Metadata } from './metadata';
39import { ClientMethodDefinition } from './make-client';
40import {
41 getInterceptingCall,
42 Interceptor,
43 InterceptorProvider,
44 InterceptorArguments,
45 InterceptingCallInterface,
46} from './client-interceptors';
47import {
48 ServerUnaryCall,
49 ServerReadableStream,
50 ServerWritableStream,
51 ServerDuplexStream,
52} from './server-call';
53
54const CHANNEL_SYMBOL = Symbol();
55const INTERCEPTOR_SYMBOL = Symbol();
56const INTERCEPTOR_PROVIDER_SYMBOL = Symbol();
57const CALL_INVOCATION_TRANSFORMER_SYMBOL = Symbol();
58
59function isFunction<ResponseType>(
60 arg: Metadata | CallOptions | UnaryCallback<ResponseType> | undefined
61): arg is UnaryCallback<ResponseType> {
62 return typeof arg === 'function';
63}
64
65export interface UnaryCallback<ResponseType> {
66 (err: ServiceError | null, value?: ResponseType): void;
67}
68
69/* eslint-disable @typescript-eslint/no-explicit-any */
70export interface CallOptions {
71 deadline?: Deadline;
72 host?: string;
73 parent?:
74 | ServerUnaryCall<any, any>
75 | ServerReadableStream<any, any>
76 | ServerWritableStream<any, any>
77 | ServerDuplexStream<any, any>;
78 propagate_flags?: number;
79 credentials?: CallCredentials;
80 interceptors?: Interceptor[];
81 interceptor_providers?: InterceptorProvider[];
82}
83/* eslint-enable @typescript-eslint/no-explicit-any */
84
85export interface CallProperties<RequestType, ResponseType> {
86 argument?: RequestType;
87 metadata: Metadata;
88 call: SurfaceCall;
89 channel: Channel;
90 methodDefinition: ClientMethodDefinition<RequestType, ResponseType>;
91 callOptions: CallOptions;
92 callback?: UnaryCallback<ResponseType>;
93}
94
95export interface CallInvocationTransformer {
96 (callProperties: CallProperties<any, any>): CallProperties<any, any>; // eslint-disable-line @typescript-eslint/no-explicit-any
97}
98
99export type ClientOptions = Partial<ChannelOptions> & {
100 channelOverride?: Channel;
101 channelFactoryOverride?: (
102 address: string,
103 credentials: ChannelCredentials,
104 options: ClientOptions
105 ) => Channel;
106 interceptors?: Interceptor[];
107 interceptor_providers?: InterceptorProvider[];
108 callInvocationTransformer?: CallInvocationTransformer;
109};
110
111/**
112 * A generic gRPC client. Primarily useful as a base class for all generated
113 * clients.
114 */
115export class Client {
116 private readonly [CHANNEL_SYMBOL]: Channel;
117 private readonly [INTERCEPTOR_SYMBOL]: Interceptor[];
118 private readonly [INTERCEPTOR_PROVIDER_SYMBOL]: InterceptorProvider[];
119 private readonly [CALL_INVOCATION_TRANSFORMER_SYMBOL]?: CallInvocationTransformer;
120 constructor(
121 address: string,
122 credentials: ChannelCredentials,
123 options: ClientOptions = {}
124 ) {
125 options = Object.assign({}, options);
126 this[INTERCEPTOR_SYMBOL] = options.interceptors ?? [];
127 delete options.interceptors;
128 this[INTERCEPTOR_PROVIDER_SYMBOL] = options.interceptor_providers ?? [];
129 delete options.interceptor_providers;
130 if (
131 this[INTERCEPTOR_SYMBOL].length > 0 &&
132 this[INTERCEPTOR_PROVIDER_SYMBOL].length > 0
133 ) {
134 throw new Error(
135 'Both interceptors and interceptor_providers were passed as options ' +
136 'to the client constructor. Only one of these is allowed.'
137 );
138 }
139 this[CALL_INVOCATION_TRANSFORMER_SYMBOL] =
140 options.callInvocationTransformer;
141 delete options.callInvocationTransformer;
142 if (options.channelOverride) {
143 this[CHANNEL_SYMBOL] = options.channelOverride;
144 } else if (options.channelFactoryOverride) {
145 const channelFactoryOverride = options.channelFactoryOverride;
146 delete options.channelFactoryOverride;
147 this[CHANNEL_SYMBOL] = channelFactoryOverride(
148 address,
149 credentials,
150 options
151 );
152 } else {
153 this[CHANNEL_SYMBOL] = new ChannelImplementation(
154 address,
155 credentials,
156 options
157 );
158 }
159 }
160
161 close(): void {
162 this[CHANNEL_SYMBOL].close();
163 }
164
165 getChannel(): Channel {
166 return this[CHANNEL_SYMBOL];
167 }
168
169 waitForReady(deadline: Deadline, callback: (error?: Error) => void): void {
170 const checkState = (err?: Error) => {
171 if (err) {
172 callback(new Error('Failed to connect before the deadline'));
173 return;
174 }
175 let newState;
176 try {
177 newState = this[CHANNEL_SYMBOL].getConnectivityState(true);
178 } catch (e) {
179 callback(new Error('The channel has been closed'));
180 return;
181 }
182 if (newState === ConnectivityState.READY) {
183 callback();
184 } else {
185 try {
186 this[CHANNEL_SYMBOL].watchConnectivityState(
187 newState,
188 deadline,
189 checkState
190 );
191 } catch (e) {
192 callback(new Error('The channel has been closed'));
193 }
194 }
195 };
196 setImmediate(checkState);
197 }
198
199 private checkOptionalUnaryResponseArguments<ResponseType>(
200 arg1: Metadata | CallOptions | UnaryCallback<ResponseType>,
201 arg2?: CallOptions | UnaryCallback<ResponseType>,
202 arg3?: UnaryCallback<ResponseType>
203 ): {
204 metadata: Metadata;
205 options: CallOptions;
206 callback: UnaryCallback<ResponseType>;
207 } {
208 if (isFunction(arg1)) {
209 return { metadata: new Metadata(), options: {}, callback: arg1 };
210 } else if (isFunction(arg2)) {
211 if (arg1 instanceof Metadata) {
212 return { metadata: arg1, options: {}, callback: arg2 };
213 } else {
214 return { metadata: new Metadata(), options: arg1, callback: arg2 };
215 }
216 } else {
217 if (
218 !(
219 arg1 instanceof Metadata &&
220 arg2 instanceof Object &&
221 isFunction(arg3)
222 )
223 ) {
224 throw new Error('Incorrect arguments passed');
225 }
226 return { metadata: arg1, options: arg2, callback: arg3 };
227 }
228 }
229
230 makeUnaryRequest<RequestType, ResponseType>(
231 method: string,
232 serialize: (value: RequestType) => Buffer,
233 deserialize: (value: Buffer) => ResponseType,
234 argument: RequestType,
235 metadata: Metadata,
236 options: CallOptions,
237 callback: UnaryCallback<ResponseType>
238 ): ClientUnaryCall;
239 makeUnaryRequest<RequestType, ResponseType>(
240 method: string,
241 serialize: (value: RequestType) => Buffer,
242 deserialize: (value: Buffer) => ResponseType,
243 argument: RequestType,
244 metadata: Metadata,
245 callback: UnaryCallback<ResponseType>
246 ): ClientUnaryCall;
247 makeUnaryRequest<RequestType, ResponseType>(
248 method: string,
249 serialize: (value: RequestType) => Buffer,
250 deserialize: (value: Buffer) => ResponseType,
251 argument: RequestType,
252 options: CallOptions,
253 callback: UnaryCallback<ResponseType>
254 ): ClientUnaryCall;
255 makeUnaryRequest<RequestType, ResponseType>(
256 method: string,
257 serialize: (value: RequestType) => Buffer,
258 deserialize: (value: Buffer) => ResponseType,
259 argument: RequestType,
260 callback: UnaryCallback<ResponseType>
261 ): ClientUnaryCall;
262 makeUnaryRequest<RequestType, ResponseType>(
263 method: string,
264 serialize: (value: RequestType) => Buffer,
265 deserialize: (value: Buffer) => ResponseType,
266 argument: RequestType,
267 metadata: Metadata | CallOptions | UnaryCallback<ResponseType>,
268 options?: CallOptions | UnaryCallback<ResponseType>,
269 callback?: UnaryCallback<ResponseType>
270 ): ClientUnaryCall {
271 const checkedArguments = this.checkOptionalUnaryResponseArguments<ResponseType>(
272 metadata,
273 options,
274 callback
275 );
276 const methodDefinition: ClientMethodDefinition<
277 RequestType,
278 ResponseType
279 > = {
280 path: method,
281 requestStream: false,
282 responseStream: false,
283 requestSerialize: serialize,
284 responseDeserialize: deserialize,
285 };
286 let callProperties: CallProperties<RequestType, ResponseType> = {
287 argument: argument,
288 metadata: checkedArguments.metadata,
289 call: new ClientUnaryCallImpl(),
290 channel: this[CHANNEL_SYMBOL],
291 methodDefinition: methodDefinition,
292 callOptions: checkedArguments.options,
293 callback: checkedArguments.callback,
294 };
295 if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) {
296 callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(
297 callProperties
298 ) as CallProperties<RequestType, ResponseType>;
299 }
300 const emitter: ClientUnaryCall = callProperties.call;
301 const interceptorArgs: InterceptorArguments = {
302 clientInterceptors: this[INTERCEPTOR_SYMBOL],
303 clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
304 callInterceptors: callProperties.callOptions.interceptors ?? [],
305 callInterceptorProviders:
306 callProperties.callOptions.interceptor_providers ?? [],
307 };
308 const call: InterceptingCallInterface = getInterceptingCall(
309 interceptorArgs,
310 callProperties.methodDefinition,
311 callProperties.callOptions,
312 callProperties.channel
313 );
314 /* This needs to happen before the emitter is used. Unfortunately we can't
315 * enforce this with the type system. We need to construct this emitter
316 * before calling the CallInvocationTransformer, and we need to create the
317 * call after that. */
318 emitter.call = call;
319 if (callProperties.callOptions.credentials) {
320 call.setCredentials(callProperties.callOptions.credentials);
321 }
322 let responseMessage: ResponseType | null = null;
323 let receivedStatus = false;
324 const callerStack = (new Error().stack!).split('\n').slice(1).join('\n');
325 call.start(callProperties.metadata, {
326 onReceiveMetadata: (metadata) => {
327 emitter.emit('metadata', metadata);
328 },
329 // eslint-disable-next-line @typescript-eslint/no-explicit-any
330 onReceiveMessage(message: any) {
331 if (responseMessage !== null) {
332 call.cancelWithStatus(Status.INTERNAL, 'Too many responses received');
333 }
334 responseMessage = message;
335 },
336 onReceiveStatus(status: StatusObject) {
337 if (receivedStatus) {
338 return;
339 }
340 receivedStatus = true;
341 if (status.code === Status.OK) {
342 if (responseMessage === null) {
343 callProperties.callback!(callErrorFromStatus({
344 code: Status.INTERNAL,
345 details: 'No message received',
346 metadata: status.metadata
347 }, callerStack));
348 } else {
349 callProperties.callback!(null, responseMessage);
350 }
351 } else {
352 callProperties.callback!(callErrorFromStatus(status, callerStack));
353 }
354 emitter.emit('status', status);
355 },
356 });
357 call.sendMessage(argument);
358 call.halfClose();
359 return emitter;
360 }
361
362 makeClientStreamRequest<RequestType, ResponseType>(
363 method: string,
364 serialize: (value: RequestType) => Buffer,
365 deserialize: (value: Buffer) => ResponseType,
366 metadata: Metadata,
367 options: CallOptions,
368 callback: UnaryCallback<ResponseType>
369 ): ClientWritableStream<RequestType>;
370 makeClientStreamRequest<RequestType, ResponseType>(
371 method: string,
372 serialize: (value: RequestType) => Buffer,
373 deserialize: (value: Buffer) => ResponseType,
374 metadata: Metadata,
375 callback: UnaryCallback<ResponseType>
376 ): ClientWritableStream<RequestType>;
377 makeClientStreamRequest<RequestType, ResponseType>(
378 method: string,
379 serialize: (value: RequestType) => Buffer,
380 deserialize: (value: Buffer) => ResponseType,
381 options: CallOptions,
382 callback: UnaryCallback<ResponseType>
383 ): ClientWritableStream<RequestType>;
384 makeClientStreamRequest<RequestType, ResponseType>(
385 method: string,
386 serialize: (value: RequestType) => Buffer,
387 deserialize: (value: Buffer) => ResponseType,
388 callback: UnaryCallback<ResponseType>
389 ): ClientWritableStream<RequestType>;
390 makeClientStreamRequest<RequestType, ResponseType>(
391 method: string,
392 serialize: (value: RequestType) => Buffer,
393 deserialize: (value: Buffer) => ResponseType,
394 metadata: Metadata | CallOptions | UnaryCallback<ResponseType>,
395 options?: CallOptions | UnaryCallback<ResponseType>,
396 callback?: UnaryCallback<ResponseType>
397 ): ClientWritableStream<RequestType> {
398 const checkedArguments = this.checkOptionalUnaryResponseArguments<ResponseType>(
399 metadata,
400 options,
401 callback
402 );
403 const methodDefinition: ClientMethodDefinition<
404 RequestType,
405 ResponseType
406 > = {
407 path: method,
408 requestStream: true,
409 responseStream: false,
410 requestSerialize: serialize,
411 responseDeserialize: deserialize,
412 };
413 let callProperties: CallProperties<RequestType, ResponseType> = {
414 metadata: checkedArguments.metadata,
415 call: new ClientWritableStreamImpl<RequestType>(serialize),
416 channel: this[CHANNEL_SYMBOL],
417 methodDefinition: methodDefinition,
418 callOptions: checkedArguments.options,
419 callback: checkedArguments.callback,
420 };
421 if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) {
422 callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(
423 callProperties
424 ) as CallProperties<RequestType, ResponseType>;
425 }
426 const emitter: ClientWritableStream<RequestType> = callProperties.call as ClientWritableStream<RequestType>;
427 const interceptorArgs: InterceptorArguments = {
428 clientInterceptors: this[INTERCEPTOR_SYMBOL],
429 clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
430 callInterceptors: callProperties.callOptions.interceptors ?? [],
431 callInterceptorProviders:
432 callProperties.callOptions.interceptor_providers ?? [],
433 };
434 const call: InterceptingCallInterface = getInterceptingCall(
435 interceptorArgs,
436 callProperties.methodDefinition,
437 callProperties.callOptions,
438 callProperties.channel
439 );
440 /* This needs to happen before the emitter is used. Unfortunately we can't
441 * enforce this with the type system. We need to construct this emitter
442 * before calling the CallInvocationTransformer, and we need to create the
443 * call after that. */
444 emitter.call = call;
445 if (callProperties.callOptions.credentials) {
446 call.setCredentials(callProperties.callOptions.credentials);
447 }
448 let responseMessage: ResponseType | null = null;
449 let receivedStatus = false;
450 const callerStack = (new Error().stack!).split('\n').slice(1).join('\n');
451 call.start(callProperties.metadata, {
452 onReceiveMetadata: (metadata) => {
453 emitter.emit('metadata', metadata);
454 },
455 // eslint-disable-next-line @typescript-eslint/no-explicit-any
456 onReceiveMessage(message: any) {
457 if (responseMessage !== null) {
458 call.cancelWithStatus(Status.INTERNAL, 'Too many responses received');
459 }
460 responseMessage = message;
461 },
462 onReceiveStatus(status: StatusObject) {
463 if (receivedStatus) {
464 return;
465 }
466 receivedStatus = true;
467 if (status.code === Status.OK) {
468 if (responseMessage === null) {
469 callProperties.callback!(callErrorFromStatus({
470 code: Status.INTERNAL,
471 details: 'No message received',
472 metadata: status.metadata
473 }, callerStack));
474 } else {
475 callProperties.callback!(null, responseMessage);
476 }
477 } else {
478 callProperties.callback!(callErrorFromStatus(status, callerStack));
479 }
480 emitter.emit('status', status);
481 },
482 });
483 return emitter;
484 }
485
486 private checkMetadataAndOptions(
487 arg1?: Metadata | CallOptions,
488 arg2?: CallOptions
489 ): { metadata: Metadata; options: CallOptions } {
490 let metadata: Metadata;
491 let options: CallOptions;
492 if (arg1 instanceof Metadata) {
493 metadata = arg1;
494 if (arg2) {
495 options = arg2;
496 } else {
497 options = {};
498 }
499 } else {
500 if (arg1) {
501 options = arg1;
502 } else {
503 options = {};
504 }
505 metadata = new Metadata();
506 }
507 return { metadata, options };
508 }
509
510 makeServerStreamRequest<RequestType, ResponseType>(
511 method: string,
512 serialize: (value: RequestType) => Buffer,
513 deserialize: (value: Buffer) => ResponseType,
514 argument: RequestType,
515 metadata: Metadata,
516 options?: CallOptions
517 ): ClientReadableStream<ResponseType>;
518 makeServerStreamRequest<RequestType, ResponseType>(
519 method: string,
520 serialize: (value: RequestType) => Buffer,
521 deserialize: (value: Buffer) => ResponseType,
522 argument: RequestType,
523 options?: CallOptions
524 ): ClientReadableStream<ResponseType>;
525 makeServerStreamRequest<RequestType, ResponseType>(
526 method: string,
527 serialize: (value: RequestType) => Buffer,
528 deserialize: (value: Buffer) => ResponseType,
529 argument: RequestType,
530 metadata?: Metadata | CallOptions,
531 options?: CallOptions
532 ): ClientReadableStream<ResponseType> {
533 const checkedArguments = this.checkMetadataAndOptions(metadata, options);
534 const methodDefinition: ClientMethodDefinition<
535 RequestType,
536 ResponseType
537 > = {
538 path: method,
539 requestStream: false,
540 responseStream: true,
541 requestSerialize: serialize,
542 responseDeserialize: deserialize,
543 };
544 let callProperties: CallProperties<RequestType, ResponseType> = {
545 argument: argument,
546 metadata: checkedArguments.metadata,
547 call: new ClientReadableStreamImpl<ResponseType>(deserialize),
548 channel: this[CHANNEL_SYMBOL],
549 methodDefinition: methodDefinition,
550 callOptions: checkedArguments.options,
551 };
552 if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) {
553 callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(
554 callProperties
555 ) as CallProperties<RequestType, ResponseType>;
556 }
557 const stream: ClientReadableStream<ResponseType> = callProperties.call as ClientReadableStream<ResponseType>;
558 const interceptorArgs: InterceptorArguments = {
559 clientInterceptors: this[INTERCEPTOR_SYMBOL],
560 clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
561 callInterceptors: callProperties.callOptions.interceptors ?? [],
562 callInterceptorProviders:
563 callProperties.callOptions.interceptor_providers ?? [],
564 };
565 const call: InterceptingCallInterface = getInterceptingCall(
566 interceptorArgs,
567 callProperties.methodDefinition,
568 callProperties.callOptions,
569 callProperties.channel
570 );
571 /* This needs to happen before the emitter is used. Unfortunately we can't
572 * enforce this with the type system. We need to construct this emitter
573 * before calling the CallInvocationTransformer, and we need to create the
574 * call after that. */
575 stream.call = call;
576 if (callProperties.callOptions.credentials) {
577 call.setCredentials(callProperties.callOptions.credentials);
578 }
579 let receivedStatus = false;
580 const callerStack = (new Error().stack!).split('\n').slice(1).join('\n');
581 call.start(callProperties.metadata, {
582 onReceiveMetadata(metadata: Metadata) {
583 stream.emit('metadata', metadata);
584 },
585 // eslint-disable-next-line @typescript-eslint/no-explicit-any
586 onReceiveMessage(message: any) {
587 stream.push(message);
588 },
589 onReceiveStatus(status: StatusObject) {
590 if (receivedStatus) {
591 return;
592 }
593 receivedStatus = true;
594 stream.push(null);
595 if (status.code !== Status.OK) {
596 stream.emit('error', callErrorFromStatus(status, callerStack));
597 }
598 stream.emit('status', status);
599 },
600 });
601 call.sendMessage(argument);
602 call.halfClose();
603 return stream;
604 }
605
606 makeBidiStreamRequest<RequestType, ResponseType>(
607 method: string,
608 serialize: (value: RequestType) => Buffer,
609 deserialize: (value: Buffer) => ResponseType,
610 metadata: Metadata,
611 options?: CallOptions
612 ): ClientDuplexStream<RequestType, ResponseType>;
613 makeBidiStreamRequest<RequestType, ResponseType>(
614 method: string,
615 serialize: (value: RequestType) => Buffer,
616 deserialize: (value: Buffer) => ResponseType,
617 options?: CallOptions
618 ): ClientDuplexStream<RequestType, ResponseType>;
619 makeBidiStreamRequest<RequestType, ResponseType>(
620 method: string,
621 serialize: (value: RequestType) => Buffer,
622 deserialize: (value: Buffer) => ResponseType,
623 metadata?: Metadata | CallOptions,
624 options?: CallOptions
625 ): ClientDuplexStream<RequestType, ResponseType> {
626 const checkedArguments = this.checkMetadataAndOptions(metadata, options);
627 const methodDefinition: ClientMethodDefinition<
628 RequestType,
629 ResponseType
630 > = {
631 path: method,
632 requestStream: true,
633 responseStream: true,
634 requestSerialize: serialize,
635 responseDeserialize: deserialize,
636 };
637 let callProperties: CallProperties<RequestType, ResponseType> = {
638 metadata: checkedArguments.metadata,
639 call: new ClientDuplexStreamImpl<RequestType, ResponseType>(
640 serialize,
641 deserialize
642 ),
643 channel: this[CHANNEL_SYMBOL],
644 methodDefinition: methodDefinition,
645 callOptions: checkedArguments.options,
646 };
647 if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) {
648 callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(
649 callProperties
650 ) as CallProperties<RequestType, ResponseType>;
651 }
652 const stream: ClientDuplexStream<
653 RequestType,
654 ResponseType
655 > = callProperties.call as ClientDuplexStream<RequestType, ResponseType>;
656 const interceptorArgs: InterceptorArguments = {
657 clientInterceptors: this[INTERCEPTOR_SYMBOL],
658 clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
659 callInterceptors: callProperties.callOptions.interceptors ?? [],
660 callInterceptorProviders:
661 callProperties.callOptions.interceptor_providers ?? [],
662 };
663 const call: InterceptingCallInterface = getInterceptingCall(
664 interceptorArgs,
665 callProperties.methodDefinition,
666 callProperties.callOptions,
667 callProperties.channel
668 );
669 /* This needs to happen before the emitter is used. Unfortunately we can't
670 * enforce this with the type system. We need to construct this emitter
671 * before calling the CallInvocationTransformer, and we need to create the
672 * call after that. */
673 stream.call = call;
674 if (callProperties.callOptions.credentials) {
675 call.setCredentials(callProperties.callOptions.credentials);
676 }
677 let receivedStatus = false;
678 const callerStack = (new Error().stack!).split('\n').slice(1).join('\n');
679 call.start(callProperties.metadata, {
680 onReceiveMetadata(metadata: Metadata) {
681 stream.emit('metadata', metadata);
682 },
683 onReceiveMessage(message: Buffer) {
684 stream.push(message);
685 },
686 onReceiveStatus(status: StatusObject) {
687 if (receivedStatus) {
688 return;
689 }
690 receivedStatus = true;
691 stream.push(null);
692 if (status.code !== Status.OK) {
693 stream.emit('error', callErrorFromStatus(status, callerStack));
694 }
695 stream.emit('status', status);
696 },
697 });
698 return stream;
699 }
700}