UNPKG

16 kBPlain TextView Raw
1import { BYTE } from './byte.js';
2import { Client } from './client.js';
3import { FrameImpl } from './frame-impl.js';
4import { IMessage } from './i-message.js';
5import { ITransaction } from './i-transaction.js';
6import { Parser } from './parser.js';
7import { StompHeaders } from './stomp-headers.js';
8import { StompSubscription } from './stomp-subscription.js';
9import {
10 closeEventCallbackType,
11 debugFnType,
12 frameCallbackType,
13 IPublishParams,
14 IStompSocket,
15 IStompSocketMessageEvent,
16 IStomptHandlerConfig,
17 messageCallbackType,
18 StompSocketState,
19 wsErrorCallbackType,
20} from './types.js';
21import { Versions } from './versions.js';
22import { augmentWebsocket } from './augment-websocket.js';
23
24/**
25 * The STOMP protocol handler
26 *
27 * Part of `@stomp/stompjs`.
28 *
29 * @internal
30 */
31export class StompHandler {
32 public debug: debugFnType;
33
34 public stompVersions: Versions;
35
36 public connectHeaders: StompHeaders;
37
38 public disconnectHeaders: StompHeaders;
39
40 public heartbeatIncoming: number;
41
42 public heartbeatOutgoing: number;
43
44 public onUnhandledMessage: messageCallbackType;
45
46 public onUnhandledReceipt: frameCallbackType;
47
48 public onUnhandledFrame: frameCallbackType;
49
50 public onConnect: frameCallbackType;
51
52 public onDisconnect: frameCallbackType;
53
54 public onStompError: frameCallbackType;
55
56 public onWebSocketClose: closeEventCallbackType;
57
58 public onWebSocketError: wsErrorCallbackType;
59
60 public logRawCommunication: boolean;
61
62 public splitLargeFrames: boolean;
63
64 public maxWebSocketChunkSize: number;
65
66 public forceBinaryWSFrames: boolean;
67
68 public appendMissingNULLonIncoming: boolean;
69
70 public discardWebsocketOnCommFailure: boolean;
71
72 get connectedVersion(): string | undefined {
73 return this._connectedVersion;
74 }
75 private _connectedVersion: string | undefined;
76
77 get connected(): boolean {
78 return this._connected;
79 }
80
81 private _connected: boolean = false;
82
83 private readonly _subscriptions: { [key: string]: messageCallbackType };
84 private readonly _receiptWatchers: { [key: string]: frameCallbackType };
85 private _partialData: string;
86 private _escapeHeaderValues: boolean;
87 private _counter: number;
88 private _pinger: any;
89 private _ponger: any;
90 private _lastServerActivityTS: number;
91
92 constructor(
93 private _client: Client,
94 public _webSocket: IStompSocket,
95 config: IStomptHandlerConfig
96 ) {
97 // used to index subscribers
98 this._counter = 0;
99
100 // subscription callbacks indexed by subscriber's ID
101 this._subscriptions = {};
102
103 // receipt-watchers indexed by receipts-ids
104 this._receiptWatchers = {};
105
106 this._partialData = '';
107
108 this._escapeHeaderValues = false;
109
110 this._lastServerActivityTS = Date.now();
111
112 this.debug = config.debug;
113 this.stompVersions = config.stompVersions;
114 this.connectHeaders = config.connectHeaders;
115 this.disconnectHeaders = config.disconnectHeaders;
116 this.heartbeatIncoming = config.heartbeatIncoming;
117 this.heartbeatOutgoing = config.heartbeatOutgoing;
118 this.splitLargeFrames = config.splitLargeFrames;
119 this.maxWebSocketChunkSize = config.maxWebSocketChunkSize;
120 this.forceBinaryWSFrames = config.forceBinaryWSFrames;
121 this.logRawCommunication = config.logRawCommunication;
122 this.appendMissingNULLonIncoming = config.appendMissingNULLonIncoming;
123 this.discardWebsocketOnCommFailure = config.discardWebsocketOnCommFailure;
124 this.onConnect = config.onConnect;
125 this.onDisconnect = config.onDisconnect;
126 this.onStompError = config.onStompError;
127 this.onWebSocketClose = config.onWebSocketClose;
128 this.onWebSocketError = config.onWebSocketError;
129 this.onUnhandledMessage = config.onUnhandledMessage;
130 this.onUnhandledReceipt = config.onUnhandledReceipt;
131 this.onUnhandledFrame = config.onUnhandledFrame;
132 }
133
134 public start(): void {
135 const parser = new Parser(
136 // On Frame
137 rawFrame => {
138 const frame = FrameImpl.fromRawFrame(
139 rawFrame,
140 this._escapeHeaderValues
141 );
142
143 // if this.logRawCommunication is set, the rawChunk is logged at this._webSocket.onmessage
144 if (!this.logRawCommunication) {
145 this.debug(`<<< ${frame}`);
146 }
147
148 const serverFrameHandler =
149 this._serverFrameHandlers[frame.command] || this.onUnhandledFrame;
150 serverFrameHandler(frame);
151 },
152 // On Incoming Ping
153 () => {
154 this.debug('<<< PONG');
155 }
156 );
157
158 this._webSocket.onmessage = (evt: IStompSocketMessageEvent) => {
159 this.debug('Received data');
160 this._lastServerActivityTS = Date.now();
161
162 if (this.logRawCommunication) {
163 const rawChunkAsString =
164 evt.data instanceof ArrayBuffer
165 ? new TextDecoder().decode(evt.data)
166 : evt.data;
167 this.debug(`<<< ${rawChunkAsString}`);
168 }
169
170 parser.parseChunk(
171 evt.data as string | ArrayBuffer,
172 this.appendMissingNULLonIncoming
173 );
174 };
175
176 this._webSocket.onclose = (closeEvent): void => {
177 this.debug(`Connection closed to ${this._webSocket.url}`);
178 this._cleanUp();
179 this.onWebSocketClose(closeEvent);
180 };
181
182 this._webSocket.onerror = (errorEvent): void => {
183 this.onWebSocketError(errorEvent);
184 };
185
186 this._webSocket.onopen = () => {
187 // Clone before updating
188 const connectHeaders = (Object as any).assign({}, this.connectHeaders);
189
190 this.debug('Web Socket Opened...');
191 connectHeaders['accept-version'] = this.stompVersions.supportedVersions();
192 connectHeaders['heart-beat'] = [
193 this.heartbeatOutgoing,
194 this.heartbeatIncoming,
195 ].join(',');
196 this._transmit({ command: 'CONNECT', headers: connectHeaders });
197 };
198 }
199
200 private readonly _serverFrameHandlers: {
201 [key: string]: frameCallbackType;
202 } = {
203 // [CONNECTED Frame](https://stomp.github.com/stomp-specification-1.2.html#CONNECTED_Frame)
204 CONNECTED: frame => {
205 this.debug(`connected to server ${frame.headers.server}`);
206 this._connected = true;
207 this._connectedVersion = frame.headers.version;
208 // STOMP version 1.2 needs header values to be escaped
209 if (this._connectedVersion === Versions.V1_2) {
210 this._escapeHeaderValues = true;
211 }
212
213 this._setupHeartbeat(frame.headers);
214 this.onConnect(frame);
215 },
216
217 // [MESSAGE Frame](https://stomp.github.com/stomp-specification-1.2.html#MESSAGE)
218 MESSAGE: frame => {
219 // the callback is registered when the client calls
220 // `subscribe()`.
221 // If there is no registered subscription for the received message,
222 // the default `onUnhandledMessage` callback is used that the client can set.
223 // This is useful for subscriptions that are automatically created
224 // on the browser side (e.g. [RabbitMQ's temporary
225 // queues](https://www.rabbitmq.com/stomp.html)).
226 const subscription = frame.headers.subscription;
227 const onReceive =
228 this._subscriptions[subscription] || this.onUnhandledMessage;
229
230 // bless the frame to be a Message
231 const message = frame as IMessage;
232
233 const client = this;
234 const messageId =
235 this._connectedVersion === Versions.V1_2
236 ? message.headers.ack
237 : message.headers['message-id'];
238
239 // add `ack()` and `nack()` methods directly to the returned frame
240 // so that a simple call to `message.ack()` can acknowledge the message.
241 message.ack = (headers: StompHeaders = {}): void => {
242 return client.ack(messageId, subscription, headers);
243 };
244 message.nack = (headers: StompHeaders = {}): void => {
245 return client.nack(messageId, subscription, headers);
246 };
247 onReceive(message);
248 },
249
250 // [RECEIPT Frame](https://stomp.github.com/stomp-specification-1.2.html#RECEIPT)
251 RECEIPT: frame => {
252 const callback = this._receiptWatchers[frame.headers['receipt-id']];
253 if (callback) {
254 callback(frame);
255 // Server will acknowledge only once, remove the callback
256 delete this._receiptWatchers[frame.headers['receipt-id']];
257 } else {
258 this.onUnhandledReceipt(frame);
259 }
260 },
261
262 // [ERROR Frame](https://stomp.github.com/stomp-specification-1.2.html#ERROR)
263 ERROR: frame => {
264 this.onStompError(frame);
265 },
266 };
267
268 private _setupHeartbeat(headers: StompHeaders): void {
269 if (
270 headers.version !== Versions.V1_1 &&
271 headers.version !== Versions.V1_2
272 ) {
273 return;
274 }
275
276 // It is valid for the server to not send this header
277 // https://stomp.github.io/stomp-specification-1.2.html#Heart-beating
278 if (!headers['heart-beat']) {
279 return;
280 }
281
282 // heart-beat header received from the server looks like:
283 //
284 // heart-beat: sx, sy
285 const [serverOutgoing, serverIncoming] = headers['heart-beat']
286 .split(',')
287 .map((v: string) => parseInt(v, 10));
288
289 if (this.heartbeatOutgoing !== 0 && serverIncoming !== 0) {
290 const ttl: number = Math.max(this.heartbeatOutgoing, serverIncoming);
291 this.debug(`send PING every ${ttl}ms`);
292 this._pinger = setInterval(() => {
293 if (this._webSocket.readyState === StompSocketState.OPEN) {
294 this._webSocket.send(BYTE.LF);
295 this.debug('>>> PING');
296 }
297 }, ttl);
298 }
299
300 if (this.heartbeatIncoming !== 0 && serverOutgoing !== 0) {
301 const ttl: number = Math.max(this.heartbeatIncoming, serverOutgoing);
302 this.debug(`check PONG every ${ttl}ms`);
303 this._ponger = setInterval(() => {
304 const delta = Date.now() - this._lastServerActivityTS;
305 // We wait twice the TTL to be flexible on window's setInterval calls
306 if (delta > ttl * 2) {
307 this.debug(`did not receive server activity for the last ${delta}ms`);
308 this._closeOrDiscardWebsocket();
309 }
310 }, ttl);
311 }
312 }
313
314 private _closeOrDiscardWebsocket() {
315 if (this.discardWebsocketOnCommFailure) {
316 this.debug(
317 'Discarding websocket, the underlying socket may linger for a while'
318 );
319 this.discardWebsocket();
320 } else {
321 this.debug('Issuing close on the websocket');
322 this._closeWebsocket();
323 }
324 }
325
326 public forceDisconnect() {
327 if (this._webSocket) {
328 if (
329 this._webSocket.readyState === StompSocketState.CONNECTING ||
330 this._webSocket.readyState === StompSocketState.OPEN
331 ) {
332 this._closeOrDiscardWebsocket();
333 }
334 }
335 }
336
337 public _closeWebsocket() {
338 this._webSocket.onmessage = () => {}; // ignore messages
339 this._webSocket.close();
340 }
341
342 public discardWebsocket() {
343 if (typeof this._webSocket.terminate !== 'function') {
344 augmentWebsocket(this._webSocket, (msg: string) => this.debug(msg));
345 }
346
347 // @ts-ignore - this method will be there at this stage
348 this._webSocket.terminate();
349 }
350
351 private _transmit(params: {
352 command: string;
353 headers?: StompHeaders;
354 body?: string;
355 binaryBody?: Uint8Array;
356 skipContentLengthHeader?: boolean;
357 }): void {
358 const { command, headers, body, binaryBody, skipContentLengthHeader } =
359 params;
360 const frame = new FrameImpl({
361 command,
362 headers,
363 body,
364 binaryBody,
365 escapeHeaderValues: this._escapeHeaderValues,
366 skipContentLengthHeader,
367 });
368
369 let rawChunk = frame.serialize();
370
371 if (this.logRawCommunication) {
372 this.debug(`>>> ${rawChunk}`);
373 } else {
374 this.debug(`>>> ${frame}`);
375 }
376
377 if (this.forceBinaryWSFrames && typeof rawChunk === 'string') {
378 rawChunk = new TextEncoder().encode(rawChunk);
379 }
380
381 if (typeof rawChunk !== 'string' || !this.splitLargeFrames) {
382 this._webSocket.send(rawChunk);
383 } else {
384 let out = rawChunk as string;
385 while (out.length > 0) {
386 const chunk = out.substring(0, this.maxWebSocketChunkSize);
387 out = out.substring(this.maxWebSocketChunkSize);
388 this._webSocket.send(chunk);
389 this.debug(`chunk sent = ${chunk.length}, remaining = ${out.length}`);
390 }
391 }
392 }
393
394 public dispose(): void {
395 if (this.connected) {
396 try {
397 // clone before updating
398 const disconnectHeaders = (Object as any).assign(
399 {},
400 this.disconnectHeaders
401 );
402
403 if (!disconnectHeaders.receipt) {
404 disconnectHeaders.receipt = `close-${this._counter++}`;
405 }
406 this.watchForReceipt(disconnectHeaders.receipt, frame => {
407 this._closeWebsocket();
408 this._cleanUp();
409 this.onDisconnect(frame);
410 });
411 this._transmit({ command: 'DISCONNECT', headers: disconnectHeaders });
412 } catch (error) {
413 this.debug(`Ignoring error during disconnect ${error}`);
414 }
415 } else {
416 if (
417 this._webSocket.readyState === StompSocketState.CONNECTING ||
418 this._webSocket.readyState === StompSocketState.OPEN
419 ) {
420 this._closeWebsocket();
421 }
422 }
423 }
424
425 private _cleanUp() {
426 this._connected = false;
427
428 if (this._pinger) {
429 clearInterval(this._pinger);
430 this._pinger = undefined;
431 }
432 if (this._ponger) {
433 clearInterval(this._ponger);
434 this._ponger = undefined;
435 }
436 }
437
438 public publish(params: IPublishParams): void {
439 const { destination, headers, body, binaryBody, skipContentLengthHeader } =
440 params;
441 const hdrs: StompHeaders = (Object as any).assign({ destination }, headers);
442 this._transmit({
443 command: 'SEND',
444 headers: hdrs,
445 body,
446 binaryBody,
447 skipContentLengthHeader,
448 });
449 }
450
451 public watchForReceipt(receiptId: string, callback: frameCallbackType): void {
452 this._receiptWatchers[receiptId] = callback;
453 }
454
455 public subscribe(
456 destination: string,
457 callback: messageCallbackType,
458 headers: StompHeaders = {}
459 ): StompSubscription {
460 headers = (Object as any).assign({}, headers);
461
462 if (!headers.id) {
463 headers.id = `sub-${this._counter++}`;
464 }
465 headers.destination = destination;
466 this._subscriptions[headers.id] = callback;
467 this._transmit({ command: 'SUBSCRIBE', headers });
468 const client = this;
469 return {
470 id: headers.id,
471
472 unsubscribe(hdrs) {
473 return client.unsubscribe(headers.id, hdrs);
474 },
475 };
476 }
477
478 public unsubscribe(id: string, headers: StompHeaders = {}): void {
479 headers = (Object as any).assign({}, headers);
480
481 delete this._subscriptions[id];
482 headers.id = id;
483 this._transmit({ command: 'UNSUBSCRIBE', headers });
484 }
485
486 public begin(transactionId: string): ITransaction {
487 const txId = transactionId || `tx-${this._counter++}`;
488 this._transmit({
489 command: 'BEGIN',
490 headers: {
491 transaction: txId,
492 },
493 });
494 const client = this;
495 return {
496 id: txId,
497 commit(): void {
498 client.commit(txId);
499 },
500 abort(): void {
501 client.abort(txId);
502 },
503 };
504 }
505
506 public commit(transactionId: string): void {
507 this._transmit({
508 command: 'COMMIT',
509 headers: {
510 transaction: transactionId,
511 },
512 });
513 }
514
515 public abort(transactionId: string): void {
516 this._transmit({
517 command: 'ABORT',
518 headers: {
519 transaction: transactionId,
520 },
521 });
522 }
523
524 public ack(
525 messageId: string,
526 subscriptionId: string,
527 headers: StompHeaders = {}
528 ): void {
529 headers = (Object as any).assign({}, headers);
530
531 if (this._connectedVersion === Versions.V1_2) {
532 headers.id = messageId;
533 } else {
534 headers['message-id'] = messageId;
535 }
536 headers.subscription = subscriptionId;
537 this._transmit({ command: 'ACK', headers });
538 }
539
540 public nack(
541 messageId: string,
542 subscriptionId: string,
543 headers: StompHeaders = {}
544 ): void {
545 headers = (Object as any).assign({}, headers);
546
547 if (this._connectedVersion === Versions.V1_2) {
548 headers.id = messageId;
549 } else {
550 headers['message-id'] = messageId;
551 }
552 headers.subscription = subscriptionId;
553 return this._transmit({ command: 'NACK', headers });
554 }
555}