UNPKG

33.8 kBJavaScriptView Raw
1/*
2 * Copyright 2016-2018 The NATS Authors
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16/* jshint esversion: 6 */
17/* jshint node: true */
18'use strict';
19
20/**
21 * Module Dependencies
22 */
23const util = require('util'),
24 nats = require('nats'),
25 timers = require('timers'),
26 events = require('events'),
27 nuid = require('nuid'),
28 url = require('url'),
29 proto = require('./pb');
30
31
/**
 * Constants
 */

// Library version, exported as `version`.
const VERSION = '0.2.4';

// Connection defaults.
const DEFAULT_PORT = 4222;
const DEFAULT_PRE = 'nats://localhost:';
const DEFAULT_URI = DEFAULT_PRE + DEFAULT_PORT;
const DEFAULT_DISCOVER_PREFIX = '_STAN.discover';
const DEFAULT_ACK_PREFIX = '_STAN.acks';
const DEFAULT_CONNECT_WAIT = 1000 * 2;

// Publish/subscribe defaults; DEFAULT_ACK_WAIT is used as a timer delay (ms).
const DEFAULT_MAX_IN_FLIGHT = 16384;
const DEFAULT_ACK_WAIT = 30 * 1000;

// Error messages.
const BAD_SUBJECT = 'stan: subject must be supplied';
const BAD_CLUSTER_ID = 'stan: cluster ID must be supplied';
const BAD_CLIENT_ID = 'stan: client ID must be supplied';
const MAX_FLIGHT_LIMIT_REACHED = 'stan: max in flight reached.';
const CONN_CLOSED = 'stan: Connection closed';
const BAD_SUBSCRIPTION = 'stan: invalid subscription';
const BINARY_ENCODING_REQUIRED = 'stan: NATS connection encoding must be \'binary\'.';
const NO_SERVER_SUPPORT = 'stan: not supported by server';
const ACK_TIMEOUT = 'stan: publish ack timeout';
const CONNECT_REQ_TIMEOUT = 'stan: connect request timeout';
const CLOSE_REQ_TIMEOUT = 'stan: close request timeout';
const SUB_REQ_TIMEOUT = 'stan: subscribe request timeout';
const UNSUB_REQ_TIMEOUT = 'stan: unsubscribe request timeout';

// Streaming protocol version and client ping defaults.
const PROTOCOL_ONE = 1;
const DEFAULT_PING_INTERVAL = 5 * 1000;
const DEFAULT_PING_MAXOUT = 3;
const MAX_PINGS_EXCEEDED = 'stan: connection lost due to PING failure';
64
65
/**
 * Version string of this library.
 * @type {string}
 */
exports.version = VERSION;
71
72
/**
 * Streaming client. Validates the identifiers, parses the options,
 * initializes the internal state and starts the connection sequence.
 *
 * @param {string} clusterID - non-empty cluster identifier
 * @param {string} clientID - non-empty client identifier
 * @param {number|string|object} [opts] - forwarded to Stan#parseOptions
 * @throws {Error} when clusterID or clientID is not a non-empty string
 * @constructor
 */
function Stan(clusterID, clientID, opts) {
    events.EventEmitter.call(this);

    const isNonEmptyString = (value) => typeof value === 'string' && value.length >= 1;
    if (!isNonEmptyString(clusterID)) {
        throw new Error(BAD_CLUSTER_ID);
    }
    if (!isNonEmptyString(clientID)) {
        throw new Error(BAD_CLIENT_ID);
    }

    this.clusterID = clusterID;
    this.clientID = clientID;
    // subject on which publish acks are received
    this.ackSubject = `${DEFAULT_ACK_PREFIX}.${nuid.next()}`;

    // filled in from the server's connect response
    this.pubPrefix = null; // prefix prepended to published subjects
    this.subRequests = null; // subject for subscription requests
    this.unsubRequests = null; // subject for unsubscribe requests
    this.subCloseRequests = null; // subject for subscription close requests
    this.closeRequests = null; // subject for close requests

    this.parseOptions(opts);
    this.initState();
    this.createConnection();
    return this;
}


util.inherits(Stan, events.EventEmitter);
100
101/**
102 * Connect to a nats-streaming-server and return the client.
103 * @param {string} clusterID
104 * @param {string} [clientID] - must be unique
105 * @param {object} [opts] - object with NATS/STAN options
106 * @return {Stan}
107 * @public
108 */
109exports.connect = function(clusterID, clientID, opts) {
110 return new Stan(clusterID, clientID, opts);
111};
112
/**
 * Reports whether this client no longer holds a NATS connection
 * (its `nc` property is undefined).
 * @returns {boolean}
 * @private
 */
Stan.prototype.isClosed = function() {
    return typeof this.nc === 'undefined';
};
121
/**
 * Builds `this.options` from the defaults and the caller-supplied value.
 *
 * - number: used as the port on localhost
 * - string: treated as a server URL (scheme and port are defaulted)
 * - object: `port` is applied first, then the recognised properties are
 *   copied over; aliases (e.g. `uri`, `password`, `urls`) are stored under
 *   their canonical name and later entries win over earlier ones
 * - undefined or null: the defaults are kept
 *
 * @param {number|string|object} [opts]
 * @private
 */
Stan.prototype.parseOptions = function(opts) {
    const options = this.options = {
        url: DEFAULT_URI,
        connectTimeout: DEFAULT_CONNECT_WAIT,
        ackTimeout: DEFAULT_ACK_WAIT,
        discoverPrefix: DEFAULT_DISCOVER_PREFIX,
        maxPubAcksInflight: DEFAULT_MAX_IN_FLIGHT,
        stanEncoding: 'utf8',
        stanPingInterval: DEFAULT_PING_INTERVAL,
        stanMaxPingOut: DEFAULT_PING_MAXOUT,
        maxReconnectAttempts: -1
    };

    if (typeof opts === 'number') {
        options.url = DEFAULT_PRE + opts;
        return;
    }
    if (typeof opts === 'string') {
        options.url = sanitizeUrl(opts);
        return;
    }
    // typeof null is 'object'; null must be treated like "no options".
    if (opts === null || typeof opts !== 'object') {
        return;
    }

    if (opts.port !== undefined) {
        options.url = DEFAULT_PRE + opts.port;
    }

    // [source property, target property]; order matters because aliases
    // overwrite the canonical name when both are supplied.
    const forwarded = [
        // streaming options
        ['discoverPrefix'],
        ['nc'],
        ['connectTimeout'],
        ['ackTimeout'],
        ['maxPubAcksInflight'],
        ['stanEncoding'],
        ['stanPingInterval'],
        ['stanMaxPingOut'],
        // node-nats takes a bunch of other options; they are simply
        // forwarded, as node-nats is used underneath.
        ['url'],
        ['uri', 'url'],
        ['user'],
        ['pass'],
        ['token'],
        ['password', 'pass'],
        ['verbose'],
        ['pedantic'],
        ['reconnect'],
        ['maxReconnectAttempts'],
        ['reconnectTimeWait'],
        ['servers'],
        ['urls', 'servers'],
        ['noRandomize'],
        ['NoRandomize', 'noRandomize'],
        ['dontRandomize', 'noRandomize'],
        ['encoding'],
        ['tls'],
        ['secure', 'tls'],
        ['name'],
        ['client', 'name'],
        ['yieldTime'],
        ['waitOnFirstConnect'],
        ['preserveBuffers'],
        ['pingInterval'],
        ['maxPingOut'],
        ['useOldRequestStyle']
    ];
    forwarded.forEach((entry) => {
        this.assignOption(opts, entry[0], entry[1]);
    });
};
192
193
/**
 * Normalises a server address: prepends `nats://` when no scheme is
 * present and appends the default port when the parsed URL has none.
 * @param {string} host
 * @returns {string}
 */
function sanitizeUrl(host) {
    const hasScheme = (/^.*:\/\/.*/).test(host);
    const withScheme = hasScheme ? host : 'nats://' + host;
    const port = url.parse(withScheme).port;
    const missingPort = port === null || port === '';
    return missingPort ? withScheme + ":" + DEFAULT_PORT : withScheme;
}
205
206
/**
 * Copies `opts[prop]` into `this.options` when it is not undefined.
 * @param {object} opts - caller supplied options
 * @param {string} prop - property to read from opts
 * @param {string} [assign] - property name to write in this.options;
 *        defaults to prop
 */
Stan.prototype.assignOption = function(opts, prop, assign) {
    const value = opts[prop];
    if (value === undefined) {
        return;
    }
    const target = assign === undefined ? prop : assign;
    this.options[target] = value;
};
221
222
/**
 * Resets the bookkeeping for pending publish acks and for subscriptions.
 */
Stan.prototype.initState = function() {
    Object.assign(this, {
        pubAckMap: {}, // guid -> pending ack record
        pubAckOutstanding: 0, // number of entries in pubAckMap
        subMap: {} // inbox -> Subscription
    });
};
231
232/**
233 * Connect event - emitted when the streaming protocol connection sequence has
234 * completed and the client is ready to process requests.
235 *
236 * @event Stan#connect
237 * @type {Stan}
238 */
239
240/**
241 * Close event - emitted when Stan#close() is called or its underlying NATS connection
242 * closes
243 *
244 * @event Stan#close
245 */
246
247/**
 * Reconnecting event - emitted when the underlying NATS connection emits reconnecting
249 *
 * @event Stan#reconnecting
251 */
252
253/**
254 * Error event - emitted when there's an error
255 * @type {Error|object}
256 *
 * @event Stan#error
258 */
259
/**
 * Connect to a NATS Streaming subsystem.
 *
 * Uses the NATS connection supplied in `options.nc` when there is one,
 * otherwise opens (and owns) a new connection with binary encoding. Once
 * NATS reports 'connect', the heartbeat, ping and ack subscriptions are
 * created and a ConnectRequest is sent on the discover subject; the
 * response supplies the subjects used for all later requests.
 *
 * A connect request timeout is reported as a nats.NatsError carrying
 * CONNECT_REQ_TIMEOUT, the same way subscribe reports its timeouts.
 *
 * @throws {Error} BINARY_ENCODING_REQUIRED when the supplied connection or
 *         the requested encoding is not 'binary'
 * @fires Stan#connect, Stan#close, Stan#reconnecting, Stan#error
 */
Stan.prototype.createConnection = function() {
    const suppliedNc = this.options.nc;
    // typeof null is 'object'; a null `nc` means no connection was supplied.
    if (suppliedNc !== null && typeof suppliedNc === 'object') {
        if (suppliedNc.encoding !== 'binary') {
            throw new Error(BINARY_ENCODING_REQUIRED);
        }
        this.nc = suppliedNc;
    }
    if (this.nc === undefined) {
        const encoding = this.options.encoding;
        if (encoding && encoding !== 'binary') {
            throw new Error(BINARY_ENCODING_REQUIRED);
        }
        this.options.encoding = 'binary';
        this.nc = nats.connect(this.options);
        this.ncOwned = true;
    }

    this.nc.on('connect', () => {
        // heartbeat processing: answer every server heartbeat
        const hbInbox = nats.createInbox();
        this.hbSubscription = this.nc.subscribe(hbInbox, (msg, reply) => {
            this.nc.publish(reply);
        });

        // ping responses reset the outstanding ping counter
        this.pingInbox = nats.createInbox();
        this.pingSubscription = this.nc.subscribe(this.pingInbox, (msg) => {
            if (msg) {
                const pingResponse = proto.PingResponse.deserializeBinary(Buffer.from(msg, 'binary'));
                const err = pingResponse.getError();
                if (err) {
                    this.closeWithError('connection_lost', err);
                    return;
                }
            }
            this.pingOut = 0;
        });

        this.ackSubscription = this.nc.subscribe(this.ackSubject, this.processAck());

        const discoverSubject = this.options.discoverPrefix + '.' + this.clusterID;
        this.connId = Buffer.from(nuid.next(), "utf8");
        const req = new proto.ConnectRequest();
        req.setClientId(this.clientID);
        req.setHeartbeatInbox(hbInbox);
        req.setProtocol(PROTOCOL_ONE);
        req.setConnId(this.connId);
        // the option is in milliseconds, the request carries whole seconds
        req.setPingInterval(Math.ceil(this.options.stanPingInterval / 1000));
        req.setPingMaxOut(this.options.stanMaxPingOut);

        this.nc.requestOne(discoverSubject, Buffer.from(req.serializeBinary()), this.options.connectTimeout, (msg) => {
            if (msg instanceof nats.NatsError) {
                let err = msg;
                if (msg.code === nats.REQ_TIMEOUT) {
                    // wrap so listeners receive an Error, not a bare string
                    err = new nats.NatsError(CONNECT_REQ_TIMEOUT, CONNECT_REQ_TIMEOUT, msg);
                }
                this.closeWithError('error', err);
                return;
            }

            const cr = proto.ConnectResponse.deserializeBinary(Buffer.from(msg, 'binary'));
            if (cr.getError() !== "") {
                this.closeWithError('error', cr.getError());
                return;
            }
            this.pubPrefix = cr.getPubPrefix();
            this.subRequests = cr.getSubRequests();
            this.unsubRequests = cr.getUnsubRequests();
            this.subCloseRequests = cr.getSubCloseRequests();
            this.closeRequests = cr.getCloseRequests();

            // pings are only used when the response reports protocol one or
            // later and a non-zero ping interval
            let unsubPingSub = true;
            if (cr.getProtocol() >= PROTOCOL_ONE) {
                if (cr.getPingInterval() !== 0) {
                    unsubPingSub = false;

                    this.pingRequests = cr.getPingRequests();
                    this.stanPingInterval = cr.getPingInterval() * 1000;
                    this.stanMaxPingOut = cr.getPingMaxOut();

                    const ping = new proto.Ping();
                    ping.setConnId(this.connId);
                    this.pingBytes = Buffer.from(ping.serializeBinary());

                    this.pingOut = 0;
                    const sendPing = () => {
                        this.pingOut++;
                        if (this.pingOut > this.stanMaxPingOut) {
                            this.closeWithError('connection_lost', new Error(MAX_PINGS_EXCEEDED));
                            return;
                        }
                        this.nc.publish(this.pingRequests, this.pingBytes, this.pingInbox);
                        // re-arm; cleanupOnClose clears this.pingTimer
                        this.pingTimer = setTimeout(sendPing, this.stanPingInterval);
                    };
                    this.pingTimer = setTimeout(sendPing, this.stanPingInterval);
                }
            }
            if (unsubPingSub) {
                this.nc.unsubscribe(this.pingSubscription);
                this.pingSubscription = null;
            }

            this.emit('connect', this);
        });
    });

    this.nc.on('close', () => {
        // ensure we cleaned up
        this.cleanupOnClose();
        this.emit('close');
    });

    this.nc.on('disconnect', () => {
        this.emit('disconnect');
    });

    this.nc.on('reconnect', () => {
        this.emit('reconnect', this);
    });

    this.nc.on('reconnecting', () => {
        this.emit('reconnecting');
    });

    this.nc.on('error', (msg) => {
        this.emit('error', msg);
    });
};
396
397
/**
 * Shuts the client down because of an error: cleans up, closes the NATS
 * connection when this client created it, then emits `event` with the
 * error followed by 'close'. Does nothing when `nc` or `clientID` is
 * undefined.
 * @param {string} event - name of the event used to report the error
 * @param {*} error - value passed to the listeners and to pending ack handlers
 * @private
 */
Stan.prototype.closeWithError = function(event, error) {
    const hasConnection = this.nc !== undefined && this.clientID !== undefined;
    if (hasConnection) {
        this.cleanupOnClose(error);
        if (this.ncOwned) {
            this.nc.close();
        }
        this.emit(event, error);
        this.emit('close');
    }
};
416
417
/**
 * Releases protocol resources: stops the ping timer, drops the protocol
 * subscriptions on a connection this client does not own, and fails every
 * pending publish ack with `err`.
 * @param {*} [err] - passed as first argument to each pending ack handler
 * @private
 */
Stan.prototype.cleanupOnClose = function(err) {
    // remove the ping timer
    if (this.pingTimer) {
        timers.clearTimeout(this.pingTimer);
        delete this.pingTimer;
    }

    // A borrowed connection stays open, so the protocol subscriptions have
    // to be removed explicitly. An owned connection is closed instead.
    if (!this.ncOwned && this.nc) {
        ['ackSubscription', 'pingSubscription', 'hbSubscription'].forEach((field) => {
            if (this[field]) {
                this.nc.unsubscribe(this[field]);
                this[field] = null;
            }
        });
    }

    Object.keys(this.pubAckMap || {}).forEach((guid) => {
        const pending = this.removeAck(guid);
        if (pending && typeof pending.ah === 'function') {
            pending.ah(err, guid);
        }
    });
};
456
/**
 * Closes the streaming connection, or returns if already closed.
 *
 * Pending publish acks are failed with CONN_CLOSED. When the server
 * supplied a close subject, a CloseRequest is sent and 'close' is emitted
 * from its callback with the request/protocol error (or null). Otherwise
 * 'close' is emitted immediately. The NATS connection is only closed when
 * this client created it.
 * @fires Stan#close
 */
Stan.prototype.close = function() {
    if (this.nc === undefined || this.clientID === undefined) {
        return;
    }
    this.cleanupOnClose(new Error(CONN_CLOSED));

    const canNotifyServer = this.nc && this.closeRequests;
    if (!canNotifyServer) {
        if (this.nc && this.ncOwned) {
            this.nc.close();
            delete this.nc;
        }
        this.emit('close');
        return;
    }

    const req = new proto.CloseRequest();
    req.setClientId(this.clientID);
    const payload = Buffer.from(req.serializeBinary());
    this.nc.requestOne(this.closeRequests, payload, {}, this.options.connectTimeout, (msgOrError) => {
        const nc = this.nc;
        delete this.nc;

        let closeError = null;
        if (msgOrError instanceof nats.NatsError) {
            // nothing can be done about a failed request; it is only
            // reported through the close notification
            closeError = msgOrError;
        } else {
            const response = proto.CloseResponse.deserializeBinary(Buffer.from(msgOrError, 'binary'));
            const reason = response.getError();
            if (reason && reason.length > 0) {
                // a protocol error is likewise only passed along
                closeError = new Error(reason);
            }
        }

        if (nc && this.ncOwned) {
            nc.close();
        }
        this.emit('close', closeError);
    });
};
500
501
/**
 * Builds the handler for messages arriving on the ack subject. The handler
 * decodes the PubAck, removes the matching pending ack and, when it has an
 * ack handler, calls it with the error (undefined when the error string is
 * empty) and the guid.
 * @return {Function}
 * @protected
 */
Stan.prototype.processAck = function() {
    const onAck = (msg) => {
        const pubAck = proto.PubAck.deserializeBinary(Buffer.from(msg, 'binary'));
        const guid = pubAck.getGuid();
        const pending = this.removeAck(guid);
        if (!pending || !pending.ah) {
            return;
        }
        const err = pubAck.getError();
        pending.ah(err === '' ? undefined : err, guid);
    };
    return onAck;
};
518
/**
 * Removes the pending ack registered for `guid`, decrements the
 * outstanding counter and cancels the ack's timeout timer if it has one.
 * @param {string} guid
 * @return {object|undefined} the removed ack record, or undefined when
 *         nothing was pending for guid
 * @protected
 */
Stan.prototype.removeAck = function(guid) {
    const pending = this.pubAckMap[guid];
    if (pending === undefined) {
        return pending;
    }
    delete this.pubAckMap[guid];
    this.pubAckOutstanding--;
    if (pending.t !== undefined) {
        timers.clearTimeout(pending.t);
    }
    return pending;
};
537
/**
 * Publishes a message to the streaming server with the specified subject
 * and data. Strings are sent as UTF-8; Buffers are copied into a plain
 * Uint8Array; any other value is handed to the protocol message as-is.
 *
 * The ackHandler is called with (err, guid) when the server acks the
 * message, or with an ACK_TIMEOUT error when no ack arrives within
 * `options.ackTimeout`.
 *
 * When the connection is closed or `options.maxPubAcksInflight` is
 * exceeded, nothing is published: the ackHandler is called once with the
 * error, or, if no ackHandler was provided, the error is thrown.
 *
 * @param {string} subject
 * @param {Uint8Array|string|Buffer} data
 * @param {Function} [ackHandler] - called as ackHandler(err, guid)
 * @returns {string|undefined} the guid of the published message, or
 *          undefined when the failure was reported to ackHandler
 * @throws {Error} CONN_CLOSED or MAX_FLIGHT_LIMIT_REACHED when no
 *         ackHandler was provided
 */
Stan.prototype.publish = function(subject, data, ackHandler) {
    const hasAckHandler = typeof ackHandler === 'function';
    // Reports a failure through the handler, or throws when there is none.
    const reject = (message) => {
        const err = new Error(message);
        if (!hasAckHandler) {
            throw err;
        }
        ackHandler(err);
    };

    if (this.nc === undefined) {
        reject(CONN_CLOSED);
        return;
    }

    if (this.pubAckOutstanding > this.options.maxPubAcksInflight) {
        // Too many pending publishes: fail this one and stop here, so the
        // message is not sent and the handler is not invoked a second time.
        reject(MAX_FLIGHT_LIMIT_REACHED);
        return;
    }

    const subj = this.pubPrefix + '.' + subject;
    const peGUID = nuid.next();

    let payload = data;
    if (typeof data === 'string') {
        payload = new Uint8Array(Buffer.from(data, 'utf8'));
    } else if (Buffer.isBuffer(data)) {
        payload = new Uint8Array(data);
    }

    const pe = new proto.PubMsg();
    pe.setClientId(this.clientID);
    pe.setConnId(this.connId);
    pe.setGuid(peGUID);
    pe.setSubject(subject);
    pe.setData(payload);

    // Publish first: if this throws, no ack record is left behind.
    this.nc.publish(subj, Buffer.from(pe.serializeBinary()), this.ackSubject);

    const ack = { ah: ackHandler };
    this.pubAckMap[peGUID] = ack;
    this.pubAckOutstanding++;

    // all acks are received on ackSubject, so each publish needs its own timer
    ack.t = timers.setTimeout(() => {
        const a = this.removeAck(peGUID);
        if (a && a.ah !== undefined) {
            a.ah(new Error(ACK_TIMEOUT), peGUID);
        }
    }, this.options.ackTimeout);

    return peGUID;
};
608
609
/**
 * Creates a NATS streaming server subscription on the specified subject.
 * If qGroup is provided, the subscription will be distributed between all
 * subscribers using the same qGroup name.
 *
 * The returned Subscription reports the outcome through events: 'ready'
 * once the server accepted the request, 'timeout' when the request timed
 * out, 'error' for any other failure. An invalid subject or a closed
 * connection is reported as an 'error' on the next tick. When the server
 * rejects the request, the subscription is removed locally and
 * 'unsubscribed' is emitted after the error.
 *
 * @param {String} subject
 * @param {String|SubscriptionOptions} [qGroup] - queue group name; an
 *        options object may be passed here when no queue group is wanted
 * @param {SubscriptionOptions} [options]
 * @returns {Subscription}
 */
Stan.prototype.subscribe = function(subject, qGroup, options) {
    let group;
    let subOpts;
    if (typeof qGroup === 'string') {
        group = qGroup;
    } else if (typeof qGroup === 'object') {
        subOpts = qGroup;
    }
    if (typeof options === 'object') {
        subOpts = options;
    }
    if (!subOpts) {
        subOpts = new SubscriptionOptions();
    }

    // in node-nats there's no Subscription object...
    const retVal = new Subscription(this, subject, group, nats.createInbox(), subOpts);

    if (typeof subject !== 'string' || subject.length === 0) {
        process.nextTick(() => {
            retVal.emit('error', new Error(BAD_SUBJECT));
        });
        return retVal;
    }

    if (this.isClosed()) {
        process.nextTick(() => {
            retVal.emit('error', new Error(CONN_CLOSED));
        });
        return retVal;
    }

    this.subMap[retVal.inbox] = retVal;
    retVal.inboxSub = this.nc.subscribe(retVal.inbox, this.processMsg());

    const sr = new proto.SubscriptionRequest();
    sr.setClientId(this.clientID);
    sr.setSubject(subject);
    sr.setQGroup(retVal.qGroup || '');
    sr.setInbox(retVal.inbox);
    sr.setMaxInFlight(retVal.opts.maxInFlight);
    sr.setAckWaitInSecs(retVal.opts.ackWait / 1000);
    sr.setStartPosition(retVal.opts.startPosition);
    sr.setDurableName(retVal.opts.durableName || '');

    switch (sr.getStartPosition()) {
        case proto.StartPosition.TIME_DELTA_START:
            sr.setStartTimeDelta(retVal.opts.startTime);
            break;
        case proto.StartPosition.SEQUENCE_START:
            sr.setStartSequence(retVal.opts.startSequence);
            break;
    }

    this.nc.requestOne(this.subRequests, Buffer.from(sr.serializeBinary()), this.options.connectTimeout, (msg) => {
        if (msg instanceof nats.NatsError) {
            if (msg.code === nats.REQ_TIMEOUT) {
                retVal.emit('timeout', new nats.NatsError(SUB_REQ_TIMEOUT, SUB_REQ_TIMEOUT, msg));
            } else {
                retVal.emit('error', msg);
            }
            return;
        }

        const r = proto.SubscriptionResponse.deserializeBinary(Buffer.from(msg, 'binary'));
        const err = r.getError();
        if (err && err.length !== 0) {
            // Release local resources before notifying: emitting 'error'
            // without a listener throws, which must not leave the inbox
            // subscription and the subMap entry behind.
            delete this.subMap[retVal.inbox];
            this.nc.unsubscribe(retVal.inboxSub);
            retVal.emit('error', new Error(err));
            retVal.emit('unsubscribed');
            return;
        }
        retVal.ackInbox = r.getAckInbox();
        retVal.emit('ready');
    });

    return retVal;
};
698
/**
 * A NATS streaming subscription is an {events.EventEmitter} representing a
 * subscription to the server. The subscription is ready to receive messages
 * after the Subscription#ready notification fires. Messages are delivered
 * through the Subscription#message(msg) notification.
 * @param stanConnection - the Stan client that owns the subscription
 * @param subject
 * @param qGroup
 * @param inbox - NATS subject on which messages for this subscription arrive
 * @param opts - the SubscriptionOptions in effect
 * @constructor
 * @fires Subscription#error({Error}), Subscription#unsubscribed, Subscription#ready, Subscription#timeout({Error})
 * Subscription#message({Message})
 */
function Subscription(stanConnection, subject, qGroup, inbox, opts) {
    // initialise the emitter state, as the Stan constructor does
    events.EventEmitter.call(this);
    this.stanConnection = stanConnection;
    this.subject = subject;
    this.qGroup = qGroup;
    this.inbox = inbox;
    this.opts = opts;
    this.ackInbox = undefined; // set from the server's subscription response
    this.inboxSub = undefined; // id of the NATS subscription on `inbox`
}

/**
 * Error event - if there's an error with setting up the subscription, such
 * as the connection is closed or the server returns an error.
 *
 * @event Subscription#error
 * @type Error
 */

/**
 * Timeout event - an error notification indicating that the operation timed out.
 *
 * @event Subscription#timeout
 * @type Error
 */

/**
 * Unsubscribed event - notification that the unsubscribe request was processed by the server
 *
 * @event Subscription#unsubscribed
 */

/**
 * Ready event - notification that the subscription request was processed by the server
 *
 * @event Subscription#ready
 */

/**
 * Message event - notification that the subscription received a message from the server
 * @event Subscription#message
 * @type {Message}
 */

util.inherits(Subscription, events.EventEmitter);
756
/**
 * Reports whether the subscription has been detached from its Stan client,
 * which happens when it is closed or unsubscribed.
 * @returns {boolean}
 */
Subscription.prototype.isClosed = function() {
    return typeof this.stanConnection === 'undefined';
};
764
/**
 * Unregisters the subscription from the streaming server. You cannot
 * unsubscribe from the server unless the Subscription#ready notification
 * has already fired.
 * @fires Subscription#error({Error}), Subscription#unsubscribed, Subscription#timeout({Error})
 */
Subscription.prototype.unsubscribe = function() {
    const doClose = false;
    this.closeOrUnsubscribe(doClose);
};
773
774
/**
 * Close removes the subscriber from the server, but unlike
 * Subscription#unsubscribe(), the durable interest is not removed. If the
 * client has connected to a server for which this feature is not
 * available, Subscription#close() will emit a
 * Subscription#error(NO_SERVER_SUPPORT) error. Note that this affects
 * durable clients only. If called on a non-durable subscriber, this is
 * equivalent to Subscription#unsubscribe().
 *
 * @fires Subscription#error({Error}), Subscription#closed
 */
Subscription.prototype.close = function() {
    const doClose = true;
    this.closeOrUnsubscribe(doClose);
};
787
788
/**
 * Shared implementation of Subscription#close and Subscription#unsubscribe.
 *
 * Detaches the subscription from its client, drops the inbox subscription
 * and sends the close/unsubscribe request. The outcome is reported through
 * events: 'closed' or 'unsubscribed' on success, 'timeout' when the request
 * timed out, 'error' otherwise (already closed subscription, closed
 * connection, no server support for close, request or protocol error).
 *
 * @param {boolean} doClose - true to close (keep durable interest), false
 *        to unsubscribe
 * @private
 */
Subscription.prototype.closeOrUnsubscribe = function(doClose) {
    if (this.isClosed()) {
        this.emit('error', new Error(BAD_SUBSCRIPTION));
        return;
    }

    const sc = this.stanConnection;
    delete this.stanConnection;
    delete sc.subMap[this.inbox];

    if (sc.isClosed()) {
        this.emit('error', new Error(CONN_CLOSED));
        return;
    }

    // The subscription is detached from here on, so stop receiving on its inbox.
    sc.nc.unsubscribe(this.inboxSub);

    const reqSubject = doClose ? sc.subCloseRequests : sc.unsubRequests;
    if (doClose && !reqSubject) {
        // The server gave no close subject: report it and stop, rather
        // than sending a request to an undefined subject.
        this.emit('error', new Error(NO_SERVER_SUPPORT));
        return;
    }

    const ur = new proto.UnsubscribeRequest();
    ur.setClientId(sc.clientID);
    ur.setSubject(this.subject);
    ur.setInbox(this.ackInbox);

    sc.nc.requestOne(reqSubject, Buffer.from(ur.serializeBinary()), sc.options.connectTimeout, (msg) => {
        if (msg instanceof nats.NatsError) {
            if (msg.code === nats.REQ_TIMEOUT) {
                const type = doClose ? CLOSE_REQ_TIMEOUT : UNSUB_REQ_TIMEOUT;
                this.emit('timeout', new nats.NatsError(type, type, msg));
            } else {
                // not a timeout: pass the original error on, as subscribe does
                this.emit('error', msg);
            }
            return;
        }

        const r = proto.SubscriptionResponse.deserializeBinary(Buffer.from(msg, 'binary'));
        const err = r.getError();
        if (err && err.length > 0) {
            this.emit('error', new Error(err));
        } else {
            this.emit(doClose ? 'closed' : 'unsubscribed');
        }
    });
};
845
846
/**
 * Builds the handler for messages arriving on a subscription inbox.
 *
 * The handler looks the subscription up by the NATS subject the message
 * arrived on. Messages for an unknown inbox, or arriving after the client
 * dropped its connection, are ignored. Otherwise the payload is decoded,
 * emitted as 'message' on the subscription and auto-acked unless the
 * subscription uses manual acks. Anything thrown while doing so is emitted
 * as 'error' on the subscription.
 * @return {Function}
 * @private
 */
Stan.prototype.processMsg = function() {
    return (rawMsg, reply, subject, sid) => {
        const sub = this.subMap[subject];
        // Check before decoding: the catch below reports to `sub`, which
        // must therefore exist.
        if (sub === undefined || !this.nc) {
            return;
        }
        try {
            const m = proto.MsgProto.deserializeBinary(Buffer.from(rawMsg, 'binary'));
            const msg = new Message(this, m, sub);
            sub.emit('message', msg);
            msg.maybeAutoAck();
        } catch (error) {
            sub.emit('error', error);
        }
    };
};
870
/**
 * A message received from the streaming server.
 * @param stanClient - the Stan client that received the message
 * @param msg - the decoded protocol message
 * @param subscription - the Subscription the message was delivered to
 * @constructor
 */
function Message(stanClient, msg, subscription) {
    Object.assign(this, { stanClient, msg, subscription });
}
883
/**
 * Sequence number of the message, as reported by the protocol message.
 * @returns {number}
 */
Message.prototype.getSequence = function() {
    const { msg } = this;
    return msg.getSequence();
};
891
/**
 * Subject the message was published on.
 * @returns {string}
 */
Message.prototype.getSubject = function() {
    const { msg } = this;
    return msg.getSubject();
};
899
/**
 * The message payload copied into a Buffer.
 * @returns {Buffer}
 */
Message.prototype.getRawData = function() {
    const payload = this.msg.getData();
    return Buffer.from(payload, 'binary');
};
907
/**
 * Convenience API returning the payload decoded with the client's
 * `stanEncoding` option. When the encoding is 'binary' the payload is
 * returned undecoded, exactly as the protocol message holds it. An empty
 * payload decodes to the empty string.
 * @returns {!(string|Uint8Array)|string}
 */
Message.prototype.getData = function() {
    const bytes = this.msg.getData();
    const encoding = this.stanClient.options.stanEncoding;
    if (encoding === 'binary') {
        return bytes;
    }
    if (bytes.length === 0) {
        return '';
    }
    if (typeof bytes === 'string') {
        // string payloads keep the previous decoding path
        return Buffer.from(bytes, encoding).toString();
    }
    // Buffer.from ignores an encoding for byte arrays, so the encoding has
    // to be applied when converting to a string.
    return Buffer.from(bytes).toString(encoding);
};
922
/**
 * The timestamp exactly as carried by the protocol message. The NATS
 * streaming server sends a 64bit nanosecond resolution timestamp, which is
 * awkward to use in JavaScript; Message#getTimestamp returns it as a Date.
 * @returns {number}
 */
Message.prototype.getTimestampRaw = function() {
    const { msg } = this;
    return msg.getTimestamp();
};
932
/**
 * Message#getTimestampRaw converted from nanoseconds to a JavaScript Date.
 * @returns {Date}
 */
Message.prototype.getTimestamp = function() {
    const nanos = this.getTimestampRaw();
    const millis = nanos / 1000000;
    return new Date(millis);
};
940
/**
 * Whether the protocol message is flagged as a redelivery.
 * @returns {boolean}
 */
Message.prototype.isRedelivered = function() {
    const { msg } = this;
    return msg.getRedelivered();
};
948
/**
 * The CRC32 value carried by the protocol message, if provided.
 * @returns {number}
 */
Message.prototype.getCrc32 = function() {
    const { msg } = this;
    return msg.getCrc32();
};
956
/**
 * Acks the message automatically, unless the subscription was set up with
 * the manualAcks option, in which case acking is left to the application.
 * @protected
 */
Message.prototype.maybeAutoAck = function() {
    const { manualAcks } = this.subscription.opts;
    if (manualAcks) {
        return;
    }
    this.ack();
};
968
/**
 * Acks the message by publishing an Ack (subject and sequence) to the
 * subscription's ack inbox. This method shouldn't be called unless the
 * manualAcks option was set on the subscription.
 *
 * Nothing is sent when the subscription is closed or when the client no
 * longer has a NATS connection.
 */
Message.prototype.ack = function() {
    const nc = this.stanClient.nc;
    // A client can be closed while its subscription still looks open;
    // without a connection there is nothing to publish on.
    if (this.subscription.isClosed() || !nc) {
        return;
    }
    const ack = new proto.Ack();
    ack.setSubject(this.getSubject());
    ack.setSequence(this.getSequence());
    nc.publish(this.subscription.ackInbox, Buffer.from(ack.serializeBinary()));
};
981
/**
 * Returns the client id carried by the protocol message (its getClientId
 * accessor).
 * NOTE(review): assumes the protocol message type exposes getClientId - confirm against ./pb.
 * @returns {!(string|Uint8Array)}
 */
Message.prototype.getClientID = function() {
    return this.msg.getClientId();
};
989
/**
 * Returns the connection id carried by the protocol message (its getConnId
 * accessor).
 * NOTE(review): assumes the protocol message type exposes getConnId - confirm against ./pb.
 * @returns {!(string|Uint8Array)}
 */
Message.prototype.getConnectionID = function() {
    return this.msg.getConnId();
};
993
994
/**
 * The StartPosition constants from the protocol definitions (NEW_ONLY,
 * LAST_RECEIVED, TIME_DELTA_START, SEQUENCE_START, FIRST).
 * @type {StartPosition}
 */
exports.StartPosition = proto.StartPosition;
1001
/**
 * Options describing how a subscription is set up. maxInFlight and ackWait
 * fall back to the library defaults when a falsy value is given.
 * @constructor
 */
function SubscriptionOptions(durableName, maxInFlight, ackWait, startPosition, startSequence, startTime, manualAcks) {
    Object.assign(this, {
        // DurableName, if set will survive client restarts.
        durableName: durableName,
        // Number of messages the cluster will have inflight without an ACK.
        maxInFlight: maxInFlight || DEFAULT_MAX_IN_FLIGHT,
        // Time the cluster will wait for an ACK for a given message.
        ackWait: ackWait || DEFAULT_ACK_WAIT,
        // StartPosition enum from proto.
        startPosition: startPosition,
        // Optional start sequence number.
        startSequence: startSequence,
        // Optional start time.
        startTime: startTime,
        // Option to do Manual Acks
        manualAcks: manualAcks
    });
}
1018
/**
 * Creates a new SubscriptionOptions holding the defaults.
 * @return {SubscriptionOptions}
 */
Stan.prototype.subscriptionOptions = function() {
    const defaults = new SubscriptionOptions();
    return defaults;
};
1026
/**
 * Sets the maximum number of messages in flight without an ack.
 * @param {number} n
 * @returns {SubscriptionOptions} this, for chaining
 */
SubscriptionOptions.prototype.setMaxInFlight = function(n) {
    return Object.assign(this, { maxInFlight: n });
};
1035
/**
 * Sets how long the server waits for an ack before redelivering.
 * @param {number} millis - Stan#subscribe divides this by 1000 for the request
 * @returns {SubscriptionOptions} this, for chaining
 */
SubscriptionOptions.prototype.setAckWait = function(millis) {
    return Object.assign(this, { ackWait: millis });
};
1040
/**
 * Sets the start position of the subscription.
 * @param startPosition - one of the StartPosition constants
 * @returns {SubscriptionOptions} this, for chaining
 */
SubscriptionOptions.prototype.setStartAt = function(startPosition) {
    return Object.assign(this, { startPosition });
};
1045
/**
 * Starts the subscription at the given sequence number
 * (start position SEQUENCE_START).
 * @param {number} sequence
 * @returns {SubscriptionOptions} this, for chaining
 */
SubscriptionOptions.prototype.setStartAtSequence = function(sequence) {
    return Object.assign(this, {
        startPosition: proto.StartPosition.SEQUENCE_START,
        startSequence: sequence
    });
};
1051
1052
/**
 * Starts the subscription at the given point in time (start position
 * TIME_DELTA_START). The time is stored as the delta between now and
 * `date`, in nanoseconds.
 * @param {Date} date
 * @return {SubscriptionOptions} this, for chaining
 */
SubscriptionOptions.prototype.setStartTime = function(date) {
    this.startPosition = proto.StartPosition.TIME_DELTA_START;
    // server expects values in ns
    const elapsedMillis = Date.now() - date.valueOf();
    this.startTime = elapsedMillis * 1000000;
    return this;
};
1063
/**
 * Starts the subscription `millis` milliseconds in the past (start
 * position TIME_DELTA_START). The delta is stored in nanoseconds.
 * @param {Number} millis
 * @return {SubscriptionOptions} this, for chaining
 */
SubscriptionOptions.prototype.setStartAtTimeDelta = function(millis) {
    return Object.assign(this, {
        startPosition: proto.StartPosition.TIME_DELTA_START,
        // server expects values in ns
        startTime: millis * 1000000
    });
};
1075
1076
/**
 * Sets the start position to LAST_RECEIVED.
 * @return {SubscriptionOptions} this, for chaining
 */
SubscriptionOptions.prototype.setStartWithLastReceived = function() {
    return Object.assign(this, { startPosition: proto.StartPosition.LAST_RECEIVED });
};
1084
1085
/**
 * Sets the start position to FIRST.
 * @return {SubscriptionOptions} this, for chaining
 */
SubscriptionOptions.prototype.setDeliverAllAvailable = function() {
    return Object.assign(this, { startPosition: proto.StartPosition.FIRST });
};
1093
1094
/**
 * Turns manual acknowledgement on or off. When the value is truthy,
 * messages are not acked automatically after delivery.
 * @param tf - truthy to enable manual acks
 * @return {SubscriptionOptions} this, for chaining
 */
SubscriptionOptions.prototype.setManualAckMode = function(tf) {
    return Object.assign(this, { manualAcks: tf });
};
1102
1103
/**
 * Sets the durable name of the subscription.
 * @param {String} durableName
 * @return {SubscriptionOptions} this, for chaining
 */
SubscriptionOptions.prototype.setDurableName = function(durableName) {
    return Object.assign(this, { durableName });
};