1 | _ = require 'lodash'
|
2 | async = require 'async'
|
3 | UUID = require 'uuid'
|
4 | JobManagerBase = require './base'
|
5 |
|
6 | class JobManagerRequester extends JobManagerBase
|
7 | constructor: (options={}) ->
|
8 | {
|
9 | @jobLogSampleRate
|
10 | @maxQueueLength
|
11 | @jobLogSampleRateOverrideUuids
|
12 | @responseQueueName
|
13 | @requestQueueName
|
14 | } = options
|
15 | @maxQueueLength ?= 10000
|
16 | @jobLogSampleRateOverrideUuids ?= []
|
17 |
|
18 | throw new Error 'JobManagerRequester constructor is missing "jobLogSampleRate"' unless @jobLogSampleRate?
|
19 | throw new Error 'JobManagerRequester constructor is missing "requestQueueName"' unless @requestQueueName?
|
20 | throw new Error 'JobManagerRequester constructor is missing "responseQueueName"' unless @responseQueueName?
|
21 |
|
22 | super
|
23 |
|
24 | _addResponseIdToOptions: (options) =>
|
25 | options = _.clone options
|
26 | { metadata } = options
|
27 | metadata = _.clone metadata
|
28 | metadata.responseId ?= @generateResponseId()
|
29 | options.metadata = metadata
|
30 | return options
|
31 |
|
32 | _checkMaxQueueLength: (callback) =>
|
33 | return callback() unless @maxQueueLength > 0
|
34 | @client.llen @requestQueueName, (error, queueLength) =>
|
35 | return callback error if error?
|
36 | return callback() if queueLength <= @maxQueueLength
|
37 |
|
38 | error = new Error 'Maximum Capacity Exceeded'
|
39 | error.code = 503
|
40 | callback error
|
41 | return
|
42 |
|
43 | createForeverRequest: (options, callback) =>
|
44 | options = @_addResponseIdToOptions options
|
45 | {metadata,data,rawData} = options
|
46 |
|
47 | @_checkMaxQueueLength (error) =>
|
48 | return callback error if error?
|
49 |
|
50 | metadata.jobLogs = []
|
51 | if Math.random() < @jobLogSampleRate
|
52 | metadata.jobLogs.push 'sampled'
|
53 |
|
54 | uuids = [ metadata.auth?.uuid, metadata.toUuid, metadata.fromUuid, metadata.auth?.as ]
|
55 | metadata.jobLogs.push 'override' unless _.isEmpty _.intersection @jobLogSampleRateOverrideUuids, uuids
|
56 |
|
57 | @addMetric metadata, 'enqueueRequestAt', (error) =>
|
58 | return callback error if error?
|
59 | { responseId } = metadata
|
60 | data ?= null
|
61 |
|
62 | metadataStr = JSON.stringify metadata
|
63 | rawData ?= JSON.stringify data
|
64 |
|
65 | values = [
|
66 | 'request:metadata', metadataStr
|
67 | 'request:data', rawData
|
68 | 'request:createdAt', Date.now()
|
69 | 'response:queueName', @responseQueueName
|
70 | ]
|
71 |
|
72 | async.series [
|
73 | async.apply @client.hmset, responseId, values
|
74 | async.apply @client.lpush, @requestQueueName, responseId
|
75 | ], (error) =>
|
76 | delete error.code if error?
|
77 | callback error, responseId
|
78 | return
|
79 |
|
80 | createRequest: (options, callback) =>
|
81 | @createForeverRequest options, (error, responseId) =>
|
82 | return callback error if error?
|
83 | @client.expire responseId, @jobTimeoutSeconds, (error) =>
|
84 | delete error.code if error?
|
85 | return callback error if error?
|
86 | callback null, responseId
|
87 |
|
88 | return
|
89 |
|
90 | do: (request, callback) =>
|
91 | callback = _.once callback
|
92 | request = @_addResponseIdToOptions request
|
93 | responseId = _.get request, 'metadata.responseId'
|
94 | responseTimeout = null
|
95 | return callback new Error 'do requires metadata.responseId' unless responseId?
|
96 |
|
97 | @once "response:#{responseId}", (response) =>
|
98 | clearTimeout responseTimeout if responseTimeout?
|
99 | @removeListener "error:#{responseId}", callback
|
100 | callback null, response
|
101 |
|
102 | @once "error:#{responseId}", (error) =>
|
103 | clearTimeout responseTimeout if responseTimeout?
|
104 | @removeListener "response:#{responseId}", callback
|
105 | callback error
|
106 |
|
107 | @createRequest request, (error) =>
|
108 | return @emit "error:#{responseId}", error if error?
|
109 | responseTimeout = setTimeout =>
|
110 | error = new Error('Response timeout exceeded')
|
111 | error.code = 504
|
112 | @emit "error:#{responseId}", error
|
113 | , @jobTimeoutSeconds * 1000
|
114 |
|
115 | _listenForResponses: (callback) =>
|
116 | @pubSubClient.subscribe @responseQueueName
|
117 | @pubSubClient.on 'message', (channel, data) =>
|
118 | @_updateHeartbeat()
|
119 | try
|
120 | data = JSON.parse data
|
121 | catch error
|
122 |
|
123 | { metadata, rawData } = data
|
124 |
|
125 | @_parseResponse { metadata, rawData }, (error, response) =>
|
126 | console.error error.stack if error?
|
127 | return if _.isEmpty response
|
128 | responseId = _.get response, 'metadata.responseId'
|
129 |
|
130 | @emit "response:#{responseId}", response
|
131 |
|
132 | _getResponse: (key, callback) =>
|
133 | @client.hmget key, ['response:metadata', 'response:data'], (error, data) =>
|
134 | delete error.code if error?
|
135 | return callback error if error?
|
136 | [ metadata, rawData ] = data
|
137 | metadata = JSON.parse metadata
|
138 | @_parseResponse { metadata, rawData }, callback
|
139 | return
|
140 |
|
141 | _parseResponse: ({ metadata, rawData }, callback) =>
|
142 | return if _.isEmpty metadata
|
143 |
|
144 | @addMetric metadata, 'dequeueResponseAt', (error) =>
|
145 | return callback error if error?
|
146 |
|
147 | response =
|
148 | metadata: metadata
|
149 | rawData: rawData
|
150 |
|
151 | callback null, response
|
152 |
|
153 | generateResponseId: =>
|
154 | UUID.v4()
|
155 |
|
156 | start: (callback=_.noop) =>
|
157 | @_commandPool.acquire()
|
158 | .then (@client) =>
|
159 | @client.once 'error', (error) =>
|
160 | @emit 'error', error
|
161 |
|
162 | @_pubSubPool.acquire()
|
163 | .then (@pubSubClient) =>
|
164 | @_startProcessing callback
|
165 | .catch callback
|
166 | return
|
167 |
|
168 | _startProcessing: (callback) =>
|
169 | callback = _.once callback
|
170 | @_allowProcessing = true
|
171 |
|
172 | @_listenForResponses()
|
173 | _.defer callback
|
174 |
|
175 | _stopProcessing: (callback) =>
|
176 | @_allowProcessing = false
|
177 | @pubSubClient.unsubscribe @responseQueueName
|
178 | callback()
|
179 |
|
180 | stop: (callback=_.noop) =>
|
181 | @_stopProcessing (error) =>
|
182 | @_pubSubPool.release @pubSubClient
|
183 | .then =>
|
184 | @_commandPool.release @client
|
185 | .then =>
|
186 | return @_pubSubPool.drain()
|
187 | .then =>
|
188 | return @_commandPool.drain()
|
189 | .then =>
|
190 | return @_queuePool.drain()
|
191 | .then =>
|
192 | callback error
|
193 | .catch callback
|
194 | return
|
195 |
|
196 | module.exports = JobManagerRequester
|