1 | _ = require 'lodash'
|
2 | async = require 'async'
|
3 | JobManagerBase = require './base'
|
4 | debug = require('debug')('meshblu-core-job-manager:responder')
|
5 | debugSaturation = require('debug')('meshblu-core-job-manager:responder:saturation')
|
6 | SimpleBenchmark = require 'simple-benchmark'
|
7 | ResponderDequeuer = require './responder-dequeuer'
|
8 |
|
# JobManagerResponder pulls request keys through an async.queue, loads each
# request via @client, hands it to the user-supplied worker function and
# publishes the worker's response on the queue named in the stored request.
#
# Members such as @client, @addMetric, @emit, @jobTimeoutSeconds,
# @queueTimeoutSeconds, @_commandPool, @_queuePool and @_updateHeartbeat are
# not defined in this class; presumably they come from JobManagerBase
# (TODO confirm against ./base).
class JobManagerResponder extends JobManagerBase
  # options.requestQueueName - required; handed to every ResponderDequeuer.
  # options.workerFunc       - required; called as workerFunc(job, (error, response) ->).
  # options.concurrency      - async.queue concurrency, defaults to 1.
  #
  # Throws when requestQueueName or workerFunc is missing. The full argument
  # list is forwarded to the parent constructor by the bare `super`.
  constructor: (options={}) ->
    {
      @requestQueueName
      @workerFunc
      concurrency=1
    } = options

    throw new Error 'JobManagerResponder constructor is missing "requestQueueName"' unless @requestQueueName?
    throw new Error 'JobManagerResponder constructor is missing "workerFunc"' unless @workerFunc?

    @queue = async.queue @_work, concurrency
    @queue.empty = => @emit 'empty'
    super
    # One dequeuer per three units of concurrency (rounded up); `x` is the
    # dequeuer's index.
    @dequeuers = []
    _.times Math.ceil(concurrency/3), (x) =>
      @dequeuers.push new ResponderDequeuer { @queue, @_queuePool, @_updateHeartbeat, @requestQueueName, @queueTimeoutSeconds, x }

  # Publishes `response` for the request stored under the response id.
  #
  # responseId - fallback id, used when response.metadata.responseId is unset.
  # response   - { metadata, data, rawData }; rawData defaults to
  #              JSON.stringify(data), data defaults to null.
  # callback   - (error, { metadata, rawData }). Called with no arguments when
  #              neither request metadata nor a response queue name is stored
  #              under the id.
  createResponse: ({responseId, response}, callback) =>
    # Report a missing response through the callback instead of throwing out
    # of the worker's callback.
    return callback new Error 'missing response metadata' unless response?.metadata?

    { metadata, data, rawData } = response
    metadata.responseId ?= responseId
    data ?= null
    rawData ?= JSON.stringify data

    @client.hmget metadata.responseId, ['request:metadata', 'response:queueName'], (error, result) =>
      delete error.code if error?
      return callback error if error?

      [ requestMetadata, responseQueueName ] = result ? []

      return callback() if _.isEmpty(requestMetadata) and _.isEmpty(responseQueueName)
      return callback new Error 'missing responseQueueName' unless responseQueueName?

      # Unparseable request metadata is treated the same as empty metadata.
      try
        requestMetadata = JSON.parse requestMetadata
      catch parseError
        requestMetadata = {}

      requestMetadata ?= {}

      if requestMetadata.ignoreResponse
        # The requester does not want a reply: drop the stored request and
        # skip publishing.
        @client.del metadata.responseId, (error) =>
          delete error.code if error?
          return callback error if error?
          return callback null, {metadata, rawData}
        return

      metadata.jobLogs = requestMetadata.jobLogs if requestMetadata.jobLogs?
      metadata.metrics = requestMetadata.metrics if requestMetadata.metrics?

      @addMetric metadata, 'enqueueResponseAt', (error) =>
        return callback error if error?

        payload = JSON.stringify { metadata, rawData }
        # Call the commands as methods so they keep @client as their receiver.
        async.series [
          (done) => @client.publish responseQueueName, payload, done
          (done) => @client.expire metadata.responseId, @jobTimeoutSeconds, done
        ], (error) =>
          delete error.code if error?
          # A failed publish/expire must not be reported as success.
          return callback error if error?
          callback null, { metadata, rawData }

    return

  # async.queue worker: loads the request stored under `key`, runs
  # @workerFunc on it and publishes the result. Worker and response errors
  # are logged rather than propagated; only getRequest errors reach `callback`.
  _work: (key, callback) =>
    benchmark = new SimpleBenchmark label: '_work'
    @getRequest key, (error, job) =>
      return callback error if error?
      process.nextTick =>
        @workerFunc job, (error, response) =>
          if error?
            console.error error.stack
            # Stop here: falling through would call `callback` a second time
            # and try to publish a response that does not exist.
            return callback()
          responseId = _.get job, 'metadata.responseId'
          @createResponse {responseId, response}, (error) =>
            console.error error.stack if error?
            debug benchmark.toString()
            callback()

  # Loads the request hash stored under `key`, records a 'dequeueRequestAt'
  # metric on its metadata and writes the updated metadata back.
  #
  # callback - (error, { createdAt, metadata, rawData })
  getRequest: (key, callback) =>
    @client.hgetall key, (error, result) =>
      delete error.code if error?
      return callback error if error?
      return callback new Error 'Missing result' if _.isEmpty result
      return callback new Error 'Missing metadata' if _.isEmpty result['request:metadata']

      # Malformed stored metadata is reported through the callback instead of
      # throwing inside the client callback.
      try
        metadata = JSON.parse result['request:metadata']
      catch parseError
        return callback parseError

      @addMetric metadata, 'dequeueRequestAt', (error) =>
        return callback error if error?

        request =
          createdAt: result['request:createdAt']
          metadata: metadata
          rawData: result['request:data']

        @client.hset key, 'request:metadata', JSON.stringify(metadata), (error) =>
          # Same error.code stripping as the other direct client callbacks.
          delete error.code if error?
          return callback error if error?
          callback null, request

  # Acquires a client from @_commandPool, re-emits its first 'error' event
  # on this object and starts every dequeuer.
  start: (callback=_.noop) =>
    @_commandPool.acquire().then (@client) =>
      @client.once 'error', (error) =>
        @emit 'error', error

      @_startProcessing callback
    .catch callback
    return

  # Installs the queue's drain handler and starts all dequeuers in parallel.
  _startProcessing: (callback) =>
    # NOTE(review): nothing in this class ever sets @_drained to false, so
    # stop() treats the queue as drained after its first 100ms wait. Confirm
    # whether another component is expected to reset it.
    @_drained = true
    @queue.drain = =>
      debug 'drained'
      @_drained = true
    tasks = _.map @dequeuers, (dequeuer) -> dequeuer.start
    async.parallel tasks, callback

  # One polling step for stop(): waits 100ms, then calls back.
  _waitForStopped: (callback) =>
    _.delay callback, 100

  # Stops every dequeuer, then polls every 100ms until @_drained is truthy.
  # Any error from the dequeuers' stop calls is not passed on to `callback`.
  stop: (callback=_.noop) =>
    tasks = _.map @dequeuers, (dequeuer) -> dequeuer.stop
    async.parallel tasks, =>
      async.doUntil @_waitForStopped, (=> @_drained), callback
|
134 |
|
135 | module.exports = JobManagerResponder
|