UNPKG

2.1 kBtext/coffeescriptView Raw
1_ = require 'lodash'
2async = require 'async'
3JobManagerBase = require './base'
4debug = require('debug')('meshblu-core-job-manager:responder')
5debugSaturation = require('debug')('meshblu-core-job-manager:responder:saturation')
6SimpleBenchmark = require 'simple-benchmark'
7
# Pulls job keys off a request list with a blocking pop (`brpop`) and pushes
# them onto a local work queue, for as long as processing is allowed.
#
# Constructor options:
#   queue               - local work queue; must provide `length()`, `push()`,
#                         `workersList()` and a `concurrency` property
#   _queuePool          - pool whose `acquire()` / `release()` return promises
#   _updateHeartbeat    - optional, called after every `brpop` reply
#                         (defaults to a no-op)
#   requestQueueName    - name of the list passed to `brpop`
#   queueTimeoutSeconds - timeout passed to `brpop`
#   x                   - label used only in benchmark / saturation debug output
#   onPush              - optional, called just before a key is pushed onto
#                         `queue` (defaults to a no-op)
class ResponderDequeuer
  constructor: ({ @queue, @_queuePool, @_updateHeartbeat, @requestQueueName, @queueTimeoutSeconds, @x, @onPush }) ->
    @onPush ?= _.noop
    @_updateHeartbeat ?= _.noop
    # Both flags must start out as real booleans. `_safeToStop` is polled by
    # `stop`, and if `_enqueuing` were left undefined a `stop()` issued before
    # any dequeue attempt could never be satisfied.
    @_allowProcessing = false
    @_enqueuing = false

  # Acquires a queue client, then starts the dequeue loop and calls `callback`.
  # If acquiring the client fails, `callback` receives the error.
  # Returns the promise chain.
  start: (callback=_.noop) =>
    @_allowProcessing = true
    @_getNewQueueClient().then =>
      # No completion handler: the loop simply ends once `_allowProcessing`
      # becomes false.
      async.doWhilst @enqueueJob, (=> @_allowProcessing)
      callback()
    .catch callback

  # Stops the dequeue loop. Polls every 100ms until no dequeue is in flight,
  # then releases the queue client back to the pool and calls `callback`.
  # `callback` is invoked without waiting for the release to settle; a failed
  # release is only logged.
  stop: (callback=_.noop) =>
    @_allowProcessing = false
    async.doUntil @_waitForStopped, @_safeToStop, =>
      @_queuePool.release(@queueClient).catch (error) =>
        console.error error
      callback()
    return # promises

  # One polling step for `stop`: calls `callback` after 100ms.
  _waitForStopped: (callback) =>
    _.delay callback, 100

  # True once processing has been disallowed and no dequeue is in flight.
  # Uses falsiness rather than `== false` (which CoffeeScript compiles to
  # `===`) so an unset flag can never keep `stop` waiting forever.
  _safeToStop: =>
    not @_allowProcessing and not @_enqueuing

  # Acquires a client from the pool and stores it as `@queueClient`. Also
  # registered as the client's one-shot 'error' handler, in which case the
  # error's stack is logged and a replacement client is acquired.
  # NOTE(review): the failed client is not released or destroyed here, and a
  # rejection from `acquire()` on the error-handler path is not caught --
  # confirm that the pool copes with both.
  _getNewQueueClient: (error) =>
    console.error error.stack if error?
    @_queuePool.acquire().then (@queueClient) =>
      @queueClient.once 'error', @_getNewQueueClient

  # One iteration of the dequeue loop: waits for a key and pushes it onto the
  # local queue. `callback` is always called without an error, so dequeue
  # failures and empty results never break the loop.
  enqueueJob: (callback=_.noop) =>
    # Back off while the local queue holds more entries than its concurrency.
    return _.defer callback if @queue.length() > @queue.concurrency

    benchmark = new SimpleBenchmark label: "enqueueJob #{@x}"
    @_enqueuing = true
    @dequeueJob (error, key) =>
      debug benchmark.toString()
      # order is important here
      @_enqueuing = false
      return callback() if error?
      return callback() if _.isEmpty key
      @onPush()
      @queue.push key
      debugSaturation @x, "ql:", @queue.length(), "wl:", @queue.workersList().length
      callback()

  # Blocking pop from `@requestQueueName` with a `@queueTimeoutSeconds`
  # timeout. Calls `callback(error)` when the pop fails or yields no result,
  # otherwise `callback(null, key)` where `key` is the second element of the
  # reply. The heartbeat hook runs after every reply, success or not.
  dequeueJob: (callback) =>
    @queueClient.brpop @requestQueueName, @queueTimeoutSeconds, (error, result) =>
      @_updateHeartbeat()
      return callback error if error?
      return callback new Error 'No Result' unless result?

      # The first element of the reply is not used here.
      key = result[1]
      return callback null, key
64
# Export the class as this module's sole export.
module.exports = ResponderDequeuer