// stomp-handler.js (15.3 kB, JavaScript), as served by UNPKG.
import { BYTE } from './byte.js';
import { FrameImpl } from './frame-impl.js';
import { Parser } from './parser.js';
import { StompSocketState, } from './types.js';
import { Versions } from './versions.js';
import { augmentWebsocket } from './augment-websocket.js';
/**
 * The STOMP protocol handler
 *
 * Part of `@stomp/stompjs`.
 *
 * Drives one WebSocket-like object: it installs the socket event handlers,
 * sends the CONNECT frame when the socket opens, dispatches incoming server
 * frames (CONNECTED, MESSAGE, RECEIPT, ERROR) to the configured callbacks and
 * serializes outgoing client frames.
 *
 * @internal
 */
export class StompHandler {
    /**
     * @param _client - owning client object; only stored on the instance here.
     * @param _webSocket - socket-like object. This class assigns its
     *   `onmessage`, `onclose`, `onerror` and `onopen` properties and calls
     *   its `send()` and `close()` methods.
     * @param config - settings and callbacks; each one is copied onto the
     *   instance by the assignments at the end of the constructor.
     */
    constructor(_client, _webSocket, config) {
        this._client = _client;
        this._webSocket = _webSocket;
        this._connected = false;
        // Handlers for frames sent by the server, keyed by STOMP command.
        this._serverFrameHandlers = {
            // [CONNECTED Frame](https://stomp.github.com/stomp-specification-1.2.html#CONNECTED_Frame)
            CONNECTED: frame => {
                this.debug(`connected to server ${frame.headers.server}`);
                this._connected = true;
                this._connectedVersion = frame.headers.version;
                // STOMP version 1.2 needs header values to be escaped
                if (this._connectedVersion === Versions.V1_2) {
                    this._escapeHeaderValues = true;
                }
                this._setupHeartbeat(frame.headers);
                this.onConnect(frame);
            },
            // [MESSAGE Frame](https://stomp.github.com/stomp-specification-1.2.html#MESSAGE)
            MESSAGE: frame => {
                // The callback is registered when the client calls `subscribe()`.
                // If there is no registered subscription for the received message,
                // the `onUnhandledMessage` callback is used instead. This is useful
                // for subscriptions that are automatically created on the browser
                // side (e.g. [RabbitMQ's temporary
                // queues](https://www.rabbitmq.com/stomp.html)).
                const subscription = frame.headers.subscription;
                const onReceive = this._subscriptions[subscription] || this.onUnhandledMessage;
                // bless the frame to be a Message
                const message = frame;
                // The header that identifies the message for ACK/NACK depends on
                // the negotiated protocol version.
                const messageId = this._connectedVersion === Versions.V1_2
                    ? message.headers.ack
                    : message.headers['message-id'];
                // Add `ack()` and `nack()` directly to the frame so that a simple
                // call to `message.ack()` can acknowledge the message.
                message.ack = (headers = {}) => this.ack(messageId, subscription, headers);
                message.nack = (headers = {}) => this.nack(messageId, subscription, headers);
                onReceive(message);
            },
            // [RECEIPT Frame](https://stomp.github.com/stomp-specification-1.2.html#RECEIPT)
            RECEIPT: frame => {
                const receiptId = frame.headers['receipt-id'];
                const callback = this._receiptWatchers[receiptId];
                if (!callback) {
                    this.onUnhandledReceipt(frame);
                    return;
                }
                try {
                    callback(frame);
                }
                finally {
                    // Server will acknowledge only once, so remove the callback,
                    // even when the callback itself throws.
                    delete this._receiptWatchers[receiptId];
                }
            },
            // [ERROR Frame](https://stomp.github.com/stomp-specification-1.2.html#ERROR)
            ERROR: frame => {
                this.onStompError(frame);
            },
        };
        // used to generate subscription, transaction and receipt ids
        this._counter = 0;
        // subscription callbacks indexed by subscriber's ID
        this._subscriptions = {};
        // receipt-watchers indexed by receipt ids
        this._receiptWatchers = {};
        this._partialData = '';
        this._escapeHeaderValues = false;
        this._lastServerActivityTS = Date.now();
        // Settings copied from the configuration
        this.debug = config.debug;
        this.stompVersions = config.stompVersions;
        this.connectHeaders = config.connectHeaders;
        this.disconnectHeaders = config.disconnectHeaders;
        this.heartbeatIncoming = config.heartbeatIncoming;
        this.heartbeatOutgoing = config.heartbeatOutgoing;
        this.splitLargeFrames = config.splitLargeFrames;
        this.maxWebSocketChunkSize = config.maxWebSocketChunkSize;
        this.forceBinaryWSFrames = config.forceBinaryWSFrames;
        this.logRawCommunication = config.logRawCommunication;
        this.appendMissingNULLonIncoming = config.appendMissingNULLonIncoming;
        this.discardWebsocketOnCommFailure = config.discardWebsocketOnCommFailure;
        // Callbacks copied from the configuration
        this.onConnect = config.onConnect;
        this.onDisconnect = config.onDisconnect;
        this.onStompError = config.onStompError;
        this.onWebSocketClose = config.onWebSocketClose;
        this.onWebSocketError = config.onWebSocketError;
        this.onUnhandledMessage = config.onUnhandledMessage;
        this.onUnhandledReceipt = config.onUnhandledReceipt;
        this.onUnhandledFrame = config.onUnhandledFrame;
    }

    /** The `version` header of the CONNECTED frame; `undefined` before that frame arrives. */
    get connectedVersion() {
        return this._connectedVersion;
    }

    /** `true` between receipt of a CONNECTED frame and the next `_cleanUp()`. */
    get connected() {
        return this._connected;
    }

    /**
     * Installs the `onmessage`, `onclose`, `onerror` and `onopen` handlers on
     * the socket. The CONNECT frame is sent from the `onopen` handler.
     */
    start() {
        const parser = new Parser(
        // On Frame
        rawFrame => {
            const frame = FrameImpl.fromRawFrame(rawFrame, this._escapeHeaderValues);
            // if this.logRawCommunication is set, the rawChunk is logged at this._webSocket.onmessage
            if (!this.logRawCommunication) {
                this.debug(`<<< ${frame}`);
            }
            const serverFrameHandler = this._serverFrameHandlers[frame.command] || this.onUnhandledFrame;
            serverFrameHandler(frame);
        },
        // On Incoming Ping
        () => {
            this.debug('<<< PONG');
        });
        this._webSocket.onmessage = (evt) => {
            this.debug('Received data');
            // Any incoming data counts as server activity for the heartbeat check.
            this._lastServerActivityTS = Date.now();
            if (this.logRawCommunication) {
                const rawChunkAsString = evt.data instanceof ArrayBuffer
                    ? new TextDecoder().decode(evt.data)
                    : evt.data;
                this.debug(`<<< ${rawChunkAsString}`);
            }
            parser.parseChunk(evt.data, this.appendMissingNULLonIncoming);
        };
        this._webSocket.onclose = (closeEvent) => {
            this.debug(`Connection closed to ${this._webSocket.url}`);
            this._cleanUp();
            this.onWebSocketClose(closeEvent);
        };
        this._webSocket.onerror = (errorEvent) => {
            this.onWebSocketError(errorEvent);
        };
        this._webSocket.onopen = () => {
            // Clone before updating
            const connectHeaders = Object.assign({}, this.connectHeaders);
            this.debug('Web Socket Opened...');
            connectHeaders['accept-version'] = this.stompVersions.supportedVersions();
            connectHeaders['heart-beat'] = [
                this.heartbeatOutgoing,
                this.heartbeatIncoming,
            ].join(',');
            this._transmit({ command: 'CONNECT', headers: connectHeaders });
        };
    }

    /**
     * Starts the heartbeat timers based on the headers of a CONNECTED frame.
     *
     * Nothing is started unless the negotiated version is 1.1 or 1.2 and the
     * server sent a `heart-beat` header. Parts of that header that are missing
     * or not numeric are treated as 0 (heartbeat disabled in that direction),
     * so that a malformed header can never produce a `NaN` timer interval.
     *
     * @param headers - headers of the CONNECTED frame.
     */
    _setupHeartbeat(headers) {
        // A repeated CONNECTED frame must not leave earlier timers running.
        this._stopHeartbeat();
        if (headers.version !== Versions.V1_1 &&
            headers.version !== Versions.V1_2) {
            return;
        }
        // It is valid for the server to not send this header
        // https://stomp.github.io/stomp-specification-1.2.html#Heart-beating
        if (!headers['heart-beat']) {
            return;
        }
        const toInterval = (value) => {
            const parsed = parseInt(value, 10);
            return Number.isFinite(parsed) ? parsed : 0;
        };
        // heart-beat header received from the server looks like:
        //
        //     heart-beat: sx, sy
        const [rawOutgoing, rawIncoming] = headers['heart-beat'].split(',');
        const serverOutgoing = toInterval(rawOutgoing);
        const serverIncoming = toInterval(rawIncoming);
        if (this.heartbeatOutgoing !== 0 && serverIncoming !== 0) {
            const ttl = Math.max(this.heartbeatOutgoing, serverIncoming);
            this.debug(`send PING every ${ttl}ms`);
            this._pinger = setInterval(() => {
                if (this._webSocket.readyState === StompSocketState.OPEN) {
                    this._webSocket.send(BYTE.LF);
                    this.debug('>>> PING');
                }
            }, ttl);
        }
        if (this.heartbeatIncoming !== 0 && serverOutgoing !== 0) {
            const ttl = Math.max(this.heartbeatIncoming, serverOutgoing);
            this.debug(`check PONG every ${ttl}ms`);
            this._ponger = setInterval(() => {
                const delta = Date.now() - this._lastServerActivityTS;
                // We wait twice the TTL to be flexible on window's setInterval calls
                if (delta > ttl * 2) {
                    this.debug(`did not receive server activity for the last ${delta}ms`);
                    this._closeOrDiscardWebsocket();
                }
            }, ttl);
        }
    }

    /** Clears the outgoing (PING) and incoming (PONG check) heartbeat timers, if any. */
    _stopHeartbeat() {
        if (this._pinger) {
            clearInterval(this._pinger);
            this._pinger = undefined;
        }
        if (this._ponger) {
            clearInterval(this._ponger);
            this._ponger = undefined;
        }
    }

    /**
     * Discards the socket when `discardWebsocketOnCommFailure` is set,
     * otherwise closes it.
     */
    _closeOrDiscardWebsocket() {
        if (this.discardWebsocketOnCommFailure) {
            this.debug('Discarding websocket, the underlying socket may linger for a while');
            this.discardWebsocket();
        }
        else {
            this.debug('Issuing close on the websocket');
            this._closeWebsocket();
        }
    }

    /** Closes or discards the socket if it is currently CONNECTING or OPEN. */
    forceDisconnect() {
        if (!this._webSocket) {
            return;
        }
        const state = this._webSocket.readyState;
        if (state === StompSocketState.CONNECTING || state === StompSocketState.OPEN) {
            this._closeOrDiscardWebsocket();
        }
    }

    /** Stops processing incoming messages and asks the socket to close. */
    _closeWebsocket() {
        this._webSocket.onmessage = () => { }; // ignore messages
        this._webSocket.close();
    }

    /**
     * Calls `terminate()` on the socket. When the socket has no such method,
     * `augmentWebsocket` is applied to it first.
     */
    discardWebsocket() {
        if (typeof this._webSocket.terminate !== 'function') {
            augmentWebsocket(this._webSocket, (msg) => this.debug(msg));
        }
        // @ts-ignore - this method will be there at this stage
        this._webSocket.terminate();
    }

    /**
     * Builds a frame from `params`, serializes it and sends it on the socket.
     *
     * String payloads are sent in pieces of at most `maxWebSocketChunkSize`
     * characters when `splitLargeFrames` is set. If the chunk size is not a
     * positive number the frame is sent in one piece, because splitting with
     * such a size would never make progress.
     *
     * @param params - `{ command, headers, body, binaryBody, skipContentLengthHeader }`.
     */
    _transmit(params) {
        const { command, headers, body, binaryBody, skipContentLengthHeader } = params;
        const frame = new FrameImpl({
            command,
            headers,
            body,
            binaryBody,
            escapeHeaderValues: this._escapeHeaderValues,
            skipContentLengthHeader,
        });
        let rawChunk = frame.serialize();
        if (this.logRawCommunication) {
            this.debug(`>>> ${rawChunk}`);
        }
        else {
            this.debug(`>>> ${frame}`);
        }
        if (this.forceBinaryWSFrames && typeof rawChunk === 'string') {
            rawChunk = new TextEncoder().encode(rawChunk);
        }
        const chunkSize = this.maxWebSocketChunkSize;
        const shouldSplit = typeof rawChunk === 'string' &&
            Boolean(this.splitLargeFrames) &&
            chunkSize > 0;
        if (!shouldSplit) {
            this._webSocket.send(rawChunk);
            return;
        }
        let out = rawChunk;
        while (out.length > 0) {
            const chunk = out.substring(0, chunkSize);
            out = out.substring(chunkSize);
            this._webSocket.send(chunk);
            this.debug(`chunk sent = ${chunk.length}, remaining = ${out.length}`);
        }
    }

    /**
     * Shuts the connection down.
     *
     * When connected, sends DISCONNECT with a `receipt` header (generated if
     * `disconnectHeaders` has none) and closes the socket once the matching
     * RECEIPT arrives. Errors raised while doing so are logged and ignored.
     * When not connected, closes the socket if it is CONNECTING or OPEN.
     */
    dispose() {
        if (this.connected) {
            try {
                // clone before updating
                const disconnectHeaders = Object.assign({}, this.disconnectHeaders);
                if (!disconnectHeaders.receipt) {
                    disconnectHeaders.receipt = `close-${this._counter++}`;
                }
                this.watchForReceipt(disconnectHeaders.receipt, frame => {
                    this._closeWebsocket();
                    this._cleanUp();
                    this.onDisconnect(frame);
                });
                this._transmit({ command: 'DISCONNECT', headers: disconnectHeaders });
            }
            catch (error) {
                this.debug(`Ignoring error during disconnect ${error}`);
            }
            return;
        }
        const state = this._webSocket.readyState;
        if (state === StompSocketState.CONNECTING || state === StompSocketState.OPEN) {
            this._closeWebsocket();
        }
    }

    /** Marks the handler as disconnected and stops the heartbeat timers. */
    _cleanUp() {
        this._connected = false;
        this._stopHeartbeat();
    }

    /**
     * Sends a SEND frame.
     *
     * @param params - `{ destination, headers, body, binaryBody, skipContentLengthHeader }`;
     *   a `destination` entry in `headers` takes precedence over `params.destination`.
     */
    publish(params) {
        const { destination, headers, body, binaryBody, skipContentLengthHeader } = params;
        const hdrs = Object.assign({ destination }, headers);
        this._transmit({
            command: 'SEND',
            headers: hdrs,
            body,
            binaryBody,
            skipContentLengthHeader,
        });
    }

    /**
     * Registers `callback` to be called with the RECEIPT frame whose
     * `receipt-id` equals `receiptId`. A later registration for the same id
     * replaces the earlier one.
     */
    watchForReceipt(receiptId, callback) {
        this._receiptWatchers[receiptId] = callback;
    }

    /**
     * Registers `callback` for a subscription and sends a SUBSCRIBE frame.
     *
     * @param destination - value of the `destination` header.
     * @param callback - called with each MESSAGE frame of this subscription.
     * @param headers - extra headers; `headers.id` is used as the subscription
     *   id when present, otherwise an id of the form `sub-N` is generated.
     *   The caller's object is not modified.
     * @returns `{ id, unsubscribe(headers) }`.
     */
    subscribe(destination, callback, headers = {}) {
        headers = Object.assign({}, headers);
        if (!headers.id) {
            headers.id = `sub-${this._counter++}`;
        }
        headers.destination = destination;
        this._subscriptions[headers.id] = callback;
        this._transmit({ command: 'SUBSCRIBE', headers });
        const subscriptionId = headers.id;
        return {
            id: subscriptionId,
            unsubscribe: (hdrs) => this.unsubscribe(subscriptionId, hdrs),
        };
    }

    /**
     * Removes the callback registered for `id` and sends an UNSUBSCRIBE frame.
     * The caller's `headers` object is not modified.
     */
    unsubscribe(id, headers = {}) {
        headers = Object.assign({}, headers);
        delete this._subscriptions[id];
        headers.id = id;
        this._transmit({ command: 'UNSUBSCRIBE', headers });
    }

    /**
     * Sends a BEGIN frame.
     *
     * @param transactionId - optional id; an id of the form `tx-N` is
     *   generated when it is falsy.
     * @returns `{ id, commit(), abort() }` bound to this transaction.
     */
    begin(transactionId) {
        const txId = transactionId || `tx-${this._counter++}`;
        this._transmit({
            command: 'BEGIN',
            headers: {
                transaction: txId,
            },
        });
        return {
            id: txId,
            commit: () => {
                this.commit(txId);
            },
            abort: () => {
                this.abort(txId);
            },
        };
    }

    /** Sends a COMMIT frame for `transactionId`. */
    commit(transactionId) {
        this._transmit({
            command: 'COMMIT',
            headers: {
                transaction: transactionId,
            },
        });
    }

    /** Sends an ABORT frame for `transactionId`. */
    abort(transactionId) {
        this._transmit({
            command: 'ABORT',
            headers: {
                transaction: transactionId,
            },
        });
    }

    /**
     * Sends an ACK frame. The message id goes into the `id` header for STOMP
     * 1.2 and into `message-id` otherwise. The caller's `headers` object is
     * not modified.
     */
    ack(messageId, subscriptionId, headers = {}) {
        return this._acknowledge('ACK', messageId, subscriptionId, headers);
    }

    /**
     * Sends a NACK frame. The message id goes into the `id` header for STOMP
     * 1.2 and into `message-id` otherwise. The caller's `headers` object is
     * not modified.
     */
    nack(messageId, subscriptionId, headers = {}) {
        return this._acknowledge('NACK', messageId, subscriptionId, headers);
    }

    /** Shared implementation of `ack()` and `nack()`; `command` is 'ACK' or 'NACK'. */
    _acknowledge(command, messageId, subscriptionId, headers) {
        const hdrs = Object.assign({}, headers);
        const idHeader = this._connectedVersion === Versions.V1_2 ? 'id' : 'message-id';
        hdrs[idHeader] = messageId;
        hdrs.subscription = subscriptionId;
        return this._transmit({ command, headers: hdrs });
    }
}
//# sourceMappingURL=stomp-handler.js.map