UNPKG

8.32 kBJavaScriptView Raw
/**
 * Connection that wraps a socket and provides an interface to interact with
 * the Home Assistant websocket API.
 */
5import * as messages from "./messages.js";
6import { ERR_INVALID_AUTH, ERR_CONNECTION_LOST } from "./errors.js";
// Set to true to log every sent/received message and reconnect attempt.
const DEBUG = false;

/**
 * Wraps a socket and provides request/response, subscription and
 * auto-reconnect helpers on top of it.
 *
 * The socket is expected to offer `addEventListener("message" | "close")`,
 * `send(string)`, `close()` and a `haVersion` property.
 */
export class Connection {
  /**
   * @param socket  an already established socket
   * @param options connection options
   *   - setupRetry: amount of ms to retry when unable to connect on initial setup
   *   - createSocket: create a new Socket connection (used when reconnecting)
   */
  constructor(socket, options) {
    this.options = options;
    // Counter behind _genCmdId(). It is pre-incremented, so the first id
    // handed out on a socket is 2.
    this.commandId = 1;
    // Info about active subscriptions and commands in flight, keyed by id.
    this.commands = new Map();
    // Map of event type -> array of listeners.
    this.eventListeners = new Map();
    // True once close() has been called by the user.
    this.closeRequested = false;
    this.setSocket(socket);
  }

  /** Version string exposed by the current socket. */
  get haVersion() {
    return this.socket.haVersion;
  }

  /**
   * Install `socket` as the active socket. When it replaces a previous
   * socket, the command state is reset, every known subscription is
   * re-issued on the new socket and a "ready" event is fired.
   *
   * @param socket the socket to use from now on
   */
  setSocket(socket) {
    const oldSocket = this.socket;
    this.socket = socket;
    socket.addEventListener("message", (ev) => this._handleMessage(ev));
    socket.addEventListener("close", (ev) => this._handleClose(ev));
    if (!oldSocket) {
      return;
    }
    const oldCommands = this.commands;
    // Reset to original state: ids start over on a fresh socket.
    this.commandId = 1;
    this.commands = new Map();
    oldCommands.forEach((info) => {
      if (!("subscribe" in info)) {
        return;
      }
      info.subscribe().then(
        (unsub) => {
          info.unsubscribe = unsub;
          // We need to resolve this in case it wasn't resolved yet.
          // This allows us to subscribe while we're disconnected
          // and recover properly.
          info.resolve();
        },
        (err) => {
          // Re-subscribing failed: settle a still-pending subscribe call
          // instead of leaving an unhandled rejection behind. This is a
          // no-op when the original promise was already resolved.
          info.reject(err);
        }
      );
    });
    this.fireEvent("ready");
  }

  /**
   * Register a listener for a connection event.
   *
   * @param eventType event name (this class fires "ready", "disconnected"
   *                  and "reconnect-error")
   * @param callback  called as callback(connection, eventData)
   */
  addEventListener(eventType, callback) {
    let listeners = this.eventListeners.get(eventType);
    if (!listeners) {
      listeners = [];
      this.eventListeners.set(eventType, listeners);
    }
    listeners.push(callback);
  }

  /**
   * Remove the first registration of `callback` for `eventType`, if any.
   */
  removeEventListener(eventType, callback) {
    const listeners = this.eventListeners.get(eventType);
    if (!listeners) {
      return;
    }
    const index = listeners.indexOf(callback);
    if (index !== -1) {
      listeners.splice(index, 1);
    }
  }

  /**
   * Call every listener registered for `eventType`.
   *
   * @param eventType event name
   * @param eventData optional payload passed as second argument
   */
  fireEvent(eventType, eventData) {
    const listeners = this.eventListeners.get(eventType);
    if (!listeners) {
      return;
    }
    // Iterate over a snapshot: a listener that removes itself (or another
    // listener) while being called must not cause later ones to be skipped.
    listeners.slice().forEach((callback) => callback(this, eventData));
  }

  /** Close the socket on user request; disables automatic reconnecting. */
  close() {
    this.closeRequested = true;
    this.socket.close();
  }

  /**
   * Subscribe to a specific or all events.
   *
   * @param callback Callback to be called when a new event fires
   * @param eventType
   * @returns promise that resolves to an unsubscribe function
   */
  async subscribeEvents(callback, eventType) {
    return this.subscribeMessage(callback, messages.subscribeEvents(eventType));
  }

  /**
   * Send a ping.
   *
   * @returns promise that resolves when the matching "pong" arrives
   */
  ping() {
    return this.sendMessagePromise(messages.ping());
  }

  /**
   * Serialize `message` as JSON and send it without tracking a response.
   * Note that the id is written onto the passed `message` object.
   *
   * @param message   the message to send
   * @param commandId id to use; a new one is generated when falsy
   */
  sendMessage(message, commandId) {
    if (DEBUG) {
      console.log("Sending", message);
    }
    if (!commandId) {
      commandId = this._genCmdId();
    }
    message.id = commandId;
    this.socket.send(JSON.stringify(message));
  }

  /**
   * Send `message` and track it until a response with the same id arrives.
   *
   * @param message the message to send
   * @returns promise resolving with the result, rejecting with the error
   *          from the response, with a connection-lost error when the
   *          socket closes first, or with whatever the socket threw on send
   */
  sendMessagePromise(message) {
    return new Promise((resolve, reject) => {
      const commandId = this._genCmdId();
      this.commands.set(commandId, { resolve, reject });
      try {
        this.sendMessage(message, commandId);
      } catch (err) {
        // Nothing was sent, so no response can ever arrive: drop the entry
        // rather than leaking it in the command map.
        this.commands.delete(commandId);
        reject(err);
      }
    });
  }

  /**
   * Call a websocket command that starts a subscription on the backend.
   *
   * @param callback the callback to be called when a new item arrives
   * @param subscribeMessage the message to start the subscription
   * @returns promise that resolves to an unsubscribe function
   */
  async subscribeMessage(callback, subscribeMessage) {
    // Command ID that will be used
    const commandId = this._genCmdId();
    let info;
    await new Promise((resolve, reject) => {
      // We store unsubscribe on info object. That way we can overwrite it in case
      // we get disconnected and we have to subscribe again.
      info = {
        resolve,
        reject,
        callback,
        subscribe: () => this.subscribeMessage(callback, subscribeMessage),
        unsubscribe: async () => {
          try {
            await this.sendMessagePromise(messages.unsubscribeEvents(commandId));
          } finally {
            // Forget the subscription even when the round trip failed
            // (e.g. connection lost); otherwise it would be re-created on
            // the next reconnect although the caller unsubscribed.
            this.commands.delete(commandId);
          }
        },
      };
      this.commands.set(commandId, info);
      try {
        this.sendMessage(subscribeMessage, commandId);
      } catch (err) {
        // Happens when the websocket is already closing.
        // Don't have to handle the error, reconnect logic will pick it up.
      }
    });
    return () => info.unsubscribe();
  }

  /**
   * Dispatch an incoming socket message to the command or subscription
   * registered under its id.
   *
   * @param event socket message event; `event.data` must be a JSON string
   */
  _handleMessage(event) {
    const message = JSON.parse(event.data);
    if (DEBUG) {
      console.log("Received", message);
    }
    const info = this.commands.get(message.id);
    switch (message.type) {
      case "event":
        if (info) {
          info.callback(message.event);
        } else {
          console.warn(`Received event for unknown subscription ${message.id}. Unsubscribing.`);
          // Best effort clean-up; a failure here must not surface as an
          // unhandled promise rejection.
          this.sendMessagePromise(messages.unsubscribeEvents(message.id)).catch((err) => {
            if (DEBUG) {
              console.warn(`Error unsubscribing from unknown subscription ${message.id}`, err);
            }
          });
        }
        break;
      case "result":
        // No info is fine. If just sendMessage is used, we did not store promise for result
        if (info) {
          if (message.success) {
            info.resolve(message.result);
            // Don't remove subscriptions.
            if (!("subscribe" in info)) {
              this.commands.delete(message.id);
            }
          } else {
            info.reject(message.error);
            this.commands.delete(message.id);
          }
        }
        break;
      case "pong":
        if (info) {
          info.resolve();
          this.commands.delete(message.id);
        } else {
          console.warn(`Received unknown pong response ${message.id}`);
        }
        break;
      default:
        if (DEBUG) {
          console.warn("Unhandled message", message);
        }
    }
  }

  /**
   * Handle the socket closing: fail commands in flight and, unless the
   * close was requested through close(), fire "disconnected" and keep
   * trying to reconnect with a delay growing from 0s up to 5s.
   *
   * @param ev socket close event (unused)
   */
  _handleClose(ev) {
    // Reject in-flight sendMessagePromise requests
    this.commands.forEach((info, commandId) => {
      // We don't cancel subscribeEvents commands in flight
      // as we will be able to recover them.
      if (!("subscribe" in info)) {
        info.reject(messages.error(ERR_CONNECTION_LOST, "Connection lost"));
        // The request is settled; no response can arrive for it anymore.
        this.commands.delete(commandId);
      }
    });
    if (this.closeRequested) {
      return;
    }
    this.fireEvent("disconnected");
    // Disable setupRetry, we control it here with auto-backoff
    const options = Object.assign({}, this.options, { setupRetry: 0 });
    const reconnect = (tries) => {
      setTimeout(async () => {
        // close() may have been called while we were waiting to retry.
        if (this.closeRequested) {
          return;
        }
        if (DEBUG) {
          console.log("Trying to reconnect");
        }
        try {
          const socket = await options.createSocket(options);
          if (this.closeRequested) {
            // close() was called while the socket was being created.
            socket.close();
            return;
          }
          this.setSocket(socket);
        } catch (err) {
          if (err === ERR_INVALID_AUTH) {
            this.fireEvent("reconnect-error", err);
          } else {
            reconnect(tries + 1);
          }
        }
      }, Math.min(tries, 5) * 1000);
    };
    reconnect(0);
  }

  /** @returns the next command id (pre-increments the counter) */
  _genCmdId() {
    return ++this.commandId;
  }
}