1 | import { channelManager } from './ChannelManager'
|
2 | import { Channel } from "amqplib/callback_api"
|
3 | import {Options} from "amqplib/properties";
|
// --- Exchange names -------------------------------------------------------

// Prepended to a listener's `exchange` to build the full exchange name.
const EXCHANGE_PREFIX = "nimbus:event:";

// Exchange used when a listener names neither an exchange nor a user.
const EXCHANGE_ALL_EVENTS = "nimbus:events";

// Exchange used whenever a listener is scoped to a `userId`.
const EXCHANGE_EVENTS_BY_USER = "nimbus:eventsByUser";

// --- Queue names and option presets ---------------------------------------

// Prepended to the `runtime` option to build a named queue.
const QUEUE_PREFIX = "nimbus:listener:";

// Default preset: used when the listener has no `runtime`.
const QUEUE_OPTIONS = {
  durable: false,
  autoDelete: true,
  exclusive: true
};

// Preset for listeners that have a `runtime` and are flagged `persistent`.
const PERSISTENT_QUEUE_OPTIONS = {
  durable: true,
  autoDelete: false,
  exclusive: false
};

// Preset for listeners that have a `runtime` but are not `persistent`.
const QUEUE_RUNTIME_OPTIONS = {
  durable: false,
  autoDelete: true
};

// Options passed when asserting any of the exchanges above.
const EXCHANGE_OPTIONS = {
  durable: true,
  autoDelete: false
};
|
12 |
|
13 | import util = require("util");
|
14 |
|
// Section logger from util.debuglog; Node prints its output only when the
// NODE_DEBUG environment variable includes "amqptools".
const debug = util.debuglog("amqptools");
|
16 |
|
/**
 * Options accepted by the `EventListener` constructor. Every field is optional.
 */
export interface EventListenerConstructorOptions {
  /** Event exchange to listen on; also the first segment of the routing key. */
  exchange?: string;

  /**
   * When set, the listener uses a named queue built from the queue prefix,
   * this value, and the exchange/topic (when present).
   */
  runtime?: string;

  /** Event topic; the second segment of the routing key. */
  topic?: string;

  /** Restricts the listener to one user's events (appended to the routing key). */
  userId?: string;

  /**
   * Selects the persistent queue options. Only consulted when `runtime` is
   * set. Defaults to `false`.
   */
  persistent?: boolean;

  /**
   * When `true` (the default) each message is acknowledged before the
   * listener callback runs; otherwise the callback receives an `ack` function.
   */
  autoAck?: boolean;

  /** Value passed to the channel's `prefetch`; falls back to 1 when falsy. */
  prefetchCount?: number;
}
|
26 |
|
/**
 * Second argument handed to a listener callback alongside the parsed message.
 */
export interface MessageExtra {
  /**
   * Acknowledges the message. Only provided when the listener was created
   * with `autoAck` disabled.
   */
  ack?: () => void;
}
|
30 |
|
/**
 * Callback invoked for every message consumed by an `EventListener`.
 *
 * @param message - The message body after `JSON.parse`.
 * @param extra - Per-message helpers (see `MessageExtra`).
 */
export interface ListenerFunc {
  (message: any, extra: MessageExtra): void;
}
|
34 |
|
/**
 * Consumes JSON events from a topic exchange.
 *
 * The listener asserts its exchange and queue, binds the queue with a routing
 * key derived from `exchange`, `topic` and `userId`, and forwards every
 * parsed message to the callback registered through `listen()`. It subscribes
 * to the channel manager's "reconnect" event so an active subscription is
 * re-established after a reconnect; `cancel()` removes that subscription.
 */
export class EventListener {
  exchange: string;
  topic: string;
  queue: string;
  userId: string;

  persistent: boolean = false;

  autoAck: boolean = true;
  prefetchCount: number;
  private listener: ListenerFunc;
  private queueOptions: Options.AssertQueue;
  private consumerTag: string;

  /**
   * @param options - Exchange/topic/user filters plus queue and ack settings.
   *   Fields left out (or passed as `undefined`) keep their defaults.
   */
  constructor(options: EventListenerConstructorOptions) {
    this.exchange = options.exchange;
    this.topic = options.topic;
    this.userId = options.userId;
    this.queueOptions = QUEUE_OPTIONS;
    this.prefetchCount = options.prefetchCount || 1;

    // Compare against undefined rather than using hasOwnProperty: an explicit
    // `autoAck: undefined` must not silently turn auto-acknowledgement off.
    if (options.persistent !== undefined) {
      this.persistent = options.persistent;
    }
    if (options.autoAck !== undefined) {
      this.autoAck = options.autoAck;
    }

    if (options.runtime) {
      // A runtime gives the queue a stable, shareable name.
      this.queue = QUEUE_PREFIX + options.runtime +
        (this.exchange ? ':' + this.exchange : '') +
        (this.topic ? ':' + this.topic : '');
      this.queueOptions = this.persistent ? PERSISTENT_QUEUE_OPTIONS : QUEUE_RUNTIME_OPTIONS;
    }

    channelManager.on("reconnect", this.onReconnect);
  }

  /**
   * Handler for the channel manager's "reconnect" event: re-creates the
   * consumer, but only if `listen()` has registered a callback.
   */
  onReconnect = () => {
    if (!this.listener) {
      // Nothing was consuming before the reconnect, so there is nothing to
      // resume; consuming now would deliver messages to an undefined callback.
      return;
    }
    debug("Trying to re establish consuming on event queue %s", this.queueName);
    this.consume().catch((err) => {
      // Event handlers have no caller to report to, so log the failure here.
      console.error("Fail to re establish consuming on queue " + this.queueName, err);
    });
  }

  /** Name of the exchange this listener binds to. */
  get fullExchangeName(): string {
    if (this.userId) {
      return EXCHANGE_EVENTS_BY_USER;
    }
    return this.exchange ? EXCHANGE_PREFIX + this.exchange : EXCHANGE_ALL_EVENTS;
  }

  /** Current queue name; updated with the name returned by `assertQueue`. */
  get queueName(): string {
    return this.queue;
  }

  /**
   * Routing key used for the binding: '#' when no filter is set, otherwise
   * `exchange.topic[.userId]` with '*' standing in for a missing segment.
   */
  get routeKey(): string {
    if (!this.topic && !this.exchange && !this.userId) return '#';
    return [this.exchange, this.topic]
      .map(str => (str ? str : '*'))
      .join('.')
      .concat(this.userId ? '.' + this.userId : '');
  }

  set queueName(val: string) {
    this.queue = val;
  }

  /** Asserts the topic exchange and resolves with the channel used. */
  private assertExchange() {
    return channelManager.getChannel().then((channel) => {
      return new Promise<Channel>((resolve, reject) => {
        channel.assertExchange(this.fullExchangeName, "topic", EXCHANGE_OPTIONS,
          (err) => err ? reject(err) : resolve(channel));
      });
    });
  }

  /**
   * Asserts the queue, stores the queue name reported back by the broker,
   * and resolves with the channel used.
   */
  private assertQueue() {
    return channelManager.getChannel().then((channel) => {
      return new Promise<Channel>((resolve, reject) => {
        channel.assertQueue(this.queueName, this.queueOptions, (err, ok) => {
          if (err) return reject(err);
          this.queueName = ok.queue;
          resolve(channel);
        });
      });
    });
  }

  /** Binds the queue to the exchange with `routeKey`; resolves with the channel. */
  private bindQueue() {
    return channelManager.getChannel().then((channel) => {
      return new Promise<Channel>((resolve, reject) => {
        channel.bindQueue(this.queueName, this.fullExchangeName, this.routeKey, {},
          (err) => err ? reject(err) : resolve(channel));
      });
    });
  }

  /** Acknowledges `msg` on the channel currently provided by the manager. */
  private ack(msg: any) {
    return channelManager.getChannel().then((channel) => {
      channel.ack(msg);
    });
  }

  /** Rejects `msg` without requeueing so it is not redelivered to this queue. */
  private discard(msg: any) {
    return channelManager.getChannel().then((channel) => {
      channel.reject(msg, false);
    });
  }

  /** Runs `ack(msg)` and logs a failure instead of leaving it unhandled. */
  private safeAck(msg: any): void {
    this.ack(msg).catch((err) => {
      console.error("Fail to ack message on queue " + this.queueName, err);
    });
  }

  /**
   * Consumer callback: parses the body as JSON, acknowledges (or exposes
   * `extra.ack`), and invokes the registered listener.
   */
  private onMessageReceived = (msg: any) => {
    if (!msg) {
      // amqplib delivers null when the broker cancels the consumer.
      debug("Consumer on event queue %s was cancelled by the broker", this.queueName);
      return;
    }

    let message: any;
    try {
      message = JSON.parse(msg.content.toString());
    } catch (err) {
      // A malformed body must not escape into the consumer callback; drop the
      // message so it cannot block the queue (prefetch) or be redelivered.
      console.error("Fail to parse message on queue " + this.queueName, err);
      this.discard(msg).catch((discardErr) => {
        console.error("Fail to reject message on queue " + this.queueName, discardErr);
      });
      return;
    }

    const extra: MessageExtra = {};
    if (this.autoAck) {
      this.safeAck(msg);
    } else {
      extra.ack = () => {
        this.safeAck(msg);
      };
    }
    this.listener(message, extra);
  }

  /**
   * Asserts exchange and queue, binds them, and starts consuming.
   *
   * @returns A promise that resolves once the broker confirmed the consumer
   *   and rejects if any setup step fails.
   */
  private consume() {
    return this.assertExchange()
      .then(() => this.assertQueue())
      .then(() => this.bindQueue())
      .then((channel) => new Promise<void>((resolve, reject) => {
        channel.prefetch(this.prefetchCount);
        channel.consume(this.queueName, this.onMessageReceived, undefined, (err, ok) => {
          if (err) {
            console.error("Fail to consume on queue " + this.queueName, err);
            // Reject rather than throw: a throw inside this callback would
            // never reach the caller of listen().
            return reject(err);
          }
          this.consumerTag = ok.consumerTag;
          resolve();
        });
      }));
  }

  /**
   * Registers the message callback and starts consuming.
   *
   * @param listener - Called with the parsed message and a `MessageExtra`.
   * @returns A promise that settles when the consumer is established or fails.
   * @throws Error if a listener has already been registered.
   */
  listen(listener: (message, extra?) => void) {
    if (this.listener) {
      throw new Error("Listener is already set");
    }
    this.listener = listener;
    return this.consume();
  }

  /**
   * Stops reacting to reconnects and cancels the active consumer, if any.
   *
   * @returns A promise that resolves once the cancellation is confirmed
   *   (immediately when no consumer has been established).
   */
  cancel() {
    // Unsubscribe first so a failing getChannel() cannot leave the reconnect
    // handler in place and bring the consumer back later.
    channelManager.removeListener("reconnect", this.onReconnect);

    if (!this.consumerTag) {
      return Promise.resolve();
    }
    return channelManager.getChannel().then((channel) => {
      return new Promise<void>((resolve, reject) => {
        channel.cancel(this.consumerTag, (err) => err ? reject(err) : resolve());
      });
    });
  }
}
\ | No newline at end of file |