UNPKG

9.91 kBPlain TextView Raw
1// Copyright (c) .NET Foundation. All rights reserved.
2// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3
4import { AbortController } from "./AbortController";
5import { HttpError, TimeoutError } from "./Errors";
6import { HttpClient, HttpRequest } from "./HttpClient";
7import { ILogger, LogLevel } from "./ILogger";
8import { ITransport, TransferFormat } from "./ITransport";
9import { Arg, getDataDetail, sendMessage } from "./Utils";
10
11// Not exported from 'index', this type is internal.
12/** @private */
13export class LongPollingTransport implements ITransport {
14 private readonly httpClient: HttpClient;
15 private readonly accessTokenFactory: (() => string | Promise<string>) | undefined;
16 private readonly logger: ILogger;
17 private readonly logMessageContent: boolean;
18 private readonly pollAbort: AbortController;
19
20 private url?: string;
21 private running: boolean;
22 private receiving?: Promise<void>;
23 private closeError?: Error;
24
25 public onreceive: ((data: string | ArrayBuffer) => void) | null;
26 public onclose: ((error?: Error) => void) | null;
27
28 // This is an internal type, not exported from 'index' so this is really just internal.
29 public get pollAborted() {
30 return this.pollAbort.aborted;
31 }
32
33 constructor(httpClient: HttpClient, accessTokenFactory: (() => string | Promise<string>) | undefined, logger: ILogger, logMessageContent: boolean) {
34 this.httpClient = httpClient;
35 this.accessTokenFactory = accessTokenFactory;
36 this.logger = logger;
37 this.pollAbort = new AbortController();
38 this.logMessageContent = logMessageContent;
39
40 this.running = false;
41
42 this.onreceive = null;
43 this.onclose = null;
44 }
45
46 public async connect(url: string, transferFormat: TransferFormat): Promise<void> {
47 Arg.isRequired(url, "url");
48 Arg.isRequired(transferFormat, "transferFormat");
49 Arg.isIn(transferFormat, TransferFormat, "transferFormat");
50
51 this.url = url;
52
53 this.logger.log(LogLevel.Trace, "(LongPolling transport) Connecting.");
54
55 // Allow binary format on Node and Browsers that support binary content (indicated by the presence of responseType property)
56 if (transferFormat === TransferFormat.Binary &&
57 (typeof XMLHttpRequest !== "undefined" && typeof new XMLHttpRequest().responseType !== "string")) {
58 throw new Error("Binary protocols over XmlHttpRequest not implementing advanced features are not supported.");
59 }
60
61 const pollOptions: HttpRequest = {
62 abortSignal: this.pollAbort.signal,
63 headers: {},
64 timeout: 100000,
65 };
66
67 if (transferFormat === TransferFormat.Binary) {
68 pollOptions.responseType = "arraybuffer";
69 }
70
71 const token = await this.getAccessToken();
72 this.updateHeaderToken(pollOptions, token);
73
74 // Make initial long polling request
75 // Server uses first long polling request to finish initializing connection and it returns without data
76 const pollUrl = `${url}&_=${Date.now()}`;
77 this.logger.log(LogLevel.Trace, `(LongPolling transport) polling: ${pollUrl}.`);
78 const response = await this.httpClient.get(pollUrl, pollOptions);
79 if (response.statusCode !== 200) {
80 this.logger.log(LogLevel.Error, `(LongPolling transport) Unexpected response code: ${response.statusCode}.`);
81
82 // Mark running as false so that the poll immediately ends and runs the close logic
83 this.closeError = new HttpError(response.statusText || "", response.statusCode);
84 this.running = false;
85 } else {
86 this.running = true;
87 }
88
89 this.receiving = this.poll(this.url, pollOptions);
90 }
91
92 private async getAccessToken(): Promise<string | null> {
93 if (this.accessTokenFactory) {
94 return await this.accessTokenFactory();
95 }
96
97 return null;
98 }
99
100 private updateHeaderToken(request: HttpRequest, token: string | null) {
101 if (!request.headers) {
102 request.headers = {};
103 }
104 if (token) {
105 // tslint:disable-next-line:no-string-literal
106 request.headers["Authorization"] = `Bearer ${token}`;
107 return;
108 }
109 // tslint:disable-next-line:no-string-literal
110 if (request.headers["Authorization"]) {
111 // tslint:disable-next-line:no-string-literal
112 delete request.headers["Authorization"];
113 }
114 }
115
116 private async poll(url: string, pollOptions: HttpRequest): Promise<void> {
117 try {
118 while (this.running) {
119 // We have to get the access token on each poll, in case it changes
120 const token = await this.getAccessToken();
121 this.updateHeaderToken(pollOptions, token);
122
123 try {
124 const pollUrl = `${url}&_=${Date.now()}`;
125 this.logger.log(LogLevel.Trace, `(LongPolling transport) polling: ${pollUrl}.`);
126 const response = await this.httpClient.get(pollUrl, pollOptions);
127
128 if (response.statusCode === 204) {
129 this.logger.log(LogLevel.Information, "(LongPolling transport) Poll terminated by server.");
130
131 this.running = false;
132 } else if (response.statusCode !== 200) {
133 this.logger.log(LogLevel.Error, `(LongPolling transport) Unexpected response code: ${response.statusCode}.`);
134
135 // Unexpected status code
136 this.closeError = new HttpError(response.statusText || "", response.statusCode);
137 this.running = false;
138 } else {
139 // Process the response
140 if (response.content) {
141 this.logger.log(LogLevel.Trace, `(LongPolling transport) data received. ${getDataDetail(response.content, this.logMessageContent)}.`);
142 if (this.onreceive) {
143 this.onreceive(response.content);
144 }
145 } else {
146 // This is another way timeout manifest.
147 this.logger.log(LogLevel.Trace, "(LongPolling transport) Poll timed out, reissuing.");
148 }
149 }
150 } catch (e) {
151 if (!this.running) {
152 // Log but disregard errors that occur after stopping
153 this.logger.log(LogLevel.Trace, `(LongPolling transport) Poll errored after shutdown: ${e.message}`);
154 } else {
155 if (e instanceof TimeoutError) {
156 // Ignore timeouts and reissue the poll.
157 this.logger.log(LogLevel.Trace, "(LongPolling transport) Poll timed out, reissuing.");
158 } else {
159 // Close the connection with the error as the result.
160 this.closeError = e;
161 this.running = false;
162 }
163 }
164 }
165 }
166 } finally {
167 this.logger.log(LogLevel.Trace, "(LongPolling transport) Polling complete.");
168
169 // We will reach here with pollAborted==false when the server returned a response causing the transport to stop.
170 // If pollAborted==true then client initiated the stop and the stop method will raise the close event after DELETE is sent.
171 if (!this.pollAborted) {
172 this.raiseOnClose();
173 }
174 }
175 }
176
177 public async send(data: any): Promise<void> {
178 if (!this.running) {
179 return Promise.reject(new Error("Cannot send until the transport is connected"));
180 }
181 return sendMessage(this.logger, "LongPolling", this.httpClient, this.url!, this.accessTokenFactory, data, this.logMessageContent);
182 }
183
184 public async stop(): Promise<void> {
185 this.logger.log(LogLevel.Trace, "(LongPolling transport) Stopping polling.");
186
187 // Tell receiving loop to stop, abort any current request, and then wait for it to finish
188 this.running = false;
189 this.pollAbort.abort();
190
191 try {
192 await this.receiving;
193
194 // Send DELETE to clean up long polling on the server
195 this.logger.log(LogLevel.Trace, `(LongPolling transport) sending DELETE request to ${this.url}.`);
196
197 const deleteOptions: HttpRequest = {
198 headers: {},
199 };
200 const token = await this.getAccessToken();
201 this.updateHeaderToken(deleteOptions, token);
202 await this.httpClient.delete(this.url!, deleteOptions);
203
204 this.logger.log(LogLevel.Trace, "(LongPolling transport) DELETE request sent.");
205 } finally {
206 this.logger.log(LogLevel.Trace, "(LongPolling transport) Stop finished.");
207
208 // Raise close event here instead of in polling
209 // It needs to happen after the DELETE request is sent
210 this.raiseOnClose();
211 }
212 }
213
214 private raiseOnClose() {
215 if (this.onclose) {
216 let logMessage = "(LongPolling transport) Firing onclose event.";
217 if (this.closeError) {
218 logMessage += " Error: " + this.closeError;
219 }
220 this.logger.log(LogLevel.Trace, logMessage);
221 this.onclose(this.closeError);
222 }
223 }
224}
225
\No newline at end of file