# Source listing retrieved from UNPKG (10.4 kB, text/coffeescript).
# Third-party dependencies: uuid generation, deferred/promise helpers, LRU cache.
uuid = require 'node-uuid'
$q = require 'node-promise'
lru = require 'lru'

# Truthy when the DEBUG environment variable is set; gates the verbose logging.
debug = process.env.DEBUG

# Options handed to the LRU cache that holds objects received from the server.
# NOTE(review): confirm that the installed `lru` package honours the
# `maxAgeInMilliseconds` option name.
opts = {
  max: 1000
  maxAgeInMilliseconds: 4 * 24 * 60 * 60 * 1000 # 4 days timeout of objects no matter what
}
10
# Client that talks to a Spincycle server over Redis pub/sub.
#
# Requests are published as JSON on the shared 'spinchannel' channel; replies
# and pushed updates arrive on a private 'spinchannel_<uuid>' channel that the
# instance subscribes to in `setup`. Every request made through `emitMessage`
# carries a fresh messageId and returns a promise that is settled when the
# reply with the same messageId arrives.
class spinredis

  # dbUrl - optional Redis host. Falls back to the REDIS_PORT_6379_TCP_ADDR
  #         environment variable and then to 127.0.0.1. The port comes from
  #         REDIS_PORT_6379_TCP_PORT, defaulting to 6379.
  constructor: (dbUrl) ->
    console.log 'spinclient +++++++++ constructor called +++++++++++'
    # The following arrays are used as string-keyed hashes.
    @subscribers = []           # info string -> list of listener callbacks
    @objsubscribers = []        # object id   -> { server subscription id -> callback }
    @objectsSubscribedTo = []   # object id   -> { local sid -> detail, remotesid -> server id }

    @outstandingMessages = []   # requests still waiting for a reply
    @modelcache = []            # model name -> model returned by the server

    @seenMessages = []          # most recent reply ids, used to drop duplicates
    @sessionId = null
    @open = false               # becomes true once the server answers 'listcommands'
    @objects = new lru(opts)

    # Copies of sent messages, kept briefly so they can be re-sent on ERRCHILLMAN.
    @savedMessagesInCaseOfRetries = new lru({max: 1000, maxAgeInMilliseconds: 5000})

    if debug then console.log 'redis-spincycle dbUrl = ' + dbUrl
    rhost = dbUrl or process.env['REDIS_PORT_6379_TCP_ADDR'] or '127.0.0.1'
    rport = process.env['REDIS_PORT_6379_TCP_PORT'] or '6379'

    # Two connections: one publishes, the other is dedicated to the subscription.
    redis = require('redis')
    @sendredis = redis.createClient(rport, rhost)
    @listenredis = redis.createClient(rport, rhost)

    @listenredis.on 'error', (err) ->
      console.log 'spinredis listen ERROR: ' + err

    @listenredis.on 'end', (err) ->
      console.log 'spinredis listen end event: ' + err

    @sendredis.on 'error', (err) ->
      console.log 'spinredis send ERROR: ' + err

    @sendredis.on 'end', (err) ->
      console.log 'spinredis send end event: ' + err

    # Built-in router for OBJECT_UPDATE pushes: for every callback registered
    # for the object's id, merge the update into the cache and invoke the
    # callback with the update.
    routeObjectUpdate = (obj) =>
      console.log 'spinredis +++++++++ obj update message router got obj '+obj.id+' of type '+obj.type
      console.dir(obj)
      objsubs = @objsubscribers[obj.id] or []
      for k, v of objsubs
        cached = @objects.get(obj.id)
        if not cached
          @objects.set(obj.id, obj)
        else
          for prop, val of obj
            cached[prop] = val
        v obj

    @subscribers['OBJECT_UPDATE'] = [routeObjectUpdate]

    @setup()

  # Logs a failed message.
  failed: (msg) ->
    console.log 'spinclient message failed!! ' + msg

  # Stores the session id attached to later requests; falsy ids are ignored.
  setSessionId: (id) ->
    if id
      console.log '++++++++++++++++++++++++++++++++++++++ spinclient setting session id to ' + id
      @sessionId = id

  # Logs every request that is still waiting for a reply.
  dumpOutstanding: () ->
    console.log '-------------------------------- ' + @outstandingMessages.length + ' outstanding messages ---------------------------------'
    @outstandingMessages.forEach (os) ->
      console.log os.messageId + ' -> ' + os.target + ' - ' + os.d
    console.log '-----------------------------------------------------------------------------------------'

  # Tags the message with this client's reply channel and publishes it. While
  # the channel is not open yet, the send is retried after 200-299 ms.
  emit: (message) =>
    message.channelID = 'spinchannel_' + @channelID
    if @open
      # Must go through `@`: `_emit` is a method, not a free function.
      @_emit(message)
    else
      retry = => @emit(message)
      setTimeout retry, 200 + Math.floor(Math.random() * 100)

  # Publishes the message on 'spinchannel' and keeps a copy for possible retries.
  _emit: (message) =>
    if debug then console.log 'redisclient emitting message..'
    if debug then console.dir message
    @savedMessagesInCaseOfRetries.set(message.messageId, message)
    @sendredis.publish('spinchannel', JSON.stringify(message))

  # Publishes 'listcommands' every 100 ms until a 'list of available targets'
  # reply has marked the channel as open.
  openChannel: () =>
    if not @open
      @sendredis.publish('spinchannel', JSON.stringify({target: 'listcommands'}))
      setTimeout (=> @openChannel()), 100

  # Allocates the private reply channel, subscribes to it, starts the
  # open-channel handshake and installs the incoming-message handler.
  setup: () =>
    @channelID = uuid.v4()
    @listenredis.subscribe('spinchannel_' + @channelID)
    @openChannel()

    @listenredis.on 'message', (channel, replystr) =>
      @_onMessage(channel, replystr)

  # Handles one raw message received on the subscribed channel.
  _onMessage: (channel, replystr) =>
    if debug then console.log 'spinredis on message got ' + replystr

    # A malformed payload must not throw out of the event handler.
    try
      reply = JSON.parse(replystr)
    catch err
      console.log 'spinredis could not parse message: ' + err
      return
    return unless reply

    status = reply.status
    message = reply.payload
    info = reply.info

    if info == 'list of available targets'
      console.log 'Spincycle server channel is up and awake'
      @open = true
    else if message and message.error == 'ERRCHILLMAN'
      console.log 'got ERRCHILLMAN from spinycle service, preparing to retry sending message...'
      @_retrySavedMessage(reply.messageId)
    else if @hasSeenThisMessage reply.messageId
      if debug then console.log '-- skipped resent message ' + reply.messageId
    else
      @savedMessagesInCaseOfRetries.remove(reply.messageId)
      if reply.messageId and reply.messageId isnt 'undefined' then @seenMessages.push(reply.messageId)
      # Only the ten most recent reply ids are remembered.
      if @seenMessages.length > 10 then @seenMessages.shift()
      if debug then console.log 'redis-spincycle got reply messageId ' + reply.messageId + ' status ' + status + ', info ' + info + ' data ' + message + ' outstandingMessages = ' + @outstandingMessages.length
      if debug then @dumpOutstanding()
      if reply.messageId
        @_settleOutstanding(reply)
      else
        @_notifySubscribers(info, message, reply)

  # Re-sends the saved copy of a message 250 ms from now. The copy lives in an
  # LRU cache, so it has to be read with `get`; it may be gone already.
  _retrySavedMessage: (messageId) =>
    oldmsg = @savedMessagesInCaseOfRetries.get(messageId)
    if not oldmsg
      console.log 'spinredis cannot retry message ' + messageId + ': no saved copy found'
      return
    resend = =>
      console.log 'resending message '+oldmsg.messageId+' due to target endpoint not open yet'
      @emit(oldmsg)
    setTimeout resend, 250

  # Settles the outstanding request that matches reply.messageId and removes
  # it from the list. A reply that matches nothing leaves the list untouched.
  _settleOutstanding: (reply) =>
    index = -1
    for candidate, i in @outstandingMessages
      if candidate and not candidate.delivered and candidate.messageId == reply.messageId
        index = i
        break
    return if index is -1

    detail = @outstandingMessages[index]
    @outstandingMessages.splice index, 1
    detail.delivered = true
    if reply.status == 'FAILURE' or reply.status == 'NOT_ALLOWED'
      console.log 'spinclient message FAILURE'
      console.dir reply
      detail.d.reject reply
    else
      detail.d.resolve(reply.payload)

  # Delivers a pushed message (one without messageId) to the listeners
  # registered for its info string.
  _notifySubscribers: (info, message, reply) =>
    subs = @subscribers[info]
    if subs
      subs.forEach (listener) ->
        listener message
    else
      if debug then console.log 'no subscribers for message ' + message
      if debug then console.dir reply

  # True when messageId is among the recently seen reply ids.
  hasSeenThisMessage: (messageId) =>
    @seenMessages.some (mid) -> messageId == mid

  # Adds detail.callback to the listeners for pushed messages whose info
  # equals detail.message.
  registerListener: (detail) =>
    subs = @subscribers[detail.message] or []
    subs.push detail.callback
    @subscribers[detail.message] = subs

  # Subscribes detail.cb to updates of the object detail.id (of detail.type).
  # The first subscriber for an id triggers one server-side subscription that
  # fans out to all local subscribers; later ones only register locally.
  # Returns a promise for the local subscription id. It is rejected when the
  # server-side registration fails.
  registerObjectSubscriber: (detail) =>
    d = $q.defer()
    sid = uuid.v4()
    localsubs = @objectsSubscribedTo[detail.id]
    if localsubs
      localsubs[sid] = detail
      # Resolve here as well, otherwise callers would wait forever.
      d.resolve(sid)
    else
      localsubs = []
      console.log 'spinredis no local subs, so get the original server-side subscription for id ' + detail.id

      # Single server-side callback that forwards to every local subscriber.
      fanOut = (updatedobj) =>
        lsubs = @objectsSubscribedTo[detail.id]
        for k, v of lsubs
          # The 'remotesid' entry is a plain id without cb and is skipped.
          if v.cb
            v.cb updatedobj

      onRegistered = (remotesid) =>
        localsubs['remotesid'] = remotesid
        localsubs[sid] = detail
        @objectsSubscribedTo[detail.id] = localsubs
        d.resolve(sid)

      onRejected = (rejection) =>
        console.log 'spinredis registerObjectSubscriber rejection: '+rejection
        console.dir rejection
        d.reject(rejection)

      @_registerObjectSubscriber({id: detail.id, type: detail.type, cb: fanOut}).then(onRegistered, onRejected)
    return d.promise

  # Asks the server for updates on detail.id and stores detail.cb under the
  # subscription id the server returns. Returns a promise for that id; it is
  # rejected when the server refuses.
  _registerObjectSubscriber: (detail) =>
    d = $q.defer()
    subs = @objsubscribers[detail.id] or []

    onReply = (reply) =>
      console.log 'spinredis server subscription id for id ' + detail.id + ' is ' + reply
      subs[reply] = detail.cb
      @objsubscribers[detail.id] = subs
      d.resolve(reply)

    onFailure = (reply) =>
      @failed(reply)
      d.reject(reply)

    @emitMessage({target: 'registerForUpdatesOn', obj: {id: detail.id, type: detail.type}}).then(onReply, onFailure)
    return d.promise

  # Removes the local subscription sid for object o. When it was the last
  # local subscriber, the server-side subscription is released too.
  deRegisterObjectsSubscriber: (sid, o) =>
    localsubs = @objectsSubscribedTo[o.id] or []
    if localsubs[sid]
      console.log 'deregistering local updates for @objects ' + o.id
      delete localsubs[sid]
      # localsubs is used as a hash, so its keys must be walked with `of`.
      remaining = (k for k of localsubs when k isnt 'remotesid')
      if remaining.length is 0
        remotesid = localsubs['remotesid']
        # Drop the entry so that a later subscriber registers with the server again.
        delete @objectsSubscribedTo[o.id]
        @_deRegisterObjectsSubscriber(remotesid, o)

  # Drops the callback stored under the server subscription id sid and tells
  # the server to stop sending updates for object o.
  _deRegisterObjectsSubscriber: (sid, o) =>
    subs = @objsubscribers[o.id] or []
    if subs and subs[sid]
      delete subs[sid]
      @objsubscribers[o.id] = subs

      onReply = (reply) ->
        console.log 'deregistering server updates for @objects ' + o.id

      onFailure = (rejection) ->
        console.log 'spinredis deRegisterForUpdatesOn rejection: ' + rejection

      @emitMessage({target: 'deRegisterForUpdatesOn', id: o.id, type: o.type, listenerid: sid}).then(onReply, onFailure)

  # Sends a request to the server. A messageId, the session id (unless the
  # caller supplied one) and the deferred are attached to `detail`, which is
  # remembered until the reply arrives. Returns a promise for the reply payload.
  emitMessage: (detail) =>
    if debug then console.log 'emitMessage called'
    if debug then console.dir detail
    d = $q.defer()
    detail.messageId = uuid.v4()
    detail.sessionId = detail.sessionId or @sessionId
    detail.d = d
    @outstandingMessages.push detail
    if debug then console.log 'saving outstanding reply to messageId ' + detail.messageId + ' and @sessionId ' + detail.sessionId
    @emit detail

    return d.promise

  # ------------------------------------------------------------------------------------------------------------------

  # Returns a promise for the model of the given type, served from the local
  # cache when available and fetched from the server (then cached) otherwise.
  getModelFor: (type) =>
    d = $q.defer()
    if @modelcache[type]
      d.resolve(@modelcache[type])
    else
      # Fat arrow: the callback writes to @modelcache.
      onModel = (model) =>
        @modelcache[type] = model
        d.resolve(model)

      onRejected = (rejection) =>
        console.log 'spinredis getModelFor rejection: '+rejection
        console.dir rejection
        d.reject(rejection)

      @emitMessage({target: 'getModelFor', modelname: type}).then(onModel, onRejected)
    return d.promise

  # Returns a promise for the server's reply to 'listcommands'.
  listTargets: () =>
    d = $q.defer()

    onTargets = (targets) ->
      d.resolve(targets)

    onRejected = (rejection) ->
      console.log 'spinredis listTargets rejection: '+rejection
      console.dir rejection
      d.reject(rejection)

    @emitMessage({target: 'listcommands'}).then(onTargets, onRejected)
    return d.promise

  # Returns a shallow copy of model in which every array-valued property is
  # replaced by the list of its elements' ids.
  flattenModel: (model) =>
    rv = {}
    for k, v of model
      # Array.isArray instead of angular.isArray: there is no `angular` global in Node.
      if Array.isArray(v)
        rv[k] = v.map (e) -> e.id
      else
        rv[k] = v
    return rv
296
297
# Public entry point: requiring this module yields the spinredis class.
module.exports = spinredis