1 | _ = require 'lodash'
|
2 | async = require 'async'
|
3 | JobManagerBase = require './base'
|
4 | debug = require('debug')('meshblu-core-job-manager:responder')
|
5 | debugSaturation = require('debug')('meshblu-core-job-manager:responder:saturation')
|
6 | SimpleBenchmark = require 'simple-benchmark'
|
7 |
|
# Moves job keys from a request list into an in-process work queue.
#
# A client is acquired from `_queuePool`; `brpop` is called on it against
# `requestQueueName`, and every non-empty key that comes back is pushed onto
# `queue`. The loop keeps running until `stop` is called.
class ResponderDequeuer
  # Options (all passed in a single object):
  #   queue               - work queue; must expose `length()`, `concurrency`,
  #                         `push()` and `workersList()` (presumably an
  #                         `async.queue` - confirm against the caller)
  #   _queuePool          - pool exposing promise-returning `acquire()` and
  #                         `release(client)`
  #   _updateHeartbeat    - optional; called after every `brpop` reply
  #   requestQueueName    - name of the list passed to `brpop`
  #   queueTimeoutSeconds - timeout argument passed to `brpop`
  #   x                   - identifier used only in benchmark/debug labels
  #   onPush              - optional; called right before a key is pushed
  constructor: ({ @queue, @_queuePool, @_updateHeartbeat, @requestQueueName, @queueTimeoutSeconds, @x, @onPush }) ->
    @onPush ?= _.noop
    @_updateHeartbeat ?= _.noop
    # `_safeToStop` compares these flags with `== false` (strict equality in
    # CoffeeScript). Left undefined, a `stop()` issued before the first
    # dequeue would poll forever, so give them real boolean values up front.
    @_allowProcessing = false
    @_enqueuing = false

  # Acquires a queue client and kicks off the dequeue loop.
  # `callback` is invoked with no arguments once the loop has been started,
  # or with the rejection reason when that promise chain fails.
  start: (callback=_.noop) =>
    @_allowProcessing = true
    @_getNewQueueClient().then =>
      # No completion callback: the loop simply ends once `_allowProcessing`
      # is flipped to false by `stop`.
      async.doWhilst @enqueueJob, (=> @_allowProcessing)
      callback()
    .catch callback

  # Stops the dequeue loop, waits (polling every 100ms) until no dequeue is in
  # flight, hands the client back to the pool and then invokes `callback`.
  # Release failures are logged, not propagated.
  stop: (callback=_.noop) =>
    @_allowProcessing = false
    async.doUntil @_waitForStopped, @_safeToStop, =>
      # When `stop` runs before a client was ever acquired there is nothing
      # to hand back to the pool.
      if @queueClient?
        @_queuePool.release(@queueClient).catch (error) =>
          console.error error
      callback()
    return

  # Polling step for `stop`: calls `callback` after a 100ms delay.
  _waitForStopped: (callback) =>
    _.delay callback, 100

  # True once processing has been disabled and no `dequeueJob` is pending.
  _safeToStop: =>
    @_allowProcessing == false && @_enqueuing == false

  # Acquires a client from the pool and stores it in `@queueClient`.
  # Doubles as the client's one-shot 'error' handler: when invoked with an
  # error it logs the stack and acquires a replacement client.
  # Returns the promise chained off `acquire()`.
  # NOTE(review): the failed client is not released or destroyed here -
  # confirm whether the pool reclaims it on its own.
  _getNewQueueClient: (error) =>
    console.error error.stack if error?
    @_queuePool.acquire().then (@queueClient) =>
      @queueClient.once 'error', @_getNewQueueClient

  # One iteration of the dequeue loop. Always finishes by calling `callback`
  # without arguments so the surrounding loop keeps going.
  enqueueJob: (callback=_.noop) =>
    # Work queue is over its concurrency limit: skip this round and yield.
    return _.defer callback if @queue.length() > @queue.concurrency

    benchmark = new SimpleBenchmark label: "enqueueJob #{@x}"
    @_enqueuing = true
    @dequeueJob (error, key) =>
      debug benchmark.toString()

      @_enqueuing = false
      # Errors (including the 'No Result' produced when `brpop` yields
      # nothing) and empty keys are skipped on purpose; the loop just retries.
      return callback() if error?
      return callback() if _.isEmpty key
      @onPush()
      @queue.push key
      debugSaturation @x, "ql:", @queue.length(), "wl:", @queue.workersList().length
      callback()

  # Issues `brpop` on the request list and yields the popped key.
  # Calls `callback(error)` on failure or when no result came back,
  # otherwise `callback(null, key)`. The heartbeat hook fires on every reply.
  dequeueJob: (callback) =>
    @queueClient.brpop @requestQueueName, @queueTimeoutSeconds, (error, result) =>
      @_updateHeartbeat()
      return callback error if error?
      return callback new Error 'No Result' unless result?

      # `result` is a [channel, key] pair; only the key is needed.
      return callback null, result[1]
|
64 |
|
65 | module.exports = ResponderDequeuer
|