UNPKG

4.67 kBtext/coffeescriptView Raw
1_ = require 'lodash'
2async = require 'async'
3JobManagerBase = require './base'
4debug = require('debug')('meshblu-core-job-manager:responder')
5debugSaturation = require('debug')('meshblu-core-job-manager:responder:saturation')
6SimpleBenchmark = require 'simple-benchmark'
7ResponderDequeuer = require './responder-dequeuer'
8
# Responder half of the job manager. It pulls request keys onto an async
# queue, loads each request from redis, hands it to `workerFunc`, and
# publishes the worker's response on the queue named in the request hash.
#
# NOTE(review): `@emit`, `@addMetric`, `@_queuePool`, `@_commandPool`,
# `@_updateHeartbeat`, `@queueTimeoutSeconds` and `@jobTimeoutSeconds` are not
# defined in this file; presumably they come from JobManagerBase - confirm.
class JobManagerResponder extends JobManagerBase
  # options.requestQueueName - required; forwarded to every ResponderDequeuer.
  # options.workerFunc       - required; called as workerFunc(job, (error, response) ->).
  # options.concurrency      - concurrency of the internal async queue (default 1).
  #
  # Throws an Error when requestQueueName or workerFunc is missing.
  constructor: (options={}) ->
    {
      @requestQueueName
      @workerFunc
      concurrency=1
    } = options

    throw new Error 'JobManagerResponder constructor is missing "requestQueueName"' unless @requestQueueName?
    throw new Error 'JobManagerResponder constructor is missing "workerFunc"' unless @workerFunc?

    @queue = async.queue @_work, concurrency
    @queue.empty = => @emit 'empty'
    # Bare `super` forwards the original constructor arguments to the base class.
    super
    @dequeuers = []
    # One dequeuer for every three units of concurrency, rounded up.
    _.times Math.ceil(concurrency/3), (x) =>
      @dequeuers.push new ResponderDequeuer { @queue, @_queuePool, @_updateHeartbeat, @requestQueueName, @queueTimeoutSeconds, x }

  # Publishes `response` for the request stored under the response id.
  #
  # responseId - fallback id used when response.metadata.responseId is unset.
  # response   - { metadata, data, rawData }; rawData defaults to JSON of data.
  # callback   - (error, {metadata, rawData}). Called with no arguments when the
  #              request hash holds neither request metadata nor a queue name.
  createResponse: ({responseId, response}, callback) =>
    # Report a missing response through the callback instead of throwing a
    # TypeError from inside the worker's callback.
    return callback new Error 'missing response' unless response?

    { metadata, data, rawData } = response
    metadata ?= {}
    metadata.responseId ?= responseId
    data ?= null
    rawData ?= JSON.stringify data

    @client.hmget metadata.responseId, ['request:metadata', 'response:queueName'], (error, result) =>
      delete error.code if error?
      return callback error if error?

      [ requestMetadata, responseQueueName ] = result ? []

      return callback() if _.isEmpty(requestMetadata) and _.isEmpty(responseQueueName)
      return callback new Error 'missing responseQueueName' unless responseQueueName?

      # Best effort: unparseable request metadata is tolerated, in which case
      # none of the request-derived fields below are applied.
      try
        requestMetadata = JSON.parse requestMetadata
      catch

      requestMetadata ?= {}

      if requestMetadata.ignoreResponse
        # The requester does not want a reply: drop the hash, publish nothing.
        @client.del metadata.responseId, (error) =>
          delete error.code if error?
          return callback error if error?
          return callback null, {metadata, rawData}
        return

      metadata.jobLogs = requestMetadata.jobLogs if requestMetadata.jobLogs?
      metadata.metrics = requestMetadata.metrics if requestMetadata.metrics?

      @addMetric metadata, 'enqueueResponseAt', (error) =>
        return callback error if error?

        # Call through @client so the commands keep their receiver even if the
        # client's methods are not pre-bound.
        async.series [
          (next) => @client.publish responseQueueName, JSON.stringify({ metadata, rawData }), next
          (next) => @client.expire metadata.responseId, @jobTimeoutSeconds, next
        ], (error) =>
          delete error.code if error?
          # Surface publish/expire failures instead of reporting success.
          return callback error if error?
          callback null, { metadata, rawData }

    return # avoid returning redis

  # async.queue worker: loads the request stored at `key`, runs `workerFunc`
  # on it and publishes the result. Worker and publish errors are logged and
  # not passed on, so the queue callback only receives getRequest errors.
  _work: (key, callback) =>
    benchmark = new SimpleBenchmark label: '_work'
    @getRequest key, (error, job) =>
      return callback error if error?
      process.nextTick =>
        @workerFunc job, (error, response) =>
          if error?
            console.error error.stack
            # Stop here: there is no response to publish, and the queue
            # callback must be invoked exactly once.
            return callback()
          responseId = _.get job, 'metadata.responseId'
          @createResponse {responseId, response}, (error) =>
            console.error error.stack if error?
            debug benchmark.toString()
            callback()

  # Reads the request hash at `key`, records the 'dequeueRequestAt' metric on
  # its metadata and writes the updated metadata back.
  #
  # callback - (error, { createdAt, metadata, rawData })
  getRequest: (key, callback) =>
    @client.hgetall key, (error, result) =>
      delete error.code if error?
      return callback error if error?
      return callback new Error 'Missing result' if _.isEmpty result
      return callback new Error 'Missing metadata' if _.isEmpty result['request:metadata']

      # Malformed stored metadata is reported through the callback rather than
      # thrown from inside the redis callback.
      try
        metadata = JSON.parse result['request:metadata']
      catch parseError
        return callback parseError

      @addMetric metadata, 'dequeueRequestAt', (error) =>
        return callback error if error?

        request =
          createdAt: result['request:createdAt']
          metadata: metadata
          rawData: result['request:data']

        @client.hset key, 'request:metadata', JSON.stringify(metadata), (error) =>
          # Same error normalisation as the other redis callbacks in this class.
          delete error.code if error?
          return callback error if error?
          callback null, request

  # Acquires a client from the command pool, re-emits its first 'error' event
  # on this instance, then starts every dequeuer.
  #
  # callback - (error), invoked once the dequeuers have started or if the
  #            promise chain rejects.
  start: (callback=_.noop) =>
    @_commandPool.acquire().then (@client) =>
      @client.once 'error', (error) =>
        @emit 'error', error

      @_startProcessing callback
    .catch callback
    return # promises

  # Installs the queue drain handler and starts all dequeuers in parallel.
  # NOTE(review): `@_drained` is only ever assigned `true` in this file, so the
  # wait in `stop` finishes after its first delay unless something outside this
  # file resets the flag - confirm.
  _startProcessing: (callback) =>
    @_drained = true
    @queue.drain = =>
      debug 'drained'
      @_drained = true
    tasks = []
    _.each @dequeuers, (dequeuer) =>
      tasks.push dequeuer.start
    async.parallel tasks, callback

  # One polling step for `stop`: calls back after 100ms.
  _waitForStopped: (callback) =>
    _.delay callback, 100

  # Stops every dequeuer, then polls every 100ms until `@_drained` is truthy
  # before invoking `callback`.
  stop: (callback=_.noop) =>
    tasks = []
    _.each @dequeuers, (dequeuer) =>
      tasks.push dequeuer.stop
    async.parallel tasks, =>
      async.doUntil @_waitForStopped, (=> @_drained), callback

module.exports = JobManagerResponder