UNPKG

11.8 kBTypeScriptView Raw
1/// <reference types="node" />
2/// <reference types="node" />
3import type * as amqplib from 'amqplib';
4import { Options } from 'amqplib';
5import { EventEmitter } from 'events';
6import pb from 'promise-breaker';
7import { IAmqpConnectionManager } from './AmqpConnectionManager.js';
8export type Channel = amqplib.ConfirmChannel | amqplib.Channel;
9export type SetupFunc = ((channel: Channel, callback: (error?: Error) => void) => void) | ((channel: Channel) => Promise<void>) | ((channel: amqplib.ConfirmChannel, callback: (error?: Error) => void) => void) | ((channel: amqplib.ConfirmChannel) => Promise<void>);
10export interface CreateChannelOpts {
11 /** Name for this channel. Used for debugging. */
12 name?: string;
13 /**
14 * A function to call whenever we reconnect to the broker (and therefore create a new underlying channel.)
15 * This function should either accept a callback, or return a Promise. See addSetup below
16 */
17 setup?: SetupFunc;
18 /**
19 * True to create a ConfirmChannel (default). False to create a regular Channel.
20 */
21 confirm?: boolean;
22 /**
23 * if true, then ChannelWrapper assumes all messages passed to publish() and sendToQueue() are plain JSON objects.
24 * These will be encoded automatically before being sent.
25 */
26 json?: boolean;
27 /**
28 * Default publish timeout in ms. Messages not published within the given time are rejected with a timeout error.
29 */
30 publishTimeout?: number;
31}
32export interface PublishOptions extends Options.Publish {
33 /** Message will be rejected after timeout ms */
34 timeout?: number;
35}
36export interface ConsumerOptions extends amqplib.Options.Consume {
37 prefetch?: number;
38}
39export interface Consumer {
40 consumerTag: string | null;
41 queue: string;
42 onMessage: (msg: amqplib.ConsumeMessage) => void;
43 options: ConsumerOptions;
44}
45/**
46 * Calls to `publish()` or `sendToQueue()` work just like in amqplib, but messages are queued internally and
47 * are guaranteed to be delivered. If the underlying connection drops, ChannelWrapper will wait for a new
48 * connection and continue.
49 *
50 * Events:
51 * * `connect` - emitted every time this channel connects or reconnects.
52 * * `error(err, {name})` - emitted if an error occurs setting up the channel.
53 * * `drop({message, err})` - called when a JSON message was dropped because it could not be encoded.
54 * * `close` - emitted when this channel closes via a call to `close()`
55 *
56 */
57export default class ChannelWrapper extends EventEmitter {
58 private _connectionManager;
59 private _json;
60 /** If we're in the process of creating a channel, this is a Promise which
61 * will resolve when the channel is set up. Otherwise, this is `null`.
62 */
63 private _settingUp;
64 private _setups;
65 /** Queued messages, not yet sent. */
66 private _messages;
67 /** Oublished, but not yet confirmed messages. */
68 private _unconfirmedMessages;
69 /** Reason code during publish or sendtoqueue messages. */
70 private _irrecoverableCode;
71 /** Consumers which will be reconnected on channel errors etc. */
72 private _consumers;
73 /**
74 * The currently connected channel. Note that not all setup functions
75 * have been run on this channel until `@_settingUp` is either null or
76 * resolved.
77 */
78 private _channel?;
79 /**
80 * True to create a ConfirmChannel. False to create a regular Channel.
81 */
82 private _confirm;
83 /**
84 * True if the "worker" is busy sending messages. False if we need to
85 * start the worker to get stuff done.
86 */
87 private _working;
88 /**
89 * We kill off workers when we disconnect. Whenever we start a new
90 * worker, we bump up the `_workerNumber` - this makes it so if stale
91 * workers ever do wake up, they'll know to stop working.
92 */
93 private _workerNumber;
94 /**
95 * True if the underlying channel has room for more messages.
96 */
97 private _channelHasRoom;
98 /**
99 * Default publish timeout
100 */
101 private _publishTimeout?;
102 name?: string;
103 addListener(event: string, listener: (...args: any[]) => void): this;
104 addListener(event: 'connect', listener: () => void): this;
105 addListener(event: 'error', listener: (err: Error, info: {
106 name: string;
107 }) => void): this;
108 addListener(event: 'close', listener: () => void): this;
109 on(event: string, listener: (...args: any[]) => void): this;
110 on(event: 'connect', listener: () => void): this;
111 on(event: 'error', listener: (err: Error, info: {
112 name: string;
113 }) => void): this;
114 on(event: 'close', listener: () => void): this;
115 once(event: string, listener: (...args: any[]) => void): this;
116 once(event: 'connect', listener: () => void): this;
117 once(event: 'error', listener: (err: Error, info: {
118 name: string;
119 }) => void): this;
120 once(event: 'close', listener: () => void): this;
121 prependListener(event: string, listener: (...args: any[]) => void): this;
122 prependListener(event: 'connect', listener: () => void): this;
123 prependListener(event: 'error', listener: (err: Error, info: {
124 name: string;
125 }) => void): this;
126 prependListener(event: 'close', listener: () => void): this;
127 prependOnceListener(event: string, listener: (...args: any[]) => void): this;
128 prependOnceListener(event: 'connect', listener: () => void): this;
129 prependOnceListener(event: 'error', listener: (err: Error, info: {
130 name: string;
131 }) => void): this;
132 prependOnceListener(event: 'close', listener: () => void): this;
133 /**
134 * Adds a new 'setup handler'.
135 *
136 * `setup(channel, [cb])` is a function to call when a new underlying channel is created - handy for asserting
137 * exchanges and queues exists, and whatnot. The `channel` object here is a ConfigChannel from amqplib.
138 * The `setup` function should return a Promise (or optionally take a callback) - no messages will be sent until
139 * this Promise resolves.
140 *
141 * If there is a connection, `setup()` will be run immediately, and the addSetup Promise/callback won't resolve
142 * until `setup` is complete. Note that in this case, if the setup throws an error, no 'error' event will
143 * be emitted, since you can just handle the error here (although the `setup` will still be added for future
144 * reconnects, even if it throws an error.)
145 *
146 * Setup functions should, ideally, not throw errors, but if they do then the ChannelWrapper will emit an 'error'
147 * event.
148 *
149 * @param setup - setup function.
150 * @param [done] - callback.
151 * @returns - Resolves when complete.
152 */
153 addSetup(setup: SetupFunc, done?: pb.Callback<void>): Promise<void>;
154 /**
155 * Remove a setup function added with `addSetup`. If there is currently a
156 * connection, `teardown(channel, [cb])` will be run immediately, and the
157 * returned Promise will not resolve until it completes.
158 *
159 * @param {function} setup - the setup function to remove.
160 * @param {function} [teardown] - `function(channel, [cb])` to run to tear
161 * down the channel.
162 * @param {function} [done] - Optional callback.
163 * @returns {void | Promise} - Resolves when complete.
164 */
165 removeSetup(setup: SetupFunc, teardown?: SetupFunc, done?: pb.Callback<void>): Promise<void>;
166 /**
167 * Returns a Promise which resolves when this channel next connects.
168 * (Mainly here for unit testing...)
169 *
170 * @param [done] - Optional callback.
171 * @returns - Resolves when connected.
172 */
173 waitForConnect(done?: pb.Callback<void>): Promise<void>;
174 publish(exchange: string, routingKey: string, content: Buffer | string | unknown, options?: PublishOptions, done?: pb.Callback<boolean>): Promise<boolean>;
175 sendToQueue(queue: string, content: Buffer | string | unknown, options?: PublishOptions, done?: pb.Callback<boolean>): Promise<boolean>;
176 private _enqueueMessage;
177 /**
178 * Create a new ChannelWrapper.
179 *
180 * @param connectionManager - connection manager which
181 * created this channel.
182 * @param [options] -
183 * @param [options.name] - A name for this channel. Handy for debugging.
184 * @param [options.setup] - A default setup function to call. See
185 * `addSetup` for details.
186 * @param [options.json] - if true, then ChannelWrapper assumes all
187 * messages passed to `publish()` and `sendToQueue()` are plain JSON objects.
188 * These will be encoded automatically before being sent.
189 *
190 */
191 constructor(connectionManager: IAmqpConnectionManager, options?: CreateChannelOpts);
192 private _onConnect;
193 private _onChannelClose;
194 /** Called whenever the channel drains. */
195 private _onChannelDrain;
196 private _onDisconnect;
197 queueLength(): number;
198 close(): Promise<void>;
199 private _shouldPublish;
200 private _startWorker;
201 private _canWaitReconnection;
202 private _messageResolved;
203 private _messageRejected;
204 private _getEncodedMessage;
205 private _publishQueuedMessages;
206 /**
207 * Setup a consumer
208 * This consumer will be reconnected on cancellation and channel errors.
209 */
210 consume(queue: string, onMessage: Consumer['onMessage'], options?: ConsumerOptions): Promise<amqplib.Replies.Consume>;
211 private _consume;
212 private _reconnectConsumer;
213 /**
214 * Cancel all consumers
215 */
216 cancelAll(): Promise<void>;
217 cancel(consumerTag: string): Promise<void>;
218 /** Send an `ack` to the underlying channel. */
219 ack(message: amqplib.Message, allUpTo?: boolean): void;
220 /** Send an `ackAll` to the underlying channel. */
221 ackAll(): void;
222 /** Send a `nack` to the underlying channel. */
223 nack(message: amqplib.Message, allUpTo?: boolean, requeue?: boolean): void;
224 /** Send a `nackAll` to the underlying channel. */
225 nackAll(requeue?: boolean): void;
226 /** Send a `purgeQueue` to the underlying channel. */
227 purgeQueue(queue: string): Promise<amqplib.Replies.PurgeQueue>;
228 /** Send a `checkQueue` to the underlying channel. */
229 checkQueue(queue: string): Promise<amqplib.Replies.AssertQueue>;
230 /** Send a `assertQueue` to the underlying channel. */
231 assertQueue(queue: string, options?: amqplib.Options.AssertQueue): Promise<amqplib.Replies.AssertQueue>;
232 /** Send a `bindQueue` to the underlying channel. */
233 bindQueue(queue: string, source: string, pattern: string, args?: any): Promise<void>;
234 /** Send a `unbindQueue` to the underlying channel. */
235 unbindQueue(queue: string, source: string, pattern: string, args?: any): Promise<void>;
236 /** Send a `deleteQueue` to the underlying channel. */
237 deleteQueue(queue: string, options?: Options.DeleteQueue): Promise<amqplib.Replies.DeleteQueue>;
238 /** Send a `assertExchange` to the underlying channel. */
239 assertExchange(exchange: string, type: 'direct' | 'topic' | 'headers' | 'fanout' | 'match' | string, options?: Options.AssertExchange): Promise<amqplib.Replies.AssertExchange>;
240 /** Send a `bindExchange` to the underlying channel. */
241 bindExchange(destination: string, source: string, pattern: string, args?: any): Promise<amqplib.Replies.Empty>;
242 /** Send a `checkExchange` to the underlying channel. */
243 checkExchange(exchange: string): Promise<amqplib.Replies.Empty>;
244 /** Send a `deleteExchange` to the underlying channel. */
245 deleteExchange(exchange: string, options?: Options.DeleteExchange): Promise<amqplib.Replies.Empty>;
246 /** Send a `unbindExchange` to the underlying channel. */
247 unbindExchange(destination: string, source: string, pattern: string, args?: any): Promise<amqplib.Replies.Empty>;
248 /** Send a `get` to the underlying channel. */
249 get(queue: string, options?: Options.Get): Promise<amqplib.GetMessage | false>;
250}