// RPCManager.js (8.29 kB, JavaScript) - copied from the UNPKG file viewer.
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const async = require("async");
const randomString = require("just.randomstring");
const ChannelManager_1 = require("./ChannelManager");
const promise_nodeify_1 = require("./promise-nodeify");
// Prefix prepended to an action name to build the name of its request queue.
const QUEUE_PREFIX = "_queue_rpc:";
// Age, in milliseconds, after which a pending call is dropped by the periodic sweep.
const CALL_TIMEOUT = 3600 * 1000;
// Pending outgoing calls keyed by correlation id. Each entry holds
// { date, cb, action, params } and is removed when the reply arrives or the
// entry grows older than CALL_TIMEOUT.
const returnCbs = {};
// Name of the reply queue shared by all outgoing calls; empty until the first
// call has asserted it.
let replyQueue = "";
// Switch for dbg() logging.
const DEBUG = false;
/**
 * Logs its arguments through console.log, but only while the module-level
 * DEBUG flag is truthy.
 *
 * @param {...*} args - Values forwarded unchanged to console.log.
 */
function dbg(...args) {
    if (DEBUG) {
        console.log(...args);
    }
}
// How often the sweep for abandoned RPC calls runs, in milliseconds.
const SWEEP_INTERVAL_MS = 3600 * 1000;
/**
 * Removes every pending call in `returnCbs` whose age has reached
 * CALL_TIMEOUT and reports the timeout to its callback, so the caller is not
 * left waiting forever for a reply that will never be matched.
 *
 * @param {number} [now] - Current time as epoch milliseconds; defaults to Date.now().
 */
function _expireStaleCalls(now = Date.now()) {
    Object.keys(returnCbs)
        .filter((key) => now - returnCbs[key].date.getTime() >= CALL_TIMEOUT)
        .forEach((key) => {
            const pending = returnCbs[key];
            // Remove the entry first so a late reply is treated as unrecognised.
            delete returnCbs[key];
            if (typeof pending.cb !== "function") {
                return;
            }
            try {
                pending.cb(new Error(`RPC call "${pending.action}" timed out waiting for a reply`));
            }
            catch (ex) {
                // One failing callback must not stop the remaining entries from being swept.
                console.error("ERROR IN rpc timeout callback\n", ex.message, ex.stack);
            }
        });
}
// unref() keeps this housekeeping timer from holding the process open by itself.
setInterval(() => _expireStaleCalls(), SWEEP_INTERVAL_MS).unref();
/**
 * Maps an action name to the queue that carries its requests.
 *
 * @param {string} event - Action name.
 * @returns {{queue: string}} Object whose `queue` is QUEUE_PREFIX followed by the action name.
 */
function _parseAction(event) {
    return {
        queue: QUEUE_PREFIX + event,
    };
}
/**
 * Converts an error into the plain object placed in the `error` field of an
 * RPC response.
 *
 * @param {*} err - Error reported by a processor; any falsy value means "no error".
 * @returns {?{code: *, msg: *, data: *, errtype: *}} null when `err` is falsy,
 *   otherwise the error's fields. A falsy `code` is replaced by -1.
 */
function _errorPrepare(err) {
    if (!err) {
        return null;
    }
    return {
        code: err.code ? err.code : -1,
        // A processor may fail with a bare string; keep its text instead of
        // sending an error without a message.
        msg: typeof err === "string" ? err : err.message,
        data: err.data,
        errtype: err.errtype
    };
}
/**
 * Request/reply RPC on top of message queues.
 *
 * The server side registers one processor per action; each action gets a
 * request queue named by _parseAction(). The client side publishes a request
 * carrying a correlation id and the name of a shared reply queue, and matches
 * the reply back to the caller through the module-level `returnCbs` table.
 *
 * NOTE(review): the channel object comes from ./ChannelManager and is used
 * through callback-style assertQueue/consume/sendToQueue/ack/cancel calls -
 * presumably an amqplib callback-API channel; confirm there.
 */
class RPCManager {
    constructor() {
        // Registered processors keyed by action name: { listener, consumerTag }.
        this.processors = {};
    }
    /**
     * Asserts the request queue of `action` and starts consuming it. Every
     * incoming message is handed to the processor registered for the action.
     *
     * @param {string} action - Action name.
     * @param {Function} [cb] - Optional node-style callback (err, consumerTag).
     * @returns {*} Whatever promiseNodeify returns for a promise that resolves
     *   with the consumer tag.
     */
    createQueue(action, cb) {
        const promise = ChannelManager_1.channelManager.getChannel().then((channel) => {
            return new Promise((resolve, reject) => {
                const actionParsed = _parseAction(action);
                channel.assertQueue(actionParsed.queue, {}, (assertErr) => {
                    if (assertErr) {
                        return reject(assertErr);
                    }
                    channel.consume(actionParsed.queue, (msg) => {
                        this._handleRequest(channel, action, msg);
                    }, {}, (consumeErr, res) => {
                        if (consumeErr) {
                            return reject(consumeErr);
                        }
                        resolve(res.consumerTag);
                    });
                });
            });
        });
        return promise_nodeify_1.promiseNodeify(promise, cb);
    }
    /**
     * Runs the processor of `action` for one incoming request, sends its
     * result to the queue named in the request's `replyTo` and acks the request.
     *
     * @param {object} channel - Channel the request was delivered on.
     * @param {string} action - Action name the consumer was created for.
     * @param {?object} msg - Delivered message; a null delivery is ignored.
     */
    _handleRequest(channel, action, msg) {
        if (!msg) {
            // NOTE(review): amqplib delivers null when the broker cancels the
            // consumer; there is nothing to parse or ack in that case.
            return;
        }
        let replied = false;
        const reply = (err, body) => {
            replied = true;
            const response = {
                error: _errorPrepare(err),
                body: typeof body !== "undefined" ? body : null
            };
            channel.sendToQueue(msg.properties.replyTo, Buffer.from(JSON.stringify(response)), {
                correlationId: msg.properties.correlationId
            });
            dbg("Incoming RPC request", action, " processed! reply to", msg.properties.replyTo);
        };
        try {
            // Parsing sits inside the try so a malformed request is logged,
            // answered and acked instead of throwing out of the consumer.
            const content = JSON.parse(msg.content.toString());
            dbg("Incoming RPC request", action);
            this.processors[action].listener(content, reply);
        }
        catch (ex) {
            console.error("ERROR IN rpc processor\n", ex.message, ex.stack);
            if (!replied) {
                // Tell the caller about the failure instead of leaving it
                // waiting until its call expires.
                try {
                    reply(ex);
                }
                catch (replyEx) {
                    console.error("ERROR IN rpc processor reply\n", replyEx.message, replyEx.stack);
                }
            }
        }
        channel.ack(msg);
    }
    /**
     * Registers the processor for `action`: connects, creates the request
     * queue consumer and then stores the processor in `this.processors`.
     *
     * @param {string} action - Action name.
     * @param {Function} cb - Processor, called as cb(params, done) where
     *   done(err, body) sends the reply.
     * @param {Function} [registerCb] - Called with the error (if any) once
     *   registration has finished.
     * @returns {boolean} Always true; completion is reported through registerCb.
     * @throws {Error} When a processor for `action` is already registered.
     */
    register(action, cb, registerCb) {
        registerCb = registerCb || (() => null);
        if (this.processors[action]) {
            throw new Error("Can't register same action processor twice");
        }
        let consumerTag;
        async.series([
            (next) => {
                ChannelManager_1.channelManager.connect(() => {
                    next();
                });
            },
            (next) => {
                this.createQueue(action, (err, tag) => {
                    if (!err) {
                        consumerTag = tag;
                    }
                    next(err);
                });
            }
        ], (err) => {
            if (!err) {
                this.processors[action] = {
                    listener: cb,
                    consumerTag: consumerTag
                };
            }
            registerCb(err);
        });
        return true;
    }
    /**
     * Cancels the consumer of `action` and forgets its processor. Resolves
     * with null, also when no processor is registered for the action.
     *
     * @param {string} action - Action name.
     * @param {Function} [cb] - Optional node-style callback (err, null).
     * @returns {*} Whatever promiseNodeify returns for the underlying promise.
     */
    unregister(action, cb) {
        const promise = ChannelManager_1.channelManager.getChannel().then((channel) => {
            return new Promise((resolve, reject) => {
                const processor = this.processors[action];
                if (!processor) {
                    resolve(null);
                    return;
                }
                channel.cancel(processor.consumerTag, (err) => {
                    if (err) {
                        return reject(err);
                    }
                    delete this.processors[action];
                    resolve(null);
                });
            });
        });
        return promise_nodeify_1.promiseNodeify(promise, cb);
    }
    /**
     * Sends an RPC request for `action` and delivers the reply.
     *
     * The underlying promise resolves with the reply body, or rejects with an
     * Error carrying the remote `code`, `errtype` and `data`. The callback, if
     * given, is driven from that promise by promiseNodeify.
     *
     * @param {string} action - Action name.
     * @param {*} [params] - JSON-serialisable request payload; defaults to {}.
     *   May be omitted, in which case the second argument is the callback.
     * @param {Function} [cb] - Optional node-style callback (err, body).
     * @returns {*} Whatever promiseNodeify returns for the underlying promise.
     */
    call(action, params, cb) {
        // Resolve the (action, cb) overload before promiseNodeify sees `cb`.
        if (typeof params === "function") {
            cb = params;
            params = {};
        }
        else if (typeof params === "undefined") {
            // JSON.stringify(undefined) is not a string and cannot be sent.
            params = {};
        }
        const promise = ChannelManager_1.channelManager.getChannel().then((channel) => {
            return new Promise((resolve, reject) => {
                const actionParsed = _parseAction(action);
                async.series([
                    (next) => {
                        ChannelManager_1.channelManager.connect(() => {
                            next();
                        });
                    },
                    (next) => {
                        this._ensureReplyQueue(channel, next);
                    }
                ], (err) => {
                    if (err) {
                        return reject(err);
                    }
                    const correlationId = randomString(48);
                    dbg("RPC Call", action, "wait reply to", replyQueue);
                    returnCbs[correlationId] = {
                        date: new Date(),
                        // Settles the promise; the caller's callback is then
                        // invoked exactly once through promiseNodeify.
                        cb: (replyErr, body) => (replyErr ? reject(replyErr) : resolve(body)),
                        action: action,
                        params: params
                    };
                    try {
                        channel.sendToQueue(actionParsed.queue, Buffer.from(JSON.stringify(params)), {
                            correlationId: correlationId,
                            replyTo: replyQueue
                        });
                    }
                    catch (ex) {
                        // Nothing was sent, so no reply can ever arrive for this id.
                        delete returnCbs[correlationId];
                        reject(ex);
                    }
                });
            });
        });
        return promise_nodeify_1.promiseNodeify(promise, cb);
    }
    /**
     * Asserts the shared reply queue and starts consuming it the first time
     * it is needed; later invocations only call `done`.
     *
     * @param {object} channel - Channel used to assert and consume the queue.
     * @param {Function} done - Called with an error if the queue could not be asserted.
     */
    _ensureReplyQueue(channel, done) {
        if (replyQueue) {
            return done();
        }
        channel.assertQueue("", {
            durable: false,
            autoDelete: true
        }, (err, attrs) => {
            if (err) {
                return done(err);
            }
            replyQueue = attrs.queue;
            channel.consume(replyQueue, (msg) => {
                this._handleReply(channel, msg);
            });
            done();
        });
    }
    /**
     * Matches one reply to its pending call by correlation id, hands the
     * result to the stored callback and acks the reply.
     *
     * @param {object} channel - Channel the reply was delivered on.
     * @param {?object} rawMsg - Delivered message; a null delivery is ignored.
     */
    _handleReply(channel, rawMsg) {
        if (!rawMsg) {
            return;
        }
        const correlationId = rawMsg.properties.correlationId;
        // Own-property check: ids such as "constructor" must not match
        // properties inherited from Object.prototype.
        const pending = Object.prototype.hasOwnProperty.call(returnCbs, correlationId)
            ? returnCbs[correlationId]
            : null;
        try {
            if (!pending) {
                dbg("Obtained reply but unrecognized by correlationId:", correlationId);
                return;
            }
            dbg("RPC Response", pending.action);
            delete returnCbs[correlationId];
            let resError = null;
            let body;
            try {
                const reply = JSON.parse(rawMsg.content.toString());
                body = reply.body;
                if (reply.error) {
                    resError = new Error(reply.error.msg);
                    resError.code = reply.error.code;
                    resError.errtype = reply.error.errtype;
                    resError.data = reply.error.data;
                }
            }
            catch (ex) {
                // A malformed reply fails the call instead of throwing out of the consumer.
                resError = ex;
            }
            pending.cb(resError, body);
        }
        finally {
            channel.ack(rawMsg);
        }
    }
    /**
     * Requests a purge of the request queue of `action`.
     *
     * The returned promise resolves as soon as the purge has been requested,
     * not when it has completed; the purge outcome is passed to `cb` by the
     * channel.
     *
     * @param {string} action - Action name.
     * @param {Function} [cb] - Callback handed to channel.purgeQueue.
     * @returns {Promise} Promise chained on channelManager.getChannel().
     */
    static purgeActionQueue(action, cb) {
        return ChannelManager_1.channelManager.getChannel().then((channel) => {
            const actionParsed = _parseAction(action);
            channel.purgeQueue(actionParsed.queue, cb);
        });
    }
}
exports.RPCManager = RPCManager;
//# sourceMappingURL=RPCManager.js.map