1 |
|
2 |
|
3 |
|
4 | import { AbortController } from "./AbortController";
|
5 | import { HttpError, TimeoutError } from "./Errors";
|
6 | import { HttpClient, HttpRequest } from "./HttpClient";
|
7 | import { ILogger, LogLevel } from "./ILogger";
|
8 | import { ITransport, TransferFormat } from "./ITransport";
|
9 | import { Arg, getDataDetail, sendMessage } from "./Utils";
|
10 |
|
11 |
|
12 |
|
/**
 * Transport that receives messages by repeatedly issuing HTTP GET requests ("polls")
 * through the supplied HttpClient and sends messages through the shared `sendMessage` helper.
 *
 * Lifecycle: `connect()` performs an initial poll and then starts the background poll loop;
 * `stop()` aborts the loop, issues a DELETE request for the connection URL and raises `onclose`.
 */
export class LongPollingTransport implements ITransport {
    private readonly httpClient: HttpClient;
    private readonly accessTokenFactory: (() => string | Promise<string>) | undefined;
    private readonly logger: ILogger;
    private readonly logMessageContent: boolean;
    private readonly pollAbort: AbortController;

    // URL passed to connect(); undefined until connect() has been called.
    private url?: string;
    // True while the poll loop should keep issuing requests.
    private running: boolean;
    // Promise for the background poll loop started by connect().
    private receiving?: Promise<void>;
    // Error handed to onclose, if the transport stopped because of a failure.
    private closeError?: Error;

    /** Invoked with the content of every poll response that carries data. */
    public onreceive: ((data: string | ArrayBuffer) => void) | null;
    /** Invoked when the transport closes; receives the error that caused the close, if any. */
    public onclose: ((error?: Error) => void) | null;

    // This is an internal type, not exported from 'index' so this is really just internal.
    /** Whether the poll has been aborted through the transport's AbortController. */
    public get pollAborted() {
        return this.pollAbort.aborted;
    }

    /**
     * @param httpClient Client used for the poll (GET) and cleanup (DELETE) requests.
     * @param accessTokenFactory Optional factory queried before each request for a bearer token.
     * @param logger Destination for the transport's log output.
     * @param logMessageContent Passed through to `getDataDetail` and `sendMessage` when logging payloads.
     */
    constructor(httpClient: HttpClient, accessTokenFactory: (() => string | Promise<string>) | undefined, logger: ILogger, logMessageContent: boolean) {
        this.httpClient = httpClient;
        this.accessTokenFactory = accessTokenFactory;
        this.logger = logger;
        this.pollAbort = new AbortController();
        this.logMessageContent = logMessageContent;

        this.running = false;

        this.onreceive = null;
        this.onclose = null;
    }

    /**
     * Issues the initial poll request and then starts the background poll loop.
     *
     * If the initial response is not a 200, the error is recorded and the loop is started in a
     * non-running state so that it ends immediately and runs the close logic; the returned promise
     * still resolves in that case.
     *
     * @param url Connection URL. A cache-busting `_` query parameter is added to every poll.
     * @param transferFormat Text or Binary; Binary requests an "arraybuffer" response.
     * @throws Error when Binary is requested but XMLHttpRequest exists without a string `responseType`.
     */
    public async connect(url: string, transferFormat: TransferFormat): Promise<void> {
        Arg.isRequired(url, "url");
        Arg.isRequired(transferFormat, "transferFormat");
        Arg.isIn(transferFormat, TransferFormat, "transferFormat");

        this.url = url;

        this.logger.log(LogLevel.Trace, "(LongPolling transport) Connecting.");

        // Allow binary format on Node and Browsers that support binary content (indicated by the presence of responseType property)
        if (transferFormat === TransferFormat.Binary &&
            (typeof XMLHttpRequest !== "undefined" && typeof new XMLHttpRequest().responseType !== "string")) {
            throw new Error("Binary protocols over XmlHttpRequest not implementing advanced features are not supported.");
        }

        // The same options object is reused (and its Authorization header refreshed) for every poll.
        const pollOptions: HttpRequest = {
            abortSignal: this.pollAbort.signal,
            headers: {},
            timeout: 100000, // presumably milliseconds; the unit is defined by HttpClient
        };

        if (transferFormat === TransferFormat.Binary) {
            pollOptions.responseType = "arraybuffer";
        }

        const token = await this.getAccessToken();
        this.updateHeaderToken(pollOptions, token);

        // Make initial long polling request
        // Server uses first long polling request to finish initializing connection and it returns without data
        const pollUrl = this.appendCacheBuster(url);
        this.logger.log(LogLevel.Trace, `(LongPolling transport) polling: ${pollUrl}.`);
        const response = await this.httpClient.get(pollUrl, pollOptions);
        if (response.statusCode !== 200) {
            this.logger.log(LogLevel.Error, `(LongPolling transport) Unexpected response code: ${response.statusCode}.`);

            // Mark running as false so that the poll immediately ends and runs the close logic
            this.closeError = new HttpError(response.statusText || "", response.statusCode);
            this.running = false;
        } else {
            this.running = true;
        }

        this.receiving = this.poll(url, pollOptions);
    }

    /** Resolves the current access token, or null when no token factory was supplied. */
    private async getAccessToken(): Promise<string | null> {
        if (this.accessTokenFactory) {
            return await this.accessTokenFactory();
        }

        return null;
    }

    /**
     * Sets the request's Authorization header to `Bearer <token>`, or removes a previously set
     * header when there is no token. Creates the headers object if the request has none.
     */
    private updateHeaderToken(request: HttpRequest, token: string | null) {
        if (!request.headers) {
            request.headers = {};
        }
        if (token) {
            // tslint:disable-next-line:no-string-literal
            request.headers["Authorization"] = `Bearer ${token}`;
            return;
        }
        // tslint:disable-next-line:no-string-literal
        if (request.headers["Authorization"]) {
            // tslint:disable-next-line:no-string-literal
            delete request.headers["Authorization"];
        }
    }

    /**
     * Appends a `_=<timestamp>` query parameter so that every poll uses a distinct URL.
     * Uses `?` when the URL has no query string yet and `&` otherwise.
     */
    private appendCacheBuster(url: string): string {
        const separator = url.indexOf("?") === -1 ? "?" : "&";
        return `${url}${separator}_=${Date.now()}`;
    }

    /**
     * Poll loop: keeps issuing GET requests while the transport is running.
     *
     * - 204: the server terminated the poll; the loop stops without an error.
     * - any other non-200 status: the loop stops and an HttpError is recorded as the close error.
     * - 200 with content: the content is passed to `onreceive`.
     * - 200 without content, or a TimeoutError: the poll is simply reissued.
     *
     * When the loop ends without the poll having been aborted, `onclose` is raised from here.
     */
    private async poll(url: string, pollOptions: HttpRequest): Promise<void> {
        try {
            while (this.running) {
                // We have to get the access token on each poll, in case it changes
                const token = await this.getAccessToken();
                this.updateHeaderToken(pollOptions, token);

                try {
                    const pollUrl = this.appendCacheBuster(url);
                    this.logger.log(LogLevel.Trace, `(LongPolling transport) polling: ${pollUrl}.`);
                    const response = await this.httpClient.get(pollUrl, pollOptions);

                    if (response.statusCode === 204) {
                        this.logger.log(LogLevel.Information, "(LongPolling transport) Poll terminated by server.");

                        this.running = false;
                    } else if (response.statusCode !== 200) {
                        this.logger.log(LogLevel.Error, `(LongPolling transport) Unexpected response code: ${response.statusCode}.`);

                        // Unexpected status code
                        this.closeError = new HttpError(response.statusText || "", response.statusCode);
                        this.running = false;
                    } else if (response.content) {
                        // Process the response
                        this.logger.log(LogLevel.Trace, `(LongPolling transport) data received. ${getDataDetail(response.content, this.logMessageContent)}.`);
                        if (this.onreceive) {
                            this.onreceive(response.content);
                        }
                    } else {
                        // This is another way timeout manifest.
                        this.logger.log(LogLevel.Trace, "(LongPolling transport) Poll timed out, reissuing.");
                    }
                } catch (e) {
                    // Anything can be thrown; normalize so logging and onclose always see an Error.
                    const error = e instanceof Error ? e : new Error(String(e));

                    if (!this.running) {
                        // Log but disregard errors that occur after stopping
                        this.logger.log(LogLevel.Trace, `(LongPolling transport) Poll errored after shutdown: ${error.message}`);
                    } else if (error instanceof TimeoutError) {
                        // Ignore timeouts and reissue the poll.
                        this.logger.log(LogLevel.Trace, "(LongPolling transport) Poll timed out, reissuing.");
                    } else {
                        // Close the connection with the error as the result.
                        this.closeError = error;
                        this.running = false;
                    }
                }
            }
        } finally {
            this.logger.log(LogLevel.Trace, "(LongPolling transport) Polling complete.");

            // We will reach here with pollAborted==false when the server returned a response causing the transport to stop.
            // If pollAborted==true then client initiated the stop and the stop method will raise the close event after DELETE is sent.
            if (!this.pollAborted) {
                this.raiseOnClose();
            }
        }
    }

    /**
     * Sends data through the shared `sendMessage` helper.
     * Rejects when the transport is not currently running (not connected, or already stopped).
     */
    public async send(data: any): Promise<void> {
        if (!this.running) {
            throw new Error("Cannot send until the transport is connected");
        }
        return sendMessage(this.logger, "LongPolling", this.httpClient, this.url!, this.accessTokenFactory, data, this.logMessageContent);
    }

    /**
     * Stops the poll loop, waits for it to finish, sends a DELETE request for the connection URL
     * and finally raises `onclose` (even when the DELETE request fails, in which case the failure
     * is still propagated to the caller).
     *
     * NOTE(review): if the poll loop already ended on its own (server-initiated), it has raised
     * `onclose` itself and this method raises it a second time — confirm callers tolerate that.
     */
    public async stop(): Promise<void> {
        this.logger.log(LogLevel.Trace, "(LongPolling transport) Stopping polling.");

        // Tell receiving loop to stop, abort any current request, and then wait for it to finish
        this.running = false;
        this.pollAbort.abort();

        try {
            await this.receiving;

            // Send DELETE to clean up long polling on the server
            this.logger.log(LogLevel.Trace, `(LongPolling transport) sending DELETE request to ${this.url}.`);

            const deleteOptions: HttpRequest = {
                headers: {},
            };
            const token = await this.getAccessToken();
            this.updateHeaderToken(deleteOptions, token);
            await this.httpClient.delete(this.url!, deleteOptions);

            this.logger.log(LogLevel.Trace, "(LongPolling transport) DELETE request sent.");
        } finally {
            this.logger.log(LogLevel.Trace, "(LongPolling transport) Stop finished.");

            // Raise close event here instead of in polling
            // It needs to happen after the DELETE request is sent
            this.raiseOnClose();
        }
    }

    /** Logs and invokes `onclose` (if a handler is registered) with the recorded close error. */
    private raiseOnClose() {
        if (this.onclose) {
            let logMessage = "(LongPolling transport) Firing onclose event.";
            if (this.closeError) {
                logMessage += " Error: " + this.closeError;
            }
            this.logger.log(LogLevel.Trace, logMessage);
            this.onclose(this.closeError);
        }
    }
}
|
225 |
|
\ | No newline at end of file |