1 | "use strict";
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | const async = require("async");
|
4 | const randomString = require("just.randomstring");
|
5 | const ChannelManager_1 = require("./ChannelManager");
|
6 | const promise_nodeify_1 = require("./promise-nodeify");
|
7 | const QUEUE_PREFIX = "_queue_rpc:";
|
8 | const CALL_TIMEOUT = 3600 * 1000;
|
9 | var returnCbs = {}, replyQueue = "", DEBUG = false;
|
10 | function dbg(...args) {
|
11 | if (DEBUG) {
|
12 | console.log.apply(console, args);
|
13 | }
|
14 | }
|
15 | setInterval(() => {
|
16 | var removeKeys = [], now = new Date().getTime(), k, timeCreated, data;
|
17 | for (k in returnCbs) {
|
18 | timeCreated = returnCbs[k].date.getTime();
|
19 | if (now - timeCreated >= CALL_TIMEOUT) {
|
20 | removeKeys.push(k);
|
21 | }
|
22 | }
|
23 | removeKeys.forEach((k) => {
|
24 | data = returnCbs[k];
|
25 | delete returnCbs[k];
|
26 | });
|
27 | }, 3600 * 1000);
|
28 | function _parseAction(event) {
|
29 | return {
|
30 | queue: QUEUE_PREFIX + event,
|
31 | };
|
32 | }
|
33 | function _errorPrepare(err) {
|
34 | if (!err) {
|
35 | return null;
|
36 | }
|
37 | return {
|
38 | code: err.code ? err.code : -1,
|
39 | msg: err.message,
|
40 | data: err.data,
|
41 | errtype: err.errtype
|
42 | };
|
43 | }
|
44 | class RPCManager {
|
45 | constructor() {
|
46 | this.processors = {};
|
47 | }
|
48 | createQueue(action, cb) {
|
49 | let promise = ChannelManager_1.channelManager.getChannel().then((channel) => {
|
50 | return new Promise((resolve, reject) => {
|
51 | var actionParsed = _parseAction(action);
|
52 | channel.assertQueue(actionParsed.queue, {}, (err, attrs) => {
|
53 | if (err)
|
54 | return reject(err);
|
55 | channel.consume(actionParsed.queue, (msg) => {
|
56 | var content = JSON.parse(msg.content.toString());
|
57 | try {
|
58 | dbg("Incoming RPC request", action);
|
59 | this.processors[action].listener(content, (err, body) => {
|
60 | var response = {
|
61 | error: _errorPrepare(err),
|
62 | body: typeof body !== "undefined" ? body : null
|
63 | };
|
64 | channel.sendToQueue(msg.properties.replyTo, new Buffer(JSON.stringify(response)), {
|
65 | correlationId: msg.properties.correlationId
|
66 | });
|
67 | dbg("Incoming RPC request", action, " processed! reply to", msg.properties.replyTo);
|
68 | });
|
69 | }
|
70 | catch (ex) {
|
71 | console.error("ERROR IN rpc processor\n", ex.message, ex.stack);
|
72 | }
|
73 | channel.ack(msg);
|
74 | }, {}, (err, res) => {
|
75 | if (err)
|
76 | return reject(err);
|
77 | resolve(res.consumerTag);
|
78 | });
|
79 | });
|
80 | });
|
81 | });
|
82 | return promise_nodeify_1.promiseNodeify(promise, cb);
|
83 | }
|
84 | ;
|
85 | register(action, cb, registerCb) {
|
86 | registerCb = registerCb || (() => null);
|
87 | if (this.processors[action]) {
|
88 | throw new Error("Can't register same action processor twice");
|
89 | }
|
90 | var consumerTag;
|
91 | async.series([
|
92 | (next) => {
|
93 | ChannelManager_1.channelManager.connect(() => {
|
94 | next();
|
95 | });
|
96 | },
|
97 | (next) => {
|
98 | this.createQueue(action, (err, tag) => {
|
99 | if (!err) {
|
100 | consumerTag = tag;
|
101 | }
|
102 | next(err);
|
103 | });
|
104 | }
|
105 | ], (err) => {
|
106 | if (!err) {
|
107 | this.processors[action] = {
|
108 | listener: cb,
|
109 | consumerTag: consumerTag
|
110 | };
|
111 | }
|
112 | registerCb(err);
|
113 | });
|
114 | return true;
|
115 | }
|
116 | ;
|
117 | unregister(action, cb) {
|
118 | let promise = ChannelManager_1.channelManager.getChannel().then((channel) => {
|
119 | return new Promise((resolve, reject) => {
|
120 | if (!this.processors[action]) {
|
121 | process.nextTick(() => resolve(null));
|
122 | return;
|
123 | }
|
124 | channel.cancel(this.processors[action].consumerTag, (err) => {
|
125 | if (err)
|
126 | return reject(err);
|
127 | delete this.processors[action];
|
128 | resolve(null);
|
129 | });
|
130 | });
|
131 | });
|
132 | return promise_nodeify_1.promiseNodeify(promise, cb);
|
133 | }
|
134 | ;
|
135 | call(action, params, cb) {
|
136 | let promise = ChannelManager_1.channelManager.getChannel().then((channel) => {
|
137 | return new Promise((resolve, reject) => {
|
138 | if (typeof params === "function") {
|
139 | cb = params;
|
140 | params = {};
|
141 | }
|
142 | var actionParsed = _parseAction(action);
|
143 | async.series([
|
144 | (next) => {
|
145 | ChannelManager_1.channelManager.connect(() => {
|
146 | next();
|
147 | });
|
148 | },
|
149 | (next) => {
|
150 | if (replyQueue) {
|
151 | return next();
|
152 | }
|
153 | channel.assertQueue("", {
|
154 | durable: false,
|
155 | autoDelete: true
|
156 | }, (err, attrs) => {
|
157 | if (err)
|
158 | return reject(err);
|
159 | replyQueue = attrs.queue;
|
160 | channel.consume(replyQueue, (_msg) => {
|
161 | var msg = JSON.parse(_msg.content.toString()), correlationId = _msg.properties.correlationId;
|
162 | if (returnCbs[correlationId]) {
|
163 | dbg("RPC Response", returnCbs[correlationId].action);
|
164 | var resError = null;
|
165 | if (msg.error) {
|
166 | resError = new Error(msg.error.msg);
|
167 | resError.code = msg.error.code;
|
168 | resError.errtype = msg.error.errtype;
|
169 | resError.data = msg.error.data;
|
170 | }
|
171 | var returnCb = returnCbs[correlationId].cb;
|
172 | delete returnCbs[correlationId];
|
173 | returnCb(resError, msg.body);
|
174 | }
|
175 | else {
|
176 | dbg("Obtained reply but unrecognized by correlationId:", correlationId);
|
177 | }
|
178 | channel.ack(_msg);
|
179 | });
|
180 | next();
|
181 | });
|
182 | },
|
183 | () => {
|
184 | var correlationId = randomString(48);
|
185 | dbg("RPC Call", action, "wait reply to", replyQueue);
|
186 | returnCbs[correlationId] = {
|
187 | date: new Date(),
|
188 | cb: cb,
|
189 | action: action,
|
190 | params: params
|
191 | };
|
192 | channel.sendToQueue(actionParsed.queue, new Buffer(JSON.stringify(params)), {
|
193 | correlationId: correlationId,
|
194 | replyTo: replyQueue
|
195 | });
|
196 | }
|
197 | ]);
|
198 | });
|
199 | });
|
200 | return promise_nodeify_1.promiseNodeify(promise, cb);
|
201 | }
|
202 | static purgeActionQueue(action, cb) {
|
203 | return ChannelManager_1.channelManager.getChannel().then((channel) => {
|
204 | var actionParsed = _parseAction(action);
|
205 | channel.purgeQueue(actionParsed.queue, cb);
|
206 | });
|
207 | }
|
208 | ;
|
209 | }
|
210 | exports.RPCManager = RPCManager;
|
211 |
|
\ | No newline at end of file |