UNPKG

15 kBJavaScriptView Raw
1// Generated by CoffeeScript 1.9.3
2(function() {
3 var $q, debug, lru, opts, spinredis, uuid,
4 bind = function(fn, me){ return function(){ return fn.apply(me, arguments); }; };
5
6 uuid = require('node-uuid');
7
8 $q = require('node-promise');
9
10 lru = require('lru');
11
12 debug = process.env['DEBUG'];
13
14 opts = {
15 max: 1000,
16 maxAgeInMilliseconds: 1000 * 60 * 60 * 24 * 4
17 };
18
19 spinredis = (function() {
20 function spinredis(dbUrl) {
21 this.flattenModel = bind(this.flattenModel, this);
22 this.listTargets = bind(this.listTargets, this);
23 this.getModelFor = bind(this.getModelFor, this);
24 this.emitMessage = bind(this.emitMessage, this);
25 this._deRegisterObjectsSubscriber = bind(this._deRegisterObjectsSubscriber, this);
26 this.deRegisterObjectsSubscriber = bind(this.deRegisterObjectsSubscriber, this);
27 this._registerObjectSubscriber = bind(this._registerObjectSubscriber, this);
28 this.registerObjectSubscriber = bind(this.registerObjectSubscriber, this);
29 this.registerListener = bind(this.registerListener, this);
30 this.hasSeenThisMessage = bind(this.hasSeenThisMessage, this);
31 this.setup = bind(this.setup, this);
32 this.openChannel = bind(this.openChannel, this);
33 this._emit = bind(this._emit, this);
34 this.emit = bind(this.emit, this);
35 var rhost, rport;
36 console.log('spinclient +++++++++ constructor called +++++++++++');
37 this.subscribers = [];
38 this.objsubscribers = [];
39 this.objectsSubscribedTo = [];
40 this.outstandingMessages = [];
41 this.modelcache = [];
42 this.seenMessages = [];
43 this.sessionId = null;
44 this.objects = new lru(opts);
45 this.savedMessagesInCaseOfRetries = new lru({
46 max: 1000,
47 maxAgeInMilliseconds: 5000
48 });
49 if (debug) {
50 console.log('redis-spincycle dbUrl = ' + dbUrl);
51 }
52 rhost = dbUrl || process.env['REDIS_PORT_6379_TCP_ADDR'] || '127.0.0.1';
53 rport = process.env['REDIS_PORT_6379_TCP_PORT'] || '6379';
54 this.sendredis = require('redis').createClient(rport, rhost);
55 this.listenredis = require('redis').createClient(rport, rhost);
56 this.listenredis.on('error', function(err) {
57 return console.log('spinredis listen ERROR: ' + err);
58 });
59 this.listenredis.on('end', function(err) {
60 return console.log('spinredis listen end event: ' + err);
61 });
62 this.sendredis.on('error', function(err) {
63 return console.log('spinredis send ERROR: ' + err);
64 });
65 this.sendredis.on('end', function(err) {
66 return console.log('spinredis send end event: ' + err);
67 });
68 this.subscribers['OBJECT_UPDATE'] = [
69 (function(_this) {
70 return function(obj) {
71 var k, o, objsubs, prop, results, v, val;
72 console.log('spinredis +++++++++ obj update message router got obj ' + obj.id + ' of type ' + obj.type);
73 console.dir(obj);
74 objsubs = _this.objsubscribers[obj.id] || [];
75 results = [];
76 for (k in objsubs) {
77 v = objsubs[k];
78 if (!_this.objects.get(obj.id)) {
79 _this.objects.set(obj.id, obj);
80 } else {
81 o = _this.objects.get(obj.id);
82 for (prop in obj) {
83 val = obj[prop];
84 o[prop] = val;
85 }
86 }
87 results.push(v(obj));
88 }
89 return results;
90 };
91 })(this)
92 ];
93 this.setup();
94 }
95
96 spinredis.prototype.failed = function(msg) {
97 return console.log('spinclient message failed!! ' + msg);
98 };
99
100 spinredis.prototype.setSessionId = function(id) {
101 if (id) {
102 console.log('++++++++++++++++++++++++++++++++++++++ spinclient setting session id to ' + id);
103 return this.sessionId = id;
104 }
105 };
106
107 spinredis.prototype.dumpOutstanding = function() {
108 console.log('-------------------------------- ' + this.outstandingMessages.length + ' outstanding messages ---------------------------------');
109 this.outstandingMessages.forEach(function(os) {
110 return console.log(os.messageId + ' -> ' + os.target + ' - ' + os.d);
111 });
112 return console.log('-----------------------------------------------------------------------------------------');
113 };
114
115 spinredis.prototype.emit = function(message) {
116 message.channelID = 'spinchannel_' + this.channelID;
117 if (this.open) {
118 return _emit(message);
119 } else {
120 return setTimeout((function(_this) {
121 return function() {
122 return _this.emit(message);
123 };
124 })(this), 200 + parseInt(Math.random() * 100));
125 }
126 };
127
128 spinredis.prototype._emit = function(message) {
129 if (debug) {
130 console.log('redisclient emitting message..');
131 }
132 if (debug) {
133 console.dir(message);
134 }
135 this.savedMessagesInCaseOfRetries.set(message.messageId, message);
136 return this.sendredis.publish('spinchannel', JSON.stringify(message));
137 };
138
139 spinredis.prototype.openChannel = function() {
140 if (!this.open) {
141 this.sendredis.publish('spinchannel', JSON.stringify({
142 target: 'listcommands'
143 }));
144 return setTimeout((function(_this) {
145 return function() {
146 return _this.openChannel();
147 };
148 })(this), 100);
149 }
150 };
151
152 spinredis.prototype.setup = function() {
153 this.channelID = uuid.v4();
154 this.listenredis.subscribe('spinchannel_' + this.channelID);
155 this.openChannel();
156 return this.listenredis.on('message', (function(_this) {
157 return function(channel, replystr) {
158 var detail, i, index, info, message, oldmsg, reply, status, subs;
159 if (debug) {
160 console.log('spinredis on message got ' + replystr);
161 }
162 reply = JSON.parse(replystr);
163 status = reply.status;
164 message = reply.payload;
165 info = reply.info;
166 if (info === 'list of available targets') {
167 console.log('Spincycle server channel is up and awake');
168 return _this.open = true;
169 } else {
170 if (message && message.error && message.error === 'ERRCHILLMAN') {
171 console.log('got ERRCHILLMAN from spinycle service, preparing to retry sending message...');
172 oldmsg = _this.savedMessagesInCaseOfRetries[reply.messageId];
173 return setTimeout(function() {
174 console.log('resending message ' + oldmsg.messageId + ' due to target endpoint not open yet');
175 return _this.emit(oldmsg);
176 }, 250);
177 } else if (!_this.hasSeenThisMessage(reply.messageId)) {
178 _this.savedMessagesInCaseOfRetries.remove(reply.messageId);
179 if (reply.messageId && reply.messageId !== 'undefined') {
180 _this.seenMessages.push(reply.messageId);
181 }
182 if (_this.seenMessages.length > 10) {
183 _this.seenMessages.shift();
184 }
185 if (debug) {
186 console.log('redis-spincycle got reply messageId ' + reply.messageId + ' status ' + status + ', info ' + info + ' data ' + message + ' outstandingMessages = ' + _this.outstandingMessages.length);
187 }
188 if (debug) {
189 _this.dumpOutstanding();
190 }
191 index = -1;
192 if (reply.messageId) {
193 i = 0;
194 while (i < _this.outstandingMessages.length) {
195 index = i;
196 detail = _this.outstandingMessages[i];
197 if (detail && !detail.delivered && detail.messageId === reply.messageId) {
198 if (reply.status === 'FAILURE' || reply.status === 'NOT_ALLOWED') {
199 console.log('spinclient message FAILURE');
200 console.dir(reply);
201 detail.d.reject(reply);
202 break;
203 } else {
204 detail.d.resolve(message);
205 break;
206 }
207 detail.delivered = true;
208 }
209 i++;
210 }
211 if (index > -1) {
212 return _this.outstandingMessages.splice(index, 1);
213 }
214 } else {
215 subs = _this.subscribers[info];
216 if (subs) {
217 return subs.forEach(function(listener) {
218 return listener(message);
219 });
220 } else {
221 if (debug) {
222 console.log('no subscribers for message ' + message);
223 }
224 if (debug) {
225 return console.dir(reply);
226 }
227 }
228 }
229 } else {
230 if (debug) {
231 return console.log('-- skipped resent message ' + reply.messageId);
232 }
233 }
234 }
235 };
236 })(this));
237 };
238
239 spinredis.prototype.hasSeenThisMessage = function(messageId) {
240 return this.seenMessages.some(function(mid) {
241 return messageId === mid;
242 });
243 };
244
245 spinredis.prototype.registerListener = function(detail) {
246 var subs;
247 subs = this.subscribers[detail.message] || [];
248 subs.push(detail.callback);
249 return this.subscribers[detail.message] = subs;
250 };
251
252 spinredis.prototype.registerObjectSubscriber = function(detail) {
253 var d, localsubs, sid;
254 d = $q.defer();
255 sid = uuid.v4();
256 localsubs = this.objectsSubscribedTo[detail.id];
257 if (!localsubs) {
258 localsubs = [];
259 console.log('spinredis no local subs, so get the original server-side subscription for id ' + detail.id);
260 this._registerObjectSubscriber({
261 id: detail.id,
262 type: detail.type,
263 cb: (function(_this) {
264 return function(updatedobj) {
265 var k, lsubs, results, v;
266 lsubs = _this.objectsSubscribedTo[detail.id];
267 results = [];
268 for (k in lsubs) {
269 v = lsubs[k];
270 if (v.cb) {
271 results.push(v.cb(updatedobj));
272 } else {
273 results.push(void 0);
274 }
275 }
276 return results;
277 };
278 })(this)
279 }).then((function(_this) {
280 return function(remotesid) {
281 localsubs['remotesid'] = remotesid;
282 localsubs[sid] = detail;
283 _this.objectsSubscribedTo[detail.id] = localsubs;
284 return d.resolve(sid);
285 };
286 })(this), (function(_this) {
287 return function(rejection) {
288 console.log('spinredis registerObjectSubscriber rejection: ' + rejection);
289 return console.dir(rejection);
290 };
291 })(this));
292 } else {
293 localsubs[sid] = detail;
294 }
295 return d.promise;
296 };
297
298 spinredis.prototype._registerObjectSubscriber = function(detail) {
299 var d, subs;
300 d = $q.defer();
301 subs = this.objsubscribers[detail.id] || [];
302 this.emitMessage({
303 target: 'registerForUpdatesOn',
304 obj: {
305 id: detail.id,
306 type: detail.type
307 }
308 }).then((function(_this) {
309 return function(reply) {
310 console.log('spinredis server subscription id for id ' + detail.id + ' is ' + reply);
311 subs[reply] = detail.cb;
312 _this.objsubscribers[detail.id] = subs;
313 return d.resolve(reply);
314 };
315 })(this), (function(_this) {
316 return function(reply) {
317 return _this.failed(reply);
318 };
319 })(this));
320 return d.promise;
321 };
322
323 spinredis.prototype.deRegisterObjectsSubscriber = function(sid, o) {
324 var count, j, k, len, localsubs, v;
325 localsubs = this.objectsSubscribedTo[o.id] || [];
326 if (localsubs[sid]) {
327 console.log('deregistering local updates for @objects ' + o.id);
328 delete localsubs[sid];
329 count = 0;
330 for (v = j = 0, len = localsubs.length; j < len; v = ++j) {
331 k = localsubs[v];
332 count++;
333 }
334 if (count === 1) {
335 return this._deRegisterObjectsSubscriber('remotesid', o);
336 }
337 }
338 };
339
340 spinredis.prototype._deRegisterObjectsSubscriber = function(sid, o) {
341 var subs;
342 subs = this.objsubscribers[o.id] || [];
343 if (subs && subs[sid]) {
344 delete subs[sid];
345 this.objsubscribers[o.id] = subs;
346 return this.emitMessage({
347 target: 'deRegisterForUpdatesOn',
348 id: o.id,
349 type: o.type,
350 listenerid: sid
351 }).then(function(reply) {
352 return console.log('deregistering server updates for @objects ' + o.id);
353 });
354 }
355 };
356
357 spinredis.prototype.emitMessage = function(detail) {
358 var d;
359 if (debug) {
360 console.log('emitMessage called');
361 }
362 if (debug) {
363 console.dir(detail);
364 }
365 d = $q.defer();
366 detail.messageId = uuid.v4();
367 detail.sessionId = detail.sessionId || this.sessionId;
368 detail.d = d;
369 this.outstandingMessages.push(detail);
370 if (debug) {
371 console.log('saving outstanding reply to messageId ' + detail.messageId + ' and @sessionId ' + detail.sessionId);
372 }
373 this.emit(detail);
374 return d.promise;
375 };
376
377 spinredis.prototype.getModelFor = function(type) {
378 var d;
379 d = $q.defer();
380 if (this.modelcache[type]) {
381 d.resolve(this.modelcache[type]);
382 } else {
383 this.emitMessage({
384 target: 'getModelFor',
385 modelname: type
386 }).then(function(model) {
387 this.modelcache[type] = model;
388 return d.resolve(model);
389 }, (function(_this) {
390 return function(rejection) {
391 console.log('spinredis getModelFor rejection: ' + rejection);
392 return console.dir(rejection);
393 };
394 })(this));
395 }
396 return d.promise;
397 };
398
399 spinredis.prototype.listTargets = function() {
400 var d;
401 d = $q.defer();
402 this.emitMessage({
403 target: 'listcommands'
404 }).then(function(targets) {
405 return d.resolve(targets);
406 }, function(rejection) {
407 console.log('spinredis listTargets rejection: ' + rejection);
408 return console.dir(rejection);
409 });
410 return d.promise;
411 };
412
413 spinredis.prototype.flattenModel = function(model) {
414 var k, rv, v;
415 rv = {};
416 for (k in model) {
417 v = model[k];
418 if (angular.isArray(v)) {
419 rv[k] = v.map(function(e) {
420 return e.id;
421 });
422 } else {
423 rv[k] = v;
424 }
425 }
426 return rv;
427 };
428
429 return spinredis;
430
431 })();
432
433 module.exports = spinredis;
434
435}).call(this);
436
437//# sourceMappingURL=spinredis.js.map