UNPKG

5.2 kBPlain TextView Raw
1import { channelManager } from './ChannelManager'
2import { Channel } from "amqplib/callback_api"
3import {Options} from "amqplib/properties";
// --- Exchange names -------------------------------------------------------

/** Prefix prepended to a specific exchange name. */
const EXCHANGE_PREFIX = "nimbus:event:";
/** Exchange used when a listener names neither an exchange nor a user. */
const EXCHANGE_ALL_EVENTS = "nimbus:events";
/** Exchange used whenever a listener is scoped to a user id. */
const EXCHANGE_EVENTS_BY_USER = "nimbus:eventsByUser";

// --- Queue names and declaration presets ----------------------------------

/** Prefix of the queue name built for listeners that declare a runtime. */
const QUEUE_PREFIX = "nimbus:listener:";

/** Default preset: non-durable, auto-deleted, exclusive queue. */
const QUEUE_OPTIONS = {
  durable: false,
  autoDelete: true,
  exclusive: true
};

/** Preset for persistent runtime queues: durable, kept, not exclusive. */
const PERSISTENT_QUEUE_OPTIONS = {
  durable: true,
  autoDelete: false,
  exclusive: false
};

/** Preset for non-persistent runtime queues: non-durable and auto-deleted. */
const QUEUE_RUNTIME_OPTIONS = {
  durable: false,
  autoDelete: true
};

/** Declaration options shared by every exchange asserted in this module. */
const EXCHANGE_OPTIONS = {
  durable: true,
  autoDelete: false
};
12
13import util = require("util");
14
// Section logger obtained from util.debuglog under the "amqptools" name.
const debug = util.debuglog("amqptools");
16
/**
 * Options accepted by the `EventListener` constructor. Every field is optional.
 */
export interface EventListenerConstructorOptions {
  /** Exchange to listen on; when omitted (and no `userId`), the all-events exchange is used. */
  exchange?: string;
  /**
   * Runtime identifier. When set, the queue gets a deterministic name built
   * from the listener prefix, this value, the exchange and the topic, and is
   * declared with the runtime (non-exclusive) queue options.
   */
  runtime?: string;
  /** Topic segment of the routing key; `*` is used when omitted. */
  topic?: string;
  /** When set, the by-user exchange is used and this id is appended to the routing key. */
  userId?: string;
  /** Use the durable, non-auto-delete queue options. Only applied together with `runtime`. */
  persistent?: boolean;
  /** Acknowledge each message before the listener is invoked. Defaults to `true`. */
  autoAck?: boolean;
  /** Value passed to `channel.prefetch`. Falsy values fall back to `1`. */
  prefetchCount?: number;
}
26
/**
 * Second argument handed to a listener alongside the parsed message.
 */
export interface MessageExtra {
  /**
   * Acknowledges the message this object was delivered with.
   * Only populated when the listener was created with auto-ack disabled.
   */
  ack?: () => void;
}
30
/**
 * Callback invoked once per received event.
 *
 * `message` is the JSON-decoded message body; `extra` carries the optional
 * manual `ack` function.
 */
export interface ListenerFunc {
  (message: any, extra: MessageExtra): void;
}
34
/**
 * Consumes events from a topic exchange and hands each JSON-decoded message
 * to a single listener callback.
 *
 * Lifecycle: construct, call `listen(fn)` once, and call `cancel()` to stop.
 * The instance subscribes to the channel manager's "reconnect" event and
 * re-runs the assert/bind/consume sequence when it fires.
 */
export class EventListener {
  exchange: string;
  topic: string;
  queue: string;
  userId: string;
  // listener queue won't be removed after client disconnects (durable + no auto-delete)
  persistent: boolean = false;
  // auto-ack event message
  autoAck: boolean = true;
  prefetchCount: number;
  private listener: ListenerFunc;
  private queueOptions: Options.AssertQueue;
  // Tag of the active consumer; unset while nothing is being consumed.
  private consumerTag?: string;

  /**
   * @param options exchange/topic/user filters plus queue and ack behaviour.
   */
  constructor(options: EventListenerConstructorOptions) {
    this.exchange = options.exchange;
    this.topic = options.topic;
    this.userId = options.userId;
    this.queueOptions = QUEUE_OPTIONS;
    this.prefetchCount = options.prefetchCount || 1;
    // An explicit `undefined` means "not provided" and must not override the
    // defaults (otherwise `{ autoAck: undefined }` would disable auto-ack).
    if (options.persistent !== undefined) {
      this.persistent = options.persistent;
    }
    if (options.autoAck !== undefined) {
      this.autoAck = options.autoAck;
    }
    if (options.runtime) {
      // Runtime listeners get a deterministic queue name.
      this.queue = QUEUE_PREFIX + options.runtime +
        (this.exchange ? ':' + this.exchange : '') +
        (this.topic ? ':' + this.topic : '');
      this.queueOptions = this.persistent ? PERSISTENT_QUEUE_OPTIONS : QUEUE_RUNTIME_OPTIONS;
    }

    channelManager.on("reconnect", this.onReconnect);
  }

  /**
   * Handler for the channel manager's "reconnect" event: re-establishes the
   * consumer, but only if `listen()` has already installed a callback.
   */
  onReconnect = () => {
    if (!this.listener) {
      // Nothing to deliver to yet; consuming now would crash on the first message.
      return;
    }
    debug("Trying to re establish consuming on event queue %s", this.queueName);
    this.consume().catch((err) => {
      // This runs from an event handler, so nobody else can observe the rejection.
      console.error("Fail to re-establish consuming on queue " + this.queueName, err);
    });
  }

  /** Name of the exchange this listener binds to. */
  get fullExchangeName(): string {
    if (this.userId) {
      return EXCHANGE_EVENTS_BY_USER;
    }
    return this.exchange ? EXCHANGE_PREFIX + this.exchange : EXCHANGE_ALL_EVENTS;
  }

  /** Current queue name (replaced by the broker-reported name after the queue is asserted). */
  get queueName(): string {
    return this.queue;
  }

  /**
   * Routing key used for the binding: `#` when no filter is set, otherwise
   * `<exchange|*>.<topic|*>` with `.<userId>` appended when a user id is set.
   */
  get routeKey(): string {
    if (!this.topic && !this.exchange && !this.userId) return '#';
    return [this.exchange, this.topic]
      .map(str => (str ? str : '*'))
      .join('.')
      .concat(this.userId ? '.' + this.userId : '');
  }

  set queueName(val: string) {
    this.queue = val;
  }

  /** Asserts the topic exchange and resolves with the channel used. */
  private assertExchange(): Promise<Channel> {
    return channelManager.getChannel().then((channel) => {
      return new Promise<Channel>((resolve, reject) => {
        channel.assertExchange(this.fullExchangeName, "topic", EXCHANGE_OPTIONS,
          (err) => err ? reject(err) : resolve(channel));
      });
    });
  }

  /**
   * Asserts the queue and stores the name reported by the broker.
   *
   * NOTE(review): after the first assert `queueName` holds the broker-reported
   * name, so a reconnect re-declares that exact name. For broker-generated
   * names this may be refused (RabbitMQ reserves the `amq.` prefix) - confirm.
   */
  private assertQueue(): Promise<Channel> {
    return channelManager.getChannel().then((channel) => {
      return new Promise<Channel>((resolve, reject) => {
        channel.assertQueue(this.queueName, this.queueOptions, (err, ok) => {
          if (err) return reject(err);
          this.queueName = ok.queue;
          resolve(channel);
        });
      });
    });
  }

  /** Binds the queue to the exchange with the computed routing key. */
  private bindQueue(): Promise<Channel> {
    return channelManager.getChannel().then((channel) => {
      return new Promise<Channel>((resolve, reject) => {
        channel.bindQueue(this.queueName, this.fullExchangeName, this.routeKey, {},
          (err) => err ? reject(err) : resolve(channel));
      });
    });
  }

  /**
   * Acknowledges `msg` on the channel currently provided by the channel manager.
   *
   * NOTE(review): delivery tags are scoped to the delivering channel; confirm
   * the manager returns that same channel (e.g. after a reconnect).
   */
  private ack(msg) {
    return channelManager.getChannel().then((channel) => {
      channel.ack(msg);
    });
  }

  /** Fire-and-forget ack that logs a failure instead of leaving an unhandled rejection. */
  private ackAndLog(msg) {
    this.ack(msg).catch((err) => {
      console.error("Fail to ack message on queue " + this.queueName, err);
    });
  }

  /** Rejects an undeliverable message without requeueing so it cannot block the consumer. */
  private discard(msg) {
    channelManager.getChannel()
      .then((channel) => {
        channel.reject(msg, false);
      })
      .catch((err) => {
        console.error("Fail to reject message on queue " + this.queueName, err);
      });
  }

  /**
   * Consumer callback: decodes the message, acknowledges it (auto-ack) or
   * exposes `extra.ack` (manual ack), then invokes the listener.
   */
  private onMessageReceived = (msg) => {
    if (!msg) {
      // amqplib delivers null when the broker cancels the consumer.
      debug("Consumer on event queue %s was cancelled by the broker", this.queueName);
      this.consumerTag = undefined;
      return;
    }

    var message;
    try {
      message = JSON.parse(msg.content.toString());
    } catch (err) {
      // A single malformed payload must not take the whole consumer down.
      console.error("Fail to parse message from queue " + this.queueName, err);
      this.discard(msg);
      return;
    }

    var extra: MessageExtra = {};
    if (this.autoAck) {
      this.ackAndLog(msg);
    } else {
      extra.ack = () => {
        this.ackAndLog(msg);
      };
    }
    this.listener(message, extra);
  }

  /**
   * Asserts exchange and queue, binds them, sets the prefetch count and starts
   * consuming. Resolves once the broker confirmed the consumer; rejects if any
   * step fails.
   */
  private consume(): Promise<void> {
    return this.assertExchange()
      .then(() => this.assertQueue())
      .then(() => this.bindQueue())
      .then((channel) => {
        return new Promise<void>((resolve, reject) => {
          channel.prefetch(this.prefetchCount);
          channel.consume(this.queueName, this.onMessageReceived, undefined, (err, ok) => {
            if (err) {
              console.error("Fail to consume on queue " + this.queueName, err);
              // Reject instead of throwing: a throw inside this callback would
              // escape the promise chain as an uncaught exception.
              return reject(err);
            }
            this.consumerTag = ok.consumerTag;
            resolve();
          });
        });
      });
  }

  /**
   * Installs the listener and starts consuming.
   *
   * @param listener callback receiving the decoded message and the extra object.
   * @returns promise settled when consuming has started (or failed to start).
   * @throws Error if a listener has already been set.
   */
  listen(listener: (message, extra?) => void) {
    if (this.listener) {
      throw new Error("Listener is already set");
    }
    this.listener = listener;
    return this.consume();
  }

  /**
   * Stops reacting to reconnects and cancels the active consumer, if any.
   *
   * @returns promise settled once the cancellation completed.
   */
  cancel(): Promise<void> {
    // Detach first so the handler is removed even if the channel lookup fails.
    channelManager.removeListener("reconnect", this.onReconnect);

    const tag = this.consumerTag;
    if (!tag) {
      // No consumer was ever confirmed (or it is already gone).
      return Promise.resolve();
    }
    return channelManager.getChannel().then((channel) => {
      return new Promise<void>((resolve, reject) => {
        channel.cancel(tag, (err) => {
          if (err) return reject(err);
          this.consumerTag = undefined;
          resolve();
        });
      });
    });
  }
}
\No newline at end of file