UNPKG

4.96 kBPlain TextView Raw
1import * as events from "events";
2import * as util from "util";
3import * as async from "async";
4import { channelManager } from './ChannelManager';
5import { Event } from "./Event";
6import { EventListener } from "./EventListener";
7import { promiseNodeify } from './promise-nodeify';
8
// Node's EventEmitter class; AMQPEventEmitter uses an instance of it for in-process dispatch.
const EventEmitter = events.EventEmitter;

// Registration methods that AMQPEventEmitter re-implements so that adding a
// listener also goes through its queue-attachment step.
const addListenerMethods = ["addListener", "on", "once"];

// Methods that AMQPEventEmitter forwards unchanged to its inner emitter.
const copyMethods = ["removeAllListeners", "setMaxListeners", "listeners"];
12
/**
 * Splits an event name of the form `"exchange:topic"` into its two parts.
 *
 * Only the FIRST colon is treated as the separator, so a topic that itself
 * contains colons is preserved intact (`"ex:a:b"` -> topic `"a:b"`) instead of
 * being silently truncated.
 *
 * @param event Event name, e.g. `"orders:created"`.
 * @returns The exchange (text before the first colon, or the whole string when
 *          there is no colon) and the topic (text after the first colon, or
 *          `undefined` when there is no colon).
 */
function parseEvent(event: string): { exchange: string; topic: string | undefined } {
    const separatorIndex = event.indexOf(":");
    if (separatorIndex === -1) {
        // No separator: the whole name is the exchange and there is no topic.
        return {
            exchange: event,
            topic: undefined
        };
    }
    return {
        exchange: event.slice(0, separatorIndex),
        topic: event.slice(separatorIndex + 1)
    };
}
20
/**
 * Dictionary of `EventListener` instances keyed by a string.
 * `AMQPEventEmitter` uses the full event name (`options.event`) as the key.
 */
export interface EventsListeners {
    [event: string]: EventListener;
}
24
/**
 * Options accepted by the listener-registration methods of `AMQPEventEmitter`
 * in place of a plain event-name string. The object is handed on to the
 * `EventListener` constructor.
 */
export interface EventOptions {
    /** Event name; `AMQPEventEmitter` parses it as `"exchange:topic"`. */
    event: string;
    /** Forwarded to `EventListener`; presumably queue durability (TODO confirm against EventListener). */
    persistent?: boolean;
    /** Forwarded to `EventListener`; presumably automatic message acknowledgement (TODO confirm). */
    autoAck?: boolean;
    /** Forwarded to `EventListener`; presumably the consumer prefetch limit (TODO confirm). */
    prefetchCount?: number;
}
31
/**
 * Event-emitter style facade over AMQP.
 *
 * Event names have the form `"exchange:topic"`. Registering a listener with
 * `addListener` / `on` / `once` attaches it to an internal Node `EventEmitter`
 * (`ee`) and, through `preListen`, makes sure one `EventListener` exists for
 * that event name. `emit` sends the payload through an `Event`.
 *
 * While tracking is enabled (the default), every listener invocation is kept in
 * `nowProcessingEvents` from the moment it starts until it returns, throws, or
 * the thenable it returned settles.
 */
export class AMQPEventEmitter {
    runtime: string;
    ee: events.EventEmitter;
    private eventsListeners: EventsListeners;
    /** In-flight listener invocations: key is the invocation's argument array, value is `true`. */
    public nowProcessingEvents: Map<any, any>;
    private trackEventsProcessing: boolean = true;

    /**
     * Records the start of a listener invocation.
     * Does nothing when tracking is disabled; otherwise stores `data` as a key
     * of `nowProcessingEvents` and calls `this.emit('event-start', data)`.
     *
     * NOTE(review): `emit` on this class sends through `Event` (AMQP), not to
     * in-process listeners, so this publishes `data` to an exchange named
     * "event-start" with no topic. Confirm that this is intended.
     *
     * @param data Identity of the invocation (the wrapper passes the argument array).
     */
    onStartProcesEvent(data: any) {
        if (!this.trackEventsProcessing) {
            return;
        }
        this.nowProcessingEvents.set(data, true);

        this.emit('event-start', data);
    }

    /**
     * Records the end of a listener invocation.
     * Does nothing when tracking is disabled; otherwise removes `data` from
     * `nowProcessingEvents` and calls `this.emit('event-end', data)` (see the
     * review note on `onStartProcesEvent` about what `emit` does).
     *
     * @param data The same value that was passed to `onStartProcesEvent`.
     * @param err  Optional failure reason of the listener; currently not used.
     */
    onEndProcessEvent(data: any, err?: unknown) {
        if (!this.trackEventsProcessing) {
            return;
        }
        this.nowProcessingEvents.delete(data);

        this.emit('event-end', data);
    }

    /** Turns off processing tracking: the two `on...ProcessEvent` hooks become no-ops. */
    disableTracking() {
        this.trackEventsProcessing = false;
    }

    /**
     * @param runtime Value stored on `runtime` and passed to every
     *                `EventListener` created by this emitter; defaults to `""`.
     */
    constructor(runtime?: string) {
        this.runtime = runtime || "";
        this.ee = new EventEmitter();
        this.eventsListeners = {};
        this.nowProcessingEvents = new Map();

        // Methods are installed under dynamic names, which static typing cannot
        // follow; these two aliases confine the untyped access to this constructor.
        const self: any = this;
        const inner: any = this.ee;

        addListenerMethods.forEach((method) => {
            self[method] = (options, eventFn, eventSetCb) => {
                const cb = this.wrapListener(eventFn);
                if (typeof options === "string") {
                    options = {
                        event: options
                    };
                }
                const event = options.event;
                if (["newListener", "removeListener"].indexOf(event) !== -1) {
                    // Emitter meta-events are purely local: no queue is attached for them.
                    return inner[method].call(inner, event, cb);
                }
                // add listener to the event emitter before attaching to queue in order to be ready if messages are received
                // before preListen callback is called
                inner[method].call(inner, event, cb);
                return this.preListen(options, (err) => {
                    if (err) {
                        this.removeListener(event, eventFn);
                        if (!eventSetCb) {
                            // throw error here if no callback is set, it will be right in most of the cases because apps rely heavily
                            // on the rabbitmq connection, no connection means broken app
                            throw err;
                        }
                    }
                    if (eventSetCb) {
                        eventSetCb(err);
                    }
                });
            };
        });

        copyMethods.forEach((method) => {
            self[method] = (...args: any[]) => {
                const result = inner[method].apply(inner, args);
                // Chainable emitter methods return the inner emitter; hand back this
                // facade instead. Anything else (e.g. the `listeners` array) is returned as is.
                return result === inner ? this : result;
            };
        });
    }

    /**
     * Wraps a user listener so that each invocation is bracketed by
     * `onStartProcesEvent` / `onEndProcessEvent`. The original function is
     * exposed on the wrapper as `.fn` so `getListenerForFn` can find it.
     */
    private wrapListener(eventFn: Function) {
        const tracked = (...args: any[]) => {
            this.onStartProcesEvent(args);
            let listenerResult;
            try {
                listenerResult = eventFn(...args);
            } catch (err) {
                // A synchronous throw must still end the tracking entry, otherwise it
                // would stay in nowProcessingEvents forever. The error is re-thrown unchanged.
                this.onEndProcessEvent(args, err);
                throw err;
            }
            if (listenerResult && typeof listenerResult.then === "function") {
                // Any thenable (not only native Promise) keeps the entry alive until it settles.
                // A rejection is reported to onEndProcessEvent and not propagated further.
                listenerResult.then(
                    () => { this.onEndProcessEvent(args); },
                    (err) => { this.onEndProcessEvent(args, err); }
                );
            } else {
                this.onEndProcessEvent(args);
            }
        };
        tracked.fn = eventFn;
        return tracked;
    }

    /**
     * Ensures an `EventListener` exists for `options.event`.
     *
     * If one is already registered, `cb(null)` is called immediately. Otherwise
     * `options` is extended in place with `exchange`, `topic` and `runtime`, a
     * new `EventListener` is created and its `listen` is started; incoming
     * messages are re-emitted on the inner emitter under the event name.
     * Completion is reported through `promiseNodeify(promise, cb)`.
     *
     * If `listen` fails, the registry entry is removed again so that a later
     * registration for the same event retries instead of reporting success
     * without a listener attached.
     */
    private preListen(options, cb) {
        const event = options.event;
        const eParsed = parseEvent(event);

        if (this.eventsListeners[event]) {
            return cb(null);
        }

        Object.assign(options, {
            exchange: eParsed.exchange,
            topic: eParsed.topic,
            runtime: this.runtime
        });
        const eventListener = new EventListener(options, this);

        this.eventsListeners[event] = eventListener;
        const listening = eventListener.listen((message, extra) => {
            let content = message.content;
            if (Array.isArray(content) && content.length === 1 && content[0].context && content[0].message) {
                // old formatted message
                content = content[0];
            }
            return this.ee.emit(event, content, extra);
        });

        const guarded = Promise.resolve(listening).then(undefined, (err) => {
            // Drop the failed listener (only if it is still the registered one) before
            // the failure reaches the caller's callback.
            if (this.eventsListeners[event] === eventListener) {
                delete this.eventsListeners[event];
            }
            throw err;
        });

        return promiseNodeify(guarded, cb);
    }

    /**
     * Sends `data` through a new `Event` built from the exchange and topic
     * parsed out of `event`. In-process listeners on `ee` are not invoked by
     * this method.
     *
     * @param event Event name in `"exchange:topic"` form.
     * @param data  Payload handed to `Event.send`.
     */
    emit(event: string, data: any) {
        const eParsed = parseEvent(event);

        const amqpEvent = new Event({
            exchange: eParsed.exchange,
            topic: eParsed.topic
        });

        amqpEvent.send(data);
    }

    /**
     * Finds the tracking wrapper that was registered for `fn` on `event`.
     *
     * @returns The first wrapper whose `.fn` is `fn`, or `null` when none matches.
     */
    getListenerForFn(event: string, fn: Function) {
        for (const listener of this.ee.listeners(event)) {
            if ((listener as any).fn === fn) {
                return listener;
            }
        }
        return null;
    }

    // The methods below, except removeListener, are typing placeholders only: the
    // constructor installs the real implementations on each instance.

    /** Registers `listener` for an event; `cb`, if given, receives the attach error (if any). */
    addListener(event: string | EventOptions, listener: Function, cb?: Function) { }
    /** Same contract as `addListener`, delegating to the inner emitter's `on`. */
    on(event: string | EventOptions, listener: Function, cb?: Function) { }
    /** Same contract as `addListener`, delegating to the inner emitter's `once`. */
    once(event: string | EventOptions, listener: Function, cb?: Function) { }

    /**
     * Removes the wrapper registered for `fn` on `event` from the inner emitter.
     * Does nothing when `fn` is not registered for that event.
     */
    removeListener(event: string, fn: Function) {
        const listener = this.getListenerForFn(event, fn);
        if (!listener) {
            return;
        }
        this.ee.removeListener(event, listener as (...args: any[]) => void);
    }

    /** Forwards to the inner emitter's `removeAllListeners`; returns this instance. */
    removeAllListeners(event?: string): this { return this; }
    /** Forwards to the inner emitter's `setMaxListeners`; returns this instance. */
    setMaxListeners(n: number): this { return this; }
    /**
     * Forwards to the inner emitter's `listeners`. The returned functions are the
     * tracking wrappers; each exposes the originally registered function as `.fn`.
     */
    listeners(event: string): Function[] { return []; }
}
\No newline at end of file