1 | import { channelManager } from './ChannelManager'
|
2 | import { Channel } from "amqplib/callback_api"
|
3 |
|
/** Prefix prepended to an event's own exchange name to build the exchange it is published to. */
const EXCHANGE_PREFIX = "nimbus:event:";

/** Catch-all exchange that receives every event published without a user id. */
const EXCHANGE_ALL_EVENTS = "nimbus:events";

/** Single shared exchange used for every event that carries a user id. */
const EXCHANGE_EVENTS_BY_USER = "nimbus:eventsByUser";

/** Options used for every exchange asserted by this module. */
const EXCHANGE_OPTIONS = {durable: true, autoDelete: false};
|
8 |
|
/**
 * Options accepted by the `Event` constructor.
 */
export interface EventConstructorOptions {
    /** Logical exchange name; also the first segment of the routing key. */
    exchange: string
    /** Topic of the event. The constructor substitutes `'nimbusEvent'` when this is missing or empty. */
    topic?: string
    /** When set, the event is published to the shared per-user exchange and the id is appended to the routing key. */
    userId?: string
}
|
14 |
|
/**
 * Envelope that is JSON-serialised and published for every event.
 */
export interface Message {
    /** Logical exchange name of the originating event. */
    exchange: string
    /** Topic of the originating event. */
    topic: string
    /** Present only when the originating event was created with a user id. */
    userId?: string
    /** Caller-supplied payload, passed through as-is to `JSON.stringify`. */
    content: any
}
|
21 |
|
/**
 * An event that can be published to RabbitMQ through the shared channel
 * provided by `channelManager`.
 *
 * Events without a user id are published to their own topic exchange
 * (`EXCHANGE_PREFIX + exchange`), which is bound with the pattern `"#"` to the
 * catch-all exchange `EXCHANGE_ALL_EVENTS`. Events with a user id are
 * published to the shared exchange `EXCHANGE_EVENTS_BY_USER` and skip the
 * catch-all assertion and binding.
 */
export class Event {
    /** Logical exchange name as supplied by the caller (without prefix). */
    exchange: string;
    /** Topic of the event; never empty after construction. */
    topic: string;
    /** Optional user id; left undefined when the event is not user-specific. */
    userId?: string;

    /**
     * @param options exchange, topic and optional user id of the event. A
     *                missing or empty topic is replaced by `'nimbusEvent'`.
     */
    constructor(options: EventConstructorOptions) {
        this.exchange = options.exchange;
        this.userId = options.userId;
        // `||` on purpose: an empty topic falls back to the default as well.
        this.topic = options.topic || 'nimbusEvent';
    }

    /**
     * Wraps `object` in a message envelope, serialises it and publishes it.
     *
     * @param object payload stored in the `content` field of the message.
     * @returns the promise returned by `sendString`.
     */
    send(object: any) {
        return this.sendString(this.prepareMessage(object));
    }

    /** Name of the exchange this event is actually published to. */
    get fullExchangeName(): string {
        if (this.userId) {
            return EXCHANGE_EVENTS_BY_USER;
        }
        return EXCHANGE_PREFIX + this.exchange;
    }

    /** Routing key: `exchange.topic`, followed by `.userId` when a user id is set. */
    get routeKey(): string {
        const userSuffix = this.userId ? '.' + this.userId : '';
        return this.exchange + '.' + this.topic + userSuffix;
    }

    /** Asserts the topic exchange this event publishes to; resolves with the channel. */
    private assertExchange() {
        return channelManager.getChannel().then((channel) => {
            return new Promise<Channel>((resolve, reject) => {
                channel.assertExchange(this.fullExchangeName, "topic", EXCHANGE_OPTIONS,
                    (err) => err ? reject(err) : resolve(channel));
            });
        });
    }

    /**
     * Asserts the catch-all exchange; resolves with the channel.
     * Does nothing but hand back the channel for events with a user id.
     */
    private assertExchangeForAllEvents() {
        return channelManager.getChannel().then((channel) => {
            if (this.userId) return channel;
            return new Promise<Channel>((resolve, reject) => {
                channel.assertExchange(EXCHANGE_ALL_EVENTS, "topic", EXCHANGE_OPTIONS,
                    (err) => err ? reject(err) : resolve(channel));
            });
        });
    }

    /**
     * Binds the catch-all exchange to this event's exchange with the pattern
     * `"#"`; resolves with the channel. Skipped for events with a user id.
     */
    private bindToExchangeForAllEvents() {
        return channelManager.getChannel().then((channel) => {
            if (this.userId) return channel;
            return new Promise<Channel>((resolve, reject) => {
                channel.bindExchange(EXCHANGE_ALL_EVENTS, this.fullExchangeName, "#", {},
                    (err) => err ? reject(err) : resolve(channel));
            });
        });
    }

    /**
     * Publishes a raw buffer after making sure the required exchanges and
     * binding exist.
     *
     * @param buffer message body to publish.
     * @returns a promise that resolves without a value once `publish` has been
     *          called, and rejects if obtaining the channel, an assertion or
     *          the binding fails.
     */
    sendBuffer(buffer: Buffer) {
        return channelManager.getChannel()
            .then(() => this.assertExchange())
            .then(() => this.assertExchangeForAllEvents())
            .then(() => this.bindToExchangeForAllEvents())
            .then((channel) => {
                // The value returned by publish is intentionally not propagated,
                // matching the behaviour callers already rely on.
                channel.publish(this.fullExchangeName, this.routeKey, buffer, {
                    contentType: "text/json",
                    persistent: true
                });
            });
    }

    /**
     * Encodes `string` as UTF-8 and publishes it.
     *
     * @param string already serialised message body.
     * @returns the promise returned by `sendBuffer`.
     */
    sendString(string: string) {
        // Buffer.from replaces the deprecated `new Buffer(string)` constructor;
        // both use UTF-8 for string input.
        return this.sendBuffer(Buffer.from(string));
    }

    /**
     * Builds the message envelope for `object` and serialises it to JSON.
     *
     * @param object payload stored in the `content` field.
     * @returns the JSON text of the envelope; `userId` is included only when set.
     */
    prepareMessage(object: any): string {
        const message: Message = {
            exchange: this.exchange,
            topic: this.topic,
            content: object
        };

        if (this.userId) message.userId = this.userId;

        return JSON.stringify(message);
    }
}