# NATS.js - A [NATS](http://nats.io) client for [Node.js](https://nodejs.org/en/)
|
2 |
|
3 | A Node.js client for the [NATS messaging system](https://nats.io).
|
4 |
|
5 | [![License](https://img.shields.io/badge/Licence-Apache%202.0-blue.svg)](./LICENSE)
|
6 | ![NATS.js CI](https://github.com/nats-io/nats.js/workflows/NATS.js%20CI/badge.svg)
|
7 | [![npm](https://img.shields.io/npm/v/nats.svg)](https://www.npmjs.com/package/nats)
|
8 | [![npm](https://img.shields.io/npm/dt/nats.svg)](https://www.npmjs.com/package/nats)
|
9 | [![npm](https://img.shields.io/npm/dm/nats.svg)](https://www.npmjs.com/package/nats)
|
10 |
|
11 | # Installation
|
12 |
|
13 | ```bash
|
14 | npm install nats@latest
|
15 | ```
|
16 |
|
Version 2.0.0 of nats.js **is not API compatible** with previous versions of
nats.js. See the [migration guide](migration.md) for help upgrading.
|
19 |
|
20 | ## Basics
|
21 |
|
22 | ### Connecting to a nats-server
|
23 |
|
24 | To connect to a server you use the `connect()` function. It returns a connection
|
25 | that you can use to interact with the server. You can customize the behavior of
|
26 | the client by specifying many [`ConnectionOptions`](#Connection-Options).
|
27 |
|
By default, `connect()` attempts to connect to `127.0.0.1:4222`. If the
connection is dropped, the client will attempt to reconnect. You can choose
the server you want to connect to by specifying a `port` (for local connections)
or a full hostport in the `servers` option. Note that the `servers` option can be
a single hostport (a string) or an array of hostports.
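
To make the `port` and `servers` rules above concrete, here is a minimal sketch of how those options could resolve to a list of hostports. `resolveServers` is a hypothetical helper written for illustration and is not part of the nats API. It assumes that a missing port defaults to `4222`, and it does not handle IPv6 literals.

```javascript
// Hypothetical helper for illustration only; not part of the nats package.
const DEFAULT_HOST = "127.0.0.1";
const DEFAULT_PORT = 4222;

function resolveServers(opts = {}) {
  // `servers` may be a single hostport (a string) or an array of hostports
  let list = [];
  if (typeof opts.servers === "string") {
    list = [opts.servers];
  } else if (Array.isArray(opts.servers)) {
    list = opts.servers;
  }
  if (list.length === 0) {
    // no `servers` given: use a local connection on `port`
    return [`${DEFAULT_HOST}:${opts.port ?? DEFAULT_PORT}`];
  }
  // assumption: a hostport without a port gets the default port
  return list.map((hp) => (hp.includes(":") ? hp : `${hp}:${DEFAULT_PORT}`));
}

console.log(resolveServers({}));
console.log(resolveServers({ port: 4333 }));
console.log(resolveServers({ servers: "demo.nats.io" }));
console.log(resolveServers({ servers: ["a.example:4222", "b.example:4223"] }));
```

In real code you pass the options object straight to `connect()`; the helper only shows which addresses each shape of options is expected to describe.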
|
33 |
|
34 | The example below will attempt to connect to different servers by specifying
|
35 | different `ConnectionOptions`. At least two of them should work if your internet
|
36 | is working.
|
37 |
|
38 | ```javascript
|
39 | const { connect } = require("nats");
|
40 | const servers = [
|
41 | {},
|
42 | { servers: ["demo.nats.io:4442", "demo.nats.io:4222"] },
|
43 | { servers: "demo.nats.io:4443" },
|
44 | { port: 4222 },
|
45 | { servers: "localhost" },
|
46 | ];
|
// connect to each configuration in turn; `forEach` does not wait for
// async callbacks, so a `for...of` loop is used instead
for (const v of servers) {
  try {
    const nc = await connect(v);
    console.log(`connected to ${nc.getServer()}`);
    // this promise indicates the client closed
    const done = nc.closed();
    // do something with the connection

    // close the connection
    await nc.close();
    // check if the close was OK
    const err = await done;
    if (err) {
      console.log(`error closing:`, err);
    }
  } catch (err) {
    console.log(`error connecting to ${JSON.stringify(v)}`);
  }
}
|
66 | ```
|
67 |
|
To disconnect from the nats-server, call `close()` on the connection. A
connection can also be terminated when an unexpected error happens, for example
when the server returns a run-time error. In those cases, the client will
re-initiate a connection.
|
72 |
|
73 | By default, the client will always attempt to reconnect if the connection is
|
74 | closed for a reason other than calling `close()`. To get notified when the
|
75 | connection is closed for some reason, await the resolution of the Promise
|
76 | returned by `closed()`. If closed resolves to a value, the value is a
|
77 | `NatsError` indicating why the connection closed.
|
78 |
|
79 | ### Publish and Subscribe
|
80 |
|
81 | The basic client operations are `publish` to send messages and `subscribe` to
|
82 | receive messages.
|
83 |
|
84 | Messages are published to a subject. A subject is like a URL with the exception
|
85 | that it doesn't specify an actual endpoint. All recipients that have expressed
|
86 | interest in a subject will receive messages addressed to that subject (provided
|
87 | they have access and permissions to get it). To express interest in a subject,
|
88 | you create a `subscription`.
|
89 |
|
90 | In JavaScript clients (websocket, Deno, or Node) subscriptions work as an async
|
91 | iterator - clients simply loop to process messages as they become available.
|
92 |
|
93 | NATS messages are payload agnostic. Payloads are `Uint8Arrays`. You can easily
|
94 | convert to and from JSON or strings by using `JSONCodec` or `StringCodec`, or a
|
95 | custom `Codec`.
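
As a rough illustration of what a custom `Codec` might look like, the sketch below builds a JSON codec on the standard `TextEncoder` and `TextDecoder`. It assumes that a codec is simply an object with an `encode` method that returns a `Uint8Array` and a `decode` method that reverses it. `SimpleJSONCodec` is a hypothetical name. In practice the built-in `JSONCodec` is the better choice.

```javascript
// Hypothetical custom codec for illustration; prefer the built-in JSONCodec.
function SimpleJSONCodec() {
  const te = new TextEncoder();
  const td = new TextDecoder();
  return {
    // value -> JSON text -> UTF-8 bytes
    encode(value) {
      return te.encode(JSON.stringify(value));
    },
    // UTF-8 bytes -> JSON text -> value
    decode(bytes) {
      return JSON.parse(td.decode(bytes));
    },
  };
}

const jc = SimpleJSONCodec();
const payload = jc.encode({ hello: "world" });
console.log(payload instanceof Uint8Array);
console.log(jc.decode(payload).hello);
```

An object like this would be used the same way as `StringCodec` in the examples: `encode` before `publish`, `decode` on `m.data`.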
|
96 |
|
To cancel a subscription and terminate your interest, you call `unsubscribe()`
or `drain()` on a subscription. Unsubscribe will typically terminate regardless
of whether there are messages in flight for the client. Drain ensures that all
messages that are in flight are processed before canceling the subscription.
Connections can also be drained. Draining a connection closes it after
all subscriptions have been drained and all outbound messages have been sent to
the server.
|
104 |
|
105 | ```javascript
|
106 | const { connect, StringCodec } = require("nats");
|
107 |
|
108 | // to create a connection to a nats-server:
|
109 | const nc = await connect({ servers: "demo.nats.io:4222" });
|
110 |
|
111 | // create a codec
|
112 | const sc = StringCodec();
|
113 | // create a simple subscriber and iterate over messages
|
114 | // matching the subscription
|
115 | const sub = nc.subscribe("hello");
|
116 | (async () => {
|
117 | for await (const m of sub) {
|
118 | console.log(`[${sub.getProcessed()}]: ${sc.decode(m.data)}`);
|
119 | }
|
120 | console.log("subscription closed");
|
121 | })();
|
122 |
|
123 | nc.publish("hello", sc.encode("world"));
|
124 | nc.publish("hello", sc.encode("again"));
|
125 |
|
// we want to ensure that messages that are in flight
|
127 | // get processed, so we are going to drain the
|
128 | // connection. Drain is the same as close, but makes
|
129 | // sure that all messages in flight get seen
|
130 | // by the iterator. After calling drain on the connection
|
131 | // the connection closes.
|
132 | await nc.drain();
|
133 | ```
|
134 |
|
135 | ### Wildcard Subscriptions
|
136 |
|
137 | Subjects can be used to organize messages into hierarchies. For example, a
|
138 | subject may contain additional information that can be useful in providing a
|
139 | context to the message, such as the ID of the client that sent the message, or
|
140 | the region where a message originated.
|
141 |
|
Instead of subscribing to each specific subject, you can create subscriptions
that have subjects with wildcards. A subject is made up of tokens separated by
periods. The `*` wildcard matches exactly one token in its position, and the
`>` wildcard matches one or more tokens at the end of the subject.
|
145 |
|
146 | All subscriptions are independent. If two different subscriptions match a
|
147 | subject, both will get to process the message:
|
148 |
|
149 | ```javascript
|
150 | const { connect, StringCodec } = require("nats");
|
151 |
|
152 | const nc = await connect({ servers: "demo.nats.io:4222" });
|
153 | const sc = StringCodec();
|
154 |
|
155 | // subscriptions can have wildcard subjects
|
156 | // the '*' matches any string in the specified token position
|
157 | const s1 = nc.subscribe("help.*.system");
|
158 | const s2 = nc.subscribe("help.me.*");
|
159 | // the '>' matches any tokens in that position or following
|
160 | // '>' can only be specified at the end of the subject
|
161 | const s3 = nc.subscribe("help.>");
|
162 |
|
163 | async function printMsgs(s) {
|
164 | let subj = s.getSubject();
|
165 | console.log(`listening for ${subj}`);
|
166 | const c = (13 - subj.length);
|
167 | const pad = "".padEnd(c);
|
168 | for await (const m of s) {
|
169 | console.log(
|
170 | `[${subj}]${pad} #${s.getProcessed()} - ${m.subject} ${
|
171 | m.data ? " " + sc.decode(m.data) : ""
|
172 | }`,
|
173 | );
|
174 | }
|
175 | }
|
176 |
|
177 | printMsgs(s1);
|
178 | printMsgs(s2);
|
179 | printMsgs(s3);
|
180 |
|
181 | // don't exit until the client closes
|
182 | await nc.closed();
|
183 | ```
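
The matching rules described in the comments above (`*` for a single token, `>` for one or more trailing tokens) can be sketched as a small function. This is only an illustration of the rules. `subjectMatches` is a hypothetical helper; the actual matching is done by the server.

```javascript
// Hypothetical helper illustrating wildcard rules; matching is done by the server.
function subjectMatches(pattern, subject) {
  const p = pattern.split(".");
  const s = subject.split(".");
  for (let i = 0; i < p.length; i++) {
    if (p[i] === ">") {
      // '>' must be the last token and needs at least one token to match
      return i === p.length - 1 && s.length > i;
    }
    if (i >= s.length) {
      // the subject ran out of tokens before the pattern did
      return false;
    }
    if (p[i] !== "*" && p[i] !== s[i]) {
      // a literal token must match exactly; '*' matches any single token
      return false;
    }
  }
  // without '>', both must have the same number of tokens
  return p.length === s.length;
}

for (const pattern of ["help.*.system", "help.me.*", "help.>"]) {
  console.log(pattern, subjectMatches(pattern, "help.me.system"));
}
```

All three patterns match `help.me.system`, which is why each of the three subscriptions in the example would receive a message published to that subject.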
|
184 |
|
185 | ### Services: Request/Reply
|
186 |
|
Request/Reply is the NATS equivalent of an HTTP request. To make requests you
publish messages as you did before, but also specify a `reply` subject. The
`reply` subject is where a service will publish its response.

NATS provides syntactic sugar for publishing requests. The `request()` API will
generate a reply subject and manage the creation of a subscription under the
covers. It will also start a timer to ensure that if a response is not received
within your allotted time, the request fails. The service example below also
illustrates a graceful shutdown.
|
196 |
|
197 | #### Services
|
198 |
|
Here's an example of a service. It is a bit more complicated than strictly
necessary, in order to illustrate not only how to create responses but also how
the subject itself is used to dispatch different behaviors.
|
202 |
|
203 | ```javascript
|
const { connect, StringCodec } = require("nats");

// create a connection
const nc = await connect({ servers: "demo.nats.io" });

// create a codec
const sc = StringCodec();

// this subscription listens for `time` requests and returns the current time
const sub = nc.subscribe("time");
(async (sub) => {
|
215 | console.log(`listening for ${sub.getSubject()} requests...`);
|
216 | for await (const m of sub) {
|
217 | if (m.respond(sc.encode(new Date().toISOString()))) {
|
218 | console.info(`[time] handled #${sub.getProcessed()}`);
|
219 | } else {
|
220 | console.log(`[time] #${sub.getProcessed()} ignored - no reply subject`);
|
221 | }
|
222 | }
|
223 | console.log(`subscription ${sub.getSubject()} drained.`);
|
224 | })(sub);
|
225 |
|
226 | // this subscription listens for admin.uptime and admin.stop
|
227 | // requests to admin.uptime returns how long the service has been running
|
228 | // requests to admin.stop gracefully stop the client by draining
|
229 | // the connection
|
230 | const started = Date.now();
|
231 | const msub = nc.subscribe("admin.*");
|
232 | (async (sub) => {
|
233 | console.log(`listening for ${sub.getSubject()} requests [uptime | stop]`);
|
234 | // it would be very good to verify the origin of the request
|
235 | // before implementing something that allows your service to be managed.
|
236 | // NATS can limit which client can send or receive on what subjects.
|
237 | for await (const m of sub) {
|
238 | const chunks = m.subject.split(".");
|
239 | console.info(`[admin] #${sub.getProcessed()} handling ${chunks[1]}`);
|
240 | switch (chunks[1]) {
|
241 | case "uptime":
|
242 | // send the number of millis since up
|
243 | m.respond(sc.encode(`${Date.now() - started}`));
|
244 | break;
|
245 | case "stop": {
|
246 | m.respond(sc.encode(`[admin] #${sub.getProcessed()} stopping....`));
|
247 | // gracefully shutdown
|
248 | nc.drain()
|
249 | .catch((err) => {
|
250 | console.log("error draining", err);
|
251 | });
|
252 | break;
|
253 | }
|
254 | default:
|
255 | console.log(
|
256 | `[admin] #${sub.getProcessed()} ignoring request for ${m.subject}`,
|
257 | );
|
258 | }
|
259 | }
|
260 | console.log(`subscription ${sub.getSubject()} drained.`);
|
261 | })(msub);
|
262 |
|
263 | // wait for the client to close here.
|
264 | await nc.closed().then((err) => {
|
265 | let m = `connection to ${nc.getServer()} closed`;
|
266 | if (err) {
|
267 | m = `${m} with an error: ${err.message}`;
|
268 | }
|
269 | console.log(m);
|
270 | });
|
271 | ```
|
272 |
|
273 | #### Making Requests
|
274 |
|
Here's a simple example of a client making a request to the service above:
|
277 |
|
278 | ```javascript
|
const { connect, Empty, StringCodec } = require("nats");
|
280 |
|
281 | // create a connection
|
282 | const nc = await connect({ servers: "demo.nats.io:4222" });
|
283 |
|
284 | // create an encoder
|
285 | const sc = StringCodec();
|
286 |
|
287 | // the client makes a request and receives a promise for a message
|
288 | // by default the request times out after 1s (1000 millis) and has
|
289 | // no payload.
|
290 | await nc.request("time", Empty, { timeout: 1000 })
|
291 | .then((m) => {
|
292 | console.log(`got response: ${sc.decode(m.data)}`);
|
293 | })
|
294 | .catch((err) => {
|
295 | console.log(`problem with request: ${err.message}`);
|
296 | });
|
297 |
|
298 | await nc.close();
|
299 | ```
|
300 |
|
301 | ### Queue Groups
|
302 |
|
303 | Queue groups allow scaling of services horizontally. Subscriptions for members
|
304 | of a queue group are treated as a single service. When you send a message to a
|
305 | queue group subscription, only a single client in a queue group will receive it.
|
306 |
|
307 | There can be any number of queue groups. Each group is treated as its own
|
308 | independent unit. Note that non-queue subscriptions are also independent of
|
309 | subscriptions in a queue group.
|
310 |
|
311 | ```javascript
|
const { connect, StringCodec } = require("nats");

// creates `count` connections that serve 'echo' requests and
// resolves to the list of connections created
async function createService(name, count = 1, queue = "") {
|
323 | const conns = [];
|
324 | for (let i = 1; i <= count; i++) {
|
325 | const n = queue ? `${name}-${i}` : name;
|
326 | const nc = await connect(
|
327 | { servers: "demo.nats.io:4222", name: `${n}` },
|
328 | );
|
329 | nc.closed()
|
330 | .then((err) => {
|
331 | if (err) {
|
332 | console.error(
|
333 | `service ${n} exited because of error: ${err.message}`,
|
334 | );
|
335 | }
|
336 | });
|
337 | // create a subscription - note the option for a queue, if set
|
338 | // any client with the same queue will be a member of the group.
|
339 | const sub = nc.subscribe("echo", { queue: queue });
|
340 | const _ = handleRequest(n, sub);
|
341 | console.log(`${nc.options.name} is listening for 'echo' requests...`);
|
342 | conns.push(nc);
|
343 | }
|
344 | return conns;
|
345 | }
|
346 |
|
347 | const sc = StringCodec();
|
348 |
|
349 | // simple handler for service requests
|
350 | async function handleRequest(name, s) {
|
351 | const p = 12 - name.length;
|
352 | const pad = "".padEnd(p);
|
353 | for await (const m of s) {
|
354 | // respond returns true if the message had a reply subject, thus it could respond
|
355 | if (m.respond(m.data)) {
|
356 | console.log(
|
357 | `[${name}]:${pad} #${s.getProcessed()} echoed ${sc.decode(m.data)}`,
|
358 | );
|
359 | } else {
|
360 | console.log(
|
361 | `[${name}]:${pad} #${s.getProcessed()} ignoring request - no reply subject`,
|
362 | );
|
363 | }
|
364 | }
|
365 | }
|
366 |
|
367 | // let's create two queue groups and a standalone subscriber
|
368 | const conns = [];
|
369 | conns.push(...await createService("echo", 3, "echo"));
|
370 | conns.push(...await createService("other-echo", 2, "other-echo"));
|
371 | conns.push(...await createService("standalone"));
|
372 |
|
const a = [];
|
374 | conns.forEach((c) => {
|
375 | a.push(c.closed());
|
376 | });
|
377 | await Promise.all(a);
|
378 | ```
|
379 |
|
380 | Run it and publish a request to the subject `echo` to see what happens.
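
As a rough guide to what to expect, the sketch below models the delivery rule described above: one member of each queue group receives the message, and every non-queue subscriber receives it too. `recipients` is a hypothetical function for illustration only. How the server picks the member of a group is not specified here, so the picker is passed in as a parameter and defaults to a random choice.

```javascript
// Hypothetical model of queue group delivery; the server does the real work.
// subs: [{ name, queue }], where an empty `queue` means a plain subscription.
function recipients(
  subs,
  pick = (members) => members[Math.floor(Math.random() * members.length)],
) {
  const groups = new Map();
  const out = [];
  for (const s of subs) {
    if (!s.queue) {
      // non-queue subscriptions always receive the message
      out.push(s.name);
      continue;
    }
    if (!groups.has(s.queue)) {
      groups.set(s.queue, []);
    }
    groups.get(s.queue).push(s.name);
  }
  // exactly one member of each queue group receives the message
  for (const members of groups.values()) {
    out.push(pick(members));
  }
  return out;
}

const subs = [
  { name: "echo-1", queue: "echo" },
  { name: "echo-2", queue: "echo" },
  { name: "echo-3", queue: "echo" },
  { name: "other-echo-1", queue: "other-echo" },
  { name: "other-echo-2", queue: "other-echo" },
  { name: "standalone", queue: "" },
];
console.log(recipients(subs));
```

With the six subscriptions from the example, each message published to `echo` is handled by three of them: one from `echo`, one from `other-echo`, and `standalone`.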
|
381 |
|
382 | ## Advanced Usage
|
383 |
|
384 | ### Headers
|
385 |
|
386 | NATS headers are similar to HTTP headers. Headers are enabled automatically if
|
387 | the server supports them. Note that if you publish a message using headers, and
|
388 | the server doesn't support them, an Error is thrown. Also note that even if you
|
389 | are publishing a message with a header, it is possible for the recipient to not
|
390 | support them.
|
391 |
|
392 | ```javascript
|
393 | const { connect, createInbox, Empty, headers } = require("nats");
|
394 |
|
395 | const nc = await connect(
|
396 | {
|
397 | servers: `demo.nats.io`,
|
398 | },
|
399 | );
|
400 |
|
401 | const subj = createInbox();
|
402 | const sub = nc.subscribe(subj);
|
403 | (async () => {
|
404 | for await (const m of sub) {
|
405 | if (m.headers) {
|
406 | for (const [key, value] of m.headers) {
|
407 | console.log(`${key}=${value}`);
|
408 | }
|
409 | // reading/setting a header is not case sensitive
|
410 | console.log("id", m.headers.get("id"));
|
411 | }
|
412 | }
|
413 | })().then();
|
414 |
|
415 | // headers always have their names turned into a canonical mime header key
|
416 | // header names can be any printable ASCII character with the exception of `:`.
|
417 | // header values can be any ASCII character except `\r` or `\n`.
|
418 | // see https://www.ietf.org/rfc/rfc822.txt
|
419 | const h = headers();
|
420 | h.append("id", "123456");
|
421 | h.append("unix_time", Date.now().toString());
|
422 | nc.publish(subj, Empty, { headers: h });
|
423 |
|
424 | await nc.flush();
|
425 | await nc.close();
|
426 | ```
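
The comment above mentions that header names are turned into a canonical MIME header key. A minimal sketch of that kind of normalization follows. It assumes the common convention of upper-casing the first letter and each letter that follows a hyphen, and lower-casing the rest. `canonicalHeaderKey` is a hypothetical helper, not the library's function.

```javascript
// Hypothetical sketch of MIME header key canonicalization; not the nats API.
function canonicalHeaderKey(name) {
  let upper = true; // the first character is upper-cased
  let out = "";
  for (const ch of name) {
    const code = ch.charCodeAt(0);
    // names are printable ASCII, with the exception of ':'
    if (ch === ":" || code < 33 || code > 126) {
      throw new Error(`invalid header name character: ${JSON.stringify(ch)}`);
    }
    out += upper ? ch.toUpperCase() : ch.toLowerCase();
    // the character after a hyphen starts a new word
    upper = ch === "-";
  }
  return out;
}

console.log(canonicalHeaderKey("id"));
console.log(canonicalHeaderKey("content-type"));
console.log(canonicalHeaderKey("unix_time"));
```

Under this assumption, `id`, `Id`, and `ID` all normalize to the same key.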
|
427 |
|
428 | ### No Responders
|
429 |
|
Requests can fail for many reasons. A common reason for a failure is the lack of
interest in the subject. Typically, these surface as a timeout error. If the
server has headers enabled, it also enables a `no responders` feature:
if you send a request for which there's no interest, the request is
immediately rejected:
|
435 |
|
436 | ```javascript
|
const { connect, ErrorCode } = require("nats");

const nc = await connect({
|
438 | servers: `demo.nats.io`,
|
439 | });
|
440 |
|
441 | try {
|
442 | const m = await nc.request("hello.world");
|
443 | console.log(m.data);
|
444 | } catch (err) {
|
445 | switch (err.code) {
|
446 | case ErrorCode.NoResponders:
|
447 | console.log("no one is listening to 'hello.world'");
|
448 | break;
|
449 | case ErrorCode.Timeout:
|
450 | console.log("someone is listening but didn't respond");
|
451 | break;
|
452 | default:
|
453 | console.log("request failed", err);
|
454 | }
|
455 | }
|
456 |
|
457 | await nc.close();
|
458 | ```
|
459 |
|
460 | ### Authentication
|
461 |
|
462 | NATS supports many different forms of authentication:
|
463 |
|
464 | - username/password
|
465 | - token
|
466 | - NKEYS
|
467 | - client certificates
|
468 | - JWTs
|
469 |
|
470 | For user/password and token authentication, you can simply provide them as
|
471 | `ConnectionOptions` - see `user`, `pass`, `token`. Internally these mechanisms
|
472 | are implemented as an `Authenticator`. An `Authenticator` is simply a function
|
473 | that handles the type of authentication specified.
|
474 |
|
Setting the `user`/`pass` or `token` options simply initializes an
`Authenticator` and sets the username/password or token.
|
477 |
|
478 | ```typescript
|
// if the connection requires authentication, provide `user` and `pass` or
// `token` options in the ConnectionOptions
|
481 | const { connect } = require("nats");
|
482 |
|
483 | const nc1 = await connect({
|
484 | servers: "127.0.0.1:4222",
|
485 | user: "jenny",
|
486 | pass: "867-5309",
|
487 | });
|
488 | const nc2 = await connect({ port: 4222, token: "t0pS3cret!" });
|
489 | ```
|
490 |
|
491 | #### Authenticators
|
492 |
|
493 | NKEYs and JWT authentication are more complex, as they cryptographically respond
|
494 | to a server challenge.
|
495 |
|
Because NKEY and JWT authentication may require reading data from a file or an
HTTP cookie, these forms of authentication require a bit more work from the
developer to activate them. The work needed to access these resources varies
depending on the platform.
|
500 |
|
501 | After the credential artifacts are read, you can use one of these functions to
|
502 | create the authenticator. You then simply assign it to the `authenticator`
|
503 | property of the `ConnectionOptions`:
|
504 |
|
505 | - `nkeyAuthenticator(seed?: Uint8Array | (() => Uint8Array)): Authenticator`
|
506 | - `jwtAuthenticator(jwt: string | (() => string), seed?: Uint8Array | (()=> Uint8Array)): Authenticator`
|
507 | - `credsAuthenticator(creds: Uint8Array): Authenticator`
|
508 |
|
The first two functions accept functions that return the desired value. This
supports dynamic environments such as a browser, where values may be obtained by
reading them from a cookie.
|
512 |
|
513 | Here's an example:
|
514 |
|
515 | ```javascript
|
const { connect, credsAuthenticator } = require("nats");

// read the creds file as necessary; in this case it
// is part of the code for illustration purposes
|
518 | const creds = `-----BEGIN NATS USER JWT-----
|
519 | eyJ0eXAiOiJqdSDJB....
|
520 | ------END NATS USER JWT------
|
521 |
|
522 | ************************* IMPORTANT *************************
|
NKEY Seed printed below can be used to sign and prove identity.
|
524 | NKEYs are sensitive and should be treated as secrets.
|
525 |
|
526 | -----BEGIN USER NKEY SEED-----
|
527 | SUAIBDPBAUTW....
|
528 | ------END USER NKEY SEED------
|
529 | `;
|
530 |
|
531 | const nc = await connect(
|
532 | {
|
533 | port: 4222,
|
534 | authenticator: credsAuthenticator(new TextEncoder().encode(creds)),
|
535 | },
|
536 | );
|
537 | ```
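
To make the layout of a creds file more concrete, here is a hedged sketch that pulls the JWT and the NKEY seed out of text shaped like the example above. `parseCreds` is a hypothetical helper written for illustration and is not how `credsAuthenticator` is implemented. It assumes each value sits on a single line between a `BEGIN` and an `END` marker, with the JWT section first. The values in the sample are placeholders.

```javascript
// Hypothetical parser for illustration; credsAuthenticator does this for you.
function parseCreds(text) {
  // a BEGIN marker line, one line of content, then an END marker line
  const re = /-+BEGIN [^-]+-+\r?\n([^\r\n]+)\r?\n-+END [^-]+-+/g;
  const sections = [...text.matchAll(re)].map((m) => m[1].trim());
  if (sections.length !== 2) {
    throw new Error("expected a JWT section and a seed section");
  }
  // assumption: the JWT comes first and the seed second
  return { jwt: sections[0], seed: sections[1] };
}

// placeholder values, not real credentials
const sample = `-----BEGIN NATS USER JWT-----
JWT_PLACEHOLDER
------END NATS USER JWT------

-----BEGIN USER NKEY SEED-----
SEED_PLACEHOLDER
------END USER NKEY SEED------
`;

console.log(parseCreds(sample));
```

Since the seed is a secret, real code should avoid logging the parsed result.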
|
538 |
|
The Node.js client can verify the TLS connection when client certificates are
specified in the `ConnectionOptions`:
|
541 |
|
542 | ```javascript
|
const tlsOptions = {
  keyFile: "./test/certs/client-key.pem",
  certFile: "./test/certs/client-cert.pem",
  caFile: "./test/certs/ca.pem",
};
const nc = await connect({ tls: tlsOptions });
|
549 | ```
|
550 |
|
551 | ### Flush
|
552 |
|
Flush sends a PING to the server. When the server responds with PONG you are
guaranteed that all pending data was sent and received by the server. Note that
`flush()` effectively adds a server round-trip. All NATS clients handle their
buffering optimally, so `flush(): Promise<void>` shouldn't be used except in
cases where you are writing some sort of test.
|
558 |
|
559 | ```javascript
|
560 | nc.publish("foo");
|
561 | nc.publish("bar");
|
562 | await nc.flush();
|
563 | ```
|
564 |
|
565 | ### `PublishOptions`
|
566 |
|
567 | When you publish a message you can specify some options:
|
568 |
|
- `reply` - a subject on which to receive a reply (you must set up a
  subscription on it before you publish).
|
571 | - `headers` - a set of headers to decorate the message.
|
572 |
|
573 | ### `SubscriptionOptions`
|
574 |
|
575 | You can specify several options when creating a subscription:
|
576 |
|
577 | - `max`: maximum number of messages to receive - auto unsubscribe
|
578 | - `timeout`: how long to wait for the first message
|
579 | - `queue`: the [queue group](#Queue-Groups) name the subscriber belongs to
|
580 | - `callback`: a function with the signature
|
581 | `(err: NatsError|null, msg: Msg) => void;` that should be used for handling
|
582 | the message. Subscriptions with callbacks are NOT iterators.
|
583 |
|
584 | #### Auto Unsubscribe
|
585 |
|
586 | ```javascript
|
587 | // subscriptions can auto unsubscribe after a certain number of messages
|
588 | nc.subscribe("foo", { max: 10 });
|
589 | ```
|
590 |
|
591 | #### Timeout Subscriptions
|
592 |
|
593 | ```javascript
|
// create a subscription with a timeout; if no message arrives
// within the timeout, the function running the iterator will
// reject - depending on how you code it, you may need a
// try/catch block.
|
598 | const sub = nc.subscribe("hello", { timeout: 1000 });
|
599 | (async () => {
|
600 | for await (const m of sub) {
|
601 | }
|
602 | })().catch((err) => {
|
603 | if (err.code === ErrorCode.Timeout) {
|
604 | console.log(`sub timed out!`);
|
605 | } else {
|
606 | console.log(`sub iterator got an error!`);
|
607 | }
|
608 | });
|
609 | ```
|
610 |
|
611 | ### `RequestOptions`
|
612 |
|
613 | When making a request, there are several options you can pass:
|
614 |
|
615 | - `timeout`: how long to wait for the response
|
616 | - `headers`: optional headers to include with the message
|
617 | - `noMux`: create a new subscription to handle the request. Normally a shared
|
618 | subscription is used to receive response messages.
|
619 | - `reply`: optional subject where the reply should be sent.
|
620 |
|
621 | #### `noMux` and `reply`
|
622 |
|
Under the hood, the request API simply uses a wildcard subscription to receive
the responses to all requests you send.
|
625 |
|
626 | In some cases, the default subscription strategy doesn't work correctly. For
|
627 | example, a client may be constrained by the subjects where it can receive
|
628 | replies.
|
629 |
|
630 | When `noMux` is set to `true`, the client will create a normal subscription for
|
631 | receiving the response to a generated inbox subject before the request is
|
632 | published. The `reply` option can be used to override the generated inbox
|
633 | subject with an application provided one. Note that setting `reply` requires
|
634 | `noMux` to be `true`:
|
635 |
|
636 | ```typescript
|
637 | const m = await nc.request(
|
638 | "q",
|
639 | Empty,
|
640 | { reply: "bar", noMux: true, timeout: 1000 },
|
641 | );
|
642 | ```
|
643 |
|
644 | ### Draining Connections and Subscriptions
|
645 |
|
Draining provides a graceful way to unsubscribe or close a connection
without losing messages that have already been dispatched to the client.
|
648 |
|
649 | You can drain a subscription or all subscriptions in a connection.
|
650 |
|
651 | When you drain a subscription, the client sends an `unsubscribe` protocol
|
652 | message to the server followed by a `flush`. The subscription handler is only
|
653 | removed after the server responds. Thus, all pending messages for the
|
654 | subscription have been processed.
|
655 |
|
Draining a connection drains all subscriptions. However, when you drain the
|
657 | connection it becomes impossible to make new subscriptions or send new requests.
|
658 | After the last subscription is drained, it also becomes impossible to publish a
|
659 | message. These restrictions do not exist when just draining a subscription.
|
660 |
|
661 | ### Lifecycle/Informational Events
|
662 |
|
Clients can get notifications for various event types:
|
664 |
|
665 | - `Events.Disconnect`
|
666 | - `Events.Reconnect`
|
667 | - `Events.Update`
|
668 | - `Events.LDM`
|
669 | - `Events.Error`
|
670 |
|
671 | The first two fire when a client disconnects and reconnects respectively. The
|
672 | payload will be the server where the event took place.
|
673 |
|
674 | The `UPDATE` event notifies whenever the client receives a cluster configuration
|
675 | update. The `ServersChanged` interface provides two arrays: `added` and
|
676 | `deleted` listing the servers that were added or removed.
|
677 |
|
678 | The `LDM` event notifies that the current server has signaled that it is running
|
679 | in _Lame Duck Mode_ and will evict clients. Depending on the server
|
680 | configuration policy, the client may want to initiate an ordered shutdown, and
|
681 | initiate a new connection to a different server in the cluster.
|
682 |
|
The `ERROR` event notifies you of async errors that couldn't be routed in a more
precise way to your client. For example, permission errors for a subscription or
request will be reported by the subscription or request itself. However,
permission errors on publish will be reported via the status mechanism.
|
687 |
|
688 | ```javascript
|
689 | const nc = await connect();
|
690 | (async () => {
|
691 | console.info(`connected ${nc.getServer()}`);
|
692 | for await (const s of nc.status()) {
|
693 | console.info(`${s.type}: ${s.data}`);
|
694 | }
|
695 | })().then();
|
696 |
|
697 | nc.closed()
|
698 | .then((err) => {
|
699 | console.log(
|
700 | `connection closed ${err ? " with error: " + err.message : ""}`,
|
701 | );
|
702 | });
|
703 | ```
|
704 |
|
705 | Be aware that when a client closes, you will need to wait for the `closed()`
|
706 | promise to resolve. When it resolves, the client is done and will not reconnect.
|
707 |
|
708 | ### Async vs. Callbacks
|
709 |
|
710 | Previous versions of the JavaScript NATS clients specified callbacks for message
|
711 | processing. This required complex handling logic when a service required
|
712 | coordination of operations. Callbacks are an inversion of control anti-pattern.
|
713 |
|
The async APIs trivialize complex coordination and make your code easier to
maintain. With that said, there are some implications:
|
716 |
|
717 | - Async subscriptions buffer inbound messages.
|
718 | - Subscription processing delays until the runtime executes the promise related
|
719 | microtasks at the end of an event loop.
|
720 |
|
721 | In a traditional callback-based library, I/O happens after all data yielded by a
|
722 | read in the current event loop completes processing. This means that callbacks
|
723 | are invoked as part of processing. With async, the processing is queued in a
|
724 | microtask queue. At the end of the event loop, the runtime processes the
|
725 | microtasks, which in turn resumes your functions. As expected, this increases
|
726 | latency, but also provides additional liveliness.
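
The ordering difference can be seen in a small stand-alone sketch that does not use NATS at all. The two `dispatch` functions below are hypothetical stand-ins: one invokes a callback synchronously, the other buffers the message and resumes its consumer from a microtask, which is roughly what happens when an async iterator is awaiting the next message.

```javascript
// Stand-alone illustration of callback vs. microtask ordering; no NATS involved.
const order = [];

// callback style: the handler runs synchronously, as part of the dispatch
function dispatchToCallback(msg, callback) {
  callback(null, msg);
}

// async style: the message is buffered and the consumer resumes in a microtask
function dispatchToIterator(msg, buffer) {
  buffer.push(msg);
  Promise.resolve().then(() => order.push(`async:${buffer.shift()}`));
}

dispatchToIterator("a", []);
dispatchToCallback("b", (_err, m) => order.push(`callback:${m}`));
order.push("end of dispatch");

// the microtask has run by the time this timer fires
setTimeout(() => console.log(order), 0);
// prints [ 'callback:b', 'end of dispatch', 'async:a' ]
```

Even though message `a` was dispatched first, its handler runs last, after the current synchronous work has finished.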
|
727 |
|
728 | To reduce async latency, the NATS client allows processing a subscription in the
|
729 | same event loop that dispatched the message. Simply specify a `callback` in the
|
730 | subscription options. The signature for a callback is
|
731 | `(err: (NatsError|null), msg: Msg) => void`. When specified, the subscription
|
732 | iterator will never yield a message, as the callback will intercept all
|
733 | messages.
|
734 |
|
Note that `callback` arguably shouldn't even be documented, as it is likely a
workaround for an underlying application problem where you should be considering
a different strategy to horizontally scale your application or reduce pressure
on the clients, such as using queue workers or targeting messages more
explicitly. With that said, there are many situations where using callbacks can
be more performant or appropriate.
|
741 |
|
742 | ## Connection Options
|
743 |
|
744 | The following is the list of connection options and default values.
|
745 |
|
746 | | Option | Default | Description |
|
747 | | ----------------------- | ------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
748 | | `authenticator` | none | Specifies the authenticator function that sets the client credentials. |
|
749 | | `debug` | `false` | If `true`, the client prints protocol interactions to the console. Useful for debugging. |
|
750 | | `ignoreClusterUpdates` | `false` | If `true` the client will ignore any cluster updates provided by the server. |
|
751 | | `inboxPrefix` | `"_INBOX"` | Sets de prefix for automatically created inboxes - `createInbox(prefix)` |
|
752 | | `maxPingOut` | `2` | Max number of pings the client will allow unanswered before raising a stale connection error. |
|
753 | | `maxReconnectAttempts` | `10` | Sets the maximum number of reconnect attempts. The value of `-1` specifies no limit. |
|
754 | | `name` | | Optional client name - recommended to be set to a unique client name. |
|
755 | | `noEcho` | `false` | Subscriptions receive messages published by the client. Requires server support (1.2.0). If set to true, and the server does not support the feature, an error with code `NO_ECHO_NOT_SUPPORTED` is emitted, and the connection is aborted. Note that it is possible for this error to be emitted on reconnect when the server reconnects to a server that does not support the feature. |
|
756 | | `noRandomize` | `false` | If set, the order of user-specified servers is randomized. |
|
757 | | `pass` | | Sets the password for a connection. |
|
758 | | `pedantic` | `false` | Turns on strict subject format checks. |
|
759 | | `pingInterval` | `120000` | Number of milliseconds between client-sent pings. |
|
760 | | `port` | `4222` | Port to connect to (only used if `servers` is not specified). |
|
761 | | `reconnect` | `true` | If false, client will not attempt reconnecting. |
|
762 | | `reconnectDelayHandler` | Generated function | A function (`() => number`) that returns the number of milliseconds to wait before the next attempt to reconnect to a server the client previously connected to. |
|
763 | | `reconnectJitter` | `100` | Number of millis to randomize after `reconnectTimeWait`. |
|
764 | | `reconnectJitterTLS` | `1000` | Number of millis to randomize after `reconnectTimeWait` when TLS options are specified. |
|
765 | | `reconnectTimeWait` | `2000` | If disconnected, the client will wait the specified number of milliseconds between reconnect attempts. |
|
766 | | `servers` | `"localhost:4222"` | String or Array of hostport for servers. |
|
767 | | `timeout` | `20000` | Number of milliseconds the client will wait for a connection to be established. If it fails it will emit a `connection_timeout` event with a `NatsError` that provides the hostport of the server where the connection was attempted. |
|
768 | | `tls` | TlsOptions | A configuration object for requiring a TLS connection (not applicable to nats.ws). |
|
769 | | `token` | | Sets an authorization token for a connection. |
|
770 | | `user` | | Sets the username for a connection. |
|
771 | | `verbose` | `false` | Turns on `+OK` protocol acknowledgements. |
|
772 | | `waitOnFirstConnect` | `false` | If `true` the client will fall back to a reconnect mode if it fails its first connection attempt. |
|
773 |
|
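As a rough illustration of how several of these options combine, the sketch below builds an options object and passes it to `connect()`. The option names come from the table above. The values, the client name, and the `run` helper are illustrative assumptions, and `demo.nats.io:4222` is the demo server used earlier in this document.

```javascript
// Option names are taken from the table above; the values are illustrative.
const connectionOptions = {
  servers: ["demo.nats.io:4222"],
  name: "orders-service-1", // hypothetical client name
  maxReconnectAttempts: -1, // -1 means no limit
  reconnectTimeWait: 5000, // milliseconds between reconnect attempts
  pingInterval: 60000, // milliseconds between client-sent pings
  waitOnFirstConnect: true, // keep trying if the first attempt fails
};

// Hypothetical helper: connects, reports the server, and closes again.
async function run() {
  // Loaded here so the options object can be inspected without the package.
  const { connect } = require("nats");
  const nc = await connect(connectionOptions);
  console.log(`connected to ${nc.getServer()}`);
  await nc.close();
}
```

Calling `run()` requires the `nats` package to be installed and the server to be reachable.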
774 | ### TlsOptions
|
775 |
|
776 | | Option | Default | Description |
|
777 | | ---------- | ------- | ---------------------------- |
|
778 | | `caFile` | | CA certificate filepath |
|
779 | | `ca` | | CA certificate |
|
780 | | `certFile` | | Client certificate file path |
|
781 | | `cert` | | Client certificate |
|
782 | | `keyFile` | | Client key file path |
|
783 | | `key` | | Client key |
|
784 |
|
785 | In some Node and Deno clients, setting the `tls` option to an empty object
|
786 | (`{}`) requires the client to have a secured connection.
|
787 |
|
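A minimal sketch of how the `tls` option might be populated, assuming certificate files on disk. The file paths and variable names are hypothetical placeholders; only the option names (`caFile`, `certFile`, `keyFile`, `tls`, `servers`) come from the tables above.

```javascript
// Client certificate setup using file paths (paths are hypothetical).
const mutualTlsOptions = {
  servers: "localhost:4222",
  tls: {
    caFile: "./certs/ca.pem",
    certFile: "./certs/client-cert.pem",
    keyFile: "./certs/client-key.pem",
  },
};

// An empty object asks for a secured connection without supplying
// a CA or client certificate.
const tlsRequiredOptions = {
  servers: "localhost:4222",
  tls: {},
};
```

Either object would then be passed to `connect()` in place of the plain options shown earlier.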
788 | ### Jitter
|
789 |
|
790 | The settings `reconnectTimeWait`, `reconnectJitter`, `reconnectJitterTLS`,
|
791 | `reconnectDelayHandler` are all related. They control how long before the NATS
|
792 | client attempts to reconnect to a server it has previously connected to.
|
793 |
|
794 | The intention of the settings is to spread out the number of clients attempting
|
795 | to reconnect to a server over a period of time, thus preventing a
|
796 | ["Thundering Herd"](https://docs.nats.io/developing-with-nats/reconnect/random).
|
797 |
|
798 | The relationship between these settings is:
|
799 |
|
800 | - If `reconnectDelayHandler` is specified, the client will wait the value
|
801 | returned by this function. No other value will be taken into account.
|
802 | - If the client specified TLS options, the client will generate a number between
|
803 | 0 and `reconnectJitterTLS` and add it to `reconnectTimeWait`.
|
804 | - If the client didn't specify TLS options, the client will generate a number
|
805 | between 0 and `reconnectJitter` and add it to `reconnectTimeWait`.
|
806 |
|
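The rules above can be sketched as a small function. This is an illustration of the documented behavior, not the client's internal code: `reconnectDelay` and its `random` parameter are hypothetical names, and the defaults are the ones listed in the options table.

```javascript
// Illustrative only: mirrors the documented rules, not the client's source.
// `random` is injectable so the result can be made deterministic.
function reconnectDelay(opts = {}, random = Math.random) {
  const {
    reconnectTimeWait = 2000,
    reconnectJitter = 100,
    reconnectJitterTLS = 1000,
    reconnectDelayHandler,
    tls,
  } = opts;

  // Rule 1: a custom handler overrides every other setting.
  if (typeof reconnectDelayHandler === "function") {
    return reconnectDelayHandler();
  }

  // Rules 2 and 3: choose the jitter bound based on whether TLS options
  // were specified, then add a random amount to the base wait.
  const jitterBound = tls ? reconnectJitterTLS : reconnectJitter;
  return reconnectTimeWait + Math.floor(random() * jitterBound);
}

// With the random source pinned to 0.5:
console.log(reconnectDelay({}, () => 0.5)); // 2050
console.log(reconnectDelay({ tls: {} }, () => 0.5)); // 2500
console.log(reconnectDelay({ reconnectDelayHandler: () => 42, tls: {} })); // 42
```

With the default settings, a client without TLS options would wait somewhere between 2000 and 2100 milliseconds, and one with TLS options between 2000 and 3000 milliseconds.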
807 | ## JetStream
|
808 |
|
809 | [Support for JetStream is built-in](https://github.com/nats-io/nats.deno/blob/main/jetstream.md).
|
810 | However, the JetStream API extensions are still in beta. Feel free to use them.
|
811 | The client will emit a console message when either `nc.jetstream()` or
|
812 | `nc.jetstreamManager()` APIs are used to remind you they are in beta.
|
813 |
|
814 | ## Contributing
|
815 |
|
816 | The library shares client functionality with
|
817 | [NATS.deno](https://github.com/nats-io/nats.deno). This means that both the
|
818 | NATS.deno and NATS.js use the same exact code base, only differing on the
|
819 | implementation of the `Transport`. This strategy greatly reduces the amount of
|
820 | work required to develop and maintain the clients, and provides a
|
821 | completely compatible API across all clients.
|
822 |
|
823 | Currently, the base client implementation is the deno implementation. You can
|
824 | take a look at it
|
825 | [here](https://github.com/nats-io/nats.deno/tree/main/nats-base-client).
|
826 |
|
827 | ## Supported Node Versions
|
828 |
|
829 | Our support policy for Node.js versions follows
|
830 | [Node.js release support](https://github.com/nodejs/Release). We will support and
|
831 | build node-nats on even-numbered Node.js versions that are current or in LTS.
|