1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | import { EventEmitter } from 'events';
|
19 | import { Duplex, Readable, Writable } from 'stream';
|
20 |
|
21 | import { StatusObject, MessageContext } from './call-interface';
|
22 | import { Status } from './constants';
|
23 | import { EmitterAugmentation1 } from './events';
|
24 | import { Metadata } from './metadata';
|
25 | import { ObjectReadable, ObjectWritable, WriteCallback } from './object-stream';
|
26 | import { InterceptingCallInterface } from './client-interceptors';
|
27 |
|
28 |
|
29 |
|
30 |
|
/**
 * An `Error` that additionally carries every field of a `StatusObject`.
 * This is the type returned by `callErrorFromStatus`.
 */
export type ServiceError = StatusObject & Error;
|
32 |
|
33 |
|
34 |
|
35 |
|
/**
 * Members shared by the client call object types declared in this file: an
 * optional reference to the underlying intercepting call, `cancel()`,
 * `getPeer()`, and the `EventEmitter` API combined with `EmitterAugmentation1`
 * for the `'metadata'` and `'status'` events.
 *
 * NOTE(review): `EmitterAugmentation1` is declared in `./events`; it presumably
 * adds typed listener signatures for the named event — confirm there.
 */
export type SurfaceCall = {
  /** Underlying call object; optional, so it may be absent. */
  call?: InterceptingCallInterface;
  /** Cancels the call. */
  cancel(): void;
  /** Returns a string describing the peer of the call. */
  getPeer(): string;
} & EmitterAugmentation1<'metadata', Metadata> &
  EmitterAugmentation1<'status', StatusObject> &
  EventEmitter;
|
43 |
|
44 |
|
45 |
|
46 |
|
/**
 * Call object type for a unary call. It adds nothing to `SurfaceCall`; it is a
 * plain alias.
 */
export type ClientUnaryCall = SurfaceCall;
|
48 |
|
49 |
|
50 |
|
51 |
|
/**
 * Call object type that is readable: a `SurfaceCall` combined with an
 * `ObjectReadable` of `ResponseType`, plus the function used to turn a raw
 * `Buffer` into a `ResponseType`.
 */
export type ClientReadableStream<ResponseType> = {
  /** Converts a received `Buffer` into a `ResponseType` value. */
  deserialize: (chunk: Buffer) => ResponseType;
} & SurfaceCall &
  ObjectReadable<ResponseType>;
|
56 |
|
57 |
|
58 |
|
59 |
|
/**
 * Call object type that is writable: a `SurfaceCall` combined with an
 * `ObjectWritable` of `RequestType`, plus the function used to turn a
 * `RequestType` value into a `Buffer`.
 */
export type ClientWritableStream<RequestType> = {
  /** Converts a `RequestType` value into a `Buffer`. */
  serialize: (value: RequestType) => Buffer;
} & SurfaceCall &
  ObjectWritable<RequestType>;
|
64 |
|
65 |
|
66 |
|
67 |
|
/**
 * Call object type that is both writable (`RequestType` values) and readable
 * (`ResponseType` values): the intersection of `ClientWritableStream` and
 * `ClientReadableStream`.
 */
export type ClientDuplexStream<RequestType, ResponseType> =
  ClientWritableStream<RequestType> & ClientReadableStream<ResponseType>;
|
70 |
|
71 |
|
72 |
|
73 |
|
74 |
|
75 |
|
76 |
|
/**
 * Builds a `ServiceError` out of a call status.
 *
 * The error message has the form `"<code> <code name>: <details>"`, where the
 * code name is looked up in the `Status` enum. Every own enumerable property
 * of `status` is copied onto the returned error, and the error's `stack` is
 * extended with `callerStack` so that it reads:
 *
 *     <stack of the error created here>
 *     for call at
 *     <callerStack>
 *
 * @param status - Status whose fields are copied onto the error. It is not
 *   modified.
 * @param callerStack - Stack text appended after the `for call at` line.
 * @returns An `Error` carrying the fields of `status` and the combined stack.
 */
export function callErrorFromStatus(
  status: StatusObject,
  callerStack: string
): ServiceError {
  const message = `${status.code} ${Status[status.code]}: ${status.details}`;
  const error = new Error(message);
  const stack = `${error.stack}\nfor call at\n${callerStack}`;
  // Reuse the error created above instead of constructing a second one whose
  // freshly captured stack would be overwritten straight away.
  return Object.assign(error, status, { stack });
}
|
86 |
|
/**
 * `EventEmitter`-based call object implementing `ClientUnaryCall`.
 *
 * `cancel()` and `getPeer()` are forwarded to `call` when one is attached.
 */
export class ClientUnaryCallImpl
  extends EventEmitter
  implements ClientUnaryCall
{
  /** Underlying call. Not assigned by the constructor. */
  public call?: InterceptingCallInterface;

  constructor() {
    super();
  }

  /**
   * Cancels the attached call with `Status.CANCELLED`. Does nothing when no
   * call is attached.
   */
  cancel(): void {
    const activeCall = this.call;
    if (activeCall == null) {
      return;
    }
    activeCall.cancelWithStatus(Status.CANCELLED, 'Cancelled on client');
  }

  /**
   * Returns the peer reported by the attached call, or `'unknown'` when there
   * is no call or the call reports nothing.
   */
  getPeer(): string {
    const activeCall = this.call;
    if (activeCall == null) {
      return 'unknown';
    }
    return activeCall.getPeer() ?? 'unknown';
  }
}
|
104 |
|
/**
 * Object-mode `Readable` implementing `ClientReadableStream`.
 *
 * Read requests, `cancel()` and `getPeer()` are forwarded to `call` when one
 * is attached.
 */
export class ClientReadableStreamImpl<ResponseType>
  extends Readable
  implements ClientReadableStream<ResponseType>
{
  /** Underlying call. Not assigned by the constructor. */
  public call?: InterceptingCallInterface;

  /**
   * @param deserialize - Function converting a `Buffer` into a
   *   `ResponseType`; stored as a readonly property.
   */
  constructor(readonly deserialize: (chunk: Buffer) => ResponseType) {
    super({ objectMode: true });
  }

  /**
   * Cancels the attached call with `Status.CANCELLED`. Does nothing when no
   * call is attached.
   */
  cancel(): void {
    const activeCall = this.call;
    if (activeCall == null) {
      return;
    }
    activeCall.cancelWithStatus(Status.CANCELLED, 'Cancelled on client');
  }

  /**
   * Returns the peer reported by the attached call, or `'unknown'` when there
   * is no call or the call reports nothing.
   */
  getPeer(): string {
    const activeCall = this.call;
    if (activeCall == null) {
      return 'unknown';
    }
    return activeCall.getPeer() ?? 'unknown';
  }

  /**
   * Stream hook: asks the attached call to start a read. The requested size
   * is ignored.
   */
  _read(_size: number): void {
    const activeCall = this.call;
    if (activeCall != null) {
      activeCall.startRead();
    }
  }
}
|
126 |
|
/**
 * Object-mode `Writable` implementing `ClientWritableStream`.
 *
 * Writes, end-of-stream, `cancel()` and `getPeer()` are forwarded to `call`
 * when one is attached.
 */
export class ClientWritableStreamImpl<RequestType>
  extends Writable
  implements ClientWritableStream<RequestType>
{
  /** Underlying call. Not assigned by the constructor. */
  public call?: InterceptingCallInterface;

  /**
   * @param serialize - Function converting a `RequestType` value into a
   *   `Buffer`; stored as a readonly property.
   */
  constructor(readonly serialize: (value: RequestType) => Buffer) {
    super({ objectMode: true });
  }

  /**
   * Cancels the attached call with `Status.CANCELLED`. Does nothing when no
   * call is attached.
   */
  cancel(): void {
    this.call?.cancelWithStatus(Status.CANCELLED, 'Cancelled on client');
  }

  /**
   * Returns the peer reported by the attached call, or `'unknown'` when there
   * is no call or the call reports nothing.
   */
  getPeer(): string {
    return this.call?.getPeer() ?? 'unknown';
  }

  /**
   * Stream hook: hands one message to the attached call.
   *
   * @param chunk - Message to send.
   * @param encoding - Interpreted as numeric write flags: when
   *   `Number(encoding)` is not `NaN` the result is passed as `flags`.
   * @param cb - Write callback, passed to the call inside the message context.
   *   It is not invoked here; when no call is attached the message is dropped
   *   and `cb` is never invoked.
   */
  _write(chunk: RequestType, encoding: string, cb: WriteCallback): void {
    const context: MessageContext = {
      callback: cb,
    };
    const flags = Number(encoding);
    if (!Number.isNaN(flags)) {
      context.flags = flags;
    }
    this.call?.sendMessageWithContext(context, chunk);
  }

  /**
   * Stream hook run when the writable side ends: half-closes the attached
   * call (if any) and then signals completion.
   *
   * @param cb - Completion callback; invoked with no arguments.
   */
  _final(cb: (error?: Error | null) => void): void {
    this.call?.halfClose();
    cb();
  }
}
|
160 |
|
/**
 * Object-mode `Duplex` implementing `ClientDuplexStream`.
 *
 * Reads, writes, end-of-stream, `cancel()` and `getPeer()` are forwarded to
 * `call` when one is attached.
 */
export class ClientDuplexStreamImpl<RequestType, ResponseType>
  extends Duplex
  implements ClientDuplexStream<RequestType, ResponseType>
{
  /** Underlying call. Not assigned by the constructor. */
  public call?: InterceptingCallInterface;

  /**
   * @param serialize - Function converting a `RequestType` value into a
   *   `Buffer`; stored as a readonly property.
   * @param deserialize - Function converting a `Buffer` into a
   *   `ResponseType`; stored as a readonly property.
   */
  constructor(
    readonly serialize: (value: RequestType) => Buffer,
    readonly deserialize: (chunk: Buffer) => ResponseType
  ) {
    super({ objectMode: true });
  }

  /**
   * Cancels the attached call with `Status.CANCELLED`. Does nothing when no
   * call is attached.
   */
  cancel(): void {
    this.call?.cancelWithStatus(Status.CANCELLED, 'Cancelled on client');
  }

  /**
   * Returns the peer reported by the attached call, or `'unknown'` when there
   * is no call or the call reports nothing.
   */
  getPeer(): string {
    return this.call?.getPeer() ?? 'unknown';
  }

  /**
   * Stream hook: asks the attached call to start a read. The requested size
   * is ignored.
   */
  _read(_size: number): void {
    this.call?.startRead();
  }

  /**
   * Stream hook: hands one message to the attached call.
   *
   * @param chunk - Message to send.
   * @param encoding - Interpreted as numeric write flags: when
   *   `Number(encoding)` is not `NaN` the result is passed as `flags`.
   * @param cb - Write callback, passed to the call inside the message context.
   *   It is not invoked here; when no call is attached the message is dropped
   *   and `cb` is never invoked.
   */
  _write(chunk: RequestType, encoding: string, cb: WriteCallback): void {
    const context: MessageContext = {
      callback: cb,
    };
    const flags = Number(encoding);
    if (!Number.isNaN(flags)) {
      context.flags = flags;
    }
    this.call?.sendMessageWithContext(context, chunk);
  }

  /**
   * Stream hook run when the writable side ends: half-closes the attached
   * call (if any) and then signals completion.
   *
   * @param cb - Completion callback; invoked with no arguments.
   */
  _final(cb: (error?: Error | null) => void): void {
    this.call?.halfClose();
    cb();
  }
}
|