1 | # NATS - Websocket Javascript Client for the Browser
|
2 |
|
3 |
|
4 | A websocket client for the [NATS messaging system](https://nats.io).
|
5 |
|
6 | [![License](https://img.shields.io/badge/Licence-Apache%202.0-blue.svg)](LICENSE)
|
7 | [![npm](https://img.shields.io/npm/dt/nats.ws.svg)](https://www.npmjs.com/package/nats.ws)
|
8 | [![npm](https://img.shields.io/npm/dm/nats.ws.svg)](https://www.npmjs.com/package/nats.ws)
|
9 |
|
10 |
|
11 | ## Installation
|
12 |
|
13 | >** NATS.ws is a release candidate** you can get the current candidate by:
|
14 |
|
15 | ```bash
|
16 | npm install nats.ws
|
17 | ```
|
18 |
|
19 | Getting started with NATS.ws requires a little preparation:
|
20 |
|
21 | - A recent NATS server that supports WebSockets
|
22 | - An HTTP server to serve HTML and the nats.ws library
|
23 |
|
24 | To make it easy, the nats.ws github repository aids you with this setup. If you are on Windows, you'll need to look at the package.json for hints on what to do. Better yet, contribute an alternate package.json.
|
25 |
|
26 | Here are the steps:
|
27 |
|
28 |
|
29 | ```bash
|
30 | # clone the nats.ws repository:
|
31 | git clone https://github.com/nats-io/nats.ws.git
|
32 |
|
33 | # install [deno](https://deno.land):
|
34 | npm run setup
|
35 |
|
36 | # build the library
|
37 | npm run build
|
38 |
|
39 | # install the master of nats-server, if you have
|
40 | # [Go](https://golang.org/doc/install) installed,
|
41 | # you can easily clone and build the latest from
|
42 | # master:
|
43 | npm run install-ns
|
44 |
|
45 | # start the built nats-server:
|
46 | npm run start-nats
|
47 |
|
48 | # start an http server to serve the content in
|
49 | # the examples directory:
|
50 | npm run start-http
|
51 |
|
52 | # point your browser to: http://localhost:4507/examples
|
53 | # click on one of the HTML files
|
54 |
|
55 | ```
|
56 |
|
57 | ## Documentation
|
58 |
|
59 | NATS.ws shares all client API and examples with
|
60 | [nats.deno repo](https://github.com/nats-io/nats.deno)
|
61 |
|
62 |
|
63 | ## Importing the Module
|
64 | nats.ws is an async nats client. The model a standard ES Module. Copy the nats.mjs
|
65 | module from node_modules (if you didn't build it yourself), and place it where you
|
66 | can reference it from your code:
|
67 |
|
68 | ```html
|
69 | <script type="module">
|
70 | // load the library
|
71 | import { connect } from './nats.mjs'
|
72 | // do something with it...
|
73 | </script>
|
74 | ```
|
75 |
|
76 | ## Connection Options Specific to nats.ws
|
77 |
|
78 | By default, the nats-server will serve WSS connections **only**.
|
79 |
|
80 | The `nats-server` gossips cluster configuration to clients. Cluster
|
81 | configuration however is disseminated as `host:port`. With websockets,
|
82 | a connection is made using an URL which means that the protocol specifies
|
83 | whether the connection is encrypted or not. By default,
|
84 | the nats.ws client assumes any specified `host:port` is available
|
85 | via `wss://host:port`. If this is not the case, you will need to specify
|
86 | the protocol as part of the server hostport information.
|
87 |
|
88 | Likewise, if your cluster security not uniform (mixes `ws://` and `wss://`),
|
89 | you'll need to disable server advertising or on the client specify
|
90 | the `ignoreServerUpdates` connection option. Of course in this case
|
91 | you are responsible for providing all the necessary URLs for the cluster if
|
92 | you want fail over to work.
|
93 |
|
94 |
|
95 | ```javascript
|
96 |
|
97 | const conn = await connect(
|
98 | { servers: ["ws://localhost:9222", "wss://localhost:2229", "localhost:9111"] },
|
99 | );
|
100 |
|
101 | ```
|
102 |
|
103 | In the above example, the first two URLs connect as per their protocol specifications.
|
104 | The third server connects using `wss://` as that is the default protocol.
|
105 |
|
106 | If you are accessing the websocket connection via a proxy, likely the
|
107 | `ignoreServerUpdates` should be specified to avoid learning about servers that
|
108 | are not directly accessible to the clients.
|
109 |
|
110 | ## Basics
|
111 |
|
112 |
|
113 | ### Connecting to a nats-server
|
114 |
|
115 | To connect to a server you use the `connect()` function. It returns
|
116 | a connection that you can use to interact with the server. You can customize the behavior of the client by specifying many [`ConnectionOptions`](#Connection_Options).
|
117 |
|
118 | By default, a connection will attempt a connection on `127.0.0.1:4222`. If the connection is dropped, the client will attempt to reconnect. You can customize the server you want to connect to by specifying `port` (for local connections), or full host port on the `servers` option. Note that the `servers` option can be a single hostport (a string) or an array of hostports.
|
119 |
|
120 | The example below will attempt to connect to different servers by specifying different `ConnectionOptions`. At least two of them should work if your internet is working.
|
121 |
|
122 | ```javascript
|
123 | import { connect } from './nats.mjs'
|
124 | const servers = [
|
125 | {},
|
126 | { servers: ["demo.nats.io:4442", "demo.nats.io:4222"] },
|
127 | { servers: "demo.nats.io:4443" },
|
128 | { port: 4222 },
|
129 | { servers: "localhost" },
|
130 | ];
|
131 | await servers.forEach(async (v) => {
|
132 | try {
|
133 | const nc = await connect(v);
|
134 | console.log(`connected to ${nc.getServer()}`);
|
135 | // this promise indicates the client closed
|
136 | const done = nc.closed();
|
137 | // do something with the connection
|
138 |
|
139 | // close the connection
|
140 | await nc.close();
|
141 | // check if the close was OK
|
142 | const err = await done;
|
143 | if (err) {
|
144 | console.log(`error closing:`, err);
|
145 | }
|
146 | } catch (err) {
|
147 | console.log(`error connecting to ${JSON.stringify(v)}`);
|
148 | }
|
149 | });
|
150 | ```
|
151 |
|
152 | To disconnect from the nats-server, call `close()` on the connection. A connection can be terminated by an unexpected error.
|
153 | For example, the server returns a runtime error. In those cases,
|
154 | the client will re-initiate a connection.
|
155 |
|
156 | By default, the client will always attempt to reconnect if the connection is closed for a reason other than calling `close()`. To get notified when the connection is closed for some reason, await the resolution of the Promise returned by `closed()`. If closed resolves to a value, the value is a `NatsError`
|
157 | indicating why the connection closed.
|
158 |
|
159 |
|
160 | ### Publish and Subscribe
|
161 |
|
162 | The basic client operations are `publish` to send messages and
|
163 | `subscribe` to receive messages.
|
164 |
|
165 | Messages are published to a subject. A subject is like an URL with the exception that it doesn't specify an actual endpoint. All recipients that have expressed interest in a subject will receive messages addressed to that subject (provided they have access and permissions to get it). To express interest in a subject, you create a `subscription`.
|
166 |
|
167 | In JavaScript clients (websocket, Deno, or Node) subscriptions work as an async iterator - clients simply loop to process messages as they become available.
|
168 |
|
169 | NATS messages are payload agnostic. Payloads are `Uint8Arrays`. You can easily convert to and from JSON or strings by using `JSONCodec` or `StringCodec`, or a custom `Codec`.
|
170 |
|
171 | To cancel a subscription and terminate your interest, you call `unsubscribe()` or `drain()` on a subscription. Unsubscribe will typically terminate regardless of whether there are messages in flight for the client. Drain ensures that all messages that are inflight are processed before canceling the subscription. Connections can also be drained as well. Draining a connection
|
172 | closes it, after all subscriptions have been drained and all outbound messages have been sent to the server.
|
173 |
|
174 | ```javascript
|
175 | import { connect, StringCodec } from './nats.mjs'
|
176 |
|
177 | // to create a connection to a nats-server:
|
178 | const nc = await connect({ servers: "demo.nats.io:4222" });
|
179 |
|
180 | // create a codec
|
181 | const sc = StringCodec();
|
182 | // create a simple subscriber and iterate over messages
|
183 | // matching the subscription
|
184 | const sub = nc.subscribe("hello");
|
185 | (async () => {
|
186 | for await (const m of sub) {
|
187 | console.log(`[${sub.getProcessed()}]: ${sc.decode(m.data)}`);
|
188 | }
|
189 | console.log("subscription closed");
|
190 | })();
|
191 |
|
192 | nc.publish("hello", sc.encode("world"));
|
193 | nc.publish("hello", sc.encode("again"));
|
194 |
|
195 | // we want to insure that messages that are in flight
|
196 | // get processed, so we are going to drain the
|
197 | // connection. Drain is the same as close, but makes
|
198 | // sure that all messages in flight get seen
|
199 | // by the iterator. After calling drain on the connection
|
200 | // the connection closes.
|
201 | await nc.drain();
|
202 |
|
203 | ```
|
204 |
|
205 | ### Wildcard Subscriptions
|
206 |
|
207 | Subjects can be used to organize messages into hierarchies.
|
208 | For example, a subject may contain additional information that
|
209 | can be useful in providing a context to the message, such as
|
210 | the ID of the client that sent the message, or the region where
|
211 | a message originated.
|
212 |
|
213 | Instead of subscribing to each specific subject, you can create
|
214 | subscriptions that have subjects with wildcards. Wildcards match
|
215 | one or more tokens in a subject. A token is a string following a
|
216 | period.
|
217 |
|
218 | All subscriptions are independent. If two different subscriptions
|
219 | match a subject, both will get to process the message:
|
220 |
|
221 | ```javascript
|
222 | import { connect, StringCodec } from './nats.mjs'
|
223 |
|
224 | const nc = await connect({ servers: "demo.nats.io:4222" });
|
225 | const sc = StringCodec();
|
226 |
|
227 | // subscriptions can have wildcard subjects
|
228 | // the '*' matches any string in the specified token position
|
229 | const s1 = nc.subscribe("help.*.system");
|
230 | const s2 = nc.subscribe("help.me.*");
|
231 | // the '>' matches any tokens in that position or following
|
232 | // '>' can only be specified at the end of the subject
|
233 | const s3 = nc.subscribe("help.>");
|
234 |
|
235 | async function printMsgs(s) {
|
236 | let subj = s.getSubject();
|
237 | console.log(`listening for ${subj}`);
|
238 | const c = (13 - subj.length);
|
239 | const pad = "".padEnd(c);
|
240 | for await (const m of s) {
|
241 | console.log(
|
242 | `[${subj}]${pad} #${s.getProcessed()} - ${m.subject} ${
|
243 | m.data ? " " + sc.decode(m.data) : ""
|
244 | }`,
|
245 | );
|
246 | }
|
247 | }
|
248 |
|
249 | printMsgs(s1);
|
250 | printMsgs(s2);
|
251 | printMsgs(s3);
|
252 |
|
253 | // don't exit until the client closes
|
254 | await nc.closed();
|
255 |
|
256 | ```
|
257 |
|
258 | ### Services: Request/Reply
|
259 |
|
260 | Request/Reply is NATS equivalent to an HTTP request. To make requests you publish messages as you did before, but also specify a `reply` subject. The `reply` subject is where a service will publish your response.
|
261 |
|
262 | NATS provides syntactic sugar for publishing requests. The `request()` API will generate a reply subject and manage the creation of a subscription under the covers. It will also start a timer to ensure that if a response is not received within your allotted time, the request fails. The example also illustrates a graceful shutdown.
|
263 |
|
264 | #### Services
|
265 |
|
266 | Here's an example of a service. It is a bit more complicated than expected simply to illustrate not only how to create responses, but how the subject itself is used to dispatch different behaviors.
|
267 |
|
268 |
|
269 | ```javascript
|
270 | import { connect, StringCodec } from './nats.mjs'
|
271 |
|
272 | // create a connection
|
273 | const nc = await connect({ servers: "demo.nats.io" });
|
274 |
|
275 | // create a codec
|
276 | const sc = StringCodec();
|
277 |
|
278 | // this subscription listens for `time` requests and returns the current time
|
279 | const sub = nc.subscribe("time");
|
280 | (async (sub: Subscription) => {
|
281 | console.log(`listening for ${sub.getSubject()} requests...`);
|
282 | for await (const m of sub) {
|
283 | if (m.respond(sc.encode(new Date().toISOString()))) {
|
284 | console.info(`[time] handled #${sub.getProcessed()}`);
|
285 | } else {
|
286 | console.log(`[time] #${sub.getProcessed()} ignored - no reply subject`);
|
287 | }
|
288 | }
|
289 | console.log(`subscription ${sub.getSubject()} drained.`);
|
290 | })(sub);
|
291 |
|
292 | // this subscription listens for admin.uptime and admin.stop
|
293 | // requests to admin.uptime returns how long the service has been running
|
294 | // requests to admin.stop gracefully stop the client by draining
|
295 | // the connection
|
296 | const started = Date.now();
|
297 | const msub = nc.subscribe("admin.*");
|
298 | (async (sub) => {
|
299 | console.log(`listening for ${sub.getSubject()} requests [uptime | stop]`);
|
300 | // it would be very good to verify the origin of the request
|
301 | // before implementing something that allows your service to be managed.
|
302 | // NATS can limit which client can send or receive on what subjects.
|
303 | for await (const m of sub) {
|
304 | const chunks = m.subject.split(".");
|
305 | console.info(`[admin] #${sub.getProcessed()} handling ${chunks[1]}`);
|
306 | switch (chunks[1]) {
|
307 | case "uptime":
|
308 | // send the number of millis since up
|
309 | m.respond(sc.encode(`${Date.now() - started}`));
|
310 | break;
|
311 | case "stop": {
|
312 | m.respond(sc.encode(`[admin] #${sub.getProcessed()} stopping....`));
|
313 | // gracefully shutdown
|
314 | nc.drain()
|
315 | .catch((err) => {
|
316 | console.log("error draining", err);
|
317 | });
|
318 | break;
|
319 | }
|
320 | default:
|
321 | console.log(
|
322 | `[admin] #${sub.getProcessed()} ignoring request for ${m.subject}`,
|
323 | );
|
324 | }
|
325 | }
|
326 | console.log(`subscription ${sub.getSubject()} drained.`);
|
327 | })(msub);
|
328 |
|
329 | // wait for the client to close here.
|
330 | await nc.closed().then((err) => {
|
331 | let m = `connection to ${nc.getServer()} closed`;
|
332 | if (err) {
|
333 | m = `${m} with an error: ${err.message}`;
|
334 | }
|
335 | console.log(m);
|
336 | });
|
337 |
|
338 | ```
|
339 |
|
340 | #### Making Requests
|
341 |
|
342 | Here's a simple example of a client making a simple request
|
343 | from the service above:
|
344 |
|
345 | ```javascript
|
346 | import { connect, StringCodec } from './nats.mjs'
|
347 |
|
348 | // create a connection
|
349 | const nc = await connect({ servers: "demo.nats.io:4222" });
|
350 |
|
351 | // create an encoder
|
352 | const sc = StringCodec();
|
353 |
|
354 | // the client makes a request and receives a promise for a message
|
355 | // by default the request times out after 1s (1000 millis) and has
|
356 | // no payload.
|
357 | await nc.request("time", Empty, { timeout: 1000 })
|
358 | .then((m) => {
|
359 | console.log(`got response: ${sc.decode(m.data)}`);
|
360 | })
|
361 | .catch((err) => {
|
362 | console.log(`problem with request: ${err.message}`);
|
363 | });
|
364 |
|
365 | await nc.close();
|
366 | ```
|
367 |
|
368 | ### Queue Groups
|
369 | Queue groups enable the scaling of services horizontally. Subscriptions for members of a queue group are treated as a single service. When you send a message to a queue group
|
370 | subscription, only a single client in a queue group will receive it.
|
371 |
|
372 | There can be any number of queue groups. Each group is treated as its own independent unit. Note that non-queue subscriptions are also independent of subscriptions in a queue group.
|
373 |
|
374 | ```javascript
|
375 | import { connect, NatsConnection, StringCodec } from './nats.mjs'
|
376 |
|
377 | async function createService(
|
378 | name,
|
379 | count = 1,
|
380 | queue = ""
|
381 | ): Promise {
|
382 | const conns = [];
|
383 | for (let i = 1; i <= count; i++) {
|
384 | const n = queue ? `${name}-${i}` : name;
|
385 | const nc = await connect(
|
386 | { servers: "demo.nats.io:4222", name: `${n}` },
|
387 | );
|
388 | nc.closed()
|
389 | .then((err) => {
|
390 | if (err) {
|
391 | console.error(
|
392 | `service ${n} exited because of error: ${err.message}`,
|
393 | );
|
394 | }
|
395 | });
|
396 | // create a subscription - note the option for a queue, if set
|
397 | // any client with the same queue will be a member of the group.
|
398 | const sub = nc.subscribe("echo", { queue: queue });
|
399 | const _ = handleRequest(n, sub);
|
400 | console.log(`${nc.options.name} is listening for 'echo' requests...`);
|
401 | conns.push(nc);
|
402 | }
|
403 | return conns;
|
404 | }
|
405 |
|
406 | const sc = StringCodec();
|
407 |
|
408 | // simple handler for service requests
|
409 | async function handleRequest(name, s) {
|
410 | const p = 12 - name.length;
|
411 | const pad = "".padEnd(p);
|
412 | for await (const m of s) {
|
413 | // respond returns true if the message had a reply subject, thus it could respond
|
414 | if (m.respond(m.data)) {
|
415 | console.log(
|
416 | `[${name}]:${pad} #${s.getProcessed()} echoed ${sc.decode(m.data)}`,
|
417 | );
|
418 | } else {
|
419 | console.log(
|
420 | `[${name}]:${pad} #${s.getProcessed()} ignoring request - no reply subject`,
|
421 | );
|
422 | }
|
423 | }
|
424 | }
|
425 |
|
426 | // let's create two queue groups and a standalone subscriber
|
427 | const conns = [];
|
428 | conns.push(...await createService("echo", 3, "echo"));
|
429 | conns.push(...await createService("other-echo", 2, "other-echo"));
|
430 | conns.push(...await createService("standalone"));
|
431 |
|
432 | const a: Promise<void | Error>[] = [];
|
433 | conns.forEach((c) => {
|
434 | a.push(c.closed());
|
435 | });
|
436 | await Promise.all(a);
|
437 |
|
438 | ```
|
439 | Run it and publish a request to the subject `echo` to see what happens.
|
440 |
|
441 |
|
442 | ## Advanced Usage
|
443 |
|
444 | ### Headers
|
445 |
|
446 | NATS headers are similar to HTTP headers. Headers are enabled automatically if the server supports them. Note that if you publish a message using headers but the server doesn't support them, an Error is thrown. Also note that even if you are publishing a message with a header, it is possible for the
|
447 | recipient to not support them.
|
448 |
|
449 | ```javascript
|
450 | import { connect, createInbox, Empty, headers } from './nats.mjs'
|
451 |
|
452 | const nc = await connect(
|
453 | {
|
454 | servers: `demo.nats.io`,
|
455 | },
|
456 | );
|
457 |
|
458 | const subj = createInbox();
|
459 | const sub = nc.subscribe(subj);
|
460 | (async () => {
|
461 | for await (const m of sub) {
|
462 | if (m.headers) {
|
463 | for (const [key, value] of m.headers) {
|
464 | console.log(`${key}=${value}`);
|
465 | }
|
466 | // reading/setting a header is not case sensitive
|
467 | console.log("id", m.headers.get("id"));
|
468 | }
|
469 | }
|
470 | })().then();
|
471 |
|
472 | // headers always have their names turned into a canonical mime header key
|
473 | // header names can be any printable ASCII character with the exception of `:`.
|
474 | // header values can be any ASCII character except `\r` or `\n`.
|
475 | // see https://www.ietf.org/rfc/rfc822.txt
|
476 | const h = headers();
|
477 | h.append("id", "123456");
|
478 | h.append("unix_time", Date.now().toString());
|
479 | nc.publish(subj, Empty, { headers: h });
|
480 |
|
481 | await nc.flush();
|
482 | await nc.close();
|
483 |
|
484 | ```
|
485 |
|
486 |
|
487 | ### No Responders
|
488 |
|
489 | Requests can fail for many reasons. A common reason for a failure is the lack of interest in the subject. Typically these surface as a timeout error. If the server is enabled to use headers, it will also enable a `no responders` feature. If you send a request for which there's no interest, the request will be immediately rejected:
|
490 |
|
491 | ```javascript
|
492 | const nc = await connect({
|
493 | servers: `demo.nats.io` },
|
494 | );
|
495 |
|
496 | try {
|
497 | const m = await nc.request('hello.world');
|
498 | console.log(m.data);
|
499 | } catch(err) {
|
500 | switch (err.code) {
|
501 | case ErrorCode.NO_RESPONDERS:
|
502 | console.log("no one is listening to 'hello.world'")
|
503 | break;
|
504 | case ErrorCode.TIMEOUT:
|
505 | console.log("someone is listening but didn't respond")
|
506 | break;
|
507 | default:
|
508 | console.log("request failed", err)
|
509 | }
|
510 | }
|
511 |
|
512 | await nc.close();
|
513 |
|
514 | ```
|
515 |
|
516 | ### Authentication
|
517 |
|
518 | NATS supports many different forms of credentials:
|
519 |
|
520 | - username/password
|
521 | - token
|
522 | - NKEYS
|
523 | - client certificates
|
524 | - JWTs
|
525 |
|
526 | For user/password and token authentication, you can simply provide them as `ConnectionOptions` - see `user`, `pass`, `token`. Internally these mechanisms are implemented as an `Authenticator`. An `Authenticator` is simply a function that handles the type of authentication specified.
|
527 |
|
528 | Setting the `user`/`pass` or `token` options, simply initializes an `Authenticator` and sets the username/password.
|
529 |
|
530 |
|
531 | ```typescript
|
532 | // if the connection requires authentication, provide `user` and `pass` or
|
533 | // `token` options in the NatsConnectionOptions
|
534 | import { connect } from './nats.mjs'
|
535 |
|
536 | const nc1 = await connect({servers: "127.0.0.1:4222", user: "jenny", pass: "867-5309"});
|
537 | const nc2 = await connect({port: 4222, token: "t0pS3cret!"});
|
538 | ```
|
539 |
|
540 |
|
541 | #### Authenticators
|
542 |
|
543 | NKEYs and JWT authentication are more complex, as they cryptographically respond to a server challenge.
|
544 |
|
545 | Because NKEY and JWT authentication may require reading data from a file or an HTTP cookie, these forms of authentication will require a bit more from the developer to activate them. However, the work is related to accessing these resources varies depending on the platform.
|
546 |
|
547 | After the credential artifacts are read, you can use one of these functions to create the authenticator. You then simply assign it to the `authenticator` property of the `ConnectionOptions`:
|
548 |
|
549 | - `nkeyAuthenticator(seed?: Uint8Array | (() => Uint8Array)): Authenticator`
|
550 | - `jwtAuthenticator(jwt: string | (() => string), seed?: Uint8Array | (()=> Uint8Array)): Authenticator`
|
551 | - `credsAuthenticator(creds: Uint8Array): Authenticator`
|
552 |
|
553 | The first two options provide the ability to specify functions that return the desired value. This enables dynamic environments such as a browser where values accessed by fetching a value from a cookie.
|
554 |
|
555 |
|
556 | Here's an example:
|
557 |
|
558 | ```javascript
|
559 | // read the creds file as necessary, in the case it
|
560 | // is part of the code for illustration purposes
|
561 | const creds = `-----BEGIN NATS USER JWT-----
|
562 | eyJ0eXAiOiJqdSDJB....
|
563 | ------END NATS USER JWT------
|
564 |
|
565 | ************************* IMPORTANT *************************
|
566 | NKEY Seed printed below can be used sign and prove identity.
|
567 | NKEYs are sensitive and should be treated as secrets.
|
568 |
|
569 | -----BEGIN USER NKEY SEED-----
|
570 | SUAIBDPBAUTW....
|
571 | ------END USER NKEY SEED------
|
572 | `;
|
573 |
|
574 | const nc = await connect(
|
575 | {
|
576 | port: 4222,
|
577 | authenticator: credsAuthenticator(new TextEncoder().encode(creds)),
|
578 | },
|
579 | );
|
580 | ```
|
581 |
|
582 | ### Flush
|
583 |
|
584 | Flush sends a PING to the server. When the server responds with PONG you are guaranteed that all pending data was sent and received by the server.
|
585 |
|
586 | Note `ping()` effectively adds a server round-trip. All NATS clients
|
587 | handle their buffering optimally, so `ping(): Promise<void>` shouldn't be used except in cases where you are writing some sort of test.
|
588 |
|
589 | ```javascript
|
590 | nc.publish('foo');
|
591 | nc.publish('bar');
|
592 | await nc.flush();
|
593 | ```
|
594 |
|
595 | ### `PublishOptions`
|
596 |
|
597 | When you publish a message you can specify some options:
|
598 |
|
599 | - `reply` - this is a subject to receive a reply (you must setup a subscription) before you publish.
|
600 | - `headers` - a set of headers to decorate the message.
|
601 |
|
602 | ### `SubscriptionOptions`
|
603 |
|
604 | You can specify several options when creating a subscription:
|
605 | - `max`: maximum number of messages to receive - auto unsubscribe
|
606 | - `timeout`: how long to wait for the first message
|
607 | - `queue`: the [queue group](#Queue-Groups) name the subscriber belongs to
|
608 | - `callback`: a function with the signature `(err: NatsError|null, msg: Msg) => void;` that should be used for handling the message. Subscriptions with callbacks are NOT iterators.
|
609 |
|
610 | #### Auto Unsubscribe
|
611 | ```javascript
|
612 | // subscriptions can auto unsubscribe after a certain number of messages
|
613 | nc.subscribe('foo', { max: 10 });
|
614 | ```
|
615 |
|
616 |
|
617 | #### Timeout Subscriptions
|
618 | ```javascript
|
619 | // create subscription with a timeout, if no message arrives
|
620 | // within the timeout, the function running the iterator with
|
621 | // reject - depending on how you code it, you may need a
|
622 | // try/catch block.
|
623 | const sub = nc.subscribe("hello", { timeout: 1000 });
|
624 | (async () => {
|
625 | for await (const m of sub) {
|
626 | }
|
627 | })().catch((err) => {
|
628 | if (err.code === ErrorCode.TIMEOUT) {
|
629 | console.log(`sub timed out!`);
|
630 | } else {
|
631 | console.log(`sub iterator got an error!`);
|
632 | }
|
633 | });
|
634 | ```
|
635 |
|
636 | ### `RequestOptions`
|
637 |
|
638 | When making a request, there are several options you can pass:
|
639 |
|
640 | - `timeout`: how long to wait for the response
|
641 | - `headers`: optional headers to include with the message
|
642 | - `noMux`: create a new subscription to handle the request. Normally a shared subscription is used to receive response messages.
|
643 | - `reply`: optional subject where the reply should be sent.
|
644 |
|
645 | #### `noMux` and `reply`
|
646 |
|
647 | Under the hood, the request API simply uses a wildcard subscription
|
648 | to handle all requests you send.
|
649 |
|
650 | In some cases, the default subscription strategy doesn't work correctly.
|
651 | For example, a client may be constrained by the subjects where it can
|
652 | receive replies.
|
653 |
|
654 | When `noMux` is set to `true`, the client will create a normal subscription for
|
655 | receiving the response to a generated inbox subject before the request is published.
|
656 | The `reply` option can be used to override the generated inbox subject with an application
|
657 | provided one. Note that setting `reply` requires `noMux` to be `true`:
|
658 |
|
659 | ```typescript
|
660 | const m = await nc.request(
|
661 | "q",
|
662 | Empty,
|
663 | { reply: "bar", noMux: true, timeout: 1000 },
|
664 | );
|
665 | ```
|
666 |
|
667 | ### Draining Connections and Subscriptions
|
668 |
|
669 | Draining provides for a graceful way to unsubscribe or
|
670 | close a connection without losing messages that have
|
671 | already been dispatched to the client.
|
672 |
|
673 | You can drain a subscription or all subscriptions in a connection.
|
674 |
|
675 | When you drain a subscription, the client sends an `unsubscribe`
|
676 | protocol message to the server followed by a `flush`. The
|
677 | subscription handler is only removed after the server responds.
|
678 | Thus all pending messages for the subscription have been processed.
|
679 |
|
680 | Draining a connection, drains all subscriptions. However
|
681 | when you drain the connection it becomes impossible to make
|
682 | new subscriptions or send new requests. After the last
|
683 | subscription is drained it also becomes impossible to publish
|
684 | a message. These restrictions do not exist when just draining
|
685 | a subscription.
|
686 |
|
687 | ### Lifecycle/Informational Events
|
688 | Clients can get notification on various event types:
|
689 | - `Events.DISCONNECT`
|
690 | - `Events.RECONNECT`
|
691 | - `Events.UPDATE`
|
692 | - `Events.LDM`
|
693 | - `Events.ERROR`
|
694 |
|
695 | The first two fire when a client disconnects and reconnects respectively.
|
696 | The payload will be the server where the event took place.
|
697 |
|
698 | The `UPDATE` event notifies whenever the client receives a cluster configuration update. The `ServersChanged` interface provides two arrays: `added` and `deleted` listing the servers that were added or removed.
|
699 |
|
700 | The `LDM` event notifies that the current server has signaled that it is running in _Lame Duck Mode_ and will evict clients. Depending on the server configuration policy, the client may want to initiate an ordered shutdown, and initiate a new connection to a different server in the cluster.
|
701 |
|
702 | The `ERROR` event notifies you of async errors that couldn't be routed in a more precise way to your client. For example, permission errors for a subscription or request, will properly be reported by the subscription or request. However, permission errors on publish will be reported via the status mechanism.
|
703 |
|
704 | ```javascript
|
705 | const nc = await connect();
|
706 | (async () => {
|
707 | console.info(`connected ${nc.getServer()}`);
|
708 | for await (const s of nc.status()) {
|
709 | console.info(`${s.type}: ${s.data}`);
|
710 | }
|
711 | })().then();
|
712 |
|
713 | nc.closed()
|
714 | .then((err) => {
|
715 | console.log(
|
716 | `connection closed ${err ? " with error: " + err.message : ""}`,
|
717 | );
|
718 | });
|
719 | ```
|
720 |
|
721 | To be aware of when a client closes, wait for the `closed()` promise to resolve. When it resolves, the client has finished and won't reconnect.
|
722 |
|
723 |
|
724 | ### Async vs. Callbacks
|
725 |
|
726 | Previous versions of the JavaScript NATS clients specified callbacks for message processing. This required complex handling logic when a service required coordination of operations. Callbacks are an inversion of control anti-pattern.
|
727 |
|
728 | The async APIs trivialize complex coordination and makes your code easier to maintain. With that said, there are some implications:
|
729 |
|
730 | - Async subscriptions buffer inbound messages.
|
731 | - Subscription processing delays until the runtime executes the promise related microtasks at the end of an event loop.
|
732 |
|
733 | In a traditional callback-based library, I/O happens after all data yielded by a read in the current event loop completes processing. This means that callbacks are invoked as part of processing. With async, the processing is queued in a microtask queue. At the end of the event loop, the runtime processes
|
734 | the microtasks, which in turn resumes your functions. As expected, this increases latency, but also provides additional liveliness.
|
735 |
|
736 | To reduce async latency, the NATS client allows processing a subscription in the same event loop that dispatched the message. Simply specify a `callback` in the subscription options. The signature for a callback is `(err: (NatsError|null), msg: Msg) => void`. When specified, the subscription iterator will never yield a message, as the callback will intercept all messages.
|
737 |
|
738 | Note that `callback` likely shouldn't even be documented, as likely it is a workaround to an underlying application problem where you should be considering a different strategy to horizontally scale your application, or reduce pressure on the clients, such as using queue workers, or more explicitly targeting messages. With that said, there are many situations where using callbacks can be more performant or appropriate.
|
739 |
|
740 | ## Connection Options
|
741 |
|
742 | The following is the list of connection options and default values.
|
743 |
|
744 | | Option | Default | Description
|
745 | |-------- |--------- |------------
|
746 | | `authenticator` | none | Specifies the authenticator function that sets the client credentials.
|
747 | | `debug` | `false` | If `true`, the client prints protocol interactions to the console. Useful for debugging.
|
748 | | `maxPingOut` | `2` | Max number of pings the client will allow unanswered before raising a stale connection error.
|
749 | | `maxReconnectAttempts` | `10` | Sets the maximum number of reconnect attempts. The value of `-1` specifies no limit.
|
750 | | `name` | | Optional client name - recommended to be set to a unique client name.
|
751 | | `noEcho` | `false` | Subscriptions receive messages published by the client. Requires server support (1.2.0). If set to true, and the server does not support the feature, an error with code `NO_ECHO_NOT_SUPPORTED` is emitted, and the connection is aborted. Note that it is possible for this error to be emitted on reconnect when the server reconnects to a server that does not support the feature.
|
752 | | `noRandomize` | `false` | If set, the order of user-specified servers is randomized.
|
753 | | `pass` | | Sets the password for a connection.
|
754 | | `pedantic` | `false` | Turns on strict subject format checks.
|
755 | | `pingInterval` | `120000` | Number of milliseconds between client-sent pings.
|
756 | | `port` | `4222` | Port to connect to (only used if `servers` is not specified).
|
757 | | `reconnectTimeWait` | `2000` | If disconnected, the client will wait the specified number of milliseconds between reconnect attempts.
|
758 | | `reconnectJitter` | `100` | Number of millis to randomize after `reconnectTimeWait`.
|
759 | | `reconnectJitterTLS` | `1000` | Number of millis to randomize after `reconnectTimeWait` when TLS options are specified.
|
760 | | `reconnectDelayHandler`| Generated function | A function that returns the number of millis to wait before the next connection to a server it connected to `()=>number`.
|
761 | | `reconnect` | `true` | If false, client will not attempt reconnecting.
|
762 | | `servers` | `"localhost:4222"` | String or Array of hostport for servers.
|
763 | | `timeout` | 20000 | Number of milliseconds the client will wait for a connection to be established. If it fails it will emit a `connection_timeout` event with a NatsError that provides the hostport of the server where the connection was attempted.
|
764 | | `token` | | Sets an authorization token for a connection.
|
765 | | `user` | | Sets the username for a connection.
|
766 | | `verbose` | `false` | Turns on `+OK` protocol acknowledgements.
|
767 | | `waitOnFirstConnect` | `false` | If `true` the client will fall back to a reconnect mode if it fails its first connection attempt.
|
768 | | `ignoreClusterUpdates` | `false` | If `true` the client will ignore any cluster updates provided by the server.
|
769 |
|
770 | ### Jitter
|
771 |
|
772 | The settings `reconnectTimeWait`, `reconnectJitter`, `reconnectJitterTLS`, `reconnectDelayHandler` are all related.
|
773 | They control how long before the NATS client attempts to reconnect to a server it has previously connected.
|
774 |
|
775 | The intention of the settings is to spread out the number of clients attempting to reconnect to a server over a period of time, and thus preventing a ["Thundering Herd"](https://docs.nats.io/developing-with-nats/reconnect/random).
|
776 |
|
777 | The relationship between these is:
|
778 |
|
779 | - If `reconnectDelayHandler` is specified, the client will wait the value returned by this function. No other value will be taken into account.
|
780 | - If the client specified TLS options, the client will generate a number between 0 and `reconnectJitterTLS` and add it to
|
781 | `reconnectTimeWait`.
|
782 | - If the client didn't specify TLS options, the client will generate a number between 0 and `reconnectJitter` and add it to `reconnectTimeWait`.
|
783 |
|
784 |
|
785 | ## Web Application Examples
|
786 |
|
787 | For various examples browser examples, checkout [examples](examples).
|
788 |
|
789 | ## Contributing
|
790 |
|
791 | NATS.ws uses [deno](https://deno.land) to build and package the ES Module for the library. The library shares client functionality with [NATS.deno](https://github.com/nats-io/nats.deno).
|
792 | This means that both the NATS.deno and NATS.ws use the same exact code base, only differing on the implementation of the `Transport`. This strategy greatly reduces the amount of work required to develop and maintain the clients, as well as provide a completly compatible API across all clients.
|
793 |
|
794 | Currently, the base client implementation is the deno implementation. You can take a look at it [here](https://github.com/nats-io/nats.deno/tree/main/nats-base-client).
|