UNPKG

41.2 kBPlain TextView Raw
1// Copyright (c) .NET Foundation. All rights reserved.
2// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3
4import { HandshakeProtocol, HandshakeRequestMessage, HandshakeResponseMessage } from "./HandshakeProtocol";
5import { IConnection } from "./IConnection";
6import { CancelInvocationMessage, CompletionMessage, IHubProtocol, InvocationMessage, MessageType, StreamInvocationMessage, StreamItemMessage } from "./IHubProtocol";
7import { ILogger, LogLevel } from "./ILogger";
8import { IRetryPolicy } from "./IRetryPolicy";
9import { IStreamResult } from "./Stream";
10import { Subject } from "./Subject";
11import { Arg } from "./Utils";
12
13const DEFAULT_TIMEOUT_IN_MS: number = 30 * 1000;
14const DEFAULT_PING_INTERVAL_IN_MS: number = 15 * 1000;
15
16/** Describes the current state of the {@link HubConnection} to the server. */
17export enum HubConnectionState {
18 /** The hub connection is disconnected. */
19 Disconnected = "Disconnected",
20 /** The hub connection is connecting. */
21 Connecting = "Connecting",
22 /** The hub connection is connected. */
23 Connected = "Connected",
24 /** The hub connection is disconnecting. */
25 Disconnecting = "Disconnecting",
26 /** The hub connection is reconnecting. */
27 Reconnecting = "Reconnecting",
28}
29
30/** Represents a connection to a SignalR Hub. */
31export class HubConnection {
32 private readonly cachedPingMessage: string | ArrayBuffer;
33 private readonly connection: IConnection;
34 private readonly logger: ILogger;
35 private readonly reconnectPolicy?: IRetryPolicy;
36 private protocol: IHubProtocol;
37 private handshakeProtocol: HandshakeProtocol;
38 private callbacks: { [invocationId: string]: (invocationEvent: StreamItemMessage | CompletionMessage | null, error?: Error) => void };
39 private methods: { [name: string]: Array<(...args: any[]) => void> };
40 private invocationId: number;
41
42 private closedCallbacks: Array<(error?: Error) => void>;
43 private reconnectingCallbacks: Array<(error?: Error) => void>;
44 private reconnectedCallbacks: Array<(connectionId?: string) => void>;
45
46 private receivedHandshakeResponse: boolean;
47 private handshakeResolver!: (value?: PromiseLike<{}>) => void;
48 private handshakeRejecter!: (reason?: any) => void;
49 private stopDuringStartError?: Error;
50
51 private connectionState: HubConnectionState;
52 // connectionStarted is tracked independently from connectionState, so we can check if the
53 // connection ever did successfully transition from connecting to connected before disconnecting.
54 private connectionStarted: boolean;
55 private startPromise?: Promise<void>;
56 private stopPromise?: Promise<void>;
57
58 // The type of these a) doesn't matter and b) varies when building in browser and node contexts
59 // Since we're building the WebPack bundle directly from the TypeScript, this matters (previously
60 // we built the bundle from the compiled JavaScript).
61 private reconnectDelayHandle?: any;
62 private timeoutHandle?: any;
63 private pingServerHandle?: any;
64
65 /** The server timeout in milliseconds.
66 *
67 * If this timeout elapses without receiving any messages from the server, the connection will be terminated with an error.
68 * The default timeout value is 30,000 milliseconds (30 seconds).
69 */
70 public serverTimeoutInMilliseconds: number;
71
72 /** Default interval at which to ping the server.
73 *
74 * The default value is 15,000 milliseconds (15 seconds).
75 * Allows the server to detect hard disconnects (like when a client unplugs their computer).
76 */
77 public keepAliveIntervalInMilliseconds: number;
78
79 /** @internal */
80 // Using a public static factory method means we can have a private constructor and an _internal_
81 // create method that can be used by HubConnectionBuilder. An "internal" constructor would just
82 // be stripped away and the '.d.ts' file would have no constructor, which is interpreted as a
83 // public parameter-less constructor.
84 public static create(connection: IConnection, logger: ILogger, protocol: IHubProtocol, reconnectPolicy?: IRetryPolicy): HubConnection {
85 return new HubConnection(connection, logger, protocol, reconnectPolicy);
86 }
87
88 private constructor(connection: IConnection, logger: ILogger, protocol: IHubProtocol, reconnectPolicy?: IRetryPolicy) {
89 Arg.isRequired(connection, "connection");
90 Arg.isRequired(logger, "logger");
91 Arg.isRequired(protocol, "protocol");
92
93 this.serverTimeoutInMilliseconds = DEFAULT_TIMEOUT_IN_MS;
94 this.keepAliveIntervalInMilliseconds = DEFAULT_PING_INTERVAL_IN_MS;
95
96 this.logger = logger;
97 this.protocol = protocol;
98 this.connection = connection;
99 this.reconnectPolicy = reconnectPolicy;
100 this.handshakeProtocol = new HandshakeProtocol();
101
102 this.connection.onreceive = (data: any) => this.processIncomingData(data);
103 this.connection.onclose = (error?: Error) => this.connectionClosed(error);
104
105 this.callbacks = {};
106 this.methods = {};
107 this.closedCallbacks = [];
108 this.reconnectingCallbacks = [];
109 this.reconnectedCallbacks = [];
110 this.invocationId = 0;
111 this.receivedHandshakeResponse = false;
112 this.connectionState = HubConnectionState.Disconnected;
113 this.connectionStarted = false;
114
115 this.cachedPingMessage = this.protocol.writeMessage({ type: MessageType.Ping });
116 }
117
118 /** Indicates the state of the {@link HubConnection} to the server. */
119 get state(): HubConnectionState {
120 return this.connectionState;
121 }
122
123 /** Represents the connection id of the {@link HubConnection} on the server. The connection id will be null when the connection is either
124 * in the disconnected state or if the negotiation step was skipped.
125 */
126 get connectionId(): string | null {
127 return this.connection ? (this.connection.connectionId || null) : null;
128 }
129
130 /** Indicates the url of the {@link HubConnection} to the server. */
131 get baseUrl(): string {
132 return this.connection.baseUrl || "";
133 }
134
135 /**
136 * Sets a new url for the HubConnection. Note that the url can only be changed when the connection is in either the Disconnected or
137 * Reconnecting states.
138 * @param {string} url The url to connect to.
139 */
140 set baseUrl(url: string) {
141 if (this.connectionState !== HubConnectionState.Disconnected && this.connectionState !== HubConnectionState.Reconnecting) {
142 throw new Error("The HubConnection must be in the Disconnected or Reconnecting state to change the url.");
143 }
144
145 if (!url) {
146 throw new Error("The HubConnection url must be a valid url.");
147 }
148
149 this.connection.baseUrl = url;
150 }
151
152 /** Starts the connection.
153 *
154 * @returns {Promise<void>} A Promise that resolves when the connection has been successfully established, or rejects with an error.
155 */
156 public start(): Promise<void> {
157 this.startPromise = this.startWithStateTransitions();
158 return this.startPromise;
159 }
160
161 private async startWithStateTransitions(): Promise<void> {
162 if (this.connectionState !== HubConnectionState.Disconnected) {
163 return Promise.reject(new Error("Cannot start a HubConnection that is not in the 'Disconnected' state."));
164 }
165
166 this.connectionState = HubConnectionState.Connecting;
167 this.logger.log(LogLevel.Debug, "Starting HubConnection.");
168
169 try {
170 await this.startInternal();
171
172 this.connectionState = HubConnectionState.Connected;
173 this.connectionStarted = true;
174 this.logger.log(LogLevel.Debug, "HubConnection connected successfully.");
175 } catch (e) {
176 this.connectionState = HubConnectionState.Disconnected;
177 this.logger.log(LogLevel.Debug, `HubConnection failed to start successfully because of error '${e}'.`);
178 return Promise.reject(e);
179 }
180 }
181
182 private async startInternal() {
183 this.stopDuringStartError = undefined;
184 this.receivedHandshakeResponse = false;
185 // Set up the promise before any connection is (re)started otherwise it could race with received messages
186 const handshakePromise = new Promise((resolve, reject) => {
187 this.handshakeResolver = resolve;
188 this.handshakeRejecter = reject;
189 });
190
191 await this.connection.start(this.protocol.transferFormat);
192
193 try {
194 const handshakeRequest: HandshakeRequestMessage = {
195 protocol: this.protocol.name,
196 version: this.protocol.version,
197 };
198
199 this.logger.log(LogLevel.Debug, "Sending handshake request.");
200
201 await this.sendMessage(this.handshakeProtocol.writeHandshakeRequest(handshakeRequest));
202
203 this.logger.log(LogLevel.Information, `Using HubProtocol '${this.protocol.name}'.`);
204
205 // defensively cleanup timeout in case we receive a message from the server before we finish start
206 this.cleanupTimeout();
207 this.resetTimeoutPeriod();
208 this.resetKeepAliveInterval();
209
210 await handshakePromise;
211
212 // It's important to check the stopDuringStartError instead of just relying on the handshakePromise
213 // being rejected on close, because this continuation can run after both the handshake completed successfully
214 // and the connection was closed.
215 if (this.stopDuringStartError) {
216 // It's important to throw instead of returning a rejected promise, because we don't want to allow any state
217 // transitions to occur between now and the calling code observing the exceptions. Returning a rejected promise
218 // will cause the calling continuation to get scheduled to run later.
219 throw this.stopDuringStartError;
220 }
221 } catch (e) {
222 this.logger.log(LogLevel.Debug, `Hub handshake failed with error '${e}' during start(). Stopping HubConnection.`);
223
224 this.cleanupTimeout();
225 this.cleanupPingTimer();
226
227 // HttpConnection.stop() should not complete until after the onclose callback is invoked.
228 // This will transition the HubConnection to the disconnected state before HttpConnection.stop() completes.
229 await this.connection.stop(e);
230 throw e;
231 }
232 }
233
234 /** Stops the connection.
235 *
236 * @returns {Promise<void>} A Promise that resolves when the connection has been successfully terminated, or rejects with an error.
237 */
238 public async stop(): Promise<void> {
239 // Capture the start promise before the connection might be restarted in an onclose callback.
240 const startPromise = this.startPromise;
241
242 this.stopPromise = this.stopInternal();
243 await this.stopPromise;
244
245 try {
246 // Awaiting undefined continues immediately
247 await startPromise;
248 } catch (e) {
249 // This exception is returned to the user as a rejected Promise from the start method.
250 }
251 }
252
253 private stopInternal(error?: Error): Promise<void> {
254 if (this.connectionState === HubConnectionState.Disconnected) {
255 this.logger.log(LogLevel.Debug, `Call to HubConnection.stop(${error}) ignored because it is already in the disconnected state.`);
256 return Promise.resolve();
257 }
258
259 if (this.connectionState === HubConnectionState.Disconnecting) {
260 this.logger.log(LogLevel.Debug, `Call to HttpConnection.stop(${error}) ignored because the connection is already in the disconnecting state.`);
261 return this.stopPromise!;
262 }
263
264 this.connectionState = HubConnectionState.Disconnecting;
265
266 this.logger.log(LogLevel.Debug, "Stopping HubConnection.");
267
268 if (this.reconnectDelayHandle) {
269 // We're in a reconnect delay which means the underlying connection is currently already stopped.
270 // Just clear the handle to stop the reconnect loop (which no one is waiting on thankfully) and
271 // fire the onclose callbacks.
272 this.logger.log(LogLevel.Debug, "Connection stopped during reconnect delay. Done reconnecting.");
273
274 clearTimeout(this.reconnectDelayHandle);
275 this.reconnectDelayHandle = undefined;
276
277 this.completeClose();
278 return Promise.resolve();
279 }
280
281 this.cleanupTimeout();
282 this.cleanupPingTimer();
283 this.stopDuringStartError = error || new Error("The connection was stopped before the hub handshake could complete.");
284
285 // HttpConnection.stop() should not complete until after either HttpConnection.start() fails
286 // or the onclose callback is invoked. The onclose callback will transition the HubConnection
287 // to the disconnected state if need be before HttpConnection.stop() completes.
288 return this.connection.stop(error);
289 }
290
291 /** Invokes a streaming hub method on the server using the specified name and arguments.
292 *
293 * @typeparam T The type of the items returned by the server.
294 * @param {string} methodName The name of the server method to invoke.
295 * @param {any[]} args The arguments used to invoke the server method.
296 * @returns {IStreamResult<T>} An object that yields results from the server as they are received.
297 */
298 public stream<T = any>(methodName: string, ...args: any[]): IStreamResult<T> {
299 const [streams, streamIds] = this.replaceStreamingParams(args);
300 const invocationDescriptor = this.createStreamInvocation(methodName, args, streamIds);
301
302 let promiseQueue: Promise<void>;
303 const subject = new Subject<T>();
304 subject.cancelCallback = () => {
305 const cancelInvocation: CancelInvocationMessage = this.createCancelInvocation(invocationDescriptor.invocationId);
306
307 delete this.callbacks[invocationDescriptor.invocationId];
308
309 return promiseQueue.then(() => {
310 return this.sendWithProtocol(cancelInvocation);
311 });
312 };
313
314 this.callbacks[invocationDescriptor.invocationId] = (invocationEvent: CompletionMessage | StreamItemMessage | null, error?: Error) => {
315 if (error) {
316 subject.error(error);
317 return;
318 } else if (invocationEvent) {
319 // invocationEvent will not be null when an error is not passed to the callback
320 if (invocationEvent.type === MessageType.Completion) {
321 if (invocationEvent.error) {
322 subject.error(new Error(invocationEvent.error));
323 } else {
324 subject.complete();
325 }
326 } else {
327 subject.next((invocationEvent.item) as T);
328 }
329 }
330 };
331
332 promiseQueue = this.sendWithProtocol(invocationDescriptor)
333 .catch((e) => {
334 subject.error(e);
335 delete this.callbacks[invocationDescriptor.invocationId];
336 });
337
338 this.launchStreams(streams, promiseQueue);
339
340 return subject;
341 }
342
343 private sendMessage(message: any) {
344 this.resetKeepAliveInterval();
345 return this.connection.send(message);
346 }
347
348 /**
349 * Sends a js object to the server.
350 * @param message The js object to serialize and send.
351 */
352 private sendWithProtocol(message: any) {
353 return this.sendMessage(this.protocol.writeMessage(message));
354 }
355
356 /** Invokes a hub method on the server using the specified name and arguments. Does not wait for a response from the receiver.
357 *
358 * The Promise returned by this method resolves when the client has sent the invocation to the server. The server may still
359 * be processing the invocation.
360 *
361 * @param {string} methodName The name of the server method to invoke.
362 * @param {any[]} args The arguments used to invoke the server method.
363 * @returns {Promise<void>} A Promise that resolves when the invocation has been successfully sent, or rejects with an error.
364 */
365 public send(methodName: string, ...args: any[]): Promise<void> {
366 const [streams, streamIds] = this.replaceStreamingParams(args);
367 const sendPromise = this.sendWithProtocol(this.createInvocation(methodName, args, true, streamIds));
368
369 this.launchStreams(streams, sendPromise);
370
371 return sendPromise;
372 }
373
374 /** Invokes a hub method on the server using the specified name and arguments.
375 *
376 * The Promise returned by this method resolves when the server indicates it has finished invoking the method. When the promise
377 * resolves, the server has finished invoking the method. If the server method returns a result, it is produced as the result of
378 * resolving the Promise.
379 *
380 * @typeparam T The expected return type.
381 * @param {string} methodName The name of the server method to invoke.
382 * @param {any[]} args The arguments used to invoke the server method.
383 * @returns {Promise<T>} A Promise that resolves with the result of the server method (if any), or rejects with an error.
384 */
385 public invoke<T = any>(methodName: string, ...args: any[]): Promise<T> {
386 const [streams, streamIds] = this.replaceStreamingParams(args);
387 const invocationDescriptor = this.createInvocation(methodName, args, false, streamIds);
388
389 const p = new Promise<any>((resolve, reject) => {
390 // invocationId will always have a value for a non-blocking invocation
391 this.callbacks[invocationDescriptor.invocationId!] = (invocationEvent: StreamItemMessage | CompletionMessage | null, error?: Error) => {
392 if (error) {
393 reject(error);
394 return;
395 } else if (invocationEvent) {
396 // invocationEvent will not be null when an error is not passed to the callback
397 if (invocationEvent.type === MessageType.Completion) {
398 if (invocationEvent.error) {
399 reject(new Error(invocationEvent.error));
400 } else {
401 resolve(invocationEvent.result);
402 }
403 } else {
404 reject(new Error(`Unexpected message type: ${invocationEvent.type}`));
405 }
406 }
407 };
408
409 const promiseQueue = this.sendWithProtocol(invocationDescriptor)
410 .catch((e) => {
411 reject(e);
412 // invocationId will always have a value for a non-blocking invocation
413 delete this.callbacks[invocationDescriptor.invocationId!];
414 });
415
416 this.launchStreams(streams, promiseQueue);
417 });
418
419 return p;
420 }
421
422 /** Registers a handler that will be invoked when the hub method with the specified method name is invoked.
423 *
424 * @param {string} methodName The name of the hub method to define.
425 * @param {Function} newMethod The handler that will be raised when the hub method is invoked.
426 */
427 public on(methodName: string, newMethod: (...args: any[]) => void) {
428 if (!methodName || !newMethod) {
429 return;
430 }
431
432 methodName = methodName.toLowerCase();
433 if (!this.methods[methodName]) {
434 this.methods[methodName] = [];
435 }
436
437 // Preventing adding the same handler multiple times.
438 if (this.methods[methodName].indexOf(newMethod) !== -1) {
439 return;
440 }
441
442 this.methods[methodName].push(newMethod);
443 }
444
445 /** Removes all handlers for the specified hub method.
446 *
447 * @param {string} methodName The name of the method to remove handlers for.
448 */
449 public off(methodName: string): void;
450
451 /** Removes the specified handler for the specified hub method.
452 *
453 * You must pass the exact same Function instance as was previously passed to {@link @microsoft/signalr.HubConnection.on}. Passing a different instance (even if the function
454 * body is the same) will not remove the handler.
455 *
456 * @param {string} methodName The name of the method to remove handlers for.
457 * @param {Function} method The handler to remove. This must be the same Function instance as the one passed to {@link @microsoft/signalr.HubConnection.on}.
458 */
459 public off(methodName: string, method: (...args: any[]) => void): void;
460 public off(methodName: string, method?: (...args: any[]) => void): void {
461 if (!methodName) {
462 return;
463 }
464
465 methodName = methodName.toLowerCase();
466 const handlers = this.methods[methodName];
467 if (!handlers) {
468 return;
469 }
470 if (method) {
471 const removeIdx = handlers.indexOf(method);
472 if (removeIdx !== -1) {
473 handlers.splice(removeIdx, 1);
474 if (handlers.length === 0) {
475 delete this.methods[methodName];
476 }
477 }
478 } else {
479 delete this.methods[methodName];
480 }
481
482 }
483
484 /** Registers a handler that will be invoked when the connection is closed.
485 *
486 * @param {Function} callback The handler that will be invoked when the connection is closed. Optionally receives a single argument containing the error that caused the connection to close (if any).
487 */
488 public onclose(callback: (error?: Error) => void) {
489 if (callback) {
490 this.closedCallbacks.push(callback);
491 }
492 }
493
494 /** Registers a handler that will be invoked when the connection starts reconnecting.
495 *
496 * @param {Function} callback The handler that will be invoked when the connection starts reconnecting. Optionally receives a single argument containing the error that caused the connection to start reconnecting (if any).
497 */
498 public onreconnecting(callback: (error?: Error) => void) {
499 if (callback) {
500 this.reconnectingCallbacks.push(callback);
501 }
502 }
503
504 /** Registers a handler that will be invoked when the connection successfully reconnects.
505 *
506 * @param {Function} callback The handler that will be invoked when the connection successfully reconnects.
507 */
508 public onreconnected(callback: (connectionId?: string) => void) {
509 if (callback) {
510 this.reconnectedCallbacks.push(callback);
511 }
512 }
513
514 private processIncomingData(data: any) {
515 this.cleanupTimeout();
516
517 if (!this.receivedHandshakeResponse) {
518 data = this.processHandshakeResponse(data);
519 this.receivedHandshakeResponse = true;
520 }
521
522 // Data may have all been read when processing handshake response
523 if (data) {
524 // Parse the messages
525 const messages = this.protocol.parseMessages(data, this.logger);
526
527 for (const message of messages) {
528 switch (message.type) {
529 case MessageType.Invocation:
530 this.invokeClientMethod(message);
531 break;
532 case MessageType.StreamItem:
533 case MessageType.Completion:
534 const callback = this.callbacks[message.invocationId];
535 if (callback) {
536 if (message.type === MessageType.Completion) {
537 delete this.callbacks[message.invocationId];
538 }
539 callback(message);
540 }
541 break;
542 case MessageType.Ping:
543 // Don't care about pings
544 break;
545 case MessageType.Close:
546 this.logger.log(LogLevel.Information, "Close message received from server.");
547
548 const error = message.error ? new Error("Server returned an error on close: " + message.error) : undefined;
549
550 if (message.allowReconnect === true) {
551 // It feels wrong not to await connection.stop() here, but processIncomingData is called as part of an onreceive callback which is not async,
552 // this is already the behavior for serverTimeout(), and HttpConnection.Stop() should catch and log all possible exceptions.
553
554 // tslint:disable-next-line:no-floating-promises
555 this.connection.stop(error);
556 } else {
557 // We cannot await stopInternal() here, but subsequent calls to stop() will await this if stopInternal() is still ongoing.
558 this.stopPromise = this.stopInternal(error);
559 }
560
561 break;
562 default:
563 this.logger.log(LogLevel.Warning, `Invalid message type: ${message.type}.`);
564 break;
565 }
566 }
567 }
568
569 this.resetTimeoutPeriod();
570 }
571
572 private processHandshakeResponse(data: any): any {
573 let responseMessage: HandshakeResponseMessage;
574 let remainingData: any;
575
576 try {
577 [remainingData, responseMessage] = this.handshakeProtocol.parseHandshakeResponse(data);
578 } catch (e) {
579 const message = "Error parsing handshake response: " + e;
580 this.logger.log(LogLevel.Error, message);
581
582 const error = new Error(message);
583 this.handshakeRejecter(error);
584 throw error;
585 }
586 if (responseMessage.error) {
587 const message = "Server returned handshake error: " + responseMessage.error;
588 this.logger.log(LogLevel.Error, message);
589
590 const error = new Error(message);
591 this.handshakeRejecter(error);
592 throw error;
593 } else {
594 this.logger.log(LogLevel.Debug, "Server handshake complete.");
595 }
596
597 this.handshakeResolver();
598 return remainingData;
599 }
600
601 private resetKeepAliveInterval() {
602 this.cleanupPingTimer();
603 this.pingServerHandle = setTimeout(async () => {
604 if (this.connectionState === HubConnectionState.Connected) {
605 try {
606 await this.sendMessage(this.cachedPingMessage);
607 } catch {
608 // We don't care about the error. It should be seen elsewhere in the client.
609 // The connection is probably in a bad or closed state now, cleanup the timer so it stops triggering
610 this.cleanupPingTimer();
611 }
612 }
613 }, this.keepAliveIntervalInMilliseconds);
614 }
615
616 private resetTimeoutPeriod() {
617 if (!this.connection.features || !this.connection.features.inherentKeepAlive) {
618 // Set the timeout timer
619 this.timeoutHandle = setTimeout(() => this.serverTimeout(), this.serverTimeoutInMilliseconds);
620 }
621 }
622
623 private serverTimeout() {
624 // The server hasn't talked to us in a while. It doesn't like us anymore ... :(
625 // Terminate the connection, but we don't need to wait on the promise. This could trigger reconnecting.
626 // tslint:disable-next-line:no-floating-promises
627 this.connection.stop(new Error("Server timeout elapsed without receiving a message from the server."));
628 }
629
630 private invokeClientMethod(invocationMessage: InvocationMessage) {
631 const methods = this.methods[invocationMessage.target.toLowerCase()];
632 if (methods) {
633 try {
634 methods.forEach((m) => m.apply(this, invocationMessage.arguments));
635 } catch (e) {
636 this.logger.log(LogLevel.Error, `A callback for the method ${invocationMessage.target.toLowerCase()} threw error '${e}'.`);
637 }
638
639 if (invocationMessage.invocationId) {
640 // This is not supported in v1. So we return an error to avoid blocking the server waiting for the response.
641 const message = "Server requested a response, which is not supported in this version of the client.";
642 this.logger.log(LogLevel.Error, message);
643
644 // We don't want to wait on the stop itself.
645 this.stopPromise = this.stopInternal(new Error(message));
646 }
647 } else {
648 this.logger.log(LogLevel.Warning, `No client method with the name '${invocationMessage.target}' found.`);
649 }
650 }
651
652 private connectionClosed(error?: Error) {
653 this.logger.log(LogLevel.Debug, `HubConnection.connectionClosed(${error}) called while in state ${this.connectionState}.`);
654
655 // Triggering this.handshakeRejecter is insufficient because it could already be resolved without the continuation having run yet.
656 this.stopDuringStartError = this.stopDuringStartError || error || new Error("The underlying connection was closed before the hub handshake could complete.");
657
658 // If the handshake is in progress, start will be waiting for the handshake promise, so we complete it.
659 // If it has already completed, this should just noop.
660 if (this.handshakeResolver) {
661 this.handshakeResolver();
662 }
663
664 this.cancelCallbacksWithError(error || new Error("Invocation canceled due to the underlying connection being closed."));
665
666 this.cleanupTimeout();
667 this.cleanupPingTimer();
668
669 if (this.connectionState === HubConnectionState.Disconnecting) {
670 this.completeClose(error);
671 } else if (this.connectionState === HubConnectionState.Connected && this.reconnectPolicy) {
672 // tslint:disable-next-line:no-floating-promises
673 this.reconnect(error);
674 } else if (this.connectionState === HubConnectionState.Connected) {
675 this.completeClose(error);
676 }
677
678 // If none of the above if conditions were true were called the HubConnection must be in either:
679 // 1. The Connecting state in which case the handshakeResolver will complete it and stopDuringStartError will fail it.
680 // 2. The Reconnecting state in which case the handshakeResolver will complete it and stopDuringStartError will fail the current reconnect attempt
681 // and potentially continue the reconnect() loop.
682 // 3. The Disconnected state in which case we're already done.
683 }
684
685 private completeClose(error?: Error) {
686 if (this.connectionStarted) {
687 this.connectionState = HubConnectionState.Disconnected;
688 this.connectionStarted = false;
689
690 try {
691 this.closedCallbacks.forEach((c) => c.apply(this, [error]));
692 } catch (e) {
693 this.logger.log(LogLevel.Error, `An onclose callback called with error '${error}' threw error '${e}'.`);
694 }
695 }
696 }
697
698 private async reconnect(error?: Error) {
699 const reconnectStartTime = Date.now();
700 let previousReconnectAttempts = 0;
701 let retryError = error !== undefined ? error : new Error("Attempting to reconnect due to a unknown error.");
702
703 let nextRetryDelay = this.getNextRetryDelay(previousReconnectAttempts++, 0, retryError);
704
705 if (nextRetryDelay === null) {
706 this.logger.log(LogLevel.Debug, "Connection not reconnecting because the IRetryPolicy returned null on the first reconnect attempt.");
707 this.completeClose(error);
708 return;
709 }
710
711 this.connectionState = HubConnectionState.Reconnecting;
712
713 if (error) {
714 this.logger.log(LogLevel.Information, `Connection reconnecting because of error '${error}'.`);
715 } else {
716 this.logger.log(LogLevel.Information, "Connection reconnecting.");
717 }
718
719 if (this.onreconnecting) {
720 try {
721 this.reconnectingCallbacks.forEach((c) => c.apply(this, [error]));
722 } catch (e) {
723 this.logger.log(LogLevel.Error, `An onreconnecting callback called with error '${error}' threw error '${e}'.`);
724 }
725
726 // Exit early if an onreconnecting callback called connection.stop().
727 if (this.connectionState !== HubConnectionState.Reconnecting) {
728 this.logger.log(LogLevel.Debug, "Connection left the reconnecting state in onreconnecting callback. Done reconnecting.");
729 return;
730 }
731 }
732
733 while (nextRetryDelay !== null) {
734 this.logger.log(LogLevel.Information, `Reconnect attempt number ${previousReconnectAttempts} will start in ${nextRetryDelay} ms.`);
735
736 await new Promise((resolve) => {
737 this.reconnectDelayHandle = setTimeout(resolve, nextRetryDelay!);
738 });
739 this.reconnectDelayHandle = undefined;
740
741 if (this.connectionState !== HubConnectionState.Reconnecting) {
742 this.logger.log(LogLevel.Debug, "Connection left the reconnecting state during reconnect delay. Done reconnecting.");
743 return;
744 }
745
746 try {
747 await this.startInternal();
748
749 this.connectionState = HubConnectionState.Connected;
750 this.logger.log(LogLevel.Information, "HubConnection reconnected successfully.");
751
752 if (this.onreconnected) {
753 try {
754 this.reconnectedCallbacks.forEach((c) => c.apply(this, [this.connection.connectionId]));
755 } catch (e) {
756 this.logger.log(LogLevel.Error, `An onreconnected callback called with connectionId '${this.connection.connectionId}; threw error '${e}'.`);
757 }
758 }
759
760 return;
761 } catch (e) {
762 this.logger.log(LogLevel.Information, `Reconnect attempt failed because of error '${e}'.`);
763
764 if (this.connectionState !== HubConnectionState.Reconnecting) {
765 this.logger.log(LogLevel.Debug, "Connection left the reconnecting state during reconnect attempt. Done reconnecting.");
766 return;
767 }
768
769 retryError = e instanceof Error ? e : new Error(e.toString());
770 nextRetryDelay = this.getNextRetryDelay(previousReconnectAttempts++, Date.now() - reconnectStartTime, retryError);
771 }
772 }
773
774 this.logger.log(LogLevel.Information, `Reconnect retries have been exhausted after ${Date.now() - reconnectStartTime} ms and ${previousReconnectAttempts} failed attempts. Connection disconnecting.`);
775
776 this.completeClose();
777 }
778
779 private getNextRetryDelay(previousRetryCount: number, elapsedMilliseconds: number, retryReason: Error) {
780 try {
781 return this.reconnectPolicy!.nextRetryDelayInMilliseconds({
782 elapsedMilliseconds,
783 previousRetryCount,
784 retryReason,
785 });
786 } catch (e) {
787 this.logger.log(LogLevel.Error, `IRetryPolicy.nextRetryDelayInMilliseconds(${previousRetryCount}, ${elapsedMilliseconds}) threw error '${e}'.`);
788 return null;
789 }
790 }
791
792 private cancelCallbacksWithError(error: Error) {
793 const callbacks = this.callbacks;
794 this.callbacks = {};
795
796 Object.keys(callbacks)
797 .forEach((key) => {
798 const callback = callbacks[key];
799 callback(null, error);
800 });
801 }
802
803 private cleanupPingTimer(): void {
804 if (this.pingServerHandle) {
805 clearTimeout(this.pingServerHandle);
806 }
807 }
808
809 private cleanupTimeout(): void {
810 if (this.timeoutHandle) {
811 clearTimeout(this.timeoutHandle);
812 }
813 }
814
815 private createInvocation(methodName: string, args: any[], nonblocking: boolean, streamIds: string[]): InvocationMessage {
816 if (nonblocking) {
817 return {
818 arguments: args,
819 streamIds,
820 target: methodName,
821 type: MessageType.Invocation,
822 };
823 } else {
824 const invocationId = this.invocationId;
825 this.invocationId++;
826
827 return {
828 arguments: args,
829 invocationId: invocationId.toString(),
830 streamIds,
831 target: methodName,
832 type: MessageType.Invocation,
833 };
834 }
835 }
836
837 private launchStreams(streams: Array<IStreamResult<any>>, promiseQueue: Promise<void>): void {
838 if (streams.length === 0) {
839 return;
840 }
841
842 // Synchronize stream data so they arrive in-order on the server
843 if (!promiseQueue) {
844 promiseQueue = Promise.resolve();
845 }
846
847 // We want to iterate over the keys, since the keys are the stream ids
848 // tslint:disable-next-line:forin
849 for (const streamId in streams) {
850 streams[streamId].subscribe({
851 complete: () => {
852 promiseQueue = promiseQueue.then(() => this.sendWithProtocol(this.createCompletionMessage(streamId)));
853 },
854 error: (err) => {
855 let message: string;
856 if (err instanceof Error) {
857 message = err.message;
858 } else if (err && err.toString) {
859 message = err.toString();
860 } else {
861 message = "Unknown error";
862 }
863
864 promiseQueue = promiseQueue.then(() => this.sendWithProtocol(this.createCompletionMessage(streamId, message)));
865 },
866 next: (item) => {
867 promiseQueue = promiseQueue.then(() => this.sendWithProtocol(this.createStreamItemMessage(streamId, item)));
868 },
869 });
870 }
871 }
872
873 private replaceStreamingParams(args: any[]): [Array<IStreamResult<any>>, string[]] {
874 const streams: Array<IStreamResult<any>> = [];
875 const streamIds: string[] = [];
876 for (let i = 0; i < args.length; i++) {
877 const argument = args[i];
878 if (this.isObservable(argument)) {
879 const streamId = this.invocationId;
880 this.invocationId++;
881 // Store the stream for later use
882 streams[streamId] = argument;
883 streamIds.push(streamId.toString());
884
885 // remove stream from args
886 args.splice(i, 1);
887 }
888 }
889
890 return [streams, streamIds];
891 }
892
893 private isObservable(arg: any): arg is IStreamResult<any> {
894 // This allows other stream implementations to just work (like rxjs)
895 return arg && arg.subscribe && typeof arg.subscribe === "function";
896 }
897
898 private createStreamInvocation(methodName: string, args: any[], streamIds: string[]): StreamInvocationMessage {
899 const invocationId = this.invocationId;
900 this.invocationId++;
901
902 return {
903 arguments: args,
904 invocationId: invocationId.toString(),
905 streamIds,
906 target: methodName,
907 type: MessageType.StreamInvocation,
908 };
909 }
910
911 private createCancelInvocation(id: string): CancelInvocationMessage {
912 return {
913 invocationId: id,
914 type: MessageType.CancelInvocation,
915 };
916 }
917
918 private createStreamItemMessage(id: string, item: any): StreamItemMessage {
919 return {
920 invocationId: id,
921 item,
922 type: MessageType.StreamItem,
923 };
924 }
925
926 private createCompletionMessage(id: string, error?: any, result?: any): CompletionMessage {
927 if (error) {
928 return {
929 error,
930 invocationId: id,
931 type: MessageType.Completion,
932 };
933 }
934
935 return {
936 invocationId: id,
937 result,
938 type: MessageType.Completion,
939 };
940 }
941}
942
\No newline at end of file