1 | const events = require('events');
|
2 |
|
3 | class Receiver extends events {
|
4 | constructor(channel, confirmChannel) {
|
5 | super();
|
6 | this._channel = channel;
|
7 | this.confirmChannel = confirmChannel;
|
8 | this.queues = [];
|
9 | }
|
10 |
|
11 | set channel(value) {
|
12 | let that = this;
|
13 | this._channel = value;
|
14 | this.queues.forEach(queue => {
|
15 | that.consume(queue.queue, queue.opt, queue.assOpt, true);
|
16 | });
|
17 | }
|
18 |
|
19 | consume(queue, consumeOpt = { noAck: false }, queueAssertOpt = {}, exchange = null, excangeType = "topic", exchangeAssertOpt = {}, routingKey = null, requeue = false) {
|
20 | let that = this;
|
21 | if (!requeue) this.queues.push({ queue: queue, consumeOpt: consumeOpt, queueAssertOpt: queueAssertOpt, exchange: exchange, exchangeAssertOpt: exchangeAssertOpt, routingKey: routingKey });
|
22 | this._channel.assertQueue(queue, queueAssertOpt).then(queue => {
|
23 | if (exchange != null) {
|
24 | that._channel.assertExchange(exchange, excangeType, exchangeAssertOpt).then(() => {
|
25 | that._channel.bindQueue(queue.queue, exchange, routingKey).catch(err => {
|
26 | console.log(err);
|
27 | that.emit('error', err);
|
28 | });
|
29 | that._channel.consume(queue.queue, msg => {
|
30 | that.emit('message', msg);
|
31 | }, consumeOpt).catch(err => {
|
32 | console.log(err);
|
33 | that.emit('error', err);
|
34 | });
|
35 | }).catch(err => {
|
36 | console.log(err);
|
37 | that.emit('error', err);
|
38 | });
|
39 | } else {
|
40 | that._channel.consume(queue.queue, msg => {
|
41 | that.emit('message', msg);
|
42 | }, consumeOpt).catch(err => {
|
43 | console.log(err);
|
44 | that.emit('error', err);
|
45 | });
|
46 | }
|
47 | }).catch(err => {
|
48 | console.log(err);
|
49 | that.emit('error', err);
|
50 | });
|
51 | }
|
52 |
|
53 | ack(msg) {
|
54 | this._channel.ack(msg);
|
55 | }
|
56 |
|
57 | nack(msg, requeue = false) {
|
58 | this._channel.ack(msg, requeue);
|
59 | }
|
60 | }
|
61 |
|
62 | module.exports = Receiver; |
\ | No newline at end of file |