1 | _ = require 'lodash'
|
2 | moment = require 'moment'
|
3 | When = require 'when'
|
4 | GenericPool = require 'generic-pool'
|
5 | { EventEmitter } = require 'events'
|
6 | Redis = require 'ioredis'
|
7 | RedisNS = require '@octoblu/redis-ns'
|
8 | debug = require('debug')('meshblu-core-job-manager:job-manager-base')
|
9 |
|
# Shared base for the job manager classes. It validates the connection
# options, owns three Redis connection pools (queue, command and pub/sub) and
# keeps a heartbeat timestamp that `healthcheck` compares to the job timeout.
class JobManagerBase extends EventEmitter
  # options:
  #   namespace                 - required; handed to RedisNS for each pooled client
  #   redisUri                  - required; handed to ioredis
  #   maxConnections            - required; pool `max`
  #   minConnections            - pool `min`, defaults to 0
  #   idleTimeoutMillis         - defaults to 60000
  #   evictionRunIntervalMillis - defaults to 60000
  #   acquireTimeoutMillis      - defaults to 5000
  #   jobTimeoutSeconds         - required
  #   queueTimeoutSeconds       - required
  #
  # Throws an Error when a required option is missing.
  constructor: (options={}) ->
    # Run the EventEmitter constructor so the emitter state is initialised.
    super()

    {
      @namespace
      @redisUri
      @idleTimeoutMillis
      @maxConnections
      @minConnections
      @evictionRunIntervalMillis
      @acquireTimeoutMillis
      @jobTimeoutSeconds
      @queueTimeoutSeconds
    } = options
    @idleTimeoutMillis ?= 60000
    @evictionRunIntervalMillis ?= 60000
    @acquireTimeoutMillis ?= 5000
    @minConnections ?= 0
    @_heartbeat = moment()

    # The message prefixes are kept exactly as they were; callers may match on them.
    # idleTimeoutMillis and minConnections were defaulted above, so their checks
    # can no longer fire; they are retained for symmetry.
    throw new Error 'JobManagerResponder constructor is missing "namespace"' unless @namespace?
    throw new Error 'JobManagerResponder constructor is missing "redisUri"' unless @redisUri?
    throw new Error 'JobManagerResponder constructor is missing "idleTimeoutMillis"' unless @idleTimeoutMillis?
    throw new Error 'JobManagerResponder constructor is missing "maxConnections"' unless @maxConnections?
    throw new Error 'JobManagerResponder constructor is missing "minConnections"' unless @minConnections?
    throw new Error 'JobManagerRequester constructor is missing "jobTimeoutSeconds"' unless @jobTimeoutSeconds?
    throw new Error 'JobManagerRequester constructor is missing "queueTimeoutSeconds"' unless @queueTimeoutSeconds?

    # Every pool gets the full set of options, including the eviction interval
    # and acquire timeout that _createRedisPool forwards to generic-pool.
    poolOptions = {
      @maxConnections
      @minConnections
      @idleTimeoutMillis
      @evictionRunIntervalMillis
      @acquireTimeoutMillis
      @namespace
      @redisUri
    }
    @_queuePool = @_createRedisPool poolOptions
    @_commandPool = @_createRedisPool poolOptions
    @_pubSubPool = @_createRedisPool poolOptions

  # Stores the current time (Date.now()) in metadata.metrics[metricName],
  # creating metadata.metrics when needed. Nothing is recorded when
  # metadata.jobLogs is empty. callback is always invoked without arguments.
  addMetric: (metadata, metricName, callback) =>
    return callback() if _.isEmpty metadata.jobLogs
    metadata.metrics ?= {}
    metadata.metrics[metricName] = Date.now()
    callback()

  # Best-effort close of a pooled client.
  #   client   - uses quit() + disconnect(false) when `disconnect` exists,
  #              otherwise end(true)
  #   callback - optional; invoked exactly once, with the error thrown while
  #              closing (if any). Without a callback such errors are ignored.
  _closeClient: (client, callback) =>
    client.on 'error', =>
      # errors emitted while the client shuts down are deliberately ignored

    try
      if client.disconnect?
        client.quit()
        client.disconnect false
      else
        client.end true
    catch error
      return callback? error

    # Invoked outside the try block so an exception thrown by the callback
    # itself is not mistaken for a close failure.
    callback?()

  # Builds a generic-pool pool of namespaced Redis clients.
  # Takes a single options object with maxConnections, minConnections,
  # idleTimeoutMillis, evictionRunIntervalMillis, acquireTimeoutMillis,
  # namespace and redisUri. Returns the pool; its 'factoryCreateError' events
  # are re-emitted on this instance.
  _createRedisPool: ({ maxConnections, minConnections, idleTimeoutMillis, evictionRunIntervalMillis, acquireTimeoutMillis, namespace, redisUri }) =>
    factory =
      # Resolves with the client once it is 'ready'; rejects on the first
      # 'error' emitted before that.
      create: =>
        return When.promise (resolve, reject) =>
          connection = new Redis redisUri, dropBufferSupport: true
          client = new RedisNS namespace, connection

          onError = (error) =>
            # The pool never receives this client, so close it here rather
            # than leaving the connection open.
            @_closeClient client
            return reject error

          client.once 'error', onError
          client.once 'ready', =>
            client.removeListener 'error', onError
            resolve client

      # Settles once _closeClient has finished with the client.
      destroy: (client) =>
        return When.promise (resolve, reject) =>
          @_closeClient client, (error) =>
            return reject error if error?
            resolve()

      # Resolves true when the client answers a ping without error, else false.
      validate: (client) =>
        return When.promise (resolve) =>
          client.ping (error) =>
            return resolve false if error?
            resolve true

    options = {
      max: maxConnections
      min: minConnections
      testOnBorrow: true
      idleTimeoutMillis
      evictionRunIntervalMillis
      acquireTimeoutMillis
    }

    pool = GenericPool.createPool factory, options

    pool.on 'factoryCreateError', (error) =>
      @emit 'factoryCreateError', error

    return pool

  # Healthy while the last heartbeat is more recent than twice
  # jobTimeoutSeconds ago. callback(null, healthy) is invoked via _.defer.
  healthcheck: (callback) =>
    healthy = @_heartbeat.isAfter moment().subtract @jobTimeoutSeconds * 2, 'seconds'
    _.defer =>
      callback null, healthy

  # Marks "now" as the most recent heartbeat.
  _updateHeartbeat: =>
    @_heartbeat = moment()
|
109 |
|
110 | module.exports = JobManagerBase
|