UNPKG

3.38 kBPlain TextView Raw
1import * as events from "events";
2import * as util from "util";
3import * as async from "async";
4import { channelManager } from './ChannelManager';
5import { Event } from "./Event";
6import { EventListener } from "./EventListener";
7import { promiseNodeify } from './promise-nodeify';
8
9var EventEmitter = events.EventEmitter,
10 addListenerMethods = ["addListener", "on", "once"],
11 copyMethods = ["removeListener", "removeAllListeners", "setMaxListeners", "listeners"];
12
13function parseEvent(event) {
14 var tmp = event.split(":");
15 return {
16 exchange: tmp[0],
17 topic: tmp[1]
18 };
19}
20
21export interface EventsListeners {
22 [index: string]: EventListener
23}
24
25export interface EventOptions {
26 event: string
27 persistent?: boolean
28 autoAck?: boolean
29 prefetchCount?: number
30}
31
32export class AMQPEventEmitter {
33 runtime: string;
34 ee: events.EventEmitter;
35 private eventsListeners: EventsListeners;
36
37 constructor(runtime) {
38 this.runtime = runtime || "";
39 this.ee = new EventEmitter();
40 this.eventsListeners = {};
41
42 addListenerMethods.forEach((method) => {
43 this[method] = (options, cb, eventSetCb) => {
44 if (typeof options === "string") {
45 options = {
46 event: options
47 };
48 }
49 let event = options.event;
50 if (["newListener", "removeListener"].indexOf(event) !== -1) {
51 return this.ee[method].call(this.ee, event, cb);
52 }
53 // add listener to the event emitter before attaching to queue in order to be ready if messages are received
54 // before preListen callback is called
55 this.ee[method].call(this.ee, event, cb);
56 return this.preListen(options, (err) => {
57 if (err) {
58 this.ee.removeListener(event, cb);
59 }
60 if (eventSetCb) {
61 eventSetCb(err);
62 }
63 });
64 };
65 });
66
67 copyMethods.forEach((method) => {
68 this[method] = (...args: any[]) => {
69 this.ee[method].apply(this.ee, args);
70 };
71 });
72 }
73
74 private preListen(options, cb) {
75 var event = options.event;
76 var eParsed = parseEvent(event);
77
78 if (this.eventsListeners[event]) {
79 return cb(null);
80 }
81
82 Object.assign(options, {
83 exchange: eParsed.exchange,
84 topic: eParsed.topic,
85 runtime: this.runtime
86 })
87 var eventListener = new EventListener(options);
88
89 this.eventsListeners[event] = eventListener;
90 let promise = eventListener.listen((message, extra) => {
91 var content = message.content;
92 if (Array.isArray(content) && content.length === 1 && content[0].context && content[0].message) {
93 // old formatted message
94 content = content[0];
95 }
96 this.ee.emit.call(this.ee, event, content, extra);
97 });
98
99 return promiseNodeify(promise, cb);
100 }
101
102 emit(event, data) {
103 var eParsed = parseEvent(event);
104
105 var amqpEvent = new Event({
106 exchange: eParsed.exchange,
107 topic: eParsed.topic
108 });
109
110 amqpEvent.send(data);
111 };
112
113 addListener(event: string | EventOptions, listener: Function, cb?: Function) { };
114 on(event: string | EventOptions, listener: Function, cb?: Function) { };
115 once(event: string | EventOptions, listener: Function, cb?: Function) { };
116 removeListener(event: string, listener: Function) { };
117 removeAllListeners(event?: string) { };
118 setMaxListeners(n: number) { };
119 listeners(event: string) { };
120}
\No newline at end of file