1 | uuid = require('node-uuid')
|
2 | $q = require('node-promise')
|
3 | lru = require('lru')
|
4 |
|
5 | debug = process.env['DEBUG']
|
6 |
|
7 | opts =
|
8 | max: 1000
|
9 | maxAgeInMilliseconds: 1000 * 60 * 60 * 24 * 4
|
10 |
|
11 | class spinredis
|
12 |
|
13 | constructor: (dbUrl) ->
|
14 | console.log 'spinclient +++++++++ constructor called +++++++++++'
|
15 | @subscribers = []
|
16 | @objsubscribers = []
|
17 | @objectsSubscribedTo = []
|
18 |
|
19 | @outstandingMessages = []
|
20 | @modelcache = []
|
21 |
|
22 | @seenMessages = []
|
23 | @sessionId = null
|
24 | @objects = new lru(opts)
|
25 |
|
26 | @savedMessagesInCaseOfRetries = new lru({max:1000, maxAgeInMilliseconds: 5000})
|
27 |
|
28 | if debug then console.log 'redis-spincycle dbUrl = ' + dbUrl
|
29 | rhost = dbUrl or process.env['REDIS_PORT_6379_TCP_ADDR'] or '127.0.0.1'
|
30 | rport = process.env['REDIS_PORT_6379_TCP_PORT'] or '6379'
|
31 |
|
32 | @sendredis = require('redis').createClient(rport, rhost)
|
33 | @listenredis = require('redis').createClient(rport, rhost)
|
34 |
|
35 | @listenredis.on 'error', (err) ->
|
36 | console.log 'spinredis listen ERROR: ' + err
|
37 |
|
38 | @listenredis.on 'end', (err) ->
|
39 | console.log 'spinredis listen end event: ' + err
|
40 |
|
41 | @sendredis.on 'error', (err) ->
|
42 | console.log 'spinredis send ERROR: ' + err
|
43 |
|
44 | @sendredis.on 'end', (err) ->
|
45 | console.log 'spinredis send end event: ' + err
|
46 |
|
47 |
|
48 | @subscribers['OBJECT_UPDATE'] = [(obj) =>
|
49 | console.log 'spinredis +++++++++ obj update message router got obj '+obj.id+' of type '+obj.type
|
50 | console.dir(obj);
|
51 |
|
52 | objsubs = @objsubscribers[obj.id] or []
|
53 | for k,v of objsubs
|
54 |
|
55 | if not @objects.get(obj.id)
|
56 | @objects.set(obj.id, obj)
|
57 | else
|
58 | o = @objects.get(obj.id)
|
59 | for prop, val of obj
|
60 | o[prop] = val
|
61 | v obj
|
62 | ]
|
63 |
|
64 | @setup()
|
65 |
|
66 | failed: (msg)->
|
67 | console.log 'spinclient message failed!! ' + msg
|
68 |
|
69 | setSessionId: (id) ->
|
70 | if(id)
|
71 | console.log '++++++++++++++++++++++++++++++++++++++ spinclient setting session id to ' + id
|
72 | @sessionId = id
|
73 |
|
74 | dumpOutstanding: ()->
|
75 | console.log '-------------------------------- ' + @outstandingMessages.length + ' outstanding messages ---------------------------------'
|
76 | @outstandingMessages.forEach (os)->
|
77 | console.log os.messageId + ' -> ' + os.target + ' - ' + os.d
|
78 | console.log '-----------------------------------------------------------------------------------------'
|
79 |
|
80 | emit: (message) =>
|
81 | message.channelID = 'spinchannel_' + @channelID
|
82 | if @open
|
83 | _emit(message)
|
84 | else
|
85 | setTimeout(
|
86 | ()=>
|
87 | @emit(message)
|
88 | ,200+parseInt(Math.random()*100)
|
89 | )
|
90 |
|
91 | _emit:(message)=>
|
92 | if debug then console.log 'redisclient emitting message..'
|
93 | if debug then console.dir message
|
94 | @savedMessagesInCaseOfRetries.set(message.messageId, message)
|
95 | @sendredis.publish('spinchannel', JSON.stringify(message))
|
96 |
|
97 | openChannel:()=>
|
98 |
|
99 | if not @open
|
100 | @sendredis.publish('spinchannel', JSON.stringify({target: 'listcommands'}))
|
101 | setTimeout(
|
102 | ()=>
|
103 | @openChannel()
|
104 | ,100
|
105 | )
|
106 |
|
107 | setup: () =>
|
108 | @channelID = uuid.v4()
|
109 | @listenredis.subscribe('spinchannel_' + @channelID)
|
110 | @openChannel()
|
111 |
|
112 | @listenredis.on 'message', (channel, replystr) =>
|
113 | if debug then console.log 'spinredis on message got ' + replystr
|
114 |
|
115 | reply = JSON.parse(replystr)
|
116 | status = reply.status
|
117 | message = reply.payload
|
118 | info = reply.info
|
119 |
|
120 | if info == 'list of available targets'
|
121 | console.log 'Spincycle server channel is up and awake'
|
122 | @open = true
|
123 | else
|
124 | if message and message.error and message.error == 'ERRCHILLMAN'
|
125 | console.log 'got ERRCHILLMAN from spinycle service, preparing to retry sending message...'
|
126 | oldmsg = @savedMessagesInCaseOfRetries[reply.messageId]
|
127 | setTimeout(
|
128 | ()=>
|
129 | console.log 'resending message '+oldmsg.messageId+' due to target endpoint not open yet'
|
130 | @emit(oldmsg)
|
131 | ,250
|
132 | )
|
133 |
|
134 | else if not @hasSeenThisMessage reply.messageId
|
135 | @savedMessagesInCaseOfRetries.remove(reply.messageId)
|
136 | if reply.messageId and reply.messageId isnt 'undefined' then @seenMessages.push(reply.messageId)
|
137 | if @seenMessages.length > 10 then @seenMessages.shift()
|
138 | if debug then console.log 'redis-spincycle got reply messageId ' + reply.messageId + ' status ' + status + ', info ' + info + ' data ' + message + ' outstandingMessages = ' + @outstandingMessages.length
|
139 | if debug then @dumpOutstanding()
|
140 |
|
141 | index = -1
|
142 | if reply.messageId
|
143 | i = 0
|
144 | while i < @outstandingMessages.length
|
145 | index = i
|
146 | detail = @outstandingMessages[i]
|
147 | if detail and not detail.delivered and detail.messageId == reply.messageId
|
148 | if reply.status == 'FAILURE' or reply.status == 'NOT_ALLOWED'
|
149 | console.log 'spinclient message FAILURE'
|
150 | console.dir reply
|
151 | detail.d.reject reply
|
152 | break
|
153 | else
|
154 |
|
155 | detail.d.resolve(message)
|
156 | break
|
157 | detail.delivered = true
|
158 | i++
|
159 | if index > -1
|
160 | @outstandingMessages.splice index, 1
|
161 | else
|
162 | subs = @subscribers[info]
|
163 | if subs
|
164 | subs.forEach (listener) ->
|
165 | listener message
|
166 | else
|
167 | if debug then console.log 'no subscribers for message ' + message
|
168 | if debug then console.dir reply
|
169 | else
|
170 | if debug then console.log '-- skipped resent message ' + reply.messageId
|
171 |
|
172 | hasSeenThisMessage: (messageId) =>
|
173 | @seenMessages.some (mid) -> messageId == mid
|
174 |
|
175 | registerListener: (detail) =>
|
176 |
|
177 | subs = @subscribers[detail.message] or []
|
178 | subs.push detail.callback
|
179 | @subscribers[detail.message] = subs
|
180 |
|
181 | registerObjectSubscriber: (detail) =>
|
182 | d = $q.defer()
|
183 | sid = uuid.v4()
|
184 | localsubs = @objectsSubscribedTo[detail.id]
|
185 |
|
186 |
|
187 | if not localsubs
|
188 | localsubs = []
|
189 | console.log 'spinredis no local subs, so get the original server-side subscription for id ' + detail.id
|
190 |
|
191 | @_registerObjectSubscriber({
|
192 | id: detail.id, type: detail.type, cb: (updatedobj) =>
|
193 |
|
194 | lsubs = @objectsSubscribedTo[detail.id]
|
195 |
|
196 | for k,v of lsubs
|
197 | if (v.cb)
|
198 |
|
199 | v.cb updatedobj
|
200 | }).then( (remotesid) =>
|
201 | localsubs['remotesid'] = remotesid
|
202 | localsubs[sid] = detail
|
203 |
|
204 | @objectsSubscribedTo[detail.id] = localsubs
|
205 | d.resolve(sid)
|
206 | ,(rejection)=>
|
207 | console.log 'spinredis registerObjectSubscriber rejection: '+rejection
|
208 | console.dir rejection
|
209 | )
|
210 | else
|
211 | localsubs[sid] = detail
|
212 | return d.promise
|
213 |
|
214 | _registerObjectSubscriber: (detail) =>
|
215 | d = $q.defer()
|
216 |
|
217 | subs = @objsubscribers[detail.id] or []
|
218 |
|
219 | @emitMessage({target: 'registerForUpdatesOn', obj: {id: detail.id, type: detail.type}}).then(
|
220 | (reply)=>
|
221 | console.log 'spinredis server subscription id for id ' + detail.id + ' is ' + reply
|
222 | subs[reply] = detail.cb
|
223 | @objsubscribers[detail.id] = subs
|
224 | d.resolve(reply)
|
225 | , (reply)=>
|
226 | @failed(reply)
|
227 | )
|
228 | return d.promise
|
229 |
|
230 | deRegisterObjectsSubscriber: (sid, o) =>
|
231 | localsubs = @objectsSubscribedTo[o.id] or []
|
232 | if localsubs[sid]
|
233 | console.log 'deregistering local updates for @objects ' + o.id
|
234 | delete localsubs[sid]
|
235 | count = 0
|
236 | for k,v in localsubs
|
237 | count++
|
238 | if count == 1
|
239 | @_deRegisterObjectsSubscriber('remotesid', o)
|
240 |
|
241 | _deRegisterObjectsSubscriber: (sid, o) =>
|
242 | subs = @objsubscribers[o.id] or []
|
243 | if subs and subs[sid]
|
244 | delete subs[sid]
|
245 | @objsubscribers[o.id] = subs
|
246 | @emitMessage({target: 'deRegisterForUpdatesOn', id: o.id, type: o.type, listenerid: sid}).then (reply)->
|
247 | console.log 'deregistering server updates for @objects ' + o.id
|
248 |
|
249 | emitMessage: (detail) =>
|
250 | if debug then console.log 'emitMessage called'
|
251 | if debug then console.dir detail
|
252 | d = $q.defer()
|
253 | detail.messageId = uuid.v4()
|
254 | detail.sessionId = detail.sessionId or @sessionId
|
255 | detail.d = d
|
256 | @outstandingMessages.push detail
|
257 | if debug then console.log 'saving outstanding reply to messageId ' + detail.messageId + ' and @sessionId ' + detail.sessionId
|
258 | @emit detail
|
259 |
|
260 | return d.promise
|
261 |
|
262 |
|
263 |
|
264 | getModelFor: (type) =>
|
265 | d = $q.defer()
|
266 | if @modelcache[type]
|
267 | d.resolve(@modelcache[type])
|
268 | else
|
269 | @emitMessage({target: 'getModelFor', modelname: type}).then((model)->
|
270 | @modelcache[type] = model
|
271 | d.resolve(model)
|
272 | ,(rejection)=>
|
273 | console.log 'spinredis getModelFor rejection: '+rejection
|
274 | console.dir rejection
|
275 | )
|
276 | return d.promise
|
277 |
|
278 | listTargets: () =>
|
279 | d = $q.defer()
|
280 | @emitMessage({target: 'listcommands'}).then((targets)->
|
281 | d.resolve(targets)
|
282 | ,(rejection)->
|
283 | console.log 'spinredis listTargets rejection: '+rejection
|
284 | console.dir rejection
|
285 | )
|
286 | return d.promise
|
287 |
|
288 | flattenModel: (model) =>
|
289 | rv = {}
|
290 | for k,v of model
|
291 | if angular.isArray(v)
|
292 | rv[k] = v.map (e) -> e.id
|
293 | else
|
294 | rv[k] = v
|
295 | return rv
|
296 |
|
297 |
|
298 | module.exports = spinredis
|