UNPKG

7.89 kBTypeScriptView Raw
1/// <reference types="node" />
2/// <reference types="node" />
3/// <reference types="node" />
4/// <reference types="node" />
5import * as amqp from 'amqplib';
6import { EventEmitter } from 'events';
7import { TcpSocketConnectOpts } from 'net';
8import { ConnectionOptions } from 'tls';
9import ChannelWrapper, { CreateChannelOpts } from './ChannelWrapper.js';
/**
 * One broker to connect to. Accepts a URL string, an `amqp.Options.Connect`
 * object, or an object pairing a `url` with optional per-broker
 * `connectionOptions`.
 */
10export type ConnectionUrl = string | amqp.Options.Connect | {
11 url: string;
12 connectionOptions?: AmqpConnectionOptions;
13};
/**
 * Listener signature used by the `'connect'` event overloads.
 * The single argument carries the `amqp.Connection` and the `url`
 * (string or `amqp.Options.Connect`) associated with it.
 */
14export interface ConnectListener {
15 (arg: {
16 connection: amqp.Connection;
17 url: string | amqp.Options.Connect;
18 }): void;
19}
/**
 * Listener signature used by the `'connectFailed'` event overloads.
 * The single argument carries the `err` and the `url` involved; `url` is
 * typed as possibly `undefined`, so listeners must not assume it is set.
 */
20export interface ConnectFailedListener {
21 (arg: {
22 err: Error;
23 url: string | amqp.Options.Connect | undefined;
24 }): void;
25}
/**
 * Connection options: either the `tls` module's `ConnectionOptions` or the
 * `net` module's `TcpSocketConnectOpts`, intersected with the extra optional
 * fields declared below.
 *
 * NOTE(review): the exact meaning of `noDelay`, `timeout`, `keepAlive`,
 * `keepAliveDelay` and `clientProperties` is not visible in this file;
 * presumably they are interpreted by amqplib — confirm against its docs.
 */
26export type AmqpConnectionOptions = (ConnectionOptions | TcpSocketConnectOpts) & {
27 noDelay?: boolean;
28 timeout?: number;
29 keepAlive?: boolean;
30 keepAliveDelay?: number;
 // Typed as `any`: no shape is enforced on this value by the declaration.
31 clientProperties?: any;
 // Two accepted shapes: one with `username`/`password` alongside `mechanism`
 // and `response`, and one with only `mechanism` and `response`.
32 credentials?: {
33 mechanism: string;
34 username: string;
35 password: string;
36 response: () => Buffer;
37 } | {
38 mechanism: string;
39 response: () => Buffer;
40 } | undefined;
41};
/**
 * Options accepted as the second argument of the `AmqpConnectionManager`
 * constructor. Every field is optional.
 */
42export interface AmqpConnectionManagerOptions {
43 /** Interval to send heartbeats to broker. Defaults to 5 seconds. */
44 heartbeatIntervalInSeconds?: number;
45 /**
46 * The time to wait before trying to reconnect. If not specified, defaults
47 * to `heartbeatIntervalInSeconds`.
48 */
49 reconnectTimeInSeconds?: number | undefined;
50 /**
51 * `findServers` is a function which returns one or more servers to
52 * connect to. This should return either a single URL or an array of URLs.
53 * This is handy when you're using a service discovery mechanism such as
54 * Consul or etcd. Instead of taking a callback, this can also return a
55 * Promise. Note that if this is supplied, then `urls` is ignored.
56 */
57 findServers?: ((callback: (urls: ConnectionUrl | ConnectionUrl[]) => void) => void) | (() => Promise<ConnectionUrl | ConnectionUrl[]>) | undefined;
58 /** Connection options, passed as options to the amqplib.connect() method. */
59 connectionOptions?: AmqpConnectionOptions;
60}
/**
 * Public contract of a connection manager: configuration fields, typed
 * event-listener registration methods, and connection/channel lifecycle
 * methods.
 *
 * Typed events declared by the overloads below:
 * - `'connect'`       -> `ConnectListener`
 * - `'connectFailed'` -> `ConnectFailedListener`
 * - `'blocked'`       -> `({ reason: string }) => void`
 * - `'unblocked'`     -> `() => void`
 * - `'disconnect'`    -> `({ err: Error }) => void`
 *
 * NOTE(review): in every listener-method group the catch-all
 * `(event: string, listener: (...args: any[]) => void)` overload is declared
 * BEFORE the event-specific ones. TypeScript tries overloads in declaration
 * order, so a call such as `on('connect', cb)` presumably binds to the
 * catch-all and `cb`'s parameter is contextually typed `any` — confirm, and
 * consider moving the catch-all last in the original source.
 */
61export interface IAmqpConnectionManager {
62 connectionOptions?: AmqpConnectionOptions;
63 heartbeatIntervalInSeconds: number;
64 reconnectTimeInSeconds: number;
65 addListener(event: string, listener: (...args: any[]) => void): this;
66 addListener(event: 'connect', listener: ConnectListener): this;
67 addListener(event: 'connectFailed', listener: ConnectFailedListener): this;
68 addListener(event: 'blocked', listener: (arg: {
69 reason: string;
70 }) => void): this;
71 addListener(event: 'unblocked', listener: () => void): this;
72 addListener(event: 'disconnect', listener: (arg: {
73 err: Error;
74 }) => void): this;
75 listeners(eventName: string | symbol): Function[];
76 on(event: string, listener: (...args: any[]) => void): this;
77 on(event: 'connect', listener: ConnectListener): this;
78 on(event: 'connectFailed', listener: ConnectFailedListener): this;
79 on(event: 'blocked', listener: (arg: {
80 reason: string;
81 }) => void): this;
82 on(event: 'unblocked', listener: () => void): this;
83 on(event: 'disconnect', listener: (arg: {
84 err: Error;
85 }) => void): this;
86 once(event: string, listener: (...args: any[]) => void): this;
87 once(event: 'connect', listener: ConnectListener): this;
88 once(event: 'connectFailed', listener: ConnectFailedListener): this;
89 once(event: 'blocked', listener: (arg: {
90 reason: string;
91 }) => void): this;
92 once(event: 'unblocked', listener: () => void): this;
93 once(event: 'disconnect', listener: (arg: {
94 err: Error;
95 }) => void): this;
96 prependListener(event: string, listener: (...args: any[]) => void): this;
97 prependListener(event: 'connect', listener: ConnectListener): this;
98 prependListener(event: 'connectFailed', listener: ConnectFailedListener): this;
99 prependListener(event: 'blocked', listener: (arg: {
100 reason: string;
101 }) => void): this;
102 prependListener(event: 'unblocked', listener: () => void): this;
103 prependListener(event: 'disconnect', listener: (arg: {
104 err: Error;
105 }) => void): this;
106 prependOnceListener(event: string, listener: (...args: any[]) => void): this;
107 prependOnceListener(event: 'connect', listener: ConnectListener): this;
108 prependOnceListener(event: 'connectFailed', listener: ConnectFailedListener): this;
109 prependOnceListener(event: 'blocked', listener: (arg: {
110 reason: string;
111 }) => void): this;
112 prependOnceListener(event: 'unblocked', listener: () => void): this;
113 prependOnceListener(event: 'disconnect', listener: (arg: {
114 err: Error;
115 }) => void): this;
 // Only the untyped catch-all form is declared for removeListener.
116 removeListener(event: string, listener: (...args: any[]) => void): this;
 /** Takes an optional `timeout`; the returned promise carries no value. */
117 connect(options?: {
118 timeout?: number;
119 }): Promise<void>;
120 reconnect(): void;
 /** Returns a `ChannelWrapper`; `options` is optional. */
121 createChannel(options?: CreateChannelOpts): ChannelWrapper;
122 close(): Promise<void>;
123 isConnected(): boolean;
124 /** The current connection. */
125 readonly connection: amqp.Connection | undefined;
126 /** Returns the number of registered channels. */
127 readonly channelCount: number;
128}
/**
 * Default export: the connection manager class. It extends Node's
 * `EventEmitter` and implements `IAmqpConnectionManager`.
 *
 * Only signatures are declared here (no method bodies), and the private
 * members carry no type annotations in this declaration.
 */
129export default class AmqpConnectionManager extends EventEmitter implements IAmqpConnectionManager {
130 private _channels;
131 private _currentUrl;
132 private _closed;
133 private _cancelRetriesHandler?;
134 private _connectPromise?;
135 private _currentConnection?;
136 private _findServers;
137 private _urls?;
138 connectionOptions: AmqpConnectionOptions | undefined;
139 heartbeatIntervalInSeconds: number;
140 reconnectTimeInSeconds: number;
141 /**
142 * Create a new AmqpConnectionManager.
143 *
144 * @param urls - An array of brokers to connect to.
145 * Takes url strings or objects {url: string, connectionOptions?: object}
146 * If present, a broker's [connectionOptions] will be used instead
147 * of [options.connectionOptions] when passed to the amqplib connect method.
148 * AmqpConnectionManager will round-robin between them whenever it
149 * needs to create a new connection.
150 * @param [options={}] -
151 * @param [options.heartbeatIntervalInSeconds=5] - The interval,
152 * in seconds, to send heartbeats.
153 * @param [options.reconnectTimeInSeconds] - The time to wait
154 * before trying to reconnect. If not specified, defaults to
155 * `heartbeatIntervalInSeconds`.
156 * @param [options.connectionOptions] - Passed to the amqplib
157 * connect method.
158 * @param [options.findServers] - A `fn(callback)` or a `fn()`
159 * which returns a Promise. This should resolve to one or more servers
160 * to connect to, either a single URL or an array of URLs. This is handy
161 * when you're using a service discovery mechanism such as Consul or etcd.
162 * Note that if this is supplied, then `urls` is ignored.
163 */
164 constructor(urls: ConnectionUrl | ConnectionUrl[] | undefined | null, options?: AmqpConnectionManagerOptions);
165 /**
166 * Start the connect retries and await the first connect result. Even if the initial connect fails or times out, the
167 * reconnect attempts will continue in the background.
168 * @param [options={}] -
169 * @param [options.timeout] - Time to wait for initial connect
170 */
171 connect({ timeout }?: {
172 timeout?: number;
173 }): Promise<void>;
 /** Returns a `ChannelWrapper`; `options` is optional. */
174 createChannel(options?: CreateChannelOpts): ChannelWrapper;
175 close(): Promise<void>;
176 isConnected(): boolean;
177 /** Force reconnect - noop unless connected */
178 reconnect(): void;
179 /** The current connection. */
180 get connection(): amqp.Connection | undefined;
181 /** Returns the number of registered channels. */
182 get channelCount(): number;
183 private _connect;
184}
185
\No newline at end of file