UNPKG

3.28 kBtext/coffeescriptView Raw
1{Pool} = require '@octoblu/generic-pool'
2redis = require 'ioredis'
3RedisNS = require '@octoblu/redis-ns'
4JobLogger = require 'job-logger'
5PooledJobManager = require 'meshblu-core-pooled-job-manager'
6
7class RedisPooledJobManager
8 constructor: (options={}) ->
9 {
10 jobLogIndexPrefix
11 jobLogQueue
12 jobLogRedisUri
13 jobLogSampleRate
14 jobLogType
15 jobTimeoutSeconds
16 maxConnections
17 minConnections
18 idleTimeoutMillis
19 namespace
20 redisUri
21 } = options
22
23 minConnections ?= 1
24 idleTimeoutMillis ?= 60000
25
26 throw new Error('RedisPooledJobManager: jobLogIndexPrefix is required') unless jobLogIndexPrefix?
27 throw new Error('RedisPooledJobManager: jobLogQueue is required') unless jobLogQueue?
28 throw new Error('RedisPooledJobManager: jobLogRedisUri is required') unless jobLogRedisUri?
29 throw new Error('RedisPooledJobManager: jobLogSampleRate is required') unless jobLogSampleRate?
30 throw new Error('RedisPooledJobManager: jobLogType is required') unless jobLogType?
31 throw new Error('RedisPooledJobManager: jobTimeoutSeconds is required') unless jobTimeoutSeconds?
32 throw new Error('RedisPooledJobManager: maxConnections is required') unless maxConnections?
33 throw new Error('RedisPooledJobManager: namespace is required') unless namespace?
34 throw new Error('RedisPooledJobManager: redisUri is required') unless redisUri?
35
36 @jobManager = new PooledJobManager
37 jobLogSampleRate: jobLogSampleRate
38 timeoutSeconds: jobTimeoutSeconds
39 jobLogger: @_createJobLogger {jobLogIndexPrefix, jobLogQueue, jobLogRedisUri, jobLogType}
40 pool: @_createPool {maxConnections, minConnections, idleTimeoutMillis, namespace, redisUri}
41
42 createResponse: (responseQueue, request, callback) =>
43 @jobManager.createResponse responseQueue, request, callback
44
45 do: (requestQueue, responseQueue, request, callback) =>
46 @jobManager.do requestQueue, responseQueue, request, callback
47
48 _createJobLogger: ({jobLogIndexPrefix, jobLogQueue, jobLogRedisUri, jobLogSampleRate, jobLogType}) =>
49 return new JobLogger
50 client: redis.createClient jobLogRedisUri, dropBufferSupport: true
51 indexPrefix: jobLogIndexPrefix
52 jobLogQueue: jobLogQueue
53 sampleRate: jobLogSampleRate
54 type: jobLogType
55
56 _closeClient: (client) =>
57 client.on 'error', =>
58 # silently deal with it
59
60 try
61 if client.disconnect?
62 client.quit()
63 client.disconnect false
64 return
65
66 client.end true
67 catch
68
69 _createPool: ({maxConnections, minConnections, idleTimeoutMillis, namespace, redisUri}) =>
70 return new Pool
71 max: maxConnections
72 min: minConnections
73 idleTimeoutMillis: idleTimeoutMillis
74 create: (callback) =>
75 client = new RedisNS namespace, redis.createClient(redisUri, dropBufferSupport: true)
76 client.ping (error) =>
77 return callback error if error?
78 client.once 'error', (error) =>
79 @_closeClient client
80
81 callback null, client
82
83 destroy: @_closeClient
84
85 validateAsync: (client, callback) =>
86 client.ping (error) =>
87 return callback false if error?
88 callback true
89
90module.exports = RedisPooledJobManager