UNPKG

4.9 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3const events = require("events");
4const Event_1 = require("./Event");
5const EventListener_1 = require("./EventListener");
6const promise_nodeify_1 = require("./promise-nodeify");
// Node's built-in emitter; used for local (in-process) listener bookkeeping.
const { EventEmitter } = events;
// Listener-registration methods that get an AMQP-aware wrapper per instance.
const addListenerMethods = ["addListener", "on", "once"];
// Methods that are forwarded to the internal emitter per instance.
const copyMethods = ["removeAllListeners", "setMaxListeners", "listeners"];
/**
 * Splits an event name of the form "exchange:topic" into its two parts.
 *
 * Only the first two colon-separated segments are used; any further
 * segments are dropped. When the name has no colon, `topic` is `undefined`.
 *
 * @param {string} event - Event name, e.g. "orders:created".
 * @returns {{exchange: string, topic: (string|undefined)}} Parsed parts.
 */
function parseEvent(event) {
    const [exchange, topic] = event.split(":");
    return { exchange, topic };
}
/**
 * Emitter facade that combines a local Node `EventEmitter` (`this.ee`) with
 * AMQP-backed listeners and events.
 *
 * Registering a listener (`addListener` / `on` / `once`) attaches a wrapper to
 * the local emitter and, for the first listener of a given event name, creates
 * an `EventListener` whose incoming messages are re-emitted locally.
 * `emit` builds an `Event` from the parsed event name and sends the payload.
 */
class AMQPEventEmitter {
    /**
     * @param {string} [runtime] - Runtime identifier handed to every
     *   `EventListener` created by this instance; falls back to "".
     */
    constructor(runtime) {
        this.trackEventsProcessing = true;
        this.runtime = runtime || "";
        this.ee = new EventEmitter();
        // event name -> EventListener created for it (one per event name).
        this.eventsListeners = {};
        // Keyed by the listener's argument array (object identity).
        this.nowProcessingEvents = new Map();
        // The registration methods are installed per instance and shadow the
        // empty prototype placeholders declared further down in the class.
        addListenerMethods.forEach((method) => {
            this[method] = (options, eventFn, eventSetCb) => {
                const cb = this.createTrackedListener(eventFn);
                // A bare string is shorthand for `{ event: <string> }`.
                const opts = typeof options === "string" ? { event: options } : options;
                const event = opts.event;
                // Emitter meta-events stay purely local: no AMQP listener.
                if (["newListener", "removeListener"].includes(event)) {
                    return this.ee[method](event, cb);
                }
                this.ee[method](event, cb);
                return this.preListen(opts, (err) => {
                    if (err) {
                        // Roll back the local registration made above.
                        this.removeListener(event, eventFn);
                        if (!eventSetCb) {
                            // No callback to report to: surface the failure.
                            throw err;
                        }
                    }
                    if (eventSetCb) {
                        eventSetCb(err);
                    }
                });
            };
        });
        copyMethods.forEach((method) => {
            this[method] = (...args) => {
                const result = this.ee[method](...args);
                // Forward the emitter's return value (e.g. the array from
                // `listeners`); chainable calls return this facade rather
                // than exposing the internal emitter.
                return result === this.ee ? this : result;
            };
        });
    }
    /**
     * Wraps a user listener so its processing is bracketed by
     * `onStartProcesEvent` / `onEndProcessEvent`.
     *
     * The end hook runs exactly once: after a synchronous return, after a
     * synchronous throw (the error is rethrown), or when a returned thenable
     * settles either way. The original function is exposed as `wrapper.fn`
     * so it can be looked up by `getListenerForFn`.
     *
     * @param {Function} eventFn - User-supplied listener.
     * @returns {Function} Wrapper suitable for the internal emitter.
     */
    createTrackedListener(eventFn) {
        const wrapper = (...args) => {
            this.onStartProcesEvent(args);
            let listenerResult;
            try {
                listenerResult = eventFn(...args);
            }
            catch (err) {
                // Without this the entry would stay in nowProcessingEvents forever.
                this.onEndProcessEvent(args, err);
                throw err;
            }
            if (listenerResult && typeof listenerResult.then === "function") {
                // Two-argument `then` so the end hook cannot fire twice.
                listenerResult.then(() => {
                    this.onEndProcessEvent(args);
                }, (err) => {
                    this.onEndProcessEvent(args, err);
                });
            }
            else {
                this.onEndProcessEvent(args);
            }
        };
        wrapper.fn = eventFn;
        return wrapper;
    }
    /**
     * Marks a listener invocation as in progress and emits 'event-start'.
     * Does nothing when tracking has been disabled.
     *
     * NOTE(review): `this.emit` is the AMQP `emit` below, so 'event-start' is
     * sent as an `Event` with exchange "event-start" and no topic rather than
     * emitted on the local emitter - confirm that this is intended.
     *
     * @param {Array} data - Argument array of the listener invocation.
     */
    onStartProcesEvent(data) {
        if (!this.trackEventsProcessing) {
            return;
        }
        this.nowProcessingEvents.set(data, true);
        this.emit('event-start', data);
    }
    /**
     * Marks a listener invocation as finished and emits 'event-end'.
     * Does nothing when tracking has been disabled.
     *
     * @param {Array} data - The same argument array passed to the start hook.
     * @param {*} [err] - Failure of the listener, if any (currently unused).
     */
    onEndProcessEvent(data, err) {
        if (!this.trackEventsProcessing) {
            return;
        }
        this.nowProcessingEvents.delete(data);
        this.emit('event-end', data);
    }
    /** Turns off start/end tracking for all subsequent listener invocations. */
    disableTracking() {
        this.trackEventsProcessing = false;
    }
    /**
     * Ensures an `EventListener` exists for `options.event`.
     *
     * If one is already registered, `cb(null)` is called immediately.
     * Otherwise `options` is extended in place with `exchange`, `topic` and
     * `runtime`, a listener is created and started, and every received
     * message is re-emitted on the local emitter under the event name.
     * If starting the listener fails, its cache entry is removed so a later
     * registration tries again instead of silently reporting success.
     *
     * @param {Object} options - Listener options; must contain `event`.
     * @param {Function} cb - Node-style callback invoked with the error (or null).
     * @returns {*} Whatever `cb` or `promiseNodeify` returns.
     */
    preListen(options, cb) {
        const event = options.event;
        const eParsed = parseEvent(event);
        if (this.eventsListeners[event]) {
            return cb(null);
        }
        Object.assign(options, {
            exchange: eParsed.exchange,
            topic: eParsed.topic,
            runtime: this.runtime
        });
        const eventListener = new EventListener_1.EventListener(options, this);
        this.eventsListeners[event] = eventListener;
        const listening = eventListener.listen((message, extra) => {
            let content = message.content;
            // Unwrap a single-element array that carries { context, message }.
            if (Array.isArray(content) && content.length === 1 && content[0] && content[0].context && content[0].message) {
                content = content[0];
            }
            return this.ee.emit(event, content, extra);
        });
        const promise = Promise.resolve(listening).catch((err) => {
            // Forget the failed listener, but only if it is still the cached one.
            if (this.eventsListeners[event] === eventListener) {
                delete this.eventsListeners[event];
            }
            throw err;
        });
        return promise_nodeify_1.promiseNodeify(promise, cb);
    }
    /**
     * Sends `data` as an `Event` addressed by the parsed event name.
     *
     * @param {string} event - Event name in "exchange:topic" form.
     * @param {*} data - Payload handed to `Event#send`.
     */
    emit(event, data) {
        const eParsed = parseEvent(event);
        const amqpEvent = new Event_1.Event({
            exchange: eParsed.exchange,
            topic: eParsed.topic
        });
        amqpEvent.send(data);
    }
    /**
     * Finds the registered wrapper whose `fn` property is `fn`.
     *
     * @param {string} event - Event name.
     * @param {Function} fn - Original listener function.
     * @returns {Function|null} The wrapper, or null when none matches.
     */
    getListenerForFn(event, fn) {
        const match = this.ee.listeners(event).find((listener) => listener.fn === fn);
        return match || null;
    }
    // Placeholders only: the constructor installs the real implementations
    // as instance properties with the same names.
    addListener(event, listener, cb) { }
    on(event, listener, cb) { }
    once(event, listener, cb) { }
    /**
     * Removes the wrapper registered for `fn` from the local emitter.
     * Does nothing when `fn` is not registered for `event`.
     *
     * @param {string} event - Event name.
     * @param {Function} fn - Original listener function.
     */
    removeListener(event, fn) {
        const listener = this.getListenerForFn(event, fn);
        if (!listener) {
            return;
        }
        this.ee.removeListener(event, listener);
    }
    // Placeholders only: replaced per instance by forwarding wrappers
    // created in the constructor.
    removeAllListeners(event) { }
    setMaxListeners(n) { }
    listeners(event) { }
}
144exports.AMQPEventEmitter = AMQPEventEmitter;
145//# sourceMappingURL=EventEmitter.js.map
\No newline at end of file