1 | "use strict";
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | const ChannelManager_1 = require("./ChannelManager");
|
// --- Exchange names -------------------------------------------------------

// Prepended to a listener's `exchange` option to form the full exchange name.
const EXCHANGE_PREFIX = "nimbus:event:";

// Exchange used when a listener specifies neither `userId` nor `exchange`.
const EXCHANGE_ALL_EVENTS = "nimbus:events";

// Exchange used whenever a listener specifies a `userId`.
const EXCHANGE_EVENTS_BY_USER = "nimbus:eventsByUser";

// --- Queue names and declaration options ----------------------------------

// Prepended to the `runtime` option when a listener builds its own queue name.
const QUEUE_PREFIX = "nimbus:listener:";

// Default queue options, used when no `runtime` option is given.
const QUEUE_OPTIONS = {
    durable: false,
    autoDelete: true,
    exclusive: true,
};

// Queue options for a runtime-named queue whose listener is marked persistent.
const PERSISTENT_QUEUE_OPTIONS = {
    durable: true,
    autoDelete: false,
    exclusive: false,
};

// Queue options for a runtime-named queue that is not persistent.
const QUEUE_RUNTIME_OPTIONS = {
    durable: false,
    autoDelete: true,
};

// Options passed when asserting any of the exchanges above.
const EXCHANGE_OPTIONS = {
    durable: true,
    autoDelete: false,
};
|
12 | const util = require("util");
|
// Debug logger for the "amqptools" section; `util.debuglog` only emits output
// when that section is enabled through the NODE_DEBUG environment variable.
const debug = util.debuglog("amqptools");
|
/**
 * Subscribes one listener callback to events delivered through a topic
 * exchange, using the channel handed out by the shared channel manager.
 *
 * Lifecycle: construct, call `listen(callback)` once, call `cancel()` to stop.
 * The instance registers for the channel manager's "reconnect" event at
 * construction time and re-runs `consume()` on reconnect once a listener has
 * been set.
 */
class EventListener {
    /**
     * @param {Object} [options]
     * @param {string} [options.exchange] Exchange short name; also the first
     *     segment of the routing key.
     * @param {string} [options.topic] Second segment of the routing key.
     * @param {string} [options.userId] When set, the by-user exchange is used
     *     and the id is appended to the routing key.
     * @param {number} [options.prefetchCount] Passed to `channel.prefetch`;
     *     any falsy value (including 0) falls back to 1.
     * @param {boolean} [options.persistent] Only honoured together with
     *     `runtime`; selects the persistent queue options.
     * @param {boolean} [options.autoAck] When true (the default) messages are
     *     acked before the listener runs; otherwise the listener receives an
     *     `ack` function in its second argument.
     * @param {string} [options.runtime] When set, the queue gets a
     *     deterministic name built from runtime, exchange and topic.
     */
    constructor(options = {}) {
        const hasOwn = (key) => Object.prototype.hasOwnProperty.call(options, key);

        this.persistent = false;
        this.autoAck = true;

        // Re-subscribes after the channel manager reports a reconnect.
        this.onReconnect = () => {
            // Nothing to restore until listen() has supplied a callback;
            // consuming without one would fail on the first delivery.
            if (!this.listener) {
                return;
            }
            // NOTE(review): when no `runtime` name was given, `queueName` now
            // holds the name the broker returned from the first assertQueue;
            // confirm the broker accepts re-declaring that name on reconnect.
            debug("Trying to re establish consuming on event queue %s", this.queueName);
            this.consume().catch((err) => {
                console.error("Fail to re establish consuming on queue " + this.queueName, err);
            });
        };

        // Delivery handler handed to channel.consume().
        this.onMessageReceived = (msg) => {
            // The consume callback may be invoked without a message (for
            // example when the consumer is cancelled); there is nothing to parse.
            if (!msg) {
                debug("Received empty delivery on event queue %s", this.queueName);
                return;
            }
            let message;
            try {
                message = JSON.parse(msg.content.toString());
            }
            catch (err) {
                // A payload that cannot be parsed can never reach the listener;
                // reject it without requeue so it does not stay unacked forever.
                console.error("Discarding malformed event message on queue " + this.queueName, err);
                this.discard(msg).catch((discardErr) => {
                    console.error("Fail to discard message on queue " + this.queueName, discardErr);
                });
                return;
            }
            const extra = {};
            if (this.autoAck) {
                Promise.resolve(this.ack(msg)).catch((err) => {
                    console.error("Fail to ack message on queue " + this.queueName, err);
                });
            }
            else {
                // Manual mode: the listener decides when to ack and may wait
                // on the returned promise.
                extra.ack = () => this.ack(msg);
            }
            this.listener(message, extra);
        };

        this.exchange = options.exchange;
        this.topic = options.topic;
        this.userId = options.userId;
        this.queueOptions = QUEUE_OPTIONS;
        this.prefetchCount = options.prefetchCount || 1;
        if (hasOwn("persistent")) {
            this.persistent = options.persistent;
        }
        if (hasOwn("autoAck")) {
            this.autoAck = options.autoAck;
        }
        if (options.runtime) {
            this.queue = QUEUE_PREFIX + options.runtime +
                (this.exchange ? ':' + this.exchange : '') +
                (this.topic ? ':' + this.topic : '');
            this.queueOptions = this.persistent ? PERSISTENT_QUEUE_OPTIONS : QUEUE_RUNTIME_OPTIONS;
        }
        ChannelManager_1.channelManager.on("reconnect", this.onReconnect);
    }

    /** Name of the exchange this listener binds to. */
    get fullExchangeName() {
        if (this.userId) {
            return EXCHANGE_EVENTS_BY_USER;
        }
        return this.exchange ? EXCHANGE_PREFIX + this.exchange : EXCHANGE_ALL_EVENTS;
    }

    /**
     * Queue name. Undefined until the queue has been asserted unless a
     * `runtime` option was given.
     */
    get queueName() {
        return this.queue;
    }

    set queueName(val) {
        this.queue = val;
    }

    /**
     * Routing key used for the binding: "#" when nothing was specified,
     * otherwise "<exchange|*>.<topic|*>" with ".<userId>" appended when set.
     */
    get routeKey() {
        if (!this.topic && !this.exchange && !this.userId) {
            return '#';
        }
        const base = [this.exchange, this.topic]
            .map((segment) => segment || '*')
            .join('.');
        return this.userId ? base + '.' + this.userId : base;
    }

    /**
     * Asserts the topic exchange.
     * @returns {Promise} Resolves with the channel; rejects with the channel error.
     */
    assertExchange() {
        return ChannelManager_1.channelManager.getChannel().then((channel) => {
            return new Promise((resolve, reject) => {
                channel.assertExchange(this.fullExchangeName, "topic", EXCHANGE_OPTIONS, (err) => err ? reject(err) : resolve(channel));
            });
        });
    }

    /**
     * Asserts the queue and stores the queue name reported in the reply.
     * @returns {Promise} Resolves with the channel; rejects with the channel error.
     */
    assertQueue() {
        return ChannelManager_1.channelManager.getChannel().then((channel) => {
            return new Promise((resolve, reject) => {
                channel.assertQueue(this.queueName, this.queueOptions, (err, ok) => {
                    if (err) {
                        return reject(err);
                    }
                    this.queueName = ok.queue;
                    resolve(channel);
                });
            });
        });
    }

    /**
     * Binds the queue to the exchange using `routeKey`.
     * @returns {Promise} Resolves with the channel; rejects with the channel error.
     */
    bindQueue() {
        return ChannelManager_1.channelManager.getChannel().then((channel) => {
            return new Promise((resolve, reject) => {
                channel.bindQueue(this.queueName, this.fullExchangeName, this.routeKey, {}, (err) => err ? reject(err) : resolve(channel));
            });
        });
    }

    /**
     * Acknowledges a delivered message.
     * @param {Object} msg The message object received from the channel.
     * @returns {Promise} Resolves once `channel.ack` has been called.
     */
    ack(msg) {
        return ChannelManager_1.channelManager.getChannel().then((channel) => {
            channel.ack(msg);
        });
    }

    /**
     * Negatively acknowledges a single message without requeueing it.
     * @param {Object} msg The message object received from the channel.
     * @returns {Promise} Resolves once `channel.nack` has been called.
     */
    discard(msg) {
        return ChannelManager_1.channelManager.getChannel().then((channel) => {
            channel.nack(msg, false, false);
        });
    }

    /**
     * Asserts exchange and queue, binds them and starts consuming.
     * @returns {Promise} Resolves (with no value) after the consume request
     *     has been confirmed and `consumerTag` stored; rejects if any step fails.
     */
    consume() {
        return this.assertExchange()
            .then(() => this.assertQueue())
            .then(() => this.bindQueue())
            .then((channel) => {
                channel.prefetch(this.prefetchCount);
                return new Promise((resolve, reject) => {
                    channel.consume(this.queueName, this.onMessageReceived, undefined, (err, ok) => {
                        if (err) {
                            console.error("Fail to consume on queue " + this.queueName, err);
                            // Reject instead of throwing: a throw inside this
                            // callback would never reach the caller's promise.
                            return reject(err);
                        }
                        this.consumerTag = ok.consumerTag;
                        resolve();
                    });
                });
            });
    }

    /**
     * Registers the callback and starts consuming. May be called only once.
     * @param {Function} listener Called as `listener(message, extra)` where
     *     `message` is the parsed JSON payload and `extra.ack` exists only
     *     when `autoAck` is false.
     * @returns {Promise} The promise returned by `consume()`.
     * @throws {Error} When a listener is already set.
     * @throws {TypeError} When `listener` is not a function.
     */
    listen(listener) {
        if (this.listener) {
            throw new Error("Listener is already set");
        }
        if (typeof listener !== "function") {
            throw new TypeError("Listener must be a function");
        }
        this.listener = listener;
        return this.consume();
    }

    /**
     * Stops reacting to reconnects and cancels the active consumer, if any.
     * @returns {Promise} Resolves when the cancel has been confirmed, or
     *     immediately when no consumer tag is known.
     */
    cancel() {
        // Detach synchronously so a reconnect racing with the cancellation
        // cannot subscribe again.
        ChannelManager_1.channelManager.removeListener("reconnect", this.onReconnect);
        const consumerTag = this.consumerTag;
        if (!consumerTag) {
            return Promise.resolve();
        }
        this.consumerTag = undefined;
        return ChannelManager_1.channelManager.getChannel().then((channel) => {
            return new Promise((resolve, reject) => {
                channel.cancel(consumerTag, (err) => err ? reject(err) : resolve());
            });
        });
    }
}
|
138 | exports.EventListener = EventListener;
|
139 |
|
\ | No newline at end of file |