1 | import util = require("util");
|
2 | import async = require("async");
|
3 | import crypto = require("crypto");
|
4 | import randomString = require("just.randomstring");
|
5 | import { channelManager } from './ChannelManager';
|
6 | import { promiseNodeify } from './promise-nodeify';
|
7 |
|
// Prefix prepended to an action name to build the name of its request queue.
const QUEUE_PREFIX = "_queue_rpc:";

// Age, in milliseconds, after which a pending call entry is considered stale (one hour).
const CALL_TIMEOUT = 60 * 60 * 1000;

// Pending outgoing calls, keyed by correlation id.
var returnCbs = {};

// Name of the queue replies are consumed from; stays empty until it has been asserted.
var replyQueue = "";

// When true, dbg() forwards its arguments to console.log.
var DEBUG = false;
|
14 |
|
/**
 * Debug logger: prints its arguments through console.log, but only while
 * the module-level DEBUG flag is truthy.
 *
 * @param args values to print, passed to console.log unchanged
 */
function dbg(...args:any[]) {
    if (!DEBUG) {
        return;
    }
    console.log(...args);
}
|
20 |
|
// Periodic sweep of pending RPC calls whose reply never arrived.
//
// Every hour, each entry in `returnCbs` that is at least CALL_TIMEOUT old is
// removed. Because the sweep period equals CALL_TIMEOUT, an entry can live for
// up to roughly twice that long before it is collected.
//
// An expired entry's callback (when it has one) is invoked with a timeout
// error, so the caller is told the call failed instead of waiting forever.
const staleCallSweeper: any = setInterval(() => {
    const now = Date.now();

    Object.keys(returnCbs)
        .filter((correlationId) => now - returnCbs[correlationId].date.getTime() >= CALL_TIMEOUT)
        .forEach((correlationId) => {
            const expired = returnCbs[correlationId];
            // Remove first so a late reply can no longer reach the same callback.
            delete returnCbs[correlationId];

            if (typeof expired.cb !== "function") {
                return;
            }

            const timeoutError: any = new Error("RPC call timed out: " + expired.action);
            timeoutError.code = "ETIMEDOUT";
            try {
                expired.cb(timeoutError);
            }
            catch (ex) {
                // A throwing callback must not abort the sweep or crash the timer.
                console.error("ERROR IN rpc timeout callback\n", ex.message, ex.stack);
            }
        });
}, 3600 * 1000);

// Housekeeping alone should not keep the process alive.
if (staleCallSweeper && typeof staleCallSweeper.unref === "function") {
    staleCallSweeper.unref();
}
|
39 |
|
/**
 * Maps an action name to the descriptor of the queue that carries its requests.
 *
 * @param event action name
 * @returns object whose `queue` property is QUEUE_PREFIX followed by `event`
 */
function _parseAction(event) {
    const queue = QUEUE_PREFIX + event;
    return { queue };
}
|
45 |
|
/**
 * Converts an error into the plain object that is serialized into an RPC reply.
 *
 * @param err error-like value, or a falsy value when there is no error
 * @returns null for a falsy `err`; otherwise `{ code, msg, data, errtype }`,
 *          where `code` falls back to -1 when `err.code` is falsy and `msg`
 *          is taken from `err.message`
 */
function _errorPrepare(err) {
    if (err) {
        const { code, message, data, errtype } = err;
        return {
            code: code || -1,
            msg: message,
            data,
            errtype
        };
    }
    return null;
}
|
57 |
|
/**
 * Type of the `processors` field of RPCManager.
 *
 * The interface declares no members. RPCManager indexes a value of this type
 * by action name and stores objects with `listener` and `consumerTag`
 * properties in it.
 */
export interface Processors {

}
|
61 |
|
/**
 * Request/reply RPC on top of message queues.
 *
 * A processor registered for an action consumes requests from the queue named
 * QUEUE_PREFIX + action and sends its answer to the `replyTo` queue of each
 * request. `call()` publishes a request and matches the answer to the caller
 * through a correlation id stored in the module-level `returnCbs` table.
 *
 * Methods taking an optional node-style callback hand their promise and that
 * callback to `promiseNodeify` (from ./promise-nodeify) and return its result.
 * NOTE(review): presumably it routes the promise outcome to the callback when
 * one is given and returns the promise otherwise; confirm against that module.
 */
export class RPCManager {
    /** Registered processors keyed by action name; each entry has `listener` and `consumerTag`. */
    processors:Processors;

    constructor() {
        this.processors = {};
    }

    /**
     * Asserts the request queue of `action` and starts consuming from it.
     *
     * @param action action name
     * @param cb optional node-style callback `(err, consumerTag)`
     * @returns result of promiseNodeify; the underlying promise resolves with
     *          the consumer tag and rejects when assertQueue or consume fails
     */
    private createQueue(action, cb?) {
        const promise = channelManager.getChannel().then((channel) => {
            return new Promise((resolve, reject) => {
                const actionParsed = _parseAction(action);
                channel.assertQueue(actionParsed.queue, {}, (err, attrs) => {
                    if (err) return reject(err);

                    channel.consume(actionParsed.queue, (msg) => {
                        this.handleRequest(channel, action, msg);
                    }, {}, (err, res) => {
                        if (err) return reject(err);
                        resolve(res.consumerTag);
                    });
                });
            });
        });

        return promiseNodeify(promise, cb);
    }

    /**
     * Processes one incoming request: parses it, runs the registered listener
     * and sends the listener's answer to the request's reply queue.
     *
     * Parsing happens inside the try block, so a malformed payload is logged,
     * answered with an error and acknowledged instead of throwing out of the
     * consumer callback. The same applies to a listener that throws.
     *
     * @param channel channel the request was delivered on
     * @param action action name the consumer was created for
     * @param msg delivered message
     */
    private handleRequest(channel, action, msg) {
        // NOTE(review): amqplib-style channels deliver null when the broker
        // cancels the consumer; there is nothing to process or ack then.
        if (!msg) {
            return;
        }

        // At most one reply per request, even if the listener answers and then throws.
        let replied = false;
        const reply = (err, body?) => {
            if (replied) {
                return;
            }
            replied = true;
            const response = {
                error: _errorPrepare(err),
                body: typeof body !== "undefined" ? body : null
            };
            channel.sendToQueue(msg.properties.replyTo, Buffer.from(JSON.stringify(response)), {
                correlationId: msg.properties.correlationId
            });

            dbg("Incoming RPC request", action, " processed! reply to",
                msg.properties.replyTo);
        };

        try {
            const content = JSON.parse(msg.content.toString());
            dbg("Incoming RPC request", action);
            this.processors[action].listener(content, reply);
        }
        catch (ex) {
            console.error("ERROR IN rpc processor\n", ex.message, ex.stack);
            // Tell the caller about the failure rather than leaving it waiting.
            try {
                reply(ex);
            }
            catch (sendEx) {
                console.error("ERROR IN rpc processor\n", sendEx.message, sendEx.stack);
            }
        }
        channel.ack(msg);
    }

    /**
     * Registers `cb` as the processor of `action` and starts consuming its queue.
     *
     * The processor entry is stored before consuming starts, so a request that
     * arrives immediately already finds its listener; the entry is removed
     * again if setting up the queue fails.
     *
     * @param action action name
     * @param cb listener, called as `cb(content, reply)` where `reply(err, body)` sends the answer
     * @param registerCb optional callback `(err)` invoked once setup finished
     * @returns true
     * @throws Error when a processor for `action` is already registered
     */
    register(action, cb, registerCb) {
        registerCb = registerCb || (() => null);
        if (this.processors[action]) {
            throw new Error("Can't register same action processor twice");
        }

        const processor: any = {
            listener: cb,
            consumerTag: undefined
        };
        this.processors[action] = processor;

        async.series([
            (next) => {
                channelManager.connect(() => {
                    next();
                });
            },
            (next) => {
                this.createQueue(action, (err, tag) => {
                    if (!err) {
                        processor.consumerTag = tag;
                    }
                    next(err);
                });
            }
        ], (err) => {
            // Roll the reservation back so the action can be registered again later.
            if (err && this.processors[action] === processor) {
                delete this.processors[action];
            }
            registerCb(err);
        });
        return true;
    }

    /**
     * Cancels the consumer of `action` and forgets its processor.
     *
     * @param action action name
     * @param cb optional node-style callback `(err)`
     * @returns result of promiseNodeify; the underlying promise resolves with
     *          null (also when nothing is registered for `action`) and rejects
     *          when cancelling the consumer fails
     */
    unregister(action, cb?) {
        const promise = channelManager.getChannel().then((channel) => {
            return new Promise((resolve, reject) => {
                const processor = this.processors[action];
                if (!processor) {
                    return resolve(null);
                }
                channel.cancel(processor.consumerTag, (err) => {
                    if (err) return reject(err);

                    delete this.processors[action];
                    resolve(null);
                });
            });
        });

        return promiseNodeify(promise, cb);
    }

    /**
     * Sends an RPC request for `action` and waits for its reply.
     *
     * May be invoked as `call(action, cb)`; `params` then defaults to `{}`.
     * The pending call is tracked in `returnCbs` under a fresh correlation id
     * until the reply is consumed (stale entries are removed by the module's
     * periodic sweep).
     *
     * @param action action name
     * @param params JSON-serializable request payload
     * @param cb optional node-style callback
     * @returns result of promiseNodeify; the underlying promise resolves with
     *          the reply body and rejects with an Error carrying the remote
     *          `code`, `errtype` and `data`, or with the local failure
     */
    call(action, params, cb?) {
        // Normalize the arguments up front so promiseNodeify below receives the real callback.
        if (typeof params === "function") {
            cb = params;
            params = {};
        }
        const actionParsed = _parseAction(action);

        const promise = channelManager.getChannel().then((channel) => {
            return new Promise((resolve, reject) => {
                async.series([
                    (next) => {
                        channelManager.connect(() => {
                            next();
                        });
                    },
                    (next) => {
                        this.ensureReplyQueue(channel, next);
                    },
                    (next) => {
                        const correlationId = randomString(48);
                        dbg("RPC Call", action, "wait reply to", replyQueue);
                        returnCbs[correlationId] = {
                            date: new Date(),
                            // Settles this call's promise once the reply is consumed.
                            cb: (err, body) => {
                                if (err) {
                                    reject(err);
                                }
                                else {
                                    resolve(body);
                                }
                            },
                            action: action,
                            params: params
                        };
                        try {
                            channel.sendToQueue(actionParsed.queue, Buffer.from(JSON.stringify(params)), {
                                correlationId: correlationId,
                                replyTo: replyQueue
                            });
                        }
                        catch (ex) {
                            // Nothing was sent, so no reply can ever arrive for this entry.
                            delete returnCbs[correlationId];
                            return next(ex);
                        }
                        next();
                    }
                ], (err) => {
                    if (err) {
                        reject(err);
                    }
                });
            });
        });

        return promiseNodeify(promise, cb);
    }

    /**
     * Makes sure the reply queue exists and is being consumed.
     *
     * Does nothing when `replyQueue` is already set. Otherwise asserts a
     * server-named, non-durable, auto-delete queue, records its name in
     * `replyQueue` and starts consuming replies from it.
     *
     * @param channel channel to create the queue on
     * @param done callback `(err?)`
     */
    private ensureReplyQueue(channel, done) {
        if (replyQueue) {
            return done();
        }
        channel.assertQueue("", {
            durable: false,
            autoDelete: true
        }, (err, attrs) => {
            if (err) return done(err);

            replyQueue = attrs.queue;
            channel.consume(replyQueue, (_msg) => {
                this.handleReply(channel, _msg);
            });
            done();
        });
    }

    /**
     * Dispatches one reply to the pending call with the same correlation id.
     *
     * The message is acknowledged in every case, including unknown
     * correlation ids and unparsable payloads.
     *
     * @param channel channel the reply was delivered on
     * @param _msg delivered message
     */
    private handleReply(channel, _msg) {
        // NOTE(review): amqplib-style channels deliver null when the broker
        // cancels the consumer; there is nothing to dispatch or ack then.
        if (!_msg) {
            return;
        }

        const correlationId = _msg.properties.correlationId;
        const pending = returnCbs[correlationId];
        try {
            if (!pending) {
                dbg("Obtained reply but unrecognized by correlationId:", correlationId);
                return;
            }
            dbg("RPC Response", pending.action);
            delete returnCbs[correlationId];

            let msg;
            try {
                msg = JSON.parse(_msg.content.toString());
            }
            catch (ex) {
                // A malformed reply fails the call instead of throwing out of the consumer.
                pending.cb(ex);
                return;
            }

            let resError: any = null;
            if (msg.error) {
                resError = new Error(msg.error.msg);
                resError.code = msg.error.code;
                resError.errtype = msg.error.errtype;
                resError.data = msg.error.data;
            }
            pending.cb(resError, msg.body);
        }
        finally {
            channel.ack(_msg);
        }
    }

    /**
     * Purges the request queue of `action`.
     *
     * @param action action name
     * @param cb callback passed straight to `channel.purgeQueue`
     * @returns promise that settles once the purge has been issued on the
     *          channel; it does not wait for `cb`
     */
    static purgeActionQueue(action, cb) {
        return channelManager.getChannel().then((channel) => {
            const actionParsed = _parseAction(action);
            channel.purgeQueue(actionParsed.queue, cb);
        });
    }

}
|
237 |
|
238 |
|
\ | No newline at end of file |