1 | import * as events from "events";
|
2 | import * as util from "util";
|
3 | import * as async from "async";
|
4 | import { channelManager } from './ChannelManager';
|
5 | import { Event } from "./Event";
|
6 | import { EventListener } from "./EventListener";
|
7 | import { promiseNodeify } from './promise-nodeify';
|
8 |
|
9 | var EventEmitter = events.EventEmitter,
|
10 | addListenerMethods = ["addListener", "on", "once"],
|
11 | copyMethods = ["removeAllListeners", "setMaxListeners", "listeners"];
|
12 |
|
13 | function parseEvent(event) {
|
14 | var tmp = event.split(":");
|
15 | return {
|
16 | exchange: tmp[0],
|
17 | topic: tmp[1]
|
18 | };
|
19 | }
|
20 |
|
21 | export interface EventsListeners {
|
22 | [index: string]: EventListener
|
23 | }
|
24 |
|
25 | export interface EventOptions {
|
26 | event: string
|
27 | persistent?: boolean
|
28 | autoAck?: boolean
|
29 | prefetchCount?: number
|
30 | }
|
31 |
|
32 | export class AMQPEventEmitter {
|
33 | runtime: string;
|
34 | ee: events.EventEmitter;
|
35 | private eventsListeners: EventsListeners;
|
36 | public nowProcessingEvents: Map
|
37 | private trackEventsProcessing: boolean = true
|
38 |
|
39 | onStartProcesEvent(data) {
|
40 | if (!this.trackEventsProcessing) {
|
41 | return
|
42 | }
|
43 | this.nowProcessingEvents.set(data, true)
|
44 |
|
45 | this.emit('event-start', data)
|
46 | }
|
47 |
|
48 | onEndProcessEvent(data, err) {
|
49 | if (!this.trackEventsProcessing) {
|
50 | return
|
51 | }
|
52 | this.nowProcessingEvents.delete(data)
|
53 |
|
54 | this.emit('event-end', data)
|
55 | }
|
56 |
|
57 | disableTracking() {
|
58 | this.trackEventsProcessing = false
|
59 | }
|
60 |
|
61 | constructor(runtime) {
|
62 | this.runtime = runtime || "";
|
63 | this.ee = new EventEmitter();
|
64 | this.eventsListeners = {};
|
65 | this.nowProcessingEvents = new Map()
|
66 |
|
67 | addListenerMethods.forEach((method) => {
|
68 | this[method] = (options, eventFn, eventSetCb) => {
|
69 | const cb = (...args) => {
|
70 | this.onStartProcesEvent(args)
|
71 | const listenerResult = eventFn(...args)
|
72 | if (listenerResult instanceof Promise) {
|
73 | listenerResult.then(() => {
|
74 | this.onEndProcessEvent(args)
|
75 | }).catch(() => {
|
76 | this.onEndProcessEvent(args)
|
77 | })
|
78 | } else {
|
79 | this.onEndProcessEvent(args)
|
80 | }
|
81 | }
|
82 | cb.fn = eventFn
|
83 | if (typeof options === "string") {
|
84 | options = {
|
85 | event: options
|
86 | };
|
87 | }
|
88 | let event = options.event;
|
89 | if (["newListener", "removeListener"].indexOf(event) !== -1) {
|
90 | return this.ee[method].call(this.ee, event, cb);
|
91 | }
|
92 |
|
93 |
|
94 | this.ee[method].call(this.ee, event, cb);
|
95 | return this.preListen(options, (err) => {
|
96 | if (err) {
|
97 | this.removeListener(event, eventFn)
|
98 | if (!eventSetCb) {
|
99 |
|
100 |
|
101 | throw err
|
102 | }
|
103 | }
|
104 | if (eventSetCb) {
|
105 | eventSetCb(err);
|
106 | }
|
107 | });
|
108 | };
|
109 | });
|
110 |
|
111 | copyMethods.forEach((method) => {
|
112 | this[method] = (...args: any[]) => {
|
113 | this.ee[method].apply(this.ee, args);
|
114 | };
|
115 | });
|
116 | }
|
117 |
|
118 | private preListen(options, cb) {
|
119 | var event = options.event;
|
120 | var eParsed = parseEvent(event);
|
121 |
|
122 | if (this.eventsListeners[event]) {
|
123 | return cb(null);
|
124 | }
|
125 |
|
126 | Object.assign(options, {
|
127 | exchange: eParsed.exchange,
|
128 | topic: eParsed.topic,
|
129 | runtime: this.runtime
|
130 | })
|
131 | var eventListener = new EventListener(options, this);
|
132 |
|
133 | this.eventsListeners[event] = eventListener;
|
134 | let promise = eventListener.listen((message, extra) => {
|
135 | var content = message.content;
|
136 | if (Array.isArray(content) && content.length === 1 && content[0].context && content[0].message) {
|
137 |
|
138 | content = content[0];
|
139 | }
|
140 | return this.ee.emit.call(this.ee, event, content, extra);
|
141 | });
|
142 |
|
143 | return promiseNodeify(promise, cb);
|
144 | }
|
145 |
|
146 | emit(event, data) {
|
147 | var eParsed = parseEvent(event);
|
148 |
|
149 | var amqpEvent = new Event({
|
150 | exchange: eParsed.exchange,
|
151 | topic: eParsed.topic
|
152 | });
|
153 |
|
154 | amqpEvent.send(data);
|
155 | };
|
156 |
|
157 | getListenerForFn(event: string, fn: Function) {
|
158 | for (const listener of this.ee.listeners(event)) {
|
159 | if (listener.fn === fn) {
|
160 | return listener
|
161 | }
|
162 | }
|
163 | return null
|
164 | }
|
165 |
|
166 | addListener(event: string | EventOptions, listener: Function, cb?: Function) { };
|
167 | on(event: string | EventOptions, listener: Function, cb?: Function) { };
|
168 | once(event: string | EventOptions, listener: Function, cb?: Function) { };
|
169 | removeListener(event: string, fn: Function) {
|
170 | const listener = this.getListenerForFn(event, fn)
|
171 | if (!listener) {
|
172 | return
|
173 | }
|
174 | this.ee.removeListener(event, listener)
|
175 | }
|
176 | removeAllListeners(event?: string) { };
|
177 | setMaxListeners(n: number) { };
|
178 | listeners(event: string) { };
|
179 | } |
\ | No newline at end of file |