# UNPKG
#
# 6.1 kB · text/coffeescript · View Raw
1_ = require 'lodash'
2async = require 'async'
3UUID = require 'uuid'
4JobManagerBase = require './base'
5
# Requester side of the job manager.
#
# Pushes requests onto a redis list (`requestQueueName`), subscribes to the
# `responseQueueName` pub/sub channel, and re-emits every response it receives
# as a "response:<responseId>" event on this instance.
#
# Relies on members that are not defined in this class and are presumably
# provided by JobManagerBase — TODO confirm: @_commandPool, @_pubSubPool,
# @_queuePool, @jobTimeoutSeconds, @addMetric, @_updateHeartbeat, and the
# event-emitter methods (@emit / @once / @removeListener).
class JobManagerRequester extends JobManagerBase
  # options:
  #   jobLogSampleRate              - required; compared against Math.random()
  #                                   to decide whether a request is tagged 'sampled'
  #   requestQueueName              - required; redis list requests are pushed to
  #   responseQueueName             - required; pub/sub channel to listen on
  #   maxQueueLength                - defaults to 10000; a value <= 0 disables
  #                                   the capacity check
  #   jobLogSampleRateOverrideUuids - defaults to []; requests touching any of
  #                                   these uuids are tagged 'override'
  #
  # Throws an Error when any required option is missing.
  constructor: (options={}) ->
    {
      @jobLogSampleRate
      @maxQueueLength
      @jobLogSampleRateOverrideUuids
      @responseQueueName
      @requestQueueName
    } = options
    @maxQueueLength ?= 10000
    @jobLogSampleRateOverrideUuids ?= []

    throw new Error 'JobManagerRequester constructor is missing "jobLogSampleRate"' unless @jobLogSampleRate?
    throw new Error 'JobManagerRequester constructor is missing "requestQueueName"' unless @requestQueueName?
    throw new Error 'JobManagerRequester constructor is missing "responseQueueName"' unless @responseQueueName?

    # bare `super` forwards the original arguments to the parent constructor
    super

  # Returns a shallow copy of `options` whose `metadata` (also a shallow copy)
  # is guaranteed to carry a `responseId`. The caller's objects are not mutated.
  # A missing `options` or `options.metadata` is treated as an empty object
  # instead of throwing a TypeError.
  _addResponseIdToOptions: (options) =>
    options = _.clone(options) ? {}
    metadata = _.clone(options.metadata) ? {}
    metadata.responseId ?= @generateResponseId()
    options.metadata = metadata
    return options

  # Calls back with a 503-coded Error when the request queue is longer than
  # @maxQueueLength, and with no arguments otherwise. The check is skipped
  # entirely when @maxQueueLength is not greater than zero.
  _checkMaxQueueLength: (callback) =>
    return callback() unless @maxQueueLength > 0
    @client.llen @requestQueueName, (error, queueLength) =>
      return callback error if error?
      return callback() if queueLength <= @maxQueueLength

      error = new Error 'Maximum Capacity Exceeded'
      error.code = 503
      callback error
    return # avoid returning redis

  # Stores the request in a redis hash keyed by its responseId and pushes the
  # responseId onto the request queue. No expiry is set on the hash (see
  # createRequest for the expiring variant).
  #
  # callback(error, responseId)
  createForeverRequest: (options, callback) =>
    options = @_addResponseIdToOptions options
    { metadata, data, rawData } = options

    @_checkMaxQueueLength (error) =>
      return callback error if error?

      metadata.jobLogs = []
      metadata.jobLogs.push 'sampled' if Math.random() < @jobLogSampleRate

      uuids = [ metadata.auth?.uuid, metadata.toUuid, metadata.fromUuid, metadata.auth?.as ]
      metadata.jobLogs.push 'override' unless _.isEmpty _.intersection @jobLogSampleRateOverrideUuids, uuids

      @addMetric metadata, 'enqueueRequestAt', (error) =>
        return callback error if error?
        { responseId } = metadata
        data ?= null

        metadataStr = JSON.stringify metadata
        rawData ?= JSON.stringify data

        values = [
          'request:metadata', metadataStr
          'request:data', rawData
          'request:createdAt', Date.now()
          'response:queueName', @responseQueueName
        ]

        # Invoke the redis commands as methods of @client so the calls do not
        # depend on the client having pre-bound its command functions.
        async.series [
          (next) => @client.hmset responseId, values, next
          (next) => @client.lpush @requestQueueName, responseId, next
        ], (error) =>
          # drop the redis error code before handing the error to the caller
          delete error.code if error?
          callback error, responseId
    return # avoid returning redis

  # Same as createForeverRequest, but the stored hash expires after
  # @jobTimeoutSeconds.
  #
  # callback(error, responseId)
  createRequest: (options, callback) =>
    @createForeverRequest options, (error, responseId) =>
      return callback error if error?
      @client.expire responseId, @jobTimeoutSeconds, (error) =>
        delete error.code if error?
        return callback error if error?
        callback null, responseId
    return # avoid returning redis

  # Creates a request and waits for its response.
  #
  # callback(error, response) is invoked exactly once: with the response
  # emitted as "response:<responseId>", with the error from creating the
  # request, or with a 504-coded Error once @jobTimeoutSeconds elapse.
  do: (request, callback) =>
    callback = _.once callback
    request = @_addResponseIdToOptions request
    responseId = _.get request, 'metadata.responseId'
    return callback new Error 'do requires metadata.responseId' unless responseId?

    responseEvent = "response:#{responseId}"
    errorEvent = "error:#{responseId}"
    responseTimeout = null
    settled = false

    # Remove BOTH handlers by their own references; passing `callback` to
    # removeListener never matched a registered listener and leaked the
    # sibling handler on every request.
    cleanup = =>
      settled = true
      clearTimeout responseTimeout if responseTimeout?
      @removeListener responseEvent, onResponse
      @removeListener errorEvent, onError

    onResponse = (response) =>
      cleanup()
      callback null, response

    onError = (error) =>
      cleanup()
      callback error

    @once responseEvent, onResponse
    @once errorEvent, onError

    @createRequest request, (error) =>
      return @emit errorEvent, error if error?
      # the response may already have arrived; do not arm a stray timer
      return if settled
      responseTimeout = setTimeout =>
        timeoutError = new Error('Response timeout exceeded')
        timeoutError.code = 504
        @emit errorEvent, timeoutError
      , @jobTimeoutSeconds * 1000

  # Subscribes to the response channel and emits "response:<responseId>" for
  # every well-formed message. Malformed messages are logged and skipped.
  # NOTE: the `callback` parameter is accepted but never invoked.
  _listenForResponses: (callback) =>
    @pubSubClient.subscribe @responseQueueName
    @pubSubClient.on 'message', (channel, data) =>
      @_updateHeartbeat()
      try
        data = JSON.parse data
      catch parseError
        console.error parseError.stack # log error and continue
        return
      # a literal `null` payload would otherwise throw on destructuring
      return unless _.isObject data

      { metadata, rawData } = data

      @_parseResponse { metadata, rawData }, (error, response) =>
        console.error error.stack if error? # log error and continue
        return if _.isEmpty response
        responseId = _.get response, 'metadata.responseId'

        @emit "response:#{responseId}", response

  # Reads the stored response for `key` from redis.
  #
  # callback(error, response); `response` is undefined when no response
  # metadata is stored under `key`.
  _getResponse: (key, callback) =>
    @client.hmget key, ['response:metadata', 'response:data'], (error, data) =>
      delete error.code if error?
      return callback error if error?
      [ metadata, rawData ] = data ? []
      try
        metadata = JSON.parse metadata
      catch parseError
        # report malformed metadata instead of throwing inside the redis callback
        return callback parseError
      @_parseResponse { metadata, rawData }, callback
    return # promises

  # Records the 'dequeueResponseAt' metric and calls back with
  # { metadata, rawData }. When `metadata` is empty the callback is invoked
  # with no arguments so callers are never left waiting.
  _parseResponse: ({ metadata, rawData }, callback) =>
    return callback() if _.isEmpty metadata

    @addMetric metadata, 'dequeueResponseAt', (error) =>
      return callback error if error?

      response =
        metadata: metadata
        rawData: rawData

      callback null, response

  # Returns a fresh v4 UUID used to key a request/response pair.
  generateResponseId: =>
    UUID.v4()

  # Acquires the command and pub/sub clients from their pools, then starts
  # listening for responses. `callback` receives an error if acquisition fails.
  start: (callback=_.noop) =>
    @_commandPool.acquire()
    .then (@client) =>
      # forward the first command-client error to listeners of this instance
      @client.once 'error', (error) =>
        @emit 'error', error

      @_pubSubPool.acquire()
    .then (@pubSubClient) =>
      @_startProcessing callback
    .catch callback
    return # nothing

  # Marks processing as allowed, attaches the response listener and invokes
  # `callback` (at most once) on a later tick.
  _startProcessing: (callback) =>
    callback = _.once callback
    @_allowProcessing = true

    @_listenForResponses()
    _.defer callback

  # Marks processing as stopped and unsubscribes from the response channel.
  # Safe to call before `start` has acquired a pub/sub client.
  _stopProcessing: (callback) =>
    @_allowProcessing = false
    @pubSubClient?.unsubscribe @responseQueueName
    callback()

  # Stops listening, releases both clients back to their pools and drains the
  # pub/sub, command and queue pools, in that order. `callback` receives any
  # error raised along the way.
  stop: (callback=_.noop) =>
    @_stopProcessing (error) =>
      @_pubSubPool.release @pubSubClient
      .then =>
        @_commandPool.release @client
      .then =>
        return @_pubSubPool.drain()
      .then =>
        return @_commandPool.drain()
      .then =>
        return @_queuePool.drain()
      .then =>
        callback error
      .catch callback
    return # nothing

module.exports = JobManagerRequester