1 | import { BYTE } from './byte.js';
|
2 | import { FrameImpl } from './frame-impl.js';
|
3 | import { Parser } from './parser.js';
|
4 | import { StompSocketState, } from './types.js';
|
5 | import { Versions } from './versions.js';
|
6 | import { augmentWebsocket } from './augment-websocket.js';
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 | export class StompHandler {
|
15 | constructor(_client, _webSocket, config) {
|
16 | this._client = _client;
|
17 | this._webSocket = _webSocket;
|
18 | this._connected = false;
|
19 | this._serverFrameHandlers = {
|
20 |
|
21 | CONNECTED: frame => {
|
22 | this.debug(`connected to server ${frame.headers.server}`);
|
23 | this._connected = true;
|
24 | this._connectedVersion = frame.headers.version;
|
25 |
|
26 | if (this._connectedVersion === Versions.V1_2) {
|
27 | this._escapeHeaderValues = true;
|
28 | }
|
29 | this._setupHeartbeat(frame.headers);
|
30 | this.onConnect(frame);
|
31 | },
|
32 |
|
33 | MESSAGE: frame => {
|
34 |
|
35 |
|
36 |
|
37 |
|
38 |
|
39 |
|
40 |
|
41 | const subscription = frame.headers.subscription;
|
42 | const onReceive = this._subscriptions[subscription] || this.onUnhandledMessage;
|
43 |
|
44 | const message = frame;
|
45 | const client = this;
|
46 | const messageId = this._connectedVersion === Versions.V1_2
|
47 | ? message.headers.ack
|
48 | : message.headers['message-id'];
|
49 |
|
50 |
|
51 | message.ack = (headers = {}) => {
|
52 | return client.ack(messageId, subscription, headers);
|
53 | };
|
54 | message.nack = (headers = {}) => {
|
55 | return client.nack(messageId, subscription, headers);
|
56 | };
|
57 | onReceive(message);
|
58 | },
|
59 |
|
60 | RECEIPT: frame => {
|
61 | const callback = this._receiptWatchers[frame.headers['receipt-id']];
|
62 | if (callback) {
|
63 | callback(frame);
|
64 |
|
65 | delete this._receiptWatchers[frame.headers['receipt-id']];
|
66 | }
|
67 | else {
|
68 | this.onUnhandledReceipt(frame);
|
69 | }
|
70 | },
|
71 |
|
72 | ERROR: frame => {
|
73 | this.onStompError(frame);
|
74 | },
|
75 | };
|
76 |
|
77 | this._counter = 0;
|
78 |
|
79 | this._subscriptions = {};
|
80 |
|
81 | this._receiptWatchers = {};
|
82 | this._partialData = '';
|
83 | this._escapeHeaderValues = false;
|
84 | this._lastServerActivityTS = Date.now();
|
85 | this.debug = config.debug;
|
86 | this.stompVersions = config.stompVersions;
|
87 | this.connectHeaders = config.connectHeaders;
|
88 | this.disconnectHeaders = config.disconnectHeaders;
|
89 | this.heartbeatIncoming = config.heartbeatIncoming;
|
90 | this.heartbeatOutgoing = config.heartbeatOutgoing;
|
91 | this.splitLargeFrames = config.splitLargeFrames;
|
92 | this.maxWebSocketChunkSize = config.maxWebSocketChunkSize;
|
93 | this.forceBinaryWSFrames = config.forceBinaryWSFrames;
|
94 | this.logRawCommunication = config.logRawCommunication;
|
95 | this.appendMissingNULLonIncoming = config.appendMissingNULLonIncoming;
|
96 | this.discardWebsocketOnCommFailure = config.discardWebsocketOnCommFailure;
|
97 | this.onConnect = config.onConnect;
|
98 | this.onDisconnect = config.onDisconnect;
|
99 | this.onStompError = config.onStompError;
|
100 | this.onWebSocketClose = config.onWebSocketClose;
|
101 | this.onWebSocketError = config.onWebSocketError;
|
102 | this.onUnhandledMessage = config.onUnhandledMessage;
|
103 | this.onUnhandledReceipt = config.onUnhandledReceipt;
|
104 | this.onUnhandledFrame = config.onUnhandledFrame;
|
105 | }
|
106 | get connectedVersion() {
|
107 | return this._connectedVersion;
|
108 | }
|
109 | get connected() {
|
110 | return this._connected;
|
111 | }
|
112 | start() {
|
113 | const parser = new Parser(
|
114 |
|
115 | rawFrame => {
|
116 | const frame = FrameImpl.fromRawFrame(rawFrame, this._escapeHeaderValues);
|
117 |
|
118 | if (!this.logRawCommunication) {
|
119 | this.debug(`<<< ${frame}`);
|
120 | }
|
121 | const serverFrameHandler = this._serverFrameHandlers[frame.command] || this.onUnhandledFrame;
|
122 | serverFrameHandler(frame);
|
123 | },
|
124 |
|
125 | () => {
|
126 | this.debug('<<< PONG');
|
127 | });
|
128 | this._webSocket.onmessage = (evt) => {
|
129 | this.debug('Received data');
|
130 | this._lastServerActivityTS = Date.now();
|
131 | if (this.logRawCommunication) {
|
132 | const rawChunkAsString = evt.data instanceof ArrayBuffer
|
133 | ? new TextDecoder().decode(evt.data)
|
134 | : evt.data;
|
135 | this.debug(`<<< ${rawChunkAsString}`);
|
136 | }
|
137 | parser.parseChunk(evt.data, this.appendMissingNULLonIncoming);
|
138 | };
|
139 | this._webSocket.onclose = (closeEvent) => {
|
140 | this.debug(`Connection closed to ${this._webSocket.url}`);
|
141 | this._cleanUp();
|
142 | this.onWebSocketClose(closeEvent);
|
143 | };
|
144 | this._webSocket.onerror = (errorEvent) => {
|
145 | this.onWebSocketError(errorEvent);
|
146 | };
|
147 | this._webSocket.onopen = () => {
|
148 |
|
149 | const connectHeaders = Object.assign({}, this.connectHeaders);
|
150 | this.debug('Web Socket Opened...');
|
151 | connectHeaders['accept-version'] = this.stompVersions.supportedVersions();
|
152 | connectHeaders['heart-beat'] = [
|
153 | this.heartbeatOutgoing,
|
154 | this.heartbeatIncoming,
|
155 | ].join(',');
|
156 | this._transmit({ command: 'CONNECT', headers: connectHeaders });
|
157 | };
|
158 | }
|
159 | _setupHeartbeat(headers) {
|
160 | if (headers.version !== Versions.V1_1 &&
|
161 | headers.version !== Versions.V1_2) {
|
162 | return;
|
163 | }
|
164 |
|
165 |
|
166 | if (!headers['heart-beat']) {
|
167 | return;
|
168 | }
|
169 |
|
170 |
|
171 |
|
172 | const [serverOutgoing, serverIncoming] = headers['heart-beat']
|
173 | .split(',')
|
174 | .map((v) => parseInt(v, 10));
|
175 | if (this.heartbeatOutgoing !== 0 && serverIncoming !== 0) {
|
176 | const ttl = Math.max(this.heartbeatOutgoing, serverIncoming);
|
177 | this.debug(`send PING every ${ttl}ms`);
|
178 | this._pinger = setInterval(() => {
|
179 | if (this._webSocket.readyState === StompSocketState.OPEN) {
|
180 | this._webSocket.send(BYTE.LF);
|
181 | this.debug('>>> PING');
|
182 | }
|
183 | }, ttl);
|
184 | }
|
185 | if (this.heartbeatIncoming !== 0 && serverOutgoing !== 0) {
|
186 | const ttl = Math.max(this.heartbeatIncoming, serverOutgoing);
|
187 | this.debug(`check PONG every ${ttl}ms`);
|
188 | this._ponger = setInterval(() => {
|
189 | const delta = Date.now() - this._lastServerActivityTS;
|
190 |
|
191 | if (delta > ttl * 2) {
|
192 | this.debug(`did not receive server activity for the last ${delta}ms`);
|
193 | this._closeOrDiscardWebsocket();
|
194 | }
|
195 | }, ttl);
|
196 | }
|
197 | }
|
198 | _closeOrDiscardWebsocket() {
|
199 | if (this.discardWebsocketOnCommFailure) {
|
200 | this.debug('Discarding websocket, the underlying socket may linger for a while');
|
201 | this.discardWebsocket();
|
202 | }
|
203 | else {
|
204 | this.debug('Issuing close on the websocket');
|
205 | this._closeWebsocket();
|
206 | }
|
207 | }
|
208 | forceDisconnect() {
|
209 | if (this._webSocket) {
|
210 | if (this._webSocket.readyState === StompSocketState.CONNECTING ||
|
211 | this._webSocket.readyState === StompSocketState.OPEN) {
|
212 | this._closeOrDiscardWebsocket();
|
213 | }
|
214 | }
|
215 | }
|
216 | _closeWebsocket() {
|
217 | this._webSocket.onmessage = () => { };
|
218 | this._webSocket.close();
|
219 | }
|
220 | discardWebsocket() {
|
221 | if (typeof this._webSocket.terminate !== 'function') {
|
222 | augmentWebsocket(this._webSocket, (msg) => this.debug(msg));
|
223 | }
|
224 |
|
225 | this._webSocket.terminate();
|
226 | }
|
227 | _transmit(params) {
|
228 | const { command, headers, body, binaryBody, skipContentLengthHeader } = params;
|
229 | const frame = new FrameImpl({
|
230 | command,
|
231 | headers,
|
232 | body,
|
233 | binaryBody,
|
234 | escapeHeaderValues: this._escapeHeaderValues,
|
235 | skipContentLengthHeader,
|
236 | });
|
237 | let rawChunk = frame.serialize();
|
238 | if (this.logRawCommunication) {
|
239 | this.debug(`>>> ${rawChunk}`);
|
240 | }
|
241 | else {
|
242 | this.debug(`>>> ${frame}`);
|
243 | }
|
244 | if (this.forceBinaryWSFrames && typeof rawChunk === 'string') {
|
245 | rawChunk = new TextEncoder().encode(rawChunk);
|
246 | }
|
247 | if (typeof rawChunk !== 'string' || !this.splitLargeFrames) {
|
248 | this._webSocket.send(rawChunk);
|
249 | }
|
250 | else {
|
251 | let out = rawChunk;
|
252 | while (out.length > 0) {
|
253 | const chunk = out.substring(0, this.maxWebSocketChunkSize);
|
254 | out = out.substring(this.maxWebSocketChunkSize);
|
255 | this._webSocket.send(chunk);
|
256 | this.debug(`chunk sent = ${chunk.length}, remaining = ${out.length}`);
|
257 | }
|
258 | }
|
259 | }
|
260 | dispose() {
|
261 | if (this.connected) {
|
262 | try {
|
263 |
|
264 | const disconnectHeaders = Object.assign({}, this.disconnectHeaders);
|
265 | if (!disconnectHeaders.receipt) {
|
266 | disconnectHeaders.receipt = `close-${this._counter++}`;
|
267 | }
|
268 | this.watchForReceipt(disconnectHeaders.receipt, frame => {
|
269 | this._closeWebsocket();
|
270 | this._cleanUp();
|
271 | this.onDisconnect(frame);
|
272 | });
|
273 | this._transmit({ command: 'DISCONNECT', headers: disconnectHeaders });
|
274 | }
|
275 | catch (error) {
|
276 | this.debug(`Ignoring error during disconnect ${error}`);
|
277 | }
|
278 | }
|
279 | else {
|
280 | if (this._webSocket.readyState === StompSocketState.CONNECTING ||
|
281 | this._webSocket.readyState === StompSocketState.OPEN) {
|
282 | this._closeWebsocket();
|
283 | }
|
284 | }
|
285 | }
|
286 | _cleanUp() {
|
287 | this._connected = false;
|
288 | if (this._pinger) {
|
289 | clearInterval(this._pinger);
|
290 | this._pinger = undefined;
|
291 | }
|
292 | if (this._ponger) {
|
293 | clearInterval(this._ponger);
|
294 | this._ponger = undefined;
|
295 | }
|
296 | }
|
297 | publish(params) {
|
298 | const { destination, headers, body, binaryBody, skipContentLengthHeader } = params;
|
299 | const hdrs = Object.assign({ destination }, headers);
|
300 | this._transmit({
|
301 | command: 'SEND',
|
302 | headers: hdrs,
|
303 | body,
|
304 | binaryBody,
|
305 | skipContentLengthHeader,
|
306 | });
|
307 | }
|
308 | watchForReceipt(receiptId, callback) {
|
309 | this._receiptWatchers[receiptId] = callback;
|
310 | }
|
311 | subscribe(destination, callback, headers = {}) {
|
312 | headers = Object.assign({}, headers);
|
313 | if (!headers.id) {
|
314 | headers.id = `sub-${this._counter++}`;
|
315 | }
|
316 | headers.destination = destination;
|
317 | this._subscriptions[headers.id] = callback;
|
318 | this._transmit({ command: 'SUBSCRIBE', headers });
|
319 | const client = this;
|
320 | return {
|
321 | id: headers.id,
|
322 | unsubscribe(hdrs) {
|
323 | return client.unsubscribe(headers.id, hdrs);
|
324 | },
|
325 | };
|
326 | }
|
327 | unsubscribe(id, headers = {}) {
|
328 | headers = Object.assign({}, headers);
|
329 | delete this._subscriptions[id];
|
330 | headers.id = id;
|
331 | this._transmit({ command: 'UNSUBSCRIBE', headers });
|
332 | }
|
333 | begin(transactionId) {
|
334 | const txId = transactionId || `tx-${this._counter++}`;
|
335 | this._transmit({
|
336 | command: 'BEGIN',
|
337 | headers: {
|
338 | transaction: txId,
|
339 | },
|
340 | });
|
341 | const client = this;
|
342 | return {
|
343 | id: txId,
|
344 | commit() {
|
345 | client.commit(txId);
|
346 | },
|
347 | abort() {
|
348 | client.abort(txId);
|
349 | },
|
350 | };
|
351 | }
|
352 | commit(transactionId) {
|
353 | this._transmit({
|
354 | command: 'COMMIT',
|
355 | headers: {
|
356 | transaction: transactionId,
|
357 | },
|
358 | });
|
359 | }
|
360 | abort(transactionId) {
|
361 | this._transmit({
|
362 | command: 'ABORT',
|
363 | headers: {
|
364 | transaction: transactionId,
|
365 | },
|
366 | });
|
367 | }
|
368 | ack(messageId, subscriptionId, headers = {}) {
|
369 | headers = Object.assign({}, headers);
|
370 | if (this._connectedVersion === Versions.V1_2) {
|
371 | headers.id = messageId;
|
372 | }
|
373 | else {
|
374 | headers['message-id'] = messageId;
|
375 | }
|
376 | headers.subscription = subscriptionId;
|
377 | this._transmit({ command: 'ACK', headers });
|
378 | }
|
379 | nack(messageId, subscriptionId, headers = {}) {
|
380 | headers = Object.assign({}, headers);
|
381 | if (this._connectedVersion === Versions.V1_2) {
|
382 | headers.id = messageId;
|
383 | }
|
384 | else {
|
385 | headers['message-id'] = messageId;
|
386 | }
|
387 | headers.subscription = subscriptionId;
|
388 | return this._transmit({ command: 'NACK', headers });
|
389 | }
|
390 | }
|
391 |
|
\ | No newline at end of file |