UNPKG

5.72 kBPlain TextView Raw
1import { channelManager } from './ChannelManager'
2import { Channel } from "amqplib/callback_api"
3import {Options} from "amqplib/properties";
4import { AMQPEventEmitter } from './EventEmitter'
5const EXCHANGE_PREFIX = "nimbus:event:";
6const EXCHANGE_ALL_EVENTS = "nimbus:events";
7const EXCHANGE_EVENTS_BY_USER = "nimbus:eventsByUser";
8const QUEUE_PREFIX = "nimbus:listener:";
9const QUEUE_OPTIONS = { durable: false, autoDelete: true, exclusive: true};
10const PERSISTENT_QUEUE_OPTIONS = { durable: true, autoDelete: false, exclusive: false};
11const QUEUE_RUNTIME_OPTIONS = { durable: false, autoDelete: true};
12const EXCHANGE_OPTIONS = { durable: true, autoDelete: false };
13
14import util = require("util");
15
16var debug = util.debuglog("amqptools");
17
18export interface EventListenerConstructorOptions {
19 exchange?: string;
20 runtime?: string;
21 topic?: string;
22 userId?: string;
23 persistent?: boolean;
24 autoAck?: boolean;
25 prefetchCount?: number;
26}
27
28export interface MessageExtra {
29 ack?: () => void
30}
31
32export interface ListenerFunc {
33 (message: any, extra: MessageExtra): void | Promise
34}
35
36export class EventListener {
37 exchange: string;
38 topic: string;
39 queue: string;
40 userId: string;
41 // listener queue wont be removed after client disconnects(durable + no auto-delete)
42 persistent: boolean = false;
43 // auto-ack event message
44 autoAck: boolean = true;
45 prefetchCount: number;
46 private listener: ListenerFunc;
47 private queueOptions: Options.AssertQueue;
48 private consumerTag: string;
49 private eventEmitter: AMQPEventEmitter;
50
51 constructor(options: EventListenerConstructorOptions, eventEmitter: AMQPEventEmitter) {
52 this.exchange = options.exchange;
53 this.topic = options.topic;
54 this.userId = options.userId;
55 this.queueOptions = QUEUE_OPTIONS;
56 this.prefetchCount = options.prefetchCount || 1
57 if (!eventEmitter) {
58 throw new Error('eventEmitter is required for EventListener')
59 }
60 this.eventEmitter = eventEmitter
61 if(options.hasOwnProperty("persistent")) {
62 this.persistent = options.persistent;
63 }
64 if(options.hasOwnProperty("autoAck")) {
65 this.autoAck = options.autoAck;
66 }
67 if (options.runtime) {
68 this.queue = QUEUE_PREFIX + options.runtime +
69 (this.exchange ? ':' + this.exchange : '') +
70 (this.topic ? ':' + this.topic : '');
71 this.queueOptions = QUEUE_RUNTIME_OPTIONS;
72 if(this.persistent) {
73 this.queueOptions = PERSISTENT_QUEUE_OPTIONS;
74 }
75 }
76
77 channelManager.on("reconnect", this.onReconnect);
78 channelManager.on("finalize", this.onFinalize);
79 }
80
81 onReconnect = () => {
82 debug("Trying to re establish consuming on event queue %s", this.queueName);
83 this.consume();
84 }
85
86 onFinalize = () => {
87 debug("Cancel consuming queue %s because of the finalization process", this.queueName);
88 this.cancel();
89 }
90
91 get fullExchangeName(): string {
92 if (this.userId) {
93 return EXCHANGE_EVENTS_BY_USER;
94 }
95 return this.exchange ? EXCHANGE_PREFIX + this.exchange : EXCHANGE_ALL_EVENTS;
96 }
97
98 get queueName(): string {
99 return this.queue;
100 }
101
102 get routeKey(): string {
103 if (!this.topic && !this.exchange && !this.userId) return '#';
104 return [this.exchange, this.topic]
105 .map(str => (str ? str : '*'))
106 .join('.')
107 .concat(this.userId ? '.' + this.userId : '');
108 }
109
110 set queueName(val: string) {
111 this.queue = val;
112 }
113
114 private assertExchange() {
115 return channelManager.getChannel().then((channel) => {
116 return new Promise((resolve, reject) => {
117 channel.assertExchange(this.fullExchangeName, "topic", EXCHANGE_OPTIONS,
118 (err) => err ? reject(err) : resolve(channel));
119 })
120 })
121 }
122
123 private assertQueue() {
124 return channelManager.getChannel().then((channel) => {
125 return new Promise((resolve, reject) => {
126 channel.assertQueue(this.queueName, this.queueOptions, (err, ok) => {
127 if (err) return reject(err);
128 this.queueName = ok.queue;
129 resolve(channel);
130 });
131 })
132 })
133 }
134
135 private bindQueue() {
136 return channelManager.getChannel().then((channel) => {
137 return new Promise<Channel>((resolve, reject) => {
138 channel.bindQueue(this.queueName, this.fullExchangeName, this.routeKey, {},
139 (err) => err ? reject(err) : resolve(channel));
140 })
141 })
142 }
143
144 private ack(msg) {
145 return channelManager.getChannel().then((channel) => {
146 channel.ack(msg);
147 });
148 }
149
150 private onMessageReceived = (msg) => {
151 var message = JSON.parse(msg.content.toString());
152 var extra: MessageExtra = {};
153 if(this.autoAck) {
154 this.ack(msg);
155 }
156 else {
157 extra.ack = () => {
158 this.ack(msg);
159 };
160 }
161 this.listener(message, extra);
162 }
163
164 private consume() {
165 let _this = this;
166 return this.assertExchange()
167 .then(() => this.assertQueue())
168 .then(() => this.bindQueue())
169 .then((channel) => {
170 channel.prefetch(this.prefetchCount)
171 channel.consume(this.queueName, this.onMessageReceived, undefined, function (err, ok) {
172 if(err) {
173 console.error("Fail to consume on queue " + _this.queueName, err)
174 throw err
175 }
176 _this.consumerTag = ok.consumerTag;
177 });
178 });
179 }
180
181 listen(listener: (message, extra?) => void) {
182 if(this.listener) {
183 throw new Error("Listener is already set");
184 }
185 this.listener = listener;
186 return this.consume();
187 }
188
189 cancel() {
190 channelManager.getChannel().then((channel) => {
191 channel.cancel(this.consumerTag);
192 channelManager.removeListener("reconnect", this.onReconnect);
193 channelManager.removeListener("finalize", this.onFinalize);
194 })
195 }
196}
\No newline at end of file