UNPKG

15 kBPlain TextView Raw
1import { BYTE } from './byte';
2import { Client } from './client';
3import { FrameImpl } from './frame-impl';
4import { IMessage } from './i-message';
5import { ITransaction } from './i-transaction';
6import { Parser } from './parser';
7import { StompConfig } from './stomp-config';
8import { StompHeaders } from './stomp-headers';
9import { StompSubscription } from './stomp-subscription';
10import {
11 closeEventCallbackType,
12 debugFnType,
13 frameCallbackType,
14 IPublishParams,
15 IStompSocket,
16 IStompSocketMessageEvent,
17 messageCallbackType,
18 StompSocketState,
19 wsErrorCallbackType,
20} from './types';
21import { Versions } from './versions';
22import { augmentWebsocket } from './augment-websocket';
23
24/**
25 * The STOMP protocol handler
26 *
27 * Part of `@stomp/stompjs`.
28 *
29 * @internal
30 */
31export class StompHandler {
32 public debug: debugFnType;
33
34 public stompVersions: Versions;
35
36 public connectHeaders: StompHeaders;
37
38 public disconnectHeaders: StompHeaders;
39
40 public heartbeatIncoming: number;
41
42 public heartbeatOutgoing: number;
43
44 public onUnhandledMessage: messageCallbackType;
45
46 public onUnhandledReceipt: frameCallbackType;
47
48 public onUnhandledFrame: frameCallbackType;
49
50 public onConnect: frameCallbackType;
51
52 public onDisconnect: frameCallbackType;
53
54 public onStompError: frameCallbackType;
55
56 public onWebSocketClose: closeEventCallbackType;
57
58 public onWebSocketError: wsErrorCallbackType;
59
60 public logRawCommunication: boolean;
61
62 public splitLargeFrames: boolean;
63
64 public maxWebSocketChunkSize: number;
65
66 public forceBinaryWSFrames: boolean;
67
68 public appendMissingNULLonIncoming: boolean;
69
70 public discardWebsocketOnCommFailure: boolean;
71
72 get connectedVersion(): string {
73 return this._connectedVersion;
74 }
75 private _connectedVersion: string;
76
77 get connected(): boolean {
78 return this._connected;
79 }
80
81 private _connected: boolean;
82
83 private readonly _subscriptions: { [key: string]: messageCallbackType };
84 private readonly _receiptWatchers: { [key: string]: frameCallbackType };
85 private _partialData: string;
86 private _escapeHeaderValues: boolean;
87 private _counter: number;
88 private _pinger: any;
89 private _ponger: any;
90 private _lastServerActivityTS: number;
91
92 private _onclose: (closeEvent: any) => void;
93
94 constructor(
95 private _client: Client,
96 public _webSocket: IStompSocket,
97 config: StompConfig = {}
98 ) {
99 // used to index subscribers
100 this._counter = 0;
101
102 // subscription callbacks indexed by subscriber's ID
103 this._subscriptions = {};
104
105 // receipt-watchers indexed by receipts-ids
106 this._receiptWatchers = {};
107
108 this._partialData = '';
109
110 this._escapeHeaderValues = false;
111
112 this._lastServerActivityTS = Date.now();
113
114 this.configure(config);
115 }
116
117 public configure(conf: StompConfig): void {
118 // bulk assign all properties to this
119 (Object as any).assign(this, conf);
120 }
121
122 public start(): void {
123 const parser = new Parser(
124 // On Frame
125 rawFrame => {
126 const frame = FrameImpl.fromRawFrame(
127 rawFrame,
128 this._escapeHeaderValues
129 );
130
131 // if this.logRawCommunication is set, the rawChunk is logged at this._webSocket.onmessage
132 if (!this.logRawCommunication) {
133 this.debug(`<<< ${frame}`);
134 }
135
136 const serverFrameHandler =
137 this._serverFrameHandlers[frame.command] || this.onUnhandledFrame;
138 serverFrameHandler(frame);
139 },
140 // On Incoming Ping
141 () => {
142 this.debug('<<< PONG');
143 }
144 );
145
146 this._webSocket.onmessage = (evt: IStompSocketMessageEvent) => {
147 this.debug('Received data');
148 this._lastServerActivityTS = Date.now();
149
150 if (this.logRawCommunication) {
151 const rawChunkAsString =
152 evt.data instanceof ArrayBuffer
153 ? new TextDecoder().decode(evt.data)
154 : evt.data;
155 this.debug(`<<< ${rawChunkAsString}`);
156 }
157
158 parser.parseChunk(evt.data, this.appendMissingNULLonIncoming);
159 };
160
161 this._onclose = (closeEvent): void => {
162 this.debug(`Connection closed to ${this._client.brokerURL}`);
163 this._cleanUp();
164 this.onWebSocketClose(closeEvent);
165 };
166
167 this._webSocket.onclose = this._onclose;
168
169 this._webSocket.onerror = (errorEvent): void => {
170 this.onWebSocketError(errorEvent);
171 };
172
173 this._webSocket.onopen = () => {
174 // Clone before updating
175 const connectHeaders = (Object as any).assign({}, this.connectHeaders);
176
177 this.debug('Web Socket Opened...');
178 connectHeaders['accept-version'] = this.stompVersions.supportedVersions();
179 connectHeaders['heart-beat'] = [
180 this.heartbeatOutgoing,
181 this.heartbeatIncoming,
182 ].join(',');
183 this._transmit({ command: 'CONNECT', headers: connectHeaders });
184 };
185 }
186
187 private readonly _serverFrameHandlers: {
188 [key: string]: frameCallbackType;
189 } = {
190 // [CONNECTED Frame](http://stomp.github.com/stomp-specification-1.2.html#CONNECTED_Frame)
191 CONNECTED: frame => {
192 this.debug(`connected to server ${frame.headers.server}`);
193 this._connected = true;
194 this._connectedVersion = frame.headers.version;
195 // STOMP version 1.2 needs header values to be escaped
196 if (this._connectedVersion === Versions.V1_2) {
197 this._escapeHeaderValues = true;
198 }
199
200 this._setupHeartbeat(frame.headers);
201 this.onConnect(frame);
202 },
203
204 // [MESSAGE Frame](http://stomp.github.com/stomp-specification-1.2.html#MESSAGE)
205 MESSAGE: frame => {
206 // the callback is registered when the client calls
207 // `subscribe()`.
208 // If there is no registered subscription for the received message,
209 // the default `onUnhandledMessage` callback is used that the client can set.
210 // This is useful for subscriptions that are automatically created
211 // on the browser side (e.g. [RabbitMQ's temporary
212 // queues](http://www.rabbitmq.com/stomp.html)).
213 const subscription = frame.headers.subscription;
214 const onReceive =
215 this._subscriptions[subscription] || this.onUnhandledMessage;
216
217 // bless the frame to be a Message
218 const message = frame as IMessage;
219
220 const client = this;
221 const messageId =
222 this._connectedVersion === Versions.V1_2
223 ? message.headers.ack
224 : message.headers['message-id'];
225
226 // add `ack()` and `nack()` methods directly to the returned frame
227 // so that a simple call to `message.ack()` can acknowledge the message.
228 message.ack = (headers: StompHeaders = {}): void => {
229 return client.ack(messageId, subscription, headers);
230 };
231 message.nack = (headers: StompHeaders = {}): void => {
232 return client.nack(messageId, subscription, headers);
233 };
234 onReceive(message);
235 },
236
237 // [RECEIPT Frame](http://stomp.github.com/stomp-specification-1.2.html#RECEIPT)
238 RECEIPT: frame => {
239 const callback = this._receiptWatchers[frame.headers['receipt-id']];
240 if (callback) {
241 callback(frame);
242 // Server will acknowledge only once, remove the callback
243 delete this._receiptWatchers[frame.headers['receipt-id']];
244 } else {
245 this.onUnhandledReceipt(frame);
246 }
247 },
248
249 // [ERROR Frame](http://stomp.github.com/stomp-specification-1.2.html#ERROR)
250 ERROR: frame => {
251 this.onStompError(frame);
252 },
253 };
254
255 private _setupHeartbeat(headers: StompHeaders): void {
256 if (
257 headers.version !== Versions.V1_1 &&
258 headers.version !== Versions.V1_2
259 ) {
260 return;
261 }
262
263 // It is valid for the server to not send this header
264 // https://stomp.github.io/stomp-specification-1.2.html#Heart-beating
265 if (!headers['heart-beat']) {
266 return;
267 }
268
269 // heart-beat header received from the server looks like:
270 //
271 // heart-beat: sx, sy
272 const [serverOutgoing, serverIncoming] = headers['heart-beat']
273 .split(',')
274 .map((v: string) => parseInt(v, 10));
275
276 if (this.heartbeatOutgoing !== 0 && serverIncoming !== 0) {
277 const ttl: number = Math.max(this.heartbeatOutgoing, serverIncoming);
278 this.debug(`send PING every ${ttl}ms`);
279 this._pinger = setInterval(() => {
280 if (this._webSocket.readyState === StompSocketState.OPEN) {
281 this._webSocket.send(BYTE.LF);
282 this.debug('>>> PING');
283 }
284 }, ttl);
285 }
286
287 if (this.heartbeatIncoming !== 0 && serverOutgoing !== 0) {
288 const ttl: number = Math.max(this.heartbeatIncoming, serverOutgoing);
289 this.debug(`check PONG every ${ttl}ms`);
290 this._ponger = setInterval(() => {
291 const delta = Date.now() - this._lastServerActivityTS;
292 // We wait twice the TTL to be flexible on window's setInterval calls
293 if (delta > ttl * 2) {
294 this.debug(`did not receive server activity for the last ${delta}ms`);
295 this._closeOrDiscardWebsocket();
296 }
297 }, ttl);
298 }
299 }
300
301 private _closeOrDiscardWebsocket() {
302 if (this.discardWebsocketOnCommFailure) {
303 this.debug(
304 'Discarding websocket, the underlying socket may linger for a while'
305 );
306 this._discardWebsocket();
307 } else {
308 this.debug('Issuing close on the websocket');
309 this._closeWebsocket();
310 }
311 }
312
313 public forceDisconnect() {
314 if (this._webSocket) {
315 if (
316 this._webSocket.readyState === StompSocketState.CONNECTING ||
317 this._webSocket.readyState === StompSocketState.OPEN
318 ) {
319 this._closeOrDiscardWebsocket();
320 }
321 }
322 }
323
324 public _closeWebsocket() {
325 this._webSocket.onmessage = () => {}; // ignore messages
326 this._webSocket.close();
327 }
328
329 private _discardWebsocket() {
330 if (!this._webSocket.terminate) {
331 augmentWebsocket(this._webSocket, (msg: string) => this.debug(msg));
332 }
333
334 this._webSocket.terminate();
335 }
336
337 private _transmit(params: {
338 command: string;
339 headers?: StompHeaders;
340 body?: string;
341 binaryBody?: Uint8Array;
342 skipContentLengthHeader?: boolean;
343 }): void {
344 const { command, headers, body, binaryBody, skipContentLengthHeader } =
345 params;
346 const frame = new FrameImpl({
347 command,
348 headers,
349 body,
350 binaryBody,
351 escapeHeaderValues: this._escapeHeaderValues,
352 skipContentLengthHeader,
353 });
354
355 let rawChunk = frame.serialize();
356
357 if (this.logRawCommunication) {
358 this.debug(`>>> ${rawChunk}`);
359 } else {
360 this.debug(`>>> ${frame}`);
361 }
362
363 if (this.forceBinaryWSFrames && typeof rawChunk === 'string') {
364 rawChunk = new TextEncoder().encode(rawChunk);
365 }
366
367 if (typeof rawChunk !== 'string' || !this.splitLargeFrames) {
368 this._webSocket.send(rawChunk);
369 } else {
370 let out = rawChunk as string;
371 while (out.length > 0) {
372 const chunk = out.substring(0, this.maxWebSocketChunkSize);
373 out = out.substring(this.maxWebSocketChunkSize);
374 this._webSocket.send(chunk);
375 this.debug(`chunk sent = ${chunk.length}, remaining = ${out.length}`);
376 }
377 }
378 }
379
380 public dispose(): void {
381 if (this.connected) {
382 try {
383 // clone before updating
384 const disconnectHeaders = (Object as any).assign(
385 {},
386 this.disconnectHeaders
387 );
388
389 if (!disconnectHeaders.receipt) {
390 disconnectHeaders.receipt = `close-${this._counter++}`;
391 }
392 this.watchForReceipt(disconnectHeaders.receipt, frame => {
393 this._closeWebsocket();
394 this._cleanUp();
395 this.onDisconnect(frame);
396 });
397 this._transmit({ command: 'DISCONNECT', headers: disconnectHeaders });
398 } catch (error) {
399 this.debug(`Ignoring error during disconnect ${error}`);
400 }
401 } else {
402 if (
403 this._webSocket.readyState === StompSocketState.CONNECTING ||
404 this._webSocket.readyState === StompSocketState.OPEN
405 ) {
406 this._closeWebsocket();
407 }
408 }
409 }
410
411 private _cleanUp() {
412 this._connected = false;
413
414 if (this._pinger) {
415 clearInterval(this._pinger);
416 }
417 if (this._ponger) {
418 clearInterval(this._ponger);
419 }
420 }
421
422 public publish(params: IPublishParams): void {
423 const { destination, headers, body, binaryBody, skipContentLengthHeader } =
424 params;
425 const hdrs: StompHeaders = (Object as any).assign({ destination }, headers);
426 this._transmit({
427 command: 'SEND',
428 headers: hdrs,
429 body,
430 binaryBody,
431 skipContentLengthHeader,
432 });
433 }
434
435 public watchForReceipt(receiptId: string, callback: frameCallbackType): void {
436 this._receiptWatchers[receiptId] = callback;
437 }
438
439 public subscribe(
440 destination: string,
441 callback: messageCallbackType,
442 headers: StompHeaders = {}
443 ): StompSubscription {
444 headers = (Object as any).assign({}, headers);
445
446 if (!headers.id) {
447 headers.id = `sub-${this._counter++}`;
448 }
449 headers.destination = destination;
450 this._subscriptions[headers.id] = callback;
451 this._transmit({ command: 'SUBSCRIBE', headers });
452 const client = this;
453 return {
454 id: headers.id,
455
456 unsubscribe(hdrs) {
457 return client.unsubscribe(headers.id, hdrs);
458 },
459 };
460 }
461
462 public unsubscribe(id: string, headers: StompHeaders = {}): void {
463 headers = (Object as any).assign({}, headers);
464
465 delete this._subscriptions[id];
466 headers.id = id;
467 this._transmit({ command: 'UNSUBSCRIBE', headers });
468 }
469
470 public begin(transactionId: string): ITransaction {
471 const txId = transactionId || `tx-${this._counter++}`;
472 this._transmit({
473 command: 'BEGIN',
474 headers: {
475 transaction: txId,
476 },
477 });
478 const client = this;
479 return {
480 id: txId,
481 commit(): void {
482 client.commit(txId);
483 },
484 abort(): void {
485 client.abort(txId);
486 },
487 };
488 }
489
490 public commit(transactionId: string): void {
491 this._transmit({
492 command: 'COMMIT',
493 headers: {
494 transaction: transactionId,
495 },
496 });
497 }
498
499 public abort(transactionId: string): void {
500 this._transmit({
501 command: 'ABORT',
502 headers: {
503 transaction: transactionId,
504 },
505 });
506 }
507
508 public ack(
509 messageId: string,
510 subscriptionId: string,
511 headers: StompHeaders = {}
512 ): void {
513 headers = (Object as any).assign({}, headers);
514
515 if (this._connectedVersion === Versions.V1_2) {
516 headers.id = messageId;
517 } else {
518 headers['message-id'] = messageId;
519 }
520 headers.subscription = subscriptionId;
521 this._transmit({ command: 'ACK', headers });
522 }
523
524 public nack(
525 messageId: string,
526 subscriptionId: string,
527 headers: StompHeaders = {}
528 ): void {
529 headers = (Object as any).assign({}, headers);
530
531 if (this._connectedVersion === Versions.V1_2) {
532 headers.id = messageId;
533 } else {
534 headers['message-id'] = messageId;
535 }
536 headers.subscription = subscriptionId;
537 return this._transmit({ command: 'NACK', headers });
538 }
539}