UNPKG

6.48 kBPlain TextView Raw
1import util = require("util");
2import async = require("async");
3import crypto = require("crypto");
4import randomString = require("just.randomstring");
5import { channelManager } from './ChannelManager';
6import { promiseNodeify } from './promise-nodeify';
7
8const QUEUE_PREFIX = "_queue_rpc:";
9const CALL_TIMEOUT = 3600 * 1000;
10
11var returnCbs = {},
12 replyQueue = "",
13 DEBUG = false;
14
15function dbg(...args:any[]) {
16 if (DEBUG) {
17 console.log.apply(console, args);
18 }
19}
20
21setInterval(() => {
22 // check every hour for expired callbacks
23 var removeKeys = [],
24 now = new Date().getTime(),
25 k,
26 timeCreated,
27 data;
28 for (k in returnCbs) {
29 timeCreated = returnCbs[k].date.getTime();
30 if (now - timeCreated >= CALL_TIMEOUT) {
31 removeKeys.push(k);
32 }
33 }
34 removeKeys.forEach((k) => {
35 data = returnCbs[k];
36 delete returnCbs[k];
37 });
38}, 3600 * 1000);
39
40function _parseAction(event) {
41 return {
42 queue: QUEUE_PREFIX + event,
43 };
44}
45
46function _errorPrepare(err) {
47 if (!err) {
48 return null;
49 }
50 return {
51 code: err.code ? err.code : -1,
52 msg: err.message,
53 data: err.data,
54 errtype: err.errtype
55 };
56}
57
58export interface Processors {
59
60}
61
62export class RPCManager {
63 processors:Processors;
64
65 constructor() {
66 this.processors = {};
67 }
68
69 private createQueue(action, cb?) {
70 let promise = channelManager.getChannel().then((channel) => {
71 return new Promise((resolve, reject) => {
72 var actionParsed = _parseAction(action);
73 channel.assertQueue(actionParsed.queue, {}, (err, attrs) => {
74 if (err) return reject(err);
75
76 channel.consume(actionParsed.queue, (msg) => {
77 var content = JSON.parse(msg.content.toString());
78 try {
79 dbg("Incoming RPC request", action);
80 this.processors[action].listener(content, (err, body) => {
81 var response = {
82 error: _errorPrepare(err),
83 body: typeof body !== "undefined" ? body : null
84 };
85 channel.sendToQueue(msg.properties.replyTo, new Buffer(JSON.stringify(response)), {
86 correlationId: msg.properties.correlationId
87 });
88
89 dbg("Incoming RPC request", action, " processed! reply to",
90 msg.properties.replyTo);
91 });
92 }
93 catch (ex) {
94 console.error("ERROR IN rpc processor\n", ex.message, ex.stack);
95 }
96 channel.ack(msg);
97 }, {}, (err, res) => {
98 if (err) return reject(err);
99 resolve(res.consumerTag);
100 });
101 });
102 })
103 });
104
105 return promiseNodeify(promise, cb);
106 };
107
108 register(action, cb, registerCb) {
109 registerCb = registerCb || (() => null);
110 if (this.processors[action]) {
111 throw new Error("Can't register same action processor twice");
112 }
113 var consumerTag;
114 async.series([
115 (next) => {
116 channelManager.connect(() => {
117 next();
118 });
119 },
120 (next) => {
121 this.createQueue(action, (err, tag) => {
122 if (!err) {
123 consumerTag = tag;
124 }
125 next(err);
126 });
127 }
128 ], (err) => {
129 if (!err) {
130 this.processors[action] = {
131 listener: cb,
132 consumerTag: consumerTag
133 };
134 }
135 registerCb(err);
136 });
137 return true;
138 };
139
140 unregister(action, cb?) {
141 let promise = channelManager.getChannel().then((channel) => {
142 return new Promise((resolve, reject) => {
143 if (!this.processors[action]) {
144 process.nextTick(() => resolve(null));
145 return;
146 }
147 channel.cancel(this.processors[action].consumerTag, (err) => {
148 if (err) return reject(err);
149
150 delete this.processors[action];
151 resolve(null);
152 });
153 })
154 });
155
156 return promiseNodeify(promise, cb);
157 };
158
159 call(action, params, cb?) {
160 let promise = channelManager.getChannel().then((channel) => {
161 return new Promise((resolve, reject) => {
162 if (typeof params === "function") {
163 cb = params;
164 params = {};
165 }
166 var actionParsed = _parseAction(action);
167 async.series([
168 (next) => {
169 channelManager.connect(() => {
170 next();
171 });
172 },
173 (next) => {
174 if (replyQueue) {
175 return next();
176 }
177 channel.assertQueue("", {
178 durable: false,
179 autoDelete: true
180 }, (err, attrs) => {
181 if (err) return reject(err);
182 replyQueue = attrs.queue;
183 channel.consume(replyQueue, (_msg) => {
184 var msg = JSON.parse(_msg.content.toString()),
185 correlationId = _msg.properties.correlationId;
186 if (returnCbs[correlationId]) {
187 dbg("RPC Response", returnCbs[correlationId].action);
188
189 var resError = null;
190 if (msg.error) {
191 resError = new Error(msg.error.msg);
192 resError.code = msg.error.code;
193 resError.errtype = msg.error.errtype;
194 resError.data = msg.error.data;
195 }
196 var returnCb = returnCbs[correlationId].cb;
197 delete returnCbs[correlationId];
198 returnCb(resError, msg.body);
199 }
200 else {
201 dbg("Obtained reply but unrecognized by correlationId:", correlationId);
202 }
203 channel.ack(_msg);
204 });
205 next();
206 });
207 },
208 () => {
209 var correlationId = randomString(48);
210 dbg("RPC Call", action, "wait reply to", replyQueue);
211 returnCbs[correlationId] = {
212 date: new Date(),
213 cb: cb,
214 action: action,
215 params: params
216 };
217 channel.sendToQueue(actionParsed.queue, new Buffer(JSON.stringify(params)), {
218 correlationId: correlationId,
219 replyTo: replyQueue
220 });
221 }
222 ]);
223 })
224 });
225
226 return promiseNodeify(promise, cb);
227 }
228
229 static purgeActionQueue(action, cb) {
230 return channelManager.getChannel().then((channel) => {
231 var actionParsed = _parseAction(action);
232 channel.purgeQueue(actionParsed.queue, cb);
233 });
234 };
235
236}
237
238
\No newline at end of file