1 | {Pool} = require '@octoblu/generic-pool'
|
2 | redis = require 'ioredis'
|
3 | RedisNS = require '@octoblu/redis-ns'
|
4 | JobLogger = require 'job-logger'
|
5 | PooledJobManager = require 'meshblu-core-pooled-job-manager'
|
6 |
|
7 | class RedisPooledJobManager
|
8 | constructor: (options={}) ->
|
9 | {
|
10 | jobLogIndexPrefix
|
11 | jobLogQueue
|
12 | jobLogRedisUri
|
13 | jobLogSampleRate
|
14 | jobLogType
|
15 | jobTimeoutSeconds
|
16 | maxConnections
|
17 | minConnections
|
18 | idleTimeoutMillis
|
19 | namespace
|
20 | redisUri
|
21 | } = options
|
22 |
|
23 | minConnections ?= 1
|
24 | idleTimeoutMillis ?= 60000
|
25 |
|
26 | throw new Error('RedisPooledJobManager: jobLogIndexPrefix is required') unless jobLogIndexPrefix?
|
27 | throw new Error('RedisPooledJobManager: jobLogQueue is required') unless jobLogQueue?
|
28 | throw new Error('RedisPooledJobManager: jobLogRedisUri is required') unless jobLogRedisUri?
|
29 | throw new Error('RedisPooledJobManager: jobLogSampleRate is required') unless jobLogSampleRate?
|
30 | throw new Error('RedisPooledJobManager: jobLogType is required') unless jobLogType?
|
31 | throw new Error('RedisPooledJobManager: jobTimeoutSeconds is required') unless jobTimeoutSeconds?
|
32 | throw new Error('RedisPooledJobManager: maxConnections is required') unless maxConnections?
|
33 | throw new Error('RedisPooledJobManager: namespace is required') unless namespace?
|
34 | throw new Error('RedisPooledJobManager: redisUri is required') unless redisUri?
|
35 |
|
36 | @jobManager = new PooledJobManager
|
37 | jobLogSampleRate: jobLogSampleRate
|
38 | timeoutSeconds: jobTimeoutSeconds
|
39 | jobLogger: @_createJobLogger {jobLogIndexPrefix, jobLogQueue, jobLogRedisUri, jobLogType}
|
40 | pool: @_createPool {maxConnections, minConnections, idleTimeoutMillis, namespace, redisUri}
|
41 |
|
42 | createResponse: (responseQueue, request, callback) =>
|
43 | @jobManager.createResponse responseQueue, request, callback
|
44 |
|
45 | do: (requestQueue, responseQueue, request, callback) =>
|
46 | @jobManager.do requestQueue, responseQueue, request, callback
|
47 |
|
48 | _createJobLogger: ({jobLogIndexPrefix, jobLogQueue, jobLogRedisUri, jobLogSampleRate, jobLogType}) =>
|
49 | return new JobLogger
|
50 | client: redis.createClient jobLogRedisUri, dropBufferSupport: true
|
51 | indexPrefix: jobLogIndexPrefix
|
52 | jobLogQueue: jobLogQueue
|
53 | sampleRate: jobLogSampleRate
|
54 | type: jobLogType
|
55 |
|
56 | _closeClient: (client) =>
|
57 | client.on 'error', =>
|
58 |
|
59 |
|
60 | try
|
61 | if client.disconnect?
|
62 | client.quit()
|
63 | client.disconnect false
|
64 | return
|
65 |
|
66 | client.end true
|
67 | catch
|
68 |
|
69 | _createPool: ({maxConnections, minConnections, idleTimeoutMillis, namespace, redisUri}) =>
|
70 | return new Pool
|
71 | max: maxConnections
|
72 | min: minConnections
|
73 | idleTimeoutMillis: idleTimeoutMillis
|
74 | create: (callback) =>
|
75 | client = new RedisNS namespace, redis.createClient(redisUri, dropBufferSupport: true)
|
76 | client.ping (error) =>
|
77 | return callback error if error?
|
78 | client.once 'error', (error) =>
|
79 | @_closeClient client
|
80 |
|
81 | callback null, client
|
82 |
|
83 | destroy: @_closeClient
|
84 |
|
85 | validateAsync: (client, callback) =>
|
86 | client.ping (error) =>
|
87 | return callback false if error?
|
88 | callback true
|
89 |
|
90 | module.exports = RedisPooledJobManager
|