UNPKG

2.82 kBPlain TextView Raw
1import { channelManager } from './ChannelManager'
2import { Channel } from "amqplib/callback_api"
3
4const EXCHANGE_PREFIX = "nimbus:event:";
5const EXCHANGE_ALL_EVENTS = "nimbus:events";
6const EXCHANGE_EVENTS_BY_USER = "nimbus:eventsByUser";
7const EXCHANGE_OPTIONS = {durable: true, autoDelete: false};
8
9export interface EventConstructorOptions {
10 exchange: string
11 topic: string
12 userId?: string
13}
14
15export interface Message {
16 exchange: string
17 topic: string
18 userId?: string
19 content: any
20}
21
22export class Event {
23 exchange:string;
24 topic:string;
25 userId:string;
26
27 constructor(options:EventConstructorOptions) {
28 this.exchange = options.exchange;
29 this.userId = options.userId;
30 this.topic = options.topic ? options.topic : 'nimbusEvent';
31 }
32
33 send(object:any) {
34 return this.sendString(this.prepareMessage(object));
35 }
36
37 get fullExchangeName():string {
38 if (this.userId) return EXCHANGE_EVENTS_BY_USER;
39 return EXCHANGE_PREFIX + this.exchange;
40 }
41
42 get routeKey(): string {
43 return this.exchange + '.' + this.topic + (this.userId ? '.' + this.userId : '');
44 }
45
46 private assertExchange() {
47 return channelManager.getChannel().then((channel) => {
48 return new Promise((resolve, reject) => {
49 channel.assertExchange(this.fullExchangeName, "topic", EXCHANGE_OPTIONS,
50 (err) => err ? reject(err) : resolve(channel));
51 })
52 })
53 }
54
55 private assertExchangeForAllEvents() {
56 return channelManager.getChannel().then((channel) => {
57 if (this.userId) return channel;
58 return new Promise((resolve, reject) => {
59 channel.assertExchange(EXCHANGE_ALL_EVENTS, "topic", EXCHANGE_OPTIONS,
60 (err) => err ? reject(err) : resolve(channel));
61 })
62 })
63 }
64
65 private bindToExchangeForAllEvents() {
66 return channelManager.getChannel().then((channel) => {
67 if (this.userId) return channel;
68 return new Promise<Channel>((resolve, reject) => {
69 channel.bindExchange(EXCHANGE_ALL_EVENTS, this.fullExchangeName, "#", {},
70 (err) => err ? reject(err) : resolve(channel));
71 })
72 })
73 }
74
75 sendBuffer(buffer) {
76 return channelManager.getChannel()
77 .then(() => this.assertExchange())
78 .then(() => this.assertExchangeForAllEvents())
79 .then(() => this.bindToExchangeForAllEvents())
80 .then((channel) => {
81 channel.publish(this.fullExchangeName, this.routeKey, buffer, {
82 contentType: "text/json",
83 persistent: true
84 });
85 });
86 }
87
88 sendString(string:string) {
89 return this.sendBuffer(new Buffer(string));
90 }
91
92 prepareMessage(object:any) {
93 var message:Message = {
94 exchange: this.exchange,
95 topic: this.topic,
96 content: object
97 };
98
99 if (this.userId) message.userId = this.userId;
100
101 return JSON.stringify(message);
102 }
103}
\No newline at end of file