amqp10
=============

[![Build Status](https://secure.travis-ci.org/noodlefrenzy/node-amqp10.svg?branch=master)](https://travis-ci.org/noodlefrenzy/node-amqp10)
[![Dependency Status](https://david-dm.org/noodlefrenzy/node-amqp10.svg)](https://david-dm.org/noodlefrenzy/node-amqp10)
[![Code Climate](https://codeclimate.com/github/noodlefrenzy/node-amqp10/badges/gpa.svg)](https://codeclimate.com/github/noodlefrenzy/node-amqp10)
[![Test Coverage](https://codeclimate.com/github/noodlefrenzy/node-amqp10/badges/coverage.svg)](https://codeclimate.com/github/noodlefrenzy/node-amqp10)
[![npm version](https://badge.fury.io/js/amqp10.svg)](http://badge.fury.io/js/amqp10)
[![Join the chat at https://gitter.im/noodlefrenzy/node-amqp10](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/noodlefrenzy/node-amqp10?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)

amqp10 is a promise-based, AMQP 1.0 compliant node.js client.

## Usage ##

See `simple_eventhub_test.js`, `simple_activemq_test.js` or any of the other files in [examples](https://github.com/noodlefrenzy/node-amqp10/tree/master/examples).

The basic usage is to require the module, create a client with the appropriate policy for the server you're
connecting to, connect, and then send/receive as necessary. A simple example for a local Apache Qpid
server would look like:

    var AMQPClient = require('amqp10').Client,
        Promise = require('bluebird');

    var client = new AMQPClient(); // Uses PolicyBase default policy
    client.connect('amqp://localhost')
      .then(function() {
        return Promise.all([
          client.createReceiver('amq.topic'),
          client.createSender('amq.topic')
        ]);
      })
      .spread(function(receiver, sender) {
        receiver.on('errorReceived', function(err) {
          // check for errors
        });
        receiver.on('message', function(message) {
          console.log('Rx message: ', message.body);
        });

        return sender.send({ key: "Value" });
      })
      .error(function(err) {
        console.log("error: ", err);
      });

By default, send promises are resolved when a disposition frame is received from the remote link for the
sent message; at this point the message is considered "settled". To tune this behavior, you can tweak
the policy you give to AMQPClient on construction. For instance, to force send promises to be resolved
immediately on successful sending of the payload, you would build AMQPClient like so:

    var AMQPClient = require('amqp10').Client,
        Policy = require('amqp10').Policy;
    var client = new AMQPClient(Policy.merge({
      senderLinkPolicy: {
        callbackPolicy: Policy.Utils.SenderCallbackPolicies.OnSent
      }
    }, Policy.DefaultPolicy));

In addition to the above, you can also tune how message link credit is doled out (for throttling), as
well as most other AMQP behaviors, all through policy overrides. See [DefaultPolicy](https://github.com/noodlefrenzy/node-amqp10/blob/master/lib/policies/default_policy.js)
and the [policy utilities](https://github.com/noodlefrenzy/node-amqp10/blob/master/lib/policies/policy_utilities.js)
for more details on altering various behaviors.

## Flow Control and Message Dispositions ##

Flow control in AMQP occurs at both the `Session` and `Link` layers. Using our default policy, we start out with some sensible Session windows and Link credits, and renew those every time they get to the half-way point. In addition, receiver links start in "auto-settle" mode, which means that the sender side can consider the message "settled" as soon as it's sent. However, _all_ of those settings are easily tunable through Policy overrides (`Policy.merge(<overrides>, <base policy>)`).
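
To make the override mechanics concrete, here is a minimal sketch of how merging overrides onto a base policy can behave, assuming overrides are applied key by key and nested objects are merged rather than replaced. `mergeOverrides`, `isPlainObject` and the `basePolicy` values are stand-ins invented for this illustration; they are not amqp10's `Policy.merge` implementation or its real defaults, and the library's exact semantics may differ.

```javascript
// Illustration only: a stand-in deep merge, NOT amqp10's Policy.merge.
function isPlainObject(value) {
  return value !== null && typeof value === 'object' && !Array.isArray(value);
}

// Returns a new object: keys named in `overrides` win, the rest come from `base`.
function mergeOverrides(overrides, base) {
  var result = {};
  Object.keys(base).forEach(function (key) {
    result[key] = base[key];
  });
  Object.keys(overrides).forEach(function (key) {
    var bothObjects = isPlainObject(overrides[key]) && isPlainObject(base[key]);
    result[key] = bothObjects
      ? mergeOverrides(overrides[key], base[key])
      : overrides[key];
  });
  return result;
}

// Made-up base values, chosen only to show the effect of the merge.
var basePolicy = {
  receiverLink: { creditQuantum: 100, attach: { receiverSettleMode: 0 } }
};

var merged = mergeOverrides({ receiverLink: { creditQuantum: 1 } }, basePolicy);
console.log(merged.receiverLink.creditQuantum);             // 1
console.log(merged.receiverLink.attach.receiverSettleMode); // 0
```

The point of the sketch is that an override only needs to name the keys it changes; sibling settings such as `attach` are carried over from the base untouched, and the base object itself is not modified.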

For instance, we've provided a convenience helper for throttling your receiver links to only renew credits on messages they've "settled". With Azure ServiceBus Queues, for example, it would look like:

    var AMQPClient = require('amqp10').Client,
        Policy = require('amqp10').Policy;
    var client = new AMQPClient(Policy.Utils.RenewOnSettle(1, 1, Policy.ServiceBusQueue));

Where the first number is the initial credit, and the second is the _threshold_ - once remaining credit goes below that, we grant more credit, equal to the number of messages we've settled. In this case we're setting up the client for one-by-one message processing. Behind the scenes, this does the following:

1. Sets the Link's creditQuantum to the first number (1), which you can do for yourself via the Policy mix-in `{ receiverLink: { creditQuantum: 1 } }`

2. Sets the Link to not auto-settle messages at the sender, which you can do for yourself via `{ receiverLink: { attach: { receiverSettleMode: 1 } } }`
   Where did that magic "1" come from? Well, that's the value from the spec, but you could use the constant we've defined at `require('amqp10').Constants.receiverSettleMode.settleOnDisposition`

3. Sets the Link's credit renewal policy to a custom method that renews only when the link credit is below the threshold and we've settled some messages. You can do this yourself by using your own custom method:
```
{
  receiverLink: {
    credit: function (link, options) {
      // If the receiver link was just connected, set the initial link credit to the quantum.
      // Otherwise, give more credit for every message we've settled.
      var creditQuantum = (!!options && options.initial) ? link.policy.creditQuantum : link.settledMessagesSinceLastCredit;
      if (creditQuantum > 0 && link.linkCredit < threshold) {
        link.addCredits(creditQuantum);
      }
    }
  }
}
```
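
The snippet above reads a `threshold` variable that is not defined inside the policy object. One way to supply it, sketched here, is a small factory that closes over the threshold. `makeRenewOnSettleCredit` and `makeFakeLink` are hypothetical names; the fake link only imitates the fields the snippet reads (`policy.creditQuantum`, `linkCredit`, `settledMessagesSinceLastCredit`, `addCredits`), and resetting the settled counter inside `addCredits` is an assumption about how a real link behaves.

```javascript
// Hypothetical factory: returns a credit function bound to a threshold.
function makeRenewOnSettleCredit(threshold) {
  return function credit(link, options) {
    // Initial attach: grant the quantum. Afterwards: grant what was settled.
    var quantum = (options && options.initial)
      ? link.policy.creditQuantum
      : link.settledMessagesSinceLastCredit;
    if (quantum > 0 && link.linkCredit < threshold) {
      link.addCredits(quantum);
    }
  };
}

// Stand-in for a receiver link, just enough to exercise the function.
// Assumption: adding credit resets the "settled since last credit" counter.
function makeFakeLink(creditQuantum) {
  return {
    policy: { creditQuantum: creditQuantum },
    linkCredit: 0,
    settledMessagesSinceLastCredit: 0,
    addCredits: function (count) {
      this.linkCredit += count;
      this.settledMessagesSinceLastCredit = 0;
    }
  };
}

var credit = makeRenewOnSettleCredit(1);
var fakeLink = makeFakeLink(1);
var seen = [];

credit(fakeLink, { initial: true });
seen.push(fakeLink.linkCredit);               // initial grant

fakeLink.linkCredit -= 1;                     // one message is delivered
credit(fakeLink);
seen.push(fakeLink.linkCredit);               // nothing settled yet, so no new credit

fakeLink.settledMessagesSinceLastCredit += 1; // the application settles it
credit(fakeLink);
seen.push(fakeLink.linkCredit);               // credit renewed by the settled count

console.log(seen); // [ 1, 0, 1 ]
```

With a quantum and threshold of 1, the link never holds more than one credit, which is the one-by-one processing described above.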

Note that once you've set the policy to not auto-settle messages, you'll need to settle them yourself. We've tried to make that easy by providing methods on the receiver link for each of the possible "disposition states" that AMQP allows:

* `link.accept(message)` will tell the sender that you've accepted and processed the message.
* `link.reject(message, [error])` will reject the message with the given error (if provided). The sender is free to re-deliver, so this can be used to indicate transient errors.
* `link.modify(message, [options])` will tell the sender to modify the message and re-deliver. You can tell it you can't accept the message by using `link.modify(message, { undeliverableHere: true })`
* `link.release(message)` will tell the sender that you haven't processed the message and it's free to re-deliver - even back to you.

All of these methods accept an array of messages, allowing you to settle many at once.
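
As a rough sketch of how these methods might be combined, the helper below settles one message based on the outcome of an application callback. `processAndSettle` and `makeRecordingLink` are hypothetical names, the mapping from outcome to disposition is only one possible choice, and passing the caught error straight to `reject` is an assumption (the form of the optional error argument is not specified here). A recording stand-in replaces the real receiver link so the example runs without a broker.

```javascript
// Hypothetical helper: settle `message` according to what `handler` reports.
function processAndSettle(link, message, handler) {
  var handled;
  try {
    handled = handler(message);
  } catch (err) {
    link.reject(message, err); // processing failed
    return;
  }
  if (handled) {
    link.accept(message);      // processed successfully
  } else {
    link.release(message);     // not processed; sender may re-deliver
  }
}

// Stand-in link that records which disposition method was called.
function makeRecordingLink() {
  var calls = [];
  return {
    calls: calls,
    accept: function (message) { calls.push(['accept', message.body]); },
    reject: function (message) { calls.push(['reject', message.body]); },
    release: function (message) { calls.push(['release', message.body]); }
  };
}

var recorder = makeRecordingLink();
processAndSettle(recorder, { body: 'ok' }, function () { return true; });
processAndSettle(recorder, { body: 'later' }, function () { return false; });
processAndSettle(recorder, { body: 'bad' }, function () { throw new Error('boom'); });

console.log(recorder.calls);
```

With a real receiver, the same helper would presumably be called from the `message` event handler, passing the receiver link as `link`.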

## Plugins ##

The amqp10 module now supports pluggable Client behaviors with the exported `use` method. Officially supported plugins include:

+ [amqp10-link-cache](https://github.com/mbroadst/amqp10-link-cache) - caches links with optional purging based on ttl
+ [amqp10-rpc](https://github.com/mbroadst/amqp10-rpc) - an rpc server/client implementation on top of amqp10

## Supported Servers ##

We actively run integration tests against the following servers:

1. Azure EventHubs
1. Azure ServiceBus Queues and Topics
1. Apache Qpid C++ broker (qpidd)

The library has also been tested against the following servers, but not exhaustively, so issues may remain:

1. ActiveMQ (an open issue, related to ActiveMQ ignoring the auto-settle setting and disposition frames, may cause messages to be re-delivered or to stop sending after a certain period)
1. RabbitMQ with the AMQP 1.0 experimental extension
1. Apache Qpid Java broker

If you find any issues, please report them via GitHub.

## Todos and Known Issues ##

1. Disposition support is incomplete in that we don't send proper "unsettled" information when re-attaching links.
1. There are some AMQP types we don't process - notably the Decimal32/64/128 types. These are unused by the protocol, and no one seems to
   be using them to convey information in messages, so ignoring them is likely safe.

## Implementation Notes ##

+ Using node's built-in net/tls classes for communicating with the server.

+ Data from the server is written to a buffer-list based on [Rod Vagg's BL](https://github.com/rvagg/bl).

+ Outgoing data is encoded using [this buffer builder](https://github.com/PeterReid/node-buffer-builder) - streaming
  output won't really work since each outgoing payload needs to be prefixed with its encoded size; however, we're working on
  converting to use as much streaming as possible.

+ The connection state is managed using [Stately.js](https://github.com/fschaefer/Stately.js), with state transitions
  swapping which callback gets invoked on receipt of new data (e.g. post-connection, we write the AMQP version header
  and then install a callback to ensure the correct version. Once incoming data is written to the circular buffer, this
  callback is invoked, and a comparison vs. the expected version triggers another transition).

+ Debug output is done via [debug](https://www.npmjs.com/package/debug) with the prefix `amqp10:`. The main client's debug
  name is `amqp10:client` so setting `DEBUG=amqp10:client` as an environment variable will get you all top-level debugging output.
  ```bash
  bash# export DEBUG=amqp*
  ```

  ```bash
  C:\> set DEBUG=amqp*
  ```

  ```bash
  [root@pinguino]# node simple_eventhub_test.js
  amqp10:client connecting to: amqps://xxxxxx:xxxxxxxxx@xxxxxxxxxxxx.servicebus.windows.net +0ms
  amqp10:connection Connecting to xxxxxx-service-bus-001.servicebus.windows.net:5671 via TLS +72ms
  amqp10:connection Transitioning from DISCONNECTED to START due to connect +17ms
  amqp10:connection Sending Header 414d515003010000 +405ms
  amqp10:connection Transitioning from START to IN_SASL due to connected +6ms
  amqp10:connection Rx: 414d515003010000 +128ms
  amqp10:sasl Server SASL Version: 414d515003010000 vs 414d515003010000 +1ms
  amqp10:connection Rx: 0000003f02010000005340c03201e02f04b3000000074d535342434... +162ms
  amqp10:client Reading variable with prefix 0xc0 of length 52 +2ms
  amqp10:client Decoding 5340 +0ms
  [...]
  ```

+ Many thanks to Gordon Sim for inspiration on the type system, gleaned from his project [rhea](https://github.com/grs/rhea).
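
Two of the notes above, the size prefix on outgoing payloads and the version header exchanged after connecting, can be illustrated with Node's built-in `Buffer`. This is a sketch based on the AMQP 1.0 wire layout (a 4-byte big-endian total size, a data offset, a frame type and two type-specific bytes, and an 8-byte protocol header of `AMQP` plus protocol id and version); `buildFrame` and `parseProtocolHeader` are hypothetical names and are not taken from amqp10's source.

```javascript
// Prefix a body with an 8-byte AMQP 1.0 frame header.
// The size must be known up front, which is why plain streaming is awkward.
function buildFrame(body, options) {
  var type = (options && options.type) || 0x00; // 0x00 = AMQP, 0x01 = SASL
  var channel = (options && options.channel) || 0;
  var header = Buffer.alloc(8);
  header.writeUInt32BE(8 + body.length, 0); // SIZE: whole frame, header included
  header.writeUInt8(2, 4);                  // DOFF: body starts after 2 four-byte words
  header.writeUInt8(type, 5);               // TYPE
  header.writeUInt16BE(channel, 6);         // channel number for AMQP frames
  return Buffer.concat([header, body]);
}

// Decode the 8-byte protocol header sent at the start of a connection.
function parseProtocolHeader(buf) {
  if (buf.length < 8 || buf.toString('ascii', 0, 4) !== 'AMQP') {
    throw new Error('not an AMQP protocol header');
  }
  var names = { 0: 'AMQP', 2: 'TLS', 3: 'SASL' };
  return {
    protocol: names[buf[4]] || 'unknown',
    version: buf[5] + '.' + buf[6] + '.' + buf[7]
  };
}

// A SASL frame with a 55-byte body gets the same 8-byte prefix as the
// "Rx: 0000003f02010000..." line in the debug output above.
var frame = buildFrame(Buffer.alloc(55), { type: 0x01 });
console.log(frame.slice(0, 8).toString('hex')); // 0000003f02010000

var parsedHeader = parseProtocolHeader(Buffer.from('414d515003010000', 'hex'));
console.log(parsedHeader); // { protocol: 'SASL', version: '1.0.0' }
```

Read this way, the `414d515003010000` header in the log is the SASL variant of the AMQP 1.0.0 protocol header, which matches the transition to `IN_SASL`.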