// Retrieved from the UNPKG file viewer (JavaScript, 5.19 kB, raw view).
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const ChannelManager_1 = require("./ChannelManager");

// Exchange names. A listener binds to exactly one of these:
//   - EXCHANGE_PREFIX + <exchange> when an exchange name is given,
//   - EXCHANGE_ALL_EVENTS when no exchange name is given,
//   - EXCHANGE_EVENTS_BY_USER whenever a userId is given.
const EXCHANGE_PREFIX = "nimbus:event:";
const EXCHANGE_ALL_EVENTS = "nimbus:events";
const EXCHANGE_EVENTS_BY_USER = "nimbus:eventsByUser";

// Prefix of the queue name built for listeners created with a `runtime`.
const QUEUE_PREFIX = "nimbus:listener:";

// Queue declaration options:
//   - QUEUE_OPTIONS: default for listeners without a `runtime`.
//   - QUEUE_RUNTIME_OPTIONS: listeners with a `runtime` (not exclusive).
//   - PERSISTENT_QUEUE_OPTIONS: listeners with a `runtime` and `persistent`.
const QUEUE_OPTIONS = { durable: false, autoDelete: true, exclusive: true };
const PERSISTENT_QUEUE_OPTIONS = { durable: true, autoDelete: false, exclusive: false };
const QUEUE_RUNTIME_OPTIONS = { durable: false, autoDelete: true };

// Options used when asserting any of the topic exchanges above.
const EXCHANGE_OPTIONS = { durable: true, autoDelete: false };

const util = require("util");
// Debug logger for the "amqptools" section; util.debuglog only prints when
// NODE_DEBUG enables that section.
const debug = util.debuglog("amqptools");
/**
 * Subscribes a callback to events published on the Nimbus AMQP topic
 * exchanges, using the channel handed out by `channelManager`.
 *
 * Typical use: `new EventListener(options).listen(callback)`. The instance
 * registers itself for the channel manager's "reconnect" event and re-runs
 * `consume()` whenever that event fires, until `cancel()` is called.
 */
class EventListener {
    /**
     * @param {Object} [options]
     * @param {string} [options.exchange] Event exchange suffix. When omitted
     *   the listener binds to the "all events" exchange.
     * @param {string} [options.topic] Topic part of the routing key; `*` is
     *   used when omitted.
     * @param {string} [options.userId] When set, the per-user exchange is
     *   used and the id is appended to the routing key.
     * @param {string} [options.runtime] When set, the queue gets a fixed name
     *   built from the runtime, exchange and topic instead of the name
     *   returned by the broker.
     * @param {boolean} [options.persistent=false] Only takes effect together
     *   with `runtime`: selects the durable, non-exclusive queue options.
     * @param {boolean} [options.autoAck=true] Acknowledge each message before
     *   the listener is invoked. When false the listener receives
     *   `extra.ack()` and must call it itself.
     * @param {number} [options.prefetchCount=1] Value passed to
     *   `channel.prefetch`.
     */
    constructor(options = {}) {
        this.persistent = false;
        this.autoAck = true;

        // Bound as instance properties so the exact same function references
        // can be registered with (and later removed from) event emitters.
        this.onReconnect = () => {
            debug("Trying to re establish consuming on event queue %s", this.queueName);
            // The promise must not be left floating: a failed re-subscribe
            // would otherwise surface as an unhandled rejection.
            this.consume().catch((err) => {
                console.error("Fail to re establish consuming on queue " + this.queueName, err);
            });
        };
        this.onMessageReceived = (msg) => {
            // amqplib invokes the consumer callback with null when the broker
            // cancels the consumer; there is nothing to parse or ack.
            if (!msg) {
                debug("Consumer on event queue %s was cancelled by the broker", this.queueName);
                this.consumerTag = undefined;
                return;
            }
            const message = JSON.parse(msg.content.toString());
            const extra = {};
            if (this.autoAck) {
                this.ack(msg).catch((err) => {
                    console.error("Fail to ack message on queue " + this.queueName, err);
                });
            }
            else {
                // Returning the promise lets the listener observe ack failures.
                extra.ack = () => this.ack(msg);
            }
            this.listener(message, extra);
        };

        this.exchange = options.exchange;
        this.topic = options.topic;
        this.userId = options.userId;
        this.queueOptions = QUEUE_OPTIONS;
        this.prefetchCount = options.prefetchCount || 1;

        // Do not rely on `options` inheriting from Object.prototype.
        const hasOwn = (key) => Object.prototype.hasOwnProperty.call(options, key);
        if (hasOwn("persistent")) {
            this.persistent = options.persistent;
        }
        if (hasOwn("autoAck")) {
            this.autoAck = options.autoAck;
        }

        if (options.runtime) {
            this.queue = QUEUE_PREFIX + options.runtime +
                (this.exchange ? ':' + this.exchange : '') +
                (this.topic ? ':' + this.topic : '');
            this.queueOptions = this.persistent ? PERSISTENT_QUEUE_OPTIONS : QUEUE_RUNTIME_OPTIONS;
        }

        ChannelManager_1.channelManager.on("reconnect", this.onReconnect);
    }

    /** Name of the topic exchange this listener binds its queue to. */
    get fullExchangeName() {
        if (this.userId) {
            return EXCHANGE_EVENTS_BY_USER;
        }
        return this.exchange ? EXCHANGE_PREFIX + this.exchange : EXCHANGE_ALL_EVENTS;
    }

    /** Current queue name; undefined until set by the constructor or `assertQueue()`. */
    get queueName() {
        return this.queue;
    }

    set queueName(val) {
        this.queue = val;
    }

    /**
     * Routing key used for the binding: `#` when nothing is specified,
     * otherwise `<exchange|*>.<topic|*>` followed by `.<userId>` if set.
     */
    get routeKey() {
        if (!this.topic && !this.exchange && !this.userId)
            return '#';
        return [this.exchange, this.topic]
            .map(str => (str ? str : '*'))
            .join('.')
            .concat(this.userId ? '.' + this.userId : '');
    }

    /**
     * Asserts the topic exchange.
     * @returns {Promise} Resolves with the channel, rejects with the AMQP error.
     */
    assertExchange() {
        return ChannelManager_1.channelManager.getChannel().then((channel) => {
            return new Promise((resolve, reject) => {
                channel.assertExchange(this.fullExchangeName, "topic", EXCHANGE_OPTIONS, (err) => err ? reject(err) : resolve(channel));
            });
        });
    }

    /**
     * Asserts the queue and stores the queue name reported by the broker.
     * @returns {Promise} Resolves with the channel, rejects with the AMQP error.
     */
    assertQueue() {
        return ChannelManager_1.channelManager.getChannel().then((channel) => {
            return new Promise((resolve, reject) => {
                channel.assertQueue(this.queueName, this.queueOptions, (err, ok) => {
                    if (err)
                        return reject(err);
                    this.queueName = ok.queue;
                    resolve(channel);
                });
            });
        });
    }

    /**
     * Binds the queue to the exchange with the listener's routing key.
     * @returns {Promise} Resolves with the channel, rejects with the AMQP error.
     */
    bindQueue() {
        return ChannelManager_1.channelManager.getChannel().then((channel) => {
            return new Promise((resolve, reject) => {
                channel.bindQueue(this.queueName, this.fullExchangeName, this.routeKey, {}, (err) => err ? reject(err) : resolve(channel));
            });
        });
    }

    /**
     * Acknowledges a delivered message on the current channel.
     * @param {Object} msg The message object received from the consumer callback.
     * @returns {Promise} Resolves once `channel.ack` has been called.
     */
    ack(msg) {
        return ChannelManager_1.channelManager.getChannel().then((channel) => {
            channel.ack(msg);
        });
    }

    /**
     * Asserts exchange and queue, binds them and starts consuming.
     * @returns {Promise} Resolves after the broker confirmed the consumer
     *   (and `consumerTag` is stored); rejects with the error of whichever
     *   step failed.
     */
    consume() {
        return this.assertExchange()
            .then(() => this.assertQueue())
            .then(() => this.bindQueue())
            .then((channel) => new Promise((resolve, reject) => {
                channel.prefetch(this.prefetchCount);
                channel.consume(this.queueName, this.onMessageReceived, undefined, (err, ok) => {
                    if (err) {
                        console.error("Fail to consume on queue " + this.queueName, err);
                        // Throwing here would escape into the AMQP client's
                        // callback dispatch; reject so callers can handle it.
                        return reject(err);
                    }
                    this.consumerTag = ok.consumerTag;
                    resolve();
                });
            }));
    }

    /**
     * Registers the callback and starts consuming.
     * @param {function(Object, Object)} listener Called with the parsed JSON
     *   payload and an `extra` object (`extra.ack` is present when `autoAck`
     *   is false).
     * @returns {Promise} The promise returned by `consume()`.
     * @throws {Error} If a listener has already been registered.
     */
    listen(listener) {
        if (this.listener) {
            throw new Error("Listener is already set");
        }
        this.listener = listener;
        return this.consume();
    }

    /**
     * Stops consuming and stops reacting to reconnects.
     * @returns {Promise} Resolves once the cancel has been issued on the channel.
     */
    cancel() {
        // Detach first so a reconnect arriving while the channel is being
        // fetched (or a failure to fetch it) cannot re-subscribe this listener.
        ChannelManager_1.channelManager.removeListener("reconnect", this.onReconnect);
        return ChannelManager_1.channelManager.getChannel().then((channel) => {
            const tag = this.consumerTag;
            if (tag === undefined) {
                // Never consumed, or already cancelled: nothing to cancel.
                return;
            }
            this.consumerTag = undefined;
            channel.cancel(tag);
        });
    }
}
// CommonJS export of the listener class.
exports.EventListener = EventListener;
//# sourceMappingURL=EventListener.js.map
// (The original file has no newline at end of file.)