UNPKG

5.69 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3const ChannelManager_1 = require("./ChannelManager");
4const EXCHANGE_PREFIX = "nimbus:event:";
5const EXCHANGE_ALL_EVENTS = "nimbus:events";
6const EXCHANGE_EVENTS_BY_USER = "nimbus:eventsByUser";
7const QUEUE_PREFIX = "nimbus:listener:";
8const QUEUE_OPTIONS = { durable: false, autoDelete: true, exclusive: true };
9const PERSISTENT_QUEUE_OPTIONS = { durable: true, autoDelete: false, exclusive: false };
10const QUEUE_RUNTIME_OPTIONS = { durable: false, autoDelete: true };
11const EXCHANGE_OPTIONS = { durable: true, autoDelete: false };
12const util = require("util");
13var debug = util.debuglog("amqptools");
14class EventListener {
15 constructor(options, eventEmitter) {
16 this.persistent = false;
17 this.autoAck = true;
18 this.onReconnect = () => {
19 debug("Trying to re establish consuming on event queue %s", this.queueName);
20 this.consume();
21 };
22 this.onFinalize = () => {
23 debug("Cancel consuming queue %s because of the finalization process", this.queueName);
24 this.cancel();
25 };
26 this.onMessageReceived = (msg) => {
27 var message = JSON.parse(msg.content.toString());
28 var extra = {};
29 if (this.autoAck) {
30 this.ack(msg);
31 }
32 else {
33 extra.ack = () => {
34 this.ack(msg);
35 };
36 }
37 this.listener(message, extra);
38 };
39 this.exchange = options.exchange;
40 this.topic = options.topic;
41 this.userId = options.userId;
42 this.queueOptions = QUEUE_OPTIONS;
43 this.prefetchCount = options.prefetchCount || 1;
44 if (!eventEmitter) {
45 throw new Error('eventEmitter is required for EventListener');
46 }
47 this.eventEmitter = eventEmitter;
48 if (options.hasOwnProperty("persistent")) {
49 this.persistent = options.persistent;
50 }
51 if (options.hasOwnProperty("autoAck")) {
52 this.autoAck = options.autoAck;
53 }
54 if (options.runtime) {
55 this.queue = QUEUE_PREFIX + options.runtime +
56 (this.exchange ? ':' + this.exchange : '') +
57 (this.topic ? ':' + this.topic : '');
58 this.queueOptions = QUEUE_RUNTIME_OPTIONS;
59 if (this.persistent) {
60 this.queueOptions = PERSISTENT_QUEUE_OPTIONS;
61 }
62 }
63 ChannelManager_1.channelManager.on("reconnect", this.onReconnect);
64 ChannelManager_1.channelManager.on("finalize", this.onFinalize);
65 }
66 get fullExchangeName() {
67 if (this.userId) {
68 return EXCHANGE_EVENTS_BY_USER;
69 }
70 return this.exchange ? EXCHANGE_PREFIX + this.exchange : EXCHANGE_ALL_EVENTS;
71 }
72 get queueName() {
73 return this.queue;
74 }
75 get routeKey() {
76 if (!this.topic && !this.exchange && !this.userId)
77 return '#';
78 return [this.exchange, this.topic]
79 .map(str => (str ? str : '*'))
80 .join('.')
81 .concat(this.userId ? '.' + this.userId : '');
82 }
83 set queueName(val) {
84 this.queue = val;
85 }
86 assertExchange() {
87 return ChannelManager_1.channelManager.getChannel().then((channel) => {
88 return new Promise((resolve, reject) => {
89 channel.assertExchange(this.fullExchangeName, "topic", EXCHANGE_OPTIONS, (err) => err ? reject(err) : resolve(channel));
90 });
91 });
92 }
93 assertQueue() {
94 return ChannelManager_1.channelManager.getChannel().then((channel) => {
95 return new Promise((resolve, reject) => {
96 channel.assertQueue(this.queueName, this.queueOptions, (err, ok) => {
97 if (err)
98 return reject(err);
99 this.queueName = ok.queue;
100 resolve(channel);
101 });
102 });
103 });
104 }
105 bindQueue() {
106 return ChannelManager_1.channelManager.getChannel().then((channel) => {
107 return new Promise((resolve, reject) => {
108 channel.bindQueue(this.queueName, this.fullExchangeName, this.routeKey, {}, (err) => err ? reject(err) : resolve(channel));
109 });
110 });
111 }
112 ack(msg) {
113 return ChannelManager_1.channelManager.getChannel().then((channel) => {
114 channel.ack(msg);
115 });
116 }
117 consume() {
118 let _this = this;
119 return this.assertExchange()
120 .then(() => this.assertQueue())
121 .then(() => this.bindQueue())
122 .then((channel) => {
123 channel.prefetch(this.prefetchCount);
124 channel.consume(this.queueName, this.onMessageReceived, undefined, function (err, ok) {
125 if (err) {
126 console.error("Fail to consume on queue " + _this.queueName, err);
127 throw err;
128 }
129 _this.consumerTag = ok.consumerTag;
130 });
131 });
132 }
133 listen(listener) {
134 if (this.listener) {
135 throw new Error("Listener is already set");
136 }
137 this.listener = listener;
138 return this.consume();
139 }
140 cancel() {
141 ChannelManager_1.channelManager.getChannel().then((channel) => {
142 channel.cancel(this.consumerTag);
143 ChannelManager_1.channelManager.removeListener("reconnect", this.onReconnect);
144 ChannelManager_1.channelManager.removeListener("finalize", this.onFinalize);
145 });
146 }
147}
148exports.EventListener = EventListener;
149//# sourceMappingURL=EventListener.js.map
\No newline at end of file