UNPKG

3.83 kBtext/coffeescriptView Raw
1_ = require 'lodash'
2moment = require 'moment'
3When = require 'when'
4GenericPool = require 'generic-pool'
5{ EventEmitter } = require 'events'
6Redis = require 'ioredis'
7RedisNS = require '@octoblu/redis-ns'
8debug = require('debug')('meshblu-core-job-manager:job-manager-base')
9
10class JobManagerBase extends EventEmitter
11 constructor: (options={}) ->
12 {
13 @namespace
14 @redisUri
15 @idleTimeoutMillis
16 @maxConnections
17 @minConnections
18 @evictionRunIntervalMillis
19 @acquireTimeoutMillis
20 @jobTimeoutSeconds
21 @queueTimeoutSeconds
22 } = options
23 @idleTimeoutMillis ?= 60000
24 @evictionRunIntervalMillis ?= 60000
25 @acquireTimeoutMillis ?= 5000
26 @minConnections ?= 0
27 @_heartbeat = moment()
28
29 throw new Error 'JobManagerResponder constructor is missing "namespace"' unless @namespace?
30 throw new Error 'JobManagerResponder constructor is missing "redisUri"' unless @redisUri?
31 throw new Error 'JobManagerResponder constructor is missing "idleTimeoutMillis"' unless @idleTimeoutMillis?
32 throw new Error 'JobManagerResponder constructor is missing "maxConnections"' unless @maxConnections?
33 throw new Error 'JobManagerResponder constructor is missing "minConnections"' unless @minConnections?
34 throw new Error 'JobManagerRequester constructor is missing "jobTimeoutSeconds"' unless @jobTimeoutSeconds?
35 throw new Error 'JobManagerRequester constructor is missing "queueTimeoutSeconds"' unless @queueTimeoutSeconds?
36
37 @_queuePool = @_createRedisPool { @maxConnections, @minConnections, @idleTimeoutMillis, @namespace, @redisUri }
38 @_commandPool = @_createRedisPool { @maxConnections, @minConnections, @idleTimeoutMillis, @namespace, @redisUri }
39 @_pubSubPool = @_createRedisPool { @maxConnections, @minConnections, @idleTimeoutMillis, @namespace, @redisUri }
40
41 addMetric: (metadata, metricName, callback) =>
42 return callback() if _.isEmpty metadata.jobLogs
43 metadata.metrics ?= {}
44 metadata.metrics[metricName] = Date.now()
45 callback()
46
47 _closeClient: (client) =>
48 client.on 'error', =>
49 # silently deal with it
50
51 try
52 if client.disconnect?
53 client.quit()
54 client.disconnect false
55 return
56
57 client.end true
58 catch
59
60 _createRedisPool: ({ maxConnections, minConnections, idleTimeoutMillis, evictionRunIntervalMillis, acquireTimeoutMillis, namespace, redisUri }) =>
61 factory =
62 create: =>
63 return When.promise (resolve, reject) =>
64 conx = new Redis redisUri, dropBufferSupport: true
65 client = new RedisNS namespace, conx
66 rejectError = (error) =>
67 return reject error
68
69 client.once 'error', rejectError
70 client.once 'ready', =>
71 client.removeListener 'error', rejectError
72 resolve client
73
74 destroy: (client) =>
75 return When.promise (resolve, reject) =>
76 @_closeClient client, (error) =>
77 return reject error if error?
78 resolve()
79
80 validate: (client) =>
81 return When.promise (resolve) =>
82 client.ping (error) =>
83 return resolve false if error?
84 resolve true
85
86 options = {
87 max: maxConnections
88 min: minConnections
89 testOnBorrow: true
90 idleTimeoutMillis
91 evictionRunIntervalMillis
92 acquireTimeoutMillis
93 }
94
95 pool = GenericPool.createPool factory, options
96
97 pool.on 'factoryCreateError', (error) =>
98 @emit 'factoryCreateError', error
99
100 return pool
101
102 healthcheck: (callback) =>
103 healthy = @_heartbeat.isAfter moment().subtract @jobTimeoutSeconds * 2, 'seconds'
104 _.defer =>
105 callback null, healthy
106
107 _updateHeartbeat: =>
108 @_heartbeat = moment()
109
110module.exports = JobManagerBase