1 | "use strict";
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | const events = require("events");
|
4 | const Event_1 = require("./Event");
|
5 | const EventListener_1 = require("./EventListener");
|
6 | const promise_nodeify_1 = require("./promise-nodeify");
|
// Constructor used for the local (in-process) emitter wrapped by AMQPEventEmitter.
const EventEmitter = events.EventEmitter;
// Registration methods that AMQPEventEmitter re-implements on every instance.
const addListenerMethods = ["addListener", "on", "once"];
// Methods that AMQPEventEmitter forwards to the wrapped local emitter.
const copyMethods = ["removeAllListeners", "setMaxListeners", "listeners"];
|
/**
 * Splits an event name on ":" and names the first two pieces.
 *
 * @param {string} event - Event name, e.g. "exchange:topic".
 * @returns {{exchange: string, topic: (string|undefined)}} The first segment
 *   as `exchange` and the second as `topic` (undefined when the name has no
 *   ":"). Segments after the second are ignored.
 */
function parseEvent(event) {
    const [exchange, topic] = event.split(":");
    return { exchange, topic };
}
|
/**
 * Emitter facade that combines a local Node.js EventEmitter (`this.ee`) with
 * the project's EventListener / Event classes.
 *
 * - `addListener` / `on` / `once` register the handler on the local emitter
 *   and, for every event other than "newListener" / "removeListener", make
 *   sure one EventListener exists for that event name (see `preListen`).
 * - `emit` does NOT dispatch locally: it builds an Event for the parsed
 *   "exchange:topic" name and calls `send(data)` on it.
 * - While tracking is enabled, every handler invocation is recorded in
 *   `nowProcessingEvents` from the moment it starts until it finishes.
 */
class AMQPEventEmitter {
    /**
     * @param {string} [runtime] - Stored as `this.runtime` ("" when falsy) and
     *   copied into the options handed to every EventListener.
     */
    constructor(runtime) {
        this.trackEventsProcessing = true;
        this.runtime = runtime || "";
        this.ee = new EventEmitter();
        // Event name -> EventListener created for it by preListen().
        this.eventsListeners = {};
        // Keys are the argument arrays of handler invocations still in flight.
        this.nowProcessingEvents = new Map();
        addListenerMethods.forEach((method) => {
            // Installed per instance; shadows the placeholder prototype method.
            this[method] = (options, eventFn, eventSetCb) => {
                const cb = (...args) => {
                    this.onStartProcesEvent(args);
                    let listenerResult;
                    try {
                        listenerResult = eventFn(...args);
                    }
                    catch (err) {
                        // A synchronous throw must still close the tracking
                        // entry, otherwise it stays in nowProcessingEvents.
                        this.onEndProcessEvent(args, err);
                        throw err;
                    }
                    // Duck-type the result so promises from other libraries
                    // (any thenable) are awaited too, not only native ones.
                    if (listenerResult != null && typeof listenerResult.then === "function") {
                        const finish = (err) => this.onEndProcessEvent(args, err);
                        // then(onOk, onErr) instead of then().catch(): the end
                        // hook runs exactly once even if it throws itself.
                        Promise.resolve(listenerResult).then(() => finish(), finish);
                    }
                    else {
                        this.onEndProcessEvent(args);
                    }
                };
                // Lets getListenerForFn() map the wrapper back to the handler.
                cb.fn = eventFn;
                const listenOptions = typeof options === "string" ? { event: options } : options;
                const event = listenOptions.event;
                // Emitter-internal events are purely local: no EventListener.
                if (["newListener", "removeListener"].includes(event)) {
                    return this.ee[method].call(this.ee, event, cb);
                }
                this.ee[method].call(this.ee, event, cb);
                return this.preListen(listenOptions, (err) => {
                    if (err) {
                        // Roll back the local registration made above.
                        this.removeListener(event, eventFn);
                        if (!eventSetCb) {
                            throw err;
                        }
                    }
                    if (eventSetCb) {
                        eventSetCb(err);
                    }
                });
            };
        });
        copyMethods.forEach((method) => {
            this[method] = (...args) => {
                const result = this.ee[method].apply(this.ee, args);
                // Forward the result (listeners() used to return undefined).
                // Chainable calls return this facade, not the inner emitter.
                return result === this.ee ? this : result;
            };
        });
    }
    /**
     * Records the start of a handler invocation; no-op when tracking is off.
     *
     * NOTE(review): `this.emit` is this class's own `emit`, so "event-start"
     * is handed to an Event (exchange "event-start", no topic) rather than
     * dispatched on the local emitter - confirm this is intended.
     *
     * @param {Array} data - Argument array of the invocation; used as Map key.
     */
    onStartProcesEvent(data) {
        if (!this.trackEventsProcessing) {
            return;
        }
        this.nowProcessingEvents.set(data, true);
        this.emit('event-start', data);
    }
    /**
     * Records the end of a handler invocation; no-op when tracking is off.
     *
     * @param {Array} data - The same array that was passed to onStartProcesEvent.
     * @param {*} [err] - Failure reason, if any; accepted but currently unused.
     */
    onEndProcessEvent(data, err) {
        if (!this.trackEventsProcessing) {
            return;
        }
        this.nowProcessingEvents.delete(data);
        this.emit('event-end', data);
    }
    /** Turns off start/end tracking for all subsequent handler invocations. */
    disableTracking() {
        this.trackEventsProcessing = false;
    }
    /**
     * Ensures an EventListener exists for `options.event`.
     *
     * If one is already registered, `cb(null)` is called immediately.
     * Otherwise `options` is extended in place with `exchange`, `topic` and
     * `runtime`, a new EventListener is created, and its `listen()` result is
     * passed to promiseNodeify together with `cb`.
     *
     * @param {{event: string}} options - Listener options; mutated in place.
     * @param {function} cb - Completion callback.
     * @returns {*} Whatever `cb` or promiseNodeify returns.
     */
    preListen(options, cb) {
        const event = options.event;
        const eParsed = parseEvent(event);
        if (this.eventsListeners[event]) {
            return cb(null);
        }
        Object.assign(options, {
            exchange: eParsed.exchange,
            topic: eParsed.topic,
            runtime: this.runtime
        });
        const eventListener = new EventListener_1.EventListener(options, this);
        this.eventsListeners[event] = eventListener;
        const promise = eventListener.listen((message, extra) => {
            let content = message.content;
            // Unwrap a single-element array holding a { context, message } envelope.
            if (Array.isArray(content) && content.length === 1 && content[0].context && content[0].message) {
                content = content[0];
            }
            return this.ee.emit.call(this.ee, event, content, extra);
        });
        // If listen() fails, forget the listener. Otherwise the early return
        // above would report success for later registrations of this event
        // even though nothing is listening. The side chain consumes its own
        // rejection; `promise` is handed on untouched.
        Promise.resolve(promise).then(null, () => {
            if (this.eventsListeners[event] === eventListener) {
                delete this.eventsListeners[event];
            }
        });
        return promise_nodeify_1.promiseNodeify(promise, cb);
    }
    /**
     * Sends `data` through a new Event built for the parsed event name.
     * Local handlers are not invoked by this method.
     *
     * @param {string} event - Event name in "exchange:topic" form.
     * @param {*} data - Payload passed to `Event#send`.
     */
    emit(event, data) {
        const eParsed = parseEvent(event);
        const amqpEvent = new Event_1.Event({
            exchange: eParsed.exchange,
            topic: eParsed.topic
        });
        amqpEvent.send(data);
    }
    /**
     * Finds the internal wrapper registered on the local emitter for `fn`.
     *
     * @param {string} event - Event name.
     * @param {function} fn - Handler originally passed to on/once/addListener.
     * @returns {function|null} The wrapper whose `fn` property is `fn`, or null.
     */
    getListenerForFn(event, fn) {
        for (const listener of this.ee.listeners(event)) {
            if (listener.fn === fn) {
                return listener;
            }
        }
        return null;
    }
    /** Placeholder; the constructor installs the working version per instance. */
    addListener(event, listener, cb) { }
    /** Placeholder; the constructor installs the working version per instance. */
    on(event, listener, cb) { }
    /** Placeholder; the constructor installs the working version per instance. */
    once(event, listener, cb) { }
    /**
     * Removes the local registration of `fn` for `event`, if there is one.
     * The EventListener stored in `eventsListeners` is left untouched.
     *
     * @param {string} event - Event name.
     * @param {function} fn - Handler originally passed to on/once/addListener.
     */
    removeListener(event, fn) {
        const listener = this.getListenerForFn(event, fn);
        if (!listener) {
            return;
        }
        this.ee.removeListener(event, listener);
    }
    /** Placeholder; the constructor installs a forwarder to the local emitter. */
    removeAllListeners(event) { }
    /** Placeholder; the constructor installs a forwarder to the local emitter. */
    setMaxListeners(n) { }
    /** Placeholder; the constructor installs a forwarder to the local emitter. */
    listeners(event) { }
}
|
144 | exports.AMQPEventEmitter = AMQPEventEmitter;
|
145 |
|
\ | No newline at end of file |