1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | 'use strict';
|
19 |
|
20 |
|
21 |
|
22 |
|
23 | const util = require('util'),
|
24 | nats = require('nats'),
|
25 | timers = require('timers'),
|
26 | events = require('events'),
|
27 | nuid = require('nuid'),
|
28 | url = require('url'),
|
29 | proto = require('./pb');
|
30 |
|
31 |
|
32 |
|
33 |
|
34 |
|
35 | const VERSION = '0.2.4',
|
36 | DEFAULT_PORT = 4222,
|
37 | DEFAULT_PRE = 'nats://localhost:',
|
38 | DEFAULT_URI = DEFAULT_PRE + DEFAULT_PORT,
|
39 | DEFAULT_DISCOVER_PREFIX = '_STAN.discover',
|
40 | DEFAULT_ACK_PREFIX = '_STAN.acks',
|
41 | DEFAULT_CONNECT_WAIT = 1000 * 2,
|
42 |
|
43 | DEFAULT_MAX_IN_FLIGHT = 16384,
|
44 | DEFAULT_ACK_WAIT = 30 * 1000,
|
45 |
|
46 | BAD_SUBJECT = 'stan: subject must be supplied',
|
47 | BAD_CLUSTER_ID = 'stan: cluster ID must be supplied',
|
48 | BAD_CLIENT_ID = 'stan: client ID must be supplied',
|
49 | MAX_FLIGHT_LIMIT_REACHED = 'stan: max in flight reached.',
|
50 | CONN_CLOSED = 'stan: Connection closed',
|
51 | BAD_SUBSCRIPTION = 'stan: invalid subscription',
|
52 | BINARY_ENCODING_REQUIRED = 'stan: NATS connection encoding must be \'binary\'.',
|
53 | NO_SERVER_SUPPORT = 'stan: not supported by server',
|
54 | ACK_TIMEOUT = 'stan: publish ack timeout',
|
55 | CONNECT_REQ_TIMEOUT = 'stan: connect request timeout',
|
56 | CLOSE_REQ_TIMEOUT = 'stan: close request timeout',
|
57 | SUB_REQ_TIMEOUT = 'stan: subscribe request timeout',
|
58 | UNSUB_REQ_TIMEOUT = 'stan: unsubscribe request timeout',
|
59 |
|
60 | PROTOCOL_ONE = 1,
|
61 | DEFAULT_PING_INTERVAL = 5 * 1000,
|
62 | DEFAULT_PING_MAXOUT = 3,
|
63 | MAX_PINGS_EXCEEDED = 'stan: connection lost due to PING failure';
|
64 |
|
65 |
|
66 |
|
67 |
|
68 |
|
69 |
|
70 | exports.version = VERSION;
|
71 |
|
72 |
|
73 | function Stan(clusterID, clientID, opts) {
|
74 | events.EventEmitter.call(this);
|
75 | if (typeof clusterID !== 'string' || clusterID.length < 1) {
|
76 | throw new Error(BAD_CLUSTER_ID);
|
77 | }
|
78 | if (typeof clientID !== 'string' || clientID.length < 1) {
|
79 | throw new Error(BAD_CLIENT_ID);
|
80 | }
|
81 | this.clusterID = clusterID;
|
82 | this.clientID = clientID;
|
83 | this.ackSubject = DEFAULT_ACK_PREFIX + "." + nuid.next();
|
84 |
|
85 |
|
86 | this.pubPrefix = null;
|
87 | this.subRequests = null;
|
88 | this.unsubRequests = null;
|
89 | this.subCloseRequests = null;
|
90 | this.closeRequests = null;
|
91 |
|
92 | this.parseOptions(opts);
|
93 | this.initState();
|
94 | this.createConnection();
|
95 | return this;
|
96 | }
|
97 |
|
98 |
|
99 | util.inherits(Stan, events.EventEmitter);
|
100 |
|
101 |
|
102 |
|
103 |
|
104 |
|
105 |
|
106 |
|
107 |
|
108 |
|
109 | exports.connect = function(clusterID, clientID, opts) {
|
110 | return new Stan(clusterID, clientID, opts);
|
111 | };
|
112 |
|
113 |
|
114 |
|
115 |
|
116 |
|
117 |
|
118 | Stan.prototype.isClosed = function() {
|
119 | return this.nc === undefined;
|
120 | };
|
121 |
|
122 |
|
123 |
|
124 |
|
125 |
|
126 |
|
127 | Stan.prototype.parseOptions = function(opts) {
|
128 | const options = this.options = {
|
129 | url: DEFAULT_URI,
|
130 | connectTimeout: DEFAULT_CONNECT_WAIT,
|
131 | ackTimeout: DEFAULT_ACK_WAIT,
|
132 | discoverPrefix: DEFAULT_DISCOVER_PREFIX,
|
133 | maxPubAcksInflight: DEFAULT_MAX_IN_FLIGHT,
|
134 | stanEncoding: 'utf8',
|
135 | stanPingInterval: DEFAULT_PING_INTERVAL,
|
136 | stanMaxPingOut: DEFAULT_PING_MAXOUT,
|
137 | maxReconnectAttempts: -1
|
138 | };
|
139 |
|
140 | if (opts === undefined) {
|
141 | options.url = DEFAULT_URI;
|
142 | } else if ('number' === typeof opts) {
|
143 | options.url = DEFAULT_PRE + opts;
|
144 | } else if ('string' === typeof opts) {
|
145 | options.url = sanitizeUrl(opts);
|
146 | } else if ('object' === typeof opts) {
|
147 | if (opts.port !== undefined) {
|
148 | options.url = DEFAULT_PRE + opts.port;
|
149 | }
|
150 |
|
151 | this.assignOption(opts, 'discoverPrefix');
|
152 | this.assignOption(opts, 'nc');
|
153 | this.assignOption(opts, 'connectTimeout');
|
154 | this.assignOption(opts, 'ackTimeout');
|
155 | this.assignOption(opts, 'maxPubAcksInflight');
|
156 | this.assignOption(opts, 'stanEncoding');
|
157 | this.assignOption(opts, 'stanPingInterval');
|
158 | this.assignOption(opts, 'stanMaxPingOut');
|
159 |
|
160 |
|
161 |
|
162 |
|
163 | this.assignOption(opts, 'url');
|
164 | this.assignOption(opts, 'uri', 'url');
|
165 | this.assignOption(opts, 'user');
|
166 | this.assignOption(opts, 'pass');
|
167 | this.assignOption(opts, 'token');
|
168 | this.assignOption(opts, 'password', 'pass');
|
169 | this.assignOption(opts, 'verbose');
|
170 | this.assignOption(opts, 'pedantic');
|
171 | this.assignOption(opts, 'reconnect');
|
172 | this.assignOption(opts, 'maxReconnectAttempts');
|
173 | this.assignOption(opts, 'reconnectTimeWait');
|
174 | this.assignOption(opts, 'servers');
|
175 | this.assignOption(opts, 'urls', 'servers');
|
176 | this.assignOption(opts, 'noRandomize');
|
177 | this.assignOption(opts, 'NoRandomize', 'noRandomize');
|
178 | this.assignOption(opts, 'dontRandomize', 'noRandomize');
|
179 | this.assignOption(opts, 'encoding');
|
180 | this.assignOption(opts, 'tls');
|
181 | this.assignOption(opts, 'secure', 'tls');
|
182 | this.assignOption(opts, 'name');
|
183 | this.assignOption(opts, 'client', 'name');
|
184 | this.assignOption(opts, 'yieldTime');
|
185 | this.assignOption(opts, 'waitOnFirstConnect');
|
186 | this.assignOption(opts, 'preserveBuffers');
|
187 | this.assignOption(opts, 'pingInterval');
|
188 | this.assignOption(opts, 'maxPingOut');
|
189 | this.assignOption(opts, 'useOldRequestStyle');
|
190 | }
|
191 | };
|
192 |
|
193 |
|
194 | function sanitizeUrl(host) {
|
195 | if ((/^.*:\/\/.*/).exec(host) === null) {
|
196 |
|
197 | host = 'nats://' + host;
|
198 | }
|
199 | const u = url.parse(host);
|
200 | if (u.port === null || u.port == '') {
|
201 | host += ":" + DEFAULT_PORT;
|
202 | }
|
203 | return host;
|
204 | }
|
205 |
|
206 |
|
207 |
|
208 |
|
209 |
|
210 |
|
211 |
|
212 |
|
213 | Stan.prototype.assignOption = function(opts, prop, assign) {
|
214 | if (assign === undefined) {
|
215 | assign = prop;
|
216 | }
|
217 | if (opts[prop] !== undefined) {
|
218 | this.options[assign] = opts[prop];
|
219 | }
|
220 | };
|
221 |
|
222 |
|
223 |
|
224 |
|
225 |
|
226 | Stan.prototype.initState = function() {
|
227 | this.pubAckMap = {};
|
228 | this.pubAckOutstanding = 0;
|
229 | this.subMap = {};
|
230 | };
|
231 |
|
232 |
|
233 |
|
234 |
|
235 |
|
236 |
|
237 |
|
238 |
|
239 |
|
240 |
|
241 |
|
242 |
|
243 |
|
244 |
|
245 |
|
246 |
|
247 |
|
248 |
|
249 |
|
250 |
|
251 |
|
252 |
|
253 |
|
254 |
|
255 |
|
256 |
|
257 |
|
258 |
|
259 |
|
260 |
|
261 |
|
262 |
|
263 |
|
264 | Stan.prototype.createConnection = function() {
|
265 | if (typeof this.options.nc === 'object') {
|
266 | if (this.options.nc.encoding !== 'binary') {
|
267 | throw new Error(BINARY_ENCODING_REQUIRED);
|
268 | } else {
|
269 | this.nc = this.options.nc;
|
270 | }
|
271 | }
|
272 | if (this.nc === undefined) {
|
273 | const encoding = this.options.encoding;
|
274 | if (encoding && encoding !== 'binary') {
|
275 | throw new Error(BINARY_ENCODING_REQUIRED);
|
276 | } else {
|
277 | this.options.encoding = 'binary';
|
278 | }
|
279 | this.nc = nats.connect(this.options);
|
280 | this.ncOwned = true;
|
281 | }
|
282 |
|
283 |
|
284 | this.nc.on('connect', () => {
|
285 |
|
286 | const hbInbox = nats.createInbox();
|
287 | this.hbSubscription = this.nc.subscribe(hbInbox, (msg, reply) => {
|
288 | this.nc.publish(reply);
|
289 | });
|
290 |
|
291 | this.pingInbox = nats.createInbox();
|
292 | this.pingSubscription = this.nc.subscribe(this.pingInbox, (msg) => {
|
293 | if (msg) {
|
294 | const pingResponse = proto.PingResponse.deserializeBinary(Buffer.from(msg, 'binary'));
|
295 | const err = pingResponse.getError();
|
296 | if (err) {
|
297 | this.closeWithError('connection_lost', err);
|
298 | return;
|
299 | }
|
300 | }
|
301 | this.pingOut = 0;
|
302 | });
|
303 |
|
304 | this.ackSubscription = this.nc.subscribe(this.ackSubject, this.processAck());
|
305 |
|
306 | const discoverSubject = this.options.discoverPrefix + '.' + this.clusterID;
|
307 |
|
308 | this.connId = Buffer.from(nuid.next(), "utf8");
|
309 | const req = new proto.ConnectRequest();
|
310 | req.setClientId(this.clientID);
|
311 | req.setHeartbeatInbox(hbInbox);
|
312 | req.setProtocol(PROTOCOL_ONE);
|
313 | req.setConnId(this.connId);
|
314 | req.setPingInterval(Math.ceil(this.options.stanPingInterval / 1000));
|
315 | req.setPingMaxOut(this.options.stanMaxPingOut);
|
316 |
|
317 |
|
318 | this.nc.requestOne(discoverSubject, Buffer.from(req.serializeBinary()), this.options.connectTimeout, (msg) => {
|
319 | if (msg instanceof nats.NatsError) {
|
320 | let err = msg;
|
321 | if (msg.code === nats.REQ_TIMEOUT) {
|
322 | err = CONNECT_REQ_TIMEOUT;
|
323 | }
|
324 | this.closeWithError('error', err);
|
325 | return;
|
326 | }
|
327 |
|
328 | const cr = proto.ConnectResponse.deserializeBinary(Buffer.from(msg, 'binary'));
|
329 | if (cr.getError() !== "") {
|
330 | this.closeWithError('error', cr.getError());
|
331 | return;
|
332 | }
|
333 | this.pubPrefix = cr.getPubPrefix();
|
334 | this.subRequests = cr.getSubRequests();
|
335 | this.unsubRequests = cr.getUnsubRequests();
|
336 | this.subCloseRequests = cr.getSubCloseRequests();
|
337 | this.closeRequests = cr.getCloseRequests();
|
338 |
|
339 | let unsubPingSub = true;
|
340 | if (cr.getProtocol() >= PROTOCOL_ONE) {
|
341 | if (cr.getPingInterval() !== 0) {
|
342 | unsubPingSub = false;
|
343 |
|
344 | this.pingRequests = cr.getPingRequests();
|
345 | this.stanPingInterval = cr.getPingInterval() * 1000;
|
346 | this.stanMaxPingOut = cr.getPingMaxOut();
|
347 |
|
348 | const ping = new proto.Ping();
|
349 | ping.setConnId(this.connId);
|
350 | this.pingBytes = Buffer.from(ping.serializeBinary());
|
351 |
|
352 | this.pingOut = 0;
|
353 | const that = this;
|
354 | this.pingTimer = setTimeout(function pingFun() {
|
355 | that.pingOut++;
|
356 | if (that.pingOut > that.stanMaxPingOut) {
|
357 | that.closeWithError('connection_lost', new Error(MAX_PINGS_EXCEEDED));
|
358 | return;
|
359 | }
|
360 | that.nc.publish(that.pingRequests, that.pingBytes, that.pingInbox);
|
361 | that.pingTimer = setTimeout(pingFun, that.stanPingInterval);
|
362 | }, this.stanPingInterval);
|
363 | }
|
364 | }
|
365 | if (unsubPingSub) {
|
366 | this.nc.unsubscribe(this.pingSubscription);
|
367 | this.pingSubscription = null;
|
368 | }
|
369 |
|
370 | this.emit('connect', this);
|
371 | });
|
372 | });
|
373 |
|
374 | this.nc.on('close', () => {
|
375 |
|
376 | this.cleanupOnClose();
|
377 | this.emit('close');
|
378 | });
|
379 |
|
380 | this.nc.on('disconnect', () => {
|
381 | this.emit('disconnect');
|
382 | });
|
383 |
|
384 | this.nc.on('reconnect', () => {
|
385 | this.emit('reconnect', this);
|
386 | });
|
387 |
|
388 | this.nc.on('reconnecting', () => {
|
389 | this.emit('reconnecting');
|
390 | });
|
391 |
|
392 | this.nc.on('error', (msg) => {
|
393 | this.emit('error', msg);
|
394 | });
|
395 | };
|
396 |
|
397 |
|
398 |
|
399 |
|
400 |
|
401 |
|
402 |
|
403 |
|
404 |
|
405 | Stan.prototype.closeWithError = function(event, error) {
|
406 | if (this.nc === undefined || this.clientID === undefined) {
|
407 | return;
|
408 | }
|
409 | this.cleanupOnClose(error);
|
410 | if (this.ncOwned) {
|
411 | this.nc.close();
|
412 | }
|
413 | this.emit(event, error);
|
414 | this.emit('close');
|
415 | };
|
416 |
|
417 |
|
418 |
|
419 |
|
420 |
|
421 |
|
422 |
|
423 | Stan.prototype.cleanupOnClose = function(err) {
|
424 |
|
425 | if (this.pingTimer) {
|
426 | timers.clearTimeout(this.pingTimer);
|
427 | delete this.pingTimer;
|
428 | }
|
429 |
|
430 |
|
431 |
|
432 |
|
433 | if (!this.ncOwned && this.nc) {
|
434 | if (this.ackSubscription) {
|
435 | this.nc.unsubscribe(this.ackSubscription);
|
436 | this.ackSubscription = null;
|
437 | }
|
438 | if (this.pingSubscription) {
|
439 | this.nc.unsubscribe(this.pingSubscription);
|
440 | this.pingSubscription = null;
|
441 | }
|
442 | if (this.hbSubscription) {
|
443 | this.nc.unsubscribe(this.hbSubscription);
|
444 | this.hbSubscription = null;
|
445 | }
|
446 | }
|
447 | for (const guid in this.pubAckMap) {
|
448 | if (this.pubAckMap.hasOwnProperty(guid)) {
|
449 | const a = this.removeAck(guid);
|
450 | if (a && a.ah && typeof a.ah === 'function') {
|
451 | a.ah(err, guid);
|
452 | }
|
453 | }
|
454 | }
|
455 | };
|
456 |
|
457 |
|
458 |
|
459 |
|
460 |
|
461 |
|
462 | Stan.prototype.close = function() {
|
463 | if (this.nc === undefined || this.clientID === undefined) {
|
464 | return;
|
465 | }
|
466 | this.cleanupOnClose(new Error(CONN_CLOSED));
|
467 |
|
468 | if (this.nc && this.closeRequests) {
|
469 | const req = new proto.CloseRequest();
|
470 | req.setClientId(this.clientID);
|
471 | this.nc.requestOne(this.closeRequests, Buffer.from(req.serializeBinary()), {}, this.options.connectTimeout, (msgOrError) => {
|
472 | const nc = this.nc;
|
473 | delete this.nc;
|
474 | let closeError = null;
|
475 |
|
476 | if (msgOrError instanceof nats.NatsError) {
|
477 |
|
478 | closeError = msgOrError;
|
479 | } else {
|
480 | const cr = proto.CloseResponse.deserializeBinary(Buffer.from(msgOrError, 'binary'));
|
481 | const err = cr.getError();
|
482 | if (err && err.length > 0) {
|
483 |
|
484 | closeError = new Error(err);
|
485 | }
|
486 | }
|
487 | if (nc && this.ncOwned) {
|
488 | nc.close();
|
489 | }
|
490 | this.emit('close', closeError);
|
491 | });
|
492 | } else {
|
493 | if (this.nc && this.ncOwned) {
|
494 | this.nc.close();
|
495 | delete this.nc;
|
496 | }
|
497 | this.emit('close');
|
498 | }
|
499 | };
|
500 |
|
501 |
|
502 |
|
503 |
|
504 |
|
505 |
|
506 | Stan.prototype.processAck = function() {
|
507 | return (msg) => {
|
508 |
|
509 | const pa = proto.PubAck.deserializeBinary(Buffer.from(msg, 'binary'));
|
510 | const guid = pa.getGuid();
|
511 | const a = this.removeAck(guid);
|
512 | if (a && a.ah) {
|
513 | const err = pa.getError();
|
514 | a.ah(err === '' ? undefined : err, guid);
|
515 | }
|
516 | };
|
517 | };
|
518 |
|
519 |
|
520 |
|
521 |
|
522 |
|
523 |
|
524 |
|
525 | Stan.prototype.removeAck = function(guid) {
|
526 | const a = this.pubAckMap[guid];
|
527 | if (a !== undefined) {
|
528 | delete this.pubAckMap[guid];
|
529 | this.pubAckOutstanding--;
|
530 | if (a.t !== undefined) {
|
531 |
|
532 | timers.clearTimeout(a.t);
|
533 | }
|
534 | }
|
535 | return a;
|
536 | };
|
537 |
|
538 |
|
539 |
|
540 |
|
541 |
|
542 |
|
543 |
|
544 |
|
545 |
|
546 |
|
547 |
|
548 |
|
549 | Stan.prototype.publish = function(subject, data, ackHandler) {
|
550 | if (this.nc === undefined) {
|
551 | if (util.isFunction(ackHandler)) {
|
552 | ackHandler(new Error(CONN_CLOSED));
|
553 | return;
|
554 | } else {
|
555 | throw new Error(CONN_CLOSED);
|
556 | }
|
557 | }
|
558 |
|
559 | if (this.pubAckOutstanding > this.options.maxPubAcksInflight) {
|
560 |
|
561 | if (util.isFunction(ackHandler)) {
|
562 | ackHandler(new Error(MAX_FLIGHT_LIMIT_REACHED));
|
563 | } else {
|
564 | throw new Error(MAX_FLIGHT_LIMIT_REACHED);
|
565 | }
|
566 | }
|
567 |
|
568 | const subj = this.pubPrefix + '.' + subject;
|
569 | const peGUID = nuid.next();
|
570 |
|
571 | const pe = new proto.PubMsg();
|
572 | pe.setClientId(this.clientID);
|
573 | pe.setConnId(this.connId);
|
574 | pe.setGuid(peGUID);
|
575 | pe.setSubject(subject);
|
576 | let buf;
|
577 | if (typeof data === 'string') {
|
578 | buf = Buffer.from(data, 'utf8');
|
579 | data = new Uint8Array(buf);
|
580 | } else if (Buffer.prototype.isPrototypeOf(data)) {
|
581 | buf = Buffer.from(data, 'utf8');
|
582 | data = new Uint8Array(buf);
|
583 | } else if (Buffer.prototype.isPrototypeOf(Uint8Array)) {
|
584 |
|
585 | }
|
586 |
|
587 | pe.setData(data);
|
588 |
|
589 | const ack = {};
|
590 | ack.ah = ackHandler;
|
591 | this.pubAckMap[peGUID] = ack;
|
592 |
|
593 | const bytes = Buffer.from(pe.serializeBinary());
|
594 | this.nc.publish(subj, bytes, this.ackSubject);
|
595 | this.pubAckOutstanding++;
|
596 |
|
597 |
|
598 |
|
599 | ack.t = timers.setTimeout(() => {
|
600 | const a = this.removeAck(peGUID);
|
601 | if (a && a.ah !== undefined) {
|
602 | a.ah(new Error(ACK_TIMEOUT), peGUID);
|
603 | }
|
604 | }, this.options.ackTimeout);
|
605 |
|
606 | return peGUID;
|
607 | };
|
608 |
|
609 |
|
610 |
|
611 |
|
612 |
|
613 |
|
614 |
|
615 |
|
616 |
|
617 |
|
618 |
|
619 |
|
620 |
|
621 | Stan.prototype.subscribe = function(subject, qGroup, options) {
|
622 | const args = {};
|
623 | if (typeof qGroup === 'string') {
|
624 | args.qGroup = qGroup;
|
625 | } else if (typeof qGroup === 'object') {
|
626 | args.options = qGroup;
|
627 | }
|
628 | if (typeof options === 'object') {
|
629 | args.options = options;
|
630 | }
|
631 | if (!args.options) {
|
632 | args.options = new SubscriptionOptions();
|
633 | }
|
634 |
|
635 |
|
636 | const retVal = new Subscription(this, subject, args.qGroup, nats.createInbox(), args.options, args.callback);
|
637 |
|
638 | if (typeof subject !== 'string' || subject.length === 0) {
|
639 | process.nextTick(() => {
|
640 | retVal.emit('error', new Error(BAD_SUBJECT));
|
641 | });
|
642 | return retVal;
|
643 | }
|
644 |
|
645 | if (this.isClosed()) {
|
646 | process.nextTick(() => {
|
647 | retVal.emit('error', new Error(CONN_CLOSED));
|
648 | });
|
649 | return retVal;
|
650 | }
|
651 |
|
652 | this.subMap[retVal.inbox] = retVal;
|
653 | retVal.inboxSub = this.nc.subscribe(retVal.inbox, this.processMsg());
|
654 | const sr = new proto.SubscriptionRequest();
|
655 | sr.setClientId(this.clientID);
|
656 | sr.setSubject(subject);
|
657 | sr.setQGroup(retVal.qGroup || '');
|
658 | sr.setInbox(retVal.inbox);
|
659 | sr.setMaxInFlight(retVal.opts.maxInFlight);
|
660 | sr.setAckWaitInSecs(retVal.opts.ackWait / 1000);
|
661 | sr.setStartPosition(retVal.opts.startPosition);
|
662 | sr.setDurableName(retVal.opts.durableName || '');
|
663 |
|
664 | switch (sr.getStartPosition()) {
|
665 | case proto.StartPosition.TIME_DELTA_START:
|
666 | sr.setStartTimeDelta(retVal.opts.startTime);
|
667 | break;
|
668 | case proto.StartPosition.SEQUENCE_START:
|
669 | sr.setStartSequence(retVal.opts.startSequence);
|
670 | break;
|
671 | }
|
672 |
|
673 | this.nc.requestOne(this.subRequests, Buffer.from(sr.serializeBinary()), this.options.connectTimeout, (msg) => {
|
674 | if (msg instanceof nats.NatsError) {
|
675 | if (msg.code === nats.REQ_TIMEOUT) {
|
676 | const err = new nats.NatsError(SUB_REQ_TIMEOUT, SUB_REQ_TIMEOUT, msg);
|
677 | retVal.emit('timeout', err);
|
678 | } else {
|
679 | retVal.emit('error', msg);
|
680 | }
|
681 | return;
|
682 | }
|
683 |
|
684 | const r = proto.SubscriptionResponse.deserializeBinary(Buffer.from(msg, 'binary'));
|
685 | const err = r.getError();
|
686 | if (err && err.length !== 0) {
|
687 | retVal.emit('error', new Error(err));
|
688 | this.nc.unsubscribe(retVal.inboxSub);
|
689 | retVal.emit('unsubscribed');
|
690 | return;
|
691 | }
|
692 | retVal.ackInbox = r.getAckInbox();
|
693 | retVal.emit('ready');
|
694 | });
|
695 |
|
696 | return retVal;
|
697 | };
|
698 |
|
699 |
|
700 |
|
701 |
|
702 |
|
703 |
|
704 |
|
705 |
|
706 |
|
707 |
|
708 |
|
709 |
|
710 |
|
711 |
|
712 | function Subscription(stanConnection, subject, qGroup, inbox, opts) {
|
713 | this.stanConnection = stanConnection;
|
714 | this.subject = subject;
|
715 | this.qGroup = qGroup;
|
716 | this.inbox = inbox;
|
717 | this.opts = opts;
|
718 | this.ackInbox = undefined;
|
719 | this.inboxSub = undefined;
|
720 | }
|
721 |
|
722 |
|
723 |
|
724 |
|
725 |
|
726 |
|
727 |
|
728 |
|
729 |
|
730 |
|
731 |
|
732 |
|
733 |
|
734 |
|
735 |
|
736 |
|
737 |
|
738 |
|
739 |
|
740 |
|
741 |
|
742 |
|
743 |
|
744 |
|
745 |
|
746 |
|
747 |
|
748 |
|
749 |
|
750 |
|
751 |
|
752 |
|
753 |
|
754 |
|
755 | util.inherits(Subscription, events.EventEmitter);
|
756 |
|
757 |
|
758 |
|
759 |
|
760 |
|
761 | Subscription.prototype.isClosed = function() {
|
762 | return this.stanConnection === undefined;
|
763 | };
|
764 |
|
765 |
|
766 |
|
767 |
|
768 |
|
769 |
|
770 | Subscription.prototype.unsubscribe = function() {
|
771 | this.closeOrUnsubscribe(false);
|
772 | };
|
773 |
|
774 |
|
775 |
|
776 |
|
777 |
|
778 |
|
779 |
|
780 |
|
781 |
|
782 |
|
783 |
|
784 | Subscription.prototype.close = function() {
|
785 | this.closeOrUnsubscribe(true);
|
786 | };
|
787 |
|
788 |
|
789 |
|
790 |
|
791 |
|
792 |
|
793 | Subscription.prototype.closeOrUnsubscribe = function(doClose) {
|
794 | if (this.isClosed()) {
|
795 | this.emit('error', new Error(BAD_SUBSCRIPTION));
|
796 | return;
|
797 | }
|
798 |
|
799 | const sc = this.stanConnection;
|
800 | delete this.stanConnection;
|
801 | delete sc.subMap[this.inbox];
|
802 |
|
803 | if (sc.isClosed()) {
|
804 | this.emit('error', new Error(CONN_CLOSED));
|
805 | return;
|
806 | }
|
807 |
|
808 | let reqSubject = sc.unsubRequests;
|
809 | if (doClose) {
|
810 | reqSubject = sc.subCloseRequests;
|
811 | if (!reqSubject) {
|
812 | this.emit('error', new Error(NO_SERVER_SUPPORT));
|
813 | }
|
814 | }
|
815 |
|
816 | sc.nc.unsubscribe(this.inboxSub);
|
817 |
|
818 | const ur = new proto.UnsubscribeRequest();
|
819 | ur.setClientId(sc.clientID);
|
820 | ur.setSubject(this.subject);
|
821 | ur.setInbox(this.ackInbox);
|
822 |
|
823 | sc.nc.requestOne(reqSubject, Buffer.from(ur.serializeBinary()), sc.options.connectTimeout, (msg) => {
|
824 | let err;
|
825 | if (msg instanceof nats.NatsError) {
|
826 | const type = doClose ? CLOSE_REQ_TIMEOUT : UNSUB_REQ_TIMEOUT;
|
827 | err = new nats.NatsError(type, type, msg);
|
828 | if (msg.code === nats.REQ_TIMEOUT) {
|
829 | this.emit('timeout', err);
|
830 | } else {
|
831 | this.emit('error', err);
|
832 | }
|
833 | return;
|
834 | }
|
835 |
|
836 | const r = proto.SubscriptionResponse.deserializeBinary(Buffer.from(msg, 'binary'));
|
837 | err = r.getError();
|
838 | if (err && err.length > 0) {
|
839 | this.emit('error', new Error(r.getError()));
|
840 | } else {
|
841 | this.emit(doClose ? 'closed' : 'unsubscribed');
|
842 | }
|
843 | });
|
844 | };
|
845 |
|
846 |
|
847 |
|
848 |
|
849 |
|
850 |
|
851 |
|
852 | Stan.prototype.processMsg = function() {
|
853 |
|
854 | return (rawMsg, reply, subject, sid) => {
|
855 | const sub = this.subMap[subject];
|
856 | try {
|
857 |
|
858 | const m = proto.MsgProto.deserializeBinary(Buffer.from(rawMsg, 'binary'));
|
859 | if (sub === undefined || !this.nc) {
|
860 | return;
|
861 | }
|
862 | const msg = new Message(this, m, sub);
|
863 | sub.emit('message', msg);
|
864 | msg.maybeAutoAck();
|
865 | } catch (error) {
|
866 | sub.emit('error', error);
|
867 | }
|
868 | };
|
869 | };
|
870 |
|
871 |
|
872 |
|
873 |
|
874 |
|
875 |
|
876 |
|
877 |
|
878 | function Message(stanClient, msg, subscription) {
|
879 | this.stanClient = stanClient;
|
880 | this.msg = msg;
|
881 | this.subscription = subscription;
|
882 | }
|
883 |
|
884 |
|
885 |
|
886 |
|
887 |
|
888 | Message.prototype.getSequence = function() {
|
889 | return this.msg.getSequence();
|
890 | };
|
891 |
|
892 |
|
893 |
|
894 |
|
895 |
|
896 | Message.prototype.getSubject = function() {
|
897 | return this.msg.getSubject();
|
898 | };
|
899 |
|
900 |
|
901 |
|
902 |
|
903 |
|
904 | Message.prototype.getRawData = function() {
|
905 | return Buffer.from(this.msg.getData(), 'binary');
|
906 | };
|
907 |
|
908 |
|
909 |
|
910 |
|
911 |
|
912 |
|
913 |
|
914 | Message.prototype.getData = function() {
|
915 | let bytes = this.msg.getData();
|
916 | const encoding = this.stanClient.options.stanEncoding;
|
917 | if (encoding !== 'binary') {
|
918 | bytes = bytes.length > 0 ? Buffer.from(bytes, encoding).toString() : '';
|
919 | }
|
920 | return bytes;
|
921 | };
|
922 |
|
923 |
|
924 |
|
925 |
|
926 |
|
927 |
|
928 |
|
929 | Message.prototype.getTimestampRaw = function() {
|
930 | return this.msg.getTimestamp();
|
931 | };
|
932 |
|
933 |
|
934 |
|
935 |
|
936 |
|
937 | Message.prototype.getTimestamp = function() {
|
938 | return new Date(this.getTimestampRaw() / 1000000);
|
939 | };
|
940 |
|
941 |
|
942 |
|
943 |
|
944 |
|
945 | Message.prototype.isRedelivered = function() {
|
946 | return this.msg.getRedelivered();
|
947 | };
|
948 |
|
949 |
|
950 |
|
951 |
|
952 |
|
953 | Message.prototype.getCrc32 = function() {
|
954 | return this.msg.getCrc32();
|
955 | };
|
956 |
|
957 |
|
958 |
|
959 |
|
960 |
|
961 |
|
962 |
|
963 | Message.prototype.maybeAutoAck = function() {
|
964 | if (!this.subscription.opts.manualAcks) {
|
965 | this.ack();
|
966 | }
|
967 | };
|
968 |
|
969 |
|
970 |
|
971 |
|
972 |
|
973 | Message.prototype.ack = function() {
|
974 | if (!this.subscription.isClosed()) {
|
975 | const ack = new proto.Ack();
|
976 | ack.setSubject(this.getSubject());
|
977 | ack.setSequence(this.getSequence());
|
978 | this.stanClient.nc.publish(this.subscription.ackInbox, Buffer.from(ack.serializeBinary()));
|
979 | }
|
980 | };
|
981 |
|
982 |
|
983 |
|
984 |
|
985 |
|
986 | Message.prototype.getClientID = function() {
|
987 | return this.msg.getConnId();
|
988 | };
|
989 |
|
990 | Message.prototype.getConnectionID = function() {
|
991 | return this.msg.getClientId();
|
992 | };
|
993 |
|
994 |
|
995 |
|
996 |
|
997 |
|
998 |
|
999 |
|
1000 | exports.StartPosition = proto.StartPosition;
|
1001 |
|
1002 | function SubscriptionOptions(durableName, maxInFlight, ackWait, startPosition, startSequence, startTime, manualAcks) {
|
1003 |
|
1004 | this.durableName = durableName;
|
1005 |
|
1006 | this.maxInFlight = maxInFlight || DEFAULT_MAX_IN_FLIGHT;
|
1007 |
|
1008 | this.ackWait = ackWait || DEFAULT_ACK_WAIT;
|
1009 |
|
1010 | this.startPosition = startPosition;
|
1011 |
|
1012 | this.startSequence = startSequence;
|
1013 |
|
1014 | this.startTime = startTime;
|
1015 |
|
1016 | this.manualAcks = manualAcks;
|
1017 | }
|
1018 |
|
1019 |
|
1020 |
|
1021 |
|
1022 |
|
1023 | Stan.prototype.subscriptionOptions = function() {
|
1024 | return new SubscriptionOptions();
|
1025 | };
|
1026 |
|
1027 |
|
1028 |
|
1029 |
|
1030 |
|
1031 | SubscriptionOptions.prototype.setMaxInFlight = function(n) {
|
1032 | this.maxInFlight = n;
|
1033 | return this;
|
1034 | };
|
1035 |
|
1036 | SubscriptionOptions.prototype.setAckWait = function(millis) {
|
1037 | this.ackWait = millis;
|
1038 | return this;
|
1039 | };
|
1040 |
|
1041 | SubscriptionOptions.prototype.setStartAt = function(startPosition) {
|
1042 | this.startPosition = startPosition;
|
1043 | return this;
|
1044 | };
|
1045 |
|
1046 | SubscriptionOptions.prototype.setStartAtSequence = function(sequence) {
|
1047 | this.startPosition = proto.StartPosition.SEQUENCE_START;
|
1048 | this.startSequence = sequence;
|
1049 | return this;
|
1050 | };
|
1051 |
|
1052 |
|
1053 |
|
1054 |
|
1055 |
|
1056 |
|
1057 | SubscriptionOptions.prototype.setStartTime = function(date) {
|
1058 | this.startPosition = proto.StartPosition.TIME_DELTA_START;
|
1059 |
|
1060 | this.startTime = (Date.now() - date.valueOf()) * 1000000;
|
1061 | return this;
|
1062 | };
|
1063 |
|
1064 |
|
1065 |
|
1066 |
|
1067 |
|
1068 | SubscriptionOptions.prototype.setStartAtTimeDelta = function(millis) {
|
1069 | this.startPosition = proto.StartPosition.TIME_DELTA_START;
|
1070 |
|
1071 |
|
1072 | this.startTime = millis * 1000000;
|
1073 | return this;
|
1074 | };
|
1075 |
|
1076 |
|
1077 |
|
1078 |
|
1079 |
|
1080 | SubscriptionOptions.prototype.setStartWithLastReceived = function() {
|
1081 | this.startPosition = proto.StartPosition.LAST_RECEIVED;
|
1082 | return this;
|
1083 | };
|
1084 |
|
1085 |
|
1086 |
|
1087 |
|
1088 |
|
1089 | SubscriptionOptions.prototype.setDeliverAllAvailable = function() {
|
1090 | this.startPosition = proto.StartPosition.FIRST;
|
1091 | return this;
|
1092 | };
|
1093 |
|
1094 |
|
1095 |
|
1096 |
|
1097 |
|
1098 | SubscriptionOptions.prototype.setManualAckMode = function(tf) {
|
1099 | this.manualAcks = tf;
|
1100 | return this;
|
1101 | };
|
1102 |
|
1103 |
|
1104 |
|
1105 |
|
1106 |
|
1107 |
|
1108 | SubscriptionOptions.prototype.setDurableName = function(durableName) {
|
1109 | this.durableName = durableName;
|
1110 | return this;
|
1111 | };
|