1 |
|
2 |
|
3 |
|
4 | import { DefaultHttpClient } from "./DefaultHttpClient";
|
5 | import { HttpClient } from "./HttpClient";
|
6 | import { IConnection } from "./IConnection";
|
7 | import { IHttpConnectionOptions } from "./IHttpConnectionOptions";
|
8 | import { ILogger, LogLevel } from "./ILogger";
|
9 | import { HttpTransportType, ITransport, TransferFormat } from "./ITransport";
|
10 | import { LongPollingTransport } from "./LongPollingTransport";
|
11 | import { ServerSentEventsTransport } from "./ServerSentEventsTransport";
|
12 | import { Arg, createLogger, Platform } from "./Utils";
|
13 | import { WebSocketTransport } from "./WebSocketTransport";
|
14 |
|
15 |
|
16 | const enum ConnectionState {
|
17 | Connecting = "Connecting ",
|
18 | Connected = "Connected",
|
19 | Disconnected = "Disconnected",
|
20 | Disconnecting = "Disconnecting",
|
21 | }
|
22 |
|
23 |
|
24 | export interface INegotiateResponse {
|
25 | connectionId?: string;
|
26 | connectionToken?: string;
|
27 | negotiateVersion?: number;
|
28 | availableTransports?: IAvailableTransport[];
|
29 | url?: string;
|
30 | accessToken?: string;
|
31 | error?: string;
|
32 | }
|
33 |
|
34 |
|
35 | export interface IAvailableTransport {
|
36 | transport: keyof typeof HttpTransportType;
|
37 | transferFormats: Array<keyof typeof TransferFormat>;
|
38 | }
|
39 |
|
40 | const MAX_REDIRECTS = 100;
|
41 |
|
42 | let WebSocketModule: any = null;
|
43 | let EventSourceModule: any = null;
|
44 | if (Platform.isNode && typeof require !== "undefined") {
|
45 |
|
46 |
|
47 | const requireFunc = typeof __webpack_require__ === "function" ? __non_webpack_require__ : require;
|
48 | WebSocketModule = requireFunc("ws");
|
49 | EventSourceModule = requireFunc("eventsource");
|
50 | }
|
51 |
|
52 |
|
53 | export class HttpConnection implements IConnection {
|
54 | private connectionState: ConnectionState;
|
55 |
|
56 |
|
57 | private connectionStarted: boolean;
|
58 | private readonly httpClient: HttpClient;
|
59 | private readonly logger: ILogger;
|
60 | private readonly options: IHttpConnectionOptions;
|
61 | private transport?: ITransport;
|
62 | private startInternalPromise?: Promise<void>;
|
63 | private stopPromise?: Promise<void>;
|
64 | private stopPromiseResolver!: (value?: PromiseLike<void>) => void;
|
65 | private stopError?: Error;
|
66 | private accessTokenFactory?: () => string | Promise<string>;
|
67 | private sendQueue?: TransportSendQueue;
|
68 |
|
69 | public readonly features: any = {};
|
70 | public baseUrl: string;
|
71 | public connectionId?: string;
|
72 | public onreceive: ((data: string | ArrayBuffer) => void) | null;
|
73 | public onclose: ((e?: Error) => void) | null;
|
74 |
|
75 | private readonly negotiateVersion: number = 1;
|
76 |
|
77 | constructor(url: string, options: IHttpConnectionOptions = {}) {
|
78 | Arg.isRequired(url, "url");
|
79 |
|
80 | this.logger = createLogger(options.logger);
|
81 | this.baseUrl = this.resolveUrl(url);
|
82 |
|
83 | options = options || {};
|
84 | options.logMessageContent = options.logMessageContent || false;
|
85 |
|
86 | if (!Platform.isNode && typeof WebSocket !== "undefined" && !options.WebSocket) {
|
87 | options.WebSocket = WebSocket;
|
88 | } else if (Platform.isNode && !options.WebSocket) {
|
89 | if (WebSocketModule) {
|
90 | options.WebSocket = WebSocketModule;
|
91 | }
|
92 | }
|
93 |
|
94 | if (!Platform.isNode && typeof EventSource !== "undefined" && !options.EventSource) {
|
95 | options.EventSource = EventSource;
|
96 | } else if (Platform.isNode && !options.EventSource) {
|
97 | if (typeof EventSourceModule !== "undefined") {
|
98 | options.EventSource = EventSourceModule;
|
99 | }
|
100 | }
|
101 |
|
102 | this.httpClient = options.httpClient || new DefaultHttpClient(this.logger);
|
103 | this.connectionState = ConnectionState.Disconnected;
|
104 | this.connectionStarted = false;
|
105 | this.options = options;
|
106 |
|
107 | this.onreceive = null;
|
108 | this.onclose = null;
|
109 | }
|
110 |
|
111 | public start(): Promise<void>;
|
112 | public start(transferFormat: TransferFormat): Promise<void>;
|
113 | public async start(transferFormat?: TransferFormat): Promise<void> {
|
114 | transferFormat = transferFormat || TransferFormat.Binary;
|
115 |
|
116 | Arg.isIn(transferFormat, TransferFormat, "transferFormat");
|
117 |
|
118 | this.logger.log(LogLevel.Debug, `Starting connection with transfer format '${TransferFormat[transferFormat]}'.`);
|
119 |
|
120 | if (this.connectionState !== ConnectionState.Disconnected) {
|
121 | return Promise.reject(new Error("Cannot start an HttpConnection that is not in the 'Disconnected' state."));
|
122 | }
|
123 |
|
124 | this.connectionState = ConnectionState.Connecting;
|
125 |
|
126 | this.startInternalPromise = this.startInternal(transferFormat);
|
127 | await this.startInternalPromise;
|
128 |
|
129 | // The TypeScript compiler thinks that connectionState must be Connecting here. The TypeScript compiler is wrong.
|
130 | if (this.connectionState as any === ConnectionState.Disconnecting) {
|
131 | // stop() was called and transitioned the client into the Disconnecting state.
|
132 | const message = "Failed to start the HttpConnection before stop() was called.";
|
133 | this.logger.log(LogLevel.Error, message);
|
134 |
|
135 | // We cannot await stopPromise inside startInternal since stopInternal awaits the startInternalPromise.
|
136 | await this.stopPromise;
|
137 |
|
138 | return Promise.reject(new Error(message));
|
139 | } else if (this.connectionState as any !== ConnectionState.Connected) {
|
140 | // stop() was called and transitioned the client into the Disconnecting state.
|
141 | const message = "HttpConnection.startInternal completed gracefully but didn't enter the connection into the connected state!";
|
142 | this.logger.log(LogLevel.Error, message);
|
143 | return Promise.reject(new Error(message));
|
144 | }
|
145 |
|
146 | this.connectionStarted = true;
|
147 | }
|
148 |
|
149 | public send(data: string | ArrayBuffer): Promise<void> {
|
150 | if (this.connectionState !== ConnectionState.Connected) {
|
151 | return Promise.reject(new Error("Cannot send data if the connection is not in the 'Connected' State."));
|
152 | }
|
153 |
|
154 | if (!this.sendQueue) {
|
155 | this.sendQueue = new TransportSendQueue(this.transport!);
|
156 | }
|
157 |
|
158 | // Transport will not be null if state is connected
|
159 | return this.sendQueue.send(data);
|
160 | }
|
161 |
|
162 | public async stop(error?: Error): Promise<void> {
|
163 | if (this.connectionState === ConnectionState.Disconnected) {
|
164 | this.logger.log(LogLevel.Debug, `Call to HttpConnection.stop(${error}) ignored because the connection is already in the disconnected state.`);
|
165 | return Promise.resolve();
|
166 | }
|
167 |
|
168 | if (this.connectionState === ConnectionState.Disconnecting) {
|
169 | this.logger.log(LogLevel.Debug, `Call to HttpConnection.stop(${error}) ignored because the connection is already in the disconnecting state.`);
|
170 | return this.stopPromise;
|
171 | }
|
172 |
|
173 | this.connectionState = ConnectionState.Disconnecting;
|
174 |
|
175 | this.stopPromise = new Promise((resolve) => {
|
176 |
|
177 | this.stopPromiseResolver = resolve;
|
178 | });
|
179 |
|
180 | // stopInternal should never throw so just observe it.
|
181 | await this.stopInternal(error);
|
182 | await this.stopPromise;
|
183 | }
|
184 |
|
185 | private async stopInternal(error?: Error): Promise<void> {
|
186 | // Set error as soon as possible otherwise there is a race between
|
187 | // the transport closing and providing an error and the error from a close message
|
188 | // We would prefer the close message error.
|
189 | this.stopError = error;
|
190 |
|
191 | try {
|
192 | await this.startInternalPromise;
|
193 | } catch (e) {
|
194 | // This exception is returned to the user as a rejected Promise from the start method.
|
195 | }
|
196 |
|
197 | // The transport's onclose will trigger stopConnection which will run our onclose event.
|
198 | // The transport should always be set if currently connected. If it wasn't set, it's likely because
|
199 | // stop was called during start() and start() failed.
|
200 | if (this.transport) {
|
201 | try {
|
202 | await this.transport.stop();
|
203 | } catch (e) {
|
204 | this.logger.log(LogLevel.Error, `HttpConnection.transport.stop() threw error '${e}'.`);
|
205 | this.stopConnection();
|
206 | }
|
207 |
|
208 | this.transport = undefined;
|
209 | } else {
|
210 | this.logger.log(LogLevel.Debug, "HttpConnection.transport is undefined in HttpConnection.stop() because start() failed.");
|
211 | this.stopConnection();
|
212 | }
|
213 | }
|
214 |
|
215 | private async startInternal(transferFormat: TransferFormat): Promise<void> {
|
216 | // Store the original base url and the access token factory since they may change
|
217 | // as part of negotiating
|
218 | let url = this.baseUrl;
|
219 | this.accessTokenFactory = this.options.accessTokenFactory;
|
220 |
|
221 | try {
|
222 | if (this.options.skipNegotiation) {
|
223 | if (this.options.transport === HttpTransportType.WebSockets) {
|
224 | // No need to add a connection ID in this case
|
225 | this.transport = this.constructTransport(HttpTransportType.WebSockets);
|
226 | // We should just call connect directly in this case.
|
227 | // No fallback or negotiate in this case.
|
228 | await this.startTransport(url, transferFormat);
|
229 | } else {
|
230 | throw new Error("Negotiation can only be skipped when using the WebSocket transport directly.");
|
231 | }
|
232 | } else {
|
233 | let negotiateResponse: INegotiateResponse | null = null;
|
234 | let redirects = 0;
|
235 |
|
236 | do {
|
237 | negotiateResponse = await this.getNegotiationResponse(url);
|
238 | // the user tries to stop the connection when it is being started
|
239 | if (this.connectionState === ConnectionState.Disconnecting || this.connectionState === ConnectionState.Disconnected) {
|
240 | throw new Error("The connection was stopped during negotiation.");
|
241 | }
|
242 |
|
243 | if (negotiateResponse.error) {
|
244 | throw new Error(negotiateResponse.error);
|
245 | }
|
246 |
|
247 | if ((negotiateResponse as any).ProtocolVersion) {
|
248 | throw new Error("Detected a connection attempt to an ASP.NET SignalR Server. This client only supports connecting to an ASP.NET Core SignalR Server. See https:
|
249 | }
|
250 |
|
251 | if (negotiateResponse.url) {
|
252 | url = negotiateResponse.url;
|
253 | }
|
254 |
|
255 | if (negotiateResponse.accessToken) {
|
256 |
|
257 |
|
258 | const accessToken = negotiateResponse.accessToken;
|
259 | this.accessTokenFactory = () => accessToken;
|
260 | }
|
261 |
|
262 | redirects++;
|
263 | }
|
264 | while (negotiateResponse.url && redirects < MAX_REDIRECTS);
|
265 |
|
266 | if (redirects === MAX_REDIRECTS && negotiateResponse.url) {
|
267 | throw new Error("Negotiate redirection limit exceeded.");
|
268 | }
|
269 |
|
270 | await this.createTransport(url, this.options.transport, negotiateResponse, transferFormat);
|
271 | }
|
272 |
|
273 | if (this.transport instanceof LongPollingTransport) {
|
274 | this.features.inherentKeepAlive = true;
|
275 | }
|
276 |
|
277 | if (this.connectionState === ConnectionState.Connecting) {
|
278 |
|
279 |
|
280 | this.logger.log(LogLevel.Debug, "The HttpConnection connected successfully.");
|
281 | this.connectionState = ConnectionState.Connected;
|
282 | }
|
283 |
|
284 |
|
285 |
|
286 |
|
287 | } catch (e) {
|
288 | this.logger.log(LogLevel.Error, "Failed to start the connection: " + e);
|
289 | this.connectionState = ConnectionState.Disconnected;
|
290 | this.transport = undefined;
|
291 | return Promise.reject(e);
|
292 | }
|
293 | }
|
294 |
|
295 | private async getNegotiationResponse(url: string): Promise<INegotiateResponse> {
|
296 | let headers;
|
297 | if (this.accessTokenFactory) {
|
298 | const token = await this.accessTokenFactory();
|
299 | if (token) {
|
300 | headers = {
|
301 | ["Authorization"]: `Bearer ${token}`,
|
302 | };
|
303 | }
|
304 | }
|
305 |
|
306 | const negotiateUrl = this.resolveNegotiateUrl(url);
|
307 | this.logger.log(LogLevel.Debug, `Sending negotiation request: ${negotiateUrl}.`);
|
308 | try {
|
309 | const response = await this.httpClient.post(negotiateUrl, {
|
310 | content: "",
|
311 | headers,
|
312 | });
|
313 |
|
314 | if (response.statusCode !== 200) {
|
315 | return Promise.reject(new Error(`Unexpected status code returned from negotiate ${response.statusCode}`));
|
316 | }
|
317 |
|
318 | const negotiateResponse = JSON.parse(response.content as string) as INegotiateResponse;
|
319 | if (!negotiateResponse.negotiateVersion || negotiateResponse.negotiateVersion < 1) {
|
320 |
|
321 |
|
322 | negotiateResponse.connectionToken = negotiateResponse.connectionId;
|
323 | }
|
324 | return negotiateResponse;
|
325 | } catch (e) {
|
326 | this.logger.log(LogLevel.Error, "Failed to complete negotiation with the server: " + e);
|
327 | return Promise.reject(e);
|
328 | }
|
329 | }
|
330 |
|
331 | private createConnectUrl(url: string, connectionToken: string | null | undefined) {
|
332 | if (!connectionToken) {
|
333 | return url;
|
334 | }
|
335 |
|
336 | return url + (url.indexOf("?") === -1 ? "?" : "&") + `id=${connectionToken}`;
|
337 | }
|
338 |
|
339 | private async createTransport(url: string, requestedTransport: HttpTransportType | ITransport | undefined, negotiateResponse: INegotiateResponse, requestedTransferFormat: TransferFormat): Promise<void> {
|
340 | let connectUrl = this.createConnectUrl(url, negotiateResponse.connectionToken);
|
341 | if (this.isITransport(requestedTransport)) {
|
342 | this.logger.log(LogLevel.Debug, "Connection was provided an instance of ITransport, using that directly.");
|
343 | this.transport = requestedTransport;
|
344 | await this.startTransport(connectUrl, requestedTransferFormat);
|
345 |
|
346 | this.connectionId = negotiateResponse.connectionId;
|
347 | return;
|
348 | }
|
349 |
|
350 | const transportExceptions: any[] = [];
|
351 | const transports = negotiateResponse.availableTransports || [];
|
352 | let negotiate: INegotiateResponse | undefined = negotiateResponse;
|
353 | for (const endpoint of transports) {
|
354 | const transportOrError = this.resolveTransportOrError(endpoint, requestedTransport, requestedTransferFormat);
|
355 | if (transportOrError instanceof Error) {
|
356 |
|
357 | transportExceptions.push(`${endpoint.transport} failed: ${transportOrError}`);
|
358 | } else if (this.isITransport(transportOrError)) {
|
359 | this.transport = transportOrError;
|
360 | if (!negotiate) {
|
361 | try {
|
362 | negotiate = await this.getNegotiationResponse(url);
|
363 | } catch (ex) {
|
364 | return Promise.reject(ex);
|
365 | }
|
366 | connectUrl = this.createConnectUrl(url, negotiate.connectionToken);
|
367 | }
|
368 | try {
|
369 | await this.startTransport(connectUrl, requestedTransferFormat);
|
370 | this.connectionId = negotiate.connectionId;
|
371 | return;
|
372 | } catch (ex) {
|
373 | this.logger.log(LogLevel.Error, `Failed to start the transport '${endpoint.transport}': ${ex}`);
|
374 | negotiate = undefined;
|
375 | transportExceptions.push(`${endpoint.transport} failed: ${ex}`);
|
376 |
|
377 | if (this.connectionState !== ConnectionState.Connecting) {
|
378 | const message = "Failed to select transport before stop() was called.";
|
379 | this.logger.log(LogLevel.Debug, message);
|
380 | return Promise.reject(new Error(message));
|
381 | }
|
382 | }
|
383 | }
|
384 | }
|
385 |
|
386 | if (transportExceptions.length > 0) {
|
387 | return Promise.reject(new Error(`Unable to connect to the server with any of the available transports. ${transportExceptions.join(" ")}`));
|
388 | }
|
389 | return Promise.reject(new Error("None of the transports supported by the client are supported by the server."));
|
390 | }
|
391 |
|
392 | private constructTransport(transport: HttpTransportType): ITransport {
|
393 | switch (transport) {
|
394 | case HttpTransportType.WebSockets:
|
395 | if (!this.options.WebSocket) {
|
396 | throw new Error("'WebSocket' is not supported in your environment.");
|
397 | }
|
398 | return new WebSocketTransport(this.httpClient, this.accessTokenFactory, this.logger, this.options.logMessageContent || false, this.options.WebSocket);
|
399 | case HttpTransportType.ServerSentEvents:
|
400 | if (!this.options.EventSource) {
|
401 | throw new Error("'EventSource' is not supported in your environment.");
|
402 | }
|
403 | return new ServerSentEventsTransport(this.httpClient, this.accessTokenFactory, this.logger, this.options.logMessageContent || false, this.options.EventSource);
|
404 | case HttpTransportType.LongPolling:
|
405 | return new LongPollingTransport(this.httpClient, this.accessTokenFactory, this.logger, this.options.logMessageContent || false);
|
406 | default:
|
407 | throw new Error(`Unknown transport: ${transport}.`);
|
408 | }
|
409 | }
|
410 |
|
411 | private startTransport(url: string, transferFormat: TransferFormat): Promise<void> {
|
412 | this.transport!.onreceive = this.onreceive;
|
413 | this.transport!.onclose = (e) => this.stopConnection(e);
|
414 | return this.transport!.connect(url, transferFormat);
|
415 | }
|
416 |
|
417 | private resolveTransportOrError(endpoint: IAvailableTransport, requestedTransport: HttpTransportType | undefined, requestedTransferFormat: TransferFormat): ITransport | Error {
|
418 | const transport = HttpTransportType[endpoint.transport];
|
419 | if (transport === null || transport === undefined) {
|
420 | this.logger.log(LogLevel.Debug, `Skipping transport '${endpoint.transport}' because it is not supported by this client.`);
|
421 | return new Error(`Skipping transport '${endpoint.transport}' because it is not supported by this client.`);
|
422 | } else {
|
423 | if (transportMatches(requestedTransport, transport)) {
|
424 | const transferFormats = endpoint.transferFormats.map((s) => TransferFormat[s]);
|
425 | if (transferFormats.indexOf(requestedTransferFormat) >= 0) {
|
426 | if ((transport === HttpTransportType.WebSockets && !this.options.WebSocket) ||
|
427 | (transport === HttpTransportType.ServerSentEvents && !this.options.EventSource)) {
|
428 | this.logger.log(LogLevel.Debug, `Skipping transport '${HttpTransportType[transport]}' because it is not supported in your environment.'`);
|
429 | return new Error(`'${HttpTransportType[transport]}' is not supported in your environment.`);
|
430 | } else {
|
431 | this.logger.log(LogLevel.Debug, `Selecting transport '${HttpTransportType[transport]}'.`);
|
432 | try {
|
433 | return this.constructTransport(transport);
|
434 | } catch (ex) {
|
435 | return ex;
|
436 | }
|
437 | }
|
438 | } else {
|
439 | this.logger.log(LogLevel.Debug, `Skipping transport '${HttpTransportType[transport]}' because it does not support the requested transfer format '${TransferFormat[requestedTransferFormat]}'.`);
|
440 | return new Error(`'${HttpTransportType[transport]}' does not support ${TransferFormat[requestedTransferFormat]}.`);
|
441 | }
|
442 | } else {
|
443 | this.logger.log(LogLevel.Debug, `Skipping transport '${HttpTransportType[transport]}' because it was disabled by the client.`);
|
444 | return new Error(`'${HttpTransportType[transport]}' is disabled by the client.`);
|
445 | }
|
446 | }
|
447 | }
|
448 |
|
449 | private isITransport(transport: any): transport is ITransport {
|
450 | return transport && typeof (transport) === "object" && "connect" in transport;
|
451 | }
|
452 |
|
453 | private stopConnection(error?: Error): void {
|
454 | this.logger.log(LogLevel.Debug, `HttpConnection.stopConnection(${error}) called while in state ${this.connectionState}.`);
|
455 |
|
456 | this.transport = undefined;
|
457 |
|
458 |
|
459 | error = this.stopError || error;
|
460 | this.stopError = undefined;
|
461 |
|
462 | if (this.connectionState === ConnectionState.Disconnected) {
|
463 | this.logger.log(LogLevel.Debug, `Call to HttpConnection.stopConnection(${error}) was ignored because the connection is already in the disconnected state.`);
|
464 | return;
|
465 | }
|
466 |
|
467 | if (this.connectionState === ConnectionState.Connecting) {
|
468 | this.logger.log(LogLevel.Warning, `Call to HttpConnection.stopConnection(${error}) was ignored because the connection hasn't yet left the in the connecting state.`);
|
469 | return;
|
470 | }
|
471 |
|
472 | if (this.connectionState === ConnectionState.Disconnecting) {
|
473 |
|
474 |
|
475 | this.stopPromiseResolver();
|
476 | }
|
477 |
|
478 | if (error) {
|
479 | this.logger.log(LogLevel.Error, `Connection disconnected with error '${error}'.`);
|
480 | } else {
|
481 | this.logger.log(LogLevel.Information, "Connection disconnected.");
|
482 | }
|
483 |
|
484 | if (this.sendQueue) {
|
485 | this.sendQueue.stop().catch((e) => {
|
486 | this.logger.log(LogLevel.Error, `TransportSendQueue.stop() threw error '${e}'.`);
|
487 | });
|
488 | this.sendQueue = undefined;
|
489 | }
|
490 |
|
491 | this.connectionId = undefined;
|
492 | this.connectionState = ConnectionState.Disconnected;
|
493 |
|
494 | if (this.connectionStarted) {
|
495 | this.connectionStarted = false;
|
496 | try {
|
497 | if (this.onclose) {
|
498 | this.onclose(error);
|
499 | }
|
500 | } catch (e) {
|
501 | this.logger.log(LogLevel.Error, `HttpConnection.onclose(${error}) threw error '${e}'.`);
|
502 | }
|
503 | }
|
504 | }
|
505 |
|
506 | private resolveUrl(url: string): string {
|
507 |
|
508 | if (url.lastIndexOf("https:
|
509 | return url;
|
510 | }
|
511 |
|
512 | if (!Platform.isBrowser || !window.document) {
|
513 | throw new Error(`Cannot resolve '${url}'.`);
|
514 | }
|
515 |
|
516 |
|
517 |
|
518 |
|
519 |
|
520 |
|
521 | const aTag = window.document.createElement("a");
|
522 | aTag.href = url;
|
523 |
|
524 | this.logger.log(LogLevel.Information, `Normalizing '${url}' to '${aTag.href}'.`);
|
525 | return aTag.href;
|
526 | }
|
527 |
|
528 | private resolveNegotiateUrl(url: string): string {
|
529 | const index = url.indexOf("?");
|
530 | let negotiateUrl = url.substring(0, index === -1 ? url.length : index);
|
531 | if (negotiateUrl[negotiateUrl.length - 1] !== "/") {
|
532 | negotiateUrl += "/";
|
533 | }
|
534 | negotiateUrl += "negotiate";
|
535 | negotiateUrl += index === -1 ? "" : url.substring(index);
|
536 |
|
537 | if (negotiateUrl.indexOf("negotiateVersion") === -1) {
|
538 | negotiateUrl += index === -1 ? "?" : "&";
|
539 | negotiateUrl += "negotiateVersion=" + this.negotiateVersion;
|
540 | }
|
541 | return negotiateUrl;
|
542 | }
|
543 | }
|
544 |
|
545 | function transportMatches(requestedTransport: HttpTransportType | undefined, actualTransport: HttpTransportType) {
|
546 | return !requestedTransport || ((actualTransport & requestedTransport) !== 0);
|
547 | }
|
548 |
|
549 |
|
550 | export class TransportSendQueue {
|
551 | private buffer: any[] = [];
|
552 | private sendBufferedData: PromiseSource;
|
553 | private executing: boolean = true;
|
554 | private transportResult?: PromiseSource;
|
555 | private sendLoopPromise: Promise<void>;
|
556 |
|
557 | constructor(private readonly transport: ITransport) {
|
558 | this.sendBufferedData = new PromiseSource();
|
559 | this.transportResult = new PromiseSource();
|
560 |
|
561 | this.sendLoopPromise = this.sendLoop();
|
562 | }
|
563 |
|
564 | public send(data: string | ArrayBuffer): Promise<void> {
|
565 | this.bufferData(data);
|
566 | if (!this.transportResult) {
|
567 | this.transportResult = new PromiseSource();
|
568 | }
|
569 | return this.transportResult.promise;
|
570 | }
|
571 |
|
572 | public stop(): Promise<void> {
|
573 | this.executing = false;
|
574 | this.sendBufferedData.resolve();
|
575 | return this.sendLoopPromise;
|
576 | }
|
577 |
|
578 | private bufferData(data: string | ArrayBuffer): void {
|
579 | if (this.buffer.length && typeof(this.buffer[0]) !== typeof(data)) {
|
580 | throw new Error(`Expected data to be of type ${typeof(this.buffer)} but was of type ${typeof(data)}`);
|
581 | }
|
582 |
|
583 | this.buffer.push(data);
|
584 | this.sendBufferedData.resolve();
|
585 | }
|
586 |
|
587 | private async sendLoop(): Promise<void> {
|
588 | while (true) {
|
589 | await this.sendBufferedData.promise;
|
590 |
|
591 | if (!this.executing) {
|
592 | if (this.transportResult) {
|
593 | this.transportResult.reject("Connection stopped.");
|
594 | }
|
595 |
|
596 | break;
|
597 | }
|
598 |
|
599 | this.sendBufferedData = new PromiseSource();
|
600 |
|
601 | const transportResult = this.transportResult!;
|
602 | this.transportResult = undefined;
|
603 |
|
604 | const data = typeof(this.buffer[0]) === "string" ?
|
605 | this.buffer.join("") :
|
606 | TransportSendQueue.concatBuffers(this.buffer);
|
607 |
|
608 | this.buffer.length = 0;
|
609 |
|
610 | try {
|
611 | await this.transport.send(data);
|
612 | transportResult.resolve();
|
613 | } catch (error) {
|
614 | transportResult.reject(error);
|
615 | }
|
616 | }
|
617 | }
|
618 |
|
619 | private static concatBuffers(arrayBuffers: ArrayBuffer[]): ArrayBuffer {
|
620 | const totalLength = arrayBuffers.map((b) => b.byteLength).reduce((a, b) => a + b);
|
621 | const result = new Uint8Array(totalLength);
|
622 | let offset = 0;
|
623 | for (const item of arrayBuffers) {
|
624 | result.set(new Uint8Array(item), offset);
|
625 | offset += item.byteLength;
|
626 | }
|
627 |
|
628 | return result;
|
629 | }
|
630 | }
|
631 |
|
632 | class PromiseSource {
|
633 | private resolver?: () => void;
|
634 | private rejecter!: (reason?: any) => void;
|
635 | public promise: Promise<void>;
|
636 |
|
637 | constructor() {
|
638 | this.promise = new Promise((resolve, reject) => [this.resolver, this.rejecter] = [resolve, reject]);
|
639 | }
|
640 |
|
641 | public resolve(): void {
|
642 | this.resolver!();
|
643 | }
|
644 |
|
645 | public reject(reason?: any): void {
|
646 | this.rejecter!(reason);
|
647 | }
|
648 | }
|
649 |
|
\ | No newline at end of file |