1 |
|
2 |
|
3 |
|
4 | import { HttpClient } from "./HttpClient";
|
5 | import { ILogger, LogLevel } from "./ILogger";
|
6 | import { ITransport, TransferFormat } from "./ITransport";
|
7 | import { EventSourceConstructor } from "./Polyfills";
|
8 | import { Arg, getDataDetail, Platform, sendMessage } from "./Utils";
|
9 |
|
10 |
|
11 | export class ServerSentEventsTransport implements ITransport {
|
12 | private readonly httpClient: HttpClient;
|
13 | private readonly accessTokenFactory: (() => string | Promise<string>) | undefined;
|
14 | private readonly logger: ILogger;
|
15 | private readonly logMessageContent: boolean;
|
16 | private readonly eventSourceConstructor: EventSourceConstructor;
|
17 | private eventSource?: EventSource;
|
18 | private url?: string;
|
19 |
|
20 | public onreceive: ((data: string | ArrayBuffer) => void) | null;
|
21 | public onclose: ((error?: Error) => void) | null;
|
22 |
|
23 | constructor(httpClient: HttpClient, accessTokenFactory: (() => string | Promise<string>) | undefined, logger: ILogger,
|
24 | logMessageContent: boolean, eventSourceConstructor: EventSourceConstructor) {
|
25 | this.httpClient = httpClient;
|
26 | this.accessTokenFactory = accessTokenFactory;
|
27 | this.logger = logger;
|
28 | this.logMessageContent = logMessageContent;
|
29 | this.eventSourceConstructor = eventSourceConstructor;
|
30 |
|
31 | this.onreceive = null;
|
32 | this.onclose = null;
|
33 | }
|
34 |
|
35 | public async connect(url: string, transferFormat: TransferFormat): Promise<void> {
|
36 | Arg.isRequired(url, "url");
|
37 | Arg.isRequired(transferFormat, "transferFormat");
|
38 | Arg.isIn(transferFormat, TransferFormat, "transferFormat");
|
39 |
|
40 | this.logger.log(LogLevel.Trace, "(SSE transport) Connecting.");
|
41 |
|
42 | // set url before accessTokenFactory because this.url is only for send and we set the auth header instead of the query string for send
|
43 | this.url = url;
|
44 |
|
45 | if (this.accessTokenFactory) {
|
46 | const token = await this.accessTokenFactory();
|
47 | if (token) {
|
48 | url += (url.indexOf("?") < 0 ? "?" : "&") + `access_token=${encodeURIComponent(token)}`;
|
49 | }
|
50 | }
|
51 |
|
52 | return new Promise<void>((resolve, reject) => {
|
53 | let opened = false;
|
54 | if (transferFormat !== TransferFormat.Text) {
|
55 | reject(new Error("The Server-Sent Events transport only supports the 'Text' transfer format"));
|
56 | return;
|
57 | }
|
58 |
|
59 | let eventSource: EventSource;
|
60 | if (Platform.isBrowser || Platform.isWebWorker) {
|
61 | eventSource = new this.eventSourceConstructor(url, { withCredentials: true });
|
62 | } else {
|
63 |
|
64 | const cookies = this.httpClient.getCookieString(url);
|
65 | eventSource = new this.eventSourceConstructor(url, { withCredentials: true, headers: { Cookie: cookies } } as EventSourceInit);
|
66 | }
|
67 |
|
68 | try {
|
69 | eventSource.onmessage = (e: MessageEvent) => {
|
70 | if (this.onreceive) {
|
71 | try {
|
72 | this.logger.log(LogLevel.Trace, `(SSE transport) data received. ${getDataDetail(e.data, this.logMessageContent)}.`);
|
73 | this.onreceive(e.data);
|
74 | } catch (error) {
|
75 | this.close(error);
|
76 | return;
|
77 | }
|
78 | }
|
79 | };
|
80 |
|
81 | eventSource.onerror = (e: MessageEvent) => {
|
82 | const error = new Error(e.data || "Error occurred");
|
83 | if (opened) {
|
84 | this.close(error);
|
85 | } else {
|
86 | reject(error);
|
87 | }
|
88 | };
|
89 |
|
90 | eventSource.onopen = () => {
|
91 | this.logger.log(LogLevel.Information, `SSE connected to ${this.url}`);
|
92 | this.eventSource = eventSource;
|
93 | opened = true;
|
94 | resolve();
|
95 | };
|
96 | } catch (e) {
|
97 | reject(e);
|
98 | return;
|
99 | }
|
100 | });
|
101 | }
|
102 |
|
103 | public async send(data: any): Promise<void> {
|
104 | if (!this.eventSource) {
|
105 | return Promise.reject(new Error("Cannot send until the transport is connected"));
|
106 | }
|
107 | return sendMessage(this.logger, "SSE", this.httpClient, this.url!, this.accessTokenFactory, data, this.logMessageContent);
|
108 | }
|
109 |
|
110 | public stop(): Promise<void> {
|
111 | this.close();
|
112 | return Promise.resolve();
|
113 | }
|
114 |
|
115 | private close(e?: Error) {
|
116 | if (this.eventSource) {
|
117 | this.eventSource.close();
|
118 | this.eventSource = undefined;
|
119 |
|
120 | if (this.onclose) {
|
121 | this.onclose(e);
|
122 | }
|
123 | }
|
124 | }
|
125 | }
|
126 |
|
\ | No newline at end of file |