1 | import { BYTE } from './byte';
|
2 | import { FrameImpl } from './frame-impl';
|
3 | import { Parser } from './parser';
|
4 | import { StompSocketState, } from './types';
|
5 | import { Versions } from './versions';
|
6 | import { augmentWebsocket } from './augment-websocket';
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 | export class StompHandler {
|
15 | constructor(_client, _webSocket, config = {}) {
|
16 | this._client = _client;
|
17 | this._webSocket = _webSocket;
|
18 | this._serverFrameHandlers = {
|
19 |
|
20 | CONNECTED: frame => {
|
21 | this.debug(`connected to server ${frame.headers.server}`);
|
22 | this._connected = true;
|
23 | this._connectedVersion = frame.headers.version;
|
24 |
|
25 | if (this._connectedVersion === Versions.V1_2) {
|
26 | this._escapeHeaderValues = true;
|
27 | }
|
28 | this._setupHeartbeat(frame.headers);
|
29 | this.onConnect(frame);
|
30 | },
|
31 |
|
32 | MESSAGE: frame => {
|
33 |
|
34 |
|
35 |
|
36 |
|
37 |
|
38 |
|
39 |
|
40 | const subscription = frame.headers.subscription;
|
41 | const onReceive = this._subscriptions[subscription] || this.onUnhandledMessage;
|
42 |
|
43 | const message = frame;
|
44 | const client = this;
|
45 | const messageId = this._connectedVersion === Versions.V1_2
|
46 | ? message.headers.ack
|
47 | : message.headers['message-id'];
|
48 |
|
49 |
|
50 | message.ack = (headers = {}) => {
|
51 | return client.ack(messageId, subscription, headers);
|
52 | };
|
53 | message.nack = (headers = {}) => {
|
54 | return client.nack(messageId, subscription, headers);
|
55 | };
|
56 | onReceive(message);
|
57 | },
|
58 |
|
59 | RECEIPT: frame => {
|
60 | const callback = this._receiptWatchers[frame.headers['receipt-id']];
|
61 | if (callback) {
|
62 | callback(frame);
|
63 |
|
64 | delete this._receiptWatchers[frame.headers['receipt-id']];
|
65 | }
|
66 | else {
|
67 | this.onUnhandledReceipt(frame);
|
68 | }
|
69 | },
|
70 |
|
71 | ERROR: frame => {
|
72 | this.onStompError(frame);
|
73 | },
|
74 | };
|
75 |
|
76 | this._counter = 0;
|
77 |
|
78 | this._subscriptions = {};
|
79 |
|
80 | this._receiptWatchers = {};
|
81 | this._partialData = '';
|
82 | this._escapeHeaderValues = false;
|
83 | this._lastServerActivityTS = Date.now();
|
84 | this.configure(config);
|
85 | }
|
86 | get connectedVersion() {
|
87 | return this._connectedVersion;
|
88 | }
|
89 | get connected() {
|
90 | return this._connected;
|
91 | }
|
92 | configure(conf) {
|
93 |
|
94 | Object.assign(this, conf);
|
95 | }
|
96 | start() {
|
97 | const parser = new Parser(
|
98 |
|
99 | rawFrame => {
|
100 | const frame = FrameImpl.fromRawFrame(rawFrame, this._escapeHeaderValues);
|
101 |
|
102 | if (!this.logRawCommunication) {
|
103 | this.debug(`<<< ${frame}`);
|
104 | }
|
105 | const serverFrameHandler = this._serverFrameHandlers[frame.command] || this.onUnhandledFrame;
|
106 | serverFrameHandler(frame);
|
107 | },
|
108 |
|
109 | () => {
|
110 | this.debug('<<< PONG');
|
111 | });
|
112 | this._webSocket.onmessage = (evt) => {
|
113 | this.debug('Received data');
|
114 | this._lastServerActivityTS = Date.now();
|
115 | if (this.logRawCommunication) {
|
116 | const rawChunkAsString = evt.data instanceof ArrayBuffer
|
117 | ? new TextDecoder().decode(evt.data)
|
118 | : evt.data;
|
119 | this.debug(`<<< ${rawChunkAsString}`);
|
120 | }
|
121 | parser.parseChunk(evt.data, this.appendMissingNULLonIncoming);
|
122 | };
|
123 | this._onclose = (closeEvent) => {
|
124 | this.debug(`Connection closed to ${this._client.brokerURL}`);
|
125 | this._cleanUp();
|
126 | this.onWebSocketClose(closeEvent);
|
127 | };
|
128 | this._webSocket.onclose = this._onclose;
|
129 | this._webSocket.onerror = (errorEvent) => {
|
130 | this.onWebSocketError(errorEvent);
|
131 | };
|
132 | this._webSocket.onopen = () => {
|
133 |
|
134 | const connectHeaders = Object.assign({}, this.connectHeaders);
|
135 | this.debug('Web Socket Opened...');
|
136 | connectHeaders['accept-version'] = this.stompVersions.supportedVersions();
|
137 | connectHeaders['heart-beat'] = [
|
138 | this.heartbeatOutgoing,
|
139 | this.heartbeatIncoming,
|
140 | ].join(',');
|
141 | this._transmit({ command: 'CONNECT', headers: connectHeaders });
|
142 | };
|
143 | }
|
144 | _setupHeartbeat(headers) {
|
145 | if (headers.version !== Versions.V1_1 &&
|
146 | headers.version !== Versions.V1_2) {
|
147 | return;
|
148 | }
|
149 |
|
150 |
|
151 | if (!headers['heart-beat']) {
|
152 | return;
|
153 | }
|
154 |
|
155 |
|
156 |
|
157 | const [serverOutgoing, serverIncoming] = headers['heart-beat']
|
158 | .split(',')
|
159 | .map((v) => parseInt(v, 10));
|
160 | if (this.heartbeatOutgoing !== 0 && serverIncoming !== 0) {
|
161 | const ttl = Math.max(this.heartbeatOutgoing, serverIncoming);
|
162 | this.debug(`send PING every ${ttl}ms`);
|
163 | this._pinger = setInterval(() => {
|
164 | if (this._webSocket.readyState === StompSocketState.OPEN) {
|
165 | this._webSocket.send(BYTE.LF);
|
166 | this.debug('>>> PING');
|
167 | }
|
168 | }, ttl);
|
169 | }
|
170 | if (this.heartbeatIncoming !== 0 && serverOutgoing !== 0) {
|
171 | const ttl = Math.max(this.heartbeatIncoming, serverOutgoing);
|
172 | this.debug(`check PONG every ${ttl}ms`);
|
173 | this._ponger = setInterval(() => {
|
174 | const delta = Date.now() - this._lastServerActivityTS;
|
175 |
|
176 | if (delta > ttl * 2) {
|
177 | this.debug(`did not receive server activity for the last ${delta}ms`);
|
178 | this._closeOrDiscardWebsocket();
|
179 | }
|
180 | }, ttl);
|
181 | }
|
182 | }
|
183 | _closeOrDiscardWebsocket() {
|
184 | if (this.discardWebsocketOnCommFailure) {
|
185 | this.debug('Discarding websocket, the underlying socket may linger for a while');
|
186 | this._discardWebsocket();
|
187 | }
|
188 | else {
|
189 | this.debug('Issuing close on the websocket');
|
190 | this._closeWebsocket();
|
191 | }
|
192 | }
|
193 | forceDisconnect() {
|
194 | if (this._webSocket) {
|
195 | if (this._webSocket.readyState === StompSocketState.CONNECTING ||
|
196 | this._webSocket.readyState === StompSocketState.OPEN) {
|
197 | this._closeOrDiscardWebsocket();
|
198 | }
|
199 | }
|
200 | }
|
201 | _closeWebsocket() {
|
202 | this._webSocket.onmessage = () => { };
|
203 | this._webSocket.close();
|
204 | }
|
205 | _discardWebsocket() {
|
206 | if (!this._webSocket.terminate) {
|
207 | augmentWebsocket(this._webSocket, (msg) => this.debug(msg));
|
208 | }
|
209 | this._webSocket.terminate();
|
210 | }
|
211 | _transmit(params) {
|
212 | const { command, headers, body, binaryBody, skipContentLengthHeader } = params;
|
213 | const frame = new FrameImpl({
|
214 | command,
|
215 | headers,
|
216 | body,
|
217 | binaryBody,
|
218 | escapeHeaderValues: this._escapeHeaderValues,
|
219 | skipContentLengthHeader,
|
220 | });
|
221 | let rawChunk = frame.serialize();
|
222 | if (this.logRawCommunication) {
|
223 | this.debug(`>>> ${rawChunk}`);
|
224 | }
|
225 | else {
|
226 | this.debug(`>>> ${frame}`);
|
227 | }
|
228 | if (this.forceBinaryWSFrames && typeof rawChunk === 'string') {
|
229 | rawChunk = new TextEncoder().encode(rawChunk);
|
230 | }
|
231 | if (typeof rawChunk !== 'string' || !this.splitLargeFrames) {
|
232 | this._webSocket.send(rawChunk);
|
233 | }
|
234 | else {
|
235 | let out = rawChunk;
|
236 | while (out.length > 0) {
|
237 | const chunk = out.substring(0, this.maxWebSocketChunkSize);
|
238 | out = out.substring(this.maxWebSocketChunkSize);
|
239 | this._webSocket.send(chunk);
|
240 | this.debug(`chunk sent = ${chunk.length}, remaining = ${out.length}`);
|
241 | }
|
242 | }
|
243 | }
|
244 | dispose() {
|
245 | if (this.connected) {
|
246 | try {
|
247 |
|
248 | const disconnectHeaders = Object.assign({}, this.disconnectHeaders);
|
249 | if (!disconnectHeaders.receipt) {
|
250 | disconnectHeaders.receipt = `close-${this._counter++}`;
|
251 | }
|
252 | this.watchForReceipt(disconnectHeaders.receipt, frame => {
|
253 | this._closeWebsocket();
|
254 | this._cleanUp();
|
255 | this.onDisconnect(frame);
|
256 | });
|
257 | this._transmit({ command: 'DISCONNECT', headers: disconnectHeaders });
|
258 | }
|
259 | catch (error) {
|
260 | this.debug(`Ignoring error during disconnect ${error}`);
|
261 | }
|
262 | }
|
263 | else {
|
264 | if (this._webSocket.readyState === StompSocketState.CONNECTING ||
|
265 | this._webSocket.readyState === StompSocketState.OPEN) {
|
266 | this._closeWebsocket();
|
267 | }
|
268 | }
|
269 | }
|
270 | _cleanUp() {
|
271 | this._connected = false;
|
272 | if (this._pinger) {
|
273 | clearInterval(this._pinger);
|
274 | }
|
275 | if (this._ponger) {
|
276 | clearInterval(this._ponger);
|
277 | }
|
278 | }
|
279 | publish(params) {
|
280 | const { destination, headers, body, binaryBody, skipContentLengthHeader } = params;
|
281 | const hdrs = Object.assign({ destination }, headers);
|
282 | this._transmit({
|
283 | command: 'SEND',
|
284 | headers: hdrs,
|
285 | body,
|
286 | binaryBody,
|
287 | skipContentLengthHeader,
|
288 | });
|
289 | }
|
290 | watchForReceipt(receiptId, callback) {
|
291 | this._receiptWatchers[receiptId] = callback;
|
292 | }
|
293 | subscribe(destination, callback, headers = {}) {
|
294 | headers = Object.assign({}, headers);
|
295 | if (!headers.id) {
|
296 | headers.id = `sub-${this._counter++}`;
|
297 | }
|
298 | headers.destination = destination;
|
299 | this._subscriptions[headers.id] = callback;
|
300 | this._transmit({ command: 'SUBSCRIBE', headers });
|
301 | const client = this;
|
302 | return {
|
303 | id: headers.id,
|
304 | unsubscribe(hdrs) {
|
305 | return client.unsubscribe(headers.id, hdrs);
|
306 | },
|
307 | };
|
308 | }
|
309 | unsubscribe(id, headers = {}) {
|
310 | headers = Object.assign({}, headers);
|
311 | delete this._subscriptions[id];
|
312 | headers.id = id;
|
313 | this._transmit({ command: 'UNSUBSCRIBE', headers });
|
314 | }
|
315 | begin(transactionId) {
|
316 | const txId = transactionId || `tx-${this._counter++}`;
|
317 | this._transmit({
|
318 | command: 'BEGIN',
|
319 | headers: {
|
320 | transaction: txId,
|
321 | },
|
322 | });
|
323 | const client = this;
|
324 | return {
|
325 | id: txId,
|
326 | commit() {
|
327 | client.commit(txId);
|
328 | },
|
329 | abort() {
|
330 | client.abort(txId);
|
331 | },
|
332 | };
|
333 | }
|
334 | commit(transactionId) {
|
335 | this._transmit({
|
336 | command: 'COMMIT',
|
337 | headers: {
|
338 | transaction: transactionId,
|
339 | },
|
340 | });
|
341 | }
|
342 | abort(transactionId) {
|
343 | this._transmit({
|
344 | command: 'ABORT',
|
345 | headers: {
|
346 | transaction: transactionId,
|
347 | },
|
348 | });
|
349 | }
|
350 | ack(messageId, subscriptionId, headers = {}) {
|
351 | headers = Object.assign({}, headers);
|
352 | if (this._connectedVersion === Versions.V1_2) {
|
353 | headers.id = messageId;
|
354 | }
|
355 | else {
|
356 | headers['message-id'] = messageId;
|
357 | }
|
358 | headers.subscription = subscriptionId;
|
359 | this._transmit({ command: 'ACK', headers });
|
360 | }
|
361 | nack(messageId, subscriptionId, headers = {}) {
|
362 | headers = Object.assign({}, headers);
|
363 | if (this._connectedVersion === Versions.V1_2) {
|
364 | headers.id = messageId;
|
365 | }
|
366 | else {
|
367 | headers['message-id'] = messageId;
|
368 | }
|
369 | headers.subscription = subscriptionId;
|
370 | return this._transmit({ command: 'NACK', headers });
|
371 | }
|
372 | }
|
373 |
|
\ | No newline at end of file |