UNPKG

2.3 kBJavaScriptView Raw
1const events = require('events');
2
3class Receiver extends events {
4 constructor(channel, confirmChannel) {
5 super();
6 this._channel = channel;
7 this.confirmChannel = confirmChannel;
8 this.queues = [];
9 }
10
11 set channel(value) {
12 let that = this;
13 this._channel = value;
14 this.queues.forEach(queue => {
15 that.consume(queue.queue, queue.opt, queue.assOpt, true);
16 });
17 }
18
19 consume(queue, consumeOpt = { noAck: false }, queueAssertOpt = {}, exchange = null, excangeType = "topic", exchangeAssertOpt = {}, routingKey = null, requeue = false) {
20 let that = this;
21 if (!requeue) this.queues.push({ queue: queue, consumeOpt: consumeOpt, queueAssertOpt: queueAssertOpt, exchange: exchange, exchangeAssertOpt: exchangeAssertOpt, routingKey: routingKey });
22 this._channel.assertQueue(queue, queueAssertOpt).then(queue => {
23 if (exchange != null) {
24 that._channel.assertExchange(exchange, excangeType, exchangeAssertOpt).then(() => {
25 that._channel.bindQueue(queue.queue, exchange, routingKey).catch(err => {
26 console.log(err);
27 that.emit('error', err);
28 });
29 that._channel.consume(queue.queue, msg => {
30 that.emit('message', msg);
31 }, consumeOpt).catch(err => {
32 console.log(err);
33 that.emit('error', err);
34 });
35 }).catch(err => {
36 console.log(err);
37 that.emit('error', err);
38 });
39 } else {
40 that._channel.consume(queue.queue, msg => {
41 that.emit('message', msg);
42 }, consumeOpt).catch(err => {
43 console.log(err);
44 that.emit('error', err);
45 });
46 }
47 }).catch(err => {
48 console.log(err);
49 that.emit('error', err);
50 });
51 }
52
53 ack(msg) {
54 this._channel.ack(msg);
55 }
56
57 nack(msg, requeue = false) {
58 this._channel.ack(msg, requeue);
59 }
60}
61
62module.exports = Receiver;
\No newline at end of file