1 |
|
2 |
|
3 | import type * as amqplib from 'amqplib';
|
4 | import { Options } from 'amqplib';
|
5 | import { EventEmitter } from 'events';
|
6 | import pb from 'promise-breaker';
|
7 | import { IAmqpConnectionManager } from './AmqpConnectionManager.js';
|
8 | export type Channel = amqplib.ConfirmChannel | amqplib.Channel;
|
9 | export type SetupFunc = ((channel: Channel, callback: (error?: Error) => void) => void) | ((channel: Channel) => Promise<void>) | ((channel: amqplib.ConfirmChannel, callback: (error?: Error) => void) => void) | ((channel: amqplib.ConfirmChannel) => Promise<void>);
|
10 | export interface CreateChannelOpts {
|
11 | /** Name for this channel. Used for debugging. */
|
12 | name?: string;
|
13 | /**
|
14 | * A function to call whenever we reconnect to the broker (and therefore create a new underlying channel.)
|
15 | * This function should either accept a callback, or return a Promise. See addSetup below
|
16 | */
|
17 | setup?: SetupFunc;
|
18 | /**
|
19 | * True to create a ConfirmChannel (default). False to create a regular Channel.
|
20 | */
|
21 | confirm?: boolean;
|
22 | /**
|
23 | * if true, then ChannelWrapper assumes all messages passed to publish() and sendToQueue() are plain JSON objects.
|
24 | * These will be encoded automatically before being sent.
|
25 | */
|
26 | json?: boolean;
|
27 | /**
|
28 | * Default publish timeout in ms. Messages not published within the given time are rejected with a timeout error.
|
29 | */
|
30 | publishTimeout?: number;
|
31 | }
|
32 | export interface PublishOptions extends Options.Publish {
|
33 | /** Message will be rejected after timeout ms */
|
34 | timeout?: number;
|
35 | }
|
36 | export interface ConsumerOptions extends amqplib.Options.Consume {
|
37 | prefetch?: number;
|
38 | }
|
39 | export interface Consumer {
|
40 | consumerTag: string | null;
|
41 | queue: string;
|
42 | onMessage: (msg: amqplib.ConsumeMessage) => void;
|
43 | options: ConsumerOptions;
|
44 | }
|
45 |
|
46 |
|
47 |
|
48 |
|
49 |
|
50 |
|
51 |
|
52 |
|
53 |
|
54 |
|
55 |
|
56 |
|
57 | export default class ChannelWrapper extends EventEmitter {
|
58 | private _connectionManager;
|
59 | private _json;
|
60 | |
61 |
|
62 |
|
63 | private _settingUp;
|
64 | private _setups;
|
65 |
|
66 | private _messages;
|
67 |
|
68 | private _unconfirmedMessages;
|
69 |
|
70 | private _irrecoverableCode;
|
71 |
|
72 | private _consumers;
|
73 | |
74 |
|
75 |
|
76 |
|
77 |
|
78 | private _channel?;
|
79 | |
80 |
|
81 |
|
82 | private _confirm;
|
83 | |
84 |
|
85 |
|
86 |
|
87 | private _working;
|
88 | |
89 |
|
90 |
|
91 |
|
92 |
|
93 | private _workerNumber;
|
94 | |
95 |
|
96 |
|
97 | private _channelHasRoom;
|
98 | |
99 |
|
100 |
|
101 | private _publishTimeout?;
|
102 | name?: string;
|
103 | addListener(event: string, listener: (...args: any[]) => void): this;
|
104 | addListener(event: 'connect', listener: () => void): this;
|
105 | addListener(event: 'error', listener: (err: Error, info: {
|
106 | name: string;
|
107 | }) => void): this;
|
108 | addListener(event: 'close', listener: () => void): this;
|
109 | on(event: string, listener: (...args: any[]) => void): this;
|
110 | on(event: 'connect', listener: () => void): this;
|
111 | on(event: 'error', listener: (err: Error, info: {
|
112 | name: string;
|
113 | }) => void): this;
|
114 | on(event: 'close', listener: () => void): this;
|
115 | once(event: string, listener: (...args: any[]) => void): this;
|
116 | once(event: 'connect', listener: () => void): this;
|
117 | once(event: 'error', listener: (err: Error, info: {
|
118 | name: string;
|
119 | }) => void): this;
|
120 | once(event: 'close', listener: () => void): this;
|
121 | prependListener(event: string, listener: (...args: any[]) => void): this;
|
122 | prependListener(event: 'connect', listener: () => void): this;
|
123 | prependListener(event: 'error', listener: (err: Error, info: {
|
124 | name: string;
|
125 | }) => void): this;
|
126 | prependListener(event: 'close', listener: () => void): this;
|
127 | prependOnceListener(event: string, listener: (...args: any[]) => void): this;
|
128 | prependOnceListener(event: 'connect', listener: () => void): this;
|
129 | prependOnceListener(event: 'error', listener: (err: Error, info: {
|
130 | name: string;
|
131 | }) => void): this;
|
132 | prependOnceListener(event: 'close', listener: () => void): this;
|
133 | |
134 |
|
135 |
|
136 |
|
137 |
|
138 |
|
139 |
|
140 |
|
141 |
|
142 |
|
143 |
|
144 |
|
145 |
|
146 |
|
147 |
|
148 |
|
149 |
|
150 |
|
151 |
|
152 |
|
153 | addSetup(setup: SetupFunc, done?: pb.Callback<void>): Promise<void>;
|
154 | |
155 |
|
156 |
|
157 |
|
158 |
|
159 |
|
160 |
|
161 |
|
162 |
|
163 |
|
164 |
|
165 | removeSetup(setup: SetupFunc, teardown?: SetupFunc, done?: pb.Callback<void>): Promise<void>;
|
166 | |
167 |
|
168 |
|
169 |
|
170 |
|
171 |
|
172 |
|
173 | waitForConnect(done?: pb.Callback<void>): Promise<void>;
|
174 | publish(exchange: string, routingKey: string, content: Buffer | string | unknown, options?: PublishOptions, done?: pb.Callback<boolean>): Promise<boolean>;
|
175 | sendToQueue(queue: string, content: Buffer | string | unknown, options?: PublishOptions, done?: pb.Callback<boolean>): Promise<boolean>;
|
176 | private _enqueueMessage;
|
177 | |
178 |
|
179 |
|
180 |
|
181 |
|
182 |
|
183 |
|
184 |
|
185 |
|
186 |
|
187 |
|
188 |
|
189 |
|
190 |
|
191 | constructor(connectionManager: IAmqpConnectionManager, options?: CreateChannelOpts);
|
192 | private _onConnect;
|
193 | private _onChannelClose;
|
194 | /** Called whenever the channel drains. */
|
195 | private _onChannelDrain;
|
196 | private _onDisconnect;
|
197 | queueLength(): number;
|
198 | close(): Promise<void>;
|
199 | private _shouldPublish;
|
200 | private _startWorker;
|
201 | private _canWaitReconnection;
|
202 | private _messageResolved;
|
203 | private _messageRejected;
|
204 | private _getEncodedMessage;
|
205 | private _publishQueuedMessages;
|
206 | /**
|
207 | * Setup a consumer
|
208 | * This consumer will be reconnected on cancellation and channel errors.
|
209 | */
|
210 | consume(queue: string, onMessage: Consumer['onMessage'], options?: ConsumerOptions): Promise<amqplib.Replies.Consume>;
|
211 | private _consume;
|
212 | private _reconnectConsumer;
|
213 | /**
|
214 | * Cancel all consumers
|
215 | */
|
216 | cancelAll(): Promise<void>;
|
217 | cancel(consumerTag: string): Promise<void>;
|
218 | /** Send an `ack` to the underlying channel. */
|
219 | ack(message: amqplib.Message, allUpTo?: boolean): void;
|
220 | /** Send an `ackAll` to the underlying channel. */
|
221 | ackAll(): void;
|
222 | /** Send a `nack` to the underlying channel. */
|
223 | nack(message: amqplib.Message, allUpTo?: boolean, requeue?: boolean): void;
|
224 | /** Send a `nackAll` to the underlying channel. */
|
225 | nackAll(requeue?: boolean): void;
|
226 | /** Send a `purgeQueue` to the underlying channel. */
|
227 | purgeQueue(queue: string): Promise<amqplib.Replies.PurgeQueue>;
|
228 | /** Send a `checkQueue` to the underlying channel. */
|
229 | checkQueue(queue: string): Promise<amqplib.Replies.AssertQueue>;
|
230 | /** Send a `assertQueue` to the underlying channel. */
|
231 | assertQueue(queue: string, options?: amqplib.Options.AssertQueue): Promise<amqplib.Replies.AssertQueue>;
|
232 | /** Send a `bindQueue` to the underlying channel. */
|
233 | bindQueue(queue: string, source: string, pattern: string, args?: any): Promise<void>;
|
234 | /** Send a `unbindQueue` to the underlying channel. */
|
235 | unbindQueue(queue: string, source: string, pattern: string, args?: any): Promise<void>;
|
236 | /** Send a `deleteQueue` to the underlying channel. */
|
237 | deleteQueue(queue: string, options?: Options.DeleteQueue): Promise<amqplib.Replies.DeleteQueue>;
|
238 | /** Send a `assertExchange` to the underlying channel. */
|
239 | assertExchange(exchange: string, type: 'direct' | 'topic' | 'headers' | 'fanout' | 'match' | string, options?: Options.AssertExchange): Promise<amqplib.Replies.AssertExchange>;
|
240 | /** Send a `bindExchange` to the underlying channel. */
|
241 | bindExchange(destination: string, source: string, pattern: string, args?: any): Promise<amqplib.Replies.Empty>;
|
242 | /** Send a `checkExchange` to the underlying channel. */
|
243 | checkExchange(exchange: string): Promise<amqplib.Replies.Empty>;
|
244 | /** Send a `deleteExchange` to the underlying channel. */
|
245 | deleteExchange(exchange: string, options?: Options.DeleteExchange): Promise<amqplib.Replies.Empty>;
|
246 | /** Send a `unbindExchange` to the underlying channel. */
|
247 | unbindExchange(destination: string, source: string, pattern: string, args?: any): Promise<amqplib.Replies.Empty>;
|
248 | /** Send a `get` to the underlying channel. */
|
249 | get(queue: string, options?: Options.Get): Promise<amqplib.GetMessage | false>;
|
250 | }
|