1 |
|
2 |
|
3 |
|
4 | import { HandshakeProtocol, HandshakeRequestMessage, HandshakeResponseMessage } from "./HandshakeProtocol";
|
5 | import { IConnection } from "./IConnection";
|
6 | import { CancelInvocationMessage, CompletionMessage, IHubProtocol, InvocationMessage, MessageType, StreamInvocationMessage, StreamItemMessage } from "./IHubProtocol";
|
7 | import { ILogger, LogLevel } from "./ILogger";
|
8 | import { IRetryPolicy } from "./IRetryPolicy";
|
9 | import { IStreamResult } from "./Stream";
|
10 | import { Subject } from "./Subject";
|
11 | import { Arg } from "./Utils";
|
12 |
|
13 | const DEFAULT_TIMEOUT_IN_MS: number = 30 * 1000;
|
14 | const DEFAULT_PING_INTERVAL_IN_MS: number = 15 * 1000;
|
15 |
|
16 |
|
17 | export enum HubConnectionState {
|
18 |
|
19 | Disconnected = "Disconnected",
|
20 |
|
21 | Connecting = "Connecting",
|
22 |
|
23 | Connected = "Connected",
|
24 |
|
25 | Disconnecting = "Disconnecting",
|
26 |
|
27 | Reconnecting = "Reconnecting",
|
28 | }
|
29 |
|
30 |
|
31 | export class HubConnection {
|
32 | private readonly cachedPingMessage: string | ArrayBuffer;
|
33 | private readonly connection: IConnection;
|
34 | private readonly logger: ILogger;
|
35 | private readonly reconnectPolicy?: IRetryPolicy;
|
36 | private protocol: IHubProtocol;
|
37 | private handshakeProtocol: HandshakeProtocol;
|
38 | private callbacks: { [invocationId: string]: (invocationEvent: StreamItemMessage | CompletionMessage | null, error?: Error) => void };
|
39 | private methods: { [name: string]: Array<(...args: any[]) => void> };
|
40 | private invocationId: number;
|
41 |
|
42 | private closedCallbacks: Array<(error?: Error) => void>;
|
43 | private reconnectingCallbacks: Array<(error?: Error) => void>;
|
44 | private reconnectedCallbacks: Array<(connectionId?: string) => void>;
|
45 |
|
46 | private receivedHandshakeResponse: boolean;
|
47 | private handshakeResolver!: (value?: PromiseLike<{}>) => void;
|
48 | private handshakeRejecter!: (reason?: any) => void;
|
49 | private stopDuringStartError?: Error;
|
50 |
|
51 | private connectionState: HubConnectionState;
|
52 |
|
53 |
|
54 | private connectionStarted: boolean;
|
55 | private startPromise?: Promise<void>;
|
56 | private stopPromise?: Promise<void>;
|
57 |
|
58 |
|
59 |
|
60 |
|
61 | private reconnectDelayHandle?: any;
|
62 | private timeoutHandle?: any;
|
63 | private pingServerHandle?: any;
|
64 |
|
65 | |
66 |
|
67 |
|
68 |
|
69 |
|
70 | public serverTimeoutInMilliseconds: number;
|
71 |
|
72 | |
73 |
|
74 |
|
75 |
|
76 |
|
77 | public keepAliveIntervalInMilliseconds: number;
|
78 |
|
79 |
|
80 |
|
81 |
|
82 |
|
83 |
|
84 | public static create(connection: IConnection, logger: ILogger, protocol: IHubProtocol, reconnectPolicy?: IRetryPolicy): HubConnection {
|
85 | return new HubConnection(connection, logger, protocol, reconnectPolicy);
|
86 | }
|
87 |
|
88 | private constructor(connection: IConnection, logger: ILogger, protocol: IHubProtocol, reconnectPolicy?: IRetryPolicy) {
|
89 | Arg.isRequired(connection, "connection");
|
90 | Arg.isRequired(logger, "logger");
|
91 | Arg.isRequired(protocol, "protocol");
|
92 |
|
93 | this.serverTimeoutInMilliseconds = DEFAULT_TIMEOUT_IN_MS;
|
94 | this.keepAliveIntervalInMilliseconds = DEFAULT_PING_INTERVAL_IN_MS;
|
95 |
|
96 | this.logger = logger;
|
97 | this.protocol = protocol;
|
98 | this.connection = connection;
|
99 | this.reconnectPolicy = reconnectPolicy;
|
100 | this.handshakeProtocol = new HandshakeProtocol();
|
101 |
|
102 | this.connection.onreceive = (data: any) => this.processIncomingData(data);
|
103 | this.connection.onclose = (error?: Error) => this.connectionClosed(error);
|
104 |
|
105 | this.callbacks = {};
|
106 | this.methods = {};
|
107 | this.closedCallbacks = [];
|
108 | this.reconnectingCallbacks = [];
|
109 | this.reconnectedCallbacks = [];
|
110 | this.invocationId = 0;
|
111 | this.receivedHandshakeResponse = false;
|
112 | this.connectionState = HubConnectionState.Disconnected;
|
113 | this.connectionStarted = false;
|
114 |
|
115 | this.cachedPingMessage = this.protocol.writeMessage({ type: MessageType.Ping });
|
116 | }
|
117 |
|
118 |
|
119 | get state(): HubConnectionState {
|
120 | return this.connectionState;
|
121 | }
|
122 |
|
123 | |
124 |
|
125 |
|
126 | get connectionId(): string | null {
|
127 | return this.connection ? (this.connection.connectionId || null) : null;
|
128 | }
|
129 |
|
130 |
|
131 | get baseUrl(): string {
|
132 | return this.connection.baseUrl || "";
|
133 | }
|
134 |
|
135 | |
136 |
|
137 |
|
138 |
|
139 |
|
140 | set baseUrl(url: string) {
|
141 | if (this.connectionState !== HubConnectionState.Disconnected && this.connectionState !== HubConnectionState.Reconnecting) {
|
142 | throw new Error("The HubConnection must be in the Disconnected or Reconnecting state to change the url.");
|
143 | }
|
144 |
|
145 | if (!url) {
|
146 | throw new Error("The HubConnection url must be a valid url.");
|
147 | }
|
148 |
|
149 | this.connection.baseUrl = url;
|
150 | }
|
151 |
|
152 | |
153 |
|
154 |
|
155 |
|
156 | public start(): Promise<void> {
|
157 | this.startPromise = this.startWithStateTransitions();
|
158 | return this.startPromise;
|
159 | }
|
160 |
|
161 | private async startWithStateTransitions(): Promise<void> {
|
162 | if (this.connectionState !== HubConnectionState.Disconnected) {
|
163 | return Promise.reject(new Error("Cannot start a HubConnection that is not in the 'Disconnected' state."));
|
164 | }
|
165 |
|
166 | this.connectionState = HubConnectionState.Connecting;
|
167 | this.logger.log(LogLevel.Debug, "Starting HubConnection.");
|
168 |
|
169 | try {
|
170 | await this.startInternal();
|
171 |
|
172 | this.connectionState = HubConnectionState.Connected;
|
173 | this.connectionStarted = true;
|
174 | this.logger.log(LogLevel.Debug, "HubConnection connected successfully.");
|
175 | } catch (e) {
|
176 | this.connectionState = HubConnectionState.Disconnected;
|
177 | this.logger.log(LogLevel.Debug, `HubConnection failed to start successfully because of error '${e}'.`);
|
178 | return Promise.reject(e);
|
179 | }
|
180 | }
|
181 |
|
182 | private async startInternal() {
|
183 | this.stopDuringStartError = undefined;
|
184 | this.receivedHandshakeResponse = false;
|
185 |
|
186 | const handshakePromise = new Promise((resolve, reject) => {
|
187 | this.handshakeResolver = resolve;
|
188 | this.handshakeRejecter = reject;
|
189 | });
|
190 |
|
191 | await this.connection.start(this.protocol.transferFormat);
|
192 |
|
193 | try {
|
194 | const handshakeRequest: HandshakeRequestMessage = {
|
195 | protocol: this.protocol.name,
|
196 | version: this.protocol.version,
|
197 | };
|
198 |
|
199 | this.logger.log(LogLevel.Debug, "Sending handshake request.");
|
200 |
|
201 | await this.sendMessage(this.handshakeProtocol.writeHandshakeRequest(handshakeRequest));
|
202 |
|
203 | this.logger.log(LogLevel.Information, `Using HubProtocol '${this.protocol.name}'.`);
|
204 |
|
205 |
|
206 | this.cleanupTimeout();
|
207 | this.resetTimeoutPeriod();
|
208 | this.resetKeepAliveInterval();
|
209 |
|
210 | await handshakePromise;
|
211 |
|
212 |
|
213 |
|
214 |
|
215 | if (this.stopDuringStartError) {
|
216 |
|
217 |
|
218 |
|
219 | throw this.stopDuringStartError;
|
220 | }
|
221 | } catch (e) {
|
222 | this.logger.log(LogLevel.Debug, `Hub handshake failed with error '${e}' during start(). Stopping HubConnection.`);
|
223 |
|
224 | this.cleanupTimeout();
|
225 | this.cleanupPingTimer();
|
226 |
|
227 |
|
228 |
|
229 | await this.connection.stop(e);
|
230 | throw e;
|
231 | }
|
232 | }
|
233 |
|
234 | |
235 |
|
236 |
|
237 |
|
238 | public async stop(): Promise<void> {
|
239 |
|
240 | const startPromise = this.startPromise;
|
241 |
|
242 | this.stopPromise = this.stopInternal();
|
243 | await this.stopPromise;
|
244 |
|
245 | try {
|
246 |
|
247 | await startPromise;
|
248 | } catch (e) {
|
249 |
|
250 | }
|
251 | }
|
252 |
|
253 | private stopInternal(error?: Error): Promise<void> {
|
254 | if (this.connectionState === HubConnectionState.Disconnected) {
|
255 | this.logger.log(LogLevel.Debug, `Call to HubConnection.stop(${error}) ignored because it is already in the disconnected state.`);
|
256 | return Promise.resolve();
|
257 | }
|
258 |
|
259 | if (this.connectionState === HubConnectionState.Disconnecting) {
|
260 | this.logger.log(LogLevel.Debug, `Call to HttpConnection.stop(${error}) ignored because the connection is already in the disconnecting state.`);
|
261 | return this.stopPromise!;
|
262 | }
|
263 |
|
264 | this.connectionState = HubConnectionState.Disconnecting;
|
265 |
|
266 | this.logger.log(LogLevel.Debug, "Stopping HubConnection.");
|
267 |
|
268 | if (this.reconnectDelayHandle) {
|
269 |
|
270 |
|
271 |
|
272 | this.logger.log(LogLevel.Debug, "Connection stopped during reconnect delay. Done reconnecting.");
|
273 |
|
274 | clearTimeout(this.reconnectDelayHandle);
|
275 | this.reconnectDelayHandle = undefined;
|
276 |
|
277 | this.completeClose();
|
278 | return Promise.resolve();
|
279 | }
|
280 |
|
281 | this.cleanupTimeout();
|
282 | this.cleanupPingTimer();
|
283 | this.stopDuringStartError = error || new Error("The connection was stopped before the hub handshake could complete.");
|
284 |
|
285 |
|
286 |
|
287 |
|
288 | return this.connection.stop(error);
|
289 | }
|
290 |
|
291 | |
292 |
|
293 |
|
294 |
|
295 |
|
296 |
|
297 |
|
298 | public stream<T = any>(methodName: string, ...args: any[]): IStreamResult<T> {
|
299 | const [streams, streamIds] = this.replaceStreamingParams(args);
|
300 | const invocationDescriptor = this.createStreamInvocation(methodName, args, streamIds);
|
301 |
|
302 | let promiseQueue: Promise<void>;
|
303 | const subject = new Subject<T>();
|
304 | subject.cancelCallback = () => {
|
305 | const cancelInvocation: CancelInvocationMessage = this.createCancelInvocation(invocationDescriptor.invocationId);
|
306 |
|
307 | delete this.callbacks[invocationDescriptor.invocationId];
|
308 |
|
309 | return promiseQueue.then(() => {
|
310 | return this.sendWithProtocol(cancelInvocation);
|
311 | });
|
312 | };
|
313 |
|
314 | this.callbacks[invocationDescriptor.invocationId] = (invocationEvent: CompletionMessage | StreamItemMessage | null, error?: Error) => {
|
315 | if (error) {
|
316 | subject.error(error);
|
317 | return;
|
318 | } else if (invocationEvent) {
|
319 |
|
320 | if (invocationEvent.type === MessageType.Completion) {
|
321 | if (invocationEvent.error) {
|
322 | subject.error(new Error(invocationEvent.error));
|
323 | } else {
|
324 | subject.complete();
|
325 | }
|
326 | } else {
|
327 | subject.next((invocationEvent.item) as T);
|
328 | }
|
329 | }
|
330 | };
|
331 |
|
332 | promiseQueue = this.sendWithProtocol(invocationDescriptor)
|
333 | .catch((e) => {
|
334 | subject.error(e);
|
335 | delete this.callbacks[invocationDescriptor.invocationId];
|
336 | });
|
337 |
|
338 | this.launchStreams(streams, promiseQueue);
|
339 |
|
340 | return subject;
|
341 | }
|
342 |
|
343 | private sendMessage(message: any) {
|
344 | this.resetKeepAliveInterval();
|
345 | return this.connection.send(message);
|
346 | }
|
347 |
|
348 | |
349 |
|
350 |
|
351 |
|
352 | private sendWithProtocol(message: any) {
|
353 | return this.sendMessage(this.protocol.writeMessage(message));
|
354 | }
|
355 |
|
356 | |
357 |
|
358 |
|
359 |
|
360 |
|
361 |
|
362 |
|
363 |
|
364 |
|
365 | public send(methodName: string, ...args: any[]): Promise<void> {
|
366 | const [streams, streamIds] = this.replaceStreamingParams(args);
|
367 | const sendPromise = this.sendWithProtocol(this.createInvocation(methodName, args, true, streamIds));
|
368 |
|
369 | this.launchStreams(streams, sendPromise);
|
370 |
|
371 | return sendPromise;
|
372 | }
|
373 |
|
374 | |
375 |
|
376 |
|
377 |
|
378 |
|
379 |
|
380 |
|
381 |
|
382 |
|
383 |
|
384 |
|
385 | public invoke<T = any>(methodName: string, ...args: any[]): Promise<T> {
|
386 | const [streams, streamIds] = this.replaceStreamingParams(args);
|
387 | const invocationDescriptor = this.createInvocation(methodName, args, false, streamIds);
|
388 |
|
389 | const p = new Promise<any>((resolve, reject) => {
|
390 |
|
391 | this.callbacks[invocationDescriptor.invocationId!] = (invocationEvent: StreamItemMessage | CompletionMessage | null, error?: Error) => {
|
392 | if (error) {
|
393 | reject(error);
|
394 | return;
|
395 | } else if (invocationEvent) {
|
396 |
|
397 | if (invocationEvent.type === MessageType.Completion) {
|
398 | if (invocationEvent.error) {
|
399 | reject(new Error(invocationEvent.error));
|
400 | } else {
|
401 | resolve(invocationEvent.result);
|
402 | }
|
403 | } else {
|
404 | reject(new Error(`Unexpected message type: ${invocationEvent.type}`));
|
405 | }
|
406 | }
|
407 | };
|
408 |
|
409 | const promiseQueue = this.sendWithProtocol(invocationDescriptor)
|
410 | .catch((e) => {
|
411 | reject(e);
|
412 |
|
413 | delete this.callbacks[invocationDescriptor.invocationId!];
|
414 | });
|
415 |
|
416 | this.launchStreams(streams, promiseQueue);
|
417 | });
|
418 |
|
419 | return p;
|
420 | }
|
421 |
|
422 | /** Registers a handler that will be invoked when the hub method with the specified method name is invoked.
|
423 | *
|
424 | * @param {string} methodName The name of the hub method to define.
|
425 | * @param {Function} newMethod The handler that will be raised when the hub method is invoked.
|
426 | */
|
427 | public on(methodName: string, newMethod: (...args: any[]) => void) {
|
428 | if (!methodName || !newMethod) {
|
429 | return;
|
430 | }
|
431 |
|
432 | methodName = methodName.toLowerCase();
|
433 | if (!this.methods[methodName]) {
|
434 | this.methods[methodName] = [];
|
435 | }
|
436 |
|
437 | // Preventing adding the same handler multiple times.
|
438 | if (this.methods[methodName].indexOf(newMethod) !== -1) {
|
439 | return;
|
440 | }
|
441 |
|
442 | this.methods[methodName].push(newMethod);
|
443 | }
|
444 |
|
445 | /** Removes all handlers for the specified hub method.
|
446 | *
|
447 | * @param {string} methodName The name of the method to remove handlers for.
|
448 | */
|
449 | public off(methodName: string): void;
|
450 |
|
451 | /** Removes the specified handler for the specified hub method.
|
452 | *
|
453 | * You must pass the exact same Function instance as was previously passed to {@link @microsoft/signalr.HubConnection.on}. Passing a different instance (even if the function
|
454 | * body is the same) will not remove the handler.
|
455 | *
|
456 | * @param {string} methodName The name of the method to remove handlers for.
|
457 | * @param {Function} method The handler to remove. This must be the same Function instance as the one passed to {@link @microsoft/signalr.HubConnection.on}.
|
458 | */
|
459 | public off(methodName: string, method: (...args: any[]) => void): void;
|
460 | public off(methodName: string, method?: (...args: any[]) => void): void {
|
461 | if (!methodName) {
|
462 | return;
|
463 | }
|
464 |
|
465 | methodName = methodName.toLowerCase();
|
466 | const handlers = this.methods[methodName];
|
467 | if (!handlers) {
|
468 | return;
|
469 | }
|
470 | if (method) {
|
471 | const removeIdx = handlers.indexOf(method);
|
472 | if (removeIdx !== -1) {
|
473 | handlers.splice(removeIdx, 1);
|
474 | if (handlers.length === 0) {
|
475 | delete this.methods[methodName];
|
476 | }
|
477 | }
|
478 | } else {
|
479 | delete this.methods[methodName];
|
480 | }
|
481 |
|
482 | }
|
483 |
|
484 | /** Registers a handler that will be invoked when the connection is closed.
|
485 | *
|
486 | * @param {Function} callback The handler that will be invoked when the connection is closed. Optionally receives a single argument containing the error that caused the connection to close (if any).
|
487 | */
|
488 | public onclose(callback: (error?: Error) => void) {
|
489 | if (callback) {
|
490 | this.closedCallbacks.push(callback);
|
491 | }
|
492 | }
|
493 |
|
494 | /** Registers a handler that will be invoked when the connection starts reconnecting.
|
495 | *
|
496 | * @param {Function} callback The handler that will be invoked when the connection starts reconnecting. Optionally receives a single argument containing the error that caused the connection to start reconnecting (if any).
|
497 | */
|
498 | public onreconnecting(callback: (error?: Error) => void) {
|
499 | if (callback) {
|
500 | this.reconnectingCallbacks.push(callback);
|
501 | }
|
502 | }
|
503 |
|
504 | /** Registers a handler that will be invoked when the connection successfully reconnects.
|
505 | *
|
506 | * @param {Function} callback The handler that will be invoked when the connection successfully reconnects.
|
507 | */
|
508 | public onreconnected(callback: (connectionId?: string) => void) {
|
509 | if (callback) {
|
510 | this.reconnectedCallbacks.push(callback);
|
511 | }
|
512 | }
|
513 |
|
514 | private processIncomingData(data: any) {
|
515 | this.cleanupTimeout();
|
516 |
|
517 | if (!this.receivedHandshakeResponse) {
|
518 | data = this.processHandshakeResponse(data);
|
519 | this.receivedHandshakeResponse = true;
|
520 | }
|
521 |
|
522 | // Data may have all been read when processing handshake response
|
523 | if (data) {
|
524 | // Parse the messages
|
525 | const messages = this.protocol.parseMessages(data, this.logger);
|
526 |
|
527 | for (const message of messages) {
|
528 | switch (message.type) {
|
529 | case MessageType.Invocation:
|
530 | this.invokeClientMethod(message);
|
531 | break;
|
532 | case MessageType.StreamItem:
|
533 | case MessageType.Completion:
|
534 | const callback = this.callbacks[message.invocationId];
|
535 | if (callback) {
|
536 | if (message.type === MessageType.Completion) {
|
537 | delete this.callbacks[message.invocationId];
|
538 | }
|
539 | callback(message);
|
540 | }
|
541 | break;
|
542 | case MessageType.Ping:
|
543 | // Don't care about pings
|
544 | break;
|
545 | case MessageType.Close:
|
546 | this.logger.log(LogLevel.Information, "Close message received from server.");
|
547 |
|
548 | const error = message.error ? new Error("Server returned an error on close: " + message.error) : undefined;
|
549 |
|
550 | if (message.allowReconnect === true) {
|
551 | // It feels wrong not to await connection.stop() here, but processIncomingData is called as part of an onreceive callback which is not async,
|
552 | // this is already the behavior for serverTimeout(), and HttpConnection.Stop() should catch and log all possible exceptions.
|
553 |
|
554 | // tslint:disable-next-line:no-floating-promises
|
555 | this.connection.stop(error);
|
556 | } else {
|
557 | // We cannot await stopInternal() here, but subsequent calls to stop() will await this if stopInternal() is still ongoing.
|
558 | this.stopPromise = this.stopInternal(error);
|
559 | }
|
560 |
|
561 | break;
|
562 | default:
|
563 | this.logger.log(LogLevel.Warning, `Invalid message type: ${message.type}.`);
|
564 | break;
|
565 | }
|
566 | }
|
567 | }
|
568 |
|
569 | this.resetTimeoutPeriod();
|
570 | }
|
571 |
|
572 | private processHandshakeResponse(data: any): any {
|
573 | let responseMessage: HandshakeResponseMessage;
|
574 | let remainingData: any;
|
575 |
|
576 | try {
|
577 | [remainingData, responseMessage] = this.handshakeProtocol.parseHandshakeResponse(data);
|
578 | } catch (e) {
|
579 | const message = "Error parsing handshake response: " + e;
|
580 | this.logger.log(LogLevel.Error, message);
|
581 |
|
582 | const error = new Error(message);
|
583 | this.handshakeRejecter(error);
|
584 | throw error;
|
585 | }
|
586 | if (responseMessage.error) {
|
587 | const message = "Server returned handshake error: " + responseMessage.error;
|
588 | this.logger.log(LogLevel.Error, message);
|
589 |
|
590 | const error = new Error(message);
|
591 | this.handshakeRejecter(error);
|
592 | throw error;
|
593 | } else {
|
594 | this.logger.log(LogLevel.Debug, "Server handshake complete.");
|
595 | }
|
596 |
|
597 | this.handshakeResolver();
|
598 | return remainingData;
|
599 | }
|
600 |
|
601 | private resetKeepAliveInterval() {
|
602 | this.cleanupPingTimer();
|
603 | this.pingServerHandle = setTimeout(async () => {
|
604 | if (this.connectionState === HubConnectionState.Connected) {
|
605 | try {
|
606 | await this.sendMessage(this.cachedPingMessage);
|
607 | } catch {
|
608 |
|
609 |
|
610 | this.cleanupPingTimer();
|
611 | }
|
612 | }
|
613 | }, this.keepAliveIntervalInMilliseconds);
|
614 | }
|
615 |
|
616 | private resetTimeoutPeriod() {
|
617 | if (!this.connection.features || !this.connection.features.inherentKeepAlive) {
|
618 | // Set the timeout timer
|
619 | this.timeoutHandle = setTimeout(() => this.serverTimeout(), this.serverTimeoutInMilliseconds);
|
620 | }
|
621 | }
|
622 |
|
623 | private serverTimeout() {
|
624 | // The server hasn't talked to us in a while. It doesn't like us anymore ... :(
|
625 |
|
626 |
|
627 | this.connection.stop(new Error("Server timeout elapsed without receiving a message from the server."));
|
628 | }
|
629 |
|
630 | private invokeClientMethod(invocationMessage: InvocationMessage) {
|
631 | const methods = this.methods[invocationMessage.target.toLowerCase()];
|
632 | if (methods) {
|
633 | try {
|
634 | methods.forEach((m) => m.apply(this, invocationMessage.arguments));
|
635 | } catch (e) {
|
636 | this.logger.log(LogLevel.Error, `A callback for the method ${invocationMessage.target.toLowerCase()} threw error '${e}'.`);
|
637 | }
|
638 |
|
639 | if (invocationMessage.invocationId) {
|
640 |
|
641 | const message = "Server requested a response, which is not supported in this version of the client.";
|
642 | this.logger.log(LogLevel.Error, message);
|
643 |
|
644 |
|
645 | this.stopPromise = this.stopInternal(new Error(message));
|
646 | }
|
647 | } else {
|
648 | this.logger.log(LogLevel.Warning, `No client method with the name '${invocationMessage.target}' found.`);
|
649 | }
|
650 | }
|
651 |
|
652 | private connectionClosed(error?: Error) {
|
653 | this.logger.log(LogLevel.Debug, `HubConnection.connectionClosed(${error}) called while in state ${this.connectionState}.`);
|
654 |
|
655 |
|
656 | this.stopDuringStartError = this.stopDuringStartError || error || new Error("The underlying connection was closed before the hub handshake could complete.");
|
657 |
|
658 |
|
659 |
|
660 | if (this.handshakeResolver) {
|
661 | this.handshakeResolver();
|
662 | }
|
663 |
|
664 | this.cancelCallbacksWithError(error || new Error("Invocation canceled due to the underlying connection being closed."));
|
665 |
|
666 | this.cleanupTimeout();
|
667 | this.cleanupPingTimer();
|
668 |
|
669 | if (this.connectionState === HubConnectionState.Disconnecting) {
|
670 | this.completeClose(error);
|
671 | } else if (this.connectionState === HubConnectionState.Connected && this.reconnectPolicy) {
|
672 |
|
673 | this.reconnect(error);
|
674 | } else if (this.connectionState === HubConnectionState.Connected) {
|
675 | this.completeClose(error);
|
676 | }
|
677 |
|
678 |
|
679 |
|
680 |
|
681 |
|
682 |
|
683 | }
|
684 |
|
685 | private completeClose(error?: Error) {
|
686 | if (this.connectionStarted) {
|
687 | this.connectionState = HubConnectionState.Disconnected;
|
688 | this.connectionStarted = false;
|
689 |
|
690 | try {
|
691 | this.closedCallbacks.forEach((c) => c.apply(this, [error]));
|
692 | } catch (e) {
|
693 | this.logger.log(LogLevel.Error, `An onclose callback called with error '${error}' threw error '${e}'.`);
|
694 | }
|
695 | }
|
696 | }
|
697 |
|
698 | private async reconnect(error?: Error) {
|
699 | const reconnectStartTime = Date.now();
|
700 | let previousReconnectAttempts = 0;
|
701 | let retryError = error !== undefined ? error : new Error("Attempting to reconnect due to a unknown error.");
|
702 |
|
703 | let nextRetryDelay = this.getNextRetryDelay(previousReconnectAttempts++, 0, retryError);
|
704 |
|
705 | if (nextRetryDelay === null) {
|
706 | this.logger.log(LogLevel.Debug, "Connection not reconnecting because the IRetryPolicy returned null on the first reconnect attempt.");
|
707 | this.completeClose(error);
|
708 | return;
|
709 | }
|
710 |
|
711 | this.connectionState = HubConnectionState.Reconnecting;
|
712 |
|
713 | if (error) {
|
714 | this.logger.log(LogLevel.Information, `Connection reconnecting because of error '${error}'.`);
|
715 | } else {
|
716 | this.logger.log(LogLevel.Information, "Connection reconnecting.");
|
717 | }
|
718 |
|
719 | if (this.onreconnecting) {
|
720 | try {
|
721 | this.reconnectingCallbacks.forEach((c) => c.apply(this, [error]));
|
722 | } catch (e) {
|
723 | this.logger.log(LogLevel.Error, `An onreconnecting callback called with error '${error}' threw error '${e}'.`);
|
724 | }
|
725 |
|
726 |
|
727 | if (this.connectionState !== HubConnectionState.Reconnecting) {
|
728 | this.logger.log(LogLevel.Debug, "Connection left the reconnecting state in onreconnecting callback. Done reconnecting.");
|
729 | return;
|
730 | }
|
731 | }
|
732 |
|
733 | while (nextRetryDelay !== null) {
|
734 | this.logger.log(LogLevel.Information, `Reconnect attempt number ${previousReconnectAttempts} will start in ${nextRetryDelay} ms.`);
|
735 |
|
736 | await new Promise((resolve) => {
|
737 | this.reconnectDelayHandle = setTimeout(resolve, nextRetryDelay!);
|
738 | });
|
739 | this.reconnectDelayHandle = undefined;
|
740 |
|
741 | if (this.connectionState !== HubConnectionState.Reconnecting) {
|
742 | this.logger.log(LogLevel.Debug, "Connection left the reconnecting state during reconnect delay. Done reconnecting.");
|
743 | return;
|
744 | }
|
745 |
|
746 | try {
|
747 | await this.startInternal();
|
748 |
|
749 | this.connectionState = HubConnectionState.Connected;
|
750 | this.logger.log(LogLevel.Information, "HubConnection reconnected successfully.");
|
751 |
|
752 | if (this.onreconnected) {
|
753 | try {
|
754 | this.reconnectedCallbacks.forEach((c) => c.apply(this, [this.connection.connectionId]));
|
755 | } catch (e) {
|
756 | this.logger.log(LogLevel.Error, `An onreconnected callback called with connectionId '${this.connection.connectionId}; threw error '${e}'.`);
|
757 | }
|
758 | }
|
759 |
|
760 | return;
|
761 | } catch (e) {
|
762 | this.logger.log(LogLevel.Information, `Reconnect attempt failed because of error '${e}'.`);
|
763 |
|
764 | if (this.connectionState !== HubConnectionState.Reconnecting) {
|
765 | this.logger.log(LogLevel.Debug, "Connection left the reconnecting state during reconnect attempt. Done reconnecting.");
|
766 | return;
|
767 | }
|
768 |
|
769 | retryError = e instanceof Error ? e : new Error(e.toString());
|
770 | nextRetryDelay = this.getNextRetryDelay(previousReconnectAttempts++, Date.now() - reconnectStartTime, retryError);
|
771 | }
|
772 | }
|
773 |
|
774 | this.logger.log(LogLevel.Information, `Reconnect retries have been exhausted after ${Date.now() - reconnectStartTime} ms and ${previousReconnectAttempts} failed attempts. Connection disconnecting.`);
|
775 |
|
776 | this.completeClose();
|
777 | }
|
778 |
|
779 | private getNextRetryDelay(previousRetryCount: number, elapsedMilliseconds: number, retryReason: Error) {
|
780 | try {
|
781 | return this.reconnectPolicy!.nextRetryDelayInMilliseconds({
|
782 | elapsedMilliseconds,
|
783 | previousRetryCount,
|
784 | retryReason,
|
785 | });
|
786 | } catch (e) {
|
787 | this.logger.log(LogLevel.Error, `IRetryPolicy.nextRetryDelayInMilliseconds(${previousRetryCount}, ${elapsedMilliseconds}) threw error '${e}'.`);
|
788 | return null;
|
789 | }
|
790 | }
|
791 |
|
792 | private cancelCallbacksWithError(error: Error) {
|
793 | const callbacks = this.callbacks;
|
794 | this.callbacks = {};
|
795 |
|
796 | Object.keys(callbacks)
|
797 | .forEach((key) => {
|
798 | const callback = callbacks[key];
|
799 | callback(null, error);
|
800 | });
|
801 | }
|
802 |
|
803 | private cleanupPingTimer(): void {
|
804 | if (this.pingServerHandle) {
|
805 | clearTimeout(this.pingServerHandle);
|
806 | }
|
807 | }
|
808 |
|
809 | private cleanupTimeout(): void {
|
810 | if (this.timeoutHandle) {
|
811 | clearTimeout(this.timeoutHandle);
|
812 | }
|
813 | }
|
814 |
|
815 | private createInvocation(methodName: string, args: any[], nonblocking: boolean, streamIds: string[]): InvocationMessage {
|
816 | if (nonblocking) {
|
817 | return {
|
818 | arguments: args,
|
819 | streamIds,
|
820 | target: methodName,
|
821 | type: MessageType.Invocation,
|
822 | };
|
823 | } else {
|
824 | const invocationId = this.invocationId;
|
825 | this.invocationId++;
|
826 |
|
827 | return {
|
828 | arguments: args,
|
829 | invocationId: invocationId.toString(),
|
830 | streamIds,
|
831 | target: methodName,
|
832 | type: MessageType.Invocation,
|
833 | };
|
834 | }
|
835 | }
|
836 |
|
837 | private launchStreams(streams: Array<IStreamResult<any>>, promiseQueue: Promise<void>): void {
|
838 | if (streams.length === 0) {
|
839 | return;
|
840 | }
|
841 |
|
842 |
|
843 | if (!promiseQueue) {
|
844 | promiseQueue = Promise.resolve();
|
845 | }
|
846 |
|
847 |
|
848 |
|
849 | for (const streamId in streams) {
|
850 | streams[streamId].subscribe({
|
851 | complete: () => {
|
852 | promiseQueue = promiseQueue.then(() => this.sendWithProtocol(this.createCompletionMessage(streamId)));
|
853 | },
|
854 | error: (err) => {
|
855 | let message: string;
|
856 | if (err instanceof Error) {
|
857 | message = err.message;
|
858 | } else if (err && err.toString) {
|
859 | message = err.toString();
|
860 | } else {
|
861 | message = "Unknown error";
|
862 | }
|
863 |
|
864 | promiseQueue = promiseQueue.then(() => this.sendWithProtocol(this.createCompletionMessage(streamId, message)));
|
865 | },
|
866 | next: (item) => {
|
867 | promiseQueue = promiseQueue.then(() => this.sendWithProtocol(this.createStreamItemMessage(streamId, item)));
|
868 | },
|
869 | });
|
870 | }
|
871 | }
|
872 |
|
873 | private replaceStreamingParams(args: any[]): [Array<IStreamResult<any>>, string[]] {
|
874 | const streams: Array<IStreamResult<any>> = [];
|
875 | const streamIds: string[] = [];
|
876 | for (let i = 0; i < args.length; i++) {
|
877 | const argument = args[i];
|
878 | if (this.isObservable(argument)) {
|
879 | const streamId = this.invocationId;
|
880 | this.invocationId++;
|
881 |
|
882 | streams[streamId] = argument;
|
883 | streamIds.push(streamId.toString());
|
884 |
|
885 |
|
886 | args.splice(i, 1);
|
887 | }
|
888 | }
|
889 |
|
890 | return [streams, streamIds];
|
891 | }
|
892 |
|
893 | private isObservable(arg: any): arg is IStreamResult<any> {
|
894 |
|
895 | return arg && arg.subscribe && typeof arg.subscribe === "function";
|
896 | }
|
897 |
|
898 | private createStreamInvocation(methodName: string, args: any[], streamIds: string[]): StreamInvocationMessage {
|
899 | const invocationId = this.invocationId;
|
900 | this.invocationId++;
|
901 |
|
902 | return {
|
903 | arguments: args,
|
904 | invocationId: invocationId.toString(),
|
905 | streamIds,
|
906 | target: methodName,
|
907 | type: MessageType.StreamInvocation,
|
908 | };
|
909 | }
|
910 |
|
911 | private createCancelInvocation(id: string): CancelInvocationMessage {
|
912 | return {
|
913 | invocationId: id,
|
914 | type: MessageType.CancelInvocation,
|
915 | };
|
916 | }
|
917 |
|
918 | private createStreamItemMessage(id: string, item: any): StreamItemMessage {
|
919 | return {
|
920 | invocationId: id,
|
921 | item,
|
922 | type: MessageType.StreamItem,
|
923 | };
|
924 | }
|
925 |
|
926 | private createCompletionMessage(id: string, error?: any, result?: any): CompletionMessage {
|
927 | if (error) {
|
928 | return {
|
929 | error,
|
930 | invocationId: id,
|
931 | type: MessageType.Completion,
|
932 | };
|
933 | }
|
934 |
|
935 | return {
|
936 | invocationId: id,
|
937 | result,
|
938 | type: MessageType.Completion,
|
939 | };
|
940 | }
|
941 | }
|
942 |
|
\ | No newline at end of file |