UNPKG

14.1 kBJavaScriptView Raw
1import { BYTE } from './byte';
2import { FrameImpl } from './frame-impl';
3import { Parser } from './parser';
4import { StompSocketState, } from './types';
5import { Versions } from './versions';
6import { augmentWebsocket } from './augment-websocket';
7/**
8 * The STOMP protocol handler
9 *
10 * Part of `@stomp/stompjs`.
11 *
12 * @internal
13 */
14export class StompHandler {
15 constructor(_client, _webSocket, config = {}) {
16 this._client = _client;
17 this._webSocket = _webSocket;
18 this._serverFrameHandlers = {
19 // [CONNECTED Frame](http://stomp.github.com/stomp-specification-1.2.html#CONNECTED_Frame)
20 CONNECTED: frame => {
21 this.debug(`connected to server ${frame.headers.server}`);
22 this._connected = true;
23 this._connectedVersion = frame.headers.version;
24 // STOMP version 1.2 needs header values to be escaped
25 if (this._connectedVersion === Versions.V1_2) {
26 this._escapeHeaderValues = true;
27 }
28 this._setupHeartbeat(frame.headers);
29 this.onConnect(frame);
30 },
31 // [MESSAGE Frame](http://stomp.github.com/stomp-specification-1.2.html#MESSAGE)
32 MESSAGE: frame => {
33 // the callback is registered when the client calls
34 // `subscribe()`.
35 // If there is no registered subscription for the received message,
36 // the default `onUnhandledMessage` callback is used that the client can set.
37 // This is useful for subscriptions that are automatically created
38 // on the browser side (e.g. [RabbitMQ's temporary
39 // queues](http://www.rabbitmq.com/stomp.html)).
40 const subscription = frame.headers.subscription;
41 const onReceive = this._subscriptions[subscription] || this.onUnhandledMessage;
42 // bless the frame to be a Message
43 const message = frame;
44 const client = this;
45 const messageId = this._connectedVersion === Versions.V1_2
46 ? message.headers.ack
47 : message.headers['message-id'];
48 // add `ack()` and `nack()` methods directly to the returned frame
49 // so that a simple call to `message.ack()` can acknowledge the message.
50 message.ack = (headers = {}) => {
51 return client.ack(messageId, subscription, headers);
52 };
53 message.nack = (headers = {}) => {
54 return client.nack(messageId, subscription, headers);
55 };
56 onReceive(message);
57 },
58 // [RECEIPT Frame](http://stomp.github.com/stomp-specification-1.2.html#RECEIPT)
59 RECEIPT: frame => {
60 const callback = this._receiptWatchers[frame.headers['receipt-id']];
61 if (callback) {
62 callback(frame);
63 // Server will acknowledge only once, remove the callback
64 delete this._receiptWatchers[frame.headers['receipt-id']];
65 }
66 else {
67 this.onUnhandledReceipt(frame);
68 }
69 },
70 // [ERROR Frame](http://stomp.github.com/stomp-specification-1.2.html#ERROR)
71 ERROR: frame => {
72 this.onStompError(frame);
73 },
74 };
75 // used to index subscribers
76 this._counter = 0;
77 // subscription callbacks indexed by subscriber's ID
78 this._subscriptions = {};
79 // receipt-watchers indexed by receipts-ids
80 this._receiptWatchers = {};
81 this._partialData = '';
82 this._escapeHeaderValues = false;
83 this._lastServerActivityTS = Date.now();
84 this.configure(config);
85 }
86 get connectedVersion() {
87 return this._connectedVersion;
88 }
89 get connected() {
90 return this._connected;
91 }
92 configure(conf) {
93 // bulk assign all properties to this
94 Object.assign(this, conf);
95 }
96 start() {
97 const parser = new Parser(
98 // On Frame
99 rawFrame => {
100 const frame = FrameImpl.fromRawFrame(rawFrame, this._escapeHeaderValues);
101 // if this.logRawCommunication is set, the rawChunk is logged at this._webSocket.onmessage
102 if (!this.logRawCommunication) {
103 this.debug(`<<< ${frame}`);
104 }
105 const serverFrameHandler = this._serverFrameHandlers[frame.command] || this.onUnhandledFrame;
106 serverFrameHandler(frame);
107 },
108 // On Incoming Ping
109 () => {
110 this.debug('<<< PONG');
111 });
112 this._webSocket.onmessage = (evt) => {
113 this.debug('Received data');
114 this._lastServerActivityTS = Date.now();
115 if (this.logRawCommunication) {
116 const rawChunkAsString = evt.data instanceof ArrayBuffer
117 ? new TextDecoder().decode(evt.data)
118 : evt.data;
119 this.debug(`<<< ${rawChunkAsString}`);
120 }
121 parser.parseChunk(evt.data, this.appendMissingNULLonIncoming);
122 };
123 this._onclose = (closeEvent) => {
124 this.debug(`Connection closed to ${this._client.brokerURL}`);
125 this._cleanUp();
126 this.onWebSocketClose(closeEvent);
127 };
128 this._webSocket.onclose = this._onclose;
129 this._webSocket.onerror = (errorEvent) => {
130 this.onWebSocketError(errorEvent);
131 };
132 this._webSocket.onopen = () => {
133 // Clone before updating
134 const connectHeaders = Object.assign({}, this.connectHeaders);
135 this.debug('Web Socket Opened...');
136 connectHeaders['accept-version'] = this.stompVersions.supportedVersions();
137 connectHeaders['heart-beat'] = [
138 this.heartbeatOutgoing,
139 this.heartbeatIncoming,
140 ].join(',');
141 this._transmit({ command: 'CONNECT', headers: connectHeaders });
142 };
143 }
144 _setupHeartbeat(headers) {
145 if (headers.version !== Versions.V1_1 &&
146 headers.version !== Versions.V1_2) {
147 return;
148 }
149 // It is valid for the server to not send this header
150 // https://stomp.github.io/stomp-specification-1.2.html#Heart-beating
151 if (!headers['heart-beat']) {
152 return;
153 }
154 // heart-beat header received from the server looks like:
155 //
156 // heart-beat: sx, sy
157 const [serverOutgoing, serverIncoming] = headers['heart-beat']
158 .split(',')
159 .map((v) => parseInt(v, 10));
160 if (this.heartbeatOutgoing !== 0 && serverIncoming !== 0) {
161 const ttl = Math.max(this.heartbeatOutgoing, serverIncoming);
162 this.debug(`send PING every ${ttl}ms`);
163 this._pinger = setInterval(() => {
164 if (this._webSocket.readyState === StompSocketState.OPEN) {
165 this._webSocket.send(BYTE.LF);
166 this.debug('>>> PING');
167 }
168 }, ttl);
169 }
170 if (this.heartbeatIncoming !== 0 && serverOutgoing !== 0) {
171 const ttl = Math.max(this.heartbeatIncoming, serverOutgoing);
172 this.debug(`check PONG every ${ttl}ms`);
173 this._ponger = setInterval(() => {
174 const delta = Date.now() - this._lastServerActivityTS;
175 // We wait twice the TTL to be flexible on window's setInterval calls
176 if (delta > ttl * 2) {
177 this.debug(`did not receive server activity for the last ${delta}ms`);
178 this._closeOrDiscardWebsocket();
179 }
180 }, ttl);
181 }
182 }
183 _closeOrDiscardWebsocket() {
184 if (this.discardWebsocketOnCommFailure) {
185 this.debug('Discarding websocket, the underlying socket may linger for a while');
186 this._discardWebsocket();
187 }
188 else {
189 this.debug('Issuing close on the websocket');
190 this._closeWebsocket();
191 }
192 }
193 forceDisconnect() {
194 if (this._webSocket) {
195 if (this._webSocket.readyState === StompSocketState.CONNECTING ||
196 this._webSocket.readyState === StompSocketState.OPEN) {
197 this._closeOrDiscardWebsocket();
198 }
199 }
200 }
201 _closeWebsocket() {
202 this._webSocket.onmessage = () => { }; // ignore messages
203 this._webSocket.close();
204 }
205 _discardWebsocket() {
206 if (!this._webSocket.terminate) {
207 augmentWebsocket(this._webSocket, (msg) => this.debug(msg));
208 }
209 this._webSocket.terminate();
210 }
211 _transmit(params) {
212 const { command, headers, body, binaryBody, skipContentLengthHeader } = params;
213 const frame = new FrameImpl({
214 command,
215 headers,
216 body,
217 binaryBody,
218 escapeHeaderValues: this._escapeHeaderValues,
219 skipContentLengthHeader,
220 });
221 let rawChunk = frame.serialize();
222 if (this.logRawCommunication) {
223 this.debug(`>>> ${rawChunk}`);
224 }
225 else {
226 this.debug(`>>> ${frame}`);
227 }
228 if (this.forceBinaryWSFrames && typeof rawChunk === 'string') {
229 rawChunk = new TextEncoder().encode(rawChunk);
230 }
231 if (typeof rawChunk !== 'string' || !this.splitLargeFrames) {
232 this._webSocket.send(rawChunk);
233 }
234 else {
235 let out = rawChunk;
236 while (out.length > 0) {
237 const chunk = out.substring(0, this.maxWebSocketChunkSize);
238 out = out.substring(this.maxWebSocketChunkSize);
239 this._webSocket.send(chunk);
240 this.debug(`chunk sent = ${chunk.length}, remaining = ${out.length}`);
241 }
242 }
243 }
244 dispose() {
245 if (this.connected) {
246 try {
247 // clone before updating
248 const disconnectHeaders = Object.assign({}, this.disconnectHeaders);
249 if (!disconnectHeaders.receipt) {
250 disconnectHeaders.receipt = `close-${this._counter++}`;
251 }
252 this.watchForReceipt(disconnectHeaders.receipt, frame => {
253 this._closeWebsocket();
254 this._cleanUp();
255 this.onDisconnect(frame);
256 });
257 this._transmit({ command: 'DISCONNECT', headers: disconnectHeaders });
258 }
259 catch (error) {
260 this.debug(`Ignoring error during disconnect ${error}`);
261 }
262 }
263 else {
264 if (this._webSocket.readyState === StompSocketState.CONNECTING ||
265 this._webSocket.readyState === StompSocketState.OPEN) {
266 this._closeWebsocket();
267 }
268 }
269 }
270 _cleanUp() {
271 this._connected = false;
272 if (this._pinger) {
273 clearInterval(this._pinger);
274 }
275 if (this._ponger) {
276 clearInterval(this._ponger);
277 }
278 }
279 publish(params) {
280 const { destination, headers, body, binaryBody, skipContentLengthHeader } = params;
281 const hdrs = Object.assign({ destination }, headers);
282 this._transmit({
283 command: 'SEND',
284 headers: hdrs,
285 body,
286 binaryBody,
287 skipContentLengthHeader,
288 });
289 }
290 watchForReceipt(receiptId, callback) {
291 this._receiptWatchers[receiptId] = callback;
292 }
293 subscribe(destination, callback, headers = {}) {
294 headers = Object.assign({}, headers);
295 if (!headers.id) {
296 headers.id = `sub-${this._counter++}`;
297 }
298 headers.destination = destination;
299 this._subscriptions[headers.id] = callback;
300 this._transmit({ command: 'SUBSCRIBE', headers });
301 const client = this;
302 return {
303 id: headers.id,
304 unsubscribe(hdrs) {
305 return client.unsubscribe(headers.id, hdrs);
306 },
307 };
308 }
309 unsubscribe(id, headers = {}) {
310 headers = Object.assign({}, headers);
311 delete this._subscriptions[id];
312 headers.id = id;
313 this._transmit({ command: 'UNSUBSCRIBE', headers });
314 }
315 begin(transactionId) {
316 const txId = transactionId || `tx-${this._counter++}`;
317 this._transmit({
318 command: 'BEGIN',
319 headers: {
320 transaction: txId,
321 },
322 });
323 const client = this;
324 return {
325 id: txId,
326 commit() {
327 client.commit(txId);
328 },
329 abort() {
330 client.abort(txId);
331 },
332 };
333 }
334 commit(transactionId) {
335 this._transmit({
336 command: 'COMMIT',
337 headers: {
338 transaction: transactionId,
339 },
340 });
341 }
342 abort(transactionId) {
343 this._transmit({
344 command: 'ABORT',
345 headers: {
346 transaction: transactionId,
347 },
348 });
349 }
350 ack(messageId, subscriptionId, headers = {}) {
351 headers = Object.assign({}, headers);
352 if (this._connectedVersion === Versions.V1_2) {
353 headers.id = messageId;
354 }
355 else {
356 headers['message-id'] = messageId;
357 }
358 headers.subscription = subscriptionId;
359 this._transmit({ command: 'ACK', headers });
360 }
361 nack(messageId, subscriptionId, headers = {}) {
362 headers = Object.assign({}, headers);
363 if (this._connectedVersion === Versions.V1_2) {
364 headers.id = messageId;
365 }
366 else {
367 headers['message-id'] = messageId;
368 }
369 headers.subscription = subscriptionId;
370 return this._transmit({ command: 'NACK', headers });
371 }
372}
373//# sourceMappingURL=stomp-handler.js.map
\No newline at end of file